1use autopush_common::db::client::DbClient;
2#[cfg(feature = "reliable_report")]
3use autopush_common::reliability::{PushReliability, ReliabilityState};
4
5use crate::error::ApiResult;
6use crate::extractors::notification::Notification;
7use crate::extractors::router_data_input::RouterDataInput;
8use crate::routers::common::{build_message_data, handle_error, incr_success_metrics};
9use crate::routers::fcm::client::FcmClient;
10use crate::routers::fcm::error::FcmError;
11use crate::routers::fcm::settings::{FcmServerCredential, FcmSettings};
12use crate::routers::{Router, RouterError, RouterResponse};
13use async_trait::async_trait;
14use cadence::StatsdClient;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use url::Url;
19use uuid::Uuid;
20
21pub struct FcmRouter {
23 settings: FcmSettings,
24 endpoint_url: Url,
25 metrics: Arc<StatsdClient>,
26 db: Box<dyn DbClient>,
27 clients: HashMap<String, FcmClient>,
29 #[cfg(feature = "reliable_report")]
30 reliability: Arc<PushReliability>,
31 max_notification_ttl_secs: i64,
32}
33
34impl FcmRouter {
35 pub async fn new(
37 settings: FcmSettings,
38 endpoint_url: Url,
39 http: reqwest::Client,
40 metrics: Arc<StatsdClient>,
41 db: Box<dyn DbClient>,
42 #[cfg(feature = "reliable_report")] reliability: Arc<PushReliability>,
43 ) -> Result<Self, FcmError> {
44 let server_credentials = settings.credentials()?;
45 let clients = Self::create_clients(&settings, server_credentials, http.clone())
46 .await
47 .map_err(FcmError::OAuthClientBuild)?;
48 let max_ttl = settings.max_ttl;
49 Ok(Self {
50 settings,
51 endpoint_url,
52 metrics,
53 db,
54 clients,
55 #[cfg(feature = "reliable_report")]
56 reliability,
57 max_notification_ttl_secs: max_ttl,
58 })
59 }
60
61 async fn create_clients(
63 settings: &FcmSettings,
64 server_credentials: HashMap<String, FcmServerCredential>,
65 http: reqwest::Client,
66 ) -> std::io::Result<HashMap<String, FcmClient>> {
67 let mut clients = HashMap::new();
68
69 for (profile, server_credential) in server_credentials {
70 clients.insert(
71 profile,
72 FcmClient::new(settings, server_credential, http.clone()).await?,
73 );
74 }
75 trace!("Initialized {} FCM clients", clients.len());
76 Ok(clients)
77 }
78
79 pub fn active(&self) -> bool {
81 !self.clients.is_empty()
82 }
83
84 fn routing_info(
90 &self,
91 router_data: &HashMap<String, Value>,
92 uaid: &Uuid,
93 ) -> ApiResult<(String, String)> {
94 let creds = router_data.get("creds").and_then(Value::as_object);
95 let routing_token = match router_data.get("token").and_then(Value::as_str) {
101 Some(v) => v.to_owned(),
102 None => {
103 warn!("No Registration token found for user {}", uaid.to_string());
104 return Err(FcmError::NoRegistrationToken.into());
105 }
106 };
107 let app_id = match router_data.get("app_id").and_then(Value::as_str) {
108 Some(v) => v.to_owned(),
109 None => {
110 if creds.is_none() {
111 warn!("No App_id found for user {}", uaid.to_string());
112 return Err(FcmError::NoAppId.into());
113 }
114 match creds
115 .unwrap()
116 .get("senderID")
117 .map(|v| v.as_str())
118 .unwrap_or(None)
119 {
120 Some(v) => v.to_owned(),
121 None => return Err(FcmError::NoAppId.into()),
122 }
123 }
124 };
125 Ok((routing_token, app_id))
126 }
127}
128
129#[async_trait(?Send)]
130impl Router for FcmRouter {
131 fn register(
132 &self,
133 router_data_input: &RouterDataInput,
134 app_id: &str,
135 ) -> Result<HashMap<String, Value>, RouterError> {
136 if !self.clients.contains_key(app_id) {
137 return Err(FcmError::InvalidAppId(app_id.to_owned()).into());
138 }
139
140 let mut router_data = HashMap::new();
141 router_data.insert(
142 "token".to_string(),
143 serde_json::to_value(&router_data_input.token).unwrap(),
144 );
145 router_data.insert("app_id".to_string(), serde_json::to_value(app_id).unwrap());
146
147 Ok(router_data)
148 }
149
150 #[allow(unused_mut)]
151 async fn route_notification(
152 &self,
153 mut notification: Notification,
154 ) -> ApiResult<RouterResponse> {
155 debug!(
156 "Sending FCM notification to UAID {}",
157 notification.subscription.user.uaid
158 );
159 trace!("Notification = {:?}", notification);
160
161 let router_data = notification
162 .subscription
163 .user
164 .router_data
165 .as_ref()
166 .ok_or(FcmError::NoRegistrationToken)?;
167
168 let (routing_token, app_id) =
169 self.routing_info(router_data, ¬ification.subscription.user.uaid)?;
170 let ttl = (self.max_notification_ttl_secs as u64)
171 .min(self.settings.min_ttl.max(notification.headers.ttl as u64));
172
173 let client = self
175 .clients
176 .get(&app_id)
177 .ok_or_else(|| FcmError::InvalidAppId(app_id.clone()))?;
178
179 let message_data = build_message_data(¬ification)?;
180 let platform = "fcmv1";
181 trace!("Sending message to {platform}: [{:?}]", &app_id);
182 if let Err(e) = client.send(message_data, routing_token, ttl).await {
183 #[cfg(feature = "reliable_report")]
184 notification
185 .record_reliability(&self.reliability, ReliabilityState::Errored)
186 .await;
187 return Err(handle_error(
188 e,
189 &self.metrics,
190 self.db.as_ref(),
191 platform,
192 &app_id,
193 notification.subscription.user.uaid,
194 notification.subscription.vapid.clone(),
195 )
196 .await);
197 };
198 incr_success_metrics(&self.metrics, platform, &app_id, ¬ification);
199 #[cfg(feature = "reliable_report")]
200 notification
205 .record_reliability(&self.reliability, ReliabilityState::BridgeTransmitted)
206 .await;
207 trace!("Send request was successful");
209
210 Ok(RouterResponse::success(
211 self.endpoint_url
212 .join(&format!("/m/{}", notification.message_id))
213 .expect("Message ID is not URL-safe")
214 .to_string(),
215 notification.headers.ttl as usize,
216 ))
217 }
218}
219
#[cfg(test)]
mod tests {
    use crate::error::ApiErrorKind;
    use crate::extractors::routers::RouterType;
    use crate::routers::common::tests::{make_notification, CHANNEL_ID};
    use crate::routers::fcm::client::tests::{
        make_service_key, mock_fcm_endpoint_builder, mock_token_endpoint, GCM_PROJECT_ID,
        PROJECT_ID,
    };
    use crate::routers::fcm::error::FcmError;
    use crate::routers::fcm::router::FcmRouter;
    use crate::routers::fcm::settings::FcmSettings;
    use crate::routers::RouterError;
    use crate::routers::{Router, RouterResponse};
    use autopush_common::db::client::DbClient;
    use autopush_common::db::mock::MockDbClient;
    #[cfg(feature = "reliable_report")]
    use autopush_common::{redis_util::MAX_TRANSACTION_LOOP, reliability::PushReliability};
    use std::sync::Arc;

