autoconnect_settings/
app_state.rs

1use std::{sync::Arc, time::Duration};
2
3#[cfg(feature = "bigtable")]
4use autopush_common::db::bigtable::BigTableClientImpl;
5#[cfg(feature = "redis")]
6use autopush_common::db::redis::RedisClientImpl;
7use cadence::StatsdClient;
8use config::ConfigError;
9use fernet::{Fernet, MultiFernet};
10use tokio::sync::RwLock;
11
12use autoconnect_common::{
13    broadcast::BroadcastChangeTracker, megaphone::init_and_spawn_megaphone_updater,
14    registry::ClientRegistry,
15};
16use autopush_common::db::{client::DbClient, DbSettings, StorageType};
17#[cfg(feature = "reliable_report")]
18use autopush_common::reliability::PushReliability;
19
20use crate::{Settings, ENV_PREFIX};
21
22#[derive(Clone)]
23pub struct AppState {
24    /// Handle to the data storage object
25    pub db: Box<dyn DbClient>,
26    pub metrics: Arc<StatsdClient>,
27    pub http: reqwest::Client,
28
29    /// Encryption object for the endpoint URL
30    pub fernet: MultiFernet,
31    /// The connected WebSocket clients
32    pub clients: Arc<ClientRegistry>,
33    /// The Megaphone Broadcast change tracker
34    pub broadcaster: Arc<RwLock<BroadcastChangeTracker>>,
35
36    pub settings: Settings,
37    pub router_url: String,
38    pub endpoint_url: String,
39
40    #[cfg(feature = "reliable_report")]
41    pub reliability: Arc<PushReliability>,
42}
43
44impl AppState {
45    pub fn from_settings(settings: Settings) -> Result<Self, ConfigError> {
46        let crypto_key = &settings.crypto_key;
47        if !(crypto_key.starts_with('[') && crypto_key.ends_with(']')) {
48            return Err(ConfigError::Message(format!(
49                "Invalid {ENV_PREFIX}_CRYPTO_KEY"
50            )));
51        }
52        let crypto_key = &crypto_key[1..crypto_key.len() - 1];
53        debug!("🔐 Fernet keys: {:?}", &crypto_key);
54        let fernets: Vec<Fernet> = crypto_key
55            .split(',')
56            .map(|s| s.trim().to_string())
57            .map(|key| {
58                Fernet::new(&key).unwrap_or_else(|| panic!("Invalid {ENV_PREFIX}_CRYPTO_KEY"))
59            })
60            .collect();
61        let fernet = MultiFernet::new(fernets);
62        let metrics = autopush_common::metrics::builder(
63            &settings.statsd_label,
64            &settings.statsd_host,
65            settings.statsd_port,
66        )
67        .map_err(|e| ConfigError::Message(e.to_string()))?
68        // Temporary tag to distinguish from the legacy autopush(connect)
69        .with_tag("autoconnect", "true")
70        .build();
71        let metrics = Arc::new(metrics);
72
73        let db_settings = DbSettings {
74            dsn: settings.db_dsn.clone(),
75            db_settings: settings.db_settings.clone(),
76        };
77        let storage_type = StorageType::from_dsn(&db_settings.dsn);
78
79        #[allow(unused)]
80        let db: Box<dyn DbClient> = match storage_type {
81            #[cfg(feature = "bigtable")]
82            StorageType::BigTable => {
83                let client = BigTableClientImpl::new(metrics.clone(), &db_settings)
84                    .map_err(|e| ConfigError::Message(e.to_string()))?;
85                client.spawn_sweeper(Duration::from_secs(30));
86                Box::new(client)
87            }
88            #[cfg(feature = "redis")]
89            StorageType::Redis => Box::new(
90                RedisClientImpl::new(metrics.clone(), &db_settings)
91                    .map_err(|e| ConfigError::Message(e.to_string()))?,
92            ),
93            _ => panic!(
94                "Invalid Storage type {:?}. Check {}__DB_DSN.",
95                storage_type,
96                ENV_PREFIX.to_uppercase()
97            ),
98        };
99
100        #[cfg(feature = "reliable_report")]
101        let reliability = Arc::new(
102            PushReliability::new(
103                &settings.reliability_dsn,
104                db.clone(),
105                &metrics,
106                settings.reliability_retry_count,
107            )
108            .map_err(|e| {
109                ConfigError::Message(format!("Could not start Reliability connection: {e:?}"))
110            })?,
111        );
112        let http = reqwest::Client::builder()
113            .timeout(Duration::from_secs(1))
114            .build()
115            .unwrap_or_else(|e| panic!("Error while building reqwest::Client: {e}"));
116        let broadcaster = Arc::new(RwLock::new(BroadcastChangeTracker::new(Vec::new())));
117
118        let router_url = settings.router_url();
119        let endpoint_url = settings.endpoint_url();
120
121        Ok(Self {
122            db,
123            metrics,
124            http,
125            fernet,
126            clients: Arc::new(ClientRegistry::default()),
127            broadcaster,
128            settings,
129            router_url,
130            endpoint_url,
131            #[cfg(feature = "reliable_report")]
132            reliability,
133        })
134    }
135
136    /// Initialize the `BroadcastChangeTracker`
137    ///
138    /// Via `autoconnect_common::megaphone::init_and_spawn_megaphone_updater`
139    pub async fn init_and_spawn_megaphone_updater(&self) -> Result<(), ConfigError> {
140        let Some(ref url) = self.settings.megaphone_api_url else {
141            return Ok(());
142        };
143        let Some(ref token) = self.settings.megaphone_api_token else {
144            return Err(ConfigError::Message(format!(
145                "{ENV_PREFIX}__MEGAPHONE_API_URL requires {ENV_PREFIX}__MEGAPHONE_API_TOKEN"
146            )));
147        };
148        init_and_spawn_megaphone_updater(
149            &self.broadcaster,
150            &self.http,
151            &self.metrics,
152            url,
153            token,
154            self.settings.megaphone_poll_interval,
155        )
156        .await
157        .map_err(|e| ConfigError::Message(e.to_string()))?;
158        Ok(())
159    }
160}
161
162/// For tests
163#[cfg(debug_assertions)]
164impl Default for AppState {
165    fn default() -> Self {
166        Self::from_settings(Settings::test_settings()).unwrap()
167    }
168}