autopush_common/db/bigtable/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
/// This uses Google Cloud Platform (GCP) Bigtable as a storage and management
/// system for Autopush Notifications and Routing information.
///
/// Bigtable has a single index key, and uses "cell family" designators to
/// perform garbage collection.
///
/// Keys for the data are
/// `{uaid}` - the meta data record around a given UAID record
/// `{uaid}#{channelid}` - the meta record for a channel associated with a
///     UAID
/// `{uaid}#{channelid}#{sortkey_timestamp}` - a message record for a UAID
///     and channel
///
/// Bigtable will automatically sort by the primary key. This schema uses
/// regular expression lookups in order to do things like return the channels
/// associated with a given UAID, fetch the appropriate topic messages, and
/// other common functions. Please refer to the Bigtable documentation
/// for how to create these keys, since they must be inclusive. Partial
/// key matches will not return data. (e.g `/foo/` will not match `foobar`,
/// but `/foo.*/` will)
///
mod bigtable_client;
mod pool;

pub use bigtable_client::error::BigTableError;
pub use bigtable_client::BigTableClientImpl;

use grpcio::Metadata;
use serde::Deserialize;
use std::time::Duration;

use crate::db::bigtable::bigtable_client::MetadataBuilder;
use crate::db::error::DbError;
use crate::util::deserialize_opt_u32_to_duration;

fn retry_default() -> usize {
    bigtable_client::RETRY_COUNT
}

/// The settings for accessing the BigTable contents.
#[derive(Clone, Debug, Deserialize)]
pub struct BigTableDbSettings {
    /// The Table name matches the GRPC template for table paths.
    /// e.g. `projects/{projectid}/instances/{instanceid}/tables/{tablename}`
    /// *NOTE* There is no leading `/`
    /// By default, this (may?) use the `*` variant which translates to
    /// `projects/*/instances/*/tables/*` which searches all data stored in
    /// bigtable.
    #[serde(default)]
    pub table_name: String,
    /// Routing replication profile id.
    /// Should be used everywhere we set `table_name` when creating requests
    #[serde(default)]
    pub app_profile_id: String,
    #[serde(default)]
    pub router_family: String,
    #[serde(default)]
    pub message_family: String,
    #[serde(default)]
    pub message_topic_family: String,
    #[serde(default)]
    pub database_pool_max_size: Option<u32>,
    /// Max time (in seconds) to wait to create a new connection to bigtable
    #[serde(default)]
    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
    pub database_pool_create_timeout: Option<Duration>,
    /// Max time (in seconds) to wait for a socket to become available
    #[serde(default)]
    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
    pub database_pool_wait_timeout: Option<Duration>,
    /// Max time(in seconds) to recycle a connection
    #[serde(default)]
    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
    pub database_pool_recycle_timeout: Option<Duration>,
    /// Max time (in seconds) a connection should live
    #[serde(default)]
    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
    pub database_pool_connection_ttl: Option<Duration>,
    /// Max idle time(in seconds) for a connection
    #[serde(default)]
    #[serde(deserialize_with = "deserialize_opt_u32_to_duration")]
    pub database_pool_max_idle: Option<Duration>,
    /// Include route to leader header in metadata
    #[serde(default)]
    pub route_to_leader: bool,
    /// Number of times to retry a GRPC function
    #[serde(default = "retry_default")]
    pub retry_count: usize,
}

// Used by test, but we don't want available for release.
#[allow(clippy::derivable_impls)]
#[cfg(test)]
impl Default for BigTableDbSettings {
    fn default() -> Self {
        Self {
            table_name: Default::default(),
            router_family: Default::default(),
            message_family: Default::default(),
            message_topic_family: Default::default(),
            database_pool_max_size: Default::default(),
            database_pool_create_timeout: Default::default(),
            database_pool_wait_timeout: Default::default(),
            database_pool_recycle_timeout: Default::default(),
            database_pool_connection_ttl: Default::default(),
            database_pool_max_idle: Default::default(),
            route_to_leader: Default::default(),
            retry_count: Default::default(),
            app_profile_id: Default::default(),
        }
    }
}

impl BigTableDbSettings {
    pub fn metadata(&self) -> Result<Metadata, BigTableError> {
        MetadataBuilder::with_prefix(&self.table_name)
            .routing_param("table_name", &self.table_name)
            .route_to_leader(self.route_to_leader)
            .build()
            .map_err(BigTableError::GRPC)
    }

    // Health may require a different metadata declaration.
    pub fn health_metadata(&self) -> Result<Metadata, BigTableError> {
        self.metadata()
    }

    pub fn admin_metadata(&self) -> Result<Metadata, BigTableError> {
        // Admin calls use a slightly different routing param and a truncated prefix
        // See https://github.com/googleapis/google-cloud-cpp/issues/190#issuecomment-370520185
        let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else {
            return Err(BigTableError::Config(
                "Invalid table name specified".to_owned(),
            ));
        };
        MetadataBuilder::with_prefix(admin_prefix)
            .routing_param("name", &self.table_name)
            .route_to_leader(self.route_to_leader)
            .build()
            .map_err(BigTableError::GRPC)
    }

    pub fn get_instance_name(&self) -> Result<String, BigTableError> {
        let parts: Vec<&str> = self.table_name.split('/').collect();
        if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
            return Err(BigTableError::Config(
                "Invalid table name specified. Cannot parse instance".to_owned(),
            ));
        }
        Ok(parts[0..4].join("/"))
    }
}

impl TryFrom<&str> for BigTableDbSettings {
    type Error = DbError;
    fn try_from(setting_string: &str) -> Result<Self, Self::Error> {
        let mut me: Self = serde_json::from_str(setting_string)
            .map_err(|e| DbError::General(format!("Could not parse DdbSettings: {:?}", e)))?;

        if me.table_name.starts_with('/') {
            return Err(DbError::ConnectionError(
                "Table name path begins with a '/'".to_owned(),
            ));
        };

        // specify the default string "default" if it's not specified.
        // There's a small chance that this could be reported as "unspecified", so this
        // removes that confusion.
        if me.app_profile_id.is_empty() {
            "default".clone_into(&mut me.app_profile_id);
        }

        Ok(me)
    }
}

mod tests {

    #[test]
    fn test_settings_parse() -> Result<(), crate::db::error::DbError> {
        let settings =
            super::BigTableDbSettings::try_from("{\"database_pool_create_timeout\": 123}")?;
        assert_eq!(
            settings.database_pool_create_timeout,
            Some(std::time::Duration::from_secs(123))
        );
        Ok(())
    }
    #[test]
    fn test_get_instance() -> Result<(), super::BigTableError> {
        let settings = super::BigTableDbSettings {
            table_name: "projects/foo/instances/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        let res = settings.get_instance_name()?;
        assert_eq!(res.as_str(), "projects/foo/instances/bar");

        let settings = super::BigTableDbSettings {
            table_name: "projects/foo/".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        let settings = super::BigTableDbSettings {
            table_name: "protect/foo/instances/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        let settings = super::BigTableDbSettings {
            table_name: "project/foo/instance/bar/tables/gorp".to_owned(),
            ..Default::default()
        };
        assert!(settings.get_instance_name().is_err());

        Ok(())
    }
}