autoconnect_common/
megaphone.rs1use std::{collections::HashMap, error::Error, io, sync::Arc, time::Duration};
2
3use actix_web::rt;
4use autopush_common::metric_name::MetricName;
5use autopush_common::metrics::StatsdClientExt;
6use cadence::StatsdClient;
7use serde_derive::Deserialize;
8use tokio::sync::RwLock;
9
10use crate::broadcast::{Broadcast, BroadcastChangeTracker};
11
12#[derive(Deserialize)]
14pub struct MegaphoneResponse {
15 pub broadcasts: HashMap<String, String>,
16}
17
18pub async fn init_and_spawn_megaphone_updater(
24 broadcaster: &Arc<RwLock<BroadcastChangeTracker>>,
25 http: &reqwest::Client,
26 metrics: &Arc<StatsdClient>,
27 url: &str,
28 poll_interval: Duration,
29) -> reqwest::Result<()> {
30 updater(broadcaster, http, url).await?;
31
32 let broadcaster = Arc::clone(broadcaster);
33 let http = http.clone();
34 let metrics = Arc::clone(metrics);
35 let url = url.to_owned();
36 rt::spawn(async move {
37 loop {
38 rt::time::sleep(poll_interval).await;
39 if let Err(e) = updater(&broadcaster, &http, &url).await {
40 report_updater_error(&metrics, e);
41 } else {
42 metrics
43 .incr_with_tags(MetricName::MegaphoneUpdaterOk)
44 .send();
45 }
46 }
47 });
48
49 Ok(())
50}
51
52fn report_updater_error(metrics: &Arc<StatsdClient>, err: reqwest::Error) {
54 let reason = if err.is_timeout() {
55 "timeout"
56 } else if err.is_connect() {
57 "connect"
58 } else if is_io(&err) {
59 "io"
60 } else {
61 "unknown"
62 };
63 metrics
64 .incr_with_tags(MetricName::MegaphoneUpdaterError)
65 .with_tag("reason", reason)
66 .send();
67 if reason == "unknown" {
68 error!("📢megaphone::updater failed: {}", err);
69 sentry::capture_event(sentry::event_from_error(&err));
70 } else {
71 trace!("📢megaphone::updater failed (reason: {}): {}", reason, err);
72 }
73}
74
75async fn updater(
77 broadcaster: &Arc<RwLock<BroadcastChangeTracker>>,
78 http: &reqwest::Client,
79 url: &str,
80) -> reqwest::Result<()> {
81 trace!("📢megaphone::updater");
82 let MegaphoneResponse { broadcasts } = http
83 .get(url)
84 .send()
85 .await?
86 .error_for_status()?
87 .json()
88 .await?;
89 let broadcasts = Broadcast::from_hashmap(broadcasts);
90 if !broadcasts.is_empty() {
91 let change_count = broadcaster.write().await.add_broadcasts(broadcasts);
92 trace!("📢 add_broadcast change_count: {:?}", change_count);
93 }
94 Ok(())
95}
96
97fn is_io(err: &reqwest::Error) -> bool {
99 let mut source = err.source();
100 while let Some(err) = source {
101 if let Some(hyper_err) = err.downcast_ref::<hyper::Error>()
102 && is_hyper_io(hyper_err)
103 {
104 return true;
105 }
106 source = err.source();
107 }
108 false
109}
110
111fn is_hyper_io(err: &hyper::Error) -> bool {
113 let mut source = err.source();
114 while let Some(err) = source {
115 if err.downcast_ref::<io::Error>().is_some() {
116 return true;
117 }
118 source = err.source();
119 }
120 false
121}