autoconnect_common/
megaphone.rs

1use std::{collections::HashMap, error::Error, io, sync::Arc, time::Duration};
2
3use actix_web::rt;
4use autopush_common::metric_name::MetricName;
5use autopush_common::metrics::StatsdClientExt;
6use cadence::StatsdClient;
7use serde_derive::Deserialize;
8use tokio::sync::RwLock;
9
10use crate::broadcast::{Broadcast, BroadcastChangeTracker};
11
12/// The payload provided by the Megaphone service
13#[derive(Deserialize)]
14pub struct MegaphoneResponse {
15    pub broadcasts: HashMap<String, String>,
16}
17
18/// Initialize the `BroadcastChangeTracker`
19///
20/// Immediately populates it with the current Broadcasts polled from the
21/// Megaphone service, then spawns a background task to periodically refresh
22/// it
23pub async fn init_and_spawn_megaphone_updater(
24    broadcaster: &Arc<RwLock<BroadcastChangeTracker>>,
25    http: &reqwest::Client,
26    metrics: &Arc<StatsdClient>,
27    url: &str,
28    token: &str,
29    poll_interval: Duration,
30) -> reqwest::Result<()> {
31    updater(broadcaster, http, url, token).await?;
32
33    let broadcaster = Arc::clone(broadcaster);
34    let http = http.clone();
35    let metrics = Arc::clone(metrics);
36    let url = url.to_owned();
37    let token = token.to_owned();
38    rt::spawn(async move {
39        loop {
40            rt::time::sleep(poll_interval).await;
41            if let Err(e) = updater(&broadcaster, &http, &url, &token).await {
42                report_updater_error(&metrics, e);
43            } else {
44                metrics
45                    .incr_with_tags(MetricName::MegaphoneUpdaterOk)
46                    .send();
47            }
48        }
49    });
50
51    Ok(())
52}
53
54/// Emits a log, metric and Sentry event depending on the type of Error
55fn report_updater_error(metrics: &Arc<StatsdClient>, err: reqwest::Error) {
56    let reason = if err.is_timeout() {
57        "timeout"
58    } else if err.is_connect() {
59        "connect"
60    } else if is_io(&err) {
61        "io"
62    } else {
63        "unknown"
64    };
65    metrics
66        .incr_with_tags(MetricName::MegaphoneUpdaterError)
67        .with_tag("reason", reason)
68        .send();
69    if reason == "unknown" {
70        error!("📢megaphone::updater failed: {}", err);
71        sentry::capture_event(sentry::event_from_error(&err));
72    } else {
73        trace!("📢megaphone::updater failed (reason: {}): {}", reason, err);
74    }
75}
76
77/// Refresh the `BroadcastChangeTracker`'s Broadcasts from the Megaphone service
78async fn updater(
79    broadcaster: &Arc<RwLock<BroadcastChangeTracker>>,
80    http: &reqwest::Client,
81    url: &str,
82    token: &str,
83) -> reqwest::Result<()> {
84    trace!("📢megaphone::updater");
85    let MegaphoneResponse { broadcasts } = http
86        .get(url)
87        .header("Authorization", token)
88        .send()
89        .await?
90        .error_for_status()?
91        .json()
92        .await?;
93    let broadcasts = Broadcast::from_hashmap(broadcasts);
94    if !broadcasts.is_empty() {
95        let change_count = broadcaster.write().await.add_broadcasts(broadcasts);
96        trace!("📢 add_broadcast change_count: {:?}", change_count);
97    }
98    Ok(())
99}
100
101/// Determine if a source of [reqwest::Error] was a [hyper::Error] Io Error
102fn is_io(err: &reqwest::Error) -> bool {
103    let mut source = err.source();
104    while let Some(err) = source {
105        if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
106            if is_hyper_io(hyper_err) {
107                return true;
108            }
109        }
110        source = err.source();
111    }
112    false
113}
114
115/// Determine if a source of [hyper::Error] was an [io::Error]
116fn is_hyper_io(err: &hyper::Error) -> bool {
117    let mut source = err.source();
118    while let Some(err) = source {
119        if err.downcast_ref::<io::Error>().is_some() {
120            return true;
121        }
122        source = err.source();
123    }
124    false
125}