autopush_common/db/
mod.rs

/// Contains the general Database access bits
///
/// Database access is abstracted into a DbClient impl
/// which contains the required trait functions the
/// application will need to perform in the database.
/// Each of the abstractions contains a DbClientImpl
/// that is responsible for carrying out the requested
/// functions. The data stores are VERY different from
/// one another, although the requested functions
/// are fairly simple.
11use std::collections::{HashMap, HashSet};
12use std::result::Result as StdResult;
13
14use derive_builder::Builder;
15use serde::Serializer;
16use serde_derive::{Deserialize, Serialize};
17use uuid::Uuid;
18
19#[cfg(feature = "bigtable")]
20pub mod bigtable;
21pub mod client;
22pub mod error;
23pub mod models;
24#[cfg(feature = "redis")]
25pub mod redis;
26pub mod reporter;
27pub mod routing;
28
29// used by integration testing
30pub mod mock;
31
32pub use reporter::spawn_pool_periodic_reporter;
33
34use crate::notification::Notification;
35use crate::util::timing::ms_since_epoch;
36
/// Record version stamped onto newly created [User] records
/// (see the `record_version` field set by `User::default`).
pub const USER_RECORD_VERSION: u64 = 1;
38
/// The storage backend selected for this process.
///
/// Which variants exist depends on the cargo features the crate was built
/// with; `INVALID` is always present and is what the conversions fall back
/// to when a name or DSN is not recognized.
///
/// The enum carries no data, so it is `Copy` and can be passed by value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StorageType {
    /// No compiled-in backend matched the requested name or DSN.
    INVALID,
    /// Bigtable backend (only with the `bigtable` feature).
    #[cfg(feature = "bigtable")]
    BigTable,
    /// Redis backend (only with the `redis` feature).
    #[cfg(feature = "redis")]
    Redis,
}
47
48impl From<&str> for StorageType {
49    fn from(name: &str) -> Self {
50        match name.to_lowercase().as_str() {
51            #[cfg(feature = "bigtable")]
52            "bigtable" => Self::BigTable,
53            #[cfg(feature = "redis")]
54            "redis" => Self::Redis,
55            _ => Self::INVALID,
56        }
57    }
58}
59
60/// The type of storage to use.
61#[allow(clippy::vec_init_then_push)] // Because we are only pushing on feature flags.
62impl StorageType {
63    fn available<'a>() -> Vec<&'a str> {
64        #[allow(unused_mut)]
65        let mut result: Vec<&str> = Vec::new();
66        #[cfg(feature = "bigtable")]
67        result.push("Bigtable");
68        #[cfg(feature = "redis")]
69        result.push("Redis");
70        result
71    }
72
73    pub fn from_dsn(dsn: &Option<String>) -> Self {
74        debug!("Supported data types: {:?}", StorageType::available());
75        debug!("Checking DSN: {:?}", &dsn);
76        if dsn.is_none() {
77            let default = Self::available()[0];
78            info!("No DSN specified, failing over to old default dsn: {default}");
79            return Self::from(default);
80        }
81        let dsn = dsn.clone().unwrap_or_default();
82        #[cfg(feature = "bigtable")]
83        if dsn.starts_with("grpc") {
84            trace!("Found grpc");
85            // Credentials can be stored in either a path provided in an environment
86            // variable, or $HOME/.config/gcloud/applicaion_default_credentals.json
87            //
88            // NOTE: if no credentials are found, application will panic
89            //
90            if let Ok(cred) = std::env::var("GOOGLE_APPLICATION_CREDENTIALS") {
91                trace!("Env: {:?}", cred);
92            }
93            return Self::BigTable;
94        }
95        #[cfg(feature = "redis")]
96        if dsn.starts_with("redis") {
97            trace!("Found redis");
98            return Self::Redis;
99        }
100        Self::INVALID
101    }
102}
103
104/// The universal settings for the database
105/// abstractor.
106#[derive(Clone, Debug, Default, Deserialize)]
107pub struct DbSettings {
108    /// Database connector string
109    pub dsn: Option<String>,
110    /// A JSON formatted dictionary containing Database settings that
111    /// are specific to the type of Data storage specified in the `dsn`
112    /// See the respective settings structure for
113    /// [crate::db::bigtable::BigTableDbSettings]
114    pub db_settings: String,
115}
116//TODO: add `From<autopush::settings::Settings> for DbSettings`?
117//TODO: add `From<autoendpoint::settings::Settings> for DbSettings`?
118
119/// Custom Uuid serializer
120///
121/// Serializes a Uuid as a simple string instead of hyphenated
122pub fn uuid_serializer<S>(x: &Uuid, s: S) -> StdResult<S::Ok, S::Error>
123where
124    S: Serializer,
125{
126    s.serialize_str(&x.simple().to_string())
127}
128
129#[derive(Clone, Default, Debug)]
130pub struct CheckStorageResponse {
131    /// The messages include a "topic"
132    /// "topics" are messages that replace prior messages of that topic.
133    /// (e.g. you can only have one message for a topic of "foo")
134    pub include_topic: bool,
135    /// The list of pending messages.
136    pub messages: Vec<Notification>,
137    /// All the messages up to this timestamp
138    pub timestamp: Option<u64>,
139}
140
141/// A user data record.
142#[derive(Deserialize, PartialEq, Debug, Clone, Serialize, Builder)]
143#[builder(default, setter(strip_option))]
144pub struct User {
145    /// The UAID. This is generally a UUID4. It needs to be globally
146    /// unique.
147    #[serde(serialize_with = "uuid_serializer")]
148    pub uaid: Uuid,
149    /// Time in milliseconds that the user last connected at
150    pub connected_at: u64,
151    /// Router type of the user
152    pub router_type: String,
153    /// Router-specific data
154    pub router_data: Option<HashMap<String, serde_json::Value>>,
155    /// Last node/port the client was or may be connected to
156    #[serde(skip_serializing_if = "Option::is_none")]
157    pub node_id: Option<String>,
158    /// Record version
159    #[serde(skip_serializing_if = "Option::is_none")]
160    pub record_version: Option<u64>,
161    /// the timestamp of the last notification sent to the user
162    /// This field is exclusive to the Bigtable data scheme
163    //TODO: rename this to `last_notification_timestamp`
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub current_timestamp: Option<u64>,
166    /// UUID4 version number for optimistic locking of updates on Bigtable
167    #[serde(skip_serializing)]
168    pub version: Option<Uuid>,
169    /// Set of user's channel ids. These are stored in router (user) record's
170    /// row in Bigtable. They are read along with the rest of the user record
171    /// so that them, along with every other field in the router record, will
172    /// automatically have their TTL (cell timestamp) reset during
173    /// [DbClient::update_user].
174    ///
175    /// This is solely used for the sake of that update thus private.
176    /// [DbClient::get_channels] is preferred for reading the latest version of
177    /// the channel ids (partly due to historical purposes but also is a more
178    /// flexible API that might benefit different, non Bigtable [DbClient]
179    /// backends that don't necessarily store the channel ids in the router
180    /// record).
181    priv_channels: HashSet<Uuid>,
182}
183
184impl Default for User {
185    fn default() -> Self {
186        let uaid = Uuid::new_v4();
187        //trace!(">>> Setting default uaid: {:?}", &uaid);
188        Self {
189            uaid,
190            connected_at: ms_since_epoch(),
191            router_type: "webpush".to_string(),
192            router_data: None,
193            node_id: None,
194            record_version: Some(USER_RECORD_VERSION),
195            current_timestamp: None,
196            version: Some(Uuid::new_v4()),
197            priv_channels: HashSet::new(),
198        }
199    }
200}
201
202impl User {
203    /// Return a new [UserBuilder] (generated from [derive_builder::Builder])
204    pub fn builder() -> UserBuilder {
205        UserBuilder::default()
206    }
207
208    pub fn channel_count(&self) -> usize {
209        self.priv_channels.len()
210    }
211}
212
#[cfg(test)]
mod tests {
    use super::{User, USER_RECORD_VERSION};

    /// A builder with only `current_timestamp` set must fill the remaining
    /// fields from `User::default()`.
    #[test]
    fn user_defaults() {
        let built = User::builder().current_timestamp(22).build().unwrap();

        // The explicitly set field is kept (wrapped in `Some` by the setter).
        assert_eq!(built.current_timestamp, Some(22));
        // Everything else comes from the defaults.
        assert_eq!(built.router_type, "webpush");
        assert_eq!(built.record_version, Some(USER_RECORD_VERSION));
    }
}