1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use again::RetryPolicy;
11use async_trait::async_trait;
12use cadence::StatsdClient;
13#[cfg(feature = "reliable_report")]
14use chrono::TimeDelta;
15use futures_util::StreamExt;
16use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
17use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
18use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
19use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
20use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
21use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
22use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
23use protobuf::RepeatedField;
24use serde_json::{from_str, json};
25use uuid::Uuid;
26
27use crate::db::{
28 client::{DbClient, FetchMessageResponse},
29 error::{DbError, DbResult},
30 models::RangeKey,
31 DbSettings, Notification, User, USER_RECORD_VERSION,
32};
33use crate::metric_name::MetricName;
34use crate::metrics::StatsdClientExt;
35use crate::MAX_ROUTER_TTL_SECS;
36
37pub use self::metadata::MetadataBuilder;
38use self::row::{Row, RowCells};
39use super::pool::BigTablePool;
40use super::BigTableDbSettings;
41
42pub mod cell;
43pub mod error;
44pub(crate) mod merge;
45pub mod metadata;
46pub mod row;
47
/// Key identifying a single row, held as a `String`.
pub type RowKey = String;

/// Column qualifier (column name within a family), held as a `String`.
pub type Qualifier = String;
/// Column family identifier, held as a `String`.
pub type FamilyId = String;
57
/// Column family that holds the router (user) record cells.
const ROUTER_FAMILY: &str = "router";
/// Column families for stored messages: regular messages and topic messages.
const MESSAGE_FAMILY: &str = "message"; const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
/// Column family name for reliability data (only with the `reliable_report` feature).
#[cfg(feature = "reliable_report")]
const RELIABLE_LOG_FAMILY: &str = "reliability";
/// Sixty-day duration for reliability logs (only with the `reliable_report` feature).
// NOTE(review): presumably the retention period applied to reliability cells — confirm at use sites.
#[cfg(feature = "reliable_report")]
pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);

/// Maximum number of retries used for calls that have no per-client setting
/// (e.g. the connection health check).
pub(crate) const RETRY_COUNT: usize = 5;
69
/// Minimal lock-free circuit breaker.
///
/// The breaker counts consecutive failures. Once the count reaches
/// `failure_threshold` the breaker is "open" and
/// [`CircuitBreaker::allow_request`] rejects calls until `cooldown_secs`
/// seconds have elapsed since the most recent recorded failure. Any recorded
/// success closes the breaker again.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Failures recorded since the last success (saturates at `u32::MAX`).
    consecutive_failures: AtomicU32,
    /// Unix time, in seconds, of the most recent recorded failure. Only
    /// consulted while the failure count is at or above the threshold.
    opened_at_epoch_secs: AtomicU64,
    /// Number of consecutive failures at which the breaker opens.
    failure_threshold: u32,
    /// Seconds to keep rejecting requests after the latest failure.
    cooldown_secs: u64,
}

impl CircuitBreaker {
    /// Create a closed breaker that opens after `failure_threshold`
    /// consecutive failures and stays open for `cooldown_secs` seconds.
    pub fn new(failure_threshold: u32, cooldown_secs: u64) -> Self {
        Self {
            consecutive_failures: AtomicU32::new(0),
            opened_at_epoch_secs: AtomicU64::new(0),
            failure_threshold,
            cooldown_secs,
        }
    }

    /// Current wall-clock time as whole seconds since the Unix epoch
    /// (0 if the clock reports a time before the epoch).
    fn epoch_secs_now() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Returns `true` when a request may be attempted: either the breaker is
    /// closed, or it is open and the cooldown has elapsed.
    pub fn allow_request(&self) -> bool {
        // Acquire pairs with the Release increment in `record_failure`, so a
        // reader that sees an open breaker also sees that failure's timestamp.
        if self.consecutive_failures.load(Ordering::Acquire) < self.failure_threshold {
            return true;
        }
        let opened_at = self.opened_at_epoch_secs.load(Ordering::Acquire);
        Self::epoch_secs_now().saturating_sub(opened_at) >= self.cooldown_secs
    }

    /// Record a successful call, closing the breaker.
    pub fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::Release);
    }

    /// Record a failed call. Reaching the threshold opens the breaker; each
    /// further failure restarts the cooldown.
    pub fn record_failure(&self) {
        // Publish the timestamp *before* bumping the counter so no reader can
        // observe "open" together with a stale timestamp. The timestamp is
        // ignored while the count is below the threshold, so stamping on
        // every failure is harmless.
        self.opened_at_epoch_secs
            .store(Self::epoch_secs_now(), Ordering::Release);
        // Saturate instead of wrapping: a wrap to 0 would silently close an
        // open breaker (and `prev + 1` would overflow in debug builds).
        let _ = self.consecutive_failures.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |count| Some(count.saturating_add(1)),
        );
    }
}

