// autoconnect_settings/lib.rs

1mod app_state;
2
3extern crate slog;
4#[macro_use]
5extern crate slog_scope;
6extern crate serde_derive;
7
8use std::{io, net::ToSocketAddrs, time::Duration};
9
10use config::{Config, ConfigError, Environment, File};
11use fernet::Fernet;
12use lazy_static::lazy_static;
13use serde::{Deserialize, Deserializer};
14use serde_json::json;
15
16use autopush_common::util::deserialize_u32_to_duration;
17
18pub use app_state::AppState;
19
/// Prefix for environment-variable overrides of [`Settings`] fields. It is
/// upper-cased and joined to the setting name with `__` when the environment is
/// read (e.g. `AUTOCONNECT__PORT`).
pub const ENV_PREFIX: &str = "autoconnect";
21
22lazy_static! {
23    static ref HOSTNAME: String = mozsvc_common::get_hostname()
24        .expect("Couldn't get_hostname")
25        .into_string()
26        .expect("Couldn't convert get_hostname");
27    static ref RESOLVED_HOSTNAME: String = resolve_ip(&HOSTNAME)
28        .unwrap_or_else(|_| panic!("Failed to resolve hostname: {}", *HOSTNAME));
29}
30
/// Look up `hostname` and return the IP address of the first socket address
/// found, rendered as a string.
///
/// If the lookup succeeds but yields no addresses, `hostname` itself is
/// returned unchanged.
///
/// # Errors
/// Propagates the `io::Error` produced by the address lookup.
fn resolve_ip(hostname: &str) -> io::Result<String> {
    // `to_socket_addrs` needs a port; 0 is a placeholder since only the IP is kept.
    let mut candidates = (hostname, 0).to_socket_addrs()?;
    let resolved = match candidates.next() {
        Some(addr) => addr.ip().to_string(),
        None => hostname.to_owned(),
    };
    Ok(resolved)
}
38
/// Report whether `port` has to be written out in a URL that uses `scheme`.
///
/// Returns `false` only for the default pairings (`http` with 80, `https`
/// with 443); every other combination returns `true`.
fn include_port(scheme: &str, port: u16) -> bool {
    let is_default_port = matches!((scheme, port), ("http", 80) | ("https", 443));
    !is_default_port
}
43
/// The application settings for autoconnect, loaded from settings files and
/// environment overrides (see `Settings::with_env_and_config_files`). These are
/// later converted to an [`AppState`].
///
/// Because of `#[serde(default)]`, any field missing from the input falls back
/// to the value from the `Default` implementation.
#[derive(Clone, Debug, Deserialize)]
#[serde(default)]
pub struct Settings {
    /// The application port to listen on
    pub port: u16,
    /// The DNS name of the application host, used for internal routing. When
    /// unset, the machine's own hostname is used instead.
    pub hostname: Option<String>,
    /// Whether to resolve the hostname used for internal routing (`hostname`,
    /// or the machine's own hostname when that is unset) to an IP address.
    pub resolve_hostname: bool,
    /// The internal webpush routing port
    pub router_port: u16,
    /// The DNS name to use for internal routing. When set, it is used for the
    /// router URL instead of `hostname`.
    pub router_hostname: Option<String>,
    /// The server based ping interval (also used for Broadcast sends).
    /// Given in seconds, fractions allowed; must not be zero.
    #[serde(deserialize_with = "deserialize_f64_to_duration")]
    pub auto_ping_interval: Duration,
    /// How long to wait for a Pong response before the connection is timed out
    /// and dropped. Given in seconds, fractions allowed; must not be zero.
    #[serde(deserialize_with = "deserialize_f64_to_duration")]
    pub auto_ping_timeout: Duration,
    /// How long to wait for the initial connection handshake.
    #[serde(deserialize_with = "deserialize_u32_to_duration")]
    pub open_handshake_timeout: Duration,
    /// How long to wait while closing a connection for the response handshake.
    #[serde(deserialize_with = "deserialize_u32_to_duration")]
    pub close_handshake_timeout: Duration,
    /// The URL scheme (http/https) for the endpoint URL
    pub endpoint_scheme: String,
    /// The host name for the endpoint URL (differs from `hostname` and `router_hostname`)
    pub endpoint_hostname: String,
    /// The port for the endpoint URL (left out of the URL when it is the
    /// default port for `endpoint_scheme`)
    pub endpoint_port: u16,
    /// The seed key to use for endpoint encryption
    pub crypto_key: String,
    /// The host name to send recorded metrics to
    pub statsd_host: Option<String>,
    /// The port number to send recorded metrics to
    pub statsd_port: u16,
    /// The root label to apply to metrics.
    pub statsd_label: String,
    /// The DSN to connect to the storage engine (Used to select between storage systems)
    pub db_dsn: Option<String>,
    /// JSON set of specific database settings (See data storage engines)
    pub db_settings: String,
    /// Server endpoint to pull Broadcast ID change values (Sent in Pings)
    pub megaphone_api_url: Option<String>,
    /// Broadcast token for authentication
    pub megaphone_api_token: Option<String>,
    /// How often to poll the server for new data; must not be zero.
    #[serde(deserialize_with = "deserialize_u32_to_duration")]
    pub megaphone_poll_interval: Duration,
    /// Use human readable (simplified, non-JSON) logging
    pub human_logs: bool,
    /// Maximum allowed number of backlogged messages. Exceeding this number will
    /// trigger a user reset because the user may have been offline way too long.
    pub msg_limit: u32,
    /// Sets the maximum number of concurrent connections per actix-web worker.
    ///
    /// All socket listeners will stop accepting connections when this limit is
    /// reached for each worker.
    pub actix_max_connections: Option<usize>,
    /// Sets number of actix-web workers to start (per bind address).
    ///
    /// By default, the number of available physical CPUs is used as the worker count.
    pub actix_workers: Option<usize>,
    #[cfg(feature = "reliable_report")]
    /// The DSN for the reliability data store. This is normally a Redis compatible
    /// storage system. See [Connection Parameters](https://docs.rs/redis/latest/redis/#connection-parameters)
    /// for details.
    pub reliability_dsn: Option<String>,
    #[cfg(feature = "reliable_report")]
    /// Max number of retries for Redis transactions
    pub reliability_retry_count: usize,
}
120
121impl Default for Settings {
122    fn default() -> Self {
123        Self {
124            port: 8080,
125            hostname: None,
126            resolve_hostname: false,
127            router_port: 8081,
128            router_hostname: None,
129            auto_ping_interval: Duration::from_secs(300),
130            auto_ping_timeout: Duration::from_secs(4),
131            open_handshake_timeout: Duration::from_secs(5),
132            close_handshake_timeout: Duration::from_secs(0),
133            endpoint_scheme: "http".to_owned(),
134            endpoint_hostname: "localhost".to_owned(),
135            endpoint_port: 8082,
136            crypto_key: format!("[{}]", Fernet::generate_key()),
137            statsd_host: Some("localhost".to_owned()),
138            // Matches the legacy value
139            statsd_label: "autoconnect".to_owned(),
140            statsd_port: 8125,
141            db_dsn: None,
142            db_settings: "".to_owned(),
143            megaphone_api_url: None,
144            megaphone_api_token: None,
145            megaphone_poll_interval: Duration::from_secs(30),
146            human_logs: false,
147            msg_limit: 150,
148            actix_max_connections: None,
149            actix_workers: None,
150            #[cfg(feature = "reliable_report")]
151            reliability_dsn: None,
152            #[cfg(feature = "reliable_report")]
153            reliability_retry_count: autopush_common::redis_util::MAX_TRANSACTION_LOOP,
154        }
155    }
156}
157
158impl Settings {
159    /// Load the settings from the config files in order first then the environment.
160    pub fn with_env_and_config_files(filenames: &[String]) -> Result<Self, ConfigError> {
161        let mut s = Config::builder();
162
163        // Merge the configs from the files
164        for filename in filenames {
165            s = s.add_source(File::with_name(filename));
166        }
167
168        // Merge the environment overrides
169        s = s.add_source(Environment::with_prefix(&ENV_PREFIX.to_uppercase()).separator("__"));
170
171        let built = s.build()?;
172        let s = built.try_deserialize::<Settings>()?;
173        s.validate()?;
174        Ok(s)
175    }
176
177    pub fn router_url(&self) -> String {
178        let router_scheme = "http";
179        let url = format!(
180            "{}://{}",
181            router_scheme,
182            self.router_hostname
183                .as_ref()
184                .map_or_else(|| self.get_hostname(), String::clone),
185        );
186        if include_port(router_scheme, self.router_port) {
187            format!("{}:{}", url, self.router_port)
188        } else {
189            url
190        }
191    }
192
193    pub fn endpoint_url(&self) -> String {
194        let url = format!("{}://{}", self.endpoint_scheme, self.endpoint_hostname,);
195        if include_port(&self.endpoint_scheme, self.endpoint_port) {
196            format!("{}:{}", url, self.endpoint_port)
197        } else {
198            url
199        }
200    }
201
202    fn get_hostname(&self) -> String {
203        if let Some(ref hostname) = self.hostname {
204            if self.resolve_hostname {
205                resolve_ip(hostname)
206                    .unwrap_or_else(|_| panic!("Failed to resolve provided hostname: {hostname}"))
207            } else {
208                hostname.clone()
209            }
210        } else if self.resolve_hostname {
211            RESOLVED_HOSTNAME.clone()
212        } else {
213            HOSTNAME.clone()
214        }
215    }
216
217    pub fn validate(&self) -> Result<(), ConfigError> {
218        let non_zero = |val: Duration, name| {
219            if val.is_zero() {
220                return Err(ConfigError::Message(format!(
221                    "Invalid {ENV_PREFIX}_{name}: cannot be 0"
222                )));
223            }
224            Ok(())
225        };
226        non_zero(self.megaphone_poll_interval, "MEGAPHONE_POLL_INTERVAL")?;
227        non_zero(self.auto_ping_interval, "AUTO_PING_INTERVAL")?;
228        non_zero(self.auto_ping_timeout, "AUTO_PING_TIMEOUT")?;
229        Ok(())
230    }
231
232    pub fn test_settings() -> Self {
233        let db_dsn = Some("grpc://localhost:8086".to_string());
234        // BigTable DB_SETTINGS.
235        let db_settings = json!({
236            "table_name":"projects/test/instances/test/tables/autopush",
237            "message_family":"message",
238            "router_family":"router",
239            "message_topic_family":"message_topic",
240        })
241        .to_string();
242        Self {
243            db_dsn,
244            db_settings,
245            ..Default::default()
246        }
247    }
248}
249
250fn deserialize_f64_to_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
251where
252    D: Deserializer<'de>,
253{
254    let seconds: f64 = Deserialize::deserialize(deserializer)?;
255    Ok(Duration::new(
256        seconds as u64,
257        (seconds.fract() * 1_000_000_000.0) as u32,
258    ))
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use slog_scope::trace;

    /// Put the environment variable `key` back to the value captured before the
    /// test changed it, or remove it if no value could be read at that time.
    fn restore_env(key: &str, previous: Result<String, std::env::VarError>) {
        if let Ok(value) = previous {
            trace!("Resetting {}", key);
            std::env::set_var(key, value);
        } else {
            std::env::remove_var(key);
        }
    }

    /// The router URL omits the port only when it is the `http` default.
    #[test]
    fn test_router_url() {
        let mut settings = Settings {
            router_hostname: Some("testname".to_string()),
            router_port: 80,
            ..Default::default()
        };
        let url = settings.router_url();
        assert_eq!("http://testname", url);

        settings.router_port = 8080;
        let url = settings.router_url();
        assert_eq!("http://testname:8080", url);
    }

    /// The endpoint URL omits the port only when it is the scheme's default.
    #[test]
    fn test_endpoint_url() {
        let mut settings = Settings {
            endpoint_hostname: "testname".to_string(),
            endpoint_port: 80,
            endpoint_scheme: "http".to_string(),
            ..Default::default()
        };
        let url = settings.endpoint_url();
        assert_eq!("http://testname", url);

        settings.endpoint_port = 8080;
        let url = settings.endpoint_url();
        assert_eq!("http://testname:8080", url);

        settings.endpoint_port = 443;
        settings.endpoint_scheme = "https".to_string();
        let url = settings.endpoint_url();
        assert_eq!("https://testname", url);

        settings.endpoint_port = 8080;
        let url = settings.endpoint_url();
        assert_eq!("https://testname:8080", url);
    }

    /// Environment overrides are applied on top of the defaults.
    #[test]
    fn test_default_settings() {
        // Test that the Config works the way we expect it to.
        use std::env;
        let port = format!("{ENV_PREFIX}__PORT").to_uppercase();
        let msg_limit = format!("{ENV_PREFIX}__MSG_LIMIT").to_uppercase();
        let fernet = format!("{ENV_PREFIX}__CRYPTO_KEY").to_uppercase();

        // Capture every variable we are about to overwrite so it can be put back.
        let prev_port = env::var(&port);
        let prev_msg_limit = env::var(&msg_limit);
        let prev_fernet = env::var(&fernet);

        env::set_var(&port, "9123");
        env::set_var(&msg_limit, "123");
        env::set_var(&fernet, "[mqCGb8D-N7mqx6iWJov9wm70Us6kA9veeXdb8QUuzLQ=]");
        let loaded = Settings::with_env_and_config_files(&[]);

        // Restore the environment before asserting, so a failing assertion
        // cannot leave the overrides behind.
        restore_env(&port, prev_port);
        restore_env(&msg_limit, prev_msg_limit);
        restore_env(&fernet, prev_fernet);

        let settings = loaded.unwrap();
        assert_eq!(settings.endpoint_hostname, "localhost");
        assert_eq!(settings.port, 9123);
        assert_eq!(settings.msg_limit, 123);
        assert_eq!(
            settings.crypto_key,
            "[mqCGb8D-N7mqx6iWJov9wm70Us6kA9veeXdb8QUuzLQ=]"
        );
        assert_eq!(settings.open_handshake_timeout, Duration::from_secs(5));
    }
}