autopush_common/db/bigtable/
mod.rs1mod bigtable_client;
23mod pool;
24
25pub use bigtable_client::error::BigTableError;
26pub use bigtable_client::BigTableClientImpl;
27
28use grpcio::Metadata;
29use serde::Deserialize;
30use std::time::Duration;
31
32use crate::db::bigtable::bigtable_client::MetadataBuilder;
33use crate::db::error::DbError;
34use crate::util::deserialize_opt_u32_to_duration;
35
36fn retry_default() -> usize {
37 bigtable_client::RETRY_COUNT
38}
39
40#[derive(Clone, Debug, Deserialize)]
42pub struct BigTableDbSettings {
43 #[serde(default)]
50 pub table_name: String,
51 #[serde(default)]
54 pub app_profile_id: String,
55 #[serde(default)]
56 pub router_family: String,
57 #[serde(default)]
58 pub message_family: String,
59 #[serde(default)]
60 pub message_topic_family: String,
61 #[serde(default)]
62 pub database_pool_max_size: Option<u32>,
63 #[serde(default)]
65 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
66 pub database_pool_create_timeout: Option<Duration>,
67 #[serde(default)]
69 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
70 pub database_pool_wait_timeout: Option<Duration>,
71 #[serde(default)]
73 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
74 pub database_pool_recycle_timeout: Option<Duration>,
75 #[serde(default)]
77 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
78 pub database_pool_connection_ttl: Option<Duration>,
79 #[serde(default)]
81 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
82 pub database_pool_max_idle: Option<Duration>,
83 #[serde(default)]
85 pub route_to_leader: bool,
86 #[serde(default = "retry_default")]
88 pub retry_count: usize,
89 #[serde(default)]
91 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
92 pub max_router_ttl: Option<Duration>,
93}
94
95#[allow(clippy::derivable_impls)]
97#[cfg(test)]
98impl Default for BigTableDbSettings {
99 fn default() -> Self {
100 use crate::MAX_ROUTER_TTL_SECS;
101
102 Self {
103 table_name: Default::default(),
104 router_family: Default::default(),
105 message_family: Default::default(),
106 message_topic_family: Default::default(),
107 database_pool_max_size: Default::default(),
108 database_pool_create_timeout: Default::default(),
109 database_pool_wait_timeout: Default::default(),
110 database_pool_recycle_timeout: Default::default(),
111 database_pool_connection_ttl: Default::default(),
112 database_pool_max_idle: Default::default(),
113 route_to_leader: Default::default(),
114 retry_count: Default::default(),
115 app_profile_id: Default::default(),
116 max_router_ttl: Some(Duration::from_secs(MAX_ROUTER_TTL_SECS)),
117 }
118 }
119}
120
121impl BigTableDbSettings {
122 pub fn metadata(&self) -> Result<Metadata, BigTableError> {
123 MetadataBuilder::with_prefix(&self.table_name)
124 .routing_param("table_name", &self.table_name)
125 .route_to_leader(self.route_to_leader)
126 .build()
127 .map_err(BigTableError::GRPC)
128 }
129
130 pub fn health_metadata(&self) -> Result<Metadata, BigTableError> {
132 self.metadata()
133 }
134
135 pub fn admin_metadata(&self) -> Result<Metadata, BigTableError> {
136 let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else {
139 return Err(BigTableError::Config(
140 "Invalid table name specified".to_owned(),
141 ));
142 };
143 MetadataBuilder::with_prefix(admin_prefix)
144 .routing_param("name", &self.table_name)
145 .route_to_leader(self.route_to_leader)
146 .build()
147 .map_err(BigTableError::GRPC)
148 }
149
150 pub fn get_instance_name(&self) -> Result<String, BigTableError> {
151 let parts: Vec<&str> = self.table_name.split('/').collect();
152 if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
153 return Err(BigTableError::Config(
154 "Invalid table name specified. Cannot parse instance".to_owned(),
155 ));
156 }
157 Ok(parts[0..4].join("/"))
158 }
159}
160
161impl TryFrom<&str> for BigTableDbSettings {
162 type Error = DbError;
163 fn try_from(setting_string: &str) -> Result<Self, Self::Error> {
164 let mut me: Self = serde_json::from_str(setting_string)
165 .map_err(|e| DbError::General(format!("Could not parse DdbSettings: {e:?}")))?;
166
167 if me.table_name.starts_with('/') {
168 return Err(DbError::ConnectionError(
169 "Table name path begins with a '/'".to_owned(),
170 ));
171 };
172
173 if me.app_profile_id.is_empty() {
177 "default".clone_into(&mut me.app_profile_id);
178 }
179
180 Ok(me)
181 }
182}
183
184mod tests {
185
186 #[test]
187 fn test_settings_parse() -> Result<(), crate::db::error::DbError> {
188 let settings =
189 super::BigTableDbSettings::try_from("{\"database_pool_create_timeout\": 123}")?;
190 assert_eq!(
191 settings.database_pool_create_timeout,
192 Some(std::time::Duration::from_secs(123))
193 );
194 Ok(())
195 }
196 #[test]
197 fn test_get_instance() -> Result<(), super::BigTableError> {
198 let settings = super::BigTableDbSettings {
199 table_name: "projects/foo/instances/bar/tables/gorp".to_owned(),
200 ..Default::default()
201 };
202 let res = settings.get_instance_name()?;
203 assert_eq!(res.as_str(), "projects/foo/instances/bar");
204
205 let settings = super::BigTableDbSettings {
206 table_name: "projects/foo/".to_owned(),
207 ..Default::default()
208 };
209 assert!(settings.get_instance_name().is_err());
210
211 let settings = super::BigTableDbSettings {
212 table_name: "protect/foo/instances/bar/tables/gorp".to_owned(),
213 ..Default::default()
214 };
215 assert!(settings.get_instance_name().is_err());
216
217 let settings = super::BigTableDbSettings {
218 table_name: "project/foo/instance/bar/tables/gorp".to_owned(),
219 ..Default::default()
220 };
221 assert!(settings.get_instance_name().is_err());
222
223 Ok(())
224 }
225}