autopush_common/db/bigtable/bigtable_client/
mod.rs

1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use again::RetryPolicy;
11use async_trait::async_trait;
12use cadence::StatsdClient;
13#[cfg(feature = "reliable_report")]
14use chrono::TimeDelta;
15use futures_util::StreamExt;
16use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
17use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
18use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
19use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
20use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
21use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
22use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
23use protobuf::RepeatedField;
24use serde_json::{from_str, json};
25use uuid::Uuid;
26
27use crate::db::{
28    client::{DbClient, FetchMessageResponse},
29    error::{DbError, DbResult},
30    models::RangeKey,
31    DbSettings, Notification, User, USER_RECORD_VERSION,
32};
33use crate::metric_name::MetricName;
34use crate::metrics::StatsdClientExt;
35use crate::MAX_ROUTER_TTL_SECS;
36
37pub use self::metadata::MetadataBuilder;
38use self::row::{Row, RowCells};
39use super::pool::BigTablePool;
40use super::BigTableDbSettings;
41
pub mod cell;
pub mod error;
pub(crate) mod merge;
pub mod metadata;
pub mod row;

// these are normally Vec<u8>
pub type RowKey = String;

// These are more for code clarity than functional types.
// Rust will happily swap between the two in any case.
// See [super::row::Row] for discussion about how these
// are overloaded in order to simplify fetching data.
pub type Qualifier = String;
pub type FamilyId = String;

// Column family names used by the autopush BigTable schema.
const ROUTER_FAMILY: &str = "router";
const MESSAGE_FAMILY: &str = "message"; // The default family for messages
const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
#[cfg(feature = "reliable_report")]
// Column family for reliability report logging.
const RELIABLE_LOG_FAMILY: &str = "reliability";
#[cfg(feature = "reliable_report")]
/// The maximum TTL for reliability logging (60 days).
/// In most use cases, converted to seconds through .num_seconds().
pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);

/// Default number of retry attempts for BigTable operations.
pub(crate) const RETRY_COUNT: usize = 5;
/// Simple circuit breaker to prevent retry storms during BigTable outages.
///
/// After `failure_threshold` consecutive failures, the circuit opens and
/// requests fail fast for `cooldown_secs` seconds before allowing a retry.
#[derive(Debug)]
pub struct CircuitBreaker {
    // Count of consecutive failures since the last recorded success.
    consecutive_failures: AtomicU32,
    // Unix-epoch seconds at which the circuit last opened (0 = never opened).
    opened_at_epoch_secs: AtomicU64,
    // Consecutive failures required before the circuit opens.
    failure_threshold: u32,
    // Seconds the circuit stays open before allowing a probe request.
    cooldown_secs: u64,
}

impl CircuitBreaker {
    /// Create a breaker that opens after `failure_threshold` consecutive
    /// failures and cools down for `cooldown_secs` seconds.
    pub fn new(failure_threshold: u32, cooldown_secs: u64) -> Self {
        Self {
            consecutive_failures: AtomicU32::new(0),
            opened_at_epoch_secs: AtomicU64::new(0),
            failure_threshold,
            cooldown_secs,
        }
    }

