autopush_common/reliability.rs
/// Push Reliability Recorder
///
/// This allows us to track messages from select, known parties (currently, just
/// Mozilla generated and consumed) so that we can identify potential trouble spots
/// and where messages expire early. Message expiration can lead to message loss.
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use actix_web::HttpResponse;
use cadence::StatsdClient;
use chrono::TimeDelta;
use deadpool_redis::Config;

use prometheus_client::{
    encoding::text::encode, metrics::family::Family, metrics::gauge::Gauge, registry::Registry,
};
use redis::{aio::ConnectionLike, AsyncCommands};
use redis::{Pipeline, Value};

use crate::db::client::DbClient;
use crate::errors::{ApcError, ApcErrorKind, Result};
use crate::metric_name::MetricName;
use crate::metrics::StatsdClientExt;
use crate::util::timing::sec_since_epoch;

// Redis Keys
pub const COUNTS: &str = "state_counts";
pub const EXPIRY: &str = "expiry";

const CONNECTION_EXPIRATION: TimeDelta = TimeDelta::seconds(10);
// Minimum expiration period of 1 second.
// This was originally set to `0`, but there was some confusion over whether that meant
// "do not set an expiration time for the record" or "set the record to never expire".
const MIN_EXPIRATION: u64 = 1;

/// The various states that a message may transit on the way from reception to delivery.
// Note: "Message" in this context refers to the Subscription Update.
// TODO: Differentiate between "transmitted via webpush" and "transmitted via bridge"?
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    serde::Deserialize,
    serde::Serialize,
    strum::Display,
    strum::EnumString,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum ReliabilityState {
    Received,          // Message was received by the Push Server
    Stored,            // Message was stored because it could not be delivered immediately
    Retrieved,         // Message was taken from storage for delivery
    IntTransmitted,    // Message was handed off between autoendpoint and autoconnect
    IntAccepted,       // Message was accepted by autoconnect from autoendpoint
    BridgeTransmitted, // Message was handed off to a mobile bridge for eventual delivery
    Transmitted,       // Message was handed off for delivery to the UA
    Accepted,          // Message was accepted for delivery by the UA
    Delivered,         // Message was provided to the WebApp recipient by the UA
    DecryptionError,   // Message was provided to the UA and it reported a decryption error
    NotDelivered,      // Message was provided to the UA and it reported a not delivered error
    Expired,           // Message expired naturally (e.g. TTL=0)
    Errored,           // Message resulted in an Error state and failed to be delivered.
}

impl ReliabilityState {
    /// Has the message reached a state where no further transitions should be possible?
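    ///
    /// A minimal doc-test sketch, assuming this module is exported as
    /// `autopush_common::reliability`:
    ///
    /// ```
    /// # use autopush_common::reliability::ReliabilityState;
    /// assert!(ReliabilityState::Delivered.is_terminal());
    /// assert!(!ReliabilityState::Received.is_terminal());
    /// ```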
    pub fn is_terminal(self) -> bool {
        // NOTE: this list should match the daily snapshot captured by `reliability_cron.py`,
        // which will trim these counts after the max message TTL has expired.
        matches!(
            self,
            ReliabilityState::DecryptionError
                | ReliabilityState::BridgeTransmitted
                | ReliabilityState::Delivered
                | ReliabilityState::Errored
                | ReliabilityState::Expired
                | ReliabilityState::NotDelivered
        )
    }
}

#[derive(Clone)]
pub struct PushReliability {
    pool: Option<deadpool_redis::Pool>,
    db: Box<dyn DbClient>,
    metrics: Arc<StatsdClient>,
    retries: usize,
}

// Define a struct to hold the expiry key, since it's easy to flub the order.
pub struct ExpiryKey {
    pub id: String,
    pub state: ReliabilityState,
}

impl std::fmt::Display for ExpiryKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}#{}", self.id, self.state)
    }
}

impl TryFrom<String> for ExpiryKey {
    type Error = ApcError;
    fn try_from(value: String) -> Result<Self> {
        let (id, state) = value.split_once('#').ok_or_else(|| {
            ApcErrorKind::GeneralError("ExpiryKey must be in the format 'id#state'".to_owned())
        })?;
        let state: ReliabilityState = ReliabilityState::from_str(state).map_err(|_| {
            ApcErrorKind::GeneralError(
                "Invalid state in ExpiryKey, must be a valid ReliabilityState".to_owned(),
            )
        })?;
        Ok(Self {
            id: id.to_owned(),
            state,
        })
    }
}

impl PushReliability {
    // Create a new PushReliability recorder. If no reliability DSN is configured,
    // Redis state tracking is disabled, but reports are still logged to the database.
    pub fn new(
        reliability_dsn: &Option<String>,
        db: Box<dyn DbClient>,
        metrics: &Arc<StatsdClient>,
        retries: usize,
    ) -> Result<Self> {
        let Some(reliability_dsn) = reliability_dsn else {
            debug!("🔍 No reliability DSN declared.");
            return Ok(Self {
                pool: None,
                db: db.clone(),
                metrics: metrics.clone(),
                retries,
            });
        };

        let config: deadpool_redis::Config = Config::from_url(reliability_dsn);
        let pool = Some(
            config
                .builder()
                .map_err(|e| {
                    ApcErrorKind::GeneralError(format!("Could not config reliability pool {e:?}"))
                })?
                .create_timeout(Some(CONNECTION_EXPIRATION.to_std().unwrap()))
                .runtime(deadpool::Runtime::Tokio1)
                .build()
                .map_err(|e| {
                    ApcErrorKind::GeneralError(format!("Could not build reliability pool {e:?}"))
                })?,
        );

        Ok(Self {
            pool,
            db: db.clone(),
            metrics: metrics.clone(),
            retries,
        })
    }

    // Record the message state change to storage.
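    //
    // A hypothetical call site (the `msg` fields here are illustrative, not from
    // this crate):
    //
    //     let old = reliability
    //         .record(&msg.reliability_id, ReliabilityState::Received, &None, Some(msg.ttl))
    //         .await?;
    //
    // The returned state can then be carried with the message and passed back as
    // `old` on the next transition.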
    pub async fn record(
        &self,
        reliability_id: &Option<String>,
        new: ReliabilityState,
        old: &Option<ReliabilityState>,
        expr: Option<u64>,
    ) -> Result<Option<ReliabilityState>> {
        let Some(id) = reliability_id else {
            return Ok(None);
        };
        if let Some(pool) = &self.pool {
            debug!(
                "🔍 {} from {} to {}",
                id,
                old.map(|v| v.to_string())
                    .unwrap_or_else(|| "None".to_owned()),
                new
            );
            match pool.get().await {
                Ok(mut conn) => {
                    self.internal_record(&mut conn, old, new, expr, id)
                        .await
                        .map_err(|e| {
                            warn!("🔍⚠️ Unable to record reliability state: {:?}", e);
                            ApcErrorKind::GeneralError(
                                "Could not record reliability state".to_owned(),
                            )
                        })?;
                }
                Err(e) => warn!("🔍⚠️ Unable to get reliability state pool, {:?}", e),
            };
        };
        // Errors are not fatal and should not impact message flow, but we should
        // record them somewhere.
        let _ = self.db.log_report(id, new).await.inspect_err(|e| {
            warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
        });
        Ok(Some(new))
    }

    /// Record the state change in the reliability datastore.
    ///
    /// Because of a strange duplication error, we're using three "tables":
    /// * The COUNTS table contains the count of messages in each state. It's provided as a
    ///   quick lookup for the dashboard query.
    /// * The EXPIRY table contains the expiration time for messages. Those are cleaned up
    ///   by the `gc` function.
    /// * Finally, there's the "table" that is `state.{id}`. This contains the current state
    ///   of each message. Ideally, this would use something like `HSETEX`, but that is not
    ///   available until Redis 8.0, so for now we use the `SET` function, which provides an
    ///   expiration time.
    ///
    /// BE SURE TO SET AN EXPIRATION TIME FOR EACH STATE MESSAGE!
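    ///
    /// As an illustrative sketch (the id `abc` below is hypothetical), a message in the
    /// `received` state would be tracked as:
    ///
    /// ```text
    /// state_counts (hash): received -> <count of messages currently "received">
    /// expiry (zset):       member "abc#received", scored by its expiration timestamp
    /// state.abc (string):  "received", with a TTL set
    /// ```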
    // TODO: Do we need to keep the state in the message? Probably good for sanity reasons.
    pub(crate) async fn internal_record<C: ConnectionLike + AsyncCommands>(
        &self,
        conn: &mut C,
        old: &Option<ReliabilityState>,
        new: ReliabilityState,
        expr: Option<u64>,
        id: &str,
    ) -> Result<()> {
        trace!(
            "🔍 internal record: {} from {} to {}",
            id,
            old.map(|v| v.to_string())
                .unwrap_or_else(|| "None".to_owned()),
            new
        );

        let state_key = format!("state.{id}");
        trace!("🔍 state key: {}", &state_key);
        /*
        ## Reliability lock and adjustment

        This transaction creates a lock on the individual message's "state_key" (which contains the
        current message state) and on the entire expiry table. If either of those values is changed
        while the transaction is in progress, the transaction fails, returns a `redis::Value::Nil`,
        and is retried, up to `self.retries` times. (Note: none of the operations in the transaction
        will have taken place yet.)

        We want to lock on the `state_key` because that holds the given state of the message.
        We want to lock on the `expiry` table because the message may have expired, been handled by
        the `gc()` function, and may have already been adjusted. (Remember, the `expiry` table holds
        timestamp markers for when a message will expire.)

        The operations in the Pipeline are:

        1. If there is an `old` state, decrement the old state count and remove the old marker from
           the `expiry` table.
        2. If the `new` state is not a terminal state (i.e. not a final disposition for the message),
           create a new entry in `expiry`.
        3. Increment the `new` state count.
        4. Modify the `state.{id}` value (if it exists) to indicate the newest known state for the
           message, returning the prior value. Checking that this value is not Nil is an additional
           sanity check that the value was not removed or altered by a different process. (We are
           already imposing a transaction lock which should prevent this, but additional paranoia
           can sometimes be good.)

        Upon execution of the Pipeline, we get back a list of results: a `QUEUED` entry for every
        command we've queued, followed by an inline list of the results of those commands. For
        example, for a transition from `ReliabilityState::Stored` to `ReliabilityState::Retrieved`
        (neither of which is a terminal state), we should see something like:
        `["Queued","Queued","Queued",["OK", "OK", "stored"]]`

        If any of the locks failed, we get back a `Nil`, in which case we want to try the operation
        again. In addition, the sanity `GET` may also return a `Nil`, which would also indicate that
        there was a problem. There's some debate about whether to retry the operation in that case
        (since it would imply that the lock failed for some unexpected reason); for now, we just
        report a soft `error!()`.

        */
        // Create the initial state key.
        // Do not place this inside of the transaction. We monitor the state key and the transaction
        // will fail because the value will change before the transaction completes. Yes, really.
        if new == ReliabilityState::Received && old.is_none() {
            trace!(
                "🔍 Creating new record {state_key} ex {:?}",
                expr.unwrap_or(MIN_EXPIRATION)
            );
            // We can't perform this in a transaction because we can only increment if the set
            // succeeds, and values aren't returned when creating values in transactions. In order
            // to do this from inside the transaction, you would need to create a function, and
            // that feels a bit too heavy for this.
            // Create the new `state.{id}` key if it does not exist, and set the expiration.
            let options = redis::SetOptions::default()
                .with_expiration(redis::SetExpiry::EX(expr.unwrap_or(MIN_EXPIRATION)))
                .conditional_set(redis::ExistenceCheck::NX);
            trace!(
                "🔍 SET {state_key} {new} NX EX {:?}",
                expr.unwrap_or(MIN_EXPIRATION)
            );
            let result = conn
                .set_options::<_, _, Value>(&state_key, new.to_string(), options)
                .await
                .map_err(|e| {
                    warn!("🔍⚠️ Could not create state key: {:?}", e);
                    ApcErrorKind::GeneralError("Could not create the state key".to_owned())
                })?;
            if result != redis::Value::Okay {
                // Redis returned a `Nil`, indicating that there was some error. The only thing
                // that should cause that would be if the `old` state was reset to `None` and we
                // thought we needed to create a new state. Since the message carries its prior
                // state, it shouldn't be set to `None` unless there's something strange going on.
                // TODO: It's worth noting that when restarting autoendpoint, we get a large number
                // of these in the logs. Need to figure out the reason for that.
                // The `result` is always `nil`, so that's not helpful.
                error!("🔍⚠️ Tried to recreate state_key {state_key}: {old:?} => {new:?}");
                return Err(
                    ApcErrorKind::GeneralError("Tried to recreate state_key".to_string()).into(),
                );
            }
        } else {
            trace!("🔍 Checking {:?}", &old);
            // Safety check (yes, there's still a slight chance of a race, but it's small).
            if let Some(old) = old {
                let check_state: String = conn.get(&state_key).await?;
                trace!("🔍 Checking state for {}: {:?}", id, &check_state);
                if check_state != old.to_string() {
                    trace!(
                        "🔍 Attempting to update state for {} from {} to {}, but the current state is different: {:?}",
                        id, old, new, check_state
                    );
                    return Err(ApcErrorKind::GeneralError(
                        "State mismatch during reliability record update".to_owned(),
                    )
                    .into());
                }
            };
        }

        crate::redis_util::transaction(
            conn,
            &[&state_key, &EXPIRY.to_owned()],
            self.retries,
            || ApcErrorKind::GeneralError("Exceeded reliability record retry attempts".to_owned()),
            async |conn, pipe: &mut Pipeline| {
                // The first state is special, since we need to create the `state_key`.
                // Remove the old state from the expiry set, if it exists. There should only
                // be one message at a given state in the `expiry` table, since we only use
                // that table to track messages that may expire. (We decrement "expired"
                // messages in the `gc` function, so having messages in multiple states may
                // decrement counts incorrectly.)
                if let Some(old) = old {
                    trace!("🔍 ➖ {old} - {id}");
                    trace!("🔍 HINCRBY {COUNTS} {old} -1");
                    pipe.hincr(COUNTS, old.to_string(), -1);
                    let key = ExpiryKey {
                        id: id.to_string(),
                        state: old.to_owned(),
                    }
                    .to_string();
                    trace!("🔍 internal remove old state: {:?}", key);
                    trace!("🔍 ZREM {EXPIRY} {key}");
                    pipe.zrem(EXPIRY, &key);
                }
                if !new.is_terminal() {
                    // Write the expiration only if the state is non-terminal. Otherwise we
                    // run the risk of messages reporting a false "expired" state even if
                    // they were "successful".
                    let key = ExpiryKey {
                        id: id.to_string(),
                        state: new.to_owned(),
                    }
                    .to_string();
                    trace!("🔍 ZADD {EXPIRY} {:?} {key}", expr.unwrap_or_default());
                    pipe.zadd(EXPIRY, &key, expr.unwrap_or_default());
                    trace!("🔍 internal record result: {:?}", key);
                }
                trace!("🔍 ➕ {new} - {id}");
                // Bump up the new state count, and update the state key's value if it
                // still exists.
                trace!("🔍 HINCRBY {COUNTS} {new} 1");
                pipe.hincr(COUNTS, new.to_string(), 1);
                if new != ReliabilityState::Received {
                    let options = redis::SetOptions::default()
                        .with_expiration(redis::SetExpiry::KEEPTTL)
                        .conditional_set(redis::ExistenceCheck::XX)
                        .get(true);
                    trace!("🔍 SET {state_key} {new} XX GET KEEPTTL");
                    pipe.set_options(&state_key, new.to_string(), options);
                }
                // `exec_query` returns `RedisResult<()>`, while `query_async` returns
                // `RedisResult<redis::Value>`. We don't really care about the returned
                // result here, but `transaction` retries if we return `Ok(None)`, so we
                // run the exec and return a placeholder `Some` value.
                // (The turbofish is a fallback for edition 2024.)
                let result = pipe.query_async::<redis::Value>(conn).await?;
                trace!("🔍 {id} - {:?}", &result);
                // The last element of the response holds the results of the piped command
                // set. If Redis encounters an error, or if any of the locks failed, it
                // returns `nil`, so we can treat both cases the same way (retry). On
                // success, the last element contains the result of the last command in
                // the pipe, which may vary depending on the current state.
                // This could also be strung together as a cascade of functions, but it's
                // broken out into discrete steps for readability.
                /* On prod, we get a large number of these errors, which I think might be
                   clogging up the redis connections, causing servers to report as degraded.
                */
                if result == redis::Value::Nil {
                    warn!("🔍 {id} - Pipe failed, skipping retry.");
                    // Temporarily just let things fail, to handle autoendpoint degradation.
                    // return Ok(None);
                    return Ok(Some(redis::Value::Okay));
                }
                if let Some(operations) = result.as_sequence() {
                    // We have responses; the first items report the status of the queued
                    // commands, and the final entry is a list of the command results.
                    if let Some(result_values) = operations.last() {
                        if let Some(results) = result_values.as_sequence() {
                            // The last command should contain the prior state. If it
                            // returned `Nil` for some unexpected reason, note the error.
                            if Some(&redis::Value::Nil) == results.last() {
                                error!("🔍🚨 WARNING: Lock Issue for {id}")
                                // There is some debate about whether or not to rerun
                                // the transaction if this state is reached.
                                // Rerunning would cause the counts to be impacted, but
                                // might address any other issues that caused the
                                // `Nil` to be returned.
                                // For now, let's just log the error.
                            }
                        }
                    }
                    Ok(Some(redis::Value::Okay))
                } else {
                    // A `Nil` will rerun the transaction.
                    Ok(None)
                }
            },
        )
        .await
        .inspect_err(|e| {
            warn!("🔍⚠️ Error occurred during transaction: {:?}", e);
        })?;
        Ok(())
    }

    /// Perform a garbage collection cycle on a reliability object.
    pub async fn gc(&self) -> Result<()> {
        if let Some(pool) = &self.pool {
            if let Ok(mut conn) = pool.get().await {
                debug!("🔍 performing pre-report garbage collection");
                return self.internal_gc(&mut conn, sec_since_epoch()).await;
            }
        }
        Ok(())
    }

    // Perform the `garbage collection` cycle. This will scan the currently known timestamp
    // indexed entries in redis looking for "expired" data, and then rectify the counts to
    // indicate the final states. This is because many of the storage systems do not provide
    // indicators when data reaches a TTL.

    // A few notes about redis:
    // "pipeline" essentially stitches commands together. Each command executes in turn, but
    // the data store remains open for other operations to occur.
    // `atomic()` wraps things in a transaction, essentially locking the data store while the
    // command executes.
    // Sadly, there's no way to create a true transaction where you read and write in a single
    // operation, so we have to presume some "slop" here.
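    //
    // As a minimal sketch of that pattern (the key values here are illustrative, using
    // the same `redis` crate Pipeline API as the code below):
    //
    //     let mut pipe = redis::pipe();
    //     pipe.atomic() // wrap the queued commands in MULTI/EXEC
    //         .hincr(COUNTS, "expired", 1)
    //         .zrem(EXPIRY, "abc#received");
    //     let result: redis::Value = pipe.query_async(&mut conn).await?;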
    pub(crate) async fn internal_gc<C: ConnectionLike + AsyncCommands>(
        &self,
        conn: &mut C,
        expr: u64,
    ) -> Result<()> {
        let result: redis::Value = crate::redis_util::transaction(
            conn,
            &[EXPIRY],
            self.retries,
            || ApcErrorKind::GeneralError("Exceeded gc retry attempts".to_owned()),
            async |conn, pipe| {
                // First, get the list of values that are to be purged.
                let purged: Vec<String> = conn.zrangebyscore(EXPIRY, 0, expr as isize).await?;
                // Insta-bail if there's nothing to do.
                if purged.is_empty() {
                    return Ok(Some(redis::Value::Nil));
                }

                // Now purge each of the values by resetting the counts and removing the item
                // from the purge set.
                for key in purged {
                    let Ok(expiry_key) = ExpiryKey::try_from(key.clone()) else {
                        let err = "Invalid key stored in Reliability datastore";
                        error!("🔍💥 {} [{:?}]", &err, &key);
                        return Err(ApcErrorKind::GeneralError(err.to_owned()));
                    };
                    // Adjust the COUNTS and then remove the record from the list of expired rows.
                    trace!("🔍 ➖🪦 {} - {key}", expiry_key.state.to_string());
                    pipe.hincr(COUNTS, expiry_key.state.to_string(), -1);
                    pipe.hincr(COUNTS, ReliabilityState::Expired.to_string(), 1);
                    pipe.zrem(EXPIRY, key);
                }
                Ok(pipe.query_async(conn).await?)
            },
        )
        .await?;
        self.metrics
            .incr_with_tags(MetricName::ReliabilityGc)
            .with_tag(
                "status",
                if result == redis::Value::Nil {
                    "error"
                } else {
                    "success"
                },
            )
            .send();
        Ok(())
    }

    // Return a snapshot of milestone states.
    // This will probably not be called directly, but it is useful for debugging.
    pub async fn report(&self) -> Result<HashMap<String, i32>> {
        if let Some(pool) = &self.pool {
            if let Ok(mut conn) = pool.get().await {
                return Ok(conn.hgetall(COUNTS).await.map_err(|e| {
                    ApcErrorKind::GeneralError(format!("Could not read report {e:?}"))
                })?);
            }
        }
        Ok(HashMap::new())
    }

    pub async fn health_check<'a>(&self) -> Result<&'a str> {
        if let Some(pool) = &self.pool {
            let mut conn = pool.get().await.map_err(|e| {
                ApcErrorKind::GeneralError(format!(
                    "Could not connect to reliability datastore: {e:?}"
                ))
            })?;
            // Add a type here, even though we're tossing the value, in order to prevent the
            // `FromRedisValue` warning.
            conn.ping::<()>().await.map_err(|e| {
                ApcErrorKind::GeneralError(format!("Could not ping reliability datastore: {e:?}"))
            })?;
            Ok("OK")
        } else {
            Ok("OK")
        }
    }
}

const METRIC_NAME: &str = "autopush_reliability";

/// Generate a Prometheus compatible report. Output should follow the
/// [instrumentation](https://prometheus.io/docs/practices/instrumentation/) guidelines.
///
/// In short form, the file should be a plain text output, with each metric on its own line
/// using the following format:
/// ```text
/// # HELP metric_name Optional description of this metric
/// # TYPE metric_name {required type (gauge|count|histogram|summary)}
/// metric_name{label="label1"} value
/// metric_name{label="label2"} value
/// ```
/// An example which would return counts of messages in given states at the current
/// time would be:
/// ```text
/// # HELP autopush_reliability Counts for messages in given states
/// # TYPE autopush_reliability gauge
/// autopush_reliability{state="recv"} 123
/// autopush_reliability{state="stor"} 123
/// # EOF
/// ```
/// Note that time is not required. A timestamp has been added to the output, but it is
/// ignored by Prometheus, and is only provided to ensure that there is no intermediate
/// caching occurring.
///
/// The report endpoint is currently only provided by `autoendpoint`, even though the report
/// is inclusive of all push milestones. This is done for simplicity, both for serving the
/// data and for collection and management of the metrics.
///
pub fn gen_report(values: HashMap<String, i32>) -> Result<String> {
    let mut registry = Registry::default();

    // A "family" is a grouping of metrics.
    // We specify this as the ("label", "label value") pairs, which index to a Gauge.
    let family = Family::<Vec<(&str, String)>, Gauge>::default();
    // This creates the top level association of the elements in the family with the metric.
    registry.register(
        METRIC_NAME,
        "Count of messages at given states",
        family.clone(),
    );
    for (milestone, value) in values.into_iter() {
        // Prevent any stray leakage of invalid state data.
        if ReliabilityState::from_str(&milestone).is_err() {
            trace!("🔍 skipping invalid state {milestone:?}");
            continue;
        }
        // Specify the static "state" label name with the given milestone, and add the
        // value as the gauge value.
        family
            .get_or_create(&vec![("state", milestone)])
            .set(value.into());
    }

    // Return the formatted string that Prometheus will eventually read.
    let mut encoded = String::new();
    encode(&mut encoded, &registry).map_err(|e| {
        ApcErrorKind::GeneralError(format!("Could not generate Reliability report {e:?}"))
    })?;
    Ok(encoded)
}

/// Handle the `/metrics` request by returning a Prometheus compatible report.
pub async fn report_handler(reliability: &Arc<PushReliability>) -> Result<HttpResponse> {
    reliability.gc().await?;
    let report = gen_report(reliability.report().await?)?;
    Ok(HttpResponse::Ok()
        .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
        .body(report))
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::str::FromStr;

    use super::*;
    use redis_test::{MockCmd, MockRedisConnection};
    use uuid::Uuid;

    #[test]
    fn test_report() {
        // Create a sample report.
        let mut report: HashMap<String, i32> = HashMap::new();
        let recv = ReliabilityState::Received.to_string();
        let trns = ReliabilityState::Transmitted.to_string();
        report.insert(recv.clone(), 111);
        report.insert(ReliabilityState::Stored.to_string(), 222);
        report.insert(ReliabilityState::Retrieved.to_string(), 333);
        report.insert(trns.clone(), 444);
        report.insert("biginvalid".to_string(), -1);

        let generated = gen_report(report).unwrap();
        // We don't really care if the `Created` or `HELP` lines are included.
        assert!(generated.contains(&format!("# TYPE {METRIC_NAME}")));
        // Sample the first and last values.
        assert!(generated.contains(&format!("{METRIC_NAME}{{state=\"{recv}\"}} 111")));
        assert!(generated.contains(&format!("{METRIC_NAME}{{state=\"{trns}\"}} 444")));
        assert!(!generated.contains(&format!("{METRIC_NAME}{{state=\"biginvalid\"}} -1")));
    }

    #[test]
    fn state_ser() {
        assert_eq!(
            ReliabilityState::from_str("delivered").unwrap(),
            ReliabilityState::Delivered
        );
        assert_eq!(
            ReliabilityState::from_str("int_accepted").unwrap(),
            ReliabilityState::IntAccepted
        );
        assert_eq!(
            serde_json::from_str::<ReliabilityState>(r#""int_accepted""#).unwrap(),
            ReliabilityState::IntAccepted
        );

        assert_eq!(ReliabilityState::IntAccepted.to_string(), "int_accepted");
        assert_eq!(
            serde_json::to_string(&ReliabilityState::IntAccepted).unwrap(),
            r#""int_accepted""#
        );
    }
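
    // A small added check (not part of the original suite) exercising the `ExpiryKey`
    // `Display`/`TryFrom` round-trip and the terminal/non-terminal split described above.
    #[test]
    fn expiry_key_roundtrip() {
        let key = ExpiryKey {
            id: "abc".to_owned(),
            state: ReliabilityState::Received,
        };
        // `Display` writes the `id#state` form that `TryFrom<String>` parses back.
        assert_eq!(key.to_string(), "abc#received");
        let parsed = ExpiryKey::try_from("abc#received".to_string()).unwrap();
        assert_eq!(parsed.id, "abc");
        assert_eq!(parsed.state, ReliabilityState::Received);
        // Malformed keys are rejected.
        assert!(ExpiryKey::try_from("no_delimiter".to_string()).is_err());
        assert!(ExpiryKey::try_from("abc#not_a_state".to_string()).is_err());

        // Terminal states should never be written to the expiry table.
        assert!(ReliabilityState::Delivered.is_terminal());
        assert!(!ReliabilityState::Received.is_terminal());
    }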

    #[actix_rt::test]
    async fn test_push_reliability_report() -> Result<()> {
        let mut db = crate::db::mock::MockDbClient::new();

        // Build the state.
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        // Remember, we shouldn't just arbitrarily create mid-state values, so
        // let's start from the beginning.
        let new = ReliabilityState::Received;
        let old = None;
        let expr = 1;

        let exp_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();

        let state_id = format!("state.{test_id}");

        let mut pipeline = redis::Pipeline::new();
        // We're adding an element so we have something to record.
        pipeline
            .cmd("MULTI")
            // No old state, so no decrement.
            // "received" is not terminal.
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(&exp_key)
            // Adjust the counts.
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            // Run the transaction.
            .cmd("EXEC");

        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(
                redis::cmd("SET")
                    .arg(&state_id)
                    .arg(new.to_string())
                    .arg("NX")
                    .arg("EX")
                    .arg(expr),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_id).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                pipeline,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,                              // MULTI
                    redis::Value::SimpleString("QUEUED".to_owned()), // ZADD
                    redis::Value::SimpleString("QUEUED".to_owned()), // HINCRBY
                    redis::Value::SimpleString("QUEUED".to_owned()), // SET
                    // Return the transaction results.
                    redis::Value::Array(vec![
                        redis::Value::Int(1), // 0 -> 1
                        redis::Value::Int(1),
                        // There's no prior value, so this will return Nil.
                        redis::Value::Nil,
                    ]),
                ])),
            ),
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        let int_test_id = test_id.clone();
        db.expect_log_report()
            .times(1)
            .withf(move |id, state| id == int_test_id && state == &new)
            .return_once(|_, _| Ok(()));
        // Test the main report function (note, this does not test redis).
        let db_box = Box::new(Arc::new(db));
        let pr = PushReliability::new(
            &None,
            db_box.clone(),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // `.record()` uses a pool, so we can't pass the mock connection directly.
        // Instead, we're going to essentially recreate what `.record()` does, calling
        // the `.internal_record()` function, followed by the database `.log_report()`.
        // This emulates
        // ```
        // pr.record(&Some(test_id.clone()), new, &None, Some(expr))
        //     .await?;
        // ```

        // and mock the redis call.
        pr.internal_record(&mut conn, &old, new, Some(expr), &test_id)
            .await?;

        db_box
            .log_report(&test_id, new)
            .await
            .inspect_err(|e| {
                warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
            })
            .map_err(|e| ApcErrorKind::GeneralError(e.to_string()))?;

        Ok(())
    }

    #[actix_rt::test]
    async fn test_push_reliability_record() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Stored;
        let old = ReliabilityState::Received;
        let expr = 1;

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        let new_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();
        let old_key = ExpiryKey {
            id: test_id.clone(),
            state: old,
        }
        .to_string();

        let state_key = format!("state.{test_id}");

        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Decrement the old state.
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(old.to_string())
            .arg(-1)
            .ignore()
            // Replace the combined old state key in the expiry set.
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(old_key)
            .ignore()
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(new_key)
            .ignore()
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            .cmd("SET")
            .arg(&state_key)
            .arg(new.to_string())
            .arg("XX")
            .arg("GET")
            .arg("KEEPTTL")
            .ignore()
            .cmd("EXEC")
            .ignore();

        let mut conn = MockRedisConnection::new(vec![
            // Create the new state.
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_key).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                redis::cmd("GET").arg(&state_key),
                Ok(redis::Value::SimpleString(old.to_string())),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing.
                    redis::Value::SimpleString("QUEUED".to_owned()), // multi
                    redis::Value::SimpleString("QUEUED".to_owned()), // hincrby
                    redis::Value::SimpleString("QUEUED".to_owned()), // zrem
                    redis::Value::SimpleString("QUEUED".to_owned()), // zadd
                    redis::Value::SimpleString("QUEUED".to_owned()), // hincrby
                    redis::Value::SimpleString("QUEUED".to_owned()), // set
                    // The exec has been called; return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay, // multi
                        redis::Value::Okay, // hincrby
                        redis::Value::Okay, // zrem
                        redis::Value::Okay, // zadd
                        redis::Value::Okay, // hincrby
                        redis::Value::SimpleString(old.to_string()),
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil.
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // Test the main report function (note, this does not test redis).
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        let _ = pr
            .internal_record(&mut conn, &Some(old), new, Some(expr), &test_id)
            .await;

        Ok(())
    }

    #[actix_rt::test]
    async fn test_push_reliability_full() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Received;
        let stored = ReliabilityState::Stored;
        let expr = 1;

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        let new_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();
        let stored_key = ExpiryKey {
            id: test_id.clone(),
            state: stored,
        }
        .to_string();

        let state_key = format!("state.{test_id}");

        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Decrement the old state count.
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(stored.to_string())
            .arg(-1)
            .ignore()
            // Replace the expiry key.
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(stored_key.to_string())
            .ignore()
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(new_key)
            .ignore()
            // Increment the new state count.
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            .ignore()
            // And create the new state transition key (since the message is "live" again).
            .cmd("SET")
            .arg(&state_key)
            .arg(new.to_string())
            .arg("XX")
            .arg("GET")
            .cmd("EXEC")
            .ignore();

        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(
                // Create the new state.
                redis::cmd("SET")
                    .arg(format!("state.{test_id}"))
                    .arg(new.to_string())
                    .arg("NX")
                    .arg("EX")
                    .arg(expr),
                Ok(redis::Value::Okay),
            ),
            // Increment the count for new.
            MockCmd::new(
                redis::cmd("HINCRBY")
                    .arg(COUNTS)
                    .arg(new.to_string())
                    .arg(1),
                Ok(redis::Value::Okay),
            ),
            // Begin the transaction.
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_key).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing.
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    // The exec has been called; return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil.
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // Test the main report function (note, this does not test redis).
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        let _ = pr
            .internal_record(&mut conn, &Some(stored), new, Some(expr), &test_id)
            .await;

        Ok(())
    }

    #[actix_rt::test]
    async fn test_push_reliability_gc() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Accepted;
        let key = format!("{}#{}", &test_id, &new);
        let expr = 1;
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        let response: redis::Value = redis::Value::Array(vec![redis::Value::SimpleString(
            ExpiryKey {
                id: test_id.clone(),
                state: new,
            }
            .to_string(),
        )]);

        // Construct the Pipeline.
        // A redis "pipeline" is a set of instructions that are executed in order. These
        // are not truly atomic, as other actions can interrupt a pipeline, but they
        // are guaranteed to happen in sequence. Transactions essentially check that the
        // WATCH key is not altered before the pipeline is executed.
        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Adjust the state counts.
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(-1)
            .ignore()
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(ReliabilityState::Expired.to_string())
            .arg(1)
            .ignore()
            // Remove the state key from the expiry set.
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(key)
            .ignore()
            .cmd("EXEC")
            .ignore();

        // This mocks the "conn"ection, so all commands sent to the connection need
        // to be separated out. This includes the transaction "WATCH" and "UNWATCH".
        // The "meat" of the functions is handled via a Pipeline, which conjoins the
        // commands into one group (see above). Pipelines return an array of
        // results, one entry for each pipeline command.
        // In other words, the `internal_gc` commands are in the pipeline,
        // all the others are in the conn.
        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(redis::cmd("WATCH").arg(EXPIRY), Ok(redis::Value::Okay)),
            MockCmd::new(
                redis::cmd("ZRANGEBYSCORE").arg(EXPIRY).arg(0).arg(expr),
                Ok(response),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing.
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    // The exec has been called; return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil.
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // Test the main report function (note, this does not test redis).
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // Functionally a no-op, but it does exercise lines.
        pr.gc().await?;

        // And mock the redis call.
        pr.internal_gc(&mut conn, 1).await?;

        Ok(())
    }
}