1use std::collections::HashSet;
2use std::fmt;
3use std::fmt::Display;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::SystemTime;
7
8use async_trait::async_trait;
9use cadence::{CountedExt, StatsdClient};
10use deadpool_redis::Config;
11use deadpool_redis::redis::{AsyncCommands, SetExpiry, SetOptions, pipe};
12use uuid::Uuid;
13
14use crate::db::redis::StorableNotification;
15use crate::db::{
16 DbSettings, Notification, User,
17 client::{DbClient, FetchMessageResponse},
18 error::{DbError, DbResult},
19};
20use crate::util::{ms_since_epoch, sec_since_epoch};
21
22use super::RedisDbSettings;
23
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn now_secs() -> u64 {
    // `elapsed` measures from the given instant up to "now".
    let since_epoch = SystemTime::UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_secs()
}
31
32struct Uaid<'a>(&'a Uuid);
34
35impl<'a> Display for Uaid<'a> {
36 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37 write!(f, "{}", self.0.as_hyphenated())
38 }
39}
40
41impl<'a> From<Uaid<'a>> for String {
42 fn from(uaid: Uaid) -> String {
43 uaid.0.as_hyphenated().to_string()
44 }
45}
46
47struct ChannelId<'a>(&'a Uuid);
48
49impl<'a> Display for ChannelId<'a> {
50 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
51 write!(f, "{}", self.0.as_hyphenated())
52 }
53}
54
55impl<'a> From<ChannelId<'a>> for String {
56 fn from(chid: ChannelId) -> String {
57 chid.0.as_hyphenated().to_string()
58 }
59}
60
/// Redis-backed implementation of the database client.
///
/// Cloning is derived; `box_clone` relies on it to hand out boxed copies.
#[derive(Clone)]
pub struct RedisClientImpl {
    /// Connection pool from which every operation borrows a Redis connection.
    pub pool: deadpool_redis::Pool,
    /// Statsd client used to count stored and deleted notifications.
    metrics: Arc<StatsdClient>,
    /// `SET` options carrying the router TTL; applied to the user record,
    /// the last-connection marker and the storage timestamp.
    router_opts: SetOptions,
    /// `SET` options carrying the default notification TTL; `save_message`
    /// replaces the expiration with a per-message absolute one.
    notification_opts: SetOptions,
}
72
73impl RedisClientImpl {
74 pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
75 debug!("🐰 New redis client");
76 let dsn = settings.dsn.clone().ok_or(DbError::General(
77 "Redis DSN not configured. Set `db_dsn` to `redis://HOST:PORT` in settings.".to_owned(),
78 ))?;
79 let db_settings = RedisDbSettings::try_from(settings.db_settings.as_ref())?;
80 info!("🐰 {:#?}", db_settings);
81 let router_ttl_secs = db_settings.router_ttl.unwrap_or_default().as_secs();
82 let notification_ttl_secs = db_settings.notification_ttl.unwrap_or_default().as_secs();
83
84 let config = Config::from_url(dsn);
85 let pool = config
86 .builder()
87 .map_err(|e| DbError::General(format!("Could not create Redis pool: {:?}", e)))?
88 .create_timeout(db_settings.create_timeout)
89 .runtime(deadpool_redis::Runtime::Tokio1)
90 .build()
91 .map_err(|e| DbError::General(format!("Could not create Redis pool: {:?}", e)))?;
92 Ok(Self {
98 pool,
99 metrics,
100 router_opts: SetOptions::default().with_expiration(SetExpiry::EX(router_ttl_secs)),
101 notification_opts: SetOptions::default()
102 .with_expiration(SetExpiry::EX(notification_ttl_secs)),
103 })
104 }
105
106 async fn connection(&self) -> DbResult<deadpool_redis::Connection> {
111 self.pool.get().await.map_err(|e| {
112 DbError::RedisError(redis::RedisError::from((
113 redis::ErrorKind::IoError,
114 "Could not get Redis connection from pool",
115 format!("{:?}", e),
116 )))
117 })
118 }
119
120 fn user_key(&self, uaid: &Uaid) -> String {
121 format!("autopush/user/{}", uaid)
122 }
123
124 fn last_co_key(&self, uaid: &Uaid) -> String {
126 format!("autopush/co/{}", uaid)
127 }
128
129 fn storage_timestamp_key(&self, uaid: &Uaid) -> String {
131 format!("autopush/timestamp/{}", uaid)
132 }
133
134 fn channel_list_key(&self, uaid: &Uaid) -> String {
135 format!("autopush/channels/{}", uaid)
136 }
137
138 fn message_list_key(&self, uaid: &Uaid) -> String {
139 format!("autopush/msgs/{}", uaid)
140 }
141
142 fn message_exp_list_key(&self, uaid: &Uaid) -> String {
143 format!("autopush/msgs_exp/{}", uaid)
144 }
145
146 fn message_key(&self, uaid: &Uaid, chidmessageid: &str) -> String {
147 format!("autopush/msg/{}/{}", uaid, chidmessageid)
148 }
149
150 #[cfg(feature = "reliable_report")]
151 fn reliability_key(
152 &self,
153 reliability_id: &str,
154 state: &crate::reliability::ReliabilityState,
155 ) -> String {
156 format!("autopush/reliability/{}/{}", reliability_id, state)
157 }
158
159 #[cfg(test)]
160 async fn fetch_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<Option<String>> {
162 let message_key = self.message_key(&Uaid(uaid), chidmessageid);
163 let mut con = self.connection().await?;
164 debug!("🐰 Fetching message from {}", &message_key);
165 let message = con.get::<String, Option<String>>(message_key).await?;
166 Ok(message)
167 }
168}
169
170#[async_trait]
171impl DbClient for RedisClientImpl {
172 async fn add_user(&self, user: &User) -> DbResult<()> {
174 let uaid = Uaid(&user.uaid);
175 let user_key = self.user_key(&uaid);
176 let mut con = self.connection().await?;
177 let co_key = self.last_co_key(&uaid);
178 trace!("🐰 Adding user {} at {}:{}", &user.uaid, &user_key, &co_key);
179 trace!("🐰 Logged at {}", &user.connected_at);
180 pipe()
181 .set_options(co_key, ms_since_epoch(), self.router_opts)
182 .set_options(user_key, serde_json::to_string(user)?, self.router_opts)
183 .exec_async(&mut con)
184 .await?;
185 Ok(())
186 }
187
188 async fn update_user(&self, user: &mut User) -> DbResult<bool> {
200 trace!("🐰 Updating user");
201 let mut con = self.connection().await?;
202 let co_key = self.last_co_key(&Uaid(&user.uaid));
203 let last_co: Option<u64> = con.get(&co_key).await?;
204 if last_co.is_some_and(|c| c < user.connected_at) {
205 trace!(
206 "🐰 Was connected at {}, now at {}",
207 last_co.unwrap(),
208 &user.connected_at
209 );
210 self.add_user(user).await?;
211 Ok(true)
212 } else {
213 Ok(false)
214 }
215 }
216
217 async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
218 let mut con = self.connection().await?;
219 let user_key = self.user_key(&Uaid(uaid));
220 let user: Option<User> = con
221 .get::<&str, Option<String>>(&user_key)
222 .await?
223 .and_then(|s| serde_json::from_str(s.as_ref()).ok());
224 if user.is_some() {
225 trace!("🐰 Found a record for {}", &uaid);
226 }
227 Ok(user)
228 }
229
230 async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
231 let uaid = Uaid(uaid);
232 let mut con = self.connection().await?;
233 let user_key = self.user_key(&uaid);
234 let co_key = self.last_co_key(&uaid);
235 let chan_list_key = self.channel_list_key(&uaid);
236 let msg_list_key = self.message_list_key(&uaid);
237 let exp_list_key = self.message_exp_list_key(&uaid);
238 let timestamp_key = self.storage_timestamp_key(&uaid);
239 pipe()
240 .del(&user_key)
241 .del(&co_key)
242 .del(&chan_list_key)
243 .del(&msg_list_key)
244 .del(&exp_list_key)
245 .del(×tamp_key)
246 .exec_async(&mut con)
247 .await?;
248 Ok(())
249 }
250
251 async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
252 let uaid = Uaid(uaid);
253 let mut con = self.connection().await?;
254 let co_key = self.last_co_key(&uaid);
255 let chan_list_key = self.channel_list_key(&uaid);
256
257 let _: () = pipe()
258 .rpush(chan_list_key, channel_id.as_hyphenated().to_string())
259 .set_options(co_key, ms_since_epoch(), self.router_opts)
260 .exec_async(&mut con)
261 .await?;
262 Ok(())
263 }
264
265 async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
267 let uaid = Uaid(uaid);
268 let mut con = self.connection().await?;
270 let co_key = self.last_co_key(&uaid);
271 let chan_list_key = self.channel_list_key(&uaid);
272 pipe()
273 .set_options(co_key, ms_since_epoch(), self.router_opts)
274 .rpush(
275 chan_list_key,
276 channels
277 .into_iter()
278 .map(|c| c.as_hyphenated().to_string())
279 .collect::<Vec<String>>(),
280 )
281 .exec_async(&mut con)
282 .await?;
283 Ok(())
284 }
285
286 async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
287 let uaid = Uaid(uaid);
288 let mut con = self.connection().await?;
289 let chan_list_key = self.channel_list_key(&uaid);
290 let channels: HashSet<Uuid> = con
291 .lrange::<&str, HashSet<String>>(&chan_list_key, 0, -1)
292 .await?
293 .into_iter()
294 .filter_map(|s| Uuid::from_str(&s).ok())
295 .collect();
296 trace!("🐰 Found {} channels for {}", channels.len(), &uaid);
297 Ok(channels)
298 }
299
300 async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
302 let uaid = Uaid(uaid);
303 let channel_id = ChannelId(channel_id);
304 let mut con = self.connection().await?;
305 let co_key = self.last_co_key(&uaid);
306 let chan_list_key = self.channel_list_key(&uaid);
307 trace!("🐰 Removing channel {}", channel_id);
309 let (status,): (bool,) = pipe()
310 .set_options(co_key, ms_since_epoch(), self.router_opts)
311 .ignore()
312 .lrem(&chan_list_key, 1, channel_id.to_string())
313 .query_async(&mut con)
314 .await?;
315 Ok(status)
316 }
317
318 async fn remove_node_id(
320 &self,
321 uaid: &Uuid,
322 _node_id: &str,
323 _connected_at: u64,
324 _version: &Option<Uuid>,
325 ) -> DbResult<bool> {
326 if let Some(mut user) = self.get_user(uaid).await? {
327 user.node_id = None;
328 self.update_user(&mut user).await?;
329 }
330 Ok(true)
331 }
332
333 async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
337 let uaid = Uaid(uaid);
338 let mut con = self.connection().await?;
339 let msg_list_key = self.message_list_key(&uaid);
340 let exp_list_key = self.message_exp_list_key(&uaid);
341 let msg_id = &message.chidmessageid();
342 let msg_key = self.message_key(&uaid, msg_id);
343 let storable: StorableNotification = message.into();
344
345 debug!("🐰 Saving message {} :: {:?}", &msg_key, &storable);
346 trace!(
347 "🐰 timestamp: {:?}",
348 &storable.timestamp.to_be_bytes().to_vec()
349 );
350
351 let expiry = now_secs() + storable.ttl;
354 trace!("🐰 Message Expiry {}, currently:{} ", expiry, now_secs());
355
356 let mut pipe = pipe();
357
358 let is_topic = storable.topic.is_some();
363
364 let notif_opts = self
367 .notification_opts
368 .with_expiration(SetExpiry::EXAT(expiry));
369
370 debug!("🐰 Saving to {}", &msg_key);
373 pipe.set_options(msg_key, serde_json::to_string(&storable)?, notif_opts)
374 .zadd(&exp_list_key, msg_id, expiry)
377 .zadd(&msg_list_key, msg_id, sec_since_epoch());
378
379 let _: () = pipe.exec_async(&mut con).await?;
380 self.metrics
381 .incr_with_tags("notification.message.stored")
382 .with_tag("topic", &is_topic.to_string())
383 .with_tag("database", &self.name())
384 .send();
385 Ok(())
386 }
387
388 async fn save_messages(&self, uaid: &Uuid, messages: Vec<Notification>) -> DbResult<()> {
393 for message in messages {
395 self.save_message(uaid, message).await?;
396 }
397 Ok(())
398 }
399
400 async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
402 let uaid = Uaid(uaid);
403 debug!("🐰🔥 Incrementing storage to {}", timestamp);
404 let msg_list_key = self.message_list_key(&uaid);
405 let exp_list_key = self.message_exp_list_key(&uaid);
406 let storage_timestamp_key = self.storage_timestamp_key(&uaid);
407 let mut con = self.connection().await?;
408 trace!("🐇 SEARCH: increment: {:?} - {}", &exp_list_key, timestamp);
409 let exp_id_list: Vec<String> = con.zrangebyscore(&exp_list_key, 0, timestamp).await?;
410 if !exp_id_list.is_empty() {
411 let delete_msg_keys: Vec<String> = exp_id_list
414 .clone()
415 .into_iter()
416 .map(|msg_id| self.message_key(&uaid, &msg_id))
417 .collect();
418
419 trace!(
420 "🐰🔥:rem: Deleting {} : [{:?}]",
421 msg_list_key, &delete_msg_keys
422 );
423 trace!("🐰🔥:rem: Deleting {} : [{:?}]", exp_list_key, &exp_id_list);
424 pipe()
425 .set_options::<_, _>(&storage_timestamp_key, timestamp, self.router_opts)
426 .del(&delete_msg_keys)
427 .zrem(&msg_list_key, &exp_id_list)
428 .zrem(&exp_list_key, &exp_id_list)
429 .exec_async(&mut con)
430 .await?;
431 } else {
432 con.set_options::<_, _, ()>(&storage_timestamp_key, timestamp, self.router_opts)
433 .await?;
434 }
435 Ok(())
436 }
437
438 async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> {
440 let uaid = Uaid(uaid);
441 trace!(
442 "🐰 attemping to delete {:?} :: {:?}",
443 uaid.to_string(),
444 chidmessageid
445 );
446 let msg_key = self.message_key(&uaid, chidmessageid);
447 let msg_list_key = self.message_list_key(&uaid);
448 let exp_list_key = self.message_exp_list_key(&uaid);
449 debug!("🐰🔥 Deleting message {}", &msg_key);
450 let mut con = self.connection().await?;
451 trace!(
454 "🐰🔥:remsg: Deleting {} : {:?}",
455 msg_list_key, &chidmessageid
456 );
457 trace!(
458 "🐰🔥:remsg: Deleting {} : {:?}",
459 exp_list_key, &chidmessageid
460 );
461 pipe()
462 .del(&msg_key)
463 .zrem(&msg_list_key, chidmessageid)
464 .zrem(&exp_list_key, chidmessageid)
465 .exec_async(&mut con)
466 .await?;
467 self.metrics
468 .incr_with_tags("notification.message.deleted")
469 .with_tag("database", &self.name())
470 .send();
471 Ok(())
472 }
473
474 async fn fetch_topic_messages(
476 &self,
477 _uaid: &Uuid,
478 _limit: usize,
479 ) -> DbResult<FetchMessageResponse> {
480 Ok(FetchMessageResponse {
481 messages: vec![],
482 timestamp: None,
483 })
484 }
485
486 async fn fetch_timestamp_messages(
493 &self,
494 uaid: &Uuid,
495 timestamp: Option<u64>,
496 limit: usize,
497 ) -> DbResult<FetchMessageResponse> {
498 let uaid = Uaid(uaid);
499 trace!("🐰 Fetching {} messages since {:?}", limit, timestamp);
500 let mut con = self.connection().await?;
501 let msg_list_key = self.message_list_key(&uaid);
502 let timestamp = if let Some(timestamp) = timestamp {
503 timestamp
504 } else {
505 let storage_timestamp_key = self.storage_timestamp_key(&uaid);
506 con.get(&storage_timestamp_key).await.unwrap_or(0)
507 };
508 trace!(
510 "🐇 SEARCH: zrangebyscore {:?} {} +inf withscores limit 0 {:?}",
511 &msg_list_key, timestamp, limit,
512 );
513 let results = con
514 .zrangebyscore_limit_withscores::<&str, &str, &str, Vec<(String, u64)>>(
515 &msg_list_key,
516 ×tamp.to_string(),
517 "+inf",
518 0,
519 limit as isize,
520 )
521 .await?;
522 let (messages_id, mut scores): (Vec<String>, Vec<u64>) = results
523 .into_iter()
524 .map(|(id, s): (String, u64)| (self.message_key(&uaid, &id), s))
525 .unzip();
526 if messages_id.is_empty() {
527 trace!("🐰 No message found");
528 return Ok(FetchMessageResponse {
529 messages: vec![],
530 timestamp: None,
531 });
532 }
533 let messages: Vec<Notification> = con
534 .mget::<&Vec<String>, Vec<Option<String>>>(&messages_id)
535 .await?
536 .into_iter()
537 .filter_map(|opt: Option<String>| {
538 if let Some(m) = opt {
539 serde_json::from_str(&m)
540 .inspect_err(|e| {
541 error!("🐰 ERROR parsing entry: {:?}", e);
547 })
548 .ok()
549 } else {
550 None
551 }
552 })
553 .collect();
554 if messages.is_empty() {
555 trace!("🐰 No Valid messages found");
556 return Ok(FetchMessageResponse {
557 timestamp: None,
558 messages: vec![],
559 });
560 }
561 let timestamp = scores.pop();
562 trace!("🐰 Found {} messages until {:?}", messages.len(), timestamp);
563 Ok(FetchMessageResponse {
564 messages,
565 timestamp,
566 })
567 }
568
569 #[cfg(feature = "reliable_report")]
570 async fn log_report(
571 &self,
572 reliability_id: &str,
573 state: crate::reliability::ReliabilityState,
574 ) -> DbResult<()> {
575 use crate::MAX_NOTIFICATION_TTL_SECS;
576
577 trace!("🐰 Logging reliability report");
578 let mut con = self.connection().await?;
579 let reliability_key = self.reliability_key(reliability_id, &state);
581 let expiry = MAX_NOTIFICATION_TTL_SECS;
583 let opts = SetOptions::default().with_expiration(SetExpiry::EX(expiry));
584 let mut pipe = pipe();
585 pipe.set_options(reliability_key, sec_since_epoch(), opts)
586 .exec_async(&mut con)
587 .await?;
588 Ok(())
589 }
590
591 async fn health_check(&self) -> DbResult<bool> {
592 let _: () = self.connection().await?.ping().await?;
593 Ok(true)
594 }
595
596 async fn router_table_exists(&self) -> DbResult<bool> {
598 Ok(true)
599 }
600
601 async fn message_table_exists(&self) -> DbResult<bool> {
603 Ok(true)
604 }
605
606 fn box_clone(&self) -> Box<dyn DbClient> {
607 Box::new(self.clone())
608 }
609
610 fn name(&self) -> String {
611 "Redis".to_owned()
612 }
613
614 fn pool_status(&self) -> Option<deadpool::Status> {
615 None
616 }
617}
618
#[cfg(test)]
mod tests {
    use crate::{logging::init_test_logging, util::ms_since_epoch};
    use rand::prelude::*;
    use std::env;

