autoendpoint/routers/fcm/
router.rs

1use autopush_common::db::client::DbClient;
2#[cfg(feature = "reliable_report")]
3use autopush_common::reliability::{PushReliability, ReliabilityState};
4
5use crate::error::ApiResult;
6use crate::extractors::notification::Notification;
7use crate::extractors::router_data_input::RouterDataInput;
8use crate::routers::common::{build_message_data, handle_error, incr_success_metrics};
9use crate::routers::fcm::client::FcmClient;
10use crate::routers::fcm::error::FcmError;
11use crate::routers::fcm::settings::{FcmServerCredential, FcmSettings};
12use crate::routers::{Router, RouterError, RouterResponse};
13use async_trait::async_trait;
14use cadence::StatsdClient;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use url::Url;
19use uuid::Uuid;
20
21/// Firebase Cloud Messaging router
22pub struct FcmRouter {
23    settings: FcmSettings,
24    endpoint_url: Url,
25    metrics: Arc<StatsdClient>,
26    db: Box<dyn DbClient>,
27    /// A map from application ID to an authenticated FCM client
28    clients: HashMap<String, FcmClient>,
29    #[cfg(feature = "reliable_report")]
30    reliability: Arc<PushReliability>,
31    max_notification_ttl_secs: i64,
32}
33
34impl FcmRouter {
35    /// Create a new `FcmRouter`
36    pub async fn new(
37        settings: FcmSettings,
38        endpoint_url: Url,
39        http: reqwest::Client,
40        metrics: Arc<StatsdClient>,
41        db: Box<dyn DbClient>,
42        #[cfg(feature = "reliable_report")] reliability: Arc<PushReliability>,
43    ) -> Result<Self, FcmError> {
44        let server_credentials = settings.credentials()?;
45        let clients = Self::create_clients(&settings, server_credentials, http.clone())
46            .await
47            .map_err(FcmError::OAuthClientBuild)?;
48        let max_ttl = settings.max_ttl;
49        Ok(Self {
50            settings,
51            endpoint_url,
52            metrics,
53            db,
54            clients,
55            #[cfg(feature = "reliable_report")]
56            reliability,
57            max_notification_ttl_secs: max_ttl,
58        })
59    }
60
61    /// Create FCM clients for each application
62    async fn create_clients(
63        settings: &FcmSettings,
64        server_credentials: HashMap<String, FcmServerCredential>,
65        http: reqwest::Client,
66    ) -> std::io::Result<HashMap<String, FcmClient>> {
67        let mut clients = HashMap::new();
68
69        for (profile, server_credential) in server_credentials {
70            clients.insert(
71                profile,
72                FcmClient::new(settings, server_credential, http.clone()).await?,
73            );
74        }
75        trace!("Initialized {} FCM clients", clients.len());
76        Ok(clients)
77    }
78
79    /// if we have any clients defined, this connection is "active"
80    pub fn active(&self) -> bool {
81        !self.clients.is_empty()
82    }
83
84    /// Do the gauntlet check to get the routing credentials, these are the
85    /// sender/project ID, and the subscription specific user routing token.
86    /// FCM stores the values in the top hash as `token` & `app_id`, GCM stores them
87    /// in a sub-hash as `creds.auth` and `creds.senderID`.
88    /// If any of these error out, it's probably because of a corrupted key.
89    fn routing_info(
90        &self,
91        router_data: &HashMap<String, Value>,
92        uaid: &Uuid,
93    ) -> ApiResult<(String, String)> {
94        let creds = router_data.get("creds").and_then(Value::as_object);
95        // GCM and FCM both should store the client registration_token as token in the router_data.
96        // There was some confusion about router table records that may store the client
97        // routing token in `creds.auth`, but it's believed that this a duplicate of the
98        // server authentication token and can be ignored since we use the value specified
99        // in the settings.
100        let routing_token = match router_data.get("token").and_then(Value::as_str) {
101            Some(v) => v.to_owned(),
102            None => {
103                warn!("No Registration token found for user {}", uaid.to_string());
104                return Err(FcmError::NoRegistrationToken.into());
105            }
106        };
107        let app_id = match router_data.get("app_id").and_then(Value::as_str) {
108            Some(v) => v.to_owned(),
109            None => {
110                if creds.is_none() {
111                    warn!("No App_id found for user {}", uaid.to_string());
112                    return Err(FcmError::NoAppId.into());
113                }
114                match creds
115                    .unwrap()
116                    .get("senderID")
117                    .map(|v| v.as_str())
118                    .unwrap_or(None)
119                {
120                    Some(v) => v.to_owned(),
121                    None => return Err(FcmError::NoAppId.into()),
122                }
123            }
124        };
125        Ok((routing_token, app_id))
126    }
127}
128
129#[async_trait(?Send)]
130impl Router for FcmRouter {
131    fn register(
132        &self,
133        router_data_input: &RouterDataInput,
134        app_id: &str,
135    ) -> Result<HashMap<String, Value>, RouterError> {
136        if !self.clients.contains_key(app_id) {
137            return Err(FcmError::InvalidAppId(app_id.to_owned()).into());
138        }
139
140        let mut router_data = HashMap::new();
141        router_data.insert(
142            "token".to_string(),
143            serde_json::to_value(&router_data_input.token).unwrap(),
144        );
145        router_data.insert("app_id".to_string(), serde_json::to_value(app_id).unwrap());
146
147        Ok(router_data)
148    }
149
150    #[allow(unused_mut)]
151    async fn route_notification(
152        &self,
153        mut notification: Notification,
154    ) -> ApiResult<RouterResponse> {
155        debug!(
156            "Sending FCM notification to UAID {}",
157            notification.subscription.user.uaid
158        );
159        trace!("Notification = {:?}", notification);
160
161        let router_data = notification
162            .subscription
163            .user
164            .router_data
165            .as_ref()
166            .ok_or(FcmError::NoRegistrationToken)?;
167
168        let (routing_token, app_id) =
169            self.routing_info(router_data, &notification.subscription.user.uaid)?;
170        let ttl = (self.max_notification_ttl_secs as u64)
171            .min(self.settings.min_ttl.max(notification.headers.ttl as u64));
172
173        // Send the notification to FCM
174        let client = self
175            .clients
176            .get(&app_id)
177            .ok_or_else(|| FcmError::InvalidAppId(app_id.clone()))?;
178
179        let message_data = build_message_data(&notification)?;
180        let platform = "fcmv1";
181        trace!("Sending message to {platform}: [{:?}]", &app_id);
182        if let Err(e) = client.send(message_data, routing_token, ttl).await {
183            #[cfg(feature = "reliable_report")]
184            notification
185                .record_reliability(&self.reliability, ReliabilityState::Errored)
186                .await;
187            return Err(handle_error(
188                e,
189                &self.metrics,
190                self.db.as_ref(),
191                platform,
192                &app_id,
193                notification.subscription.user.uaid,
194                notification.subscription.vapid.clone(),
195            )
196            .await);
197        };
198        incr_success_metrics(&self.metrics, platform, &app_id, &notification);
199        #[cfg(feature = "reliable_report")]
200        // Record that we've sent the message out to FCM.
201        // We can't set the state here because the notification isn't
202        // mutable, but we are also essentially consuming the
203        // notification nothing else should modify it.
204        notification
205            .record_reliability(&self.reliability, ReliabilityState::BridgeTransmitted)
206            .await;
207        // Sent successfully, update metrics and make response
208        trace!("Send request was successful");
209
210        Ok(RouterResponse::success(
211            self.endpoint_url
212                .join(&format!("/m/{}", notification.message_id))
213                .expect("Message ID is not URL-safe")
214                .to_string(),
215            notification.headers.ttl as usize,
216        ))
217    }
218}
219
#[cfg(test)]
mod tests {
    use crate::error::ApiErrorKind;
    use crate::extractors::routers::RouterType;
    use crate::routers::RouterError;
    use crate::routers::common::tests::{CHANNEL_ID, make_notification};
    use crate::routers::fcm::client::tests::{
        GCM_PROJECT_ID, PROJECT_ID, make_service_key, mock_fcm_endpoint_builder,
        mock_token_endpoint,
    };
    use crate::routers::fcm::error::FcmError;
    use crate::routers::fcm::router::FcmRouter;
    use crate::routers::fcm::settings::FcmSettings;
    use crate::routers::{Router, RouterResponse};
    use autopush_common::db::client::DbClient;
    use autopush_common::db::mock::MockDbClient;
    #[cfg(feature = "reliable_report")]
    use autopush_common::{redis_util::MAX_TRANSACTION_LOOP, reliability::PushReliability};
    use std::sync::Arc;

