autopush_common/db/bigtable/bigtable_client/
mod.rs

1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8
9use again::RetryPolicy;
10use async_trait::async_trait;
11use cadence::StatsdClient;
12use chrono::TimeDelta;
13use futures_util::StreamExt;
14use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
15use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
16use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
17use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
18use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
19use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
20use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
21use protobuf::RepeatedField;
22use serde_json::{from_str, json};
23use uuid::Uuid;
24
25use crate::db::RangeKey;
26use crate::db::{
27    client::{DbClient, FetchMessageResponse},
28    error::{DbError, DbResult},
29    DbSettings, Notification, User, MAX_ROUTER_TTL, USER_RECORD_VERSION,
30};
31use crate::metric_name::MetricName;
32use crate::metrics::StatsdClientExt;
33
34pub use self::metadata::MetadataBuilder;
35use self::row::{Row, RowCells};
36use super::pool::BigTablePool;
37use super::BigTableDbSettings;
38
39pub mod cell;
40pub mod error;
41pub(crate) mod merge;
42pub mod metadata;
43pub mod row;
44
// Bigtable row keys are normally `Vec<u8>`; this module handles them as
// `String`s and converts with `as_bytes()` when building requests.
pub type RowKey = String;

// These aliases exist for code clarity rather than type safety: both are
// plain `String`s, so Rust will happily swap between the two.
// See [super::row::Row] for discussion about how these
// are overloaded in order to simplify fetching data.
pub type Qualifier = String;
pub type FamilyId = String;
54
/// Name of the router column family.
const ROUTER_FAMILY: &str = "router";
/// The default column family for messages.
const MESSAGE_FAMILY: &str = "message";
/// Name of the topic message column family.
// NOTE(review): purpose inferred from the name; usage is not visible here.
const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
/// Name of the reliability logging column family.
#[cfg(feature = "reliable_report")]
const RELIABLE_LOG_FAMILY: &str = "reliability";
/// The maximum TTL for reliability logging (60 days).
/// In most use cases, converted to seconds through .num_seconds().
#[cfg(feature = "reliable_report")]
pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);

/// Crate-wide retry count.
// NOTE(review): presumably the default for the `retry_count` setting — confirm.
pub(crate) const RETRY_COUNT: usize = 5;
66
/// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently.
///
/// Both the `Display` and `String` conversions render the wrapped [Uuid] in
/// its "simple" form (via `as_simple()`).
// TODO: Should we create something similar for ChannelID?
struct Uaid(Uuid);
70
71impl Display for Uaid {
72    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
73        write!(f, "{}", self.0.as_simple())
74    }
75}
76
77impl From<Uaid> for String {
78    fn from(uaid: Uaid) -> String {
79        uaid.0.as_simple().to_string()
80    }
81}
82
/// Wrapper for the BigTable connection
#[derive(Clone)]
pub struct BigTableClientImpl {
    /// BigTable specific settings (table name, app profile id, retry count, ...)
    pub(crate) settings: BigTableDbSettings,
    /// Metrics client
    metrics: Arc<StatsdClient>,
    /// Connection pool; also hands out the channel used for admin calls
    pool: BigTablePool,
    /// gRPC header metadata sent with data requests
    metadata: Metadata,
    /// gRPC header metadata sent with admin requests
    admin_metadata: Metadata,
}
94
95/// Return a a RowFilter matching the GC policy of the router Column Family
96fn router_gc_policy_filter() -> data::RowFilter {
97    let mut latest_cell_filter = data::RowFilter::default();
98    latest_cell_filter.set_cells_per_column_limit_filter(1);
99    latest_cell_filter
100}
101
102/// Return a chain of RowFilters matching the GC policy of the message Column
103/// Families
104fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
105    let mut timestamp_filter = data::RowFilter::default();
106    let bt_now: i64 = SystemTime::now()
107        .duration_since(SystemTime::UNIX_EPOCH)
108        .map_err(error::BigTableError::WriteTime)?
109        .as_millis() as i64;
110    let mut range_filter = data::TimestampRange::default();
111    range_filter.set_start_timestamp_micros(bt_now * 1000);
112    timestamp_filter.set_timestamp_range_filter(range_filter);
113
114    Ok(vec![router_gc_policy_filter(), timestamp_filter])
115}
116
117/// Return a Column family regex RowFilter
118fn family_filter(regex: String) -> data::RowFilter {
119    let mut filter = data::RowFilter::default();
120    filter.set_family_name_regex_filter(regex);
121    filter
122}
123
/// Quote `bytes` so they can be embedded literally in a regular expression.
///
/// Based off google-re2/perl's quotemeta function: ASCII alphanumerics, `_`
/// and bytes with the high bit set are copied unchanged, NUL becomes `\x00`
/// and every other byte is prefixed with a backslash.
fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
    let mut escaped = Vec::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        let is_literal = byte.is_ascii_alphanumeric() || byte == b'_' || byte >= 0x80;
        match byte {
            _ if is_literal => escaped.push(byte),
            // Special handling for null: not strictly required for RE2, but
            // needed by other regexp libraries such as PCRE. "\\0" can't be
            // used since the next character might be a digit.
            b'\0' => escaped.extend_from_slice(b"\\x00"),
            _ => escaped.extend_from_slice(&[b'\\', byte]),
        }
    }
    escaped
}
145
146/// Return a chain of RowFilters limiting to a match of the specified
147/// `version`'s column value
148fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
149    let mut cq_filter = data::RowFilter::default();
150    cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());
151
152    let mut value_filter = data::RowFilter::default();
153    value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));
154
155    vec![
156        family_filter(format!("^{ROUTER_FAMILY}$")),
157        cq_filter,
158        value_filter,
159    ]
160}
161
162/// Return a newly generated `version` column `Cell`
163fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
164    cell::Cell {
165        qualifier: "version".to_owned(),
166        value: Uuid::new_v4().into(),
167        timestamp,
168        ..Default::default()
169    }
170}
171
172/// Return a RowFilter chain from multiple RowFilters
173fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
174    let mut chain = RowFilter_Chain::default();
175    chain.set_filters(filters.into());
176    let mut filter = RowFilter::default();
177    filter.set_chain(chain);
178    filter
179}
180
181/// Return a ReadRowsRequest against table for a given row key
182fn read_row_request(
183    table_name: &str,
184    app_profile_id: &str,
185    row_key: &str,
186) -> bigtable::ReadRowsRequest {
187    let mut req = bigtable::ReadRowsRequest::default();
188    req.set_table_name(table_name.to_owned());
189    req.set_app_profile_id(app_profile_id.to_owned());
190
191    let mut row_keys = RepeatedField::default();
192    row_keys.push(row_key.as_bytes().to_vec());
193    let mut row_set = data::RowSet::default();
194    row_set.set_row_keys(row_keys);
195    req.set_rows(row_set);
196
197    req
198}
199
200fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
201    let v: [u8; 8] = value
202        .try_into()
203        .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
204    Ok(u64::from_be_bytes(v))
205}
206
207fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
208    String::from_utf8(value).map_err(|e| {
209        debug!("🉑 cannot read string {}: {:?}", name, e);
210        DbError::DeserializeString(name.to_owned())
211    })
212}
213
214/// Parse the "set" (see [DbClient::add_channels]) of channel ids in a bigtable Row.
215///
216/// Cells should solely contain the set of channels otherwise an Error is returned.
217fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
218    let mut result = HashSet::new();
219    for cells in cells.values() {
220        let Some(cell) = cells.last() else {
221            continue;
222        };
223        let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
224            return Err(DbError::Integrity(
225                "get_channels expected: chid:<chid>".to_owned(),
226                None,
227            ));
228        };
229        result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
230    }
231    Ok(result)
232}
233
234/// Convert the [HashSet] of channel ids to cell entries for a bigtable Row
235fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
236    let channels = channels.into_owned();
237    let mut cells = Vec::with_capacity(channels.len().min(100_000));
238    for (i, channel_id) in channels.into_iter().enumerate() {
239        // There is a limit of 100,000 mutations per batch for bigtable.
240        // https://cloud.google.com/bigtable/quotas
241        // If you have 100,000 channels, you have too many.
242        if i >= 100_000 {
243            break;
244        }
245        cells.push(cell::Cell {
246            qualifier: format!("chid:{}", channel_id.as_hyphenated()),
247            timestamp: expiry,
248            ..Default::default()
249        });
250    }
251    cells
252}
253
254pub fn retry_policy(max: usize) -> RetryPolicy {
255    RetryPolicy::default()
256        .with_max_retries(max)
257        .with_jitter(true)
258}
259
260fn retryable_internal_err(status: &RpcStatus) -> bool {
261    match status.code() {
262        RpcStatusCode::UNKNOWN => status
263            .message()
264            .eq_ignore_ascii_case("error occurred when fetching oauth2 token."),
265        RpcStatusCode::INTERNAL => [
266            "rst_stream",
267            "rst stream",
268            "received unexpected eos on data frame from server",
269        ]
270        .contains(&status.message().to_lowercase().as_str()),
271        RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
272        _ => false,
273    }
274}
275
276pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
277    let mut metric = metrics
278        .incr_with_tags(MetricName::DatabaseRetry)
279        .with_tag("error", err_type)
280        .with_tag("type", "bigtable");
281    if let Some(code) = code {
282        metric = metric.with_tag("code", code);
283    }
284    metric.send();
285}
286
287pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool + '_ {
288    move |err| {
289        debug!("🉑 Checking grpcio::Error...{err}");
290        match err {
291            grpcio::Error::RpcFailure(status) => {
292                info!("GRPC Failure :{:?}", status);
293                let retry = retryable_internal_err(status);
294                if retry {
295                    metric(metrics, "RpcFailure", Some(&status.code().to_string()));
296                }
297                retry
298            }
299            grpcio::Error::BindFail(_) => {
300                metric(metrics, "BindFail", None);
301                true
302            }
303            // The parameter here is a [grpcio_sys::grpc_call_error] enum
304            // Not all of these are retryable.
305            grpcio::Error::CallFailure(grpc_call_status) => {
306                let retry = grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR;
307                if retry {
308                    metric(
309                        metrics,
310                        "CallFailure",
311                        Some(&format!("{grpc_call_status:?}")),
312                    );
313                }
314                retry
315            }
316            _ => false,
317        }
318    }
319}
320
321pub fn retryable_bt_err(
322    metrics: &Arc<StatsdClient>,
323) -> impl Fn(&error::BigTableError) -> bool + '_ {
324    move |err| {
325        debug!("🉑 Checking BigTableError...{err}");
326        match err {
327            error::BigTableError::InvalidRowResponse(e)
328            | error::BigTableError::Read(e)
329            | error::BigTableError::Write(e)
330            | error::BigTableError::GRPC(e) => retryable_grpcio_err(metrics)(e),
331            _ => false,
332        }
333    }
334}
335
336/// Determine if a router record is "incomplete" (doesn't include [User]
337/// columns):
338///
339/// They can be incomplete for a couple reasons:
340///
341/// 1) A migration code bug caused a few incomplete migrations where
342///    `add_channels` and `increment_storage` calls occurred when the migration's
343///    initial `add_user` was never completed:
344///    https://github.com/mozilla-services/autopush-rs/pull/640
345///
346/// 2) When router TTLs are eventually enabled: `add_channel` and
347///    `increment_storage` can write cells with later expiry times than the other
348///    router cells
349fn is_incomplete_router_record(cells: &RowCells) -> bool {
350    cells
351        .keys()
352        .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
353}
354
355fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
356    ::grpcio::CallOption::default().headers(metadata)
357}
358
359/// Connect to a BigTable storage model.
360///
361/// BigTable is available via the Google Console, and is a schema less storage system.
362///
363/// The `db_dsn` string should be in the form of
364/// `grpc://{BigTableEndpoint}`
365///
366/// The settings contains the `table_name` which is the GRPC path to the data.
367/// (e.g. `projects/{project_id}/instances/{instance_id}/tables/{table_id}`)
368///
/// where:
/// - _BigTableEndpoint_ is the endpoint domain to use (the default is `bigtable.googleapis.com`). See
///   [BigTable Endpoints](https://cloud.google.com/bigtable/docs/regional-endpoints) for more details.
/// - _project_id_ is the Google project identifier, e.g. 'autopush-dev' (see the Google developer console).
/// - _instance_id_ is the Google project instance, e.g. 'development-1' (see the Google developer console).
/// - _table_id_ is the table name, e.g. 'autopush'.
375///
376/// This will automatically bring in the default credentials specified by the `GOOGLE_APPLICATION_CREDENTIALS`
377/// environment variable.
378///
/// NOTE: Some configurations may expect the default credential file (the one pointed to by
/// `GOOGLE_APPLICATION_CREDENTIALS`) to be stored in
/// `$HOME/.config/gcloud/application_default_credentials.json`.
382///
383impl BigTableClientImpl {
384    pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
385        // let env = Arc::new(EnvBuilder::new().build());
386        debug!("🏊 BT Pool new");
387        let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
388        info!("🉑 {:#?}", db_settings);
389        let pool = BigTablePool::new(settings, &metrics)?;
390
391        // create the metadata header blocks required by Google for accessing GRPC resources.
392        let metadata = db_settings.metadata()?;
393        let admin_metadata = db_settings.admin_metadata()?;
394        Ok(Self {
395            settings: db_settings,
396            metrics,
397            metadata,
398            admin_metadata,
399            pool,
400        })
401    }
402
    /// Spawn a task to periodically evict idle connections
    ///
    /// Delegates to the connection pool's `spawn_sweeper`, passing `interval`
    /// through unchanged.
    pub fn spawn_sweeper(&self, interval: Duration) {
        self.pool.spawn_sweeper(interval);
    }
