1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
8use std::time::{Duration, SystemTime};
9
10use again::RetryPolicy;
11use async_trait::async_trait;
12use cadence::StatsdClient;
13#[cfg(feature = "reliable_report")]
14use chrono::TimeDelta;
15use gcp_auth::TokenProvider;
16use googleapis_tonic_google_bigtable_v2::google::bigtable::v2 as bigtable;
17use googleapis_tonic_google_bigtable_v2::google::bigtable::v2::bigtable_client::BigtableClient;
18use serde_json::{from_str, json};
19use tonic::metadata::{AsciiMetadataValue, MetadataMap};
20use tonic::transport::Channel;
21use tonic::{Code, Request, Status};
22use uuid::Uuid;
23
24use crate::MAX_ROUTER_TTL_SECS;
25use crate::db::{
26 DbSettings, Notification, USER_RECORD_VERSION, User,
27 client::{DbClient, FetchMessageResponse},
28 error::{DbError, DbResult},
29 models::RangeKey,
30};
31use crate::metric_name::MetricName;
32use crate::metrics::StatsdClientExt;
33
34pub use self::metadata::MetadataBuilder;
35use self::row::{Row, RowCells};
36use super::BigTableDbSettings;
37use super::pool::BigTablePool;
38
39pub mod cell;
40pub mod error;
41pub(crate) mod merge;
42pub mod metadata;
43pub mod row;
44
/// Key of a single table row. Handled as a string in this module and sent to
/// Bigtable as its raw bytes.
pub type RowKey = String;

/// Column qualifier (the column name inside a family).
pub type Qualifier = String;
/// Name of a column family; used as the key when grouping cells for mutation.
pub type FamilyId = String;
54
/// Column family holding the per-user router record.
const ROUTER_FAMILY: &str = "router";
// Column families for stored messages: plain messages and topic messages.
const MESSAGE_FAMILY: &str = "message"; const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
/// Column family name for reliability data (only with the `reliable_report` feature).
#[cfg(feature = "reliable_report")]
const RELIABLE_LOG_FAMILY: &str = "reliability";
/// 60 days. NOTE(review): presumably the retention period for reliability
/// log entries; its use is not visible in this part of the file.
#[cfg(feature = "reliable_report")]
pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);

/// Retry count used by `BigtableDb::health_check`.
pub(crate) const RETRY_COUNT: usize = 5;

/// Maximum gRPC message size (2^28 bytes = 256 MiB), applied to both the
/// encoding and decoding limits of the Bigtable client.
const MAX_MESSAGE_LEN: usize = 1 << 28;

/// OAuth scopes requested when fetching a token for Bigtable data requests.
const BIGTABLE_DATA_SCOPES: &[&str] = &["https://www.googleapis.com/auth/bigtable.data"];
72
/// Consecutive-failure circuit breaker.
///
/// Once `failure_threshold` failures have been recorded without an
/// intervening success the breaker is "open": [`CircuitBreaker::allow_request`]
/// returns `false` until `cooldown_secs` seconds have passed since the most
/// recent failure. After the cooldown, requests are allowed again; a success
/// resets the failure count and another failure restarts the cooldown.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Failures recorded since the last success; saturates at `u32::MAX`.
    consecutive_failures: AtomicU32,
    /// Unix time (seconds) of the latest failure recorded at or above the threshold.
    opened_at_epoch_secs: AtomicU64,
    /// Number of consecutive failures that opens the breaker.
    failure_threshold: u32,
    /// How long (seconds) the breaker rejects requests after the latest failure.
    cooldown_secs: u64,
}

impl CircuitBreaker {
    /// Create a closed breaker with the given threshold and cooldown.
    pub fn new(failure_threshold: u32, cooldown_secs: u64) -> Self {
        Self {
            consecutive_failures: AtomicU32::new(0),
            opened_at_epoch_secs: AtomicU64::new(0),
            failure_threshold,
            cooldown_secs,
        }
    }

    /// Current Unix time in whole seconds; `0` if the clock is before the epoch.
    fn now_epoch_secs() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Returns `true` when a request may be attempted.
    ///
    /// That is the case while fewer than `failure_threshold` consecutive
    /// failures are recorded, or once `cooldown_secs` have elapsed since the
    /// breaker was last (re)opened.
    pub fn allow_request(&self) -> bool {
        if self.consecutive_failures.load(Ordering::Relaxed) < self.failure_threshold {
            return true;
        }
        let opened_at = self.opened_at_epoch_secs.load(Ordering::Relaxed);
        // `saturating_sub` guards against the wall clock stepping backwards.
        Self::now_epoch_secs().saturating_sub(opened_at) >= self.cooldown_secs
    }

    /// Record a successful request, closing the breaker.
    pub fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::Relaxed);
    }

    /// Record a failed request; at or above the threshold this (re)starts the
    /// cooldown window.
    pub fn record_failure(&self) {
        // Saturate instead of wrapping: a wrapped counter would read as
        // "below threshold" and silently close the breaker, and `prev + 1`
        // would overflow at `u32::MAX`.
        let prev = self
            .consecutive_failures
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
                Some(count.saturating_add(1))
            })
            .unwrap_or(u32::MAX);
        if prev.saturating_add(1) >= self.failure_threshold {
            self.opened_at_epoch_secs
                .store(Self::now_epoch_secs(), Ordering::Relaxed);
        }
    }
}

