autopush_common/db/bigtable/
mod.rs

//! This uses Google Cloud Platform (GCP) Bigtable as a storage and management
//! system for Autopush Notifications and Routing information.
//!
//! Bigtable has a single index key (the row key), and uses "column family"
//! designators to perform garbage collection.
//!
//! Keys for the data are:
//! * `{uaid}` - the metadata record for a given UAID
//! * `{uaid}#{channelid}` - the metadata record for a channel associated with a
//!   UAID
//! * `{uaid}#{channelid}#{sortkey_timestamp}` - a message record for a UAID
//!   and channel
//!
//! Bigtable automatically sorts rows by key. This schema uses regular
//! expression lookups to do things like return the channels associated with a
//! given UAID, fetch the appropriate topic messages, and other common
//! functions. Please refer to the Bigtable documentation for how to construct
//! these expressions: they must match the entire key, so partial key matches
//! will not return data (e.g. `/foo/` will not match `foobar`, but `/foo.*/`
//! will).
22mod bigtable_client;
23mod pool;
24
25pub use bigtable_client::error::BigTableError;
26pub use bigtable_client::BigTableClientImpl;
27
28use grpcio::Metadata;
29use serde::Deserialize;
30use std::time::Duration;
31
32use crate::db::bigtable::bigtable_client::MetadataBuilder;
33use crate::db::error::DbError;
34use crate::util::deserialize_opt_u32_to_duration;
35
36fn retry_default() -> usize {
37    bigtable_client::RETRY_COUNT
38}
39
40/// The settings for accessing the BigTable contents.
41#[derive(Clone, Debug, Deserialize)]
42pub struct BigTableDbSettings {
43    /// The Table name matches the GRPC template for table paths.
44    /// e.g. `projects/{projectid}/instances/{instanceid}/tables/{tablename}`
45    /// *NOTE* There is no leading `/`
46    /// By default, this (may?) use the `*` variant which translates to
47    /// `projects/*/instances/*/tables/*` which searches all data stored in
48    /// bigtable.
49    #[serde(default)]
50    pub table_name: String,
51    /// Routing replication profile id.
52    /// Should be used everywhere we set `table_name` when creating requests
53    #[serde(default)]
54    pub app_profile_id: String,
55    #[serde(default)]
56    pub router_family: String,
57    #[serde(default)]
58    pub message_family: String,
59    #[serde(default)]
60    pub message_topic_family: String,
61    #[serde(default)]
62    pub database_pool_max_size: Option<u32>,
63    /// Max time (in seconds) to wait to create a new connection to bigtable
64    #[serde(default)]
65    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
66    pub database_pool_create_timeout: Option<Duration>,
67    /// Max time (in seconds) to wait for a socket to become available
68    #[serde(default)]
69    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
70    pub database_pool_wait_timeout: Option<Duration>,
71    /// Max time(in seconds) to recycle a connection
72    #[serde(default)]
73    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
74    pub database_pool_recycle_timeout: Option<Duration>,
75    /// Max time (in seconds) a connection should live
76    #[serde(default)]
77    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
78    pub database_pool_connection_ttl: Option<Duration>,
79    /// Max idle time(in seconds) for a connection
80    #[serde(default)]
81    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
82    pub database_pool_max_idle: Option<Duration>,
83    /// Include route to leader header in metadata
84    #[serde(default)]
85    pub route_to_leader: bool,
86    /// Number of times to retry a GRPC function
87    #[serde(default = "retry_default")]
88    pub retry_count: usize,
89}
90
// Only needed by tests; intentionally unavailable in release builds. A plain
// `#[derive(Default)]` would also compile it into release builds, which is
// why this is written by hand and the clippy `derivable_impls` lint is allowed.
#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for BigTableDbSettings {
    /// Every field takes its type's empty/zero value.
    ///
    /// NOTE: `retry_count` is `0` here, which differs from the serde default
    /// (`retry_default()`) applied when settings are deserialized.
    fn default() -> Self {
        Self {
            table_name: String::new(),
            app_profile_id: String::new(),
            router_family: String::new(),
            message_family: String::new(),
            message_topic_family: String::new(),
            database_pool_max_size: None,
            database_pool_create_timeout: None,
            database_pool_wait_timeout: None,
            database_pool_recycle_timeout: None,
            database_pool_connection_ttl: None,
            database_pool_max_idle: None,
            route_to_leader: false,
            retry_count: 0,
        }
    }
}
113
114impl BigTableDbSettings {
115    pub fn metadata(&self) -> Result<Metadata, BigTableError> {
116        MetadataBuilder::with_prefix(&self.table_name)
117            .routing_param("table_name", &self.table_name)
118            .route_to_leader(self.route_to_leader)
119            .build()
120            .map_err(BigTableError::GRPC)
121    }
122
123    // Health may require a different metadata declaration.
124    pub fn health_metadata(&self) -> Result<Metadata, BigTableError> {
125        self.metadata()
126    }
127
128    pub fn admin_metadata(&self) -> Result<Metadata, BigTableError> {
129        // Admin calls use a slightly different routing param and a truncated prefix
130        // See https://github.com/googleapis/google-cloud-cpp/issues/190#issuecomment-370520185
131        let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else {
132            return Err(BigTableError::Config(
133                "Invalid table name specified".to_owned(),
134            ));
135        };
136        MetadataBuilder::with_prefix(admin_prefix)
137            .routing_param("name", &self.table_name)
138            .route_to_leader(self.route_to_leader)
139            .build()
140            .map_err(BigTableError::GRPC)
141    }
142
143    pub fn get_instance_name(&self) -> Result<String, BigTableError> {
144        let parts: Vec<&str> = self.table_name.split('/').collect();
145        if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
146            return Err(BigTableError::Config(
147                "Invalid table name specified. Cannot parse instance".to_owned(),
148            ));
149        }
150        Ok(parts[0..4].join("/"))
151    }
152}
153
154impl TryFrom<&str> for BigTableDbSettings {
155    type Error = DbError;
156    fn try_from(setting_string: &str) -> Result<Self, Self::Error> {
157        let mut me: Self = serde_json::from_str(setting_string)
158            .map_err(|e| DbError::General(format!("Could not parse DdbSettings: {e:?}")))?;
159
160        if me.table_name.starts_with('/') {
161            return Err(DbError::ConnectionError(
162                "Table name path begins with a '/'".to_owned(),
163            ));
164        };
165
166        // specify the default string "default" if it's not specified.
167        // There's a small chance that this could be reported as "unspecified", so this
168        // removes that confusion.
169        if me.app_profile_id.is_empty() {
170            "default".clone_into(&mut me.app_profile_id);
171        }
172
173        Ok(me)
174    }
175}
176
#[cfg(test)]
mod tests {

