autoconnect_ws_sm/identified/
on_server_notif.rs

1#[cfg(feature = "reliable_report")]
2use std::mem;
3
4use cadence::Counted;
5
6use autoconnect_common::protocol::{ServerMessage, ServerNotification};
7use autopush_common::{
8    db::CheckStorageResponse, metric_name::MetricName, metrics::StatsdClientExt,
9    notification::Notification, util::sec_since_epoch,
10};
11
12use super::WebPushClient;
13use crate::error::{SMError, SMErrorKind};
14
15impl WebPushClient {
16    /// Handle a `ServerNotification` for this user
17    ///
18    /// `ServerNotification::Disconnect` is emitted by the same autoconnect
19    /// node receiving it when a User has logged into that same node twice to
20    /// "Ghost" (disconnect) the first user's session for its second session.
21    ///
22    /// Other variants are emitted by autoendpoint
23    pub async fn on_server_notif(
24        &mut self,
25        snotif: ServerNotification,
26    ) -> Result<Vec<ServerMessage>, SMError> {
27        match snotif {
28            ServerNotification::Notification(notif) => Ok(vec![self.notif(notif)?]),
29            ServerNotification::CheckStorage => self.check_storage().await,
30            ServerNotification::Disconnect => Err(SMErrorKind::Ghost.into()),
31        }
32    }
33
34    /// After disconnecting from the `ClientRegistry`, moves any queued Direct
35    /// Push Notifications to unacked_direct_notifs (to be stored in the db on
36    /// `shutdown`)
37    pub fn on_server_notif_shutdown(&mut self, snotif: ServerNotification) {
38        trace!("WebPushClient::on_server_notif_shutdown");
39        if let ServerNotification::Notification(notif) = snotif {
40            self.ack_state.unacked_direct_notifs.push(notif);
41        }
42    }
43
44    /// Send a Direct Push Notification to this user
45    fn notif(&mut self, notif: Notification) -> Result<ServerMessage, SMError> {
46        trace!("WebPushClient::notif Sending a direct notif");
47        // The notification we return here is sent directly to the client.
48        // No reliability state is recorded.
49        let response = notif.clone();
50        if notif.ttl != 0 {
51            // Consume the original notification by adding it to the
52            // unacked stack. This will eventually record the state.
53            self.ack_state.unacked_direct_notifs.push(notif);
54        }
55        self.emit_send_metrics(&response, "Direct");
56        Ok(ServerMessage::Notification(response))
57    }
58
59    /// Top level read of Push Notifications from storage
60    ///
61    /// Initializes the top level `check_storage` and `include_topic` flags and
62    /// runs `check_storage_loop`
63    pub(super) async fn check_storage(&mut self) -> Result<Vec<ServerMessage>, SMError> {
64        trace!("🗄️ WebPushClient::check_storage");
65        self.flags.check_storage = true;
66        self.flags.include_topic = true;
67        self.check_storage_loop().await
68    }
69
70    /// Loop the read of Push Notifications from storage
71    ///
72    /// Loops until any unexpired Push Notifications are read or there's no
73    /// more Notifications in storage
74    pub(super) async fn check_storage_loop(&mut self) -> Result<Vec<ServerMessage>, SMError> {
75        trace!("🗄️ WebPushClient::check_storage_loop");
76        while self.flags.check_storage {
77            let smsgs = self.check_storage_advance().await?;
78            if !smsgs.is_empty() {
79                self.check_msg_limit().await?;
80                return Ok(smsgs);
81            }
82        }
83        // No more notifications (check_storage = false). Despite returning no
84        // notifs we may have advanced through expired timestamp messages and
85        // need to increment_storage to mark them as deleted
86        if self.flags.increment_storage {
87            debug!("🗄️ WebPushClient::check_storage_loop increment_storage");
88            self.increment_storage().await?;
89        }
90        Ok(vec![])
91    }
92
93    /// Read a chunk (max count 10 returned) of Notifications from storage
94    ///
95    /// This filters out expired Notifications and may return an empty result
96    /// set when there's still pending Notifications to be read: so it should
97    /// be called in a loop to advance through all Notification records
98    async fn check_storage_advance(&mut self) -> Result<Vec<ServerMessage>, SMError> {
99        trace!("🗄️ WebPushClient::check_storage_advance");
100        let CheckStorageResponse {
101            include_topic,
102            mut messages,
103            timestamp,
104        } = self.do_check_storage().await?;
105
106        debug!(
107            "🗄️ WebPushClient::check_storage_advance \
108                 include_topic: {} -> {} \
109                 unacked_stored_highest: {:?} -> {:?}",
110            self.flags.include_topic,
111            include_topic,
112            self.ack_state.unacked_stored_highest,
113            timestamp
114        );
115        self.flags.include_topic = include_topic;
116        self.ack_state.unacked_stored_highest = timestamp;
117
118        if messages.is_empty() {
119            trace!("🗄️ WebPushClient::check_storage_advance finished");
120            self.flags.check_storage = false;
121            self.sent_from_storage = 0;
122            return Ok(vec![]);
123        }
124
125        // Filter out TTL expired messages
126        let now_sec = sec_since_epoch();
127        // Topic messages require immediate deletion from the db
128        let mut expired_messages = vec![];
129        // NOTE: Vec::extract_if (stabilizing soon) can negate the need for the
130        // inner msg.clone()
131        messages.retain(|msg| {
132            if !msg.expired(now_sec) {
133                return true;
134            }
135            if msg.sortkey_timestamp.is_none() {
136                expired_messages.push(msg.clone());
137            }
138            false
139        });
140        // TODO: A batch remove_messages would be nicer
141        #[allow(unused_mut)]
142        for mut msg in expired_messages {
143            let chidmessageid = msg.chidmessageid();
144            trace!("🉑 removing expired topic chidmessageid: {chidmessageid}");
145            self.app_state
146                .db
147                .remove_message(&self.uaid, &chidmessageid)
148                .await?;
149            #[cfg(feature = "reliable_report")]
150            msg.record_reliability(
151                &self.app_state.reliability,
152                autopush_common::reliability::ReliabilityState::Expired,
153            )
154            .await;
155        }
156
157        self.flags.increment_storage = !include_topic && timestamp.is_some();
158
159        if messages.is_empty() {
160            trace!("🗄️ WebPushClient::check_storage_advance empty response (filtered expired)");
161            return Ok(vec![]);
162        }
163
164        self.ack_state
165            .unacked_stored_notifs
166            .extend(messages.iter().cloned());
167        let smsgs: Vec<_> = messages
168            .into_iter()
169            .inspect(|msg| {
170                trace!("🗄️ WebPushClient::check_storage_advance Sending stored");
171                self.emit_send_metrics(msg, "Stored")
172            })
173            .map(ServerMessage::Notification)
174            .collect();
175
176        let count = smsgs.len() as u32;
177        debug!(
178            "🗄️ WebPushClient::check_storage_advance: sent_from_storage: {}, +{}",
179            self.sent_from_storage, count
180        );
181        self.sent_from_storage += count;
182        Ok(smsgs)
183    }
184
185    #[cfg(feature = "reliable_report")]
186    /// Record and transition the state for trackable messages.
187    async fn record_state(
188        &self,
189        messages: &mut Vec<Notification>,
190        state: autopush_common::reliability::ReliabilityState,
191    ) {
192        // *Note* because `.map()` is sync
193        // we can't call the async func without additional hoops.
194        for message in messages {
195            message
196                .record_reliability(&self.app_state.reliability, state)
197                .await;
198        }
199    }
200
201    /// Read a chunk (max count 10 returned) of Notifications from storage
202    ///
203    /// This alternates between reading Topic Notifications and Timestamp
204    /// Notifications which are stored separately in storage.
205    ///
206    /// Topic Messages differ in that they replace pending Notifications with
207    /// new ones if they have matching Topic names. They are used when a sender
208    /// desires a scenario where multiple Messages sent to an offline device
209    /// result in the user only seeing the latest Message when the device comes
210    /// online.
211    async fn do_check_storage(&self) -> Result<CheckStorageResponse, SMError> {
212        // start at the latest unacked timestamp or the previous, latest timestamp.
213        let timestamp = self
214            .ack_state
215            .unacked_stored_highest
216            .or(self.current_timestamp);
217        trace!("🗄️ WebPushClient::do_check_storage {:?}", &timestamp);
218        // if we're to include topic messages, do those first.
219        // NOTE: Bigtable can't fetch `current_timestamp`, so we can't rely on
220        // `fetch_topic_messages()` returning a reasonable timestamp.
221        let topic_resp = if self.flags.include_topic {
222            trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
223            // Get the most recent max 11 messages.
224            #[allow(unused_mut)]
225            let mut messages = self
226                .app_state
227                .db
228                .fetch_topic_messages(&self.uaid, 11)
229                .await?;
230            #[cfg(feature = "reliable_report")]
231            // Since we pulled these from storage, mark them as "retrieved"
232            self.record_state(
233                &mut messages.messages,
234                autopush_common::reliability::ReliabilityState::Retrieved,
235            )
236            .await;
237            messages
238        } else {
239            Default::default()
240        };
241        // if we have topic messages...
242        if !topic_resp.messages.is_empty() {
243            trace!(
244                "🗄️ WebPushClient::do_check_storage: Topic message returns: {:#?}",
245                topic_resp.messages
246            );
247            self.app_state
248                .metrics
249                .count_with_tags(
250                    "notification.message.retrieved",
251                    topic_resp.messages.len() as i64,
252                )
253                .with_tag("topic", "true")
254                .send();
255            return Ok(CheckStorageResponse {
256                include_topic: true,
257                messages: topic_resp.messages,
258                timestamp: topic_resp.timestamp,
259            });
260        }
261        // No topic messages, so carry on with normal ones, starting from the latest timestamp.
262        let timestamp = if self.flags.include_topic {
263            // See above, but Bigtable doesn't return the last message read timestamp when polling
264            // for topic messages. Instead, we'll use the explicitly set one we store in the User
265            // record and copy into the WebPushClient struct.
266            topic_resp.timestamp.or(self.current_timestamp)
267        } else {
268            timestamp
269        };
270        trace!(
271            "🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
272            timestamp
273        );
274        #[allow(unused_mut)]
275        let mut timestamp_resp = self
276            .app_state
277            .db
278            .fetch_timestamp_messages(&self.uaid, timestamp, 10)
279            .await?;
280        if !timestamp_resp.messages.is_empty() {
281            trace!(
282                "🗄️ WebPushClient::do_check_storage: Timestamp message returns: {:#?}",
283                timestamp_resp.messages
284            );
285            self.app_state
286                .metrics
287                .count_with_tags(
288                    "notification.message.retrieved",
289                    timestamp_resp.messages.len() as i64,
290                )
291                .with_tag("topic", "false")
292                .send();
293            #[cfg(feature = "reliable_report")]
294            // Since we pulled these from storage, mark them as "retrieved"
295            self.record_state(
296                &mut timestamp_resp.messages,
297                autopush_common::reliability::ReliabilityState::Retrieved,
298            )
299            .await;
300        }
301
302        Ok(CheckStorageResponse {
303            include_topic: false,
304            messages: timestamp_resp.messages,
305            // If we didn't get a timestamp off the last query, use the
306            // original value if passed one
307            timestamp: timestamp_resp.timestamp.or(timestamp),
308        })
309    }
310
311    /// Update the user's last Message read timestamp (for timestamp Messages)
312    ///
313    /// Called when a Client's Ack'd all timestamp messages sent to it to move
314    /// the timestamp Messages' "pointer". See
315    /// `AckState::unacked_stored_highest` for further information.
316    pub(super) async fn increment_storage(&mut self) -> Result<(), SMError> {
317        trace!(
318            "🗄️ WebPushClient::increment_storage: unacked_stored_highest: {:?}",
319            self.ack_state.unacked_stored_highest
320        );
321        let Some(timestamp) = self.ack_state.unacked_stored_highest else {
322            return Err(SMErrorKind::Internal(
323                "increment_storage w/ no unacked_stored_highest".to_owned(),
324            )
325            .into());
326        };
327        self.current_timestamp = Some(timestamp);
328        self.app_state
329            .db
330            .increment_storage(&self.uaid, timestamp)
331            .await?;
332        #[cfg(feature = "reliable_report")]
333        {
334            let mut notifs = mem::take(&mut self.ack_state.acked_stored_timestamp_notifs);
335            self.record_state(
336                &mut notifs,
337                autopush_common::reliability::ReliabilityState::Delivered,
338            )
339            .await;
340        }
341        self.flags.increment_storage = false;
342        Ok(())
343    }
344
345    /// Ensure this user hasn't exceeded the maximum allowed number of messages
346    /// read from storage (`Settings::msg_limit`)
347    ///
348    /// Drops the user record and returns the `SMErrorKind::UaidReset` error if
349    /// they have
350    async fn check_msg_limit(&mut self) -> Result<(), SMError> {
351        trace!(
352            "WebPushClient::check_msg_limit: sent_from_storage: {} msg_limit: {}",
353            self.sent_from_storage,
354            self.app_state.settings.msg_limit
355        );
356        if self.sent_from_storage > self.app_state.settings.msg_limit {
357            // Exceeded the max limit of stored messages: drop the user to
358            // trigger a re-register
359            self.app_state
360                .metrics
361                .incr_with_tags(MetricName::UaExpiration)
362                .with_tag("reason", "too_many_messages")
363                .send();
364            self.app_state.db.remove_user(&self.uaid).await?;
365            return Err(SMErrorKind::UaidReset.into());
366        }
367        Ok(())
368    }
369
370    /// Emit metrics for a Notification to be sent to the user
371    fn emit_send_metrics(&self, notif: &Notification, source: &'static str) {
372        let metrics = &self.app_state.metrics;
373        let ua_info = &self.ua_info;
374        metrics
375            .incr_with_tags(MetricName::UaNotificationSent)
376            .with_tag("source", source)
377            .with_tag("topic", &notif.topic.is_some().to_string())
378            .with_tag("os", &ua_info.metrics_os)
379            // TODO: include `internal` if meta is set
380            .send();
381        metrics
382            .count_with_tags(
383                "ua.message_data",
384                notif.data.as_ref().map_or(0, |data| data.len() as i64),
385            )
386            .with_tag("source", source)
387            .with_tag("os", &ua_info.metrics_os)
388            .send();
389    }
390}