1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8
9use again::RetryPolicy;
10use async_trait::async_trait;
11use cadence::StatsdClient;
12#[cfg(feature = "reliable_report")]
13use chrono::TimeDelta;
14use futures_util::StreamExt;
15use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
16use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
17use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
18use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
19use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
20use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
21use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
22use protobuf::RepeatedField;
23use serde_json::{from_str, json};
24use uuid::Uuid;
25
26use crate::db::{
27 client::{DbClient, FetchMessageResponse},
28 error::{DbError, DbResult},
29 models::RangeKey,
30 DbSettings, Notification, User, USER_RECORD_VERSION,
31};
32use crate::metric_name::MetricName;
33use crate::metrics::StatsdClientExt;
34use crate::MAX_ROUTER_TTL_SECS;
35
36pub use self::metadata::MetadataBuilder;
37use self::row::{Row, RowCells};
38use super::pool::BigTablePool;
39use super::BigTableDbSettings;
40
41pub mod cell;
42pub mod error;
43pub(crate) mod merge;
44pub mod metadata;
45pub mod row;
46
/// Key identifying a single Bigtable row.
pub type RowKey = String;

/// Name of a column (qualifier) within a column family.
pub type Qualifier = String;
/// Name of a column family.
pub type FamilyId = String;
56
/// Column family that holds router (user) records.
const ROUTER_FAMILY: &str = "router";
/// Column family that holds messages without a topic.
const MESSAGE_FAMILY: &str = "message";
/// Column family that holds messages with a topic.
const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
/// Column family name for reliability data.
// NOTE(review): not referenced in the visible part of this file; presumably used
// further down -- confirm.
#[cfg(feature = "reliable_report")]
const RELIABLE_LOG_FAMILY: &str = "reliability";
/// Time span of 60 days for reliability log data.
#[cfg(feature = "reliable_report")]
pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);