    use cadence::StatsdClient;
    use mockall::predicate;
    use std::collections::HashMap;
    use url::Url;

    /// Registration token placed in the default router data.
    const FCM_TOKEN: &str = "test-token";

    /// Build a router that talks to the mock `server`.
    ///
    /// Two credential profiles are configured: `"dev"` (using
    /// `fcm_credential`) and `GCM_PROJECT_ID` (using `gcm_credential`, flagged
    /// `is_gcm`).
    async fn make_router(
        server: &mut mockito::ServerGuard,
        fcm_credential: String,
        gcm_credential: String,
        db: Box<dyn DbClient>,
    ) -> FcmRouter {
        let url = &server.url();
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        FcmRouter::new(
            FcmSettings {
                base_url: Url::parse(url).unwrap(),
                server_credentials: serde_json::json!({
                    "dev": {
                        "project_id": PROJECT_ID,
                        "credential": fcm_credential
                    },
                    GCM_PROJECT_ID: {
                        "project_id": GCM_PROJECT_ID,
                        "credential": gcm_credential,
                        "is_gcm": true,
                    }
                })
                .to_string(),
                ..Default::default()
            },
            Url::parse("http://localhost:8080/").unwrap(),
            reqwest::Client::new(),
            metrics.clone(),
            db.clone(),
            #[cfg(feature = "reliable_report")]
            Arc::new(
                PushReliability::new(&None, db.clone(), &metrics, MAX_TRANSACTION_LOOP).unwrap(),
            ),
        )
        .await
        .unwrap()
    }

    /// Router data for a user registered under the `"dev"` profile.
    fn default_router_data() -> HashMap<String, serde_json::Value> {
        let mut map = HashMap::new();
        map.insert(
            "token".to_string(),
            serde_json::to_value(FCM_TOKEN).unwrap(),
        );
        map.insert("app_id".to_string(), serde_json::to_value("dev").unwrap());
        map
    }

