autopush_common/
reliability.rsuse std::collections::HashMap;
use std::sync::Arc;
use redis::Commands;
use crate::db::client::DbClient;
use crate::errors::{ApcError, ApcErrorKind, Result};
pub const COUNTS: &str = "state_counts";
pub const EXPIRY: &str = "expiry";
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize)]
pub enum ReliabilityState {
#[serde(rename = "received")]
Received, #[serde(rename = "stored")]
Stored, #[serde(rename = "retrieved")]
Retrieved, #[serde(rename = "transmitted_webpush")]
IntTransmitted, #[serde(rename = "accepted_webpush")]
IntAccepted, #[serde(rename = "transmitted")]
Transmitted, #[serde(rename = "accepted")]
Accepted, #[serde(rename = "delivered")]
Delivered, #[serde(rename = "expired")]
Expired, }
impl std::fmt::Display for ReliabilityState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Self::Received => "received",
Self::Stored => "stored",
Self::Retrieved => "retrieved",
Self::Transmitted => "transmitted",
Self::IntTransmitted => "transmitted_webpush",
Self::IntAccepted => "accepted_webpush",
Self::Accepted => "accepted",
Self::Delivered => "delivered",
Self::Expired => "expired",
})
}
}
impl std::str::FromStr for ReliabilityState {
type Err = ApcError;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Ok(match s.to_lowercase().as_str() {
"received" => Self::Received,
"stored" => Self::Stored,
"retrieved" => Self::Retrieved,
"transmitted" => Self::Transmitted,
"accepted" => Self::Accepted,
"transmitted_webpush" => Self::IntTransmitted,
"accepted_webpush" => Self::IntAccepted,
"delivered" => Self::Delivered,
"expired" => Self::Expired,
_ => {
return Err(
ApcErrorKind::GeneralError(format!("Unknown tracker state \"{}\"", s)).into(),
);
}
})
}
}
impl serde::Serialize for ReliabilityState {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
#[derive(Clone)]
pub struct PushReliability {
client: Option<Arc<redis::Client>>,
db: Box<dyn DbClient>,
}
impl PushReliability {
pub fn new(reliability_dsn: &Option<String>, db: Box<dyn DbClient>) -> Result<Self> {
if reliability_dsn.is_none() {
debug!("🔍 No reliability DSN declared.");
return Ok(Self {
client: None,
db: db.clone(),
});
};
let client = if let Some(dsn) = reliability_dsn {
let rclient = redis::Client::open(dsn.clone()).map_err(|e| {
ApcErrorKind::GeneralError(format!("Could not connect to redis server: {:?}", e))
})?;
Some(Arc::new(rclient))
} else {
None
};
Ok(Self {
client,
db: db.clone(),
})
}
pub async fn record(
&self,
reliability_id: &Option<String>,
new: ReliabilityState,
old: &Option<ReliabilityState>,
expr: Option<u64>,
) -> Option<ReliabilityState> {
let Some(id) = reliability_id else {
return None;
};
if let Some(client) = &self.client {
debug!(
"🔍 {} from {} to {}",
id,
old.map(|v| v.to_string())
.unwrap_or_else(|| "None".to_owned()),
new
);
if let Ok(mut con) = client.get_connection() {
let mut pipeline = redis::Pipeline::new();
let pipeline = pipeline.hincr(COUNTS, new.to_string(), 1);
let pipeline = if let Some(old) = old {
pipeline
.hincr(COUNTS, old.to_string(), -1)
.zrem(EXPIRY, format!("{}#{}", &old, id))
} else {
pipeline
};
let _ = pipeline
.zadd(EXPIRY, format!("{}#{}", new, id), expr.unwrap_or_default())
.exec(&mut con)
.inspect_err(|e| {
warn!("🔍 Failed to write to storage: {:?}", e);
});
}
};
let _ = self.db.log_report(id, new).await.inspect_err(|e| {
warn!("🔍 Unable to record reliability state: {:?}", e);
});
Some(new)
}
pub async fn report(&self) -> Result<Option<HashMap<String, i32>>> {
if let Some(client) = &self.client {
if let Ok(mut conn) = client.get_connection() {
return Ok(Some(conn.hgetall(COUNTS).map_err(|e| {
ApcErrorKind::GeneralError(format!("Could not read report {:?}", e))
})?));
}
}
Ok(None)
}
}