    /// Current wall-clock time as whole seconds since the Unix epoch.
    fn now_epoch_secs() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Check if the circuit is allowing requests through.
    /// Returns true if the request should proceed, false if it should fail fast.
    ///
    /// While open, once the cooldown has elapsed only one caller per cooldown
    /// window is let through as a half-open probe; concurrent callers keep
    /// failing fast until a success resets the breaker.
    pub fn allow_request(&self) -> bool {
        let failures = self.consecutive_failures.load(Ordering::Relaxed);
        if failures < self.failure_threshold {
            return true;
        }
        // Circuit is open — check if cooldown has elapsed
        let opened_at = self.opened_at_epoch_secs.load(Ordering::Relaxed);
        let now = Self::now_epoch_secs();
        if now.saturating_sub(opened_at) < self.cooldown_secs {
            return false;
        }
        // Half-open: atomically advance the open timestamp so exactly one
        // caller wins the right to probe. Losers see the CAS fail and keep
        // failing fast instead of stampeding the backend the instant the
        // cooldown expires.
        self.opened_at_epoch_secs
            .compare_exchange(opened_at, now, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }

    /// Record a successful operation, resetting the circuit breaker.
    pub fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::Relaxed);
    }

    /// Record a failed operation. Once the failure threshold is reached the
    /// circuit opens (or re-opens), restarting the cooldown window.
    pub fn record_failure(&self) {
        let prev = self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
        if prev + 1 >= self.failure_threshold {
            self.opened_at_epoch_secs
                .store(Self::now_epoch_secs(), Ordering::Relaxed);
        }
    }
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        // Open after 5 consecutive failures, cooldown for 30 seconds
        Self::new(5, 30)
    }
}
137
138/// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently.
139// TODO:Should we create something similar for ChannelID?
140struct Uaid(Uuid);
141
142impl Display for Uaid {
143    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144        write!(f, "{}", self.0.as_simple())
145    }
146}
147
148impl From<Uaid> for String {
149    fn from(uaid: Uaid) -> String {
150        uaid.0.as_simple().to_string()
151    }
152}
153
#[derive(Clone)]
/// Wrapper for the BigTable connection
pub struct BigTableClientImpl {
    /// Parsed BigTable settings (table name, app profile id, retry count, TTLs)
    pub(crate) settings: BigTableDbSettings,
    /// Metrics client
    metrics: Arc<StatsdClient>,
    /// Connection Channel (used for alternate calls)
    pool: BigTablePool,
    /// GRPC metadata headers attached to data-plane calls
    metadata: Metadata,
    /// GRPC metadata headers attached to admin-plane calls
    admin_metadata: Metadata,
    /// Circuit breaker to prevent retry storms during BigTable outages
    circuit_breaker: Arc<CircuitBreaker>,
}
167
168/// Return a a RowFilter matching the GC policy of the router Column Family
169fn router_gc_policy_filter() -> data::RowFilter {
170    let mut latest_cell_filter = data::RowFilter::default();
171    latest_cell_filter.set_cells_per_column_limit_filter(1);
172    latest_cell_filter
173}
174
175/// Return a chain of RowFilters matching the GC policy of the message Column
176/// Families
177fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
178    let mut timestamp_filter = data::RowFilter::default();
179    let bt_now: i64 = SystemTime::now()
180        .duration_since(SystemTime::UNIX_EPOCH)
181        .map_err(error::BigTableError::WriteTime)?
182        .as_millis() as i64;
183    let mut range_filter = data::TimestampRange::default();
184    range_filter.set_start_timestamp_micros(bt_now * 1000);
185    timestamp_filter.set_timestamp_range_filter(range_filter);
186
187    Ok(vec![router_gc_policy_filter(), timestamp_filter])
188}
189
190/// Return a Column family regex RowFilter
191fn family_filter(regex: String) -> data::RowFilter {
192    let mut filter = data::RowFilter::default();
193    filter.set_family_name_regex_filter(regex);
194    filter
195}
196
/// Escape bytes for RE values
///
/// Based off google-re2/perl's quotemeta function
fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // ASCII alphanumerics, '_' and any byte with the high bit set
        // pass through unescaped.
        let passthrough = byte.is_ascii_alphanumeric() || byte == b'_' || (byte & 0x80) != 0;
        if passthrough {
            out.push(byte);
        } else if byte == b'\0' {
            // Special handling for null: Note that this special handling
            // is not strictly required for RE2, but this quoting is
            // required for other regexp libraries such as PCRE.
            // Can't use "\\0" since the next character might be a digit.
            out.extend_from_slice(b"\\x00");
        } else {
            out.push(b'\\');
            out.push(byte);
        }
    }
    out
}
218
219/// Return a chain of RowFilters limiting to a match of the specified
220/// `version`'s column value
221fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
222    let mut cq_filter = data::RowFilter::default();
223    cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());
224
225    let mut value_filter = data::RowFilter::default();
226    value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));
227
228    vec![
229        family_filter(format!("^{ROUTER_FAMILY}$")),
230        cq_filter,
231        value_filter,
232    ]
233}
234
235/// Return a newly generated `version` column `Cell`
236fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
237    cell::Cell {
238        qualifier: "version".to_owned(),
239        value: Uuid::new_v4().into(),
240        timestamp,
241        ..Default::default()
242    }
243}
244
245/// Return a RowFilter chain from multiple RowFilters
246fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
247    let mut chain = RowFilter_Chain::default();
248    chain.set_filters(filters.into());
249    let mut filter = RowFilter::default();
250    filter.set_chain(chain);
251    filter
252}
253
254/// Return a ReadRowsRequest against table for a given row key
255fn read_row_request(
256    table_name: &str,
257    app_profile_id: &str,
258    row_key: &str,
259) -> bigtable::ReadRowsRequest {
260    let mut req = bigtable::ReadRowsRequest::default();
261    req.set_table_name(table_name.to_owned());
262    req.set_app_profile_id(app_profile_id.to_owned());
263
264    let mut row_keys = RepeatedField::default();
265    row_keys.push(row_key.as_bytes().to_vec());
266    let mut row_set = data::RowSet::default();
267    row_set.set_row_keys(row_keys);
268    req.set_rows(row_set);
269
270    req
271}
272
273fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
274    let v: [u8; 8] = value
275        .try_into()
276        .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
277    Ok(u64::from_be_bytes(v))
278}
279
280fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
281    String::from_utf8(value).map_err(|e| {
282        debug!("🉑 cannot read string {}: {:?}", name, e);
283        DbError::DeserializeString(name.to_owned())
284    })
285}
286
287/// Parse the "set" (see [DbClient::add_channels]) of channel ids in a bigtable Row.
288///
289/// Cells should solely contain the set of channels otherwise an Error is returned.
290fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
291    let mut result = HashSet::new();
292    for cells in cells.values() {
293        let Some(cell) = cells.last() else {
294            continue;
295        };
296        let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
297            return Err(DbError::Integrity(
298                "get_channels expected: chid:<chid>".to_owned(),
299                None,
300            ));
301        };
302        result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
303    }
304    Ok(result)
305}
306
307/// Convert the [HashSet] of channel ids to cell entries for a bigtable Row
308fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
309    let channels = channels.into_owned();
310    let mut cells = Vec::with_capacity(channels.len().min(100_000));
311    for (i, channel_id) in channels.into_iter().enumerate() {
312        // There is a limit of 100,000 mutations per batch for bigtable.
313        // https://cloud.google.com/bigtable/quotas
314        // If you have 100,000 channels, you have too many.
315        if i >= 100_000 {
316            break;
317        }
318        cells.push(cell::Cell {
319            qualifier: format!("chid:{}", channel_id.as_hyphenated()),
320            timestamp: expiry,
321            ..Default::default()
322        });
323    }
324    cells
325}
326
327pub fn retry_policy(max: usize) -> RetryPolicy {
328    RetryPolicy::default()
329        .with_max_retries(max)
330        .with_jitter(true)
331}
332
333fn retryable_internal_err(status: &RpcStatus) -> bool {
334    match status.code() {
335        RpcStatusCode::UNKNOWN => status
336            .message()
337            .eq_ignore_ascii_case("error occurred when fetching oauth2 token."),
338        RpcStatusCode::INTERNAL => [
339            "rst_stream",
340            "rst stream",
341            "received unexpected eos on data frame from server",
342        ]
343        .contains(&status.message().to_lowercase().as_str()),
344        RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
345        _ => false,
346    }
347}
348
349pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
350    let mut metric = metrics
351        .incr_with_tags(MetricName::DatabaseRetry)
352        .with_tag("error", err_type)
353        .with_tag("type", "bigtable");
354    if let Some(code) = code {
355        metric = metric.with_tag("code", code);
356    }
357    metric.send();
358}
359
360pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool + '_ {
361    move |err| {
362        debug!("🉑 Checking grpcio::Error...{err}");
363        match err {
364            grpcio::Error::RpcFailure(status) => {
365                info!("GRPC Failure :{:?}", status);
366                let retry = retryable_internal_err(status);
367                if retry {
368                    metric(metrics, "RpcFailure", Some(&status.code().to_string()));
369                }
370                retry
371            }
372            grpcio::Error::BindFail(_) => {
373                metric(metrics, "BindFail", None);
374                true
375            }
376            // The parameter here is a [grpcio_sys::grpc_call_error] enum
377            // Not all of these are retryable.
378            grpcio::Error::CallFailure(grpc_call_status) => {
379                let retry = grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR;
380                if retry {
381                    metric(
382                        metrics,
383                        "CallFailure",
384                        Some(&format!("{grpc_call_status:?}")),
385                    );
386                }
387                retry
388            }
389            _ => false,
390        }
391    }
392}
393
394pub fn retryable_bt_err(
395    metrics: &Arc<StatsdClient>,
396) -> impl Fn(&error::BigTableError) -> bool + '_ {
397    move |err| {
398        debug!("🉑 Checking BigTableError...{err}");
399        match err {
400            error::BigTableError::InvalidRowResponse(e)
401            | error::BigTableError::Read(e)
402            | error::BigTableError::Write(e)
403            | error::BigTableError::GRPC(e) => retryable_grpcio_err(metrics)(e),
404            _ => false,
405        }
406    }
407}
408
409/// Determine if a router record is "incomplete" (doesn't include [User]
410/// columns):
411///
412/// They can be incomplete for a couple reasons:
413///
414/// 1) A migration code bug caused a few incomplete migrations where
415///    `add_channels` and `increment_storage` calls occurred when the migration's
416///    initial `add_user` was never completed:
417///    https://github.com/mozilla-services/autopush-rs/pull/640
418///
419/// 2) When router TTLs are eventually enabled: `add_channel` and
420///    `increment_storage` can write cells with later expiry times than the other
421///    router cells
422fn is_incomplete_router_record(cells: &RowCells) -> bool {
423    cells
424        .keys()
425        .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
426}
427
428fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
429    ::grpcio::CallOption::default().headers(metadata)
430}
431
432/// Connect to a BigTable storage model.
433///
434/// BigTable is available via the Google Console, and is a schema less storage system.
435///
436/// The `db_dsn` string should be in the form of
437/// `grpc://{BigTableEndpoint}`
438///
439/// The settings contains the `table_name` which is the GRPC path to the data.
440/// (e.g. `projects/{project_id}/instances/{instance_id}/tables/{table_id}`)
441///
442/// where:
443/// _BigTableEndpoint_ is the endpoint domain to use (the default is `bigtable.googleapis.com`) See
444/// [BigTable Endpoints](https://cloud.google.com/bigtable/docs/regional-endpoints) for more details.
445/// _project-id_ is the Google project identifier (see the Google developer console (e.g. 'autopush-dev'))
446/// _instance-id_ is the Google project instance, (see the Google developer console (e.g. 'development-1'))
447/// _table_id_ is the Table Name (e.g. 'autopush')
448///
449/// This will automatically bring in the default credentials specified by the `GOOGLE_APPLICATION_CREDENTIALS`
450/// environment variable.
451///
452/// NOTE: Some configurations may look for the default credential file (pointed to by
453/// `GOOGLE_APPLICATION_CREDENTIALS`) to be stored in
454/// `$HOME/.config/gcloud/application_default_credentials.json`)
455///
456impl BigTableClientImpl {
457    pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
458        // let env = Arc::new(EnvBuilder::new().build());
459        debug!("🏊 BT Pool new");
460        let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
461        info!("🉑 {:#?}", db_settings);
462        let pool = BigTablePool::new(settings, &metrics)?;
463
464        // create the metadata header blocks required by Google for accessing GRPC resources.
465        let metadata = db_settings.metadata()?;
466        let admin_metadata = db_settings.admin_metadata()?;
467        Ok(Self {
468            settings: db_settings,
469            metrics,
470            metadata,
471            admin_metadata,
472            pool,
473            circuit_breaker: Arc::new(CircuitBreaker::default()),
474        })
475    }
476
477    fn router_ttl(&self) -> Duration {
478        self.settings
479            .max_router_ttl
480            .unwrap_or(Duration::from_secs(MAX_ROUTER_TTL_SECS))
481    }
482
    /// Spawn a task to periodically evict idle connections
    pub fn spawn_sweeper(&self, interval: Duration) {
        // Delegates to the pool, which owns the connection lifecycle.
        self.pool.spawn_sweeper(interval);
    }
487
488    /// Return a ReadRowsRequest for a given row key
489    fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
490        read_row_request(
491            &self.settings.table_name,
492            &self.settings.app_profile_id,
493            row_key,
494        )
495    }
496
497    /// Return a MutateRowRequest for a given row key
498    fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
499        let mut req = bigtable::MutateRowRequest::default();
500        req.set_table_name(self.settings.table_name.clone());
501        req.set_app_profile_id(self.settings.app_profile_id.clone());
502        req.set_row_key(row_key.as_bytes().to_vec());
503        req
504    }
505
506    /// Return a CheckAndMutateRowRequest for a given row key
507    fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
508        let mut req = bigtable::CheckAndMutateRowRequest::default();
509        req.set_table_name(self.settings.table_name.clone());
510        req.set_app_profile_id(self.settings.app_profile_id.clone());
511        req.set_row_key(row_key.as_bytes().to_vec());
512        req
513    }
514
    /// Mutate a single row, with retries and circuit-breaker protection.
    /// (Note: the previous doc comment incorrectly said "Read a given row".)
    ///
    /// Fails fast with [error::BigTableError::CircuitBreakerOpen] while the
    /// breaker is open; otherwise retries transient grpcio errors up to the
    /// configured retry count and records the final outcome on the breaker.
    async fn mutate_row(
        &self,
        req: bigtable::MutateRowRequest,
    ) -> Result<(), error::BigTableError> {
        if !self.circuit_breaker.allow_request() {
            return Err(error::BigTableError::CircuitBreakerOpen);
        }
        // Check out a pooled connection for the duration of the call.
        let bigtable = self.pool.get().await?;
        let result = retry_policy(self.settings.retry_count)
            .retry_if(
                || async {
                    bigtable
                        .conn
                        .mutate_row_opt(&req, call_opts(self.metadata.clone()))
                },
                retryable_grpcio_err(&self.metrics),
            )
            .await;
        // Feed the final (post-retry) outcome back into the breaker.
        match &result {
            Ok(_) => self.circuit_breaker.record_success(),
            Err(_) => self.circuit_breaker.record_failure(),
        }
        result.map_err(error::BigTableError::Write)?;
        Ok(())
    }
