autoconnect_settings/
app_state.rs

1use std::{sync::Arc, time::Duration};
2
3#[cfg(feature = "bigtable")]
4use autopush_common::db::bigtable::BigTableClientImpl;
5use cadence::StatsdClient;
6use config::ConfigError;
7use fernet::{Fernet, MultiFernet};
8use tokio::sync::RwLock;
9
10use autoconnect_common::{
11    broadcast::BroadcastChangeTracker, megaphone::init_and_spawn_megaphone_updater,
12    registry::ClientRegistry,
13};
14use autopush_common::db::{client::DbClient, DbSettings, StorageType};
15#[cfg(feature = "reliable_report")]
16use autopush_common::reliability::PushReliability;
17
18use crate::{Settings, ENV_PREFIX};
19
20#[derive(Clone)]
21pub struct AppState {
22    /// Handle to the data storage object
23    pub db: Box<dyn DbClient>,
24    pub metrics: Arc<StatsdClient>,
25    pub http: reqwest::Client,
26
27    /// Encryption object for the endpoint URL
28    pub fernet: MultiFernet,
29    /// The connected WebSocket clients
30    pub clients: Arc<ClientRegistry>,
31    /// The Megaphone Broadcast change tracker
32    pub broadcaster: Arc<RwLock<BroadcastChangeTracker>>,
33
34    pub settings: Settings,
35    pub router_url: String,
36    pub endpoint_url: String,
37
38    #[cfg(feature = "reliable_report")]
39    pub reliability: Arc<PushReliability>,
40}
41
42impl AppState {
43    pub fn from_settings(settings: Settings) -> Result<Self, ConfigError> {
44        let crypto_key = &settings.crypto_key;
45        if !(crypto_key.starts_with('[') && crypto_key.ends_with(']')) {
46            return Err(ConfigError::Message(format!(
47                "Invalid {ENV_PREFIX}_CRYPTO_KEY"
48            )));
49        }
50        let crypto_key = &crypto_key[1..crypto_key.len() - 1];
51        debug!("🔐 Fernet keys: {:?}", &crypto_key);
52        let fernets: Vec<Fernet> = crypto_key
53            .split(',')
54            .map(|s| s.trim().to_string())
55            .map(|key| {
56                Fernet::new(&key).unwrap_or_else(|| panic!("Invalid {ENV_PREFIX}_CRYPTO_KEY"))
57            })
58            .collect();
59        let fernet = MultiFernet::new(fernets);
60        let metrics = autopush_common::metrics::builder(
61            &settings.statsd_label,
62            &settings.statsd_host,
63            settings.statsd_port,
64        )
65        .map_err(|e| ConfigError::Message(e.to_string()))?
66        // Temporary tag to distinguish from the legacy autopush(connect)
67        .with_tag("autoconnect", "true")
68        .build();
69        let metrics = Arc::new(metrics);
70
71        let db_settings = DbSettings {
72            dsn: settings.db_dsn.clone(),
73            db_settings: settings.db_settings.clone(),
74        };
75        let storage_type = StorageType::from_dsn(&db_settings.dsn);
76
77        #[allow(unused)]
78        let db: Box<dyn DbClient> = match storage_type {
79            #[cfg(feature = "bigtable")]
80            StorageType::BigTable => {
81                let client = BigTableClientImpl::new(metrics.clone(), &db_settings)
82                    .map_err(|e| ConfigError::Message(e.to_string()))?;
83                client.spawn_sweeper(Duration::from_secs(30));
84                Box::new(client)
85            }
86            _ => panic!(
87                "Invalid Storage type {:?}. Check {}__DB_DSN.",
88                storage_type,
89                ENV_PREFIX.to_uppercase()
90            ),
91        };
92
93        #[cfg(feature = "reliable_report")]
94        let reliability = Arc::new(
95            PushReliability::new(
96                &settings.reliability_dsn,
97                db.clone(),
98                &metrics,
99                settings.reliability_retry_count,
100            )
101            .map_err(|e| {
102                ConfigError::Message(format!("Could not start Reliability connection: {e:?}"))
103            })?,
104        );
105        let http = reqwest::Client::builder()
106            .timeout(Duration::from_secs(1))
107            .build()
108            .unwrap_or_else(|e| panic!("Error while building reqwest::Client: {e}"));
109        let broadcaster = Arc::new(RwLock::new(BroadcastChangeTracker::new(Vec::new())));
110
111        let router_url = settings.router_url();
112        let endpoint_url = settings.endpoint_url();
113
114        Ok(Self {
115            db,
116            metrics,
117            http,
118            fernet,
119            clients: Arc::new(ClientRegistry::default()),
120            broadcaster,
121            settings,
122            router_url,
123            endpoint_url,
124            #[cfg(feature = "reliable_report")]
125            reliability,
126        })
127    }
128
129    /// Initialize the `BroadcastChangeTracker`
130    ///
131    /// Via `autoconnect_common::megaphone::init_and_spawn_megaphone_updater`
132    pub async fn init_and_spawn_megaphone_updater(&self) -> Result<(), ConfigError> {
133        let Some(ref url) = self.settings.megaphone_api_url else {
134            return Ok(());
135        };
136        let Some(ref token) = self.settings.megaphone_api_token else {
137            return Err(ConfigError::Message(format!(
138                "{ENV_PREFIX}__MEGAPHONE_API_URL requires {ENV_PREFIX}__MEGAPHONE_API_TOKEN"
139            )));
140        };
141        init_and_spawn_megaphone_updater(
142            &self.broadcaster,
143            &self.http,
144            &self.metrics,
145            url,
146            token,
147            self.settings.megaphone_poll_interval,
148        )
149        .await
150        .map_err(|e| ConfigError::Message(e.to_string()))?;
151        Ok(())
152    }
153}
154
155/// For tests
156#[cfg(debug_assertions)]
157impl Default for AppState {
158    fn default() -> Self {
159        Self::from_settings(Settings::test_settings()).unwrap()
160    }
161}