autopush_common/
reliability.rs

1/// Push Reliability Recorder
2///
3/// This allows us to track messages from select, known parties (currently, just
4/// mozilla generated and consumed) so that we can identify potential trouble spots
/// and where messages expire early. Message expiration can lead to message loss.
6use std::collections::HashMap;
7use std::str::FromStr;
8use std::sync::Arc;
9
10use actix_web::HttpResponse;
11use cadence::StatsdClient;
12use chrono::TimeDelta;
13use deadpool_redis::Config;
14
15use prometheus_client::{
16    encoding::text::encode, metrics::family::Family, metrics::gauge::Gauge, registry::Registry,
17};
18use redis::{AsyncCommands, aio::ConnectionLike};
19use redis::{Pipeline, Value};
20
21use crate::db::client::DbClient;
22use crate::errors::{ApcError, ApcErrorKind, Result};
23use crate::metric_name::MetricName;
24use crate::metrics::StatsdClientExt;
25use crate::util::timing::sec_since_epoch;
26
// Redis Keys
/// Redis hash of per-state message counts (field = `ReliabilityState` name,
/// value = count). Used as a quick lookup for the dashboard/report query.
pub const COUNTS: &str = "state_counts";
/// Redis sorted set of `"{id}#{state}"` members scored by their expiration
/// timestamp. Swept by the `gc` cycle.
pub const EXPIRY: &str = "expiry";

