1use std::{fmt, mem, sync::Arc};
2
3use actix_web::rt;
4use cadence::Timed;
5use futures::channel::mpsc;
6use uuid::Uuid;
7
8use autoconnect_common::{
9 broadcast::{Broadcast, BroadcastSubs},
10 protocol::{ServerMessage, ServerNotification},
11};
12
13use autoconnect_settings::{AppState, Settings};
14use autopush_common::{
15 db::User,
16 notification::Notification,
17 util::{ms_since_epoch, user_agent::UserAgentInfo},
18};
19
20use crate::error::{SMError, SMErrorKind};
21
22mod on_client_msg;
23mod on_server_notif;
24
25pub struct WebPushClient {
36 pub uaid: Uuid,
38 pub uid: Uuid,
40 pub ua_info: UserAgentInfo,
42
43 broadcast_subs: BroadcastSubs,
45
46 flags: ClientFlags,
48 ack_state: AckState,
50 sent_from_storage: u32,
53 deferred_add_user: Option<User>,
57
58 stats: SessionStatistics,
60
61 connected_at: u64,
63 last_ping: u64,
65 current_timestamp: Option<u64>,
68
69 app_state: Arc<AppState>,
70}
71
72impl fmt::Debug for WebPushClient {
73 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
74 fmt.debug_struct("WebPushClient")
75 .field("uaid", &self.uaid)
76 .field("uid", &self.uid)
77 .field("ua_info", &self.ua_info)
78 .field("broadcast_subs", &self.broadcast_subs)
79 .field("flags", &self.flags)
80 .field("ack_state", &self.ack_state)
81 .field("sent_from_storage", &self.sent_from_storage)
82 .field("deferred_add_user", &self.deferred_add_user)
83 .field("stats", &self.stats)
84 .field("connected_at", &self.connected_at)
85 .field("last_ping", &self.last_ping)
86 .finish()
87 }
88}
89
90impl WebPushClient {
91 #[allow(clippy::too_many_arguments)]
92 pub async fn new(
93 uaid: Uuid,
94 ua: String,
95 broadcast_subs: BroadcastSubs,
96 flags: ClientFlags,
97 connected_at: u64,
98 current_timestamp: Option<u64>,
99 deferred_add_user: Option<User>,
100 app_state: Arc<AppState>,
101 ) -> Result<(Self, Vec<ServerMessage>), SMError> {
102 trace!("👁🗨WebPushClient::new");
103 let stats = SessionStatistics {
104 existing_uaid: deferred_add_user.is_none(),
105 ..Default::default()
106 };
107 let mut client = WebPushClient {
108 uaid,
109 uid: Uuid::new_v4(),
110 ua_info: UserAgentInfo::from(ua.as_str()),
111 broadcast_subs,
112 flags,
113 ack_state: Default::default(),
114 sent_from_storage: Default::default(),
115 connected_at,
116 current_timestamp,
117 deferred_add_user,
118 last_ping: Default::default(),
119 stats,
120 app_state,
121 };
122
123 let smsgs = if client.flags.check_storage {
124 let smsgs = client.check_storage().await?;
125 debug!(
126 "WebPushClient::new: check_storage smsgs.len(): {}",
127 smsgs.len()
128 );
129 smsgs
130 } else {
131 vec![]
132 };
133 Ok((client, smsgs))
134 }
135
136 pub fn app_settings(&self) -> &Settings {
138 &self.app_state.settings
139 }
140
141 #[cfg(feature = "reliable_report")]
142 pub fn app_reliability(&self) -> &autopush_common::reliability::PushReliability {
143 &self.app_state.reliability
144 }
145
146 pub async fn registry_connect(&self) -> mpsc::UnboundedReceiver<ServerNotification> {
150 self.app_state.clients.connect(self.uaid, self.uid).await
151 }
152
153 pub async fn registry_disconnect(&self) {
155 let _ = self
157 .app_state
158 .clients
159 .disconnect(&self.uaid, &self.uid)
160 .await;
161 }
162
163 pub async fn broadcast_delta(&mut self) -> Option<Vec<Broadcast>> {
166 self.app_state
167 .broadcaster
168 .read()
169 .await
170 .change_count_delta(&mut self.broadcast_subs)
171 }
172
173 pub fn shutdown(&mut self, reason: Option<String>) {
175 trace!("👁🗨WebPushClient::shutdown");
176 self.save_and_notify_unacked_direct_notifs();
177
178 let ua_info = &self.ua_info;
179 let stats = &self.stats;
180 let elapsed_sec = (ms_since_epoch() - self.connected_at) / 1_000;
181 self.app_state
182 .metrics
183 .time_with_tags("ua.connection.lifespan", elapsed_sec)
184 .with_tag("ua_os_family", &ua_info.metrics_os)
185 .with_tag("ua_browser_family", &ua_info.metrics_browser)
186 .send();
187
188 info!("Session";
190 "uaid_hash" => self.uaid.as_simple().to_string(),
191 "uaid_reset" => self.flags.old_record_version,
192 "existing_uaid" => stats.existing_uaid,
193 "connection_type" => "webpush",
194 "ua_name" => &ua_info.browser_name,
195 "ua_os_family" => &ua_info.metrics_os,
196 "ua_os_ver" => &ua_info.os_version,
197 "ua_browser_family" => &ua_info.metrics_browser,
198 "ua_browser_ver" => &ua_info.browser_version,
199 "ua_category" => &ua_info.category,
200 "connection_time" => elapsed_sec,
201 "direct_acked" => stats.direct_acked,
202 "direct_storage" => stats.direct_storage,
203 "stored_retrieved" => stats.stored_retrieved,
204 "stored_acked" => stats.stored_acked,
205 "nacks" => stats.nacks,
206 "registers" => stats.registers,
207 "unregisters" => stats.unregisters,
208 "disconnect_reason" => reason.unwrap_or_else(|| "".to_owned()),
209 );
210 }
211
212 fn save_and_notify_unacked_direct_notifs(&mut self) {
217 let mut notifs = mem::take(&mut self.ack_state.unacked_direct_notifs);
218 trace!(
219 "👁🗨WebPushClient::save_and_notify_unacked_direct_notifs len: {}",
220 notifs.len()
221 );
222 if notifs.is_empty() {
223 return;
224 }
225
226 self.stats.direct_storage += notifs.len() as i32;
227 for notif in &mut notifs {
233 notif.sortkey_timestamp = Some(0);
234 }
235
236 let app_state = Arc::clone(&self.app_state);
237 let uaid = self.uaid;
238 let connected_at = self.connected_at;
239 rt::spawn(async move {
240 #[cfg(not(feature = "reliable_report"))]
241 app_state.db.save_messages(&uaid, notifs).await?;
242 #[cfg(feature = "reliable_report")]
243 {
244 app_state.db.save_messages(&uaid, notifs.clone()).await?;
245 for mut notif in notifs {
246 notif
247 .record_reliability(
248 &app_state.reliability,
249 autopush_common::reliability::ReliabilityState::Stored,
250 )
251 .await;
252 }
253 }
254 debug!("Finished saving unacked direct notifs, checking for reconnect");
255 let Some(user) = app_state.db.get_user(&uaid).await? else {
256 return Err(SMErrorKind::Internal(format!(
257 "User not found for unacked direct notifs: {uaid}"
258 )));
259 };
260 if connected_at == user.connected_at {
261 return Ok(());
262 }
263 if let Some(node_id) = user.node_id {
264 app_state
265 .http
266 .put(format!("{}/notif/{}", node_id, uaid.as_simple()))
267 .send()
268 .await?
269 .error_for_status()?;
270 }
271 Ok(())
272 });
273 }
274
275 pub fn add_sentry_info(self, event: &mut sentry::protocol::Event) {
277 event.user = Some(sentry::User {
278 id: Some(self.uaid.as_simple().to_string()),
279 ..Default::default()
280 });
281 let ua_info = self.ua_info;
282 event
283 .tags
284 .insert("ua_name".to_owned(), ua_info.browser_name);
285 event
286 .tags
287 .insert("ua_os_family".to_owned(), ua_info.metrics_os);
288 event
289 .tags
290 .insert("ua_os_ver".to_owned(), ua_info.os_version);
291 event
292 .tags
293 .insert("ua_browser_family".to_owned(), ua_info.metrics_browser);
294 event
295 .tags
296 .insert("ua_browser_ver".to_owned(), ua_info.browser_version);
297 }
298}
299
/// Per-session switches for a WebPush client.
#[derive(Debug)]
pub struct ClientFlags {
    /// NOTE(review): presumably whether storage checks also fetch topic
    /// messages — confirm against the storage-check code.
    pub include_topic: bool,
    /// NOTE(review): presumably marks that the stored read position must be
    /// advanced — confirm against the storage-check code.
    pub increment_storage: bool,
    /// When set, the client checks storage for messages during construction.
    pub check_storage: bool,
    /// Reported as `uaid_reset` in the session log written at shutdown.
    pub old_record_version: bool,
    /// NOTE(review): presumably enables per-channel metrics — confirm.
    pub emit_channel_metrics: bool,
}