541
    /// Perform a MutateRowsRequest (batch mutation across multiple rows),
    /// returning an error on the first failed entry in the response stream.
    ///
    /// NOTE(review): unlike `mutate_row`/`read_rows`, this path does not
    /// consult the circuit breaker — confirm whether that is intended.
    #[allow(unused)]
    async fn mutate_rows(
        &self,
        req: bigtable::MutateRowsRequest,
    ) -> Result<(), error::BigTableError> {
        let bigtable = self.pool.get().await?;
        // ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
        let resp = retry_policy(self.settings.retry_count)
            .retry_if(
                || async {
                    bigtable
                        .conn
                        .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
                },
                retryable_grpcio_err(&self.metrics),
            )
            .await
            .map_err(error::BigTableError::Write)?;

        // Scan the returned stream looking for errors.
        // As I understand, the returned stream contains chunked MutateRowsResponse structs. Each
        // struct contains the result of the row mutation, and contains a `status` (non-zero on error)
        // and an optional message string (empty if none).
        // The structure also contains an overall `status` but that does not appear to be exposed.
        // Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html
        let mut stream = Box::pin(resp);
        let mut cnt = 0;
        loop {
            // Pull the next chunk; `remainder` is the rest of the stream.
            let (result, remainder) = StreamExt::into_future(stream).await;
            if let Some(result) = result {
                debug!("🎏 Result block: {}", cnt);
                match result {
                    Ok(r) => {
                        for e in r.get_entries() {
                            if e.has_status() {
                                let status = e.get_status();
                                // See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
                                let code = error::MutateRowStatus::from(status.get_code());
                                // The first failed entry aborts the whole batch.
                                if !code.is_ok() {
                                    return Err(error::BigTableError::Status(
                                        code,
                                        status.get_message().to_owned(),
                                    ));
                                }
                                debug!("🎏 Response: {} OK", e.index);
                            }
                        }
                    }
                    Err(e) => return Err(error::BigTableError::Write(e)),
                };
                cnt += 1;
            } else {
                debug!("🎏 Done!");
                break;
            }
            stream = remainder;
        }

        Ok(())
    }
