1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::{sync::Arc, time::Duration};

#[cfg(feature = "bigtable")]
use autopush_common::db::bigtable::BigTableClientImpl;
use cadence::StatsdClient;
use config::ConfigError;
use fernet::{Fernet, MultiFernet};
use tokio::sync::RwLock;

use autoconnect_common::{
    broadcast::BroadcastChangeTracker, megaphone::init_and_spawn_megaphone_updater,
    registry::ClientRegistry,
};
use autopush_common::db::{client::DbClient, DbSettings, StorageType};

use crate::{Settings, ENV_PREFIX};

/// Shared application state, constructed by [`AppState::from_settings`].
#[derive(Clone)]
pub struct AppState {
    /// Handle to the data storage object
    pub db: Box<dyn DbClient>,
    /// Statsd metrics client, shared behind an `Arc`
    pub metrics: Arc<StatsdClient>,
    /// HTTP client; also handed to the Megaphone updater when it is spawned
    pub http: reqwest::Client,

    /// Encryption object for the endpoint URL
    pub fernet: MultiFernet,
    /// The connected WebSocket clients
    pub clients: Arc<ClientRegistry>,
    /// The Megaphone Broadcast change tracker
    pub broadcaster: Arc<RwLock<BroadcastChangeTracker>>,

    /// The `Settings` this state was built from
    pub settings: Settings,
    /// Value of `Settings::router_url()` captured when the state was built
    pub router_url: String,
    /// Value of `Settings::endpoint_url()` captured when the state was built
    pub endpoint_url: String,
}

impl AppState {
    pub fn from_settings(settings: Settings) -> Result<Self, ConfigError> {
        let crypto_key = &settings.crypto_key;
        if !(crypto_key.starts_with('[') && crypto_key.ends_with(']')) {
            return Err(ConfigError::Message(format!(
                "Invalid {ENV_PREFIX}_CRYPTO_KEY"
            )));
        }
        let crypto_key = &crypto_key[1..crypto_key.len() - 1];
        debug!("🔐 Fernet keys: {:?}", &crypto_key);
        let fernets: Vec<Fernet> = crypto_key
            .split(',')
            .map(|s| s.trim().to_string())
            .map(|key| {
                Fernet::new(&key).unwrap_or_else(|| panic!("Invalid {ENV_PREFIX}_CRYPTO_KEY"))
            })
            .collect();
        let fernet = MultiFernet::new(fernets);
        let metrics = autopush_common::metrics::builder(
            &settings.statsd_label,
            &settings.statsd_host,
            settings.statsd_port,
        )
        .map_err(|e| ConfigError::Message(e.to_string()))?
        // Temporary tag to distinguish from the legacy autopush(connect)
        .with_tag("autoconnect", "true")
        .build();
        let metrics = Arc::new(metrics);

        let db_settings = DbSettings {
            dsn: settings.db_dsn.clone(),
            db_settings: settings.db_settings.clone(),
        };
        let storage_type = StorageType::from_dsn(&db_settings.dsn);
        #[allow(unused)]
        let db: Box<dyn DbClient> = match storage_type {
            #[cfg(feature = "bigtable")]
            StorageType::BigTable => {
                let client = BigTableClientImpl::new(metrics.clone(), &db_settings)
                    .map_err(|e| ConfigError::Message(e.to_string()))?;
                client.spawn_sweeper(Duration::from_secs(30));
                Box::new(client)
            }
            _ => panic!(
                "Invalid Storage type {:?}. Check {}__DB_DSN.",
                storage_type,
                ENV_PREFIX.to_uppercase()
            ),
        };
        let http = reqwest::Client::builder()
            .timeout(Duration::from_secs(1))
            .build()
            .unwrap_or_else(|e| panic!("Error while building reqwest::Client: {}", e));
        let broadcaster = Arc::new(RwLock::new(BroadcastChangeTracker::new(Vec::new())));

        let router_url = settings.router_url();
        let endpoint_url = settings.endpoint_url();

        Ok(Self {
            db,
            metrics,
            http,
            fernet,
            clients: Arc::new(ClientRegistry::default()),
            broadcaster,
            settings,
            router_url,
            endpoint_url,
        })
    }

    /// Initialize the `BroadcastChangeTracker`
    ///
    /// Via `autoconnect_common::megaphone::init_and_spawn_megaphone_updater`
    pub async fn init_and_spawn_megaphone_updater(&self) -> Result<(), ConfigError> {
        let Some(ref url) = self.settings.megaphone_api_url else {
            return Ok(());
        };
        let Some(ref token) = self.settings.megaphone_api_token else {
            return Err(ConfigError::Message(format!(
                "{ENV_PREFIX}__MEGAPHONE_API_URL requires {ENV_PREFIX}__MEGAPHONE_API_TOKEN"
            )));
        };
        init_and_spawn_megaphone_updater(
            &self.broadcaster,
            &self.http,
            &self.metrics,
            url,
            token,
            self.settings.megaphone_poll_interval,
        )
        .await
        .map_err(|e| ConfigError::Message(e.to_string()))?;
        Ok(())
    }
}

/// For tests
#[cfg(debug_assertions)]
impl Default for AppState {
    fn default() -> Self {
        Self::from_settings(Settings::test_settings()).unwrap()
    }
}