autoconnect_common/
megaphone.rs

1use std::{collections::HashMap, error::Error, io, sync::Arc, time::Duration};
2
3use actix_web::rt;
4use autopush_common::metric_name::MetricName;
5use autopush_common::metrics::StatsdClientExt;
6use cadence::StatsdClient;
7use serde_derive::Deserialize;
8use tokio::sync::RwLock;
9
10use crate::broadcast::{Broadcast, BroadcastChangeTracker};
11
12/// The payload provided by the Megaphone service
13#[derive(Deserialize)]
14pub struct MegaphoneResponse {
15    pub broadcasts: HashMap<String, String>,
16}
17
18/// Initialize the `BroadcastChangeTracker`
19///
20/// Immediately populates it with the current Broadcasts polled from the
21/// Megaphone service, then spawns a background task to periodically refresh
22/// it
23pub async fn init_and_spawn_megaphone_updater(
24    broadcaster: &Arc<RwLock<BroadcastChangeTracker>>,
25    http: &reqwest::Client,
26    metrics: &Arc<StatsdClient>,
27    url: &str,
28    poll_interval: Duration,
29) -> reqwest::Result<()> {
30    updater(broadcaster, http, url).await?;
31
32    let broadcaster = Arc::clone(broadcaster);
33    let http = http.clone();
34    let metrics = Arc::clone(metrics);
35    let url = url.to_owned();
36    rt::spawn(async move {
37        loop {
38            rt::time::sleep(poll_interval).await;
39            if let Err(e) = updater(&broadcaster, &http, &url).await {
40                report_updater_error(&metrics, e);
41            } else {
42                metrics
43                    .incr_with_tags(MetricName::MegaphoneUpdaterOk)
44                    .send();
45            }
46        }
47    });
48
49    Ok(())
50}
51
52/// Emits a log, metric and Sentry event depending on the type of Error
53fn report_updater_error(metrics: &Arc<StatsdClient>, err: reqwest::Error) {
54    let reason = if err.is_timeout() {
55        "timeout"
56    } else if err.is_connect() {
57        "connect"
58    } else if is_io(&err) {
59        "io"
60    } else {
61        "unknown"
62    };
63    metrics
64        .incr_with_tags(MetricName::MegaphoneUpdaterError)
65        .with_tag("reason", reason)
66        .send();
67    if reason == "unknown" {
68        error!("📢megaphone::updater failed: {}", err);
69        sentry::capture_event(sentry::event_from_error(&err));
70    } else {
71        trace!("📢megaphone::updater failed (reason: {}): {}", reason, err);
72    }
73}
74
75/// Refresh the `BroadcastChangeTracker`'s Broadcasts from the Megaphone service
76async fn updater(
77    broadcaster: &Arc<RwLock<BroadcastChangeTracker>>,
78    http: &reqwest::Client,
79    url: &str,
80) -> reqwest::Result<()> {
81    trace!("📢megaphone::updater");
82    let MegaphoneResponse { broadcasts } = http
83        .get(url)
84        .send()
85        .await?
86        .error_for_status()?
87        .json()
88        .await?;
89    let broadcasts = Broadcast::from_hashmap(broadcasts);
90    if !broadcasts.is_empty() {
91        let change_count = broadcaster.write().await.add_broadcasts(broadcasts);
92        trace!("📢 add_broadcast change_count: {:?}", change_count);
93    }
94    Ok(())
95}
96
97/// Determine if a source of [reqwest::Error] was a [hyper::Error] Io Error
98fn is_io(err: &reqwest::Error) -> bool {
99    let mut source = err.source();
100    while let Some(err) = source {
101        if let Some(hyper_err) = err.downcast_ref::<hyper::Error>()
102            && is_hyper_io(hyper_err)
103        {
104            return true;
105        }
106        source = err.source();
107    }
108    false
109}
110
111/// Determine if a source of [hyper::Error] was an [io::Error]
112fn is_hyper_io(err: &hyper::Error) -> bool {
113    let mut source = err.source();
114    while let Some(err) = source {
115        if err.downcast_ref::<io::Error>().is_some() {
116            return true;
117        }
118        source = err.source();
119    }
120    false
121}