/// Retry budget used where no per-client setting is available (e.g. the health check).
pub(crate) const RETRY_COUNT: usize = 5;
68
69struct Uaid(Uuid);
72
73impl Display for Uaid {
74 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75 write!(f, "{}", self.0.as_simple())
76 }
77}
78
79impl From<Uaid> for String {
80 fn from(uaid: Uaid) -> String {
81 uaid.0.as_simple().to_string()
82 }
83}
84
85#[derive(Clone)]
86pub struct BigTableClientImpl {
88 pub(crate) settings: BigTableDbSettings,
89 metrics: Arc<StatsdClient>,
91 pool: BigTablePool,
93 metadata: Metadata,
94 admin_metadata: Metadata,
95}
96
97fn router_gc_policy_filter() -> data::RowFilter {
99 let mut latest_cell_filter = data::RowFilter::default();
100 latest_cell_filter.set_cells_per_column_limit_filter(1);
101 latest_cell_filter
102}
103
104fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
107 let mut timestamp_filter = data::RowFilter::default();
108 let bt_now: i64 = SystemTime::now()
109 .duration_since(SystemTime::UNIX_EPOCH)
110 .map_err(error::BigTableError::WriteTime)?
111 .as_millis() as i64;
112 let mut range_filter = data::TimestampRange::default();
113 range_filter.set_start_timestamp_micros(bt_now * 1000);
114 timestamp_filter.set_timestamp_range_filter(range_filter);
115
116 Ok(vec![router_gc_policy_filter(), timestamp_filter])
117}
118
119fn family_filter(regex: String) -> data::RowFilter {
121 let mut filter = data::RowFilter::default();
122 filter.set_family_name_regex_filter(regex);
123 filter
124}
125
/// Escape `bytes` so they can be embedded literally in a regex filter.
///
/// ASCII alphanumerics, `_`, and bytes with the high bit set are copied
/// unchanged; a NUL byte becomes the four bytes `\x00`; every other byte is
/// prefixed with a backslash.
fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
    let mut escaped = Vec::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        match byte {
            0 => escaped.extend_from_slice(b"\\x00"),
            b'_' | b'0'..=b'9' | b'a'..=b'z' | b'A'..=b'Z' | 0x80..=0xFF => escaped.push(byte),
            _ => escaped.extend_from_slice(&[b'\\', byte]),
        }
    }
    escaped
}
147
148fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
151 let mut cq_filter = data::RowFilter::default();
152 cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());
153
154 let mut value_filter = data::RowFilter::default();
155 value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));
156
157 vec![
158 family_filter(format!("^{ROUTER_FAMILY}$")),
159 cq_filter,
160 value_filter,
161 ]
162}
163
164fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
166 cell::Cell {
167 qualifier: "version".to_owned(),
168 value: Uuid::new_v4().into(),
169 timestamp,
170 ..Default::default()
171 }
172}
173
174fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
176 let mut chain = RowFilter_Chain::default();
177 chain.set_filters(filters.into());
178 let mut filter = RowFilter::default();
179 filter.set_chain(chain);
180 filter
181}
182
183fn read_row_request(
185 table_name: &str,
186 app_profile_id: &str,
187 row_key: &str,
188) -> bigtable::ReadRowsRequest {
189 let mut req = bigtable::ReadRowsRequest::default();
190 req.set_table_name(table_name.to_owned());
191 req.set_app_profile_id(app_profile_id.to_owned());
192
193 let mut row_keys = RepeatedField::default();
194 row_keys.push(row_key.as_bytes().to_vec());
195 let mut row_set = data::RowSet::default();
196 row_set.set_row_keys(row_keys);
197 req.set_rows(row_set);
198
199 req
200}
201
202fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
203 let v: [u8; 8] = value
204 .try_into()
205 .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
206 Ok(u64::from_be_bytes(v))
207}
208
209fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
210 String::from_utf8(value).map_err(|e| {
211 debug!("🉑 cannot read string {}: {:?}", name, e);
212 DbError::DeserializeString(name.to_owned())
213 })
214}
215
216fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
220 let mut result = HashSet::new();
221 for cells in cells.values() {
222 let Some(cell) = cells.last() else {
223 continue;
224 };
225 let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
226 return Err(DbError::Integrity(
227 "get_channels expected: chid:<chid>".to_owned(),
228 None,
229 ));
230 };
231 result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
232 }
233 Ok(result)
234}
235
236fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
238 let channels = channels.into_owned();
239 let mut cells = Vec::with_capacity(channels.len().min(100_000));
240 for (i, channel_id) in channels.into_iter().enumerate() {
241 if i >= 100_000 {
245 break;
246 }
247 cells.push(cell::Cell {
248 qualifier: format!("chid:{}", channel_id.as_hyphenated()),
249 timestamp: expiry,
250 ..Default::default()
251 });
252 }
253 cells
254}
255
256pub fn retry_policy(max: usize) -> RetryPolicy {
257 RetryPolicy::default()
258 .with_max_retries(max)
259 .with_jitter(true)
260}
261
262fn retryable_internal_err(status: &RpcStatus) -> bool {
263 match status.code() {
264 RpcStatusCode::UNKNOWN => status
265 .message()
266 .eq_ignore_ascii_case("error occurred when fetching oauth2 token."),
267 RpcStatusCode::INTERNAL => [
268 "rst_stream",
269 "rst stream",
270 "received unexpected eos on data frame from server",
271 ]
272 .contains(&status.message().to_lowercase().as_str()),
273 RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
274 _ => false,
275 }
276}
277
278pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
279 let mut metric = metrics
280 .incr_with_tags(MetricName::DatabaseRetry)
281 .with_tag("error", err_type)
282 .with_tag("type", "bigtable");
283 if let Some(code) = code {
284 metric = metric.with_tag("code", code);
285 }
286 metric.send();
287}
288
289pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool + '_ {
290 move |err| {
291 debug!("🉑 Checking grpcio::Error...{err}");
292 match err {
293 grpcio::Error::RpcFailure(status) => {
294 info!("GRPC Failure :{:?}", status);
295 let retry = retryable_internal_err(status);
296 if retry {
297 metric(metrics, "RpcFailure", Some(&status.code().to_string()));
298 }
299 retry
300 }
301 grpcio::Error::BindFail(_) => {
302 metric(metrics, "BindFail", None);
303 true
304 }
305 grpcio::Error::CallFailure(grpc_call_status) => {
308 let retry = grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR;
309 if retry {
310 metric(
311 metrics,
312 "CallFailure",
313 Some(&format!("{grpc_call_status:?}")),
314 );
315 }
316 retry
317 }
318 _ => false,
319 }
320 }
321}
322
323pub fn retryable_bt_err(
324 metrics: &Arc<StatsdClient>,
325) -> impl Fn(&error::BigTableError) -> bool + '_ {
326 move |err| {
327 debug!("🉑 Checking BigTableError...{err}");
328 match err {
329 error::BigTableError::InvalidRowResponse(e)
330 | error::BigTableError::Read(e)
331 | error::BigTableError::Write(e)
332 | error::BigTableError::GRPC(e) => retryable_grpcio_err(metrics)(e),
333 _ => false,
334 }
335 }
336}
337
338fn is_incomplete_router_record(cells: &RowCells) -> bool {
352 cells
353 .keys()
354 .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
355}
356
357fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
358 ::grpcio::CallOption::default().headers(metadata)
359}
360
361impl BigTableClientImpl {
386 pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
387 debug!("🏊 BT Pool new");
389 let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
390 info!("🉑 {:#?}", db_settings);
391 let pool = BigTablePool::new(settings, &metrics)?;
392
393 let metadata = db_settings.metadata()?;
395 let admin_metadata = db_settings.admin_metadata()?;
396 Ok(Self {
397 settings: db_settings,
398 metrics,
399 metadata,
400 admin_metadata,
401 pool,
402 })
403 }
404
405 fn router_ttl(&self) -> Duration {
406 self.settings
407 .max_router_ttl
408 .unwrap_or(Duration::from_secs(MAX_ROUTER_TTL_SECS))
409 }
410
411 pub fn spawn_sweeper(&self, interval: Duration) {
413 self.pool.spawn_sweeper(interval);
414 }
415
416 fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
418 read_row_request(
419 &self.settings.table_name,
420 &self.settings.app_profile_id,
421 row_key,
422 )
423 }
424
425 fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
427 let mut req = bigtable::MutateRowRequest::default();
428 req.set_table_name(self.settings.table_name.clone());
429 req.set_app_profile_id(self.settings.app_profile_id.clone());
430 req.set_row_key(row_key.as_bytes().to_vec());
431 req
432 }
433
434 fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
436 let mut req = bigtable::CheckAndMutateRowRequest::default();
437 req.set_table_name(self.settings.table_name.clone());
438 req.set_app_profile_id(self.settings.app_profile_id.clone());
439 req.set_row_key(row_key.as_bytes().to_vec());
440 req
441 }
442
443 async fn mutate_row(
445 &self,
446 req: bigtable::MutateRowRequest,
447 ) -> Result<(), error::BigTableError> {
448 let bigtable = self.pool.get().await?;
449 retry_policy(self.settings.retry_count)
450 .retry_if(
451 || async {
452 bigtable
453 .conn
454 .mutate_row_opt(&req, call_opts(self.metadata.clone()))
455 },
456 retryable_grpcio_err(&self.metrics),
457 )
458 .await
459 .map_err(error::BigTableError::Write)?;
460 Ok(())
461 }
462
463 #[allow(unused)]
465 async fn mutate_rows(
466 &self,
467 req: bigtable::MutateRowsRequest,
468 ) -> Result<(), error::BigTableError> {
469 let bigtable = self.pool.get().await?;
470 let resp = retry_policy(self.settings.retry_count)
472 .retry_if(
473 || async {
474 bigtable
475 .conn
476 .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
477 },
478 retryable_grpcio_err(&self.metrics),
479 )
480 .await
481 .map_err(error::BigTableError::Write)?;
482
483 let mut stream = Box::pin(resp);
490 let mut cnt = 0;
491 loop {
492 let (result, remainder) = StreamExt::into_future(stream).await;
493 if let Some(result) = result {
494 debug!("🎏 Result block: {}", cnt);
495 match result {
496 Ok(r) => {
497 for e in r.get_entries() {
498 if e.has_status() {
499 let status = e.get_status();
500 let code = error::MutateRowStatus::from(status.get_code());
502 if !code.is_ok() {
503 return Err(error::BigTableError::Status(
504 code,
505 status.get_message().to_owned(),
506 ));
507 }
508 debug!("🎏 Response: {} OK", e.index);
509 }
510 }
511 }
512 Err(e) => return Err(error::BigTableError::Write(e)),
513 };
514 cnt += 1;
515 } else {
516 debug!("🎏 Done!");
517 break;
518 }
519 stream = remainder;
520 }
521
522 Ok(())
523 }
524
525 async fn read_row(
527 &self,
528 req: bigtable::ReadRowsRequest,
529 ) -> Result<Option<row::Row>, error::BigTableError> {
530 let mut rows = self.read_rows(req).await?;
531 Ok(rows.pop_first().map(|(_, v)| v))
532 }
533
534 async fn read_rows(
538 &self,
539 req: ReadRowsRequest,
540 ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
541 let bigtable = self.pool.get().await?;
542 let resp = retry_policy(self.settings.retry_count)
543 .retry_if(
544 || async {
545 let resp: grpcio::ClientSStreamReceiver<bigtable::ReadRowsResponse> = bigtable
546 .conn
547 .read_rows_opt(&req, call_opts(self.metadata.clone()))
548 .map_err(error::BigTableError::Read)?;
549 merge::RowMerger::process_chunks(resp).await
550 },
551 retryable_bt_err(&self.metrics),
552 )
553 .await?;
554 Ok(resp)
555 }
556
557 async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
561 let mut req = self.mutate_row_request(&row.row_key);
562 let mutations = self.get_mutations(row.cells)?;
567 req.set_mutations(mutations);
568 self.mutate_row(req).await?;
569 Ok(())
570 }
571
572 fn get_mutations(
574 &self,
575 cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
576 ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
577 let mut mutations = protobuf::RepeatedField::default();
578 for (family_id, cells) in cells {
579 for cell in cells {
580 let mut mutation = data::Mutation::default();
581 let mut set_cell = data::Mutation_SetCell::default();
582 let timestamp = cell
583 .timestamp
584 .duration_since(SystemTime::UNIX_EPOCH)
585 .map_err(error::BigTableError::WriteTime)?;
586 set_cell.family_name.clone_from(&family_id);
587 set_cell.set_column_qualifier(cell.qualifier.clone().into_bytes());
588 set_cell.set_value(cell.value);
589 set_cell.set_timestamp_micros((timestamp.as_millis() * 1000) as i64);
592 debug!("🉑 expiring in {:?}", timestamp.as_millis());
593 mutation.set_set_cell(set_cell);
594 mutations.push(mutation);
595 }
596 }
597 Ok(mutations)
598 }
599
600 async fn check_and_mutate_row(
608 &self,
609 row: row::Row,
610 filter: RowFilter,
611 state: bool,
612 ) -> Result<bool, error::BigTableError> {
613 let mut req = self.check_and_mutate_row_request(&row.row_key);
614 let mutations = self.get_mutations(row.cells)?;
615 req.set_predicate_filter(filter);
616 if state {
617 req.set_true_mutations(mutations);
618 } else {
619 req.set_false_mutations(mutations);
620 }
621 self.check_and_mutate(req).await
622 }
623
624 async fn check_and_mutate(
625 &self,
626 req: bigtable::CheckAndMutateRowRequest,
627 ) -> Result<bool, error::BigTableError> {
628 let bigtable = self.pool.get().await?;
629 let resp = retry_policy(self.settings.retry_count)
630 .retry_if(
631 || async {
632 bigtable
635 .conn
636 .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
637 },
638 retryable_grpcio_err(&self.metrics),
639 )
640 .await
641 .map_err(error::BigTableError::Write)?;
642 debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
643 Ok(resp.get_predicate_matched())
644 }
645
646 fn get_delete_mutations(
647 &self,
648 family: &str,
649 column_names: &[&str],
650 time_range: Option<&data::TimestampRange>,
651 ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
652 let mut mutations = protobuf::RepeatedField::default();
653 for column in column_names {
654 let mut mutation = data::Mutation::default();
655 let mut del_cell = data::Mutation_DeleteFromColumn::default();
659 del_cell.set_family_name(family.to_owned());
660 del_cell.set_column_qualifier(column.as_bytes().to_vec());
661 if let Some(range) = time_range {
662 del_cell.set_time_range(range.clone());
663 }
664 mutation.set_delete_from_column(del_cell);
665 mutations.push(mutation);
666 }
667 Ok(mutations)
668 }
669
670 #[allow(unused)]
671 #[allow(unused)]
673 async fn delete_cells(
674 &self,
675 row_key: &str,
676 family: &str,
677 column_names: &[&str],
678 time_range: Option<&data::TimestampRange>,
679 ) -> Result<(), error::BigTableError> {
680 let mut req = self.mutate_row_request(row_key);
681 req.set_mutations(self.get_delete_mutations(family, column_names, time_range)?);
682 self.mutate_row(req).await
683 }
684
685 async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
687 let mut req = self.mutate_row_request(row_key);
688 let mut mutations = protobuf::RepeatedField::default();
689 let mut mutation = data::Mutation::default();
690 mutation.set_delete_from_row(data::Mutation_DeleteFromRow::default());
691 mutations.push(mutation);
692 req.set_mutations(mutations);
693 self.mutate_row(req).await
694 }
695
696 #[allow(unused)]
701 async fn delete_rows(&self, row_key: &str) -> Result<bool, error::BigTableError> {
702 let admin = BigtableTableAdminClient::new(self.pool.get_channel()?);
703 let mut req = DropRowRangeRequest::new();
704 req.set_name(self.settings.table_name.clone());
705 req.set_row_key_prefix(row_key.as_bytes().to_vec());
706
707 admin
708 .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone()))
709 .map_err(|e| {
710 error!("{:?}", e);
711 error::BigTableError::Admin(
712 format!(
713 "Could not send delete command for {}",
714 &self.settings.table_name
715 ),
716 Some(e.to_string()),
717 )
718 })?
719 .await
720 .map_err(|e| {
721 error!("post await: {:?}", e);
722 error::BigTableError::Admin(
723 format!(
724 "Could not delete data from table {}",
725 &self.settings.table_name
726 ),
727 Some(e.to_string()),
728 )
729 })?;
730
731 Ok(true)
732 }
733
734 fn rows_to_notifications(
735 &self,
736 rows: BTreeMap<String, Row>,
737 ) -> Result<Vec<Notification>, DbError> {
738 rows.into_iter()
739 .map(|(row_key, row)| self.row_to_notification(&row_key, row))
740 .collect()
741 }
742
743 fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
744 let Some((_, chidmessageid)) = row_key.split_once('#') else {
745 return Err(DbError::Integrity(
746 "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
747 None,
748 ));
749 };
750 let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
751 DbError::Integrity(
752 format!("rows_to_notification expected chidmessageid: {e}"),
753 None,
754 )
755 })?;
756
757 let mut notif = Notification {
759 channel_id: range_key.channel_id,
760 topic: range_key.topic,
761 sortkey_timestamp: range_key.sortkey_timestamp,
762 version: to_string(row.take_required_cell("version")?.value, "version")?,
763 ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
764 timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
765 ..Default::default()
766 };
767
768 if let Some(cell) = row.take_cell("data") {
770 notif.data = Some(to_string(cell.value, "data")?);
771 }
772 #[cfg(feature = "reliable_report")]
773 {
774 if let Some(cell) = row.take_cell("reliability_id") {
775 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
776 }
777 if let Some(cell) = row.take_cell("reliable_state") {
778 notif.reliable_state = Some(
779 crate::reliability::ReliabilityState::from_str(&to_string(
780 cell.value,
781 "reliable_state",
782 )?)
783 .map_err(|e| {
784 DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
785 })?,
786 );
787 }
788 }
789 if let Some(cell) = row.take_cell("headers") {
790 notif.headers = Some(
791 serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
792 .map_err(|e| DbError::Serialization(e.to_string()))?,
793 );
794 }
795 #[cfg(feature = "reliable_report")]
796 if let Some(cell) = row.take_cell("reliability_id") {
797 trace!("🚣 Is reliable");
798 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
799 }
800
801 trace!("🚣 Deserialized message row: {:?}", ¬if);
802 Ok(notif)
803 }
804
805 fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
811 let row_key = user.uaid.simple().to_string();
812 let mut row = Row::new(row_key);
813 let expiry = std::time::SystemTime::now() + self.router_ttl();
814
815 let mut cells: Vec<cell::Cell> = vec![
816 cell::Cell {
817 qualifier: "connected_at".to_owned(),
818 value: user.connected_at.to_be_bytes().to_vec(),
819 timestamp: expiry,
820 ..Default::default()
821 },
822 cell::Cell {
823 qualifier: "router_type".to_owned(),
824 value: user.router_type.clone().into_bytes(),
825 timestamp: expiry,
826 ..Default::default()
827 },
828 cell::Cell {
829 qualifier: "record_version".to_owned(),
830 value: user
831 .record_version
832 .unwrap_or(USER_RECORD_VERSION)
833 .to_be_bytes()
834 .to_vec(),
835 timestamp: expiry,
836 ..Default::default()
837 },
838 cell::Cell {
839 qualifier: "version".to_owned(),
840 value: (*version).into(),
841 timestamp: expiry,
842 ..Default::default()
843 },
844 ];
845
846 if let Some(router_data) = &user.router_data {
847 cells.push(cell::Cell {
848 qualifier: "router_data".to_owned(),
849 value: json!(router_data).to_string().as_bytes().to_vec(),
850 timestamp: expiry,
851 ..Default::default()
852 });
853 };
854 if let Some(current_timestamp) = user.current_timestamp {
855 cells.push(cell::Cell {
856 qualifier: "current_timestamp".to_owned(),
857 value: current_timestamp.to_be_bytes().to_vec(),
858 timestamp: expiry,
859 ..Default::default()
860 });
861 };
862 if let Some(node_id) = &user.node_id {
863 cells.push(cell::Cell {
864 qualifier: "node_id".to_owned(),
865 value: node_id.as_bytes().to_vec(),
866 timestamp: expiry,
867 ..Default::default()
868 });
869 };
870
871 cells.extend(channels_to_cells(
872 Cow::Borrowed(&user.priv_channels),
873 expiry,
874 ));
875
876 row.add_cells(ROUTER_FAMILY, cells);
877 row
878 }
879}
880
881#[derive(Clone)]
882pub struct BigtableDb {
883 pub(super) conn: BigtableClient,
884 pub(super) health_metadata: Metadata,
885 table_name: String,
886}
887
888impl BigtableDb {
889 pub fn new(channel: Channel, health_metadata: &Metadata, table_name: &str) -> Self {
890 Self {
891 conn: BigtableClient::new(channel),
892 health_metadata: health_metadata.clone(),
893 table_name: table_name.to_owned(),
894 }
895 }
896 pub async fn health_check(
903 &mut self,
904 metrics: &Arc<StatsdClient>,
905 app_profile_id: &str,
906 ) -> Result<bool, error::BigTableError> {
907 let random_uaid = Uuid::new_v4().simple().to_string();
914 let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
915 let mut filter = data::RowFilter::default();
916 filter.set_block_all_filter(true);
917 req.set_filter(filter);
918 let _r = retry_policy(RETRY_COUNT)
919 .retry_if(
920 || async {
921 self.conn
922 .read_rows_opt(&req, call_opts(self.health_metadata.clone()))
923 },
924 retryable_grpcio_err(metrics),
925 )
926 .await
927 .map_err(error::BigTableError::Read)?;
928
929 debug!("🉑 health check");
930 Ok(true)
931 }
932}
933
934#[async_trait]
935impl DbClient for BigTableClientImpl {
936 async fn add_user(&self, user: &User) -> DbResult<()> {
938 trace!("🉑 Adding user");
939 let Some(ref version) = user.version else {
940 return Err(DbError::General(
941 "add_user expected a user version field".to_owned(),
942 ));
943 };
944 let row = self.user_to_row(user, version);
945
946 let mut row_key_filter = RowFilter::default();
948 row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
949 let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
950
951 if self.check_and_mutate_row(row, filter, false).await? {
952 return Err(DbError::Conditional);
953 }
954 Ok(())
955 }
956
957 async fn update_user(&self, user: &mut User) -> DbResult<bool> {
973 let Some(ref version) = user.version else {
974 return Err(DbError::General(
975 "update_user expected a user version field".to_owned(),
976 ));
977 };
978
979 let mut filters = vec![router_gc_policy_filter()];
980 filters.extend(version_filter(version));
981 let filter = filter_chain(filters);
982
983 let new_version = Uuid::new_v4();
984 let row = self.user_to_row(user, &new_version);
986
987 let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
988 user.version = Some(new_version);
989 Ok(predicate_matched)
990 }
991
992 async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
993 let row_key = uaid.as_simple().to_string();
994 let mut req = self.read_row_request(&row_key);
995 let mut filters = vec![router_gc_policy_filter()];
996 filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
997 req.set_filter(filter_chain(filters));
998 let Some(mut row) = self.read_row(req).await? else {
999 return Ok(None);
1000 };
1001
1002 trace!("🉑 Found a record for {}", row_key);
1003
1004 let connected_at_cell = match row.take_required_cell("connected_at") {
1005 Ok(cell) => cell,
1006 Err(_) => {
1007 if !is_incomplete_router_record(&row.cells) {
1008 return Err(DbError::Integrity(
1009 "Expected column: connected_at".to_owned(),
1010 Some(format!("{row:#?}")),
1011 ));
1012 }
1013 trace!("🉑 Dropping an incomplete user record for {}", row_key);
1018 self.metrics
1019 .incr_with_tags(MetricName::DatabaseDropUser)
1020 .with_tag("reason", "incomplete_record")
1021 .send();
1022 self.remove_user(uaid).await?;
1023 return Ok(None);
1024 }
1025 };
1026
1027 let mut result = User {
1028 uaid: *uaid,
1029 connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1030 router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1031 record_version: Some(to_u64(
1032 row.take_required_cell("record_version")?.value,
1033 "record_version",
1034 )?),
1035 version: Some(
1036 row.take_required_cell("version")?
1037 .value
1038 .try_into()
1039 .map_err(|e| {
1040 DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1041 })?,
1042 ),
1043 ..Default::default()
1044 };
1045
1046 if let Some(cell) = row.take_cell("router_data") {
1047 result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1048 DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1049 })?;
1050 }
1051
1052 if let Some(cell) = row.take_cell("node_id") {
1053 result.node_id = Some(to_string(cell.value, "node_id")?);
1054 }
1055
1056 if let Some(cell) = row.take_cell("current_timestamp") {
1057 result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1058 }
1059
1060 result.priv_channels = channels_from_cells(&row.cells)?;
1062
1063 Ok(Some(result))
1064 }
1065
1066 async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1067 let row_key = uaid.simple().to_string();
1068 self.delete_row(&row_key).await?;
1069 Ok(())
1070 }
1071
1072 async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1073 let channels = HashSet::from_iter([channel_id.to_owned()]);
1074 self.add_channels(uaid, channels).await
1075 }
1076
1077 async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1080 let row_key = uaid.simple().to_string();
1094 let mut row = Row::new(row_key);
1095 let expiry = std::time::SystemTime::now() + self.router_ttl();
1096
1097 row.add_cells(
1101 ROUTER_FAMILY,
1102 channels_to_cells(Cow::Owned(channels), expiry),
1103 );
1104
1105 self.write_row(row).await?;
1106 Ok(())
1107 }
1108
1109 async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1110 let row_key = uaid.simple().to_string();
1111 let mut req = self.read_row_request(&row_key);
1112
1113 let mut cq_filter = data::RowFilter::default();
1114 cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());
1115 req.set_filter(filter_chain(vec![
1116 router_gc_policy_filter(),
1117 family_filter(format!("^{ROUTER_FAMILY}$")),
1118 cq_filter,
1119 ]));
1120
1121 let Some(row) = self.read_row(req).await? else {
1122 return Ok(Default::default());
1123 };
1124 channels_from_cells(&row.cells)
1125 }
1126
1127 async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1129 let row_key = uaid.simple().to_string();
1130 let mut req = self.check_and_mutate_row_request(&row_key);
1131
1132 let column = format!("chid:{}", channel_id.as_hyphenated());
1134 let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1135
1136 let mut row = Row::new(row_key);
1138 let expiry = std::time::SystemTime::now() + self.router_ttl();
1139 row.cells
1140 .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1141 mutations.extend(self.get_mutations(row.cells)?);
1142
1143 let mut cq_filter = data::RowFilter::default();
1145 cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
1146 req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1147 req.set_true_mutations(mutations);
1148
1149 Ok(self.check_and_mutate(req).await?)
1150 }
1151
1152 async fn remove_node_id(
1154 &self,
1155 uaid: &Uuid,
1156 _node_id: &str,
1157 _connected_at: u64,
1158 version: &Option<Uuid>,
1159 ) -> DbResult<bool> {
1160 let row_key = uaid.simple().to_string();
1161 trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1162 let Some(version) = version else {
1163 return Err(DbError::General("Expected a user version field".to_owned()));
1164 };
1165
1166 let mut req = self.check_and_mutate_row_request(&row_key);
1167
1168 let mut filters = vec![router_gc_policy_filter()];
1169 filters.extend(version_filter(version));
1170 req.set_predicate_filter(filter_chain(filters));
1171 req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);
1172
1173 Ok(self.check_and_mutate(req).await?)
1174 }
1175
1176 async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1178 let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1179 debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1180 trace!(
1181 "🉑 timestamp: {:?}",
1182 &message.timestamp.to_be_bytes().to_vec()
1183 );
1184 let mut row = Row::new(row_key);
1185
1186 let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1190 trace!(
1191 "🉑 Message Expiry {}",
1192 expiry
1193 .duration_since(SystemTime::UNIX_EPOCH)
1194 .unwrap_or_default()
1195 .as_millis()
1196 );
1197
1198 let mut cells: Vec<cell::Cell> = Vec::new();
1199
1200 let is_topic = message.topic.is_some();
1201 let family = if is_topic {
1202 MESSAGE_TOPIC_FAMILY
1203 } else {
1204 MESSAGE_FAMILY
1205 };
1206 cells.extend(vec![
1207 cell::Cell {
1208 qualifier: "ttl".to_owned(),
1209 value: message.ttl.to_be_bytes().to_vec(),
1210 timestamp: expiry,
1211 ..Default::default()
1212 },
1213 cell::Cell {
1214 qualifier: "timestamp".to_owned(),
1215 value: message.timestamp.to_be_bytes().to_vec(),
1216 timestamp: expiry,
1217 ..Default::default()
1218 },
1219 cell::Cell {
1220 qualifier: "version".to_owned(),
1221 value: message.version.into_bytes(),
1222 timestamp: expiry,
1223 ..Default::default()
1224 },
1225 ]);
1226 if let Some(headers) = message.headers {
1227 if !headers.is_empty() {
1228 cells.push(cell::Cell {
1229 qualifier: "headers".to_owned(),
1230 value: json!(headers).to_string().into_bytes(),
1231 timestamp: expiry,
1232 ..Default::default()
1233 });
1234 }
1235 }
1236 #[cfg(feature = "reliable_report")]
1237 {
1238 if let Some(reliability_id) = message.reliability_id {
1239 trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1240 cells.push(cell::Cell {
1241 qualifier: "reliability_id".to_owned(),
1242 value: reliability_id.into_bytes(),
1243 timestamp: expiry,
1244 ..Default::default()
1245 });
1246 }
1247 if let Some(reliable_state) = message.reliable_state {
1248 cells.push(cell::Cell {
1249 qualifier: "reliable_state".to_owned(),
1250 value: reliable_state.to_string().into_bytes(),
1251 timestamp: expiry,
1252 ..Default::default()
1253 });
1254 }
1255 }
1256 if let Some(data) = message.data {
1257 cells.push(cell::Cell {
1258 qualifier: "data".to_owned(),
1259 value: data.into_bytes(),
1260 timestamp: expiry,
1261 ..Default::default()
1262 });
1263 }
1264
1265 row.add_cells(family, cells);
1266 trace!("🉑 Adding row");
1267 self.write_row(row).await?;
1268
1269 self.metrics
1270 .incr_with_tags(MetricName::NotificationMessageStored)
1271 .with_tag("topic", &is_topic.to_string())
1272 .with_tag("database", &self.name())
1273 .send();
1274 Ok(())
1275 }
1276
1277 async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1282 for message in messages {
1284 self.save_message(uaid, message).await?;
1285 }
1286 Ok(())
1287 }
1288
1289 async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1303 let row_key = uaid.simple().to_string();
1304 debug!(
1305 "🉑 Updating {} current_timestamp: {:?}",
1306 &row_key,
1307 timestamp.to_be_bytes().to_vec()
1308 );
1309 let expiry = std::time::SystemTime::now() + self.router_ttl();
1310 let mut row = Row::new(row_key.clone());
1311
1312 row.cells.insert(
1313 ROUTER_FAMILY.to_owned(),
1314 vec![
1315 cell::Cell {
1316 qualifier: "current_timestamp".to_owned(),
1317 value: timestamp.to_be_bytes().to_vec(),
1318 timestamp: expiry,
1319 ..Default::default()
1320 },
1321 new_version_cell(expiry),
1322 ],
1323 );
1324
1325 self.write_row(row).await?;
1326
1327 Ok(())
1328 }
1329
1330 async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1332 trace!(
1333 "🉑 attemping to delete {:?} :: {:?}",
1334 uaid.to_string(),
1335 chidmessageid
1336 );
1337 let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1338 debug!("🉑🔥 Deleting message {}", &row_key);
1339 self.delete_row(&row_key).await?;
1340 self.metrics
1341 .incr_with_tags(MetricName::NotificationMessageDeleted)
1342 .with_tag("database", &self.name())
1343 .send();
1344 Ok(())
1345 }
1346
1347 async fn fetch_topic_messages(
1349 &self,
1350 uaid: &Uuid,
1351 limit: usize,
1352 ) -> DbResult<FetchMessageResponse> {
1353 let mut req = ReadRowsRequest::default();
1354 req.set_table_name(self.settings.table_name.clone());
1355 req.set_app_profile_id(self.settings.app_profile_id.clone());
1356
1357 let start_key = format!("{}#01:", uaid.simple());
1358 let end_key = format!("{}#02:", uaid.simple());
1359 let mut rows = data::RowSet::default();
1360 let mut row_range = data::RowRange::default();
1361 row_range.set_start_key_open(start_key.into_bytes());
1362 row_range.set_end_key_open(end_key.into_bytes());
1363 let mut row_ranges = RepeatedField::default();
1364 row_ranges.push(row_range);
1365 rows.set_row_ranges(row_ranges);
1366 req.set_rows(rows);
1367
1368 let mut filters = message_gc_policy_filter()?;
1369 filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1370
1371 req.set_filter(filter_chain(filters));
1372 if limit > 0 {
1373 trace!("🉑 Setting limit to {limit}");
1374 req.set_rows_limit(limit as i64);
1375 }
1376 let rows = self.read_rows(req).await?;
1377 debug!(
1378 "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1379 rows.len(),
1380 limit
1381 );
1382
1383 let messages = self.rows_to_notifications(rows)?;
1384
1385 Ok(FetchMessageResponse {
1389 messages,
1390 timestamp: None,
1391 })
1392 }
1393
1394 async fn fetch_timestamp_messages(
1397 &self,
1398 uaid: &Uuid,
1399 timestamp: Option<u64>,
1400 limit: usize,
1401 ) -> DbResult<FetchMessageResponse> {
1402 let mut req = ReadRowsRequest::default();
1403 req.set_table_name(self.settings.table_name.clone());
1404 req.set_app_profile_id(self.settings.app_profile_id.clone());
1405
1406 let mut rows = data::RowSet::default();
1407 let mut row_range = data::RowRange::default();
1408
1409 let start_key = if let Some(ts) = timestamp {
1410 format!("{}#02:{}z", uaid.simple(), ts)
1413 } else {
1414 format!("{}#02:", uaid.simple())
1415 };
1416 let end_key = format!("{}#03:", uaid.simple());
1417 row_range.set_start_key_open(start_key.into_bytes());
1418 row_range.set_end_key_open(end_key.into_bytes());
1419
1420 let mut row_ranges = RepeatedField::default();
1421 row_ranges.push(row_range);
1422 rows.set_row_ranges(row_ranges);
1423 req.set_rows(rows);
1424
1425 let mut filters = message_gc_policy_filter()?;
1439 filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1440
1441 req.set_filter(filter_chain(filters));
1442 if limit > 0 {
1443 req.set_rows_limit(limit as i64);
1444 }
1445 let rows = self.read_rows(req).await?;
1446 debug!(
1447 "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1448 timestamp,
1449 rows.len(),
1450 limit,
1451 );
1452
1453 let messages = self.rows_to_notifications(rows)?;
1454 let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1456 Ok(FetchMessageResponse {
1457 messages,
1458 timestamp,
1459 })
1460 }
1461
1462 async fn health_check(&self) -> DbResult<bool> {
1463 Ok(self
1464 .pool
1465 .get()
1466 .await?
1467 .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1468 .await?)
1469 }
1470
    /// Report whether the router table exists.
    ///
    /// This implementation performs no lookup and always answers `Ok(true)`.
    async fn router_table_exists(&self) -> DbResult<bool> {
        Ok(true)
    }