impl Default for CircuitBreaker {
    /// Breaker that opens after 5 consecutive failures with a 30 second cooldown.
    fn default() -> Self {
        Self::new(5, 30)
    }
}
137
138struct Uaid(Uuid);
141
142impl Display for Uaid {
143 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144 write!(f, "{}", self.0.as_simple())
145 }
146}
147
148impl From<Uaid> for String {
149 fn from(uaid: Uaid) -> String {
150 uaid.0.as_simple().to_string()
151 }
152}
153
/// Bigtable-backed database client.
///
/// Cloning yields another handle with its own copies of the fields; the
/// metrics client and circuit breaker sit behind `Arc` and are therefore
/// shared between clones.
#[derive(Clone)]
pub struct BigTableClientImpl {
    /// Bigtable-specific settings parsed from the generic `DbSettings`.
    pub(crate) settings: BigTableDbSettings,
    /// Metrics sink used for retry and drop counters.
    metrics: Arc<StatsdClient>,
    /// Connection pool that data calls check connections out of.
    pool: BigTablePool,
    /// gRPC metadata sent as headers on data calls.
    metadata: Metadata,
    /// gRPC metadata sent as headers on table-admin calls.
    admin_metadata: Metadata,
    /// Breaker consulted before, and updated after, guarded calls.
    circuit_breaker: Arc<CircuitBreaker>,
}
167
168fn router_gc_policy_filter() -> data::RowFilter {
170 let mut latest_cell_filter = data::RowFilter::default();
171 latest_cell_filter.set_cells_per_column_limit_filter(1);
172 latest_cell_filter
173}
174
175fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
178 let mut timestamp_filter = data::RowFilter::default();
179 let bt_now: i64 = SystemTime::now()
180 .duration_since(SystemTime::UNIX_EPOCH)
181 .map_err(error::BigTableError::WriteTime)?
182 .as_millis() as i64;
183 let mut range_filter = data::TimestampRange::default();
184 range_filter.set_start_timestamp_micros(bt_now * 1000);
185 timestamp_filter.set_timestamp_range_filter(range_filter);
186
187 Ok(vec![router_gc_policy_filter(), timestamp_filter])
188}
189
190fn family_filter(regex: String) -> data::RowFilter {
192 let mut filter = data::RowFilter::default();
193 filter.set_family_name_regex_filter(regex);
194 filter
195}
196
/// Escapes arbitrary bytes so they can be embedded literally in a regex.
///
/// * ASCII alphanumerics, `_`, and bytes with the high bit set are copied
///   through unchanged.
/// * A NUL byte becomes the four bytes `\x00`.
/// * Every other ASCII byte is prefixed with a backslash.
fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
    let mut escaped = Vec::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        match byte {
            b'\0' => escaped.extend_from_slice(b"\\x00"),
            plain if plain.is_ascii_alphanumeric() || plain == b'_' || !plain.is_ascii() => {
                escaped.push(plain)
            }
            special => escaped.extend_from_slice(&[b'\\', special]),
        }
    }
    escaped
}
218
219fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
222 let mut cq_filter = data::RowFilter::default();
223 cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());
224
225 let mut value_filter = data::RowFilter::default();
226 value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));
227
228 vec![
229 family_filter(format!("^{ROUTER_FAMILY}$")),
230 cq_filter,
231 value_filter,
232 ]
233}
234
235fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
237 cell::Cell {
238 qualifier: "version".to_owned(),
239 value: Uuid::new_v4().into(),
240 timestamp,
241 ..Default::default()
242 }
243}
244
245fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
247 let mut chain = RowFilter_Chain::default();
248 chain.set_filters(filters.into());
249 let mut filter = RowFilter::default();
250 filter.set_chain(chain);
251 filter
252}
253
254fn read_row_request(
256 table_name: &str,
257 app_profile_id: &str,
258 row_key: &str,
259) -> bigtable::ReadRowsRequest {
260 let mut req = bigtable::ReadRowsRequest::default();
261 req.set_table_name(table_name.to_owned());
262 req.set_app_profile_id(app_profile_id.to_owned());
263
264 let mut row_keys = RepeatedField::default();
265 row_keys.push(row_key.as_bytes().to_vec());
266 let mut row_set = data::RowSet::default();
267 row_set.set_row_keys(row_keys);
268 req.set_rows(row_set);
269
270 req
271}
272
273fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
274 let v: [u8; 8] = value
275 .try_into()
276 .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
277 Ok(u64::from_be_bytes(v))
278}
279
280fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
281 String::from_utf8(value).map_err(|e| {
282 debug!("🉑 cannot read string {}: {:?}", name, e);
283 DbError::DeserializeString(name.to_owned())
284 })
285}
286
287fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
291 let mut result = HashSet::new();
292 for cells in cells.values() {
293 let Some(cell) = cells.last() else {
294 continue;
295 };
296 let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
297 return Err(DbError::Integrity(
298 "get_channels expected: chid:<chid>".to_owned(),
299 None,
300 ));
301 };
302 result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
303 }
304 Ok(result)
305}
306
307fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
309 let channels = channels.into_owned();
310 let mut cells = Vec::with_capacity(channels.len().min(100_000));
311 for (i, channel_id) in channels.into_iter().enumerate() {
312 if i >= 100_000 {
316 break;
317 }
318 cells.push(cell::Cell {
319 qualifier: format!("chid:{}", channel_id.as_hyphenated()),
320 timestamp: expiry,
321 ..Default::default()
322 });
323 }
324 cells
325}
326
327pub fn retry_policy(max: usize) -> RetryPolicy {
328 RetryPolicy::default()
329 .with_max_retries(max)
330 .with_jitter(true)
331}
332
333fn retryable_internal_err(status: &RpcStatus) -> bool {
334 match status.code() {
335 RpcStatusCode::UNKNOWN => status
336 .message()
337 .eq_ignore_ascii_case("error occurred when fetching oauth2 token."),
338 RpcStatusCode::INTERNAL => [
339 "rst_stream",
340 "rst stream",
341 "received unexpected eos on data frame from server",
342 ]
343 .contains(&status.message().to_lowercase().as_str()),
344 RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
345 _ => false,
346 }
347}
348
349pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
350 let mut metric = metrics
351 .incr_with_tags(MetricName::DatabaseRetry)
352 .with_tag("error", err_type)
353 .with_tag("type", "bigtable");
354 if let Some(code) = code {
355 metric = metric.with_tag("code", code);
356 }
357 metric.send();
358}
359
360pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool + '_ {
361 move |err| {
362 debug!("🉑 Checking grpcio::Error...{err}");
363 match err {
364 grpcio::Error::RpcFailure(status) => {
365 info!("GRPC Failure :{:?}", status);
366 let retry = retryable_internal_err(status);
367 if retry {
368 metric(metrics, "RpcFailure", Some(&status.code().to_string()));
369 }
370 retry
371 }
372 grpcio::Error::BindFail(_) => {
373 metric(metrics, "BindFail", None);
374 true
375 }
376 grpcio::Error::CallFailure(grpc_call_status) => {
379 let retry = grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR;
380 if retry {
381 metric(
382 metrics,
383 "CallFailure",
384 Some(&format!("{grpc_call_status:?}")),
385 );
386 }
387 retry
388 }
389 _ => false,
390 }
391 }
392}
393
394pub fn retryable_bt_err(
395 metrics: &Arc<StatsdClient>,
396) -> impl Fn(&error::BigTableError) -> bool + '_ {
397 move |err| {
398 debug!("🉑 Checking BigTableError...{err}");
399 match err {
400 error::BigTableError::InvalidRowResponse(e)
401 | error::BigTableError::Read(e)
402 | error::BigTableError::Write(e)
403 | error::BigTableError::GRPC(e) => retryable_grpcio_err(metrics)(e),
404 _ => false,
405 }
406 }
407}
408
409fn is_incomplete_router_record(cells: &RowCells) -> bool {
423 cells
424 .keys()
425 .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
426}
427
428fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
429 ::grpcio::CallOption::default().headers(metadata)
430}
431
432impl BigTableClientImpl {
457 pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
458 debug!("🏊 BT Pool new");
460 let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
461 info!("🉑 {:#?}", db_settings);
462 let pool = BigTablePool::new(settings, &metrics)?;
463
464 let metadata = db_settings.metadata()?;
466 let admin_metadata = db_settings.admin_metadata()?;
467 Ok(Self {
468 settings: db_settings,
469 metrics,
470 metadata,
471 admin_metadata,
472 pool,
473 circuit_breaker: Arc::new(CircuitBreaker::default()),
474 })
475 }
476
477 fn router_ttl(&self) -> Duration {
478 self.settings
479 .max_router_ttl
480 .unwrap_or(Duration::from_secs(MAX_ROUTER_TTL_SECS))
481 }
482
    /// Asks the connection pool to spawn its sweeper with the given
    /// `interval`. This is a pure delegation to `BigTablePool::spawn_sweeper`.
    // NOTE(review): presumably a periodic task that retires idle/stale pool
    // connections — confirm against the pool implementation.
    pub fn spawn_sweeper(&self, interval: Duration) {
        self.pool.spawn_sweeper(interval);
    }