/// Default flags: only `include_topic` is enabled, every other flag is off.
impl Default for ClientFlags {
    fn default() -> Self {
        ClientFlags {
            check_storage: false,
            emit_channel_metrics: false,
            include_topic: true,
            increment_storage: false,
            old_record_version: false,
        }
    }
}
325
/// Counters collected over the lifetime of one client session.
///
/// Every field is written to the `Session` log line under the same name when
/// the client shuts down. All counters start at zero and `existing_uaid`
/// starts as `false` (derived `Default`).
#[derive(Debug, Default)]
pub struct SessionStatistics {
    /// NOTE(review): presumably direct notifications that were acked — confirm.
    direct_acked: i32,
    /// Incremented at shutdown by the number of unacked direct notifications
    /// handed off to be saved.
    direct_storage: i32,
    /// NOTE(review): presumably notifications read from storage — confirm.
    stored_retrieved: i32,
    /// NOTE(review): presumably stored notifications that were acked — confirm.
    stored_acked: i32,
    /// NOTE(review): presumably nack messages received — confirm.
    nacks: i32,
    /// NOTE(review): presumably unregister requests handled — confirm.
    unregisters: i32,
    /// NOTE(review): presumably register requests handled — confirm.
    registers: i32,
    /// Set at construction to `true` when no deferred user record was given.
    existing_uaid: bool,
}
349
350#[derive(Debug, Default)]
352struct AckState {
353 unacked_direct_notifs: Vec<Notification>,
355 unacked_stored_notifs: Vec<Notification>,
357 #[cfg(feature = "reliable_report")]
360 acked_stored_timestamp_notifs: Vec<Notification>,
361 unacked_stored_highest: Option<u64>,
380}
381
382impl AckState {
383 fn unacked_notifs(&self) -> bool {
386 !self.unacked_stored_notifs.is_empty() || !self.unacked_direct_notifs.is_empty()
387 }
388}
389
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use uuid::Uuid;

    use autoconnect_common::{
        protocol::{ClientMessage, ServerMessage, ServerNotification},
        test_support::{DUMMY_CHID, DUMMY_UAID, UA},
    };
    use autoconnect_settings::AppState;
    use autopush_common::{
        db::{client::FetchMessageResponse, mock::MockDbClient},
        notification::Notification,
        util::{ms_since_epoch, sec_since_epoch},
    };

    use super::WebPushClient;

    /// Builds a `WebPushClient` for `uaid` on top of `app_state`, using the
    /// test user agent, default subscriptions and flags, the current time as
    /// `connected_at`, and no timestamp or deferred user.
    ///
    /// Panics if construction fails.
    async fn wpclient(uaid: Uuid, app_state: AppState) -> (WebPushClient, Vec<ServerMessage>) {
        WebPushClient::new(
            uaid,
            UA.to_owned(),
            Default::default(),
            Default::default(),
            ms_since_epoch(),
            None,
            None,
            Arc::new(app_state),
        )
        .await
        .unwrap()
    }

    /// Creates a notification for `channel_id` with the given `ttl`, stamped
    /// with the current time and carrying a `sortkey_timestamp`.
    fn new_timestamp_notif(channel_id: &Uuid, ttl: u64) -> Notification {
        Notification {
            channel_id: *channel_id,
            ttl,
            timestamp: sec_since_epoch(),
            sortkey_timestamp: Some(ms_since_epoch()),
            ..Default::default()
        }
    }

    /// A client `Ping` is answered with exactly one server `Ping`.
    #[actix_rt::test]
    async fn webpush_ping() {
        let (mut client, _) = wpclient(DUMMY_UAID, Default::default()).await;
        let pong = client.on_client_msg(ClientMessage::Ping).await.unwrap();
        assert!(matches!(pong.as_slice(), [ServerMessage::Ping]));
    }

    /// A storage check that only finds `ttl == 0` timestamp notifications
    /// sends nothing to the client but still increments storage once with
    /// the fetched timestamp.
    #[actix_rt::test]
    async fn expired_increments_storage() {
        let mut db = MockDbClient::new();
        let mut seq = mockall::Sequence::new();
        let timestamp = sec_since_epoch();
        // 1) The topic fetch returns no messages.
        db.expect_fetch_topic_messages()
            .times(1)
            .in_sequence(&mut seq)
            .return_once(move |_, _| {
                Ok(FetchMessageResponse {
                    timestamp: None,
                    messages: vec![],
                })
            });
        // 2) The first timestamp fetch (no timestamp given) returns two
        //    notifications with a ttl of 0.
        db.expect_fetch_timestamp_messages()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts, _| ts.is_none())
            .return_once(move |_, _, _| {
                Ok(FetchMessageResponse {
                    timestamp: Some(timestamp),
                    messages: vec![
                        new_timestamp_notif(&DUMMY_CHID, 0),
                        new_timestamp_notif(&DUMMY_CHID, 0),
                    ],
                })
            });
        // 3) The follow-up fetch from `timestamp` finds nothing more.
        db.expect_fetch_timestamp_messages()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts, _| ts == &Some(timestamp))
            .return_once(|_, _, _| {
                Ok(FetchMessageResponse {
                    timestamp: None,
                    messages: vec![],
                })
            });
        // 4) Storage is incremented exactly once with that same timestamp.
        db.expect_increment_storage()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts| ts == &timestamp)
            .return_once(|_, _| Ok(()));

        let (mut client, _) = wpclient(
            DUMMY_UAID,
            AppState {
                db: db.into_boxed_arc(),
                ..Default::default()
            },
        )
        .await;

        let smsgs = client
            .on_server_notif(ServerNotification::CheckStorage)
            .await
            .expect("CheckStorage failed");
        assert!(smsgs.is_empty());
    }
}