1mod app_state;
2
3extern crate slog;
4#[macro_use]
5extern crate slog_scope;
6extern crate serde_derive;
7
8use std::{io, net::ToSocketAddrs, time::Duration};
9
10use config::{Config, ConfigError, Environment, File};
11use fernet::Fernet;
12use lazy_static::lazy_static;
13use serde::{Deserialize, Deserializer};
14use serde_json::json;
15
16use autopush_common::util::deserialize_u32_to_duration;
17
18pub use app_state::AppState;
19
20pub const ENV_PREFIX: &str = "autoconnect";
21
22lazy_static! {
23 static ref HOSTNAME: String = mozsvc_common::get_hostname()
24 .expect("Couldn't get_hostname")
25 .into_string()
26 .expect("Couldn't convert get_hostname");
27 static ref RESOLVED_HOSTNAME: String = resolve_ip(&HOSTNAME)
28 .unwrap_or_else(|_| panic!("Failed to resolve hostname: {}", *HOSTNAME));
29}
30
31fn resolve_ip(hostname: &str) -> io::Result<String> {
33 Ok((hostname, 0)
34 .to_socket_addrs()?
35 .next()
36 .map_or_else(|| hostname.to_owned(), |addr| addr.ip().to_string()))
37}
38
39fn include_port(scheme: &str, port: u16) -> bool {
41 !((scheme == "http" && port == 80) || (scheme == "https" && port == 443))
42}
43
44#[derive(Clone, Debug, Deserialize)]
48#[serde(default)]
49pub struct Settings {
50 pub port: u16,
52 pub hostname: Option<String>,
54 pub resolve_hostname: bool,
56 pub router_port: u16,
58 pub router_hostname: Option<String>,
60 #[serde(deserialize_with = "deserialize_f64_to_duration")]
62 pub auto_ping_interval: Duration,
63 #[serde(deserialize_with = "deserialize_f64_to_duration")]
65 pub auto_ping_timeout: Duration,
66 #[serde(deserialize_with = "deserialize_u32_to_duration")]
68 pub open_handshake_timeout: Duration,
69 #[serde(deserialize_with = "deserialize_u32_to_duration")]
71 pub close_handshake_timeout: Duration,
72 pub endpoint_scheme: String,
74 pub endpoint_hostname: String,
76 pub endpoint_port: u16,
78 pub crypto_key: String,
80 pub statsd_host: Option<String>,
82 pub statsd_port: u16,
84 pub statsd_label: String,
86 pub db_dsn: Option<String>,
88 pub db_settings: String,
90 pub megaphone_api_url: Option<String>,
92 pub megaphone_api_token: Option<String>,
94 #[serde(deserialize_with = "deserialize_u32_to_duration")]
96 pub megaphone_poll_interval: Duration,
97 pub human_logs: bool,
99 pub msg_limit: u32,
102 pub actix_max_connections: Option<usize>,
107 pub actix_workers: Option<usize>,
111 #[cfg(feature = "reliable_report")]
112 pub reliability_dsn: Option<String>,
116 #[cfg(feature = "reliable_report")]
117 pub reliability_retry_count: usize,
119}
120
121impl Default for Settings {
122 fn default() -> Self {
123 Self {
124 port: 8080,
125 hostname: None,
126 resolve_hostname: false,
127 router_port: 8081,
128 router_hostname: None,
129 auto_ping_interval: Duration::from_secs(300),
130 auto_ping_timeout: Duration::from_secs(4),
131 open_handshake_timeout: Duration::from_secs(5),
132 close_handshake_timeout: Duration::from_secs(0),
133 endpoint_scheme: "http".to_owned(),
134 endpoint_hostname: "localhost".to_owned(),
135 endpoint_port: 8082,
136 crypto_key: format!("[{}]", Fernet::generate_key()),
137 statsd_host: Some("localhost".to_owned()),
138 statsd_label: "autoconnect".to_owned(),
140 statsd_port: 8125,
141 db_dsn: None,
142 db_settings: "".to_owned(),
143 megaphone_api_url: None,
144 megaphone_api_token: None,
145 megaphone_poll_interval: Duration::from_secs(30),
146 human_logs: false,
147 msg_limit: 150,
148 actix_max_connections: None,
149 actix_workers: None,
150 #[cfg(feature = "reliable_report")]
151 reliability_dsn: None,
152 #[cfg(feature = "reliable_report")]
153 reliability_retry_count: autopush_common::redis_util::MAX_TRANSACTION_LOOP,
154 }
155 }
156}
157
158impl Settings {
159 pub fn with_env_and_config_files(filenames: &[String]) -> Result<Self, ConfigError> {
161 let mut s = Config::builder();
162
163 for filename in filenames {
165 s = s.add_source(File::with_name(filename));
166 }
167
168 s = s.add_source(Environment::with_prefix(&ENV_PREFIX.to_uppercase()).separator("__"));
170
171 let built = s.build()?;
172 let s = built.try_deserialize::<Settings>()?;
173 s.validate()?;
174 Ok(s)
175 }
176
177 pub fn router_url(&self) -> String {
178 let router_scheme = "http";
179 let url = format!(
180 "{}://{}",
181 router_scheme,
182 self.router_hostname
183 .as_ref()
184 .map_or_else(|| self.get_hostname(), String::clone),
185 );
186 if include_port(router_scheme, self.router_port) {
187 format!("{}:{}", url, self.router_port)
188 } else {
189 url
190 }
191 }
192
193 pub fn endpoint_url(&self) -> String {
194 let url = format!("{}://{}", self.endpoint_scheme, self.endpoint_hostname,);
195 if include_port(&self.endpoint_scheme, self.endpoint_port) {
196 format!("{}:{}", url, self.endpoint_port)
197 } else {
198 url
199 }
200 }
201
202 fn get_hostname(&self) -> String {
203 if let Some(ref hostname) = self.hostname {
204 if self.resolve_hostname {
205 resolve_ip(hostname)
206 .unwrap_or_else(|_| panic!("Failed to resolve provided hostname: {hostname}"))
207 } else {
208 hostname.clone()
209 }
210 } else if self.resolve_hostname {
211 RESOLVED_HOSTNAME.clone()
212 } else {
213 HOSTNAME.clone()
214 }
215 }
216
217 pub fn validate(&self) -> Result<(), ConfigError> {
218 let non_zero = |val: Duration, name| {
219 if val.is_zero() {
220 return Err(ConfigError::Message(format!(
221 "Invalid {ENV_PREFIX}_{name}: cannot be 0"
222 )));
223 }
224 Ok(())
225 };
226 non_zero(self.megaphone_poll_interval, "MEGAPHONE_POLL_INTERVAL")?;
227 non_zero(self.auto_ping_interval, "AUTO_PING_INTERVAL")?;
228 non_zero(self.auto_ping_timeout, "AUTO_PING_TIMEOUT")?;
229 Ok(())
230 }
231
232 pub fn test_settings() -> Self {
233 let db_dsn = Some("grpc://localhost:8086".to_string());
234 let db_settings = json!({
236 "table_name":"projects/test/instances/test/tables/autopush",
237 "message_family":"message",
238 "router_family":"router",
239 "message_topic_family":"message_topic",
240 })
241 .to_string();
242 Self {
243 db_dsn,
244 db_settings,
245 ..Default::default()
246 }
247 }
248}
249
250fn deserialize_f64_to_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
251where
252 D: Deserializer<'de>,
253{
254 let seconds: f64 = Deserialize::deserialize(deserializer)?;
255 Ok(Duration::new(
256 seconds as u64,
257 (seconds.fract() * 1_000_000_000.0) as u32,
258 ))
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264 use slog_scope::trace;
265
266 #[test]
267 fn test_router_url() {
268 let mut settings = Settings {
269 router_hostname: Some("testname".to_string()),
270 router_port: 80,
271 ..Default::default()
272 };
273 let url = settings.router_url();
274 assert_eq!("http://testname", url);
275
276 settings.router_port = 8080;
277 let url = settings.router_url();
278 assert_eq!("http://testname:8080", url);
279 }
280
281 #[test]
282 fn test_endpoint_url() {
283 let mut settings = Settings {
284 endpoint_hostname: "testname".to_string(),
285 endpoint_port: 80,
286 endpoint_scheme: "http".to_string(),
287 ..Default::default()
288 };
289 let url = settings.endpoint_url();
290 assert_eq!("http://testname", url);
291
292 settings.endpoint_port = 8080;
293 let url = settings.endpoint_url();
294 assert_eq!("http://testname:8080", url);
295
296 settings.endpoint_port = 443;
297 settings.endpoint_scheme = "https".to_string();
298 let url = settings.endpoint_url();
299 assert_eq!("https://testname", url);
300
301 settings.endpoint_port = 8080;
302 let url = settings.endpoint_url();
303 assert_eq!("https://testname:8080", url);
304 }
305
306 #[test]
307 fn test_default_settings() {
308 use std::env;
310 let port = format!("{ENV_PREFIX}__PORT").to_uppercase();
311 let msg_limit = format!("{ENV_PREFIX}__MSG_LIMIT").to_uppercase();
312 let fernet = format!("{ENV_PREFIX}__CRYPTO_KEY").to_uppercase();
313
314 let v1 = env::var(&port);
315 let v2 = env::var(&msg_limit);
316 env::set_var(&port, "9123");
317 env::set_var(&msg_limit, "123");
318 env::set_var(&fernet, "[mqCGb8D-N7mqx6iWJov9wm70Us6kA9veeXdb8QUuzLQ=]");
319 let settings = Settings::with_env_and_config_files(&Vec::new()).unwrap();
320 assert_eq!(settings.endpoint_hostname, "localhost".to_owned());
321 assert_eq!(&settings.port, &9123);
322 assert_eq!(&settings.msg_limit, &123);
323 assert_eq!(
324 &settings.crypto_key,
325 "[mqCGb8D-N7mqx6iWJov9wm70Us6kA9veeXdb8QUuzLQ=]"
326 );
327 assert_eq!(settings.open_handshake_timeout, Duration::from_secs(5));
328
329 if let Ok(p) = v1 {
331 trace!("Resetting {}", &port);
332 env::set_var(&port, p);
333 } else {
334 env::remove_var(&port);
335 }
336 if let Ok(p) = v2 {
337 trace!("Resetting {}", msg_limit);
338 env::set_var(&msg_limit, p);
339 } else {
340 env::remove_var(&msg_limit);
341 }
342 env::remove_var(&fernet);
343 }
344}