407
408    /// Return a ReadRowsRequest for a given row key
409    fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
410        read_row_request(
411            &self.settings.table_name,
412            &self.settings.app_profile_id,
413            row_key,
414        )
415    }
416
417    /// Return a MutateRowRequest for a given row key
418    fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
419        let mut req = bigtable::MutateRowRequest::default();
420        req.set_table_name(self.settings.table_name.clone());
421        req.set_app_profile_id(self.settings.app_profile_id.clone());
422        req.set_row_key(row_key.as_bytes().to_vec());
423        req
424    }
425
426    /// Return a CheckAndMutateRowRequest for a given row key
427    fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
428        let mut req = bigtable::CheckAndMutateRowRequest::default();
429        req.set_table_name(self.settings.table_name.clone());
430        req.set_app_profile_id(self.settings.app_profile_id.clone());
431        req.set_row_key(row_key.as_bytes().to_vec());
432        req
433    }
434
    /// Apply a single-row [bigtable::MutateRowRequest].
    ///
    /// A connection is taken from the pool and the request is sent under
    /// `retry_policy(self.settings.retry_count)`; only errors accepted by
    /// `retryable_grpcio_err` are retried. A final gRPC failure is returned as
    /// [error::BigTableError::Write].
    async fn mutate_row(
        &self,
        req: bigtable::MutateRowRequest,
    ) -> Result<(), error::BigTableError> {
        let bigtable = self.pool.get().await?;
        retry_policy(self.settings.retry_count)
            .retry_if(
                || async {
                    // NOTE(review): `mutate_row_opt` looks like the blocking
                    // (non `_async`) call variant — confirm this is intended
                    // inside an async fn.
                    bigtable
                        .conn
                        .mutate_row_opt(&req, call_opts(self.metadata.clone()))
                },
                retryable_grpcio_err(&self.metrics),
            )
            .await
            .map_err(error::BigTableError::Write)?;
        Ok(())
    }
