1use std::collections::HashMap;
14
15use serde_derive::{Deserialize, Serialize};
16use strum_macros::{AsRefStr, Display};
17
18use autopush_common::errors::{ApcErrorKind, Result};
19
20use crate::protocol::BroadcastValue;
21
/// Compact numeric handle standing in for a broadcast id string.
///
/// Keys are handed out by `BroadcastRegistry::add_broadcast`.
type BroadcastKey = u32;
25
26#[derive(Debug, Default)]
28pub struct BroadcastSubs {
29 broadcast_list: Vec<BroadcastKey>, change_count: u32, }
32
33#[derive(Debug)]
35struct BroadcastRegistry {
36 lookup: HashMap<String, BroadcastKey>, table: Vec<String>, }
39
40impl BroadcastRegistry {
41 fn new() -> BroadcastRegistry {
42 BroadcastRegistry {
43 lookup: HashMap::new(),
44 table: Vec::new(),
45 }
46 }
47
48 fn add_broadcast(&mut self, broadcast_id: String) -> BroadcastKey {
51 if let Some(v) = self.lookup.get(&broadcast_id) {
52 return *v;
53 }
54 let i = self.table.len() as u32;
55 self.table.push(broadcast_id.clone());
56 self.lookup.insert(broadcast_id, i);
57 i
58 }
59
60 fn lookup_id(&self, key: BroadcastKey) -> Option<String> {
61 self.table.get(key as usize).cloned()
62 }
63
64 fn lookup_key(&self, broadcast_id: &str) -> Option<BroadcastKey> {
65 self.lookup.get(broadcast_id).cloned()
66 }
67}
68
69#[derive(Debug)]
71struct BroadcastRevision {
72 change_count: u32,
73 broadcast: BroadcastKey,
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr, Display)]
77pub enum BroadcastErrorKind {
78 #[strum(serialize = "Broadcast not found")]
79 NotFound,
80}
81
82#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
85pub struct Broadcast {
86 broadcast_id: String,
87 version: String,
88}
89
90impl Broadcast {
91 pub fn error(self) -> Broadcast {
93 Broadcast {
94 broadcast_id: self.broadcast_id,
95 version: BroadcastErrorKind::NotFound.to_string(),
96 }
97 }
98}
99
100impl From<(String, String)> for Broadcast {
102 fn from(val: (String, String)) -> Broadcast {
103 Broadcast {
104 broadcast_id: val.0,
105 version: val.1,
106 }
107 }
108}
109
110impl From<Broadcast> for (String, BroadcastValue) {
111 fn from(bcast: Broadcast) -> (String, BroadcastValue) {
112 (bcast.broadcast_id, BroadcastValue::Value(bcast.version))
113 }
114}
115
116impl Broadcast {
117 pub fn from_hashmap(val: HashMap<String, String>) -> Vec<Broadcast> {
118 val.into_iter().map(|v| v.into()).collect()
119 }
120
121 pub fn vec_into_hashmap(broadcasts: Vec<Broadcast>) -> HashMap<String, BroadcastValue> {
122 broadcasts.into_iter().map(|v| v.into()).collect()
123 }
124}
125
126#[derive(Debug)]
131pub struct BroadcastSubsInit(
132 pub BroadcastSubs, pub Vec<Broadcast>, );
135
136#[derive(Debug)]
139pub struct BroadcastChangeTracker {
140 broadcast_list: Vec<BroadcastRevision>,
141 broadcast_registry: BroadcastRegistry,
142 broadcast_versions: HashMap<BroadcastKey, String>,
143 change_count: u32,
144}
145
146impl BroadcastChangeTracker {
147 pub fn new(broadcasts: Vec<Broadcast>) -> BroadcastChangeTracker {
149 let mut tracker = BroadcastChangeTracker {
150 broadcast_list: Vec::new(),
151 broadcast_registry: BroadcastRegistry::new(),
152 broadcast_versions: HashMap::new(),
153 change_count: 0,
154 };
155 for srv in broadcasts {
156 let key = tracker.broadcast_registry.add_broadcast(srv.broadcast_id);
157 tracker.broadcast_versions.insert(key, srv.version);
158 }
159 tracker
160 }
161
162 pub fn add_broadcasts(&mut self, broadcasts: Vec<Broadcast>) -> Option<u32> {
166 let mut change_count = None;
167 for broadcast in broadcasts {
168 change_count.replace(self.add_broadcast(broadcast));
169 }
170 change_count
171 }
172
173 pub fn add_broadcast(&mut self, broadcast: Broadcast) -> u32 {
176 if let Ok(change_count) = self.update_broadcast(broadcast.clone()) {
177 trace!("📢 returning change count {}", &change_count);
178 return change_count;
179 }
180 self.change_count += 1;
181 let key = self
182 .broadcast_registry
183 .add_broadcast(broadcast.broadcast_id);
184 self.broadcast_versions.insert(key, broadcast.version);
185 self.broadcast_list.push(BroadcastRevision {
186 change_count: self.change_count,
187 broadcast: key,
188 });
189 self.change_count
190 }
191
192 pub fn update_broadcast(&mut self, broadcast: Broadcast) -> Result<u32> {
196 let b_id = broadcast.broadcast_id.clone();
197 let old_count = self.change_count;
198 let key = self
199 .broadcast_registry
200 .lookup_key(&broadcast.broadcast_id)
201 .ok_or(ApcErrorKind::BroadcastError(
202 BroadcastErrorKind::NotFound.to_string(),
203 ))?;
204
205 if let Some(ver) = self.broadcast_versions.get_mut(&key) {
206 if *ver == broadcast.version {
207 return Ok(self.change_count);
208 }
209 *ver = broadcast.version;
210 } else {
211 trace!("📢 Not found: {b_id}");
212 return Err(
213 ApcErrorKind::BroadcastError(BroadcastErrorKind::NotFound.to_string()).into(),
214 );
215 }
216
217 trace!("📢 New version of {b_id}");
218 let bcast_index = self
220 .broadcast_list
221 .iter()
222 .enumerate()
223 .filter_map(|(i, bcast)| (bcast.broadcast == key).then_some(i))
224 .next();
225 self.change_count += 1;
226 if let Some(bcast_index) = bcast_index {
227 trace!("📢 {} index: {}", &b_id, &bcast_index);
228 let mut bcast = self.broadcast_list.remove(bcast_index);
229 bcast.change_count = self.change_count;
230 self.broadcast_list.push(bcast);
231 } else {
232 trace!("📢 adding broadcast list for {}", &b_id);
233 self.broadcast_list.push(BroadcastRevision {
234 change_count: self.change_count,
235 broadcast: key,
236 })
237 }
238 if old_count != self.change_count {
239 trace!("📢 New Change available");
240 }
241 Ok(self.change_count)
242 }
243
244 pub fn change_count_delta(&self, client_set: &mut BroadcastSubs) -> Option<Vec<Broadcast>> {
246 if self.change_count <= client_set.change_count {
247 return None;
248 }
249 let mut bcast_delta = Vec::new();
250 for bcast in self.broadcast_list.iter().rev() {
251 if bcast.change_count <= client_set.change_count {
252 break;
253 }
254 if !client_set.broadcast_list.contains(&bcast.broadcast) {
255 continue;
256 }
257 if let Some(ver) = self.broadcast_versions.get(&bcast.broadcast) {
258 if let Some(bcast_id) = self.broadcast_registry.lookup_id(bcast.broadcast) {
259 bcast_delta.push(Broadcast {
260 broadcast_id: bcast_id,
261 version: (*ver).clone(),
262 });
263 }
264 }
265 }
266 client_set.change_count = self.change_count;
267 (!bcast_delta.is_empty()).then_some(bcast_delta)
268 }
269
270 pub fn broadcast_delta(&self, broadcasts: &[Broadcast]) -> BroadcastSubsInit {
273 let mut bcast_list = Vec::new();
274 let mut bcast_delta = Vec::new();
275 for bcast in broadcasts.iter() {
276 if let Some(bcast_key) = self.broadcast_registry.lookup_key(&bcast.broadcast_id) {
277 if let Some(ver) = self.broadcast_versions.get(&bcast_key) {
278 if *ver != bcast.version {
279 bcast_delta.push(Broadcast {
280 broadcast_id: bcast.broadcast_id.clone(),
281 version: (*ver).clone(),
282 });
283 }
284 }
285 bcast_list.push(bcast_key);
286 }
287 }
288 BroadcastSubsInit(
289 BroadcastSubs {
290 broadcast_list: bcast_list,
291 change_count: self.change_count,
292 },
293 bcast_delta,
294 )
295 }
296
297 pub fn subscribe_to_broadcasts(
301 &self,
302 broadcast_subs: &mut BroadcastSubs,
303 broadcasts: &[Broadcast],
304 ) -> Option<Vec<Broadcast>> {
305 let mut bcast_delta = self.change_count_delta(broadcast_subs).unwrap_or_default();
306 for bcast in broadcasts.iter() {
307 if let Some(bcast_key) = self.broadcast_registry.lookup_key(&bcast.broadcast_id) {
308 if let Some(ver) = self.broadcast_versions.get(&bcast_key) {
309 if *ver != bcast.version {
310 bcast_delta.push(Broadcast {
311 broadcast_id: bcast.broadcast_id.clone(),
312 version: (*ver).clone(),
313 });
314 }
315 }
316 broadcast_subs.broadcast_list.push(bcast_key)
317 }
318 }
319 (!bcast_delta.is_empty()).then_some(bcast_delta)
320 }
321
322 pub fn missing_broadcasts(&self, broadcasts: &[Broadcast]) -> Vec<Broadcast> {
324 broadcasts
325 .iter()
326 .filter(|&b| {
327 self.broadcast_registry
328 .lookup_key(&b.broadcast_id)
329 .is_none()
330 })
331 .map(|b| b.clone().error())
332 .collect()
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Two-broadcast baseline shared by the tests below.
    fn make_broadcast_base() -> Vec<Broadcast> {
        vec![
            Broadcast {
                broadcast_id: String::from("bcasta"),
                version: String::from("rev1"),
            },
            Broadcast {
                broadcast_id: String::from("bcastb"),
                version: String::from("revalha"),
            },
        ]
    }

    /// A client in sync with the baseline gets no initial delta, then sees
    /// exactly the one broadcast that is updated afterwards.
    #[test]
    fn test_broadcast_change_tracker() {
        let broadcasts = make_broadcast_base();
        let desired_broadcasts = broadcasts.clone();
        let mut tracker = BroadcastChangeTracker::new(broadcasts);
        let BroadcastSubsInit(mut broadcast_subs, delta) =
            tracker.broadcast_delta(&desired_broadcasts);
        assert_eq!(delta.len(), 0);
        assert_eq!(broadcast_subs.change_count, 0);
        assert_eq!(broadcast_subs.broadcast_list.len(), 2);

        // Check the update result rather than discarding it: a failed update
        // would otherwise only surface as a confusing missing-delta failure.
        let updated = tracker.update_broadcast(Broadcast {
            broadcast_id: String::from("bcasta"),
            version: String::from("rev2"),
        });
        assert!(
            matches!(updated, Ok(1)),
            "updating a known broadcast to a new version should bump the change count to 1"
        );

        let delta = tracker
            .change_count_delta(&mut broadcast_subs)
            .expect("subscribed client should be told about the updated broadcast");
        assert_eq!(delta.len(), 1);
        assert_eq!(delta[0].broadcast_id, "bcasta");
        assert_eq!(delta[0].version, "rev2");
        assert_eq!(broadcast_subs.change_count, 1);
    }

    /// A broadcast added after the client connected is not reported until
    /// the client subscribes to it.
    #[test]
    fn test_broadcast_change_handles_new_broadcasts() {
        let broadcasts = make_broadcast_base();
        let desired_broadcasts = broadcasts.clone();
        let mut tracker = BroadcastChangeTracker::new(broadcasts);
        let BroadcastSubsInit(mut broadcast_subs, _) = tracker.broadcast_delta(&desired_broadcasts);

        tracker.add_broadcast(Broadcast {
            broadcast_id: String::from("bcastc"),
            version: String::from("revmega"),
        });
        let delta = tracker.change_count_delta(&mut broadcast_subs);
        assert!(delta.is_none());

        let delta = tracker
            .subscribe_to_broadcasts(
                &mut broadcast_subs,
                &[Broadcast {
                    broadcast_id: String::from("bcastc"),
                    version: String::from("revision_alpha"),
                }],
            )
            .unwrap();
        assert_eq!(delta.len(), 1);
        assert_eq!(delta[0].version, String::from("revmega"));
        assert_eq!(broadcast_subs.change_count, 1);
        assert_eq!(tracker.broadcast_list.len(), 1);
    }
}