1476
    /// Report whether the message table exists.
    ///
    /// This implementation performs no lookup and always answers `Ok(true)`.
    async fn message_table_exists(&self) -> DbResult<bool> {
        Ok(true)
    }
1482
1483 #[cfg(feature = "reliable_report")]
1484 async fn log_report(
1485 &self,
1486 reliability_id: &str,
1487 new_state: crate::reliability::ReliabilityState,
1488 ) -> DbResult<()> {
1489 let row_key = reliability_id.to_owned();
1490
1491 let mut row = Row::new(row_key);
1492 let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1493
1494 let cells: Vec<cell::Cell> = vec![cell::Cell {
1496 qualifier: new_state.to_string(),
1497 value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1498 timestamp: expiry,
1499 ..Default::default()
1500 }];
1501
1502 row.add_cells(RELIABLE_LOG_FAMILY, cells);
1503
1504 self.write_row(row).await?;
1505
1506 Ok(())
1507 }
1508
1509 fn box_clone(&self) -> Box<dyn DbClient> {
1510 Box::new(self.clone())
1511 }
1512
1513 fn name(&self) -> String {
1514 "Bigtable".to_owned()
1515 }
1516
1517 fn pool_status(&self) -> Option<deadpool::Status> {
1518 Some(self.pool.pool.status())
1519 }
1520}
1521
1522#[cfg(all(test, feature = "emulator"))]
1523mod tests {
1524
1525 use std::sync::Arc;
1530 use std::time::SystemTime;
1531
1532 use cadence::StatsdClient;
1533 use uuid;
1534
1535 use super::*;
1536 use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1537
1538 const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
1539 const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
1540 const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1541
1542 fn now() -> u64 {
1543 SystemTime::now()
1544 .duration_since(SystemTime::UNIX_EPOCH)
1545 .unwrap()
1546 .as_secs()
1547 }
1548
1549 fn new_client() -> DbResult<BigTableClientImpl> {
1550 let env_dsn = format!(
1551 "grpc://{}",
1552 std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1553 );
1554 let settings = DbSettings {
1555 dsn: Some(env_dsn),
1561 db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1562 .to_string(),
1563 };
1564
1565 let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1566
1567 BigTableClientImpl::new(metrics, &settings)
1568 }
1569
1570 #[test]
1571 fn escape_bytes_for_regex() {
1572 let b = b"hi";
1573 assert_eq!(escape_bytes(b), b.to_vec());
1574 assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
1575 let b = b"\xe2\x80\xb3";
1576 assert_eq!(escape_bytes(b), b.to_vec());
1577 let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
1579 assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
1580 let b = b"\xc0";
1581 assert_eq!(escape_bytes(b), b.to_vec());
1582 assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
1583 }
1584
1585 #[actix_rt::test]
1586 async fn health_check() {
1587 let client = new_client().unwrap();
1588
1589 let result = client.health_check().await;
1590 assert!(result.is_ok());
1591 assert!(result.unwrap());
1592 }
1593
    /// End-to-end walk through the client API against one fixed test user:
    /// user creation, channel management, conditional user updates, storing /
    /// fetching / deleting both timestamp and topic messages, node id
    /// removal, and finally user removal.
    ///
    /// The steps build on the database state left by the previous ones, so
    /// their order matters.
    #[actix_rt::test]
    async fn run_gauntlet() -> DbResult<()> {
        let client = new_client()?;

        let connected_at = ms_since_epoch();

        let uaid = Uuid::parse_str(TEST_USER).unwrap();
        let chid = Uuid::parse_str(TEST_CHID).unwrap();
        let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();

        let node_id = "test_node".to_owned();

        // Best-effort cleanup of the fixed test user; the result is
        // deliberately ignored.
        let _ = client.remove_user(&uaid).await;

        let test_user = User {
            uaid,
            router_type: "webpush".to_owned(),
            connected_at,
            router_data: None,
            node_id: Some(node_id.clone()),
            ..Default::default()
        };

        // NOTE(review): same best-effort cleanup as a few lines above; the
        // repetition looks redundant.
        let _ = client.remove_user(&uaid).await;

        // --- user record: add and read back ---
        client.add_user(&test_user).await?;
        let fetched = client.get_user(&uaid).await?;
        assert!(fetched.is_some());
        let fetched = fetched.unwrap();
        assert_eq!(fetched.router_type, "webpush".to_owned());

        // A second, later `connected_at` (shadows the first). It is used for
        // the rejected update below and for `remove_node_id` near the end.
        let connected_at = ms_since_epoch();

        // --- channels: add one, add many, remove one ---
        client.add_channel(&uaid, &chid).await?;
        let channels = client.get_channels(&uaid).await?;
        assert!(channels.contains(&chid));

        // `chid` plus nine random channels plus one that is removed again.
        let mut new_channels: HashSet<Uuid> = HashSet::new();
        new_channels.insert(chid);
        for _ in 1..10 {
            new_channels.insert(uuid::Uuid::new_v4());
        }
        let chid_to_remove = uuid::Uuid::new_v4();
        new_channels.insert(chid_to_remove);
        client.add_channels(&uaid, new_channels.clone()).await?;
        let channels = client.get_channels(&uaid).await?;
        assert_eq!(channels, new_channels);

        // The first removal reports `true`; removing the same channel again
        // reports `false`.
        assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
        assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
        new_channels.remove(&chid_to_remove);
        let channels = client.get_channels(&uaid).await?;
        assert_eq!(channels, new_channels);

        // --- conditional user update ---
        // An update built from the locally constructed `test_user` must be
        // refused (`Ok(false)`).
        // NOTE(review): presumably refused because `test_user` does not carry
        // the stored record's version — confirm against `update_user`.
        let mut updated = User {
            connected_at,
            ..test_user.clone()
        };
        let result = client.update_user(&mut updated).await;
        assert!(result.is_ok());
        assert!(!result.unwrap());

        // The refused update must not have touched the stored `connected_at`.
        let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
        assert_eq!(fetched.connected_at, fetched2.connected_at);

        // An update built from the freshly fetched record, with a later
        // `connected_at`, must be accepted. `..fetched2` moves the remaining
        // fields out of `fetched2`; only its `connected_at` is read afterwards.
        let mut updated = User {
            connected_at: fetched.connected_at + 300,
            ..fetched2
        };
        let result = client.update_user(&mut updated).await;
        assert!(result.is_ok());
        assert!(result.unwrap());
        assert_ne!(
            fetched2.connected_at,
            client.get_user(&uaid).await?.unwrap().connected_at
        );

        // Only checks that the call succeeds; the stored value is not read
        // back here.
        client
            .increment_storage(
                &fetched.uaid,
                SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
            )
            .await?;

        // --- timestamp (non-topic) message: save, fetch, delete ---
        let test_data = "An_encrypted_pile_of_crap".to_owned();
        let timestamp = now();
        let sort_key = now();
        let test_notification = crate::db::Notification {
            channel_id: chid,
            version: "test".to_owned(),
            ttl: 300,
            timestamp,
            data: Some(test_data.clone()),
            sortkey_timestamp: Some(sort_key),
            ..Default::default()
        };
        let res = client.save_message(&uaid, test_notification.clone()).await;
        assert!(res.is_ok());

        let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
        assert_ne!(fetched.messages.len(), 0);
        let fm = fetched.messages.pop().unwrap();
        assert_eq!(fm.channel_id, test_notification.channel_id);
        assert_eq!(fm.data, Some(test_data));

        // Resuming from shortly before the message's timestamp still finds it...
        let fetched = client
            .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
            .await?;
        assert_ne!(fetched.messages.len(), 0);

        // ...while resuming from after it finds nothing.
        let fetched = client
            .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
            .await?;
        assert_eq!(fetched.messages.len(), 0);

        assert!(client
            .remove_message(&uaid, &test_notification.chidmessageid())
            .await
            .is_ok());

        assert!(client.remove_channel(&uaid, &chid).await.is_ok());

        // --- topic message: save, fetch, delete ---
        client.add_channel(&uaid, &topic_chid).await?;
        let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
        let timestamp = now();
        let sort_key = now();
        let test_notification = crate::db::Notification {
            channel_id: topic_chid,
            version: "test".to_owned(),
            ttl: 300,
            topic: Some("topic".to_owned()),
            timestamp,
            data: Some(test_data.clone()),
            sortkey_timestamp: Some(sort_key),
            ..Default::default()
        };
        assert!(client
            .save_message(&uaid, test_notification.clone())
            .await
            .is_ok());

        let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
        assert_ne!(fetched.messages.len(), 0);
        let fm = fetched.messages.pop().unwrap();
        assert_eq!(fm.channel_id, test_notification.channel_id);
        assert_eq!(fm.data, Some(test_data));

        // Fetching does not consume the message: a second fetch still sees it.
        let fetched = client.fetch_topic_messages(&uaid, 999).await?;
        assert_ne!(fetched.messages.len(), 0);

        assert!(client
            .remove_message(&uaid, &test_notification.chidmessageid())
            .await
            .is_ok());

        assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());

        // With both messages deleted, no timestamp messages may remain.
        let msgs = client
            .fetch_timestamp_messages(&uaid, None, 999)
            .await?
            .messages;
        assert!(msgs.is_empty());

        // --- node id removal ---
        let fetched = client.get_user(&uaid).await?.unwrap();
        assert!(client
            .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
            .await
            .is_ok());
        let fetched = client.get_user(&uaid).await?.unwrap();
        assert_eq!(fetched.node_id, None);

        // --- final cleanup: the user must be gone afterwards ---
        assert!(client.remove_user(&uaid).await.is_ok());

        assert!(client.get_user(&uaid).await?.is_none());

        Ok(())
    }
