autoendpoint/extractors/
notification.rs1use crate::error::{ApiError, ApiErrorKind, ApiResult};
2use crate::extractors::{
3 message_id::MessageId, notification_headers::NotificationHeaders, subscription::Subscription,
4};
5use crate::server::AppState;
6use actix_web::{dev::Payload, web, FromRequest, HttpRequest};
7use autopush_common::util::{b64_encode_url, ms_since_epoch, sec_since_epoch};
8use cadence::CountedExt;
9use fernet::MultiFernet;
10use futures::{future, FutureExt};
11use std::collections::HashMap;
12use uuid::Uuid;
13
14#[derive(Clone, Debug)]
16pub struct Notification {
17 pub message_id: String,
19 pub subscription: Subscription,
21 pub headers: NotificationHeaders,
23 pub timestamp: u64,
25 pub sort_key_timestamp: u64,
27 pub data: Option<String>,
29 #[cfg(feature = "reliable_report")]
30 pub reliable_state: Option<autopush_common::reliability::ReliabilityState>,
32 #[cfg(feature = "reliable_report")]
33 pub reliability_id: Option<String>,
34}
35
36impl FromRequest for Notification {
37 type Error = ApiError;
38 type Future = future::LocalBoxFuture<'static, Result<Self, Self::Error>>;
39
40 fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
41 let req = req.clone();
42 let mut payload = payload.take();
43
44 async move {
45 let subscription = Subscription::extract(&req).await?;
46 let app_state = web::Data::<AppState>::extract(&req)
47 .await
48 .expect("No server state found");
49
50 let data = web::Bytes::from_request(&req, &mut payload)
52 .await
53 .map_err(|e| {
54 debug!("▶▶ Request read payload error: {:?}", &e);
55 ApiErrorKind::PayloadError(e)
56 })?;
57
58 let data = if data.is_empty() {
60 None
61 } else {
62 Some(b64_encode_url(&data.to_vec()))
63 };
64
65 let headers = NotificationHeaders::from_request(&req, data.is_some())?;
66 let timestamp = sec_since_epoch();
67 let sort_key_timestamp = ms_since_epoch();
68 let message_id = Self::generate_message_id(
69 &app_state.fernet,
70 subscription.user.uaid,
71 subscription.channel_id,
72 headers.topic.as_deref(),
73 sort_key_timestamp,
74 );
75
76 #[cfg(feature = "reliable_report")]
77 let reliability_id = subscription.reliability_id.clone();
78
79 #[allow(unused_mut)]
80 let mut notif = Notification {
81 message_id,
82 subscription,
83 headers,
84 timestamp,
85 sort_key_timestamp,
86 data,
87 #[cfg(feature = "reliable_report")]
88 reliable_state: None,
89 #[cfg(feature = "reliable_report")]
90 reliability_id,
91 };
92
93 #[cfg(feature = "reliable_report")]
94 notif
96 .record_reliability(
97 &app_state.reliability,
98 autopush_common::reliability::ReliabilityState::Received,
99 )
100 .await;
101
102 if let Some(encoding) = ¬if.headers.encoding {
104 if notif.data.is_some() {
105 app_state
106 .metrics
107 .incr(&format!("updates.notification.encoding.{encoding}"))
108 .ok();
109 }
110 }
111
112 Ok(notif)
113 }
114 .boxed_local()
115 }
116}
117
118impl From<Notification> for autopush_common::notification::Notification {
119 fn from(notification: Notification) -> Self {
120 let topic = notification.headers.topic.clone();
121 let sortkey_timestamp = topic.is_none().then_some(notification.sort_key_timestamp);
122 autopush_common::notification::Notification {
123 channel_id: notification.subscription.channel_id,
124 version: notification.message_id,
125 ttl: notification.headers.ttl as u64,
126 topic,
127 timestamp: notification.timestamp,
128 data: notification.data,
129 sortkey_timestamp,
130 reliability_id: notification.subscription.reliability_id,
131 headers: {
132 let headers: HashMap<String, String> = notification.headers.into();
133 if headers.is_empty() {
134 None
135 } else {
136 Some(headers)
137 }
138 },
139 #[cfg(feature = "reliable_report")]
140 reliable_state: notification.reliable_state,
141 }
142 }
143}
144
145impl Notification {
146 fn generate_message_id(
157 fernet: &MultiFernet,
158 uaid: Uuid,
159 channel_id: Uuid,
160 topic: Option<&str>,
161 timestamp: u64,
162 ) -> String {
163 let message_id = if let Some(topic) = topic {
164 MessageId::WithTopic {
165 uaid,
166 channel_id,
167 topic: topic.to_string(),
168 }
169 } else {
170 MessageId::WithoutTopic {
171 uaid,
172 channel_id,
173 timestamp,
174 }
175 };
176
177 message_id.encrypt(fernet)
178 }
179
180 pub fn has_topic(&self) -> bool {
181 self.headers.topic.is_some()
182 }
183
184 pub fn serialize_for_delivery(&self) -> ApiResult<HashMap<&'static str, serde_json::Value>> {
191 let mut map = HashMap::new();
192
193 map.insert(
194 "channelID",
195 serde_json::to_value(self.subscription.channel_id)?,
196 );
197 map.insert("version", serde_json::to_value(&self.message_id)?);
198 map.insert("ttl", serde_json::to_value(self.headers.ttl)?);
199 map.insert("topic", serde_json::to_value(&self.headers.topic)?);
200 map.insert("timestamp", serde_json::to_value(self.timestamp)?);
201 #[cfg(feature = "reliable_report")]
202 {
203 if let Some(reliability_id) = &self.subscription.reliability_id {
204 map.insert("reliability_id", serde_json::to_value(reliability_id)?);
205 }
206 if let Some(reliable_state) = self.reliable_state {
207 map.insert(
208 "reliable_state",
209 serde_json::to_value(reliable_state.to_string())?,
210 );
211 }
212 }
213 if let Some(data) = &self.data {
214 map.insert("data", serde_json::to_value(data)?);
215
216 let headers: HashMap<_, _> = self.headers.clone().into();
217 map.insert("headers", serde_json::to_value(headers)?);
218 }
219
220 Ok(map)
221 }
222
223 #[cfg(feature = "reliable_report")]
224 pub async fn record_reliability(
225 &mut self,
226 reliability: &autopush_common::reliability::PushReliability,
227 state: autopush_common::reliability::ReliabilityState,
228 ) {
229 self.reliable_state = reliability
230 .record(
231 &self.reliability_id,
232 state,
233 &self.reliable_state,
234 Some(self.timestamp + self.headers.ttl as u64),
235 )
236 .await
237 .inspect_err(|e| {
238 warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
239 })
240 .unwrap_or(Some(state))
241 }
242}