    /// Durations given in seconds are converted into `Duration`s.
    #[test]
    fn test_settings_parse() -> Result<(), crate::db::error::DbError> {
        let settings =
            super::BigTableDbSettings::try_from("{\"database_pool_create_timeout\": 123}")?;
        assert_eq!(
            settings.database_pool_create_timeout,
            Some(std::time::Duration::from_secs(123))
        );
        Ok(())
    }

    /// Omitted fields fall back to their serde defaults, and an empty
    /// `app_profile_id` becomes `"default"`.
    #[test]
    fn test_settings_defaults() -> Result<(), crate::db::error::DbError> {
        let settings = super::BigTableDbSettings::try_from("{}")?;
        assert_eq!(settings.table_name, "");
        assert_eq!(settings.app_profile_id, "default");
        assert_eq!(settings.retry_count, super::retry_default());
        assert!(!settings.route_to_leader);
        assert_eq!(settings.database_pool_create_timeout, None);
        Ok(())
    }

    /// Table paths must not begin with a '/'.
    #[test]
    fn test_settings_reject_leading_slash() {
        let res = super::BigTableDbSettings::try_from(
            "{\"table_name\": \"/projects/foo/instances/bar/tables/gorp\"}",
        );
        assert!(res.is_err());
    }

    /// Admin metadata needs a `/tables/` segment to derive its prefix.
    #[test]
    fn test_admin_metadata_requires_tables_segment() {
        let settings = super::BigTableDbSettings {
            table_name: "projects/foo/instances/bar".to_owned(),
            ..Default::default()
        };
        assert!(settings.admin_metadata().is_err());
    }

    #[test]
    fn test_get_instance() -> Result<(), super::BigTableError> {
        let settings = super::BigTableDbSettings {
            table_name: "projects/foo/instances/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        let res = settings.get_instance_name()?;
        assert_eq!(res.as_str(), "projects/foo/instances/bar");

        let settings = super::BigTableDbSettings {
            table_name: "projects/foo/".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        let settings = super::BigTableDbSettings {
            table_name: "protect/foo/instances/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        let settings = super::BigTableDbSettings {
            table_name: "project/foo/instance/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        Ok(())
    }
}