454
455    /// Perform a MutateRowsRequest
456    #[allow(unused)]
457    async fn mutate_rows(
458        &self,
459        req: bigtable::MutateRowsRequest,
460    ) -> Result<(), error::BigTableError> {
461        let bigtable = self.pool.get().await?;
462        // ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
463        let resp = retry_policy(self.settings.retry_count)
464            .retry_if(
465                || async {
466                    bigtable
467                        .conn
468                        .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
469                },
470                retryable_grpcio_err(&self.metrics),
471            )
472            .await
473            .map_err(error::BigTableError::Write)?;
474
475        // Scan the returned stream looking for errors.
476        // As I understand, the returned stream contains chunked MutateRowsResponse structs. Each
477        // struct contains the result of the row mutation, and contains a `status` (non-zero on error)
478        // and an optional message string (empty if none).
479        // The structure also contains an overall `status` but that does not appear to be exposed.
480        // Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html
481        let mut stream = Box::pin(resp);
482        let mut cnt = 0;
483        loop {
484            let (result, remainder) = stream.into_future().await;
485            if let Some(result) = result {
486                debug!("🎏 Result block: {}", cnt);
487                match result {
488                    Ok(r) => {
489                        for e in r.get_entries() {
490                            if e.has_status() {
491                                let status = e.get_status();
492                                // See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
493                                let code = error::MutateRowStatus::from(status.get_code());
494                                if !code.is_ok() {
495                                    return Err(error::BigTableError::Status(
496                                        code,
497                                        status.get_message().to_owned(),
498                                    ));
499                                }
500                                debug!("🎏 Response: {} OK", e.index);
501                            }
502                        }
503                    }
504                    Err(e) => return Err(error::BigTableError::Write(e)),
505                };
506                cnt += 1;
507            } else {
508                debug!("🎏 Done!");
509                break;
510            }
511            stream = remainder;
512        }
513
514        Ok(())
515    }
516
517    /// Read one row for the [ReadRowsRequest] (assuming only a single row was requested).
518    async fn read_row(
519        &self,
520        req: bigtable::ReadRowsRequest,
521    ) -> Result<Option<row::Row>, error::BigTableError> {
522        let mut rows = self.read_rows(req).await?;
523        Ok(rows.pop_first().map(|(_, v)| v))
524    }
525
526    /// Take a big table ReadRowsRequest (containing the keys and filters) and return a set of row data indexed by row key.
527    ///
528    ///
529    async fn read_rows(
530        &self,
531        req: ReadRowsRequest,
532    ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
533        let bigtable = self.pool.get().await?;
534        let resp = retry_policy(self.settings.retry_count)
535            .retry_if(
536                || async {
537                    let resp: grpcio::ClientSStreamReceiver<bigtable::ReadRowsResponse> = bigtable
538                        .conn
539                        .read_rows_opt(&req, call_opts(self.metadata.clone()))
540                        .map_err(error::BigTableError::Read)?;
541                    merge::RowMerger::process_chunks(resp).await
542                },
543                retryable_bt_err(&self.metrics),
544            )
545            .await?;
546        Ok(resp)
547    }
548
549    /// write a given row.
550    ///
551    /// there's also `.mutate_rows` which I presume allows multiple.
552    async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
553        let mut req = self.mutate_row_request(&row.row_key);
554        // compile the mutations.
555        // It's possible to do a lot here, including altering in process
556        // mutations, clearing them, etc. It's all up for grabs until we commit
557        // below. For now, let's just presume a write and be done.
558        let mutations = self.get_mutations(row.cells)?;
559        req.set_mutations(mutations);
560        self.mutate_row(req).await?;
561        Ok(())
562    }
563
564    /// Compile the list of mutations for this row.
565    fn get_mutations(
566        &self,
567        cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
568    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
569        let mut mutations = protobuf::RepeatedField::default();
570        for (family_id, cells) in cells {
571            for cell in cells {
572                let mut mutation = data::Mutation::default();
573                let mut set_cell = data::Mutation_SetCell::default();
574                let timestamp = cell
575                    .timestamp
576                    .duration_since(SystemTime::UNIX_EPOCH)
577                    .map_err(error::BigTableError::WriteTime)?;
578                set_cell.family_name.clone_from(&family_id);
579                set_cell.set_column_qualifier(cell.qualifier.clone().into_bytes());
580                set_cell.set_value(cell.value);
581                // Yes, this is passing milli bounded time as a micro. Otherwise I get
582                // a `Timestamp granularity mismatch` error
583                set_cell.set_timestamp_micros((timestamp.as_millis() * 1000) as i64);
584                debug!("🉑 expiring in {:?}", timestamp.as_millis());
585                mutation.set_set_cell(set_cell);
586                mutations.push(mutation);
587            }
588        }
589        Ok(mutations)
590    }
591
592    /// Write mutations if the row meets a condition specified by the filter.
593    ///
594    /// Mutations can be applied either when the filter matches (state `true`)
595    /// or doesn't match (state `false`).
596    ///
597    /// Returns whether the filter matched records (which indicates whether the
598    /// mutations were applied, depending on the state)
599    async fn check_and_mutate_row(
600        &self,
601        row: row::Row,
602        filter: RowFilter,
603        state: bool,
604    ) -> Result<bool, error::BigTableError> {
605        let mut req = self.check_and_mutate_row_request(&row.row_key);
606        let mutations = self.get_mutations(row.cells)?;
607        req.set_predicate_filter(filter);
608        if state {
609            req.set_true_mutations(mutations);
610        } else {
611            req.set_false_mutations(mutations);
612        }
613        self.check_and_mutate(req).await
614    }
615
616    async fn check_and_mutate(
617        &self,
618        req: bigtable::CheckAndMutateRowRequest,
619    ) -> Result<bool, error::BigTableError> {
620        let bigtable = self.pool.get().await?;
621        let resp = retry_policy(self.settings.retry_count)
622            .retry_if(
623                || async {
624                    // Note: check_and_mutate_row_async may return before the row
625                    // is written, which can cause race conditions for reads
626                    bigtable
627                        .conn
628                        .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
629                },
630                retryable_grpcio_err(&self.metrics),
631            )
632            .await
633            .map_err(error::BigTableError::Write)?;
634        debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
635        Ok(resp.get_predicate_matched())
636    }
637
638    fn get_delete_mutations(
639        &self,
640        family: &str,
641        column_names: &[&str],
642        time_range: Option<&data::TimestampRange>,
643    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
644        let mut mutations = protobuf::RepeatedField::default();
645        for column in column_names {
646            let mut mutation = data::Mutation::default();
647            // Mutation_DeleteFromRow -- Delete all cells for a given row.
648            // Mutation_DeleteFromFamily -- Delete all cells from a family for a given row.
649            // Mutation_DeleteFromColumn -- Delete all cells from a column name for a given row, restricted by timestamp range.
650            let mut del_cell = data::Mutation_DeleteFromColumn::default();
651            del_cell.set_family_name(family.to_owned());
652            del_cell.set_column_qualifier(column.as_bytes().to_vec());
653            if let Some(range) = time_range {
654                del_cell.set_time_range(range.clone());
655            }
656            mutation.set_delete_from_column(del_cell);
657            mutations.push(mutation);
658        }
659        Ok(mutations)
660    }
661
662    #[allow(unused)]
663    /// Delete all cell data from the specified columns with the optional time range.
664    #[allow(unused)]
665    async fn delete_cells(
666        &self,
667        row_key: &str,
668        family: &str,
669        column_names: &[&str],
670        time_range: Option<&data::TimestampRange>,
671    ) -> Result<(), error::BigTableError> {
672        let mut req = self.mutate_row_request(row_key);
673        req.set_mutations(self.get_delete_mutations(family, column_names, time_range)?);
674        self.mutate_row(req).await
675    }
676
677    /// Delete all the cells for the given row. NOTE: This will drop the row.
678    async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
679        let mut req = self.mutate_row_request(row_key);
680        let mut mutations = protobuf::RepeatedField::default();
681        let mut mutation = data::Mutation::default();
682        mutation.set_delete_from_row(data::Mutation_DeleteFromRow::default());
683        mutations.push(mutation);
684        req.set_mutations(mutations);
685        self.mutate_row(req).await
686    }
687
688    /// This uses the admin interface to drop row ranges.
689    /// This will drop ALL data associated with these rows.
690    /// Note that deletion may take up to a week to occur.
691    /// see https://cloud.google.com/php/docs/reference/cloud-bigtable/latest/Admin.V2.DropRowRangeRequest
692    #[allow(unused)]
693    async fn delete_rows(&self, row_key: &str) -> Result<bool, error::BigTableError> {
694        let admin = BigtableTableAdminClient::new(self.pool.get_channel()?);
695        let mut req = DropRowRangeRequest::new();
696        req.set_name(self.settings.table_name.clone());
697        req.set_row_key_prefix(row_key.as_bytes().to_vec());
698
699        admin
700            .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone()))
701            .map_err(|e| {
702                error!("{:?}", e);
703                error::BigTableError::Admin(
704                    format!(
705                        "Could not send delete command for {}",
706                        &self.settings.table_name
707                    ),
708                    Some(e.to_string()),
709                )
710            })?
711            .await
712            .map_err(|e| {
713                error!("post await: {:?}", e);
714                error::BigTableError::Admin(
715                    format!(
716                        "Could not delete data from table {}",
717                        &self.settings.table_name
718                    ),
719                    Some(e.to_string()),
720                )
721            })?;
722
723        Ok(true)
724    }
725
726    fn rows_to_notifications(
727        &self,
728        rows: BTreeMap<String, Row>,
729    ) -> Result<Vec<Notification>, DbError> {
730        rows.into_iter()
731            .map(|(row_key, row)| self.row_to_notification(&row_key, row))
732            .collect()
733    }
734
735    fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
736        let Some((_, chidmessageid)) = row_key.split_once('#') else {
737            return Err(DbError::Integrity(
738                "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
739                None,
740            ));
741        };
742        let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
743            DbError::Integrity(
744                format!("rows_to_notification expected chidmessageid: {e}"),
745                None,
746            )
747        })?;
748
749        // Create from the known, required fields.
750        let mut notif = Notification {
751            channel_id: range_key.channel_id,
752            topic: range_key.topic,
753            sortkey_timestamp: range_key.sortkey_timestamp,
754            version: to_string(row.take_required_cell("version")?.value, "version")?,
755            ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
756            timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
757            ..Default::default()
758        };
759
760        // Backfill the Optional fields
761        if let Some(cell) = row.take_cell("data") {
762            notif.data = Some(to_string(cell.value, "data")?);
763        }
764        #[cfg(feature = "reliable_report")]
765        {
766            if let Some(cell) = row.take_cell("reliability_id") {
767                notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
768            }
769            if let Some(cell) = row.take_cell("reliable_state") {
770                notif.reliable_state = Some(
771                    crate::reliability::ReliabilityState::from_str(&to_string(
772                        cell.value,
773                        "reliable_state",
774                    )?)
775                    .map_err(|e| {
776                        DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
777                    })?,
778                );
779            }
780        }
781        if let Some(cell) = row.take_cell("headers") {
782            notif.headers = Some(
783                serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
784                    .map_err(|e| DbError::Serialization(e.to_string()))?,
785            );
786        }
787        if let Some(cell) = row.take_cell("reliability_id") {
788            trace!("🚣  Is reliable");
789            notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
790        }
791
792        trace!("🚣  Deserialized message row: {:?}", &notif);
793        Ok(notif)
794    }
795
796    /// Return a Row for writing from a [User] and a `version`
797    ///
798    /// `version` is specified as an argument (ignoring [User::version]) so
799    /// that [update_user] may specify a new version to write before modifying
800    /// the [User] struct
801    fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
802        let row_key = user.uaid.simple().to_string();
803        let mut row = Row::new(row_key);
804        let expiry =
805            std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
806
807        let mut cells: Vec<cell::Cell> = vec![
808            cell::Cell {
809                qualifier: "connected_at".to_owned(),
810                value: user.connected_at.to_be_bytes().to_vec(),
811                timestamp: expiry,
812                ..Default::default()
813            },
814            cell::Cell {
815                qualifier: "router_type".to_owned(),
816                value: user.router_type.clone().into_bytes(),
817                timestamp: expiry,
818                ..Default::default()
819            },
820            cell::Cell {
821                qualifier: "record_version".to_owned(),
822                value: user
823                    .record_version
824                    .unwrap_or(USER_RECORD_VERSION)
825                    .to_be_bytes()
826                    .to_vec(),
827                timestamp: expiry,
828                ..Default::default()
829            },
830            cell::Cell {
831                qualifier: "version".to_owned(),
832                value: (*version).into(),
833                timestamp: expiry,
834                ..Default::default()
835            },
836        ];
837
838        if let Some(router_data) = &user.router_data {
839            cells.push(cell::Cell {
840                qualifier: "router_data".to_owned(),
841                value: json!(router_data).to_string().as_bytes().to_vec(),
842                timestamp: expiry,
843                ..Default::default()
844            });
845        };
846        if let Some(current_timestamp) = user.current_timestamp {
847            cells.push(cell::Cell {
848                qualifier: "current_timestamp".to_owned(),
849                value: current_timestamp.to_be_bytes().to_vec(),
850                timestamp: expiry,
851                ..Default::default()
852            });
853        };
854        if let Some(node_id) = &user.node_id {
855            cells.push(cell::Cell {
856                qualifier: "node_id".to_owned(),
857                value: node_id.as_bytes().to_vec(),
858                timestamp: expiry,
859                ..Default::default()
860            });
861        };
862
863        cells.extend(channels_to_cells(
864            Cow::Borrowed(&user.priv_channels),
865            expiry,
866        ));
867
868        row.add_cells(ROUTER_FAMILY, cells);
869        row
870    }
871}
872
873#[derive(Clone)]
874pub struct BigtableDb {
875    pub(super) conn: BigtableClient,
876    pub(super) health_metadata: Metadata,
877    table_name: String,
878}
879
880impl BigtableDb {
881    pub fn new(channel: Channel, health_metadata: &Metadata, table_name: &str) -> Self {
882        Self {
883            conn: BigtableClient::new(channel),
884            health_metadata: health_metadata.clone(),
885            table_name: table_name.to_owned(),
886        }
887    }
888    /// Perform a simple connectivity check. This should return no actual results
889    /// but should verify that the connection is valid. We use this for the
890    /// Recycle check as well, so it has to be fairly low in the implementation
891    /// stack.
892    ///
893    ///
894    pub async fn health_check(
895        &mut self,
896        metrics: &Arc<StatsdClient>,
897        app_profile_id: &str,
898    ) -> Result<bool, error::BigTableError> {
899        // It is recommended that we pick a random key to perform the health check. Selecting
900        // a single key for all health checks causes a "hot tablet" to arise. The `PingAndWarm`
901        // is intended to be used prior to large updates and is not recommended for use in
902        // health checks.
903        // This health check is to see if the database is present, the response is not important
904        // other than it does not return an error.
905        let random_uaid = Uuid::new_v4().simple().to_string();
906        let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
907        let mut filter = data::RowFilter::default();
908        filter.set_block_all_filter(true);
909        req.set_filter(filter);
910        let _r = retry_policy(RETRY_COUNT)
911            .retry_if(
912                || async {
913                    self.conn
914                        .read_rows_opt(&req, call_opts(self.health_metadata.clone()))
915                },
916                retryable_grpcio_err(metrics),
917            )
918            .await
919            .map_err(error::BigTableError::Read)?;
920
921        debug!("🉑 health check");
922        Ok(true)
923    }
924}
925
926#[async_trait]
927impl DbClient for BigTableClientImpl {
928    /// add user to the database
929    async fn add_user(&self, user: &User) -> DbResult<()> {
930        trace!("🉑 Adding user");
931        let Some(ref version) = user.version else {
932            return Err(DbError::General(
933                "add_user expected a user version field".to_owned(),
934            ));
935        };
936        let row = self.user_to_row(user, version);
937
938        // Only add when the user doesn't already exist
939        let mut row_key_filter = RowFilter::default();
940        row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
941        let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
942
943        if self.check_and_mutate_row(row, filter, false).await? {
944            return Err(DbError::Conditional);
945        }
946        Ok(())
947    }
948
949    /// BigTable doesn't really have the concept of an "update". You simply write the data and
950    /// the individual cells create a new version. Depending on the garbage collection rules for
951    /// the family, these can either persist or be automatically deleted.
952    ///
953    /// NOTE: This function updates the key ROUTER records for a given UAID. It does this by
954    /// calling [BigTableClientImpl::user_to_row] which creates a new row with new `cell.timestamp` values set
955    /// to now + `MAX_ROUTER_TTL`. This function is called by mobile during the daily
956    /// [autoendpoint::routes::update_token_route] handling, and by desktop
957    /// [autoconnect-ws-sm::get_or_create_user]` which is called
958    /// during the `HELLO` handler. This should be enough to ensure that the ROUTER records
959    /// are properly refreshed for "lively" clients.
960    ///
961    /// NOTE: There is some, very small, potential risk that a desktop client that can
962    /// somehow remain connected the duration of MAX_ROUTER_TTL, may be dropped as not being
963    /// "lively".
964    async fn update_user(&self, user: &mut User) -> DbResult<bool> {
965        let Some(ref version) = user.version else {
966            return Err(DbError::General(
967                "update_user expected a user version field".to_owned(),
968            ));
969        };
970
971        let mut filters = vec![router_gc_policy_filter()];
972        filters.extend(version_filter(version));
973        let filter = filter_chain(filters);
974
975        let new_version = Uuid::new_v4();
976        // Always write a newly generated version
977        let row = self.user_to_row(user, &new_version);
978
979        let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
980        user.version = Some(new_version);
981        Ok(predicate_matched)
982    }
983
984    async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
985        let row_key = uaid.as_simple().to_string();
986        let mut req = self.read_row_request(&row_key);
987        let mut filters = vec![router_gc_policy_filter()];
988        filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
989        req.set_filter(filter_chain(filters));
990        let Some(mut row) = self.read_row(req).await? else {
991            return Ok(None);
992        };
993
994        trace!("🉑 Found a record for {}", row_key);
995
996        let connected_at_cell = match row.take_required_cell("connected_at") {
997            Ok(cell) => cell,
998            Err(_) => {
999                if !is_incomplete_router_record(&row.cells) {
1000                    return Err(DbError::Integrity(
1001                        "Expected column: connected_at".to_owned(),
1002                        Some(format!("{row:#?}")),
1003                    ));
1004                }
1005                // Special case incomplete records: they're equivalent to no
1006                // user exists. Incompletes caused by the migration bug in #640
1007                // will have their migration re-triggered by returning None:
1008                // https://github.com/mozilla-services/autopush-rs/pull/640
1009                trace!("🉑 Dropping an incomplete user record for {}", row_key);
1010                self.metrics
1011                    .incr_with_tags(MetricName::DatabaseDropUser)
1012                    .with_tag("reason", "incomplete_record")
1013                    .send();
1014                self.remove_user(uaid).await?;
1015                return Ok(None);
1016            }
1017        };
1018
1019        let mut result = User {
1020            uaid: *uaid,
1021            connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1022            router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1023            record_version: Some(to_u64(
1024                row.take_required_cell("record_version")?.value,
1025                "record_version",
1026            )?),
1027            version: Some(
1028                row.take_required_cell("version")?
1029                    .value
1030                    .try_into()
1031                    .map_err(|e| {
1032                        DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1033                    })?,
1034            ),
1035            ..Default::default()
1036        };
1037
1038        if let Some(cell) = row.take_cell("router_data") {
1039            result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1040                DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1041            })?;
1042        }
1043
1044        if let Some(cell) = row.take_cell("node_id") {
1045            result.node_id = Some(to_string(cell.value, "node_id")?);
1046        }
1047
1048        if let Some(cell) = row.take_cell("current_timestamp") {
1049            result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1050        }
1051
1052        // Read the channels last, after removal of all non channel cells
1053        result.priv_channels = channels_from_cells(&row.cells)?;
1054
1055        Ok(Some(result))
1056    }
1057
1058    async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1059        let row_key = uaid.simple().to_string();
1060        self.delete_row(&row_key).await?;
1061        Ok(())
1062    }
1063
1064    async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1065        let channels = HashSet::from_iter([channel_id.to_owned()]);
1066        self.add_channels(uaid, channels).await
1067    }
1068
1069    /// Add channels in bulk (used mostly during migration)
1070    ///
1071    async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1072        // channel_ids are stored as a set within one Bigtable row
1073        //
1074        // Bigtable allows "millions of columns in a table, as long as no row
1075        // exceeds the maximum limit of 256 MB per row" enabling the use of
1076        // column qualifiers as data.
1077        //
1078        // The "set" of channel_ids consists of column qualifiers named
1079        // "chid:<chid value>" as set member entries (with their cell values
1080        // being a single 0 byte).
1081        //
1082        // Storing the full set in a single row makes batch updates
1083        // (particularly to reset the GC expiry timestamps) potentially more
1084        // easy/efficient
1085        let row_key = uaid.simple().to_string();
1086        let mut row = Row::new(row_key);
1087        let expiry =
1088            std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
1089
1090        // Note: updating the version column isn't necessary here because this
1091        // write only adds a new (or updates an existing) column with a 0 byte
1092        // value
1093        row.add_cells(
1094            ROUTER_FAMILY,
1095            channels_to_cells(Cow::Owned(channels), expiry),
1096        );
1097
1098        self.write_row(row).await?;
1099        Ok(())
1100    }
1101
1102    async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1103        let row_key = uaid.simple().to_string();
1104        let mut req = self.read_row_request(&row_key);
1105
1106        let mut cq_filter = data::RowFilter::default();
1107        cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());
1108        req.set_filter(filter_chain(vec![
1109            router_gc_policy_filter(),
1110            family_filter(format!("^{ROUTER_FAMILY}$")),
1111            cq_filter,
1112        ]));
1113
1114        let Some(row) = self.read_row(req).await? else {
1115            return Ok(Default::default());
1116        };
1117        channels_from_cells(&row.cells)
1118    }
1119
1120    /// Delete the channel. Does not delete its associated pending messages.
1121    async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1122        let row_key = uaid.simple().to_string();
1123        let mut req = self.check_and_mutate_row_request(&row_key);
1124
1125        // Delete the column representing the channel_id
1126        let column = format!("chid:{}", channel_id.as_hyphenated());
1127        let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1128
1129        // and write a new version cell
1130        let mut row = Row::new(row_key);
1131        let expiry =
1132            std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
1133        row.cells
1134            .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1135        mutations.extend(self.get_mutations(row.cells)?);
1136
1137        // check if the channel existed/was actually removed
1138        let mut cq_filter = data::RowFilter::default();
1139        cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
1140        req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1141        req.set_true_mutations(mutations);
1142
1143        Ok(self.check_and_mutate(req).await?)
1144    }
1145
1146    /// Remove the node_id
1147    async fn remove_node_id(
1148        &self,
1149        uaid: &Uuid,
1150        _node_id: &str,
1151        _connected_at: u64,
1152        version: &Option<Uuid>,
1153    ) -> DbResult<bool> {
1154        let row_key = uaid.simple().to_string();
1155        trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1156        let Some(ref version) = version else {
1157            return Err(DbError::General("Expected a user version field".to_owned()));
1158        };
1159
1160        let mut req = self.check_and_mutate_row_request(&row_key);
1161
1162        let mut filters = vec![router_gc_policy_filter()];
1163        filters.extend(version_filter(version));
1164        req.set_predicate_filter(filter_chain(filters));
1165        req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);
1166
1167        Ok(self.check_and_mutate(req).await?)
1168    }
1169
1170    /// Write the notification to storage.
1171    async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1172        let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1173        debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1174        trace!(
1175            "🉑 timestamp: {:?}",
1176            &message.timestamp.to_be_bytes().to_vec()
1177        );
1178        let mut row = Row::new(row_key);
1179
1180        // Remember, `timestamp` is effectively the time to kill the message, not the
1181        // current time.
1182        // TODO: use message.expiry()
1183        let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1184        trace!(
1185            "🉑 Message Expiry {}",
1186            expiry
1187                .duration_since(SystemTime::UNIX_EPOCH)
1188                .unwrap_or_default()
1189                .as_millis()
1190        );
1191
1192        let mut cells: Vec<cell::Cell> = Vec::new();
1193
1194        let is_topic = message.topic.is_some();
1195        let family = if is_topic {
1196            MESSAGE_TOPIC_FAMILY
1197        } else {
1198            MESSAGE_FAMILY
1199        };
1200        cells.extend(vec![
1201            cell::Cell {
1202                qualifier: "ttl".to_owned(),
1203                value: message.ttl.to_be_bytes().to_vec(),
1204                timestamp: expiry,
1205                ..Default::default()
1206            },
1207            cell::Cell {
1208                qualifier: "timestamp".to_owned(),
1209                value: message.timestamp.to_be_bytes().to_vec(),
1210                timestamp: expiry,
1211                ..Default::default()
1212            },
1213            cell::Cell {
1214                qualifier: "version".to_owned(),
1215                value: message.version.into_bytes(),
1216                timestamp: expiry,
1217                ..Default::default()
1218            },
1219        ]);
1220        if let Some(headers) = message.headers {
1221            if !headers.is_empty() {
1222                cells.push(cell::Cell {
1223                    qualifier: "headers".to_owned(),
1224                    value: json!(headers).to_string().into_bytes(),
1225                    timestamp: expiry,
1226                    ..Default::default()
1227                });
1228            }
1229        }
1230        #[cfg(feature = "reliable_report")]
1231        {
1232            if let Some(reliability_id) = message.reliability_id {
1233                trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1234                cells.push(cell::Cell {
1235                    qualifier: "reliability_id".to_owned(),
1236                    value: reliability_id.into_bytes(),
1237                    timestamp: expiry,
1238                    ..Default::default()
1239                });
1240            }
1241            if let Some(reliable_state) = message.reliable_state {
1242                cells.push(cell::Cell {
1243                    qualifier: "reliable_state".to_owned(),
1244                    value: reliable_state.to_string().into_bytes(),
1245                    timestamp: expiry,
1246                    ..Default::default()
1247                });
1248            }
1249        }
1250        if let Some(data) = message.data {
1251            cells.push(cell::Cell {
1252                qualifier: "data".to_owned(),
1253                value: data.into_bytes(),
1254                timestamp: expiry,
1255                ..Default::default()
1256            });
1257        }
1258
1259        row.add_cells(family, cells);
1260        trace!("🉑 Adding row");
1261        self.write_row(row).await?;
1262
1263        self.metrics
1264            .incr_with_tags(MetricName::NotificationMessageStored)
1265            .with_tag("topic", &is_topic.to_string())
1266            .with_tag("database", &self.name())
1267            .send();
1268        Ok(())
1269    }
1270
1271    /// Save a batch of messages to the database.
1272    ///
1273    /// Currently just iterating through the list and saving one at a time. There's a bulk way
1274    /// to save messages, but there are other considerations (e.g. mutation limits)
1275    async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1276        // plate simple way of solving this:
1277        for message in messages {
1278            self.save_message(uaid, message).await?;
1279        }
1280        Ok(())
1281    }
1282
1283    /// Set the `current_timestamp` in the meta record for this user agent.
1284    ///
1285    /// This is a bit different for BigTable. Field expiration (technically cell
1286    /// expiration) is determined by the lifetime assigned to the cell once it hits
1287    /// a given date. That means you can't really extend a lifetime by adjusting a
1288    /// single field. You'd have to adjust all the cells that are in the family.
1289    /// So, we're not going to do expiration that way.
1290    ///
1291    /// That leaves the meta "current_timestamp" field. We do not purge ACK'd records,
1292    /// instead we presume that the TTL will kill them off eventually. On reads, we use
1293    /// the `current_timestamp` to determine what records to return, since we return
1294    /// records with timestamps later than `current_timestamp`.
1295    ///
1296    async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1297        let row_key = uaid.simple().to_string();
1298        debug!(
1299            "🉑 Updating {} current_timestamp:  {:?}",
1300            &row_key,
1301            timestamp.to_be_bytes().to_vec()
1302        );
1303        let expiry =
1304            std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
1305        let mut row = Row::new(row_key.clone());
1306
1307        row.cells.insert(
1308            ROUTER_FAMILY.to_owned(),
1309            vec![
1310                cell::Cell {
1311                    qualifier: "current_timestamp".to_owned(),
1312                    value: timestamp.to_be_bytes().to_vec(),
1313                    timestamp: expiry,
1314                    ..Default::default()
1315                },
1316                new_version_cell(expiry),
1317            ],
1318        );
1319
1320        self.write_row(row).await?;
1321
1322        Ok(())
1323    }
1324
1325    /// Delete the notification from storage.
1326    async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1327        trace!(
1328            "🉑 attemping to delete {:?} :: {:?}",
1329            uaid.to_string(),
1330            chidmessageid
1331        );
1332        let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1333        debug!("🉑🔥 Deleting message {}", &row_key);
1334        self.delete_row(&row_key).await?;
1335        self.metrics
1336            .incr_with_tags(MetricName::NotificationMessageDeleted)
1337            .with_tag("database", &self.name())
1338            .send();
1339        Ok(())
1340    }
1341
1342    /// Return `limit` pending messages from storage. `limit=0` for all messages.
1343    async fn fetch_topic_messages(
1344        &self,
1345        uaid: &Uuid,
1346        limit: usize,
1347    ) -> DbResult<FetchMessageResponse> {
1348        let mut req = ReadRowsRequest::default();
1349        req.set_table_name(self.settings.table_name.clone());
1350        req.set_app_profile_id(self.settings.app_profile_id.clone());
1351
1352        let start_key = format!("{}#01:", uaid.simple());
1353        let end_key = format!("{}#02:", uaid.simple());
1354        let mut rows = data::RowSet::default();
1355        let mut row_range = data::RowRange::default();
1356        row_range.set_start_key_open(start_key.into_bytes());
1357        row_range.set_end_key_open(end_key.into_bytes());
1358        let mut row_ranges = RepeatedField::default();
1359        row_ranges.push(row_range);
1360        rows.set_row_ranges(row_ranges);
1361        req.set_rows(rows);
1362
1363        let mut filters = message_gc_policy_filter()?;
1364        filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1365
1366        req.set_filter(filter_chain(filters));
1367        if limit > 0 {
1368            trace!("🉑 Setting limit to {limit}");
1369            req.set_rows_limit(limit as i64);
1370        }
1371        let rows = self.read_rows(req).await?;
1372        debug!(
1373            "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1374            rows.len(),
1375            limit
1376        );
1377
1378        let messages = self.rows_to_notifications(rows)?;
1379
1380        // Note: Bigtable always returns a timestamp of None.
1381        // Under Bigtable `current_timestamp` is instead initially read
1382        // from [get_user].
1383        Ok(FetchMessageResponse {
1384            messages,
1385            timestamp: None,
1386        })
1387    }
1388
1389    /// Return `limit` messages pending for a UAID that have a sortkey_timestamp after
1390    /// what's specified. `limit=0` for all messages.
1391    async fn fetch_timestamp_messages(
1392        &self,
1393        uaid: &Uuid,
1394        timestamp: Option<u64>,
1395        limit: usize,
1396    ) -> DbResult<FetchMessageResponse> {
1397        let mut req = ReadRowsRequest::default();
1398        req.set_table_name(self.settings.table_name.clone());
1399        req.set_app_profile_id(self.settings.app_profile_id.clone());
1400
1401        let mut rows = data::RowSet::default();
1402        let mut row_range = data::RowRange::default();
1403
1404        let start_key = if let Some(ts) = timestamp {
1405            // Fetch everything after the last message with timestamp: the "z"
1406            // moves past the last message's channel_id's 1st hex digit
1407            format!("{}#02:{}z", uaid.simple(), ts)
1408        } else {
1409            format!("{}#02:", uaid.simple())
1410        };
1411        let end_key = format!("{}#03:", uaid.simple());
1412        row_range.set_start_key_open(start_key.into_bytes());
1413        row_range.set_end_key_open(end_key.into_bytes());
1414
1415        let mut row_ranges = RepeatedField::default();
1416        row_ranges.push(row_range);
1417        rows.set_row_ranges(row_ranges);
1418        req.set_rows(rows);
1419
1420        // We can fetch data and do [some remote filtering](https://cloud.google.com/bigtable/docs/filters),
1421        // unfortunately I don't think the filtering we need will be super helpful.
1422        //
1423        //
1424        /*
1425        //NOTE: if you filter on a given field, BigTable will only
1426        // return that specific field. Adding filters for the rest of
1427        // the known elements may NOT return those elements or may
1428        // cause the message to not be returned because any of
1429        // those elements are not present. It may be preferable to
1430        // therefore run two filters, one to fetch the candidate IDs
1431        // and another to fetch the content of the messages.
1432         */
1433        let mut filters = message_gc_policy_filter()?;
1434        filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1435
1436        req.set_filter(filter_chain(filters));
1437        if limit > 0 {
1438            req.set_rows_limit(limit as i64);
1439        }
1440        let rows = self.read_rows(req).await?;
1441        debug!(
1442            "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1443            timestamp,
1444            rows.len(),
1445            limit,
1446        );
1447
1448        let messages = self.rows_to_notifications(rows)?;
1449        // The timestamp of the last message read
1450        let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1451        Ok(FetchMessageResponse {
1452            messages,
1453            timestamp,
1454        })
1455    }
1456
1457    async fn health_check(&self) -> DbResult<bool> {
1458        Ok(self
1459            .pool
1460            .get()
1461            .await?
1462            .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1463            .await?)
1464    }
1465
1466    /// Returns true, because there's only one table in BigTable. We divide things up
1467    /// by `family`.
1468    async fn router_table_exists(&self) -> DbResult<bool> {
1469        Ok(true)
1470    }
1471
1472    /// Returns true, because there's only one table in BigTable. We divide things up
1473    /// by `family`.
1474    async fn message_table_exists(&self) -> DbResult<bool> {
1475        Ok(true)
1476    }
1477
1478    #[cfg(feature = "reliable_report")]
1479    async fn log_report(
1480        &self,
1481        reliability_id: &str,
1482        new_state: crate::reliability::ReliabilityState,
1483    ) -> DbResult<()> {
1484        let row_key = reliability_id.to_owned();
1485
1486        let mut row = Row::new(row_key);
1487        let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1488
1489        // Log the latest transition time for this id.
1490        let cells: Vec<cell::Cell> = vec![cell::Cell {
1491            qualifier: new_state.to_string(),
1492            value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1493            timestamp: expiry,
1494            ..Default::default()
1495        }];
1496
1497        row.add_cells(RELIABLE_LOG_FAMILY, cells);
1498
1499        self.write_row(row).await?;
1500
1501        Ok(())
1502    }
1503
1504    fn box_clone(&self) -> Box<dyn DbClient> {
1505        Box::new(self.clone())
1506    }
1507
1508    fn name(&self) -> String {
1509        "Bigtable".to_owned()
1510    }
1511
1512    fn pool_status(&self) -> Option<deadpool::Status> {
1513        Some(self.pool.pool.status())
1514    }
1515}
1516
1517#[cfg(all(test, feature = "emulator"))]
1518mod tests {
1519
    //! Currently, these tests rely on having a BigTable emulator running on the current machine.
    //! The tests connect to the address given by `BIGTABLE_EMULATOR_HOST` (the emulator setup
    //! presumes localhost:8086; note that `new_client` falls back to localhost:8080 when the
    //! variable is unset). See docs/bigtable.md for details and how to set up and initialize
    //! an emulator.
    //!
1524    use std::sync::Arc;
1525    use std::time::SystemTime;
1526
1527    use cadence::StatsdClient;
1528    use uuid;
1529
1530    use super::*;
1531    use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1532
1533    const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
1534    const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
1535    const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1536
1537    fn now() -> u64 {
1538        SystemTime::now()
1539            .duration_since(SystemTime::UNIX_EPOCH)
1540            .unwrap()
1541            .as_secs()
1542    }
1543
1544    fn new_client() -> DbResult<BigTableClientImpl> {
1545        let env_dsn = format!(
1546            "grpc://{}",
1547            std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1548        );
1549        let settings = DbSettings {
1550            // this presumes the table was created with
1551            // ```
1552            // scripts/setup_bt.sh
1553            // ```
1554            // with `message`, `router`, and `message_topic` families
1555            dsn: Some(env_dsn),
1556            db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1557                .to_string(),
1558        };
1559
1560        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1561
1562        BigTableClientImpl::new(metrics, &settings)
1563    }
1564
1565    #[test]
1566    fn escape_bytes_for_regex() {
1567        let b = b"hi";
1568        assert_eq!(escape_bytes(b), b.to_vec());
1569        assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
1570        let b = b"\xe2\x80\xb3";
1571        assert_eq!(escape_bytes(b), b.to_vec());
1572        // clippy::octal-escapes rightly discourages this ("\022") in a byte literal
1573        let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
1574        assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
1575        let b = b"\xc0";
1576        assert_eq!(escape_bytes(b), b.to_vec());
1577        assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
1578    }
1579
1580    #[actix_rt::test]
1581    async fn health_check() {
1582        let client = new_client().unwrap();
1583
1584        let result = client.health_check().await;
1585        assert!(result.is_ok());
1586        assert!(result.unwrap());
1587    }
1588
1589    /// run a gauntlet of testing. These are a bit linear because they need
1590    /// to run in sequence.
1591    #[actix_rt::test]
1592    async fn run_gauntlet() -> DbResult<()> {
1593        let client = new_client()?;
1594
1595        let connected_at = ms_since_epoch();
1596
1597        let uaid = Uuid::parse_str(TEST_USER).unwrap();
1598        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1599        let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();
1600
1601        let node_id = "test_node".to_owned();
1602
1603        // purge the user record if it exists.
1604        let _ = client.remove_user(&uaid).await;
1605
1606        let test_user = User {
1607            uaid,
1608            router_type: "webpush".to_owned(),
1609            connected_at,
1610            router_data: None,
1611            node_id: Some(node_id.clone()),
1612            ..Default::default()
1613        };
1614
1615        // purge the old user (if present)
1616        // in case a prior test failed for whatever reason.
1617        let _ = client.remove_user(&uaid).await;
1618
1619        // can we add the user?
1620        client.add_user(&test_user).await?;
1621        let fetched = client.get_user(&uaid).await?;
1622        assert!(fetched.is_some());
1623        let fetched = fetched.unwrap();
1624        assert_eq!(fetched.router_type, "webpush".to_owned());
1625
1626        // Simulate a connected_at occuring before the following writes
1627        let connected_at = ms_since_epoch();
1628
1629        // can we add channels?
1630        client.add_channel(&uaid, &chid).await?;
1631        let channels = client.get_channels(&uaid).await?;
1632        assert!(channels.contains(&chid));
1633
1634        // can we add lots of channels?
1635        let mut new_channels: HashSet<Uuid> = HashSet::new();
1636        new_channels.insert(chid);
1637        for _ in 1..10 {
1638            new_channels.insert(uuid::Uuid::new_v4());
1639        }
1640        let chid_to_remove = uuid::Uuid::new_v4();
1641        new_channels.insert(chid_to_remove);
1642        client.add_channels(&uaid, new_channels.clone()).await?;
1643        let channels = client.get_channels(&uaid).await?;
1644        assert_eq!(channels, new_channels);
1645
1646        // can we remove a channel?
1647        assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
1648        assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
1649        new_channels.remove(&chid_to_remove);
1650        let channels = client.get_channels(&uaid).await?;
1651        assert_eq!(channels, new_channels);
1652
1653        // now ensure that we can update a user that's after the time we set
1654        // prior. first ensure that we can't update a user that's before the
1655        // time we set prior to the last write
1656        let mut updated = User {
1657            connected_at,
1658            ..test_user.clone()
1659        };
1660        let result = client.update_user(&mut updated).await;
1661        assert!(result.is_ok());
1662        assert!(!result.unwrap());
1663
1664        // Make sure that the `connected_at` wasn't modified
1665        let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
1666        assert_eq!(fetched.connected_at, fetched2.connected_at);
1667
1668        // and make sure we can update a record with a later connected_at time.
1669        let mut updated = User {
1670            connected_at: fetched.connected_at + 300,
1671            ..fetched2
1672        };
1673        let result = client.update_user(&mut updated).await;
1674        assert!(result.is_ok());
1675        assert!(result.unwrap());
1676        assert_ne!(
1677            fetched2.connected_at,
1678            client.get_user(&uaid).await?.unwrap().connected_at
1679        );
1680
1681        // can we increment the storage for the user?
1682        client
1683            .increment_storage(
1684                &fetched.uaid,
1685                SystemTime::now()
1686                    .duration_since(SystemTime::UNIX_EPOCH)
1687                    .unwrap()
1688                    .as_secs(),
1689            )
1690            .await?;
1691
1692        let test_data = "An_encrypted_pile_of_crap".to_owned();
1693        let timestamp = now();
1694        let sort_key = now();
1695        // Can we store a message?
1696        let test_notification = crate::db::Notification {
1697            channel_id: chid,
1698            version: "test".to_owned(),
1699            ttl: 300,
1700            timestamp,
1701            data: Some(test_data.clone()),
1702            sortkey_timestamp: Some(sort_key),
1703            ..Default::default()
1704        };
1705        let res = client.save_message(&uaid, test_notification.clone()).await;
1706        assert!(res.is_ok());
1707
1708        let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
1709        assert_ne!(fetched.messages.len(), 0);
1710        let fm = fetched.messages.pop().unwrap();
1711        assert_eq!(fm.channel_id, test_notification.channel_id);
1712        assert_eq!(fm.data, Some(test_data));
1713
1714        // Grab all 1 of the messages that were submmited within the past 10 seconds.
1715        let fetched = client
1716            .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
1717            .await?;
1718        assert_ne!(fetched.messages.len(), 0);
1719
1720        // Try grabbing a message for 10 seconds from now.
1721        let fetched = client
1722            .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
1723            .await?;
1724        assert_eq!(fetched.messages.len(), 0);
1725
1726        // can we clean up our toys?
1727        assert!(client
1728            .remove_message(&uaid, &test_notification.chidmessageid())
1729            .await
1730            .is_ok());
1731
1732        assert!(client.remove_channel(&uaid, &chid).await.is_ok());
1733
1734        // Now, can we do all that with topic messages
1735        client.add_channel(&uaid, &topic_chid).await?;
1736        let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
1737        let timestamp = now();
1738        let sort_key = now();
1739        // Can we store a message?
1740        let test_notification = crate::db::Notification {
1741            channel_id: topic_chid,
1742            version: "test".to_owned(),
1743            ttl: 300,
1744            topic: Some("topic".to_owned()),
1745            timestamp,
1746            data: Some(test_data.clone()),
1747            sortkey_timestamp: Some(sort_key),
1748            ..Default::default()
1749        };
1750        assert!(client
1751            .save_message(&uaid, test_notification.clone())
1752            .await
1753            .is_ok());
1754
1755        let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
1756        assert_ne!(fetched.messages.len(), 0);
1757        let fm = fetched.messages.pop().unwrap();
1758        assert_eq!(fm.channel_id, test_notification.channel_id);
1759        assert_eq!(fm.data, Some(test_data));
1760
1761        // Grab the message that was submmited.
1762        let fetched = client.fetch_topic_messages(&uaid, 999).await?;
1763        assert_ne!(fetched.messages.len(), 0);
1764
1765        // can we clean up our toys?
1766        assert!(client
1767            .remove_message(&uaid, &test_notification.chidmessageid())
1768            .await
1769            .is_ok());
1770
1771        assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());
1772
1773        let msgs = client
1774            .fetch_timestamp_messages(&uaid, None, 999)
1775            .await?
1776            .messages;
1777        assert!(msgs.is_empty());
1778
1779        let fetched = client.get_user(&uaid).await?.unwrap();
1780        assert!(client
1781            .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
1782            .await
1783            .is_ok());
1784        // did we remove it?
1785        let fetched = client.get_user(&uaid).await?.unwrap();
1786        assert_eq!(fetched.node_id, None);
1787
1788        assert!(client.remove_user(&uaid).await.is_ok());
1789
1790        assert!(client.get_user(&uaid).await?.is_none());
1791
1792        Ok(())
1793    }
1794
1795    #[actix_rt::test]
1796    async fn read_cells_family_id() -> DbResult<()> {
1797        let client = new_client().unwrap();
1798        let uaid = gen_test_uaid();
1799        client.remove_user(&uaid).await.unwrap();
1800
1801        let qualifier = "foo".to_owned();
1802
1803        let row_key = uaid.simple().to_string();
1804        let mut row = Row::new(row_key.clone());
1805        row.cells.insert(
1806            ROUTER_FAMILY.to_owned(),
1807            vec![cell::Cell {
1808                qualifier: qualifier.to_owned(),
1809                value: "bar".as_bytes().to_vec(),
1810                ..Default::default()
1811            }],
1812        );
1813        client.write_row(row).await.unwrap();
1814        let req = client.read_row_request(&row_key);
1815        let Some(row) = client.read_row(req).await.unwrap() else {
1816            panic!("Expected row");
1817        };
1818        assert_eq!(row.cells.len(), 1);
1819        assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1820        client.remove_user(&uaid).await
1821    }
1822
1823    #[actix_rt::test]
1824    async fn add_user_existing() {
1825        let client = new_client().unwrap();
1826        let uaid = gen_test_uaid();
1827        let user = User {
1828            uaid,
1829            ..Default::default()
1830        };
1831        client.remove_user(&uaid).await.unwrap();
1832
1833        client.add_user(&user).await.unwrap();
1834        let err = client.add_user(&user).await.unwrap_err();
1835        assert!(matches!(err, DbError::Conditional));
1836    }
1837
1838    #[actix_rt::test]
1839    async fn version_check() {
1840        let client = new_client().unwrap();
1841        let uaid = gen_test_uaid();
1842        let user = User {
1843            uaid,
1844            ..Default::default()
1845        };
1846        client.remove_user(&uaid).await.unwrap();
1847
1848        client.add_user(&user).await.unwrap();
1849        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1850        assert!(client.update_user(&mut user.clone()).await.unwrap());
1851
1852        let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1853        assert_ne!(user.version, fetched.version);
1854        // should now fail w/ a stale version
1855        assert!(!client.update_user(&mut user).await.unwrap());
1856
1857        client.remove_user(&uaid).await.unwrap();
1858    }
1859
1860    #[actix_rt::test]
1861    async fn lingering_chid_record() {
1862        let client = new_client().unwrap();
1863        let uaid = gen_test_uaid();
1864        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1865        let user = User {
1866            uaid,
1867            ..Default::default()
1868        };
1869        client.remove_user(&uaid).await.unwrap();
1870
1871        // add_channel doesn't check for the existence of a user
1872        client.add_channel(&uaid, &chid).await.unwrap();
1873
1874        // w/ chid records in the router row, get_user should treat
1875        // this as the user not existing
1876        assert!(client.get_user(&uaid).await.unwrap().is_none());
1877
1878        client.add_user(&user).await.unwrap();
1879        // get_user should have also cleaned up the chids
1880        assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1881
1882        client.remove_user(&uaid).await.unwrap();
1883    }
1884
1885    #[actix_rt::test]
1886    async fn lingering_current_timestamp() {
1887        let client = new_client().unwrap();
1888        let uaid = gen_test_uaid();
1889        client.remove_user(&uaid).await.unwrap();
1890
1891        client
1892            .increment_storage(&uaid, ms_since_epoch())
1893            .await
1894            .unwrap();
1895        assert!(client.get_user(&uaid).await.unwrap().is_none());
1896
1897        client.remove_user(&uaid).await.unwrap();
1898    }
1899
1900    #[actix_rt::test]
1901    async fn lingering_chid_w_version_record() {
1902        let client = new_client().unwrap();
1903        let uaid = gen_test_uaid();
1904        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1905        client.remove_user(&uaid).await.unwrap();
1906
1907        client.add_channel(&uaid, &chid).await.unwrap();
1908        assert!(client.remove_channel(&uaid, &chid).await.unwrap());
1909        assert!(client.get_user(&uaid).await.unwrap().is_none());
1910
1911        client.remove_user(&uaid).await.unwrap();
1912    }
1913
1914    #[actix_rt::test]
1915    async fn channel_and_current_timestamp_ttl_updates() {
1916        let client = new_client().unwrap();
1917        let uaid = gen_test_uaid();
1918        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1919        client.remove_user(&uaid).await.unwrap();
1920
1921        // Setup a user with some channels and a current_timestamp
1922        let user = User {
1923            uaid,
1924            ..Default::default()
1925        };
1926        client.add_user(&user).await.unwrap();
1927
1928        client.add_channel(&uaid, &chid).await.unwrap();
1929        client
1930            .add_channel(&uaid, &uuid::Uuid::new_v4())
1931            .await
1932            .unwrap();
1933
1934        client
1935            .increment_storage(
1936                &uaid,
1937                SystemTime::now()
1938                    .duration_since(SystemTime::UNIX_EPOCH)
1939                    .unwrap()
1940                    .as_secs(),
1941            )
1942            .await
1943            .unwrap();
1944
1945        let req = client.read_row_request(&uaid.as_simple().to_string());
1946        let Some(mut row) = client.read_row(req).await.unwrap() else {
1947            panic!("Expected row");
1948        };
1949
1950        // Ensure the initial cell expiry (timestamp) of all the cells
1951        // in the row has been updated
1952        let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
1953        for mut cells in row.cells.into_values() {
1954            let Some(cell) = cells.pop() else {
1955                continue;
1956            };
1957            assert!(
1958                cell.timestamp >= ca_expiry,
1959                "{} cell timestamp should >= connected_at's",
1960                cell.qualifier
1961            );
1962        }
1963
1964        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1965
1966        // Quick nap to make sure that the ca_expiry values are different.
1967        tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
1968        client.update_user(&mut user).await.unwrap();
1969
1970        // Ensure update_user updated the expiry (timestamp) of every cell in the row
1971        let req = client.read_row_request(&uaid.as_simple().to_string());
1972        let Some(mut row) = client.read_row(req).await.unwrap() else {
1973            panic!("Expected row");
1974        };
1975
1976        let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
1977
1978        assert!(ca_expiry2 > ca_expiry);
1979
1980        for mut cells in row.cells.into_values() {
1981            let Some(cell) = cells.pop() else {
1982                continue;
1983            };
1984            assert!(
1985                cell.timestamp >= ca_expiry2,
1986                "{} cell timestamp expiry should exceed connected_at's",
1987                cell.qualifier
1988            );
1989        }
1990
1991        client.remove_user(&uaid).await.unwrap();
1992    }
1993}