use crate::error::{ApiError, ApiResult};
use crate::extractors::notification::Notification;
use crate::headers::vapid::VapidHeaderWithKey;
use crate::routers::RouterError;
use actix_web::http::StatusCode;
use autopush_common::db::client::DbClient;
use autopush_common::util::InsertOpt;
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use std::collections::HashMap;
use uuid::Uuid;
pub fn build_message_data(notification: &Notification) -> ApiResult<HashMap<&'static str, String>> {
let mut message_data = HashMap::new();
message_data.insert("chid", notification.subscription.channel_id.to_string());
if let Some(data) = ¬ification.data {
message_data.insert("body", data.clone());
message_data.insert_opt("con", notification.headers.encoding.as_ref());
message_data.insert_opt("enc", notification.headers.encryption.as_ref());
message_data.insert_opt("cryptokey", notification.headers.crypto_key.as_ref());
message_data.insert_opt("enckey", notification.headers.encryption_key.as_ref());
trace!(
"🔍 Sending Reliability ID: {:?}",
notification.subscription.reliability_id
);
message_data.insert_opt("rid", notification.subscription.reliability_id.as_ref());
}
Ok(message_data)
}
/// Verifies that `data` does not exceed `max_data` bytes.
///
/// # Errors
///
/// Returns `RouterError::TooMuchData` carrying the number of bytes over the
/// limit when `data.len() > max_data`.
pub fn message_size_check(data: &[u8], max_data: usize) -> Result<(), RouterError> {
    let size = data.len();
    if size <= max_data {
        return Ok(());
    }

    // `size > max_data` here, so the subtraction cannot underflow.
    let excess = size - max_data;
    trace!("Data is too long by {} bytes", excess);
    Err(RouterError::TooMuchData(excess))
}
pub async fn handle_error(
error: RouterError,
metrics: &StatsdClient,
db: &dyn DbClient,
platform: &str,
app_id: &str,
uaid: Uuid,
vapid: Option<VapidHeaderWithKey>,
) -> ApiError {
match &error {
RouterError::Authentication => {
error!("Bridge authentication error");
incr_error_metric(
metrics,
platform,
app_id,
"authentication",
error.status(),
error.errno(),
);
}
RouterError::GCMAuthentication => {
warn!("GCM Authentication error");
incr_error_metric(
metrics,
platform,
app_id,
"gcm authentication",
error.status(),
error.errno(),
);
}
RouterError::RequestTimeout => {
info!("Bridge timeout");
incr_error_metric(
metrics,
platform,
app_id,
"timeout",
error.status(),
error.errno(),
);
}
RouterError::Connect(e) => {
warn!("Bridge unavailable: {}", e);
incr_error_metric(
metrics,
platform,
app_id,
"connection_unavailable",
error.status(),
error.errno(),
);
}
RouterError::NotFound => {
debug!("Bridge recipient not found, removing user");
incr_error_metric(
metrics,
platform,
app_id,
"recipient_gone",
error.status(),
error.errno(),
);
if let Err(e) = db.remove_user(&uaid).await {
warn!("Error while removing user due to bridge not_found: {}", e);
}
}
RouterError::Upstream { .. } => {
warn!("{}", error.to_string());
incr_error_metric(
metrics,
platform,
app_id,
"server_error",
error.status(),
error.errno(),
);
}
RouterError::TooMuchData(_) => {
incr_error_metric(
metrics,
platform,
app_id,
"too_much_data",
error.status(),
error.errno(),
);
}
_ => {
warn!("Unknown error while sending bridge request: {}", error);
incr_error_metric(
metrics,
platform,
app_id,
"unknown",
error.status(),
error.errno(),
);
}
}
let mut err = ApiError::from(error);
if let Some(Ok(claims)) = vapid.map(|v| v.vapid.claims()) {
let mut extras = err.extras.unwrap_or_default();
extras.extend([("sub".to_owned(), claims.sub.unwrap_or_default())]);
err.extras = Some(extras);
};
err
}
/// Increments the `notification.bridge.error` counter.
///
/// The counter is tagged with `platform`, `app_id`, `reason`, the string form
/// of `status` (tag `error`), and `errno` (tag `errno`, reported as `0` when
/// no errno is supplied).
pub fn incr_error_metric(
    metrics: &StatsdClient,
    platform: &str,
    app_id: &str,
    reason: &str,
    status: StatusCode,
    errno: Option<usize>,
) {
    // Render the non-string tag values up front so they outlive the builder.
    let status_tag = status.to_string();
    let errno_tag = errno.unwrap_or(0).to_string();

    metrics
        .incr_with_tags("notification.bridge.error")
        .with_tag("platform", platform)
        .with_tag("app_id", app_id)
        .with_tag("reason", reason)
        .with_tag("error", &status_tag)
        .with_tag("errno", &errno_tag)
        .send();
}
/// Records the metrics for a notification that was successfully handed to a
/// bridge.
///
/// Three metrics are sent, each tagged with `platform` and `app_id`:
///
/// * `notification.bridge.sent` — incremented by one.
/// * `notification.message_data` — counts the length of the notification's
///   data (`0` when there is none), additionally tagged
///   `destination = "Direct"`.
/// * `notification.total_request_time` — the time since
///   `notification.timestamp`, converted from seconds to milliseconds.
pub fn incr_success_metrics(
    metrics: &StatsdClient,
    platform: &str,
    app_id: &str,
    notification: &Notification,
) {
    metrics
        .incr_with_tags("notification.bridge.sent")
        .with_tag("platform", platform)
        .with_tag("app_id", app_id)
        .send();

    let data_len = notification.data.as_ref().map(String::len).unwrap_or(0);
    metrics
        .count_with_tags("notification.message_data", data_len as i64)
        .with_tag("platform", platform)
        .with_tag("app_id", app_id)
        .with_tag("destination", "Direct")
        .send();

    // Saturate instead of subtracting directly: if the wall clock steps
    // backwards the notification timestamp can be ahead of "now", and an
    // unchecked unsigned subtraction would panic (debug) or wrap to a huge
    // bogus duration (release).
    let elapsed_secs =
        autopush_common::util::sec_since_epoch().saturating_sub(notification.timestamp);
    metrics
        .time_with_tags("notification.total_request_time", elapsed_secs * 1000)
        .with_tag("platform", platform)
        .with_tag("app_id", app_id)
        .send();
}
/// Test fixtures for building `Notification` values.
#[cfg(test)]
pub mod tests {
    use crate::extractors::notification::Notification;
    use crate::extractors::notification_headers::NotificationHeaders;
    use crate::extractors::routers::RouterType;
    use crate::extractors::subscription::Subscription;
    use autopush_common::db::User;
    use std::collections::HashMap;
    use uuid::Uuid;

