autopush_common/db/bigtable/
mod.rs1mod bigtable_client;
23mod pool;
24
25pub use bigtable_client::error::BigTableError;
26pub use bigtable_client::BigTableClientImpl;
27
28use grpcio::Metadata;
29use serde::Deserialize;
30use std::time::Duration;
31
32use crate::db::bigtable::bigtable_client::MetadataBuilder;
33use crate::db::error::DbError;
34use crate::util::deserialize_opt_u32_to_duration;
35
36fn retry_default() -> usize {
37 bigtable_client::RETRY_COUNT
38}
39
40#[derive(Clone, Debug, Deserialize)]
42pub struct BigTableDbSettings {
43 #[serde(default)]
50 pub table_name: String,
51 #[serde(default)]
54 pub app_profile_id: String,
55 #[serde(default)]
56 pub router_family: String,
57 #[serde(default)]
58 pub message_family: String,
59 #[serde(default)]
60 pub message_topic_family: String,
61 #[serde(default)]
62 pub database_pool_max_size: Option<u32>,
63 #[serde(default)]
65 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
66 pub database_pool_create_timeout: Option<Duration>,
67 #[serde(default)]
69 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
70 pub database_pool_wait_timeout: Option<Duration>,
71 #[serde(default)]
73 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
74 pub database_pool_recycle_timeout: Option<Duration>,
75 #[serde(default)]
77 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
78 pub database_pool_connection_ttl: Option<Duration>,
79 #[serde(default)]
81 #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
82 pub database_pool_max_idle: Option<Duration>,
83 #[serde(default)]
85 pub route_to_leader: bool,
86 #[serde(default = "retry_default")]
88 pub retry_count: usize,
89}
90
// Written by hand instead of derived so the impl can be gated behind
// `cfg(test)`; it is only compiled for test builds.
#[allow(clippy::derivable_impls)]
#[cfg(test)]
impl Default for BigTableDbSettings {
    /// Builds an all-empty settings value for tests.
    ///
    /// Note that `retry_count` is `0` here, not the `retry_default()` value
    /// that deserialization substitutes for a missing key.
    fn default() -> Self {
        Self {
            table_name: String::new(),
            app_profile_id: String::new(),
            router_family: String::new(),
            message_family: String::new(),
            message_topic_family: String::new(),
            database_pool_max_size: None,
            database_pool_create_timeout: None,
            database_pool_wait_timeout: None,
            database_pool_recycle_timeout: None,
            database_pool_connection_ttl: None,
            database_pool_max_idle: None,
            route_to_leader: false,
            retry_count: 0,
        }
    }
}
113
114impl BigTableDbSettings {
115 pub fn metadata(&self) -> Result<Metadata, BigTableError> {
116 MetadataBuilder::with_prefix(&self.table_name)
117 .routing_param("table_name", &self.table_name)
118 .route_to_leader(self.route_to_leader)
119 .build()
120 .map_err(BigTableError::GRPC)
121 }
122
123 pub fn health_metadata(&self) -> Result<Metadata, BigTableError> {
125 self.metadata()
126 }
127
128 pub fn admin_metadata(&self) -> Result<Metadata, BigTableError> {
129 let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else {
132 return Err(BigTableError::Config(
133 "Invalid table name specified".to_owned(),
134 ));
135 };
136 MetadataBuilder::with_prefix(admin_prefix)
137 .routing_param("name", &self.table_name)
138 .route_to_leader(self.route_to_leader)
139 .build()
140 .map_err(BigTableError::GRPC)
141 }
142
143 pub fn get_instance_name(&self) -> Result<String, BigTableError> {
144 let parts: Vec<&str> = self.table_name.split('/').collect();
145 if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
146 return Err(BigTableError::Config(
147 "Invalid table name specified. Cannot parse instance".to_owned(),
148 ));
149 }
150 Ok(parts[0..4].join("/"))
151 }
152}
153
154impl TryFrom<&str> for BigTableDbSettings {
155 type Error = DbError;
156 fn try_from(setting_string: &str) -> Result<Self, Self::Error> {
157 let mut me: Self = serde_json::from_str(setting_string)
158 .map_err(|e| DbError::General(format!("Could not parse DdbSettings: {e:?}")))?;
159
160 if me.table_name.starts_with('/') {
161 return Err(DbError::ConnectionError(
162 "Table name path begins with a '/'".to_owned(),
163 ));
164 };
165
166 if me.app_profile_id.is_empty() {
170 "default".clone_into(&mut me.app_profile_id);
171 }
172
173 Ok(me)
174 }
175}
176
// Unit tests for settings parsing and table-name helpers. Gated on
// `cfg(test)` because they rely on the test-only `Default` impl.
#[cfg(test)]
mod tests {

    /// A numeric timeout in the JSON document is read as whole seconds.
    #[test]
    fn test_settings_parse() -> Result<(), crate::db::error::DbError> {
        let settings =
            super::BigTableDbSettings::try_from("{\"database_pool_create_timeout\": 123}")?;
        assert_eq!(
            settings.database_pool_create_timeout,
            Some(std::time::Duration::from_secs(123))
        );
        Ok(())
    }

    /// Omitted keys fall back to their defaults, and an empty
    /// `app_profile_id` becomes `"default"`.
    #[test]
    fn test_settings_fallbacks() -> Result<(), crate::db::error::DbError> {
        let settings = super::BigTableDbSettings::try_from("{}")?;
        assert_eq!(settings.app_profile_id, "default");
        assert_eq!(settings.retry_count, super::retry_default());
        assert_eq!(settings.database_pool_create_timeout, None);
        Ok(())
    }

    /// A table name that starts with `/` is rejected at parse time.
    #[test]
    fn test_settings_reject_leading_slash() {
        let result = super::BigTableDbSettings::try_from(
            "{\"table_name\": \"/projects/foo/instances/bar/tables/gorp\"}",
        );
        assert!(result.is_err());
    }

    /// `get_instance_name` accepts a well-formed path and rejects truncated
    /// or misspelled ones.
    #[test]
    fn test_get_instance() -> Result<(), super::BigTableError> {
        let settings = super::BigTableDbSettings {
            table_name: "projects/foo/instances/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        let res = settings.get_instance_name()?;
        assert_eq!(res.as_str(), "projects/foo/instances/bar");

        let settings = super::BigTableDbSettings {
            table_name: "projects/foo/".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        let settings = super::BigTableDbSettings {
            table_name: "protect/foo/instances/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        let settings = super::BigTableDbSettings {
            table_name: "project/foo/instance/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        Ok(())
    }
}