1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt;
4use std::fmt::Display;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8
9use again::RetryPolicy;
10use async_trait::async_trait;
11use cadence::StatsdClient;
12use chrono::TimeDelta;
13use futures_util::StreamExt;
14use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
15use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
16use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
17use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
18use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
19use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
20use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
21use protobuf::RepeatedField;
22use serde_json::{from_str, json};
23use uuid::Uuid;
24
25use crate::db::RangeKey;
26use crate::db::{
27 client::{DbClient, FetchMessageResponse},
28 error::{DbError, DbResult},
29 DbSettings, Notification, User, MAX_ROUTER_TTL, USER_RECORD_VERSION,
30};
31use crate::metric_name::MetricName;
32use crate::metrics::StatsdClientExt;
33
34pub use self::metadata::MetadataBuilder;
35use self::row::{Row, RowCells};
36use super::pool::BigTablePool;
37use super::BigTableDbSettings;
38
39pub mod cell;
40pub mod error;
41pub(crate) mod merge;
42pub mod metadata;
43pub mod row;
44
45pub type RowKey = String;
47
48pub type Qualifier = String;
53pub type FamilyId = String;
54
55const ROUTER_FAMILY: &str = "router";
56const MESSAGE_FAMILY: &str = "message"; const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
58#[cfg(feature = "reliable_report")]
59const RELIABLE_LOG_FAMILY: &str = "reliability";
60#[cfg(feature = "reliable_report")]
61pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);
64
65pub(crate) const RETRY_COUNT: usize = 5;
66
67struct Uaid(Uuid);
70
71impl Display for Uaid {
72 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
73 write!(f, "{}", self.0.as_simple())
74 }
75}
76
77impl From<Uaid> for String {
78 fn from(uaid: Uaid) -> String {
79 uaid.0.as_simple().to_string()
80 }
81}
82
83#[derive(Clone)]
84pub struct BigTableClientImpl {
86 pub(crate) settings: BigTableDbSettings,
87 metrics: Arc<StatsdClient>,
89 pool: BigTablePool,
91 metadata: Metadata,
92 admin_metadata: Metadata,
93}
94
95fn router_gc_policy_filter() -> data::RowFilter {
97 let mut latest_cell_filter = data::RowFilter::default();
98 latest_cell_filter.set_cells_per_column_limit_filter(1);
99 latest_cell_filter
100}
101
102fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
105 let mut timestamp_filter = data::RowFilter::default();
106 let bt_now: i64 = SystemTime::now()
107 .duration_since(SystemTime::UNIX_EPOCH)
108 .map_err(error::BigTableError::WriteTime)?
109 .as_millis() as i64;
110 let mut range_filter = data::TimestampRange::default();
111 range_filter.set_start_timestamp_micros(bt_now * 1000);
112 timestamp_filter.set_timestamp_range_filter(range_filter);
113
114 Ok(vec![router_gc_policy_filter(), timestamp_filter])
115}
116
117fn family_filter(regex: String) -> data::RowFilter {
119 let mut filter = data::RowFilter::default();
120 filter.set_family_name_regex_filter(regex);
121 filter
122}
123
124fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
128 let mut vec = Vec::with_capacity(bytes.len() * 2);
129 for &b in bytes {
130 if !b.is_ascii_alphanumeric() && b != b'_' && (b & 128) == 0 {
131 if b == b'\0' {
132 vec.extend("\\x00".as_bytes());
137 continue;
138 }
139 vec.push(b'\\');
140 }
141 vec.push(b);
142 }
143 vec
144}
145
146fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
149 let mut cq_filter = data::RowFilter::default();
150 cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());
151
152 let mut value_filter = data::RowFilter::default();
153 value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));
154
155 vec![
156 family_filter(format!("^{ROUTER_FAMILY}$")),
157 cq_filter,
158 value_filter,
159 ]
160}
161
162fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
164 cell::Cell {
165 qualifier: "version".to_owned(),
166 value: Uuid::new_v4().into(),
167 timestamp,
168 ..Default::default()
169 }
170}
171
172fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
174 let mut chain = RowFilter_Chain::default();
175 chain.set_filters(filters.into());
176 let mut filter = RowFilter::default();
177 filter.set_chain(chain);
178 filter
179}
180
181fn read_row_request(
183 table_name: &str,
184 app_profile_id: &str,
185 row_key: &str,
186) -> bigtable::ReadRowsRequest {
187 let mut req = bigtable::ReadRowsRequest::default();
188 req.set_table_name(table_name.to_owned());
189 req.set_app_profile_id(app_profile_id.to_owned());
190
191 let mut row_keys = RepeatedField::default();
192 row_keys.push(row_key.as_bytes().to_vec());
193 let mut row_set = data::RowSet::default();
194 row_set.set_row_keys(row_keys);
195 req.set_rows(row_set);
196
197 req
198}
199
200fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
201 let v: [u8; 8] = value
202 .try_into()
203 .map_err(|_| DbError::DeserializeU64(name.to_owned()))?;
204 Ok(u64::from_be_bytes(v))
205}
206
207fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
208 String::from_utf8(value).map_err(|e| {
209 debug!("🉑 cannot read string {}: {:?}", name, e);
210 DbError::DeserializeString(name.to_owned())
211 })
212}
213
214fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
218 let mut result = HashSet::new();
219 for cells in cells.values() {
220 let Some(cell) = cells.last() else {
221 continue;
222 };
223 let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
224 return Err(DbError::Integrity(
225 "get_channels expected: chid:<chid>".to_owned(),
226 None,
227 ));
228 };
229 result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
230 }
231 Ok(result)
232}
233
234fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
236 let channels = channels.into_owned();
237 let mut cells = Vec::with_capacity(channels.len().min(100_000));
238 for (i, channel_id) in channels.into_iter().enumerate() {
239 if i >= 100_000 {
243 break;
244 }
245 cells.push(cell::Cell {
246 qualifier: format!("chid:{}", channel_id.as_hyphenated()),
247 timestamp: expiry,
248 ..Default::default()
249 });
250 }
251 cells
252}
253
254pub fn retry_policy(max: usize) -> RetryPolicy {
255 RetryPolicy::default()
256 .with_max_retries(max)
257 .with_jitter(true)
258}
259
260fn retryable_internal_err(status: &RpcStatus) -> bool {
261 match status.code() {
262 RpcStatusCode::UNKNOWN => status
263 .message()
264 .eq_ignore_ascii_case("error occurred when fetching oauth2 token."),
265 RpcStatusCode::INTERNAL => [
266 "rst_stream",
267 "rst stream",
268 "received unexpected eos on data frame from server",
269 ]
270 .contains(&status.message().to_lowercase().as_str()),
271 RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
272 _ => false,
273 }
274}
275
276pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
277 let mut metric = metrics
278 .incr_with_tags(MetricName::DatabaseRetry)
279 .with_tag("error", err_type)
280 .with_tag("type", "bigtable");
281 if let Some(code) = code {
282 metric = metric.with_tag("code", code);
283 }
284 metric.send();
285}
286
287pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool + '_ {
288 move |err| {
289 debug!("🉑 Checking grpcio::Error...{err}");
290 match err {
291 grpcio::Error::RpcFailure(status) => {
292 info!("GRPC Failure :{:?}", status);
293 let retry = retryable_internal_err(status);
294 if retry {
295 metric(metrics, "RpcFailure", Some(&status.code().to_string()));
296 }
297 retry
298 }
299 grpcio::Error::BindFail(_) => {
300 metric(metrics, "BindFail", None);
301 true
302 }
303 grpcio::Error::CallFailure(grpc_call_status) => {
306 let retry = grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR;
307 if retry {
308 metric(
309 metrics,
310 "CallFailure",
311 Some(&format!("{grpc_call_status:?}")),
312 );
313 }
314 retry
315 }
316 _ => false,
317 }
318 }
319}
320
321pub fn retryable_bt_err(
322 metrics: &Arc<StatsdClient>,
323) -> impl Fn(&error::BigTableError) -> bool + '_ {
324 move |err| {
325 debug!("🉑 Checking BigTableError...{err}");
326 match err {
327 error::BigTableError::InvalidRowResponse(e)
328 | error::BigTableError::Read(e)
329 | error::BigTableError::Write(e)
330 | error::BigTableError::GRPC(e) => retryable_grpcio_err(metrics)(e),
331 _ => false,
332 }
333 }
334}
335
336fn is_incomplete_router_record(cells: &RowCells) -> bool {
350 cells
351 .keys()
352 .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
353}
354
355fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
356 ::grpcio::CallOption::default().headers(metadata)
357}
358
359impl BigTableClientImpl {
384 pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
385 debug!("🏊 BT Pool new");
387 let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?;
388 info!("🉑 {:#?}", db_settings);
389 let pool = BigTablePool::new(settings, &metrics)?;
390
391 let metadata = db_settings.metadata()?;
393 let admin_metadata = db_settings.admin_metadata()?;
394 Ok(Self {
395 settings: db_settings,
396 metrics,
397 metadata,
398 admin_metadata,
399 pool,
400 })
401 }
402
403 pub fn spawn_sweeper(&self, interval: Duration) {
405 self.pool.spawn_sweeper(interval);
406 }
407
408 fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
410 read_row_request(
411 &self.settings.table_name,
412 &self.settings.app_profile_id,
413 row_key,
414 )
415 }
416
417 fn mutate_row_request(&self, row_key: &str) -> bigtable::MutateRowRequest {
419 let mut req = bigtable::MutateRowRequest::default();
420 req.set_table_name(self.settings.table_name.clone());
421 req.set_app_profile_id(self.settings.app_profile_id.clone());
422 req.set_row_key(row_key.as_bytes().to_vec());
423 req
424 }
425
426 fn check_and_mutate_row_request(&self, row_key: &str) -> bigtable::CheckAndMutateRowRequest {
428 let mut req = bigtable::CheckAndMutateRowRequest::default();
429 req.set_table_name(self.settings.table_name.clone());
430 req.set_app_profile_id(self.settings.app_profile_id.clone());
431 req.set_row_key(row_key.as_bytes().to_vec());
432 req
433 }
434
435 async fn mutate_row(
437 &self,
438 req: bigtable::MutateRowRequest,
439 ) -> Result<(), error::BigTableError> {
440 let bigtable = self.pool.get().await?;
441 retry_policy(self.settings.retry_count)
442 .retry_if(
443 || async {
444 bigtable
445 .conn
446 .mutate_row_opt(&req, call_opts(self.metadata.clone()))
447 },
448 retryable_grpcio_err(&self.metrics),
449 )
450 .await
451 .map_err(error::BigTableError::Write)?;
452 Ok(())
453 }
454
455 #[allow(unused)]
457 async fn mutate_rows(
458 &self,
459 req: bigtable::MutateRowsRequest,
460 ) -> Result<(), error::BigTableError> {
461 let bigtable = self.pool.get().await?;
462 let resp = retry_policy(self.settings.retry_count)
464 .retry_if(
465 || async {
466 bigtable
467 .conn
468 .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
469 },
470 retryable_grpcio_err(&self.metrics),
471 )
472 .await
473 .map_err(error::BigTableError::Write)?;
474
475 let mut stream = Box::pin(resp);
482 let mut cnt = 0;
483 loop {
484 let (result, remainder) = stream.into_future().await;
485 if let Some(result) = result {
486 debug!("🎏 Result block: {}", cnt);
487 match result {
488 Ok(r) => {
489 for e in r.get_entries() {
490 if e.has_status() {
491 let status = e.get_status();
492 let code = error::MutateRowStatus::from(status.get_code());
494 if !code.is_ok() {
495 return Err(error::BigTableError::Status(
496 code,
497 status.get_message().to_owned(),
498 ));
499 }
500 debug!("🎏 Response: {} OK", e.index);
501 }
502 }
503 }
504 Err(e) => return Err(error::BigTableError::Write(e)),
505 };
506 cnt += 1;
507 } else {
508 debug!("🎏 Done!");
509 break;
510 }
511 stream = remainder;
512 }
513
514 Ok(())
515 }
516
517 async fn read_row(
519 &self,
520 req: bigtable::ReadRowsRequest,
521 ) -> Result<Option<row::Row>, error::BigTableError> {
522 let mut rows = self.read_rows(req).await?;
523 Ok(rows.pop_first().map(|(_, v)| v))
524 }
525
526 async fn read_rows(
530 &self,
531 req: ReadRowsRequest,
532 ) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
533 let bigtable = self.pool.get().await?;
534 let resp = retry_policy(self.settings.retry_count)
535 .retry_if(
536 || async {
537 let resp: grpcio::ClientSStreamReceiver<bigtable::ReadRowsResponse> = bigtable
538 .conn
539 .read_rows_opt(&req, call_opts(self.metadata.clone()))
540 .map_err(error::BigTableError::Read)?;
541 merge::RowMerger::process_chunks(resp).await
542 },
543 retryable_bt_err(&self.metrics),
544 )
545 .await?;
546 Ok(resp)
547 }
548
549 async fn write_row(&self, row: row::Row) -> Result<(), error::BigTableError> {
553 let mut req = self.mutate_row_request(&row.row_key);
554 let mutations = self.get_mutations(row.cells)?;
559 req.set_mutations(mutations);
560 self.mutate_row(req).await?;
561 Ok(())
562 }
563
564 fn get_mutations(
566 &self,
567 cells: HashMap<FamilyId, Vec<crate::db::bigtable::bigtable_client::cell::Cell>>,
568 ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
569 let mut mutations = protobuf::RepeatedField::default();
570 for (family_id, cells) in cells {
571 for cell in cells {
572 let mut mutation = data::Mutation::default();
573 let mut set_cell = data::Mutation_SetCell::default();
574 let timestamp = cell
575 .timestamp
576 .duration_since(SystemTime::UNIX_EPOCH)
577 .map_err(error::BigTableError::WriteTime)?;
578 set_cell.family_name.clone_from(&family_id);
579 set_cell.set_column_qualifier(cell.qualifier.clone().into_bytes());
580 set_cell.set_value(cell.value);
581 set_cell.set_timestamp_micros((timestamp.as_millis() * 1000) as i64);
584 debug!("🉑 expiring in {:?}", timestamp.as_millis());
585 mutation.set_set_cell(set_cell);
586 mutations.push(mutation);
587 }
588 }
589 Ok(mutations)
590 }
591
592 async fn check_and_mutate_row(
600 &self,
601 row: row::Row,
602 filter: RowFilter,
603 state: bool,
604 ) -> Result<bool, error::BigTableError> {
605 let mut req = self.check_and_mutate_row_request(&row.row_key);
606 let mutations = self.get_mutations(row.cells)?;
607 req.set_predicate_filter(filter);
608 if state {
609 req.set_true_mutations(mutations);
610 } else {
611 req.set_false_mutations(mutations);
612 }
613 self.check_and_mutate(req).await
614 }
615
616 async fn check_and_mutate(
617 &self,
618 req: bigtable::CheckAndMutateRowRequest,
619 ) -> Result<bool, error::BigTableError> {
620 let bigtable = self.pool.get().await?;
621 let resp = retry_policy(self.settings.retry_count)
622 .retry_if(
623 || async {
624 bigtable
627 .conn
628 .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
629 },
630 retryable_grpcio_err(&self.metrics),
631 )
632 .await
633 .map_err(error::BigTableError::Write)?;
634 debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
635 Ok(resp.get_predicate_matched())
636 }
637
638 fn get_delete_mutations(
639 &self,
640 family: &str,
641 column_names: &[&str],
642 time_range: Option<&data::TimestampRange>,
643 ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
644 let mut mutations = protobuf::RepeatedField::default();
645 for column in column_names {
646 let mut mutation = data::Mutation::default();
647 let mut del_cell = data::Mutation_DeleteFromColumn::default();
651 del_cell.set_family_name(family.to_owned());
652 del_cell.set_column_qualifier(column.as_bytes().to_vec());
653 if let Some(range) = time_range {
654 del_cell.set_time_range(range.clone());
655 }
656 mutation.set_delete_from_column(del_cell);
657 mutations.push(mutation);
658 }
659 Ok(mutations)
660 }
661
662 #[allow(unused)]
663 #[allow(unused)]
665 async fn delete_cells(
666 &self,
667 row_key: &str,
668 family: &str,
669 column_names: &[&str],
670 time_range: Option<&data::TimestampRange>,
671 ) -> Result<(), error::BigTableError> {
672 let mut req = self.mutate_row_request(row_key);
673 req.set_mutations(self.get_delete_mutations(family, column_names, time_range)?);
674 self.mutate_row(req).await
675 }
676
677 async fn delete_row(&self, row_key: &str) -> Result<(), error::BigTableError> {
679 let mut req = self.mutate_row_request(row_key);
680 let mut mutations = protobuf::RepeatedField::default();
681 let mut mutation = data::Mutation::default();
682 mutation.set_delete_from_row(data::Mutation_DeleteFromRow::default());
683 mutations.push(mutation);
684 req.set_mutations(mutations);
685 self.mutate_row(req).await
686 }
687
688 #[allow(unused)]
693 async fn delete_rows(&self, row_key: &str) -> Result<bool, error::BigTableError> {
694 let admin = BigtableTableAdminClient::new(self.pool.get_channel()?);
695 let mut req = DropRowRangeRequest::new();
696 req.set_name(self.settings.table_name.clone());
697 req.set_row_key_prefix(row_key.as_bytes().to_vec());
698
699 admin
700 .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone()))
701 .map_err(|e| {
702 error!("{:?}", e);
703 error::BigTableError::Admin(
704 format!(
705 "Could not send delete command for {}",
706 &self.settings.table_name
707 ),
708 Some(e.to_string()),
709 )
710 })?
711 .await
712 .map_err(|e| {
713 error!("post await: {:?}", e);
714 error::BigTableError::Admin(
715 format!(
716 "Could not delete data from table {}",
717 &self.settings.table_name
718 ),
719 Some(e.to_string()),
720 )
721 })?;
722
723 Ok(true)
724 }
725
726 fn rows_to_notifications(
727 &self,
728 rows: BTreeMap<String, Row>,
729 ) -> Result<Vec<Notification>, DbError> {
730 rows.into_iter()
731 .map(|(row_key, row)| self.row_to_notification(&row_key, row))
732 .collect()
733 }
734
735 fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result<Notification, DbError> {
736 let Some((_, chidmessageid)) = row_key.split_once('#') else {
737 return Err(DbError::Integrity(
738 "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(),
739 None,
740 ));
741 };
742 let range_key = RangeKey::parse_chidmessageid(chidmessageid).map_err(|e| {
743 DbError::Integrity(
744 format!("rows_to_notification expected chidmessageid: {e}"),
745 None,
746 )
747 })?;
748
749 let mut notif = Notification {
751 channel_id: range_key.channel_id,
752 topic: range_key.topic,
753 sortkey_timestamp: range_key.sortkey_timestamp,
754 version: to_string(row.take_required_cell("version")?.value, "version")?,
755 ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?,
756 timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?,
757 ..Default::default()
758 };
759
760 if let Some(cell) = row.take_cell("data") {
762 notif.data = Some(to_string(cell.value, "data")?);
763 }
764 #[cfg(feature = "reliable_report")]
765 {
766 if let Some(cell) = row.take_cell("reliability_id") {
767 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
768 }
769 if let Some(cell) = row.take_cell("reliable_state") {
770 notif.reliable_state = Some(
771 crate::reliability::ReliabilityState::from_str(&to_string(
772 cell.value,
773 "reliable_state",
774 )?)
775 .map_err(|e| {
776 DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
777 })?,
778 );
779 }
780 }
781 if let Some(cell) = row.take_cell("headers") {
782 notif.headers = Some(
783 serde_json::from_str::<HashMap<String, String>>(&to_string(cell.value, "headers")?)
784 .map_err(|e| DbError::Serialization(e.to_string()))?,
785 );
786 }
787 if let Some(cell) = row.take_cell("reliability_id") {
788 trace!("🚣 Is reliable");
789 notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
790 }
791
792 trace!("🚣 Deserialized message row: {:?}", ¬if);
793 Ok(notif)
794 }
795
796 fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
802 let row_key = user.uaid.simple().to_string();
803 let mut row = Row::new(row_key);
804 let expiry =
805 std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
806
807 let mut cells: Vec<cell::Cell> = vec![
808 cell::Cell {
809 qualifier: "connected_at".to_owned(),
810 value: user.connected_at.to_be_bytes().to_vec(),
811 timestamp: expiry,
812 ..Default::default()
813 },
814 cell::Cell {
815 qualifier: "router_type".to_owned(),
816 value: user.router_type.clone().into_bytes(),
817 timestamp: expiry,
818 ..Default::default()
819 },
820 cell::Cell {
821 qualifier: "record_version".to_owned(),
822 value: user
823 .record_version
824 .unwrap_or(USER_RECORD_VERSION)
825 .to_be_bytes()
826 .to_vec(),
827 timestamp: expiry,
828 ..Default::default()
829 },
830 cell::Cell {
831 qualifier: "version".to_owned(),
832 value: (*version).into(),
833 timestamp: expiry,
834 ..Default::default()
835 },
836 ];
837
838 if let Some(router_data) = &user.router_data {
839 cells.push(cell::Cell {
840 qualifier: "router_data".to_owned(),
841 value: json!(router_data).to_string().as_bytes().to_vec(),
842 timestamp: expiry,
843 ..Default::default()
844 });
845 };
846 if let Some(current_timestamp) = user.current_timestamp {
847 cells.push(cell::Cell {
848 qualifier: "current_timestamp".to_owned(),
849 value: current_timestamp.to_be_bytes().to_vec(),
850 timestamp: expiry,
851 ..Default::default()
852 });
853 };
854 if let Some(node_id) = &user.node_id {
855 cells.push(cell::Cell {
856 qualifier: "node_id".to_owned(),
857 value: node_id.as_bytes().to_vec(),
858 timestamp: expiry,
859 ..Default::default()
860 });
861 };
862
863 cells.extend(channels_to_cells(
864 Cow::Borrowed(&user.priv_channels),
865 expiry,
866 ));
867
868 row.add_cells(ROUTER_FAMILY, cells);
869 row
870 }
871}
872
873#[derive(Clone)]
874pub struct BigtableDb {
875 pub(super) conn: BigtableClient,
876 pub(super) health_metadata: Metadata,
877 table_name: String,
878}
879
880impl BigtableDb {
881 pub fn new(channel: Channel, health_metadata: &Metadata, table_name: &str) -> Self {
882 Self {
883 conn: BigtableClient::new(channel),
884 health_metadata: health_metadata.clone(),
885 table_name: table_name.to_owned(),
886 }
887 }
888 pub async fn health_check(
895 &mut self,
896 metrics: &Arc<StatsdClient>,
897 app_profile_id: &str,
898 ) -> Result<bool, error::BigTableError> {
899 let random_uaid = Uuid::new_v4().simple().to_string();
906 let mut req = read_row_request(&self.table_name, app_profile_id, &random_uaid);
907 let mut filter = data::RowFilter::default();
908 filter.set_block_all_filter(true);
909 req.set_filter(filter);
910 let _r = retry_policy(RETRY_COUNT)
911 .retry_if(
912 || async {
913 self.conn
914 .read_rows_opt(&req, call_opts(self.health_metadata.clone()))
915 },
916 retryable_grpcio_err(metrics),
917 )
918 .await
919 .map_err(error::BigTableError::Read)?;
920
921 debug!("🉑 health check");
922 Ok(true)
923 }
924}
925
926#[async_trait]
927impl DbClient for BigTableClientImpl {
928 async fn add_user(&self, user: &User) -> DbResult<()> {
930 trace!("🉑 Adding user");
931 let Some(ref version) = user.version else {
932 return Err(DbError::General(
933 "add_user expected a user version field".to_owned(),
934 ));
935 };
936 let row = self.user_to_row(user, version);
937
938 let mut row_key_filter = RowFilter::default();
940 row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
941 let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);
942
943 if self.check_and_mutate_row(row, filter, false).await? {
944 return Err(DbError::Conditional);
945 }
946 Ok(())
947 }
948
949 async fn update_user(&self, user: &mut User) -> DbResult<bool> {
965 let Some(ref version) = user.version else {
966 return Err(DbError::General(
967 "update_user expected a user version field".to_owned(),
968 ));
969 };
970
971 let mut filters = vec![router_gc_policy_filter()];
972 filters.extend(version_filter(version));
973 let filter = filter_chain(filters);
974
975 let new_version = Uuid::new_v4();
976 let row = self.user_to_row(user, &new_version);
978
979 let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
980 user.version = Some(new_version);
981 Ok(predicate_matched)
982 }
983
984 async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
985 let row_key = uaid.as_simple().to_string();
986 let mut req = self.read_row_request(&row_key);
987 let mut filters = vec![router_gc_policy_filter()];
988 filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
989 req.set_filter(filter_chain(filters));
990 let Some(mut row) = self.read_row(req).await? else {
991 return Ok(None);
992 };
993
994 trace!("🉑 Found a record for {}", row_key);
995
996 let connected_at_cell = match row.take_required_cell("connected_at") {
997 Ok(cell) => cell,
998 Err(_) => {
999 if !is_incomplete_router_record(&row.cells) {
1000 return Err(DbError::Integrity(
1001 "Expected column: connected_at".to_owned(),
1002 Some(format!("{row:#?}")),
1003 ));
1004 }
1005 trace!("🉑 Dropping an incomplete user record for {}", row_key);
1010 self.metrics
1011 .incr_with_tags(MetricName::DatabaseDropUser)
1012 .with_tag("reason", "incomplete_record")
1013 .send();
1014 self.remove_user(uaid).await?;
1015 return Ok(None);
1016 }
1017 };
1018
1019 let mut result = User {
1020 uaid: *uaid,
1021 connected_at: to_u64(connected_at_cell.value, "connected_at")?,
1022 router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
1023 record_version: Some(to_u64(
1024 row.take_required_cell("record_version")?.value,
1025 "record_version",
1026 )?),
1027 version: Some(
1028 row.take_required_cell("version")?
1029 .value
1030 .try_into()
1031 .map_err(|e| {
1032 DbError::Serialization(format!("Could not deserialize version: {e:?}"))
1033 })?,
1034 ),
1035 ..Default::default()
1036 };
1037
1038 if let Some(cell) = row.take_cell("router_data") {
1039 result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| {
1040 DbError::Serialization(format!("Could not deserialize router_type: {e:?}"))
1041 })?;
1042 }
1043
1044 if let Some(cell) = row.take_cell("node_id") {
1045 result.node_id = Some(to_string(cell.value, "node_id")?);
1046 }
1047
1048 if let Some(cell) = row.take_cell("current_timestamp") {
1049 result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
1050 }
1051
1052 result.priv_channels = channels_from_cells(&row.cells)?;
1054
1055 Ok(Some(result))
1056 }
1057
1058 async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
1059 let row_key = uaid.simple().to_string();
1060 self.delete_row(&row_key).await?;
1061 Ok(())
1062 }
1063
1064 async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
1065 let channels = HashSet::from_iter([channel_id.to_owned()]);
1066 self.add_channels(uaid, channels).await
1067 }
1068
1069 async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
1072 let row_key = uaid.simple().to_string();
1086 let mut row = Row::new(row_key);
1087 let expiry =
1088 std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
1089
1090 row.add_cells(
1094 ROUTER_FAMILY,
1095 channels_to_cells(Cow::Owned(channels), expiry),
1096 );
1097
1098 self.write_row(row).await?;
1099 Ok(())
1100 }
1101
1102 async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
1103 let row_key = uaid.simple().to_string();
1104 let mut req = self.read_row_request(&row_key);
1105
1106 let mut cq_filter = data::RowFilter::default();
1107 cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());
1108 req.set_filter(filter_chain(vec![
1109 router_gc_policy_filter(),
1110 family_filter(format!("^{ROUTER_FAMILY}$")),
1111 cq_filter,
1112 ]));
1113
1114 let Some(row) = self.read_row(req).await? else {
1115 return Ok(Default::default());
1116 };
1117 channels_from_cells(&row.cells)
1118 }
1119
1120 async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
1122 let row_key = uaid.simple().to_string();
1123 let mut req = self.check_and_mutate_row_request(&row_key);
1124
1125 let column = format!("chid:{}", channel_id.as_hyphenated());
1127 let mut mutations = self.get_delete_mutations(ROUTER_FAMILY, &[column.as_ref()], None)?;
1128
1129 let mut row = Row::new(row_key);
1131 let expiry =
1132 std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
1133 row.cells
1134 .insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
1135 mutations.extend(self.get_mutations(row.cells)?);
1136
1137 let mut cq_filter = data::RowFilter::default();
1139 cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
1140 req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
1141 req.set_true_mutations(mutations);
1142
1143 Ok(self.check_and_mutate(req).await?)
1144 }
1145
1146 async fn remove_node_id(
1148 &self,
1149 uaid: &Uuid,
1150 _node_id: &str,
1151 _connected_at: u64,
1152 version: &Option<Uuid>,
1153 ) -> DbResult<bool> {
1154 let row_key = uaid.simple().to_string();
1155 trace!("🉑 Removing node_id for: {row_key} (version: {version:?}) ",);
1156 let Some(ref version) = version else {
1157 return Err(DbError::General("Expected a user version field".to_owned()));
1158 };
1159
1160 let mut req = self.check_and_mutate_row_request(&row_key);
1161
1162 let mut filters = vec![router_gc_policy_filter()];
1163 filters.extend(version_filter(version));
1164 req.set_predicate_filter(filter_chain(filters));
1165 req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);
1166
1167 Ok(self.check_and_mutate(req).await?)
1168 }
1169
1170 async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
1172 let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
1173 debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
1174 trace!(
1175 "🉑 timestamp: {:?}",
1176 &message.timestamp.to_be_bytes().to_vec()
1177 );
1178 let mut row = Row::new(row_key);
1179
1180 let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
1184 trace!(
1185 "🉑 Message Expiry {}",
1186 expiry
1187 .duration_since(SystemTime::UNIX_EPOCH)
1188 .unwrap_or_default()
1189 .as_millis()
1190 );
1191
1192 let mut cells: Vec<cell::Cell> = Vec::new();
1193
1194 let is_topic = message.topic.is_some();
1195 let family = if is_topic {
1196 MESSAGE_TOPIC_FAMILY
1197 } else {
1198 MESSAGE_FAMILY
1199 };
1200 cells.extend(vec![
1201 cell::Cell {
1202 qualifier: "ttl".to_owned(),
1203 value: message.ttl.to_be_bytes().to_vec(),
1204 timestamp: expiry,
1205 ..Default::default()
1206 },
1207 cell::Cell {
1208 qualifier: "timestamp".to_owned(),
1209 value: message.timestamp.to_be_bytes().to_vec(),
1210 timestamp: expiry,
1211 ..Default::default()
1212 },
1213 cell::Cell {
1214 qualifier: "version".to_owned(),
1215 value: message.version.into_bytes(),
1216 timestamp: expiry,
1217 ..Default::default()
1218 },
1219 ]);
1220 if let Some(headers) = message.headers {
1221 if !headers.is_empty() {
1222 cells.push(cell::Cell {
1223 qualifier: "headers".to_owned(),
1224 value: json!(headers).to_string().into_bytes(),
1225 timestamp: expiry,
1226 ..Default::default()
1227 });
1228 }
1229 }
1230 #[cfg(feature = "reliable_report")]
1231 {
1232 if let Some(reliability_id) = message.reliability_id {
1233 trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id);
1234 cells.push(cell::Cell {
1235 qualifier: "reliability_id".to_owned(),
1236 value: reliability_id.into_bytes(),
1237 timestamp: expiry,
1238 ..Default::default()
1239 });
1240 }
1241 if let Some(reliable_state) = message.reliable_state {
1242 cells.push(cell::Cell {
1243 qualifier: "reliable_state".to_owned(),
1244 value: reliable_state.to_string().into_bytes(),
1245 timestamp: expiry,
1246 ..Default::default()
1247 });
1248 }
1249 }
1250 if let Some(data) = message.data {
1251 cells.push(cell::Cell {
1252 qualifier: "data".to_owned(),
1253 value: data.into_bytes(),
1254 timestamp: expiry,
1255 ..Default::default()
1256 });
1257 }
1258
1259 row.add_cells(family, cells);
1260 trace!("🉑 Adding row");
1261 self.write_row(row).await?;
1262
1263 self.metrics
1264 .incr_with_tags(MetricName::NotificationMessageStored)
1265 .with_tag("topic", &is_topic.to_string())
1266 .with_tag("database", &self.name())
1267 .send();
1268 Ok(())
1269 }
1270
1271 async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
1276 for message in messages {
1278 self.save_message(uaid, message).await?;
1279 }
1280 Ok(())
1281 }
1282
1283 async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
1297 let row_key = uaid.simple().to_string();
1298 debug!(
1299 "🉑 Updating {} current_timestamp: {:?}",
1300 &row_key,
1301 timestamp.to_be_bytes().to_vec()
1302 );
1303 let expiry =
1304 std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
1305 let mut row = Row::new(row_key.clone());
1306
1307 row.cells.insert(
1308 ROUTER_FAMILY.to_owned(),
1309 vec![
1310 cell::Cell {
1311 qualifier: "current_timestamp".to_owned(),
1312 value: timestamp.to_be_bytes().to_vec(),
1313 timestamp: expiry,
1314 ..Default::default()
1315 },
1316 new_version_cell(expiry),
1317 ],
1318 );
1319
1320 self.write_row(row).await?;
1321
1322 Ok(())
1323 }
1324
1325 async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
1327 trace!(
1328 "🉑 attemping to delete {:?} :: {:?}",
1329 uaid.to_string(),
1330 chidmessageid
1331 );
1332 let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
1333 debug!("🉑🔥 Deleting message {}", &row_key);
1334 self.delete_row(&row_key).await?;
1335 self.metrics
1336 .incr_with_tags(MetricName::NotificationMessageDeleted)
1337 .with_tag("database", &self.name())
1338 .send();
1339 Ok(())
1340 }
1341
1342 async fn fetch_topic_messages(
1344 &self,
1345 uaid: &Uuid,
1346 limit: usize,
1347 ) -> DbResult<FetchMessageResponse> {
1348 let mut req = ReadRowsRequest::default();
1349 req.set_table_name(self.settings.table_name.clone());
1350 req.set_app_profile_id(self.settings.app_profile_id.clone());
1351
1352 let start_key = format!("{}#01:", uaid.simple());
1353 let end_key = format!("{}#02:", uaid.simple());
1354 let mut rows = data::RowSet::default();
1355 let mut row_range = data::RowRange::default();
1356 row_range.set_start_key_open(start_key.into_bytes());
1357 row_range.set_end_key_open(end_key.into_bytes());
1358 let mut row_ranges = RepeatedField::default();
1359 row_ranges.push(row_range);
1360 rows.set_row_ranges(row_ranges);
1361 req.set_rows(rows);
1362
1363 let mut filters = message_gc_policy_filter()?;
1364 filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));
1365
1366 req.set_filter(filter_chain(filters));
1367 if limit > 0 {
1368 trace!("🉑 Setting limit to {limit}");
1369 req.set_rows_limit(limit as i64);
1370 }
1371 let rows = self.read_rows(req).await?;
1372 debug!(
1373 "🉑 Fetch Topic Messages. Found {} row(s) of {}",
1374 rows.len(),
1375 limit
1376 );
1377
1378 let messages = self.rows_to_notifications(rows)?;
1379
1380 Ok(FetchMessageResponse {
1384 messages,
1385 timestamp: None,
1386 })
1387 }
1388
1389 async fn fetch_timestamp_messages(
1392 &self,
1393 uaid: &Uuid,
1394 timestamp: Option<u64>,
1395 limit: usize,
1396 ) -> DbResult<FetchMessageResponse> {
1397 let mut req = ReadRowsRequest::default();
1398 req.set_table_name(self.settings.table_name.clone());
1399 req.set_app_profile_id(self.settings.app_profile_id.clone());
1400
1401 let mut rows = data::RowSet::default();
1402 let mut row_range = data::RowRange::default();
1403
1404 let start_key = if let Some(ts) = timestamp {
1405 format!("{}#02:{}z", uaid.simple(), ts)
1408 } else {
1409 format!("{}#02:", uaid.simple())
1410 };
1411 let end_key = format!("{}#03:", uaid.simple());
1412 row_range.set_start_key_open(start_key.into_bytes());
1413 row_range.set_end_key_open(end_key.into_bytes());
1414
1415 let mut row_ranges = RepeatedField::default();
1416 row_ranges.push(row_range);
1417 rows.set_row_ranges(row_ranges);
1418 req.set_rows(rows);
1419
1420 let mut filters = message_gc_policy_filter()?;
1434 filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));
1435
1436 req.set_filter(filter_chain(filters));
1437 if limit > 0 {
1438 req.set_rows_limit(limit as i64);
1439 }
1440 let rows = self.read_rows(req).await?;
1441 debug!(
1442 "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
1443 timestamp,
1444 rows.len(),
1445 limit,
1446 );
1447
1448 let messages = self.rows_to_notifications(rows)?;
1449 let timestamp = messages.last().and_then(|m| m.sortkey_timestamp);
1451 Ok(FetchMessageResponse {
1452 messages,
1453 timestamp,
1454 })
1455 }
1456
1457 async fn health_check(&self) -> DbResult<bool> {
1458 Ok(self
1459 .pool
1460 .get()
1461 .await?
1462 .health_check(&self.metrics.clone(), &self.settings.app_profile_id)
1463 .await?)
1464 }
1465
1466 async fn router_table_exists(&self) -> DbResult<bool> {
1469 Ok(true)
1470 }
1471
1472 async fn message_table_exists(&self) -> DbResult<bool> {
1475 Ok(true)
1476 }
1477
1478 #[cfg(feature = "reliable_report")]
1479 async fn log_report(
1480 &self,
1481 reliability_id: &str,
1482 new_state: crate::reliability::ReliabilityState,
1483 ) -> DbResult<()> {
1484 let row_key = reliability_id.to_owned();
1485
1486 let mut row = Row::new(row_key);
1487 let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);
1488
1489 let cells: Vec<cell::Cell> = vec![cell::Cell {
1491 qualifier: new_state.to_string(),
1492 value: crate::util::ms_since_epoch().to_be_bytes().to_vec(),
1493 timestamp: expiry,
1494 ..Default::default()
1495 }];
1496
1497 row.add_cells(RELIABLE_LOG_FAMILY, cells);
1498
1499 self.write_row(row).await?;
1500
1501 Ok(())
1502 }
1503
1504 fn box_clone(&self) -> Box<dyn DbClient> {
1505 Box::new(self.clone())
1506 }
1507
1508 fn name(&self) -> String {
1509 "Bigtable".to_owned()
1510 }
1511
1512 fn pool_status(&self) -> Option<deadpool::Status> {
1513 Some(self.pool.pool.status())
1514 }
1515}
1516
1517#[cfg(all(test, feature = "emulator"))]
1518mod tests {
1519
1520 use std::sync::Arc;
1525 use std::time::SystemTime;
1526
1527 use cadence::StatsdClient;
1528 use uuid;
1529
1530 use super::*;
1531 use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};
1532
1533 const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
1534 const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
1535 const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
1536
1537 fn now() -> u64 {
1538 SystemTime::now()
1539 .duration_since(SystemTime::UNIX_EPOCH)
1540 .unwrap()
1541 .as_secs()
1542 }
1543
1544 fn new_client() -> DbResult<BigTableClientImpl> {
1545 let env_dsn = format!(
1546 "grpc://{}",
1547 std::env::var("BIGTABLE_EMULATOR_HOST").unwrap_or("localhost:8080".to_owned())
1548 );
1549 let settings = DbSettings {
1550 dsn: Some(env_dsn),
1556 db_settings: json!({"table_name": "projects/test/instances/test/tables/autopush"})
1557 .to_string(),
1558 };
1559
1560 let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
1561
1562 BigTableClientImpl::new(metrics, &settings)
1563 }
1564
1565 #[test]
1566 fn escape_bytes_for_regex() {
1567 let b = b"hi";
1568 assert_eq!(escape_bytes(b), b.to_vec());
1569 assert_eq!(escape_bytes(b"h.*i!"), b"h\\.\\*i\\!".to_vec());
1570 let b = b"\xe2\x80\xb3";
1571 assert_eq!(escape_bytes(b), b.to_vec());
1572 let b = [b'f', b'o', b'\0', b'2', b'2', b'o'];
1574 assert_eq!(escape_bytes(&b), b"fo\\x0022o".to_vec());
1575 let b = b"\xc0";
1576 assert_eq!(escape_bytes(b), b.to_vec());
1577 assert_eq!(escape_bytes(b"\x03"), b"\\\x03".to_vec());
1578 }
1579
1580 #[actix_rt::test]
1581 async fn health_check() {
1582 let client = new_client().unwrap();
1583
1584 let result = client.health_check().await;
1585 assert!(result.is_ok());
1586 assert!(result.unwrap());
1587 }
1588
1589 #[actix_rt::test]
1592 async fn run_gauntlet() -> DbResult<()> {
1593 let client = new_client()?;
1594
1595 let connected_at = ms_since_epoch();
1596
1597 let uaid = Uuid::parse_str(TEST_USER).unwrap();
1598 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1599 let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();
1600
1601 let node_id = "test_node".to_owned();
1602
1603 let _ = client.remove_user(&uaid).await;
1605
1606 let test_user = User {
1607 uaid,
1608 router_type: "webpush".to_owned(),
1609 connected_at,
1610 router_data: None,
1611 node_id: Some(node_id.clone()),
1612 ..Default::default()
1613 };
1614
1615 let _ = client.remove_user(&uaid).await;
1618
1619 client.add_user(&test_user).await?;
1621 let fetched = client.get_user(&uaid).await?;
1622 assert!(fetched.is_some());
1623 let fetched = fetched.unwrap();
1624 assert_eq!(fetched.router_type, "webpush".to_owned());
1625
1626 let connected_at = ms_since_epoch();
1628
1629 client.add_channel(&uaid, &chid).await?;
1631 let channels = client.get_channels(&uaid).await?;
1632 assert!(channels.contains(&chid));
1633
1634 let mut new_channels: HashSet<Uuid> = HashSet::new();
1636 new_channels.insert(chid);
1637 for _ in 1..10 {
1638 new_channels.insert(uuid::Uuid::new_v4());
1639 }
1640 let chid_to_remove = uuid::Uuid::new_v4();
1641 new_channels.insert(chid_to_remove);
1642 client.add_channels(&uaid, new_channels.clone()).await?;
1643 let channels = client.get_channels(&uaid).await?;
1644 assert_eq!(channels, new_channels);
1645
1646 assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
1648 assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
1649 new_channels.remove(&chid_to_remove);
1650 let channels = client.get_channels(&uaid).await?;
1651 assert_eq!(channels, new_channels);
1652
1653 let mut updated = User {
1657 connected_at,
1658 ..test_user.clone()
1659 };
1660 let result = client.update_user(&mut updated).await;
1661 assert!(result.is_ok());
1662 assert!(!result.unwrap());
1663
1664 let fetched2 = client.get_user(&fetched.uaid).await?.unwrap();
1666 assert_eq!(fetched.connected_at, fetched2.connected_at);
1667
1668 let mut updated = User {
1670 connected_at: fetched.connected_at + 300,
1671 ..fetched2
1672 };
1673 let result = client.update_user(&mut updated).await;
1674 assert!(result.is_ok());
1675 assert!(result.unwrap());
1676 assert_ne!(
1677 fetched2.connected_at,
1678 client.get_user(&uaid).await?.unwrap().connected_at
1679 );
1680
1681 client
1683 .increment_storage(
1684 &fetched.uaid,
1685 SystemTime::now()
1686 .duration_since(SystemTime::UNIX_EPOCH)
1687 .unwrap()
1688 .as_secs(),
1689 )
1690 .await?;
1691
1692 let test_data = "An_encrypted_pile_of_crap".to_owned();
1693 let timestamp = now();
1694 let sort_key = now();
1695 let test_notification = crate::db::Notification {
1697 channel_id: chid,
1698 version: "test".to_owned(),
1699 ttl: 300,
1700 timestamp,
1701 data: Some(test_data.clone()),
1702 sortkey_timestamp: Some(sort_key),
1703 ..Default::default()
1704 };
1705 let res = client.save_message(&uaid, test_notification.clone()).await;
1706 assert!(res.is_ok());
1707
1708 let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?;
1709 assert_ne!(fetched.messages.len(), 0);
1710 let fm = fetched.messages.pop().unwrap();
1711 assert_eq!(fm.channel_id, test_notification.channel_id);
1712 assert_eq!(fm.data, Some(test_data));
1713
1714 let fetched = client
1716 .fetch_timestamp_messages(&uaid, Some(timestamp - 10), 999)
1717 .await?;
1718 assert_ne!(fetched.messages.len(), 0);
1719
1720 let fetched = client
1722 .fetch_timestamp_messages(&uaid, Some(timestamp + 10), 999)
1723 .await?;
1724 assert_eq!(fetched.messages.len(), 0);
1725
1726 assert!(client
1728 .remove_message(&uaid, &test_notification.chidmessageid())
1729 .await
1730 .is_ok());
1731
1732 assert!(client.remove_channel(&uaid, &chid).await.is_ok());
1733
1734 client.add_channel(&uaid, &topic_chid).await?;
1736 let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
1737 let timestamp = now();
1738 let sort_key = now();
1739 let test_notification = crate::db::Notification {
1741 channel_id: topic_chid,
1742 version: "test".to_owned(),
1743 ttl: 300,
1744 topic: Some("topic".to_owned()),
1745 timestamp,
1746 data: Some(test_data.clone()),
1747 sortkey_timestamp: Some(sort_key),
1748 ..Default::default()
1749 };
1750 assert!(client
1751 .save_message(&uaid, test_notification.clone())
1752 .await
1753 .is_ok());
1754
1755 let mut fetched = client.fetch_topic_messages(&uaid, 999).await?;
1756 assert_ne!(fetched.messages.len(), 0);
1757 let fm = fetched.messages.pop().unwrap();
1758 assert_eq!(fm.channel_id, test_notification.channel_id);
1759 assert_eq!(fm.data, Some(test_data));
1760
1761 let fetched = client.fetch_topic_messages(&uaid, 999).await?;
1763 assert_ne!(fetched.messages.len(), 0);
1764
1765 assert!(client
1767 .remove_message(&uaid, &test_notification.chidmessageid())
1768 .await
1769 .is_ok());
1770
1771 assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());
1772
1773 let msgs = client
1774 .fetch_timestamp_messages(&uaid, None, 999)
1775 .await?
1776 .messages;
1777 assert!(msgs.is_empty());
1778
1779 let fetched = client.get_user(&uaid).await?.unwrap();
1780 assert!(client
1781 .remove_node_id(&uaid, &node_id, connected_at, &fetched.version)
1782 .await
1783 .is_ok());
1784 let fetched = client.get_user(&uaid).await?.unwrap();
1786 assert_eq!(fetched.node_id, None);
1787
1788 assert!(client.remove_user(&uaid).await.is_ok());
1789
1790 assert!(client.get_user(&uaid).await?.is_none());
1791
1792 Ok(())
1793 }
1794
1795 #[actix_rt::test]
1796 async fn read_cells_family_id() -> DbResult<()> {
1797 let client = new_client().unwrap();
1798 let uaid = gen_test_uaid();
1799 client.remove_user(&uaid).await.unwrap();
1800
1801 let qualifier = "foo".to_owned();
1802
1803 let row_key = uaid.simple().to_string();
1804 let mut row = Row::new(row_key.clone());
1805 row.cells.insert(
1806 ROUTER_FAMILY.to_owned(),
1807 vec![cell::Cell {
1808 qualifier: qualifier.to_owned(),
1809 value: "bar".as_bytes().to_vec(),
1810 ..Default::default()
1811 }],
1812 );
1813 client.write_row(row).await.unwrap();
1814 let req = client.read_row_request(&row_key);
1815 let Some(row) = client.read_row(req).await.unwrap() else {
1816 panic!("Expected row");
1817 };
1818 assert_eq!(row.cells.len(), 1);
1819 assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str());
1820 client.remove_user(&uaid).await
1821 }
1822
1823 #[actix_rt::test]
1824 async fn add_user_existing() {
1825 let client = new_client().unwrap();
1826 let uaid = gen_test_uaid();
1827 let user = User {
1828 uaid,
1829 ..Default::default()
1830 };
1831 client.remove_user(&uaid).await.unwrap();
1832
1833 client.add_user(&user).await.unwrap();
1834 let err = client.add_user(&user).await.unwrap_err();
1835 assert!(matches!(err, DbError::Conditional));
1836 }
1837
1838 #[actix_rt::test]
1839 async fn version_check() {
1840 let client = new_client().unwrap();
1841 let uaid = gen_test_uaid();
1842 let user = User {
1843 uaid,
1844 ..Default::default()
1845 };
1846 client.remove_user(&uaid).await.unwrap();
1847
1848 client.add_user(&user).await.unwrap();
1849 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1850 assert!(client.update_user(&mut user.clone()).await.unwrap());
1851
1852 let fetched = client.get_user(&uaid).await.unwrap().unwrap();
1853 assert_ne!(user.version, fetched.version);
1854 assert!(!client.update_user(&mut user).await.unwrap());
1856
1857 client.remove_user(&uaid).await.unwrap();
1858 }
1859
1860 #[actix_rt::test]
1861 async fn lingering_chid_record() {
1862 let client = new_client().unwrap();
1863 let uaid = gen_test_uaid();
1864 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1865 let user = User {
1866 uaid,
1867 ..Default::default()
1868 };
1869 client.remove_user(&uaid).await.unwrap();
1870
1871 client.add_channel(&uaid, &chid).await.unwrap();
1873
1874 assert!(client.get_user(&uaid).await.unwrap().is_none());
1877
1878 client.add_user(&user).await.unwrap();
1879 assert!(client.get_channels(&uaid).await.unwrap().is_empty());
1881
1882 client.remove_user(&uaid).await.unwrap();
1883 }
1884
1885 #[actix_rt::test]
1886 async fn lingering_current_timestamp() {
1887 let client = new_client().unwrap();
1888 let uaid = gen_test_uaid();
1889 client.remove_user(&uaid).await.unwrap();
1890
1891 client
1892 .increment_storage(&uaid, ms_since_epoch())
1893 .await
1894 .unwrap();
1895 assert!(client.get_user(&uaid).await.unwrap().is_none());
1896
1897 client.remove_user(&uaid).await.unwrap();
1898 }
1899
1900 #[actix_rt::test]
1901 async fn lingering_chid_w_version_record() {
1902 let client = new_client().unwrap();
1903 let uaid = gen_test_uaid();
1904 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1905 client.remove_user(&uaid).await.unwrap();
1906
1907 client.add_channel(&uaid, &chid).await.unwrap();
1908 assert!(client.remove_channel(&uaid, &chid).await.unwrap());
1909 assert!(client.get_user(&uaid).await.unwrap().is_none());
1910
1911 client.remove_user(&uaid).await.unwrap();
1912 }
1913
1914 #[actix_rt::test]
1915 async fn channel_and_current_timestamp_ttl_updates() {
1916 let client = new_client().unwrap();
1917 let uaid = gen_test_uaid();
1918 let chid = Uuid::parse_str(TEST_CHID).unwrap();
1919 client.remove_user(&uaid).await.unwrap();
1920
1921 let user = User {
1923 uaid,
1924 ..Default::default()
1925 };
1926 client.add_user(&user).await.unwrap();
1927
1928 client.add_channel(&uaid, &chid).await.unwrap();
1929 client
1930 .add_channel(&uaid, &uuid::Uuid::new_v4())
1931 .await
1932 .unwrap();
1933
1934 client
1935 .increment_storage(
1936 &uaid,
1937 SystemTime::now()
1938 .duration_since(SystemTime::UNIX_EPOCH)
1939 .unwrap()
1940 .as_secs(),
1941 )
1942 .await
1943 .unwrap();
1944
1945 let req = client.read_row_request(&uaid.as_simple().to_string());
1946 let Some(mut row) = client.read_row(req).await.unwrap() else {
1947 panic!("Expected row");
1948 };
1949
1950 let ca_expiry = row.take_required_cell("connected_at").unwrap().timestamp;
1953 for mut cells in row.cells.into_values() {
1954 let Some(cell) = cells.pop() else {
1955 continue;
1956 };
1957 assert!(
1958 cell.timestamp >= ca_expiry,
1959 "{} cell timestamp should >= connected_at's",
1960 cell.qualifier
1961 );
1962 }
1963
1964 let mut user = client.get_user(&uaid).await.unwrap().unwrap();
1965
1966 tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
1968 client.update_user(&mut user).await.unwrap();
1969
1970 let req = client.read_row_request(&uaid.as_simple().to_string());
1972 let Some(mut row) = client.read_row(req).await.unwrap() else {
1973 panic!("Expected row");
1974 };
1975
1976 let ca_expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
1977
1978 assert!(ca_expiry2 > ca_expiry);
1979
1980 for mut cells in row.cells.into_values() {
1981 let Some(cell) = cells.pop() else {
1982 continue;
1983 };
1984 assert!(
1985 cell.timestamp >= ca_expiry2,
1986 "{} cell timestamp expiry should exceed connected_at's",
1987 cell.qualifier
1988 );
1989 }
1990
1991 client.remove_user(&uaid).await.unwrap();
1992 }
1993}