    use super::*;

    const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
    const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB";
    const TEST_NODE: &str = "test_node";

    /// Builds a client for the Redis instance named by `REDIS_HOST`
    /// (default `localhost`), with a no-op metrics sink.
    fn new_client() -> DbResult<RedisClientImpl> {
        let host = env::var("REDIS_HOST").unwrap_or_else(|_| "localhost".into());
        let env_dsn = format!("redis://{host}");
        debug!("🐰 Connecting to {env_dsn}");
        let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
        let settings = DbSettings {
            dsn: Some(env_dsn),
            db_settings: "".into(),
        };
        RedisClientImpl::new(metrics, &settings)
    }

    /// Produces a UUID-shaped string from a random byte and the current time.
    fn gen_test_user() -> String {
        let test_num: u8 = rand::rng().random();
        format!("DEADBEEF-0000-0000-{:04}-{:012}", test_num, now_secs())
    }

    /// Builds the webpush user record shared by the tests.
    fn webpush_user(uaid: Uuid, connected_at: u64) -> User {
        User {
            uaid,
            router_type: "webpush".to_owned(),
            connected_at,
            router_data: None,
            node_id: Some(TEST_NODE.to_owned()),
            ..Default::default()
        }
    }

    /// The server answers a ping.
    #[actix_rt::test]
    async fn health_check() {
        let client = new_client().unwrap();

        let outcome = client.health_check().await;
        assert!(outcome.is_ok());
        assert!(outcome.unwrap());
    }

