autopush_common/
reliability.rs

/// Push Reliability Recorder
///
/// This allows us to track messages from select, known parties (currently, just
/// Mozilla generated and consumed) so that we can identify potential trouble spots
/// and where messages expire early. Message expiration can lead to message loss.
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use actix_web::HttpResponse;
use cadence::StatsdClient;
use chrono::TimeDelta;
use deadpool_redis::Config;

use prometheus_client::{
    encoding::text::encode, metrics::family::Family, metrics::gauge::Gauge, registry::Registry,
};
use redis::{aio::ConnectionLike, AsyncCommands};
use redis::{Pipeline, Value};

use crate::db::client::DbClient;
use crate::errors::{ApcError, ApcErrorKind, Result};
use crate::metric_name::MetricName;
use crate::metrics::StatsdClientExt;
use crate::util::timing::sec_since_epoch;

// Redis Keys
pub const COUNTS: &str = "state_counts";
pub const EXPIRY: &str = "expiry";

const CONNECTION_EXPIRATION: TimeDelta = TimeDelta::seconds(10);
// Minimum expiration period of 1 second.
// This was originally `0`, but it was unclear whether that would leave the record
// without an expiration time or mark the record to never expire.
const MIN_EXPIRATION: u64 = 1;

/// The various states that a message may transit on the way from reception to delivery.
// Note: "Message" in this context refers to the Subscription Update.
// TODO: Differentiate between "transmitted via webpush" and "transmitted via bridge"?
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    serde::Deserialize,
    serde::Serialize,
    strum::Display,
    strum::EnumString,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum ReliabilityState {
    Received,          // Message was received by the Push Server
    Stored,            // Message was stored because it could not be delivered immediately
    Retrieved,         // Message was taken from storage for delivery
    IntTransmitted,    // Message was handed off between autoendpoint and autoconnect
    IntAccepted,       // Message was accepted by autoconnect from autoendpoint
    BridgeTransmitted, // Message was handed off to a mobile bridge for eventual delivery
    Transmitted,       // Message was handed off for delivery to the UA
    Accepted,          // Message was accepted for delivery by the UA
    Delivered,         // Message was provided to the WebApp recipient by the UA
    DecryptionError,   // Message was provided to the UA and it reported a decryption error
    NotDelivered,      // Message was provided to the UA and it reported a not delivered error
    Expired,           // Message expired naturally (e.g. TTL=0)
    Errored,           // Message resulted in an Error state and failed to be delivered.
}
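
// A rough sketch of the intended progression, drawn from the variant notes above
// (exact transitions depend on the delivery path and are not enforced here):
//
//   Received -> [Stored -> Retrieved] -> IntTransmitted -> IntAccepted
//            -> Transmitted -> Accepted -> Delivered
//   (or -> BridgeTransmitted when a mobile bridge performs the final delivery)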

impl ReliabilityState {
    /// Has the message reached a state where no further transitions should be possible?
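    ///
    /// A quick illustration (doctest marked `ignore`; the crate path is assumed):
    /// ```ignore
    /// use autopush_common::reliability::ReliabilityState;
    ///
    /// assert!(ReliabilityState::Delivered.is_terminal());
    /// assert!(!ReliabilityState::Received.is_terminal());
    /// ```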
    pub fn is_terminal(self) -> bool {
        // NOTE: this list should match the daily snapshot captured by `reliability_cron.py`
        // which will trim these counts after the max message TTL has expired.
        matches!(
            self,
            ReliabilityState::DecryptionError
                | ReliabilityState::BridgeTransmitted
                | ReliabilityState::Delivered
                | ReliabilityState::Errored
                | ReliabilityState::Expired
                | ReliabilityState::NotDelivered
        )
    }
}

#[derive(Clone)]
pub struct PushReliability {
    pool: Option<deadpool_redis::Pool>,
    db: Box<dyn DbClient>,
    metrics: Arc<StatsdClient>,
    retries: usize,
}

// Define a struct to hold the expiry key, since it's easy to flub the order.
pub struct ExpiryKey {
    pub id: String,
    pub state: ReliabilityState,
}

impl std::fmt::Display for ExpiryKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}#{}", self.id, self.state)
    }
}

impl TryFrom<String> for ExpiryKey {
    type Error = ApcError;
    fn try_from(value: String) -> Result<Self> {
        let (id, state) = value.split_once('#').ok_or_else(|| {
            ApcErrorKind::GeneralError("ExpiryKey must be in the format 'id#state'".to_owned())
        })?;
        let state: ReliabilityState = ReliabilityState::from_str(state).map_err(|_| {
            ApcErrorKind::GeneralError(
                "Invalid state in ExpiryKey, must be a valid ReliabilityState".to_owned(),
            )
        })?;
        Ok(Self {
            id: id.to_owned(),
            state,
        })
    }
}
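
// A minimal sketch of the round trip (the id value is illustrative):
//
//   let key = ExpiryKey { id: "abc".to_owned(), state: ReliabilityState::Received };
//   assert_eq!(key.to_string(), "abc#received");
//   let back = ExpiryKey::try_from("abc#received".to_owned()).unwrap();
//   assert_eq!(back.state, ReliabilityState::Received);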

impl PushReliability {
    // Do the magic to make a report instance, whatever that will be.
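    // For example (a sketch; the DSN value and the surrounding `db`/`metrics` setup are
    // assumptions of this example):
    //
    //   let reliability = PushReliability::new(
    //       &Some("redis://localhost:6379".to_owned()),
    //       db.clone(),
    //       &metrics,
    //       crate::redis_util::MAX_TRANSACTION_LOOP,
    //   )?;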
    pub fn new(
        reliability_dsn: &Option<String>,
        db: Box<dyn DbClient>,
        metrics: &Arc<StatsdClient>,
        retries: usize,
    ) -> Result<Self> {
        let Some(reliability_dsn) = reliability_dsn else {
            debug!("🔍 No reliability DSN declared.");
            return Ok(Self {
                pool: None,
                db: db.clone(),
                metrics: metrics.clone(),
                retries,
            });
        };

        let config: deadpool_redis::Config = Config::from_url(reliability_dsn);
        let pool = Some(
            config
                .builder()
                .map_err(|e| {
                    ApcErrorKind::GeneralError(format!("Could not config reliability pool {e:?}"))
                })?
                .create_timeout(Some(CONNECTION_EXPIRATION.to_std().unwrap()))
                .runtime(deadpool::Runtime::Tokio1)
                .build()
                .map_err(|e| {
                    ApcErrorKind::GeneralError(format!("Could not build reliability pool {e:?}"))
                })?,
        );

        Ok(Self {
            pool,
            db: db.clone(),
            metrics: metrics.clone(),
            retries,
        })
    }

    // Record the message state change to storage.
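    // A typical call from a delivery path might look like the following (a sketch;
    // `msg.reliability_id`, `prior`, and `ttl_deadline` are illustrative names, not
    // actual fields of any struct in this crate):
    //
    //   let prior = reliability
    //       .record(&msg.reliability_id, ReliabilityState::Transmitted, &prior, Some(ttl_deadline))
    //       .await?;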
    pub async fn record(
        &self,
        reliability_id: &Option<String>,
        new: ReliabilityState,
        old: &Option<ReliabilityState>,
        expr: Option<u64>,
    ) -> Result<Option<ReliabilityState>> {
        let Some(id) = reliability_id else {
            return Ok(None);
        };
        if let Some(pool) = &self.pool {
            debug!(
                "🔍 {} from {} to {}",
                id,
                old.map(|v| v.to_string())
                    .unwrap_or_else(|| "None".to_owned()),
                new
            );
            match pool.get().await {
                Ok(mut conn) => {
                    self.internal_record(&mut conn, old, new, expr, id)
                        .await
                        .map_err(|e| {
                            warn!("🔍⚠️ Unable to record reliability state: {:?}", e);
                            ApcErrorKind::GeneralError(
                                "Could not record reliability state".to_owned(),
                            )
                        })?;
                }
                Err(e) => warn!("🔍⚠️ Unable to get reliability state pool, {:?}", e),
            };
        };
        // Errors are not fatal, and should not impact message flow, but
        // we should record them somewhere.
        let _ = self.db.log_report(id, new).await.inspect_err(|e| {
            warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
        });
        Ok(Some(new))
    }

    /// Record the state change in the reliability datastore.
    /// Because of a strange duplication error, we're using three "tables".
    /// The COUNTS table contains the count of messages in each state. It's provided as a quick lookup
    /// for the dashboard query.
    /// The EXPIRY table contains the expiration time for messages. Those are cleaned up by the
    /// `gc` function.
    /// Finally, there's the "table" that is `state.{id}`. This contains the current state of each
    /// message. Ideally, this would use something like `HSETEX`, but that is not available until
    /// Redis 8.0, so for now, we use the `SET` function, which provides an expiration time.
    /// BE SURE TO SET AN EXPIRATION TIME FOR EACH STATE MESSAGE!
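    ///
    /// A rough sketch of the resulting Redis layout (the key names come from this module;
    /// the values shown are illustrative only):
    /// ```text
    /// HGETALL state_counts   => {"received": 123, "stored": 45, ...}
    /// ZRANGE  expiry 0 -1    => ["{id}#received", "{id}#stored", ...] (scored by expiration timestamp)
    /// GET     state.{id}     => "stored"
    /// ```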
    // TODO: Do we need to keep the state in the message? Probably good for sanity reasons.
    pub(crate) async fn internal_record<C: ConnectionLike + AsyncCommands>(
        &self,
        conn: &mut C,
        old: &Option<ReliabilityState>,
        new: ReliabilityState,
        expr: Option<u64>,
        id: &str,
    ) -> Result<()> {
        trace!(
            "🔍 internal record: {} from {} to {}",
            id,
            old.map(|v| v.to_string())
                .unwrap_or_else(|| "None".to_owned()),
            new
        );

        let state_key = format!("state.{id}");
        trace!("🔍 state key: {}", &state_key);
        /*
        ## Reliability lock and adjustment

        This transaction creates a lock on the individual message's "state_key" (which contains the message's current
        state) and the entire Expiration table. If either of those values is changed while the transaction is in progress,
        the transaction will fail, return a `Redis::Nil`, and be retried up to `self.retries` times. (Note:
        none of the operations in the transaction will have taken place yet.)

        We want to lock on the `state_key` because that shows the given state of the message.
        We want to lock on the `expiry` table because the message may have expired, been handled by the `gc()` function, and
        already been adjusted. (Remember, the `expiry` table holds timestamp markers for when a message will be
        expiring.)

        The operations in the Pipeline are:

        1. If there is an `old` state, decrement the old state count and remove the old marker from the `expiry` table.
        2. If the `new` state is not a terminal state (i.e. it is not a final disposition for a message), create a new
           entry in `expiry`.
        3. Increment the `new` state count.
        4. Modify the `state.{ID}` value (if it exists) to indicate the newest known state for the message, returning the
           prior value. Checking that this value is not Nil is an additional sanity check that the value was not removed
           or altered by a different process. (We are already imposing a transaction lock which should prevent this, but
           additional paranoia can sometimes be good.)

        Upon execution of the Pipeline, we get back a list of results. This should look like a `Queued` entry for
        every command that we've performed, followed by an inline list of the results of those commands. For example,
        for a transition from `ReliabilityState::Stored` to `ReliabilityState::Retrieved` (neither of which is a terminal
        state), we should see something like: `["Queued","Queued","Queued",["OK", "OK", "stored"]]`

        If any of the locks failed, we should get back a `Nil`, in which case we would want to try this operation again.
        In addition, the sanity `Get` may also return a `Nil`, which would also indicate that there was a problem. There's
        some debate whether or not to retry the operation if that's the case (since it would imply that the lock failed
        for some unexpected reason); however, for now, we just report a soft `error!()`.

         */
        // Create the initial state key.
        // Do not place this inside of the transaction. We monitor the state key and the transaction will
        // fail because the value will change before the transaction completes. Yes, really.
        if new == ReliabilityState::Received && old.is_none() {
            trace!(
                "🔍 Creating new record {state_key} ex {:?}",
                expr.unwrap_or(MIN_EXPIRATION)
            );
            // we can't perform this in a transaction because we can only increment if the set succeeds,
            // and values aren't returned when creating values in transactions. In order to do this
            // from inside the transaction, you would need to create a function, and that feels a bit
            // too heavy for this.
            // Create the new `state.{id}` key if it does not exist, and set the expiration.
            let options = redis::SetOptions::default()
                .with_expiration(redis::SetExpiry::EX(expr.unwrap_or(MIN_EXPIRATION)))
                .conditional_set(redis::ExistenceCheck::NX);
            trace!("🔍 ⭕ SET {state_key} {new} NX EX {:?}", expr.unwrap_or(MIN_EXPIRATION));
            let result = conn
                .set_options::<_, _, Value>(&state_key, new.to_string(), options)
                .await
                .map_err(|e| {
                    //dbg!(&e);
                    warn!("🔍⚠️ Could not create state key: {:?}", e);
                    ApcErrorKind::GeneralError("Could not create the state key".to_owned())
                })?;
            if result != redis::Value::Okay {
                // Redis returned a `Nil`, indicating that there was some error. The only thing that should cause that
                // would be if the `old` state was reset to `None` and we thought we needed to create a new state.
                // Since the message carries its prior state, it shouldn't be set to `None` unless there's something
                // strange going on.
                // TODO: It's worth noting that when restarting autoendpoint, we get a large number of these in the logs.
                // Need to figure out the reason for that.
                // The `result` is always `nil` so that's not helpful.
                error!("🔍⚠️ Tried to recreate state_key {state_key}: {old:?} => {new:?}",);
                return Err(
                    ApcErrorKind::GeneralError("Tried to recreate state_key".to_string()).into(),
                );
            }
        } else {
            trace!("🔍 Checking {:?}", &old);
            // safety check (yes, there's still a slight chance of a race, but it's small)
            if let Some(old) = old {
                let check_state: String = conn.get(&state_key).await?;
                trace!("🔍 Checking state for {}: {:?}", id, &check_state);
                if check_state != old.to_string() {
                    trace!(
                        "🔍 Attempting to update state for {} from {} to {}, but current state is different: {:?}",
                        id, old, new, check_state
                    );
                    return Err(ApcErrorKind::GeneralError(
                        "State mismatch during reliability record update".to_owned(),
                    )
                    .into());
                }
            };
        }

        crate::redis_util::transaction(
            conn,
            &[&state_key, &EXPIRY.to_owned()],
            self.retries,
            || ApcErrorKind::GeneralError("Exceeded reliability record retry attempts".to_owned()),
            async |conn, pipe: &mut Pipeline| {
                // The first state is special, since we need to create the `state_key`.
                // Remove the old state from the expiry set, if it exists.
                // There should only be one entry per message in the `expiry` table,
                // since we only use that table to track messages that may expire. (We
                // decrement "expired" messages in the `gc` function, so having a message
                // in multiple states may decrement counts incorrectly.)
                if let Some(old) = old {
                    trace!("🔍 ➖ {old} - {id}");
                    trace!("🔍 🪈⭕ HINCRBY {COUNTS} {old} -1");
                    pipe.hincr(COUNTS, old.to_string(), -1);
                    let key = ExpiryKey {
                        id: id.to_string(),
                        state: old.to_owned(),
                    }
                    .to_string();
                    trace!("🔍 internal remove old state: {:?}", key);
                    trace!("🔍 🪈⭕ ZREM {EXPIRY} {key}");
                    pipe.zrem(EXPIRY, &key);
                }
                if !new.is_terminal() {
                    // Write the expiration only if the state is non-terminal. Otherwise we run the risk of
                    // messages reporting a false "expired" state even if they were "successful".
                    let key = ExpiryKey {
                        id: id.to_string(),
                        state: new.to_owned(),
                    }
                    .to_string();
                    trace!("🔍 🪈⭕ ZADD {EXPIRY} {:?} {key}", expr.unwrap_or_default());
                    pipe.zadd(EXPIRY, &key, expr.unwrap_or_default());
                    trace!("🔍 internal record result: {:?}", key);
                }
                trace!("🔍 ➕ {new} - {id}");
                // Bump up the new state count, and set the state key's state if it still exists.
                trace!("🔍 🪈⭕ HINCRBY {COUNTS} {new} 1");
                pipe.hincr(COUNTS, new.to_string(), 1);
                if new != ReliabilityState::Received {
                    let options = redis::SetOptions::default()
                        .with_expiration(redis::SetExpiry::KEEPTTL)
                        .conditional_set(redis::ExistenceCheck::XX)
                        .get(true);
                    trace!("🔍 🪈⭕ SET {state_key} {new} XX KEEPTTL");
                    pipe.set_options(&state_key, new.to_string(), options);
                }
                // `exec_query` returns `RedisResult<()>`, while
                // `query_async` returns `RedisResult<redis::Value>`.
                // We really don't care about the returned result here, but `transaction`
                // retries if we return Ok(None), so we run the exec and return
                // a nonce `Some` value.
                // The turbo-fish is a fallback for edition 2024.
                let result = pipe.query_async::<redis::Value>(conn).await?;
                trace!("🔍 🪈 {id} - {:?}", &result);
                // The last element returned from the command is the result of the pipeline.
                // If Redis encounters an error, it will return a `nil` as well. We handle both
                // the same way (retry), so we can normalize errors as `nil`.
                // The last element should be the result of the piped command set, and will be
                // `nil` if there was any error, in which case we record a soft error. (On
                // success, it will return the result of the last command in the pipe, which
                // may vary due to the current state.)
                // This could also be strung together as a cascade of functions, but it's broken
                // out into discrete steps for readability.
                /* On prod, we get a large number of these errors, which I think might be clogging
                  up the redis connections, causing servers to report as degraded.
                */
                if result == redis::Value::Nil {
                    warn!("🔍⚠🪈 {id} - Pipe failed, skipping retry.");
                    // temporarily just let things fail to handle autoendpoint degradation.
                    // return Ok(None);
                    return Ok(Some(redis::Value::Okay));
                }
                if let Some(operations) = result.as_sequence() {
                    // We have responses; the first items report the state of the commands,
                    // the final line is a list of command results.
                    if let Some(result_values) = operations.last() {
                        if let Some(results) = result_values.as_sequence() {
                            // The last command should contain the prior state. If it returned `Nil`
                            // for some unexpected reason, note the error.
                            if Some(&redis::Value::Nil) == results.last() {
                                error!("🔍🚨 WARNING: Lock Issue for {id}")
                                // There is some debate about whether or not to rerun
                                // the transaction if this state is reached.
                                // Rerunning would cause the counts to be impacted, but
                                // might address any other issues that caused the
                                // `Nil` to be returned.
                                // For now, let's just log the error.
                            }
                        }
                    }
                    Ok(Some(redis::Value::Okay))
                } else {
                    // a Nil will rerun the transaction.
                    Ok(None)
                }
            },
        )
        .await
        .inspect_err(|e| {
            //dbg!(&e);
            warn!("🔍⚠️ Error occurred during transaction: {:?}", e);
        })?;
        Ok(())
    }

    /// Perform a garbage collection cycle on a reliability object.
    pub async fn gc(&self) -> Result<()> {
        if let Some(pool) = &self.pool {
            if let Ok(mut conn) = pool.get().await {
                debug!("🔍 performing pre-report garbage collection");
                return self.internal_gc(&mut conn, sec_since_epoch()).await;
            }
        }
        Ok(())
    }

    // Perform the `garbage collection` cycle. This will scan the currently known timestamp-indexed
    // entries in redis looking for "expired" data, and then rectify the counts to
    // indicate the final states. This is because many of the storage systems do not provide
    // indicators when data reaches its TTL.

    // A few notes about redis:
    // "pipeline" essentially stitches commands together. Each command executes in turn, but the data store
    // remains open for other operations to occur.
    // `atomic()` wraps things in a transaction, essentially locking the data store while the command executes.
    // Sadly, there's no way to create a true transaction where you read and write in a single operation, so we
    // have to presume some "slop" here.
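    //
    // In Redis terms, one gc cycle amounts to roughly the following (a sketch of the
    // calls issued below):
    //
    //   WATCH expiry
    //   ZRANGEBYSCORE expiry 0 <now>
    //   MULTI
    //     HINCRBY state_counts <state> -1    (for each expired "id#state" entry)
    //     HINCRBY state_counts expired 1
    //     ZREM expiry <id#state>
    //   EXEC
    //   UNWATCH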
    pub(crate) async fn internal_gc<C: ConnectionLike + AsyncCommands>(
        &self,
        conn: &mut C,
        expr: u64,
    ) -> Result<()> {
        let result: redis::Value = crate::redis_util::transaction(
            conn,
            &[EXPIRY],
            self.retries,
            || ApcErrorKind::GeneralError("Exceeded gc retry attempts".to_owned()),
            async |conn, pipe| {
                // First, get the list of values that are to be purged.
                let purged: Vec<String> = conn.zrangebyscore(EXPIRY, 0, expr as isize).await?;
                // insta-bail if there's nothing to do.
                if purged.is_empty() {
                    return Ok(Some(redis::Value::Nil));
                }

                // Now purge each of the values by resetting the counts and removing the item from
                // the purge set.
                for key in purged {
                    let Ok(expiry_key) = ExpiryKey::try_from(key.clone()) else {
                        let err = "Invalid key stored in Reliability datastore";
                        error!("🔍🟥 {} [{:?}]", &err, &key);
                        return Err(ApcErrorKind::GeneralError(err.to_owned()));
                    };
                    // Adjust the COUNTS and then remove the record from the list of expired rows.
                    trace!("🔍 ➖🪦 {} - {key}", expiry_key.state.to_string());
                    pipe.hincr(COUNTS, expiry_key.state.to_string(), -1);
                    pipe.hincr(COUNTS, ReliabilityState::Expired.to_string(), 1);
                    pipe.zrem(EXPIRY, key);
                }
                Ok(pipe.query_async(conn).await?)
            },
        )
        .await?;
        self.metrics
            .incr_with_tags(MetricName::ReliabilityGc)
            .with_tag(
                "status",
                if result == redis::Value::Nil {
                    "error"
                } else {
                    "success"
                },
            )
            .send();
        Ok(())
    }

    // Return a snapshot of milestone states.
    // This will probably not be called directly, but is useful for debugging.
    pub async fn report(&self) -> Result<HashMap<String, i32>> {
        if let Some(pool) = &self.pool {
            if let Ok(mut conn) = pool.get().await {
                return Ok(conn.hgetall(COUNTS).await.map_err(|e| {
                    ApcErrorKind::GeneralError(format!("Could not read report {e:?}"))
                })?);
            }
        }
        Ok(HashMap::new())
    }

    pub async fn health_check<'a>(&self) -> Result<&'a str> {
        if let Some(pool) = &self.pool {
            let mut conn = pool.get().await.map_err(|e| {
                ApcErrorKind::GeneralError(format!(
                    "Could not connect to reliability datastore: {e:?}"
                ))
            })?;
            // Add a type here, even though we're tossing the value, in order to prevent the `FromRedisValue` warning.
            conn.ping::<()>().await.map_err(|e| {
                ApcErrorKind::GeneralError(format!("Could not ping reliability datastore: {e:?}"))
            })?;
            Ok("OK")
        } else {
            Ok("OK")
        }
    }
}

const METRIC_NAME: &str = "autopush_reliability";

/// Generate a Prometheus compatible report. Output should follow the
/// [instrumentation](https://prometheus.io/docs/practices/instrumentation/) guidelines.
///
/// In short form, the file should be a plain text output, with each metric on its own line
/// using the following format:
/// ```text
/// # HELP metric_name Optional description of this metric
/// # TYPE metric_name {required type (gauge|count|histogram|summary)}
/// metric_name{label="label1"} value
/// metric_name{label="label2"} value
/// ```
/// An example which would return counts of messages in given states at the current
/// time would be:
/// ```text
/// # HELP autopush_reliability Counts for messages in given states
/// # TYPE autopush_reliability gauge
/// autopush_reliability{state="recv"} 123
/// autopush_reliability{state="stor"} 123
/// # EOF
/// ```
/// Note that time is not required. A timestamp has been added to the output, but is
/// ignored by Prometheus, and is only provided to ensure that there is no intermediate
/// caching occurring.
///
/// The report endpoint is currently only provided by `autoendpoint`, even though the report
/// is inclusive for all push milestones. This is done for simplicity, both for serving the
/// data and for collection and management of the metrics.
///
pub fn gen_report(values: HashMap<String, i32>) -> Result<String> {
    let mut registry = Registry::default();

    // A "family" is a grouping of metrics.
    // We specify this as a ("label", "label value") pair which indexes to a Gauge.
    let family = Family::<Vec<(&str, String)>, Gauge>::default();
    // This creates the top level association of the elements in the family with the metric.
    registry.register(
        METRIC_NAME,
        "Count of messages at given states",
        family.clone(),
    );
    for (milestone, value) in values.into_iter() {
        // prevent any stray leakage of invalid state data
        if ReliabilityState::from_str(&milestone).is_err() {
            trace!("🔍 skipping invalid state {milestone:?}");
            continue;
        }
        // Specify the static "state" label name with the given milestone, and add the
        // value as the gauge value.
        family
            .get_or_create(&vec![("state", milestone)])
            .set(value.into());
    }

    // Return the formatted string that Prometheus will eventually read.
    let mut encoded = String::new();
    encode(&mut encoded, &registry).map_err(|e| {
        ApcErrorKind::GeneralError(format!("Could not generate Reliability report {e:?}"))
    })?;
    Ok(encoded)
}

/// Handle the `/metrics` request by returning a Prometheus compatible report.
pub async fn report_handler(reliability: &Arc<PushReliability>) -> Result<HttpResponse> {
    reliability.gc().await?;
    let report = gen_report(reliability.report().await?)?;
    Ok(HttpResponse::Ok()
        .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
        .body(report))
}
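
// A sketch of how a service might wire this handler into actix-web (the route path,
// the `AppState` type, and its `reliability` field are assumptions of this example,
// not part of this module):
//
//   async fn metrics_route(state: web::Data<AppState>) -> Result<HttpResponse> {
//       report_handler(&state.reliability).await
//   }
//   // App::new().route("/metrics", web::get().to(metrics_route))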

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::str::FromStr;

    use super::*;
    use redis_test::{MockCmd, MockRedisConnection};
    use uuid::Uuid;

    #[test]
    fn test_report() {
        // create a nonce report
        let mut report: HashMap<String, i32> = HashMap::new();
        let recv = ReliabilityState::Received.to_string();
        let trns = ReliabilityState::Transmitted.to_string();
        report.insert(recv.clone(), 111);
        report.insert(ReliabilityState::Stored.to_string(), 222);
        report.insert(ReliabilityState::Retrieved.to_string(), 333);
        report.insert(trns.clone(), 444);
        report.insert("biginvalid".to_string(), -1);

        let generated = gen_report(report).unwrap();
        // We don't really care if the `Created` or `HELP` lines are included
        assert!(generated.contains(&format!("# TYPE {METRIC_NAME}")));
        // sample the first and last values.
        assert!(generated.contains(&format!("{METRIC_NAME}{{state=\"{recv}\"}} 111")));
        assert!(generated.contains(&format!("{METRIC_NAME}{{state=\"{trns}\"}} 444")));
        assert!(!generated.contains(&format!("{METRIC_NAME}{{state=\"biginvalid\"}} -1")));
    }

    #[test]
    fn state_ser() {
        assert_eq!(
            ReliabilityState::from_str("delivered").unwrap(),
            ReliabilityState::Delivered
        );
        assert_eq!(
            ReliabilityState::from_str("int_accepted").unwrap(),
            ReliabilityState::IntAccepted
        );
        assert_eq!(
            serde_json::from_str::<ReliabilityState>(r#""int_accepted""#).unwrap(),
            ReliabilityState::IntAccepted
        );

        assert_eq!(ReliabilityState::IntAccepted.to_string(), "int_accepted");
        assert_eq!(
            serde_json::to_string(&ReliabilityState::IntAccepted).unwrap(),
            r#""int_accepted""#
        );
    }

    #[actix_rt::test]
    async fn test_push_reliability_report() -> Result<()> {
        let mut db = crate::db::mock::MockDbClient::new();

        // Build the state.
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        // Remember, we shouldn't just arbitrarily create mid-state values, so
        // let's start from the beginning
        let new = ReliabilityState::Received;
        let old = None;
        let expr = 1;

        let exp_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();

        let state_id = format!("state.{test_id}");

        let mut pipeline = redis::Pipeline::new();
        // We're adding an element so we have something to record.
        pipeline
            .cmd("MULTI")
            // No old state, so no decrement.
            // "received" is not terminal
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(&exp_key)
            // adjust the counts
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            // Run the transaction
            .cmd("EXEC");

        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(
                redis::cmd("SET")
                    .arg(&state_id)
                    .arg(new.to_string())
                    .arg("NX")
                    .arg("EX")
                    .arg(expr),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_id).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                pipeline,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,                              // MULTI
                    redis::Value::SimpleString("QUEUED".to_owned()), // ZADD
                    redis::Value::SimpleString("QUEUED".to_owned()), // HINCRBY
                    redis::Value::SimpleString("QUEUED".to_owned()), // SET
                    // Return the transaction results.
                    redis::Value::Array(vec![
                        redis::Value::Int(1), // 0 -> 1
                        redis::Value::Int(1),
                        // there's no prior value, so this will return Nil
                        redis::Value::Nil,
                    ]),
                ])),
            ),
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        let int_test_id = test_id.clone();
        db.expect_log_report()
            .times(1)
            .withf(move |id, state| id == int_test_id && state == &new)
            .return_once(|_, _| Ok(()));
        // test the main report function (note, this does not test redis)
        let db_box = Box::new(Arc::new(db));
        let pr = PushReliability::new(
            &None,
            db_box.clone(),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // `.record()` uses a pool, so we can't pass the mock connection directly.
        // Instead, we're going to essentially recreate what `.record()` does by calling
        // the `.internal_record()` function, followed by the database `.log_report()`.
        // This emulates
        // ```
        // pr.record(&Some(test_id.clone()), new, &None, Some(expr))
        //    .await?;
        // ```

        // and mock the redis call.
        pr.internal_record(&mut conn, &old, new, Some(expr), &test_id)
            .await?;

        db_box
            .log_report(&test_id, new)
            .await
            .inspect_err(|e| {
                warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
            })
            .map_err(|e| ApcErrorKind::GeneralError(e.to_string()))?;

        Ok(())
    }

    #[actix_rt::test]
    async fn test_push_reliability_record() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Stored;
        let old = ReliabilityState::Received;
        let expr = 1;

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        let new_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();
        let old_key = ExpiryKey {
            id: test_id.clone(),
            state: old,
        }
        .to_string();

        let state_key = format!("state.{test_id}");

        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Decrement the old state
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(old.to_string())
            .arg(-1)
            .ignore()
            // Replace the combined old state key in the expiry set.
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(old_key)
            .ignore()
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(new_key)
            .ignore()
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            .cmd("SET")
            .arg(&state_key)
            .arg(new.to_string())
            .arg("XX")
            .arg("GET")
            .arg("KEEPTTL")
            .ignore()
            .cmd("EXEC")
            .ignore();

        let mut conn = MockRedisConnection::new(vec![
            // Create the new state
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_key).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                redis::cmd("GET").arg(&state_key),
                Ok(redis::Value::SimpleString(old.to_string())),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    redis::Value::SimpleString("QUEUED".to_owned()), // multi
                    redis::Value::SimpleString("QUEUED".to_owned()), // hincrby
                    redis::Value::SimpleString("QUEUED".to_owned()), // zrem
                    redis::Value::SimpleString("QUEUED".to_owned()), // zadd
                    redis::Value::SimpleString("QUEUED".to_owned()), // hincrby
                    redis::Value::SimpleString("QUEUED".to_owned()), // set
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay, //Multi
                        redis::Value::Okay, //hincrby
                        redis::Value::Okay, //zrem
                        redis::Value::Okay, //zadd
                        redis::Value::Okay, //hincr
                        redis::Value::SimpleString(old.to_string()),
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        let _ = pr
            .internal_record(&mut conn, &Some(old), new, Some(expr), &test_id)
            .await;

        Ok(())
    }

    #[actix_rt::test]
    async fn test_push_reliability_full() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Received;
        let stored = ReliabilityState::Stored;
        let expr = 1;

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        let new_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();
        let stored_key = ExpiryKey {
            id: test_id.clone(),
            state: stored,
        }
        .to_string();

        let state_key = format!("state.{test_id}");

        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Decrement the old state count
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(stored.to_string())
            .arg(-1)
            .ignore()
            // Replace the expiry key
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(stored_key.to_string())
            .ignore()
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(new_key)
            .ignore()
            // Increment the new state count
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            .ignore()
            // And create the new state transition key (since the message is "live" again.)
            .cmd("SET")
            .arg(&state_key)
            .arg(new.to_string())
            .arg("XX")
            .arg("GET")
            .cmd("EXEC")
            .ignore();

        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(
                // Create the new state
                redis::cmd("SET")
                    .arg(format!("state.{test_id}"))
                    .arg(new.to_string())
                    .arg("NX")
                    .arg("EX")
                    .arg(expr),
                Ok(redis::Value::Okay),
            ),
            // increment the count for new
            MockCmd::new(
                redis::cmd("HINCRBY")
                    .arg(COUNTS)
                    .arg(new.to_string())
                    .arg(1),
                Ok(redis::Value::Okay),
            ),
            // begin the transaction
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_key).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        let _ = pr
            .internal_record(&mut conn, &Some(stored), new, Some(expr), &test_id)
            .await;

        Ok(())
    }

    #[actix_rt::test]
    async fn test_push_reliability_gc() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Accepted;
        let key = format!("{}#{}", &test_id, &new);
        let expr = 1;
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        let response: redis::Value = redis::Value::Array(vec![redis::Value::SimpleString(
            ExpiryKey {
                id: test_id.clone(),
                state: new,
            }
            .to_string(),
        )]);

        // Construct the Pipeline.
        // A redis "pipeline" is a set of instructions that are executed in order. These
        // are not truly atomic, as other actions can interrupt a pipeline, but they
        // are guaranteed to happen in sequence. Transactions essentially check that the
        // WATCH key is not altered before the pipeline is executed.
        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Adjust the state counts.
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(-1)
            .ignore()
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(ReliabilityState::Expired.to_string())
            .arg(1)
            // Replace the state key in the expiry set.
            .ignore()
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(key)
            .ignore()
            .cmd("EXEC")
            .ignore();

        // This mocks the "conn"ection, so all commands sent to the connection need
        // to be separated out. This includes the transaction "WATCH" and "UNWATCH".
        // The "meat" of the functions are handled via a Pipeline, which conjoins the
        // commands into one group (see above). Pipelines return an array of
        // results, one entry for each pipeline command.
        // In other words, the `internal_gc` commands are probably in the pipeline,
        // all the others are probably in the conn.
        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(redis::cmd("WATCH").arg(EXPIRY), Ok(redis::Value::Okay)),
            MockCmd::new(
                redis::cmd("ZRANGEBYSCORE").arg(EXPIRY).arg(0).arg(expr),
                Ok(response),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // functionally a no-op, but it does exercise lines.
        pr.gc().await?;

        // and mock the redis call.
        pr.internal_gc(&mut conn, 1).await?;

        Ok(())
    }
}