    use actix_web::http::StatusCode;
    use cadence::StatsdClient;
    use mockall::predicate;
    use std::collections::HashMap;
    use url::Url;

    /// Registration token placed in the default test router data.
    const FCM_TOKEN: &str = "test-token";
    /// App ID that has no configured client in the test router.
    const UNKNOWN_APP_ID: &str = "unknown-app-id";

    /// Create a router for testing, using the given service auth file
    ///
    /// Two credential profiles are registered: `"dev"` (FCM) and
    /// `GCM_PROJECT_ID` (flagged `is_gcm`). The router's FCM base URL points at
    /// the mock `server`.
    async fn make_router(
        server: &mut mockito::ServerGuard,
        fcm_credential: String,
        gcm_credential: String,
        db: Box<dyn DbClient>,
    ) -> FcmRouter {
        let url = &server.url();
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        FcmRouter::new(
            FcmSettings {
                base_url: Url::parse(url).unwrap(),
                server_credentials: serde_json::json!({
                    "dev": {
                        "project_id": PROJECT_ID,
                        "credential": fcm_credential
                    },
                    GCM_PROJECT_ID: {
                        "project_id": GCM_PROJECT_ID,
                        "credential": gcm_credential,
                        "is_gcm": true,
                    }
                })
                .to_string(),
                ..Default::default()
            },
            Url::parse("http://localhost:8080/").unwrap(),
            reqwest::Client::new(),
            metrics.clone(),
            db.clone(),
            #[cfg(feature = "reliable_report")]
            Arc::new(
                PushReliability::new(&None, db.clone(), &metrics, MAX_TRANSACTION_LOOP).unwrap(),
            ),
        )
        .await
        .unwrap()
    }

