autopush_common/
notification.rs

1//! Notification protocol
2use std::collections::HashMap;
3#[cfg(feature = "postgres")]
4use std::str::FromStr;
5
6use serde_derive::{Deserialize, Serialize};
7use uuid::Uuid;
8
9#[cfg(feature = "postgres")]
10use crate::db::error::DbError;
11use crate::util::ms_since_epoch;
12
13#[derive(Serialize, Default, Deserialize, Clone, Debug)]
14/// A Publishable Notification record. This is a notification that is either
15/// received from a third party or is outbound to a UserAgent.
16///
17pub struct Notification {
18    // Required values
19    #[serde(rename = "channelID")]
20    pub channel_id: Uuid,
21    pub version: String,
22    #[serde(skip_serializing)]
23    pub timestamp: u64,
24    // Possibly stored values, provided with a default.
25    #[serde(default = "default_ttl", skip_serializing)]
26    pub ttl: u64,
27    // Optional values, which imply a "None" default.
28    #[serde(skip_serializing)]
29    pub topic: Option<String>,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub data: Option<String>,
32    #[serde(skip_serializing)]
33    pub sortkey_timestamp: Option<u64>,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub headers: Option<HashMap<String, String>>,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    #[cfg(feature = "reliable_report")]
38    pub reliability_id: Option<String>,
39    #[cfg(feature = "reliable_report")]
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub reliable_state: Option<crate::reliability::ReliabilityState>,
42}
43
/// Leading component of the `chidmessageid` generated for topic messages.
pub const TOPIC_NOTIFICATION_PREFIX: &str = "01";
/// Leading component of the `chidmessageid` generated for standard
/// (non-topic, timestamp-sorted) messages.
pub const STANDARD_NOTIFICATION_PREFIX: &str = "02";
46
47impl Notification {
48    /// Return an appropriate chidmessageid
49    ///
50    /// For standard messages:
51    ///     {STANDARD_NOTIFICATION_PREFIX}:{sortkey_timestamp}:{chid}
52    ///
53    /// For topic messages:
54    ///     {TOPIC_NOTIFICATION_PREFIX}:{chid}:{topic}
55    ///
56    /// Old format for non-topic messages that is no longer returned:
57    ///     {chid}:{message_id}
58    pub fn chidmessageid(&self) -> String {
59        let chid = self.channel_id.as_hyphenated();
60
61        if let Some(ref topic) = self.topic {
62            format!("{TOPIC_NOTIFICATION_PREFIX}:{chid}:{topic}")
63        } else if let Some(sortkey_timestamp) = self.sortkey_timestamp {
64            format!(
65                "{STANDARD_NOTIFICATION_PREFIX}:{}:{}",
66                if sortkey_timestamp == 0 {
67                    ms_since_epoch()
68                } else {
69                    sortkey_timestamp
70                },
71                chid
72            )
73        } else {
74            warn!("🚨 LEGACY MESSAGE!? {:?} ", self);
75            // Legacy messages which we should never get anymore
76            format!("{}:{}", chid, self.version)
77        }
78    }
79
80    pub fn expiry(&self) -> u64 {
81        self.timestamp + self.ttl
82    }
83
84    /// Convenience function to determine if the notification
85    /// has aged out.
86    pub fn expired(&self, at_sec: u64) -> bool {
87        at_sec >= self.expiry()
88    }
89
90    #[cfg(feature = "reliable_report")]
91    pub async fn record_reliability(
92        &mut self,
93        reliability: &crate::reliability::PushReliability,
94        state: crate::reliability::ReliabilityState,
95    ) {
96        self.reliable_state = reliability
97            .record(
98                &self.reliability_id,
99                state,
100                &self.reliable_state,
101                Some(self.expiry()),
102            )
103            .await
104            .inspect_err(|e| {
105                warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
106            })
107            .unwrap_or(Some(state));
108    }
109
110    #[cfg(feature = "reliable_report")]
111    pub fn clone_without_reliability_state(&self) -> Self {
112        let mut cloned = self.clone();
113        cloned.reliable_state = None;
114        cloned
115    }
116}
117
/// TTL applied to a [`Notification`] whose input carries no `ttl` value.
///
/// Referenced by the `#[serde(default = "default_ttl")]` attribute on the
/// `ttl` field.
pub(crate) fn default_ttl() -> u64 {
    const FALLBACK_TTL: u64 = 0;
    FALLBACK_TTL
}
121
#[cfg(feature = "postgres")]
/// Postgres Message Row to Notification (may be used as a model for other DBs later)
///
/// Column handling:
///
/// * `channel_id`, `version`, `ttl` and `timestamp` are required. A missing,
///   mistyped, unparsable or negative value yields an `Err` instead of a
///   panic.
/// * `data` may be NULL, which maps to `None`; any other read failure is an
///   `Err`.
/// * `topic`, `sortkey_timestamp`, `headers`, `reliability_id` and
///   `reliable_state` are best-effort: a value that cannot be read becomes
///   `None`.
/// * `headers` holding an empty string, `null` or `{}` maps to `None`; any
///   other content must be a JSON object of strings, otherwise an `Err` is
///   returned.
///
/// # Errors
///
/// Returns `DbError::General` naming the offending column.
impl TryFrom<&tokio_postgres::Row> for Notification {
    type Error = DbError;

