autoconnect_common/
broadcast.rs

1/// Megaphone is the system that allows us to inform clients that values have changed and need to be
2/// updated. These values are external to WebPush, but we can use the persistent, long lived WebPush
3/// connection that Autopush provides in order to send those change updates. We use the WebPush `Ping`
4/// in order to send Broadcast messages that contain these updates.
5///
6/// The remote Megaphone API server and the associated server access token are defined in the settings.
7///
8/// The Client has a list of `BroadcastKey` values that it monitors. The external system uses a set of Strings.
9/// We map the String to a `BroadcastKey` value.
10///
11/// see api discussion: https://docs.google.com/document/d/1Wxqf1a4HDkKgHDIswPmhmdvk8KPoMEh2q6SPhaz4LNE/edit#
12///
13use std::collections::HashMap;
14
15use serde_derive::{Deserialize, Serialize};
16use strum_macros::{AsRefStr, Display};
17
18use autopush_common::errors::{ApcErrorKind, Result};
19
20use crate::protocol::BroadcastValue;
21
22/// A Broadcast entry Key in a BroadcastRegistry
23/// This is the way that both the client and server identify a given Broadcast.
24type BroadcastKey = u32;
25
26/// Broadcast Subscriptions a client is subscribed to and the last change seen
27#[derive(Debug, Default)]
28pub struct BroadcastSubs {
29    broadcast_list: Vec<BroadcastKey>, // subscribed broadcast ids
30    change_count: u32,                 // the last known change
31}
32
33/// The server maintained list of Broadcasts
34#[derive(Debug)]
35struct BroadcastRegistry {
36    lookup: HashMap<String, BroadcastKey>, // mapping of broadcast string identifiers to internal BroadcastKeys
37    table: Vec<String>,                    // List of known broadcast string identifiers
38}
39
40impl BroadcastRegistry {
41    fn new() -> BroadcastRegistry {
42        BroadcastRegistry {
43            lookup: HashMap::new(),
44            table: Vec::new(),
45        }
46    }
47
48    /// Add's a new broadcast to the lookup table, returns the existing key if
49    /// the broadcast already exists
50    fn add_broadcast(&mut self, broadcast_id: String) -> BroadcastKey {
51        if let Some(v) = self.lookup.get(&broadcast_id) {
52            return *v;
53        }
54        let i = self.table.len() as u32;
55        self.table.push(broadcast_id.clone());
56        self.lookup.insert(broadcast_id, i);
57        i
58    }
59
60    fn lookup_id(&self, key: BroadcastKey) -> Option<String> {
61        self.table.get(key as usize).cloned()
62    }
63
64    fn lookup_key(&self, broadcast_id: &str) -> Option<BroadcastKey> {
65        self.lookup.get(broadcast_id).cloned()
66    }
67}
68
69/// An individual broadcast and the current change count
70#[derive(Debug)]
71struct BroadcastRevision {
72    change_count: u32,
73    broadcast: BroadcastKey,
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr, Display)]
77pub enum BroadcastErrorKind {
78    #[strum(serialize = "Broadcast not found")]
79    NotFound,
80}
81
82/// A provided Broadcast/Version used for `BroadcastSubsInit`, client comparisons, and outgoing
83/// deltas
84#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
85pub struct Broadcast {
86    broadcast_id: String,
87    version: String,
88}
89
90impl Broadcast {
91    /// Errors out a broadcast for broadcasts that weren't found
92    pub fn error(self) -> Broadcast {
93        Broadcast {
94            broadcast_id: self.broadcast_id,
95            version: BroadcastErrorKind::NotFound.to_string(),
96        }
97    }
98}
99
100// Handy From impls for common hashmap to/from conversions
101impl From<(String, String)> for Broadcast {
102    fn from(val: (String, String)) -> Broadcast {
103        Broadcast {
104            broadcast_id: val.0,
105            version: val.1,
106        }
107    }
108}
109
110impl From<Broadcast> for (String, BroadcastValue) {
111    fn from(bcast: Broadcast) -> (String, BroadcastValue) {
112        (bcast.broadcast_id, BroadcastValue::Value(bcast.version))
113    }
114}
115
116impl Broadcast {
117    pub fn from_hashmap(val: HashMap<String, String>) -> Vec<Broadcast> {
118        val.into_iter().map(|v| v.into()).collect()
119    }
120
121    pub fn vec_into_hashmap(broadcasts: Vec<Broadcast>) -> HashMap<String, BroadcastValue> {
122        broadcasts.into_iter().map(|v| v.into()).collect()
123    }
124}
125
126/// Return to `Server::broadcast_init` the result of the first delta call for a client
127/// given a full list of broadcast id's and versions.
128///
129/// NOTE: this type is broken down immediately.
130#[derive(Debug)]
131pub struct BroadcastSubsInit(
132    pub BroadcastSubs,  // client provided list of subscriptions
133    pub Vec<Broadcast>, // server provided list of string IDs and versions
134);
135
136/// BroadcastChangeTracker tracks the broadcasts, their change_count, and the
137/// broadcast lookup registry
138#[derive(Debug)]
139pub struct BroadcastChangeTracker {
140    broadcast_list: Vec<BroadcastRevision>,
141    broadcast_registry: BroadcastRegistry,
142    broadcast_versions: HashMap<BroadcastKey, String>,
143    change_count: u32,
144}
145
146impl BroadcastChangeTracker {
147    /// Creates a new `BroadcastChangeTracker` initialized with the provided `broadcasts`.
148    pub fn new(broadcasts: Vec<Broadcast>) -> BroadcastChangeTracker {
149        let mut tracker = BroadcastChangeTracker {
150            broadcast_list: Vec::new(),
151            broadcast_registry: BroadcastRegistry::new(),
152            broadcast_versions: HashMap::new(),
153            change_count: 0,
154        };
155        for srv in broadcasts {
156            let key = tracker.broadcast_registry.add_broadcast(srv.broadcast_id);
157            tracker.broadcast_versions.insert(key, srv.version);
158        }
159        tracker
160    }
161
162    /// Add a `Vec` of `Broadcast`s via `self.add_broadcast`
163    ///
164    /// Returning the latest change_count (or `None` for an empty `Vec`)
165    pub fn add_broadcasts(&mut self, broadcasts: Vec<Broadcast>) -> Option<u32> {
166        let mut change_count = None;
167        for broadcast in broadcasts {
168            change_count.replace(self.add_broadcast(broadcast));
169        }
170        change_count
171    }
172
173    /// Add a new broadcast to the BroadcastChangeTracker, triggering a change_count increase.
174    /// Note: If the broadcast already exists, it will be updated instead.
175    pub fn add_broadcast(&mut self, broadcast: Broadcast) -> u32 {
176        if let Ok(change_count) = self.update_broadcast(broadcast.clone()) {
177            trace!("📢 returning change count {}", &change_count);
178            return change_count;
179        }
180        self.change_count += 1;
181        let key = self
182            .broadcast_registry
183            .add_broadcast(broadcast.broadcast_id);
184        self.broadcast_versions.insert(key, broadcast.version);
185        self.broadcast_list.push(BroadcastRevision {
186            change_count: self.change_count,
187            broadcast: key,
188        });
189        self.change_count
190    }
191
192    /// Update a `broadcast` to a new revision, triggering a change_count increase.
193    ///
194    /// Returns an error if the `broadcast` was never initialized/added.
195    pub fn update_broadcast(&mut self, broadcast: Broadcast) -> Result<u32> {
196        let b_id = broadcast.broadcast_id.clone();
197        let old_count = self.change_count;
198        let key = self
199            .broadcast_registry
200            .lookup_key(&broadcast.broadcast_id)
201            .ok_or(ApcErrorKind::BroadcastError(
202                BroadcastErrorKind::NotFound.to_string(),
203            ))?;
204
205        if let Some(ver) = self.broadcast_versions.get_mut(&key) {
206            if *ver == broadcast.version {
207                return Ok(self.change_count);
208            }
209            *ver = broadcast.version;
210        } else {
211            trace!("📢 Not found: {b_id}");
212            return Err(
213                ApcErrorKind::BroadcastError(BroadcastErrorKind::NotFound.to_string()).into(),
214            );
215        }
216
217        trace!("📢 New version of {b_id}");
218        // Check to see if this broadcast has been updated since initialization
219        let bcast_index = self
220            .broadcast_list
221            .iter()
222            .enumerate()
223            .filter_map(|(i, bcast)| (bcast.broadcast == key).then_some(i))
224            .next();
225        self.change_count += 1;
226        if let Some(bcast_index) = bcast_index {
227            trace!("📢  {} index: {}", &b_id, &bcast_index);
228            let mut bcast = self.broadcast_list.remove(bcast_index);
229            bcast.change_count = self.change_count;
230            self.broadcast_list.push(bcast);
231        } else {
232            trace!("📢 adding broadcast list for {}", &b_id);
233            self.broadcast_list.push(BroadcastRevision {
234                change_count: self.change_count,
235                broadcast: key,
236            })
237        }
238        if old_count != self.change_count {
239            trace!("📢 New Change available");
240        }
241        Ok(self.change_count)
242    }
243
244    /// Returns the new broadcast versions since the provided `client_set`.
245    pub fn change_count_delta(&self, client_set: &mut BroadcastSubs) -> Option<Vec<Broadcast>> {
246        if self.change_count <= client_set.change_count {
247            return None;
248        }
249        let mut bcast_delta = Vec::new();
250        for bcast in self.broadcast_list.iter().rev() {
251            if bcast.change_count <= client_set.change_count {
252                break;
253            }
254            if !client_set.broadcast_list.contains(&bcast.broadcast) {
255                continue;
256            }
257            if let Some(ver) = self.broadcast_versions.get(&bcast.broadcast) {
258                if let Some(bcast_id) = self.broadcast_registry.lookup_id(bcast.broadcast) {
259                    bcast_delta.push(Broadcast {
260                        broadcast_id: bcast_id,
261                        version: (*ver).clone(),
262                    });
263                }
264            }
265        }
266        client_set.change_count = self.change_count;
267        (!bcast_delta.is_empty()).then_some(bcast_delta)
268    }
269
270    /// Returns a delta for `broadcasts` that are out of date with the latest version and a
271    /// the collection of broadcast subscriptions.
272    pub fn broadcast_delta(&self, broadcasts: &[Broadcast]) -> BroadcastSubsInit {
273        let mut bcast_list = Vec::new();
274        let mut bcast_delta = Vec::new();
275        for bcast in broadcasts.iter() {
276            if let Some(bcast_key) = self.broadcast_registry.lookup_key(&bcast.broadcast_id) {
277                if let Some(ver) = self.broadcast_versions.get(&bcast_key) {
278                    if *ver != bcast.version {
279                        bcast_delta.push(Broadcast {
280                            broadcast_id: bcast.broadcast_id.clone(),
281                            version: (*ver).clone(),
282                        });
283                    }
284                }
285                bcast_list.push(bcast_key);
286            }
287        }
288        BroadcastSubsInit(
289            BroadcastSubs {
290                broadcast_list: bcast_list,
291                change_count: self.change_count,
292            },
293            bcast_delta,
294        )
295    }
296
297    /// Update a `BroadcastSubs` to account for new broadcasts.
298    ///
299    /// Returns broadcasts that have changed.
300    pub fn subscribe_to_broadcasts(
301        &self,
302        broadcast_subs: &mut BroadcastSubs,
303        broadcasts: &[Broadcast],
304    ) -> Option<Vec<Broadcast>> {
305        let mut bcast_delta = self.change_count_delta(broadcast_subs).unwrap_or_default();
306        for bcast in broadcasts.iter() {
307            if let Some(bcast_key) = self.broadcast_registry.lookup_key(&bcast.broadcast_id) {
308                if let Some(ver) = self.broadcast_versions.get(&bcast_key) {
309                    if *ver != bcast.version {
310                        bcast_delta.push(Broadcast {
311                            broadcast_id: bcast.broadcast_id.clone(),
312                            version: (*ver).clone(),
313                        });
314                    }
315                }
316                broadcast_subs.broadcast_list.push(bcast_key)
317            }
318        }
319        (!bcast_delta.is_empty()).then_some(bcast_delta)
320    }
321
322    /// Check a broadcast list and return unknown broadcast id's with their appropriate error
323    pub fn missing_broadcasts(&self, broadcasts: &[Broadcast]) -> Vec<Broadcast> {
324        broadcasts
325            .iter()
326            .filter(|&b| {
327                self.broadcast_registry
328                    .lookup_key(&b.broadcast_id)
329                    .is_none()
330            })
331            .map(|b| b.clone().error())
332            .collect()
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339
340    fn make_broadcast_base() -> Vec<Broadcast> {
341        vec![
342            Broadcast {
343                broadcast_id: String::from("bcasta"),
344                version: String::from("rev1"),
345            },
346            Broadcast {
347                broadcast_id: String::from("bcastb"),
348                version: String::from("revalha"),
349            },
350        ]
351    }
352
353    #[test]
354    fn test_broadcast_change_tracker() {
355        let broadcasts = make_broadcast_base();
356        let desired_broadcasts = broadcasts.clone();
357        let mut tracker = BroadcastChangeTracker::new(broadcasts);
358        let BroadcastSubsInit(mut broadcast_subs, delta) =
359            tracker.broadcast_delta(&desired_broadcasts);
360        assert_eq!(delta.len(), 0);
361        assert_eq!(broadcast_subs.change_count, 0);
362        assert_eq!(broadcast_subs.broadcast_list.len(), 2);
363
364        tracker
365            .update_broadcast(Broadcast {
366                broadcast_id: String::from("bcasta"),
367                version: String::from("rev2"),
368            })
369            .ok();
370        let delta = tracker.change_count_delta(&mut broadcast_subs);
371        assert!(delta.is_some());
372        let delta = delta.unwrap();
373        assert_eq!(delta.len(), 1);
374    }
375
376    #[test]
377    fn test_broadcast_change_handles_new_broadcasts() {
378        let broadcasts = make_broadcast_base();
379        let desired_broadcasts = broadcasts.clone();
380        let mut tracker = BroadcastChangeTracker::new(broadcasts);
381        let BroadcastSubsInit(mut broadcast_subs, _) = tracker.broadcast_delta(&desired_broadcasts);
382
383        tracker.add_broadcast(Broadcast {
384            broadcast_id: String::from("bcastc"),
385            version: String::from("revmega"),
386        });
387        let delta = tracker.change_count_delta(&mut broadcast_subs);
388        assert!(delta.is_none());
389
390        let delta = tracker
391            .subscribe_to_broadcasts(
392                &mut broadcast_subs,
393                &[Broadcast {
394                    broadcast_id: String::from("bcastc"),
395                    version: String::from("revision_alpha"),
396                }],
397            )
398            .unwrap();
399        assert_eq!(delta.len(), 1);
400        assert_eq!(delta[0].version, String::from("revmega"));
401        assert_eq!(broadcast_subs.change_count, 1);
402        assert_eq!(tracker.broadcast_list.len(), 1);
403    }
404}