autoconnect_ws_sm/identified/
on_server_notif.rs1#[cfg(feature = "reliable_report")]
2use std::mem;
3
4use cadence::Counted;
5
6use autoconnect_common::protocol::{ServerMessage, ServerNotification};
7use autopush_common::{
8 db::CheckStorageResponse, metric_name::MetricName, metrics::StatsdClientExt,
9 notification::Notification, util::sec_since_epoch,
10};
11
12use super::WebPushClient;
13use crate::error::{SMError, SMErrorKind};
14
15impl WebPushClient {
16 pub async fn on_server_notif(
24 &mut self,
25 snotif: ServerNotification,
26 ) -> Result<Vec<ServerMessage>, SMError> {
27 match snotif {
28 ServerNotification::Notification(notif) => Ok(vec![self.notif(notif)?]),
29 ServerNotification::CheckStorage => self.check_storage().await,
30 ServerNotification::Disconnect => Err(SMErrorKind::Ghost.into()),
31 }
32 }
33
34 pub fn on_server_notif_shutdown(&mut self, snotif: ServerNotification) {
38 trace!("WebPushClient::on_server_notif_shutdown");
39 if let ServerNotification::Notification(notif) = snotif {
40 let key = notif.version.clone();
41 self.ack_state.unacked_direct_notifs.insert(key, notif);
42 }
43 }
44
45 fn notif(&mut self, notif: Notification) -> Result<ServerMessage, SMError> {
47 trace!("WebPushClient::notif Sending a direct notif");
48 let response = notif.clone();
51 if notif.ttl != 0 {
52 let key = notif.version.clone();
55 self.ack_state.unacked_direct_notifs.insert(key, notif);
56 }
57 self.emit_send_metrics(&response, "Direct");
58 Ok(ServerMessage::Notification(response))
59 }
60
61 pub(super) async fn check_storage(&mut self) -> Result<Vec<ServerMessage>, SMError> {
66 trace!("🗄️ WebPushClient::check_storage");
67 self.flags.check_storage = true;
68 self.flags.include_topic = true;
69 self.check_storage_loop().await
70 }
71
72 pub(super) async fn check_storage_loop(&mut self) -> Result<Vec<ServerMessage>, SMError> {
77 trace!("🗄️ WebPushClient::check_storage_loop");
78 while self.flags.check_storage {
79 let smsgs = self.check_storage_advance().await?;
80 if !smsgs.is_empty() {
81 self.check_msg_limit().await?;
82 return Ok(smsgs);
83 }
84 }
85 if self.flags.increment_storage {
89 debug!("🗄️ WebPushClient::check_storage_loop increment_storage");
90 self.increment_storage().await?;
91 }
92 Ok(vec![])
93 }
94
95 async fn check_storage_advance(&mut self) -> Result<Vec<ServerMessage>, SMError> {
101 trace!("🗄️ WebPushClient::check_storage_advance");
102 let CheckStorageResponse {
103 include_topic,
104 mut messages,
105 timestamp,
106 } = self.do_check_storage().await?;
107
108 debug!(
109 "🗄️ WebPushClient::check_storage_advance \
110 include_topic: {} -> {} \
111 unacked_stored_highest: {:?} -> {:?}",
112 self.flags.include_topic,
113 include_topic,
114 self.ack_state.unacked_stored_highest,
115 timestamp
116 );
117 self.flags.include_topic = include_topic;
118 self.ack_state.unacked_stored_highest = timestamp;
119
120 if messages.is_empty() {
121 trace!("🗄️ WebPushClient::check_storage_advance finished");
122 self.flags.check_storage = false;
123 self.sent_from_storage = 0;
124 return Ok(vec![]);
125 }
126
127 let now_sec = sec_since_epoch();
129 let mut expired_messages = vec![];
131 messages.retain(|msg| {
134 if !msg.expired(now_sec) {
135 return true;
136 }
137 if msg.sortkey_timestamp.is_none() {
138 expired_messages.push(msg.clone());
139 }
140 false
141 });
142 #[allow(unused_mut)]
144 for mut msg in expired_messages {
145 let chidmessageid = msg.chidmessageid();
146 trace!("🉑 removing expired topic chidmessageid: {chidmessageid}");
147 self.app_state
148 .db
149 .remove_message(&self.uaid, &chidmessageid)
150 .await?;
151 #[cfg(feature = "reliable_report")]
152 msg.record_reliability(
153 &self.app_state.reliability,
154 autopush_common::reliability::ReliabilityState::Expired,
155 )
156 .await;
157 }
158
159 self.flags.increment_storage = !include_topic && timestamp.is_some();
160
161 if messages.is_empty() {
162 trace!("🗄️ WebPushClient::check_storage_advance empty response (filtered expired)");
163 return Ok(vec![]);
164 }
165
166 for msg in messages.iter() {
167 let key = msg.version.clone();
168 self.ack_state
169 .unacked_stored_notifs
170 .insert(key, msg.clone());
171 }
172 let smsgs: Vec<_> = messages
173 .into_iter()
174 .inspect(|msg| {
175 trace!("🗄️ WebPushClient::check_storage_advance Sending stored");
176 self.emit_send_metrics(msg, "Stored")
177 })
178 .map(ServerMessage::Notification)
179 .collect();
180
181 let count = smsgs.len() as u32;
182 debug!(
183 "🗄️ WebPushClient::check_storage_advance: sent_from_storage: {}, +{}",
184 self.sent_from_storage, count
185 );
186 self.sent_from_storage += count;
187 Ok(smsgs)
188 }
189
/// Record the reliability `state` for every notification in `messages`.
///
/// Calls `record_reliability` on each notification in order, awaiting each
/// call before moving to the next. Only compiled with the
/// `reliable_report` feature.
///
/// Takes a mutable slice rather than `&mut Vec<_>`: the collection itself
/// is never resized here, and existing `&mut Vec<Notification>` call sites
/// coerce to a slice automatically.
#[cfg(feature = "reliable_report")]
async fn record_state(
    &self,
    messages: &mut [Notification],
    state: autopush_common::reliability::ReliabilityState,
) {
    for message in messages.iter_mut() {
        message
            .record_reliability(&self.app_state.reliability, state)
            .await;
    }
}
205
206 async fn do_check_storage(&self) -> Result<CheckStorageResponse, SMError> {
217 let timestamp = self
219 .ack_state
220 .unacked_stored_highest
221 .or(self.current_timestamp);
222 trace!("🗄️ WebPushClient::do_check_storage {:?}", ×tamp);
223 let topic_resp = if self.flags.include_topic {
227 trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
228 #[allow(unused_mut)]
230 let mut messages = self
231 .app_state
232 .db
233 .fetch_topic_messages(&self.uaid, 11)
234 .await?;
235 #[cfg(feature = "reliable_report")]
236 self.record_state(
238 &mut messages.messages,
239 autopush_common::reliability::ReliabilityState::Retrieved,
240 )
241 .await;
242 messages
243 } else {
244 Default::default()
245 };
246 if !topic_resp.messages.is_empty() {
248 trace!(
249 "🗄️ WebPushClient::do_check_storage: Topic message returns: {:#?}",
250 topic_resp.messages
251 );
252 self.app_state
253 .metrics
254 .count_with_tags(
255 "notification.message.retrieved",
256 topic_resp.messages.len() as i64,
257 )
258 .with_tag("topic", "true")
259 .send();
260 return Ok(CheckStorageResponse {
261 include_topic: true,
262 messages: topic_resp.messages,
263 timestamp: topic_resp.timestamp,
264 });
265 }
266 let timestamp = if self.flags.include_topic {
268 topic_resp.timestamp.or(self.current_timestamp)
272 } else {
273 timestamp
274 };
275 trace!(
276 "🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
277 timestamp
278 );
279 #[allow(unused_mut)]
280 let mut timestamp_resp = self
281 .app_state
282 .db
283 .fetch_timestamp_messages(&self.uaid, timestamp, 10)
284 .await?;
285 if !timestamp_resp.messages.is_empty() {
286 trace!(
287 "🗄️ WebPushClient::do_check_storage: Timestamp message returns: {:#?}",
288 timestamp_resp.messages
289 );
290 self.app_state
291 .metrics
292 .count_with_tags(
293 "notification.message.retrieved",
294 timestamp_resp.messages.len() as i64,
295 )
296 .with_tag("topic", "false")
297 .send();
298 #[cfg(feature = "reliable_report")]
299 self.record_state(
301 &mut timestamp_resp.messages,
302 autopush_common::reliability::ReliabilityState::Retrieved,
303 )
304 .await;
305 }
306
307 Ok(CheckStorageResponse {
308 include_topic: false,
309 messages: timestamp_resp.messages,
310 timestamp: timestamp_resp.timestamp.or(timestamp),
313 })
314 }
315
316 pub(super) async fn increment_storage(&mut self) -> Result<(), SMError> {
322 trace!(
323 "🗄️ WebPushClient::increment_storage: unacked_stored_highest: {:?}",
324 self.ack_state.unacked_stored_highest
325 );
326 let Some(timestamp) = self.ack_state.unacked_stored_highest else {
327 return Err(SMErrorKind::Internal(
328 "increment_storage w/ no unacked_stored_highest".to_owned(),
329 )
330 .into());
331 };
332 self.current_timestamp = Some(timestamp);
333 self.app_state
334 .db
335 .increment_storage(&self.uaid, timestamp)
336 .await?;
337 #[cfg(feature = "reliable_report")]
338 {
339 let mut notifs = mem::take(&mut self.ack_state.acked_stored_timestamp_notifs);
340 self.record_state(
341 &mut notifs,
342 autopush_common::reliability::ReliabilityState::Delivered,
343 )
344 .await;
345 }
346 self.flags.increment_storage = false;
347 Ok(())
348 }
349
350 async fn check_msg_limit(&mut self) -> Result<(), SMError> {
356 trace!(
357 "WebPushClient::check_msg_limit: sent_from_storage: {} msg_limit: {}",
358 self.sent_from_storage,
359 self.app_state.settings.msg_limit
360 );
361 if self.sent_from_storage > self.app_state.settings.msg_limit {
362 self.app_state
365 .metrics
366 .incr_with_tags(MetricName::UaExpiration)
367 .with_tag("reason", "too_many_messages")
368 .send();
369 self.app_state.db.remove_user(&self.uaid).await?;
370 return Err(SMErrorKind::UaidReset.into());
371 }
372 Ok(())
373 }
374
375 fn emit_send_metrics(&self, notif: &Notification, source: &'static str) {
377 let metrics = &self.app_state.metrics;
378 let ua_info = &self.ua_info;
379 metrics
380 .incr_with_tags(MetricName::UaNotificationSent)
381 .with_tag("source", source)
382 .with_tag("topic", ¬if.topic.is_some().to_string())
383 .with_tag("os", &ua_info.metrics_os)
384 .send();
386 metrics
387 .count_with_tags(
388 "ua.message_data",
389 notif.data.as_ref().map_or(0, |data| data.len() as i64),
390 )
391 .with_tag("source", source)
392 .with_tag("os", &ua_info.metrics_os)
393 .send();
394 }
395}