autoconnect_common/
megaphone.rs1use std::{collections::HashMap, error::Error, io, sync::Arc, time::Duration};
2
3use actix_web::rt;
4use autopush_common::metric_name::MetricName;
5use autopush_common::metrics::StatsdClientExt;
6use cadence::StatsdClient;
7use serde_derive::Deserialize;
8use tokio::sync::RwLock;
9
10use crate::broadcast::{Broadcast, BroadcastChangeTracker};
11
12#[derive(Deserialize)]
14pub struct MegaphoneResponse {
15 pub broadcasts: HashMap<String, String>,
16}
17
18pub async fn init_and_spawn_megaphone_updater(
24 broadcaster: &Arc<RwLock<BroadcastChangeTracker>>,
25 http: &reqwest::Client,
26 metrics: &Arc<StatsdClient>,
27 url: &str,
28 token: &str,
29 poll_interval: Duration,
30) -> reqwest::Result<()> {
31 updater(broadcaster, http, url, token).await?;
32
33 let broadcaster = Arc::clone(broadcaster);
34 let http = http.clone();
35 let metrics = Arc::clone(metrics);
36 let url = url.to_owned();
37 let token = token.to_owned();
38 rt::spawn(async move {
39 loop {
40 rt::time::sleep(poll_interval).await;
41 if let Err(e) = updater(&broadcaster, &http, &url, &token).await {
42 report_updater_error(&metrics, e);
43 } else {
44 metrics
45 .incr_with_tags(MetricName::MegaphoneUpdaterOk)
46 .send();
47 }
48 }
49 });
50
51 Ok(())
52}
53
54fn report_updater_error(metrics: &Arc<StatsdClient>, err: reqwest::Error) {
56 let reason = if err.is_timeout() {
57 "timeout"
58 } else if err.is_connect() {
59 "connect"
60 } else if is_io(&err) {
61 "io"
62 } else {
63 "unknown"
64 };
65 metrics
66 .incr_with_tags(MetricName::MegaphoneUpdaterError)
67 .with_tag("reason", reason)
68 .send();
69 if reason == "unknown" {
70 error!("📢megaphone::updater failed: {}", err);
71 sentry::capture_event(sentry::event_from_error(&err));
72 } else {
73 trace!("📢megaphone::updater failed (reason: {}): {}", reason, err);
74 }
75}
76
77async fn updater(
79 broadcaster: &Arc<RwLock<BroadcastChangeTracker>>,
80 http: &reqwest::Client,
81 url: &str,
82 token: &str,
83) -> reqwest::Result<()> {
84 trace!("📢megaphone::updater");
85 let MegaphoneResponse { broadcasts } = http
86 .get(url)
87 .header("Authorization", token)
88 .send()
89 .await?
90 .error_for_status()?
91 .json()
92 .await?;
93 let broadcasts = Broadcast::from_hashmap(broadcasts);
94 if !broadcasts.is_empty() {
95 let change_count = broadcaster.write().await.add_broadcasts(broadcasts);
96 trace!("📢 add_broadcast change_count: {:?}", change_count);
97 }
98 Ok(())
99}
100
101fn is_io(err: &reqwest::Error) -> bool {
103 let mut source = err.source();
104 while let Some(err) = source {
105 if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
106 if is_hyper_io(hyper_err) {
107 return true;
108 }
109 }
110 source = err.source();
111 }
112 false
113}
114
115fn is_hyper_io(err: &hyper::Error) -> bool {
117 let mut source = err.source();
118 while let Some(err) = source {
119 if err.downcast_ref::<io::Error>().is_some() {
120 return true;
121 }
122 source = err.source();
123 }
124 false
125}