    /// A notification without a payload is forwarded with only the channel id.
    #[tokio::test]
    async fn successful_routing_no_data() {
        let mut server = mockito::Server::new_async().await;

        let mdb = MockDbClient::new();
        let db = mdb.into_boxed_arc();
        let service_key = make_service_key(&server);
        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
        assert!(router.active());
        let _token_mock = mock_token_endpoint(&mut server).await;
        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
            .match_body(
                serde_json::json!({
                    "message": {
                        "android": {
                            "data": {
                                "chid": CHANNEL_ID
                            },
                            "ttl": "60s"
                        },
                        "token": "test-token"
                    }
                })
                .to_string()
                .as_str(),
            )
            .create_async()
            .await;
        let notification = make_notification(default_router_data(), None, RouterType::FCM);

        let result = router.route_notification(notification).await;
        assert!(result.is_ok(), "result = {result:?}");
        assert_eq!(
            result.unwrap(),
            RouterResponse::success("http://localhost:8080/m/test-message-id".to_string(), 0)
        );
        fcm_mock.assert();
    }

    /// A notification with a payload is forwarded with its body and
    /// encryption fields.
    #[tokio::test]
    async fn successful_routing_with_data() {
        let mut server = mockito::Server::new_async().await;

        let mdb = MockDbClient::new();
        let db = mdb.into_boxed_arc();
        let service_key = make_service_key(&server);
        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
        let _token_mock = mock_token_endpoint(&mut server).await;
        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
            .match_body(
                serde_json::json!({
                    "message": {
                        "android": {
                            "data": {
                                "chid": CHANNEL_ID,
                                "body": "test-data",
                                "con": "test-encoding",
                                "enc": "test-encryption",
                                "cryptokey": "test-crypto-key",
                                "enckey": "test-encryption-key"
                            },
                            "ttl": "60s"
                        },
                        "token": "test-token"
                    }
                })
                .to_string()
                .as_str(),
            )
            .create_async()
            .await;
        let data = "test-data".to_string();
        let notification = make_notification(default_router_data(), Some(data), RouterType::FCM);

        let result = router.route_notification(notification).await;
        assert!(result.is_ok(), "result = {result:?}");
        assert_eq!(
            result.unwrap(),
            RouterResponse::success("http://localhost:8080/m/test-message-id".to_string(), 0)
        );
        fcm_mock.assert();
    }

    /// An app id with no configured client is rejected before FCM is called.
    #[tokio::test]
    async fn missing_client() {
        let mut server = mockito::Server::new_async().await;

        let db = MockDbClient::new().into_boxed_arc();
        let service_key = make_service_key(&server);
        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
        let _token_mock = mock_token_endpoint(&mut server).await;
        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
            .expect(0)
            .create_async()
            .await;
        let unknown_app_id = "unknown-app-id";
        let mut router_data = default_router_data();
        router_data.insert(
            "app_id".to_string(),
            serde_json::to_value(unknown_app_id).unwrap(),
        );
        let notification = make_notification(router_data, None, RouterType::FCM);

        let result = router.route_notification(notification).await;
        assert!(result.is_err());
        // Check the reported id too; a bare binding in the pattern would
        // accept any value.
        assert!(
            matches!(
                &result.as_ref().unwrap_err().kind,
                ApiErrorKind::Router(RouterError::Fcm(FcmError::InvalidAppId(app_id)))
                    if app_id.as_str() == unknown_app_id
            ),
            "result = {result:?}"
        );
        fcm_mock.assert();
    }

    /// A `NOT_FOUND` reply from FCM removes the user and surfaces
    /// `RouterError::NotFound`.
    #[tokio::test]
    async fn no_fcm_user() {
        let mut server = mockito::Server::new_async().await;

        let notification = make_notification(default_router_data(), None, RouterType::FCM);
        let mut db = MockDbClient::new();
        db.expect_remove_user()
            .with(predicate::eq(notification.subscription.user.uaid))
            .times(1)
            .return_once(|_| Ok(()));

        let service_key = make_service_key(&server);
        let router = make_router(
            &mut server,
            service_key,
            "whatever".to_string(),
            db.into_boxed_arc(),
        )
        .await;
        let _token_mock = mock_token_endpoint(&mut server).await;
        let _fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
            .with_status(404)
            .with_body(r#"{"error":{"status":"NOT_FOUND","message":"test-message"}}"#)
            .create_async()
            .await;

        let result = router.route_notification(notification).await;
        assert!(result.is_err());
        assert!(
            matches!(
                result.as_ref().unwrap_err().kind,
                ApiErrorKind::Router(RouterError::NotFound)
            ),
            "result = {result:?}"
        );
    }
}