autoconnect_ws_sm/identified/
on_server_notif.rs

1use std::mem;
2
3use cadence::Counted;
4
5use autoconnect_common::protocol::{ServerMessage, ServerNotification};
6use autopush_common::{
7    db::CheckStorageResponse, metric_name::MetricName, metrics::StatsdClientExt,
8    notification::Notification, util::sec_since_epoch,
9};
10
11use super::WebPushClient;
12use crate::error::{SMError, SMErrorKind};
13
14impl WebPushClient {
15    /// Handle a `ServerNotification` for this user
16    ///
17    /// `ServerNotification::Disconnect` is emitted by the same autoconnect
18    /// node receiving it when a User has logged into that same node twice to
19    /// "Ghost" (disconnect) the first user's session for its second session.
20    ///
21    /// Other variants are emitted by autoendpoint
22    pub async fn on_server_notif(
23        &mut self,
24        snotif: ServerNotification,
25    ) -> Result<Vec<ServerMessage>, SMError> {
26        match snotif {
27            ServerNotification::Notification(notif) => Ok(vec![self.notif(notif)?]),
28            ServerNotification::CheckStorage => self.check_storage().await,
29            ServerNotification::Disconnect => Err(SMErrorKind::Ghost.into()),
30        }
31    }
32
33    /// After disconnecting from the `ClientRegistry`, moves any queued Direct
34    /// Push Notifications to unacked_direct_notifs (to be stored in the db on
35    /// `shutdown`)
36    pub fn on_server_notif_shutdown(&mut self, snotif: ServerNotification) {
37        trace!("WebPushClient::on_server_notif_shutdown");
38        if let ServerNotification::Notification(notif) = snotif {
39            self.ack_state.unacked_direct_notifs.push(notif);
40        }
41    }
42
43    /// Send a Direct Push Notification to this user
44    fn notif(&mut self, notif: Notification) -> Result<ServerMessage, SMError> {
45        trace!("WebPushClient::notif Sending a direct notif");
46        // The notification we return here is sent directly to the client.
47        // No reliability state is recorded.
48        let response = notif.clone();
49        if notif.ttl != 0 {
50            // Consume the original notification by adding it to the
51            // unacked stack. This will eventually record the state.
52            self.ack_state.unacked_direct_notifs.push(notif);
53        }
54        self.emit_send_metrics(&response, "Direct");
55        Ok(ServerMessage::Notification(response))
56    }
57
58    /// Top level read of Push Notifications from storage
59    ///
60    /// Initializes the top level `check_storage` and `include_topic` flags and
61    /// runs `check_storage_loop`
62    pub(super) async fn check_storage(&mut self) -> Result<Vec<ServerMessage>, SMError> {
63        trace!("🗄️ WebPushClient::check_storage");
64        self.flags.check_storage = true;
65        self.flags.include_topic = true;
66        self.check_storage_loop().await
67    }
68
69    /// Loop the read of Push Notifications from storage
70    ///
71    /// Loops until any unexpired Push Notifications are read or there's no
72    /// more Notifications in storage
73    pub(super) async fn check_storage_loop(&mut self) -> Result<Vec<ServerMessage>, SMError> {
74        trace!("🗄️ WebPushClient::check_storage_loop");
75        while self.flags.check_storage {
76            let smsgs = self.check_storage_advance().await?;
77            if !smsgs.is_empty() {
78                self.check_msg_limit().await?;
79                return Ok(smsgs);
80            }
81        }
82        // No more notifications (check_storage = false). Despite returning no
83        // notifs we may have advanced through expired timestamp messages and
84        // need to increment_storage to mark them as deleted
85        if self.flags.increment_storage {
86            debug!("🗄️ WebPushClient::check_storage_loop increment_storage");
87            self.increment_storage().await?;
88        }
89        Ok(vec![])
90    }
91
92    /// Read a chunk (max count 10 returned) of Notifications from storage
93    ///
94    /// This filters out expired Notifications and may return an empty result
95    /// set when there's still pending Notifications to be read: so it should
96    /// be called in a loop to advance through all Notification records
97    async fn check_storage_advance(&mut self) -> Result<Vec<ServerMessage>, SMError> {
98        trace!("🗄️ WebPushClient::check_storage_advance");
99        let CheckStorageResponse {
100            include_topic,
101            mut messages,
102            timestamp,
103        } = self.do_check_storage().await?;
104
105        debug!(
106            "🗄️ WebPushClient::check_storage_advance \
107                 include_topic: {} -> {} \
108                 unacked_stored_highest: {:?} -> {:?}",
109            self.flags.include_topic,
110            include_topic,
111            self.ack_state.unacked_stored_highest,
112            timestamp
113        );
114        self.flags.include_topic = include_topic;
115        self.ack_state.unacked_stored_highest = timestamp;
116
117        if messages.is_empty() {
118            trace!("🗄️ WebPushClient::check_storage_advance finished");
119            self.flags.check_storage = false;
120            self.sent_from_storage = 0;
121            return Ok(vec![]);
122        }
123
124        // Filter out TTL expired messages
125        let now_sec = sec_since_epoch();
126        // Topic messages require immediate deletion from the db
127        let mut expired_messages = vec![];
128        // NOTE: Vec::extract_if (stabilizing soon) can negate the need for the
129        // inner msg.clone()
130        messages.retain(|msg| {
131            if !msg.expired(now_sec) {
132                return true;
133            }
134            if msg.sortkey_timestamp.is_none() {
135                expired_messages.push(msg.clone());
136            }
137            false
138        });
139        // TODO: A batch remove_messages would be nicer
140        #[allow(unused_mut)]
141        for mut msg in expired_messages {
142            let chidmessageid = msg.chidmessageid();
143            trace!("🉑 removing expired topic chidmessageid: {chidmessageid}");
144            self.app_state
145                .db
146                .remove_message(&self.uaid, &chidmessageid)
147                .await?;
148            #[cfg(feature = "reliable_report")]
149            msg.record_reliability(
150                &self.app_state.reliability,
151                autopush_common::reliability::ReliabilityState::Expired,
152            )
153            .await;
154        }
155
156        self.flags.increment_storage = !include_topic && timestamp.is_some();
157
158        if messages.is_empty() {
159            trace!("🗄️ WebPushClient::check_storage_advance empty response (filtered expired)");
160            return Ok(vec![]);
161        }
162
163        self.ack_state
164            .unacked_stored_notifs
165            .extend(messages.iter().cloned());
166        let smsgs: Vec<_> = messages
167            .into_iter()
168            .inspect(|msg| {
169                trace!("🗄️ WebPushClient::check_storage_advance Sending stored");
170                self.emit_send_metrics(msg, "Stored")
171            })
172            .map(ServerMessage::Notification)
173            .collect();
174
175        let count = smsgs.len() as u32;
176        debug!(
177            "🗄️ WebPushClient::check_storage_advance: sent_from_storage: {}, +{}",
178            self.sent_from_storage, count
179        );
180        self.sent_from_storage += count;
181        Ok(smsgs)
182    }
183
184    #[cfg(feature = "reliable_report")]
185    /// Record and transition the state for trackable messages.
186    async fn record_state(
187        &self,
188        messages: &mut Vec<Notification>,
189        state: autopush_common::reliability::ReliabilityState,
190    ) {
191        // *Note* because `.map()` is sync
192        // we can't call the async func without additional hoops.
193        for message in messages {
194            message
195                .record_reliability(&self.app_state.reliability, state)
196                .await;
197        }
198    }
199
    /// Read a chunk of Notifications from storage
    ///
    /// This alternates between reading Topic Notifications and Timestamp
    /// Notifications which are stored separately in storage. The Topic fetch
    /// is called with `11` and the Timestamp fetch with `10` as their last
    /// argument (presumably the maximum number of messages to return).
    ///
    /// Topic Messages differ in that they replace pending Notifications with
    /// new ones if they have matching Topic names. They are used when a sender
    /// desires a scenario where multiple Messages sent to an offline device
    /// result in the user only seeing the latest Message when the device comes
    /// online.
    ///
    /// The returned `CheckStorageResponse` has `include_topic: true` when
    /// Topic messages were found (and only those are returned), otherwise
    /// `include_topic: false` along with the result of the Timestamp fetch
    /// (which may be empty).
    ///
    /// # Errors
    ///
    /// Propagates errors from the db's `fetch_topic_messages` and
    /// `fetch_timestamp_messages`
    async fn do_check_storage(&self) -> Result<CheckStorageResponse, SMError> {
        // start at the latest unacked timestamp or the previous, latest timestamp.
        let timestamp = self
            .ack_state
            .unacked_stored_highest
            .or(self.current_timestamp);
        trace!("🗄️ WebPushClient::do_check_storage {:?}", &timestamp);
        // if we're to include topic messages, do those first.
        // NOTE: Bigtable can't fetch `current_timestamp`, so we can't rely on
        // `fetch_topic_messages()` returning a reasonable timestamp.
        let topic_resp = if self.flags.include_topic {
            trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
            // Get the most recent max 11 messages.
            // (`mut` is only needed when `reliable_report` is enabled)
            #[allow(unused_mut)]
            let mut messages = self
                .app_state
                .db
                .fetch_topic_messages(&self.uaid, 11)
                .await?;
            #[cfg(feature = "reliable_report")]
            // Since we pulled these from storage, mark them as "retrieved"
            self.record_state(
                &mut messages.messages,
                autopush_common::reliability::ReliabilityState::Retrieved,
            )
            .await;
            messages
        } else {
            // Topic messages weren't requested: an empty default response
            // falls through to the timestamp fetch below
            Default::default()
        };
        // if we have topic messages...
        if !topic_resp.messages.is_empty() {
            trace!(
                "🗄️ WebPushClient::do_check_storage: Topic message returns: {:#?}",
                topic_resp.messages
            );
            self.app_state
                .metrics
                .count_with_tags(
                    "notification.message.retrieved",
                    topic_resp.messages.len() as i64,
                )
                .with_tag("topic", "true")
                .send();
            // Return only the topic messages; `include_topic: true` signals
            // that topic messages were returned by this read
            return Ok(CheckStorageResponse {
                include_topic: true,
                messages: topic_resp.messages,
                timestamp: topic_resp.timestamp,
            });
        }
        // No topic messages, so carry on with normal ones, starting from the latest timestamp.
        let timestamp = if self.flags.include_topic {
            // See above, but Bigtable doesn't return the last message read timestamp when polling
            // for topic messages. Instead, we'll use the explicitly set one we store in the User
            // record and copy into the WebPushClient struct.
            topic_resp.timestamp.or(self.current_timestamp)
        } else {
            // Topics were skipped: continue from the timestamp computed at
            // the top of this function
            timestamp
        };
        trace!(
            "🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
            timestamp
        );
        // (`mut` is only needed when `reliable_report` is enabled)
        #[allow(unused_mut)]
        let mut timestamp_resp = self
            .app_state
            .db
            .fetch_timestamp_messages(&self.uaid, timestamp, 10)
            .await?;
        if !timestamp_resp.messages.is_empty() {
            trace!(
                "🗄️ WebPushClient::do_check_storage: Timestamp message returns: {:#?}",
                timestamp_resp.messages
            );
            self.app_state
                .metrics
                .count_with_tags(
                    "notification.message.retrieved",
                    timestamp_resp.messages.len() as i64,
                )
                .with_tag("topic", "false")
                .send();
            #[cfg(feature = "reliable_report")]
            // Since we pulled these from storage, mark them as "retrieved"
            self.record_state(
                &mut timestamp_resp.messages,
                autopush_common::reliability::ReliabilityState::Retrieved,
            )
            .await;
        }

        Ok(CheckStorageResponse {
            include_topic: false,
            messages: timestamp_resp.messages,
            // If we didn't get a timestamp off the last query, use the
            // original value if passed one
            timestamp: timestamp_resp.timestamp.or(timestamp),
        })
    }
309
310    /// Update the user's last Message read timestamp (for timestamp Messages)
311    ///
312    /// Called when a Client's Ack'd all timestamp messages sent to it to move
313    /// the timestamp Messages' "pointer". See
314    /// `AckState::unacked_stored_highest` for further information.
315    pub(super) async fn increment_storage(&mut self) -> Result<(), SMError> {
316        trace!(
317            "🗄️ WebPushClient::increment_storage: unacked_stored_highest: {:?}",
318            self.ack_state.unacked_stored_highest
319        );
320        let Some(timestamp) = self.ack_state.unacked_stored_highest else {
321            return Err(SMErrorKind::Internal(
322                "increment_storage w/ no unacked_stored_highest".to_owned(),
323            )
324            .into());
325        };
326        self.current_timestamp = Some(timestamp);
327        self.app_state
328            .db
329            .increment_storage(&self.uaid, timestamp)
330            .await?;
331        #[cfg(feature = "reliable_report")]
332        {
333            let mut notifs = mem::take(&mut self.ack_state.acked_stored_timestamp_notifs);
334            self.record_state(
335                &mut notifs,
336                autopush_common::reliability::ReliabilityState::Delivered,
337            )
338            .await;
339        }
340        self.flags.increment_storage = false;
341        Ok(())
342    }
343
344    /// Ensure this user hasn't exceeded the maximum allowed number of messages
345    /// read from storage (`Settings::msg_limit`)
346    ///
347    /// Drops the user record and returns the `SMErrorKind::UaidReset` error if
348    /// they have
349    async fn check_msg_limit(&mut self) -> Result<(), SMError> {
350        trace!(
351            "WebPushClient::check_msg_limit: sent_from_storage: {} msg_limit: {}",
352            self.sent_from_storage,
353            self.app_state.settings.msg_limit
354        );
355        if self.sent_from_storage > self.app_state.settings.msg_limit {
356            // Exceeded the max limit of stored messages: drop the user to
357            // trigger a re-register
358            self.app_state
359                .metrics
360                .incr_with_tags(MetricName::UaExpiration)
361                .with_tag("reason", "too_many_messages")
362                .send();
363            self.app_state.db.remove_user(&self.uaid).await?;
364            return Err(SMErrorKind::UaidReset.into());
365        }
366        Ok(())
367    }
368
369    /// Emit metrics for a Notification to be sent to the user
370    fn emit_send_metrics(&self, notif: &Notification, source: &'static str) {
371        let metrics = &self.app_state.metrics;
372        let ua_info = &self.ua_info;
373        metrics
374            .incr_with_tags(MetricName::UaNotificationSent)
375            .with_tag("source", source)
376            .with_tag("topic", &notif.topic.is_some().to_string())
377            .with_tag("os", &ua_info.metrics_os)
378            // TODO: include `internal` if meta is set
379            .send();
380        metrics
381            .count_with_tags(
382                "ua.message_data",
383                notif.data.as_ref().map_or(0, |data| data.len() as i64),
384            )
385            .with_tag("source", source)
386            .with_tag("os", &ua_info.metrics_os)
387            .send();
388    }
389}