autopush_common/db/bigtable/bigtable_client/mod.rs

1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8
9use again::RetryPolicy;
10use async_trait::async_trait;
11use cadence::StatsdClient;
12#[cfg(feature = "reliable_report")]
13use chrono::TimeDelta;
14use futures_util::StreamExt;
15use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
16use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
17use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
18use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
19use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
20use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
21use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
22use protobuf::RepeatedField;
23use serde_json::{from_str, json};
24use uuid::Uuid;
25
26use crate::db::{
27    client::{DbClient, FetchMessageResponse},
28    error::{DbError, DbResult},
29    models::RangeKey,
30    DbSettings, Notification, User, USER_RECORD_VERSION,
31};
32use crate::metric_name::MetricName;
33use crate::metrics::StatsdClientExt;
34use crate::MAX_ROUTER_TTL_SECS;
35
36pub use self::metadata::MetadataBuilder;
37use self::row::{Row, RowCells};
38use super::pool::BigTablePool;
39use super::BigTableDbSettings;
40
41pub mod cell;
42pub mod error;
43pub(crate) mod merge;
44pub mod metadata;
45pub mod row;
46
// Row keys are normally Vec<u8> in the underlying Bigtable API; a String is used here for convenience.
48pub type RowKey = String;
49
50// These are more for code clarity than functional types.
51// Rust will happily swap between the two in any case.
52// See [super::row::Row] for discussion about how these
53// are overloaded in order to simplify fetching data.
54pub type Qualifier = String;
55pub type FamilyId = String;
56
57const ROUTER_FAMILY: &str = "router";
58const MESSAGE_FAMILY: &str = "message"; // The default family for messages
59const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
60#[cfg(feature = "reliable_report")]
61const RELIABLE_LOG_FAMILY: &str = "reliability";
62#[cfg(feature = "reliable_report")]
63/// The maximum TTL for reliability logging (60 days).
/// In most use cases, converted to seconds through `.num_seconds()`.
65pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);
66
67pub(crate) const RETRY_COUNT: usize = 5;
68
/// Semi-convenience wrapper to ensure that the UAID is formatted and displayed consistently.
// TODO: Should we create something similar for ChannelID?
71struct Uaid(Uuid);
72
73impl Display for Uaid {
74    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75        write!(f, "{}", self.0.as_simple())
76    }
77}
78
79impl From<Uaid> for String {
80    fn from(uaid: Uaid) -> String {
81        uaid.0.as_simple().to_string()
82    }
83}
84
85#[derive(Clone)]
86/// Wrapper for the BigTable connection
87pub struct BigTableClientImpl {
88    pub(crate) settings: BigTableDbSettings,
89    /// Metrics client
90    metrics: Arc<StatsdClient>,
    /// Connection pool (also provides the Channel used for alternate calls)
92    pool: BigTablePool,
93    metadata: Metadata,
94    admin_metadata: Metadata,
95}
96
/// Return a RowFilter matching the GC policy of the router Column Family
98fn router_gc_policy_filter() -> data::RowFilter {
99    let mut latest_cell_filter = data::RowFilter::default();
100    latest_cell_filter.set_cells_per_column_limit_filter(1);
101    latest_cell_filter
102}
103
104/// Return a chain of RowFilters matching the GC policy of the message Column
105/// Families
106fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
107    let mut timestamp_filter = data::RowFilter::default();
108    let bt_now: i64 = SystemTime::now()
109        .duration_since(SystemTime::UNIX_EPOCH)
110        .map_err(error::BigTableError::WriteTime)?
111        .as_millis() as i64;
112    let mut range_filter = data::TimestampRange::default();
113    range_filter.set_start_timestamp_micros(bt_now * 1000);
114    timestamp_filter.set_timestamp_range_filter(range_filter);
115
116    Ok(vec![router_gc_policy_filter(), timestamp_filter])
117}
118
119/// Return a Column family regex RowFilter
120fn family_filter(regex: String) -> data::RowFilter {
121    let mut filter = data::RowFilter::default();
122    filter.set_family_name_regex_filter(regex);
123    filter
124}
125
126/// Escape bytes for RE values
127///
128/// Based off google-re2/perl's quotemeta function
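///
/// A minimal illustrative sketch (marked `ignore`, not run as a doctest):
/// non-alphanumeric ASCII bytes other than `_` get a backslash prefix.
///
/// ```ignore
/// assert_eq!(escape_bytes(b"abc-123"), b"abc\\-123".to_vec());
/// assert_eq!(escape_bytes(b"under_score"), b"under_score".to_vec());
/// ```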
129fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
130    let mut vec = Vec::with_capacity(bytes.len() * 2);
131    for &b in bytes {
132        if !b.is_ascii_alphanumeric() && b != b'_' && (b & 128) == 0 {
133            if b == b'\0' {
134                // Special handling for null: Note that this special handling
135                // is not strictly required for RE2, but this quoting is
136                // required for other regexp libraries such as PCRE.
137                // Can't use "\\0" since the next character might be a digit.
138                vec.extend("\\x00".as_bytes());
139                continue;
140            }
141            vec.push(b'\\');
142        }
143        vec.push(b);
144    }
145    vec
146}
147
/// Return a chain of RowFilters restricting results to rows whose `version`
/// column matches the specified `version`
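///
/// A hedged usage sketch (`req` and `previously_read_version` are hypothetical
/// bindings; marked `ignore`): these filters are intended as the predicate of a
/// CheckAndMutateRowRequest, so mutations only apply while the stored `version`
/// still matches the one previously read.
///
/// ```ignore
/// let mut filters = vec![router_gc_policy_filter()];
/// filters.extend(version_filter(&previously_read_version));
/// req.set_predicate_filter(filter_chain(filters));
/// ```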
150fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
151    let mut cq_filter = data::RowFilter::default();
152    cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());
153
154    let mut value_filter = data::RowFilter::default();
155    value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));
156
157    vec![
158        family_filter(format!("^{ROUTER_FAMILY}$")),
159        cq_filter,
160        value_filter,
161    ]
162}
163
164/// Return a newly generated `version` column `Cell`
165fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
166    cell::Cell {
167        qualifier: "version".to_owned(),
168        value: Uuid::new_v4().into(),
169        timestamp,
170        ..Default::default()
171    }
172}
173
174/// Return a RowFilter chain from multiple RowFilters
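///
/// A small sketch of typical use in this module (`req` is a hypothetical
/// ReadRowsRequest; marked `ignore`):
///
/// ```ignore
/// let filter = filter_chain(vec![
///     router_gc_policy_filter(),
///     family_filter(format!("^{ROUTER_FAMILY}$")),
/// ]);
/// req.set_filter(filter);
/// ```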
175fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
176    let mut chain = RowFilter_Chain::default();
177    chain.set_filters(filters.into());
178    let mut filter = RowFilter::default();
179    filter.set_chain(chain);
180    filter
181}
182
183/// Return a ReadRowsRequest against table for a given row key
184fn read_row_request(
185    table_name: &str,
186    app_profile_id: &str,
187    row_key: &str,
188) -> bigtable::ReadRowsRequest {
189    let mut req = bigtable::ReadRowsRequest::default();
190    req.set_table_name(table_name.to_owned());
191    req.set_app_profile_id(app_profile_id.to_owned());
192
193    let mut row_keys = RepeatedField::default();
194    row_keys.push(row_key.as_bytes().to_vec());
195    let mut row_set = data::RowSet::default();
196    row_set.set_row_keys(row_keys);
197    req.set_rows(row_set);
198
199    req
200}
201
202fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
203    let v: [u8; 8] = value
204        .try_into()
205        .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
206    Ok(u64::from_be_bytes(v))
207}
208
209fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
210    String::from_utf8(value).map_err(|e| {
211        debug!("🉑 cannot read string {}: {:?}", name, e);
212        DbError::DeserializeString(name.to_owned())
213    })
214}
215
216/// Parse the "set" (see [DbClient::add_channels]) of channel ids in a bigtable Row.
217///
/// The cells should contain only the set of channels; otherwise an Error is returned.
219fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
220    let mut result = HashSet::new();
221    for cells in cells.values() {
222        let Some(cell) = cells.last() else {
223            continue;
224        };
225        let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
226            return Err(DbError::Integrity(
227                "get_channels expected: chid:<chid>".to_owned(),
228                None,
229            ));
230        };
231        result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
232    }
233    Ok(result)
234}
235
236/// Convert the [HashSet] of channel ids to cell entries for a bigtable Row
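///
/// A minimal sketch of the resulting shape (hypothetical values; marked `ignore`):
///
/// ```ignore
/// let chid = Uuid::new_v4();
/// let cells = channels_to_cells(
///     Cow::Owned(HashSet::from([chid])),
///     SystemTime::now() + Duration::from_secs(60),
/// );
/// // One cell per channel, with the channel id encoded in the qualifier.
/// assert_eq!(cells[0].qualifier, format!("chid:{}", chid.as_hyphenated()));
/// ```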
237fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
238    let channels = channels.into_owned();
239    let mut cells = Vec::with_capacity(channels.len().min(100_000));
240    for (i, channel_id) in channels.into_iter().enumerate() {
241        // There is a limit of 100,000 mutations per batch for bigtable.
242        // https://cloud.google.com/bigtable/quotas
243        // If you have 100,000 channels, you have too many.
244        if i >= 100_000 {
245            break;
246        }
247        cells.push(cell::Cell {
248            qualifier: format!("chid:{}", channel_id.as_hyphenated()),
249            timestamp: expiry,
250            ..Default::default()
251        });
252    }
253    cells
254}
255
256pub fn retry_policy(max: usize) -> RetryPolicy {
257    RetryPolicy::default()
258        .with_max_retries(max)
259        .with_jitter(true)
260}
261
262fn retryable_internal_err(status: &RpcStatus) -> bool {
263    match status.code() {
264        RpcStatusCode::UNKNOWN => status
265            .message()
266            .eq_ignore_ascii_case("error occurred when fetching oauth2 token."),
267        RpcStatusCode::INTERNAL => [
268            "rst_stream",
269            "rst stream",
270            "received unexpected eos on data frame from server",
271        ]
272        .contains(&status.message().to_lowercase().as_str()),
273        RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
274        _ => false,
275    }
276}
277
278pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
279    let mut metric = metrics
280        .incr_with_tags(MetricName::DatabaseRetry)
281        .with_tag("error", err_type)
282        .with_tag("type", "bigtable");
283    if let Some(code) = code {
284        metric = metric.with_tag("code", code);
285    }
286    metric.send();
287}
288
289pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool + '_ {
290    move |err| {
291        debug!("🉑 Checking grpcio::Error...{err}");
292        match err {
293            grpcio::Error::RpcFailure(status) => {
294                info!("GRPC Failure :{:?}", status);
295                let retry = retryable_internal_err(status);
296                if retry {
297                    metric(metrics, "RpcFailure", Some(&status.code().to_string()));
298                }
299                retry
300            }
301            grpcio::Error::BindFail(_) => {
302                metric(metrics, "BindFail", None);
303                true
304            }
305            // The parameter here is a [grpcio_sys::grpc_call_error] enum
306            // Not all of these are retryable.
307            grpcio::Error::CallFailure(grpc_call_status) => {
308                let retry = grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR;
309                if retry {
310                    metric(
311                        metrics,
312                        "CallFailure",
313                        Some(&format!("{grpc_call_status:?}")),
314                    );
315                }
316                retry
317            }
318            _ => false,
319        }
320    }
321}
322
323pub fn retryable_bt_err(
324    metrics: &Arc<StatsdClient>,
325) -> impl Fn(&error::BigTableError) -> bool + '_ {
326    move |err| {
327        debug!("🉑 Checking BigTableError...{err}");
328        match err {
329            error::BigTableError::InvalidRowResponse(e)
330            | error::BigTableError::Read(e)
331            | error::BigTableError::Write(e)
332            | error::BigTableError::GRPC(e) => retryable_grpcio_err(metrics)(e),
333            _ => false,
334        }
335    }
336}
337
338/// Determine if a router record is "incomplete" (doesn't include [User]
339/// columns):
340///
/// They can be incomplete for a couple of reasons:
342///
343/// 1) A migration code bug caused a few incomplete migrations where
344///    `add_channels` and `increment_storage` calls occurred when the migration's
345///    initial `add_user` was never completed:
346///    https://github.com/mozilla-services/autopush-rs/pull/640
347///
348/// 2) When router TTLs are eventually enabled: `add_channel` and
349///    `increment_storage` can write cells with later expiry times than the other
350///    router cells
351fn is_incomplete_router_record(cells: &RowCells) -> bool {
352    cells
353        .keys()
354        .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
355}
356
357fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
358    ::grpcio::CallOption::default().headers(metadata)
359}
360
361/// Connect to a BigTable storage model.
362///
/// BigTable is available via the Google Console, and is a schemaless storage system.
364///
365/// The `db_dsn` string should be in the form of
366/// `grpc://{BigTableEndpoint}`
367///
/// The settings contain the `table_name`, which is the GRPC path to the data.
369/// (e.g. `projects/{project_id}/instances/{instance_id}/tables/{table_id}`)
370///
/// where:
/// * _BigTableEndpoint_ is the endpoint domain to use (the default is `bigtable.googleapis.com`). See
///   [BigTable Endpoints](https://cloud.google.com/bigtable/docs/regional-endpoints) for more details.
/// * _project_id_ is the Google project identifier (see the Google developer console, e.g. 'autopush-dev')
/// * _instance_id_ is the Google project instance (see the Google developer console, e.g. 'development-1')
/// * _table_id_ is the table name (e.g. 'autopush')
377///
378/// This will automatically bring in the default credentials specified by the `GOOGLE_APPLICATION_CREDENTIALS`
379/// environment variable.
380///
/// NOTE: Some configurations may look for the default credential file (pointed to by
/// `GOOGLE_APPLICATION_CREDENTIALS`) to be stored in
/// `$HOME/.config/gcloud/application_default_credentials.json`.
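///
/// A hedged sketch of constructing a client (illustrative only; see [DbSettings]
/// for the exact fields, and note the `ignore` marker):
///
/// ```ignore
/// let settings = DbSettings {
///     dsn: Some("grpc://bigtable.googleapis.com".to_owned()),
///     db_settings: json!({
///         "table_name": "projects/autopush-dev/instances/development-1/tables/autopush"
///     })
///     .to_string(),
/// };
/// // `metrics` is an existing Arc<StatsdClient>.
/// let client = BigTableClientImpl::new(metrics, &settings)?;
/// ```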
384///
385impl BigTableClientImpl {
386    pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
387        // let env = Arc::new(EnvBuilder::new().build());
388        debug!("🏊 BT Pool new");
389        let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
390        info!("🉑 {:#?}", db_settings);
391        let pool = BigTablePool::new(settings, &metrics)?;
392
393        // create the metadata header blocks required by Google for accessing GRPC resources.
394        let metadata = db_settings.metadata()?;
395        let admin_metadata = db_settings.admin_metadata()?;
396        Ok(Self {
397            settings: db_settings,
398            metrics,
399            metadata,
400            admin_metadata,
401            pool,
402        })
403    }
404
405    fn router_ttl(&self) -> Duration {
406        self.settings
407            .max_router_ttl
408            .unwrap_or(Duration::from_secs(MAX_ROUTER_TTL_SECS))
409    }
410
411    /// Spawn a task to periodically evict idle connections
412    pub fn spawn_sweeper(&self, interval: Duration) {
413        self.pool.spawn_sweeper(interval);
414    }
415
416    /// Return a ReadRowsRequest for a given row key
417    fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
418        read_row_request(
419            &self.settings.table_name,
420            &self.settings.app_profile_id,
421            row_key,
422        )
423    }
424
425    /// Return a MutateRowRequest for a given row key
426    fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
427        let mut req = bigtable::MutateRowRequest::default();
428        req.set_table_name(self.settings.table_name.clone());
429        req.set_app_profile_id(self.settings.app_profile_id.clone());
430        req.set_row_key(row_key.as_bytes().to_vec());
431        req
432    }
433
434    /// Return a CheckAndMutateRowRequest for a given row key
435    fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
436        let mut req = bigtable::CheckAndMutateRowRequest::default();
437        req.set_table_name(self.settings.table_name.clone());
438        req.set_app_profile_id(self.settings.app_profile_id.clone());
439        req.set_row_key(row_key.as_bytes().to_vec());
440        req
441    }
442
    /// Mutate a row as specified by the given MutateRowRequest, retrying on transient errors.
444    async fn mutate_row(
445        &self,
446        req: bigtable::MutateRowRequest,
447    ) -> Result<(), error::BigTableError> {
448        let bigtable = self.pool.get().await?;
449        retry_policy(self.settings.retry_count)
450            .retry_if(
451                || async {
452                    bigtable
453                        .conn
454                        .mutate_row_opt(&req, call_opts(self.metadata.clone()))
455                },
456                retryable_grpcio_err(&self.metrics),
457            )
458            .await
459            .map_err(error::BigTableError::Write)?;
460        Ok(())
461    }
462
463    /// Perform a MutateRowsRequest
464    #[allow(unused)]
465    async fn mutate_rows(
466        &self,
467        req: bigtable::MutateRowsRequest,
468    ) -> Result<(), error::BigTableError> {
469        let bigtable = self.pool.get().await?;
470        // ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
471        let resp = retry_policy(self.settings.retry_count)
472            .retry_if(
473                || async {
474                    bigtable
475                        .conn
476                        .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
477                },
478                retryable_grpcio_err(&self.metrics),
479            )
480            .await
481            .map_err(error::BigTableError::Write)?;
482
        // Scan the returned stream looking for errors.
        // As I understand it, the returned stream contains chunked MutateRowsResponse structs.
        // Each struct holds the result of a row mutation: a `status` (non-zero on error)
        // and an optional message string (empty if none).
        // The structure also contains an overall `status`, but that does not appear to be exposed.
        // Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html
489        let mut stream = Box::pin(resp);
490        let mut cnt = 0;
491        loop {
492            let (result, remainder) = StreamExt::into_future(stream).await;
493            if let Some(result) = result {
494                debug!("🎏 Result block: {}", cnt);
495                match result {
496                    Ok(r) => {
497                        for e in r.get_entries() {
498                            if e.has_status() {
499                                let status = e.get_status();
500                                // See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
501                                let code = error::MutateRowStatus::from(status.get_code());
502                                if !code.is_ok() {
503                                    return Err(error::BigTableError::Status(
504                                        code,
505                                        status.get_message().to_owned(),
506                                    ));
507                                }
508                                debug!("🎏 Response: {} OK", e.index);
509                            }
510                        }
511                    }
512                    Err(e) => return Err(error::BigTableError::Write(e)),
513                };
514                cnt += 1;
515            } else {
516                debug!("🎏 Done!");
517                break;
518            }
519            stream = remainder;
520        }
521
522        Ok(())
523    }
524
525    /// Read one row for the [ReadRowsRequest] (assuming only a single row was requested).
526    async fn read_row(
527        &self,
528        req: bigtable::ReadRowsRequest,
529    ) -> Result<Option<row::Row>, error::BigTableError> {
530        let mut rows = self.read_rows(req).await?;
531        Ok(rows.pop_first().map(|(_, v)| v))
532    }
533
    /// Take a Bigtable ReadRowsRequest (containing the keys and filters) and return a set of
    /// row data indexed by row key.
537    async fn read_rows(
538        &self,
539        req: ReadRowsRequest,
540    ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
541        let bigtable = self.pool.get().await?;
542        let resp = retry_policy(self.settings.retry_count)
543            .retry_if(
544                || async {
545                    let resp: grpcio::ClientSStreamReceiver<bigtable::ReadRowsResponse> = bigtable
546                        .conn
547                        .read_rows_opt(&req, call_opts(self.metadata.clone()))
548                        .map_err(error::BigTableError::Read)?;
549                    merge::RowMerger::process_chunks(resp).await
550                },
551                retryable_bt_err(&self.metrics),
552            )
553            .await?;
554        Ok(resp)
555    }
556
    /// Write a given row.
    ///
    /// There's also `.mutate_rows`, which applies mutations to multiple rows in a single request.
560    async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
561        let mut req = self.mutate_row_request(&row.row_key);
562        // compile the mutations.
563        // It's possible to do a lot here, including altering in process
564        // mutations, clearing them, etc. It's all up for grabs until we commit
565        // below. For now, let's just presume a write and be done.
566        let mutations = self.get_mutations(row.cells)?;
567        req.set_mutations(mutations);
568        self.mutate_row(req).await?;
569        Ok(())
570    }
571
572    /// Compile the list of mutations for this row.
573    fn get_mutations(
574        &self,
575        cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
576    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
577        let mut mutations = protobuf::RepeatedField::default();
578        for (family_id, cells) in cells {
579            for cell in cells {
580                let mut mutation = data::Mutation::default();
581                let mut set_cell = data::Mutation_SetCell::default();
582                let timestamp = cell
583                    .timestamp
584                    .duration_since(SystemTime::UNIX_EPOCH)
585                    .map_err(error::BigTableError::WriteTime)?;
586                set_cell.family_name.clone_from(&family_id);
587                set_cell.set_column_qualifier(cell.qualifier.clone().into_bytes());
588                set_cell.set_value(cell.value);
589                // Yes, this is passing milli bounded time as a micro. Otherwise I get
590                // a `Timestamp granularity mismatch` error
591                set_cell.set_timestamp_micros((timestamp.as_millis() * 1000) as i64);
592                debug!("🉑 expiring in {:?}", timestamp.as_millis());
593                mutation.set_set_cell(set_cell);
594                mutations.push(mutation);
595            }
596        }
597        Ok(mutations)
598    }
599
600    /// Write mutations if the row meets a condition specified by the filter.
601    ///
602    /// Mutations can be applied either when the filter matches (state `true`)
603    /// or doesn't match (state `false`).
604    ///
605    /// Returns whether the filter matched records (which indicates whether the
606    /// mutations were applied, depending on the state)
607    async fn check_and_mutate_row(
608        &self,
609        row: row::Row,
610        filter: RowFilter,
611        state: bool,
612    ) -> Result<bool, error::BigTableError> {
613        let mut req = self.check_and_mutate_row_request(&row.row_key);
614        let mutations = self.get_mutations(row.cells)?;
615        req.set_predicate_filter(filter);
616        if state {
617            req.set_true_mutations(mutations);
618        } else {
619            req.set_false_mutations(mutations);
620        }
621        self.check_and_mutate(req).await
622    }
623
624    async fn check_and_mutate(
625        &self,
626        req: bigtable::CheckAndMutateRowRequest,
627    ) -> Result<bool, error::BigTableError> {
628        let bigtable = self.pool.get().await?;
629        let resp = retry_policy(self.settings.retry_count)
630            .retry_if(
631                || async {
632                    // Note: check_and_mutate_row_async may return before the row
633                    // is written, which can cause race conditions for reads
634                    bigtable
635                        .conn
636                        .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
637                },
638                retryable_grpcio_err(&self.metrics),
639            )
640            .await
641            .map_err(error::BigTableError::Write)?;
642        debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
643        Ok(resp.get_predicate_matched())
644    }
645
646    fn get_delete_mutations(
647        &self,
648        family: &str,
649        column_names: &[&str],
650        time_range: Option<&data::TimestampRange>,
651    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
652        let mut mutations = protobuf::RepeatedField::default();
653        for column in column_names {
654            let mut mutation = data::Mutation::default();
655            // Mutation_DeleteFromRow -- Delete all cells for a given row.
656            // Mutation_DeleteFromFamily -- Delete all cells from a family for a given row.
657            // Mutation_DeleteFromColumn -- Delete all cells from a column name for a given row, restricted by timestamp range.
658            let mut del_cell = data::Mutation_DeleteFromColumn::default();
659            del_cell.set_family_name(family.to_owned());
660            del_cell.set_column_qualifier(column.as_bytes().to_vec());
661            if let Some(range) = time_range {
662                del_cell.set_time_range(range.clone());
663            }
664            mutation.set_delete_from_column(del_cell);
665            mutations.push(mutation);
666        }
667        Ok(mutations)
668    }
669
    /// Delete all cell data from the specified columns with the optional time range.
    #[allow(unused)]
673    async fn delete_cells(
674        &self,
675        row_key: &str,
676        family: &str,
677        column_names: &[&str],
678        time_range: Option<&data::TimestampRange>,
679    ) -> Result<(), error::BigTableError> {
680        let mut req = self.mutate_row_request(row_key);
681        req.set_mutations(self.get_delete_mutations(family, column_names, time_range)?);
682        self.mutate_row(req).await
683    }
684
685    /// Delete all the cells for the given row. NOTE: This will drop the row.
686    async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
687        let mut req = self.mutate_row_request(row_key);
688        let mut mutations = protobuf::RepeatedField::default();
689        let mut mutation = data::Mutation::default();
690        mutation.set_delete_from_row(data::Mutation_DeleteFromRow::default());
691        mutations.push(mutation);
692        req.set_mutations(mutations);
693        self.mutate_row(req).await
694    }
695
696    /// This uses the admin interface to drop row ranges.
697    /// This will drop ALL data associated with these rows.
698    /// Note that deletion may take up to a week to occur.
699    /// see https://cloud.google.com/php/docs/reference/cloud-bigtable/latest/Admin.V2.DropRowRangeRequest
700    #[allow(unused)]
701    async fn delete_rows(&self, row_key: &str) -> Result<bool, error::BigTableError> {
702        let admin = BigtableTableAdminClient::new(self.pool.get_channel()?);
703        let mut req = DropRowRangeRequest::new();
704        req.set_name(self.settings.table_name.clone());
705        req.set_row_key_prefix(row_key.as_bytes().to_vec());
706
707        admin
708            .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone()))
709            .map_err(|e| {
710                error!("{:?}", e);
711                error::BigTableError::Admin(
712                    format!(
713                        "Could not send delete command for {}",
714                        &self.settings.table_name
715                    ),
716                    Some(e.to_string()),
717                )
718            })?
719            .await
720            .map_err(|e| {
721                error!("post await: {:?}", e);
722                error::BigTableError::Admin(
723                    format!(
724                        "Could not delete data from table {}",
725                        &self.settings.table_name
726                    ),
727                    Some(e.to_string()),
728                )
729            })?;
730
731        Ok(true)
732    }
733
734    fn rows_to_notifications(
735        &self,
736        rows: BTreeMap<String, Row>,
737    ) -> Result<Vec<Notification>, DbError> {
738        rows.into_iter()
739            .map(|(row_key, row)| self.row_to_notification(&row_key, row))
740            .collect()
741    }
742
743    fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
744        let Some((_, chidmessageid)) = row_key.split_once('#') else {
745            return Err(DbError::Integrity(
746                "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
747                None,
748            ));
749        };
750        let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
751            DbError::Integrity(
752                format!("rows_to_notification expected chidmessageid: {e}"),
753                None,
754            )
755        })?;
756
757        // Create from the known, required fields.
758        let mut notif = Notification {
759            channel_id: range_key.channel_id,
760            topic: range_key.topic,
761            sortkey_timestamp: range_key.sortkey_timestamp,
762            version: to_string(row.take_required_cell("version")?.value, "version")?,
763            ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
764            timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
765            ..Default::default()
766        };
767
768        // Backfill the Optional fields
769        if let Some(cell) = row.take_cell("data") {
770            notif.data = Some(to_string(cell.value, "data")?);
771        }
772        #[cfg(feature = "reliable_report")]
773        {
774            if let Some(cell) = row.take_cell("reliability_id") {
775                notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
776            }
777            if let Some(cell) = row.take_cell("reliable_state") {
778                notif.reliable_state = Some(
779                    crate::reliability::ReliabilityState::from_str(&to_string(
780                        cell.value,
781                        "reliable_state",
782                    )?)
783                    .map_err(|e| {
784                        DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
785                    })?,
786                );
787            }
788        }
789        if let Some(cell) = row.take_cell("headers") {
790            notif.headers = Some(
791                serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
792                    .map_err(|e| DbError::Serialization(e.to_string()))?,
793            );
794        }
800
801        trace!("🚣  Deserialized message row: {:?}", &notif);
802        Ok(notif)
803    }
804
805    /// Return a Row for writing from a [User] and a `version`
806    ///
807    /// `version` is specified as an argument (ignoring [User::version]) so
808    /// that [update_user] may specify a new version to write before modifying
809    /// the [User] struct
810    fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
811        let row_key = user.uaid.simple().to_string();
812        let mut row = Row::new(row_key);
813        let expiry = std::time::SystemTime::now() + self.router_ttl();
814
815        let mut cells: Vec<cell::Cell> = vec![
816            cell::Cell {
817                qualifier: "connected_at".to_owned(),
818                value: user.connected_at.to_be_bytes().to_vec(),
819                timestamp: expiry,
820                ..Default::default()
821            },
822            cell::Cell {
823                qualifier: "router_type".to_owned(),
824                value: user.router_type.clone().into_bytes(),
825                timestamp: expiry,
826                ..Default::default()
827            },
828            cell::Cell {
829                qualifier: "record_version".to_owned(),
830                value: user
831                    .record_version
832                    .unwrap_or(USER_RECORD_VERSION)
833                    .to_be_bytes()
834                    .to_vec(),
835                timestamp: expiry,
836                ..Default::default()
837            },
838            cell::Cell {
839                qualifier: "version".to_owned(),
840                value: (*version).into(),
841                timestamp: expiry,
842                ..Default::default()
843            },
844        ];
845
846        if let Some(router_data) = &user.router_data {
847            cells.push(cell::Cell {
848                qualifier: "router_data".to_owned(),
849                value: json!(router_data).to_string().as_bytes().to_vec(),
850                timestamp: expiry,
851                ..Default::default()
852            });
853        };
854        if let Some(current_timestamp) = user.current_timestamp {
855            cells.push(cell::Cell {
856                qualifier: "current_timestamp".to_owned(),
857                value: current_timestamp.to_be_bytes().to_vec(),
858                timestamp: expiry,
859                ..Default::default()
860            });
861        };
862        if let Some(node_id) = &user.node_id {
863            cells.push(cell::Cell {
864                qualifier: "node_id".to_owned(),
865                value: node_id.as_bytes().to_vec(),
866                timestamp: expiry,
867                ..Default::default()
868            });
869        };
870
871        cells.extend(channels_to_cells(
872            Cow::Borrowed(&user.priv_channels),
873            expiry,
874        ));
875
876        row.add_cells(ROUTER_FAMILY, cells);
877        row
878    }
879}
880
881#[derive(Clone)]
882pub struct BigtableDb {
883    pub(super) conn: BigtableClient,
884    pub(super) health_metadata: Metadata,
885    table_name: String,
886}
887
888impl BigtableDb {
889    pub fn new(channel: Channel, health_metadata: &Metadata, table_name: &str) -> Self {
890        Self {
891            conn: BigtableClient::new(channel),
892            health_metadata: health_metadata.clone(),
893            table_name: table_name.to_owned(),
894        }
895    }
896    /// Perform a simple connectivity check. This should return no actual results
897    /// but should verify that the connection is valid. We use this for the
898    /// Recycle check as well, so it has to be fairly low in the implementation
899    /// stack.
902    pub async fn health_check(
903        &mut self,
904        metrics: &Arc<StatsdClient>,
905        app_profile_id: &str,
906    ) -> Result<bool, error::BigTableError> {
907        // It is recommended that we pick a random key to perform the health check. Selecting
908        // a single key for all health checks causes a "hot tablet" to arise. The `PingAndWarm`
909        // is intended to be used prior to large updates and is not recommended for use in
910        // health checks.
        // This health check is just to see if the database is present; the response is not
        // important, other than that it does not return an error.
913        let random_uaid = Uuid::new_v4().simple().to_string();
914        let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
915        let mut filter = data::RowFilter::default();
916        filter.set_block_all_filter(true);
917        req.set_filter(filter);
918        let _r = retry_policy(RETRY_COUNT)
919            .retry_if(
920                || async {
921                    self.conn
922                        .read_rows_opt(&req, call_opts(self.health_metadata.clone()))
923                },
924                retryable_grpcio_err(metrics),
925            )
926            .await
927            .map_err(error::BigTableError::Read)?;
928
929        debug!("🉑 health check");
930        Ok(true)
931    }
932}
933
934#[async_trait]
935impl DbClient for BigTableClientImpl {
    /// Add a user to the database
937    async fn add_user(&self, user: &User) -> DbResult<()> {
938        trace!("🉑 Adding user");
939        let Some(ref version) = user.version else {
940            return Err(DbError::General(
941                "add_user expected a user version field".to_owned(),
942            ));
943        };
944        let row = self.user_to_row(user, version);
945
946        // Only add when the user doesn't already exist
947        let mut row_key_filter = RowFilter::default();
948        row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
949        let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
950
951        if self.check_and_mutate_row(row, filter, false).await? {
952            return Err(DbError::Conditional);
953        }
954        Ok(())
955    }
956
957    /// BigTable doesn't really have the concept of an "update". You simply write the data and
958    /// the individual cells create a new version. Depending on the garbage collection rules for
959    /// the family, these can either persist or be automatically deleted.
960    ///
961    /// NOTE: This function updates the key ROUTER records for a given UAID. It does this by
962    /// calling [BigTableClientImpl::user_to_row] which creates a new row with new `cell.timestamp` values set
963    /// to now + `MAX_ROUTER_TTL`. This function is called by mobile during the daily
964    /// [autoendpoint::routes::update_token_route] handling, and by desktop
    /// [autoconnect-ws-sm::get_or_create_user] which is called
966    /// during the `HELLO` handler. This should be enough to ensure that the ROUTER records
967    /// are properly refreshed for "lively" clients.
968    ///
    /// NOTE: There is a very small potential risk that a desktop client that somehow
    /// remains connected for the duration of MAX_ROUTER_TTL may be dropped as not being
    /// "lively".
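    ///
    /// A hedged sketch of the optimistic-concurrency flow this implements
    /// (`db` and `uaid` are hypothetical bindings; marked `ignore`):
    ///
    /// ```ignore
    /// // `db` is a BigTableClientImpl (or any DbClient); `uaid` an existing user's Uuid.
    /// let mut user = db.get_user(&uaid).await?.expect("existing user");
    /// user.node_id = Some("https://example.node".to_owned());
    /// // Returns false (and writes nothing) if another writer changed `version`
    /// // after `get_user` read it.
    /// let updated = db.update_user(&mut user).await?;
    /// ```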
972    async fn update_user(&self, user: &mut User) -> DbResult<bool> {
973        let Some(ref version) = user.version else {
974            return Err(DbError::General(
975                "update_user expected a user version field".to_owned(),
976            ));
977        };
978
979        let mut filters = vec![router_gc_policy_filter()];
980        filters.extend(version_filter(version));
981        let filter = filter_chain(filters);
982
983        let new_version = Uuid::new_v4();
984        // Always write a newly generated version
985        let row = self.user_to_row(user, &new_version);
986
987        let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
988        user.version = Some(new_version);
989        Ok(predicate_matched)
990    }
991
992    async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
993        let row_key = uaid.as_simple().to_string();
994        let mut req = self.read_row_request(&row_key);
995        let mut filters = vec![router_gc_policy_filter()];
996        filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
997        req.set_filter(filter_chain(filters));
998        let Some(mut row) = self.read_row(req).await? else {
999            return Ok(None);
1000        };
1001
1002        trace!("🉑 Found a record for {}", row_key);
1003
1004        let connected_at_cell = match row.take_required_cell("connected_at") {
1005            Ok(cell) => cell,
1006            Err(_) => {
1007                if !is_incomplete_router_record(&row.cells) {
1008                    return Err(DbError::Integrity(
1009                        "Expected column: connected_at".to_owned(),
1010                        Some(format!("{row:#?}")),
1011                    ));
1012                }
1013                // Special case incomplete records: they're equivalent to no
1014                // user exists. Incompletes caused by the migration bug in #640
1015                // will have their migration re-triggered by returning None:
1016                // https://github.com/mozilla-services/autopush-rs/pull/640
1017                trace!("🉑 Dropping an incomplete user record for {}", row_key);
1018                self.metrics
1019                    .incr_with_tags(MetricName::DatabaseDropUser)
1020                    .with_tag("reason", "incomplete_record")
1021                    .send();
1022                self.remove_user(uaid).await?;
1023                return Ok(None);
1024            }
1025        };
1026
1027        let mut result = User {
1028            uaid: *uaid,
1029            connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1030            router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1031            record_version: Some(to_u64(
1032                row.take_required_cell("record_version")?.value,
1033                "record_version",
1034            )?),
1035            version: Some(
1036                row.take_required_cell("version")?
1037                    .value
1038                    .try_into()
1039                    .map_err(|e| {
1040                        DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1041                    })?,
1042            ),
1043            ..Default::default()
1044        };
1045
1046        if let Some(cell) = row.take_cell("router_data") {
            result.router_data = from_str(&to_string(cell.value, "router_data")?).map_err(|e| {
                DbError::Serialization(format!("Could not deserialize router_data: {e:?}"))
1049            })?;
1050        }
1051
1052        if let Some(cell) = row.take_cell("node_id") {
1053            result.node_id = Some(to_string(cell.value, "node_id")?);
1054        }
1055
1056        if let Some(cell) = row.take_cell("current_timestamp") {
1057            result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1058        }
1059
        // Read the channels last, after removal of all non-channel cells
1061        result.priv_channels = channels_from_cells(&row.cells)?;
1062
1063        Ok(Some(result))
1064    }
1065
1066    async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1067        let row_key = uaid.simple().to_string();
1068        self.delete_row(&row_key).await?;
1069        Ok(())
1070    }
1071
1072    async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1073        let channels = HashSet::from_iter([channel_id.to_owned()]);
1074        self.add_channels(uaid, channels).await
1075    }
1076
1077    /// Add channels in bulk (used mostly during migration)
1078    ///
1079    async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1080        // channel_ids are stored as a set within one Bigtable row
1081        //
1082        // Bigtable allows "millions of columns in a table, as long as no row
1083        // exceeds the maximum limit of 256 MB per row" enabling the use of
1084        // column qualifiers as data.
1085        //
1086        // The "set" of channel_ids consists of column qualifiers named
1087        // "chid:<chid value>" as set member entries (with their cell values
1088        // being a single 0 byte).
1089        //
        // Storing the full set in a single row makes batch updates
        // (particularly to reset the GC expiry timestamps) potentially
        // easier and more efficient.
1093        let row_key = uaid.simple().to_string();
1094        let mut row = Row::new(row_key);
1095        let expiry = std::time::SystemTime::now() + self.router_ttl();
1096
1097        // Note: updating the version column isn't necessary here because this
1098        // write only adds a new (or updates an existing) column with a 0 byte
1099        // value
1100        row.add_cells(
1101            ROUTER_FAMILY,
1102            channels_to_cells(Cow::Owned(channels), expiry),
1103        );
1104
1105        self.write_row(row).await?;
1106        Ok(())
1107    }
1108
1109    async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1110        let row_key = uaid.simple().to_string();
1111        let mut req = self.read_row_request(&row_key);
1112
1113        let mut cq_filter = data::RowFilter::default();
1114        cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());
1115        req.set_filter(filter_chain(vec![
1116            router_gc_policy_filter(),
1117            family_filter(format!("^{ROUTER_FAMILY}$")),
1118            cq_filter,
1119        ]));
1120
1121        let Some(row) = self.read_row(req).await? else {
1122            return Ok(Default::default());
1123        };
1124        channels_from_cells(&row.cells)
1125    }
1126
1127    /// Delete the channel. Does not delete its associated pending messages.
1128    async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1129        let row_key = uaid.simple().to_string();
1130        let mut req = self.check_and_mutate_row_request(&row_key);
1131
1132        // Delete the column representing the channel_id
1133        let column = format!("chid:{}", channel_id.as_hyphenated());
1134        let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1135
1136        // and write a new version cell
1137        let mut row = Row::new(row_key);
1138        let expiry = std::time::SystemTime::now() + self.router_ttl();
1139        row.cells
1140            .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1141        mutations.extend(self.get_mutations(row.cells)?);
1142
1143        // check if the channel existed/was actually removed
1144        let mut cq_filter = data::RowFilter::default();
1145        cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
1146        req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1147        req.set_true_mutations(mutations);
1148
1149        Ok(self.check_and_mutate(req).await?)
1150    }
1151
1152    /// Remove the node_id
1153    async fn remove_node_id(
1154        &self,
1155        uaid: &Uuid,
1156        _node_id: &str,
1157        _connected_at: u64,
1158        version: &Option<Uuid>,
1159    ) -> DbResult<bool> {
1160        let row_key = uaid.simple().to_string();
1161        trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1162        let Some(version) = version else {
1163            return Err(DbError::General("Expected a user version field".to_owned()));
1164        };
1165
1166        let mut req = self.check_and_mutate_row_request(&row_key);
1167
1168        let mut filters = vec![router_gc_policy_filter()];
1169        filters.extend(version_filter(version));
1170        req.set_predicate_filter(filter_chain(filters));
1171        req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);
1172
1173        Ok(self.check_and_mutate(req).await?)
1174    }
1175
1176    /// Write the notification to storage.
1177    async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1178        let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1179        debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1180        trace!(
1181            "🉑 timestamp: {:?}",
1182            &message.timestamp.to_be_bytes().to_vec()
1183        );
1184        let mut row = Row::new(row_key);
1185
1186        // Remember, `timestamp` is effectively the time to kill the message, not the
1187        // current time.
1188        // TODO: use message.expiry()
1189        let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1190        trace!(
1191            "🉑 Message Expiry {}",
1192            expiry
1193                .duration_since(SystemTime::UNIX_EPOCH)
1194                .unwrap_or_default()
1195                .as_millis()
1196        );
1197
1198        let mut cells: Vec<cell::Cell> = Vec::new();
1199
1200        let is_topic = message.topic.is_some();
1201        let family = if is_topic {
1202            MESSAGE_TOPIC_FAMILY
1203        } else {
1204            MESSAGE_FAMILY
1205        };
1206        cells.extend(vec![
1207            cell::Cell {
1208                qualifier: "ttl".to_owned(),
1209                value: message.ttl.to_be_bytes().to_vec(),
1210                timestamp: expiry,
1211                ..Default::default()
1212            },
1213            cell::Cell {
1214                qualifier: "timestamp".to_owned(),
1215                value: message.timestamp.to_be_bytes().to_vec(),
1216                timestamp: expiry,
1217                ..Default::default()
1218            },
1219            cell::Cell {
1220                qualifier: "version".to_owned(),
1221                value: message.version.into_bytes(),
1222                timestamp: expiry,
1223                ..Default::default()
1224            },
1225        ]);
1226        if let Some(headers) = message.headers {
1227            if !headers.is_empty() {
1228                cells.push(cell::Cell {
1229                    qualifier: "headers".to_owned(),
1230                    value: json!(headers).to_string().into_bytes(),
1231                    timestamp: expiry,
1232                    ..Default::default()
1233                });
1234            }
1235        }
1236        #[cfg(feature = "reliable_report")]
1237        {
1238            if let Some(reliability_id) = message.reliability_id {
1239                trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1240                cells.push(cell::Cell {
1241                    qualifier: "reliability_id".to_owned(),
1242                    value: reliability_id.into_bytes(),
1243                    timestamp: expiry,
1244                    ..Default::default()
1245                });
1246            }
1247            if let Some(reliable_state) = message.reliable_state {
1248                cells.push(cell::Cell {
1249                    qualifier: "reliable_state".to_owned(),
1250                    value: reliable_state.to_string().into_bytes(),
1251                    timestamp: expiry,
1252                    ..Default::default()
1253                });
1254            }
1255        }
1256        if let Some(data) = message.data {
1257            cells.push(cell::Cell {
1258                qualifier: "data".to_owned(),
1259                value: data.into_bytes(),
1260                timestamp: expiry,
1261                ..Default::default()
1262            });
1263        }
1264
1265        row.add_cells(family, cells);
1266        trace!("🉑 Adding row");
1267        self.write_row(row).await?;
1268
1269        self.metrics
1270            .incr_with_tags(MetricName::NotificationMessageStored)
1271            .with_tag("topic", &is_topic.to_string())
1272            .with_tag("database", &self.name())
1273            .send();
1274        Ok(())
1275    }
1276
1277    /// Save a batch of messages to the database.
1278    ///
1279    /// Currently just iterating through the list and saving one at a time. There's a bulk way
1280    /// to save messages, but there are other considerations (e.g. mutation limits)
1281    async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
        // Plain, simple way of solving this:
1283        for message in messages {
1284            self.save_message(uaid, message).await?;
1285        }
1286        Ok(())
1287    }
1288
1289    /// Set the `current_timestamp` in the meta record for this user agent.
1290    ///
1291    /// This is a bit different for BigTable. Field expiration (technically cell
1292    /// expiration) is determined by the lifetime assigned to the cell once it hits
1293    /// a given date. That means you can't really extend a lifetime by adjusting a
1294    /// single field. You'd have to adjust all the cells that are in the family.
1295    /// So, we're not going to do expiration that way.
1296    ///
1297    /// That leaves the meta "current_timestamp" field. We do not purge ACK'd records,
1298    /// instead we presume that the TTL will kill them off eventually. On reads, we use
1299    /// the `current_timestamp` to determine what records to return, since we return
1300    /// records with timestamps later than `current_timestamp`.
1301    ///
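    /// A minimal usage sketch (`db` and `uaid` are hypothetical bindings; the
    /// timestamp value is illustrative; marked `ignore`):
    ///
    /// ```ignore
    /// // After the client ACKs messages up to a given sortkey timestamp, record
    /// // that watermark so later fetches skip already-delivered rows.
    /// db.increment_storage(&uaid, 1_700_000_000).await?;
    /// ```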
1302    async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1303        let row_key = uaid.simple().to_string();
1304        debug!(
1305            "🉑 Updating {} current_timestamp:  {:?}",
1306            &row_key,
1307            timestamp.to_be_bytes().to_vec()
1308        );
1309        let expiry = std::time::SystemTime::now() + self.router_ttl();
1310        let mut row = Row::new(row_key.clone());
1311
1312        row.cells.insert(
1313            ROUTER_FAMILY.to_owned(),
1314            vec![
1315                cell::Cell {
1316                    qualifier: "current_timestamp".to_owned(),
1317                    value: timestamp.to_be_bytes().to_vec(),
1318                    timestamp: expiry,
1319                    ..Default::default()
1320                },
1321                new_version_cell(expiry),
1322            ],
1323        );
1324
1325        self.write_row(row).await?;
1326
1327        Ok(())
1328    }
1329
1330    /// Delete the notification from storage.
1331    async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1332        trace!(
            "🉑 attempting to delete {:?} :: {:?}",
1334            uaid.to_string(),
1335            chidmessageid
1336        );
1337        let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1338        debug!("🉑🔥 Deleting message {}", &row_key);
1339        self.delete_row(&row_key).await?;
1340        self.metrics
1341            .incr_with_tags(MetricName::NotificationMessageDeleted)
1342            .with_tag("database", &self.name())
1343            .send();
1344        Ok(())
1345    }
1346
1347    /// Return `limit` pending messages from storage. `limit=0` for all messages.
1348    async fn fetch_topic_messages(
1349        &self,
1350        uaid: &Uuid,
1351        limit: usize,
1352    ) -> DbResult<FetchMessageResponse> {
1353        let mut req = ReadRowsRequest::default();
1354        req.set_table_name(self.settings.table_name.clone());
1355        req.set_app_profile_id(self.settings.app_profile_id.clone());
1356
1357        let start_key = format!("{}#01:", uaid.simple());
1358        let end_key = format!("{}#02:", uaid.simple());
1359        let mut rows = data::RowSet::default();
1360        let mut row_range = data::RowRange::default();
1361        row_range.set_start_key_open(start_key.into_bytes());
1362        row_range.set_end_key_open(end_key.into_bytes());
1363        let mut row_ranges = RepeatedField::default();
1364        row_ranges.push(row_range);
1365        rows.set_row_ranges(row_ranges);
1366        req.set_rows(rows);
1367
1368        let mut filters = message_gc_policy_filter()?;
1369        filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1370
1371        req.set_filter(filter_chain(filters));
1372        if limit > 0 {
1373            trace!("🉑 Setting limit to {limit}");
1374            req.set_rows_limit(limit as i64);
1375        }
1376        let rows = self.read_rows(req).await?;
1377        debug!(
1378            "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1379            rows.len(),
1380            limit
1381        );
1382
1383        let messages = self.rows_to_notifications(rows)?;
1384
1385        // Note: Bigtable always returns a timestamp of None.
1386        // Under Bigtable `current_timestamp` is instead initially read
1387        // from [get_user].
1388        Ok(FetchMessageResponse {
1389            messages,
1390            timestamp: None,
1391        })
1392    }
1393
1394    /// Return up to `limit` pending messages for a UAID with a `sortkey_timestamp` later
1395    /// than the one specified. `limit=0` returns all messages.
1396    async fn fetch_timestamp_messages(
1397        &self,
1398        uaid: &Uuid,
1399        timestamp: Option<u64>,
1400        limit: usize,
1401    ) -> DbResult<FetchMessageResponse> {
1402        let mut req = ReadRowsRequest::default();
1403        req.set_table_name(self.settings.table_name.clone());
1404        req.set_app_profile_id(self.settings.app_profile_id.clone());
1405
1406        let mut rows = data::RowSet::default();
1407        let mut row_range = data::RowRange::default();
1408
1409        let start_key = if let Some(ts) = timestamp {
1410            // Fetch everything after the last message read at this timestamp: the
1411            // trailing "z" sorts past the last message's channel_id's 1st hex digit.
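            //
            // As an illustrative example (hypothetical key values only): a start key of
            // "<uaid>#02:1700000000z" skips rows keyed "<uaid>#02:1700000000<chid...>"
            // while still matching any later timestamp.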
1412            format!("{}#02:{}z", uaid.simple(), ts)
1413        } else {
1414            format!("{}#02:", uaid.simple())
1415        };
1416        let end_key = format!("{}#03:", uaid.simple());
1417        row_range.set_start_key_open(start_key.into_bytes());
1418        row_range.set_end_key_open(end_key.into_bytes());
1419
1420        let mut row_ranges = RepeatedField::default();
1421        row_ranges.push(row_range);
1422        rows.set_row_ranges(row_ranges);
1423        req.set_rows(rows);
1424
1425        // We can fetch data and do [some remote filtering](https://cloud.google.com/bigtable/docs/filters),
1426        // but the filtering we need here isn't particularly well served by it.
1427        //
1428        // NOTE: if you filter on a given field, BigTable will only
1429        // return that specific field. Adding filters for the rest of
1430        // the known elements may NOT return those elements, or may
1431        // cause the message to not be returned at all if any of
1432        // those elements are not present. It may therefore be preferable
1433        // to run two filters: one to fetch the candidate IDs
1434        // and another to fetch the content of the messages.
1438        let mut filters = message_gc_policy_filter()?;
1439        filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1440
1441        req.set_filter(filter_chain(filters));
1442        if limit > 0 {
1443            req.set_rows_limit(limit as i64);
1444        }
1445        let rows = self.read_rows(req).await?;
1446        debug!(
1447            "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1448            timestamp,
1449            rows.len(),
1450            limit,
1451        );
1452
1453        let messages = self.rows_to_notifications(rows)?;
1454        // The timestamp of the last message read
1455        let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1456        Ok(FetchMessageResponse {
1457            messages,
1458            timestamp,
1459        })
1460    }
1461
1462    async fn health_check(&self) -> DbResult<bool> {
1463        Ok(self
1464            .pool
1465            .get()
1466            .await?
1467            .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1468            .await?)
1469    }
1470
1471    /// Returns true, because this implementation uses a single BigTable table. We divide
1472    /// things up by `family`.
1473    async fn router_table_exists(&self) -> DbResult<bool> {
1474        Ok(true)
1475    }
1476
1477    /// Returns true, because this implementation uses a single BigTable table. We divide
1478    /// things up by `family`.
1479    async fn message_table_exists(&self) -> DbResult<bool> {
1480        Ok(true)
1481    }
1482
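    /// Record a reliability state transition for the given `reliability_id`.
    ///
    /// A minimal sketch of the cell this writes (illustrative, not literal output):
    ///
    /// ```text
    /// row key:  <reliability_id>
    /// family:   reliability
    /// cell:     <state name> = <milliseconds since epoch as big-endian bytes>
    ///           (expiry set to now + RELIABLE_LOG_TTL)
    /// ```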
1483    #[cfg(feature = "reliable_report")]
1484    async fn log_report(
1485        &self,
1486        reliability_id: &str,
1487        new_state: crate::reliability::ReliabilityState,
1488    ) -> DbResult<()> {
1489        let row_key = reliability_id.to_owned();
1490
1491        let mut row = Row::new(row_key);
1492        let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1493
1494        // Log the latest transition time for this id.
1495        let cells: Vec<cell::Cell> = vec![cell::Cell {
1496            qualifier: new_state.to_string(),
1497            value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1498            timestamp: expiry,
1499            ..Default::default()
1500        }];
1501
1502        row.add_cells(RELIABLE_LOG_FAMILY, cells);
1503
1504        self.write_row(row).await?;
1505
1506        Ok(())
1507    }
1508
1509    fn box_clone(&self) -> Box<dyn DbClient> {
1510        Box::new(self.clone())
1511    }
1512
1513    fn name(&self) -> String {
1514        "Bigtable".to_owned()
1515    }
1516
1517    fn pool_status(&self) -> Option<deadpool::Status> {
1518        Some(self.pool.pool.status())
1519    }
1520}
1521
1522#[cfg(all(test, feature = "emulator"))]
1523mod tests {
1524
1525    //! Currently, these tests rely on having a BigTable emulator running on the local machine.
1526    //! The tests presume to be able to connect to localhost:8086. See docs/bigtable.md for
1527    //! details and how to set up and initialize an emulator.
1528    //!
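    //! A typical local setup might look like the following (illustrative; see
    //! docs/bigtable.md and scripts/setup_bt.sh for the authoritative steps):
    //!
    //! ```text
    //! gcloud beta emulators bigtable start &
    //! export BIGTABLE_EMULATOR_HOST=localhost:8086
    //! scripts/setup_bt.sh
    //! ```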
1529    use std::sync::Arc;
1530    use std::time::SystemTime;
1531
1532    use cadence::StatsdClient;
1533    use uuid;
1534
1535    use super::*;
1536    use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1537
1538    const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
1539    const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
1540    const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1541
1542    fn now() -> u64 {
1543        SystemTime::now()
1544            .duration_since(SystemTime::UNIX_EPOCH)
1545            .unwrap()
1546            .as_secs()
1547    }
1548
1549    fn new_client() -> DbResult<BigTableClientImpl> {
1550        let env_dsn = format!(
1551            "grpc://{}",
1552            std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1553        );
1554        let settings = DbSettings {
1555            // this presumes the table was created with
1556            // ```
1557            // scripts/setup_bt.sh
1558            // ```
1559            // with `message`, `router`, and `message_topic` families
1560            dsn: Some(env_dsn),
1561            db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1562                .to_string(),
1563        };
1564
1565        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1566
1567        BigTableClientImpl::new(metrics, &settings)
1568    }
1569
1570    #[test]
1571    fn escape_bytes_for_regex() {
1572        let b = b"hi";
1573        assert_eq!(escape_bytes(b), b.to_vec());
1574        assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
1575        let b = b"\xe2\x80\xb3";
1576        assert_eq!(escape_bytes(b), b.to_vec());
1577        // clippy::octal-escapes rightly discourages this ("\022") in a byte literal
1578        let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
1579        assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
1580        let b = b"\xc0";
1581        assert_eq!(escape_bytes(b), b.to_vec());
1582        assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
1583    }
1584
1585    #[actix_rt::test]
1586    async fn health_check() {
1587        let client = new_client().unwrap();
1588
1589        let result = client.health_check().await;
1590        assert!(result.is_ok());
1591        assert!(result.unwrap());
1592    }
1593
1594    /// Run a gauntlet of tests. These are a bit linear because the steps need
1595    /// to run in sequence.
1596    #[actix_rt::test]
1597    async fn run_gauntlet() -> DbResult<()> {
1598        let client = new_client()?;
1599
1600        let connected_at = ms_since_epoch();
1601
1602        let uaid = Uuid::parse_str(TEST_USER).unwrap();
1603        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1604        let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();
1605
1606        let node_id = "test_node".to_owned();
1607
1608        // purge the user record if it exists.
1609        let _ = client.remove_user(&uaid).await;
1610
1611        let test_user = User {
1612            uaid,
1613            router_type: "webpush".to_owned(),
1614            connected_at,
1615            router_data: None,
1616            node_id: Some(node_id.clone()),
1617            ..Default::default()
1618        };
1619
1620        // purge the old user (if present)
1621        // in case a prior test failed for whatever reason.
1622        let _ = client.remove_user(&uaid).await;
1623
1624        // can we add the user?
1625        client.add_user(&test_user).await?;
1626        let fetched = client.get_user(&uaid).await?;
1627        assert!(fetched.is_some());
1628        let fetched = fetched.unwrap();
1629        assert_eq!(fetched.router_type, "webpush".to_owned());
1630
1631        // Simulate a connected_at occurring before the following writes
1632        let connected_at = ms_since_epoch();
1633
1634        // can we add channels?
1635        client.add_channel(&uaid, &chid).await?;
1636        let channels = client.get_channels(&uaid).await?;
1637        assert!(channels.contains(&chid));
1638
1639        // can we add lots of channels?
1640        let mut new_channels: HashSet<Uuid> = HashSet::new();
1641        new_channels.insert(chid);
1642        for _ in 1..10 {
1643            new_channels.insert(uuid::Uuid::new_v4());
1644        }
1645        let chid_to_remove = uuid::Uuid::new_v4();
1646        new_channels.insert(chid_to_remove);
1647        client.add_channels(&uaid, new_channels.clone()).await?;
1648        let channels = client.get_channels(&uaid).await?;
1649        assert_eq!(channels, new_channels);
1650
1651        // can we remove a channel?
1652        assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
1653        assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
1654        new_channels.remove(&chid_to_remove);
1655        let channels = client.get_channels(&uaid).await?;
1656        assert_eq!(channels, new_channels);
1657
1658        // Now ensure that we can update a user whose `connected_at` is later than the
1659        // one we set prior. First, ensure that we can't update a user whose
1660        // `connected_at` predates the last write.
1661        let mut updated = User {
1662            connected_at,
1663            ..test_user.clone()
1664        };
1665        let result = client.update_user(&mut updated).await;
1666        assert!(result.is_ok());
1667        assert!(!result.unwrap());
1668
1669        // Make sure that the `connected_at` wasn't modified
1670        let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
1671        assert_eq!(fetched.connected_at, fetched2.connected_at);
1672
1673        // and make sure we can update a record with a later connected_at time.
1674        let mut updated = User {
1675            connected_at: fetched.connected_at + 300,
1676            ..fetched2
1677        };
1678        let result = client.update_user(&mut updated).await;
1679        assert!(result.is_ok());
1680        assert!(result.unwrap());
1681        assert_ne!(
1682            fetched2.connected_at,
1683            client.get_user(&uaid).await?.unwrap().connected_at
1684        );
1685
1686        // can we increment the storage for the user?
1687        client
1688            .increment_storage(
1689                &fetched.uaid,
1690                SystemTime::now()
1691                    .duration_since(SystemTime::UNIX_EPOCH)
1692                    .unwrap()
1693                    .as_secs(),
1694            )
1695            .await?;
1696
1697        let test_data = "An_encrypted_pile_of_crap".to_owned();
1698        let timestamp = now();
1699        let sort_key = now();
1700        // Can we store a message?
1701        let test_notification = crate::db::Notification {
1702            channel_id: chid,
1703            version: "test".to_owned(),
1704            ttl: 300,
1705            timestamp,
1706            data: Some(test_data.clone()),
1707            sortkey_timestamp: Some(sort_key),
1708            ..Default::default()
1709        };
1710        let res = client.save_message(&uaid, test_notification.clone()).await;
1711        assert!(res.is_ok());
1712
1713        let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
1714        assert_ne!(fetched.messages.len(), 0);
1715        let fm = fetched.messages.pop().unwrap();
1716        assert_eq!(fm.channel_id, test_notification.channel_id);
1717        assert_eq!(fm.data, Some(test_data));
1718
1719        // Grab the one message that was submitted within the past 10 seconds.
1720        let fetched = client
1721            .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
1722            .await?;
1723        assert_ne!(fetched.messages.len(), 0);
1724
1725        // Try grabbing a message for 10 seconds from now.
1726        let fetched = client
1727            .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
1728            .await?;
1729        assert_eq!(fetched.messages.len(), 0);
1730
1731        // can we clean up our toys?
1732        assert!(client
1733            .remove_message(&uaid, &test_notification.chidmessageid())
1734            .await
1735            .is_ok());
1736
1737        assert!(client.remove_channel(&uaid, &chid).await.is_ok());
1738
1739        // Now, can we do all that with topic messages
1740        client.add_channel(&uaid, &topic_chid).await?;
1741        let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
1742        let timestamp = now();
1743        let sort_key = now();
1744        // Can we store a message?
1745        let test_notification = crate::db::Notification {
1746            channel_id: topic_chid,
1747            version: "test".to_owned(),
1748            ttl: 300,
1749            topic: Some("topic".to_owned()),
1750            timestamp,
1751            data: Some(test_data.clone()),
1752            sortkey_timestamp: Some(sort_key),
1753            ..Default::default()
1754        };
1755        assert!(client
1756            .save_message(&uaid, test_notification.clone())
1757            .await
1758            .is_ok());
1759
1760        let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
1761        assert_ne!(fetched.messages.len(), 0);
1762        let fm = fetched.messages.pop().unwrap();
1763        assert_eq!(fm.channel_id, test_notification.channel_id);
1764        assert_eq!(fm.data, Some(test_data));
1765
1766        // Grab the message that was submitted.
1767        let fetched = client.fetch_topic_messages(&uaid, 999).await?;
1768        assert_ne!(fetched.messages.len(), 0);
1769
1770        // can we clean up our toys?
1771        assert!(client
1772            .remove_message(&uaid, &test_notification.chidmessageid())
1773            .await
1774            .is_ok());
1775
1776        assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());
1777
1778        let msgs = client
1779            .fetch_timestamp_messages(&uaid, None, 999)
1780            .await?
1781            .messages;
1782        assert!(msgs.is_empty());
1783
1784        let fetched = client.get_user(&uaid).await?.unwrap();
1785        assert!(client
1786            .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
1787            .await
1788            .is_ok());
1789        // did we remove it?
1790        let fetched = client.get_user(&uaid).await?.unwrap();
1791        assert_eq!(fetched.node_id, None);
1792
1793        assert!(client.remove_user(&uaid).await.is_ok());
1794
1795        assert!(client.get_user(&uaid).await?.is_none());
1796
1797        Ok(())
1798    }
1799
1800    #[actix_rt::test]
1801    async fn read_cells_family_id() -> DbResult<()> {
1802        let client = new_client().unwrap();
1803        let uaid = gen_test_uaid();
1804        client.remove_user(&uaid).await.unwrap();
1805
1806        let qualifier = "foo".to_owned();
1807
1808        let row_key = uaid.simple().to_string();
1809        let mut row = Row::new(row_key.clone());
1810        row.cells.insert(
1811            ROUTER_FAMILY.to_owned(),
1812            vec![cell::Cell {
1813                qualifier: qualifier.to_owned(),
1814                value: "bar".as_bytes().to_vec(),
1815                ..Default::default()
1816            }],
1817        );
1818        client.write_row(row).await.unwrap();
1819        let req = client.read_row_request(&row_key);
1820        let Some(row) = client.read_row(req).await.unwrap() else {
1821            panic!("Expected row");
1822        };
1823        assert_eq!(row.cells.len(), 1);
1824        assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1825        client.remove_user(&uaid).await
1826    }
1827
1828    #[actix_rt::test]
1829    async fn add_user_existing() {
1830        let client = new_client().unwrap();
1831        let uaid = gen_test_uaid();
1832        let user = User {
1833            uaid,
1834            ..Default::default()
1835        };
1836        client.remove_user(&uaid).await.unwrap();
1837
1838        client.add_user(&user).await.unwrap();
1839        let err = client.add_user(&user).await.unwrap_err();
1840        assert!(matches!(err, DbError::Conditional));
1841    }
1842
1843    #[actix_rt::test]
1844    async fn version_check() {
1845        let client = new_client().unwrap();
1846        let uaid = gen_test_uaid();
1847        let user = User {
1848            uaid,
1849            ..Default::default()
1850        };
1851        client.remove_user(&uaid).await.unwrap();
1852
1853        client.add_user(&user).await.unwrap();
1854        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1855        assert!(client.update_user(&mut user.clone()).await.unwrap());
1856
1857        let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1858        assert_ne!(user.version, fetched.version);
1859        // should now fail w/ a stale version
1860        assert!(!client.update_user(&mut user).await.unwrap());
1861
1862        client.remove_user(&uaid).await.unwrap();
1863    }
1864
1865    #[actix_rt::test]
1866    async fn lingering_chid_record() {
1867        let client = new_client().unwrap();
1868        let uaid = gen_test_uaid();
1869        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1870        let user = User {
1871            uaid,
1872            ..Default::default()
1873        };
1874        client.remove_user(&uaid).await.unwrap();
1875
1876        // add_channel doesn't check for the existence of a user
1877        client.add_channel(&uaid, &chid).await.unwrap();
1878
1879        // w/ chid records in the router row, get_user should treat
1880        // this as the user not existing
1881        assert!(client.get_user(&uaid).await.unwrap().is_none());
1882
1883        client.add_user(&user).await.unwrap();
1884        // get_user should have also cleaned up the chids
1885        assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1886
1887        client.remove_user(&uaid).await.unwrap();
1888    }
1889
1890    #[actix_rt::test]
1891    async fn lingering_current_timestamp() {
1892        let client = new_client().unwrap();
1893        let uaid = gen_test_uaid();
1894        client.remove_user(&uaid).await.unwrap();
1895
1896        client
1897            .increment_storage(&uaid, crate::util::sec_since_epoch())
1898            .await
1899            .unwrap();
1900        assert!(client.get_user(&uaid).await.unwrap().is_none());
1901
1902        client.remove_user(&uaid).await.unwrap();
1903    }
1904
1905    #[actix_rt::test]
1906    async fn lingering_chid_w_version_record() {
1907        let client = new_client().unwrap();
1908        let uaid = gen_test_uaid();
1909        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1910        client.remove_user(&uaid).await.unwrap();
1911
1912        client.add_channel(&uaid, &chid).await.unwrap();
1913        assert!(client.remove_channel(&uaid, &chid).await.unwrap());
1914        assert!(client.get_user(&uaid).await.unwrap().is_none());
1915
1916        client.remove_user(&uaid).await.unwrap();
1917    }
1918
1919    #[actix_rt::test]
1920    async fn channel_and_current_timestamp_ttl_updates() {
1921        let client = new_client().unwrap();
1922        let uaid = gen_test_uaid();
1923        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1924        client.remove_user(&uaid).await.unwrap();
1925
1926        // Setup a user with some channels and a current_timestamp
1927        let user = User {
1928            uaid,
1929            ..Default::default()
1930        };
1931        client.add_user(&user).await.unwrap();
1932
1933        client.add_channel(&uaid, &chid).await.unwrap();
1934        client
1935            .add_channel(&uaid, &uuid::Uuid::new_v4())
1936            .await
1937            .unwrap();
1938
1939        client
1940            .increment_storage(
1941                &uaid,
1942                SystemTime::now()
1943                    .duration_since(SystemTime::UNIX_EPOCH)
1944                    .unwrap()
1945                    .as_secs(),
1946            )
1947            .await
1948            .unwrap();
1949
1950        let req = client.read_row_request(&uaid.as_simple().to_string());
1951        let Some(mut row) = client.read_row(req).await.unwrap() else {
1952            panic!("Expected row");
1953        };
1954
1955        // Ensure the initial cell expiry (timestamp) of all the cells
1956        // in the row has been updated
1957        let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
1958        for mut cells in row.cells.into_values() {
1959            let Some(cell) = cells.pop() else {
1960                continue;
1961            };
1962            assert!(
1963                cell.timestamp >= ca_expiry,
1964                "{} cell timestamp should be >= connected_at's",
1965                cell.qualifier
1966            );
1967        }
1968
1969        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1970
1971        // Quick nap to make sure that the ca_expiry values are different.
1972        tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
1973        client.update_user(&mut user).await.unwrap();
1974
1975        // Ensure update_user updated the expiry (timestamp) of every cell in the row
1976        let req = client.read_row_request(&uaid.as_simple().to_string());
1977        let Some(mut row) = client.read_row(req).await.unwrap() else {
1978            panic!("Expected row");
1979        };
1980
1981        let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
1982
1983        assert!(ca_expiry2 > ca_expiry);
1984
1985        for mut cells in row.cells.into_values() {
1986            let Some(cell) = cells.pop() else {
1987                continue;
1988            };
1989            assert!(
1990                cell.timestamp >= ca_expiry2,
1991                "{} cell timestamp expiry should be >= connected_at's",
1992                cell.qualifier
1993            );
1994        }
1995
1996        client.remove_user(&uaid).await.unwrap();
1997    }
1998}