autoendpoint/routers/fcm/
router.rs

1use autopush_common::db::client::DbClient;
2#[cfg(feature = "reliable_report")]
3use autopush_common::reliability::{PushReliability, ReliabilityState};
4
5use crate::error::ApiResult;
6use crate::extractors::notification::Notification;
7use crate::extractors::router_data_input::RouterDataInput;
8use crate::routers::common::{build_message_data, handle_error, incr_success_metrics};
9use crate::routers::fcm::client::FcmClient;
10use crate::routers::fcm::error::FcmError;
11use crate::routers::fcm::settings::{FcmServerCredential, FcmSettings};
12use crate::routers::{Router, RouterError, RouterResponse};
13use async_trait::async_trait;
14use cadence::StatsdClient;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use url::Url;
19use uuid::Uuid;
20
21/// Firebase Cloud Messaging router
22pub struct FcmRouter {
23    settings: FcmSettings,
24    endpoint_url: Url,
25    metrics: Arc<StatsdClient>,
26    db: Box<dyn DbClient>,
27    /// A map from application ID to an authenticated FCM client
28    clients: HashMap<String, FcmClient>,
29    #[cfg(feature = "reliable_report")]
30    reliability: Arc<PushReliability>,
31    max_notification_ttl_secs: i64,
32}
33
34impl FcmRouter {
35    /// Create a new `FcmRouter`
36    pub async fn new(
37        settings: FcmSettings,
38        endpoint_url: Url,
39        http: reqwest::Client,
40        metrics: Arc<StatsdClient>,
41        db: Box<dyn DbClient>,
42        #[cfg(feature = "reliable_report")] reliability: Arc<PushReliability>,
43    ) -> Result<Self, FcmError> {
44        let server_credentials = settings.credentials()?;
45        let clients = Self::create_clients(&settings, server_credentials, http.clone())
46            .await
47            .map_err(FcmError::OAuthClientBuild)?;
48        let max_ttl = settings.max_ttl;
49        Ok(Self {
50            settings,
51            endpoint_url,
52            metrics,
53            db,
54            clients,
55            #[cfg(feature = "reliable_report")]
56            reliability,
57            max_notification_ttl_secs: max_ttl,
58        })
59    }
60
61    /// Create FCM clients for each application
62    async fn create_clients(
63        settings: &FcmSettings,
64        server_credentials: HashMap<String, FcmServerCredential>,
65        http: reqwest::Client,
66    ) -> std::io::Result<HashMap<String, FcmClient>> {
67        let mut clients = HashMap::new();
68
69        for (profile, server_credential) in server_credentials {
70            clients.insert(
71                profile,
72                FcmClient::new(settings, server_credential, http.clone()).await?,
73            );
74        }
75        trace!("Initialized {} FCM clients", clients.len());
76        Ok(clients)
77    }
78
79    /// if we have any clients defined, this connection is "active"
80    pub fn active(&self) -> bool {
81        !self.clients.is_empty()
82    }
83
84    /// Do the gauntlet check to get the routing credentials, these are the
85    /// sender/project ID, and the subscription specific user routing token.
86    /// FCM stores the values in the top hash as `token` & `app_id`, GCM stores them
87    /// in a sub-hash as `creds.auth` and `creds.senderID`.
88    /// If any of these error out, it's probably because of a corrupted key.
89    fn routing_info(
90        &self,
91        router_data: &HashMap<String, Value>,
92        uaid: &Uuid,
93    ) -> ApiResult<(String, String)> {
94        let creds = router_data.get("creds").and_then(Value::as_object);
95        // GCM and FCM both should store the client registration_token as token in the router_data.
96        // There was some confusion about router table records that may store the client
97        // routing token in `creds.auth`, but it's believed that this a duplicate of the
98        // server authentication token and can be ignored since we use the value specified
99        // in the settings.
100        let routing_token = match router_data.get("token").and_then(Value::as_str) {
101            Some(v) => v.to_owned(),
102            None => {
103                warn!("No Registration token found for user {}", uaid.to_string());
104                return Err(FcmError::NoRegistrationToken.into());
105            }
106        };
107        let app_id = match router_data.get("app_id").and_then(Value::as_str) {
108            Some(v) => v.to_owned(),
109            None => {
110                if creds.is_none() {
111                    warn!("No App_id found for user {}", uaid.to_string());
112                    return Err(FcmError::NoAppId.into());
113                }
114                match creds
115                    .unwrap()
116                    .get("senderID")
117                    .map(|v| v.as_str())
118                    .unwrap_or(None)
119                {
120                    Some(v) => v.to_owned(),
121                    None => return Err(FcmError::NoAppId.into()),
122                }
123            }
124        };
125        Ok((routing_token, app_id))
126    }
127}
128
129#[async_trait(?Send)]
130impl Router for FcmRouter {
131    fn register(
132        &self,
133        router_data_input: &RouterDataInput,
134        app_id: &str,
135    ) -> Result<HashMap<String, Value>, RouterError> {
136        if !self.clients.contains_key(app_id) {
137            return Err(FcmError::InvalidAppId(app_id.to_owned()).into());
138        }
139
140        let mut router_data = HashMap::new();
141        router_data.insert(
142            "token".to_string(),
143            serde_json::to_value(&router_data_input.token).unwrap(),
144        );
145        router_data.insert("app_id".to_string(), serde_json::to_value(app_id).unwrap());
146
147        Ok(router_data)
148    }
149
150    #[allow(unused_mut)]
151    async fn route_notification(
152        &self,
153        mut notification: Notification,
154    ) -> ApiResult<RouterResponse> {
155        debug!(
156            "Sending FCM notification to UAID {}",
157            notification.subscription.user.uaid
158        );
159        trace!("Notification = {:?}", notification);
160
161        let router_data = notification
162            .subscription
163            .user
164            .router_data
165            .as_ref()
166            .ok_or(FcmError::NoRegistrationToken)?;
167
168        let (routing_token, app_id) =
169            self.routing_info(router_data, &notification.subscription.user.uaid)?;
170        let ttl = (self.max_notification_ttl_secs as u64)
171            .min(self.settings.min_ttl.max(notification.headers.ttl as u64));
172
173        // Send the notification to FCM
174        let client = self
175            .clients
176            .get(&app_id)
177            .ok_or_else(|| FcmError::InvalidAppId(app_id.clone()))?;
178
179        let message_data = build_message_data(&notification)?;
180        let platform = "fcmv1";
181        trace!("Sending message to {platform}: [{:?}]", &app_id);
182        if let Err(e) = client.send(message_data, routing_token, ttl).await {
183            #[cfg(feature = "reliable_report")]
184            notification
185                .record_reliability(&self.reliability, ReliabilityState::Errored)
186                .await;
187            return Err(handle_error(
188                e,
189                &self.metrics,
190                self.db.as_ref(),
191                platform,
192                &app_id,
193                notification.subscription.user.uaid,
194                notification.subscription.vapid.clone(),
195            )
196            .await);
197        };
198        incr_success_metrics(&self.metrics, platform, &app_id, &notification);
199        #[cfg(feature = "reliable_report")]
200        // Record that we've sent the message out to FCM.
201        // We can't set the state here because the notification isn't
202        // mutable, but we are also essentially consuming the
203        // notification nothing else should modify it.
204        notification
205            .record_reliability(&self.reliability, ReliabilityState::BridgeTransmitted)
206            .await;
207        // Sent successfully, update metrics and make response
208        trace!("Send request was successful");
209
210        Ok(RouterResponse::success(
211            self.endpoint_url
212                .join(&format!("/m/{}", notification.message_id))
213                .expect("Message ID is not URL-safe")
214                .to_string(),
215            notification.headers.ttl as usize,
216        ))
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use crate::error::ApiErrorKind;
223    use crate::extractors::routers::RouterType;
224    use crate::routers::common::tests::{make_notification, CHANNEL_ID};
225    use crate::routers::fcm::client::tests::{
226        make_service_key, mock_fcm_endpoint_builder, mock_token_endpoint, GCM_PROJECT_ID,
227        PROJECT_ID,
228    };
229    use crate::routers::fcm::error::FcmError;
230    use crate::routers::fcm::router::FcmRouter;
231    use crate::routers::fcm::settings::FcmSettings;
232    use crate::routers::RouterError;
233    use crate::routers::{Router, RouterResponse};
234    use autopush_common::db::client::DbClient;
235    use autopush_common::db::mock::MockDbClient;
236    #[cfg(feature = "reliable_report")]
237    use autopush_common::{redis_util::MAX_TRANSACTION_LOOP, reliability::PushReliability};
238    use std::sync::Arc;
239
240    use cadence::StatsdClient;
241    use mockall::predicate;
242    use std::collections::HashMap;
243    use url::Url;
244
245    const FCM_TOKEN: &str = "test-token";
246
247    /// Create a router for testing, using the given service auth file
248    async fn make_router(
249        server: &mut mockito::ServerGuard,
250        fcm_credential: String,
251        gcm_credential: String,
252        db: Box<dyn DbClient>,
253    ) -> FcmRouter {
254        let url = &server.url();
255        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
256
257        FcmRouter::new(
258            FcmSettings {
259                base_url: Url::parse(url).unwrap(),
260                server_credentials: serde_json::json!({
261                    "dev": {
262                        "project_id": PROJECT_ID,
263                        "credential": fcm_credential
264                    },
265                    GCM_PROJECT_ID: {
266                        "project_id": GCM_PROJECT_ID,
267                        "credential": gcm_credential,
268                        "is_gcm": true,
269                    }
270                })
271                .to_string(),
272                ..Default::default()
273            },
274            Url::parse("http://localhost:8080/").unwrap(),
275            reqwest::Client::new(),
276            metrics.clone(),
277            db.clone(),
278            #[cfg(feature = "reliable_report")]
279            Arc::new(
280                PushReliability::new(&None, db.clone(), &metrics, MAX_TRANSACTION_LOOP).unwrap(),
281            ),
282        )
283        .await
284        .unwrap()
285    }
286
287    /// Create default user router data
288    fn default_router_data() -> HashMap<String, serde_json::Value> {
289        let mut map = HashMap::new();
290        map.insert(
291            "token".to_string(),
292            serde_json::to_value(FCM_TOKEN).unwrap(),
293        );
294        map.insert("app_id".to_string(), serde_json::to_value("dev").unwrap());
295        map
296    }
297
298    /// A notification with no data is sent to FCM
299    #[tokio::test]
300    async fn successful_routing_no_data() {
301        let mut server = mockito::Server::new_async().await;
302
303        let mdb = MockDbClient::new();
304        let db = mdb.into_boxed_arc();
305        let service_key = make_service_key(&server);
306        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
307        assert!(router.active());
308        let _token_mock = mock_token_endpoint(&mut server).await;
309        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
310            .match_body(
311                serde_json::json!({
312                    "message": {
313                        "android": {
314                            "data": {
315                                "chid": CHANNEL_ID
316                            },
317                            "ttl": "60s"
318                        },
319                        "token": "test-token"
320                    }
321                })
322                .to_string()
323                .as_str(),
324            )
325            .create();
326        let notification = make_notification(default_router_data(), None, RouterType::FCM);
327
328        let result = router.route_notification(notification).await;
329        assert!(result.is_ok(), "result = {result:?}");
330        assert_eq!(
331            result.unwrap(),
332            RouterResponse::success("http://localhost:8080/m/test-message-id".to_string(), 0)
333        );
334        fcm_mock.assert();
335    }
336
337    /// A notification with data is sent to FCM
338    #[tokio::test]
339    async fn successful_routing_with_data() {
340        let mut server = mockito::Server::new_async().await;
341
342        let mdb = MockDbClient::new();
343        let db = mdb.into_boxed_arc();
344        let service_key = make_service_key(&server);
345        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
346        let _token_mock = mock_token_endpoint(&mut server).await;
347        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
348            .match_body(
349                serde_json::json!({
350                    "message": {
351                        "android": {
352                            "data": {
353                                "chid": CHANNEL_ID,
354                                "body": "test-data",
355                                "con": "test-encoding",
356                                "enc": "test-encryption",
357                                "cryptokey": "test-crypto-key",
358                                "enckey": "test-encryption-key"
359                            },
360                            "ttl": "60s"
361                        },
362                        "token": "test-token"
363                    }
364                })
365                .to_string()
366                .as_str(),
367            )
368            .create();
369        let data = "test-data".to_string();
370        let notification = make_notification(default_router_data(), Some(data), RouterType::FCM);
371
372        let result = router.route_notification(notification).await;
373        assert!(result.is_ok(), "result = {result:?}");
374        assert_eq!(
375            result.unwrap(),
376            RouterResponse::success("http://localhost:8080/m/test-message-id".to_string(), 0)
377        );
378        fcm_mock.assert();
379    }
380
381    /// If there is no client for the user's app ID, an error is returned and
382    /// the FCM request is not sent.
383    #[tokio::test]
384    async fn missing_client() {
385        let mut server = mockito::Server::new_async().await;
386
387        let db = MockDbClient::new().into_boxed_arc();
388        let service_key = make_service_key(&server);
389        let router = make_router(&mut server, service_key, "whatever".to_string(), db).await;
390        let _token_mock = mock_token_endpoint(&mut server).await;
391        let fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
392            .expect(0)
393            .create_async()
394            .await;
395        let mut router_data = default_router_data();
396        let app_id = "app_id".to_string();
397        router_data.insert(
398            app_id.clone(),
399            serde_json::to_value("unknown-app-id").unwrap(),
400        );
401        let notification = make_notification(router_data, None, RouterType::FCM);
402
403        let result = router.route_notification(notification).await;
404        assert!(result.is_err());
405        assert!(
406            matches!(
407                &result.as_ref().unwrap_err().kind,
408                ApiErrorKind::Router(RouterError::Fcm(FcmError::InvalidAppId(_app_id)))
409            ),
410            "result = {result:?}"
411        );
412        fcm_mock.assert();
413    }
414
415    /// If the FCM user no longer exists (404), we drop the user from our database
416    #[tokio::test]
417    async fn no_fcm_user() {
418        let mut server = mockito::Server::new_async().await;
419
420        let notification = make_notification(default_router_data(), None, RouterType::FCM);
421        let mut db = MockDbClient::new();
422        db.expect_remove_user()
423            .with(predicate::eq(notification.subscription.user.uaid))
424            .times(1)
425            .return_once(|_| Ok(()));
426
427        let service_key = make_service_key(&server);
428        let router = make_router(
429            &mut server,
430            service_key,
431            "whatever".to_string(),
432            db.into_boxed_arc(),
433        )
434        .await;
435        let _token_mock = mock_token_endpoint(&mut server).await;
436        let _fcm_mock = mock_fcm_endpoint_builder(&mut server, PROJECT_ID)
437            .with_status(404)
438            .with_body(r#"{"error":{"status":"NOT_FOUND","message":"test-message"}}"#)
439            .create_async()
440            .await;
441
442        let result = router.route_notification(notification).await;
443        assert!(result.is_err());
444        assert!(
445            matches!(
446                result.as_ref().unwrap_err().kind,
447                ApiErrorKind::Router(RouterError::NotFound)
448            ),
449            "result = {result:?}"
450        );
451    }
452}