Skip to main content

autopush_common/db/bigtable/bigtable_client/
mod.rs

1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
8use std::time::{Duration, SystemTime};
9
10use again::RetryPolicy;
11use async_trait::async_trait;
12use cadence::StatsdClient;
13#[cfg(feature = "reliable_report")]
14use chrono::TimeDelta;
15use gcp_auth::TokenProvider;
16use googleapis_tonic_google_bigtable_v2::google::bigtable::v2 as bigtable;
17use googleapis_tonic_google_bigtable_v2::google::bigtable::v2::bigtable_client::BigtableClient;
18use serde_json::{from_str, json};
19use tonic::metadata::{AsciiMetadataValue, MetadataMap};
20use tonic::transport::Channel;
21use tonic::{Code, Request, Status};
22use uuid::Uuid;
23
24use crate::MAX_ROUTER_TTL_SECS;
25use crate::db::{
26    DbSettings, Notification, USER_RECORD_VERSION, User,
27    client::{DbClient, FetchMessageResponse},
28    error::{DbError, DbResult},
29    models::RangeKey,
30};
31use crate::metric_name::MetricName;
32use crate::metrics::StatsdClientExt;
33
34pub use self::metadata::MetadataBuilder;
35use self::row::{Row, RowCells};
36use super::BigTableDbSettings;
37use super::pool::BigTablePool;
38
39pub mod cell;
40pub mod error;
41pub(crate) mod merge;
42pub mod metadata;
43pub mod row;
44
// these are normally Vec<u8>
/// Key identifying a single row; handled as a `String` in this module.
pub type RowKey = String;

// These are more for code clarity than functional types.
// Rust will happily swap between the two in any case.
// See [super::row::Row] for discussion about how these
// are overloaded in order to simplify fetching data.
/// Column qualifier (column name within a family).
pub type Qualifier = String;
/// Column family name.
pub type FamilyId = String;

/// Column family holding the router (user) cells.
const ROUTER_FAMILY: &str = "router";
const MESSAGE_FAMILY: &str = "message"; // The default family for messages
/// Column family name for topic messages.
const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
/// Column family name for reliability logging.
#[cfg(feature = "reliable_report")]
const RELIABLE_LOG_FAMILY: &str = "reliability";
#[cfg(feature = "reliable_report")]
/// The maximum TTL for reliability logging (60 days).
///
/// In most use cases, converted to seconds through `.num_seconds()`.
pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);

/// Retry count constant (not referenced in the visible part of this file;
/// presumably the default for the `retry_count` setting -- confirm).
pub(crate) const RETRY_COUNT: usize = 5;

/// Maximum gRPC message size (256MB), matching the prior grpcio configuration.
const MAX_MESSAGE_LEN: usize = 1 << 28;

/// OAuth2 scopes requested for the Bigtable data API.
const BIGTABLE_DATA_SCOPES: &[&str] = &["https://www.googleapis.com/auth/bigtable.data"];
72
/// Simple circuit breaker to prevent retry storms during BigTable outages.
///
/// After `failure_threshold` consecutive failures, the circuit opens and
/// requests fail fast for `cooldown_secs` seconds. Once the cooldown has
/// elapsed the breaker is "half-open": exactly one caller per cooldown window
/// is admitted as a probe. A successful probe (see [`record_success`]) closes
/// the circuit; a failed probe (see [`record_failure`]) keeps it open for
/// another cooldown.
///
/// [`record_success`]: CircuitBreaker::record_success
/// [`record_failure`]: CircuitBreaker::record_failure
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Failures recorded since the last success.
    consecutive_failures: AtomicU32,
    /// Unix time (seconds) at which the circuit last opened, or at which the
    /// most recent probe was admitted.
    opened_at_epoch_secs: AtomicU64,
    /// Number of consecutive failures that opens the circuit.
    failure_threshold: u32,
    /// Seconds to fail fast before admitting a probe.
    cooldown_secs: u64,
}

impl CircuitBreaker {
    /// Create a closed breaker that opens after `failure_threshold`
    /// consecutive failures and stays open for `cooldown_secs` seconds.
    pub fn new(failure_threshold: u32, cooldown_secs: u64) -> Self {
        Self {
            consecutive_failures: AtomicU32::new(0),
            opened_at_epoch_secs: AtomicU64::new(0),
            failure_threshold,
            cooldown_secs,
        }
    }

    /// Whole seconds since the Unix epoch, or 0 if the system clock reads
    /// earlier than the epoch.
    fn now_epoch_secs() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Check if the circuit is allowing requests through.
    /// Returns true if the request should proceed, false if it should fail fast.
    ///
    /// While the circuit is open and the cooldown has elapsed, only the first
    /// caller is admitted (the half-open probe); concurrent callers keep
    /// failing fast until that probe reports back or another cooldown passes.
    pub fn allow_request(&self) -> bool {
        let failures = self.consecutive_failures.load(Ordering::Relaxed);
        // No recorded failures always means "closed", even for a degenerate
        // threshold of 0.
        if failures == 0 || failures < self.failure_threshold {
            return true;
        }
        // A zero cooldown never holds requests back.
        if self.cooldown_secs == 0 {
            return true;
        }
        // Circuit is open — check if cooldown has elapsed
        let opened_at = self.opened_at_epoch_secs.load(Ordering::Relaxed);
        let now = Self::now_epoch_secs();
        if now.saturating_sub(opened_at) < self.cooldown_secs {
            return false;
        }
        // Half-open: claim the single probe slot by restarting the cooldown
        // window. Only the caller whose compare-exchange wins proceeds; the
        // losers observe the fresh timestamp and fail fast.
        self.opened_at_epoch_secs
            .compare_exchange(opened_at, now, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }

    /// Record a successful operation, resetting the circuit breaker.
    pub fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::Relaxed);
    }

    /// Record a failed operation, (re)opening the circuit once the failure
    /// threshold has been reached.
    pub fn record_failure(&self) {
        let failures = self
            .consecutive_failures
            .fetch_add(1, Ordering::Relaxed)
            // `saturating_add` avoids an overflow panic in debug builds.
            .saturating_add(1);
        if failures >= self.failure_threshold {
            self.opened_at_epoch_secs
                .store(Self::now_epoch_secs(), Ordering::Relaxed);
        }
    }
}