603
604    /// Read one row for the [ReadRowsRequest] (assuming only a single row was requested).
605    async fn read_row(
606        &self,
607        req: bigtable::ReadRowsRequest,
608    ) -> Result<Option<row::Row>, error::BigTableError> {
609        let mut rows = self.read_rows(req).await?;
610        Ok(rows.pop_first().map(|(_, v)| v))
611    }
612
613    /// Take a big table ReadRowsRequest (containing the keys and filters) and return a set of row data indexed by row key.
614    ///
615    ///
616    async fn read_rows(
617        &self,
618        req: ReadRowsRequest,
619    ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
620        if !self.circuit_breaker.allow_request() {
621            return Err(error::BigTableError::CircuitBreakerOpen);
622        }
623        let bigtable = self.pool.get().await?;
624        let result = retry_policy(self.settings.retry_count)
625            .retry_if(
626                || async {
627                    let resp: grpcio::ClientSStreamReceiver<bigtable::ReadRowsResponse> = bigtable
628                        .conn
629                        .read_rows_opt(&req, call_opts(self.metadata.clone()))
630                        .map_err(error::BigTableError::Read)?;
631                    merge::RowMerger::process_chunks(resp).await
632                },
633                retryable_bt_err(&self.metrics),
634            )
635            .await;
636        match &result {
637            Ok(_) => self.circuit_breaker.record_success(),
638            Err(_) => self.circuit_breaker.record_failure(),
639        }
640        result
641    }
642
643    /// write a given row.
644    ///
645    /// there's also `.mutate_rows` which I presume allows multiple.
646    async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
647        let mut req = self.mutate_row_request(&row.row_key);
648        // compile the mutations.
649        // It's possible to do a lot here, including altering in process
650        // mutations, clearing them, etc. It's all up for grabs until we commit
651        // below. For now, let's just presume a write and be done.
652        let mutations = self.get_mutations(row.cells)?;
653        req.set_mutations(mutations);
654        self.mutate_row(req).await?;
655        Ok(())
656    }
657
658    /// Compile the list of mutations for this row.
659    fn get_mutations(
660        &self,
661        cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
662    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
663        let mut mutations = protobuf::RepeatedField::default();
664        for (family_id, cells) in cells {
665            for cell in cells {
666                let mut mutation = data::Mutation::default();
667                let mut set_cell = data::Mutation_SetCell::default();
668                let timestamp = cell
669                    .timestamp
670                    .duration_since(SystemTime::UNIX_EPOCH)
671                    .map_err(error::BigTableError::WriteTime)?;
672                set_cell.family_name.clone_from(&family_id);
673                set_cell.set_column_qualifier(cell.qualifier.clone().into_bytes());
674                set_cell.set_value(cell.value);
675                // Yes, this is passing milli bounded time as a micro. Otherwise I get
676                // a `Timestamp granularity mismatch` error
677                set_cell.set_timestamp_micros((timestamp.as_millis() * 1000) as i64);
678                debug!("🉑 expiring in {:?}", timestamp.as_millis());
679                mutation.set_set_cell(set_cell);
680                mutations.push(mutation);
681            }
682        }
683        Ok(mutations)
684    }
685
686    /// Write mutations if the row meets a condition specified by the filter.
687    ///
688    /// Mutations can be applied either when the filter matches (state `true`)
689    /// or doesn't match (state `false`).
690    ///
691    /// Returns whether the filter matched records (which indicates whether the
692    /// mutations were applied, depending on the state)
693    async fn check_and_mutate_row(
694        &self,
695        row: row::Row,
696        filter: RowFilter,
697        state: bool,
698    ) -> Result<bool, error::BigTableError> {
699        let mut req = self.check_and_mutate_row_request(&row.row_key);
700        let mutations = self.get_mutations(row.cells)?;
701        req.set_predicate_filter(filter);
702        if state {
703            req.set_true_mutations(mutations);
704        } else {
705            req.set_false_mutations(mutations);
706        }
707        self.check_and_mutate(req).await
708    }
709
710    async fn check_and_mutate(
711        &self,
712        req: bigtable::CheckAndMutateRowRequest,
713    ) -> Result<bool, error::BigTableError> {
714        let bigtable = self.pool.get().await?;
715        let resp = retry_policy(self.settings.retry_count)
716            .retry_if(
717                || async {
718                    // Note: check_and_mutate_row_async may return before the row
719                    // is written, which can cause race conditions for reads
720                    bigtable
721                        .conn
722                        .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
723                },
724                retryable_grpcio_err(&self.metrics),
725            )
726            .await
727            .map_err(error::BigTableError::Write)?;
728        debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
729        Ok(resp.get_predicate_matched())
730    }
731
732    fn get_delete_mutations(
733        &self,
734        family: &str,
735        column_names: &[&str],
736        time_range: Option<&data::TimestampRange>,
737    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
738        let mut mutations = protobuf::RepeatedField::default();
739        for column in column_names {
740            let mut mutation = data::Mutation::default();
741            // Mutation_DeleteFromRow -- Delete all cells for a given row.
742            // Mutation_DeleteFromFamily -- Delete all cells from a family for a given row.
743            // Mutation_DeleteFromColumn -- Delete all cells from a column name for a given row, restricted by timestamp range.
744            let mut del_cell = data::Mutation_DeleteFromColumn::default();
745            del_cell.set_family_name(family.to_owned());
746            del_cell.set_column_qualifier(column.as_bytes().to_vec());
747            if let Some(range) = time_range {
748                del_cell.set_time_range(range.clone());
749            }
750            mutation.set_delete_from_column(del_cell);
751            mutations.push(mutation);
752        }
753        Ok(mutations)
754    }
755
756    #[allow(unused)]
757    /// Delete all cell data from the specified columns with the optional time range.
758    #[allow(unused)]
759    async fn delete_cells(
760        &self,
761        row_key: &str,
762        family: &str,
763        column_names: &[&str],
764        time_range: Option<&data::TimestampRange>,
765    ) -> Result<(), error::BigTableError> {
766        let mut req = self.mutate_row_request(row_key);
767        req.set_mutations(self.get_delete_mutations(family, column_names, time_range)?);
768        self.mutate_row(req).await
769    }
770
771    /// Delete all the cells for the given row. NOTE: This will drop the row.
772    async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
773        let mut req = self.mutate_row_request(row_key);
774        let mut mutations = protobuf::RepeatedField::default();
775        let mut mutation = data::Mutation::default();
776        mutation.set_delete_from_row(data::Mutation_DeleteFromRow::default());
777        mutations.push(mutation);
778        req.set_mutations(mutations);
779        self.mutate_row(req).await
780    }
781
782    /// This uses the admin interface to drop row ranges.
783    /// This will drop ALL data associated with these rows.
784    /// Note that deletion may take up to a week to occur.
785    /// see https://cloud.google.com/php/docs/reference/cloud-bigtable/latest/Admin.V2.DropRowRangeRequest
786    #[allow(unused)]
787    async fn delete_rows(&self, row_key: &str) -> Result<bool, error::BigTableError> {
788        let admin = BigtableTableAdminClient::new(self.pool.get_channel()?);
789        let mut req = DropRowRangeRequest::new();
790        req.set_name(self.settings.table_name.clone());
791        req.set_row_key_prefix(row_key.as_bytes().to_vec());
792
793        admin
794            .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone()))
795            .map_err(|e| {
796                error!("{:?}", e);
797                error::BigTableError::Admin(
798                    format!(
799                        "Could not send delete command for {}",
800                        &self.settings.table_name
801                    ),
802                    Some(e.to_string()),
803                )
804            })?
805            .await
806            .map_err(|e| {
807                error!("post await: {:?}", e);
808                error::BigTableError::Admin(
809                    format!(
810                        "Could not delete data from table {}",
811                        &self.settings.table_name
812                    ),
813                    Some(e.to_string()),
814                )
815            })?;
816
817        Ok(true)
818    }
819
820    fn rows_to_notifications(
821        &self,
822        rows: BTreeMap<String, Row>,
823    ) -> Result<Vec<Notification>, DbError> {
824        rows.into_iter()
825            .map(|(row_key, row)| self.row_to_notification(&row_key, row))
826            .collect()
827    }
828
829    fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
830        let Some((_, chidmessageid)) = row_key.split_once('#') else {
831            return Err(DbError::Integrity(
832                "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
833                None,
834            ));
835        };
836        let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
837            DbError::Integrity(
838                format!("rows_to_notification expected chidmessageid: {e}"),
839                None,
840            )
841        })?;
842
843        // Create from the known, required fields.
844        let mut notif = Notification {
845            channel_id: range_key.channel_id,
846            topic: range_key.topic,
847            sortkey_timestamp: range_key.sortkey_timestamp,
848            version: to_string(row.take_required_cell("version")?.value, "version")?,
849            ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
850            timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
851            ..Default::default()
852        };
853
854        // Backfill the Optional fields
855        if let Some(cell) = row.take_cell("data") {
856            notif.data = Some(to_string(cell.value, "data")?);
857        }
858        #[cfg(feature = "reliable_report")]
859        {
860            if let Some(cell) = row.take_cell("reliability_id") {
861                notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
862            }
863            if let Some(cell) = row.take_cell("reliable_state") {
864                notif.reliable_state = Some(
865                    crate::reliability::ReliabilityState::from_str(&to_string(
866                        cell.value,
867                        "reliable_state",
868                    )?)
869                    .map_err(|e| {
870                        DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
871                    })?,
872                );
873            }
874        }
875        if let Some(cell) = row.take_cell("headers") {
876            notif.headers = Some(
877                serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
878                    .map_err(|e| DbError::Serialization(e.to_string()))?,
879            );
880        }
881        #[cfg(feature = "reliable_report")]
882        if let Some(cell) = row.take_cell("reliability_id") {
883            trace!("🚣  Is reliable");
884            notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
885        }
886
887        trace!("🚣  Deserialized message row: {:?}", &notif);
888        Ok(notif)
889    }
890
891    /// Return a Row for writing from a [User] and a `version`
892    ///
893    /// `version` is specified as an argument (ignoring [User::version]) so
894    /// that [update_user] may specify a new version to write before modifying
895    /// the [User] struct
896    fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
897        let row_key = user.uaid.simple().to_string();
898        let mut row = Row::new(row_key);
899        let expiry = std::time::SystemTime::now() + self.router_ttl();
900
901        let mut cells: Vec<cell::Cell> = vec![
902            cell::Cell {
903                qualifier: "connected_at".to_owned(),
904                value: user.connected_at.to_be_bytes().to_vec(),
905                timestamp: expiry,
906                ..Default::default()
907            },
908            cell::Cell {
909                qualifier: "router_type".to_owned(),
910                value: user.router_type.clone().into_bytes(),
911                timestamp: expiry,
912                ..Default::default()
913            },
914            cell::Cell {
915                qualifier: "record_version".to_owned(),
916                value: user
917                    .record_version
918                    .unwrap_or(USER_RECORD_VERSION)
919                    .to_be_bytes()
920                    .to_vec(),
921                timestamp: expiry,
922                ..Default::default()
923            },
924            cell::Cell {
925                qualifier: "version".to_owned(),
926                value: (*version).into(),
927                timestamp: expiry,
928                ..Default::default()
929            },
930        ];
931
932        if let Some(router_data) = &user.router_data {
933            cells.push(cell::Cell {
934                qualifier: "router_data".to_owned(),
935                value: json!(router_data).to_string().as_bytes().to_vec(),
936                timestamp: expiry,
937                ..Default::default()
938            });
939        };
940        if let Some(current_timestamp) = user.current_timestamp {
941            cells.push(cell::Cell {
942                qualifier: "current_timestamp".to_owned(),
943                value: current_timestamp.to_be_bytes().to_vec(),
944                timestamp: expiry,
945                ..Default::default()
946            });
947        };
948        if let Some(node_id) = &user.node_id {
949            cells.push(cell::Cell {
950                qualifier: "node_id".to_owned(),
951                value: node_id.as_bytes().to_vec(),
952                timestamp: expiry,
953                ..Default::default()
954            });
955        };
956
957        cells.extend(channels_to_cells(
958            Cow::Borrowed(&user.priv_channels),
959            expiry,
960        ));
961
962        row.add_cells(ROUTER_FAMILY, cells);
963        row
964    }
965}
966
/// A single Bigtable data-plane connection plus the metadata and table
/// name needed to run connectivity (health) checks against it.
#[derive(Clone)]
pub struct BigtableDb {
    /// The underlying gRPC Bigtable data client.
    pub(super) conn: BigtableClient,
    /// Call metadata attached to health-check requests.
    pub(super) health_metadata: Metadata,
    // Fully qualified table name used when issuing health-check reads.
    table_name: String,
}
973
974impl BigtableDb {
975    pub fn new(channel: Channel, health_metadata: &Metadata, table_name: &str) -> Self {
976        Self {
977            conn: BigtableClient::new(channel),
978            health_metadata: health_metadata.clone(),
979            table_name: table_name.to_owned(),
980        }
981    }
982    /// Perform a simple connectivity check. This should return no actual results
983    /// but should verify that the connection is valid. We use this for the
984    /// Recycle check as well, so it has to be fairly low in the implementation
985    /// stack.
986    ///
987    ///
988    pub async fn health_check(
989        &mut self,
990        metrics: &Arc<StatsdClient>,
991        app_profile_id: &str,
992    ) -> Result<bool, error::BigTableError> {
993        // It is recommended that we pick a random key to perform the health check. Selecting
994        // a single key for all health checks causes a "hot tablet" to arise. The `PingAndWarm`
995        // is intended to be used prior to large updates and is not recommended for use in
996        // health checks.
997        // This health check is to see if the database is present, the response is not important
998        // other than it does not return an error.
999        let random_uaid = Uuid::new_v4().simple().to_string();
1000        let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
1001        let mut filter = data::RowFilter::default();
1002        filter.set_block_all_filter(true);
1003        req.set_filter(filter);
1004        let _r = retry_policy(RETRY_COUNT)
1005            .retry_if(
1006                || async {
1007                    self.conn
1008                        .read_rows_opt(&req, call_opts(self.health_metadata.clone()))
1009                },
1010                retryable_grpcio_err(metrics),
1011            )
1012            .await
1013            .map_err(error::BigTableError::Read)?;
1014
1015        debug!("🉑 health check");
1016        Ok(true)
1017    }
1018}
1019
1020#[async_trait]
1021impl DbClient for BigTableClientImpl {
1022    /// add user to the database
1023    async fn add_user(&self, user: &User) -> DbResult<()> {
1024        trace!("🉑 Adding user");
1025        let Some(ref version) = user.version else {
1026            return Err(DbError::General(
1027                "add_user expected a user version field".to_owned(),
1028            ));
1029        };
1030        let row = self.user_to_row(user, version);
1031
1032        // Only add when the user doesn't already exist
1033        let mut row_key_filter = RowFilter::default();
1034        row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
1035        let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
1036
1037        if self.check_and_mutate_row(row, filter, false).await? {
1038            return Err(DbError::Conditional);
1039        }
1040        Ok(())
1041    }
1042
1043    /// BigTable doesn't really have the concept of an "update". You simply write the data and
1044    /// the individual cells create a new version. Depending on the garbage collection rules for
1045    /// the family, these can either persist or be automatically deleted.
1046    ///
1047    /// NOTE: This function updates the key ROUTER records for a given UAID. It does this by
1048    /// calling [BigTableClientImpl::user_to_row] which creates a new row with new `cell.timestamp` values set
1049    /// to now + `MAX_ROUTER_TTL`. This function is called by mobile during the daily
1050    /// [autoendpoint::routes::update_token_route] handling, and by desktop
1051    /// [autoconnect-ws-sm::get_or_create_user]` which is called
1052    /// during the `HELLO` handler. This should be enough to ensure that the ROUTER records
1053    /// are properly refreshed for "lively" clients.
1054    ///
1055    /// NOTE: There is some, very small, potential risk that a desktop client that can
1056    /// somehow remain connected the duration of MAX_ROUTER_TTL, may be dropped as not being
1057    /// "lively".
1058    async fn update_user(&self, user: &mut User) -> DbResult<bool> {
1059        let Some(ref version) = user.version else {
1060            return Err(DbError::General(
1061                "update_user expected a user version field".to_owned(),
1062            ));
1063        };
1064
1065        let mut filters = vec![router_gc_policy_filter()];
1066        filters.extend(version_filter(version));
1067        let filter = filter_chain(filters);
1068
1069        let new_version = Uuid::new_v4();
1070        // Always write a newly generated version
1071        let row = self.user_to_row(user, &new_version);
1072
1073        let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
1074        user.version = Some(new_version);
1075        Ok(predicate_matched)
1076    }
1077
1078    async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
1079        let row_key = uaid.as_simple().to_string();
1080        let mut req = self.read_row_request(&row_key);
1081        let mut filters = vec![router_gc_policy_filter()];
1082        filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
1083        req.set_filter(filter_chain(filters));
1084        let Some(mut row) = self.read_row(req).await? else {
1085            return Ok(None);
1086        };
1087
1088        trace!("🉑 Found a record for {}", row_key);
1089
1090        let connected_at_cell = match row.take_required_cell("connected_at") {
1091            Ok(cell) => cell,
1092            Err(_) => {
1093                if !is_incomplete_router_record(&row.cells) {
1094                    return Err(DbError::Integrity(
1095                        "Expected column: connected_at".to_owned(),
1096                        Some(format!("{row:#?}")),
1097                    ));
1098                }
1099                // Special case incomplete records: they're equivalent to no
1100                // user exists. Incompletes caused by the migration bug in #640
1101                // will have their migration re-triggered by returning None:
1102                // https://github.com/mozilla-services/autopush-rs/pull/640
1103                trace!("🉑 Dropping an incomplete user record for {}", row_key);
1104                self.metrics
1105                    .incr_with_tags(MetricName::DatabaseDropUser)
1106                    .with_tag("reason", "incomplete_record")
1107                    .send();
1108                self.remove_user(uaid).await?;
1109                return Ok(None);
1110            }
1111        };
1112
1113        let mut result = User {
1114            uaid: *uaid,
1115            connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1116            router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1117            record_version: Some(to_u64(
1118                row.take_required_cell("record_version")?.value,
1119                "record_version",
1120            )?),
1121            version: Some(
1122                row.take_required_cell("version")?
1123                    .value
1124                    .try_into()
1125                    .map_err(|e| {
1126                        DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1127                    })?,
1128            ),
1129            ..Default::default()
1130        };
1131
1132        if let Some(cell) = row.take_cell("router_data") {
1133            result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1134                DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1135            })?;
1136        }
1137
1138        if let Some(cell) = row.take_cell("node_id") {
1139            result.node_id = Some(to_string(cell.value, "node_id")?);
1140        }
1141
1142        if let Some(cell) = row.take_cell("current_timestamp") {
1143            result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1144        }
1145
1146        // Read the channels last, after removal of all non channel cells
1147        result.priv_channels = channels_from_cells(&row.cells)?;
1148
1149        Ok(Some(result))
1150    }
1151
1152    async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1153        let row_key = uaid.simple().to_string();
1154        self.delete_row(&row_key).await?;
1155        Ok(())
1156    }
1157
1158    async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1159        let channels = HashSet::from_iter([channel_id.to_owned()]);
1160        self.add_channels(uaid, channels).await
1161    }
1162
1163    /// Add channels in bulk (used mostly during migration)
1164    ///
1165    async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1166        // channel_ids are stored as a set within one Bigtable row
1167        //
1168        // Bigtable allows "millions of columns in a table, as long as no row
1169        // exceeds the maximum limit of 256 MB per row" enabling the use of
1170        // column qualifiers as data.
1171        //
1172        // The "set" of channel_ids consists of column qualifiers named
1173        // "chid:<chid value>" as set member entries (with their cell values
1174        // being a single 0 byte).
1175        //
1176        // Storing the full set in a single row makes batch updates
1177        // (particularly to reset the GC expiry timestamps) potentially more
1178        // easy/efficient
1179        let row_key = uaid.simple().to_string();
1180        let mut row = Row::new(row_key);
1181        let expiry = std::time::SystemTime::now() + self.router_ttl();
1182
1183        // Note: updating the version column isn't necessary here because this
1184        // write only adds a new (or updates an existing) column with a 0 byte
1185        // value
1186        row.add_cells(
1187            ROUTER_FAMILY,
1188            channels_to_cells(Cow::Owned(channels), expiry),
1189        );
1190
1191        self.write_row(row).await?;
1192        Ok(())
1193    }
1194
1195    async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1196        let row_key = uaid.simple().to_string();
1197        let mut req = self.read_row_request(&row_key);
1198
1199        let mut cq_filter = data::RowFilter::default();
1200        cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());
1201        req.set_filter(filter_chain(vec![
1202            router_gc_policy_filter(),
1203            family_filter(format!("^{ROUTER_FAMILY}$")),
1204            cq_filter,
1205        ]));
1206
1207        let Some(row) = self.read_row(req).await? else {
1208            return Ok(Default::default());
1209        };
1210        channels_from_cells(&row.cells)
1211    }
1212
    /// Delete the channel. Does not delete its associated pending messages.
    ///
    /// Issues a conditional mutation: the "chid:<id>" column is deleted (and a
    /// fresh `version` cell written) only when that column currently exists.
    /// Returns whether the predicate matched, i.e. whether the channel was
    /// actually present and removed.
    async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
        let row_key = uaid.simple().to_string();
        let mut req = self.check_and_mutate_row_request(&row_key);

        // Delete the column representing the channel_id
        let column = format!("chid:{}", channel_id.as_hyphenated());
        let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;

        // and write a new version cell
        let mut row = Row::new(row_key);
        let expiry = std::time::SystemTime::now() + self.router_ttl();
        row.cells
            .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
        // NOTE: mutation order matters — the column delete must precede the
        // version write within this single conditional request.
        mutations.extend(self.get_mutations(row.cells)?);

        // check if the channel existed/was actually removed
        let mut cq_filter = data::RowFilter::default();
        cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
        req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
        req.set_true_mutations(mutations);

        Ok(self.check_and_mutate(req).await?)
    }