    /// Create default user router data
    ///
    /// Contains `token = FCM_TOKEN` and `app_id = "dev"`.
    fn default_router_data() -> HashMap<String, serde_json::Value> {
        let mut map = HashMap::new();
        map.insert("token".to_string(), serde_json::Value::from(FCM_TOKEN));
        map.insert("app_id".to_string(), serde_json::Value::from("dev"));
        map
    }

    /// A notification with no data is sent to FCM
    #[tokio::test]
    async fn successful_routing_no_data() {
        let mut server = mockito::Server::new_async().await;

        let db = MockDbClient::new().into_boxed_arc();
        let service_key = make_service_key(&server);
        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
        assert!(router.active());
        let _token_mock = mock_token_endpoint(&mut server).await;
        // Use the async mock registration, matching the other tests in this module.
        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
            .match_body(
                serde_json::json!({
                    "message": {
                        "android": {
                            "data": {
                                "chid": CHANNEL_ID
                            },
                            "ttl": "60s"
                        },
                        "token": "test-token"
                    }
                })
                .to_string()
                .as_str(),
            )
            .create_async()
            .await;
        let notification = make_notification(default_router_data(), None, RouterType::FCM);

        let result = router.route_notification(notification).await;
        assert!(result.is_ok(), "result = {result:?}");
        assert_eq!(
            result.unwrap(),
            RouterResponse::success("http://localhost:8080/m/test-message-id".to_string(), 0)
        );
        fcm_mock.assert();
    }

