autoconnect_ws_sm/identified/
mod.rs

1use std::{fmt, mem, sync::Arc};
2
3use actix_web::rt;
4use cadence::Timed;
5use futures::channel::mpsc;
6use uuid::Uuid;
7
8use autoconnect_common::{
9    broadcast::{Broadcast, BroadcastSubs},
10    protocol::{ServerMessage, ServerNotification},
11};
12
13use autoconnect_settings::{AppState, Settings};
14use autopush_common::{
15    db::User,
16    notification::Notification,
17    util::{ms_since_epoch, user_agent::UserAgentInfo},
18};
19
20use crate::error::{SMError, SMErrorKind};
21
22mod on_client_msg;
23mod on_server_notif;
24
/// A WebPush Client that's successfully identified itself to the server via a
/// Hello message.
///
/// The `webpush_ws` handler feeds input from both the WebSocket connection
/// (`ClientMessage`) and the `ClientRegistry` (`ServerNotification`)
/// triggered by autoendpoint to this type's `on_client_msg` and
/// `on_server_notif` methods whose impls reside in their own modules.
///
/// Note the `check_storage` method (in the `on_server_notif` module) is
/// triggered by both a `ServerNotification` and also the `new` constructor.
pub struct WebPushClient {
    /// Push User Agent identifier. Each Push client receives a unique UAID
    pub uaid: Uuid,
    /// Unique, local (to each autoconnect instance) identifier
    pub uid: Uuid,
    /// The User Agent information block derived from the User-Agent header
    pub ua_info: UserAgentInfo,

    /// Broadcast Subscriptions this Client is subscribed to
    broadcast_subs: BroadcastSubs,

    /// Set of session specific flags
    flags: ClientFlags,
    /// Notification Ack(knowledgement) related state
    ack_state: AckState,
    /// Count of messages sent from storage (for enforcing
    /// `settings.msg_limit`). Resets to 0 when storage is emptied
    sent_from_storage: u32,
    /// Exists for new User records: these are not written to the db during
    /// Hello, instead they're lazily added to the db on their first Register
    /// message
    deferred_add_user: Option<User>,

    /// WebPush Session Statistics (logged by `shutdown`)
    stats: SessionStatistics,

    /// Timestamp of when the UA connected (used by database lookup, thus u64).
    /// `shutdown` subtracts it from `ms_since_epoch()`, so it's expressed in
    /// milliseconds since the epoch
    connected_at: u64,
    /// Timestamp of the last WebPush Ping message
    last_ping: u64,
    /// The last notification timestamp.
    // TODO: RENAME THIS TO `last_notification_timestamp`
    current_timestamp: Option<u64>,

