autoconnect_ws_sm/identified/
on_server_notif.rs1#[cfg(feature = "reliable_report")]
2use std::mem;
3
4use cadence::Counted;
5
6use autoconnect_common::protocol::{ServerMessage, ServerNotification};
7use autopush_common::{
8 db::CheckStorageResponse, metric_name::MetricName, metrics::StatsdClientExt,
9 notification::Notification, util::sec_since_epoch,
10};
11
12use super::WebPushClient;
13use crate::error::{SMError, SMErrorKind};
14
15impl WebPushClient {
16 pub async fn on_server_notif(
24 &mut self,
25 snotif: ServerNotification,
26 ) -> Result<Vec<ServerMessage>, SMError> {
27 match snotif {
28 ServerNotification::Notification(notif) => Ok(vec![self.notif(notif)?]),
29 ServerNotification::CheckStorage => self.check_storage().await,
30 ServerNotification::Disconnect => Err(SMErrorKind::Ghost.into()),
31 }
32 }
33
34 pub fn on_server_notif_shutdown(&mut self, snotif: ServerNotification) {
38 trace!("WebPushClient::on_server_notif_shutdown");
39 if let ServerNotification::Notification(notif) = snotif {
40 self.ack_state.unacked_direct_notifs.push(notif);
41 }
42 }
43
44 fn notif(&mut self, notif: Notification) -> Result<ServerMessage, SMError> {
46 trace!("WebPushClient::notif Sending a direct notif");
47 let response = notif.clone();
50 if notif.ttl != 0 {
51 self.ack_state.unacked_direct_notifs.push(notif);
54 }
55 self.emit_send_metrics(&response, "Direct");
56 Ok(ServerMessage::Notification(response))
57 }
58
59 pub(super) async fn check_storage(&mut self) -> Result<Vec<ServerMessage>, SMError> {
64 trace!("🗄️ WebPushClient::check_storage");
65 self.flags.check_storage = true;
66 self.flags.include_topic = true;
67 self.check_storage_loop().await
68 }
69
70 pub(super) async fn check_storage_loop(&mut self) -> Result<Vec<ServerMessage>, SMError> {
75 trace!("🗄️ WebPushClient::check_storage_loop");
76 while self.flags.check_storage {
77 let smsgs = self.check_storage_advance().await?;
78 if !smsgs.is_empty() {
79 self.check_msg_limit().await?;
80 return Ok(smsgs);
81 }
82 }
83 if self.flags.increment_storage {
87 debug!("🗄️ WebPushClient::check_storage_loop increment_storage");
88 self.increment_storage().await?;
89 }
90 Ok(vec![])
91 }
92
93 async fn check_storage_advance(&mut self) -> Result<Vec<ServerMessage>, SMError> {
99 trace!("🗄️ WebPushClient::check_storage_advance");
100 let CheckStorageResponse {
101 include_topic,
102 mut messages,
103 timestamp,
104 } = self.do_check_storage().await?;
105
106 debug!(
107 "🗄️ WebPushClient::check_storage_advance \
108 include_topic: {} -> {} \
109 unacked_stored_highest: {:?} -> {:?}",
110 self.flags.include_topic,
111 include_topic,
112 self.ack_state.unacked_stored_highest,
113 timestamp
114 );
115 self.flags.include_topic = include_topic;
116 self.ack_state.unacked_stored_highest = timestamp;
117
118 if messages.is_empty() {
119 trace!("🗄️ WebPushClient::check_storage_advance finished");
120 self.flags.check_storage = false;
121 self.sent_from_storage = 0;
122 return Ok(vec![]);
123 }
124
125 let now_sec = sec_since_epoch();
127 let mut expired_messages = vec![];
129 messages.retain(|msg| {
132 if !msg.expired(now_sec) {
133 return true;
134 }
135 if msg.sortkey_timestamp.is_none() {
136 expired_messages.push(msg.clone());
137 }
138 false
139 });
140 #[allow(unused_mut)]
142 for mut msg in expired_messages {
143 let chidmessageid = msg.chidmessageid();
144 trace!("🉑 removing expired topic chidmessageid: {chidmessageid}");
145 self.app_state
146 .db
147 .remove_message(&self.uaid, &chidmessageid)
148 .await?;
149 #[cfg(feature = "reliable_report")]
150 msg.record_reliability(
151 &self.app_state.reliability,
152 autopush_common::reliability::ReliabilityState::Expired,
153 )
154 .await;
155 }
156
157 self.flags.increment_storage = !include_topic && timestamp.is_some();
158
159 if messages.is_empty() {
160 trace!("🗄️ WebPushClient::check_storage_advance empty response (filtered expired)");
161 return Ok(vec![]);
162 }
163
164 self.ack_state
165 .unacked_stored_notifs
166 .extend(messages.iter().cloned());
167 let smsgs: Vec<_> = messages
168 .into_iter()
169 .inspect(|msg| {
170 trace!("🗄️ WebPushClient::check_storage_advance Sending stored");
171 self.emit_send_metrics(msg, "Stored")
172 })
173 .map(ServerMessage::Notification)
174 .collect();
175
176 let count = smsgs.len() as u32;
177 debug!(
178 "🗄️ WebPushClient::check_storage_advance: sent_from_storage: {}, +{}",
179 self.sent_from_storage, count
180 );
181 self.sent_from_storage += count;
182 Ok(smsgs)
183 }
184
/// Record the given reliability `state` on every notification in
/// `messages`, one after another, using the application's reliability
/// handle.
#[cfg(feature = "reliable_report")]
async fn record_state(
    &self,
    messages: &mut Vec<Notification>,
    state: autopush_common::reliability::ReliabilityState,
) {
    let reliability = &self.app_state.reliability;
    for notif in messages.iter_mut() {
        notif.record_reliability(reliability, state).await;
    }
}
200
201 async fn do_check_storage(&self) -> Result<CheckStorageResponse, SMError> {
212 let timestamp = self
214 .ack_state
215 .unacked_stored_highest
216 .or(self.current_timestamp);
217 trace!("🗄️ WebPushClient::do_check_storage {:?}", ×tamp);
218 let topic_resp = if self.flags.include_topic {
222 trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
223 #[allow(unused_mut)]
225 let mut messages = self
226 .app_state
227 .db
228 .fetch_topic_messages(&self.uaid, 11)
229 .await?;
230 #[cfg(feature = "reliable_report")]
231 self.record_state(
233 &mut messages.messages,
234 autopush_common::reliability::ReliabilityState::Retrieved,
235 )
236 .await;
237 messages
238 } else {
239 Default::default()
240 };
241 if !topic_resp.messages.is_empty() {
243 trace!(
244 "🗄️ WebPushClient::do_check_storage: Topic message returns: {:#?}",
245 topic_resp.messages
246 );
247 self.app_state
248 .metrics
249 .count_with_tags(
250 "notification.message.retrieved",
251 topic_resp.messages.len() as i64,
252 )
253 .with_tag("topic", "true")
254 .send();
255 return Ok(CheckStorageResponse {
256 include_topic: true,
257 messages: topic_resp.messages,
258 timestamp: topic_resp.timestamp,
259 });
260 }
261 let timestamp = if self.flags.include_topic {
263 topic_resp.timestamp.or(self.current_timestamp)
267 } else {
268 timestamp
269 };
270 trace!(
271 "🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
272 timestamp
273 );
274 #[allow(unused_mut)]
275 let mut timestamp_resp = self
276 .app_state
277 .db
278 .fetch_timestamp_messages(&self.uaid, timestamp, 10)
279 .await?;
280 if !timestamp_resp.messages.is_empty() {
281 trace!(
282 "🗄️ WebPushClient::do_check_storage: Timestamp message returns: {:#?}",
283 timestamp_resp.messages
284 );
285 self.app_state
286 .metrics
287 .count_with_tags(
288 "notification.message.retrieved",
289 timestamp_resp.messages.len() as i64,
290 )
291 .with_tag("topic", "false")
292 .send();
293 #[cfg(feature = "reliable_report")]
294 self.record_state(
296 &mut timestamp_resp.messages,
297 autopush_common::reliability::ReliabilityState::Retrieved,
298 )
299 .await;
300 }
301
302 Ok(CheckStorageResponse {
303 include_topic: false,
304 messages: timestamp_resp.messages,
305 timestamp: timestamp_resp.timestamp.or(timestamp),
308 })
309 }
310
311 pub(super) async fn increment_storage(&mut self) -> Result<(), SMError> {
317 trace!(
318 "🗄️ WebPushClient::increment_storage: unacked_stored_highest: {:?}",
319 self.ack_state.unacked_stored_highest
320 );
321 let Some(timestamp) = self.ack_state.unacked_stored_highest else {
322 return Err(SMErrorKind::Internal(
323 "increment_storage w/ no unacked_stored_highest".to_owned(),
324 )
325 .into());
326 };
327 self.current_timestamp = Some(timestamp);
328 self.app_state
329 .db
330 .increment_storage(&self.uaid, timestamp)
331 .await?;
332 #[cfg(feature = "reliable_report")]
333 {
334 let mut notifs = mem::take(&mut self.ack_state.acked_stored_timestamp_notifs);
335 self.record_state(
336 &mut notifs,
337 autopush_common::reliability::ReliabilityState::Delivered,
338 )
339 .await;
340 }
341 self.flags.increment_storage = false;
342 Ok(())
343 }
344
345 async fn check_msg_limit(&mut self) -> Result<(), SMError> {
351 trace!(
352 "WebPushClient::check_msg_limit: sent_from_storage: {} msg_limit: {}",
353 self.sent_from_storage,
354 self.app_state.settings.msg_limit
355 );
356 if self.sent_from_storage > self.app_state.settings.msg_limit {
357 self.app_state
360 .metrics
361 .incr_with_tags(MetricName::UaExpiration)
362 .with_tag("reason", "too_many_messages")
363 .send();
364 self.app_state.db.remove_user(&self.uaid).await?;
365 return Err(SMErrorKind::UaidReset.into());
366 }
367 Ok(())
368 }
369
370 fn emit_send_metrics(&self, notif: &Notification, source: &'static str) {
372 let metrics = &self.app_state.metrics;
373 let ua_info = &self.ua_info;
374 metrics
375 .incr_with_tags(MetricName::UaNotificationSent)
376 .with_tag("source", source)
377 .with_tag("topic", ¬if.topic.is_some().to_string())
378 .with_tag("os", &ua_info.metrics_os)
379 .send();
381 metrics
382 .count_with_tags(
383 "ua.message_data",
384 notif.data.as_ref().map_or(0, |data| data.len() as i64),
385 )
386 .with_tag("source", source)
387 .with_tag("os", &ua_info.metrics_os)
388 .send();
389 }
390}