/// Timeout used when creating a connection from the reliability pool.
const CONNECTION_EXPIRATION: TimeDelta = TimeDelta::seconds(10);
// Minimum expiration period of 1 second.
// This was set to `0`, but there was some confusion whether that would not set an
// expiration time for a record or would set a record not to expire.
const MIN_EXPIRATION: u64 = 1;
36
/// The various states that a message may transit on the way from reception to delivery.
// Note: "Message" in this context refers to the Subscription Update.
// TODO: Differentiate between "transmitted via webpush" and "transmitted via bridge"?
// Both serde and strum (Display/EnumString) use the snake_case form,
// e.g. `IntAccepted` <-> "int_accepted".
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    serde::Deserialize,
    serde::Serialize,
    strum::Display,
    strum::EnumString,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum ReliabilityState {
    Received,          // Message was received by the Push Server
    Stored,            // Message was stored because it could not be delivered immediately
    Retrieved,         // Message was taken from storage for delivery
    IntTransmitted,    // Message was handed off between autoendpoint and autoconnect
    IntAccepted,       // Message was accepted by autoconnect from autopendpoint
    BridgeTransmitted, // Message was handed off to a mobile bridge for eventual delivery
    Transmitted,       // Message was handed off for delivery to the UA
    Accepted,          // Message was accepted for delivery by the UA
    Delivered,         // Message was provided to the WebApp recipient by the UA
    DecryptionError,   // Message was provided to the UA and it reported a decryption error
    NotDelivered,      // Message was provided to the UA and it reported a not delivered error
    Expired,           // Message expired naturally (e.g. TTL=0)
    Errored,           // Message resulted in an Error state and failed to be delivered.
}
68
69impl ReliabilityState {
70    ///  Has the message reached a state where no further transitions should be possible?
71    pub fn is_terminal(self) -> bool {
72        // NOTE: this list should match the daily snapshot captured by `reliability_cron.py`
73        // which will trim these counts after the max message TTL has expired.
74        matches!(
75            self,
76            ReliabilityState::DecryptionError
77                | ReliabilityState::BridgeTransmitted
78                | ReliabilityState::Delivered
79                | ReliabilityState::Errored
80                | ReliabilityState::Expired
81                | ReliabilityState::NotDelivered
82        )
83    }
84}
85
#[derive(Clone)]
pub struct PushReliability {
    // Optional Redis connection pool; `None` disables Redis state tracking
    // (state changes are then only logged via `db.log_report`).
    pool: Option<deadpool_redis::Pool>,
    // Internal datastore client used to log every state-change report.
    db: Box<dyn DbClient>,
    // Statsd client for operational metrics (e.g. the `ReliabilityGc` counter).
    metrics: Arc<StatsdClient>,
    // Number of attempts allowed for a contested Redis transaction.
    retries: usize,
}
93
// Define a struct to hold the expiry key, since it's easy to flub the order.
/// Composite member key for the `EXPIRY` sorted set, serialized as `"{id}#{state}"`.
pub struct ExpiryKey {
    pub id: String,
    pub state: ReliabilityState,
}
99
100impl std::fmt::Display for ExpiryKey {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        write!(f, "{}#{}", self.id, self.state)
103    }
104}
105
106impl TryFrom<String> for ExpiryKey {
107    type Error = ApcError;
108    fn try_from(value: String) -> Result<Self> {
109        let (id, state) = value.split_once('#').ok_or_else(|| {
110            ApcErrorKind::GeneralError("ExpiryKey must be in the format 'id#state'".to_owned())
111        })?;
112        let state: ReliabilityState = ReliabilityState::from_str(state).map_err(|_| {
113            ApcErrorKind::GeneralError(
114                "Invalid state in ExpiryKey, must be a valid ReliabilityState".to_owned(),
115            )
116        })?;
117        Ok(Self {
118            id: id.to_owned(),
119            state,
120        })
121    }
122}
123
124impl PushReliability {
125    // Do the magic to make a report instance, whatever that will be.
126    pub fn new(
127        reliability_dsn: &Option<String>,
128        db: Box<dyn DbClient>,
129        metrics: &Arc<StatsdClient>,
130        retries: usize,
131    ) -> Result<Self> {
132        let Some(reliability_dsn) = reliability_dsn else {
133            debug!("πŸ” No reliability DSN declared.");
134            return Ok(Self {
135                pool: None,
136                db: db.clone(),
137                metrics: metrics.clone(),
138                retries,
139            });
140        };
141
142        let config: deadpool_redis::Config = Config::from_url(reliability_dsn);
143        let pool = Some(
144            config
145                .builder()
146                .map_err(|e| {
147                    ApcErrorKind::GeneralError(format!("Could not config reliability pool {e:?}"))
148                })?
149                .create_timeout(Some(CONNECTION_EXPIRATION.to_std().unwrap()))
150                .runtime(deadpool::Runtime::Tokio1)
151                .build()
152                .map_err(|e| {
153                    ApcErrorKind::GeneralError(format!("Could not build reliability pool {e:?}"))
154                })?,
155        );
156
157        Ok(Self {
158            pool,
159            db: db.clone(),
160            metrics: metrics.clone(),
161            retries,
162        })
163    }
164
    /// Record the message state change to storage.
    ///
    /// A `reliability_id` of `None` means the message is not tracked; returns `Ok(None)`.
    /// When a Redis pool is configured: failure to *acquire* a connection is only
    /// warned about, but a failure while *recording* propagates via `?` (which also
    /// skips the internal db log below). The transition is otherwise always logged
    /// to the internal db; those errors are non-fatal.
    pub async fn record(
        &self,
        reliability_id: &Option<String>,
        new: ReliabilityState,
        old: &Option<ReliabilityState>,
        expr: Option<u64>,
    ) -> Result<Option<ReliabilityState>> {
        let Some(id) = reliability_id else {
            return Ok(None);
        };
        if let Some(pool) = &self.pool {
            debug!(
                "πŸ” {} from {} to {}",
                id,
                old.map(|v| v.to_string())
                    .unwrap_or_else(|| "None".to_owned()),
                new
            );
            match pool.get().await {
                Ok(mut conn) => {
                    // NOTE: errors here propagate out of `record`, so the
                    // `log_report` call below will not run in that case.
                    self.internal_record(&mut conn, old, new, expr, id)
                        .await
                        .map_err(|e| {
                            warn!("πŸ”βš οΈ Unable to record reliability state: {:?}", e);
                            ApcErrorKind::GeneralError(
                                "Could not record reliability state".to_owned(),
                            )
                        })?;
                }
                Err(e) => warn!("πŸ”βš οΈ Unable to get reliability state pool, {:?}", e),
            };
        };
        // Errors are not fatal, and should not impact message flow, but
        // we should record them somewhere.
        let _ = self.db.log_report(id, new).await.inspect_err(|e| {
            warn!("πŸ”βš οΈ Unable to record reliability state log: {:?}", e);
        });
        Ok(Some(new))
    }
205
206    /// Record the state change in the reliability datastore.
207    /// Because of a strange duplication error, we're using three "tables".
208    /// The COUNTS table contains the count of messages in each state. It's provided as a quick lookup
209    /// for the dashboard query.
210    /// The EXPIRY table contains the expiration time for messages. Those are cleaned up by the
211    /// `gc` function.
212    /// Finally, there's the "table" that is `state.{id}`. This contains the current state of each
213    /// message. Ideally, this would use something like `HSETEX`, but that is not available until
214    /// Redis 8.0, so for now, we use the `SET` function, which provides an expiration time.
215    /// BE SURE TO SET AN EXPIRATION TIME FOR EACH STATE MESSAGE!
216    // TODO: Do we need to keep the state in the message? Probably good for sanity reasons.
217    pub(crate) async fn internal_record<C: ConnectionLike + AsyncCommands>(
218        &self,
219        conn: &mut C,
220        old: &Option<ReliabilityState>,
221        new: ReliabilityState,
222        expr: Option<u64>,
223        id: &str,
224    ) -> Result<()> {
225        trace!(
226            "πŸ” internal record: {} from {} to {}",
227            id,
228            old.map(|v| v.to_string())
229                .unwrap_or_else(|| "None".to_owned()),
230            new
231        );
232
233        let state_key = format!("state.{id}");
234        trace!("πŸ” state key: {}", &state_key);
235        /*
236        ## Reliability lock and adjustment
237
238        This transaction creates a lock on the individual messages "state_key" (which contains the current messages state)
239        and the entire Expiration table. If either of those values are changed while the transaction is in progress, then
240        the transaction will fail, return a `Redis::Nil`, and the transaction will retry up to `self.retries` (Note:
241        none of the operations in the transaction will have taken place yet.)
242
243        We want to lock on the `state_key` because that shows the given state of the message.
244        We want to lock on the `expiry` table because the message may have expired, been handled by the `gc()` function and
245        may have already been adjusted. (Remember, the `expiry` table holds timestamp markers for when a message will be
246        expiring.)
247
248        The operations in the Pipeline are:
249
250        1. If there is an `old` state, decrement the old state count and remove the old marker from the `expiry` table.
251        2. If the `new` state is not a terminal state (e.g. it is not a final disposition for a message), then create a new
252           entry for `expiry`
253        3. Increment the `new` state count
254        4. Modify the `state.{ID}` value (if it exists) to indicate the newest known state for the message, returning the
255           prior value. Checking that this value is not Nil is an additional sanity check that the value was not removed
256           or altered by a different process. (We are already imposing a transaction lock which should prevent this, but
257           additional paranoia can sometimes be good.)
258
259        Upon execution of the Pipeline, we get back a list of results. This should look similar to a `Queued` entry for
260        every function that we've performed, followed by an inline list of the results of those functions. (e.g.
261        for a transition between `ReliabilityState::Stored` to `ReliabilityState::Retrieved`, which are neither terminal
262        states), we should see something like: `["Queued","Queued","Queued",["OK", "OK", "stored"]]`
263
264        If any of the locks failed, we should get back a `Nil`, in which case, we would want to try this operation again.
265        In addition, the sanity `Get` may also return a `Nil` which would also indicate that there was a problem. There's
266        some debate whether or not to retry the operation if that's the case (since it would imply that the lock failed
267        for some unexpected reason), however for now, we just report a soft `error!()`.
268
269         */
270        // Create the initial state key.
271        // Do not place this inside of the transaction. We monitor the state key and the transaction will
272        // fail because the value will change before the transaction completes. Yes, really.
273        if new == ReliabilityState::Received && old.is_none() {
274            trace!(
275                "πŸ” Creating new record {state_key} ex {:?}",
276                expr.unwrap_or(MIN_EXPIRATION)
277            );
278            // we can't perform this in a transaction because we can only increment if the set succeeds,
279            // and values aren't returned when creating values in transactions. In order to do this
280            // from inside the transaction, you would need to create a function, and that feels a bit
281            // too heavy for this.
282            // Create the new `state.{id}` key if it does not exist, and set the expiration.
283            let options = redis::SetOptions::default()
284                .with_expiration(redis::SetExpiry::EX(expr.unwrap_or(MIN_EXPIRATION)))
285                .conditional_set(redis::ExistenceCheck::NX);
286            trace!("πŸ” β­• SET {state_key} NX EX {:?}", new);
287            let result = conn
288                .set_options::<_, _, Value>(&state_key, new.to_string(), options)
289                .await
290                .map_err(|e| {
291                    //dbg!(&e);
292                    warn!("πŸ”βš οΈ Could not create state key: {:?}", e);
293                    ApcErrorKind::GeneralError("Could not create the state key".to_owned())
294                })?;
295            if result != redis::Value::Okay {
296                // Redis returned a `Nil`, indicating that there was some error. The only thing that should cause that
297                // would be if the `old` state was reset to `None` and we thought we needed to create a new state.
298                // Since the message carries it's prior state, it shouldn't be set to `None` unless there's something
299                // strange going on.
300                // TODO: It's worth noting that when restarting autoendpoint, we get a large number of these in the logs.
301                // Need to figure out the reason for that.
302                // The `result` is always `nil` so that's not helpful.
303                error!("πŸ”βš οΈ Tried to recreate state_key {state_key}: {old:?} => {new:?}",);
304                return Err(
305                    ApcErrorKind::GeneralError("Tried to recreate state_key".to_string()).into(),
306                );
307            }
308        } else {
309            trace!("πŸ” Checking {:?}", &old);
310            // safety check (yes, there's still a slight chance of a race, but it's small)
311            if let Some(old) = old {
312                let check_state: String = conn.get(&state_key).await?;
313                trace!("πŸ” Checking state for {}: {:?}", id, &check_state);
314                if check_state != old.to_string() {
315                    trace!(
316                        "πŸ” Attempting to update state for {} from {} to {}, but current state is different: {:?}",
317                        id, old, new, check_state
318                    );
319                    return Err(ApcErrorKind::GeneralError(
320                        "State mismatch during reliability record update".to_owned(),
321                    )
322                    .into());
323                }
324            };
325        }
326
327        crate::redis_util::transaction(
328            conn,
329            &[&state_key, &EXPIRY.to_owned()],
330            self.retries,
331            || ApcErrorKind::GeneralError("Exceeded reliability record retry attempts".to_owned()),
332            async |conn, pipe: &mut Pipeline| {
333                // The first state is special, since we need to create the `state_key`.
334                // remove the old state from the expiry set, if it exists.
335                // There should only be one message at a given state in the `expiry` table.
336                // Since we only use that table to track messages that may expire. (We
337                // decrement "expired" messages in the `gc` function, so having messages
338                // in multiple states may decrement counts incorrectly.))
339                if let Some(old) = old {
340                    trace!("πŸ” βž– {old} - {id}");
341                    trace!("πŸ” πŸͺˆβ­• HINCRBY {COUNTS} 1");
342                    pipe.hincr(COUNTS, old.to_string(), -1);
343                    let key = ExpiryKey {
344                        id: id.to_string(),
345                        state: old.to_owned(),
346                    }
347                    .to_string();
348                    trace!("πŸ” internal remove old state: {:?}", key);
349                    trace!("πŸ” πŸͺˆβ­• ZREM {EXPIRY} {key}");
350                    pipe.zrem(EXPIRY, &key);
351                }
352                if !new.is_terminal() {
353                    // Write the expiration only if the state is non-terminal. Otherwise we run the risk of
354                    // messages reporting a false "expired" state even if they were "successful".
355                    let key = ExpiryKey {
356                        id: id.to_string(),
357                        state: new.to_owned(),
358                    }
359                    .to_string();
360                    trace!("πŸ” πŸͺˆβ­• ZADD {EXPIRY} {:?} {key}", expr.unwrap_or_default());
361                    pipe.zadd(EXPIRY, &key, expr.unwrap_or_default());
362                    trace!("πŸ” internal record result: {:?}", key);
363                }
364                trace!("πŸ” βž• {new} - {id}");
365                // Bump up the new state count, and set the state key's state if it still exists.
366                trace!("πŸ” πŸͺˆβ­• HINCRBY {COUNTS} {new} 1");
367                pipe.hincr(COUNTS, new.to_string(), 1);
368                if new != ReliabilityState::Received {
369                    let options = redis::SetOptions::default()
370                        .with_expiration(redis::SetExpiry::KEEPTTL)
371                        .conditional_set(redis::ExistenceCheck::XX)
372                        .get(true);
373                    trace!("πŸ” πŸͺˆβ­• SET {state_key} {new} XX KEEPTTL");
374                    pipe.set_options(&state_key, new.to_string(), options);
375                }
376                // `exec_query` returns `RedisResult<()>`.
377                // `query_async` returns `RedisResult<Option<redis::Value>, RedisError>`.
378                // We really don't care about the returned result here, but transaction
379                // retries if we return Ok(None), so we run the exec and return
380                // a nonce `Some` value.
381                // The turbo-fish is a fallback for edition 2024
382                let result = pipe.query_async::<redis::Value>(conn).await?;
383                trace!("πŸ” πŸͺˆ {id} - {:?}", &result);
384                // The last element returned from the command is the result of the pipeline.
385                // If Redis encounters an error, it will return a `nil` as well. We handle both
386                // the same (retry), so we can normalize errors as `nil`.
387                // The last of which should be the result of the piped command set.
388                // This should return `nil` if there is any error, in which case we record
389                // a soft error. (On success, it will return the result of the last command
390                // in the pipe, which may vary due to the current state).
391                // This could also be strung together as a cascade of functions, but it's broken
392                // out to discrete steps for readability.
393                /* On prod, we get a large number of these errors, which I think might be clogging
394                  up the redis connections, causing servers to report as degraded.
395                */
396                if result == redis::Value::Nil {
397                    warn!("πŸ”βš πŸͺˆ {id} - Pipe failed, skipping retry.");
398                    // temporarily just let things fail to handle autoendpoint degradation.
399                    // return Ok(None);
400                    return Ok(Some(redis::Value::Okay));
401                }
402                if let Some(operations) = result.as_sequence() {
403                    // We have responses, the first items report the state of the commands,
404                    // the final line is a list of command results.
405                    if let Some(result_values) = operations.last() && let Some(results) = result_values.as_sequence() &&
406                            // The last command should contain the prior state. If it returned `Nil`
407                            // for some, unexpected reason, note the error.
408                            Some(&redis::Value::Nil) == results.last() {
409                                error!("πŸ”πŸš¨ WARNING: Lock Issue for {id}")
410                                // There is some debate about whether or not to rerun
411                                // the transaction if this state is reached.
412                                // Rerunning would cause the counts to be impacted, but
413                                // might address any other issues that caused the
414                                // `Nil` to be returned.
415                                // For now, let's just log the error.
416
417
418                    }
419                    Ok(Some(redis::Value::Okay))
420                } else {
421                    // a Nil will rerun the transaction.
422                    Ok(None)
423                }
424            },
425        )
426        .await
427        .inspect_err(|e| {
428            //dbg!(&e);
429            warn!("πŸ”βš οΈError occurred during transaction: {:?}", e);
430        })?;
431        Ok(())
432    }
433
434    /// Perform a garbage collection cycle on a reliability object.
435    pub async fn gc(&self) -> Result<()> {
436        if let Some(pool) = &self.pool
437            && let Ok(mut conn) = pool.get().await
438        {
439            debug!("πŸ” performing pre-report garbage collection");
440            return self.internal_gc(&mut conn, sec_since_epoch()).await;
441        }
442        Ok(())
443    }
444
    // Perform the `garbage collection` cycle. This will scan the currently known timestamp
    // indexed entries in redis looking for "expired" data, and then rectify the counts to
    // indicate the final states. This is because many of the storage systems do not provide
    // indicators when data reaches a TTL.
    //
    // `expr` is the cutoff timestamp (seconds since epoch, per the caller in `gc`);
    // `EXPIRY` members scored at or below it are treated as expired.

    // A few notes about redis:
    // "pipeline" essentially stitches commands together. Each command executes in turn, but the data store
    // remains open for other operations to occur.
    // `atomic()` wraps things in a transaction, essentially locking the data store while the command executes.
    // Sadly, there's no way to create a true transaction where you read and write in a single operation, so we
    // have to presume some "slop" here.
    pub(crate) async fn internal_gc<C: ConnectionLike + AsyncCommands>(
        &self,
        conn: &mut C,
        expr: u64,
    ) -> Result<()> {
        let result: redis::Value = crate::redis_util::transaction(
            conn,
            &[EXPIRY],
            self.retries,
            || ApcErrorKind::GeneralError("Exceeded gc retry attempts".to_owned()),
            async |conn, pipe| {
                // First, get the list of values that are to be purged.
                // NOTE(review): `expr as isize` can truncate on 32-bit targets —
                // confirm 64-bit-only deployment or use a checked conversion.
                let purged: Vec<String> = conn.zrangebyscore(EXPIRY, 0, expr as isize).await?;
                // insta-bail if there's nothing to do.
                if purged.is_empty() {
                    return Ok(Some(redis::Value::Nil));
                }

                // Now purge each of the values by resetting the counts and removing the item from
                // the purge set.
                for key in purged {
                    let Ok(expiry_key) = ExpiryKey::try_from(key.clone()) else {
                        let err = "Invalid key stored in Reliability datastore";
                        error!("πŸ”πŸŸ₯ {} [{:?}]", &err, &key);
                        return Err(ApcErrorKind::GeneralError(err.to_owned()));
                    };
                    // Adjust the COUNTS and then remove the record from the list of expired rows.
                    trace!("πŸ” βž–πŸͺ¦ {} - {key}", expiry_key.state.to_string());
                    pipe.hincr(COUNTS, expiry_key.state.to_string(), -1);
                    pipe.hincr(COUNTS, ReliabilityState::Expired.to_string(), 1);
                    pipe.zrem(EXPIRY, key);
                }
                Ok(pipe.query_async(conn).await?)
            },
        )
        .await?;
        // NOTE(review): an empty purge set returns `Nil` above and is therefore
        // tagged "error" here — confirm that is the intended metric semantics.
        self.metrics
            .incr_with_tags(MetricName::ReliabilityGc)
            .with_tag(
                "status",
                if result == redis::Value::Nil {
                    "error"
                } else {
                    "success"
                },
            )
            .send();
        Ok(())
    }
505
506    // Return a snapshot of milestone states
507    // This will probably not be called directly, but useful for debugging.
508    pub async fn report(&self) -> Result<HashMap<String, i32>> {
509        if let Some(pool) = &self.pool
510            && let Ok(mut conn) = pool.get().await
511        {
512            return Ok(conn.hgetall(COUNTS).await.map_err(|e| {
513                ApcErrorKind::GeneralError(format!("Could not read report {e:?}"))
514            })?);
515        }
516        Ok(HashMap::new())
517    }
518
519    pub async fn health_check<'a>(&self) -> Result<&'a str> {
520        if let Some(pool) = &self.pool {
521            let mut conn = pool.get().await.map_err(|e| {
522                ApcErrorKind::GeneralError(format!(
523                    "Could not connect to reliability datastore: {e:?}"
524                ))
525            })?;
526            // Add a type here, even though we're tossing the value, in order to prevent the `FromRedisValue` warning.
527            conn.ping::<()>().await.map_err(|e| {
528                ApcErrorKind::GeneralError(format!("Could not ping reliability datastore: {e:?}"))
529            })?;
530            Ok("OK")
531        } else {
532            Ok("OK")
533        }
534    }
535}
536
537const METRIC_NAME: &str = "autopush_reliability";
538
539/// Generate a Prometheus compatible report. Output should follow the
540/// [instrumentation](https://prometheus.io/docs/practices/instrumentation/) guidelines.
541///
542/// In short form, the file should be a plain text output, with each metric on it's own line
543/// using the following format:
544/// ```text
545/// # HELP metric_name Optional description of this metric
546/// # TYPE metric_name {required type (gauge|count|histogram|summary)}
547/// metric_name{label="label1"} value
548/// metric_name{label="label2"} value
549/// ```
550/// An example which would return counts of messages in given states at the current
551/// time would be:
552/// ```text
553/// # HELP autopush_reliability Counts for messages in given states
554/// # TYPE metric_name gauge
555/// autopush_reliability{state="recv"} 123
556/// autopush_reliability{state="stor"} 123
557/// # EOF
558/// ```
559/// Note that time is not required. A timestamp has been added to the output, but is
560/// ignored by Prometheus, and is only provided to ensure that there is no intermediate
561/// caching occurring.
562///
563/// The report endpoint currently is only provided by `autoendpoint`, even though the report
564/// is inclusive for all push milestones. This is done for simplicity, both for serving the
565/// data and for collection and management of the metrics.
566///
567pub fn gen_report(values: HashMap<String, i32>) -> Result<String> {
568    let mut registry = Registry::default();
569
570    // A "family" is a grouping of metrics.
571    // we specify this as the ("label", "label value") which index to a Gauge.
572    let family = Family::<Vec<(&str, String)>, Gauge>::default();
573    // This creates the top level association of the elements in the family with the metric.
574    registry.register(
575        METRIC_NAME,
576        "Count of messages at given states",
577        family.clone(),
578    );
579    for (milestone, value) in values.into_iter() {
580        // prevent any stray leakage of invalid state data
581        if ReliabilityState::from_str(&milestone).is_err() {
582            trace!("πŸ” skipping invalid state {milestone:?}");
583            continue;
584        }
585        // Specify the static "state" label name with the given milestone, and add the
586        // value as the gauge value.
587        family
588            .get_or_create(&vec![("state", milestone)])
589            .set(value.into());
590    }
591
592    // Return the formatted string that Prometheus will eventually read.
593    let mut encoded = String::new();
594    encode(&mut encoded, &registry).map_err(|e| {
595        ApcErrorKind::GeneralError(format!("Could not generate Reliability report {e:?}"))
596    })?;
597    Ok(encoded)
598}
599
600/// Handle the `/metrics` request by returning a Prometheus compatible report.
601pub async fn report_handler(reliability: &Arc<PushReliability>) -> Result<HttpResponse> {
602    reliability.gc().await?;
603    let report = gen_report(reliability.report().await?)?;
604    Ok(HttpResponse::Ok()
605        .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
606        .body(report))
607}
608
609#[cfg(test)]
610mod tests {
611    use std::collections::HashMap;
612    use std::str::FromStr;
613
614    use super::*;
615    use redis_test::{MockCmd, MockRedisConnection};
616    use uuid::Uuid;
617
    // Verify `gen_report` emits gauges for known states and filters unknown ones.
    #[test]
    fn test_report() {
        // create a nonce report
        let mut report: HashMap<String, i32> = HashMap::new();
        let recv = ReliabilityState::Received.to_string();
        let trns = ReliabilityState::Transmitted.to_string();
        report.insert(recv.clone(), 111);
        report.insert(ReliabilityState::Stored.to_string(), 222);
        report.insert(ReliabilityState::Retrieved.to_string(), 333);
        report.insert(trns.clone(), 444);
        // Not a valid ReliabilityState; must be filtered out of the output.
        report.insert("biginvalid".to_string(), -1);

        let generated = gen_report(report).unwrap();
        // We don't really care if the `Created` or `HELP` lines are included
        assert!(generated.contains(&format!("# TYPE {METRIC_NAME}")));
        // sample the first and last values.
        assert!(generated.contains(&format!("{METRIC_NAME}{{state=\"{recv}\"}} 111")));
        assert!(generated.contains(&format!("{METRIC_NAME}{{state=\"{trns}\"}} 444")));
        assert!(!generated.contains(&format!("{METRIC_NAME}{{state=\"biginvalid\"}} -1")));
    }
638
    // Verify the snake_case round-trips: strum (FromStr/Display) and serde agree.
    #[test]
    fn state_ser() {
        assert_eq!(
            ReliabilityState::from_str("delivered").unwrap(),
            ReliabilityState::Delivered
        );
        assert_eq!(
            ReliabilityState::from_str("int_accepted").unwrap(),
            ReliabilityState::IntAccepted
        );
        assert_eq!(
            serde_json::from_str::<ReliabilityState>(r#""int_accepted""#).unwrap(),
            ReliabilityState::IntAccepted
        );

        assert_eq!(ReliabilityState::IntAccepted.to_string(), "int_accepted");
        assert_eq!(
            serde_json::to_string(&ReliabilityState::IntAccepted).unwrap(),
            r#""int_accepted""#
        );
    }
660
    // Exercise `internal_record` for a brand-new message (no prior state),
    // followed by the database-side `log_report`. Together these mirror what
    // the public `.record()` method does, without needing a live Redis pool.
    #[actix_rt::test]
    async fn test_push_reliability_report() -> Result<()> {
        let mut db = crate::db::mock::MockDbClient::new();

        // Build the state.
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        // Remember, we shouldn't just arbitrarily create mid-state values, so
        // let's start from the beginning
        let new = ReliabilityState::Received;
        // No prior state: the message is being seen for the first time.
        let old = None;
        // Score used for the member in the `expiry` sorted set.
        let expr = 1;

        // The member recorded in the `expiry` sorted set for this id/state.
        let exp_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();

        // Key holding this message's current state.
        let state_id = format!("state.{test_id}");

        // The exact transaction pipeline we expect `internal_record` to send.
        let mut pipeline = redis::Pipeline::new();
        // We're adding an element so we have something to record.
        pipeline
            .cmd("MULTI")
            // No old state, so no decrement.
            // "received" is not terminal
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(&exp_key)
            // adjust the counts
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            // Run the transaction
            .cmd("EXEC");

        // Mocked connection: each `MockCmd` pairs one expected command (in
        // order) with the canned reply to hand back.
        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(
                redis::cmd("SET")
                    .arg(&state_id)
                    .arg(new.to_string())
                    .arg("NX")
                    .arg("EX")
                    .arg(expr),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_id).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            MockCmd::new(
                pipeline,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,                              // MULTI
                    redis::Value::SimpleString("QUEUED".to_owned()), // ZADD
                    redis::Value::SimpleString("QUEUED".to_owned()), // HINCRBY
                    redis::Value::SimpleString("QUEUED".to_owned()), // SET
                    // Return the transaction results.
                    redis::Value::Array(vec![
                        redis::Value::Int(1), // 0 -> 1
                        redis::Value::Int(1),
                        // there's no prior value, so this will return Nil
                        redis::Value::Nil,
                    ]),
                ])),
            ),
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // No-op metrics sink; we only care about the Redis/db interactions.
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        // The mock db must see exactly one `log_report` with our id and state.
        let int_test_id = test_id.clone();
        db.expect_log_report()
            .times(1)
            .withf(move |id, state| id == int_test_id && state == &new)
            .return_once(|_, _| Ok(()));
        // test the main report function (note, this does not test redis)
        let db_box = Box::new(Arc::new(db));
        let pr = PushReliability::new(
            &None,
            db_box.clone(),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // `.record()` uses a pool, so we can't pass the mock connection directly.
        // Instead, we're going to essentially recreate what `.record()` does calling
        // the `.internal_record()` function, followed by the database `.record()`.
        // This emulates
        // ```
        // pr.record(&Some(test_id.clone()), new, &None, Some(expr))
        //    .await?;
        // ```

        // and mock the redis call.
        pr.internal_record(&mut conn, &old, new, Some(expr), &test_id)
            .await?;

        // The second half of `.record()`: persist the state to the database.
        db_box
            .log_report(&test_id, new)
            .await
            .inspect_err(|e| {
                warn!("πŸ”βš οΈ Unable to record reliability state log: {:?}", e);
            })
            .map_err(|e| ApcErrorKind::GeneralError(e.to_string()))?;

        Ok(())
    }
