autoendpoint/
metrics.rs

1use std::{sync::Arc, time::Instant};
2
3use actix_web::{dev::Payload, web::Data, FromRequest, HttpMessage, HttpRequest};
4use cadence::{Metric, MetricError, NopMetricSink, StatsdClient, Timed};
5use futures::future;
6
7use autopush_common::{metric_name::MetricName, metrics::StatsdClientExt, tags::Tags};
8
9use crate::{error::ApiError, server::AppState, settings::Settings};
10
11#[derive(Debug, Clone)]
12pub struct MetricTimer {
13    pub label: String,
14    pub start: Instant,
15    pub tags: Tags,
16}
17
18#[derive(Debug, Clone)]
19pub struct Metrics {
20    client: Option<Arc<StatsdClient>>,
21    timer: Option<MetricTimer>,
22    tags: Option<Tags>,
23}
24
25impl Drop for Metrics {
26    fn drop(&mut self) {
27        let tags = self.tags.clone().unwrap_or_default();
28        if let Some(client) = self.client.as_ref() {
29            if let Some(timer) = self.timer.as_ref() {
30                let lapse = (Instant::now() - timer.start).as_millis() as u64;
31                trace!(
32                    "⌚ Ending timer at nanos: {:?} : {:?}",
33                    &timer.label,
34                    lapse;
35                    tags
36                );
37                let mut tagged = client.time_with_tags(&timer.label, lapse);
38                // Include any "hard coded" tags.
39                // tagged = tagged.with_tag("version", env!("CARGO_PKG_VERSION"));
40                let tags = timer.tags.tags.clone();
41                let keys = tags.keys();
42                for tag in keys {
43                    tagged = tagged.with_tag(tag, tags.get(tag).unwrap())
44                }
45                match tagged.try_send() {
46                    Err(e) => {
47                        // eat the metric, but log the error
48                        warn!("⚠️ Metric {} error: {:?} ", &timer.label, e);
49                    }
50                    Ok(v) => {
51                        trace!("⌚ {:?}", v.as_metric_str());
52                    }
53                }
54            }
55        }
56    }
57}
58
59impl FromRequest for Metrics {
60    type Error = ApiError;
61    type Future = future::Ready<Result<Self, Self::Error>>;
62
63    fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
64        future::ok(Metrics::from(req))
65    }
66}
67
68impl From<&HttpRequest> for Metrics {
69    fn from(req: &HttpRequest) -> Self {
70        let exts = req.extensions();
71        let def_tags = Tags::from_request_head(req.head());
72        let tags = exts.get::<Tags>().unwrap_or(&def_tags);
73        Metrics {
74            client: Some(metrics_from_req(req)),
75            tags: Some(tags.clone()),
76            timer: None,
77        }
78    }
79}
80
81impl From<StatsdClient> for Metrics {
82    fn from(client: StatsdClient) -> Self {
83        Metrics {
84            client: Some(Arc::new(client)),
85            tags: None,
86            timer: None,
87        }
88    }
89}
90
91impl From<&actix_web::web::Data<AppState>> for Metrics {
92    fn from(state: &actix_web::web::Data<AppState>) -> Self {
93        Metrics {
94            client: Some(state.metrics.clone()),
95            tags: None,
96            timer: None,
97        }
98    }
99}
100
101impl Metrics {
102    #![allow(unused)] // TODO: Start using metrics
103
104    pub fn sink() -> StatsdClient {
105        StatsdClient::builder("", NopMetricSink).build()
106    }
107
108    pub fn noop() -> Self {
109        Self {
110            client: Some(Arc::new(Self::sink())),
111            timer: None,
112            tags: None,
113        }
114    }
115
116    pub fn start_timer(&mut self, label: &str, tags: Option<Tags>) {
117        let mut mtags = self.tags.clone().unwrap_or_default();
118        if let Some(t) = tags {
119            mtags.extend(t.tags)
120        }
121
122        trace!("⌚ Starting timer... {:?}", &label; &mtags);
123        self.timer = Some(MetricTimer {
124            label: label.to_owned(),
125            start: Instant::now(),
126            tags: mtags,
127        });
128    }
129
130    // increment a counter with no tags data.
131    pub fn incr(self, metric: MetricName) {
132        self.incr_with_tags(metric, None)
133    }
134
135    pub fn incr_with_tags(self, metric: MetricName, tags: Option<Tags>) {
136        if let Some(client) = self.client.as_ref() {
137            let mut tagged = client.incr_with_tags(metric.clone());
138            let mut mtags = self.tags.clone().unwrap_or_default();
139            if let Some(t) = tags {
140                mtags.tags.extend(t.tags)
141            }
142            let tag_keys = mtags.tags.keys();
143            for key in tag_keys.clone() {
144                tagged = tagged.with_tag(key, mtags.tags.get(key).unwrap());
145            }
146            match tagged.try_send() {
147                Err(e) => {
148                    // eat the metric, but log the error
149                    warn!("⚠️ Metric {} error: {:?}", metric.as_ref(), e; mtags);
150                }
151                Ok(v) => trace!("☑️ {:?}", v.as_metric_str()),
152            }
153        }
154    }
155}
156
157pub fn metrics_from_req(req: &HttpRequest) -> Arc<StatsdClient> {
158    req.app_data::<Data<AppState>>()
159        .expect("Could not get state in metrics_from_req")
160        .metrics
161        .clone()
162}
163
164/// Create a cadence StatsdClient from the given options
165pub fn metrics_from_settings(settings: &Settings) -> Result<StatsdClient, MetricError> {
166    let client = autopush_common::metrics::builder(
167        &settings.statsd_label,
168        &settings.statsd_host,
169        settings.statsd_port,
170    )?
171    .build();
172    Ok(client)
173}