autopush_common/db/bigtable/
mod.rsmod bigtable_client;
mod pool;
pub use bigtable_client::error::BigTableError;
pub use bigtable_client::BigTableClientImpl;
use grpcio::Metadata;
use serde::Deserialize;
use std::time::Duration;
use crate::db::bigtable::bigtable_client::MetadataBuilder;
use crate::db::error::DbError;
use crate::util::deserialize_opt_u32_to_duration;
fn retry_default() -> usize {
bigtable_client::RETRY_COUNT
}
#[derive(Clone, Debug, Deserialize)]
pub struct BigTableDbSettings {
#[serde(default)]
pub table_name: String,
#[serde(default)]
pub app_profile_id: String,
#[serde(default)]
pub router_family: String,
#[serde(default)]
pub message_family: String,
#[serde(default)]
pub message_topic_family: String,
#[serde(default)]
pub database_pool_max_size: Option<u32>,
#[serde(default)]
#[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
pub database_pool_create_timeout: Option<Duration>,
#[serde(default)]
#[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
pub database_pool_wait_timeout: Option<Duration>,
#[serde(default)]
#[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
pub database_pool_recycle_timeout: Option<Duration>,
#[serde(default)]
#[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
pub database_pool_connection_ttl: Option<Duration>,
#[serde(default)]
#[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
pub database_pool_max_idle: Option<Duration>,
#[serde(default)]
pub route_to_leader: bool,
#[serde(default = "retry_default")]
pub retry_count: usize,
}
#[allow(clippy::derivable_impls)]
#[cfg(test)]
impl Default for BigTableDbSettings {
fn default() -> Self {
Self {
table_name: Default::default(),
router_family: Default::default(),
message_family: Default::default(),
message_topic_family: Default::default(),
database_pool_max_size: Default::default(),
database_pool_create_timeout: Default::default(),
database_pool_wait_timeout: Default::default(),
database_pool_recycle_timeout: Default::default(),
database_pool_connection_ttl: Default::default(),
database_pool_max_idle: Default::default(),
route_to_leader: Default::default(),
retry_count: Default::default(),
app_profile_id: Default::default(),
}
}
}
impl BigTableDbSettings {
pub fn metadata(&self) -> Result<Metadata, BigTableError> {
MetadataBuilder::with_prefix(&self.table_name)
.routing_param("table_name", &self.table_name)
.route_to_leader(self.route_to_leader)
.build()
.map_err(BigTableError::GRPC)
}
pub fn health_metadata(&self) -> Result<Metadata, BigTableError> {
self.metadata()
}
pub fn admin_metadata(&self) -> Result<Metadata, BigTableError> {
let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else {
return Err(BigTableError::Config(
"Invalid table name specified".to_owned(),
));
};
MetadataBuilder::with_prefix(admin_prefix)
.routing_param("name", &self.table_name)
.route_to_leader(self.route_to_leader)
.build()
.map_err(BigTableError::GRPC)
}
pub fn get_instance_name(&self) -> Result<String, BigTableError> {
let parts: Vec<&str> = self.table_name.split('/').collect();
if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
return Err(BigTableError::Config(
"Invalid table name specified. Cannot parse instance".to_owned(),
));
}
Ok(parts[0..4].join("/"))
}
}
impl TryFrom<&str> for BigTableDbSettings {
type Error = DbError;
fn try_from(setting_string: &str) -> Result<Self, Self::Error> {
let mut me: Self = serde_json::from_str(setting_string)
.map_err(|e| DbError::General(format!("Could not parse DdbSettings: {:?}", e)))?;
if me.table_name.starts_with('/') {
return Err(DbError::ConnectionError(
"Table name path begins with a '/'".to_owned(),
));
};
if me.app_profile_id.is_empty() {
"default".clone_into(&mut me.app_profile_id);
}
Ok(me)
}
}
mod tests {
#[test]
fn test_settings_parse() -> Result<(), crate::db::error::DbError> {
let settings =
super::BigTableDbSettings::try_from("{\"database_pool_create_timeout\": 123}")?;
assert_eq!(
settings.database_pool_create_timeout,
Some(std::time::Duration::from_secs(123))
);
Ok(())
}
#[test]
fn test_get_instance() -> Result<(), super::BigTableError> {
let settings = super::BigTableDbSettings {
table_name: "projects/foo/instances/bar/tables/gorp".to_owned(),
..Default::default()
};
let res = settings.get_instance_name()?;
assert_eq!(res.as_str(), "projects/foo/instances/bar");
let settings = super::BigTableDbSettings {
table_name: "projects/foo/".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());
let settings = super::BigTableDbSettings {
table_name: "protect/foo/instances/bar/tables/gorp".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());
let settings = super::BigTableDbSettings {
table_name: "project/foo/instance/bar/tables/gorp".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());
Ok(())
}
}