771
    // Exercise `internal_record` for a state transition (Received -> Stored):
    // the old state's count is decremented, its expiry member is swapped for
    // the new one, and the `state.{id}` key is rewritten in place
    // (SET … XX GET KEEPTTL).
    #[actix_rt::test]
    async fn test_push_reliability_record() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Stored;
        let old = ReliabilityState::Received;
        // Score used for the new member in the `expiry` sorted set.
        let expr = 1;

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        // Expiry-set members for the new and old id/state pairs.
        let new_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();
        let old_key = ExpiryKey {
            id: test_id.clone(),
            state: old,
        }
        .to_string();

        // Key holding this message's current state.
        let state_key = format!("state.{test_id}");

        // The exact transaction pipeline we expect `internal_record` to send.
        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Decrement the old state
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(old.to_string())
            .arg(-1)
            .ignore()
            // Replace the combined old state key in the expiry set.
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(old_key)
            .ignore()
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(new_key)
            .ignore()
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            .cmd("SET")
            .arg(&state_key)
            .arg(new.to_string())
            .arg("XX")
            .arg("GET")
            .arg("KEEPTTL")
            .ignore()
            .cmd("EXEC")
            .ignore();

        // Mocked connection: each `MockCmd` pairs one expected command (in
        // order) with the canned reply to hand back.
        let mut conn = MockRedisConnection::new(vec![
            // Create the new state
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_key).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            // The current stored state is read back as the old state.
            MockCmd::new(
                redis::cmd("GET").arg(&state_key),
                Ok(redis::Value::SimpleString(old.to_string())),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    redis::Value::SimpleString("QUEUED".to_owned()), // multi
                    redis::Value::SimpleString("QUEUED".to_owned()), // hincrby
                    redis::Value::SimpleString("QUEUED".to_owned()), // zrem
                    redis::Value::SimpleString("QUEUED".to_owned()), // zadd
                    redis::Value::SimpleString("QUEUED".to_owned()), // hincrby
                    redis::Value::SimpleString("QUEUED".to_owned()), // set
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay, //Multi
                        redis::Value::Okay, //hincrby
                        redis::Value::Okay, //zrem
                        redis::Value::Okay, //zadd
                        redis::Value::Okay, //hincr
                        redis::Value::SimpleString(old.to_string()),
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // Drive the transition against the mocked connection; the mock itself
        // verifies the command sequence, so the return value is not asserted.
        let _ = pr
            .internal_record(&mut conn, &Some(old), new, Some(expr), &test_id)
            .await;

        Ok(())
    }
882
    // Exercise `internal_record` for a message transitioning from `Stored`
    // back to `Received` (i.e. the message becomes "live" again): the stored
    // count is decremented, the expiry member swapped, and the state key
    // recreated.
    #[actix_rt::test]
    async fn test_push_reliability_full() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Received;
        let stored = ReliabilityState::Stored;
        // Score used for the new member in the `expiry` sorted set.
        let expr = 1;

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        // Expiry-set members for the new and previously-stored id/state pairs.
        let new_key = ExpiryKey {
            id: test_id.clone(),
            state: new,
        }
        .to_string();
        let stored_key = ExpiryKey {
            id: test_id.clone(),
            state: stored,
        }
        .to_string();

        // Key holding this message's current state.
        let state_key = format!("state.{test_id}");

        // The exact transaction pipeline we expect `internal_record` to send.
        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Decrement the old state count
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(stored.to_string())
            .arg(-1)
            .ignore()
            // Replace the expiry key
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(stored_key.to_string())
            .ignore()
            .cmd("ZADD")
            .arg(EXPIRY)
            .arg(expr)
            .arg(new_key)
            .ignore()
            // Increment the new state count
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(1)
            .ignore()
            // And create the new state transition key (since the message is "live" again.)
            .cmd("SET")
            .arg(&state_key)
            .arg(new.to_string())
            .arg("XX")
            .arg("GET")
            .cmd("EXEC")
            .ignore();

        // Mocked connection: each `MockCmd` pairs one expected command (in
        // order) with the canned reply to hand back.
        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(
                // Create the new state
                redis::cmd("SET")
                    .arg(format!("state.{test_id}"))
                    .arg(new.to_string())
                    .arg("NX")
                    .arg("EX")
                    .arg(expr),
                Ok(redis::Value::Okay),
            ),
            // increment the count for new
            MockCmd::new(
                redis::cmd("HINCRBY")
                    .arg(COUNTS)
                    .arg(new.to_string())
                    .arg(1),
                Ok(redis::Value::Okay),
            ),
            // begin the transaction
            MockCmd::new(
                redis::cmd("WATCH").arg(&state_key).arg(EXPIRY),
                Ok(redis::Value::Okay),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // Drive the transition against the mocked connection; the mock itself
        // verifies the command sequence, so the return value is not asserted.
        let _ = pr
            .internal_record(&mut conn, &Some(stored), new, Some(expr), &test_id)
            .await;

        Ok(())
    }
1004
    // Exercise garbage collection: `internal_gc` should find the expired
    // member in the `expiry` set, decrement its state count, increment the
    // `expired` count, and remove the member — all inside a WATCH/MULTI/EXEC
    // transaction.
    #[actix_rt::test]
    async fn test_push_reliability_gc() -> Result<()> {
        let db = crate::db::mock::MockDbClient::new();
        let test_id = format!("TEST_VALUE_{}", Uuid::new_v4());
        let new = ReliabilityState::Accepted;
        // Hand-built "{id}#{state}" member matching the `ExpiryKey` format.
        let key = format!("{}#{}", &test_id, &new);
        // Upper score bound for the ZRANGEBYSCORE expiry sweep.
        let expr = 1;
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        // Canned reply for ZRANGEBYSCORE: one expired member to collect.
        let response: redis::Value = redis::Value::Array(vec![redis::Value::SimpleString(
            ExpiryKey {
                id: test_id.clone(),
                state: new,
            }
            .to_string(),
        )]);

        // Construct the Pipeline.
        // A redis "pipeline" is a set of instructions that are executed in order. These
        // are not truly atomic, as other actions can interrupt a pipeline, but they
        // are guaranteed to happen in sequence. Transactions essentially check that the
        // WATCH key is not altered before the pipeline is executed.
        let mut mock_pipe = redis::Pipeline::new();
        mock_pipe
            .cmd("MULTI")
            .ignore()
            // Adjust the state counts.
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(new.to_string())
            .arg(-1)
            .ignore()
            .cmd("HINCRBY")
            .arg(COUNTS)
            .arg(ReliabilityState::Expired.to_string())
            .arg(1)
            // Replace the state key in the expiry set.
            .ignore()
            .cmd("ZREM")
            .arg(EXPIRY)
            .arg(key)
            .ignore()
            .cmd("EXEC")
            .ignore();

        // This mocks the "conn"ection, so all commands sent to the connection need
        // to be separated out. This includes the transaction "WATCH" and "UNWATCH".
        // The "meat" of the functions are handled via a Pipeline, which conjoins the
        // commands into one group (see above). Pipelines return an array of
        // results, one entry for each pipeline command.
        // In other words, the `internal_gc` commands are probably in the pipeline,
        // all the others are probably in the conn.
        let mut conn = MockRedisConnection::new(vec![
            MockCmd::new(redis::cmd("WATCH").arg(EXPIRY), Ok(redis::Value::Okay)),
            MockCmd::new(
                redis::cmd("ZRANGEBYSCORE").arg(EXPIRY).arg(0).arg(expr),
                Ok(response),
            ),
            // NOTE: Technically, since we `.ignore()` these, we could just have a
            // Vec containing just `Okay`. I'm being a bit pedantic here because I know
            // that this will come back to haunt me if I'm not, and because figuring out
            // the proper response for this was annoying.
            MockCmd::new(
                mock_pipe,
                Ok(redis::Value::Array(vec![
                    redis::Value::Okay,
                    // Match the number of commands that are being held for processing
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    redis::Value::SimpleString("QUEUED".to_owned()),
                    // the exec has been called, return an array containing the results.
                    redis::Value::Array(vec![
                        redis::Value::Okay,
                        redis::Value::Okay,
                        redis::Value::Okay,
                    ]),
                ])),
            ),
            // If the transaction fails, this should return a redis::Value::Nil
            MockCmd::new(redis::cmd("UNWATCH"), Ok(redis::Value::Okay)),
        ]);

        // test the main report function (note, this does not test redis)
        let pr = PushReliability::new(
            &None,
            Box::new(Arc::new(db)),
            &metrics,
            crate::redis_util::MAX_TRANSACTION_LOOP,
        )
        .unwrap();
        // functionally a no-op, but it does exercise lines.
        pr.gc().await?;

        // and mock the redis call.
        pr.internal_gc(&mut conn, 1).await?;

        Ok(())
    }
1103}