autopush_common/db/bigtable/bigtable_client/
mod.rs

1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8
9use again::RetryPolicy;
10use async_trait::async_trait;
11use cadence::StatsdClient;
12use chrono::TimeDelta;
13use futures_util::StreamExt;
14use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
15use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
16use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
17use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
18use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
19use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
20use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
21use protobuf::RepeatedField;
22use serde_json::{from_str, json};
23use uuid::Uuid;
24
25use crate::db::RangeKey;
26use crate::db::{
27    client::{DbClient, FetchMessageResponse},
28    error::{DbError, DbResult},
29    DbSettings, Notification, User, USER_RECORD_VERSION,
30};
31use crate::metric_name::MetricName;
32use crate::metrics::StatsdClientExt;
33use crate::MAX_ROUTER_TTL_SECS;
34
35pub use self::metadata::MetadataBuilder;
36use self::row::{Row, RowCells};
37use super::pool::BigTablePool;
38use super::BigTableDbSettings;
39
40pub mod cell;
41pub mod error;
42pub(crate) mod merge;
43pub mod metadata;
44pub mod row;
45
46// these are normally Vec<u8>
47pub type RowKey = String;
48
49// These are more for code clarity than functional types.
50// Rust will happily swap between the two in any case.
51// See [super::row::Row] for discussion about how these
52// are overloaded in order to simplify fetching data.
53pub type Qualifier = String;
54pub type FamilyId = String;
55
56const ROUTER_FAMILY: &str = "router";
57const MESSAGE_FAMILY: &str = "message"; // The default family for messages
58const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
59#[cfg(feature = "reliable_report")]
60const RELIABLE_LOG_FAMILY: &str = "reliability";
61#[cfg(feature = "reliable_report")]
62/// The maximum TTL for reliability logging (60 days).
/// In most use cases, converted to seconds through `.num_seconds()`.
64pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);
65
66pub(crate) const RETRY_COUNT: usize = 5;
67
68/// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently.
69// TODO:Should we create something similar for ChannelID?
70struct Uaid(Uuid);
71
72impl Display for Uaid {
73    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
74        write!(f, "{}", self.0.as_simple())
75    }
76}
77
78impl From<Uaid> for String {
79    fn from(uaid: Uaid) -> String {
80        uaid.0.as_simple().to_string()
81    }
82}
83
84#[derive(Clone)]
85/// Wrapper for the BigTable connection
86pub struct BigTableClientImpl {
87    pub(crate) settings: BigTableDbSettings,
88    /// Metrics client
89    metrics: Arc<StatsdClient>,
90    /// Connection Channel (used for alternate calls)
91    pool: BigTablePool,
92    metadata: Metadata,
93    admin_metadata: Metadata,
94}
95
96/// Return a a RowFilter matching the GC policy of the router Column Family
97fn router_gc_policy_filter() -> data::RowFilter {
98    let mut latest_cell_filter = data::RowFilter::default();
99    latest_cell_filter.set_cells_per_column_limit_filter(1);
100    latest_cell_filter
101}
102
103/// Return a chain of RowFilters matching the GC policy of the message Column
104/// Families
105fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
106    let mut timestamp_filter = data::RowFilter::default();
107    let bt_now: i64 = SystemTime::now()
108        .duration_since(SystemTime::UNIX_EPOCH)
109        .map_err(error::BigTableError::WriteTime)?
110        .as_millis() as i64;
111    let mut range_filter = data::TimestampRange::default();
112    range_filter.set_start_timestamp_micros(bt_now * 1000);
113    timestamp_filter.set_timestamp_range_filter(range_filter);
114
115    Ok(vec![router_gc_policy_filter(), timestamp_filter])
116}
117
118/// Return a Column family regex RowFilter
119fn family_filter(regex: String) -> data::RowFilter {
120    let mut filter = data::RowFilter::default();
121    filter.set_family_name_regex_filter(regex);
122    filter
123}
124
/// Escape bytes for RE values
///
/// Based off google-re2/perl's quotemeta function:
///
/// * ASCII alphanumerics, `_` and bytes with the high bit set pass through
///   untouched,
/// * the NUL byte becomes the four bytes `\x00`,
/// * every other byte is prefixed with a backslash.
fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
    let mut escaped = Vec::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        match byte {
            // Special handling for null: not strictly required for RE2, but
            // needed for other regexp libraries such as PCRE. "\\0" can't be
            // used since the next character might be a digit.
            b'\0' => escaped.extend_from_slice(b"\\x00"),
            b'_' | b'0'..=b'9' | b'a'..=b'z' | b'A'..=b'Z' | 0x80..=0xFF => escaped.push(byte),
            _ => escaped.extend_from_slice(&[b'\\', byte]),
        }
    }
    escaped
}
146
147/// Return a chain of RowFilters limiting to a match of the specified
148/// `version`'s column value
149fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
150    let mut cq_filter = data::RowFilter::default();
151    cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());
152
153    let mut value_filter = data::RowFilter::default();
154    value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));
155
156    vec![
157        family_filter(format!("^{ROUTER_FAMILY}$")),
158        cq_filter,
159        value_filter,
160    ]
161}
162
163/// Return a newly generated `version` column `Cell`
164fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
165    cell::Cell {
166        qualifier: "version".to_owned(),
167        value: Uuid::new_v4().into(),
168        timestamp,
169        ..Default::default()
170    }
171}
172
173/// Return a RowFilter chain from multiple RowFilters
174fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
175    let mut chain = RowFilter_Chain::default();
176    chain.set_filters(filters.into());
177    let mut filter = RowFilter::default();
178    filter.set_chain(chain);
179    filter
180}
181
182/// Return a ReadRowsRequest against table for a given row key
183fn read_row_request(
184    table_name: &str,
185    app_profile_id: &str,
186    row_key: &str,
187) -> bigtable::ReadRowsRequest {
188    let mut req = bigtable::ReadRowsRequest::default();
189    req.set_table_name(table_name.to_owned());
190    req.set_app_profile_id(app_profile_id.to_owned());
191
192    let mut row_keys = RepeatedField::default();
193    row_keys.push(row_key.as_bytes().to_vec());
194    let mut row_set = data::RowSet::default();
195    row_set.set_row_keys(row_keys);
196    req.set_rows(row_set);
197
198    req
199}
200
201fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
202    let v: [u8; 8] = value
203        .try_into()
204        .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
205    Ok(u64::from_be_bytes(v))
206}
207
208fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
209    String::from_utf8(value).map_err(|e| {
210        debug!("🉑 cannot read string {}: {:?}", name, e);
211        DbError::DeserializeString(name.to_owned())
212    })
213}
214
215/// Parse the "set" (see [DbClient::add_channels]) of channel ids in a bigtable Row.
216///
217/// Cells should solely contain the set of channels otherwise an Error is returned.
218fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
219    let mut result = HashSet::new();
220    for cells in cells.values() {
221        let Some(cell) = cells.last() else {
222            continue;
223        };
224        let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
225            return Err(DbError::Integrity(
226                "get_channels expected: chid:<chid>".to_owned(),
227                None,
228            ));
229        };
230        result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
231    }
232    Ok(result)
233}
234
235/// Convert the [HashSet] of channel ids to cell entries for a bigtable Row
236fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
237    let channels = channels.into_owned();
238    let mut cells = Vec::with_capacity(channels.len().min(100_000));
239    for (i, channel_id) in channels.into_iter().enumerate() {
240        // There is a limit of 100,000 mutations per batch for bigtable.
241        // https://cloud.google.com/bigtable/quotas
242        // If you have 100,000 channels, you have too many.
243        if i >= 100_000 {
244            break;
245        }
246        cells.push(cell::Cell {
247            qualifier: format!("chid:{}", channel_id.as_hyphenated()),
248            timestamp: expiry,
249            ..Default::default()
250        });
251    }
252    cells
253}
254
255pub fn retry_policy(max: usize) -> RetryPolicy {
256    RetryPolicy::default()
257        .with_max_retries(max)
258        .with_jitter(true)
259}
260
261fn retryable_internal_err(status: &RpcStatus) -> bool {
262    match status.code() {
263        RpcStatusCode::UNKNOWN => status
264            .message()
265            .eq_ignore_ascii_case("error occurred when fetching oauth2 token."),
266        RpcStatusCode::INTERNAL => [
267            "rst_stream",
268            "rst stream",
269            "received unexpected eos on data frame from server",
270        ]
271        .contains(&status.message().to_lowercase().as_str()),
272        RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
273        _ => false,
274    }
275}
276
277pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
278    let mut metric = metrics
279        .incr_with_tags(MetricName::DatabaseRetry)
280        .with_tag("error", err_type)
281        .with_tag("type", "bigtable");
282    if let Some(code) = code {
283        metric = metric.with_tag("code", code);
284    }
285    metric.send();
286}
287
288pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool + '_ {
289    move |err| {
290        debug!("🉑 Checking grpcio::Error...{err}");
291        match err {
292            grpcio::Error::RpcFailure(status) => {
293                info!("GRPC Failure :{:?}", status);
294                let retry = retryable_internal_err(status);
295                if retry {
296                    metric(metrics, "RpcFailure", Some(&status.code().to_string()));
297                }
298                retry
299            }
300            grpcio::Error::BindFail(_) => {
301                metric(metrics, "BindFail", None);
302                true
303            }
304            // The parameter here is a [grpcio_sys::grpc_call_error] enum
305            // Not all of these are retryable.
306            grpcio::Error::CallFailure(grpc_call_status) => {
307                let retry = grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR;
308                if retry {
309                    metric(
310                        metrics,
311                        "CallFailure",
312                        Some(&format!("{grpc_call_status:?}")),
313                    );
314                }
315                retry
316            }
317            _ => false,
318        }
319    }
320}
321
322pub fn retryable_bt_err(
323    metrics: &Arc<StatsdClient>,
324) -> impl Fn(&error::BigTableError) -> bool + '_ {
325    move |err| {
326        debug!("🉑 Checking BigTableError...{err}");
327        match err {
328            error::BigTableError::InvalidRowResponse(e)
329            | error::BigTableError::Read(e)
330            | error::BigTableError::Write(e)
331            | error::BigTableError::GRPC(e) => retryable_grpcio_err(metrics)(e),
332            _ => false,
333        }
334    }
335}
336
337/// Determine if a router record is "incomplete" (doesn't include [User]
338/// columns):
339///
340/// They can be incomplete for a couple reasons:
341///
342/// 1) A migration code bug caused a few incomplete migrations where
343///    `add_channels` and `increment_storage` calls occurred when the migration's
344///    initial `add_user` was never completed:
345///    https://github.com/mozilla-services/autopush-rs/pull/640
346///
347/// 2) When router TTLs are eventually enabled: `add_channel` and
348///    `increment_storage` can write cells with later expiry times than the other
349///    router cells
350fn is_incomplete_router_record(cells: &RowCells) -> bool {
351    cells
352        .keys()
353        .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
354}
355
356fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
357    ::grpcio::CallOption::default().headers(metadata)
358}
359
360/// Connect to a BigTable storage model.
361///
362/// BigTable is available via the Google Console, and is a schema less storage system.
363///
364/// The `db_dsn` string should be in the form of
365/// `grpc://{BigTableEndpoint}`
366///
367/// The settings contains the `table_name` which is the GRPC path to the data.
368/// (e.g. `projects/{project_id}/instances/{instance_id}/tables/{table_id}`)
369///
370/// where:
371/// _BigTableEndpoint_ is the endpoint domain to use (the default is `bigtable.googleapis.com`) See
372/// [BigTable Endpoints](https://cloud.google.com/bigtable/docs/regional-endpoints) for more details.
373/// _project-id_ is the Google project identifier (see the Google developer console (e.g. 'autopush-dev'))
374/// _instance-id_ is the Google project instance, (see the Google developer console (e.g. 'development-1'))
375/// _table_id_ is the Table Name (e.g. 'autopush')
376///
377/// This will automatically bring in the default credentials specified by the `GOOGLE_APPLICATION_CREDENTIALS`
378/// environment variable.
379///
380/// NOTE: Some configurations may look for the default credential file (pointed to by
381/// `GOOGLE_APPLICATION_CREDENTIALS`) to be stored in
382/// `$HOME/.config/gcloud/application_default_credentials.json`)
383///
384impl BigTableClientImpl {
385    pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
386        // let env = Arc::new(EnvBuilder::new().build());
387        debug!("🏊 BT Pool new");
388        let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
389        info!("🉑 {:#?}", db_settings);
390        let pool = BigTablePool::new(settings, &metrics)?;
391
392        // create the metadata header blocks required by Google for accessing GRPC resources.
393        let metadata = db_settings.metadata()?;
394        let admin_metadata = db_settings.admin_metadata()?;
395        Ok(Self {
396            settings: db_settings,
397            metrics,
398            metadata,
399            admin_metadata,
400            pool,
401        })
402    }
403
404    fn router_ttl(&self) -> Duration {
405        self.settings
406            .max_router_ttl
407            .unwrap_or(Duration::from_secs(MAX_ROUTER_TTL_SECS))
408    }
409
410    /// Spawn a task to periodically evict idle connections
411    pub fn spawn_sweeper(&self, interval: Duration) {
412        self.pool.spawn_sweeper(interval);
413    }
414
415    /// Return a ReadRowsRequest for a given row key
416    fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
417        read_row_request(
418            &self.settings.table_name,
419            &self.settings.app_profile_id,
420            row_key,
421        )
422    }
423
424    /// Return a MutateRowRequest for a given row key
425    fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
426        let mut req = bigtable::MutateRowRequest::default();
427        req.set_table_name(self.settings.table_name.clone());
428        req.set_app_profile_id(self.settings.app_profile_id.clone());
429        req.set_row_key(row_key.as_bytes().to_vec());
430        req
431    }
432
433    /// Return a CheckAndMutateRowRequest for a given row key
434    fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
435        let mut req = bigtable::CheckAndMutateRowRequest::default();
436        req.set_table_name(self.settings.table_name.clone());
437        req.set_app_profile_id(self.settings.app_profile_id.clone());
438        req.set_row_key(row_key.as_bytes().to_vec());
439        req
440    }
441
442    /// Read a given row from the row key.
443    async fn mutate_row(
444        &self,
445        req: bigtable::MutateRowRequest,
446    ) -> Result<(), error::BigTableError> {
447        let bigtable = self.pool.get().await?;
448        retry_policy(self.settings.retry_count)
449            .retry_if(
450                || async {
451                    bigtable
452                        .conn
453                        .mutate_row_opt(&req, call_opts(self.metadata.clone()))
454                },
455                retryable_grpcio_err(&self.metrics),
456            )
457            .await
458            .map_err(error::BigTableError::Write)?;
459        Ok(())
460    }
461
462    /// Perform a MutateRowsRequest
463    #[allow(unused)]
464    async fn mutate_rows(
465        &self,
466        req: bigtable::MutateRowsRequest,
467    ) -> Result<(), error::BigTableError> {
468        let bigtable = self.pool.get().await?;
469        // ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
470        let resp = retry_policy(self.settings.retry_count)
471            .retry_if(
472                || async {
473                    bigtable
474                        .conn
475                        .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
476                },
477                retryable_grpcio_err(&self.metrics),
478            )
479            .await
480            .map_err(error::BigTableError::Write)?;
481
482        // Scan the returned stream looking for errors.
483        // As I understand, the returned stream contains chunked MutateRowsResponse structs. Each
484        // struct contains the result of the row mutation, and contains a `status` (non-zero on error)
485        // and an optional message string (empty if none).
486        // The structure also contains an overall `status` but that does not appear to be exposed.
487        // Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html
488        let mut stream = Box::pin(resp);
489        let mut cnt = 0;
490        loop {
491            let (result, remainder) = StreamExt::into_future(stream).await;
492            if let Some(result) = result {
493                debug!("🎏 Result block: {}", cnt);
494                match result {
495                    Ok(r) => {
496                        for e in r.get_entries() {
497                            if e.has_status() {
498                                let status = e.get_status();
499                                // See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
500                                let code = error::MutateRowStatus::from(status.get_code());
501                                if !code.is_ok() {
502                                    return Err(error::BigTableError::Status(
503                                        code,
504                                        status.get_message().to_owned(),
505                                    ));
506                                }
507                                debug!("🎏 Response: {} OK", e.index);
508                            }
509                        }
510                    }
511                    Err(e) => return Err(error::BigTableError::Write(e)),
512                };
513                cnt += 1;
514            } else {
515                debug!("🎏 Done!");
516                break;
517            }
518            stream = remainder;
519        }
520
521        Ok(())
522    }
523
524    /// Read one row for the [ReadRowsRequest] (assuming only a single row was requested).
525    async fn read_row(
526        &self,
527        req: bigtable::ReadRowsRequest,
528    ) -> Result<Option<row::Row>, error::BigTableError> {
529        let mut rows = self.read_rows(req).await?;
530        Ok(rows.pop_first().map(|(_, v)| v))
531    }
532
533    /// Take a big table ReadRowsRequest (containing the keys and filters) and return a set of row data indexed by row key.
534    ///
535    ///
536    async fn read_rows(
537        &self,
538        req: ReadRowsRequest,
539    ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
540        let bigtable = self.pool.get().await?;
541        let resp = retry_policy(self.settings.retry_count)
542            .retry_if(
543                || async {
544                    let resp: grpcio::ClientSStreamReceiver<bigtable::ReadRowsResponse> = bigtable
545                        .conn
546                        .read_rows_opt(&req, call_opts(self.metadata.clone()))
547                        .map_err(error::BigTableError::Read)?;
548                    merge::RowMerger::process_chunks(resp).await
549                },
550                retryable_bt_err(&self.metrics),
551            )
552            .await?;
553        Ok(resp)
554    }
555
556    /// write a given row.
557    ///
558    /// there's also `.mutate_rows` which I presume allows multiple.
559    async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
560        let mut req = self.mutate_row_request(&row.row_key);
561        // compile the mutations.
562        // It's possible to do a lot here, including altering in process
563        // mutations, clearing them, etc. It's all up for grabs until we commit
564        // below. For now, let's just presume a write and be done.
565        let mutations = self.get_mutations(row.cells)?;
566        req.set_mutations(mutations);
567        self.mutate_row(req).await?;
568        Ok(())
569    }
570
571    /// Compile the list of mutations for this row.
572    fn get_mutations(
573        &self,
574        cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
575    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
576        let mut mutations = protobuf::RepeatedField::default();
577        for (family_id, cells) in cells {
578            for cell in cells {
579                let mut mutation = data::Mutation::default();
580                let mut set_cell = data::Mutation_SetCell::default();
581                let timestamp = cell
582                    .timestamp
583                    .duration_since(SystemTime::UNIX_EPOCH)
584                    .map_err(error::BigTableError::WriteTime)?;
585                set_cell.family_name.clone_from(&family_id);
586                set_cell.set_column_qualifier(cell.qualifier.clone().into_bytes());
587                set_cell.set_value(cell.value);
588                // Yes, this is passing milli bounded time as a micro. Otherwise I get
589                // a `Timestamp granularity mismatch` error
590                set_cell.set_timestamp_micros((timestamp.as_millis() * 1000) as i64);
591                debug!("🉑 expiring in {:?}", timestamp.as_millis());
592                mutation.set_set_cell(set_cell);
593                mutations.push(mutation);
594            }
595        }
596        Ok(mutations)
597    }
598
599    /// Write mutations if the row meets a condition specified by the filter.
600    ///
601    /// Mutations can be applied either when the filter matches (state `true`)
602    /// or doesn't match (state `false`).
603    ///
604    /// Returns whether the filter matched records (which indicates whether the
605    /// mutations were applied, depending on the state)
606    async fn check_and_mutate_row(
607        &self,
608        row: row::Row,
609        filter: RowFilter,
610        state: bool,
611    ) -> Result<bool, error::BigTableError> {
612        let mut req = self.check_and_mutate_row_request(&row.row_key);
613        let mutations = self.get_mutations(row.cells)?;
614        req.set_predicate_filter(filter);
615        if state {
616            req.set_true_mutations(mutations);
617        } else {
618            req.set_false_mutations(mutations);
619        }
620        self.check_and_mutate(req).await
621    }
622
623    async fn check_and_mutate(
624        &self,
625        req: bigtable::CheckAndMutateRowRequest,
626    ) -> Result<bool, error::BigTableError> {
627        let bigtable = self.pool.get().await?;
628        let resp = retry_policy(self.settings.retry_count)
629            .retry_if(
630                || async {
631                    // Note: check_and_mutate_row_async may return before the row
632                    // is written, which can cause race conditions for reads
633                    bigtable
634                        .conn
635                        .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
636                },
637                retryable_grpcio_err(&self.metrics),
638            )
639            .await
640            .map_err(error::BigTableError::Write)?;
641        debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
642        Ok(resp.get_predicate_matched())
643    }
644
645    fn get_delete_mutations(
646        &self,
647        family: &str,
648        column_names: &[&str],
649        time_range: Option<&data::TimestampRange>,
650    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
651        let mut mutations = protobuf::RepeatedField::default();
652        for column in column_names {
653            let mut mutation = data::Mutation::default();
654            // Mutation_DeleteFromRow -- Delete all cells for a given row.
655            // Mutation_DeleteFromFamily -- Delete all cells from a family for a given row.
656            // Mutation_DeleteFromColumn -- Delete all cells from a column name for a given row, restricted by timestamp range.
657            let mut del_cell = data::Mutation_DeleteFromColumn::default();
658            del_cell.set_family_name(family.to_owned());
659            del_cell.set_column_qualifier(column.as_bytes().to_vec());
660            if let Some(range) = time_range {
661                del_cell.set_time_range(range.clone());
662            }
663            mutation.set_delete_from_column(del_cell);
664            mutations.push(mutation);
665        }
666        Ok(mutations)
667    }
668
669    #[allow(unused)]
670    /// Delete all cell data from the specified columns with the optional time range.
671    #[allow(unused)]
672    async fn delete_cells(
673        &self,
674        row_key: &str,
675        family: &str,
676        column_names: &[&str],
677        time_range: Option<&data::TimestampRange>,
678    ) -> Result<(), error::BigTableError> {
679        let mut req = self.mutate_row_request(row_key);
680        req.set_mutations(self.get_delete_mutations(family, column_names, time_range)?);
681        self.mutate_row(req).await
682    }
683
684    /// Delete all the cells for the given row. NOTE: This will drop the row.
685    async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
686        let mut req = self.mutate_row_request(row_key);
687        let mut mutations = protobuf::RepeatedField::default();
688        let mut mutation = data::Mutation::default();
689        mutation.set_delete_from_row(data::Mutation_DeleteFromRow::default());
690        mutations.push(mutation);
691        req.set_mutations(mutations);
692        self.mutate_row(req).await
693    }
694
695    /// This uses the admin interface to drop row ranges.
696    /// This will drop ALL data associated with these rows.
697    /// Note that deletion may take up to a week to occur.
698    /// see https://cloud.google.com/php/docs/reference/cloud-bigtable/latest/Admin.V2.DropRowRangeRequest
699    #[allow(unused)]
700    async fn delete_rows(&self, row_key: &str) -> Result<bool, error::BigTableError> {
701        let admin = BigtableTableAdminClient::new(self.pool.get_channel()?);
702        let mut req = DropRowRangeRequest::new();
703        req.set_name(self.settings.table_name.clone());
704        req.set_row_key_prefix(row_key.as_bytes().to_vec());
705
706        admin
707            .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone()))
708            .map_err(|e| {
709                error!("{:?}", e);
710                error::BigTableError::Admin(
711                    format!(
712                        "Could not send delete command for {}",
713                        &self.settings.table_name
714                    ),
715                    Some(e.to_string()),
716                )
717            })?
718            .await
719            .map_err(|e| {
720                error!("post await: {:?}", e);
721                error::BigTableError::Admin(
722                    format!(
723                        "Could not delete data from table {}",
724                        &self.settings.table_name
725                    ),
726                    Some(e.to_string()),
727                )
728            })?;
729
730        Ok(true)
731    }
732
733    fn rows_to_notifications(
734        &self,
735        rows: BTreeMap<String, Row>,
736    ) -> Result<Vec<Notification>, DbError> {
737        rows.into_iter()
738            .map(|(row_key, row)| self.row_to_notification(&row_key, row))
739            .collect()
740    }
741
742    fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
743        let Some((_, chidmessageid)) = row_key.split_once('#') else {
744            return Err(DbError::Integrity(
745                "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
746                None,
747            ));
748        };
749        let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
750            DbError::Integrity(
751                format!("rows_to_notification expected chidmessageid: {e}"),
752                None,
753            )
754        })?;
755
756        // Create from the known, required fields.
757        let mut notif = Notification {
758            channel_id: range_key.channel_id,
759            topic: range_key.topic,
760            sortkey_timestamp: range_key.sortkey_timestamp,
761            version: to_string(row.take_required_cell("version")?.value, "version")?,
762            ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
763            timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
764            ..Default::default()
765        };
766
767        // Backfill the Optional fields
768        if let Some(cell) = row.take_cell("data") {
769            notif.data = Some(to_string(cell.value, "data")?);
770        }
771        #[cfg(feature = "reliable_report")]
772        {
773            if let Some(cell) = row.take_cell("reliability_id") {
774                notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
775            }
776            if let Some(cell) = row.take_cell("reliable_state") {
777                notif.reliable_state = Some(
778                    crate::reliability::ReliabilityState::from_str(&to_string(
779                        cell.value,
780                        "reliable_state",
781                    )?)
782                    .map_err(|e| {
783                        DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
784                    })?,
785                );
786            }
787        }
788        if let Some(cell) = row.take_cell("headers") {
789            notif.headers = Some(
790                serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
791                    .map_err(|e| DbError::Serialization(e.to_string()))?,
792            );
793        }
794        if let Some(cell) = row.take_cell("reliability_id") {
795            trace!("🚣  Is reliable");
796            notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
797        }
798
799        trace!("🚣  Deserialized message row: {:?}", &notif);
800        Ok(notif)
801    }
802
803    /// Return a Row for writing from a [User] and a `version`
804    ///
805    /// `version` is specified as an argument (ignoring [User::version]) so
806    /// that [update_user] may specify a new version to write before modifying
807    /// the [User] struct
808    fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
809        let row_key = user.uaid.simple().to_string();
810        let mut row = Row::new(row_key);
811        let expiry = std::time::SystemTime::now() + self.router_ttl();
812
813        let mut cells: Vec<cell::Cell> = vec![
814            cell::Cell {
815                qualifier: "connected_at".to_owned(),
816                value: user.connected_at.to_be_bytes().to_vec(),
817                timestamp: expiry,
818                ..Default::default()
819            },
820            cell::Cell {
821                qualifier: "router_type".to_owned(),
822                value: user.router_type.clone().into_bytes(),
823                timestamp: expiry,
824                ..Default::default()
825            },
826            cell::Cell {
827                qualifier: "record_version".to_owned(),
828                value: user
829                    .record_version
830                    .unwrap_or(USER_RECORD_VERSION)
831                    .to_be_bytes()
832                    .to_vec(),
833                timestamp: expiry,
834                ..Default::default()
835            },
836            cell::Cell {
837                qualifier: "version".to_owned(),
838                value: (*version).into(),
839                timestamp: expiry,
840                ..Default::default()
841            },
842        ];
843
844        if let Some(router_data) = &user.router_data {
845            cells.push(cell::Cell {
846                qualifier: "router_data".to_owned(),
847                value: json!(router_data).to_string().as_bytes().to_vec(),
848                timestamp: expiry,
849                ..Default::default()
850            });
851        };
852        if let Some(current_timestamp) = user.current_timestamp {
853            cells.push(cell::Cell {
854                qualifier: "current_timestamp".to_owned(),
855                value: current_timestamp.to_be_bytes().to_vec(),
856                timestamp: expiry,
857                ..Default::default()
858            });
859        };
860        if let Some(node_id) = &user.node_id {
861            cells.push(cell::Cell {
862                qualifier: "node_id".to_owned(),
863                value: node_id.as_bytes().to_vec(),
864                timestamp: expiry,
865                ..Default::default()
866            });
867        };
868
869        cells.extend(channels_to_cells(
870            Cow::Borrowed(&user.priv_channels),
871            expiry,
872        ));
873
874        row.add_cells(ROUTER_FAMILY, cells);
875        row
876    }
877}
878
879#[derive(Clone)]
880pub struct BigtableDb {
881    pub(super) conn: BigtableClient,
882    pub(super) health_metadata: Metadata,
883    table_name: String,
884}
885
886impl BigtableDb {
887    pub fn new(channel: Channel, health_metadata: &Metadata, table_name: &str) -> Self {
888        Self {
889            conn: BigtableClient::new(channel),
890            health_metadata: health_metadata.clone(),
891            table_name: table_name.to_owned(),
892        }
893    }
894    /// Perform a simple connectivity check. This should return no actual results
895    /// but should verify that the connection is valid. We use this for the
896    /// Recycle check as well, so it has to be fairly low in the implementation
897    /// stack.
898    ///
899    ///
900    pub async fn health_check(
901        &mut self,
902        metrics: &Arc<StatsdClient>,
903        app_profile_id: &str,
904    ) -> Result<bool, error::BigTableError> {
905        // It is recommended that we pick a random key to perform the health check. Selecting
906        // a single key for all health checks causes a "hot tablet" to arise. The `PingAndWarm`
907        // is intended to be used prior to large updates and is not recommended for use in
908        // health checks.
909        // This health check is to see if the database is present, the response is not important
910        // other than it does not return an error.
911        let random_uaid = Uuid::new_v4().simple().to_string();
912        let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
913        let mut filter = data::RowFilter::default();
914        filter.set_block_all_filter(true);
915        req.set_filter(filter);
916        let _r = retry_policy(RETRY_COUNT)
917            .retry_if(
918                || async {
919                    self.conn
920                        .read_rows_opt(&req, call_opts(self.health_metadata.clone()))
921                },
922                retryable_grpcio_err(metrics),
923            )
924            .await
925            .map_err(error::BigTableError::Read)?;
926
927        debug!("🉑 health check");
928        Ok(true)
929    }
930}
931
932#[async_trait]
933impl DbClient for BigTableClientImpl {
934    /// add user to the database
935    async fn add_user(&self, user: &User) -> DbResult<()> {
936        trace!("🉑 Adding user");
937        let Some(ref version) = user.version else {
938            return Err(DbError::General(
939                "add_user expected a user version field".to_owned(),
940            ));
941        };
942        let row = self.user_to_row(user, version);
943
944        // Only add when the user doesn't already exist
945        let mut row_key_filter = RowFilter::default();
946        row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
947        let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
948
949        if self.check_and_mutate_row(row, filter, false).await? {
950            return Err(DbError::Conditional);
951        }
952        Ok(())
953    }
954
955    /// BigTable doesn't really have the concept of an "update". You simply write the data and
956    /// the individual cells create a new version. Depending on the garbage collection rules for
957    /// the family, these can either persist or be automatically deleted.
958    ///
959    /// NOTE: This function updates the key ROUTER records for a given UAID. It does this by
960    /// calling [BigTableClientImpl::user_to_row] which creates a new row with new `cell.timestamp` values set
961    /// to now + `MAX_ROUTER_TTL`. This function is called by mobile during the daily
962    /// [autoendpoint::routes::update_token_route] handling, and by desktop
963    /// [autoconnect-ws-sm::get_or_create_user]` which is called
964    /// during the `HELLO` handler. This should be enough to ensure that the ROUTER records
965    /// are properly refreshed for "lively" clients.
966    ///
967    /// NOTE: There is some, very small, potential risk that a desktop client that can
968    /// somehow remain connected the duration of MAX_ROUTER_TTL, may be dropped as not being
969    /// "lively".
970    async fn update_user(&self, user: &mut User) -> DbResult<bool> {
971        let Some(ref version) = user.version else {
972            return Err(DbError::General(
973                "update_user expected a user version field".to_owned(),
974            ));
975        };
976
977        let mut filters = vec![router_gc_policy_filter()];
978        filters.extend(version_filter(version));
979        let filter = filter_chain(filters);
980
981        let new_version = Uuid::new_v4();
982        // Always write a newly generated version
983        let row = self.user_to_row(user, &new_version);
984
985        let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
986        user.version = Some(new_version);
987        Ok(predicate_matched)
988    }
989
990    async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
991        let row_key = uaid.as_simple().to_string();
992        let mut req = self.read_row_request(&row_key);
993        let mut filters = vec![router_gc_policy_filter()];
994        filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
995        req.set_filter(filter_chain(filters));
996        let Some(mut row) = self.read_row(req).await? else {
997            return Ok(None);
998        };
999
1000        trace!("🉑 Found a record for {}", row_key);
1001
1002        let connected_at_cell = match row.take_required_cell("connected_at") {
1003            Ok(cell) => cell,
1004            Err(_) => {
1005                if !is_incomplete_router_record(&row.cells) {
1006                    return Err(DbError::Integrity(
1007                        "Expected column: connected_at".to_owned(),
1008                        Some(format!("{row:#?}")),
1009                    ));
1010                }
1011                // Special case incomplete records: they're equivalent to no
1012                // user exists. Incompletes caused by the migration bug in #640
1013                // will have their migration re-triggered by returning None:
1014                // https://github.com/mozilla-services/autopush-rs/pull/640
1015                trace!("🉑 Dropping an incomplete user record for {}", row_key);
1016                self.metrics
1017                    .incr_with_tags(MetricName::DatabaseDropUser)
1018                    .with_tag("reason", "incomplete_record")
1019                    .send();
1020                self.remove_user(uaid).await?;
1021                return Ok(None);
1022            }
1023        };
1024
1025        let mut result = User {
1026            uaid: *uaid,
1027            connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1028            router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1029            record_version: Some(to_u64(
1030                row.take_required_cell("record_version")?.value,
1031                "record_version",
1032            )?),
1033            version: Some(
1034                row.take_required_cell("version")?
1035                    .value
1036                    .try_into()
1037                    .map_err(|e| {
1038                        DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1039                    })?,
1040            ),
1041            ..Default::default()
1042        };
1043
1044        if let Some(cell) = row.take_cell("router_data") {
1045            result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1046                DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1047            })?;
1048        }
1049
1050        if let Some(cell) = row.take_cell("node_id") {
1051            result.node_id = Some(to_string(cell.value, "node_id")?);
1052        }
1053
1054        if let Some(cell) = row.take_cell("current_timestamp") {
1055            result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1056        }
1057
1058        // Read the channels last, after removal of all non channel cells
1059        result.priv_channels = channels_from_cells(&row.cells)?;
1060
1061        Ok(Some(result))
1062    }
1063
1064    async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1065        let row_key = uaid.simple().to_string();
1066        self.delete_row(&row_key).await?;
1067        Ok(())
1068    }
1069
1070    async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1071        let channels = HashSet::from_iter([channel_id.to_owned()]);
1072        self.add_channels(uaid, channels).await
1073    }
1074
1075    /// Add channels in bulk (used mostly during migration)
1076    ///
1077    async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1078        // channel_ids are stored as a set within one Bigtable row
1079        //
1080        // Bigtable allows "millions of columns in a table, as long as no row
1081        // exceeds the maximum limit of 256 MB per row" enabling the use of
1082        // column qualifiers as data.
1083        //
1084        // The "set" of channel_ids consists of column qualifiers named
1085        // "chid:<chid value>" as set member entries (with their cell values
1086        // being a single 0 byte).
1087        //
1088        // Storing the full set in a single row makes batch updates
1089        // (particularly to reset the GC expiry timestamps) potentially more
1090        // easy/efficient
1091        let row_key = uaid.simple().to_string();
1092        let mut row = Row::new(row_key);
1093        let expiry = std::time::SystemTime::now() + self.router_ttl();
1094
1095        // Note: updating the version column isn't necessary here because this
1096        // write only adds a new (or updates an existing) column with a 0 byte
1097        // value
1098        row.add_cells(
1099            ROUTER_FAMILY,
1100            channels_to_cells(Cow::Owned(channels), expiry),
1101        );
1102
1103        self.write_row(row).await?;
1104        Ok(())
1105    }
1106
1107    async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1108        let row_key = uaid.simple().to_string();
1109        let mut req = self.read_row_request(&row_key);
1110
1111        let mut cq_filter = data::RowFilter::default();
1112        cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());
1113        req.set_filter(filter_chain(vec![
1114            router_gc_policy_filter(),
1115            family_filter(format!("^{ROUTER_FAMILY}$")),
1116            cq_filter,
1117        ]));
1118
1119        let Some(row) = self.read_row(req).await? else {
1120            return Ok(Default::default());
1121        };
1122        channels_from_cells(&row.cells)
1123    }
1124
1125    /// Delete the channel. Does not delete its associated pending messages.
1126    async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1127        let row_key = uaid.simple().to_string();
1128        let mut req = self.check_and_mutate_row_request(&row_key);
1129
1130        // Delete the column representing the channel_id
1131        let column = format!("chid:{}", channel_id.as_hyphenated());
1132        let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1133
1134        // and write a new version cell
1135        let mut row = Row::new(row_key);
1136        let expiry = std::time::SystemTime::now() + self.router_ttl();
1137        row.cells
1138            .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1139        mutations.extend(self.get_mutations(row.cells)?);
1140
1141        // check if the channel existed/was actually removed
1142        let mut cq_filter = data::RowFilter::default();
1143        cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
1144        req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1145        req.set_true_mutations(mutations);
1146
1147        Ok(self.check_and_mutate(req).await?)
1148    }
1149
1150    /// Remove the node_id
1151    async fn remove_node_id(
1152        &self,
1153        uaid: &Uuid,
1154        _node_id: &str,
1155        _connected_at: u64,
1156        version: &Option<Uuid>,
1157    ) -> DbResult<bool> {
1158        let row_key = uaid.simple().to_string();
1159        trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1160        let Some(version) = version else {
1161            return Err(DbError::General("Expected a user version field".to_owned()));
1162        };
1163
1164        let mut req = self.check_and_mutate_row_request(&row_key);
1165
1166        let mut filters = vec![router_gc_policy_filter()];
1167        filters.extend(version_filter(version));
1168        req.set_predicate_filter(filter_chain(filters));
1169        req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);
1170
1171        Ok(self.check_and_mutate(req).await?)
1172    }
1173
1174    /// Write the notification to storage.
1175    async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1176        let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1177        debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1178        trace!(
1179            "🉑 timestamp: {:?}",
1180            &message.timestamp.to_be_bytes().to_vec()
1181        );
1182        let mut row = Row::new(row_key);
1183
1184        // Remember, `timestamp` is effectively the time to kill the message, not the
1185        // current time.
1186        // TODO: use message.expiry()
1187        let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1188        trace!(
1189            "🉑 Message Expiry {}",
1190            expiry
1191                .duration_since(SystemTime::UNIX_EPOCH)
1192                .unwrap_or_default()
1193                .as_millis()
1194        );
1195
1196        let mut cells: Vec<cell::Cell> = Vec::new();
1197
1198        let is_topic = message.topic.is_some();
1199        let family = if is_topic {
1200            MESSAGE_TOPIC_FAMILY
1201        } else {
1202            MESSAGE_FAMILY
1203        };
1204        cells.extend(vec![
1205            cell::Cell {
1206                qualifier: "ttl".to_owned(),
1207                value: message.ttl.to_be_bytes().to_vec(),
1208                timestamp: expiry,
1209                ..Default::default()
1210            },
1211            cell::Cell {
1212                qualifier: "timestamp".to_owned(),
1213                value: message.timestamp.to_be_bytes().to_vec(),
1214                timestamp: expiry,
1215                ..Default::default()
1216            },
1217            cell::Cell {
1218                qualifier: "version".to_owned(),
1219                value: message.version.into_bytes(),
1220                timestamp: expiry,
1221                ..Default::default()
1222            },
1223        ]);
1224        if let Some(headers) = message.headers {
1225            if !headers.is_empty() {
1226                cells.push(cell::Cell {
1227                    qualifier: "headers".to_owned(),
1228                    value: json!(headers).to_string().into_bytes(),
1229                    timestamp: expiry,
1230                    ..Default::default()
1231                });
1232            }
1233        }
1234        #[cfg(feature = "reliable_report")]
1235        {
1236            if let Some(reliability_id) = message.reliability_id {
1237                trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1238                cells.push(cell::Cell {
1239                    qualifier: "reliability_id".to_owned(),
1240                    value: reliability_id.into_bytes(),
1241                    timestamp: expiry,
1242                    ..Default::default()
1243                });
1244            }
1245            if let Some(reliable_state) = message.reliable_state {
1246                cells.push(cell::Cell {
1247                    qualifier: "reliable_state".to_owned(),
1248                    value: reliable_state.to_string().into_bytes(),
1249                    timestamp: expiry,
1250                    ..Default::default()
1251                });
1252            }
1253        }
1254        if let Some(data) = message.data {
1255            cells.push(cell::Cell {
1256                qualifier: "data".to_owned(),
1257                value: data.into_bytes(),
1258                timestamp: expiry,
1259                ..Default::default()
1260            });
1261        }
1262
1263        row.add_cells(family, cells);
1264        trace!("🉑 Adding row");
1265        self.write_row(row).await?;
1266
1267        self.metrics
1268            .incr_with_tags(MetricName::NotificationMessageStored)
1269            .with_tag("topic", &is_topic.to_string())
1270            .with_tag("database", &self.name())
1271            .send();
1272        Ok(())
1273    }
1274
1275    /// Save a batch of messages to the database.
1276    ///
1277    /// Currently just iterating through the list and saving one at a time. There's a bulk way
1278    /// to save messages, but there are other considerations (e.g. mutation limits)
1279    async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1280        // plate simple way of solving this:
1281        for message in messages {
1282            self.save_message(uaid, message).await?;
1283        }
1284        Ok(())
1285    }
1286
1287    /// Set the `current_timestamp` in the meta record for this user agent.
1288    ///
1289    /// This is a bit different for BigTable. Field expiration (technically cell
1290    /// expiration) is determined by the lifetime assigned to the cell once it hits
1291    /// a given date. That means you can't really extend a lifetime by adjusting a
1292    /// single field. You'd have to adjust all the cells that are in the family.
1293    /// So, we're not going to do expiration that way.
1294    ///
1295    /// That leaves the meta "current_timestamp" field. We do not purge ACK'd records,
1296    /// instead we presume that the TTL will kill them off eventually. On reads, we use
1297    /// the `current_timestamp` to determine what records to return, since we return
1298    /// records with timestamps later than `current_timestamp`.
1299    ///
1300    async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1301        let row_key = uaid.simple().to_string();
1302        debug!(
1303            "🉑 Updating {} current_timestamp:  {:?}",
1304            &row_key,
1305            timestamp.to_be_bytes().to_vec()
1306        );
1307        let expiry = std::time::SystemTime::now() + self.router_ttl();
1308        let mut row = Row::new(row_key.clone());
1309
1310        row.cells.insert(
1311            ROUTER_FAMILY.to_owned(),
1312            vec![
1313                cell::Cell {
1314                    qualifier: "current_timestamp".to_owned(),
1315                    value: timestamp.to_be_bytes().to_vec(),
1316                    timestamp: expiry,
1317                    ..Default::default()
1318                },
1319                new_version_cell(expiry),
1320            ],
1321        );
1322
1323        self.write_row(row).await?;
1324
1325        Ok(())
1326    }
1327
1328    /// Delete the notification from storage.
1329    async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1330        trace!(
1331            "🉑 attemping to delete {:?} :: {:?}",
1332            uaid.to_string(),
1333            chidmessageid
1334        );
1335        let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1336        debug!("🉑🔥 Deleting message {}", &row_key);
1337        self.delete_row(&row_key).await?;
1338        self.metrics
1339            .incr_with_tags(MetricName::NotificationMessageDeleted)
1340            .with_tag("database", &self.name())
1341            .send();
1342        Ok(())
1343    }
1344
1345    /// Return `limit` pending messages from storage. `limit=0` for all messages.
1346    async fn fetch_topic_messages(
1347        &self,
1348        uaid: &Uuid,
1349        limit: usize,
1350    ) -> DbResult<FetchMessageResponse> {
1351        let mut req = ReadRowsRequest::default();
1352        req.set_table_name(self.settings.table_name.clone());
1353        req.set_app_profile_id(self.settings.app_profile_id.clone());
1354
1355        let start_key = format!("{}#01:", uaid.simple());
1356        let end_key = format!("{}#02:", uaid.simple());
1357        let mut rows = data::RowSet::default();
1358        let mut row_range = data::RowRange::default();
1359        row_range.set_start_key_open(start_key.into_bytes());
1360        row_range.set_end_key_open(end_key.into_bytes());
1361        let mut row_ranges = RepeatedField::default();
1362        row_ranges.push(row_range);
1363        rows.set_row_ranges(row_ranges);
1364        req.set_rows(rows);
1365
1366        let mut filters = message_gc_policy_filter()?;
1367        filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1368
1369        req.set_filter(filter_chain(filters));
1370        if limit > 0 {
1371            trace!("🉑 Setting limit to {limit}");
1372            req.set_rows_limit(limit as i64);
1373        }
1374        let rows = self.read_rows(req).await?;
1375        debug!(
1376            "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1377            rows.len(),
1378            limit
1379        );
1380
1381        let messages = self.rows_to_notifications(rows)?;
1382
1383        // Note: Bigtable always returns a timestamp of None.
1384        // Under Bigtable `current_timestamp` is instead initially read
1385        // from [get_user].
1386        Ok(FetchMessageResponse {
1387            messages,
1388            timestamp: None,
1389        })
1390    }
1391
1392    /// Return `limit` messages pending for a UAID that have a sortkey_timestamp after
1393    /// what's specified. `limit=0` for all messages.
1394    async fn fetch_timestamp_messages(
1395        &self,
1396        uaid: &Uuid,
1397        timestamp: Option<u64>,
1398        limit: usize,
1399    ) -> DbResult<FetchMessageResponse> {
1400        let mut req = ReadRowsRequest::default();
1401        req.set_table_name(self.settings.table_name.clone());
1402        req.set_app_profile_id(self.settings.app_profile_id.clone());
1403
1404        let mut rows = data::RowSet::default();
1405        let mut row_range = data::RowRange::default();
1406
1407        let start_key = if let Some(ts) = timestamp {
1408            // Fetch everything after the last message with timestamp: the "z"
1409            // moves past the last message's channel_id's 1st hex digit
1410            format!("{}#02:{}z", uaid.simple(), ts)
1411        } else {
1412            format!("{}#02:", uaid.simple())
1413        };
1414        let end_key = format!("{}#03:", uaid.simple());
1415        row_range.set_start_key_open(start_key.into_bytes());
1416        row_range.set_end_key_open(end_key.into_bytes());
1417
1418        let mut row_ranges = RepeatedField::default();
1419        row_ranges.push(row_range);
1420        rows.set_row_ranges(row_ranges);
1421        req.set_rows(rows);
1422
1423        // We can fetch data and do [some remote filtering](https://cloud.google.com/bigtable/docs/filters),
1424        // unfortunately I don't think the filtering we need will be super helpful.
1425        //
1426        //
1427        /*
1428        //NOTE: if you filter on a given field, BigTable will only
1429        // return that specific field. Adding filters for the rest of
1430        // the known elements may NOT return those elements or may
1431        // cause the message to not be returned because any of
1432        // those elements are not present. It may be preferable to
1433        // therefore run two filters, one to fetch the candidate IDs
1434        // and another to fetch the content of the messages.
1435         */
1436        let mut filters = message_gc_policy_filter()?;
1437        filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1438
1439        req.set_filter(filter_chain(filters));
1440        if limit > 0 {
1441            req.set_rows_limit(limit as i64);
1442        }
1443        let rows = self.read_rows(req).await?;
1444        debug!(
1445            "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1446            timestamp,
1447            rows.len(),
1448            limit,
1449        );
1450
1451        let messages = self.rows_to_notifications(rows)?;
1452        // The timestamp of the last message read
1453        let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1454        Ok(FetchMessageResponse {
1455            messages,
1456            timestamp,
1457        })
1458    }
1459
1460    async fn health_check(&self) -> DbResult<bool> {
1461        Ok(self
1462            .pool
1463            .get()
1464            .await?
1465            .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1466            .await?)
1467    }
1468
1469    /// Returns true, because there's only one table in BigTable. We divide things up
1470    /// by `family`.
1471    async fn router_table_exists(&self) -> DbResult<bool> {
1472        Ok(true)
1473    }
1474
1475    /// Returns true, because there's only one table in BigTable. We divide things up
1476    /// by `family`.
1477    async fn message_table_exists(&self) -> DbResult<bool> {
1478        Ok(true)
1479    }
1480
1481    #[cfg(feature = "reliable_report")]
1482    async fn log_report(
1483        &self,
1484        reliability_id: &str,
1485        new_state: crate::reliability::ReliabilityState,
1486    ) -> DbResult<()> {
1487        let row_key = reliability_id.to_owned();
1488
1489        let mut row = Row::new(row_key);
1490        let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1491
1492        // Log the latest transition time for this id.
1493        let cells: Vec<cell::Cell> = vec![cell::Cell {
1494            qualifier: new_state.to_string(),
1495            value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1496            timestamp: expiry,
1497            ..Default::default()
1498        }];
1499
1500        row.add_cells(RELIABLE_LOG_FAMILY, cells);
1501
1502        self.write_row(row).await?;
1503
1504        Ok(())
1505    }
1506
1507    fn box_clone(&self) -> Box<dyn DbClient> {
1508        Box::new(self.clone())
1509    }
1510
1511    fn name(&self) -> String {
1512        "Bigtable".to_owned()
1513    }
1514
1515    fn pool_status(&self) -> Option<deadpool::Status> {
1516        Some(self.pool.pool.status())
1517    }
1518}
1519
1520#[cfg(all(test, feature = "emulator"))]
1521mod tests {
1522
    //! Currently, these tests rely on having a BigTable emulator running on the current machine.
1524    //! The tests presume to be able to connect to localhost:8086. See docs/bigtable.md for
1525    //! details and how to set up and initialize an emulator.
1526    //!
1527    use std::sync::Arc;
1528    use std::time::SystemTime;
1529
1530    use cadence::StatsdClient;
1531    use uuid;
1532
1533    use super::*;
1534    use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1535
1536    const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
1537    const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
1538    const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1539
1540    fn now() -> u64 {
1541        SystemTime::now()
1542            .duration_since(SystemTime::UNIX_EPOCH)
1543            .unwrap()
1544            .as_secs()
1545    }
1546
1547    fn new_client() -> DbResult<BigTableClientImpl> {
1548        let env_dsn = format!(
1549            "grpc://{}",
1550            std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1551        );
1552        let settings = DbSettings {
1553            // this presumes the table was created with
1554            // ```
1555            // scripts/setup_bt.sh
1556            // ```
1557            // with `message`, `router`, and `message_topic` families
1558            dsn: Some(env_dsn),
1559            db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1560                .to_string(),
1561        };
1562
1563        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1564
1565        BigTableClientImpl::new(metrics, &settings)
1566    }
1567
1568    #[test]
1569    fn escape_bytes_for_regex() {
1570        let b = b"hi";
1571        assert_eq!(escape_bytes(b), b.to_vec());
1572        assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
1573        let b = b"\xe2\x80\xb3";
1574        assert_eq!(escape_bytes(b), b.to_vec());
1575        // clippy::octal-escapes rightly discourages this ("\022") in a byte literal
1576        let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
1577        assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
1578        let b = b"\xc0";
1579        assert_eq!(escape_bytes(b), b.to_vec());
1580        assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
1581    }
1582
1583    #[actix_rt::test]
1584    async fn health_check() {
1585        let client = new_client().unwrap();
1586
1587        let result = client.health_check().await;
1588        assert!(result.is_ok());
1589        assert!(result.unwrap());
1590    }
1591
1592    /// run a gauntlet of testing. These are a bit linear because they need
1593    /// to run in sequence.
1594    #[actix_rt::test]
1595    async fn run_gauntlet() -> DbResult<()> {
1596        let client = new_client()?;
1597
1598        let connected_at = ms_since_epoch();
1599
1600        let uaid = Uuid::parse_str(TEST_USER).unwrap();
1601        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1602        let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();
1603
1604        let node_id = "test_node".to_owned();
1605
1606        // purge the user record if it exists.
1607        let _ = client.remove_user(&uaid).await;
1608
1609        let test_user = User {
1610            uaid,
1611            router_type: "webpush".to_owned(),
1612            connected_at,
1613            router_data: None,
1614            node_id: Some(node_id.clone()),
1615            ..Default::default()
1616        };
1617
1618        // purge the old user (if present)
1619        // in case a prior test failed for whatever reason.
1620        let _ = client.remove_user(&uaid).await;
1621
1622        // can we add the user?
1623        client.add_user(&test_user).await?;
1624        let fetched = client.get_user(&uaid).await?;
1625        assert!(fetched.is_some());
1626        let fetched = fetched.unwrap();
1627        assert_eq!(fetched.router_type, "webpush".to_owned());
1628
1629        // Simulate a connected_at occuring before the following writes
1630        let connected_at = ms_since_epoch();
1631
1632        // can we add channels?
1633        client.add_channel(&uaid, &chid).await?;
1634        let channels = client.get_channels(&uaid).await?;
1635        assert!(channels.contains(&chid));
1636
1637        // can we add lots of channels?
1638        let mut new_channels: HashSet<Uuid> = HashSet::new();
1639        new_channels.insert(chid);
1640        for _ in 1..10 {
1641            new_channels.insert(uuid::Uuid::new_v4());
1642        }
1643        let chid_to_remove = uuid::Uuid::new_v4();
1644        new_channels.insert(chid_to_remove);
1645        client.add_channels(&uaid, new_channels.clone()).await?;
1646        let channels = client.get_channels(&uaid).await?;
1647        assert_eq!(channels, new_channels);
1648
1649        // can we remove a channel?
1650        assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
1651        assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
1652        new_channels.remove(&chid_to_remove);
1653        let channels = client.get_channels(&uaid).await?;
1654        assert_eq!(channels, new_channels);
1655
1656        // now ensure that we can update a user that's after the time we set
1657        // prior. first ensure that we can't update a user that's before the
1658        // time we set prior to the last write
1659        let mut updated = User {
1660            connected_at,
1661            ..test_user.clone()
1662        };
1663        let result = client.update_user(&mut updated).await;
1664        assert!(result.is_ok());
1665        assert!(!result.unwrap());
1666
1667        // Make sure that the `connected_at` wasn't modified
1668        let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
1669        assert_eq!(fetched.connected_at, fetched2.connected_at);
1670
1671        // and make sure we can update a record with a later connected_at time.
1672        let mut updated = User {
1673            connected_at: fetched.connected_at + 300,
1674            ..fetched2
1675        };
1676        let result = client.update_user(&mut updated).await;
1677        assert!(result.is_ok());
1678        assert!(result.unwrap());
1679        assert_ne!(
1680            fetched2.connected_at,
1681            client.get_user(&uaid).await?.unwrap().connected_at
1682        );
1683
1684        // can we increment the storage for the user?
1685        client
1686            .increment_storage(
1687                &fetched.uaid,
1688                SystemTime::now()
1689                    .duration_since(SystemTime::UNIX_EPOCH)
1690                    .unwrap()
1691                    .as_secs(),
1692            )
1693            .await?;
1694
1695        let test_data = "An_encrypted_pile_of_crap".to_owned();
1696        let timestamp = now();
1697        let sort_key = now();
1698        // Can we store a message?
1699        let test_notification = crate::db::Notification {
1700            channel_id: chid,
1701            version: "test".to_owned(),
1702            ttl: 300,
1703            timestamp,
1704            data: Some(test_data.clone()),
1705            sortkey_timestamp: Some(sort_key),
1706            ..Default::default()
1707        };
1708        let res = client.save_message(&uaid, test_notification.clone()).await;
1709        assert!(res.is_ok());
1710
1711        let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
1712        assert_ne!(fetched.messages.len(), 0);
1713        let fm = fetched.messages.pop().unwrap();
1714        assert_eq!(fm.channel_id, test_notification.channel_id);
1715        assert_eq!(fm.data, Some(test_data));
1716
1717        // Grab all 1 of the messages that were submmited within the past 10 seconds.
1718        let fetched = client
1719            .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
1720            .await?;
1721        assert_ne!(fetched.messages.len(), 0);
1722
1723        // Try grabbing a message for 10 seconds from now.
1724        let fetched = client
1725            .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
1726            .await?;
1727        assert_eq!(fetched.messages.len(), 0);
1728
1729        // can we clean up our toys?
1730        assert!(client
1731            .remove_message(&uaid, &test_notification.chidmessageid())
1732            .await
1733            .is_ok());
1734
1735        assert!(client.remove_channel(&uaid, &chid).await.is_ok());
1736
1737        // Now, can we do all that with topic messages
1738        client.add_channel(&uaid, &topic_chid).await?;
1739        let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
1740        let timestamp = now();
1741        let sort_key = now();
1742        // Can we store a message?
1743        let test_notification = crate::db::Notification {
1744            channel_id: topic_chid,
1745            version: "test".to_owned(),
1746            ttl: 300,
1747            topic: Some("topic".to_owned()),
1748            timestamp,
1749            data: Some(test_data.clone()),
1750            sortkey_timestamp: Some(sort_key),
1751            ..Default::default()
1752        };
1753        assert!(client
1754            .save_message(&uaid, test_notification.clone())
1755            .await
1756            .is_ok());
1757
1758        let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
1759        assert_ne!(fetched.messages.len(), 0);
1760        let fm = fetched.messages.pop().unwrap();
1761        assert_eq!(fm.channel_id, test_notification.channel_id);
1762        assert_eq!(fm.data, Some(test_data));
1763
1764        // Grab the message that was submmited.
1765        let fetched = client.fetch_topic_messages(&uaid, 999).await?;
1766        assert_ne!(fetched.messages.len(), 0);
1767
1768        // can we clean up our toys?
1769        assert!(client
1770            .remove_message(&uaid, &test_notification.chidmessageid())
1771            .await
1772            .is_ok());
1773
1774        assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());
1775
1776        let msgs = client
1777            .fetch_timestamp_messages(&uaid, None, 999)
1778            .await?
1779            .messages;
1780        assert!(msgs.is_empty());
1781
1782        let fetched = client.get_user(&uaid).await?.unwrap();
1783        assert!(client
1784            .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
1785            .await
1786            .is_ok());
1787        // did we remove it?
1788        let fetched = client.get_user(&uaid).await?.unwrap();
1789        assert_eq!(fetched.node_id, None);
1790
1791        assert!(client.remove_user(&uaid).await.is_ok());
1792
1793        assert!(client.get_user(&uaid).await?.is_none());
1794
1795        Ok(())
1796    }
1797
1798    #[actix_rt::test]
1799    async fn read_cells_family_id() -> DbResult<()> {
1800        let client = new_client().unwrap();
1801        let uaid = gen_test_uaid();
1802        client.remove_user(&uaid).await.unwrap();
1803
1804        let qualifier = "foo".to_owned();
1805
1806        let row_key = uaid.simple().to_string();
1807        let mut row = Row::new(row_key.clone());
1808        row.cells.insert(
1809            ROUTER_FAMILY.to_owned(),
1810            vec![cell::Cell {
1811                qualifier: qualifier.to_owned(),
1812                value: "bar".as_bytes().to_vec(),
1813                ..Default::default()
1814            }],
1815        );
1816        client.write_row(row).await.unwrap();
1817        let req = client.read_row_request(&row_key);
1818        let Some(row) = client.read_row(req).await.unwrap() else {
1819            panic!("Expected row");
1820        };
1821        assert_eq!(row.cells.len(), 1);
1822        assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1823        client.remove_user(&uaid).await
1824    }
1825
1826    #[actix_rt::test]
1827    async fn add_user_existing() {
1828        let client = new_client().unwrap();
1829        let uaid = gen_test_uaid();
1830        let user = User {
1831            uaid,
1832            ..Default::default()
1833        };
1834        client.remove_user(&uaid).await.unwrap();
1835
1836        client.add_user(&user).await.unwrap();
1837        let err = client.add_user(&user).await.unwrap_err();
1838        assert!(matches!(err, DbError::Conditional));
1839    }
1840
1841    #[actix_rt::test]
1842    async fn version_check() {
1843        let client = new_client().unwrap();
1844        let uaid = gen_test_uaid();
1845        let user = User {
1846            uaid,
1847            ..Default::default()
1848        };
1849        client.remove_user(&uaid).await.unwrap();
1850
1851        client.add_user(&user).await.unwrap();
1852        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1853        assert!(client.update_user(&mut user.clone()).await.unwrap());
1854
1855        let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1856        assert_ne!(user.version, fetched.version);
1857        // should now fail w/ a stale version
1858        assert!(!client.update_user(&mut user).await.unwrap());
1859
1860        client.remove_user(&uaid).await.unwrap();
1861    }
1862
1863    #[actix_rt::test]
1864    async fn lingering_chid_record() {
1865        let client = new_client().unwrap();
1866        let uaid = gen_test_uaid();
1867        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1868        let user = User {
1869            uaid,
1870            ..Default::default()
1871        };
1872        client.remove_user(&uaid).await.unwrap();
1873
1874        // add_channel doesn't check for the existence of a user
1875        client.add_channel(&uaid, &chid).await.unwrap();
1876
1877        // w/ chid records in the router row, get_user should treat
1878        // this as the user not existing
1879        assert!(client.get_user(&uaid).await.unwrap().is_none());
1880
1881        client.add_user(&user).await.unwrap();
1882        // get_user should have also cleaned up the chids
1883        assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1884
1885        client.remove_user(&uaid).await.unwrap();
1886    }
1887
1888    #[actix_rt::test]
1889    async fn lingering_current_timestamp() {
1890        let client = new_client().unwrap();
1891        let uaid = gen_test_uaid();
1892        client.remove_user(&uaid).await.unwrap();
1893
1894        client
1895            .increment_storage(&uaid, ms_since_epoch())
1896            .await
1897            .unwrap();
1898        assert!(client.get_user(&uaid).await.unwrap().is_none());
1899
1900        client.remove_user(&uaid).await.unwrap();
1901    }
1902
1903    #[actix_rt::test]
1904    async fn lingering_chid_w_version_record() {
1905        let client = new_client().unwrap();
1906        let uaid = gen_test_uaid();
1907        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1908        client.remove_user(&uaid).await.unwrap();
1909
1910        client.add_channel(&uaid, &chid).await.unwrap();
1911        assert!(client.remove_channel(&uaid, &chid).await.unwrap());
1912        assert!(client.get_user(&uaid).await.unwrap().is_none());
1913
1914        client.remove_user(&uaid).await.unwrap();
1915    }
1916
1917    #[actix_rt::test]
1918    async fn channel_and_current_timestamp_ttl_updates() {
1919        let client = new_client().unwrap();
1920        let uaid = gen_test_uaid();
1921        let chid = Uuid::parse_str(TEST_CHID).unwrap();
1922        client.remove_user(&uaid).await.unwrap();
1923
1924        // Setup a user with some channels and a current_timestamp
1925        let user = User {
1926            uaid,
1927            ..Default::default()
1928        };
1929        client.add_user(&user).await.unwrap();
1930
1931        client.add_channel(&uaid, &chid).await.unwrap();
1932        client
1933            .add_channel(&uaid, &uuid::Uuid::new_v4())
1934            .await
1935            .unwrap();
1936
1937        client
1938            .increment_storage(
1939                &uaid,
1940                SystemTime::now()
1941                    .duration_since(SystemTime::UNIX_EPOCH)
1942                    .unwrap()
1943                    .as_secs(),
1944            )
1945            .await
1946            .unwrap();
1947
1948        let req = client.read_row_request(&uaid.as_simple().to_string());
1949        let Some(mut row) = client.read_row(req).await.unwrap() else {
1950            panic!("Expected row");
1951        };
1952
1953        // Ensure the initial cell expiry (timestamp) of all the cells
1954        // in the row has been updated
1955        let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
1956        for mut cells in row.cells.into_values() {
1957            let Some(cell) = cells.pop() else {
1958                continue;
1959            };
1960            assert!(
1961                cell.timestamp >= ca_expiry,
1962                "{} cell timestamp should >= connected_at's",
1963                cell.qualifier
1964            );
1965        }
1966
1967        let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1968
1969        // Quick nap to make sure that the ca_expiry values are different.
1970        tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
1971        client.update_user(&mut user).await.unwrap();
1972
1973        // Ensure update_user updated the expiry (timestamp) of every cell in the row
1974        let req = client.read_row_request(&uaid.as_simple().to_string());
1975        let Some(mut row) = client.read_row(req).await.unwrap() else {
1976            panic!("Expected row");
1977        };
1978
1979        let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
1980
1981        assert!(ca_expiry2 > ca_expiry);
1982
1983        for mut cells in row.cells.into_values() {
1984            let Some(cell) = cells.pop() else {
1985                continue;
1986            };
1987            assert!(
1988                cell.timestamp >= ca_expiry2,
1989                "{} cell timestamp expiry should exceed connected_at's",
1990                cell.qualifier
1991            );
1992        }
1993
1994        client.remove_user(&uaid).await.unwrap();
1995    }
1996}