autopush_common/
notification.rs1use std::collections::HashMap;
3#[cfg(feature = "postgres")]
4use std::str::FromStr;
5
6use serde_derive::{Deserialize, Serialize};
7use uuid::Uuid;
8
9#[cfg(feature = "postgres")]
10use crate::db::error::DbError;
11use crate::util::ms_since_epoch;
12
13#[derive(Serialize, Default, Deserialize, Clone, Debug)]
14pub struct Notification {
18 #[serde(rename = "channelID")]
20 pub channel_id: Uuid,
21 pub version: String,
22 #[serde(skip_serializing)]
23 pub timestamp: u64,
24 #[serde(default = "default_ttl", skip_serializing)]
26 pub ttl: u64,
27 #[serde(skip_serializing)]
29 pub topic: Option<String>,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub data: Option<String>,
32 #[serde(skip_serializing)]
33 pub sortkey_timestamp: Option<u64>,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub headers: Option<HashMap<String, String>>,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 #[cfg(feature = "reliable_report")]
38 pub reliability_id: Option<String>,
39 #[cfg(feature = "reliable_report")]
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub reliable_state: Option<crate::reliability::ReliabilityState>,
42}
43
44pub const TOPIC_NOTIFICATION_PREFIX: &str = "01";
45pub const STANDARD_NOTIFICATION_PREFIX: &str = "02";
46
47impl Notification {
48 pub fn chidmessageid(&self) -> String {
59 let chid = self.channel_id.as_hyphenated();
60
61 if let Some(ref topic) = self.topic {
62 format!("{TOPIC_NOTIFICATION_PREFIX}:{chid}:{topic}")
63 } else if let Some(sortkey_timestamp) = self.sortkey_timestamp {
64 format!(
65 "{STANDARD_NOTIFICATION_PREFIX}:{}:{}",
66 if sortkey_timestamp == 0 {
67 ms_since_epoch()
68 } else {
69 sortkey_timestamp
70 },
71 chid
72 )
73 } else {
74 warn!("🚨 LEGACY MESSAGE!? {:?} ", self);
75 format!("{}:{}", chid, self.version)
77 }
78 }
79
80 pub fn expiry(&self) -> u64 {
81 self.timestamp + self.ttl
82 }
83
84 pub fn expired(&self, at_sec: u64) -> bool {
87 at_sec >= self.expiry()
88 }
89
90 #[cfg(feature = "reliable_report")]
91 pub async fn record_reliability(
92 &mut self,
93 reliability: &crate::reliability::PushReliability,
94 state: crate::reliability::ReliabilityState,
95 ) {
96 self.reliable_state = reliability
97 .record(
98 &self.reliability_id,
99 state,
100 &self.reliable_state,
101 Some(self.expiry()),
102 )
103 .await
104 .inspect_err(|e| {
105 warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
106 })
107 .unwrap_or(Some(state));
108 }
109
110 #[cfg(feature = "reliable_report")]
111 pub fn clone_without_reliability_state(&self) -> Self {
112 let mut cloned = self.clone();
113 cloned.reliable_state = None;
114 cloned
115 }
116}
117
118pub(crate) fn default_ttl() -> u64 {
119 0
120}
121
122#[cfg(feature = "postgres")]
123impl TryFrom<&tokio_postgres::Row> for Notification {
125 type Error = DbError;
126
127 fn try_from(row: &tokio_postgres::Row) -> Result<Self, Self::Error> {
128 #[cfg(feature = "reliable_report")]
129 use crate::reliability::ReliabilityState;
130 Ok(Self {
131 channel_id: row
132 .try_get::<&str, &str>("channel_id")
133 .map(|v| Uuid::from_str(v).unwrap())
134 .unwrap(),
135 version: row.try_get::<&str, String>("version").unwrap(),
136 ttl: row.try_get::<&str, i64>("ttl").map(|v| v as u64).unwrap(),
137 topic: row
138 .try_get::<&str, String>("topic")
139 .map(Some)
140 .unwrap_or_default(),
141 timestamp: row
142 .try_get::<&str, i64>("timestamp")
143 .map(|v| v as u64)
144 .unwrap(),
145 data: row.try_get::<&str, String>("data").map(Some).unwrap(),
146 sortkey_timestamp: row
147 .try_get::<&str, i64>("sortkey_timestamp")
148 .map(|v| Some(v as u64))
149 .unwrap_or_default(),
150 headers: row
151 .try_get::<&str, &str>("headers")
152 .map(|v| {
153 if v.is_empty() || v == "null" || v == "{}" {
154 return None;
155 }
156 let hdrs: HashMap<String, String> = serde_json::from_str(v).unwrap();
157 Some(hdrs)
158 })
159 .unwrap_or_default(),
160 #[cfg(feature = "reliable_report")]
161 reliability_id: row.try_get::<&str, String>("reliability_id").ok(),
162 #[cfg(feature = "reliable_report")]
163 reliable_state: row
164 .try_get::<&str, &str>("reliable_state")
165 .map(|v| ReliabilityState::from_str(v).unwrap())
166 .ok(),
167 })
168 }
169}