1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8
9use again::RetryPolicy;
10use async_trait::async_trait;
11use cadence::StatsdClient;
12use chrono::TimeDelta;
13use futures_util::StreamExt;
14use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
15use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
16use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
17use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
18use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
19use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
20use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
21use protobuf::RepeatedField;
22use serde_json::{from_str, json};
23use uuid::Uuid;
24
25use crate::db::RangeKey;
26use crate::db::{
27 client::{DbClient, FetchMessageResponse},
28 error::{DbError, DbResult},
29 DbSettings, Notification, User, USER_RECORD_VERSION,
30};
31use crate::metric_name::MetricName;
32use crate::metrics::StatsdClientExt;
33use crate::MAX_ROUTER_TTL_SECS;
34
35pub use self::metadata::MetadataBuilder;
36use self::row::{Row, RowCells};
37use super::pool::BigTablePool;
38use super::BigTableDbSettings;
39
40pub mod cell;
41pub mod error;
42pub(crate) mod merge;
43pub mod metadata;
44pub mod row;
45
46pub type RowKey = String;
48
49pub type Qualifier = String;
54pub type FamilyId = String;
55
56const ROUTER_FAMILY: &str = "router";
57const MESSAGE_FAMILY: &str = "message"; const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
59#[cfg(feature = "reliable_report")]
60const RELIABLE_LOG_FAMILY: &str = "reliability";
61#[cfg(feature = "reliable_report")]
62pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);
65
66pub(crate) const RETRY_COUNT: usize = 5;
67
68struct Uaid(Uuid);
71
72impl Display for Uaid {
73 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
74 write!(f, "{}", self.0.as_simple())
75 }
76}
77
78impl From<Uaid> for String {
79 fn from(uaid: Uaid) -> String {
80 uaid.0.as_simple().to_string()
81 }
82}
83
84#[derive(Clone)]
85pub struct BigTableClientImpl {
87 pub(crate) settings: BigTableDbSettings,
88 metrics: Arc<StatsdClient>,
90 pool: BigTablePool,
92 metadata: Metadata,
93 admin_metadata: Metadata,
94}
95
96fn router_gc_policy_filter() -> data::RowFilter {
98 let mut latest_cell_filter = data::RowFilter::default();
99 latest_cell_filter.set_cells_per_column_limit_filter(1);
100 latest_cell_filter
101}
102
103fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
106 let mut timestamp_filter = data::RowFilter::default();
107 let bt_now: i64 = SystemTime::now()
108 .duration_since(SystemTime::UNIX_EPOCH)
109 .map_err(error::BigTableError::WriteTime)?
110 .as_millis() as i64;
111 let mut range_filter = data::TimestampRange::default();
112 range_filter.set_start_timestamp_micros(bt_now * 1000);
113 timestamp_filter.set_timestamp_range_filter(range_filter);
114
115 Ok(vec![router_gc_policy_filter(), timestamp_filter])
116}
117
118fn family_filter(regex: String) -> data::RowFilter {
120 let mut filter = data::RowFilter::default();
121 filter.set_family_name_regex_filter(regex);
122 filter
123}
124
125fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
129 let mut vec = Vec::with_capacity(bytes.len() * 2);
130 for &b in bytes {
131 if !b.is_ascii_alphanumeric() && b != b'_' && (b & 128) == 0 {
132 if b == b'\0' {
133 vec.extend("\\x00".as_bytes());
138 continue;
139 }
140 vec.push(b'\\');
141 }
142 vec.push(b);
143 }
144 vec
145}
146
147fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
150 let mut cq_filter = data::RowFilter::default();
151 cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());
152
153 let mut value_filter = data::RowFilter::default();
154 value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));
155
156 vec![
157 family_filter(format!("^{ROUTER_FAMILY}$")),
158 cq_filter,
159 value_filter,
160 ]
161}
162
163fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
165 cell::Cell {
166 qualifier: "version".to_owned(),
167 value: Uuid::new_v4().into(),
168 timestamp,
169 ..Default::default()
170 }
171}
172
173fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
175 let mut chain = RowFilter_Chain::default();
176 chain.set_filters(filters.into());
177 let mut filter = RowFilter::default();
178 filter.set_chain(chain);
179 filter
180}
181
182fn read_row_request(
184 table_name: &str,
185 app_profile_id: &str,
186 row_key: &str,
187) -> bigtable::ReadRowsRequest {
188 let mut req = bigtable::ReadRowsRequest::default();
189 req.set_table_name(table_name.to_owned());
190 req.set_app_profile_id(app_profile_id.to_owned());
191
192 let mut row_keys = RepeatedField::default();
193 row_keys.push(row_key.as_bytes().to_vec());
194 let mut row_set = data::RowSet::default();
195 row_set.set_row_keys(row_keys);
196 req.set_rows(row_set);
197
198 req
199}
200
201fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
202 let v: [u8; 8] = value
203 .try_into()
204 .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
205 Ok(u64::from_be_bytes(v))
206}
207
208fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
209 String::from_utf8(value).map_err(|e| {
210 debug!("🉑 cannot read string {}: {:?}", name, e);
211 DbError::DeserializeString(name.to_owned())
212 })
213}
214
215fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
219 let mut result = HashSet::new();
220 for cells in cells.values() {
221 let Some(cell) = cells.last() else {
222 continue;
223 };
224 let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
225 return Err(DbError::Integrity(
226 "get_channels expected: chid:<chid>".to_owned(),
227 None,
228 ));
229 };
230 result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
231 }
232 Ok(result)
233}
234
235fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
237 let channels = channels.into_owned();
238 let mut cells = Vec::with_capacity(channels.len().min(100_000));
239 for (i, channel_id) in channels.into_iter().enumerate() {
240 if i >= 100_000 {
244 break;
245 }
246 cells.push(cell::Cell {
247 qualifier: format!("chid:{}", channel_id.as_hyphenated()),
248 timestamp: expiry,
249 ..Default::default()
250 });
251 }
252 cells
253}
254
255pub fn retry_policy(max: usize) -> RetryPolicy {
256 RetryPolicy::default()
257 .with_max_retries(max)
258 .with_jitter(true)
259}
260
261fn retryable_internal_err(status: &RpcStatus) -> bool {
262 match status.code() {
263 RpcStatusCode::UNKNOWN => status
264 .message()
265 .eq_ignore_ascii_case("error occurred when fetching oauth2 token."),
266 RpcStatusCode::INTERNAL => [
267 "rst_stream",
268 "rst stream",
269 "received unexpected eos on data frame from server",
270 ]
271 .contains(&status.message().to_lowercase().as_str()),
272 RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
273 _ => false,
274 }
275}
276
277pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
278 let mut metric = metrics
279 .incr_with_tags(MetricName::DatabaseRetry)
280 .with_tag("error", err_type)
281 .with_tag("type", "bigtable");
282 if let Some(code) = code {
283 metric = metric.with_tag("code", code);
284 }
285 metric.send();
286}
287
288pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool + '_ {
289 move |err| {
290 debug!("🉑 Checking grpcio::Error...{err}");
291 match err {
292 grpcio::Error::RpcFailure(status) => {
293 info!("GRPC Failure :{:?}", status);
294 let retry = retryable_internal_err(status);
295 if retry {
296 metric(metrics, "RpcFailure", Some(&status.code().to_string()));
297 }
298 retry
299 }
300 grpcio::Error::BindFail(_) => {
301 metric(metrics, "BindFail", None);
302 true
303 }
304 grpcio::Error::CallFailure(grpc_call_status) => {
307 let retry = grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR;
308 if retry {
309 metric(
310 metrics,
311 "CallFailure",
312 Some(&format!("{grpc_call_status:?}")),
313 );
314 }
315 retry
316 }
317 _ => false,
318 }
319 }
320}
321
322pub fn retryable_bt_err(
323 metrics: &Arc<StatsdClient>,
324) -> impl Fn(&error::BigTableError) -> bool + '_ {
325 move |err| {
326 debug!("🉑 Checking BigTableError...{err}");
327 match err {
328 error::BigTableError::InvalidRowResponse(e)
329 | error::BigTableError::Read(e)
330 | error::BigTableError::Write(e)
331 | error::BigTableError::GRPC(e) => retryable_grpcio_err(metrics)(e),
332 _ => false,
333 }
334 }
335}
336
337fn is_incomplete_router_record(cells: &RowCells) -> bool {
351 cells
352 .keys()
353 .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
354}
355
356fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
357 ::grpcio::CallOption::default().headers(metadata)
358}
359
360impl BigTableClientImpl {
385 pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
386 debug!("🏊 BT Pool new");
388 let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
389 info!("🉑 {:#?}", db_settings);
390 let pool = BigTablePool::new(settings, &metrics)?;
391
392 let metadata = db_settings.metadata()?;
394 let admin_metadata = db_settings.admin_metadata()?;
395 Ok(Self {
396 settings: db_settings,
397 metrics,
398 metadata,
399 admin_metadata,
400 pool,
401 })
402 }
403
404 fn router_ttl(&self) -> Duration {
405 self.settings
406 .max_router_ttl
407 .unwrap_or(Duration::from_secs(MAX_ROUTER_TTL_SECS))
408 }
409
410 pub fn spawn_sweeper(&self, interval: Duration) {
412 self.pool.spawn_sweeper(interval);
413 }
414
415 fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
417 read_row_request(
418 &self.settings.table_name,
419 &self.settings.app_profile_id,
420 row_key,
421 )
422 }
423
424 fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
426 let mut req = bigtable::MutateRowRequest::default();
427 req.set_table_name(self.settings.table_name.clone());
428 req.set_app_profile_id(self.settings.app_profile_id.clone());
429 req.set_row_key(row_key.as_bytes().to_vec());
430 req
431 }
432
433 fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
435 let mut req = bigtable::CheckAndMutateRowRequest::default();
436 req.set_table_name(self.settings.table_name.clone());
437 req.set_app_profile_id(self.settings.app_profile_id.clone());
438 req.set_row_key(row_key.as_bytes().to_vec());
439 req
440 }
441
442 async fn mutate_row(
444 &self,
445 req: bigtable::MutateRowRequest,
446 ) -> Result<(), error::BigTableError> {
447 let bigtable = self.pool.get().await?;
448 retry_policy(self.settings.retry_count)
449 .retry_if(
450 || async {
451 bigtable
452 .conn
453 .mutate_row_opt(&req, call_opts(self.metadata.clone()))
454 },
455 retryable_grpcio_err(&self.metrics),
456 )
457 .await
458 .map_err(error::BigTableError::Write)?;
459 Ok(())
460 }
461
462 #[allow(unused)]
464 async fn mutate_rows(
465 &self,
466 req: bigtable::MutateRowsRequest,
467 ) -> Result<(), error::BigTableError> {
468 let bigtable = self.pool.get().await?;
469 let resp = retry_policy(self.settings.retry_count)
471 .retry_if(
472 || async {
473 bigtable
474 .conn
475 .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
476 },
477 retryable_grpcio_err(&self.metrics),
478 )
479 .await
480 .map_err(error::BigTableError::Write)?;
481
482 let mut stream = Box::pin(resp);
489 let mut cnt = 0;
490 loop {
491 let (result, remainder) = StreamExt::into_future(stream).await;
492 if let Some(result) = result {
493 debug!("🎏 Result block: {}", cnt);
494 match result {
495 Ok(r) => {
496 for e in r.get_entries() {
497 if e.has_status() {
498 let status = e.get_status();
499 let code = error::MutateRowStatus::from(status.get_code());
501 if !code.is_ok() {
502 return Err(error::BigTableError::Status(
503 code,
504 status.get_message().to_owned(),
505 ));
506 }
507 debug!("🎏 Response: {} OK", e.index);
508 }
509 }
510 }
511 Err(e) => return Err(error::BigTableError::Write(e)),
512 };
513 cnt += 1;
514 } else {
515 debug!("🎏 Done!");
516 break;
517 }
518 stream = remainder;
519 }
520
521 Ok(())
522 }
523
524 async fn read_row(
526 &self,
527 req: bigtable::ReadRowsRequest,
528 ) -> Result<Option<row::Row>, error::BigTableError> {
529 let mut rows = self.read_rows(req).await?;
530 Ok(rows.pop_first().map(|(_, v)| v))
531 }
532
533 async fn read_rows(
537 &self,
538 req: ReadRowsRequest,
539 ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
540 let bigtable = self.pool.get().await?;
541 let resp = retry_policy(self.settings.retry_count)
542 .retry_if(
543 || async {
544 let resp: grpcio::ClientSStreamReceiver<bigtable::ReadRowsResponse> = bigtable
545 .conn
546 .read_rows_opt(&req, call_opts(self.metadata.clone()))
547 .map_err(error::BigTableError::Read)?;
548 merge::RowMerger::process_chunks(resp).await
549 },
550 retryable_bt_err(&self.metrics),
551 )
552 .await?;
553 Ok(resp)
554 }
555
556 async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
560 let mut req = self.mutate_row_request(&row.row_key);
561 let mutations = self.get_mutations(row.cells)?;
566 req.set_mutations(mutations);
567 self.mutate_row(req).await?;
568 Ok(())
569 }
570
571 fn get_mutations(
573 &self,
574 cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
575 ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
576 let mut mutations = protobuf::RepeatedField::default();
577 for (family_id, cells) in cells {
578 for cell in cells {
579 let mut mutation = data::Mutation::default();
580 let mut set_cell = data::Mutation_SetCell::default();
581 let timestamp = cell
582 .timestamp
583 .duration_since(SystemTime::UNIX_EPOCH)
584 .map_err(error::BigTableError::WriteTime)?;
585 set_cell.family_name.clone_from(&family_id);
586 set_cell.set_column_qualifier(cell.qualifier.clone().into_bytes());
587 set_cell.set_value(cell.value);
588 set_cell.set_timestamp_micros((timestamp.as_millis() * 1000) as i64);
591 debug!("🉑 expiring in {:?}", timestamp.as_millis());
592 mutation.set_set_cell(set_cell);
593 mutations.push(mutation);
594 }
595 }
596 Ok(mutations)
597 }
598
599 async fn check_and_mutate_row(
607 &self,
608 row: row::Row,
609 filter: RowFilter,
610 state: bool,
611 ) -> Result<bool, error::BigTableError> {
612 let mut req = self.check_and_mutate_row_request(&row.row_key);
613 let mutations = self.get_mutations(row.cells)?;
614 req.set_predicate_filter(filter);
615 if state {
616 req.set_true_mutations(mutations);
617 } else {
618 req.set_false_mutations(mutations);
619 }
620 self.check_and_mutate(req).await
621 }
622
623 async fn check_and_mutate(
624 &self,
625 req: bigtable::CheckAndMutateRowRequest,
626 ) -> Result<bool, error::BigTableError> {
627 let bigtable = self.pool.get().await?;
628 let resp = retry_policy(self.settings.retry_count)
629 .retry_if(
630 || async {
631 bigtable
634 .conn
635 .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
636 },
637 retryable_grpcio_err(&self.metrics),
638 )
639 .await
640 .map_err(error::BigTableError::Write)?;
641 debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
642 Ok(resp.get_predicate_matched())
643 }
644
645 fn get_delete_mutations(
646 &self,
647 family: &str,
648 column_names: &[&str],
649 time_range: Option<&data::TimestampRange>,
650 ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
651 let mut mutations = protobuf::RepeatedField::default();
652 for column in column_names {
653 let mut mutation = data::Mutation::default();
654 let mut del_cell = data::Mutation_DeleteFromColumn::default();
658 del_cell.set_family_name(family.to_owned());
659 del_cell.set_column_qualifier(column.as_bytes().to_vec());
660 if let Some(range) = time_range {
661 del_cell.set_time_range(range.clone());
662 }
663 mutation.set_delete_from_column(del_cell);
664 mutations.push(mutation);
665 }
666 Ok(mutations)
667 }
668
669 #[allow(unused)]
670 #[allow(unused)]
672 async fn delete_cells(
673 &self,
674 row_key: &str,
675 family: &str,
676 column_names: &[&str],
677 time_range: Option<&data::TimestampRange>,
678 ) -> Result<(), error::BigTableError> {
679 let mut req = self.mutate_row_request(row_key);
680 req.set_mutations(self.get_delete_mutations(family, column_names, time_range)?);
681 self.mutate_row(req).await
682 }
683
684 async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
686 let mut req = self.mutate_row_request(row_key);
687 let mut mutations = protobuf::RepeatedField::default();
688 let mut mutation = data::Mutation::default();
689 mutation.set_delete_from_row(data::Mutation_DeleteFromRow::default());
690 mutations.push(mutation);
691 req.set_mutations(mutations);
692 self.mutate_row(req).await
693 }
694
695 #[allow(unused)]
700 async fn delete_rows(&self, row_key: &str) -> Result<bool, error::BigTableError> {
701 let admin = BigtableTableAdminClient::new(self.pool.get_channel()?);
702 let mut req = DropRowRangeRequest::new();
703 req.set_name(self.settings.table_name.clone());
704 req.set_row_key_prefix(row_key.as_bytes().to_vec());
705
706 admin
707 .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone()))
708 .map_err(|e| {
709 error!("{:?}", e);
710 error::BigTableError::Admin(
711 format!(
712 "Could not send delete command for {}",
713 &self.settings.table_name
714 ),
715 Some(e.to_string()),
716 )
717 })?
718 .await
719 .map_err(|e| {
720 error!("post await: {:?}", e);
721 error::BigTableError::Admin(
722 format!(
723 "Could not delete data from table {}",
724 &self.settings.table_name
725 ),
726 Some(e.to_string()),
727 )
728 })?;
729
730 Ok(true)
731 }
732
733 fn rows_to_notifications(
734 &self,
735 rows: BTreeMap<String, Row>,
736 ) -> Result<Vec<Notification>, DbError> {
737 rows.into_iter()
738 .map(|(row_key, row)| self.row_to_notification(&row_key, row))
739 .collect()
740 }
741
742 fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
743 let Some((_, chidmessageid)) = row_key.split_once('#') else {
744 return Err(DbError::Integrity(
745 "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
746 None,
747 ));
748 };
749 let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
750 DbError::Integrity(
751 format!("rows_to_notification expected chidmessageid: {e}"),
752 None,
753 )
754 })?;
755
756 let mut notif = Notification {
758 channel_id: range_key.channel_id,
759 topic: range_key.topic,
760 sortkey_timestamp: range_key.sortkey_timestamp,
761 version: to_string(row.take_required_cell("version")?.value, "version")?,
762 ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
763 timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
764 ..Default::default()
765 };
766
767 if let Some(cell) = row.take_cell("data") {
769 notif.data = Some(to_string(cell.value, "data")?);
770 }
771 #[cfg(feature = "reliable_report")]
772 {
773 if let Some(cell) = row.take_cell("reliability_id") {
774 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
775 }
776 if let Some(cell) = row.take_cell("reliable_state") {
777 notif.reliable_state = Some(
778 crate::reliability::ReliabilityState::from_str(&to_string(
779 cell.value,
780 "reliable_state",
781 )?)
782 .map_err(|e| {
783 DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
784 })?,
785 );
786 }
787 }
788 if let Some(cell) = row.take_cell("headers") {
789 notif.headers = Some(
790 serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
791 .map_err(|e| DbError::Serialization(e.to_string()))?,
792 );
793 }
794 if let Some(cell) = row.take_cell("reliability_id") {
795 trace!("🚣 Is reliable");
796 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
797 }
798
799 trace!("🚣 Deserialized message row: {:?}", ¬if);
800 Ok(notif)
801 }
802
803 fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
809 let row_key = user.uaid.simple().to_string();
810 let mut row = Row::new(row_key);
811 let expiry = std::time::SystemTime::now() + self.router_ttl();
812
813 let mut cells: Vec<cell::Cell> = vec![
814 cell::Cell {
815 qualifier: "connected_at".to_owned(),
816 value: user.connected_at.to_be_bytes().to_vec(),
817 timestamp: expiry,
818 ..Default::default()
819 },
820 cell::Cell {
821 qualifier: "router_type".to_owned(),
822 value: user.router_type.clone().into_bytes(),
823 timestamp: expiry,
824 ..Default::default()
825 },
826 cell::Cell {
827 qualifier: "record_version".to_owned(),
828 value: user
829 .record_version
830 .unwrap_or(USER_RECORD_VERSION)
831 .to_be_bytes()
832 .to_vec(),
833 timestamp: expiry,
834 ..Default::default()
835 },
836 cell::Cell {
837 qualifier: "version".to_owned(),
838 value: (*version).into(),
839 timestamp: expiry,
840 ..Default::default()
841 },
842 ];
843
844 if let Some(router_data) = &user.router_data {
845 cells.push(cell::Cell {
846 qualifier: "router_data".to_owned(),
847 value: json!(router_data).to_string().as_bytes().to_vec(),
848 timestamp: expiry,
849 ..Default::default()
850 });
851 };
852 if let Some(current_timestamp) = user.current_timestamp {
853 cells.push(cell::Cell {
854 qualifier: "current_timestamp".to_owned(),
855 value: current_timestamp.to_be_bytes().to_vec(),
856 timestamp: expiry,
857 ..Default::default()
858 });
859 };
860 if let Some(node_id) = &user.node_id {
861 cells.push(cell::Cell {
862 qualifier: "node_id".to_owned(),
863 value: node_id.as_bytes().to_vec(),
864 timestamp: expiry,
865 ..Default::default()
866 });
867 };
868
869 cells.extend(channels_to_cells(
870 Cow::Borrowed(&user.priv_channels),
871 expiry,
872 ));
873
874 row.add_cells(ROUTER_FAMILY, cells);
875 row
876 }
877}
878
879#[derive(Clone)]
880pub struct BigtableDb {
881 pub(super) conn: BigtableClient,
882 pub(super) health_metadata: Metadata,
883 table_name: String,
884}
885
886impl BigtableDb {
887 pub fn new(channel: Channel, health_metadata: &Metadata, table_name: &str) -> Self {
888 Self {
889 conn: BigtableClient::new(channel),
890 health_metadata: health_metadata.clone(),
891 table_name: table_name.to_owned(),
892 }
893 }
894 pub async fn health_check(
901 &mut self,
902 metrics: &Arc<StatsdClient>,
903 app_profile_id: &str,
904 ) -> Result<bool, error::BigTableError> {
905 let random_uaid = Uuid::new_v4().simple().to_string();
912 let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
913 let mut filter = data::RowFilter::default();
914 filter.set_block_all_filter(true);
915 req.set_filter(filter);
916 let _r = retry_policy(RETRY_COUNT)
917 .retry_if(
918 || async {
919 self.conn
920 .read_rows_opt(&req, call_opts(self.health_metadata.clone()))
921 },
922 retryable_grpcio_err(metrics),
923 )
924 .await
925 .map_err(error::BigTableError::Read)?;
926
927 debug!("🉑 health check");
928 Ok(true)
929 }
930}
931
932#[async_trait]
933impl DbClient for BigTableClientImpl {
934 async fn add_user(&self, user: &User) -> DbResult<()> {
936 trace!("🉑 Adding user");
937 let Some(ref version) = user.version else {
938 return Err(DbError::General(
939 "add_user expected a user version field".to_owned(),
940 ));
941 };
942 let row = self.user_to_row(user, version);
943
944 let mut row_key_filter = RowFilter::default();
946 row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
947 let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
948
949 if self.check_and_mutate_row(row, filter, false).await? {
950 return Err(DbError::Conditional);
951 }
952 Ok(())
953 }
954
955 async fn update_user(&self, user: &mut User) -> DbResult<bool> {
971 let Some(ref version) = user.version else {
972 return Err(DbError::General(
973 "update_user expected a user version field".to_owned(),
974 ));
975 };
976
977 let mut filters = vec![router_gc_policy_filter()];
978 filters.extend(version_filter(version));
979 let filter = filter_chain(filters);
980
981 let new_version = Uuid::new_v4();
982 let row = self.user_to_row(user, &new_version);
984
985 let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
986 user.version = Some(new_version);
987 Ok(predicate_matched)
988 }
989
990 async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
991 let row_key = uaid.as_simple().to_string();
992 let mut req = self.read_row_request(&row_key);
993 let mut filters = vec![router_gc_policy_filter()];
994 filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
995 req.set_filter(filter_chain(filters));
996 let Some(mut row) = self.read_row(req).await? else {
997 return Ok(None);
998 };
999
1000 trace!("🉑 Found a record for {}", row_key);
1001
1002 let connected_at_cell = match row.take_required_cell("connected_at") {
1003 Ok(cell) => cell,
1004 Err(_) => {
1005 if !is_incomplete_router_record(&row.cells) {
1006 return Err(DbError::Integrity(
1007 "Expected column: connected_at".to_owned(),
1008 Some(format!("{row:#?}")),
1009 ));
1010 }
1011 trace!("🉑 Dropping an incomplete user record for {}", row_key);
1016 self.metrics
1017 .incr_with_tags(MetricName::DatabaseDropUser)
1018 .with_tag("reason", "incomplete_record")
1019 .send();
1020 self.remove_user(uaid).await?;
1021 return Ok(None);
1022 }
1023 };
1024
1025 let mut result = User {
1026 uaid: *uaid,
1027 connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1028 router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1029 record_version: Some(to_u64(
1030 row.take_required_cell("record_version")?.value,
1031 "record_version",
1032 )?),
1033 version: Some(
1034 row.take_required_cell("version")?
1035 .value
1036 .try_into()
1037 .map_err(|e| {
1038 DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1039 })?,
1040 ),
1041 ..Default::default()
1042 };
1043
1044 if let Some(cell) = row.take_cell("router_data") {
1045 result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1046 DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1047 })?;
1048 }
1049
1050 if let Some(cell) = row.take_cell("node_id") {
1051 result.node_id = Some(to_string(cell.value, "node_id")?);
1052 }
1053
1054 if let Some(cell) = row.take_cell("current_timestamp") {
1055 result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1056 }
1057
1058 result.priv_channels = channels_from_cells(&row.cells)?;
1060
1061 Ok(Some(result))
1062 }
1063
1064 async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1065 let row_key = uaid.simple().to_string();
1066 self.delete_row(&row_key).await?;
1067 Ok(())
1068 }
1069
1070 async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1071 let channels = HashSet::from_iter([channel_id.to_owned()]);
1072 self.add_channels(uaid, channels).await
1073 }
1074
1075 async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1078 let row_key = uaid.simple().to_string();
1092 let mut row = Row::new(row_key);
1093 let expiry = std::time::SystemTime::now() + self.router_ttl();
1094
1095 row.add_cells(
1099 ROUTER_FAMILY,
1100 channels_to_cells(Cow::Owned(channels), expiry),
1101 );
1102
1103 self.write_row(row).await?;
1104 Ok(())
1105 }
1106
1107 async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1108 let row_key = uaid.simple().to_string();
1109 let mut req = self.read_row_request(&row_key);
1110
1111 let mut cq_filter = data::RowFilter::default();
1112 cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());
1113 req.set_filter(filter_chain(vec![
1114 router_gc_policy_filter(),
1115 family_filter(format!("^{ROUTER_FAMILY}$")),
1116 cq_filter,
1117 ]));
1118
1119 let Some(row) = self.read_row(req).await? else {
1120 return Ok(Default::default());
1121 };
1122 channels_from_cells(&row.cells)
1123 }
1124
1125 async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1127 let row_key = uaid.simple().to_string();
1128 let mut req = self.check_and_mutate_row_request(&row_key);
1129
1130 let column = format!("chid:{}", channel_id.as_hyphenated());
1132 let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1133
1134 let mut row = Row::new(row_key);
1136 let expiry = std::time::SystemTime::now() + self.router_ttl();
1137 row.cells
1138 .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1139 mutations.extend(self.get_mutations(row.cells)?);
1140
1141 let mut cq_filter = data::RowFilter::default();
1143 cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
1144 req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1145 req.set_true_mutations(mutations);
1146
1147 Ok(self.check_and_mutate(req).await?)
1148 }
1149
1150 async fn remove_node_id(
1152 &self,
1153 uaid: &Uuid,
1154 _node_id: &str,
1155 _connected_at: u64,
1156 version: &Option<Uuid>,
1157 ) -> DbResult<bool> {
1158 let row_key = uaid.simple().to_string();
1159 trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1160 let Some(version) = version else {
1161 return Err(DbError::General("Expected a user version field".to_owned()));
1162 };
1163
1164 let mut req = self.check_and_mutate_row_request(&row_key);
1165
1166 let mut filters = vec![router_gc_policy_filter()];
1167 filters.extend(version_filter(version));
1168 req.set_predicate_filter(filter_chain(filters));
1169 req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);
1170
1171 Ok(self.check_and_mutate(req).await?)
1172 }
1173
1174 async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1176 let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1177 debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1178 trace!(
1179 "🉑 timestamp: {:?}",
1180 &message.timestamp.to_be_bytes().to_vec()
1181 );
1182 let mut row = Row::new(row_key);
1183
1184 let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1188 trace!(
1189 "🉑 Message Expiry {}",
1190 expiry
1191 .duration_since(SystemTime::UNIX_EPOCH)
1192 .unwrap_or_default()
1193 .as_millis()
1194 );
1195
1196 let mut cells: Vec<cell::Cell> = Vec::new();
1197
1198 let is_topic = message.topic.is_some();
1199 let family = if is_topic {
1200 MESSAGE_TOPIC_FAMILY
1201 } else {
1202 MESSAGE_FAMILY
1203 };
1204 cells.extend(vec![
1205 cell::Cell {
1206 qualifier: "ttl".to_owned(),
1207 value: message.ttl.to_be_bytes().to_vec(),
1208 timestamp: expiry,
1209 ..Default::default()
1210 },
1211 cell::Cell {
1212 qualifier: "timestamp".to_owned(),
1213 value: message.timestamp.to_be_bytes().to_vec(),
1214 timestamp: expiry,
1215 ..Default::default()
1216 },
1217 cell::Cell {
1218 qualifier: "version".to_owned(),
1219 value: message.version.into_bytes(),
1220 timestamp: expiry,
1221 ..Default::default()
1222 },
1223 ]);
1224 if let Some(headers) = message.headers {
1225 if !headers.is_empty() {
1226 cells.push(cell::Cell {
1227 qualifier: "headers".to_owned(),
1228 value: json!(headers).to_string().into_bytes(),
1229 timestamp: expiry,
1230 ..Default::default()
1231 });
1232 }
1233 }
1234 #[cfg(feature = "reliable_report")]
1235 {
1236 if let Some(reliability_id) = message.reliability_id {
1237 trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1238 cells.push(cell::Cell {
1239 qualifier: "reliability_id".to_owned(),
1240 value: reliability_id.into_bytes(),
1241 timestamp: expiry,
1242 ..Default::default()
1243 });
1244 }
1245 if let Some(reliable_state) = message.reliable_state {
1246 cells.push(cell::Cell {
1247 qualifier: "reliable_state".to_owned(),
1248 value: reliable_state.to_string().into_bytes(),
1249 timestamp: expiry,
1250 ..Default::default()
1251 });
1252 }
1253 }
1254 if let Some(data) = message.data {
1255 cells.push(cell::Cell {
1256 qualifier: "data".to_owned(),
1257 value: data.into_bytes(),
1258 timestamp: expiry,
1259 ..Default::default()
1260 });
1261 }
1262
1263 row.add_cells(family, cells);
1264 trace!("🉑 Adding row");
1265 self.write_row(row).await?;
1266
1267 self.metrics
1268 .incr_with_tags(MetricName::NotificationMessageStored)
1269 .with_tag("topic", &is_topic.to_string())
1270 .with_tag("database", &self.name())
1271 .send();
1272 Ok(())
1273 }
1274
1275 async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1280 for message in messages {
1282 self.save_message(uaid, message).await?;
1283 }
1284 Ok(())
1285 }
1286
1287 async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1301 let row_key = uaid.simple().to_string();
1302 debug!(
1303 "🉑 Updating {} current_timestamp: {:?}",
1304 &row_key,
1305 timestamp.to_be_bytes().to_vec()
1306 );
1307 let expiry = std::time::SystemTime::now() + self.router_ttl();
1308 let mut row = Row::new(row_key.clone());
1309
1310 row.cells.insert(
1311 ROUTER_FAMILY.to_owned(),
1312 vec![
1313 cell::Cell {
1314 qualifier: "current_timestamp".to_owned(),
1315 value: timestamp.to_be_bytes().to_vec(),
1316 timestamp: expiry,
1317 ..Default::default()
1318 },
1319 new_version_cell(expiry),
1320 ],
1321 );
1322
1323 self.write_row(row).await?;
1324
1325 Ok(())
1326 }
1327
1328 async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1330 trace!(
1331 "🉑 attemping to delete {:?} :: {:?}",
1332 uaid.to_string(),
1333 chidmessageid
1334 );
1335 let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1336 debug!("🉑🔥 Deleting message {}", &row_key);
1337 self.delete_row(&row_key).await?;
1338 self.metrics
1339 .incr_with_tags(MetricName::NotificationMessageDeleted)
1340 .with_tag("database", &self.name())
1341 .send();
1342 Ok(())
1343 }
1344
1345 async fn fetch_topic_messages(
1347 &self,
1348 uaid: &Uuid,
1349 limit: usize,
1350 ) -> DbResult<FetchMessageResponse> {
1351 let mut req = ReadRowsRequest::default();
1352 req.set_table_name(self.settings.table_name.clone());
1353 req.set_app_profile_id(self.settings.app_profile_id.clone());
1354
1355 let start_key = format!("{}#01:", uaid.simple());
1356 let end_key = format!("{}#02:", uaid.simple());
1357 let mut rows = data::RowSet::default();
1358 let mut row_range = data::RowRange::default();
1359 row_range.set_start_key_open(start_key.into_bytes());
1360 row_range.set_end_key_open(end_key.into_bytes());
1361 let mut row_ranges = RepeatedField::default();
1362 row_ranges.push(row_range);
1363 rows.set_row_ranges(row_ranges);
1364 req.set_rows(rows);
1365
1366 let mut filters = message_gc_policy_filter()?;
1367 filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1368
1369 req.set_filter(filter_chain(filters));
1370 if limit > 0 {
1371 trace!("🉑 Setting limit to {limit}");
1372 req.set_rows_limit(limit as i64);
1373 }
1374 let rows = self.read_rows(req).await?;
1375 debug!(
1376 "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1377 rows.len(),
1378 limit
1379 );
1380
1381 let messages = self.rows_to_notifications(rows)?;
1382
1383 Ok(FetchMessageResponse {
1387 messages,
1388 timestamp: None,
1389 })
1390 }
1391
1392 async fn fetch_timestamp_messages(
1395 &self,
1396 uaid: &Uuid,
1397 timestamp: Option<u64>,
1398 limit: usize,
1399 ) -> DbResult<FetchMessageResponse> {
1400 let mut req = ReadRowsRequest::default();
1401 req.set_table_name(self.settings.table_name.clone());
1402 req.set_app_profile_id(self.settings.app_profile_id.clone());
1403
1404 let mut rows = data::RowSet::default();
1405 let mut row_range = data::RowRange::default();
1406
1407 let start_key = if let Some(ts) = timestamp {
1408 format!("{}#02:{}z", uaid.simple(), ts)
1411 } else {
1412 format!("{}#02:", uaid.simple())
1413 };
1414 let end_key = format!("{}#03:", uaid.simple());
1415 row_range.set_start_key_open(start_key.into_bytes());
1416 row_range.set_end_key_open(end_key.into_bytes());
1417
1418 let mut row_ranges = RepeatedField::default();
1419 row_ranges.push(row_range);
1420 rows.set_row_ranges(row_ranges);
1421 req.set_rows(rows);
1422
1423 let mut filters = message_gc_policy_filter()?;
1437 filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1438
1439 req.set_filter(filter_chain(filters));
1440 if limit > 0 {
1441 req.set_rows_limit(limit as i64);
1442 }
1443 let rows = self.read_rows(req).await?;
1444 debug!(
1445 "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1446 timestamp,
1447 rows.len(),
1448 limit,
1449 );
1450
1451 let messages = self.rows_to_notifications(rows)?;
1452 let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1454 Ok(FetchMessageResponse {
1455 messages,
1456 timestamp,
1457 })
1458 }
1459
1460 async fn health_check(&self) -> DbResult<bool> {
1461 Ok(self
1462 .pool
1463 .get()
1464 .await?
1465 .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1466 .await?)
1467 }
1468
1469 async fn router_table_exists(&self) -> DbResult<bool> {
1472 Ok(true)
1473 }
1474
1475 async fn message_table_exists(&self) -> DbResult<bool> {
1478 Ok(true)
1479 }
1480
1481 #[cfg(feature = "reliable_report")]
1482 async fn log_report(
1483 &self,
1484 reliability_id: &str,
1485 new_state: crate::reliability::ReliabilityState,
1486 ) -> DbResult<()> {
1487 let row_key = reliability_id.to_owned();
1488
1489 let mut row = Row::new(row_key);
1490 let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1491
1492 let cells: Vec<cell::Cell> = vec![cell::Cell {
1494 qualifier: new_state.to_string(),
1495 value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1496 timestamp: expiry,
1497 ..Default::default()
1498 }];
1499
1500 row.add_cells(RELIABLE_LOG_FAMILY, cells);
1501
1502 self.write_row(row).await?;
1503
1504 Ok(())
1505 }
1506
1507 fn box_clone(&self) -> Box<dyn DbClient> {
1508 Box::new(self.clone())
1509 }
1510
1511 fn name(&self) -> String {
1512 "Bigtable".to_owned()
1513 }
1514
1515 fn pool_status(&self) -> Option<deadpool::Status> {
1516 Some(self.pool.pool.status())
1517 }
1518}
1519
1520#[cfg(all(test, feature = "emulator"))]
1521mod tests {
1522
1523 use std::sync::Arc;
1528 use std::time::SystemTime;
1529
1530 use cadence::StatsdClient;
1531 use uuid;
1532
1533 use super::*;
1534 use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1535
1536 const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
1537 const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
1538 const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1539
1540 fn now() -> u64 {
1541 SystemTime::now()
1542 .duration_since(SystemTime::UNIX_EPOCH)
1543 .unwrap()
1544 .as_secs()
1545 }
1546
1547 fn new_client() -> DbResult<BigTableClientImpl> {
1548 let env_dsn = format!(
1549 "grpc://{}",
1550 std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1551 );
1552 let settings = DbSettings {
1553 dsn: Some(env_dsn),
1559 db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1560 .to_string(),
1561 };
1562
1563 let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1564
1565 BigTableClientImpl::new(metrics, &settings)
1566 }
1567
1568 #[test]
1569 fn escape_bytes_for_regex() {
1570 let b = b"hi";
1571 assert_eq!(escape_bytes(b), b.to_vec());
1572 assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
1573 let b = b"\xe2\x80\xb3";
1574 assert_eq!(escape_bytes(b), b.to_vec());
1575 let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
1577 assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
1578 let b = b"\xc0";
1579 assert_eq!(escape_bytes(b), b.to_vec());
1580 assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
1581 }
1582
1583 #[actix_rt::test]
1584 async fn health_check() {
1585 let client = new_client().unwrap();
1586
1587 let result = client.health_check().await;
1588 assert!(result.is_ok());
1589 assert!(result.unwrap());
1590 }
1591
1592 #[actix_rt::test]
1595 async fn run_gauntlet() -> DbResult<()> {
1596 let client = new_client()?;
1597
1598 let connected_at = ms_since_epoch();
1599
1600 let uaid = Uuid::parse_str(TEST_USER).unwrap();
1601 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1602 let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();
1603
1604 let node_id = "test_node".to_owned();
1605
1606 let _ = client.remove_user(&uaid).await;
1608
1609 let test_user = User {
1610 uaid,
1611 router_type: "webpush".to_owned(),
1612 connected_at,
1613 router_data: None,
1614 node_id: Some(node_id.clone()),
1615 ..Default::default()
1616 };
1617
1618 let _ = client.remove_user(&uaid).await;
1621
1622 client.add_user(&test_user).await?;
1624 let fetched = client.get_user(&uaid).await?;
1625 assert!(fetched.is_some());
1626 let fetched = fetched.unwrap();
1627 assert_eq!(fetched.router_type, "webpush".to_owned());
1628
1629 let connected_at = ms_since_epoch();
1631
1632 client.add_channel(&uaid, &chid).await?;
1634 let channels = client.get_channels(&uaid).await?;
1635 assert!(channels.contains(&chid));
1636
1637 let mut new_channels: HashSet<Uuid> = HashSet::new();
1639 new_channels.insert(chid);
1640 for _ in 1..10 {
1641 new_channels.insert(uuid::Uuid::new_v4());
1642 }
1643 let chid_to_remove = uuid::Uuid::new_v4();
1644 new_channels.insert(chid_to_remove);
1645 client.add_channels(&uaid, new_channels.clone()).await?;
1646 let channels = client.get_channels(&uaid).await?;
1647 assert_eq!(channels, new_channels);
1648
1649 assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
1651 assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
1652 new_channels.remove(&chid_to_remove);
1653 let channels = client.get_channels(&uaid).await?;
1654 assert_eq!(channels, new_channels);
1655
1656 let mut updated = User {
1660 connected_at,
1661 ..test_user.clone()
1662 };
1663 let result = client.update_user(&mut updated).await;
1664 assert!(result.is_ok());
1665 assert!(!result.unwrap());
1666
1667 let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
1669 assert_eq!(fetched.connected_at, fetched2.connected_at);
1670
1671 let mut updated = User {
1673 connected_at: fetched.connected_at + 300,
1674 ..fetched2
1675 };
1676 let result = client.update_user(&mut updated).await;
1677 assert!(result.is_ok());
1678 assert!(result.unwrap());
1679 assert_ne!(
1680 fetched2.connected_at,
1681 client.get_user(&uaid).await?.unwrap().connected_at
1682 );
1683
1684 client
1686 .increment_storage(
1687 &fetched.uaid,
1688 SystemTime::now()
1689 .duration_since(SystemTime::UNIX_EPOCH)
1690 .unwrap()
1691 .as_secs(),
1692 )
1693 .await?;
1694
1695 let test_data = "An_encrypted_pile_of_crap".to_owned();
1696 let timestamp = now();
1697 let sort_key = now();
1698 let test_notification = crate::db::Notification {
1700 channel_id: chid,
1701 version: "test".to_owned(),
1702 ttl: 300,
1703 timestamp,
1704 data: Some(test_data.clone()),
1705 sortkey_timestamp: Some(sort_key),
1706 ..Default::default()
1707 };
1708 let res = client.save_message(&uaid, test_notification.clone()).await;
1709 assert!(res.is_ok());
1710
1711 let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
1712 assert_ne!(fetched.messages.len(), 0);
1713 let fm = fetched.messages.pop().unwrap();
1714 assert_eq!(fm.channel_id, test_notification.channel_id);
1715 assert_eq!(fm.data, Some(test_data));
1716
1717 let fetched = client
1719 .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
1720 .await?;
1721 assert_ne!(fetched.messages.len(), 0);
1722
1723 let fetched = client
1725 .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
1726 .await?;
1727 assert_eq!(fetched.messages.len(), 0);
1728
1729 assert!(client
1731 .remove_message(&uaid, &test_notification.chidmessageid())
1732 .await
1733 .is_ok());
1734
1735 assert!(client.remove_channel(&uaid, &chid).await.is_ok());
1736
1737 client.add_channel(&uaid, &topic_chid).await?;
1739 let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
1740 let timestamp = now();
1741 let sort_key = now();
1742 let test_notification = crate::db::Notification {
1744 channel_id: topic_chid,
1745 version: "test".to_owned(),
1746 ttl: 300,
1747 topic: Some("topic".to_owned()),
1748 timestamp,
1749 data: Some(test_data.clone()),
1750 sortkey_timestamp: Some(sort_key),
1751 ..Default::default()
1752 };
1753 assert!(client
1754 .save_message(&uaid, test_notification.clone())
1755 .await
1756 .is_ok());
1757
1758 let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
1759 assert_ne!(fetched.messages.len(), 0);
1760 let fm = fetched.messages.pop().unwrap();
1761 assert_eq!(fm.channel_id, test_notification.channel_id);
1762 assert_eq!(fm.data, Some(test_data));
1763
1764 let fetched = client.fetch_topic_messages(&uaid, 999).await?;
1766 assert_ne!(fetched.messages.len(), 0);
1767
1768 assert!(client
1770 .remove_message(&uaid, &test_notification.chidmessageid())
1771 .await
1772 .is_ok());
1773
1774 assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());
1775
1776 let msgs = client
1777 .fetch_timestamp_messages(&uaid, None, 999)
1778 .await?
1779 .messages;
1780 assert!(msgs.is_empty());
1781
1782 let fetched = client.get_user(&uaid).await?.unwrap();
1783 assert!(client
1784 .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
1785 .await
1786 .is_ok());
1787 let fetched = client.get_user(&uaid).await?.unwrap();
1789 assert_eq!(fetched.node_id, None);
1790
1791 assert!(client.remove_user(&uaid).await.is_ok());
1792
1793 assert!(client.get_user(&uaid).await?.is_none());
1794
1795 Ok(())
1796 }
1797
1798 #[actix_rt::test]
1799 async fn read_cells_family_id() -> DbResult<()> {
1800 let client = new_client().unwrap();
1801 let uaid = gen_test_uaid();
1802 client.remove_user(&uaid).await.unwrap();
1803
1804 let qualifier = "foo".to_owned();
1805
1806 let row_key = uaid.simple().to_string();
1807 let mut row = Row::new(row_key.clone());
1808 row.cells.insert(
1809 ROUTER_FAMILY.to_owned(),
1810 vec![cell::Cell {
1811 qualifier: qualifier.to_owned(),
1812 value: "bar".as_bytes().to_vec(),
1813 ..Default::default()
1814 }],
1815 );
1816 client.write_row(row).await.unwrap();
1817 let req = client.read_row_request(&row_key);
1818 let Some(row) = client.read_row(req).await.unwrap() else {
1819 panic!("Expected row");
1820 };
1821 assert_eq!(row.cells.len(), 1);
1822 assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1823 client.remove_user(&uaid).await
1824 }
1825
1826 #[actix_rt::test]
1827 async fn add_user_existing() {
1828 let client = new_client().unwrap();
1829 let uaid = gen_test_uaid();
1830 let user = User {
1831 uaid,
1832 ..Default::default()
1833 };
1834 client.remove_user(&uaid).await.unwrap();
1835
1836 client.add_user(&user).await.unwrap();
1837 let err = client.add_user(&user).await.unwrap_err();
1838 assert!(matches!(err, DbError::Conditional));
1839 }
1840
1841 #[actix_rt::test]
1842 async fn version_check() {
1843 let client = new_client().unwrap();
1844 let uaid = gen_test_uaid();
1845 let user = User {
1846 uaid,
1847 ..Default::default()
1848 };
1849 client.remove_user(&uaid).await.unwrap();
1850
1851 client.add_user(&user).await.unwrap();
1852 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1853 assert!(client.update_user(&mut user.clone()).await.unwrap());
1854
1855 let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1856 assert_ne!(user.version, fetched.version);
1857 assert!(!client.update_user(&mut user).await.unwrap());
1859
1860 client.remove_user(&uaid).await.unwrap();
1861 }
1862
1863 #[actix_rt::test]
1864 async fn lingering_chid_record() {
1865 let client = new_client().unwrap();
1866 let uaid = gen_test_uaid();
1867 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1868 let user = User {
1869 uaid,
1870 ..Default::default()
1871 };
1872 client.remove_user(&uaid).await.unwrap();
1873
1874 client.add_channel(&uaid, &chid).await.unwrap();
1876
1877 assert!(client.get_user(&uaid).await.unwrap().is_none());
1880
1881 client.add_user(&user).await.unwrap();
1882 assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1884
1885 client.remove_user(&uaid).await.unwrap();
1886 }
1887
1888 #[actix_rt::test]
1889 async fn lingering_current_timestamp() {
1890 let client = new_client().unwrap();
1891 let uaid = gen_test_uaid();
1892 client.remove_user(&uaid).await.unwrap();
1893
1894 client
1895 .increment_storage(&uaid, ms_since_epoch())
1896 .await
1897 .unwrap();
1898 assert!(client.get_user(&uaid).await.unwrap().is_none());
1899
1900 client.remove_user(&uaid).await.unwrap();
1901 }
1902
1903 #[actix_rt::test]
1904 async fn lingering_chid_w_version_record() {
1905 let client = new_client().unwrap();
1906 let uaid = gen_test_uaid();
1907 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1908 client.remove_user(&uaid).await.unwrap();
1909
1910 client.add_channel(&uaid, &chid).await.unwrap();
1911 assert!(client.remove_channel(&uaid, &chid).await.unwrap());
1912 assert!(client.get_user(&uaid).await.unwrap().is_none());
1913
1914 client.remove_user(&uaid).await.unwrap();
1915 }
1916
1917 #[actix_rt::test]
1918 async fn channel_and_current_timestamp_ttl_updates() {
1919 let client = new_client().unwrap();
1920 let uaid = gen_test_uaid();
1921 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1922 client.remove_user(&uaid).await.unwrap();
1923
1924 let user = User {
1926 uaid,
1927 ..Default::default()
1928 };
1929 client.add_user(&user).await.unwrap();
1930
1931 client.add_channel(&uaid, &chid).await.unwrap();
1932 client
1933 .add_channel(&uaid, &uuid::Uuid::new_v4())
1934 .await
1935 .unwrap();
1936
1937 client
1938 .increment_storage(
1939 &uaid,
1940 SystemTime::now()
1941 .duration_since(SystemTime::UNIX_EPOCH)
1942 .unwrap()
1943 .as_secs(),
1944 )
1945 .await
1946 .unwrap();
1947
1948 let req = client.read_row_request(&uaid.as_simple().to_string());
1949 let Some(mut row) = client.read_row(req).await.unwrap() else {
1950 panic!("Expected row");
1951 };
1952
1953 let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
1956 for mut cells in row.cells.into_values() {
1957 let Some(cell) = cells.pop() else {
1958 continue;
1959 };
1960 assert!(
1961 cell.timestamp >= ca_expiry,
1962 "{} cell timestamp should >= connected_at's",
1963 cell.qualifier
1964 );
1965 }
1966
1967 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1968
1969 tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
1971 client.update_user(&mut user).await.unwrap();
1972
1973 let req = client.read_row_request(&uaid.as_simple().to_string());
1975 let Some(mut row) = client.read_row(req).await.unwrap() else {
1976 panic!("Expected row");
1977 };
1978
1979 let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
1980
1981 assert!(ca_expiry2 > ca_expiry);
1982
1983 for mut cells in row.cells.into_values() {
1984 let Some(cell) = cells.pop() else {
1985 continue;
1986 };
1987 assert!(
1988 cell.timestamp >= ca_expiry2,
1989 "{} cell timestamp expiry should exceed connected_at's",
1990 cell.qualifier
1991 );
1992 }
1993
1994 client.remove_user(&uaid).await.unwrap();
1995 }
1996}