1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
use async_trait::async_trait;
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use reqwest::{Response, StatusCode};
use serde_json::Value;
use std::collections::{hash_map::RandomState, HashMap};
use std::sync::Arc;
use url::Url;
use uuid::Uuid;

use crate::error::{ApiError, ApiErrorKind, ApiResult};
use crate::extractors::{notification::Notification, router_data_input::RouterDataInput};
use crate::headers::vapid::VapidHeaderWithKey;
use crate::routers::{Router, RouterError, RouterResponse};

use autopush_common::db::{client::DbClient, User};

/// The router for desktop user agents.
///
/// These agents are connected via an Autopush connection server. The correct
/// server is located via the database routing table. If the server is busy or
/// not available, the notification is stored in the database.
pub struct WebPushRouter {
    pub db: Box<dyn DbClient>,
    pub metrics: Arc<StatsdClient>,
    pub http: reqwest::Client,
    pub endpoint_url: Url,
}

#[async_trait(?Send)]
impl Router for WebPushRouter {
    fn register(
        &self,
        _router_input: &RouterDataInput,
        _app_id: &str,
    ) -> Result<HashMap<String, Value, RandomState>, RouterError> {
        // WebPush registration happens through the connection server
        Ok(HashMap::new())
    }

    async fn route_notification(&self, notification: &Notification) -> ApiResult<RouterResponse> {
        // The notification contains the original subscription information
        let user = &notification.subscription.user;
        debug!(
            "✉ Routing WebPush notification to UAID {}",
            notification.subscription.user.uaid
        );
        trace!("✉ Notification = {:?}", notification);

        // Check if there is a node connected to the client
        if let Some(node_id) = &user.node_id {
            trace!(
                "✉ User has a node ID, sending notification to node: {}",
                &node_id
            );

            // Try to send the notification to the node
            match self.send_notification(notification, node_id).await {
                Ok(response) => {
                    // The node might be busy, make sure it accepted the notification
                    if response.status() == 200 {
                        // The node has received the notification
                        trace!("✉ Node received notification");
                        return Ok(self.make_delivered_response(notification));
                    }

                    trace!(
                        "✉ Node did not receive the notification, response = {:?}",
                        response
                    );
                }
                Err(error) => {
                    if error.is_timeout() {
                        self.metrics.incr("error.node.timeout")?;
                    };
                    if error.is_connect() {
                        self.metrics.incr("error.node.connect")?;
                    }
                    debug!("✉ Error while sending webpush notification: {}", error);
                    self.remove_node_id(user, node_id).await?
                }
            }
        }

        if notification.headers.ttl == 0 {
            let topic = notification.headers.topic.is_some().to_string();
            trace!(
                "✉ Notification has a TTL of zero and was not successfully \
                 delivered, dropping it"
            );
            self.metrics
                .incr_with_tags("notification.message.expired")
                // TODO: include `internal` if meta is set.
                .with_tag("topic", &topic)
                .send();
            return Ok(self.make_delivered_response(notification));
        }

        // Save notification, node is not present or busy
        trace!("✉ Node is not present or busy, storing notification");
        self.store_notification(notification).await?;

        // Retrieve the user data again, they may have reconnected or the node
        // is no longer busy.
        let user = match self.db.get_user(&user.uaid).await {
            Ok(Some(user)) => user,
            Ok(None) => {
                trace!("✉ No user found, must have been deleted");
                return Err(self.handle_error(
                    ApiErrorKind::Router(RouterError::UserWasDeleted),
                    notification.subscription.vapid.clone(),
                ));
            }
            Err(e) => {
                // Database error, but we already stored the message so it's ok
                debug!("✉ Database error while re-fetching user: {}", e);
                return Ok(self.make_stored_response(notification));
            }
        };

        // Try to notify the node the user is currently connected to
        let node_id = match &user.node_id {
            Some(id) => id,
            // The user is not connected to a node, nothing more to do
            None => {
                trace!("✉ User is not connected to a node, returning stored response");
                return Ok(self.make_stored_response(notification));
            }
        };

        // Notify the node to check for messages
        trace!("✉ Notifying node to check for messages");
        match self.trigger_notification_check(&user.uaid, node_id).await {
            Ok(response) => {
                trace!("Response = {:?}", response);
                if response.status() == 200 {
                    trace!("✉ Node has delivered the message");
                    self.metrics
                        .time_with_tags(
                            "notification.total_request_time",
                            (notification.timestamp - autopush_common::util::sec_since_epoch())
                                * 1000,
                        )
                        .with_tag("platform", "websocket")
                        .with_tag("app_id", "direct")
                        .send();

                    Ok(self.make_delivered_response(notification))
                } else {
                    trace!("✉ Node has not delivered the message, returning stored response");
                    Ok(self.make_stored_response(notification))
                }
            }
            Err(error) => {
                // Can't communicate with the node, attempt to stop using it
                debug!("✉ Error while triggering notification check: {}", error);
                self.remove_node_id(&user, node_id).await?;
                Ok(self.make_stored_response(notification))
            }
        }
    }
}

