// autoconnect_ws_sm/identified/on_server_notif.rs
use std::mem;
2
3use cadence::Counted;
4
5use autoconnect_common::protocol::{ServerMessage, ServerNotification};
6use autopush_common::{
7 db::CheckStorageResponse, metric_name::MetricName, metrics::StatsdClientExt,
8 notification::Notification, util::sec_since_epoch,
9};
10
11use super::WebPushClient;
12use crate::error::{SMError, SMErrorKind};
13
14impl WebPushClient {
15 pub async fn on_server_notif(
23 &mut self,
24 snotif: ServerNotification,
25 ) -> Result<Vec<ServerMessage>, SMError> {
26 match snotif {
27 ServerNotification::Notification(notif) => Ok(vec![self.notif(notif)?]),
28 ServerNotification::CheckStorage => self.check_storage().await,
29 ServerNotification::Disconnect => Err(SMErrorKind::Ghost.into()),
30 }
31 }
32
33 pub fn on_server_notif_shutdown(&mut self, snotif: ServerNotification) {
37 trace!("WebPushClient::on_server_notif_shutdown");
38 if let ServerNotification::Notification(notif) = snotif {
39 self.ack_state.unacked_direct_notifs.push(notif);
40 }
41 }
42
43 fn notif(&mut self, notif: Notification) -> Result<ServerMessage, SMError> {
45 trace!("WebPushClient::notif Sending a direct notif");
46 let response = notif.clone();
49 if notif.ttl != 0 {
50 self.ack_state.unacked_direct_notifs.push(notif);
53 }
54 self.emit_send_metrics(&response, "Direct");
55 Ok(ServerMessage::Notification(response))
56 }
57
58 pub(super) async fn check_storage(&mut self) -> Result<Vec<ServerMessage>, SMError> {
63 trace!("🗄️ WebPushClient::check_storage");
64 self.flags.check_storage = true;
65 self.flags.include_topic = true;
66 self.check_storage_loop().await
67 }
68
69 pub(super) async fn check_storage_loop(&mut self) -> Result<Vec<ServerMessage>, SMError> {
74 trace!("🗄️ WebPushClient::check_storage_loop");
75 while self.flags.check_storage {
76 let smsgs = self.check_storage_advance().await?;
77 if !smsgs.is_empty() {
78 self.check_msg_limit().await?;
79 return Ok(smsgs);
80 }
81 }
82 if self.flags.increment_storage {
86 debug!("🗄️ WebPushClient::check_storage_loop increment_storage");
87 self.increment_storage().await?;
88 }
89 Ok(vec![])
90 }
91
92 async fn check_storage_advance(&mut self) -> Result<Vec<ServerMessage>, SMError> {
98 trace!("🗄️ WebPushClient::check_storage_advance");
99 let CheckStorageResponse {
100 include_topic,
101 mut messages,
102 timestamp,
103 } = self.do_check_storage().await?;
104
105 debug!(
106 "🗄️ WebPushClient::check_storage_advance \
107 include_topic: {} -> {} \
108 unacked_stored_highest: {:?} -> {:?}",
109 self.flags.include_topic,
110 include_topic,
111 self.ack_state.unacked_stored_highest,
112 timestamp
113 );
114 self.flags.include_topic = include_topic;
115 self.ack_state.unacked_stored_highest = timestamp;
116
117 if messages.is_empty() {
118 trace!("🗄️ WebPushClient::check_storage_advance finished");
119 self.flags.check_storage = false;
120 self.sent_from_storage = 0;
121 return Ok(vec![]);
122 }
123
124 let now_sec = sec_since_epoch();
126 let mut expired_messages = vec![];
128 messages.retain(|msg| {
131 if !msg.expired(now_sec) {
132 return true;
133 }
134 if msg.sortkey_timestamp.is_none() {
135 expired_messages.push(msg.clone());
136 }
137 false
138 });
139 #[allow(unused_mut)]
141 for mut msg in expired_messages {
142 let chidmessageid = msg.chidmessageid();
143 trace!("🉑 removing expired topic chidmessageid: {chidmessageid}");
144 self.app_state
145 .db
146 .remove_message(&self.uaid, &chidmessageid)
147 .await?;
148 #[cfg(feature = "reliable_report")]
149 msg.record_reliability(
150 &self.app_state.reliability,
151 autopush_common::reliability::ReliabilityState::Expired,
152 )
153 .await;
154 }
155
156 self.flags.increment_storage = !include_topic && timestamp.is_some();
157
158 if messages.is_empty() {
159 trace!("🗄️ WebPushClient::check_storage_advance empty response (filtered expired)");
160 return Ok(vec![]);
161 }
162
163 self.ack_state
164 .unacked_stored_notifs
165 .extend(messages.iter().cloned());
166 let smsgs: Vec<_> = messages
167 .into_iter()
168 .inspect(|msg| {
169 trace!("🗄️ WebPushClient::check_storage_advance Sending stored");
170 self.emit_send_metrics(msg, "Stored")
171 })
172 .map(ServerMessage::Notification)
173 .collect();
174
175 let count = smsgs.len() as u32;
176 debug!(
177 "🗄️ WebPushClient::check_storage_advance: sent_from_storage: {}, +{}",
178 self.sent_from_storage, count
179 );
180 self.sent_from_storage += count;
181 Ok(smsgs)
182 }
183
/// Record the reliability `state` for every message in `messages`.
///
/// Each message's `record_reliability` is awaited in turn against
/// `app_state.reliability`. Only compiled with the `reliable_report`
/// feature.
#[cfg(feature = "reliable_report")]
async fn record_state(
    &self,
    messages: &mut Vec<Notification>,
    state: autopush_common::reliability::ReliabilityState,
) {
    let reliability = &self.app_state.reliability;
    // A plain loop is used because each call has to be awaited.
    for notif in messages.iter_mut() {
        notif.record_reliability(reliability, state).await;
    }
}
199
200 async fn do_check_storage(&self) -> Result<CheckStorageResponse, SMError> {
211 let timestamp = self
213 .ack_state
214 .unacked_stored_highest
215 .or(self.current_timestamp);
216 trace!("🗄️ WebPushClient::do_check_storage {:?}", ×tamp);
217 let topic_resp = if self.flags.include_topic {
221 trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
222 #[allow(unused_mut)]
224 let mut messages = self
225 .app_state
226 .db
227 .fetch_topic_messages(&self.uaid, 11)
228 .await?;
229 #[cfg(feature = "reliable_report")]
230 self.record_state(
232 &mut messages.messages,
233 autopush_common::reliability::ReliabilityState::Retrieved,
234 )
235 .await;
236 messages
237 } else {
238 Default::default()
239 };
240 if !topic_resp.messages.is_empty() {
242 trace!(
243 "🗄️ WebPushClient::do_check_storage: Topic message returns: {:#?}",
244 topic_resp.messages
245 );
246 self.app_state
247 .metrics
248 .count_with_tags(
249 "notification.message.retrieved",
250 topic_resp.messages.len() as i64,
251 )
252 .with_tag("topic", "true")
253 .send();
254 return Ok(CheckStorageResponse {
255 include_topic: true,
256 messages: topic_resp.messages,
257 timestamp: topic_resp.timestamp,
258 });
259 }
260 let timestamp = if self.flags.include_topic {
262 topic_resp.timestamp.or(self.current_timestamp)
266 } else {
267 timestamp
268 };
269 trace!(
270 "🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
271 timestamp
272 );
273 #[allow(unused_mut)]
274 let mut timestamp_resp = self
275 .app_state
276 .db
277 .fetch_timestamp_messages(&self.uaid, timestamp, 10)
278 .await?;
279 if !timestamp_resp.messages.is_empty() {
280 trace!(
281 "🗄️ WebPushClient::do_check_storage: Timestamp message returns: {:#?}",
282 timestamp_resp.messages
283 );
284 self.app_state
285 .metrics
286 .count_with_tags(
287 "notification.message.retrieved",
288 timestamp_resp.messages.len() as i64,
289 )
290 .with_tag("topic", "false")
291 .send();
292 #[cfg(feature = "reliable_report")]
293 self.record_state(
295 &mut timestamp_resp.messages,
296 autopush_common::reliability::ReliabilityState::Retrieved,
297 )
298 .await;
299 }
300
301 Ok(CheckStorageResponse {
302 include_topic: false,
303 messages: timestamp_resp.messages,
304 timestamp: timestamp_resp.timestamp.or(timestamp),
307 })
308 }
309
310 pub(super) async fn increment_storage(&mut self) -> Result<(), SMError> {
316 trace!(
317 "🗄️ WebPushClient::increment_storage: unacked_stored_highest: {:?}",
318 self.ack_state.unacked_stored_highest
319 );
320 let Some(timestamp) = self.ack_state.unacked_stored_highest else {
321 return Err(SMErrorKind::Internal(
322 "increment_storage w/ no unacked_stored_highest".to_owned(),
323 )
324 .into());
325 };
326 self.current_timestamp = Some(timestamp);
327 self.app_state
328 .db
329 .increment_storage(&self.uaid, timestamp)
330 .await?;
331 #[cfg(feature = "reliable_report")]
332 {
333 let mut notifs = mem::take(&mut self.ack_state.acked_stored_timestamp_notifs);
334 self.record_state(
335 &mut notifs,
336 autopush_common::reliability::ReliabilityState::Delivered,
337 )
338 .await;
339 }
340 self.flags.increment_storage = false;
341 Ok(())
342 }
343
344 async fn check_msg_limit(&mut self) -> Result<(), SMError> {
350 trace!(
351 "WebPushClient::check_msg_limit: sent_from_storage: {} msg_limit: {}",
352 self.sent_from_storage,
353 self.app_state.settings.msg_limit
354 );
355 if self.sent_from_storage > self.app_state.settings.msg_limit {
356 self.app_state
359 .metrics
360 .incr_with_tags(MetricName::UaExpiration)
361 .with_tag("reason", "too_many_messages")
362 .send();
363 self.app_state.db.remove_user(&self.uaid).await?;
364 return Err(SMErrorKind::UaidReset.into());
365 }
366 Ok(())
367 }
368
369 fn emit_send_metrics(&self, notif: &Notification, source: &'static str) {
371 let metrics = &self.app_state.metrics;
372 let ua_info = &self.ua_info;
373 metrics
374 .incr_with_tags(MetricName::UaNotificationSent)
375 .with_tag("source", source)
376 .with_tag("topic", ¬if.topic.is_some().to_string())
377 .with_tag("os", &ua_info.metrics_os)
378 .send();
380 metrics
381 .count_with_tags(
382 "ua.message_data",
383 notif.data.as_ref().map_or(0, |data| data.len() as i64),
384 )
385 .with_tag("source", source)
386 .with_tag("os", &ua_info.metrics_os)
387 .send();
388 }
389}