autoconnect_settings/
app_state.rs

1use std::{sync::Arc, time::Duration};
2
3#[cfg(feature = "bigtable")]
4use autopush_common::db::bigtable::BigTableClientImpl;
5#[cfg(feature = "postgres")]
6use autopush_common::db::postgres::PgClientImpl;
7#[cfg(feature = "redis")]
8use autopush_common::db::redis::RedisClientImpl;
9use cadence::StatsdClient;
10use config::ConfigError;
11use fernet::{Fernet, MultiFernet};
12use tokio::sync::RwLock;
13
14use autoconnect_common::{
15    broadcast::BroadcastChangeTracker, megaphone::init_and_spawn_megaphone_updater,
16    registry::ClientRegistry,
17};
18use autopush_common::db::{client::DbClient, DbSettings, StorageType};
19#[cfg(feature = "reliable_report")]
20use autopush_common::reliability::PushReliability;
21
22use crate::{Settings, ENV_PREFIX};
23
24#[derive(Clone)]
25pub struct AppState {
26    /// Handle to the data storage object
27    pub db: Box<dyn DbClient>,
28    pub metrics: Arc<StatsdClient>,
29    pub http: reqwest::Client,
30
31    /// Encryption object for the endpoint URL
32    pub fernet: MultiFernet,
33    /// The connected WebSocket clients
34    pub clients: Arc<ClientRegistry>,
35    /// The Megaphone Broadcast change tracker
36    pub broadcaster: Arc<RwLock<BroadcastChangeTracker>>,
37
38    pub settings: Settings,
39    pub router_url: String,
40    pub endpoint_url: String,
41
42    #[cfg(feature = "reliable_report")]
43    pub reliability: Arc<PushReliability>,
44}
45
46impl AppState {
47    pub fn from_settings(settings: Settings) -> Result<Self, ConfigError> {
48        let crypto_key = &settings.crypto_key;
49        if !(crypto_key.starts_with('[') && crypto_key.ends_with(']')) {
50            return Err(ConfigError::Message(format!(
51                "Invalid {ENV_PREFIX}_CRYPTO_KEY"
52            )));
53        }
54        let crypto_key = &crypto_key[1..crypto_key.len() - 1];
55        debug!("🔐 Fernet keys: {:?}", &crypto_key);
56        let fernets: Vec<Fernet> = crypto_key
57            .split(',')
58            .map(|s| s.trim().to_string())
59            .map(|key| {
60                Fernet::new(&key).unwrap_or_else(|| panic!("Invalid {ENV_PREFIX}_CRYPTO_KEY"))
61            })
62            .collect();
63        let fernet = MultiFernet::new(fernets);
64        let metrics = autopush_common::metrics::builder(
65            &settings.statsd_label,
66            &settings.statsd_host,
67            settings.statsd_port,
68        )
69        .map_err(|e| ConfigError::Message(e.to_string()))?
70        // Temporary tag to distinguish from the legacy autopush(connect)
71        .with_tag("autoconnect", "true")
72        .build();
73        let metrics = Arc::new(metrics);
74
75        let db_settings = DbSettings {
76            dsn: settings.db_dsn.clone(),
77            db_settings: settings.db_settings.clone(),
78        };
79        let storage_type = StorageType::from_dsn(&db_settings.dsn);
80
81        #[allow(unused)]
82        let db: Box<dyn DbClient> = match storage_type {
83            #[cfg(feature = "bigtable")]
84            StorageType::BigTable => {
85                let client = BigTableClientImpl::new(metrics.clone(), &db_settings)
86                    .map_err(|e| ConfigError::Message(e.to_string()))?;
87                client.spawn_sweeper(Duration::from_secs(30));
88                Box::new(client)
89            }
90            #[cfg(feature = "postgres")]
91            StorageType::Postgres => {
92                let client = PgClientImpl::new(metrics.clone(), &db_settings)
93                    .map_err(|e| ConfigError::Message(e.to_string()))?;
94                Box::new(client)
95            }
96            #[cfg(feature = "redis")]
97            StorageType::Redis => Box::new(
98                RedisClientImpl::new(metrics.clone(), &db_settings)
99                    .map_err(|e| ConfigError::Message(e.to_string()))?,
100            ),
101            _ => panic!(
102                "Invalid Storage type {:?}. Check {}__DB_DSN.",
103                storage_type,
104                ENV_PREFIX.to_uppercase()
105            ),
106        };
107
108        #[cfg(feature = "reliable_report")]
109        let reliability = Arc::new(
110            PushReliability::new(
111                &settings.reliability_dsn,
112                db.clone(),
113                &metrics,
114                settings.reliability_retry_count,
115            )
116            .map_err(|e| {
117                ConfigError::Message(format!("Could not start Reliability connection: {e:?}"))
118            })?,
119        );
120        let http = reqwest::Client::builder()
121            .timeout(Duration::from_secs(1))
122            .build()
123            .unwrap_or_else(|e| panic!("Error while building reqwest::Client: {e}"));
124        let broadcaster = Arc::new(RwLock::new(BroadcastChangeTracker::new(Vec::new())));
125
126        let router_url = settings.router_url();
127        let endpoint_url = settings.endpoint_url();
128
129        Ok(Self {
130            db,
131            metrics,
132            http,
133            fernet,
134            clients: Arc::new(ClientRegistry::default()),
135            broadcaster,
136            settings,
137            router_url,
138            endpoint_url,
139            #[cfg(feature = "reliable_report")]
140            reliability,
141        })
142    }
143
144    /// Initialize the `BroadcastChangeTracker`
145    ///
146    /// Via `autoconnect_common::megaphone::init_and_spawn_megaphone_updater`
147    pub async fn init_and_spawn_megaphone_updater(&self) -> Result<(), ConfigError> {
148        let Some(ref url) = self.settings.megaphone_api_url else {
149            return Ok(());
150        };
151        let Some(ref token) = self.settings.megaphone_api_token else {
152            return Err(ConfigError::Message(format!(
153                "{ENV_PREFIX}__MEGAPHONE_API_URL requires {ENV_PREFIX}__MEGAPHONE_API_TOKEN"
154            )));
155        };
156        init_and_spawn_megaphone_updater(
157            &self.broadcaster,
158            &self.http,
159            &self.metrics,
160            url,
161            token,
162            self.settings.megaphone_poll_interval,
163        )
164        .await
165        .map_err(|e| ConfigError::Message(e.to_string()))?;
166        Ok(())
167    }
168}
169
170/// For tests
171#[cfg(debug_assertions)]
172impl Default for AppState {
173    fn default() -> Self {
174        Self::from_settings(Settings::test_settings()).unwrap()
175    }
176}