    /// A notification with data is sent to FCM
    #[tokio::test]
    async fn successful_routing_with_data() {
        let mut server = mockito::Server::new_async().await;

        let db = MockDbClient::new().into_boxed_arc();
        let service_key = make_service_key(&server);
        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
        let _token_mock = mock_token_endpoint(&mut server).await;
        // Use the async mock registration, matching the other tests in this module.
        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
            .match_body(
                serde_json::json!({
                    "message": {
                        "android": {
                            "data": {
                                "chid": CHANNEL_ID,
                                "body": "test-data",
                                "con": "test-encoding",
                                "enc": "test-encryption",
                                "cryptokey": "test-crypto-key",
                                "enckey": "test-encryption-key"
                            },
                            "ttl": "60s"
                        },
                        "token": "test-token"
                    }
                })
                .to_string()
                .as_str(),
            )
            .create_async()
            .await;
        let data = "test-data".to_string();
        let notification = make_notification(default_router_data(), Some(data), RouterType::FCM);

        let result = router.route_notification(notification).await;
        assert!(result.is_ok(), "result = {result:?}");
        assert_eq!(
            result.unwrap(),
            RouterResponse::success("http://localhost:8080/m/test-message-id".to_string(), 0)
        );
        fcm_mock.assert();
    }

    /// If there is no client for the user's app ID, an error is returned and
    /// the FCM request is not sent.
    #[tokio::test]
    async fn missing_client() {
        let mut server = mockito::Server::new_async().await;

        let db = MockDbClient::new().into_boxed_arc();
        let service_key = make_service_key(&server);
        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
        let _token_mock = mock_token_endpoint(&mut server).await;
        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
            .expect(0)
            .create_async()
            .await;
        // Override the default app ID with one that has no configured client.
        let mut router_data = default_router_data();
        router_data.insert(
            "app_id".to_string(),
            serde_json::Value::from(UNKNOWN_APP_ID),
        );
        let notification = make_notification(router_data, None, RouterType::FCM);

        let result = router.route_notification(notification).await;
        assert!(result.is_err());
        let result_kind = &result.as_ref().err().unwrap().kind;
        // The guard checks the reported app ID; a bare identifier in the
        // pattern would bind anything and verify nothing.
        assert!(
            matches!(
                result_kind,
                ApiErrorKind::Router(RouterError::Fcm(FcmError::InvalidAppId(id)))
                    if id == UNKNOWN_APP_ID
            ),
            "result = {result:?}"
        );
        assert_eq!(result_kind.status(), StatusCode::BAD_REQUEST);
        fcm_mock.assert();
    }

    /// If the FCM user no longer exists (404), we drop the user from our database
    #[tokio::test]
    async fn no_fcm_user() {
        let mut server = mockito::Server::new_async().await;

        let notification = make_notification(default_router_data(), None, RouterType::FCM);
        // The router must remove exactly this user, exactly once.
        let mut db = MockDbClient::new();
        db.expect_remove_user()
            .with(predicate::eq(notification.subscription.user.uaid))
            .times(1)
            .return_once(|_| Ok(()));

        let service_key = make_service_key(&server);
        let router = make_router(
            &mut server,
            service_key,
            "whatever".to_string(),
            db.into_boxed_arc(),
        )
        .await;
        let _token_mock = mock_token_endpoint(&mut server).await;
        let _fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
            .with_status(404)
            .with_body(r#"{"error":{"status":"NOT_FOUND","message":"test-message"}}"#)
            .create_async()
            .await;

        let result = router.route_notification(notification).await;
        assert!(result.is_err());
        assert!(
            matches!(
                result.as_ref().unwrap_err().kind,
                ApiErrorKind::Router(RouterError::NotFound)
            ),
            "result = {result:?}"
        );
    }
}