1799
1800 #[actix_rt::test]
1801 async fn read_cells_family_id() -> DbResult<()> {
1802 let client = new_client().unwrap();
1803 let uaid = gen_test_uaid();
1804 client.remove_user(&uaid).await.unwrap();
1805
1806 let qualifier = "foo".to_owned();
1807
1808 let row_key = uaid.simple().to_string();
1809 let mut row = Row::new(row_key.clone());
1810 row.cells.insert(
1811 ROUTER_FAMILY.to_owned(),
1812 vec![cell::Cell {
1813 qualifier: qualifier.to_owned(),
1814 value: "bar".as_bytes().to_vec(),
1815 ..Default::default()
1816 }],
1817 );
1818 client.write_row(row).await.unwrap();
1819 let req = client.read_row_request(&row_key);
1820 let Some(row) = client.read_row(req).await.unwrap() else {
1821 panic!("Expected row");
1822 };
1823 assert_eq!(row.cells.len(), 1);
1824 assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1825 client.remove_user(&uaid).await
1826 }
1827
1828 #[actix_rt::test]
1829 async fn add_user_existing() {
1830 let client = new_client().unwrap();
1831 let uaid = gen_test_uaid();
1832 let user = User {
1833 uaid,
1834 ..Default::default()
1835 };
1836 client.remove_user(&uaid).await.unwrap();
1837
1838 client.add_user(&user).await.unwrap();
1839 let err = client.add_user(&user).await.unwrap_err();
1840 assert!(matches!(err, DbError::Conditional));
1841 }
1842
1843 #[actix_rt::test]
1844 async fn version_check() {
1845 let client = new_client().unwrap();
1846 let uaid = gen_test_uaid();
1847 let user = User {
1848 uaid,
1849 ..Default::default()
1850 };
1851 client.remove_user(&uaid).await.unwrap();
1852
1853 client.add_user(&user).await.unwrap();
1854 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1855 assert!(client.update_user(&mut user.clone()).await.unwrap());
1856
1857 let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1858 assert_ne!(user.version, fetched.version);
1859 assert!(!client.update_user(&mut user).await.unwrap());
1861
1862 client.remove_user(&uaid).await.unwrap();
1863 }
1864
1865 #[actix_rt::test]
1866 async fn lingering_chid_record() {
1867 let client = new_client().unwrap();
1868 let uaid = gen_test_uaid();
1869 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1870 let user = User {
1871 uaid,
1872 ..Default::default()
1873 };
1874 client.remove_user(&uaid).await.unwrap();
1875
1876 client.add_channel(&uaid, &chid).await.unwrap();
1878
1879 assert!(client.get_user(&uaid).await.unwrap().is_none());
1882
1883 client.add_user(&user).await.unwrap();
1884 assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1886
1887 client.remove_user(&uaid).await.unwrap();
1888 }
1889
1890 #[actix_rt::test]
1891 async fn lingering_current_timestamp() {
1892 let client = new_client().unwrap();
1893 let uaid = gen_test_uaid();
1894 client.remove_user(&uaid).await.unwrap();
1895
1896 client
1897 .increment_storage(&uaid, crate::util::sec_since_epoch())
1898 .await
1899 .unwrap();
1900 assert!(client.get_user(&uaid).await.unwrap().is_none());
1901
1902 client.remove_user(&uaid).await.unwrap();
1903 }
1904
1905 #[actix_rt::test]
1906 async fn lingering_chid_w_version_record() {
1907 let client = new_client().unwrap();
1908 let uaid = gen_test_uaid();
1909 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1910 client.remove_user(&uaid).await.unwrap();
1911
1912 client.add_channel(&uaid, &chid).await.unwrap();
1913 assert!(client.remove_channel(&uaid, &chid).await.unwrap());
1914 assert!(client.get_user(&uaid).await.unwrap().is_none());
1915
1916 client.remove_user(&uaid).await.unwrap();
1917 }
1918
1919 #[actix_rt::test]
1920 async fn channel_and_current_timestamp_ttl_updates() {
1921 let client = new_client().unwrap();
1922 let uaid = gen_test_uaid();
1923 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1924 client.remove_user(&uaid).await.unwrap();
1925
1926 let user = User {
1928 uaid,
1929 ..Default::default()
1930 };
1931 client.add_user(&user).await.unwrap();
1932
1933 client.add_channel(&uaid, &chid).await.unwrap();
1934 client
1935 .add_channel(&uaid, &uuid::Uuid::new_v4())
1936 .await
1937 .unwrap();
1938
1939 client
1940 .increment_storage(
1941 &uaid,
1942 SystemTime::now()
1943 .duration_since(SystemTime::UNIX_EPOCH)
1944 .unwrap()
1945 .as_secs(),
1946 )
1947 .await
1948 .unwrap();
1949
1950 let req = client.read_row_request(&uaid.as_simple().to_string());
1951 let Some(mut row) = client.read_row(req).await.unwrap() else {
1952 panic!("Expected row");
1953 };
1954
1955 let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
1958 for mut cells in row.cells.into_values() {
1959 let Some(cell) = cells.pop() else {
1960 continue;
1961 };
1962 assert!(
1963 cell.timestamp >= ca_expiry,
1964 "{} cell timestamp should >= connected_at's",
1965 cell.qualifier
1966 );
1967 }
1968
1969 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1970
1971 tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
1973 client.update_user(&mut user).await.unwrap();
1974
1975 let req = client.read_row_request(&uaid.as_simple().to_string());
1977 let Some(mut row) = client.read_row(req).await.unwrap() else {
1978 panic!("Expected row");
1979 };
1980
1981 let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
1982
1983 assert!(ca_expiry2 > ca_expiry);
1984
1985 for mut cells in row.cells.into_values() {
1986 let Some(cell) = cells.pop() else {
1987 continue;
1988 };
1989 assert!(
1990 cell.timestamp >= ca_expiry2,
1991 "{} cell timestamp expiry should exceed connected_at's",
1992 cell.qualifier
1993 );
1994 }
1995
1996 client.remove_user(&uaid).await.unwrap();
1997 }
1998}