1use std::{collections::HashMap, fmt, mem, sync::Arc};
2
3use actix_web::rt;
4use cadence::Timed;
5use futures::channel::mpsc;
6use once_cell::sync::Lazy;
7use tokio::sync::Semaphore;
8use uuid::Uuid;
9
/// Process-wide semaphore with 64 permits. Each disconnect clean-up task
/// spawned by `WebPushClient::save_and_notify_unacked_direct_notifs` acquires
/// one permit before doing its database/HTTP work, so at most 64 of those
/// tasks proceed concurrently.
static DISCONNECT_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(64));
13
14use autoconnect_common::{
15 broadcast::{Broadcast, BroadcastSubs},
16 protocol::{ServerMessage, ServerNotification},
17};
18
19use autoconnect_settings::{AppState, Settings};
20use autopush_common::{
21 db::User,
22 metric_name::MetricName,
23 metrics::StatsdClientExt,
24 notification::Notification,
25 util::{ms_since_epoch, user_agent::UserAgentInfo},
26};
27
28use crate::error::{SMError, SMErrorKind};
29
30mod on_client_msg;
31mod on_server_notif;
32
/// Per-connection state of an identified WebPush client.
///
/// Built by [`WebPushClient::new`]; torn down via `shutdown`, which persists
/// any unacknowledged direct notifications and logs the session statistics.
pub struct WebPushClient {
    /// Identifier of the connected client, supplied by the caller of `new`.
    /// Used as the key for registry, database and logging calls.
    pub uaid: Uuid,
    /// Identifier of this particular connection, randomly generated
    /// (`Uuid::new_v4`) in `new`. Passed together with `uaid` to the client
    /// registry on connect/disconnect.
    pub uid: Uuid,
    /// User agent details parsed from the `ua` string passed to `new`.
    pub ua_info: UserAgentInfo,

    /// Broadcast subscriptions of this client; handed to the broadcaster in
    /// `broadcast_delta`.
    broadcast_subs: BroadcastSubs,

    /// Session specific flags (see `ClientFlags`).
    flags: ClientFlags,
    /// Notifications sent to the client that have not been acknowledged yet.
    ack_state: AckState,
    /// Starts at 0. NOTE(review): presumably counts messages delivered from
    /// storage; it is not modified anywhere in this part of the file.
    sent_from_storage: u32,
    /// Optional user record supplied to `new`. When it is `Some`, the session
    /// is recorded as *not* having an existing uaid.
    /// NOTE(review): presumably a record whose db write is deferred -- confirm.
    deferred_add_user: Option<User>,

    /// Counters reported in the `Session` log line at shutdown.
    stats: SessionStatistics,

    /// When the client connected, in milliseconds since the epoch (it is
    /// subtracted from `ms_since_epoch()` at shutdown). Also compared with
    /// the stored user's `connected_at` to detect a newer connection.
    connected_at: u64,
    /// Starts at 0. NOTE(review): presumably the time of the last Ping
    /// received; it is not modified anywhere in this part of the file.
    last_ping: u64,
    /// Optional timestamp supplied to `new`.
    /// NOTE(review): presumably the last notification timestamp read from
    /// storage -- confirm against `check_storage`.
    current_timestamp: Option<u64>,

