autoconnect_ws_sm/identified/
on_server_notif.rs

1#[cfg(feature = "reliable_report")]
2use std::mem;
3
4use cadence::Counted;
5
6use autoconnect_common::protocol::{ServerMessage, ServerNotification};
7use autopush_common::{
8    db::CheckStorageResponse, metric_name::MetricName, metrics::StatsdClientExt,
9    notification::Notification, util::sec_since_epoch,
10};
11
12use super::WebPushClient;
13use crate::error::{SMError, SMErrorKind};
14
15impl WebPushClient {
16    /// Handle a `ServerNotification` for this user
17    ///
18    /// `ServerNotification::Disconnect` is emitted by the same autoconnect
19    /// node receiving it when a User has logged into that same node twice to
20    /// "Ghost" (disconnect) the first user's session for its second session.
21    ///
22    /// Other variants are emitted by autoendpoint
23    pub async fn on_server_notif(
24        &mut self,
25        snotif: ServerNotification,
26    ) -> Result<Vec<ServerMessage>, SMError> {
27        match snotif {
28            ServerNotification::Notification(notif) => Ok(vec![self.notif(notif)?]),
29            ServerNotification::CheckStorage => self.check_storage().await,
30            ServerNotification::Disconnect => Err(SMErrorKind::Ghost.into()),
31        }
32    }
33
34    /// After disconnecting from the `ClientRegistry`, moves any queued Direct
35    /// Push Notifications to unacked_direct_notifs (to be stored in the db on
36    /// `shutdown`)
37    pub fn on_server_notif_shutdown(&mut self, snotif: ServerNotification) {
38        trace!("WebPushClient::on_server_notif_shutdown");
39        if let ServerNotification::Notification(notif) = snotif {
40            let key = notif.version.clone();
41            self.ack_state.unacked_direct_notifs.insert(key, notif);
42        }
43    }
44
45    /// Send a Direct Push Notification to this user
46    fn notif(&mut self, notif: Notification) -> Result<ServerMessage, SMError> {
47        trace!("WebPushClient::notif Sending a direct notif");
48        // The notification we return here is sent directly to the client.
49        // No reliability state is recorded.
50        let response = notif.clone();
51        if notif.ttl != 0 {
52            // Consume the original notification by adding it to the
53            // unacked map. This will eventually record the state.
54            let key = notif.version.clone();
55            self.ack_state.unacked_direct_notifs.insert(key, notif);
56        }
57        self.emit_send_metrics(&response, "Direct");
58        Ok(ServerMessage::Notification(response))
59    }
60
61    /// Top level read of Push Notifications from storage
62    ///
63    /// Initializes the top level `check_storage` and `include_topic` flags and
64    /// runs `check_storage_loop`
65    pub(super) async fn check_storage(&mut self) -> Result<Vec<ServerMessage>, SMError> {
66        trace!("🗄️ WebPushClient::check_storage");
67        self.flags.check_storage = true;
68        self.flags.include_topic = true;
69        self.check_storage_loop().await
70    }
71
72    /// Loop the read of Push Notifications from storage
73    ///
74    /// Loops until any unexpired Push Notifications are read or there's no
75    /// more Notifications in storage
76    pub(super) async fn check_storage_loop(&mut self) -> Result<Vec<ServerMessage>, SMError> {
77        trace!("🗄️ WebPushClient::check_storage_loop");
78        while self.flags.check_storage {
79            let smsgs = self.check_storage_advance().await?;
80            if !smsgs.is_empty() {
81                self.check_msg_limit().await?;
82                return Ok(smsgs);
83            }
84        }
85        // No more notifications (check_storage = false). Despite returning no
86        // notifs we may have advanced through expired timestamp messages and
87        // need to increment_storage to mark them as deleted
88        if self.flags.increment_storage {
89            debug!("🗄️ WebPushClient::check_storage_loop increment_storage");
90            self.increment_storage().await?;
91        }
92        Ok(vec![])
93    }
94
95    /// Read a chunk (max count 10 returned) of Notifications from storage
96    ///
97    /// This filters out expired Notifications and may return an empty result
98    /// set when there's still pending Notifications to be read: so it should
99    /// be called in a loop to advance through all Notification records
100    async fn check_storage_advance(&mut self) -> Result<Vec<ServerMessage>, SMError> {
101        trace!("🗄️ WebPushClient::check_storage_advance");
102        let CheckStorageResponse {
103            include_topic,
104            mut messages,
105            timestamp,
106        } = self.do_check_storage().await?;
107
108        debug!(
109            "🗄️ WebPushClient::check_storage_advance \
110                 include_topic: {} -> {} \
111                 unacked_stored_highest: {:?} -> {:?}",
112            self.flags.include_topic,
113            include_topic,
114            self.ack_state.unacked_stored_highest,
115            timestamp
116        );
117        self.flags.include_topic = include_topic;
118        self.ack_state.unacked_stored_highest = timestamp;
119
120        if messages.is_empty() {
121            trace!("🗄️ WebPushClient::check_storage_advance finished");
122            self.flags.check_storage = false;
123            self.sent_from_storage = 0;
124            return Ok(vec![]);
125        }
126
127        // Filter out TTL expired messages
128        let now_sec = sec_since_epoch();
129        // Topic messages require immediate deletion from the db
130        let mut expired_messages = vec![];
131        // NOTE: Vec::extract_if (stabilizing soon) can negate the need for the
132        // inner msg.clone()
133        messages.retain(|msg| {
134            if !msg.expired(now_sec) {
135                return true;
136            }
137            if msg.sortkey_timestamp.is_none() {
138                expired_messages.push(msg.clone());
139            }
140            false
141        });
142        // TODO: A batch remove_messages would be nicer
143        #[allow(unused_mut)]
144        for mut msg in expired_messages {
145            let chidmessageid = msg.chidmessageid();
146            trace!("🉑 removing expired topic chidmessageid: {chidmessageid}");
147            self.app_state
148                .db
149                .remove_message(&self.uaid, &chidmessageid)
150                .await?;
151            #[cfg(feature = "reliable_report")]
152            msg.record_reliability(
153                &self.app_state.reliability,
154                autopush_common::reliability::ReliabilityState::Expired,
155            )
156            .await;
157        }
158
159        self.flags.increment_storage = !include_topic && timestamp.is_some();
160
161        if messages.is_empty() {
162            trace!("🗄️ WebPushClient::check_storage_advance empty response (filtered expired)");
163            return Ok(vec![]);
164        }
165
166        for msg in messages.iter() {
167            let key = msg.version.clone();
168            self.ack_state
169                .unacked_stored_notifs
170                .insert(key, msg.clone());
171        }
172        let smsgs: Vec<_> = messages
173            .into_iter()
174            .inspect(|msg| {
175                trace!("🗄️ WebPushClient::check_storage_advance Sending stored");
176                self.emit_send_metrics(msg, "Stored")
177            })
178            .map(ServerMessage::Notification)
179            .collect();
180
181        let count = smsgs.len() as u32;
182        debug!(
183            "🗄️ WebPushClient::check_storage_advance: sent_from_storage: {}, +{}",
184            self.sent_from_storage, count
185        );
186        self.sent_from_storage += count;
187        Ok(smsgs)
188    }
189
190    #[cfg(feature = "reliable_report")]
191    /// Record and transition the state for trackable messages.
192    async fn record_state(
193        &self,
194        messages: &mut Vec<Notification>,
195        state: autopush_common::reliability::ReliabilityState,
196    ) {
197        // *Note* because `.map()` is sync
198        // we can't call the async func without additional hoops.
199        for message in messages {
200            message
201                .record_reliability(&self.app_state.reliability, state)
202                .await;
203        }
204    }
205
206    /// Read a chunk (max count 10 returned) of Notifications from storage
207    ///
208    /// This alternates between reading Topic Notifications and Timestamp
209    /// Notifications which are stored separately in storage.
210    ///
211    /// Topic Messages differ in that they replace pending Notifications with
212    /// new ones if they have matching Topic names. They are used when a sender
213    /// desires a scenario where multiple Messages sent to an offline device
214    /// result in the user only seeing the latest Message when the device comes
215    /// online.
216    async fn do_check_storage(&self) -> Result<CheckStorageResponse, SMError> {
217        // start at the latest unacked timestamp or the previous, latest timestamp.
218        let timestamp = self
219            .ack_state
220            .unacked_stored_highest
221            .or(self.current_timestamp);
222        trace!("🗄️ WebPushClient::do_check_storage {:?}", &timestamp);
223        // if we're to include topic messages, do those first.
224        // NOTE: Bigtable can't fetch `current_timestamp`, so we can't rely on
225        // `fetch_topic_messages()` returning a reasonable timestamp.
226        let topic_resp = if self.flags.include_topic {
227            trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
228            // Get the most recent max 11 messages.
229            #[allow(unused_mut)]
230            let mut messages = self
231                .app_state
232                .db
233                .fetch_topic_messages(&self.uaid, 11)
234                .await?;
235            #[cfg(feature = "reliable_report")]
236            // Since we pulled these from storage, mark them as "retrieved"
237            self.record_state(
238                &mut messages.messages,
239                autopush_common::reliability::ReliabilityState::Retrieved,
240            )
241            .await;
242            messages
243        } else {
244            Default::default()
245        };
246        // if we have topic messages...
247        if !topic_resp.messages.is_empty() {
248            trace!(
249                "🗄️ WebPushClient::do_check_storage: Topic message returns: {:#?}",
250                topic_resp.messages
251            );
252            self.app_state
253                .metrics
254                .count_with_tags(
255                    "notification.message.retrieved",
256                    topic_resp.messages.len() as i64,
257                )
258                .with_tag("topic", "true")
259                .send();
260            return Ok(CheckStorageResponse {
261                include_topic: true,
262                messages: topic_resp.messages,
263                timestamp: topic_resp.timestamp,
264            });
265        }
266        // No topic messages, so carry on with normal ones, starting from the latest timestamp.
267        let timestamp = if self.flags.include_topic {
268            // See above, but Bigtable doesn't return the last message read timestamp when polling
269            // for topic messages. Instead, we'll use the explicitly set one we store in the User
270            // record and copy into the WebPushClient struct.
271            topic_resp.timestamp.or(self.current_timestamp)
272        } else {
273            timestamp
274        };
275        trace!(
276            "🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
277            timestamp
278        );
279        #[allow(unused_mut)]
280        let mut timestamp_resp = self
281            .app_state
282            .db
283            .fetch_timestamp_messages(&self.uaid, timestamp, 10)
284            .await?;
285        if !timestamp_resp.messages.is_empty() {
286            trace!(
287                "🗄️ WebPushClient::do_check_storage: Timestamp message returns: {:#?}",
288                timestamp_resp.messages
289            );
290            self.app_state
291                .metrics
292                .count_with_tags(
293                    "notification.message.retrieved",
294                    timestamp_resp.messages.len() as i64,
295                )
296                .with_tag("topic", "false")
297                .send();
298            #[cfg(feature = "reliable_report")]
299            // Since we pulled these from storage, mark them as "retrieved"
300            self.record_state(
301                &mut timestamp_resp.messages,
302                autopush_common::reliability::ReliabilityState::Retrieved,
303            )
304            .await;
305        }
306
307        Ok(CheckStorageResponse {
308            include_topic: false,
309            messages: timestamp_resp.messages,
310            // If we didn't get a timestamp off the last query, use the
311            // original value if passed one
312            timestamp: timestamp_resp.timestamp.or(timestamp),
313        })
314    }
315
316    /// Update the user's last Message read timestamp (for timestamp Messages)
317    ///
318    /// Called when a Client's Ack'd all timestamp messages sent to it to move
319    /// the timestamp Messages' "pointer". See
320    /// `AckState::unacked_stored_highest` for further information.
321    pub(super) async fn increment_storage(&mut self) -> Result<(), SMError> {
322        trace!(
323            "🗄️ WebPushClient::increment_storage: unacked_stored_highest: {:?}",
324            self.ack_state.unacked_stored_highest
325        );
326        let Some(timestamp) = self.ack_state.unacked_stored_highest else {
327            return Err(SMErrorKind::Internal(
328                "increment_storage w/ no unacked_stored_highest".to_owned(),
329            )
330            .into());
331        };
332        self.current_timestamp = Some(timestamp);
333        self.app_state
334            .db
335            .increment_storage(&self.uaid, timestamp)
336            .await?;
337        #[cfg(feature = "reliable_report")]
338        {
339            let mut notifs = mem::take(&mut self.ack_state.acked_stored_timestamp_notifs);
340            self.record_state(
341                &mut notifs,
342                autopush_common::reliability::ReliabilityState::Delivered,
343            )
344            .await;
345        }
346        self.flags.increment_storage = false;
347        Ok(())
348    }
349
350    /// Ensure this user hasn't exceeded the maximum allowed number of messages
351    /// read from storage (`Settings::msg_limit`)
352    ///
353    /// Drops the user record and returns the `SMErrorKind::UaidReset` error if
354    /// they have
355    async fn check_msg_limit(&mut self) -> Result<(), SMError> {
356        trace!(
357            "WebPushClient::check_msg_limit: sent_from_storage: {} msg_limit: {}",
358            self.sent_from_storage,
359            self.app_state.settings.msg_limit
360        );
361        if self.sent_from_storage > self.app_state.settings.msg_limit {
362            // Exceeded the max limit of stored messages: drop the user to
363            // trigger a re-register
364            self.app_state
365                .metrics
366                .incr_with_tags(MetricName::UaExpiration)
367                .with_tag("reason", "too_many_messages")
368                .send();
369            self.app_state.db.remove_user(&self.uaid).await?;
370            return Err(SMErrorKind::UaidReset.into());
371        }
372        Ok(())
373    }
374
375    /// Emit metrics for a Notification to be sent to the user
376    fn emit_send_metrics(&self, notif: &Notification, source: &'static str) {
377        let metrics = &self.app_state.metrics;
378        let ua_info = &self.ua_info;
379        metrics
380            .incr_with_tags(MetricName::UaNotificationSent)
381            .with_tag("source", source)
382            .with_tag("topic", &notif.topic.is_some().to_string())
383            .with_tag("os", &ua_info.metrics_os)
384            // TODO: include `internal` if meta is set
385            .send();
386        metrics
387            .count_with_tags(
388                "ua.message_data",
389                notif.data.as_ref().map_or(0, |data| data.len() as i64),
390            )
391            .with_tag("source", source)
392            .with_tag("os", &ua_info.metrics_os)
393            .send();
394    }
395}