487
488 fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
490 read_row_request(
491 &self.settings.table_name,
492 &self.settings.app_profile_id,
493 row_key,
494 )
495 }
496
497 fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
499 let mut req = bigtable::MutateRowRequest::default();
500 req.set_table_name(self.settings.table_name.clone());
501 req.set_app_profile_id(self.settings.app_profile_id.clone());
502 req.set_row_key(row_key.as_bytes().to_vec());
503 req
504 }
505
506 fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
508 let mut req = bigtable::CheckAndMutateRowRequest::default();
509 req.set_table_name(self.settings.table_name.clone());
510 req.set_app_profile_id(self.settings.app_profile_id.clone());
511 req.set_row_key(row_key.as_bytes().to_vec());
512 req
513 }
514
515 async fn mutate_row(
517 &self,
518 req: bigtable::MutateRowRequest,
519 ) -> Result<(), error::BigTableError> {
520 if !self.circuit_breaker.allow_request() {
521 return Err(error::BigTableError::CircuitBreakerOpen);
522 }
523 let bigtable = self.pool.get().await?;
524 let result = retry_policy(self.settings.retry_count)
525 .retry_if(
526 || async {
527 bigtable
528 .conn
529 .mutate_row_opt(&req, call_opts(self.metadata.clone()))
530 },
531 retryable_grpcio_err(&self.metrics),
532 )
533 .await;
534 match &result {
535 Ok(_) => self.circuit_breaker.record_success(),
536 Err(_) => self.circuit_breaker.record_failure(),
537 }
538 result.map_err(error::BigTableError::Write)?;
539 Ok(())
540 }
541
542 #[allow(unused)]
544 async fn mutate_rows(
545 &self,
546 req: bigtable::MutateRowsRequest,
547 ) -> Result<(), error::BigTableError> {
548 let bigtable = self.pool.get().await?;
549 let resp = retry_policy(self.settings.retry_count)
551 .retry_if(
552 || async {
553 bigtable
554 .conn
555 .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
556 },
557 retryable_grpcio_err(&self.metrics),
558 )
559 .await
560 .map_err(error::BigTableError::Write)?;
561
562 let mut stream = Box::pin(resp);
569 let mut cnt = 0;
570 loop {
571 let (result, remainder) = StreamExt::into_future(stream).await;
572 if let Some(result) = result {
573 debug!("🎏 Result block: {}", cnt);
574 match result {
575 Ok(r) => {
576 for e in r.get_entries() {
577 if e.has_status() {
578 let status = e.get_status();
579 let code = error::MutateRowStatus::from(status.get_code());
581 if !code.is_ok() {
582 return Err(error::BigTableError::Status(
583 code,
584 status.get_message().to_owned(),
585 ));
586 }
587 debug!("🎏 Response: {} OK", e.index);
588 }
589 }
590 }
591 Err(e) => return Err(error::BigTableError::Write(e)),
592 };
593 cnt += 1;
594 } else {
595 debug!("🎏 Done!");
596 break;
597 }
598 stream = remainder;
599 }
600
601 Ok(())
602 }
603
604 async fn read_row(
606 &self,
607 req: bigtable::ReadRowsRequest,
608 ) -> Result<Option<row::Row>, error::BigTableError> {
609 let mut rows = self.read_rows(req).await?;
610 Ok(rows.pop_first().map(|(_, v)| v))
611 }
612
613 async fn read_rows(
617 &self,
618 req: ReadRowsRequest,
619 ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
620 if !self.circuit_breaker.allow_request() {
621 return Err(error::BigTableError::CircuitBreakerOpen);
622 }
623 let bigtable = self.pool.get().await?;
624 let result = retry_policy(self.settings.retry_count)
625 .retry_if(
626 || async {
627 let resp: grpcio::ClientSStreamReceiver<bigtable::ReadRowsResponse> = bigtable
628 .conn
629 .read_rows_opt(&req, call_opts(self.metadata.clone()))
630 .map_err(error::BigTableError::Read)?;
631 merge::RowMerger::process_chunks(resp).await
632 },
633 retryable_bt_err(&self.metrics),
634 )
635 .await;
636 match &result {
637 Ok(_) => self.circuit_breaker.record_success(),
638 Err(_) => self.circuit_breaker.record_failure(),
639 }
640 result
641 }
642
643 async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
647 let mut req = self.mutate_row_request(&row.row_key);
648 let mutations = self.get_mutations(row.cells)?;
653 req.set_mutations(mutations);
654 self.mutate_row(req).await?;
655 Ok(())
656 }
657
658 fn get_mutations(
660 &self,
661 cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
662 ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
663 let mut mutations = protobuf::RepeatedField::default();
664 for (family_id, cells) in cells {
665 for cell in cells {
666 let mut mutation = data::Mutation::default();
667 let mut set_cell = data::Mutation_SetCell::default();
668 let timestamp = cell
669 .timestamp
670 .duration_since(SystemTime::UNIX_EPOCH)
671 .map_err(error::BigTableError::WriteTime)?;
672 set_cell.family_name.clone_from(&family_id);
673 set_cell.set_column_qualifier(cell.qualifier.clone().into_bytes());
674 set_cell.set_value(cell.value);
675 set_cell.set_timestamp_micros((timestamp.as_millis() * 1000) as i64);
678 debug!("🉑 expiring in {:?}", timestamp.as_millis());
679 mutation.set_set_cell(set_cell);
680 mutations.push(mutation);
681 }
682 }
683 Ok(mutations)
684 }
685
686 async fn check_and_mutate_row(
694 &self,
695 row: row::Row,
696 filter: RowFilter,
697 state: bool,
698 ) -> Result<bool, error::BigTableError> {
699 let mut req = self.check_and_mutate_row_request(&row.row_key);
700 let mutations = self.get_mutations(row.cells)?;
701 req.set_predicate_filter(filter);
702 if state {
703 req.set_true_mutations(mutations);
704 } else {
705 req.set_false_mutations(mutations);
706 }
707 self.check_and_mutate(req).await
708 }
709
710 async fn check_and_mutate(
711 &self,
712 req: bigtable::CheckAndMutateRowRequest,
713 ) -> Result<bool, error::BigTableError> {
714 let bigtable = self.pool.get().await?;
715 let resp = retry_policy(self.settings.retry_count)
716 .retry_if(
717 || async {
718 bigtable
721 .conn
722 .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
723 },
724 retryable_grpcio_err(&self.metrics),
725 )
726 .await
727 .map_err(error::BigTableError::Write)?;
728 debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
729 Ok(resp.get_predicate_matched())
730 }
731
732 fn get_delete_mutations(
733 &self,
734 family: &str,
735 column_names: &[&str],
736 time_range: Option<&data::TimestampRange>,
737 ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
738 let mut mutations = protobuf::RepeatedField::default();
739 for column in column_names {
740 let mut mutation = data::Mutation::default();
741 let mut del_cell = data::Mutation_DeleteFromColumn::default();
745 del_cell.set_family_name(family.to_owned());
746 del_cell.set_column_qualifier(column.as_bytes().to_vec());
747 if let Some(range) = time_range {
748 del_cell.set_time_range(range.clone());
749 }
750 mutation.set_delete_from_column(del_cell);
751 mutations.push(mutation);
752 }
753 Ok(mutations)
754 }
755
756 #[allow(unused)]
757 #[allow(unused)]
759 async fn delete_cells(
760 &self,
761 row_key: &str,
762 family: &str,
763 column_names: &[&str],
764 time_range: Option<&data::TimestampRange>,
765 ) -> Result<(), error::BigTableError> {
766 let mut req = self.mutate_row_request(row_key);
767 req.set_mutations(self.get_delete_mutations(family, column_names, time_range)?);
768 self.mutate_row(req).await
769 }
770
771 async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
773 let mut req = self.mutate_row_request(row_key);
774 let mut mutations = protobuf::RepeatedField::default();
775 let mut mutation = data::Mutation::default();
776 mutation.set_delete_from_row(data::Mutation_DeleteFromRow::default());
777 mutations.push(mutation);
778 req.set_mutations(mutations);
779 self.mutate_row(req).await
780 }
781
782 #[allow(unused)]
787 async fn delete_rows(&self, row_key: &str) -> Result<bool, error::BigTableError> {
788 let admin = BigtableTableAdminClient::new(self.pool.get_channel()?);
789 let mut req = DropRowRangeRequest::new();
790 req.set_name(self.settings.table_name.clone());
791 req.set_row_key_prefix(row_key.as_bytes().to_vec());
792
793 admin
794 .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone()))
795 .map_err(|e| {
796 error!("{:?}", e);
797 error::BigTableError::Admin(
798 format!(
799 "Could not send delete command for {}",
800 &self.settings.table_name
801 ),
802 Some(e.to_string()),
803 )
804 })?
805 .await
806 .map_err(|e| {
807 error!("post await: {:?}", e);
808 error::BigTableError::Admin(
809 format!(
810 "Could not delete data from table {}",
811 &self.settings.table_name
812 ),
813 Some(e.to_string()),
814 )
815 })?;
816
817 Ok(true)
818 }
819
820 fn rows_to_notifications(
821 &self,
822 rows: BTreeMap<String, Row>,
823 ) -> Result<Vec<Notification>, DbError> {
824 rows.into_iter()
825 .map(|(row_key, row)| self.row_to_notification(&row_key, row))
826 .collect()
827 }
828
829 fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
830 let Some((_, chidmessageid)) = row_key.split_once('#') else {
831 return Err(DbError::Integrity(
832 "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
833 None,
834 ));
835 };
836 let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
837 DbError::Integrity(
838 format!("rows_to_notification expected chidmessageid: {e}"),
839 None,
840 )
841 })?;
842
843 let mut notif = Notification {
845 channel_id: range_key.channel_id,
846 topic: range_key.topic,
847 sortkey_timestamp: range_key.sortkey_timestamp,
848 version: to_string(row.take_required_cell("version")?.value, "version")?,
849 ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
850 timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
851 ..Default::default()
852 };
853
854 if let Some(cell) = row.take_cell("data") {
856 notif.data = Some(to_string(cell.value, "data")?);
857 }
858 #[cfg(feature = "reliable_report")]
859 {
860 if let Some(cell) = row.take_cell("reliability_id") {
861 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
862 }
863 if let Some(cell) = row.take_cell("reliable_state") {
864 notif.reliable_state = Some(
865 crate::reliability::ReliabilityState::from_str(&to_string(
866 cell.value,
867 "reliable_state",
868 )?)
869 .map_err(|e| {
870 DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
871 })?,
872 );
873 }
874 }
875 if let Some(cell) = row.take_cell("headers") {
876 notif.headers = Some(
877 serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
878 .map_err(|e| DbError::Serialization(e.to_string()))?,
879 );
880 }
881 #[cfg(feature = "reliable_report")]
882 if let Some(cell) = row.take_cell("reliability_id") {
883 trace!("🚣 Is reliable");
884 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
885 }
886
887 trace!("🚣 Deserialized message row: {:?}", ¬if);
888 Ok(notif)
889 }
890
891 fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
897 let row_key = user.uaid.simple().to_string();
898 let mut row = Row::new(row_key);
899 let expiry = std::time::SystemTime::now() + self.router_ttl();
900
901 let mut cells: Vec<cell::Cell> = vec![
902 cell::Cell {
903 qualifier: "connected_at".to_owned(),
904 value: user.connected_at.to_be_bytes().to_vec(),
905 timestamp: expiry,
906 ..Default::default()
907 },
908 cell::Cell {
909 qualifier: "router_type".to_owned(),
910 value: user.router_type.clone().into_bytes(),
911 timestamp: expiry,
912 ..Default::default()
913 },
914 cell::Cell {
915 qualifier: "record_version".to_owned(),
916 value: user
917 .record_version
918 .unwrap_or(USER_RECORD_VERSION)
919 .to_be_bytes()
920 .to_vec(),
921 timestamp: expiry,
922 ..Default::default()
923 },
924 cell::Cell {
925 qualifier: "version".to_owned(),
926 value: (*version).into(),
927 timestamp: expiry,
928 ..Default::default()
929 },
930 ];
931
932 if let Some(router_data) = &user.router_data {
933 cells.push(cell::Cell {
934 qualifier: "router_data".to_owned(),
935 value: json!(router_data).to_string().as_bytes().to_vec(),
936 timestamp: expiry,
937 ..Default::default()
938 });
939 };
940 if let Some(current_timestamp) = user.current_timestamp {
941 cells.push(cell::Cell {
942 qualifier: "current_timestamp".to_owned(),
943 value: current_timestamp.to_be_bytes().to_vec(),
944 timestamp: expiry,
945 ..Default::default()
946 });
947 };
948 if let Some(node_id) = &user.node_id {
949 cells.push(cell::Cell {
950 qualifier: "node_id".to_owned(),
951 value: node_id.as_bytes().to_vec(),
952 timestamp: expiry,
953 ..Default::default()
954 });
955 };
956
957 cells.extend(channels_to_cells(
958 Cow::Borrowed(&user.priv_channels),
959 expiry,
960 ));
961
962 row.add_cells(ROUTER_FAMILY, cells);
963 row
964 }
965}
966
/// A single Bigtable connection together with what its health check needs.
#[derive(Clone)]
pub struct BigtableDb {
    /// gRPC client used for data calls on this connection.
    pub(super) conn: BigtableClient,
    /// gRPC metadata sent as headers on health-check calls.
    pub(super) health_metadata: Metadata,
    /// Table the health check reads from.
    table_name: String,
}
973
974impl BigtableDb {
975 pub fn new(channel: Channel, health_metadata: &Metadata, table_name: &str) -> Self {
976 Self {
977 conn: BigtableClient::new(channel),
978 health_metadata: health_metadata.clone(),
979 table_name: table_name.to_owned(),
980 }
981 }
982 pub async fn health_check(
989 &mut self,
990 metrics: &Arc<StatsdClient>,
991 app_profile_id: &str,
992 ) -> Result<bool, error::BigTableError> {
993 let random_uaid = Uuid::new_v4().simple().to_string();
1000 let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
1001 let mut filter = data::RowFilter::default();
1002 filter.set_block_all_filter(true);
1003 req.set_filter(filter);
1004 let _r = retry_policy(RETRY_COUNT)
1005 .retry_if(
1006 || async {
1007 self.conn
1008 .read_rows_opt(&req, call_opts(self.health_metadata.clone()))
1009 },
1010 retryable_grpcio_err(metrics),
1011 )
1012 .await
1013 .map_err(error::BigTableError::Read)?;
1014
1015 debug!("🉑 health check");
1016 Ok(true)
1017 }
1018}
1019
1020#[async_trait]
1021impl DbClient for BigTableClientImpl {
1022 async fn add_user(&self, user: &User) -> DbResult<()> {
1024 trace!("🉑 Adding user");
1025 let Some(ref version) = user.version else {
1026 return Err(DbError::General(
1027 "add_user expected a user version field".to_owned(),
1028 ));
1029 };
1030 let row = self.user_to_row(user, version);
1031
1032 let mut row_key_filter = RowFilter::default();
1034 row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
1035 let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
1036
1037 if self.check_and_mutate_row(row, filter, false).await? {
1038 return Err(DbError::Conditional);
1039 }
1040 Ok(())
1041 }
1042
1043 async fn update_user(&self, user: &mut User) -> DbResult<bool> {
1059 let Some(ref version) = user.version else {
1060 return Err(DbError::General(
1061 "update_user expected a user version field".to_owned(),
1062 ));
1063 };
1064
1065 let mut filters = vec![router_gc_policy_filter()];
1066 filters.extend(version_filter(version));
1067 let filter = filter_chain(filters);
1068
1069 let new_version = Uuid::new_v4();
1070 let row = self.user_to_row(user, &new_version);
1072
1073 let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
1074 user.version = Some(new_version);
1075 Ok(predicate_matched)
1076 }
1077
1078 async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
1079 let row_key = uaid.as_simple().to_string();
1080 let mut req = self.read_row_request(&row_key);
1081 let mut filters = vec![router_gc_policy_filter()];
1082 filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
1083 req.set_filter(filter_chain(filters));
1084 let Some(mut row) = self.read_row(req).await? else {
1085 return Ok(None);
1086 };
1087
1088 trace!("🉑 Found a record for {}", row_key);
1089
1090 let connected_at_cell = match row.take_required_cell("connected_at") {
1091 Ok(cell) => cell,
1092 Err(_) => {
1093 if !is_incomplete_router_record(&row.cells) {
1094 return Err(DbError::Integrity(
1095 "Expected column: connected_at".to_owned(),
1096 Some(format!("{row:#?}")),
1097 ));
1098 }
1099 trace!("🉑 Dropping an incomplete user record for {}", row_key);
1104 self.metrics
1105 .incr_with_tags(MetricName::DatabaseDropUser)
1106 .with_tag("reason", "incomplete_record")
1107 .send();
1108 self.remove_user(uaid).await?;
1109 return Ok(None);
1110 }
1111 };
1112
1113 let mut result = User {
1114 uaid: *uaid,
1115 connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1116 router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1117 record_version: Some(to_u64(
1118 row.take_required_cell("record_version")?.value,
1119 "record_version",
1120 )?),
1121 version: Some(
1122 row.take_required_cell("version")?
1123 .value
1124 .try_into()
1125 .map_err(|e| {
1126 DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1127 })?,
1128 ),
1129 ..Default::default()
1130 };
1131
1132 if let Some(cell) = row.take_cell("router_data") {
1133 result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1134 DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1135 })?;
1136 }
1137
1138 if let Some(cell) = row.take_cell("node_id") {
1139 result.node_id = Some(to_string(cell.value, "node_id")?);
1140 }
1141
1142 if let Some(cell) = row.take_cell("current_timestamp") {
1143 result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1144 }
1145
1146 result.priv_channels = channels_from_cells(&row.cells)?;
1148
1149 Ok(Some(result))
1150 }
1151
1152 async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1153 let row_key = uaid.simple().to_string();
1154 self.delete_row(&row_key).await?;
1155 Ok(())
1156 }
1157
1158 async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1159 let channels = HashSet::from_iter([channel_id.to_owned()]);
1160 self.add_channels(uaid, channels).await
1161 }
1162
1163 async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1166 let row_key = uaid.simple().to_string();
1180 let mut row = Row::new(row_key);
1181 let expiry = std::time::SystemTime::now() + self.router_ttl();
1182
1183 row.add_cells(
1187 ROUTER_FAMILY,
1188 channels_to_cells(Cow::Owned(channels), expiry),
1189 );
1190
1191 self.write_row(row).await?;
1192 Ok(())
1193 }
1194
1195 async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1196 let row_key = uaid.simple().to_string();
1197 let mut req = self.read_row_request(&row_key);
1198
1199 let mut cq_filter = data::RowFilter::default();
1200 cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());
1201 req.set_filter(filter_chain(vec![
1202 router_gc_policy_filter(),
1203 family_filter(format!("^{ROUTER_FAMILY}$")),
1204 cq_filter,
1205 ]));
1206
1207 let Some(row) = self.read_row(req).await? else {
1208 return Ok(Default::default());
1209 };
1210 channels_from_cells(&row.cells)
1211 }
1212
1213 async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1215 let row_key = uaid.simple().to_string();
1216 let mut req = self.check_and_mutate_row_request(&row_key);
1217
1218 let column = format!("chid:{}", channel_id.as_hyphenated());
1220 let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1221
1222 let mut row = Row::new(row_key);
1224 let expiry = std::time::SystemTime::now() + self.router_ttl();
1225 row.cells
1226 .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1227 mutations.extend(self.get_mutations(row.cells)?);
1228
1229 let mut cq_filter = data::RowFilter::default();
1231 cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
1232 req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1233 req.set_true_mutations(mutations);
1234
1235 Ok(self.check_and_mutate(req).await?)
1236 }
1237
1238 async fn remove_node_id(
1240 &self,
1241 uaid: &Uuid,
1242 _node_id: &str,
1243 _connected_at: u64,
1244 version: &Option<Uuid>,
1245 ) -> DbResult<bool> {
1246 let row_key = uaid.simple().to_string();
1247 trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1248 let Some(version) = version else {
1249 return Err(DbError::General("Expected a user version field".to_owned()));
1250 };
1251
1252 let mut req = self.check_and_mutate_row_request(&row_key);
1253
1254 let mut filters = vec![router_gc_policy_filter()];
1255 filters.extend(version_filter(version));
1256 req.set_predicate_filter(filter_chain(filters));
1257 req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);
1258
1259 Ok(self.check_and_mutate(req).await?)
1260 }
1261
1262 async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1264 let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1265 debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1266 trace!(
1267 "🉑 timestamp: {:?}",
1268 &message.timestamp.to_be_bytes().to_vec()
1269 );
1270 let mut row = Row::new(row_key);
1271
1272 let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1276 trace!(
1277 "🉑 Message Expiry {}",
1278 expiry
1279 .duration_since(SystemTime::UNIX_EPOCH)
1280 .unwrap_or_default()
1281 .as_millis()
1282 );
1283
1284 let mut cells: Vec<cell::Cell> = Vec::new();
1285
1286 let is_topic = message.topic.is_some();
1287 let family = if is_topic {
1288 MESSAGE_TOPIC_FAMILY
1289 } else {
1290 MESSAGE_FAMILY
1291 };
1292 cells.extend(vec![
1293 cell::Cell {
1294 qualifier: "ttl".to_owned(),
1295 value: message.ttl.to_be_bytes().to_vec(),
1296 timestamp: expiry,
1297 ..Default::default()
1298 },
1299 cell::Cell {
1300 qualifier: "timestamp".to_owned(),
1301 value: message.timestamp.to_be_bytes().to_vec(),
1302 timestamp: expiry,
1303 ..Default::default()
1304 },
1305 cell::Cell {
1306 qualifier: "version".to_owned(),
1307 value: message.version.into_bytes(),
1308 timestamp: expiry,
1309 ..Default::default()
1310 },
1311 ]);
1312 if let Some(headers) = message.headers {
1313 if !headers.is_empty() {
1314 cells.push(cell::Cell {
1315 qualifier: "headers".to_owned(),
1316 value: json!(headers).to_string().into_bytes(),
1317 timestamp: expiry,
1318 ..Default::default()
1319 });
1320 }
1321 }
1322 #[cfg(feature = "reliable_report")]
1323 {
1324 if let Some(reliability_id) = message.reliability_id {
1325 trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1326 cells.push(cell::Cell {
1327 qualifier: "reliability_id".to_owned(),
1328 value: reliability_id.into_bytes(),
1329 timestamp: expiry,
1330 ..Default::default()
1331 });
1332 }
1333 if let Some(reliable_state) = message.reliable_state {
1334 cells.push(cell::Cell {
1335 qualifier: "reliable_state".to_owned(),
1336 value: reliable_state.to_string().into_bytes(),
1337 timestamp: expiry,
1338 ..Default::default()
1339 });
1340 }
1341 }
1342 if let Some(data) = message.data {
1343 cells.push(cell::Cell {
1344 qualifier: "data".to_owned(),
1345 value: data.into_bytes(),
1346 timestamp: expiry,
1347 ..Default::default()
1348 });
1349 }
1350
1351 row.add_cells(family, cells);
1352 trace!("🉑 Adding row");
1353 self.write_row(row).await?;
1354
1355 self.metrics
1356 .incr_with_tags(MetricName::NotificationMessageStored)
1357 .with_tag("topic", &is_topic.to_string())
1358 .with_tag("database", &self.name())
1359 .send();
1360 Ok(())
1361 }
1362
1363 async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1368 for message in messages {
1370 self.save_message(uaid, message).await?;
1371 }
1372 Ok(())
1373 }
1374
1375 async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1389 let row_key = uaid.simple().to_string();
1390 debug!(
1391 "🉑 Updating {} current_timestamp: {:?}",
1392 &row_key,
1393 timestamp.to_be_bytes().to_vec()
1394 );
1395 let expiry = std::time::SystemTime::now() + self.router_ttl();
1396 let mut row = Row::new(row_key.clone());
1397
1398 row.cells.insert(
1399 ROUTER_FAMILY.to_owned(),
1400 vec![
1401 cell::Cell {
1402 qualifier: "current_timestamp".to_owned(),
1403 value: timestamp.to_be_bytes().to_vec(),
1404 timestamp: expiry,
1405 ..Default::default()
1406 },
1407 new_version_cell(expiry),
1408 ],
1409 );
1410
1411 self.write_row(row).await?;
1412
1413 Ok(())
1414 }
1415
1416 async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1418 trace!(
1419 "🉑 attemping to delete {:?} :: {:?}",
1420 uaid.to_string(),
1421 chidmessageid
1422 );
1423 let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1424 debug!("🉑🔥 Deleting message {}", &row_key);
1425 self.delete_row(&row_key).await?;
1426 self.metrics
1427 .incr_with_tags(MetricName::NotificationMessageDeleted)
1428 .with_tag("database", &self.name())
1429 .send();
1430 Ok(())
1431 }
1432
1433 async fn fetch_topic_messages(
1435 &self,
1436 uaid: &Uuid,
1437 limit: usize,
1438 ) -> DbResult<FetchMessageResponse> {
1439 let mut req = ReadRowsRequest::default();
1440 req.set_table_name(self.settings.table_name.clone());
1441 req.set_app_profile_id(self.settings.app_profile_id.clone());
1442
1443 let start_key = format!("{}#01:", uaid.simple());
1444 let end_key = format!("{}#02:", uaid.simple());
1445 let mut rows = data::RowSet::default();
1446 let mut row_range = data::RowRange::default();
1447 row_range.set_start_key_open(start_key.into_bytes());
1448 row_range.set_end_key_open(end_key.into_bytes());
1449 let mut row_ranges = RepeatedField::default();
1450 row_ranges.push(row_range);
1451 rows.set_row_ranges(row_ranges);
1452 req.set_rows(rows);
1453
1454 let mut filters = message_gc_policy_filter()?;
1455 filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1456
1457 req.set_filter(filter_chain(filters));
1458 if limit > 0 {
1459 trace!("🉑 Setting limit to {limit}");
1460 req.set_rows_limit(limit as i64);
1461 }
1462 let rows = self.read_rows(req).await?;
1463 debug!(
1464 "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1465 rows.len(),
1466 limit
1467 );
1468
1469 let messages = self.rows_to_notifications(rows)?;
1470
1471 Ok(FetchMessageResponse {
1475 messages,
1476 timestamp: None,
1477 })
1478 }
1479
1480 async fn fetch_timestamp_messages(
1483 &self,
1484 uaid: &Uuid,
1485 timestamp: Option<u64>,
1486 limit: usize,
1487 ) -> DbResult<FetchMessageResponse> {
1488 let mut req = ReadRowsRequest::default();
1489 req.set_table_name(self.settings.table_name.clone());
1490 req.set_app_profile_id(self.settings.app_profile_id.clone());
1491
1492 let mut rows = data::RowSet::default();
1493 let mut row_range = data::RowRange::default();
1494
1495 let start_key = if let Some(ts) = timestamp {
1496 format!("{}#02:{}z", uaid.simple(), ts)
1499 } else {
1500 format!("{}#02:", uaid.simple())
1501 };
1502 let end_key = format!("{}#03:", uaid.simple());
1503 row_range.set_start_key_open(start_key.into_bytes());
1504 row_range.set_end_key_open(end_key.into_bytes());
1505
1506 let mut row_ranges = RepeatedField::default();
1507 row_ranges.push(row_range);
1508 rows.set_row_ranges(row_ranges);
1509 req.set_rows(rows);
1510
1511 let mut filters = message_gc_policy_filter()?;
1525 filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1526
1527 req.set_filter(filter_chain(filters));
1528 if limit > 0 {
1529 req.set_rows_limit(limit as i64);
1530 }
1531 let rows = self.read_rows(req).await?;
1532 debug!(
1533 "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1534 timestamp,
1535 rows.len(),
1536 limit,
1537 );
1538
1539 let messages = self.rows_to_notifications(rows)?;
1540 let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1542 Ok(FetchMessageResponse {
1543 messages,
1544 timestamp,
1545 })
1546 }
1547
1548 async fn health_check(&self) -> DbResult<bool> {
1549 Ok(self
1550 .pool
1551 .get()
1552 .await?
1553 .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1554 .await?)
1555 }
1556
    /// Report whether the router table exists.
    ///
    /// Always returns `Ok(true)`; no request is made to verify it.
    async fn router_table_exists(&self) -> DbResult<bool> {
        Ok(true)
    }