    /// Shared application state: the methods of this type read its
    /// `settings`, `db`, `metrics`, `clients` (the `ClientRegistry`),
    /// `broadcaster` and `http` members
    app_state: Arc<AppState>,
}
71
72impl fmt::Debug for WebPushClient {
73    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
74        fmt.debug_struct("WebPushClient")
75            .field("uaid", &self.uaid)
76            .field("uid", &self.uid)
77            .field("ua_info", &self.ua_info)
78            .field("broadcast_subs", &self.broadcast_subs)
79            .field("flags", &self.flags)
80            .field("ack_state", &self.ack_state)
81            .field("sent_from_storage", &self.sent_from_storage)
82            .field("deferred_add_user", &self.deferred_add_user)
83            .field("stats", &self.stats)
84            .field("connected_at", &self.connected_at)
85            .field("last_ping", &self.last_ping)
86            .finish()
87    }
88}
89
90impl WebPushClient {
91    #[allow(clippy::too_many_arguments)]
92    pub async fn new(
93        uaid: Uuid,
94        ua: String,
95        broadcast_subs: BroadcastSubs,
96        flags: ClientFlags,
97        connected_at: u64,
98        current_timestamp: Option<u64>,
99        deferred_add_user: Option<User>,
100        app_state: Arc<AppState>,
101    ) -> Result<(Self, Vec<ServerMessage>), SMError> {
102        trace!("👁‍🗨WebPushClient::new");
103        let stats = SessionStatistics {
104            existing_uaid: deferred_add_user.is_none(),
105            ..Default::default()
106        };
107        let mut client = WebPushClient {
108            uaid,
109            uid: Uuid::new_v4(),
110            ua_info: UserAgentInfo::from(ua.as_str()),
111            broadcast_subs,
112            flags,
113            ack_state: Default::default(),
114            sent_from_storage: Default::default(),
115            connected_at,
116            current_timestamp,
117            deferred_add_user,
118            last_ping: Default::default(),
119            stats,
120            app_state,
121        };
122
123        let smsgs = if client.flags.check_storage {
124            let smsgs = client.check_storage().await?;
125            debug!(
126                "WebPushClient::new: check_storage smsgs.len(): {}",
127                smsgs.len()
128            );
129            smsgs
130        } else {
131            vec![]
132        };
133        Ok((client, smsgs))
134    }
135
136    /// Return a reference to `AppState`'s `Settings`
137    pub fn app_settings(&self) -> &Settings {
138        &self.app_state.settings
139    }
140
141    #[cfg(feature = "reliable_report")]
142    pub fn app_reliability(&self) -> &autopush_common::reliability::PushReliability {
143        &self.app_state.reliability
144    }
145
146    /// Connect this `WebPushClient` to the `ClientRegistry`
147    ///
148    /// Returning a `Stream` of `ServerNotification`s from the `ClientRegistry`
149    pub async fn registry_connect(&self) -> mpsc::UnboundedReceiver<ServerNotification> {
150        self.app_state.clients.connect(self.uaid, self.uid).await
151    }
152
153    /// Disconnect this `WebPushClient` from the `ClientRegistry`
154    pub async fn registry_disconnect(&self) {
155        // Ignore disconnect (Client wasn't connected) Errors
156        let _ = self
157            .app_state
158            .clients
159            .disconnect(&self.uaid, &self.uid)
160            .await;
161    }
162
163    /// Return the difference between the Client's Broadcast Subscriptions and
164    /// the this server's Broadcasts
165    pub async fn broadcast_delta(&mut self) -> Option<Vec<Broadcast>> {
166        self.app_state
167            .broadcaster
168            .read()
169            .await
170            .change_count_delta(&mut self.broadcast_subs)
171    }
172
173    /// Cleanup after the session has ended
174    pub fn shutdown(&mut self, reason: Option<String>) {
175        trace!("👁‍🗨WebPushClient::shutdown");
176        self.save_and_notify_unacked_direct_notifs();
177
178        let ua_info = &self.ua_info;
179        let stats = &self.stats;
180        let elapsed_sec = (ms_since_epoch() - self.connected_at) / 1_000;
181        self.app_state
182            .metrics
183            .time_with_tags("ua.connection.lifespan", elapsed_sec)
184            .with_tag("ua_os_family", &ua_info.metrics_os)
185            .with_tag("ua_browser_family", &ua_info.metrics_browser)
186            .send();
187
188        // Log out the final stats message
189        info!("Session";
190            "uaid_hash" => self.uaid.as_simple().to_string(),
191            "uaid_reset" => self.flags.old_record_version,
192            "existing_uaid" => stats.existing_uaid,
193            "connection_type" => "webpush",
194            "ua_name" => &ua_info.browser_name,
195            "ua_os_family" => &ua_info.metrics_os,
196            "ua_os_ver" => &ua_info.os_version,
197            "ua_browser_family" => &ua_info.metrics_browser,
198            "ua_browser_ver" => &ua_info.browser_version,
199            "ua_category" => &ua_info.category,
200            "connection_time" => elapsed_sec,
201            "direct_acked" => stats.direct_acked,
202            "direct_storage" => stats.direct_storage,
203            "stored_retrieved" => stats.stored_retrieved,
204            "stored_acked" => stats.stored_acked,
205            "nacks" => stats.nacks,
206            "registers" => stats.registers,
207            "unregisters" => stats.unregisters,
208            "disconnect_reason" => reason.unwrap_or_else(|| "".to_owned()),
209        );
210    }
211
212    /// Save any Direct unAck'd messages to the db (on shutdown)
213    ///
214    /// Direct messages are solely stored in memory until Ack'd by the Client,
215    /// so on shutdown, any not Ack'd are stored in the db to not be lost
216    fn save_and_notify_unacked_direct_notifs(&mut self) {
217        let mut notifs = mem::take(&mut self.ack_state.unacked_direct_notifs);
218        trace!(
219            "👁‍🗨WebPushClient::save_and_notify_unacked_direct_notifs len: {}",
220            notifs.len()
221        );
222        if notifs.is_empty() {
223            return;
224        }
225
226        self.stats.direct_storage += notifs.len() as i32;
227        // TODO: clarify this comment re the Python version
228        // Ensure we don't store these as legacy by setting a 0 as the
229        // sortkey_timestamp. This ensures the Python side doesn't mark it as
230        // legacy during conversion and still get the correct default us_time
231        // when saving
232        for notif in &mut notifs {
233            notif.sortkey_timestamp = Some(0);
234        }
235
236        let app_state = Arc::clone(&self.app_state);
237        let uaid = self.uaid;
238        let connected_at = self.connected_at;
239        rt::spawn(async move {
240            #[cfg(not(feature = "reliable_report"))]
241            app_state.db.save_messages(&uaid, notifs).await?;
242            #[cfg(feature = "reliable_report")]
243            {
244                app_state.db.save_messages(&uaid, notifs.clone()).await?;
245                for mut notif in notifs {
246                    notif
247                        .record_reliability(
248                            &app_state.reliability,
249                            autopush_common::reliability::ReliabilityState::Stored,
250                        )
251                        .await;
252                }
253            }
254            debug!("Finished saving unacked direct notifs, checking for reconnect");
255            let Some(user) = app_state.db.get_user(&uaid).await? else {
256                return Err(SMErrorKind::Internal(format!(
257                    "User not found for unacked direct notifs: {uaid}"
258                )));
259            };
260            if connected_at == user.connected_at {
261                return Ok(());
262            }
263            if let Some(node_id) = user.node_id {
264                app_state
265                    .http
266                    .put(format!("{}/notif/{}", node_id, uaid.as_simple()))
267                    .send()
268                    .await?
269                    .error_for_status()?;
270            }
271            Ok(())
272        });
273    }
274
275    /// Add User information and tags for this Client to a Sentry Event
276    pub fn add_sentry_info(self, event: &mut sentry::protocol::Event) {
277        event.user = Some(sentry::User {
278            id: Some(self.uaid.as_simple().to_string()),
279            ..Default::default()
280        });
281        let ua_info = self.ua_info;
282        event
283            .tags
284            .insert("ua_name".to_owned(), ua_info.browser_name);
285        event
286            .tags
287            .insert("ua_os_family".to_owned(), ua_info.metrics_os);
288        event
289            .tags
290            .insert("ua_os_ver".to_owned(), ua_info.os_version);
291        event
292            .tags
293            .insert("ua_browser_family".to_owned(), ua_info.metrics_browser);
294        event
295            .tags
296            .insert("ua_browser_ver".to_owned(), ua_info.browser_version);
297    }
298}
299
/// Session specific flags of a `WebPushClient`
///
/// The `Default` impl enables `include_topic` and leaves every other flag
/// cleared.
#[derive(Debug)]
pub struct ClientFlags {
    /// Whether check_storage queries for topic (not "timestamped") messages
    pub include_topic: bool,
    /// Flags the need to increment the last read timestamp for timestamped
    /// messages
    pub increment_storage: bool,
    /// Whether this client needs to check storage for messages.
    /// `WebPushClient::new` runs `check_storage` immediately when it's set
    pub check_storage: bool,
    /// Flags the need to drop the user record (logged as `uaid_reset` when
    /// the session shuts down)
    pub old_record_version: bool,
    /// First time a user has connected "today"
    pub emit_channel_metrics: bool,
}
313
314impl Default for ClientFlags {
315    fn default() -> Self {
316        Self {
317            include_topic: true,
318            increment_storage: false,
319            check_storage: false,
320            old_record_version: false,
321            emit_channel_metrics: false,
322        }
323    }
324}
325
/// WebPush Session Statistics
///
/// Tracks statistics about the session that are logged when the session's
/// closed (see the "Session" log message emitted by
/// `WebPushClient::shutdown`)
#[derive(Debug, Default)]
pub struct SessionStatistics {
    /// Number of acknowledged messages that were sent directly (not via storage)
    direct_acked: i32,
    /// Number of messages sent to storage (incremented on shutdown by the
    /// number of unAck'd direct notifications being saved)
    direct_storage: i32,
    /// Number of messages taken from storage
    stored_retrieved: i32,
    /// Number of messages pulled from storage and acknowledged
    stored_acked: i32,
    /// Number of messages total that are not acknowledged.
    nacks: i32,
    /// Number of unregister requests
    unregisters: i32,
    /// Number of register requests
    registers: i32,
    /// Whether this uaid was previously registered (`WebPushClient::new` sets
    /// it when there's no `deferred_add_user`)
    existing_uaid: bool,
}
349
/// Record of Notifications sent to the Client.
#[derive(Debug, Default)]
struct AckState {
    /// List of unAck'd directly sent (never stored) notifications. Whatever
    /// remains here at shutdown is taken and saved to the db
    unacked_direct_notifs: Vec<Notification>,
    /// List of unAck'd sent notifications from storage
    unacked_stored_notifs: Vec<Notification>,
    /// List of Ack'd timestamp notifications from storage, cleared
    /// via `increment_storage` (only present with the `reliable_report`
    /// feature)
    #[cfg(feature = "reliable_report")]
    acked_stored_timestamp_notifs: Vec<Notification>,
    /// Either the `current_timestamp` value in storage (returned from
    /// `fetch_messages`) or the last unAck'd timestamp Message's
    /// `sortkey_timestamp` (returned from `fetch_timestamp_messages`).
    ///
    /// This represents the "pointer" to the beginning (more specifically the
    /// record preceding the beginning used in a Greater Than query) of the
    /// next batch of timestamp Messages.
    ///
    /// Thus this value is:
    ///
    /// a) initially None, then
    ///
    /// b) retrieved from `current_timestamp` in storage then passed as the
    /// `timestamp` to `fetch_timestamp_messages`. When all of those timestamp
    /// Messages are Ack'd, this value's then
    ///
    /// c) written back to `current_timestamp` in storage via
    /// `increment_storage`
    unacked_stored_highest: Option<u64>,
}
381
382impl AckState {
383    /// Whether the Client has outstanding notifications sent to it that it has
384    /// yet to Ack
385    fn unacked_notifs(&self) -> bool {
386        !self.unacked_stored_notifs.is_empty() || !self.unacked_direct_notifs.is_empty()
387    }
388}
389
390#[cfg(test)]
391mod tests {
392    use std::sync::Arc;
393
394    use uuid::Uuid;
395
396    use autoconnect_common::{
397        protocol::{ClientMessage, ServerMessage, ServerNotification},
398        test_support::{DUMMY_CHID, DUMMY_UAID, UA},
399    };
400    use autoconnect_settings::AppState;
401    use autopush_common::{
402        db::{client::FetchMessageResponse, mock::MockDbClient},
403        notification::Notification,
404        util::{ms_since_epoch, sec_since_epoch},
405    };
406
407    use super::WebPushClient;
408
409    async fn wpclient(uaid: Uuid, app_state: AppState) -> (WebPushClient, Vec<ServerMessage>) {
410        WebPushClient::new(
411            uaid,
412            UA.to_owned(),
413            Default::default(),
414            Default::default(),
415            ms_since_epoch(),
416            None,
417            None,
418            Arc::new(app_state),
419        )
420        .await
421        .unwrap()
422    }
423
424    /// Generate a dummy timestamp `Notification`
425    fn new_timestamp_notif(channel_id: &Uuid, ttl: u64) -> Notification {
426        Notification {
427            channel_id: *channel_id,
428            ttl,
429            timestamp: sec_since_epoch(),
430            sortkey_timestamp: Some(ms_since_epoch()),
431            ..Default::default()
432        }
433    }
434
435    #[actix_rt::test]
436    async fn webpush_ping() {
437        let (mut client, _) = wpclient(DUMMY_UAID, Default::default()).await;
438        let pong = client.on_client_msg(ClientMessage::Ping).await.unwrap();
439        assert!(matches!(pong.as_slice(), [ServerMessage::Ping]));
440    }
441
    /// A `CheckStorage` notification whose timestamp messages are all
    /// expired must still advance the stored timestamp via
    /// `increment_storage`, even though no messages are returned to the
    /// Client.
    ///
    /// The mock db expectations are registered in a `mockall::Sequence`, so
    /// they must be satisfied in exactly the order they're declared below.
    #[actix_rt::test]
    async fn expired_increments_storage() {
        let mut db = MockDbClient::new();
        let mut seq = mockall::Sequence::new();
        // Timestamp the first batch reports back; the follow-up fetch and
        // `increment_storage` are both expected to receive it
        let timestamp = sec_since_epoch();
        // No topic messages
        db.expect_fetch_topic_messages()
            .times(1)
            .in_sequence(&mut seq)
            .return_once(move |_, _| {
                Ok(FetchMessageResponse {
                    timestamp: None,
                    messages: vec![],
                })
            });
        // Return expired notifs (default ttl of 0)
        db.expect_fetch_timestamp_messages()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts, _| ts.is_none())
            .return_once(move |_, _, _| {
                Ok(FetchMessageResponse {
                    timestamp: Some(timestamp),
                    messages: vec![
                        new_timestamp_notif(&DUMMY_CHID, 0),
                        new_timestamp_notif(&DUMMY_CHID, 0),
                    ],
                })
            });
        // EOF
        db.expect_fetch_timestamp_messages()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts, _| ts == &Some(timestamp))
            .return_once(|_, _, _| {
                Ok(FetchMessageResponse {
                    timestamp: None,
                    messages: vec![],
                })
            });
        // Ensure increment_storage's called to advance the timestamp messages
        // despite check_storage returning nothing (all filtered out as
        // expired)
        db.expect_increment_storage()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts| ts == &timestamp)
            .return_once(|_, _| Ok(()));

        // No check_storage called here (via default ClientFlags)
        let (mut client, _) = wpclient(
            DUMMY_UAID,
            AppState {
                db: db.into_boxed_arc(),
                ..Default::default()
            },
        )
        .await;

        // Drive the storage check explicitly; every message was expired so
        // nothing should be handed back for delivery
        let smsgs = client
            .on_server_notif(ServerNotification::CheckStorage)
            .await
            .expect("CheckStorage failed");
        assert!(smsgs.is_empty())
    }
507}