    /// A message whose expiry has been passed by `increment_storage` is no
    /// longer returned.
    #[actix_rt::test]
    async fn wipe_expired() -> DbResult<()> {
        init_test_logging();
        let client = new_client()?;

        let connected_at = ms_since_epoch();
        let uaid = Uuid::parse_str(&gen_test_user()).unwrap();
        let chid = Uuid::parse_str(TEST_CHID).unwrap();

        // Best-effort cleanup of leftovers from a previous run.
        let _ = client.remove_user(&uaid).await;
        let test_user = webpush_user(uaid, connected_at);
        let _ = client.remove_user(&uaid).await;

        let timestamp = now_secs();
        client.add_user(&test_user).await?;
        let short_lived = Notification {
            channel_id: chid,
            version: "test".to_owned(),
            ttl: 1,
            timestamp,
            data: Some("Encrypted".into()),
            sortkey_timestamp: Some(timestamp),
            ..Default::default()
        };
        client.save_message(&uaid, short_lived).await?;
        client.increment_storage(&uaid, timestamp + 1).await?;

        let remaining = client.fetch_timestamp_messages(&uaid, None, 999).await?;
        assert_eq!(remaining.messages.len(), 0);
        assert!(client.remove_user(&uaid).await.is_ok());
        Ok(())
    }