1562
    /// Report whether the message table exists.
    ///
    /// Always returns `Ok(true)`; no request is made to verify it.
    async fn message_table_exists(&self) -> DbResult<bool> {
        Ok(true)
    }
1568
1569 #[cfg(feature = "reliable_report")]
1570 async fn log_report(
1571 &self,
1572 reliability_id: &str,
1573 new_state: crate::reliability::ReliabilityState,
1574 ) -> DbResult<()> {
1575 let row_key = reliability_id.to_owned();
1576
1577 let mut row = Row::new(row_key);
1578 let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1579
1580 let cells: Vec<cell::Cell> = vec![cell::Cell {
1582 qualifier: new_state.to_string(),
1583 value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1584 timestamp: expiry,
1585 ..Default::default()
1586 }];
1587
1588 row.add_cells(RELIABLE_LOG_FAMILY, cells);
1589
1590 self.write_row(row).await?;
1591
1592 Ok(())
1593 }
1594
1595 fn box_clone(&self) -> Box<dyn DbClient> {
1596 Box::new(self.clone())
1597 }
1598
1599 fn name(&self) -> String {
1600 "Bigtable".to_owned()
1601 }
1602
1603 fn pool_status(&self) -> Option<deadpool::Status> {
1604 Some(self.pool.pool.status())
1605 }
1606}
1607
1608#[cfg(all(test, feature = "emulator"))]
1609mod tests {
1610
1611 use std::sync::Arc;
1616 use std::time::SystemTime;
1617
1618 use cadence::StatsdClient;
1619 use uuid;
1620
1621 use super::*;
1622 use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1623
1624 const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
1625 const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
1626 const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1627
1628 fn now() -> u64 {
1629 SystemTime::now()
1630 .duration_since(SystemTime::UNIX_EPOCH)
1631 .unwrap()
1632 .as_secs()
1633 }
1634
1635 fn new_client() -> DbResult<BigTableClientImpl> {
1636 let env_dsn = format!(
1637 "grpc://{}",
1638 std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1639 );
1640 let settings = DbSettings {
1641 dsn: Some(env_dsn),
1647 db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1648 .to_string(),
1649 };
1650
1651 let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1652
1653 BigTableClientImpl::new(metrics, &settings)
1654 }
1655
1656 #[test]
1657 fn escape_bytes_for_regex() {
1658 let b = b"hi";
1659 assert_eq!(escape_bytes(b), b.to_vec());
1660 assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
1661 let b = b"\xe2\x80\xb3";
1662 assert_eq!(escape_bytes(b), b.to_vec());
1663 let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
1665 assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
1666 let b = b"\xc0";
1667 assert_eq!(escape_bytes(b), b.to_vec());
1668 assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
1669 }
1670
1671 #[actix_rt::test]
1672 async fn health_check() {
1673 let client = new_client().unwrap();
1674
1675 let result = client.health_check().await;
1676 assert!(result.is_ok());
1677 assert!(result.unwrap());
1678 }
1679
1680 #[actix_rt::test]
1683 async fn run_gauntlet() -> DbResult<()> {
1684 let client = new_client()?;
1685
1686 let connected_at = ms_since_epoch();
1687
1688 let uaid = Uuid::parse_str(TEST_USER).unwrap();
1689 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1690 let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();
1691
1692 let node_id = "test_node".to_owned();
1693
1694 let _ = client.remove_user(&uaid).await;
1696
1697 let test_user = User {
1698 uaid,
1699 router_type: "webpush".to_owned(),
1700 connected_at,
1701 router_data: None,
1702 node_id: Some(node_id.clone()),
1703 ..Default::default()
1704 };
1705
1706 let _ = client.remove_user(&uaid).await;
1709
1710 client.add_user(&test_user).await?;
1712 let fetched = client.get_user(&uaid).await?;
1713 assert!(fetched.is_some());
1714 let fetched = fetched.unwrap();
1715 assert_eq!(fetched.router_type, "webpush".to_owned());
1716
1717 let connected_at = ms_since_epoch();
1719
1720 client.add_channel(&uaid, &chid).await?;
1722 let channels = client.get_channels(&uaid).await?;
1723 assert!(channels.contains(&chid));
1724
1725 let mut new_channels: HashSet<Uuid> = HashSet::new();
1727 new_channels.insert(chid);
1728 for _ in 1..10 {
1729 new_channels.insert(uuid::Uuid::new_v4());
1730 }
1731 let chid_to_remove = uuid::Uuid::new_v4();
1732 new_channels.insert(chid_to_remove);
1733 client.add_channels(&uaid, new_channels.clone()).await?;
1734 let channels = client.get_channels(&uaid).await?;
1735 assert_eq!(channels, new_channels);
1736
1737 assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
1739 assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
1740 new_channels.remove(&chid_to_remove);
1741 let channels = client.get_channels(&uaid).await?;
1742 assert_eq!(channels, new_channels);
1743
1744 let mut updated = User {
1748 connected_at,
1749 ..test_user.clone()
1750 };
1751 let result = client.update_user(&mut updated).await;
1752 assert!(result.is_ok());
1753 assert!(!result.unwrap());
1754
1755 let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
1757 assert_eq!(fetched.connected_at, fetched2.connected_at);
1758
1759 let mut updated = User {
1761 connected_at: fetched.connected_at + 300,
1762 ..fetched2
1763 };
1764 let result = client.update_user(&mut updated).await;
1765 assert!(result.is_ok());
1766 assert!(result.unwrap());
1767 assert_ne!(
1768 fetched2.connected_at,
1769 client.get_user(&uaid).await?.unwrap().connected_at
1770 );
1771
1772 client
1774 .increment_storage(
1775 &fetched.uaid,
1776 SystemTime::now()
1777 .duration_since(SystemTime::UNIX_EPOCH)
1778 .unwrap()
1779 .as_secs(),
1780 )
1781 .await?;
1782
1783 let test_data = "An_encrypted_pile_of_crap".to_owned();
1784 let timestamp = now();
1785 let sort_key = now();
1786 let test_notification = crate::db::Notification {
1788 channel_id: chid,
1789 version: "test".to_owned(),
1790 ttl: 300,
1791 timestamp,
1792 data: Some(test_data.clone()),
1793 sortkey_timestamp: Some(sort_key),
1794 ..Default::default()
1795 };
1796 let res = client.save_message(&uaid, test_notification.clone()).await;
1797 assert!(res.is_ok());
1798
1799 let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
1800 assert_ne!(fetched.messages.len(), 0);
1801 let fm = fetched.messages.pop().unwrap();
1802 assert_eq!(fm.channel_id, test_notification.channel_id);
1803 assert_eq!(fm.data, Some(test_data));
1804
1805 let fetched = client
1807 .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
1808 .await?;
1809 assert_ne!(fetched.messages.len(), 0);
1810
1811 let fetched = client
1813 .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
1814 .await?;
1815 assert_eq!(fetched.messages.len(), 0);
1816
1817 assert!(client
1819 .remove_message(&uaid, &test_notification.chidmessageid())
1820 .await
1821 .is_ok());
1822
1823 assert!(client.remove_channel(&uaid, &chid).await.is_ok());
1824
1825 client.add_channel(&uaid, &topic_chid).await?;
1827 let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
1828 let timestamp = now();
1829 let sort_key = now();
1830 let test_notification = crate::db::Notification {
1832 channel_id: topic_chid,
1833 version: "test".to_owned(),
1834 ttl: 300,
1835 topic: Some("topic".to_owned()),
1836 timestamp,
1837 data: Some(test_data.clone()),
1838 sortkey_timestamp: Some(sort_key),
1839 ..Default::default()
1840 };
1841 assert!(client
1842 .save_message(&uaid, test_notification.clone())
1843 .await
1844 .is_ok());
1845
1846 let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
1847 assert_ne!(fetched.messages.len(), 0);
1848 let fm = fetched.messages.pop().unwrap();
1849 assert_eq!(fm.channel_id, test_notification.channel_id);
1850 assert_eq!(fm.data, Some(test_data));
1851
1852 let fetched = client.fetch_topic_messages(&uaid, 999).await?;
1854 assert_ne!(fetched.messages.len(), 0);
1855
1856 assert!(client
1858 .remove_message(&uaid, &test_notification.chidmessageid())
1859 .await
1860 .is_ok());
1861
1862 assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());
1863
1864 let msgs = client
1865 .fetch_timestamp_messages(&uaid, None, 999)
1866 .await?
1867 .messages;
1868 assert!(msgs.is_empty());
1869
1870 let fetched = client.get_user(&uaid).await?.unwrap();
1871 assert!(client
1872 .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
1873 .await
1874 .is_ok());
1875 let fetched = client.get_user(&uaid).await?.unwrap();
1877 assert_eq!(fetched.node_id, None);
1878
1879 assert!(client.remove_user(&uaid).await.is_ok());
1880
1881 assert!(client.get_user(&uaid).await?.is_none());
1882
1883 Ok(())
1884 }
1885
1886 #[actix_rt::test]
1887 async fn read_cells_family_id() -> DbResult<()> {
1888 let client = new_client().unwrap();
1889 let uaid = gen_test_uaid();
1890 client.remove_user(&uaid).await.unwrap();
1891
1892 let qualifier = "foo".to_owned();
1893
1894 let row_key = uaid.simple().to_string();
1895 let mut row = Row::new(row_key.clone());
1896 row.cells.insert(
1897 ROUTER_FAMILY.to_owned(),
1898 vec![cell::Cell {
1899 qualifier: qualifier.to_owned(),
1900 value: "bar".as_bytes().to_vec(),
1901 ..Default::default()
1902 }],
1903 );
1904 client.write_row(row).await.unwrap();
1905 let req = client.read_row_request(&row_key);
1906 let Some(row) = client.read_row(req).await.unwrap() else {
1907 panic!("Expected row");
1908 };
1909 assert_eq!(row.cells.len(), 1);
1910 assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1911 client.remove_user(&uaid).await
1912 }
1913
1914 #[actix_rt::test]
1915 async fn add_user_existing() {
1916 let client = new_client().unwrap();
1917 let uaid = gen_test_uaid();
1918 let user = User {
1919 uaid,
1920 ..Default::default()
1921 };
1922 client.remove_user(&uaid).await.unwrap();
1923
1924 client.add_user(&user).await.unwrap();
1925 let err = client.add_user(&user).await.unwrap_err();
1926 assert!(matches!(err, DbError::Conditional));
1927 }
1928
1929 #[actix_rt::test]
1930 async fn version_check() {
1931 let client = new_client().unwrap();
1932 let uaid = gen_test_uaid();
1933 let user = User {
1934 uaid,
1935 ..Default::default()
1936 };
1937 client.remove_user(&uaid).await.unwrap();
1938
1939 client.add_user(&user).await.unwrap();
1940 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1941 assert!(client.update_user(&mut user.clone()).await.unwrap());
1942
1943 let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1944 assert_ne!(user.version, fetched.version);
1945 assert!(!client.update_user(&mut user).await.unwrap());
1947
1948 client.remove_user(&uaid).await.unwrap();
1949 }
1950
1951 #[actix_rt::test]
1952 async fn lingering_chid_record() {
1953 let client = new_client().unwrap();
1954 let uaid = gen_test_uaid();
1955 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1956 let user = User {
1957 uaid,
1958 ..Default::default()
1959 };
1960 client.remove_user(&uaid).await.unwrap();
1961
1962 client.add_channel(&uaid, &chid).await.unwrap();
1964
1965 assert!(client.get_user(&uaid).await.unwrap().is_none());
1968
1969 client.add_user(&user).await.unwrap();
1970 assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1972
1973 client.remove_user(&uaid).await.unwrap();
1974 }
1975
1976 #[actix_rt::test]
1977 async fn lingering_current_timestamp() {
1978 let client = new_client().unwrap();
1979 let uaid = gen_test_uaid();
1980 client.remove_user(&uaid).await.unwrap();
1981
1982 client
1983 .increment_storage(&uaid, crate::util::sec_since_epoch())
1984 .await
1985 .unwrap();
1986 assert!(client.get_user(&uaid).await.unwrap().is_none());
1987
1988 client.remove_user(&uaid).await.unwrap();
1989 }
1990
1991 #[actix_rt::test]
1992 async fn lingering_chid_w_version_record() {
1993 let client = new_client().unwrap();
1994 let uaid = gen_test_uaid();
1995 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1996 client.remove_user(&uaid).await.unwrap();
1997
1998 client.add_channel(&uaid, &chid).await.unwrap();
1999 assert!(client.remove_channel(&uaid, &chid).await.unwrap());
2000 assert!(client.get_user(&uaid).await.unwrap().is_none());
2001
2002 client.remove_user(&uaid).await.unwrap();
2003 }
2004
2005 #[actix_rt::test]
2006 async fn channel_and_current_timestamp_ttl_updates() {
2007 let client = new_client().unwrap();
2008 let uaid = gen_test_uaid();
2009 let chid = Uuid::parse_str(TEST_CHID).unwrap();
2010 client.remove_user(&uaid).await.unwrap();
2011
2012 let user = User {
2014 uaid,
2015 ..Default::default()
2016 };
2017 client.add_user(&user).await.unwrap();
2018
2019 client.add_channel(&uaid, &chid).await.unwrap();
2020 client
2021 .add_channel(&uaid, &uuid::Uuid::new_v4())
2022 .await
2023 .unwrap();
2024
2025 client
2026 .increment_storage(
2027 &uaid,
2028 SystemTime::now()
2029 .duration_since(SystemTime::UNIX_EPOCH)
2030 .unwrap()
2031 .as_secs(),
2032 )
2033 .await
2034 .unwrap();
2035
2036 let req = client.read_row_request(&uaid.as_simple().to_string());
2037 let Some(mut row) = client.read_row(req).await.unwrap() else {
2038 panic!("Expected row");
2039 };
2040
2041 let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
2044 for mut cells in row.cells.into_values() {
2045 let Some(cell) = cells.pop() else {
2046 continue;
2047 };
2048 assert!(
2049 cell.timestamp >= ca_expiry,
2050 "{} cell timestamp should >= connected_at's",
2051 cell.qualifier
2052 );
2053 }
2054
2055 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
2056
2057 tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
2059 client.update_user(&mut user).await.unwrap();
2060
2061 let req = client.read_row_request(&uaid.as_simple().to_string());
2063 let Some(mut row) = client.read_row(req).await.unwrap() else {
2064 panic!("Expected row");
2065 };
2066
2067 let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
2068
2069 assert!(ca_expiry2 > ca_expiry);
2070
2071 for mut cells in row.cells.into_values() {
2072 let Some(cell) = cells.pop() else {
2073 continue;
2074 };
2075 assert!(
2076 cell.timestamp >= ca_expiry2,
2077 "{} cell timestamp expiry should exceed connected_at's",
2078 cell.qualifier
2079 );
2080 }
2081
2082 client.remove_user(&uaid).await.unwrap();
2083 }
2084}