autopush_common/db/bigtable/
mod.rs

/// This uses Google Cloud Platform (GCP) Bigtable as a storage and management
/// system for Autopush Notifications and Routing information.
///
/// Bigtable has a single index key, and uses "cell family" designators to
/// perform garbage collection.
///
/// Keys for the data are:
/// - `{uaid}` - the metadata record for a given UAID
/// - `{uaid}#{channelid}` - the metadata record for a channel associated with
///   a UAID
/// - `{uaid}#{channelid}#{sortkey_timestamp}` - a message record for a UAID
///   and channel
///
/// Bigtable will automatically sort by the primary key. This schema uses
/// regular expression lookups in order to do things like return the channels
/// associated with a given UAID, fetch the appropriate topic messages, and
/// other common functions. Please refer to the Bigtable documentation
/// for how to create these keys, since they must be inclusive. Partial
/// key matches will not return data (e.g. `/foo/` will not match `foobar`,
/// but `/foo.*/` will).
///
22mod bigtable_client;
23mod pool;
24
25pub use bigtable_client::error::BigTableError;
26pub use bigtable_client::BigTableClientImpl;
27
28use grpcio::Metadata;
29use serde::Deserialize;
30use std::time::Duration;
31
32use crate::db::bigtable::bigtable_client::MetadataBuilder;
33use crate::db::error::DbError;
34use crate::util::deserialize_opt_u32_to_duration;
35
36fn retry_default() -> usize {
37    bigtable_client::RETRY_COUNT
38}
39
40/// The settings for accessing the BigTable contents.
41#[derive(Clone, Debug, Deserialize)]
42pub struct BigTableDbSettings {
43    /// The Table name matches the GRPC template for table paths.
44    /// e.g. `projects/{projectid}/instances/{instanceid}/tables/{tablename}`
45    /// *NOTE* There is no leading `/`
46    /// By default, this (may?) use the `*` variant which translates to
47    /// `projects/*/instances/*/tables/*` which searches all data stored in
48    /// bigtable.
49    #[serde(default)]
50    pub table_name: String,
51    /// Routing replication profile id.
52    /// Should be used everywhere we set `table_name` when creating requests
53    #[serde(default)]
54    pub app_profile_id: String,
55    #[serde(default)]
56    pub router_family: String,
57    #[serde(default)]
58    pub message_family: String,
59    #[serde(default)]
60    pub message_topic_family: String,
61    #[serde(default)]
62    pub database_pool_max_size: Option<u32>,
63    /// Max time (in seconds) to wait to create a new connection to bigtable
64    #[serde(default)]
65    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
66    pub database_pool_create_timeout: Option<Duration>,
67    /// Max time (in seconds) to wait for a socket to become available
68    #[serde(default)]
69    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
70    pub database_pool_wait_timeout: Option<Duration>,
71    /// Max time(in seconds) to recycle a connection
72    #[serde(default)]
73    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
74    pub database_pool_recycle_timeout: Option<Duration>,
75    /// Max time (in seconds) a connection should live
76    #[serde(default)]
77    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
78    pub database_pool_connection_ttl: Option<Duration>,
79    /// Max idle time(in seconds) for a connection
80    #[serde(default)]
81    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
82    pub database_pool_max_idle: Option<Duration>,
83    /// Include route to leader header in metadata
84    #[serde(default)]
85    pub route_to_leader: bool,
86    /// Number of times to retry a GRPC function
87    #[serde(default = "retry_default")]
88    pub retry_count: usize,
89    /// Max lifetime (in seconds) for a router entry
90    #[serde(default)]
91    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
92    pub max_router_ttl: Option<Duration>,
93}
94
// Only tests rely on this impl; it is deliberately not compiled into
// release builds.
#[allow(clippy::derivable_impls)]
#[cfg(test)]
impl Default for BigTableDbSettings {
    /// Produce empty/unset settings with only `max_router_ttl` populated
    /// (from `crate::MAX_ROUTER_TTL_SECS`).
    ///
    /// Note that these values are not the same as what parsing an empty
    /// configuration yields: here `retry_count` is `0` (not
    /// `retry_default()`) and `app_profile_id` is empty (not `"default"`).
    fn default() -> Self {
        let router_ttl = Duration::from_secs(crate::MAX_ROUTER_TTL_SECS);

        Self {
            table_name: String::new(),
            app_profile_id: String::new(),
            router_family: String::new(),
            message_family: String::new(),
            message_topic_family: String::new(),
            database_pool_max_size: None,
            database_pool_create_timeout: None,
            database_pool_wait_timeout: None,
            database_pool_recycle_timeout: None,
            database_pool_connection_ttl: None,
            database_pool_max_idle: None,
            route_to_leader: false,
            retry_count: 0,
            max_router_ttl: Some(router_ttl),
        }
    }
}
120
121impl BigTableDbSettings {
122    pub fn metadata(&self) -> Result<Metadata, BigTableError> {
123        MetadataBuilder::with_prefix(&self.table_name)
124            .routing_param("table_name", &self.table_name)
125            .route_to_leader(self.route_to_leader)
126            .build()
127            .map_err(BigTableError::GRPC)
128    }
129
130    // Health may require a different metadata declaration.
131    pub fn health_metadata(&self) -> Result<Metadata, BigTableError> {
132        self.metadata()
133    }
134
135    pub fn admin_metadata(&self) -> Result<Metadata, BigTableError> {
136        // Admin calls use a slightly different routing param and a truncated prefix
137        // See https://github.com/googleapis/google-cloud-cpp/issues/190#issuecomment-370520185
138        let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else {
139            return Err(BigTableError::Config(
140                "Invalid table name specified".to_owned(),
141            ));
142        };
143        MetadataBuilder::with_prefix(admin_prefix)
144            .routing_param("name", &self.table_name)
145            .route_to_leader(self.route_to_leader)
146            .build()
147            .map_err(BigTableError::GRPC)
148    }
149
150    pub fn get_instance_name(&self) -> Result<String, BigTableError> {
151        let parts: Vec<&str> = self.table_name.split('/').collect();
152        if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
153            return Err(BigTableError::Config(
154                "Invalid table name specified. Cannot parse instance".to_owned(),
155            ));
156        }
157        Ok(parts[0..4].join("/"))
158    }
159}
160
161impl TryFrom<&str> for BigTableDbSettings {
162    type Error = DbError;
163    fn try_from(setting_string: &str) -> Result<Self, Self::Error> {
164        let mut me: Self = serde_json::from_str(setting_string)
165            .map_err(|e| DbError::General(format!("Could not parse DdbSettings: {e:?}")))?;
166
167        if me.table_name.starts_with('/') {
168            return Err(DbError::ConnectionError(
169                "Table name path begins with a '/'".to_owned(),
170            ));
171        };
172
173        // specify the default string "default" if it's not specified.
174        // There's a small chance that this could be reported as "unspecified", so this
175        // removes that confusion.
176        if me.app_profile_id.is_empty() {
177            "default".clone_into(&mut me.app_profile_id);
178        }
179
180        Ok(me)
181    }
182}
183
/// Unit tests for settings parsing and table name handling.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{BigTableDbSettings, BigTableError};
    use crate::db::error::DbError;

    /// Build test settings with only `table_name` set.
    fn settings_for(table_name: &str) -> BigTableDbSettings {
        BigTableDbSettings {
            table_name: table_name.to_owned(),
            ..Default::default()
        }
    }

    #[test]
    fn test_settings_parse() -> Result<(), DbError> {
        let settings = BigTableDbSettings::try_from("{\"database_pool_create_timeout\": 123}")?;
        assert_eq!(
            settings.database_pool_create_timeout,
            Some(Duration::from_secs(123))
        );
        Ok(())
    }

    #[test]
    fn test_settings_validation() -> Result<(), DbError> {
        // A leading `/` on the table name is rejected.
        assert!(BigTableDbSettings::try_from(
            "{\"table_name\": \"/projects/foo/instances/bar/tables/gorp\"}"
        )
        .is_err());

        // A missing application profile id falls back to "default".
        let settings = BigTableDbSettings::try_from("{}")?;
        assert_eq!(settings.app_profile_id, "default");
        Ok(())
    }

    #[test]
    fn test_get_instance() -> Result<(), BigTableError> {
        let res = settings_for("projects/foo/instances/bar/tables/gorp").get_instance_name()?;
        assert_eq!(res.as_str(), "projects/foo/instances/bar");

        // Truncated path, misspelled `projects`, and singular segment names
        // must all be refused.
        for bad_name in [
            "projects/foo/",
            "protect/foo/instances/bar/tables/gorp",
            "project/foo/instance/bar/tables/gorp",
        ] {
            assert!(settings_for(bad_name).get_instance_name().is_err());
        }

        Ok(())
    }
}