    /// Shared application state (settings, db, metrics, http client, client
    /// registry, broadcaster).
    app_state: Arc<AppState>,
}
79
80impl fmt::Debug for WebPushClient {
81 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
82 fmt.debug_struct("WebPushClient")
83 .field("uaid", &self.uaid)
84 .field("uid", &self.uid)
85 .field("ua_info", &self.ua_info)
86 .field("broadcast_subs", &self.broadcast_subs)
87 .field("flags", &self.flags)
88 .field("ack_state", &self.ack_state)
89 .field("sent_from_storage", &self.sent_from_storage)
90 .field("deferred_add_user", &self.deferred_add_user)
91 .field("stats", &self.stats)
92 .field("connected_at", &self.connected_at)
93 .field("last_ping", &self.last_ping)
94 .finish()
95 }
96}
97
98impl WebPushClient {
99 #[allow(clippy::too_many_arguments)]
100 pub async fn new(
101 uaid: Uuid,
102 ua: String,
103 broadcast_subs: BroadcastSubs,
104 flags: ClientFlags,
105 connected_at: u64,
106 current_timestamp: Option<u64>,
107 deferred_add_user: Option<User>,
108 app_state: Arc<AppState>,
109 ) -> Result<(Self, Vec<ServerMessage>), SMError> {
110 trace!("👁🗨WebPushClient::new");
111 let stats = SessionStatistics {
112 existing_uaid: deferred_add_user.is_none(),
113 ..Default::default()
114 };
115 let mut client = WebPushClient {
116 uaid,
117 uid: Uuid::new_v4(),
118 ua_info: UserAgentInfo::from(ua.as_str()),
119 broadcast_subs,
120 flags,
121 ack_state: Default::default(),
122 sent_from_storage: Default::default(),
123 connected_at,
124 current_timestamp,
125 deferred_add_user,
126 last_ping: Default::default(),
127 stats,
128 app_state,
129 };
130
131 let smsgs = if client.flags.check_storage {
132 let smsgs = client.check_storage().await?;
133 debug!(
134 "WebPushClient::new: check_storage smsgs.len(): {}",
135 smsgs.len()
136 );
137 smsgs
138 } else {
139 vec![]
140 };
141 Ok((client, smsgs))
142 }
143
144 pub fn app_settings(&self) -> &Settings {
146 &self.app_state.settings
147 }
148
149 #[cfg(feature = "reliable_report")]
150 pub fn app_reliability(&self) -> &autopush_common::reliability::PushReliability {
151 &self.app_state.reliability
152 }
153
154 pub fn registry_connect(&self) -> mpsc::Receiver<ServerNotification> {
158 self.app_state.clients.connect(self.uaid, self.uid)
159 }
160
161 pub fn registry_disconnect(&self) {
163 let _ = self.app_state.clients.disconnect(&self.uaid, &self.uid);
165 }
166
167 pub async fn broadcast_delta(&mut self) -> Option<Vec<Broadcast>> {
170 self.app_state
171 .broadcaster
172 .read()
173 .await
174 .change_count_delta(&mut self.broadcast_subs)
175 }
176
177 pub fn shutdown(&mut self, reason: Option<String>) {
179 trace!("👁🗨WebPushClient::shutdown");
180 self.save_and_notify_unacked_direct_notifs();
181
182 let ua_info = &self.ua_info;
183 let stats = &self.stats;
184 let elapsed_sec = (ms_since_epoch() - self.connected_at) / 1_000;
185 self.app_state
186 .metrics
187 .time_with_tags("ua.connection.lifespan", elapsed_sec)
188 .with_tag("ua_os_family", &ua_info.metrics_os)
189 .with_tag("ua_browser_family", &ua_info.metrics_browser)
190 .send();
191
192 info!("Session";
194 "uaid_hash" => self.uaid.as_simple().to_string(),
195 "uaid_reset" => self.flags.old_record_version,
196 "existing_uaid" => stats.existing_uaid,
197 "connection_type" => "webpush",
198 "ua_name" => &ua_info.browser_name,
199 "ua_os_family" => &ua_info.metrics_os,
200 "ua_os_ver" => &ua_info.os_version,
201 "ua_browser_family" => &ua_info.metrics_browser,
202 "ua_browser_ver" => &ua_info.browser_version,
203 "ua_category" => &ua_info.category,
204 "connection_time" => elapsed_sec,
205 "direct_acked" => stats.direct_acked,
206 "direct_storage" => stats.direct_storage,
207 "stored_retrieved" => stats.stored_retrieved,
208 "stored_acked" => stats.stored_acked,
209 "nacks" => stats.nacks,
210 "registers" => stats.registers,
211 "unregisters" => stats.unregisters,
212 "disconnect_reason" => reason.unwrap_or_else(|| "".to_owned()),
213 );
214 }
215
216 fn save_and_notify_unacked_direct_notifs(&mut self) {
221 let notif_map = mem::take(&mut self.ack_state.unacked_direct_notifs);
222 trace!(
223 "👁🗨WebPushClient::save_and_notify_unacked_direct_notifs len: {}",
224 notif_map.len()
225 );
226 if notif_map.is_empty() {
227 return;
228 }
229
230 self.stats.direct_storage += notif_map.len() as i32;
231 let mut notifs: Vec<Notification> = notif_map.into_values().collect();
237 for notif in &mut notifs {
238 notif.sortkey_timestamp = Some(0);
239 }
240
241 let app_state = Arc::clone(&self.app_state);
242 let uaid = self.uaid;
243 let connected_at = self.connected_at;
244 rt::spawn(async move {
245 let _permit = match DISCONNECT_SEMAPHORE.acquire().await {
246 Ok(permit) => permit,
247 Err(_) => {
248 app_state
249 .metrics
250 .incr(MetricName::ErrorDisconnectSemaphoreFull)
251 .ok();
252 warn!("Disconnect semaphore full, skipping save of unacked direct notifs");
253 return Ok(());
254 }
255 };
256 #[cfg(not(feature = "reliable_report"))]
257 app_state.db.save_messages(&uaid, notifs).await?;
258 #[cfg(feature = "reliable_report")]
259 {
260 app_state.db.save_messages(&uaid, notifs.clone()).await?;
261 for mut notif in notifs {
262 notif
263 .record_reliability(
264 &app_state.reliability,
265 autopush_common::reliability::ReliabilityState::Stored,
266 )
267 .await;
268 }
269 }
270 debug!("Finished saving unacked direct notifs, checking for reconnect");
271 let Some(user) = app_state.db.get_user(&uaid).await? else {
272 return Err(SMErrorKind::Internal(format!(
273 "User not found for unacked direct notifs: {uaid}"
274 )));
275 };
276 if connected_at == user.connected_at {
277 return Ok(());
278 }
279 if let Some(node_id) = user.node_id {
280 app_state
281 .http
282 .put(format!("{}/notif/{}", node_id, uaid.as_simple()))
283 .send()
284 .await?
285 .error_for_status()?;
286 }
287 Ok(())
288 });
289 }
290
291 pub fn add_sentry_info(self, event: &mut sentry::protocol::Event) {
293 event.user = Some(sentry::User {
294 id: Some(self.uaid.as_simple().to_string()),
295 ..Default::default()
296 });
297 let ua_info = self.ua_info;
298 event
299 .tags
300 .insert("ua_name".to_owned(), ua_info.browser_name);
301 event
302 .tags
303 .insert("ua_os_family".to_owned(), ua_info.metrics_os);
304 event
305 .tags
306 .insert("ua_os_ver".to_owned(), ua_info.os_version);
307 event
308 .tags
309 .insert("ua_browser_family".to_owned(), ua_info.metrics_browser);
310 event
311 .tags
312 .insert("ua_browser_ver".to_owned(), ua_info.browser_version);
313 }
314}
315
/// Boolean switches describing one client session.
#[derive(Debug)]
pub struct ClientFlags {
    /// Enabled by default.
    /// NOTE(review): presumably controls whether topic messages are fetched
    /// when checking storage -- confirm in `check_storage`.
    pub include_topic: bool,
    /// NOTE(review): presumably marks that the stored read position must be
    /// advanced -- confirm in `check_storage`.
    pub increment_storage: bool,
    /// When set, `WebPushClient::new` checks storage right away.
    pub check_storage: bool,
    /// Reported as `uaid_reset` in the `Session` log line at shutdown.
    pub old_record_version: bool,
    /// NOTE(review): presumably enables per-channel metrics -- confirm at the
    /// use site.
    pub emit_channel_metrics: bool,
}

