autoconnect_settings/
app_state.rs1use std::{sync::Arc, time::Duration};
2
3#[cfg(feature = "bigtable")]
4use autopush_common::db::bigtable::BigTableClientImpl;
5#[cfg(feature = "postgres")]
6use autopush_common::db::postgres::PgClientImpl;
7#[cfg(feature = "redis")]
8use autopush_common::db::redis::RedisClientImpl;
9use cadence::StatsdClient;
10use config::ConfigError;
11use fernet::{Fernet, MultiFernet};
12use tokio::sync::RwLock;
13
14use autoconnect_common::{
15 broadcast::BroadcastChangeTracker, megaphone::init_and_spawn_megaphone_updater,
16 registry::ClientRegistry,
17};
18use autopush_common::db::{client::DbClient, DbSettings, StorageType};
19#[cfg(feature = "reliable_report")]
20use autopush_common::reliability::PushReliability;
21
22use crate::{Settings, ENV_PREFIX};
23
24#[derive(Clone)]
25pub struct AppState {
26 pub db: Box<dyn DbClient>,
28 pub metrics: Arc<StatsdClient>,
29 pub http: reqwest::Client,
30
31 pub fernet: MultiFernet,
33 pub clients: Arc<ClientRegistry>,
35 pub broadcaster: Arc<RwLock<BroadcastChangeTracker>>,
37
38 pub settings: Settings,
39 pub router_url: String,
40 pub endpoint_url: String,
41
42 #[cfg(feature = "reliable_report")]
43 pub reliability: Arc<PushReliability>,
44}
45
46impl AppState {
47 pub fn from_settings(settings: Settings) -> Result<Self, ConfigError> {
48 let crypto_key = &settings.crypto_key;
49 if !(crypto_key.starts_with('[') && crypto_key.ends_with(']')) {
50 return Err(ConfigError::Message(format!(
51 "Invalid {ENV_PREFIX}_CRYPTO_KEY"
52 )));
53 }
54 let crypto_key = &crypto_key[1..crypto_key.len() - 1];
55 debug!("🔐 Fernet keys: {:?}", &crypto_key);
56 let fernets: Vec<Fernet> = crypto_key
57 .split(',')
58 .map(|s| s.trim().to_string())
59 .map(|key| {
60 Fernet::new(&key).unwrap_or_else(|| panic!("Invalid {ENV_PREFIX}_CRYPTO_KEY"))
61 })
62 .collect();
63 let fernet = MultiFernet::new(fernets);
64 let metrics = autopush_common::metrics::builder(
65 &settings.statsd_label,
66 &settings.statsd_host,
67 settings.statsd_port,
68 )
69 .map_err(|e| ConfigError::Message(e.to_string()))?
70 .with_tag("autoconnect", "true")
72 .build();
73 let metrics = Arc::new(metrics);
74
75 let db_settings = DbSettings {
76 dsn: settings.db_dsn.clone(),
77 db_settings: settings.db_settings.clone(),
78 };
79 let storage_type = StorageType::from_dsn(&db_settings.dsn);
80
81 #[allow(unused)]
82 let db: Box<dyn DbClient> = match storage_type {
83 #[cfg(feature = "bigtable")]
84 StorageType::BigTable => {
85 let client = BigTableClientImpl::new(metrics.clone(), &db_settings)
86 .map_err(|e| ConfigError::Message(e.to_string()))?;
87 client.spawn_sweeper(Duration::from_secs(30));
88 Box::new(client)
89 }
90 #[cfg(feature = "postgres")]
91 StorageType::Postgres => {
92 let client = PgClientImpl::new(metrics.clone(), &db_settings)
93 .map_err(|e| ConfigError::Message(e.to_string()))?;
94 Box::new(client)
95 }
96 #[cfg(feature = "redis")]
97 StorageType::Redis => Box::new(
98 RedisClientImpl::new(metrics.clone(), &db_settings)
99 .map_err(|e| ConfigError::Message(e.to_string()))?,
100 ),
101 _ => panic!(
102 "Invalid Storage type {:?}. Check {}__DB_DSN.",
103 storage_type,
104 ENV_PREFIX.to_uppercase()
105 ),
106 };
107
108 #[cfg(feature = "reliable_report")]
109 let reliability = Arc::new(
110 PushReliability::new(
111 &settings.reliability_dsn,
112 db.clone(),
113 &metrics,
114 settings.reliability_retry_count,
115 )
116 .map_err(|e| {
117 ConfigError::Message(format!("Could not start Reliability connection: {e:?}"))
118 })?,
119 );
120 let http = reqwest::Client::builder()
121 .timeout(Duration::from_secs(1))
122 .build()
123 .unwrap_or_else(|e| panic!("Error while building reqwest::Client: {e}"));
124 let broadcaster = Arc::new(RwLock::new(BroadcastChangeTracker::new(Vec::new())));
125
126 let router_url = settings.router_url();
127 let endpoint_url = settings.endpoint_url();
128
129 Ok(Self {
130 db,
131 metrics,
132 http,
133 fernet,
134 clients: Arc::new(ClientRegistry::default()),
135 broadcaster,
136 settings,
137 router_url,
138 endpoint_url,
139 #[cfg(feature = "reliable_report")]
140 reliability,
141 })
142 }
143
144 pub async fn init_and_spawn_megaphone_updater(&self) -> Result<(), ConfigError> {
148 let Some(ref url) = self.settings.megaphone_api_url else {
149 return Ok(());
150 };
151 let Some(ref token) = self.settings.megaphone_api_token else {
152 return Err(ConfigError::Message(format!(
153 "{ENV_PREFIX}__MEGAPHONE_API_URL requires {ENV_PREFIX}__MEGAPHONE_API_TOKEN"
154 )));
155 };
156 init_and_spawn_megaphone_updater(
157 &self.broadcaster,
158 &self.http,
159 &self.metrics,
160 url,
161 token,
162 self.settings.megaphone_poll_interval,
163 )
164 .await
165 .map_err(|e| ConfigError::Message(e.to_string()))?;
166 Ok(())
167 }
168}
169
170#[cfg(debug_assertions)]
172impl Default for AppState {
173 fn default() -> Self {
174 Self::from_settings(Settings::test_settings()).unwrap()
175 }
176}