// autoconnect_ws_sm/identified/on_server_notif.rs
use cadence::{Counted, CountedExt};
use autoconnect_common::protocol::{ServerMessage, ServerNotification};
use autopush_common::{
db::CheckStorageResponse, notification::Notification, util::sec_since_epoch,
};
use super::WebPushClient;
use crate::error::{SMError, SMErrorKind};
impl WebPushClient {
/// Dispatch a `ServerNotification` addressed to this user
///
/// * `Notification`: a Direct Push Notification, returned as a single
///   `ServerMessage` to send
/// * `CheckStorage`: triggers a read of Push Notifications from storage
/// * `Disconnect`: fails with `SMErrorKind::Ghost`. It's emitted by the same
///   autoconnect node receiving it when a User has logged into that same
///   node twice, to "Ghost" (disconnect) the first user's session for its
///   second session.
///
/// Variants other than `Disconnect` are emitted by autoendpoint
pub async fn on_server_notif(
    &mut self,
    snotif: ServerNotification,
) -> Result<Vec<ServerMessage>, SMError> {
    match snotif {
        ServerNotification::Disconnect => Err(SMErrorKind::Ghost.into()),
        ServerNotification::CheckStorage => self.check_storage().await,
        ServerNotification::Notification(notif) => {
            let smsg = self.notif(notif)?;
            Ok(vec![smsg])
        }
    }
}
/// Handle a `ServerNotification` received after disconnecting from the
/// `ClientRegistry`
///
/// Queued Direct Push Notifications are moved to `unacked_direct_notifs` (to
/// be stored in the db on `shutdown`). Every other variant is dropped.
pub fn on_server_notif_shutdown(&mut self, snotif: ServerNotification) {
    trace!("WebPushClient::on_server_notif_shutdown");
    let ServerNotification::Notification(notif) = snotif else {
        // Nothing to preserve for the non-Notification variants
        return;
    };
    self.ack_state.unacked_direct_notifs.push(notif);
}
/// Send a Direct Push Notification to this user
fn notif(&mut self, notif: Notification) -> Result<ServerMessage, SMError> {
trace!("WebPushClient::notif Sending a direct notif");
if notif.ttl != 0 {
self.ack_state.unacked_direct_notifs.push(notif.clone());
}
self.emit_send_metrics(¬if, "Direct");
Ok(ServerMessage::Notification(notif))
}
/// Entry point for reading Push Notifications from storage
///
/// Raises both the `check_storage` and `include_topic` flags, then hands off
/// to `check_storage_loop` and returns whatever it produces.
pub(super) async fn check_storage(&mut self) -> Result<Vec<ServerMessage>, SMError> {
    trace!("🗄️ WebPushClient::check_storage");
    // A top level read always begins by including the topic messages
    self.flags.include_topic = true;
    self.flags.check_storage = true;
    let smsgs = self.check_storage_loop().await?;
    Ok(smsgs)
}
/// Repeatedly read Push Notifications from storage
///
/// Keeps advancing through storage until a read yields at least one
/// unexpired Push Notification (returned immediately, after enforcing the
/// message limit) or the `check_storage` flag is cleared because storage has
/// no more Notifications.
pub(super) async fn check_storage_loop(&mut self) -> Result<Vec<ServerMessage>, SMError> {
    trace!("🗄️ WebPushClient::check_storage_loop");
    loop {
        if !self.flags.check_storage {
            break;
        }
        let smsgs = self.check_storage_advance().await?;
        if smsgs.is_empty() {
            // Nothing to send from this chunk: keep advancing
            continue;
        }
        self.check_msg_limit().await?;
        return Ok(smsgs);
    }

    // Storage is exhausted (check_storage = false). Even though no notifs
    // are returned, expired timestamp messages may have been skipped over:
    // increment_storage marks those as deleted
    if self.flags.increment_storage {
        debug!("🗄️ WebPushClient::check_storage_loop increment_storage");
        self.increment_storage().await?;
    }
    Ok(Vec::new())
}
/// Advance one chunk (max count 10 returned) through the Notifications in
/// storage
///
/// Expired Notifications are dropped from the chunk, so the result may be
/// empty while Notifications are still pending in storage: callers are
/// expected to invoke this in a loop until the `check_storage` flag is
/// cleared.
async fn check_storage_advance(&mut self) -> Result<Vec<ServerMessage>, SMError> {
    trace!("🗄️ WebPushClient::check_storage_advance");
    let CheckStorageResponse {
        include_topic: next_include_topic,
        messages: mut notifs,
        timestamp: highest_read,
    } = self.do_check_storage().await?;

    debug!(
        "🗄️ WebPushClient::check_storage_advance \
         include_topic: {} -> {} \
         unacked_stored_highest: {:?} -> {:?}",
        self.flags.include_topic,
        next_include_topic,
        self.ack_state.unacked_stored_highest,
        highest_read
    );
    self.flags.include_topic = next_include_topic;
    self.ack_state.unacked_stored_highest = highest_read;

    // An empty read means storage has nothing further for this user
    if notifs.is_empty() {
        trace!("🗄️ WebPushClient::check_storage_advance finished");
        self.flags.check_storage = false;
        self.sent_from_storage = 0;
        return Ok(Vec::new());
    }

    // Drop the TTL expired messages, remembering the expired topic messages
    // as those require immediate deletion from the db
    let now_sec = sec_since_epoch();
    let mut expired_topic_sort_keys = Vec::new();
    notifs.retain(|notif| {
        let is_expired = notif.expired(now_sec);
        if is_expired && notif.sortkey_timestamp.is_none() {
            expired_topic_sort_keys.push(notif.chidmessageid());
        }
        // XXX: record ReliabilityState::Expired?
        !is_expired
    });

    // TODO: A batch remove_messages would be nicer
    for sort_key in expired_topic_sort_keys {
        trace!("🉑 removing expired topic sort key: {sort_key}");
        self.app_state
            .db
            .remove_message(&self.uaid, &sort_key)
            .await?;
    }

    self.flags.increment_storage = !next_include_topic && highest_read.is_some();

    if notifs.is_empty() {
        trace!("🗄️ WebPushClient::check_storage_advance empty response (filtered expired)");
        return Ok(Vec::new());
    }

    // Keep a copy of each survivor in unacked_stored_notifs before handing
    // the originals off as outgoing messages
    self.ack_state
        .unacked_stored_notifs
        .extend(notifs.iter().cloned());

    let mut smsgs = Vec::with_capacity(notifs.len());
    for notif in notifs {
        trace!("🗄️ WebPushClient::check_storage_advance Sending stored");
        self.emit_send_metrics(&notif, "Stored");
        smsgs.push(ServerMessage::Notification(notif));
    }

    let count = smsgs.len() as u32;
    debug!(
        "🗄️ WebPushClient::check_storage_advance: sent_from_storage: {}, +{}",
        self.sent_from_storage, count
    );
    self.sent_from_storage += count;
    Ok(smsgs)
}
#[cfg(feature = "reliable_report")]
/// Record the given reliability `state` against each of the `messages`.
///
/// Every message is handed `self.app_state.reliability` along with `state`,
/// one after the other.
async fn record_state(
    &self,
    messages: &mut Vec<Notification>,
    state: autopush_common::reliability::ReliabilityState,
) {
    // *Note* an iterator adaptor such as `.map()` is sync, so the async
    // call is awaited from a plain loop instead.
    for notif in messages.iter_mut() {
        notif
            .record_reliability(&self.app_state.reliability, state)
            .await;
    }
}
/// Read a chunk (max count 10 returned) of Notifications from storage
///
/// This alternates between reading Topic Notifications and Timestamp
/// Notifications which are stored separately in storage.
///
/// Topic Messages differ in that they replace pending Notifications with
/// new ones if they have matching Topic names. They are used when a sender
/// desires a scenario where multiple Messages sent to an offline device
/// result in the user only seeing the latest Message when the device comes
/// online.
///
/// When the `include_topic` flag is set and topic messages are found, they
/// are returned with `include_topic: true`. Otherwise the timestamp
/// messages are fetched and returned with `include_topic: false`, with the
/// response's `timestamp` falling back to the one the query started from.
async fn do_check_storage(&self) -> Result<CheckStorageResponse, SMError> {
    // start at the latest unacked timestamp or the previous, latest timestamp.
    let timestamp = self
        .ack_state
        .unacked_stored_highest
        .or(self.current_timestamp);
    trace!("🗄️ WebPushClient::do_check_storage {:?}", &timestamp);
    // if we're to include topic messages, do those first.
    // NOTE: Bigtable can't fetch `current_timestamp`, so we can't rely on
    // `fetch_topic_messages()` returning a reasonable timestamp.
    let topic_resp = if self.flags.include_topic {
        trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
        // Get the most recent max 11 messages.
        #[allow(unused_mut)]
        let mut messages = self
            .app_state
            .db
            .fetch_topic_messages(&self.uaid, 11)
            .await?;
        #[cfg(feature = "reliable_report")]
        // Since we pulled these from storage, mark them as "retrieved"
        self.record_state(
            &mut messages.messages,
            autopush_common::reliability::ReliabilityState::Retrieved,
        )
        .await;
        messages
    } else {
        Default::default()
    };
    // if we have topic messages...
    if !topic_resp.messages.is_empty() {
        trace!(
            "🗄️ WebPushClient::do_check_storage: Topic message returns: {:#?}",
            topic_resp.messages
        );
        self.app_state
            .metrics
            .count_with_tags(
                "notification.message.retrieved",
                topic_resp.messages.len() as i64,
            )
            .with_tag("topic", "true")
            .send();
        return Ok(CheckStorageResponse {
            include_topic: true,
            messages: topic_resp.messages,
            timestamp: topic_resp.timestamp,
        });
    }
    // No topic messages, so carry on with normal ones, starting from the latest timestamp.
    let timestamp = if self.flags.include_topic {
        // See above, but Bigtable doesn't return the last message read timestamp when polling
        // for topic messages. Instead, we'll use the explicitly set one we store in the User
        // record and copy into the WebPushClient struct.
        topic_resp.timestamp.or(self.current_timestamp)
    } else {
        timestamp
    };
    trace!(
        "🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
        timestamp
    );
    #[allow(unused_mut)]
    let mut timestamp_resp = self
        .app_state
        .db
        .fetch_timestamp_messages(&self.uaid, timestamp, 10)
        .await?;
    if !timestamp_resp.messages.is_empty() {
        trace!(
            "🗄️ WebPushClient::do_check_storage: Timestamp message returns: {:#?}",
            timestamp_resp.messages
        );
        self.app_state
            .metrics
            .count_with_tags(
                "notification.message.retrieved",
                timestamp_resp.messages.len() as i64,
            )
            .with_tag("topic", "false")
            .send();
        #[cfg(feature = "reliable_report")]
        // Since we pulled these from storage, mark them as "retrieved"
        self.record_state(
            &mut timestamp_resp.messages,
            autopush_common::reliability::ReliabilityState::Retrieved,
        )
        .await;
    }
    Ok(CheckStorageResponse {
        include_topic: false,
        messages: timestamp_resp.messages,
        // If we didn't get a timestamp off the last query, use the
        // original value if passed one
        timestamp: timestamp_resp.timestamp.or(timestamp),
    })
}
/// Persist the user's last Message read timestamp (for timestamp Messages)
///
/// Invoked once a Client has Ack'd all the timestamp messages sent to it, in
/// order to move the timestamp Messages' "pointer". See
/// `AckState::unacked_stored_highest` for further information.
///
/// Fails with `SMErrorKind::Internal` when there's no
/// `unacked_stored_highest` to move the pointer to.
pub(super) async fn increment_storage(&mut self) -> Result<(), SMError> {
    trace!(
        "🗄️ WebPushClient::increment_storage: unacked_stored_highest: {:?}",
        self.ack_state.unacked_stored_highest
    );
    let timestamp = match self.ack_state.unacked_stored_highest {
        Some(highest) => highest,
        None => {
            let kind =
                SMErrorKind::Internal("increment_storage w/ no unacked_stored_highest".to_owned());
            return Err(kind.into());
        }
    };
    // The in-memory pointer is moved before the db write is attempted
    self.current_timestamp = Some(timestamp);
    self.app_state
        .db
        .increment_storage(&self.uaid, timestamp)
        .await?;
    self.flags.increment_storage = false;
    Ok(())
}
/// Enforce the maximum allowed number of messages read from storage
/// (`Settings::msg_limit`) for this user
///
/// A user over the limit has their user record removed and
/// `SMErrorKind::UaidReset` is returned; otherwise this is a no-op.
async fn check_msg_limit(&mut self) -> Result<(), SMError> {
    trace!(
        "WebPushClient::check_msg_limit: sent_from_storage: {} msg_limit: {}",
        self.sent_from_storage,
        self.app_state.settings.msg_limit
    );
    if self.sent_from_storage <= self.app_state.settings.msg_limit {
        // Still within the limit
        return Ok(());
    }

    // Over the max limit of stored messages: drop the user to trigger a
    // re-register
    self.app_state
        .metrics
        .incr_with_tags("ua.expiration")
        .with_tag("reason", "too_many_messages")
        .send();
    self.app_state.db.remove_user(&self.uaid).await?;
    Err(SMErrorKind::UaidReset.into())
}
/// Emit metrics for a Notification to be sent to the user
///
/// Two metrics are sent, both tagged with `source` and the user agent's
/// `metrics_os`:
///
/// * `ua.notification.sent`: incremented, additionally tagged with whether
///   the Notification has a topic
/// * `ua.message_data`: counts the length of the Notification's data (0
///   when it has none)
fn emit_send_metrics(&self, notif: &Notification, source: &'static str) {
    let metrics = &self.app_state.metrics;
    let ua_info = &self.ua_info;
    metrics
        .incr_with_tags("ua.notification.sent")
        .with_tag("source", source)
        .with_tag("topic", &notif.topic.is_some().to_string())
        .with_tag("os", &ua_info.metrics_os)
        // TODO: include `internal` if meta is set
        .send();
    metrics
        .count_with_tags(
            "ua.message_data",
            notif.data.as_ref().map_or(0, |data| data.len() as i64),
        )
        .with_tag("source", source)
        .with_tag("os", &ua_info.metrics_os)
        .send();
}
}