impl Default for ClientFlags {
    /// All flags start disabled, with the single exception of
    /// `include_topic`.
    fn default() -> Self {
        const OFF: bool = false;
        ClientFlags {
            include_topic: true,
            check_storage: OFF,
            emit_channel_metrics: OFF,
            increment_storage: OFF,
            old_record_version: OFF,
        }
    }
}
341
/// Counters describing one client session. All of them are written to the
/// `Session` log line emitted by `WebPushClient::shutdown`.
#[derive(Debug, Default)]
pub struct SessionStatistics {
    /// Logged as `direct_acked`.
    /// NOTE(review): presumably direct notifications the client acked.
    direct_acked: i32,
    /// Number of unacked direct notifications handed to storage at shutdown.
    direct_storage: i32,
    /// Logged as `stored_retrieved`.
    /// NOTE(review): presumably notifications read back from storage.
    stored_retrieved: i32,
    /// Logged as `stored_acked`.
    /// NOTE(review): presumably stored notifications the client acked.
    stored_acked: i32,
    /// Logged as `nacks`.
    nacks: i32,
    /// Logged as `unregisters`.
    unregisters: i32,
    /// Logged as `registers`.
    registers: i32,
    /// True when the client was created without a `deferred_add_user`
    /// record (set in `WebPushClient::new`).
    existing_uaid: bool,
}
365
/// Key type of the unacked-notification maps in `AckState`.
/// NOTE(review): how the key string is composed is not visible here.
type AckKey = String;
371
372#[derive(Debug, Default)]
374struct AckState {
375 unacked_direct_notifs: HashMap<AckKey, Notification>,
377 unacked_stored_notifs: HashMap<AckKey, Notification>,
379 #[cfg(feature = "reliable_report")]
382 acked_stored_timestamp_notifs: Vec<Notification>,
383 unacked_stored_highest: Option<u64>,
402}
403
404impl AckState {
405 fn unacked_notifs(&self) -> bool {
408 !self.unacked_stored_notifs.is_empty() || !self.unacked_direct_notifs.is_empty()
409 }
410}
411
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use uuid::Uuid;

    use autoconnect_common::{
        protocol::{ClientMessage, ServerMessage, ServerNotification},
        test_support::{DUMMY_CHID, DUMMY_UAID, UA},
    };
    use autoconnect_settings::AppState;
    use autopush_common::{
        db::{client::FetchMessageResponse, mock::MockDbClient},
        notification::Notification,
        util::{ms_since_epoch, sec_since_epoch},
    };

    use super::WebPushClient;

    /// Build a `WebPushClient` for `uaid` on top of `app_state`, using the
    /// test user agent string, default subscriptions and flags, "now" as the
    /// connection time and neither a current timestamp nor a deferred user.
    /// Panics if construction fails.
    async fn wpclient(uaid: Uuid, app_state: AppState) -> (WebPushClient, Vec<ServerMessage>) {
        WebPushClient::new(
            uaid,
            UA.to_owned(),
            Default::default(),
            Default::default(),
            ms_since_epoch(),
            None,
            None,
            Arc::new(app_state),
        )
        .await
        .unwrap()
    }

    /// Build a notification for `channel_id` with the given `ttl`, stamped
    /// with the current time (`timestamp` in seconds, `sortkey_timestamp` in
    /// milliseconds). All other fields take their defaults.
    fn new_timestamp_notif(channel_id: &Uuid, ttl: u64) -> Notification {
        Notification {
            channel_id: *channel_id,
            ttl,
            timestamp: sec_since_epoch(),
            sortkey_timestamp: Some(ms_since_epoch()),
            ..Default::default()
        }
    }

    /// A client Ping is answered with exactly one server Ping.
    #[actix_rt::test]
    async fn webpush_ping() {
        let (mut client, _) = wpclient(DUMMY_UAID, Default::default()).await;
        let pong = client.on_client_msg(ClientMessage::Ping).await.unwrap();
        assert!(matches!(pong.as_slice(), [ServerMessage::Ping]));
    }

    /// A storage check that only finds notifications with a ttl of 0 sends
    /// nothing to the client, yet still calls `increment_storage` with the
    /// timestamp returned by the fetch.
    #[actix_rt::test]
    async fn expired_increments_storage() {
        let mut db = MockDbClient::new();
        let mut seq = mockall::Sequence::new();
        let timestamp = sec_since_epoch();
        // 1. The topic fetch returns nothing.
        db.expect_fetch_topic_messages()
            .times(1)
            .in_sequence(&mut seq)
            .return_once(move |_, _| {
                Ok(FetchMessageResponse {
                    timestamp: None,
                    messages: vec![],
                })
            });
        // 2. The first timestamp fetch (no starting timestamp) returns two
        //    notifications whose ttl is 0.
        db.expect_fetch_timestamp_messages()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts, _| ts.is_none())
            .return_once(move |_, _, _| {
                Ok(FetchMessageResponse {
                    timestamp: Some(timestamp),
                    messages: vec![
                        new_timestamp_notif(&DUMMY_CHID, 0),
                        new_timestamp_notif(&DUMMY_CHID, 0),
                    ],
                })
            });
        // 3. The follow-up fetch continues from the returned timestamp and
        //    finds nothing more.
        db.expect_fetch_timestamp_messages()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts, _| ts == &Some(timestamp))
            .return_once(|_, _, _| {
                Ok(FetchMessageResponse {
                    timestamp: None,
                    messages: vec![],
                })
            });
        // 4. The stored position must still be advanced to `timestamp`,
        //    although nothing was delivered to the client.
        db.expect_increment_storage()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |_, ts| ts == &timestamp)
            .return_once(|_, _| Ok(()));

        let (mut client, _) = wpclient(
            DUMMY_UAID,
            AppState {
                db: db.into_boxed_arc(),
                ..Default::default()
            },
        )
        .await;

        let smsgs = client
            .on_server_notif(ServerNotification::CheckStorage)
            .await
            .expect("CheckStorage failed");
        assert!(smsgs.is_empty())
    }
}