impl Default for CircuitBreaker {
    /// Open after 5 consecutive failures, cooldown for 30 seconds.
    fn default() -> Self {
        Self::new(5, 30)
    }
}
140
141/// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently.
142// TODO:Should we create something similar for ChannelID?
143struct Uaid(Uuid);
144
145impl Display for Uaid {
146    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
147        write!(f, "{}", self.0.as_simple())
148    }
149}
150
151impl From<Uaid> for String {
152    fn from(uaid: Uaid) -> String {
153        uaid.0.as_simple().to_string()
154    }
155}
156
#[derive(Clone)]
/// Wrapper for the BigTable connection
///
/// Bundles the settings, metrics client, connection pool, request metadata
/// and a shared circuit breaker used by the read and write helpers.
pub struct BigTableClientImpl {
    /// BigTable specific settings (table name, app profile id, retry count,
    /// router TTL, ...)
    pub(crate) settings: BigTableDbSettings,
    /// Metrics client
    metrics: Arc<StatsdClient>,
    /// Pool that connections are checked out of for each request
    pool: BigTablePool,
    /// Metadata header blocks required by Google for accessing GRPC
    /// resources; passed along when building each request
    metadata: MetadataMap,
    /// Circuit breaker to prevent retry storms during BigTable outages
    circuit_breaker: Arc<CircuitBreaker>,
}
169
170/// Return a a RowFilter matching the GC policy of the router Column Family
171fn router_gc_policy_filter() -> bigtable::RowFilter {
172    bigtable::RowFilter {
173        filter: Some(bigtable::row_filter::Filter::CellsPerColumnLimitFilter(1)),
174    }
175}
176
177/// Return a chain of RowFilters matching the GC policy of the message Column
178/// Families
179fn message_gc_policy_filter() -> Result<Vec<bigtable::RowFilter>, error::BigTableError> {
180    let bt_now: i64 = SystemTime::now()
181        .duration_since(SystemTime::UNIX_EPOCH)
182        .map_err(error::BigTableError::WriteTime)?
183        .as_millis() as i64;
184    let timestamp_filter = bigtable::RowFilter {
185        filter: Some(bigtable::row_filter::Filter::TimestampRangeFilter(
186            bigtable::TimestampRange {
187                start_timestamp_micros: bt_now * 1000,
188                end_timestamp_micros: 0,
189            },
190        )),
191    };
192
193    Ok(vec![router_gc_policy_filter(), timestamp_filter])
194}
195
196/// Return a Column family regex RowFilter
197fn family_filter(regex: String) -> bigtable::RowFilter {
198    bigtable::RowFilter {
199        filter: Some(bigtable::row_filter::Filter::FamilyNameRegexFilter(regex)),
200    }
201}
202
/// Quote `bytes` so they match literally inside an RE.
///
/// Modelled on google-re2/perl's quotemeta function: ASCII alphanumerics,
/// `_` and bytes with the high bit set pass through untouched, NUL becomes
/// `\x00`, and every other byte is prefixed with a backslash.
fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
    let mut escaped = Vec::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        match byte {
            // Not strictly required for RE2, but other regexp libraries such
            // as PCRE need it. "\\0" can't be used since the next character
            // might be a digit.
            b'\0' => escaped.extend_from_slice(b"\\x00"),
            b if b.is_ascii_alphanumeric() || b == b'_' || !b.is_ascii() => escaped.push(b),
            b => {
                escaped.push(b'\\');
                escaped.push(b);
            }
        }
    }
    escaped
}
224
225/// Return a chain of RowFilters limiting to a match of the specified
226/// `version`'s column value
227fn version_filter(version: &Uuid) -> Vec<bigtable::RowFilter> {
228    let cq_filter = bigtable::RowFilter {
229        filter: Some(bigtable::row_filter::Filter::ColumnQualifierRegexFilter(
230            "^version$".as_bytes().to_vec(),
231        )),
232    };
233    let value_filter = bigtable::RowFilter {
234        filter: Some(bigtable::row_filter::Filter::ValueRegexFilter(
235            escape_bytes(version.as_bytes()),
236        )),
237    };
238
239    vec![
240        family_filter(format!("^{ROUTER_FAMILY}$")),
241        cq_filter,
242        value_filter,
243    ]
244}
245
246/// Return a newly generated `version` column `Cell`
247fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
248    cell::Cell {
249        qualifier: "version".to_owned(),
250        value: Uuid::new_v4().into(),
251        timestamp,
252        ..Default::default()
253    }
254}
255
256/// Return a RowFilter chain from multiple RowFilters
257fn filter_chain(filters: Vec<bigtable::RowFilter>) -> bigtable::RowFilter {
258    bigtable::RowFilter {
259        filter: Some(bigtable::row_filter::Filter::Chain(
260            bigtable::row_filter::Chain { filters },
261        )),
262    }
263}
264
265/// Return a ReadRowsRequest against table for a given row key
266fn read_row_request(
267    table_name: &str,
268    app_profile_id: &str,
269    row_key: &str,
270) -> bigtable::ReadRowsRequest {
271    bigtable::ReadRowsRequest {
272        table_name: table_name.to_owned(),
273        app_profile_id: app_profile_id.to_owned(),
274        rows: Some(bigtable::RowSet {
275            row_keys: vec![row_key.as_bytes().to_vec()],
276            row_ranges: Vec::new(),
277        }),
278        ..Default::default()
279    }
280}
281
282fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
283    let v: [u8; 8] = value
284        .try_into()
285        .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
286    Ok(u64::from_be_bytes(v))
287}
288
289fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
290    String::from_utf8(value).map_err(|e| {
291        debug!("🉑 cannot read string {}: {:?}", name, e);
292        DbError::DeserializeString(name.to_owned())
293    })
294}
295
296/// Parse the "set" (see [DbClient::add_channels]) of channel ids in a bigtable Row.
297///
298/// Cells should solely contain the set of channels otherwise an Error is returned.
299fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
300    let mut result = HashSet::new();
301    for cells in cells.values() {
302        let Some(cell) = cells.last() else {
303            continue;
304        };
305        let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
306            return Err(DbError::Integrity(
307                "get_channels expected: chid:<chid>".to_owned(),
308                None,
309            ));
310        };
311        result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
312    }
313    Ok(result)
314}
315
316/// Convert the [HashSet] of channel ids to cell entries for a bigtable Row
317fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
318    let channels = channels.into_owned();
319    let mut cells = Vec::with_capacity(channels.len().min(100_000));
320    for (i, channel_id) in channels.into_iter().enumerate() {
321        // There is a limit of 100,000 mutations per batch for bigtable.
322        // https://cloud.google.com/bigtable/quotas
323        // If you have 100,000 channels, you have too many.
324        if i >= 100_000 {
325            break;
326        }
327        cells.push(cell::Cell {
328            qualifier: format!("chid:{}", channel_id.as_hyphenated()),
329            timestamp: expiry,
330            ..Default::default()
331        });
332    }
333    cells
334}
335
336pub fn retry_policy(max: usize) -> RetryPolicy {
337    RetryPolicy::default()
338        .with_max_retries(max)
339        .with_jitter(true)
340}
341
342fn retryable_internal_err(status: &Status) -> bool {
343    match status.code() {
344        // tonic surfaces transport-level failures (connection resets, etc.)
345        // as `Unknown`.
346        Code::Unknown => true,
347        Code::Internal => [
348            "rst_stream",
349            "rst stream",
350            "received unexpected eos on data frame from server",
351        ]
352        .contains(&status.message().to_lowercase().as_str()),
353        Code::Unavailable | Code::DeadlineExceeded => true,
354        _ => false,
355    }
356}
357
358pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
359    let mut metric = metrics
360        .incr_with_tags(MetricName::DatabaseRetry)
361        .with_tag("error", err_type)
362        .with_tag("type", "bigtable");
363    if let Some(code) = code {
364        metric = metric.with_tag("code", code);
365    }
366    metric.send();
367}
368
369pub fn retryable_status(metrics: &Arc<StatsdClient>) -> impl Fn(&Status) -> bool + '_ {
370    move |status| {
371        info!("GRPC Failure: {:?}", status);
372        let retry = retryable_internal_err(status);
373        if retry {
374            metric(metrics, "RpcFailure", Some(&format!("{:?}", status.code())));
375        }
376        retry
377    }
378}
379
380pub fn retryable_bt_err(
381    metrics: &Arc<StatsdClient>,
382) -> impl Fn(&error::BigTableError) -> bool + '_ {
383    move |err| {
384        debug!("🉑 Checking BigTableError...{err}");
385        match err {
386            error::BigTableError::InvalidRowResponse(s)
387            | error::BigTableError::Read(s)
388            | error::BigTableError::Write(s) => retryable_status(metrics)(s),
389            // Failures to fetch an OAuth token (e.g. a transient metadata
390            // server hiccup) are retryable, matching the prior behavior of
391            // retrying grpcio's oauth token fetch errors.
392            error::BigTableError::Auth(_) => {
393                metric(metrics, "Auth", None);
394                true
395            }
396            _ => false,
397        }
398    }
399}
400
401/// Determine if a router record is "incomplete" (doesn't include [User]
402/// columns):
403///
404/// They can be incomplete for a couple reasons:
405///
406/// 1) A migration code bug caused a few incomplete migrations where
407///    `add_channels` and `increment_storage` calls occurred when the migration's
408///    initial `add_user` was never completed:
409///    https://github.com/mozilla-services/autopush-rs/pull/640
410///
411/// 2) When router TTLs are eventually enabled: `add_channel` and
412///    `increment_storage` can write cells with later expiry times than the other
413///    router cells
414fn is_incomplete_router_record(cells: &RowCells) -> bool {
415    cells
416        .keys()
417        .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
418}
419
420/// Connect to a BigTable storage model.
421///
422/// BigTable is available via the Google Console, and is a schema less storage system.
423///
424/// The `db_dsn` string should be in the form of
425/// `grpc://{BigTableEndpoint}`
426///
427/// The settings contains the `table_name` which is the GRPC path to the data.
428/// (e.g. `projects/{project_id}/instances/{instance_id}/tables/{table_id}`)
429///
430/// where:
431/// _BigTableEndpoint_ is the endpoint domain to use (the default is `bigtable.googleapis.com`) See
432/// [BigTable Endpoints](https://cloud.google.com/bigtable/docs/regional-endpoints) for more details.
433/// _project-id_ is the Google project identifier (see the Google developer console (e.g. 'autopush-dev'))
434/// _instance-id_ is the Google project instance, (see the Google developer console (e.g. 'development-1'))
435/// _table_id_ is the Table Name (e.g. 'autopush')
436///
437/// This will automatically bring in the default credentials specified by the `GOOGLE_APPLICATION_CREDENTIALS`
438/// environment variable.
439///
440/// NOTE: Some configurations may look for the default credential file (pointed to by
441/// `GOOGLE_APPLICATION_CREDENTIALS`) to be stored in
442/// `$HOME/.config/gcloud/application_default_credentials.json`)
443///
444impl BigTableClientImpl {
445    pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
446        debug!("🏊 BT Pool new");
447        let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
448        info!("🉑 {:#?}", db_settings);
449        let pool = BigTablePool::new(settings, &metrics)?;
450
451        // create the metadata header blocks required by Google for accessing GRPC resources.
452        let metadata = db_settings.metadata()?;
453        Ok(Self {
454            settings: db_settings,
455            metrics,
456            metadata,
457            pool,
458            circuit_breaker: Arc::new(CircuitBreaker::default()),
459        })
460    }
461
462    fn router_ttl(&self) -> Duration {
463        self.settings
464            .max_router_ttl
465            .unwrap_or(Duration::from_secs(MAX_ROUTER_TTL_SECS))
466    }
467
    /// Spawn a task to periodically evict idle connections
    ///
    /// Delegates to the pool's `spawn_sweeper`, passing `interval` through
    /// unchanged.
    pub fn spawn_sweeper(&self, interval: Duration) {
        self.pool.spawn_sweeper(interval);
    }