    /// Channel ID (in string form) used by notifications built with
    /// [`make_notification`].
    pub const CHANNEL_ID: &str = "deadbeef-13f9-4639-87f9-2ff731824f34";

    /// Returns [`CHANNEL_ID`] parsed into a `Uuid`.
    pub fn channel_id() -> Uuid {
        Uuid::parse_str(CHANNEL_ID).unwrap()
    }

    /// Creates a notification fixture.
    ///
    /// The user is built from `router_data` and `router_type`; `data` becomes
    /// the notification body. All other fields are fixed placeholder values.
    pub fn make_notification(
        router_data: HashMap<String, serde_json::Value>,
        data: Option<String>,
        router_type: RouterType,
    ) -> Notification {
        let headers = NotificationHeaders {
            ttl: 0,
            topic: Some("test-topic".to_string()),
            encoding: Some("test-encoding".to_string()),
            encryption: Some("test-encryption".to_string()),
            encryption_key: Some("test-encryption-key".to_string()),
            crypto_key: Some("test-crypto-key".to_string()),
        };

        let user = User::builder()
            .router_data(router_data)
            .router_type(router_type.to_string())
            .build()
            .unwrap();

        let subscription = Subscription {
            user,
            channel_id: channel_id(),
            vapid: None,
            reliability_id: None,
        };

        Notification {
            message_id: "test-message-id".to_string(),
            subscription,
            headers,
            timestamp: 0,
            sort_key_timestamp: 0,
            data,
        }
    }
}