    /// End-to-end walk through users, channels, plain and topic messages.
    #[actix_rt::test]
    async fn run_gauntlet() -> DbResult<()> {
        init_test_logging();
        let client = new_client()?;

        let connected_at = ms_since_epoch();

        let uaid = Uuid::parse_str(&gen_test_user()).unwrap();
        let chid = Uuid::parse_str(TEST_CHID).unwrap();
        let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap();

        // Best-effort cleanup of leftovers from a previous run.
        let _ = client.remove_user(&uaid).await;
        let test_user = webpush_user(uaid, connected_at);
        let _ = client.remove_user(&uaid).await;

        // --- user round trip
        client.add_user(&test_user).await?;
        let stored_user = client.get_user(&uaid).await?;
        assert!(stored_user.is_some());
        let stored_user = stored_user.unwrap();
        assert_eq!(stored_user.router_type, "webpush".to_owned());

        let connected_at = ms_since_epoch();

        // --- channels
        client.add_channel(&uaid, &chid).await?;
        assert!(client.get_channels(&uaid).await?.contains(&chid));

        let mut new_channels: HashSet<Uuid> = HashSet::from([chid]);
        new_channels.extend((1..10).map(|_| Uuid::new_v4()));
        let chid_to_remove = Uuid::new_v4();
        new_channels.insert(chid_to_remove);
        client.add_channels(&uaid, new_channels.clone()).await?;
        assert_eq!(client.get_channels(&uaid).await?, new_channels);

        // First removal succeeds, the second finds nothing to remove.
        assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
        assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
        new_channels.remove(&chid_to_remove);
        assert_eq!(client.get_channels(&uaid).await?, new_channels);

        // --- user updates
        let mut stale_update = User {
            connected_at,
            ..test_user.clone()
        };
        let outcome = client.update_user(&mut stale_update).await;
        assert!(outcome.is_ok());
        assert!(!outcome.unwrap());

        let refetched = client.get_user(&stored_user.uaid).await?.unwrap();
        assert_eq!(stored_user.connected_at, refetched.connected_at);

        let mut fresh_update = User {
            connected_at: stored_user.connected_at + 300,
            ..refetched
        };
        let outcome = client.update_user(&mut fresh_update).await;
        assert!(outcome.is_ok());
        assert!(outcome.unwrap());
        assert_ne!(
            refetched.connected_at,
            client.get_user(&uaid).await?.unwrap().connected_at
        );

        client
            .increment_storage(&stored_user.uaid, now_secs())
            .await?;

        // --- plain message
        let test_data = "An_encrypted_pile_of_crap".to_owned();
        let timestamp = now_secs();
        let sort_key = now_secs();
        let fetch_timestamp = timestamp;
        let test_notification = Notification {
            channel_id: chid,
            version: "test".to_owned(),
            ttl: 300,
            timestamp,
            data: Some(test_data.clone()),
            sortkey_timestamp: Some(sort_key),
            ..Default::default()
        };
        let saved = client.save_message(&uaid, test_notification.clone()).await;
        assert!(saved.is_ok());

        let mut page = client.fetch_timestamp_messages(&uaid, None, 999).await?;
        assert_ne!(page.messages.len(), 0);
        let last = page.messages.pop().unwrap();
        assert_eq!(last.channel_id, test_notification.channel_id);
        assert_eq!(last.data, Some(test_data));

        let earlier = client
            .fetch_timestamp_messages(&uaid, Some(fetch_timestamp - 10), 999)
            .await?;
        assert_ne!(earlier.messages.len(), 0);

        let later = client
            .fetch_timestamp_messages(&uaid, Some(fetch_timestamp + 10), 999)
            .await?;
        assert_eq!(later.messages.len(), 0);

        let removed = client
            .remove_message(&uaid, &test_notification.chidmessageid())
            .await;
        assert!(removed.is_ok());

        assert!(client.remove_channel(&uaid, &chid).await.is_ok());

        let leftover = client
            .fetch_timestamp_messages(&uaid, None, 999)
            .await?
            .messages;
        assert!(leftover.is_empty());

        // --- topic messages
        client.add_channel(&uaid, &topic_chid).await?;
        let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
        let timestamp = now_secs();
        let sort_key = now_secs();

        let test_notification_0 = Notification {
            channel_id: topic_chid,
            version: "version0".to_owned(),
            ttl: 300,
            topic: Some("topic".to_owned()),
            timestamp,
            data: Some(test_data.clone()),
            sortkey_timestamp: Some(sort_key),
            ..Default::default()
        };
        let saved = client
            .save_message(&uaid, test_notification_0.clone())
            .await;
        assert!(saved.is_ok());

        let test_notification = Notification {
            timestamp: now_secs(),
            version: "version1".to_owned(),
            sortkey_timestamp: Some(sort_key + 10),
            ..test_notification_0
        };
        let saved = client.save_message(&uaid, test_notification.clone()).await;
        assert!(saved.is_ok());

        // The second topic message replaces the first.
        let mut page = client.fetch_timestamp_messages(&uaid, None, 999).await?;
        assert_eq!(page.messages.len(), 1);
        let last = page.messages.pop().unwrap();
        assert_eq!(last.channel_id, test_notification.channel_id);
        assert_eq!(last.data, Some(test_data));

        let again = client.fetch_timestamp_messages(&uaid, None, 999).await?;
        assert_ne!(again.messages.len(), 0);

        let removed = client
            .remove_message(&uaid, &test_notification.chidmessageid())
            .await;
        assert!(removed.is_ok());

        assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok());