impl Default for CircuitBreaker {
    /// Opens after 5 consecutive failures, with a 30 second cooldown.
    fn default() -> Self {
        Self::new(5, 30)
    }
}
140
141struct Uaid(Uuid);
144
145impl Display for Uaid {
146 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
147 write!(f, "{}", self.0.as_simple())
148 }
149}
150
151impl From<Uaid> for String {
152 fn from(uaid: Uaid) -> String {
153 uaid.0.as_simple().to_string()
154 }
155}
156
/// Bigtable-backed database client.
///
/// Bundles the Bigtable settings, a metrics client, the connection pool, the
/// metadata attached to data requests and a circuit breaker.
#[derive(Clone)]
pub struct BigTableClientImpl {
    /// Bigtable settings (table name, app profile id, retry count, router TTL).
    pub(crate) settings: BigTableDbSettings,
    /// Statsd client used for retry, drop-user and message metrics.
    metrics: Arc<StatsdClient>,
    /// Pool from which a connection is taken for every request.
    pool: BigTablePool,
    /// Metadata copied onto every read/mutate request.
    metadata: MetadataMap,
    /// Breaker consulted by `mutate_row` and `read_rows`; behind an `Arc`, so
    /// clones of this client share one breaker.
    circuit_breaker: Arc<CircuitBreaker>,
}
169
170fn router_gc_policy_filter() -> bigtable::RowFilter {
172 bigtable::RowFilter {
173 filter: Some(bigtable::row_filter::Filter::CellsPerColumnLimitFilter(1)),
174 }
175}
176
177fn message_gc_policy_filter() -> Result<Vec<bigtable::RowFilter>, error::BigTableError> {
180 let bt_now: i64 = SystemTime::now()
181 .duration_since(SystemTime::UNIX_EPOCH)
182 .map_err(error::BigTableError::WriteTime)?
183 .as_millis() as i64;
184 let timestamp_filter = bigtable::RowFilter {
185 filter: Some(bigtable::row_filter::Filter::TimestampRangeFilter(
186 bigtable::TimestampRange {
187 start_timestamp_micros: bt_now * 1000,
188 end_timestamp_micros: 0,
189 },
190 )),
191 };
192
193 Ok(vec![router_gc_policy_filter(), timestamp_filter])
194}
195
196fn family_filter(regex: String) -> bigtable::RowFilter {
198 bigtable::RowFilter {
199 filter: Some(bigtable::row_filter::Filter::FamilyNameRegexFilter(regex)),
200 }
201}
202
/// Escape arbitrary bytes so they match literally inside a regex.
///
/// * ASCII alphanumerics, `_` and bytes with the high bit set are copied as is.
/// * NUL becomes the four bytes `\x00`.
/// * Every other byte is prefixed with a backslash.
///
/// NOTE(review): these look like the rules of RE2's `QuoteMeta`; confirm.
fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
    let mut escaped = Vec::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        match byte {
            0 => escaped.extend_from_slice(b"\\x00"),
            b'_' | b'0'..=b'9' | b'a'..=b'z' | b'A'..=b'Z' | 0x80..=0xFF => escaped.push(byte),
            _ => escaped.extend_from_slice(&[b'\\', byte]),
        }
    }
    escaped
}
224
225fn version_filter(version: &Uuid) -> Vec<bigtable::RowFilter> {
228 let cq_filter = bigtable::RowFilter {
229 filter: Some(bigtable::row_filter::Filter::ColumnQualifierRegexFilter(
230 "^version$".as_bytes().to_vec(),
231 )),
232 };
233 let value_filter = bigtable::RowFilter {
234 filter: Some(bigtable::row_filter::Filter::ValueRegexFilter(
235 escape_bytes(version.as_bytes()),
236 )),
237 };
238
239 vec![
240 family_filter(format!("^{ROUTER_FAMILY}$")),
241 cq_filter,
242 value_filter,
243 ]
244}
245
246fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
248 cell::Cell {
249 qualifier: "version".to_owned(),
250 value: Uuid::new_v4().into(),
251 timestamp,
252 ..Default::default()
253 }
254}
255
256fn filter_chain(filters: Vec<bigtable::RowFilter>) -> bigtable::RowFilter {
258 bigtable::RowFilter {
259 filter: Some(bigtable::row_filter::Filter::Chain(
260 bigtable::row_filter::Chain { filters },
261 )),
262 }
263}
264
265fn read_row_request(
267 table_name: &str,
268 app_profile_id: &str,
269 row_key: &str,
270) -> bigtable::ReadRowsRequest {
271 bigtable::ReadRowsRequest {
272 table_name: table_name.to_owned(),
273 app_profile_id: app_profile_id.to_owned(),
274 rows: Some(bigtable::RowSet {
275 row_keys: vec![row_key.as_bytes().to_vec()],
276 row_ranges: Vec::new(),
277 }),
278 ..Default::default()
279 }
280}
281
282fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
283 let v: [u8; 8] = value
284 .try_into()
285 .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
286 Ok(u64::from_be_bytes(v))
287}
288
289fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
290 String::from_utf8(value).map_err(|e| {
291 debug!("🉑 cannot read string {}: {:?}", name, e);
292 DbError::DeserializeString(name.to_owned())
293 })
294}
295
296fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
300 let mut result = HashSet::new();
301 for cells in cells.values() {
302 let Some(cell) = cells.last() else {
303 continue;
304 };
305 let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
306 return Err(DbError::Integrity(
307 "get_channels expected: chid:<chid>".to_owned(),
308 None,
309 ));
310 };
311 result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
312 }
313 Ok(result)
314}
315
316fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
318 let channels = channels.into_owned();
319 let mut cells = Vec::with_capacity(channels.len().min(100_000));
320 for (i, channel_id) in channels.into_iter().enumerate() {
321 if i >= 100_000 {
325 break;
326 }
327 cells.push(cell::Cell {
328 qualifier: format!("chid:{}", channel_id.as_hyphenated()),
329 timestamp: expiry,
330 ..Default::default()
331 });
332 }
333 cells
334}
335
/// Build the retry policy used for Bigtable calls: the default policy, capped
/// at `max` retries, with jitter enabled.
pub fn retry_policy(max: usize) -> RetryPolicy {
    RetryPolicy::default()
        .with_max_retries(max)
        .with_jitter(true)
}
341
342fn retryable_internal_err(status: &Status) -> bool {
343 match status.code() {
344 Code::Unknown => true,
347 Code::Internal => [
348 "rst_stream",
349 "rst stream",
350 "received unexpected eos on data frame from server",
351 ]
352 .contains(&status.message().to_lowercase().as_str()),
353 Code::Unavailable | Code::DeadlineExceeded => true,
354 _ => false,
355 }
356}
357
358pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
359 let mut metric = metrics
360 .incr_with_tags(MetricName::DatabaseRetry)
361 .with_tag("error", err_type)
362 .with_tag("type", "bigtable");
363 if let Some(code) = code {
364 metric = metric.with_tag("code", code);
365 }
366 metric.send();
367}
368
369pub fn retryable_status(metrics: &Arc<StatsdClient>) -> impl Fn(&Status) -> bool + '_ {
370 move |status| {
371 info!("GRPC Failure: {:?}", status);
372 let retry = retryable_internal_err(status);
373 if retry {
374 metric(metrics, "RpcFailure", Some(&format!("{:?}", status.code())));
375 }
376 retry
377 }
378}
379
380pub fn retryable_bt_err(
381 metrics: &Arc<StatsdClient>,
382) -> impl Fn(&error::BigTableError) -> bool + '_ {
383 move |err| {
384 debug!("🉑 Checking BigTableError...{err}");
385 match err {
386 error::BigTableError::InvalidRowResponse(s)
387 | error::BigTableError::Read(s)
388 | error::BigTableError::Write(s) => retryable_status(metrics)(s),
389 error::BigTableError::Auth(_) => {
393 metric(metrics, "Auth", None);
394 true
395 }
396 _ => false,
397 }
398 }
399}
400
401fn is_incomplete_router_record(cells: &RowCells) -> bool {
415 cells
416 .keys()
417 .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
418}
419
420impl BigTableClientImpl {
445 pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
446 debug!("🏊 BT Pool new");
447 let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
448 info!("🉑 {:#?}", db_settings);
449 let pool = BigTablePool::new(settings, &metrics)?;
450
451 let metadata = db_settings.metadata()?;
453 Ok(Self {
454 settings: db_settings,
455 metrics,
456 metadata,
457 pool,
458 circuit_breaker: Arc::new(CircuitBreaker::default()),
459 })
460 }
461
462 fn router_ttl(&self) -> Duration {
463 self.settings
464 .max_router_ttl
465 .unwrap_or(Duration::from_secs(MAX_ROUTER_TTL_SECS))
466 }
467
/// Start the connection pool's sweeper with the given interval (delegates to
/// the pool).
pub fn spawn_sweeper(&self, interval: Duration) {
    self.pool.spawn_sweeper(interval);
}
472
/// Build a single-row `ReadRowsRequest` for `row_key` using this client's
/// table name and app profile id.
fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
    read_row_request(
        &self.settings.table_name,
        &self.settings.app_profile_id,
        row_key,
    )
}
481
482 fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
484 bigtable::MutateRowRequest {
485 table_name: self.settings.table_name.clone(),
486 app_profile_id: self.settings.app_profile_id.clone(),
487 row_key: row_key.as_bytes().to_vec(),
488 ..Default::default()
489 }
490 }
491
492 fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
494 bigtable::CheckAndMutateRowRequest {
495 table_name: self.settings.table_name.clone(),
496 app_profile_id: self.settings.app_profile_id.clone(),
497 row_key: row_key.as_bytes().to_vec(),
498 ..Default::default()
499 }
500 }
501
502 async fn mutate_row(
504 &self,
505 req: bigtable::MutateRowRequest,
506 ) -> Result<(), error::BigTableError> {
507 if !self.circuit_breaker.allow_request() {
508 return Err(error::BigTableError::CircuitBreakerOpen);
509 }
510 let bigtable = self.pool.get().await?;
511 let result = retry_policy(self.settings.retry_count)
512 .retry_if(
513 || async {
514 let request = bigtable.request(req.clone(), &self.metadata).await?;
515 bigtable
516 .conn
517 .clone()
518 .mutate_row(request)
519 .await
520 .map_err(error::BigTableError::Write)?;
521 Ok(())
522 },
523 retryable_bt_err(&self.metrics),
524 )
525 .await;
526 match &result {
527 Ok(()) => self.circuit_breaker.record_success(),
528 Err(_) => self.circuit_breaker.record_failure(),
529 }
530 result
531 }
532
/// Read rows for `req` and return the first one in row-key order, or `None`
/// when the request matched nothing.
///
/// # Errors
/// Propagates any error from `read_rows`.
async fn read_row(
    &self,
    req: bigtable::ReadRowsRequest,
) -> Result<Option<row::Row>, error::BigTableError> {
    let mut rows = self.read_rows(req).await?;
    // `pop_first` removes the entry with the smallest key of the ordered map.
    Ok(rows.pop_first().map(|(_, v)| v))
}
541
542 async fn read_rows(
546 &self,
547 req: bigtable::ReadRowsRequest,
548 ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
549 if !self.circuit_breaker.allow_request() {
550 return Err(error::BigTableError::CircuitBreakerOpen);
551 }
552 let bigtable = self.pool.get().await?;
553 let result = retry_policy(self.settings.retry_count)
554 .retry_if(
555 || async {
556 let request = bigtable.request(req.clone(), &self.metadata).await?;
557 let resp = bigtable
558 .conn
559 .clone()
560 .read_rows(request)
561 .await
562 .map_err(error::BigTableError::Read)?
563 .into_inner();
564 merge::RowMerger::process_chunks(resp).await
565 },
566 retryable_bt_err(&self.metrics),
567 )
568 .await;
569 match &result {
570 Ok(_) => self.circuit_breaker.record_success(),
571 Err(_) => self.circuit_breaker.record_failure(),
572 }
573 result
574 }
575
576 async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
580 let mut req = self.mutate_row_request(&row.row_key);
581 req.mutations = self.get_mutations(row.cells)?;
586 self.mutate_row(req).await?;
587 Ok(())
588 }
589
590 fn get_mutations(
592 &self,
593 cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
594 ) -> Result<Vec<bigtable::Mutation>, error::BigTableError> {
595 let mut mutations = Vec::new();
596 for (family_id, cells) in cells {
597 for cell in cells {
598 let timestamp = cell
599 .timestamp
600 .duration_since(SystemTime::UNIX_EPOCH)
601 .map_err(error::BigTableError::WriteTime)?;
602 debug!("🉑 expiring in {:?}", timestamp.as_millis());
603 mutations.push(bigtable::Mutation {
604 mutation: Some(bigtable::mutation::Mutation::SetCell(
605 bigtable::mutation::SetCell {
606 family_name: family_id.clone(),
607 column_qualifier: cell.qualifier.clone().into_bytes(),
608 timestamp_micros: (timestamp.as_millis() * 1000) as i64,
612 value: cell.value,
613 },
614 )),
615 });
616 }
617 }
618 Ok(mutations)
619 }
620
621 async fn check_and_mutate_row(
629 &self,
630 row: row::Row,
631 filter: bigtable::RowFilter,
632 state: bool,
633 ) -> Result<bool, error::BigTableError> {
634 let mut req = self.check_and_mutate_row_request(&row.row_key);
635 let mutations = self.get_mutations(row.cells)?;
636 req.predicate_filter = Some(filter);
637 if state {
638 req.true_mutations = mutations;
639 } else {
640 req.false_mutations = mutations;
641 }
642 self.check_and_mutate(req).await
643 }
644
645 async fn check_and_mutate(
646 &self,
647 req: bigtable::CheckAndMutateRowRequest,
648 ) -> Result<bool, error::BigTableError> {
649 let bigtable = self.pool.get().await?;
650 let resp = retry_policy(self.settings.retry_count)
651 .retry_if(
652 || async {
653 let request = bigtable.request(req.clone(), &self.metadata).await?;
654 bigtable
655 .conn
656 .clone()
657 .check_and_mutate_row(request)
658 .await
659 .map_err(error::BigTableError::Write)
660 },
661 retryable_bt_err(&self.metrics),
662 )
663 .await?
664 .into_inner();
665 debug!("🉑 Predicate Matched: {}", &resp.predicate_matched);
666 Ok(resp.predicate_matched)
667 }
668
669 fn get_delete_mutations(
670 &self,
671 family: &str,
672 column_names: &[&str],
673 time_range: Option<&bigtable::TimestampRange>,
674 ) -> Result<Vec<bigtable::Mutation>, error::BigTableError> {
675 let mut mutations = Vec::new();
676 for column in column_names {
677 mutations.push(bigtable::Mutation {
681 mutation: Some(bigtable::mutation::Mutation::DeleteFromColumn(
682 bigtable::mutation::DeleteFromColumn {
683 family_name: family.to_owned(),
684 column_qualifier: column.as_bytes().to_vec(),
685 time_range: time_range.cloned(),
686 },
687 )),
688 });
689 }
690 Ok(mutations)
691 }
692
693 async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
695 let mut req = self.mutate_row_request(row_key);
696 req.mutations = vec![bigtable::Mutation {
697 mutation: Some(bigtable::mutation::Mutation::DeleteFromRow(
698 bigtable::mutation::DeleteFromRow {},
699 )),
700 }];
701 self.mutate_row(req).await
702 }
703
704 fn rows_to_notifications(
705 &self,
706 rows: BTreeMap<String, Row>,
707 ) -> Result<Vec<Notification>, DbError> {
708 rows.into_iter()
709 .map(|(row_key, row)| self.row_to_notification(&row_key, row))
710 .collect()
711 }
712
713 fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
714 let Some((_, chidmessageid)) = row_key.split_once('#') else {
715 return Err(DbError::Integrity(
716 "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
717 None,
718 ));
719 };
720 let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
721 DbError::Integrity(
722 format!("rows_to_notification expected chidmessageid: {e}"),
723 None,
724 )
725 })?;
726
727 let mut notif = Notification {
729 channel_id: range_key.channel_id,
730 topic: range_key.topic,
731 sortkey_timestamp: range_key.sortkey_timestamp,
732 version: to_string(row.take_required_cell("version")?.value, "version")?,
733 ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
734 timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
735 ..Default::default()
736 };
737
738 if let Some(cell) = row.take_cell("data") {
740 notif.data = Some(to_string(cell.value, "data")?);
741 }
742 #[cfg(feature = "reliable_report")]
743 {
744 if let Some(cell) = row.take_cell("reliability_id") {
745 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
746 }
747 if let Some(cell) = row.take_cell("reliable_state") {
748 notif.reliable_state = Some(
749 crate::reliability::ReliabilityState::from_str(&to_string(
750 cell.value,
751 "reliable_state",
752 )?)
753 .map_err(|e| {
754 DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
755 })?,
756 );
757 }
758 }
759 if let Some(cell) = row.take_cell("headers") {
760 notif.headers = Some(
761 serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
762 .map_err(|e| DbError::Serialization(e.to_string()))?,
763 );
764 }
765 #[cfg(feature = "reliable_report")]
766 if let Some(cell) = row.take_cell("reliability_id") {
767 trace!("🚣 Is reliable");
768 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
769 }
770
771 trace!("🚣 Deserialized message row: {:?}", ¬if);
772 Ok(notif)
773 }
774
775 fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
781 let row_key = user.uaid.simple().to_string();
782 let mut row = Row::new(row_key);
783 let expiry = std::time::SystemTime::now() + self.router_ttl();
784
785 let mut cells: Vec<cell::Cell> = vec![
786 cell::Cell {
787 qualifier: "connected_at".to_owned(),
788 value: user.connected_at.to_be_bytes().to_vec(),
789 timestamp: expiry,
790 ..Default::default()
791 },
792 cell::Cell {
793 qualifier: "router_type".to_owned(),
794 value: user.router_type.clone().into_bytes(),
795 timestamp: expiry,
796 ..Default::default()
797 },
798 cell::Cell {
799 qualifier: "record_version".to_owned(),
800 value: user
801 .record_version
802 .unwrap_or(USER_RECORD_VERSION)
803 .to_be_bytes()
804 .to_vec(),
805 timestamp: expiry,
806 ..Default::default()
807 },
808 cell::Cell {
809 qualifier: "version".to_owned(),
810 value: (*version).into(),
811 timestamp: expiry,
812 ..Default::default()
813 },
814 ];
815
816 if let Some(router_data) = &user.router_data {
817 cells.push(cell::Cell {
818 qualifier: "router_data".to_owned(),
819 value: json!(router_data).to_string().as_bytes().to_vec(),
820 timestamp: expiry,
821 ..Default::default()
822 });
823 };
824 if let Some(current_timestamp) = user.current_timestamp {
825 cells.push(cell::Cell {
826 qualifier: "current_timestamp".to_owned(),
827 value: current_timestamp.to_be_bytes().to_vec(),
828 timestamp: expiry,
829 ..Default::default()
830 });
831 };
832 if let Some(node_id) = &user.node_id {
833 cells.push(cell::Cell {
834 qualifier: "node_id".to_owned(),
835 value: node_id.as_bytes().to_vec(),
836 timestamp: expiry,
837 ..Default::default()
838 });
839 };
840
841 cells.extend(channels_to_cells(
842 Cow::Borrowed(&user.priv_channels),
843 expiry,
844 ));
845
846 row.add_cells(ROUTER_FAMILY, cells);
847 row
848 }
849}
850
/// One Bigtable connection together with what is needed to build
/// authenticated requests on it.
#[derive(Clone)]
pub struct BigtableDb {
    /// gRPC client for the Bigtable v2 data API.
    pub(super) conn: BigtableClient<Channel>,
    /// Metadata attached to health-check requests.
    pub(super) health_metadata: MetadataMap,
    /// Optional token source; when `None`, `request` adds no bearer token.
    auth_provider: Option<Arc<dyn TokenProvider>>,
    /// Table read by `health_check`.
    table_name: String,
}
861
862impl BigtableDb {
863 pub fn new(
864 channel: Channel,
865 auth_provider: Option<Arc<dyn TokenProvider>>,
866 health_metadata: &MetadataMap,
867 table_name: &str,
868 ) -> Self {
869 Self {
870 conn: BigtableClient::new(channel)
871 .max_decoding_message_size(MAX_MESSAGE_LEN)
872 .max_encoding_message_size(MAX_MESSAGE_LEN),
873 health_metadata: health_metadata.clone(),
874 auth_provider,
875 table_name: table_name.to_owned(),
876 }
877 }
878
879 pub(super) async fn request<T>(
887 &self,
888 msg: T,
889 metadata: &MetadataMap,
890 ) -> Result<Request<T>, error::BigTableError> {
891 let mut request = Request::new(msg);
892 *request.metadata_mut() = metadata.clone();
893 if let Some(provider) = &self.auth_provider {
894 let token = provider
895 .token(BIGTABLE_DATA_SCOPES)
896 .await
897 .map_err(error::BigTableError::Auth)?;
898 let value: AsciiMetadataValue = format!("Bearer {}", token.as_str())
899 .parse()
900 .map_err(|e| error::BigTableError::Config(format!("Invalid auth token: {e}")))?;
901 request.metadata_mut().insert("authorization", value);
902 }
903 Ok(request)
904 }
905
906 pub async fn health_check(
913 &mut self,
914 metrics: &Arc<StatsdClient>,
915 app_profile_id: &str,
916 ) -> Result<bool, error::BigTableError> {
917 let random_uaid = Uuid::new_v4().simple().to_string();
924 let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
925 req.filter = Some(bigtable::RowFilter {
926 filter: Some(bigtable::row_filter::Filter::BlockAllFilter(true)),
927 });
928 let _r = retry_policy(RETRY_COUNT)
929 .retry_if(
930 || async {
931 let request = self.request(req.clone(), &self.health_metadata).await?;
932 self.conn
933 .clone()
934 .read_rows(request)
935 .await
936 .map_err(error::BigTableError::Read)
937 },
938 retryable_bt_err(metrics),
939 )
940 .await?;
941
942 debug!("🉑 health check");
943 Ok(true)
944 }
945}
946
947#[async_trait]
948impl DbClient for BigTableClientImpl {
949 async fn add_user(&self, user: &User) -> DbResult<()> {
951 trace!("🉑 Adding user");
952 let Some(ref version) = user.version else {
953 return Err(DbError::General(
954 "add_user expected a user version field".to_owned(),
955 ));
956 };
957 let row = self.user_to_row(user, version);
958
959 let row_key_filter = bigtable::RowFilter {
961 filter: Some(bigtable::row_filter::Filter::RowKeyRegexFilter(
962 format!("^{}$", row.row_key).into_bytes(),
963 )),
964 };
965 let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
966
967 if self.check_and_mutate_row(row, filter, false).await? {
968 return Err(DbError::Conditional);
969 }
970 Ok(())
971 }
972
973 async fn update_user(&self, user: &mut User) -> DbResult<bool> {
989 let Some(ref version) = user.version else {
990 return Err(DbError::General(
991 "update_user expected a user version field".to_owned(),
992 ));
993 };
994
995 let mut filters = vec![router_gc_policy_filter()];
996 filters.extend(version_filter(version));
997 let filter = filter_chain(filters);
998
999 let new_version = Uuid::new_v4();
1000 let row = self.user_to_row(user, &new_version);
1002
1003 let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
1004 user.version = Some(new_version);
1005 Ok(predicate_matched)
1006 }
1007
1008 async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
1009 let row_key = uaid.as_simple().to_string();
1010 let mut req = self.read_row_request(&row_key);
1011 let mut filters = vec![router_gc_policy_filter()];
1012 filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
1013 req.filter = Some(filter_chain(filters));
1014 let Some(mut row) = self.read_row(req).await? else {
1015 return Ok(None);
1016 };
1017
1018 trace!("🉑 Found a record for {}", row_key);
1019
1020 let connected_at_cell = match row.take_required_cell("connected_at") {
1021 Ok(cell) => cell,
1022 Err(_) => {
1023 if !is_incomplete_router_record(&row.cells) {
1024 return Err(DbError::Integrity(
1025 "Expected column: connected_at".to_owned(),
1026 Some(format!("{row:#?}")),
1027 ));
1028 }
1029 trace!("🉑 Dropping an incomplete user record for {}", row_key);
1034 self.metrics
1035 .incr_with_tags(MetricName::DatabaseDropUser)
1036 .with_tag("reason", "incomplete_record")
1037 .send();
1038 self.remove_user(uaid).await?;
1039 return Ok(None);
1040 }
1041 };
1042
1043 let mut result = User {
1044 uaid: *uaid,
1045 connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1046 router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1047 record_version: Some(to_u64(
1048 row.take_required_cell("record_version")?.value,
1049 "record_version",
1050 )?),
1051 version: Some(
1052 row.take_required_cell("version")?
1053 .value
1054 .try_into()
1055 .map_err(|e| {
1056 DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1057 })?,
1058 ),
1059 ..Default::default()
1060 };
1061
1062 if let Some(cell) = row.take_cell("router_data") {
1063 result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1064 DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1065 })?;
1066 }
1067
1068 if let Some(cell) = row.take_cell("node_id") {
1069 result.node_id = Some(to_string(cell.value, "node_id")?);
1070 }
1071
1072 if let Some(cell) = row.take_cell("current_timestamp") {
1073 result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1074 }
1075
1076 result.priv_channels = channels_from_cells(&row.cells)?;
1078
1079 Ok(Some(result))
1080 }
1081
1082 async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1083 let row_key = uaid.simple().to_string();
1084 self.delete_row(&row_key).await?;
1085 Ok(())
1086 }
1087
1088 async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1089 let channels = HashSet::from_iter([channel_id.to_owned()]);
1090 self.add_channels(uaid, channels).await
1091 }
1092
1093 async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1096 let row_key = uaid.simple().to_string();
1110 let mut row = Row::new(row_key);
1111 let expiry = std::time::SystemTime::now() + self.router_ttl();
1112
1113 row.add_cells(
1117 ROUTER_FAMILY,
1118 channels_to_cells(Cow::Owned(channels), expiry),
1119 );
1120
1121 self.write_row(row).await?;
1122 Ok(())
1123 }
1124
1125 async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1126 let row_key = uaid.simple().to_string();
1127 let mut req = self.read_row_request(&row_key);
1128
1129 let cq_filter = bigtable::RowFilter {
1130 filter: Some(bigtable::row_filter::Filter::ColumnQualifierRegexFilter(
1131 "^chid:.*$".as_bytes().to_vec(),
1132 )),
1133 };
1134 req.filter = Some(filter_chain(vec![
1135 router_gc_policy_filter(),
1136 family_filter(format!("^{ROUTER_FAMILY}$")),
1137 cq_filter,
1138 ]));
1139
1140 let Some(row) = self.read_row(req).await? else {
1141 return Ok(Default::default());
1142 };
1143 channels_from_cells(&row.cells)
1144 }
1145
1146 async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1148 let row_key = uaid.simple().to_string();
1149 let mut req = self.check_and_mutate_row_request(&row_key);
1150
1151 let column = format!("chid:{}", channel_id.as_hyphenated());
1153 let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1154
1155 let mut row = Row::new(row_key);
1157 let expiry = std::time::SystemTime::now() + self.router_ttl();
1158 row.cells
1159 .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1160 mutations.extend(self.get_mutations(row.cells)?);
1161
1162 let cq_filter = bigtable::RowFilter {
1164 filter: Some(bigtable::row_filter::Filter::ColumnQualifierRegexFilter(
1165 format!("^{column}$").into_bytes(),
1166 )),
1167 };
1168 req.predicate_filter = Some(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1169 req.true_mutations = mutations;
1170
1171 Ok(self.check_and_mutate(req).await?)
1172 }
1173
1174 async fn remove_node_id(
1176 &self,
1177 uaid: &Uuid,
1178 _node_id: &str,
1179 _connected_at: u64,
1180 version: &Option<Uuid>,
1181 ) -> DbResult<bool> {
1182 let row_key = uaid.simple().to_string();
1183 trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1184 let Some(version) = version else {
1185 return Err(DbError::General("Expected a user version field".to_owned()));
1186 };
1187
1188 let mut req = self.check_and_mutate_row_request(&row_key);
1189
1190 let mut filters = vec![router_gc_policy_filter()];
1191 filters.extend(version_filter(version));
1192 req.predicate_filter = Some(filter_chain(filters));
1193 req.true_mutations = self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?;
1194
1195 Ok(self.check_and_mutate(req).await?)
1196 }
1197
1198 async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1200 let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1201 debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1202 trace!(
1203 "🉑 timestamp: {:?}",
1204 &message.timestamp.to_be_bytes().to_vec()
1205 );
1206 let mut row = Row::new(row_key);
1207
1208 let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1212 trace!(
1213 "🉑 Message Expiry {}",
1214 expiry
1215 .duration_since(SystemTime::UNIX_EPOCH)
1216 .unwrap_or_default()
1217 .as_millis()
1218 );
1219
1220 let mut cells: Vec<cell::Cell> = Vec::new();
1221
1222 let is_topic = message.topic.is_some();
1223 let family = if is_topic {
1224 MESSAGE_TOPIC_FAMILY
1225 } else {
1226 MESSAGE_FAMILY
1227 };
1228 cells.extend(vec![
1229 cell::Cell {
1230 qualifier: "ttl".to_owned(),
1231 value: message.ttl.to_be_bytes().to_vec(),
1232 timestamp: expiry,
1233 ..Default::default()
1234 },
1235 cell::Cell {
1236 qualifier: "timestamp".to_owned(),
1237 value: message.timestamp.to_be_bytes().to_vec(),
1238 timestamp: expiry,
1239 ..Default::default()
1240 },
1241 cell::Cell {
1242 qualifier: "version".to_owned(),
1243 value: message.version.into_bytes(),
1244 timestamp: expiry,
1245 ..Default::default()
1246 },
1247 ]);
1248 if let Some(headers) = message.headers
1249 && !headers.is_empty()
1250 {
1251 cells.push(cell::Cell {
1252 qualifier: "headers".to_owned(),
1253 value: json!(headers).to_string().into_bytes(),
1254 timestamp: expiry,
1255 ..Default::default()
1256 });
1257 }
1258 #[cfg(feature = "reliable_report")]
1259 {
1260 if let Some(reliability_id) = message.reliability_id {
1261 trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1262 cells.push(cell::Cell {
1263 qualifier: "reliability_id".to_owned(),
1264 value: reliability_id.into_bytes(),
1265 timestamp: expiry,
1266 ..Default::default()
1267 });
1268 }
1269 if let Some(reliable_state) = message.reliable_state {
1270 cells.push(cell::Cell {
1271 qualifier: "reliable_state".to_owned(),
1272 value: reliable_state.to_string().into_bytes(),
1273 timestamp: expiry,
1274 ..Default::default()
1275 });
1276 }
1277 }
1278 if let Some(data) = message.data {
1279 cells.push(cell::Cell {
1280 qualifier: "data".to_owned(),
1281 value: data.into_bytes(),
1282 timestamp: expiry,
1283 ..Default::default()
1284 });
1285 }
1286
1287 row.add_cells(family, cells);
1288 trace!("🉑 Adding row");
1289 self.write_row(row).await?;
1290
1291 self.metrics
1292 .incr_with_tags(MetricName::NotificationMessageStored)
1293 .with_tag("topic", &is_topic.to_string())
1294 .with_tag("database", &self.name())
1295 .send();
1296 Ok(())
1297 }
1298
1299 async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1304 for message in messages {
1306 self.save_message(uaid, message).await?;
1307 }
1308 Ok(())
1309 }
1310
1311 async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1325 let row_key = uaid.simple().to_string();
1326 debug!(
1327 "🉑 Updating {} current_timestamp: {:?}",
1328 &row_key,
1329 timestamp.to_be_bytes().to_vec()
1330 );
1331 let expiry = std::time::SystemTime::now() + self.router_ttl();
1332 let mut row = Row::new(row_key.clone());
1333
1334 row.cells.insert(
1335 ROUTER_FAMILY.to_owned(),
1336 vec![
1337 cell::Cell {
1338 qualifier: "current_timestamp".to_owned(),
1339 value: timestamp.to_be_bytes().to_vec(),
1340 timestamp: expiry,
1341 ..Default::default()
1342 },
1343 new_version_cell(expiry),
1344 ],
1345 );
1346
1347 self.write_row(row).await?;
1348
1349 Ok(())
1350 }
1351
1352 async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1354 trace!(
1355 "🉑 attemping to delete {:?} :: {:?}",
1356 uaid.to_string(),
1357 chidmessageid
1358 );
1359 let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1360 debug!("🉑🔥 Deleting message {}", &row_key);
1361 self.delete_row(&row_key).await?;
1362 self.metrics
1363 .incr_with_tags(MetricName::NotificationMessageDeleted)
1364 .with_tag("database", &self.name())
1365 .send();
1366 Ok(())
1367 }
1368
1369 async fn fetch_topic_messages(
1371 &self,
1372 uaid: &Uuid,
1373 limit: usize,
1374 ) -> DbResult<FetchMessageResponse> {
1375 let start_key = format!("{}#01:", uaid.simple());
1376 let end_key = format!("{}#02:", uaid.simple());
1377 let mut req = bigtable::ReadRowsRequest {
1378 table_name: self.settings.table_name.clone(),
1379 app_profile_id: self.settings.app_profile_id.clone(),
1380 rows: Some(bigtable::RowSet {
1381 row_keys: Vec::new(),
1382 row_ranges: vec![bigtable::RowRange {
1383 start_key: Some(bigtable::row_range::StartKey::StartKeyOpen(
1384 start_key.into_bytes(),
1385 )),
1386 end_key: Some(bigtable::row_range::EndKey::EndKeyOpen(
1387 end_key.into_bytes(),
1388 )),
1389 }],
1390 }),
1391 ..Default::default()
1392 };
1393
1394 let mut filters = message_gc_policy_filter()?;
1395 filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1396
1397 req.filter = Some(filter_chain(filters));
1398 if limit > 0 {
1399 trace!("🉑 Setting limit to {limit}");
1400 req.rows_limit = limit as i64;
1401 }
1402 let rows = self.read_rows(req).await?;
1403 debug!(
1404 "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1405 rows.len(),
1406 limit
1407 );
1408
1409 let messages = self.rows_to_notifications(rows)?;
1410
1411 Ok(FetchMessageResponse {
1415 messages,
1416 timestamp: None,
1417 })
1418 }
1419
1420 async fn fetch_timestamp_messages(
1423 &self,
1424 uaid: &Uuid,
1425 timestamp: Option<u64>,
1426 limit: usize,
1427 ) -> DbResult<FetchMessageResponse> {
1428 let start_key = if let Some(ts) = timestamp {
1429 format!("{}#02:{}z", uaid.simple(), ts)
1432 } else {
1433 format!("{}#02:", uaid.simple())
1434 };
1435 let end_key = format!("{}#03:", uaid.simple());
1436 let mut req = bigtable::ReadRowsRequest {
1437 table_name: self.settings.table_name.clone(),
1438 app_profile_id: self.settings.app_profile_id.clone(),
1439 rows: Some(bigtable::RowSet {
1440 row_keys: Vec::new(),
1441 row_ranges: vec![bigtable::RowRange {
1442 start_key: Some(bigtable::row_range::StartKey::StartKeyOpen(
1443 start_key.into_bytes(),
1444 )),
1445 end_key: Some(bigtable::row_range::EndKey::EndKeyOpen(
1446 end_key.into_bytes(),
1447 )),
1448 }],
1449 }),
1450 ..Default::default()
1451 };
1452
1453 let mut filters = message_gc_policy_filter()?;
1467 filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1468
1469 req.filter = Some(filter_chain(filters));
1470 if limit > 0 {
1471 req.rows_limit = limit as i64;
1472 }
1473 let rows = self.read_rows(req).await?;
1474 debug!(
1475 "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1476 timestamp,
1477 rows.len(),
1478 limit,
1479 );
1480
1481 let messages = self.rows_to_notifications(rows)?;
1482 let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1484 Ok(FetchMessageResponse {
1485 messages,
1486 timestamp,
1487 })
1488 }
1489
1490 async fn health_check(&self) -> DbResult<bool> {
1491 Ok(self
1492 .pool
1493 .get()
1494 .await?
1495 .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1496 .await?)
1497 }
1498
1499 async fn router_table_exists(&self) -> DbResult<bool> {
1502 Ok(true)
1503 }
1504
1505 async fn message_table_exists(&self) -> DbResult<bool> {
1508 Ok(true)
1509 }
1510
/// Record that the message tracked by `reliability_id` reached `new_state`.
///
/// One cell is written to the reliability-log family of the row keyed by
/// `reliability_id`: its qualifier is the string form of `new_state` and its value is the
/// current `ms_since_epoch()` reading as big-endian bytes. The cell timestamp is set to
/// now + `RELIABLE_LOG_TTL`.
///
/// # Errors
/// Propagates any error reported by `write_row`.
#[cfg(feature = "reliable_report")]
async fn log_report(
    &self,
    reliability_id: &str,
    new_state: crate::reliability::ReliabilityState,
) -> DbResult<()> {
    let mut row = Row::new(reliability_id.to_owned());

    let ttl = Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
    let expiry = SystemTime::now() + ttl;

    let state_cell = cell::Cell {
        qualifier: new_state.to_string(),
        value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
        timestamp: expiry,
        ..Default::default()
    };
    row.add_cells(RELIABLE_LOG_FAMILY, vec![state_cell]);

    self.write_row(row).await?;

    Ok(())
}
1536
1537 fn box_clone(&self) -> Box<dyn DbClient> {
1538 Box::new(self.clone())
1539 }
1540
1541 fn name(&self) -> String {
1542 "Bigtable".to_owned()
1543 }
1544
1545 fn pool_status(&self) -> Option<deadpool::Status> {
1546 Some(self.pool.pool.status())
1547 }
1548}
1549
1550#[cfg(all(test, feature = "emulator"))]
1551mod tests {
1552
1553 use std::sync::Arc;
1558 use std::time::SystemTime;
1559
1560 use cadence::StatsdClient;
1561 use uuid;
1562
1563 use super::*;
1564 use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1565
/// Fixed UAID (hyphenated UUID text) used by the tests that need a stable user id.
const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
/// Fixed channel id (hyphenated UUID text) used for plain channel and message tests.
const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
/// Fixed channel id (hyphenated UUID text) used for the topic-message portion of the tests.
const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1569
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Panics if the system clock reports a time earlier than the epoch.
fn now() -> u64 {
    let since_epoch = SystemTime::UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_secs()
}
1576
1577 fn new_client() -> DbResult<BigTableClientImpl> {
1578 let env_dsn = format!(
1579 "grpc://{}",
1580 std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1581 );
1582 let settings = DbSettings {
1583 dsn: Some(env_dsn),
1589 db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1590 .to_string(),
1591 };
1592
1593 let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1594
1595 BigTableClientImpl::new(metrics, &settings)
1596 }
1597
/// Table-driven check of `escape_bytes`: each pair is (raw input, expected escaped output).
#[test]
fn escape_bytes_for_regex() {
    let cases: [(&[u8], &[u8]); 6] = [
        (b"hi", b"hi"),
        (b"h.*i!", b"h\\.\\*i\\!"),
        (b"\xe2\x80\xb3", b"\xe2\x80\xb3"),
        // A NUL byte followed by the digits "22".
        (&[b'f', b'o', b'\0', b'2', b'2', b'o'], b"fo\\x0022o"),
        (b"\xc0", b"\xc0"),
        (b"\x03", b"\\\x03"),
    ];

    for (raw, escaped) in cases {
        assert_eq!(escape_bytes(raw), escaped.to_vec());
    }
}
1612
1613 #[actix_rt::test]
1614 async fn health_check() {
1615 let client = new_client().unwrap();
1616
1617 let result = client.health_check().await;
1618 assert!(result.is_ok());
1619 assert!(result.unwrap());
1620 }
1621
1622 #[actix_rt::test]
1625 async fn timestamp_granularity_rejects_sub_millisecond_micros() {
1626 let client = new_client().unwrap();
1627 let uaid = gen_test_uaid();
1628 let row_key = uaid.simple().to_string();
1629 let _ = client.remove_user(&uaid).await;
1630
1631 let mut req = client.mutate_row_request(&row_key);
1632 req.mutations = vec![bigtable::Mutation {
1633 mutation: Some(bigtable::mutation::Mutation::SetCell(
1634 bigtable::mutation::SetCell {
1635 family_name: ROUTER_FAMILY.to_owned(),
1636 column_qualifier: b"granularity_probe".to_vec(),
1637 timestamp_micros: 1,
1638 value: b"x".to_vec(),
1639 },
1640 )),
1641 }];
1642
1643 assert!(
1644 client.mutate_row(req).await.is_err(),
1645 "expected granularity mismatch for timestamp_micros=1"
1646 );
1647
1648 let _ = client.remove_user(&uaid).await;
1649 }
1650
1651 #[actix_rt::test]
1654 async fn run_gauntlet() -> DbResult<()> {
1655 let client = new_client()?;
1656
1657 let connected_at = ms_since_epoch();
1658
1659 let uaid = Uuid::parse_str(TEST_USER).unwrap();
1660 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1661 let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();
1662
1663 let node_id = "test_node".to_owned();
1664
1665 let _ = client.remove_user(&uaid).await;
1667
1668 let test_user = User {
1669 uaid,
1670 router_type: "webpush".to_owned(),
1671 connected_at,
1672 router_data: None,
1673 node_id: Some(node_id.clone()),
1674 ..Default::default()
1675 };
1676
1677 let _ = client.remove_user(&uaid).await;
1680
1681 client.add_user(&test_user).await?;
1683 let fetched = client.get_user(&uaid).await?;
1684 assert!(fetched.is_some());
1685 let fetched = fetched.unwrap();
1686 assert_eq!(fetched.router_type, "webpush".to_owned());
1687
1688 let connected_at = ms_since_epoch();
1690
1691 client.add_channel(&uaid, &chid).await?;
1693 let channels = client.get_channels(&uaid).await?;
1694 assert!(channels.contains(&chid));
1695
1696 let mut new_channels: HashSet<Uuid> = HashSet::new();
1698 new_channels.insert(chid);
1699 for _ in 1..10 {
1700 new_channels.insert(uuid::Uuid::new_v4());
1701 }
1702 let chid_to_remove = uuid::Uuid::new_v4();
1703 new_channels.insert(chid_to_remove);
1704 client.add_channels(&uaid, new_channels.clone()).await?;
1705 let channels = client.get_channels(&uaid).await?;
1706 assert_eq!(channels, new_channels);
1707
1708 assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
1710 assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
1711 new_channels.remove(&chid_to_remove);
1712 let channels = client.get_channels(&uaid).await?;
1713 assert_eq!(channels, new_channels);
1714
1715 let mut updated = User {
1719 connected_at,
1720 ..test_user.clone()
1721 };
1722 let result = client.update_user(&mut updated).await;
1723 assert!(result.is_ok());
1724 assert!(!result.unwrap());
1725
1726 let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
1728 assert_eq!(fetched.connected_at, fetched2.connected_at);
1729
1730 let mut updated = User {
1732 connected_at: fetched.connected_at + 300,
1733 ..fetched2
1734 };
1735 let result = client.update_user(&mut updated).await;
1736 assert!(result.is_ok());
1737 assert!(result.unwrap());
1738 assert_ne!(
1739 fetched2.connected_at,
1740 client.get_user(&uaid).await?.unwrap().connected_at
1741 );
1742
1743 client
1745 .increment_storage(
1746 &fetched.uaid,
1747 SystemTime::now()
1748 .duration_since(SystemTime::UNIX_EPOCH)
1749 .unwrap()
1750 .as_secs(),
1751 )
1752 .await?;
1753
1754 let test_data = "An_encrypted_pile_of_crap".to_owned();
1755 let timestamp = now();
1756 let sort_key = now();
1757 let test_notification = crate::db::Notification {
1759 channel_id: chid,
1760 version: "test".to_owned(),
1761 ttl: 300,
1762 timestamp,
1763 data: Some(test_data.clone()),
1764 sortkey_timestamp: Some(sort_key),
1765 ..Default::default()
1766 };
1767 let res = client.save_message(&uaid, test_notification.clone()).await;
1768 assert!(res.is_ok());
1769
1770 let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
1771 assert_ne!(fetched.messages.len(), 0);
1772 let fm = fetched.messages.pop().unwrap();
1773 assert_eq!(fm.channel_id, test_notification.channel_id);
1774 assert_eq!(fm.data, Some(test_data));
1775
1776 let fetched = client
1778 .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
1779 .await?;
1780 assert_ne!(fetched.messages.len(), 0);
1781
1782 let fetched = client
1784 .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
1785 .await?;
1786 assert_eq!(fetched.messages.len(), 0);
1787
1788 assert!(
1790 client
1791 .remove_message(&uaid, &test_notification.chidmessageid())
1792 .await
1793 .is_ok()
1794 );
1795
1796 assert!(client.remove_channel(&uaid, &chid).await.is_ok());
1797
1798 client.add_channel(&uaid, &topic_chid).await?;
1800 let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
1801 let timestamp = now();
1802 let sort_key = now();
1803 let test_notification = crate::db::Notification {
1805 channel_id: topic_chid,
1806 version: "test".to_owned(),
1807 ttl: 300,
1808 topic: Some("topic".to_owned()),
1809 timestamp,
1810 data: Some(test_data.clone()),
1811 sortkey_timestamp: Some(sort_key),
1812 ..Default::default()
1813 };
1814 assert!(
1815 client
1816 .save_message(&uaid, test_notification.clone())
1817 .await
1818 .is_ok()
1819 );
1820
1821 let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
1822 assert_ne!(fetched.messages.len(), 0);
1823 let fm = fetched.messages.pop().unwrap();
1824 assert_eq!(fm.channel_id, test_notification.channel_id);
1825 assert_eq!(fm.data, Some(test_data));
1826
1827 let fetched = client.fetch_topic_messages(&uaid, 999).await?;
1829 assert_ne!(fetched.messages.len(), 0);
1830
1831 assert!(
1833 client
1834 .remove_message(&uaid, &test_notification.chidmessageid())
1835 .await
1836 .is_ok()
1837 );
1838
1839 assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());
1840
1841 let msgs = client
1842 .fetch_timestamp_messages(&uaid, None, 999)
1843 .await?
1844 .messages;
1845 assert!(msgs.is_empty());
1846
1847 let fetched = client.get_user(&uaid).await?.unwrap();
1848 assert!(
1849 client
1850 .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
1851 .await
1852 .is_ok()
1853 );
1854 let fetched = client.get_user(&uaid).await?.unwrap();
1856 assert_eq!(fetched.node_id, None);
1857
1858 assert!(client.remove_user(&uaid).await.is_ok());
1859
1860 assert!(client.get_user(&uaid).await?.is_none());
1861
1862 Ok(())
1863 }
1864
1865 #[actix_rt::test]
1866 async fn read_cells_family_id() -> DbResult<()> {
1867 let client = new_client().unwrap();
1868 let uaid = gen_test_uaid();
1869 client.remove_user(&uaid).await.unwrap();
1870
1871 let qualifier = "foo".to_owned();
1872
1873 let row_key = uaid.simple().to_string();
1874 let mut row = Row::new(row_key.clone());
1875 row.cells.insert(
1876 ROUTER_FAMILY.to_owned(),
1877 vec![cell::Cell {
1878 qualifier: qualifier.to_owned(),
1879 value: "bar".as_bytes().to_vec(),
1880 ..Default::default()
1881 }],
1882 );
1883 client.write_row(row).await.unwrap();
1884 let req = client.read_row_request(&row_key);
1885 let Some(row) = client.read_row(req).await.unwrap() else {
1886 panic!("Expected row");
1887 };
1888 assert_eq!(row.cells.len(), 1);
1889 assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1890 client.remove_user(&uaid).await
1891 }
1892
1893 #[actix_rt::test]
1894 async fn add_user_existing() {
1895 let client = new_client().unwrap();
1896 let uaid = gen_test_uaid();
1897 let user = User {
1898 uaid,
1899 ..Default::default()
1900 };
1901 client.remove_user(&uaid).await.unwrap();
1902
1903 client.add_user(&user).await.unwrap();
1904 let err = client.add_user(&user).await.unwrap_err();
1905 assert!(matches!(err, DbError::Conditional));
1906 }
1907
1908 #[actix_rt::test]
1909 async fn version_check() {
1910 let client = new_client().unwrap();
1911 let uaid = gen_test_uaid();
1912 let user = User {
1913 uaid,
1914 ..Default::default()
1915 };
1916 client.remove_user(&uaid).await.unwrap();
1917
1918 client.add_user(&user).await.unwrap();
1919 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1920 assert!(client.update_user(&mut user.clone()).await.unwrap());
1921
1922 let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1923 assert_ne!(user.version, fetched.version);
1924 assert!(!client.update_user(&mut user).await.unwrap());
1926
1927 client.remove_user(&uaid).await.unwrap();
1928 }
1929
1930 #[actix_rt::test]
1931 async fn lingering_chid_record() {
1932 let client = new_client().unwrap();
1933 let uaid = gen_test_uaid();
1934 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1935 let user = User {
1936 uaid,
1937 ..Default::default()
1938 };
1939 client.remove_user(&uaid).await.unwrap();
1940
1941 client.add_channel(&uaid, &chid).await.unwrap();
1943
1944 assert!(client.get_user(&uaid).await.unwrap().is_none());
1947
1948 client.add_user(&user).await.unwrap();
1949 assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1951
1952 client.remove_user(&uaid).await.unwrap();
1953 }
1954
1955 #[actix_rt::test]
1956 async fn lingering_current_timestamp() {
1957 let client = new_client().unwrap();
1958 let uaid = gen_test_uaid();
1959 client.remove_user(&uaid).await.unwrap();
1960
1961 client
1962 .increment_storage(&uaid, crate::util::sec_since_epoch())
1963 .await
1964 .unwrap();
1965 assert!(client.get_user(&uaid).await.unwrap().is_none());
1966
1967 client.remove_user(&uaid).await.unwrap();
1968 }
1969
1970 #[actix_rt::test]
1971 async fn lingering_chid_w_version_record() {
1972 let client = new_client().unwrap();
1973 let uaid = gen_test_uaid();
1974 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1975 client.remove_user(&uaid).await.unwrap();
1976
1977 client.add_channel(&uaid, &chid).await.unwrap();
1978 assert!(client.remove_channel(&uaid, &chid).await.unwrap());
1979 assert!(client.get_user(&uaid).await.unwrap().is_none());
1980
1981 client.remove_user(&uaid).await.unwrap();
1982 }
1983
1984 #[actix_rt::test]
1985 async fn channel_and_current_timestamp_ttl_updates() {
1986 let client = new_client().unwrap();
1987 let uaid = gen_test_uaid();
1988 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1989 client.remove_user(&uaid).await.unwrap();
1990
1991 let user = User {
1993 uaid,
1994 ..Default::default()
1995 };
1996 client.add_user(&user).await.unwrap();
1997
1998 client.add_channel(&uaid, &chid).await.unwrap();
1999 client
2000 .add_channel(&uaid, &uuid::Uuid::new_v4())
2001 .await
2002 .unwrap();
2003
2004 client
2005 .increment_storage(
2006 &uaid,
2007 SystemTime::now()
2008 .duration_since(SystemTime::UNIX_EPOCH)
2009 .unwrap()
2010 .as_secs(),
2011 )
2012 .await
2013 .unwrap();
2014
2015 let req = client.read_row_request(&uaid.as_simple().to_string());
2016 let Some(mut row) = client.read_row(req).await.unwrap() else {
2017 panic!("Expected row");
2018 };
2019
2020 let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
2023 for mut cells in row.cells.into_values() {
2024 let Some(cell) = cells.pop() else {
2025 continue;
2026 };
2027 assert!(
2028 cell.timestamp >= ca_expiry,
2029 "{} cell timestamp should >= connected_at's",
2030 cell.qualifier
2031 );
2032 }
2033
2034 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
2035
2036 tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
2038 client.update_user(&mut user).await.unwrap();
2039
2040 let req = client.read_row_request(&uaid.as_simple().to_string());
2042 let Some(mut row) = client.read_row(req).await.unwrap() else {
2043 panic!("Expected row");
2044 };
2045
2046 let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
2047
2048 assert!(ca_expiry2 > ca_expiry);
2049
2050 for mut cells in row.cells.into_values() {
2051 let Some(cell) = cells.pop() else {
2052 continue;
2053 };
2054 assert!(
2055 cell.timestamp >= ca_expiry2,
2056 "{} cell timestamp expiry should exceed connected_at's",
2057 cell.qualifier
2058 );
2059 }
2060
2061 client.remove_user(&uaid).await.unwrap();
2062 }
2063}