472
473    /// Return a ReadRowsRequest for a given row key
474    fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
475        read_row_request(
476            &self.settings.table_name,
477            &self.settings.app_profile_id,
478            row_key,
479        )
480    }
481
482    /// Return a MutateRowRequest for a given row key
483    fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
484        bigtable::MutateRowRequest {
485            table_name: self.settings.table_name.clone(),
486            app_profile_id: self.settings.app_profile_id.clone(),
487            row_key: row_key.as_bytes().to_vec(),
488            ..Default::default()
489        }
490    }
491
492    /// Return a CheckAndMutateRowRequest for a given row key
493    fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
494        bigtable::CheckAndMutateRowRequest {
495            table_name: self.settings.table_name.clone(),
496            app_profile_id: self.settings.app_profile_id.clone(),
497            row_key: row_key.as_bytes().to_vec(),
498            ..Default::default()
499        }
500    }
501
    /// Send a MutateRowRequest, retrying retryable failures.
    ///
    /// Fails fast with [error::BigTableError::CircuitBreakerOpen] when the
    /// circuit breaker is not allowing requests. Otherwise the request is
    /// attempted under `retry_policy(self.settings.retry_count)`, with
    /// [retryable_bt_err] deciding which errors are retried, and the final
    /// outcome is recorded on the circuit breaker.
    ///
    /// Note that a failure to obtain a pooled connection returns early and
    /// is not recorded on the circuit breaker.
    async fn mutate_row(
        &self,
        req: bigtable::MutateRowRequest,
    ) -> Result<(), error::BigTableError> {
        if !self.circuit_breaker.allow_request() {
            return Err(error::BigTableError::CircuitBreakerOpen);
        }
        let bigtable = self.pool.get().await?;
        let result = retry_policy(self.settings.retry_count)
            .retry_if(
                || async {
                    // The request is rebuilt (cloned) for every attempt.
                    let request = bigtable.request(req.clone(), &self.metadata).await?;
                    bigtable
                        .conn
                        .clone()
                        .mutate_row(request)
                        .await
                        .map_err(error::BigTableError::Write)?;
                    Ok(())
                },
                retryable_bt_err(&self.metrics),
            )
            .await;
        // Only the outcome after all retries feeds the circuit breaker.
        match &result {
            Ok(()) => self.circuit_breaker.record_success(),
            Err(_) => self.circuit_breaker.record_failure(),
        }
        result
    }
532
533    /// Read one row for the [ReadRowsRequest] (assuming only a single row was requested).
534    async fn read_row(
535        &self,
536        req: bigtable::ReadRowsRequest,
537    ) -> Result<Option<row::Row>, error::BigTableError> {
538        let mut rows = self.read_rows(req).await?;
539        Ok(rows.pop_first().map(|(_, v)| v))
540    }
541
    /// Take a big table ReadRowsRequest (containing the keys and filters) and return a set of row data indexed by row key.
    ///
    /// Fails fast with [error::BigTableError::CircuitBreakerOpen] when the
    /// circuit breaker is not allowing requests. Otherwise the read is
    /// attempted under `retry_policy(self.settings.retry_count)`, with
    /// [retryable_bt_err] deciding which errors are retried, and the final
    /// outcome is recorded on the circuit breaker.
    ///
    /// Note that a failure to obtain a pooled connection returns early and
    /// is not recorded on the circuit breaker.
    async fn read_rows(
        &self,
        req: bigtable::ReadRowsRequest,
    ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
        if !self.circuit_breaker.allow_request() {
            return Err(error::BigTableError::CircuitBreakerOpen);
        }
        let bigtable = self.pool.get().await?;
        let result = retry_policy(self.settings.retry_count)
            .retry_if(
                || async {
                    // The request is rebuilt (cloned) for every attempt.
                    let request = bigtable.request(req.clone(), &self.metadata).await?;
                    let resp = bigtable
                        .conn
                        .clone()
                        .read_rows(request)
                        .await
                        .map_err(error::BigTableError::Read)?
                        .into_inner();
                    // Merge the streamed response chunks into rows.
                    merge::RowMerger::process_chunks(resp).await
                },
                retryable_bt_err(&self.metrics),
            )
            .await;
        // Only the outcome after all retries feeds the circuit breaker.
        match &result {
            Ok(_) => self.circuit_breaker.record_success(),
            Err(_) => self.circuit_breaker.record_failure(),
        }
        result
    }
575
576    /// write a given row.
577    ///
578    /// there's also `.mutate_rows` which I presume allows multiple.
579    async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
580        let mut req = self.mutate_row_request(&row.row_key);
581        // compile the mutations.
582        // It's possible to do a lot here, including altering in process
583        // mutations, clearing them, etc. It's all up for grabs until we commit
584        // below. For now, let's just presume a write and be done.
585        req.mutations = self.get_mutations(row.cells)?;
586        self.mutate_row(req).await?;
587        Ok(())
588    }
589
590    /// Compile the list of mutations for this row.
591    fn get_mutations(
592        &self,
593        cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
594    ) -> Result<Vec<bigtable::Mutation>, error::BigTableError> {
595        let mut mutations = Vec::new();
596        for (family_id, cells) in cells {
597            for cell in cells {
598                let timestamp = cell
599                    .timestamp
600                    .duration_since(SystemTime::UNIX_EPOCH)
601                    .map_err(error::BigTableError::WriteTime)?;
602                debug!("🉑 expiring in {:?}", timestamp.as_millis());
603                mutations.push(bigtable::Mutation {
604                    mutation: Some(bigtable::mutation::Mutation::SetCell(
605                        bigtable::mutation::SetCell {
606                            family_name: family_id.clone(),
607                            column_qualifier: cell.qualifier.clone().into_bytes(),
608                            // Bigtable tables use millisecond cell-timestamp granularity;
609                            // timestamp_micros must be a multiple of 1,000 or the server
610                            // rejects the mutation with a granularity mismatch.
611                            timestamp_micros: (timestamp.as_millis() * 1000) as i64,
612                            value: cell.value,
613                        },
614                    )),
615                });
616            }
617        }
618        Ok(mutations)
619    }
620
621    /// Write mutations if the row meets a condition specified by the filter.
622    ///
623    /// Mutations can be applied either when the filter matches (state `true`)
624    /// or doesn't match (state `false`).
625    ///
626    /// Returns whether the filter matched records (which indicates whether the
627    /// mutations were applied, depending on the state)
628    async fn check_and_mutate_row(
629        &self,
630        row: row::Row,
631        filter: bigtable::RowFilter,
632        state: bool,
633    ) -> Result<bool, error::BigTableError> {
634        let mut req = self.check_and_mutate_row_request(&row.row_key);
635        let mutations = self.get_mutations(row.cells)?;
636        req.predicate_filter = Some(filter);
637        if state {
638            req.true_mutations = mutations;
639        } else {
640            req.false_mutations = mutations;
641        }
642        self.check_and_mutate(req).await
643    }
644
    /// Send a CheckAndMutateRowRequest, retrying retryable failures, and
    /// return whether the request's predicate matched.
    ///
    /// The request is attempted under
    /// `retry_policy(self.settings.retry_count)` with [retryable_bt_err]
    /// deciding which errors are retried.
    ///
    /// NOTE(review): unlike `mutate_row` and `read_rows`, this neither
    /// consults nor updates the circuit breaker -- confirm whether that is
    /// intentional.
    async fn check_and_mutate(
        &self,
        req: bigtable::CheckAndMutateRowRequest,
    ) -> Result<bool, error::BigTableError> {
        let bigtable = self.pool.get().await?;
        let resp = retry_policy(self.settings.retry_count)
            .retry_if(
                || async {
                    // The request is rebuilt (cloned) for every attempt.
                    let request = bigtable.request(req.clone(), &self.metadata).await?;
                    bigtable
                        .conn
                        .clone()
                        .check_and_mutate_row(request)
                        .await
                        .map_err(error::BigTableError::Write)
                },
                retryable_bt_err(&self.metrics),
            )
            .await?
            .into_inner();
        debug!("🉑 Predicate Matched: {}", &resp.predicate_matched);
        Ok(resp.predicate_matched)
    }