1237
1238    /// Remove the node_id
1239    async fn remove_node_id(
1240        &self,
1241        uaid: &Uuid,
1242        _node_id: &str,
1243        _connected_at: u64,
1244        version: &Option<Uuid>,
1245    ) -> DbResult<bool> {
1246        let row_key = uaid.simple().to_string();
1247        trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1248        let Some(version) = version else {
1249            return Err(DbError::General("Expected a user version field".to_owned()));
1250        };
1251
1252        let mut req = self.check_and_mutate_row_request(&row_key);
1253
1254        let mut filters = vec![router_gc_policy_filter()];
1255        filters.extend(version_filter(version));
1256        req.set_predicate_filter(filter_chain(filters));
1257        req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);
1258
1259        Ok(self.check_and_mutate(req).await?)
1260    }
1261
1262    /// Write the notification to storage.
1263    async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1264        let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1265        debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1266        trace!(
1267            "🉑 timestamp: {:?}",
1268            &message.timestamp.to_be_bytes().to_vec()
1269        );
1270        let mut row = Row::new(row_key);
1271
1272        // Remember, `timestamp` is effectively the time to kill the message, not the
1273        // current time.
1274        // TODO: use message.expiry()
1275        let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1276        trace!(
1277            "🉑 Message Expiry {}",
1278            expiry
1279                .duration_since(SystemTime::UNIX_EPOCH)
1280                .unwrap_or_default()
1281                .as_millis()
1282        );
1283
1284        let mut cells: Vec<cell::Cell> = Vec::new();
1285
1286        let is_topic = message.topic.is_some();
1287        let family = if is_topic {
1288            MESSAGE_TOPIC_FAMILY
1289        } else {
1290            MESSAGE_FAMILY
1291        };
1292        cells.extend(vec![
1293            cell::Cell {
1294                qualifier: "ttl".to_owned(),
1295                value: message.ttl.to_be_bytes().to_vec(),
1296                timestamp: expiry,
1297                ..Default::default()
1298            },
1299            cell::Cell {
1300                qualifier: "timestamp".to_owned(),
1301                value: message.timestamp.to_be_bytes().to_vec(),
1302                timestamp: expiry,
1303                ..Default::default()
1304            },
1305            cell::Cell {
1306                qualifier: "version".to_owned(),
1307                value: message.version.into_bytes(),
1308                timestamp: expiry,
1309                ..Default::default()
1310            },
1311        ]);
1312        if let Some(headers) = message.headers {
1313            if !headers.is_empty() {
1314                cells.push(cell::Cell {
1315                    qualifier: "headers".to_owned(),
1316                    value: json!(headers).to_string().into_bytes(),
1317                    timestamp: expiry,
1318                    ..Default::default()
1319                });
1320            }
1321        }
1322        #[cfg(feature = "reliable_report")]
1323        {
1324            if let Some(reliability_id) = message.reliability_id {
1325                trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1326                cells.push(cell::Cell {
1327                    qualifier: "reliability_id".to_owned(),
1328                    value: reliability_id.into_bytes(),
1329                    timestamp: expiry,
1330                    ..Default::default()
1331                });
1332            }
1333            if let Some(reliable_state) = message.reliable_state {
1334                cells.push(cell::Cell {
1335                    qualifier: "reliable_state".to_owned(),
1336                    value: reliable_state.to_string().into_bytes(),
1337                    timestamp: expiry,
1338                    ..Default::default()
1339                });
1340            }
1341        }
1342        if let Some(data) = message.data {
1343            cells.push(cell::Cell {
1344                qualifier: "data".to_owned(),
1345                value: data.into_bytes(),
1346                timestamp: expiry,
1347                ..Default::default()
1348            });
1349        }
1350
1351        row.add_cells(family, cells);
1352        trace!("🉑 Adding row");
1353        self.write_row(row).await?;
1354
1355        self.metrics
1356            .incr_with_tags(MetricName::NotificationMessageStored)
1357            .with_tag("topic", &is_topic.to_string())
1358            .with_tag("database", &self.name())
1359            .send();
1360        Ok(())
1361    }
1362
1363    /// Save a batch of messages to the database.
1364    ///
1365    /// Currently just iterating through the list and saving one at a time. There's a bulk way
1366    /// to save messages, but there are other considerations (e.g. mutation limits)
1367    async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1368        // plate simple way of solving this:
1369        for message in messages {
1370            self.save_message(uaid, message).await?;
1371        }
1372        Ok(())
1373    }
1374
1375    /// Set the `current_timestamp` in the meta record for this user agent.
1376    ///
1377    /// This is a bit different for BigTable. Field expiration (technically cell
1378    /// expiration) is determined by the lifetime assigned to the cell once it hits
1379    /// a given date. That means you can't really extend a lifetime by adjusting a
1380    /// single field. You'd have to adjust all the cells that are in the family.
1381    /// So, we're not going to do expiration that way.
1382    ///
1383    /// That leaves the meta "current_timestamp" field. We do not purge ACK'd records,
1384    /// instead we presume that the TTL will kill them off eventually. On reads, we use
1385    /// the `current_timestamp` to determine what records to return, since we return
1386    /// records with timestamps later than `current_timestamp`.
1387    ///
1388    async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1389        let row_key = uaid.simple().to_string();
1390        debug!(
1391            "🉑 Updating {} current_timestamp:  {:?}",
1392            &row_key,
1393            timestamp.to_be_bytes().to_vec()
1394        );
1395        let expiry = std::time::SystemTime::now() + self.router_ttl();
1396        let mut row = Row::new(row_key.clone());
1397
1398        row.cells.insert(
1399            ROUTER_FAMILY.to_owned(),
1400            vec![
1401                cell::Cell {
1402                    qualifier: "current_timestamp".to_owned(),
1403                    value: timestamp.to_be_bytes().to_vec(),
1404                    timestamp: expiry,
1405                    ..Default::default()
1406                },
1407                new_version_cell(expiry),
1408            ],
1409        );
1410
1411        self.write_row(row).await?;
1412
1413        Ok(())
1414    }
1415
1416    /// Delete the notification from storage.
1417    async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1418        trace!(
1419            "🉑 attemping to delete {:?} :: {:?}",
1420            uaid.to_string(),
1421            chidmessageid
1422        );
1423        let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1424        debug!("🉑🔥 Deleting message {}", &row_key);
1425        self.delete_row(&row_key).await?;
1426        self.metrics
1427            .incr_with_tags(MetricName::NotificationMessageDeleted)
1428            .with_tag("database", &self.name())
1429            .send();
1430        Ok(())
1431    }
1432
1433    /// Return `limit` pending messages from storage. `limit=0` for all messages.
1434    async fn fetch_topic_messages(
1435        &self,
1436        uaid: &Uuid,
1437        limit: usize,
1438    ) -> DbResult<FetchMessageResponse> {
1439        let mut req = ReadRowsRequest::default();
1440        req.set_table_name(self.settings.table_name.clone());
1441        req.set_app_profile_id(self.settings.app_profile_id.clone());
1442
1443        let start_key = format!("{}#01:", uaid.simple());
1444        let end_key = format!("{}#02:", uaid.simple());
1445        let mut rows = data::RowSet::default();
1446        let mut row_range = data::RowRange::default();
1447        row_range.set_start_key_open(start_key.into_bytes());
1448        row_range.set_end_key_open(end_key.into_bytes());
1449        let mut row_ranges = RepeatedField::default();
1450        row_ranges.push(row_range);
1451        rows.set_row_ranges(row_ranges);
1452        req.set_rows(rows);
1453
1454        let mut filters = message_gc_policy_filter()?;
1455        filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1456
1457        req.set_filter(filter_chain(filters));
1458        if limit > 0 {
1459            trace!("🉑 Setting limit to {limit}");
1460            req.set_rows_limit(limit as i64);
1461        }
1462        let rows = self.read_rows(req).await?;
1463        debug!(
1464            "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1465            rows.len(),
1466            limit
1467        );
1468
1469        let messages = self.rows_to_notifications(rows)?;
1470
1471        // Note: Bigtable always returns a timestamp of None.
1472        // Under Bigtable `current_timestamp` is instead initially read
1473        // from [get_user].
1474        Ok(FetchMessageResponse {
1475            messages,
1476            timestamp: None,
1477        })
1478    }
1479
1480    /// Return `limit` messages pending for a UAID that have a sortkey_timestamp after
1481    /// what's specified. `limit=0` for all messages.
1482    async fn fetch_timestamp_messages(
1483        &self,
1484        uaid: &Uuid,
1485        timestamp: Option<u64>,
1486        limit: usize,
1487    ) -> DbResult<FetchMessageResponse> {
1488        let mut req = ReadRowsRequest::default();
1489        req.set_table_name(self.settings.table_name.clone());
1490        req.set_app_profile_id(self.settings.app_profile_id.clone());
1491
1492        let mut rows = data::RowSet::default();
1493        let mut row_range = data::RowRange::default();
1494
1495        let start_key = if let Some(ts) = timestamp {
1496            // Fetch everything after the last message with timestamp: the "z"
1497            // moves past the last message's channel_id's 1st hex digit
1498            format!("{}#02:{}z", uaid.simple(), ts)
1499        } else {
1500            format!("{}#02:", uaid.simple())
1501        };
1502        let end_key = format!("{}#03:", uaid.simple());
1503        row_range.set_start_key_open(start_key.into_bytes());
1504        row_range.set_end_key_open(end_key.into_bytes());
1505
1506        let mut row_ranges = RepeatedField::default();
1507        row_ranges.push(row_range);
1508        rows.set_row_ranges(row_ranges);
1509        req.set_rows(rows);
1510
1511        // We can fetch data and do [some remote filtering](https://cloud.google.com/bigtable/docs/filters),
1512        // unfortunately I don't think the filtering we need will be super helpful.
1513        //
1514        //
1515        /*
1516        //NOTE: if you filter on a given field, BigTable will only
1517        // return that specific field. Adding filters for the rest of
1518        // the known elements may NOT return those elements or may
1519        // cause the message to not be returned because any of
1520        // those elements are not present. It may be preferable to
1521        // therefore run two filters, one to fetch the candidate IDs
1522        // and another to fetch the content of the messages.
1523         */
1524        let mut filters = message_gc_policy_filter()?;
1525        filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1526
1527        req.set_filter(filter_chain(filters));
1528        if limit > 0 {
1529            req.set_rows_limit(limit as i64);
1530        }
1531        let rows = self.read_rows(req).await?;
1532        debug!(
1533            "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1534            timestamp,
1535            rows.len(),
1536            limit,
1537        );
1538
1539        let messages = self.rows_to_notifications(rows)?;
1540        // The timestamp of the last message read
1541        let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1542        Ok(FetchMessageResponse {
1543            messages,
1544            timestamp,
1545        })
1546    }
1547
1548    async fn health_check(&self) -> DbResult<bool> {
1549        Ok(self
1550            .pool
1551            .get()
1552            .await?
1553            .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1554            .await?)
1555    }
1556
1557    /// Returns true, because there's only one table in BigTable. We divide things up
1558    /// by `family`.
1559    async fn router_table_exists(&self) -> DbResult<bool> {
1560        Ok(true)
1561    }
1562
1563    /// Returns true, because there's only one table in BigTable. We divide things up
1564    /// by `family`.
1565    async fn message_table_exists(&self) -> DbResult<bool> {
1566        Ok(true)
1567    }
1568
1569    #[cfg(feature = "reliable_report")]
1570    async fn log_report(
1571        &self,
1572        reliability_id: &str,
1573        new_state: crate::reliability::ReliabilityState,
1574    ) -> DbResult<()> {
1575        let row_key = reliability_id.to_owned();
1576
1577        let mut row = Row::new(row_key);
1578        let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1579
1580        // Log the latest transition time for this id.
1581        let cells: Vec<cell::Cell> = vec![cell::Cell {
1582            qualifier: new_state.to_string(),
1583            value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1584            timestamp: expiry,
1585            ..Default::default()
1586        }];
1587
1588        row.add_cells(RELIABLE_LOG_FAMILY, cells);
1589
1590        self.write_row(row).await?;
1591
1592        Ok(())
1593    }
1594
1595    fn box_clone(&self) -> Box<dyn DbClient> {
1596        Box::new(self.clone())
1597    }
1598
1599    fn name(&self) -> String {
1600        "Bigtable".to_owned()
1601    }
1602
1603    fn pool_status(&self) -> Option<deadpool::Status> {
1604        Some(self.pool.pool.status())
1605    }
1606}
1607
#[cfg(all(test, feature = "emulator"))]
mod tests {

