autopush_common/db/redis/
mod.rs

/// This uses redis as a storage and management
/// system for Autopush Notifications and Routing information.
///
/// Keys for the data are:
/// - `autopush/user/{uaid}` String to store the user data
/// - `autopush/co/{uaid}` u64 to store the last time the user interacted with the server
/// - `autopush/timestamp/{uaid}` u64 to store the last storage timestamp incremented by the server, once messages are delivered
/// - `autopush/channels/{uaid}` List to store the list of the channels of the user
/// - `autopush/msgs/{uaid}` SortedSet to store the list of the pending message ids for the user
/// - `autopush/msgs_exp/{uaid}` SortedSet to store the list of the pending message ids, ordered by expiry date; this is needed because SortedSet elements can't have independent expiry dates
/// - `autopush/msg/{uaid}/{chidmessageid}`, with `{chidmessageid} == {chid}:{version}` String to store
///   the content of the messages
///
14mod redis_client;
15
16pub use redis_client::RedisClientImpl;
17
18use std::collections::HashMap;
19use std::time::Duration;
20
21use crate::db::error::DbError;
22use crate::notification::{Notification, default_ttl};
23use crate::util::deserialize_opt_u32_to_duration;
24
25use serde_derive::{Deserialize, Serialize};
26use uuid::Uuid;
27
28/// The settings for accessing the redis contents.
29#[derive(Clone, Debug, Deserialize)]
30#[serde(default)]
31pub struct RedisDbSettings {
32    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
33    pub create_timeout: Option<Duration>,
34    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
35    // Minimum value is 1 (second), defaults to MAX_ROUTER_TTL_SECS
36    pub router_ttl: Option<Duration>,
37    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
38    // Minimum value is 1 (second), defaults to MAX_NOTIFICATION_TTL_SECS
39    pub notification_ttl: Option<Duration>,
40}
41
42#[allow(clippy::derivable_impls)]
43impl Default for RedisDbSettings {
44    fn default() -> Self {
45        Self {
46            create_timeout: Default::default(),
47            router_ttl: Some(Duration::from_secs(crate::MAX_ROUTER_TTL_SECS)),
48            notification_ttl: Some(Duration::from_secs(crate::MAX_NOTIFICATION_TTL_SECS)),
49        }
50    }
51}
52
53impl TryFrom<&str> for RedisDbSettings {
54    type Error = DbError;
55    fn try_from(setting_string: &str) -> Result<Self, Self::Error> {
56        let me: Self = match serde_json::from_str(setting_string) {
57            Ok(me) => me,
58            Err(e) if e.is_eof() => Self::default(),
59            Err(e) => Err(DbError::General(format!(
60                "Could not parse RedisDbSettings: {:?}",
61                e
62            )))?,
63        };
64        if let Some(router_ttl) = me.router_ttl
65            && router_ttl.as_secs() == 0
66        {
67            return Err(DbError::General(
68                "router_ttl must be greater than 0".to_string(),
69            ));
70        }
71        if let Some(notification_ttl) = me.notification_ttl
72            && notification_ttl.as_secs() == 0
73        {
74            return Err(DbError::General(
75                "notification_ttl must be greater than 0".to_string(),
76            ));
77        }
78        // Supply defaults for explicitly null values (deserializer handles missing keys)
79        // Otherwise it defaults to 0 duration, which is not a valid TTL
80        let me = Self {
81            router_ttl: me
82                .router_ttl
83                .or(Some(Duration::from_secs(crate::MAX_ROUTER_TTL_SECS))),
84            notification_ttl: me
85                .notification_ttl
86                .or(Some(Duration::from_secs(crate::MAX_NOTIFICATION_TTL_SECS))),
87            ..me
88        };
89        Ok(me)
90    }
91}
92
93#[derive(Serialize, Default, Deserialize, Clone, Debug)]
94/// A Publishable Notification record. This is a notification that is either
95/// received from a third party or is outbound to a UserAgent.
96///
97pub struct StorableNotification {
98    // Required values
99    #[serde(rename = "channelID")]
100    pub channel_id: Uuid,
101    pub version: String,
102    pub timestamp: u64,
103    // Possibly stored values, provided with a default.
104    #[serde(default = "default_ttl", skip_serializing)]
105    pub ttl: u64,
106    // Optional values, which imply a "None" default.
107    #[serde(skip_serializing)]
108    pub topic: Option<String>,
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub data: Option<String>,
111    #[serde(skip_serializing)]
112    pub sortkey_timestamp: Option<u64>,
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub headers: Option<HashMap<String, String>>,
115    #[cfg(feature = "reliable_report")]
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub reliability_id: Option<String>,
118    #[cfg(feature = "reliable_report")]
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub reliable_state: Option<crate::reliability::ReliabilityState>,
121}
122
123impl From<Notification> for StorableNotification {
124    fn from(notification: Notification) -> Self {
125        Self {
126            channel_id: notification.channel_id,
127            version: notification.version,
128            timestamp: notification.timestamp,
129            ttl: notification.ttl,
130            topic: notification.topic,
131            data: notification.data,
132            sortkey_timestamp: notification.sortkey_timestamp,
133            headers: notification.headers,
134            #[cfg(feature = "reliable_report")]
135            reliability_id: notification.reliability_id,
136            #[cfg(feature = "reliable_report")]
137            reliable_state: notification.reliable_state,
138        }
139    }
140}
141
142impl From<StorableNotification> for Notification {
143    fn from(storable: StorableNotification) -> Self {
144        Self {
145            channel_id: storable.channel_id,
146            version: storable.version,
147            timestamp: storable.timestamp,
148            ttl: storable.ttl,
149            topic: storable.topic,
150            data: storable.data,
151            sortkey_timestamp: storable.sortkey_timestamp,
152            headers: storable.headers,
153            #[cfg(feature = "reliable_report")]
154            reliability_id: storable.reliability_id,
155            #[cfg(feature = "reliable_report")]
156            reliable_state: storable.reliable_state,
157        }
158    }
159}
160
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::RedisDbSettings;
    use crate::db::error::DbError;

    /// Exercises `RedisDbSettings::try_from` with explicit values, missing
    /// keys, an invalid zero TTL, and explicit `null` TTLs.
    #[test]
    fn test_settings_parse() -> Result<(), DbError> {
        // An explicitly supplied timeout is parsed as whole seconds.
        let with_timeout = RedisDbSettings::try_from(r#"{"create_timeout": 123}"#)?;
        assert_eq!(with_timeout.create_timeout, Some(Duration::from_secs(123)));

        // Missing keys must not end up as a zero-length TTL.
        let from_empty = RedisDbSettings::try_from("{}")?;
        let zero_ttl = Some(Duration::from_secs(0));
        assert_ne!(from_empty.router_ttl, zero_ttl);
        assert_ne!(from_empty.notification_ttl, zero_ttl);

        // A TTL of zero is rejected outright.
        let zero_router = RedisDbSettings::try_from(r#"{"router_ttl":0}"#);
        assert!(zero_router.is_err());

        // Explicit nulls are replaced by the crate-level maximums.
        let from_nulls =
            RedisDbSettings::try_from(r#"{"notification_ttl": null, "router_ttl": null}"#)?;
        let expected_notification_ttl = Duration::from_secs(crate::MAX_NOTIFICATION_TTL_SECS);
        let expected_router_ttl = Duration::from_secs(crate::MAX_ROUTER_TTL_SECS);
        assert_eq!(from_nulls.notification_ttl, Some(expected_notification_ttl));
        assert_eq!(from_nulls.router_ttl, Some(expected_router_ttl));

        Ok(())
    }
}