autopush_common/db/
mod.rs

1/// Contains the general Database access bits
2///
3/// Database access is abstracted into a DbClient impl
4/// which contains the required trait functions the
5/// application will need to perform in the database.
6/// Each of the abstractions contains a DbClientImpl
7/// that is responsible for carrying out the requested
8/// functions. Each of the data stores are VERY
9/// different, although the requested functions
10/// are fairly simple.
11use std::collections::{HashMap, HashSet};
12use std::result::Result as StdResult;
13
14use derive_builder::Builder;
15use serde::Serializer;
16use serde_derive::{Deserialize, Serialize};
17use uuid::Uuid;
18
19#[cfg(feature = "bigtable")]
20pub mod bigtable;
21pub mod client;
22pub mod error;
23pub mod models;
24#[cfg(feature = "postgres")]
25pub mod postgres;
26#[cfg(feature = "redis")]
27pub mod redis;
28pub mod reporter;
29pub mod routing;
30
31// used by integration testing
32pub mod mock;
33
34pub use reporter::spawn_pool_periodic_reporter;
35
36use crate::notification::Notification;
37use crate::util::timing::ms_since_epoch;
38
39pub const USER_RECORD_VERSION: u64 = 1;
40
41#[derive(Eq, Debug, PartialEq)]
42pub enum StorageType {
43    INVALID,
44    #[cfg(feature = "bigtable")]
45    BigTable,
46    #[cfg(feature = "postgres")]
47    Postgres,
48    #[cfg(feature = "redis")]
49    Redis,
50}
51
52impl From<&str> for StorageType {
53    fn from(name: &str) -> Self {
54        match name.to_lowercase().as_str() {
55            #[cfg(feature = "bigtable")]
56            "bigtable" => Self::BigTable,
57            #[cfg(feature = "postgres")]
58            "postgres" => Self::Postgres,
59            #[cfg(feature = "redis")]
60            "redis" => Self::Redis,
61            _ => Self::INVALID,
62        }
63    }
64}
65
66/// The type of storage to use.
67#[allow(clippy::vec_init_then_push)] // Because we are only pushing on feature flags.
68impl StorageType {
69    fn available<'a>() -> Vec<&'a str> {
70        #[allow(unused_mut)]
71        let mut result: Vec<&str> = Vec::new();
72        #[cfg(feature = "bigtable")]
73        result.push("Bigtable");
74        #[cfg(feature = "postgres")]
75        result.push("Postgres");
76        #[cfg(feature = "redis")]
77        result.push("Redis");
78        result
79    }
80
81    pub fn from_dsn(dsn: &Option<String>) -> Self {
82        debug!("Supported data types: {:?}", StorageType::available());
83        debug!("Checking DSN: {:?}", &dsn);
84        if dsn.is_none() {
85            let default = Self::available()[0];
86            info!("No DSN specified, failing over to old default dsn: {default}");
87            return Self::from(default);
88        }
89        let dsn = dsn.clone().unwrap_or_default();
90        #[cfg(feature = "bigtable")]
91        if dsn.starts_with("grpc") {
92            trace!("Found grpc");
93            // Credentials can be stored in either a path provided in an environment
94            // variable, or $HOME/.config/gcloud/applicaion_default_credentals.json
95            //
96            // NOTE: if no credentials are found, application will panic
97            //
98            if let Ok(cred) = std::env::var("GOOGLE_APPLICATION_CREDENTIALS") {
99                trace!("Env: {:?}", cred);
100            }
101            return Self::BigTable;
102        }
103        #[cfg(feature = "postgres")]
104        if dsn.starts_with("postgres") {
105            trace!("Found postgres");
106            return Self::Postgres;
107        }
108        #[cfg(feature = "redis")]
109        if dsn.starts_with("redis") {
110            trace!("Found redis");
111            return Self::Redis;
112        }
113        Self::INVALID
114    }
115}
116
117/// The universal settings for the database
118/// abstractor.
119#[derive(Clone, Debug, Default, Deserialize)]
120pub struct DbSettings {
121    /// Database connector string
122    pub dsn: Option<String>,
123    /// A JSON formatted dictionary containing Database settings that
124    /// are specific to the type of Data storage specified in the `dsn`
125    /// See the respective settings structure for
126    /// [crate::db::bigtable::BigTableDbSettings]
127    pub db_settings: String,
128}
129//TODO: add `From<autopush::settings::Settings> for DbSettings`?
130//TODO: add `From<autoendpoint::settings::Settings> for DbSettings`?
131
132/// Custom Uuid serializer
133///
134/// Serializes a Uuid as a simple string instead of hyphenated
135pub fn uuid_serializer<S>(x: &Uuid, s: S) -> StdResult<S::Ok, S::Error>
136where
137    S: Serializer,
138{
139    s.serialize_str(&x.simple().to_string())
140}
141
142#[derive(Clone, Default, Debug)]
143pub struct CheckStorageResponse {
144    /// The messages include a "topic"
145    /// "topics" are messages that replace prior messages of that topic.
146    /// (e.g. you can only have one message for a topic of "foo")
147    pub include_topic: bool,
148    /// The list of pending messages.
149    pub messages: Vec<Notification>,
150    /// All the messages up to this timestamp
151    pub timestamp: Option<u64>,
152}
153
154/// A user data record.
155#[derive(Deserialize, PartialEq, Debug, Clone, Serialize, Builder)]
156#[builder(default, setter(strip_option))]
157pub struct User {
158    /// The UAID. This is generally a UUID4. It needs to be globally
159    /// unique.
160    #[serde(serialize_with = "uuid_serializer")]
161    pub uaid: Uuid,
162    /// Time in milliseconds that the user last connected at
163    pub connected_at: u64,
164    /// Router type of the user
165    pub router_type: String,
166    /// Router-specific data
167    pub router_data: Option<HashMap<String, serde_json::Value>>,
168    /// Last node/port the client was or may be connected to
169    #[serde(skip_serializing_if = "Option::is_none")]
170    pub node_id: Option<String>,
171    /// Record version
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub record_version: Option<u64>,
174    /// the timestamp of the last notification sent to the user
175    /// This field is exclusive to the Bigtable data scheme
176    //TODO: rename this to `last_notification_timestamp`
177    #[serde(skip_serializing_if = "Option::is_none")]
178    pub current_timestamp: Option<u64>,
179    /// UUID4 version number for optimistic locking of updates on Bigtable
180    #[serde(skip_serializing)]
181    pub version: Option<Uuid>,
182    /// Set of user's channel ids. These are stored in router (user) record's
183    /// row in Bigtable. They are read along with the rest of the user record
184    /// so that them, along with every other field in the router record, will
185    /// automatically have their TTL (cell timestamp) reset during
186    /// [DbClient::update_user].
187    ///
188    /// This is solely used for the sake of that update thus private.
189    /// [DbClient::get_channels] is preferred for reading the latest version of
190    /// the channel ids (partly due to historical purposes but also is a more
191    /// flexible API that might benefit different, non Bigtable [DbClient]
192    /// backends that don't necessarily store the channel ids in the router
193    /// record).
194    priv_channels: HashSet<Uuid>,
195}
196
197impl Default for User {
198    fn default() -> Self {
199        let uaid = Uuid::new_v4();
200        //trace!(">>> Setting default uaid: {:?}", &uaid);
201        Self {
202            uaid,
203            connected_at: ms_since_epoch(),
204            router_type: "webpush".to_string(),
205            router_data: None,
206            node_id: None,
207            record_version: Some(USER_RECORD_VERSION),
208            current_timestamp: None,
209            version: Some(Uuid::new_v4()),
210            priv_channels: HashSet::new(),
211        }
212    }
213}
214
215impl User {
216    /// Return a new [UserBuilder] (generated from [derive_builder::Builder])
217    pub fn builder() -> UserBuilder {
218        UserBuilder::default()
219    }
220
221    pub fn channel_count(&self) -> usize {
222        self.priv_channels.len()
223    }
224}
225
226#[cfg(test)]
227mod tests {
228    use super::{User, USER_RECORD_VERSION};
229
230    #[test]
231    fn user_defaults() {
232        let user = User::builder().current_timestamp(22).build().unwrap();
233        assert_eq!(user.current_timestamp, Some(22));
234        assert_eq!(user.router_type, "webpush".to_owned());
235        assert_eq!(user.record_version, Some(USER_RECORD_VERSION));
236    }
237}