    fn try_from(row: &tokio_postgres::Row) -> Result<Self, Self::Error> {
        #[cfg(feature = "reliable_report")]
        use crate::reliability::ReliabilityState;

        // Wrap a per-column failure in a `DbError` that names the column.
        // NOTE(review): relies on the `DbError::General(String)` variant,
        // which is declared outside this file -- confirm.
        fn column_error(column: &str, err: impl std::fmt::Display) -> DbError {
            DbError::General(format!("Invalid `{column}` in message row: {err}"))
        }

        // Read a required integer column, rejecting negative values rather
        // than letting an `as u64` cast wrap them into huge numbers.
        fn required_u64(row: &tokio_postgres::Row, column: &str) -> Result<u64, DbError> {
            let raw = row
                .try_get::<&str, i64>(column)
                .map_err(|e| column_error(column, e))?;
            u64::try_from(raw).map_err(|e| column_error(column, e))
        }

        let channel_id = row
            .try_get::<&str, &str>("channel_id")
            .map_err(|e| column_error("channel_id", e))
            .and_then(|v| Uuid::from_str(v).map_err(|e| column_error("channel_id", e)))?;
        let version = row
            .try_get::<&str, String>("version")
            .map_err(|e| column_error("version", e))?;
        let ttl = required_u64(row, "ttl")?;
        let timestamp = required_u64(row, "timestamp")?;
        // Reading as `Option<String>` lets a NULL payload through as `None`.
        let data = row
            .try_get::<&str, Option<String>>("data")
            .map_err(|e| column_error("data", e))?;
        let headers = match row.try_get::<&str, &str>("headers") {
            Ok(raw) if !(raw.is_empty() || raw == "null" || raw == "{}") => Some(
                serde_json::from_str::<HashMap<String, String>>(raw)
                    .map_err(|e| column_error("headers", e))?,
            ),
            // Unreadable or "empty" header content means no headers.
            _ => None,
        };

        Ok(Self {
            channel_id,
            version,
            ttl,
            topic: row.try_get::<&str, String>("topic").ok(),
            timestamp,
            data,
            sortkey_timestamp: row
                .try_get::<&str, i64>("sortkey_timestamp")
                .ok()
                .and_then(|v| u64::try_from(v).ok()),
            headers,
            #[cfg(feature = "reliable_report")]
            reliability_id: row.try_get::<&str, String>("reliability_id").ok(),
            #[cfg(feature = "reliable_report")]
            reliable_state: row
                .try_get::<&str, &str>("reliable_state")
                .ok()
                .and_then(|v| ReliabilityState::from_str(v).ok()),
        })
    }
}