impl WebPushRouter {
    /// Use the same sort of error chokepoint that all the mobile clients use.
    fn handle_error(&self, error: ApiErrorKind, vapid: Option<VapidHeaderWithKey>) -> ApiError {
        let mut err = ApiError::from(error);
        if let Some(Ok(claims)) = vapid.map(|v| v.vapid.claims()) {
            let mut extras = err.extras.unwrap_or_default();
            extras.extend([("sub".to_owned(), claims.sub)]);
            err.extras = Some(extras);
        };
        err
    }

    /// Send the notification to the node
    async fn send_notification(
        &self,
        notification: &Notification,
        node_id: &str,
    ) -> Result<Response, reqwest::Error> {
        let url = format!("{}/push/{}", node_id, notification.subscription.user.uaid);
        let notification = notification.serialize_for_delivery();

        self.http.put(&url).json(&notification).send().await
    }

    /// Notify the node to check for notifications for the user
    async fn trigger_notification_check(
        &self,
        uaid: &Uuid,
        node_id: &str,
    ) -> Result<Response, reqwest::Error> {
        let url = format!("{node_id}/notif/{uaid}");

        self.http.put(&url).send().await
    }

    /// Store a notification in the database
    async fn store_notification(&self, notification: &Notification) -> ApiResult<()> {
        self.db
            .save_message(
                &notification.subscription.user.uaid,
                notification.clone().into(),
            )
            .await
            .map_err(|e| {
                self.handle_error(
                    ApiErrorKind::Router(RouterError::SaveDb(
                        e,
                        // try to extract the `sub` from the VAPID clamis.
                        notification
                            .subscription
                            .vapid
                            .as_ref()
                            .map(|vapid| vapid.vapid.claims().map(|c| c.sub).unwrap_or_default()),
                    )),
                    notification.subscription.vapid.clone(),
                )
            })
    }

    /// Remove the node ID from a user. This is done if the user is no longer
    /// connected to the node.
    async fn remove_node_id(&self, user: &User, node_id: &str) -> ApiResult<()> {
        self.metrics.incr("updates.client.host_gone").ok();
        let removed = self
            .db
            .remove_node_id(&user.uaid, node_id, user.connected_at, &user.version)
            .await?;
        if !removed {
            debug!("✉ The node id was not removed");
        }
        Ok(())
    }

    /// Update metrics and create a response for when a notification has been directly forwarded to
    /// an autopush server.
    fn make_delivered_response(&self, notification: &Notification) -> RouterResponse {
        self.make_response(notification, "Direct", StatusCode::CREATED)
    }

    /// Update metrics and create a response for when a notification has been stored in the database
    /// for future transmission.
    fn make_stored_response(&self, notification: &Notification) -> RouterResponse {
        self.make_response(notification, "Stored", StatusCode::CREATED)
    }

    /// Update metrics and create a response after routing a notification
    fn make_response(
        &self,
        notification: &Notification,
        destination_tag: &str,
        status: StatusCode,
    ) -> RouterResponse {
        self.metrics
            .count_with_tags(
                "notification.message_data",
                notification.data.as_ref().map(String::len).unwrap_or(0) as i64,
            )
            .with_tag("destination", destination_tag)
            .send();

        RouterResponse {
            status: actix_http::StatusCode::from_u16(status.as_u16()).unwrap_or_default(),
            headers: {
                let mut map = HashMap::new();
                map.insert(
                    "Location",
                    self.endpoint_url
                        .join(&format!("/m/{}", notification.message_id))
                        .expect("Message ID is not URL-safe")
                        .to_string(),
                );
                map.insert("TTL", notification.headers.ttl.to_string());
                map
            },
            body: None,
        }
    }
}

#[cfg(test)]
mod test {
    use std::boxed::Box;
    use std::sync::Arc;

    use reqwest;

    use crate::extractors::subscription::tests::{make_vapid, PUB_KEY};
    use crate::headers::vapid::VapidClaims;
    use autopush_common::errors::ReportableError;

    use super::*;
    use autopush_common::db::mock::MockDbClient;

    fn make_router(db: Box<dyn DbClient>) -> WebPushRouter {
        WebPushRouter {
            db,
            metrics: Arc::new(StatsdClient::from_sink("autopush", cadence::NopMetricSink)),
            http: reqwest::Client::new(),
            endpoint_url: Url::parse("http://localhost:8080/").unwrap(),
        }
    }

    #[tokio::test]
    async fn pass_extras() {
        let router = make_router(Box::new(MockDbClient::new()));
        let sub = "foo@example.com";
        let vapid = make_vapid(
            sub,
            "https://push.services.mozilla.org",
            VapidClaims::default_exp(),
            PUB_KEY.to_owned(),
        );

        let err = router.handle_error(ApiErrorKind::LogCheck, Some(vapid));
        assert!(err.extras().contains(&("sub", sub.to_owned())));
    }
}