autopush_common/reliability.rs
1/// Push Reliability Recorder
2///
/// This allows us to track messages from select, known parties (currently, just
/// Mozilla generated and consumed) so that we can identify potential trouble spots
/// and where messages expire early. Message expiration can lead to message loss.
6use std::collections::HashMap;
7use std::str::FromStr;
8use std::sync::Arc;
9
10use actix_web::HttpResponse;
11use cadence::StatsdClient;
12use chrono::TimeDelta;
13use deadpool_redis::Config;
14
15use prometheus_client::{
16 encoding::text::encode, metrics::family::Family, metrics::gauge::Gauge, registry::Registry,
17};
18use redis::{AsyncCommands, aio::ConnectionLike};
19use redis::{Pipeline, Value};
20
21use crate::db::client::DbClient;
22use crate::errors::{ApcError, ApcErrorKind, Result};
23use crate::metric_name::MetricName;
24use crate::metrics::StatsdClientExt;
25use crate::util::timing::sec_since_epoch;
26
// Redis Keys
/// Redis hash mapping each `ReliabilityState` name to its current message count.
pub const COUNTS: &str = "state_counts";
/// Redis sorted set of `"{id}#{state}"` members scored by expiration timestamp.
pub const EXPIRY: &str = "expiry";

/// Maximum time to wait when creating a new Redis connection (used as the
/// pool's `create_timeout` in `PushReliability::new`).
const CONNECTION_EXPIRATION: TimeDelta = TimeDelta::seconds(10);
// Minimum expiration period of 1 second.
// This was set to `0`, but there was some confusion whether that would not set an
// expiration time for a record or would set a record not to expire.
const MIN_EXPIRATION: u64 = 1;
36
/// The various states that a message may transit on the way from reception to delivery.
///
/// Serialized (serde) and displayed (strum) in `snake_case`, so the string form is
/// shared between the Redis tables, logs, and the Prometheus report.
// Note: "Message" in this context refers to the Subscription Update.
// TODO: Differentiate between "transmitted via webpush" and "transmitted via bridge"?
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    serde::Deserialize,
    serde::Serialize,
    strum::Display,
    strum::EnumString,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum ReliabilityState {
    Received,          // Message was received by the Push Server
    Stored,            // Message was stored because it could not be delivered immediately
    Retrieved,         // Message was taken from storage for delivery
    IntTransmitted,    // Message was handed off between autoendpoint and autoconnect
    IntAccepted,       // Message was accepted by autoconnect from autoendpoint
    BridgeTransmitted, // Message was handed off to a mobile bridge for eventual delivery
    Transmitted,       // Message was handed off for delivery to the UA
    Accepted,          // Message was accepted for delivery by the UA
    Delivered,         // Message was provided to the WebApp recipient by the UA
    DecryptionError,   // Message was provided to the UA and it reported a decryption error
    NotDelivered,      // Message was provided to the UA and it reported a not delivered error
    Expired,           // Message expired naturally (e.g. TTL=0)
    Errored,           // Message resulted in an Error state and failed to be delivered.
}
68
69impl ReliabilityState {
70 /// Has the message reached a state where no further transitions should be possible?
71 pub fn is_terminal(self) -> bool {
72 // NOTE: this list should match the daily snapshot captured by `reliability_cron.py`
73 // which will trim these counts after the max message TTL has expired.
74 matches!(
75 self,
76 ReliabilityState::DecryptionError
77 | ReliabilityState::BridgeTransmitted
78 | ReliabilityState::Delivered
79 | ReliabilityState::Errored
80 | ReliabilityState::Expired
81 | ReliabilityState::NotDelivered
82 )
83 }
84}
85
/// Recorder that tracks Subscription Update messages through their lifecycle
/// states, keeping counts in Redis (when configured) and logging each
/// transition through the database client.
#[derive(Clone)]
pub struct PushReliability {
    // Optional Redis connection pool; `None` disables Redis-side tracking
    // (transitions are then only logged via `db`).
    pool: Option<deadpool_redis::Pool>,
    // Database client used to log state-change reports.
    db: Box<dyn DbClient>,
    // Statsd client for operational metrics (e.g. gc outcomes).
    metrics: Arc<StatsdClient>,
    // Maximum retry attempts for the Redis transactions in record/gc.
    retries: usize,
}
93
// Define a struct to hold the expiry key, since it's easy to flub the order.
/// Composite key stored in the Redis `expiry` sorted set; rendered as
/// `"{id}#{state}"` by its `Display` impl and parsed back via `TryFrom<String>`.
pub struct ExpiryKey {
    // Reliability id of the message.
    pub id: String,
    // The message state that was current when the expiry entry was written.
    pub state: ReliabilityState,
}
99
100impl std::fmt::Display for ExpiryKey {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 write!(f, "{}#{}", self.id, self.state)
103 }
104}
105
106impl TryFrom<String> for ExpiryKey {
107 type Error = ApcError;
108 fn try_from(value: String) -> Result<Self> {
109 let (id, state) = value.split_once('#').ok_or_else(|| {
110 ApcErrorKind::GeneralError("ExpiryKey must be in the format 'id#state'".to_owned())
111 })?;
112 let state: ReliabilityState = ReliabilityState::from_str(state).map_err(|_| {
113 ApcErrorKind::GeneralError(
114 "Invalid state in ExpiryKey, must be a valid ReliabilityState".to_owned(),
115 )
116 })?;
117 Ok(Self {
118 id: id.to_owned(),
119 state,
120 })
121 }
122}
123
124impl PushReliability {
125 // Do the magic to make a report instance, whatever that will be.
126 pub fn new(
127 reliability_dsn: &Option<String>,
128 db: Box<dyn DbClient>,
129 metrics: &Arc<StatsdClient>,
130 retries: usize,
131 ) -> Result<Self> {
132 let Some(reliability_dsn) = reliability_dsn else {
133 debug!("π No reliability DSN declared.");
134 return Ok(Self {
135 pool: None,
136 db: db.clone(),
137 metrics: metrics.clone(),
138 retries,
139 });
140 };
141
142 let config: deadpool_redis::Config = Config::from_url(reliability_dsn);
143 let pool = Some(
144 config
145 .builder()
146 .map_err(|e| {
147 ApcErrorKind::GeneralError(format!("Could not config reliability pool {e:?}"))
148 })?
149 .create_timeout(Some(CONNECTION_EXPIRATION.to_std().unwrap()))
150 .runtime(deadpool::Runtime::Tokio1)
151 .build()
152 .map_err(|e| {
153 ApcErrorKind::GeneralError(format!("Could not build reliability pool {e:?}"))
154 })?,
155 );
156
157 Ok(Self {
158 pool,
159 db: db.clone(),
160 metrics: metrics.clone(),
161 retries,
162 })
163 }
164
165 // Record the message state change to storage.
166 pub async fn record(
167 &self,
168 reliability_id: &Option<String>,
169 new: ReliabilityState,
170 old: &Option<ReliabilityState>,
171 expr: Option<u64>,
172 ) -> Result<Option<ReliabilityState>> {
173 let Some(id) = reliability_id else {
174 return Ok(None);
175 };
176 if let Some(pool) = &self.pool {
177 debug!(
178 "π {} from {} to {}",
179 id,
180 old.map(|v| v.to_string())
181 .unwrap_or_else(|| "None".to_owned()),
182 new
183 );
184 match pool.get().await {
185 Ok(mut conn) => {
186 self.internal_record(&mut conn, old, new, expr, id)
187 .await
188 .map_err(|e| {
189 warn!("πβ οΈ Unable to record reliability state: {:?}", e);
190 ApcErrorKind::GeneralError(
191 "Could not record reliability state".to_owned(),
192 )
193 })?;
194 }
195 Err(e) => warn!("πβ οΈ Unable to get reliability state pool, {:?}", e),
196 };
197 };
198 // Errors are not fatal, and should not impact message flow, but
199 // we should record them somewhere.
200 let _ = self.db.log_report(id, new).await.inspect_err(|e| {
201 warn!("πβ οΈ Unable to record reliability state log: {:?}", e);
202 });
203 Ok(Some(new))
204 }
205
    /// Record the state change in the reliability datastore.
    ///
    /// Because of a strange duplication error, we're using three "tables".
    /// The COUNTS table contains the count of messages in each state. It's provided as a quick lookup
    /// for the dashboard query.
    /// The EXPIRY table contains the expiration time for messages. Those are cleaned up by the
    /// `gc` function.
    /// Finally, there's the "table" that is `state.{id}`. This contains the current state of each
    /// message. Ideally, this would use something like `HSETEX`, but that is not available until
    /// Redis 8.0, so for now, we use the `SET` function, which provides an expiration time.
    /// BE SURE TO SET AN EXPIRATION TIME FOR EACH STATE MESSAGE!
    ///
    /// Arguments:
    /// * `conn` - Redis connection (or mock) used both for the direct commands and the transaction.
    /// * `old` - the message's prior state, if any; its count is decremented and its expiry marker removed.
    /// * `new` - the state being recorded.
    /// * `expr` - expiration value: used as the `SET ... EX` TTL when creating the state key and as
    ///   the `ZADD` score in the `expiry` table.
    /// * `id` - the message's reliability id.
    // TODO: Do we need to keep the state in the message? Probably good for sanity reasons.
    pub(crate) async fn internal_record<C: ConnectionLike + AsyncCommands>(
        &self,
        conn: &mut C,
        old: &Option<ReliabilityState>,
        new: ReliabilityState,
        expr: Option<u64>,
        id: &str,
    ) -> Result<()> {
        trace!(
            "π internal record: {} from {} to {}",
            id,
            old.map(|v| v.to_string())
                .unwrap_or_else(|| "None".to_owned()),
            new
        );

        // Per-message key holding the message's current state string.
        let state_key = format!("state.{id}");
        trace!("π state key: {}", &state_key);
        /*
        ## Reliability lock and adjustment

        This transaction creates a lock on the individual messages "state_key" (which contains the current messages state)
        and the entire Expiration table. If either of those values are changed while the transaction is in progress, then
        the transaction will fail, return a `Redis::Nil`, and the transaction will retry up to `self.retries` (Note:
        none of the operations in the transaction will have taken place yet.)

        We want to lock on the `state_key` because that shows the given state of the message.
        We want to lock on the `expiry` table because the message may have expired, been handled by the `gc()` function and
        may have already been adjusted. (Remember, the `expiry` table holds timestamp markers for when a message will be
        expiring.)

        The operations in the Pipeline are:

        1. If there is an `old` state, decrement the old state count and remove the old marker from the `expiry` table.
        2. If the `new` state is not a terminal state (e.g. it is not a final disposition for a message), then create a new
           entry for `expiry`
        3. Increment the `new` state count
        4. Modify the `state.{ID}` value (if it exists) to indicate the newest known state for the message, returning the
           prior value. Checking that this value is not Nil is an additional sanity check that the value was not removed
           or altered by a different process. (We are already imposing a transaction lock which should prevent this, but
           additional paranoia can sometimes be good.)

        Upon execution of the Pipeline, we get back a list of results. This should look similar to a `Queued` entry for
        every function that we've performed, followed by an inline list of the results of those functions. (e.g.
        for a transition between `ReliabilityState::Stored` to `ReliabilityState::Retrieved`, which are neither terminal
        states), we should see something like: `["Queued","Queued","Queued",["OK", "OK", "stored"]]`

        If any of the locks failed, we should get back a `Nil`, in which case, we would want to try this operation again.
        In addition, the sanity `Get` may also return a `Nil` which would also indicate that there was a problem. There's
        some debate whether or not to retry the operation if that's the case (since it would imply that the lock failed
        for some unexpected reason), however for now, we just report a soft `error!()`.

        */
        // Create the initial state key.
        // Do not place this inside of the transaction. We monitor the state key and the transaction will
        // fail because the value will change before the transaction completes. Yes, really.
        if new == ReliabilityState::Received && old.is_none() {
            trace!(
                "π Creating new record {state_key} ex {:?}",
                expr.unwrap_or(MIN_EXPIRATION)
            );
            // we can't perform this in a transaction because we can only increment if the set succeeds,
            // and values aren't returned when creating values in transactions. In order to do this
            // from inside the transaction, you would need to create a function, and that feels a bit
            // too heavy for this.
            // Create the new `state.{id}` key if it does not exist, and set the expiration.
            let options = redis::SetOptions::default()
                .with_expiration(redis::SetExpiry::EX(expr.unwrap_or(MIN_EXPIRATION)))
                .conditional_set(redis::ExistenceCheck::NX);
            // NOTE(review): this trace prints the new state after `EX`, not the
            // actual expiration value — misleading but only a trace string.
            trace!("π β SET {state_key} NX EX {:?}", new);
            let result = conn
                .set_options::<_, _, Value>(&state_key, new.to_string(), options)
                .await
                .map_err(|e| {
                    warn!("πβ οΈ Could not create state key: {:?}", e);
                    ApcErrorKind::GeneralError("Could not create the state key".to_owned())
                })?;
            if result != redis::Value::Okay {
                // Redis returned a `Nil`, indicating that there was some error. The only thing that should cause that
                // would be if the `old` state was reset to `None` and we thought we needed to create a new state.
                // Since the message carries its prior state, it shouldn't be set to `None` unless there's something
                // strange going on.
                // TODO: It's worth noting that when restarting autoendpoint, we get a large number of these in the logs.
                // Need to figure out the reason for that.
                // The `result` is always `nil` so that's not helpful.
                error!("πβ οΈ Tried to recreate state_key {state_key}: {old:?} => {new:?}",);
                return Err(
                    ApcErrorKind::GeneralError("Tried to recreate state_key".to_string()).into(),
                );
            }
        } else {
            trace!("π Checking {:?}", &old);
            // safety check (yes, there's still a slight chance of a race, but it's small)
            if let Some(old) = old {
                let check_state: String = conn.get(&state_key).await?;
                trace!("π Checking state for {}: {:?}", id, &check_state);
                // Abort early when the stored state no longer matches the caller's
                // view of the prior state (someone else transitioned the message).
                if check_state != old.to_string() {
                    trace!(
                        "π Attempting to update state for {} from {} to {}, but current state is different: {:?}",
                        id, old, new, check_state
                    );
                    return Err(ApcErrorKind::GeneralError(
                        "State mismatch during reliability record update".to_owned(),
                    )
                    .into());
                }
            };
        }

        crate::redis_util::transaction(
            conn,
            &[&state_key, &EXPIRY.to_owned()],
            self.retries,
            || ApcErrorKind::GeneralError("Exceeded reliability record retry attempts".to_owned()),
            async |conn, pipe: &mut Pipeline| {
                // The first state is special, since we need to create the `state_key`.
                // remove the old state from the expiry set, if it exists.
                // There should only be one message at a given state in the `expiry` table.
                // Since we only use that table to track messages that may expire. (We
                // decrement "expired" messages in the `gc` function, so having messages
                // in multiple states may decrement counts incorrectly.))
                if let Some(old) = old {
                    trace!("π β {old} - {id}");
                    trace!("π πͺβ HINCRBY {COUNTS} 1");
                    pipe.hincr(COUNTS, old.to_string(), -1);
                    let key = ExpiryKey {
                        id: id.to_string(),
                        state: old.to_owned(),
                    }
                    .to_string();
                    trace!("π internal remove old state: {:?}", key);
                    trace!("π πͺβ ZREM {EXPIRY} {key}");
                    pipe.zrem(EXPIRY, &key);
                }
                if !new.is_terminal() {
                    // Write the expiration only if the state is non-terminal. Otherwise we run the risk of
                    // messages reporting a false "expired" state even if they were "successful".
                    let key = ExpiryKey {
                        id: id.to_string(),
                        state: new.to_owned(),
                    }
                    .to_string();
                    trace!("π πͺβ ZADD {EXPIRY} {:?} {key}", expr.unwrap_or_default());
                    pipe.zadd(EXPIRY, &key, expr.unwrap_or_default());
                    trace!("π internal record result: {:?}", key);
                }
                trace!("π β {new} - {id}");
                // Bump up the new state count, and set the state key's state if it still exists.
                trace!("π πͺβ HINCRBY {COUNTS} {new} 1");
                pipe.hincr(COUNTS, new.to_string(), 1);
                if new != ReliabilityState::Received {
                    // Update the existing state key (XX) without disturbing its TTL,
                    // and fetch the prior value for the sanity check below.
                    let options = redis::SetOptions::default()
                        .with_expiration(redis::SetExpiry::KEEPTTL)
                        .conditional_set(redis::ExistenceCheck::XX)
                        .get(true);
                    trace!("π πͺβ SET {state_key} {new} XX KEEPTTL");
                    pipe.set_options(&state_key, new.to_string(), options);
                }
                // `exec_query` returns `RedisResult<()>`.
                // `query_async` returns `RedisResult<Option<redis::Value>, RedisError>`.
                // We really don't care about the returned result here, but transaction
                // retries if we return Ok(None), so we run the exec and return
                // a nonce `Some` value.
                // The turbo-fish is a fallback for edition 2024
                let result = pipe.query_async::<redis::Value>(conn).await?;
                trace!("π πͺ {id} - {:?}", &result);
                // The last element returned from the command is the result of the pipeline.
                // If Redis encounters an error, it will return a `nil` as well. We handle both
                // the same (retry), so we can normalize errors as `nil`.
                // The last of which should be the result of the piped command set.
                // This should return `nil` if there is any error, in which case we record
                // a soft error. (On success, it will return the result of the last command
                // in the pipe, which may vary due to the current state).
                // This could also be strung together as a cascade of functions, but it's broken
                // out to discrete steps for readability.
                /* On prod, we get a large number of these errors, which I think might be clogging
                   up the redis connections, causing servers to report as degraded.
                */
                if result == redis::Value::Nil {
                    warn!("πβ πͺ {id} - Pipe failed, skipping retry.");
                    // temporarily just let things fail to handle autoendpoint degradation.
                    // return Ok(None);
                    return Ok(Some(redis::Value::Okay));
                }
                if let Some(operations) = result.as_sequence() {
                    // We have responses, the first items report the state of the commands,
                    // the final line is a list of command results.
                    if let Some(result_values) = operations.last() && let Some(results) = result_values.as_sequence() &&
                        // The last command should contain the prior state. If it returned `Nil`
                        // for some, unexpected reason, note the error.
                        Some(&redis::Value::Nil) == results.last() {
                        error!("ππ¨ WARNING: Lock Issue for {id}")
                        // There is some debate about whether or not to rerun
                        // the transaction if this state is reached.
                        // Rerunning would cause the counts to be impacted, but
                        // might address any other issues that caused the
                        // `Nil` to be returned.
                        // For now, let's just log the error.
                    }
                    Ok(Some(redis::Value::Okay))
                } else {
                    // a Nil will rerun the transaction.
                    Ok(None)
                }
            },
        )
        .await
        .inspect_err(|e| {
            warn!("πβ οΈError occurred during transaction: {:?}", e);
        })?;
        Ok(())
    }
433
434 /// Perform a garbage collection cycle on a reliability object.
435 pub async fn gc(&self) -> Result<()> {
436 if let Some(pool) = &self.pool
437 && let Ok(mut conn) = pool.get().await
438 {
439 debug!("π performing pre-report garbage collection");
440 return self.internal_gc(&mut conn, sec_since_epoch()).await;
441 }
442 Ok(())
443 }
444
445 // Perform the `garbage collection` cycle. This will scan the currently known timestamp
446 // indexed entries in redis looking for "expired" data, and then rectify the counts to
447 // indicate the final states. This is because many of the storage systems do not provide
448 // indicators when data reaches a TTL.
449
450 // A few notes about redis:
451 // "pipeline" essentially stitches commands together. Each command executes in turn, but the data store
452 // remains open for other operations to occur.
453 // `atomic()` wraps things in a transaction, essentially locking the data store while the command executes.
454 // Sadly, there's no way to create a true transaction where you read and write in a single operation, so we
455 // have to presume some "slop" here.
456 pub(crate) async fn internal_gc<C: ConnectionLike + AsyncCommands>(
457 &self,
458 conn: &mut C,
459 expr: u64,
460 ) -> Result<()> {
461 let result: redis::Value = crate::redis_util::transaction(
462 conn,
463 &[EXPIRY],
464 self.retries,
465 || ApcErrorKind::GeneralError("Exceeded gc retry attempts".to_owned()),
466 async |conn, pipe| {
467 // First, get the list of values that are to be purged.
468 let purged: Vec<String> = conn.zrangebyscore(EXPIRY, 0, expr as isize).await?;
469 // insta-bail if there's nothing to do.
470 if purged.is_empty() {
471 return Ok(Some(redis::Value::Nil));
472 }
473
474 // Now purge each of the values by resetting the counts and removing the item from
475 // the purge set.
476 for key in purged {
477 let Ok(expiry_key) = ExpiryKey::try_from(key.clone()) else {
478 let err = "Invalid key stored in Reliability datastore";
479 error!("ππ₯ {} [{:?}]", &err, &key);
480 return Err(ApcErrorKind::GeneralError(err.to_owned()));
481 };
482 // Adjust the COUNTS and then remove the record from the list of expired rows.
483 trace!("π βπͺ¦ {} - {key}", expiry_key.state.to_string());
484 pipe.hincr(COUNTS, expiry_key.state.to_string(), -1);
485 pipe.hincr(COUNTS, ReliabilityState::Expired.to_string(), 1);
486 pipe.zrem(EXPIRY, key);
487 }
488 Ok(pipe.query_async(conn).await?)
489 },
490 )
491 .await?;
492 self.metrics
493 .incr_with_tags(MetricName::ReliabilityGc)
494 .with_tag(
495 "status",
496 if result == redis::Value::Nil {
497 "error"
498 } else {
499 "success"
500 },
501 )
502 .send();
503 Ok(())
504 }
505
506 // Return a snapshot of milestone states
507 // This will probably not be called directly, but useful for debugging.
508 pub async fn report(&self) -> Result<HashMap<String, i32>> {
509 if let Some(pool) = &self.pool
510 && let Ok(mut conn) = pool.get().await
511 {
512 return Ok(conn.hgetall(COUNTS).await.map_err(|e| {
513 ApcErrorKind::GeneralError(format!("Could not read report {e:?}"))
514 })?);
515 }
516 Ok(HashMap::new())
517 }
518
519 pub async fn health_check<'a>(&self) -> Result<&'a str> {
520 if let Some(pool) = &self.pool {
521 let mut conn = pool.get().await.map_err(|e| {
522 ApcErrorKind::GeneralError(format!(
523 "Could not connect to reliability datastore: {e:?}"
524 ))
525 })?;
526 // Add a type here, even though we're tossing the value, in order to prevent the `FromRedisValue` warning.
527 conn.ping::<()>().await.map_err(|e| {
528 ApcErrorKind::GeneralError(format!("Could not ping reliability datastore: {e:?}"))
529 })?;
530 Ok("OK")
531 } else {
532 Ok("OK")
533 }
534 }
535}
536
537const METRIC_NAME: &str = "autopush_reliability";
538
539/// Generate a Prometheus compatible report. Output should follow the
540/// [instrumentation](https://prometheus.io/docs/practices/instrumentation/) guidelines.
541///
542/// In short form, the file should be a plain text output, with each metric on it's own line
543/// using the following format:
544/// ```text
545/// # HELP metric_name Optional description of this metric
546/// # TYPE metric_name {required type (gauge|count|histogram|summary)}
547/// metric_name{label="label1"} value
548/// metric_name{label="label2"} value
549/// ```
550/// An example which would return counts of messages in given states at the current
551/// time would be:
552/// ```text
553/// # HELP autopush_reliability Counts for messages in given states
554/// # TYPE metric_name gauge
555/// autopush_reliability{state="recv"} 123
556/// autopush_reliability{state="stor"} 123
557/// # EOF
558/// ```
559/// Note that time is not required. A timestamp has been added to the output, but is
560/// ignored by Prometheus, and is only provided to ensure that there is no intermediate
561/// caching occurring.
562///
563/// The report endpoint currently is only provided by `autoendpoint`, even though the report
564/// is inclusive for all push milestones. This is done for simplicity, both for serving the
565/// data and for collection and management of the metrics.
566///
567pub fn gen_report(values: HashMap<String, i32>) -> Result<String> {
568 let mut registry = Registry::default();
569
570 // A "family" is a grouping of metrics.
571 // we specify this as the ("label", "label value") which index to a Gauge.
572 let family = Family::<Vec<(&str, String)>, Gauge>::default();
573 // This creates the top level association of the elements in the family with the metric.
574 registry.register(
575 METRIC_NAME,
576 "Count of messages at given states",
577 family.clone(),
578 );
579 for (milestone, value) in values.into_iter() {
580 // prevent any stray leakage of invalid state data
581 if ReliabilityState::from_str(&milestone).is_err() {
582 trace!("π skipping invalid state {milestone:?}");
583 continue;
584 }
585 // Specify the static "state" label name with the given milestone, and add the
586 // value as the gauge value.
587 family
588 .get_or_create(&vec![("state", milestone)])
589 .set(value.into());
590 }
591
592 // Return the formatted string that Prometheus will eventually read.
593 let mut encoded = String::new();
594 encode(&mut encoded, ®istry).map_err(|e| {
595 ApcErrorKind::GeneralError(format!("Could not generate Reliability report {e:?}"))
596 })?;
597 Ok(encoded)
598}
599
600/// Handle the `/metrics` request by returning a Prometheus compatible report.
601pub async fn report_handler(reliability: &Arc<PushReliability>) -> Result<HttpResponse> {
602 reliability.gc().await?;
603 let report = gen_report(reliability.report().await?)?;
604 Ok(HttpResponse::Ok()
605 .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
606 .body(report))
607}
608
609#[cfg(test)]
610mod tests {
611 use std::collections::HashMap;
612 use std::str::FromStr;
613
614 use super::*;
615 use redis_test::{MockCmd, MockRedisConnection};
616 use uuid::Uuid;
617
618 #[test]
619 fn test_report() {
620 // create a nonce report
621 let mut report: HashMap<String, i32> = HashMap::new();
622 let recv = ReliabilityState::Received.to_string();
623 let trns = ReliabilityState::Transmitted.to_string();
624 report.insert(recv.clone(), 111);
625 report.insert(ReliabilityState::Stored.to_string(), 222);
626 report.insert(ReliabilityState::Retrieved.to_string(), 333);
627 report.insert(trns.clone(), 444);
628 report.insert("biginvalid".to_string(), -1);
629
630 let generated = gen_report(report).unwrap();
631 // We don't really care if the `Created` or `HELP` lines are included
632 assert!(generated.contains(&format!("# TYPE {METRIC_NAME}")));
633 // sample the first and last values.
634 assert!(generated.contains(&format!("{METRIC_NAME}{{state=\"{recv}\"}} 111")));
635 assert!(generated.contains(&format!("{METRIC_NAME}{{state=\"{trns}\"}} 444")));
636 assert!(!generated.contains(&format!("{METRIC_NAME}{{state=\"biginvalid\"}} -1")));
637 }
638
639 #[test]
640 fn state_ser() {
641 assert_eq!(
642 ReliabilityState::from_str("delivered").unwrap(),
643 ReliabilityState::Delivered
644 );
645 assert_eq!(
646 ReliabilityState::from_str("int_accepted").unwrap(),
647 ReliabilityState::IntAccepted
648 );
649 assert_eq!(
650 serde_json::from_str::<ReliabilityState>(r#""int_accepted""#).unwrap(),
651 ReliabilityState::IntAccepted
652 );
653
654 assert_eq!(ReliabilityState::IntAccepted.to_string(), "int_accepted");
655 assert_eq!(
656 serde_json::to_string(&ReliabilityState::IntAccepted).unwrap(),
657 r#""int_accepted""#
658 );
659 }
660
    /// Exercise `internal_record` for a brand-new message (`None` -> `Received`)
    /// against a mock Redis connection, then verify the database log call.
    /// The mock asserts the exact command sequence: SET NX EX, WATCH, the
    /// MULTI/EXEC pipeline, and UNWATCH.
    #[actix_rt::test]
    async fn test_push_reliability_report() -> Result<()> {
        let mut db = crate::db::mock::MockDbClient::new();

        // Build the state.
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        // Remember, we shouldn't just arbitrarily create mid-state values, so
        // let's start from the beginning
        let new = ReliabilityState::Received;
        let old = None;
        let expr = 1;

        // The "{id}#{state}" member expected in the `expiry` sorted set.
        let exp_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();

        let state_id = format!("state.{test_id}");

        let mut pipeline = redis::Pipeline::new();
        // We're adding an element so we have something to record.
        pipeline
            .cmd("MULTI")
            // No old state, so no decrement.
            // "received" is not terminal
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(&exp_key)
            // adjust the counts
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            // Run the transaction
            .cmd("EXEC");

        let mut conn = MockRedisConnection::new(vec![
            // Initial creation of the `state.{id}` key (outside the transaction).
            MockCmd::new(
                redis::cmd("SET")
                    .arg(&state_id)
                    .arg(new.to_string())
                    .arg("NX")
                    .arg("EX")
                    .arg(expr),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_id).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                pipeline,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,                              // MULTI
                    redis::Value::SimpleString("QUEUED".to_owned()), // ZADD
                    redis::Value::SimpleString("QUEUED".to_owned()), // HINCRBY
                    redis::Value::SimpleString("QUEUED".to_owned()), // SET
                    // Return the transaction results.
                    redis::Value::Array(vec![
                        redis::Value::Int(1), // 0 -> 1
                        redis::Value::Int(1),
                        // there's no prior value, so this will return Nil
                        redis::Value::Nil,
                    ]),
                ])),
            ),
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        // Expect exactly one db log call with our id and the `Received` state.
        let int_test_id = test_id.clone();
        db.expect_log_report()
            .times(1)
            .withf(move |id, state| id == int_test_id && state == &new)
            .return_once(|_, _| Ok(()));
        // test the main report function (note, this does not test redis)
        let db_box = Box::new(Arc::new(db));
        let pr = PushReliability::new(
            &None,
            db_box.clone(),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // `.record()` uses a pool, so we can't pass the mock connection directly.
        // Instead, we're going to essentially recreate what `.record()` does calling
        // the `.internal_record()` function, followed by the database `.record()`.
        // This emulates
        // ```
        // pr.record(&Some(test_id.clone()), new, &None, Some(expr))
        //     .await?;
        // ```

        // and mock the redis call.
        pr.internal_record(&mut conn, &old, new, Some(expr), &test_id)
            .await?;

        db_box
            .log_report(&test_id, new)
            .await
            .inspect_err(|e| {
                warn!("πβ οΈ Unable to record reliability state log: {:?}", e);
            })
            .map_err(|e| ApcErrorKind::GeneralError(e.to_string()))?;

        Ok(())
    }
771
    /// Exercise `internal_record` for a mid-flight transition
    /// (`Received` -> `Stored`): no state-key creation, a GET sanity check of
    /// the current state, then the WATCH / pipeline / UNWATCH sequence where
    /// the old state is decremented and its expiry marker replaced.
    #[actix_rt::test]
    async fn test_push_reliability_record() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Stored;
        let old = ReliabilityState::Received;
        let expr = 1;

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        // Expiry-set members for the incoming and outgoing states.
        let new_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();
        let old_key = ExpiryKey {
            id: test_id.clone(),
            state: old,
        }
        .to_string();

        let state_key = format!("state.{test_id}");

        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Decrement the old state
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(old.to_string())
            .arg(-1)
            .ignore()
            // Replace the combined old state key in the expiry set.
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(old_key)
            .ignore()
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(new_key)
            .ignore()
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            .cmd("SET")
            .arg(&state_key)
            .arg(new.to_string())
            .arg("XX")
            .arg("GET")
            .arg("KEEPTTL")
            .ignore()
            .cmd("EXEC")
            .ignore();

        let mut conn = MockRedisConnection::new(vec![
            // Create the new state
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_key).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            // Sanity check: the stored state must still equal `old`.
            MockCmd::new(
                redis::cmd("GET").arg(&state_key),
                Ok(redis::Value::SimpleString(old.to_string())),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    redis::Value::SimpleString("QUEUED".to_owned()), // multi
                    redis::Value::SimpleString("QUEUED".to_owned()), // hincrby
                    redis::Value::SimpleString("QUEUED".to_owned()), // zrem
                    redis::Value::SimpleString("QUEUED".to_owned()), // zadd
                    redis::Value::SimpleString("QUEUED".to_owned()), // hincrby
                    redis::Value::SimpleString("QUEUED".to_owned()), // set
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay, //Multi
                        redis::Value::Okay, //hincrby
                        redis::Value::Okay, //zrem
                        redis::Value::Okay, //zadd
                        redis::Value::Okay, //hincr
                        redis::Value::SimpleString(old.to_string()),
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        let _ = pr
            .internal_record(&mut conn, &Some(old), new, Some(expr), &test_id)
            .await;

        Ok(())
    }
882
    #[actix_rt::test]
    async fn test_push_reliability_full() -> Result<()> {
        // Exercise `internal_record` for a Stored -> Received transition against a
        // fully mocked Redis connection: the message was previously stored and is
        // now "live" again, so the state counts and the expiry sorted-set entry
        // must be swapped inside a WATCH/MULTI/EXEC transaction.
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Received;
        let stored = ReliabilityState::Stored;
        let expr = 1;

        // No-op statsd sink; metrics calls succeed but record nothing.
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        // Combined "{id}#{state}" members as they appear in the EXPIRY sorted set.
        let new_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();
        let stored_key = ExpiryKey {
            id: test_id.clone(),
            state: stored,
        }
        .to_string();

        let state_key = format!("state.{test_id}");

        // The exact transaction pipeline we expect `internal_record` to send.
        // MockRedisConnection matches on the packed command bytes, so every
        // command and argument here must match the production code verbatim.
        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Decrement the old state count
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(stored.to_string())
            .arg(-1)
            .ignore()
            // Replace the expiry key
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(stored_key.to_string())
            .ignore()
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(new_key)
            .ignore()
            // Increment the new state count
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            .ignore()
            // And create the new state transition key (since the message is "live" again.)
            // NOTE(review): unlike the equivalent pipeline in the preceding test, this
            // SET carries no "KEEPTTL" argument and no trailing `.ignore()` before
            // "EXEC". If `internal_record` builds the same pipeline in both cases,
            // the byte match fails here and the mismatch is silently swallowed by
            // the `let _ =` below — confirm against the production pipeline.
            .cmd("SET")
            .arg(&state_key)
            .arg(new.to_string())
            .arg("XX")
            .arg("GET")
            .cmd("EXEC")
            .ignore();

        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(
                // Create the new state
                redis::cmd("SET")
                    .arg(format!("state.{test_id}"))
                    .arg(new.to_string())
                    .arg("NX")
                    .arg("EX")
                    .arg(expr),
                Ok(redis::Value::Okay),
            ),
            // increment the count for new
            MockCmd::new(
                redis::cmd("HINCRBY")
                    .arg(COUNTS)
                    .arg(new.to_string())
                    .arg(1),
                Ok(redis::Value::Okay),
            ),
            // begin the transaction
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_key).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    // NOTE(review): the pipeline above queues five commands between
                    // MULTI and EXEC (HINCRBY, ZREM, ZADD, HINCRBY, SET) but only
                    // four QUEUED acknowledgements and four EXEC results are mocked
                    // here — one short. Verify against the sibling test above, which
                    // mocks one QUEUED/result pair per queued command.
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // NOTE(review): the result is discarded, so a mock/command mismatch cannot
        // fail this test. Consider asserting the returned state once the mocked
        // pipeline above is confirmed correct.
        let _ = pr
            .internal_record(&mut conn, &Some(stored), new, Some(expr), &test_id)
            .await;

        Ok(())
    }
1004
    #[actix_rt::test]
    async fn test_push_reliability_gc() -> Result<()> {
        // Exercise `internal_gc` against a mocked Redis connection: a single
        // expired entry is read from the EXPIRY sorted set, its state count is
        // decremented, the terminal `Expired` count is incremented, and the
        // entry is removed — all inside a WATCH/MULTI/EXEC transaction.
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Accepted;
        // Combined "{id}#{state}" member as it appears in the EXPIRY sorted set.
        let key = format!("{}#{}", &test_id, &new);
        let expr = 1;
        // No-op statsd sink; metrics calls succeed but record nothing.
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        // Canned ZRANGEBYSCORE reply: one expired entry for the collector to sweep.
        let response: redis::Value = redis::Value::Array(vec![redis::Value::SimpleString(
            ExpiryKey {
                id: test_id.clone(),
                state: new,
            }
            .to_string(),
        )]);

        // Construct the Pipeline.
        // A redis "pipeline" is a set of instructions that are executed in order. These
        // are not truly atomic, as other actions can interrupt a pipeline, but they
        // are guaranteed to happen in sequence. Transactions essentially check that the
        // WATCH key is not altered before the pipeline is executed.
        // MockRedisConnection matches on packed command bytes, so every command and
        // argument must match what `internal_gc` builds, verbatim.
        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Adjust the state counts.
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(-1)
            .ignore()
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(ReliabilityState::Expired.to_string())
            .arg(1)
            // Replace the state key in the expiry set.
            .ignore()
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(key)
            .ignore()
            .cmd("EXEC")
            .ignore();

        // This mocks the "conn"ection, so all commands sent to the connection need
        // to be separated out. This includes the transaction "WATCH" and "UNWATCH".
        // The "meat" of the functions are handled via a Pipeline, which conjoins the
        // commands into one group (see above). Pipelines return an array of
        // results, one entry for each pipeline command.
        // In other words, the `internal_gc` commands are probably in the pipeline,
        // all the others are probably in the conn.
        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(redis::cmd("WATCH").arg(EXPIRY), Ok(redis::Value::Okay)),
            // Fetch everything that expired at or before `expr`.
            MockCmd::new(
                redis::cmd("ZRANGEBYSCORE").arg(EXPIRY).arg(0).arg(expr),
                Ok(response),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            // (One QUEUED ack per queued command — HINCRBY, HINCRBY, ZREM — and a
            // matching three-entry EXEC result array.)
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // functionally a no-op, but it does exercise lines.
        // (Constructed with `&None` for the Redis settings, so `gc()` has no pool
        // to drain and returns early.)
        pr.gc().await?;

        // and mock the redis call.
        pr.internal_gc(&mut conn, 1).await?;

        Ok(())
    }
1103}