        let leftover = client
            .fetch_timestamp_messages(&uaid, None, 999)
            .await?
            .messages;
        assert!(leftover.is_empty());

        // --- node id and teardown
        let current = client.get_user(&uaid).await?.unwrap();
        let cleared = client
            .remove_node_id(&uaid, TEST_NODE, connected_at, &current.version)
            .await;
        assert!(cleared.is_ok());
        let current = client.get_user(&uaid).await?.unwrap();
        assert_eq!(current.node_id, None);

        assert!(client.remove_user(&uaid).await.is_ok());

        assert!(client.get_user(&uaid).await?.is_none());
        Ok(())
    }

    /// A stored message body disappears once `increment_storage` passes its
    /// expiry.
    #[actix_rt::test]
    async fn test_expiry() -> DbResult<()> {
        init_test_logging();
        let client = new_client()?;

        let uaid = Uuid::parse_str(&gen_test_user()).unwrap();
        let chid = Uuid::parse_str(TEST_CHID).unwrap();
        let now = now_secs();

        let test_notification = Notification {
            channel_id: chid,
            version: "test".to_owned(),
            ttl: 2,
            timestamp: now,
            data: Some("SomeData".into()),
            sortkey_timestamp: Some(now),
            ..Default::default()
        };
        let msg_id = test_notification.chidmessageid();

        debug!("Writing test notif");
        let saved = client.save_message(&uaid, test_notification.clone()).await;
        assert!(saved.is_ok());

        let key = client.message_key(&Uaid(&uaid), &msg_id);
        debug!("Checking {}...", &key);
        let before = client.fetch_message(&uaid, &msg_id).await?;
        assert!(!before.unwrap().is_empty());

        debug!("Purging...");
        client.increment_storage(&uaid, now + 2).await?;

        debug!("Checking {}...", &key);
        let after = client.fetch_message(&uaid, &msg_id).await;
        assert_eq!(after.unwrap(), None);

        assert!(client.remove_user(&uaid).await.is_ok());
        Ok(())
    }
}