668
669    fn get_delete_mutations(
670        &self,
671        family: &str,
672        column_names: &[&str],
673        time_range: Option<&bigtable::TimestampRange>,
674    ) -> Result<Vec<bigtable::Mutation>, error::BigTableError> {
675        let mut mutations = Vec::new();
676        for column in column_names {
677            // DeleteFromRow -- Delete all cells for a given row.
678            // DeleteFromFamily -- Delete all cells from a family for a given row.
679            // DeleteFromColumn -- Delete all cells from a column name for a given row, restricted by timestamp range.
680            mutations.push(bigtable::Mutation {
681                mutation: Some(bigtable::mutation::Mutation::DeleteFromColumn(
682                    bigtable::mutation::DeleteFromColumn {
683                        family_name: family.to_owned(),
684                        column_qualifier: column.as_bytes().to_vec(),
685                        time_range: time_range.cloned(),
686                    },
687                )),
688            });
689        }
690        Ok(mutations)
691    }
692
693    /// Delete all the cells for the given row. NOTE: This will drop the row.
694    async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
695        let mut req = self.mutate_row_request(row_key);
696        req.mutations = vec![bigtable::Mutation {
697            mutation: Some(bigtable::mutation::Mutation::DeleteFromRow(
698                bigtable::mutation::DeleteFromRow {},
699            )),
700        }];
701        self.mutate_row(req).await
702    }
703
704    fn rows_to_notifications(
705        &self,
706        rows: BTreeMap<String, Row>,
707    ) -> Result<Vec<Notification>, DbError> {
708        rows.into_iter()
709            .map(|(row_key, row)| self.row_to_notification(&row_key, row))
710            .collect()
711    }
712
713    fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
714        let Some((_, chidmessageid)) = row_key.split_once('#') else {
715            return Err(DbError::Integrity(
716                "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
717                None,
718            ));
719        };
720        let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
721            DbError::Integrity(
722                format!("rows_to_notification expected chidmessageid: {e}"),
723                None,
724            )
725        })?;
726
727        // Create from the known, required fields.
728        let mut notif = Notification {
729            channel_id: range_key.channel_id,
730            topic: range_key.topic,
731            sortkey_timestamp: range_key.sortkey_timestamp,
732            version: to_string(row.take_required_cell("version")?.value, "version")?,
733            ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
734            timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
735            ..Default::default()
736        };
737
738        // Backfill the Optional fields
739        if let Some(cell) = row.take_cell("data") {
740            notif.data = Some(to_string(cell.value, "data")?);
741        }
742        #[cfg(feature = "reliable_report")]
743        {
744            if let Some(cell) = row.take_cell("reliability_id") {
745                notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
746            }
747            if let Some(cell) = row.take_cell("reliable_state") {
748                notif.reliable_state = Some(
749                    crate::reliability::ReliabilityState::from_str(&to_string(
750                        cell.value,
751                        "reliable_state",
752                    )?)
753                    .map_err(|e| {
754                        DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
755                    })?,
756                );
757            }
758        }
759        if let Some(cell) = row.take_cell("headers") {
760            notif.headers = Some(
761                serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
762                    .map_err(|e| DbError::Serialization(e.to_string()))?,
763            );
764        }
765        #[cfg(feature = "reliable_report")]
766        if let Some(cell) = row.take_cell("reliability_id") {
767            trace!("🚣  Is reliable");
768            notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
769        }
770
771        trace!("🚣  Deserialized message row: {:?}", &notif);
772        Ok(notif)
773    }
774
775    /// Return a Row for writing from a [User] and a `version`
776    ///
777    /// `version` is specified as an argument (ignoring [User::version]) so
778    /// that [update_user] may specify a new version to write before modifying
779    /// the [User] struct
780    fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
781        let row_key = user.uaid.simple().to_string();
782        let mut row = Row::new(row_key);
783        let expiry = std::time::SystemTime::now() + self.router_ttl();
784
785        let mut cells: Vec<cell::Cell> = vec![
786            cell::Cell {
787                qualifier: "connected_at".to_owned(),
788                value: user.connected_at.to_be_bytes().to_vec(),
789                timestamp: expiry,
790                ..Default::default()
791            },
792            cell::Cell {
793                qualifier: "router_type".to_owned(),
794                value: user.router_type.clone().into_bytes(),
795                timestamp: expiry,
796                ..Default::default()
797            },
798            cell::Cell {
799                qualifier: "record_version".to_owned(),
800                value: user
801                    .record_version
802                    .unwrap_or(USER_RECORD_VERSION)
803                    .to_be_bytes()
804                    .to_vec(),
805                timestamp: expiry,
806                ..Default::default()
807            },
808            cell::Cell {
809                qualifier: "version".to_owned(),
810                value: (*version).into(),
811                timestamp: expiry,
812                ..Default::default()
813            },
814        ];
815
816        if let Some(router_data) = &user.router_data {
817            cells.push(cell::Cell {
818                qualifier: "router_data".to_owned(),
819                value: json!(router_data).to_string().as_bytes().to_vec(),
820                timestamp: expiry,
821                ..Default::default()
822            });
823        };
824        if let Some(current_timestamp) = user.current_timestamp {
825            cells.push(cell::Cell {
826                qualifier: "current_timestamp".to_owned(),
827                value: current_timestamp.to_be_bytes().to_vec(),
828                timestamp: expiry,
829                ..Default::default()
830            });
831        };
832        if let Some(node_id) = &user.node_id {
833            cells.push(cell::Cell {
834                qualifier: "node_id".to_owned(),
835                value: node_id.as_bytes().to_vec(),
836                timestamp: expiry,
837                ..Default::default()
838            });
839        };
840
841        cells.extend(channels_to_cells(
842            Cow::Borrowed(&user.priv_channels),
843            expiry,
844        ));
845
846        row.add_cells(ROUTER_FAMILY, cells);
847        row
848    }
849}
850
851#[derive(Clone)]
852pub struct BigtableDb {
853    pub(super) conn: BigtableClient<Channel>,
854    pub(super) health_metadata: MetadataMap,
855    /// Application Default Credentials token provider, shared across the
856    /// pool. `None` when running against the emulator (which needs no
857    /// credentials).
858    auth_provider: Option<Arc<dyn TokenProvider>>,
859    table_name: String,
860}
861
862impl BigtableDb {
863    pub fn new(
864        channel: Channel,
865        auth_provider: Option<Arc<dyn TokenProvider>>,
866        health_metadata: &MetadataMap,
867        table_name: &str,
868    ) -> Self {
869        Self {
870            conn: BigtableClient::new(channel)
871                .max_decoding_message_size(MAX_MESSAGE_LEN)
872                .max_encoding_message_size(MAX_MESSAGE_LEN),
873            health_metadata: health_metadata.clone(),
874            auth_provider,
875            table_name: table_name.to_owned(),
876        }
877    }
878
879    /// Build a [tonic::Request] for the given message, attaching the standard
880    /// Google routing metadata and (outside of the emulator) an OAuth2
881    /// `authorization` token from the Application Default Credentials.
882    ///
883    /// The token provider caches tokens internally, so this is cheap to call
884    /// per-request (and per retry attempt, where it transparently picks up a
885    /// fresh token if the prior one expired).
886    pub(super) async fn request<T>(
887        &self,
888        msg: T,
889        metadata: &MetadataMap,
890    ) -> Result<Request<T>, error::BigTableError> {
891        let mut request = Request::new(msg);
892        *request.metadata_mut() = metadata.clone();
893        if let Some(provider) = &self.auth_provider {
894            let token = provider
895                .token(BIGTABLE_DATA_SCOPES)
896                .await
897                .map_err(error::BigTableError::Auth)?;
898            let value: AsciiMetadataValue = format!("Bearer {}", token.as_str())
899                .parse()
900                .map_err(|e| error::BigTableError::Config(format!("Invalid auth token: {e}")))?;
901            request.metadata_mut().insert("authorization", value);
902        }
903        Ok(request)
904    }
905
906    /// Perform a simple connectivity check. This should return no actual results
907    /// but should verify that the connection is valid. We use this for the
908    /// Recycle check as well, so it has to be fairly low in the implementation
909    /// stack.
910    ///
911    ///
912    pub async fn health_check(
913        &mut self,
914        metrics: &Arc<StatsdClient>,
915        app_profile_id: &str,
916    ) -> Result<bool, error::BigTableError> {
917        // It is recommended that we pick a random key to perform the health check. Selecting
918        // a single key for all health checks causes a "hot tablet" to arise. The `PingAndWarm`
919        // is intended to be used prior to large updates and is not recommended for use in
920        // health checks.
921        // This health check is to see if the database is present, the response is not important
922        // other than it does not return an error.
923        let random_uaid = Uuid::new_v4().simple().to_string();
924        let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
925        req.filter = Some(bigtable::RowFilter {
926            filter: Some(bigtable::row_filter::Filter::BlockAllFilter(true)),
927        });
928        let _r = retry_policy(RETRY_COUNT)
929            .retry_if(
930                || async {
931                    let request = self.request(req.clone(), &self.health_metadata).await?;
932                    self.conn
933                        .clone()
934                        .read_rows(request)
935                        .await
936                        .map_err(error::BigTableError::Read)
937                },
938                retryable_bt_err(metrics),
939            )
940            .await?;
941
942        debug!("🉑 health check");
943        Ok(true)
944    }
945}
946
947#[async_trait]
948impl DbClient for BigTableClientImpl {
949    /// add user to the database
950    async fn add_user(&self, user: &User) -> DbResult<()> {
951        trace!("🉑 Adding user");
952        let Some(ref version) = user.version else {
953            return Err(DbError::General(
954                "add_user expected a user version field".to_owned(),
955            ));
956        };
957        let row = self.user_to_row(user, version);
958
959        // Only add when the user doesn't already exist
960        let row_key_filter = bigtable::RowFilter {
961            filter: Some(bigtable::row_filter::Filter::RowKeyRegexFilter(
962                format!("^{}$", row.row_key).into_bytes(),
963            )),
964        };
965        let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
966
967        if self.check_and_mutate_row(row, filter, false).await? {
968            return Err(DbError::Conditional);
969        }
970        Ok(())
971    }
972
973    /// BigTable doesn't really have the concept of an "update". You simply write the data and
974    /// the individual cells create a new version. Depending on the garbage collection rules for
975    /// the family, these can either persist or be automatically deleted.
976    ///
977    /// NOTE: This function updates the key ROUTER records for a given UAID. It does this by
978    /// calling [BigTableClientImpl::user_to_row] which creates a new row with new `cell.timestamp` values set
979    /// to now + `MAX_ROUTER_TTL`. This function is called by mobile during the daily
980    /// [autoendpoint::routes::update_token_route] handling, and by desktop
981    /// [autoconnect-ws-sm::get_or_create_user]` which is called
982    /// during the `HELLO` handler. This should be enough to ensure that the ROUTER records
983    /// are properly refreshed for "lively" clients.
984    ///
985    /// NOTE: There is some, very small, potential risk that a desktop client that can
986    /// somehow remain connected the duration of MAX_ROUTER_TTL, may be dropped as not being
987    /// "lively".
988    async fn update_user(&self, user: &mut User) -> DbResult<bool> {
989        let Some(ref version) = user.version else {
990            return Err(DbError::General(
991                "update_user expected a user version field".to_owned(),
992            ));
993        };
994
995        let mut filters = vec![router_gc_policy_filter()];
996        filters.extend(version_filter(version));
997        let filter = filter_chain(filters);
998
999        let new_version = Uuid::new_v4();
1000        // Always write a newly generated version
1001        let row = self.user_to_row(user, &new_version);
1002
1003        let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
1004        user.version = Some(new_version);
1005        Ok(predicate_matched)
1006    }
1007
1008    async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
1009        let row_key = uaid.as_simple().to_string();
1010        let mut req = self.read_row_request(&row_key);
1011        let mut filters = vec![router_gc_policy_filter()];
1012        filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
1013        req.filter = Some(filter_chain(filters));
1014        let Some(mut row) = self.read_row(req).await? else {
1015            return Ok(None);
1016        };
1017
1018        trace!("🉑 Found a record for {}", row_key);
1019
1020        let connected_at_cell = match row.take_required_cell("connected_at") {
1021            Ok(cell) => cell,
1022            Err(_) => {
1023                if !is_incomplete_router_record(&row.cells) {
1024                    return Err(DbError::Integrity(
1025                        "Expected column: connected_at".to_owned(),
1026                        Some(format!("{row:#?}")),
1027                    ));
1028                }
1029                // Special case incomplete records: they're equivalent to no
1030                // user exists. Incompletes caused by the migration bug in #640
1031                // will have their migration re-triggered by returning None:
1032                // https://github.com/mozilla-services/autopush-rs/pull/640
1033                trace!("🉑 Dropping an incomplete user record for {}", row_key);
1034                self.metrics
1035                    .incr_with_tags(MetricName::DatabaseDropUser)
1036                    .with_tag("reason", "incomplete_record")
1037                    .send();
1038                self.remove_user(uaid).await?;
1039                return Ok(None);
1040            }
1041        };
1042
1043        let mut result = User {
1044            uaid: *uaid,
1045            connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1046            router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1047            record_version: Some(to_u64(
1048                row.take_required_cell("record_version")?.value,
1049                "record_version",
1050            )?),
1051            version: Some(
1052                row.take_required_cell("version")?
1053                    .value
1054                    .try_into()
1055                    .map_err(|e| {
1056                        DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1057                    })?,
1058            ),
1059            ..Default::default()
1060        };
1061
1062        if let Some(cell) = row.take_cell("router_data") {
1063            result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1064                DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1065            })?;
1066        }
1067
1068        if let Some(cell) = row.take_cell("node_id") {
1069            result.node_id = Some(to_string(cell.value, "node_id")?);
1070        }
1071
1072        if let Some(cell) = row.take_cell("current_timestamp") {
1073            result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1074        }
1075
1076        // Read the channels last, after removal of all non channel cells
1077        result.priv_channels = channels_from_cells(&row.cells)?;
1078
1079        Ok(Some(result))
1080    }
1081
1082    async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1083        let row_key = uaid.simple().to_string();
1084        self.delete_row(&row_key).await?;
1085        Ok(())
1086    }
1087
1088    async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1089        let channels = HashSet::from_iter([channel_id.to_owned()]);
1090        self.add_channels(uaid, channels).await
1091    }
1092
1093    /// Add channels in bulk (used mostly during migration)
1094    ///
1095    async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1096        // channel_ids are stored as a set within one Bigtable row
1097        //
1098        // Bigtable allows "millions of columns in a table, as long as no row
1099        // exceeds the maximum limit of 256 MB per row" enabling the use of
1100        // column qualifiers as data.
1101        //
1102        // The "set" of channel_ids consists of column qualifiers named
1103        // "chid:<chid value>" as set member entries (with their cell values
1104        // being a single 0 byte).
1105        //
1106        // Storing the full set in a single row makes batch updates
1107        // (particularly to reset the GC expiry timestamps) potentially more
1108        // easy/efficient
1109        let row_key = uaid.simple().to_string();
1110        let mut row = Row::new(row_key);
1111        let expiry = std::time::SystemTime::now() + self.router_ttl();
1112
1113        // Note: updating the version column isn't necessary here because this
1114        // write only adds a new (or updates an existing) column with a 0 byte
1115        // value
1116        row.add_cells(
1117            ROUTER_FAMILY,
1118            channels_to_cells(Cow::Owned(channels), expiry),
1119        );
1120
1121        self.write_row(row).await?;
1122        Ok(())
1123    }
1124
1125    async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1126        let row_key = uaid.simple().to_string();
1127        let mut req = self.read_row_request(&row_key);
1128
1129        let cq_filter = bigtable::RowFilter {
1130            filter: Some(bigtable::row_filter::Filter::ColumnQualifierRegexFilter(
1131                "^chid:.*$".as_bytes().to_vec(),
1132            )),
1133        };
1134        req.filter = Some(filter_chain(vec![
1135            router_gc_policy_filter(),
1136            family_filter(format!("^{ROUTER_FAMILY}$")),
1137            cq_filter,
1138        ]));
1139
1140        let Some(row) = self.read_row(req).await? else {
1141            return Ok(Default::default());
1142        };
1143        channels_from_cells(&row.cells)
1144    }
1145
1146    /// Delete the channel. Does not delete its associated pending messages.
1147    async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1148        let row_key = uaid.simple().to_string();
1149        let mut req = self.check_and_mutate_row_request(&row_key);
1150
1151        // Delete the column representing the channel_id
1152        let column = format!("chid:{}", channel_id.as_hyphenated());
1153        let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1154
1155        // and write a new version cell
1156        let mut row = Row::new(row_key);
1157        let expiry = std::time::SystemTime::now() + self.router_ttl();
1158        row.cells
1159            .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1160        mutations.extend(self.get_mutations(row.cells)?);
1161
1162        // check if the channel existed/was actually removed
1163        let cq_filter = bigtable::RowFilter {
1164            filter: Some(bigtable::row_filter::Filter::ColumnQualifierRegexFilter(
1165                format!("^{column}$").into_bytes(),
1166            )),
1167        };
1168        req.predicate_filter = Some(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1169        req.true_mutations = mutations;
1170
1171        Ok(self.check_and_mutate(req).await?)
1172    }
1173
1174    /// Remove the node_id
1175    async fn remove_node_id(
1176        &self,
1177        uaid: &Uuid,
1178        _node_id: &str,
1179        _connected_at: u64,
1180        version: &Option<Uuid>,
1181    ) -> DbResult<bool> {
1182        let row_key = uaid.simple().to_string();
1183        trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1184        let Some(version) = version else {
1185            return Err(DbError::General("Expected a user version field".to_owned()));
1186        };
1187
1188        let mut req = self.check_and_mutate_row_request(&row_key);
1189
1190        let mut filters = vec![router_gc_policy_filter()];
1191        filters.extend(version_filter(version));
1192        req.predicate_filter = Some(filter_chain(filters));
1193        req.true_mutations = self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?;
1194
1195        Ok(self.check_and_mutate(req).await?)
1196    }
1197
1198    /// Write the notification to storage.
1199    async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1200        let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1201        debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1202        trace!(
1203            "🉑 timestamp: {:?}",
1204            &message.timestamp.to_be_bytes().to_vec()
1205        );
1206        let mut row = Row::new(row_key);
1207
1208        // Remember, `timestamp` is effectively the time to kill the message, not the
1209        // current time.
1210        // TODO: use message.expiry()
1211        let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1212        trace!(
1213            "🉑 Message Expiry {}",
1214            expiry
1215                .duration_since(SystemTime::UNIX_EPOCH)
1216                .unwrap_or_default()
1217                .as_millis()
1218        );
1219
1220        let mut cells: Vec<cell::Cell> = Vec::new();
1221
1222        let is_topic = message.topic.is_some();
1223        let family = if is_topic {
1224            MESSAGE_TOPIC_FAMILY
1225        } else {
1226            MESSAGE_FAMILY
1227        };
1228        cells.extend(vec![
1229            cell::Cell {
1230                qualifier: "ttl".to_owned(),
1231                value: message.ttl.to_be_bytes().to_vec(),
1232                timestamp: expiry,
1233                ..Default::default()
1234            },
1235            cell::Cell {
1236                qualifier: "timestamp".to_owned(),
1237                value: message.timestamp.to_be_bytes().to_vec(),
1238                timestamp: expiry,
1239                ..Default::default()
1240            },
1241            cell::Cell {
1242                qualifier: "version".to_owned(),
1243                value: message.version.into_bytes(),
1244                timestamp: expiry,
1245                ..Default::default()
1246            },
1247        ]);
1248        if let Some(headers) = message.headers
1249            && !headers.is_empty()
1250        {
1251            cells.push(cell::Cell {
1252                qualifier: "headers".to_owned(),
1253                value: json!(headers).to_string().into_bytes(),
1254                timestamp: expiry,
1255                ..Default::default()
1256            });
1257        }
1258        #[cfg(feature = "reliable_report")]
1259        {
1260            if let Some(reliability_id) = message.reliability_id {
1261                trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1262                cells.push(cell::Cell {
1263                    qualifier: "reliability_id".to_owned(),
1264                    value: reliability_id.into_bytes(),
1265                    timestamp: expiry,
1266                    ..Default::default()
1267                });
1268            }
1269            if let Some(reliable_state) = message.reliable_state {
1270                cells.push(cell::Cell {
1271                    qualifier: "reliable_state".to_owned(),
1272                    value: reliable_state.to_string().into_bytes(),
1273                    timestamp: expiry,
1274                    ..Default::default()
1275                });
1276            }
1277        }
1278        if let Some(data) = message.data {
1279            cells.push(cell::Cell {
1280                qualifier: "data".to_owned(),
1281                value: data.into_bytes(),
1282                timestamp: expiry,
1283                ..Default::default()
1284            });
1285        }
1286
1287        row.add_cells(family, cells);
1288        trace!("🉑 Adding row");
1289        self.write_row(row).await?;
1290
1291        self.metrics
1292            .incr_with_tags(MetricName::NotificationMessageStored)
1293            .with_tag("topic", &is_topic.to_string())
1294            .with_tag("database", &self.name())
1295            .send();
1296        Ok(())
1297    }
1298
1299    /// Save a batch of messages to the database.
1300    ///
1301    /// Currently just iterating through the list and saving one at a time. There's a bulk way
1302    /// to save messages, but there are other considerations (e.g. mutation limits)
1303    async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1304        // plate simple way of solving this:
1305        for message in messages {
1306            self.save_message(uaid, message).await?;
1307        }
1308        Ok(())
1309    }
1310
1311    /// Set the `current_timestamp` in the meta record for this user agent.
1312    ///
1313    /// This is a bit different for BigTable. Field expiration (technically cell
1314    /// expiration) is determined by the lifetime assigned to the cell once it hits
1315    /// a given date. That means you can't really extend a lifetime by adjusting a
1316    /// single field. You'd have to adjust all the cells that are in the family.
1317    /// So, we're not going to do expiration that way.
1318    ///
1319    /// That leaves the meta "current_timestamp" field. We do not purge ACK'd records,
1320    /// instead we presume that the TTL will kill them off eventually. On reads, we use
1321    /// the `current_timestamp` to determine what records to return, since we return
1322    /// records with timestamps later than `current_timestamp`.
1323    ///
1324    async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1325        let row_key = uaid.simple().to_string();
1326        debug!(
1327            "🉑 Updating {} current_timestamp:  {:?}",
1328            &row_key,
1329            timestamp.to_be_bytes().to_vec()
1330        );
1331        let expiry = std::time::SystemTime::now() + self.router_ttl();
1332        let mut row = Row::new(row_key.clone());
1333
1334        row.cells.insert(
1335            ROUTER_FAMILY.to_owned(),
1336            vec![
1337                cell::Cell {
1338                    qualifier: "current_timestamp".to_owned(),
1339                    value: timestamp.to_be_bytes().to_vec(),
1340                    timestamp: expiry,
1341                    ..Default::default()
1342                },
1343                new_version_cell(expiry),
1344            ],
1345        );
1346
1347        self.write_row(row).await?;
1348
1349        Ok(())
1350    }
1351
1352    /// Delete the notification from storage.
1353    async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1354        trace!(
1355            "🉑 attemping to delete {:?} :: {:?}",
1356            uaid.to_string(),
1357            chidmessageid
1358        );
1359        let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1360        debug!("🉑🔥 Deleting message {}", &row_key);
1361        self.delete_row(&row_key).await?;
1362        self.metrics
1363            .incr_with_tags(MetricName::NotificationMessageDeleted)
1364            .with_tag("database", &self.name())
1365            .send();
1366        Ok(())
1367    }
1368
1369    /// Return `limit` pending messages from storage. `limit=0` for all messages.
1370    async fn fetch_topic_messages(
1371        &self,
1372        uaid: &Uuid,
1373        limit: usize,
1374    ) -> DbResult<FetchMessageResponse> {
1375        let start_key = format!("{}#01:", uaid.simple());
1376        let end_key = format!("{}#02:", uaid.simple());
1377        let mut req = bigtable::ReadRowsRequest {
1378            table_name: self.settings.table_name.clone(),
1379            app_profile_id: self.settings.app_profile_id.clone(),
1380            rows: Some(bigtable::RowSet {
1381                row_keys: Vec::new(),
1382                row_ranges: vec![bigtable::RowRange {
1383                    start_key: Some(bigtable::row_range::StartKey::StartKeyOpen(
1384                        start_key.into_bytes(),
1385                    )),
1386                    end_key: Some(bigtable::row_range::EndKey::EndKeyOpen(
1387                        end_key.into_bytes(),
1388                    )),
1389                }],
1390            }),
1391            ..Default::default()
1392        };
1393
1394        let mut filters = message_gc_policy_filter()?;
1395        filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1396
1397        req.filter = Some(filter_chain(filters));
1398        if limit > 0 {
1399            trace!("🉑 Setting limit to {limit}");
1400            req.rows_limit = limit as i64;
1401        }
1402        let rows = self.read_rows(req).await?;
1403        debug!(
1404            "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1405            rows.len(),
1406            limit
1407        );
1408
1409        let messages = self.rows_to_notifications(rows)?;
1410
1411        // Note: Bigtable always returns a timestamp of None.
1412        // Under Bigtable `current_timestamp` is instead initially read
1413        // from [get_user].
1414        Ok(FetchMessageResponse {
1415            messages,
1416            timestamp: None,
1417        })
1418    }
1419
1420    /// Return `limit` messages pending for a UAID that have a sortkey_timestamp after
1421    /// what's specified. `limit=0` for all messages.
1422    async fn fetch_timestamp_messages(
1423        &self,
1424        uaid: &Uuid,
1425        timestamp: Option<u64>,
1426        limit: usize,
1427    ) -> DbResult<FetchMessageResponse> {
1428        let start_key = if let Some(ts) = timestamp {
1429            // Fetch everything after the last message with timestamp: the "z"
1430            // moves past the last message's channel_id's 1st hex digit
1431            format!("{}#02:{}z", uaid.simple(), ts)
1432        } else {
1433            format!("{}#02:", uaid.simple())
1434        };
1435        let end_key = format!("{}#03:", uaid.simple());
1436        let mut req = bigtable::ReadRowsRequest {
1437            table_name: self.settings.table_name.clone(),
1438            app_profile_id: self.settings.app_profile_id.clone(),
1439            rows: Some(bigtable::RowSet {
1440                row_keys: Vec::new(),
1441                row_ranges: vec![bigtable::RowRange {
1442                    start_key: Some(bigtable::row_range::StartKey::StartKeyOpen(
1443                        start_key.into_bytes(),
1444                    )),
1445                    end_key: Some(bigtable::row_range::EndKey::EndKeyOpen(
1446                        end_key.into_bytes(),
1447                    )),
1448                }],
1449            }),
1450            ..Default::default()
1451        };
1452
1453        // We can fetch data and do [some remote filtering](https://cloud.google.com/bigtable/docs/filters),
1454        // unfortunately I don't think the filtering we need will be super helpful.
1455        //
1456        //
1457        /*
1458        //NOTE: if you filter on a given field, BigTable will only
1459        // return that specific field. Adding filters for the rest of
1460        // the known elements may NOT return those elements or may
1461        // cause the message to not be returned because any of
1462        // those elements are not present. It may be preferable to
1463        // therefore run two filters, one to fetch the candidate IDs
1464        // and another to fetch the content of the messages.
1465         */
1466        let mut filters = message_gc_policy_filter()?;
1467        filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1468
1469        req.filter = Some(filter_chain(filters));
1470        if limit > 0 {
1471            req.rows_limit = limit as i64;
1472        }
1473        let rows = self.read_rows(req).await?;
1474        debug!(
1475            "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1476            timestamp,
1477            rows.len(),
1478            limit,
1479        );
1480
1481        let messages = self.rows_to_notifications(rows)?;
1482        // The timestamp of the last message read
1483        let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1484        Ok(FetchMessageResponse {
1485            messages,
1486            timestamp,
1487        })
1488    }
1489
1490    async fn health_check(&self) -> DbResult<bool> {
1491        Ok(self
1492            .pool
1493            .get()
1494            .await?
1495            .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1496            .await?)
1497    }
1498
1499    /// Returns true, because there's only one table in BigTable. We divide things up
1500    /// by `family`.
1501    async fn router_table_exists(&self) -> DbResult<bool> {
1502        Ok(true)
1503    }
1504
1505    /// Returns true, because there's only one table in BigTable. We divide things up
1506    /// by `family`.
1507    async fn message_table_exists(&self) -> DbResult<bool> {
1508        Ok(true)
1509    }
1510
1511    #[cfg(feature = "reliable_report")]
1512    async fn log_report(
1513        &self,
1514        reliability_id: &str,
1515        new_state: crate::reliability::ReliabilityState,
1516    ) -> DbResult<()> {
1517        let row_key = reliability_id.to_owned();
1518
1519        let mut row = Row::new(row_key);
1520        let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1521
1522        // Log the latest transition time for this id.
1523        let cells: Vec<cell::Cell> = vec![cell::Cell {
1524            qualifier: new_state.to_string(),
1525            value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1526            timestamp: expiry,
1527            ..Default::default()
1528        }];
1529
1530        row.add_cells(RELIABLE_LOG_FAMILY, cells);
1531
1532        self.write_row(row).await?;
1533
1534        Ok(())
1535    }
1536
1537    fn box_clone(&self) -> Box<dyn DbClient> {
1538        Box::new(self.clone())
1539    }
1540
1541    fn name(&self) -> String {
1542        "Bigtable".to_owned()
1543    }
1544
1545    fn pool_status(&self) -> Option<deadpool::Status> {
1546        Some(self.pool.pool.status())
1547    }
1548}
1549
1550#[cfg(all(test, feature = "emulator"))]
1551mod tests {
1552
    //! Currently, these tests rely on having a BigTable emulator running on the current machine.
1554    //! The tests presume to be able to connect to localhost:8086. See docs/bigtable.md for
1555    //! details and how to set up and initialize an emulator.
1556    //!
1557    use std::sync::Arc;
1558    use std::time::SystemTime;
1559
1560    use cadence::StatsdClient;
1561    use uuid;
1562
1563    use super::*;
1564    use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1565
1566    const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
1567    const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
1568    const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1569
1570    fn now() -> u64 {
1571        SystemTime::now()
1572            .duration_since(SystemTime::UNIX_EPOCH)
1573            .unwrap()
1574            .as_secs()
1575    }
1576
1577    fn new_client() -> DbResult<BigTableClientImpl> {
1578        let env_dsn = format!(
1579            "grpc://{}",
1580            std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1581        );
1582        let settings = DbSettings {
1583            // this presumes the table was created with
1584            // ```
1585            // scripts/setup_bt.sh
1586            // ```
1587            // with `message`, `router`, and `message_topic` families
1588            dsn: Some(env_dsn),
1589            db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1590                .to_string(),
1591        };
1592
1593        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1594
1595        BigTableClientImpl::new(metrics, &settings)
1596    }
1597
1598    #[test]
1599    fn escape_bytes_for_regex() {
1600        let b = b"hi";
1601        assert_eq!(escape_bytes(b), b.to_vec());
1602        assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
1603        let b = b"\xe2\x80\xb3";
1604        assert_eq!(escape_bytes(b), b.to_vec());
1605        // clippy::octal-escapes rightly discourages this ("\022") in a byte literal
1606        let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
1607        assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
1608        let b = b"\xc0";
1609        assert_eq!(escape_bytes(b), b.to_vec());
1610        assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
1611    }
1612
1613    #[actix_rt::test]
1614    async fn health_check() {
1615        let client = new_client().unwrap();
1616
1617        let result = client.health_check().await;
1618        assert!(result.is_ok());
1619        assert!(result.unwrap());
1620    }
1621
1622    /// Bigtable rejects SetCell timestamps that are not aligned to the table's
1623    /// millisecond granularity (timestamp_micros must be a multiple of 1,000).
1624    #[actix_rt::test]
1625    async fn timestamp_granularity_rejects_sub_millisecond_micros() {
1626        let client = new_client().unwrap();
1627        let uaid = gen_test_uaid();
1628        let row_key = uaid.simple().to_string();
1629        let _ = client.remove_user(&uaid).await;
1630
1631        let mut req = client.mutate_row_request(&row_key);
1632        req.mutations = vec![bigtable::Mutation {
1633            mutation: Some(bigtable::mutation::Mutation::SetCell(
1634                bigtable::mutation::SetCell {
1635                    family_name: ROUTER_FAMILY.to_owned(),
1636                    column_qualifier: b"granularity_probe".to_vec(),
1637                    timestamp_micros: 1,
1638                    value: b"x".to_vec(),
1639                },
1640            )),
1641        }];
1642
1643        assert!(
1644            client.mutate_row(req).await.is_err(),
1645            "expected granularity mismatch for timestamp_micros=1"
1646        );
1647
1648        let _ = client.remove_user(&uaid).await;
1649    }
1650
1651    /// run a gauntlet of testing. These are a bit linear because they need
1652    /// to run in sequence.
1653    #[actix_rt::test]
1654    async fn run_gauntlet() -> DbResult<()> {
1655        let client = new_client()?;
1656
1657        let connected_at = ms_since_epoch();
1658
1659        let uaid = Uuid::parse_str(TEST_USER).unwrap();
1660        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1661        let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();
1662
1663        let node_id = "test_node".to_owned();
1664
1665        // purge the user record if it exists.
1666        let _ = client.remove_user(&uaid).await;
1667
1668        let test_user = User {
1669            uaid,
1670            router_type: "webpush".to_owned(),
1671            connected_at,
1672            router_data: None,
1673            node_id: Some(node_id.clone()),
1674            ..Default::default()
1675        };
1676
1677        // purge the old user (if present)
1678        // in case a prior test failed for whatever reason.
1679        let _ = client.remove_user(&uaid).await;
1680
1681        // can we add the user?
1682        client.add_user(&test_user).await?;
1683        let fetched = client.get_user(&uaid).await?;
1684        assert!(fetched.is_some());
1685        let fetched = fetched.unwrap();
1686        assert_eq!(fetched.router_type, "webpush".to_owned());
1687
1688        // Simulate a connected_at occuring before the following writes
1689        let connected_at = ms_since_epoch();
1690
1691        // can we add channels?
1692        client.add_channel(&uaid, &chid).await?;
1693        let channels = client.get_channels(&uaid).await?;
1694        assert!(channels.contains(&chid));
1695
1696        // can we add lots of channels?
1697        let mut new_channels: HashSet<Uuid> = HashSet::new();
1698        new_channels.insert(chid);
1699        for _ in 1..10 {
1700            new_channels.insert(uuid::Uuid::new_v4());
1701        }
1702        let chid_to_remove = uuid::Uuid::new_v4();
1703        new_channels.insert(chid_to_remove);
1704        client.add_channels(&uaid, new_channels.clone()).await?;
1705        let channels = client.get_channels(&uaid).await?;
1706        assert_eq!(channels, new_channels);
1707
1708        // can we remove a channel?
1709        assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
1710        assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
1711        new_channels.remove(&chid_to_remove);
1712        let channels = client.get_channels(&uaid).await?;
1713        assert_eq!(channels, new_channels);
1714
1715        // now ensure that we can update a user that's after the time we set
1716        // prior. first ensure that we can't update a user that's before the
1717        // time we set prior to the last write
1718        let mut updated = User {
1719            connected_at,
1720            ..test_user.clone()
1721        };
1722        let result = client.update_user(&mut updated).await;
1723        assert!(result.is_ok());
1724        assert!(!result.unwrap());
1725
1726        // Make sure that the `connected_at` wasn't modified
1727        let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
1728        assert_eq!(fetched.connected_at, fetched2.connected_at);
1729
1730        // and make sure we can update a record with a later connected_at time.
1731        let mut updated = User {
1732            connected_at: fetched.connected_at + 300,
1733            ..fetched2
1734        };
1735        let result = client.update_user(&mut updated).await;
1736        assert!(result.is_ok());
1737        assert!(result.unwrap());
1738        assert_ne!(
1739            fetched2.connected_at,
1740            client.get_user(&uaid).await?.unwrap().connected_at
1741        );
1742
1743        // can we increment the storage for the user?
1744        client
1745            .increment_storage(
1746                &fetched.uaid,
1747                SystemTime::now()
1748                    .duration_since(SystemTime::UNIX_EPOCH)
1749                    .unwrap()
1750                    .as_secs(),
1751            )
1752            .await?;
1753
1754        let test_data = "An_encrypted_pile_of_crap".to_owned();
1755        let timestamp = now();
1756        let sort_key = now();
1757        // Can we store a message?
1758        let test_notification = crate::db::Notification {
1759            channel_id: chid,
1760            version: "test".to_owned(),
1761            ttl: 300,
1762            timestamp,
1763            data: Some(test_data.clone()),
1764            sortkey_timestamp: Some(sort_key),
1765            ..Default::default()
1766        };
1767        let res = client.save_message(&uaid, test_notification.clone()).await;
1768        assert!(res.is_ok());
1769
1770        let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
1771        assert_ne!(fetched.messages.len(), 0);
1772        let fm = fetched.messages.pop().unwrap();
1773        assert_eq!(fm.channel_id, test_notification.channel_id);
1774        assert_eq!(fm.data, Some(test_data));
1775
1776        // Grab all 1 of the messages that were submmited within the past 10 seconds.
1777        let fetched = client
1778            .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
1779            .await?;
1780        assert_ne!(fetched.messages.len(), 0);
1781
1782        // Try grabbing a message for 10 seconds from now.
1783        let fetched = client
1784            .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
1785            .await?;
1786        assert_eq!(fetched.messages.len(), 0);
1787
1788        // can we clean up our toys?
1789        assert!(
1790            client
1791                .remove_message(&uaid, &test_notification.chidmessageid())
1792                .await
1793                .is_ok()
1794        );
1795
1796        assert!(client.remove_channel(&uaid, &chid).await.is_ok());
1797
1798        // Now, can we do all that with topic messages
1799        client.add_channel(&uaid, &topic_chid).await?;
1800        let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
1801        let timestamp = now();
1802        let sort_key = now();
1803        // Can we store a message?
1804        let test_notification = crate::db::Notification {
1805            channel_id: topic_chid,
1806            version: "test".to_owned(),
1807            ttl: 300,
1808            topic: Some("topic".to_owned()),
1809            timestamp,
1810            data: Some(test_data.clone()),
1811            sortkey_timestamp: Some(sort_key),
1812            ..Default::default()
1813        };
1814        assert!(
1815            client
1816                .save_message(&uaid, test_notification.clone())
1817                .await
1818                .is_ok()
1819        );
1820
1821        let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
1822        assert_ne!(fetched.messages.len(), 0);
1823        let fm = fetched.messages.pop().unwrap();
1824        assert_eq!(fm.channel_id, test_notification.channel_id);
1825        assert_eq!(fm.data, Some(test_data));
1826
1827        // Grab the message that was submmited.
1828        let fetched = client.fetch_topic_messages(&uaid, 999).await?;
1829        assert_ne!(fetched.messages.len(), 0);
1830
1831        // can we clean up our toys?
1832        assert!(
1833            client
1834                .remove_message(&uaid, &test_notification.chidmessageid())
1835                .await
1836                .is_ok()
1837        );
1838
1839        assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());
1840
1841        let msgs = client
1842            .fetch_timestamp_messages(&uaid, None, 999)
1843            .await?
1844            .messages;
1845        assert!(msgs.is_empty());
1846
1847        let fetched = client.get_user(&uaid).await?.unwrap();
1848        assert!(
1849            client
1850                .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
1851                .await
1852                .is_ok()
1853        );
1854        // did we remove it?
1855        let fetched = client.get_user(&uaid).await?.unwrap();
1856        assert_eq!(fetched.node_id, None);
1857
1858        assert!(client.remove_user(&uaid).await.is_ok());
1859
1860        assert!(client.get_user(&uaid).await?.is_none());
1861
1862        Ok(())
1863    }
1864
1865    #[actix_rt::test]
1866    async fn read_cells_family_id() -> DbResult<()> {
1867        let client = new_client().unwrap();
1868        let uaid = gen_test_uaid();
1869        client.remove_user(&uaid).await.unwrap();
1870
1871        let qualifier = "foo".to_owned();
1872
1873        let row_key = uaid.simple().to_string();
1874        let mut row = Row::new(row_key.clone());
1875        row.cells.insert(
1876            ROUTER_FAMILY.to_owned(),
1877            vec![cell::Cell {
1878                qualifier: qualifier.to_owned(),
1879                value: "bar".as_bytes().to_vec(),
1880                ..Default::default()
1881            }],
1882        );
1883        client.write_row(row).await.unwrap();
1884        let req = client.read_row_request(&row_key);
1885        let Some(row) = client.read_row(req).await.unwrap() else {
1886            panic!("Expected row");
1887        };
1888        assert_eq!(row.cells.len(), 1);
1889        assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1890        client.remove_user(&uaid).await
1891    }
1892
1893    #[actix_rt::test]
1894    async fn add_user_existing() {
1895        let client = new_client().unwrap();
1896        let uaid = gen_test_uaid();
1897        let user = User {
1898            uaid,
1899            ..Default::default()
1900        };
1901        client.remove_user(&uaid).await.unwrap();
1902
1903        client.add_user(&user).await.unwrap();
1904        let err = client.add_user(&user).await.unwrap_err();
1905        assert!(matches!(err, DbError::Conditional));
1906    }
1907
1908    #[actix_rt::test]
1909    async fn version_check() {
1910        let client = new_client().unwrap();
1911        let uaid = gen_test_uaid();
1912        let user = User {
1913            uaid,
1914            ..Default::default()
1915        };
1916        client.remove_user(&uaid).await.unwrap();
1917
1918        client.add_user(&user).await.unwrap();
1919        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1920        assert!(client.update_user(&mut user.clone()).await.unwrap());
1921
1922        let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1923        assert_ne!(user.version, fetched.version);
1924        // should now fail w/ a stale version
1925        assert!(!client.update_user(&mut user).await.unwrap());
1926
1927        client.remove_user(&uaid).await.unwrap();
1928    }
1929
1930    #[actix_rt::test]
1931    async fn lingering_chid_record() {
1932        let client = new_client().unwrap();
1933        let uaid = gen_test_uaid();
1934        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1935        let user = User {
1936            uaid,
1937            ..Default::default()
1938        };
1939        client.remove_user(&uaid).await.unwrap();
1940
1941        // add_channel doesn't check for the existence of a user
1942        client.add_channel(&uaid, &chid).await.unwrap();
1943
1944        // w/ chid records in the router row, get_user should treat
1945        // this as the user not existing
1946        assert!(client.get_user(&uaid).await.unwrap().is_none());
1947
1948        client.add_user(&user).await.unwrap();
1949        // get_user should have also cleaned up the chids
1950        assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1951
1952        client.remove_user(&uaid).await.unwrap();
1953    }
1954
1955    #[actix_rt::test]
1956    async fn lingering_current_timestamp() {
1957        let client = new_client().unwrap();
1958        let uaid = gen_test_uaid();
1959        client.remove_user(&uaid).await.unwrap();
1960
1961        client
1962            .increment_storage(&uaid, crate::util::sec_since_epoch())
1963            .await
1964            .unwrap();
1965        assert!(client.get_user(&uaid).await.unwrap().is_none());
1966
1967        client.remove_user(&uaid).await.unwrap();
1968    }
1969
1970    #[actix_rt::test]
1971    async fn lingering_chid_w_version_record() {
1972        let client = new_client().unwrap();
1973        let uaid = gen_test_uaid();
1974        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1975        client.remove_user(&uaid).await.unwrap();
1976
1977        client.add_channel(&uaid, &chid).await.unwrap();
1978        assert!(client.remove_channel(&uaid, &chid).await.unwrap());
1979        assert!(client.get_user(&uaid).await.unwrap().is_none());
1980
1981        client.remove_user(&uaid).await.unwrap();
1982    }
1983
1984    #[actix_rt::test]
1985    async fn channel_and_current_timestamp_ttl_updates() {
1986        let client = new_client().unwrap();
1987        let uaid = gen_test_uaid();
1988        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1989        client.remove_user(&uaid).await.unwrap();
1990
1991        // Setup a user with some channels and a current_timestamp
1992        let user = User {
1993            uaid,
1994            ..Default::default()
1995        };
1996        client.add_user(&user).await.unwrap();
1997
1998        client.add_channel(&uaid, &chid).await.unwrap();
1999        client
2000            .add_channel(&uaid, &uuid::Uuid::new_v4())
2001            .await
2002            .unwrap();
2003
2004        client
2005            .increment_storage(
2006                &uaid,
2007                SystemTime::now()
2008                    .duration_since(SystemTime::UNIX_EPOCH)
2009                    .unwrap()
2010                    .as_secs(),
2011            )
2012            .await
2013            .unwrap();
2014
2015        let req = client.read_row_request(&uaid.as_simple().to_string());
2016        let Some(mut row) = client.read_row(req).await.unwrap() else {
2017            panic!("Expected row");
2018        };
2019
2020        // Ensure the initial cell expiry (timestamp) of all the cells
2021        // in the row has been updated
2022        let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
2023        for mut cells in row.cells.into_values() {
2024            let Some(cell) = cells.pop() else {
2025                continue;
2026            };
2027            assert!(
2028                cell.timestamp >= ca_expiry,
2029                "{} cell timestamp should >= connected_at's",
2030                cell.qualifier
2031            );
2032        }
2033
2034        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
2035
2036        // Quick nap to make sure that the ca_expiry values are different.
2037        tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
2038        client.update_user(&mut user).await.unwrap();
2039
2040        // Ensure update_user updated the expiry (timestamp) of every cell in the row
2041        let req = client.read_row_request(&uaid.as_simple().to_string());
2042        let Some(mut row) = client.read_row(req).await.unwrap() else {
2043            panic!("Expected row");
2044        };
2045
2046        let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
2047
2048        assert!(ca_expiry2 > ca_expiry);
2049
2050        for mut cells in row.cells.into_values() {
2051            let Some(cell) = cells.pop() else {
2052                continue;
2053            };
2054            assert!(
2055                cell.timestamp >= ca_expiry2,
2056                "{} cell timestamp expiry should exceed connected_at's",
2057                cell.qualifier
2058            );
2059        }
2060
2061        client.remove_user(&uaid).await.unwrap();
2062    }
2063}