    //! Currently, these tests rely on having a BigTable emulator running on the current machine.
    //! The tests presume to be able to connect to localhost:8086. See docs/bigtable.md for
    //! details and how to set up and initialize an emulator.
    //! NOTE(review): `new_client()` below falls back to localhost:8080 when
    //! `BIGTABLE_EMULATOR_HOST` is unset — confirm which port matches the docs.
    //!
    use std::sync::Arc;
    use std::time::SystemTime;

    use cadence::StatsdClient;
    use uuid;

    use super::*;
    use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};

    // Fixed UAID/CHID values so test rows are easy to recognize (and purge)
    // in the emulator between runs.
    const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
    const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
    const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";

    /// Current time as whole seconds since the UNIX epoch.
    fn now() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

    /// Construct a client pointed at the local emulator, honoring
    /// `BIGTABLE_EMULATOR_HOST` when set. Metrics go to a no-op sink.
    fn new_client() -> DbResult<BigTableClientImpl> {
        let env_dsn = format!(
            "grpc://{}",
            std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
        );
        let settings = DbSettings {
            // this presumes the table was created with
            // ```
            // scripts/setup_bt.sh
            // ```
            // with `message`, `router`, and `message_topic` families
            dsn: Some(env_dsn),
            db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
                .to_string(),
        };

        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

        BigTableClientImpl::new(metrics, &settings)
    }

    /// `escape_bytes` should escape regex metacharacters while passing
    /// multi-byte UTF-8 sequences through untouched.
    #[test]
    fn escape_bytes_for_regex() {
        let b = b"hi";
        assert_eq!(escape_bytes(b), b.to_vec());
        assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
        let b = b"\xe2\x80\xb3";
        assert_eq!(escape_bytes(b), b.to_vec());
        // clippy::octal-escapes rightly discourages this ("\022") in a byte literal
        let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
        assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
        let b = b"\xc0";
        assert_eq!(escape_bytes(b), b.to_vec());
        assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
    }

    /// Smoke test: the emulator must be reachable and healthy.
    #[actix_rt::test]
    async fn health_check() {
        let client = new_client().unwrap();

        let result = client.health_check().await;
        assert!(result.is_ok());
        assert!(result.unwrap());
    }

    /// run a gauntlet of testing. These are a bit linear because they need
    /// to run in sequence.
    #[actix_rt::test]
    async fn run_gauntlet() -> DbResult<()> {
        let client = new_client()?;

        let connected_at = ms_since_epoch();

        let uaid = Uuid::parse_str(TEST_USER).unwrap();
        let chid = Uuid::parse_str(TEST_CHID).unwrap();
        let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();

        let node_id = "test_node".to_owned();

        // purge the user record if it exists.
        let _ = client.remove_user(&uaid).await;

        let test_user = User {
            uaid,
            router_type: "webpush".to_owned(),
            connected_at,
            router_data: None,
            node_id: Some(node_id.clone()),
            ..Default::default()
        };

        // purge the old user (if present)
        // in case a prior test failed for whatever reason.
        // NOTE(review): redundant with the purge a few lines above; harmless,
        // but one of the two could be removed.
        let _ = client.remove_user(&uaid).await;

        // can we add the user?
        client.add_user(&test_user).await?;
        let fetched = client.get_user(&uaid).await?;
        assert!(fetched.is_some());
        let fetched = fetched.unwrap();
        assert_eq!(fetched.router_type, "webpush".to_owned());

        // Simulate a connected_at occurring before the following writes
        let connected_at = ms_since_epoch();

        // can we add channels?
        client.add_channel(&uaid, &chid).await?;
        let channels = client.get_channels(&uaid).await?;
        assert!(channels.contains(&chid));

        // can we add lots of channels?
        let mut new_channels: HashSet<Uuid> = HashSet::new();
        new_channels.insert(chid);
        for _ in 1..10 {
            new_channels.insert(uuid::Uuid::new_v4());
        }
        let chid_to_remove = uuid::Uuid::new_v4();
        new_channels.insert(chid_to_remove);
        client.add_channels(&uaid, new_channels.clone()).await?;
        let channels = client.get_channels(&uaid).await?;
        assert_eq!(channels, new_channels);

        // can we remove a channel?
        // (removing twice should report false the second time)
        assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
        assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
        new_channels.remove(&chid_to_remove);
        let channels = client.get_channels(&uaid).await?;
        assert_eq!(channels, new_channels);

        // First, verify that an update carrying a connected_at that is NOT
        // later than the stored value is rejected (returns Ok(false)) and
        // leaves the record untouched; then verify that a later
        // connected_at succeeds.
        let mut updated = User {
            connected_at,
            ..test_user.clone()
        };
        let result = client.update_user(&mut updated).await;
        assert!(result.is_ok());
        assert!(!result.unwrap());

        // Make sure that the `connected_at` wasn't modified
        let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
        assert_eq!(fetched.connected_at, fetched2.connected_at);

        // and make sure we can update a record with a later connected_at time.
        let mut updated = User {
            connected_at: fetched.connected_at + 300,
            ..fetched2
        };
        let result = client.update_user(&mut updated).await;
        assert!(result.is_ok());
        assert!(result.unwrap());
        assert_ne!(
            fetched2.connected_at,
            client.get_user(&uaid).await?.unwrap().connected_at
        );

        // can we increment the storage for the user?
        client
            .increment_storage(
                &fetched.uaid,
                SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
            )
            .await?;

        let test_data = "An_encrypted_pile_of_crap".to_owned();
        let timestamp = now();
        let sort_key = now();
        // Can we store a message?
        let test_notification = crate::db::Notification {
            channel_id: chid,
            version: "test".to_owned(),
            ttl: 300,
            timestamp,
            data: Some(test_data.clone()),
            sortkey_timestamp: Some(sort_key),
            ..Default::default()
        };
        let res = client.save_message(&uaid, test_notification.clone()).await;
        assert!(res.is_ok());

        let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
        assert_ne!(fetched.messages.len(), 0);
        let fm = fetched.messages.pop().unwrap();
        assert_eq!(fm.channel_id, test_notification.channel_id);
        assert_eq!(fm.data, Some(test_data));

        // Grab all 1 of the messages that were submitted within the past 10 seconds.
        let fetched = client
            .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
            .await?;
        assert_ne!(fetched.messages.len(), 0);

        // Try grabbing a message for 10 seconds from now.
        // (nothing should be newer than that timestamp)
        let fetched = client
            .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
            .await?;
        assert_eq!(fetched.messages.len(), 0);

        // can we clean up our toys?
        assert!(client
            .remove_message(&uaid, &test_notification.chidmessageid())
            .await
            .is_ok());

        assert!(client.remove_channel(&uaid, &chid).await.is_ok());

        // Now, can we do all that with topic messages
        client.add_channel(&uaid, &topic_chid).await?;
        let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
        let timestamp = now();
        let sort_key = now();
        // Can we store a message?
        let test_notification = crate::db::Notification {
            channel_id: topic_chid,
            version: "test".to_owned(),
            ttl: 300,
            topic: Some("topic".to_owned()),
            timestamp,
            data: Some(test_data.clone()),
            sortkey_timestamp: Some(sort_key),
            ..Default::default()
        };
        assert!(client
            .save_message(&uaid, test_notification.clone())
            .await
            .is_ok());

        let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
        assert_ne!(fetched.messages.len(), 0);
        let fm = fetched.messages.pop().unwrap();
        assert_eq!(fm.channel_id, test_notification.channel_id);
        assert_eq!(fm.data, Some(test_data));

        // Grab the message that was submitted.
        let fetched = client.fetch_topic_messages(&uaid, 999).await?;
        assert_ne!(fetched.messages.len(), 0);

        // can we clean up our toys?
        assert!(client
            .remove_message(&uaid, &test_notification.chidmessageid())
            .await
            .is_ok());

        assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());

        // After cleanup there should be no timestamp messages left.
        let msgs = client
            .fetch_timestamp_messages(&uaid, None, 999)
            .await?
            .messages;
        assert!(msgs.is_empty());

        let fetched = client.get_user(&uaid).await?.unwrap();
        assert!(client
            .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
            .await
            .is_ok());
        // did we remove it?
        let fetched = client.get_user(&uaid).await?.unwrap();
        assert_eq!(fetched.node_id, None);

        assert!(client.remove_user(&uaid).await.is_ok());

        assert!(client.get_user(&uaid).await?.is_none());

        Ok(())
    }

    /// A row written under ROUTER_FAMILY should read back with its cells
    /// keyed by qualifier (not family) — see [super::row::Row] for how these
    /// keys are overloaded.
    #[actix_rt::test]
    async fn read_cells_family_id() -> DbResult<()> {
        let client = new_client().unwrap();
        let uaid = gen_test_uaid();
        client.remove_user(&uaid).await.unwrap();

        let qualifier = "foo".to_owned();

        let row_key = uaid.simple().to_string();
        let mut row = Row::new(row_key.clone());
        row.cells.insert(
            ROUTER_FAMILY.to_owned(),
            vec![cell::Cell {
                qualifier: qualifier.to_owned(),
                value: "bar".as_bytes().to_vec(),
                ..Default::default()
            }],
        );
        client.write_row(row).await.unwrap();
        let req = client.read_row_request(&row_key);
        let Some(row) = client.read_row(req).await.unwrap() else {
            panic!("Expected row");
        };
        assert_eq!(row.cells.len(), 1);
        assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
        client.remove_user(&uaid).await
    }

    /// Adding an already-present user must fail with `DbError::Conditional`.
    #[actix_rt::test]
    async fn add_user_existing() {
        let client = new_client().unwrap();
        let uaid = gen_test_uaid();
        let user = User {
            uaid,
            ..Default::default()
        };
        client.remove_user(&uaid).await.unwrap();

        client.add_user(&user).await.unwrap();
        let err = client.add_user(&user).await.unwrap_err();
        assert!(matches!(err, DbError::Conditional));
    }

    /// `update_user` bumps the record version; a stale version must be
    /// rejected (returns Ok(false)).
    #[actix_rt::test]
    async fn version_check() {
        let client = new_client().unwrap();
        let uaid = gen_test_uaid();
        let user = User {
            uaid,
            ..Default::default()
        };
        client.remove_user(&uaid).await.unwrap();

        client.add_user(&user).await.unwrap();
        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
        assert!(client.update_user(&mut user.clone()).await.unwrap());

        let fetched = client.get_user(&uaid).await.unwrap().unwrap();
        assert_ne!(user.version, fetched.version);
        // should now fail w/ a stale version
        assert!(!client.update_user(&mut user).await.unwrap());

        client.remove_user(&uaid).await.unwrap();
    }

    /// Orphaned chid cells (no router record) must not make `get_user`
    /// report a user, and adding the user must clean them up.
    #[actix_rt::test]
    async fn lingering_chid_record() {
        let client = new_client().unwrap();
        let uaid = gen_test_uaid();
        let chid = Uuid::parse_str(TEST_CHID).unwrap();
        let user = User {
            uaid,
            ..Default::default()
        };
        client.remove_user(&uaid).await.unwrap();

        // add_channel doesn't check for the existence of a user
        client.add_channel(&uaid, &chid).await.unwrap();

        // w/ chid records in the router row, get_user should treat
        // this as the user not existing
        assert!(client.get_user(&uaid).await.unwrap().is_none());

        client.add_user(&user).await.unwrap();
        // get_user should have also cleaned up the chids
        assert!(client.get_channels(&uaid).await.unwrap().is_empty());

        client.remove_user(&uaid).await.unwrap();
    }

    /// A stray current_timestamp cell alone must not make `get_user`
    /// report a user.
    #[actix_rt::test]
    async fn lingering_current_timestamp() {
        let client = new_client().unwrap();
        let uaid = gen_test_uaid();
        client.remove_user(&uaid).await.unwrap();

        client
            .increment_storage(&uaid, crate::util::sec_since_epoch())
            .await
            .unwrap();
        assert!(client.get_user(&uaid).await.unwrap().is_none());

        client.remove_user(&uaid).await.unwrap();
    }

    /// Adding then removing a channel leaves a version cell behind; that
    /// alone must not make `get_user` report a user.
    #[actix_rt::test]
    async fn lingering_chid_w_version_record() {
        let client = new_client().unwrap();
        let uaid = gen_test_uaid();
        let chid = Uuid::parse_str(TEST_CHID).unwrap();
        client.remove_user(&uaid).await.unwrap();

        client.add_channel(&uaid, &chid).await.unwrap();
        assert!(client.remove_channel(&uaid, &chid).await.unwrap());
        assert!(client.get_user(&uaid).await.unwrap().is_none());

        client.remove_user(&uaid).await.unwrap();
    }

    /// `update_user` must refresh the cell expiry (timestamp) of every cell
    /// in the row — channels and current_timestamp included — not just the
    /// router cells it rewrites.
    #[actix_rt::test]
    async fn channel_and_current_timestamp_ttl_updates() {
        let client = new_client().unwrap();
        let uaid = gen_test_uaid();
        let chid = Uuid::parse_str(TEST_CHID).unwrap();
        client.remove_user(&uaid).await.unwrap();

        // Setup a user with some channels and a current_timestamp
        let user = User {
            uaid,
            ..Default::default()
        };
        client.add_user(&user).await.unwrap();

        client.add_channel(&uaid, &chid).await.unwrap();
        client
            .add_channel(&uaid, &uuid::Uuid::new_v4())
            .await
            .unwrap();

        client
            .increment_storage(
                &uaid,
                SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
            )
            .await
            .unwrap();

        let req = client.read_row_request(&uaid.as_simple().to_string());
        let Some(mut row) = client.read_row(req).await.unwrap() else {
            panic!("Expected row");
        };

        // Ensure the initial cell expiry (timestamp) of all the cells
        // in the row has been updated
        let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
        for mut cells in row.cells.into_values() {
            let Some(cell) = cells.pop() else {
                continue;
            };
            assert!(
                cell.timestamp >= ca_expiry,
                "{} cell timestamp should >= connected_at's",
                cell.qualifier
            );
        }

        let mut user = client.get_user(&uaid).await.unwrap().unwrap();

        // Quick nap to make sure that the ca_expiry values are different.
        tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
        client.update_user(&mut user).await.unwrap();

        // Ensure update_user updated the expiry (timestamp) of every cell in the row
        let req = client.read_row_request(&uaid.as_simple().to_string());
        let Some(mut row) = client.read_row(req).await.unwrap() else {
            panic!("Expected row");
        };

        let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;

        assert!(ca_expiry2 > ca_expiry);

        for mut cells in row.cells.into_values() {
            let Some(cell) = cells.pop() else {
                continue;
            };
            assert!(
                cell.timestamp >= ca_expiry2,
                "{} cell timestamp expiry should exceed connected_at's",
                cell.qualifier
            );
        }

        client.remove_user(&uaid).await.unwrap();
    }
}