vise_exporter/exporter/
mod.rs

1//! `MetricsExporter` and closely related types.
2
3use std::{
4    collections::HashSet,
5    convert::Infallible,
6    fmt::{self, Write as _},
7    future::{self, Future},
8    net::SocketAddr,
9    pin::Pin,
10    str,
11    sync::Arc,
12    time::{Duration, Instant},
13};
14
15use http_body_util::BodyExt as _;
16use hyper::{
17    body::Incoming, header, server::conn::http1, service::service_fn, Method, Request, Response,
18    StatusCode, Uri,
19};
20use hyper_util::{
21    client::legacy::Client,
22    rt::{TokioExecutor, TokioIo},
23};
24use tokio::{io, net::TcpListener, sync::watch};
25use vise::{Format, MetricsCollection, Registry};
26
27use crate::metrics::{Facade, EXPORTER_METRICS};
28
29#[cfg(test)]
30mod tests;
31
/// Shared state of a metrics exporter: the registry being scraped and the export format.
/// Cloned cheaply (the registry is behind `Arc`) into each spawned connection task.
#[derive(Clone)]
struct MetricsExporterInner {
    // Registry scraped on each request; `Arc` so the same registry can be used elsewhere.
    registry: Arc<Registry>,
    // Text format used when encoding the registry (Prometheus / OpenMetrics flavors).
    format: Format,
}
37
38impl MetricsExporterInner {
39    async fn render_body(&self) -> String {
40        let latency = EXPORTER_METRICS.scrape_latency[&Facade::Vise].start();
41        let registry = Arc::clone(&self.registry);
42        let format = self.format;
43        // `Registry::encode()` is blocking in the general case (specifically, if collectors are used; they may use
44        // blocking I/O etc.). We cannot make metric collection non-blocking because the underlying library only provides
45        // blocking interface for collectors.
46        let buffer = tokio::task::spawn_blocking(move || {
47            let mut buffer = String::with_capacity(1_024);
48            registry.encode(&mut buffer, format).unwrap();
49            // ^ `unwrap()` is safe; writing to a string never fails.
50            buffer
51        })
52        .await
53        .unwrap(); // propagate panics should they occur in the spawned blocking task
54
55        let latency = latency.observe();
56        let scraped_size = buffer.len();
57        EXPORTER_METRICS.scraped_size[&Facade::Vise].observe(scraped_size);
58        tracing::debug!(
59            latency_sec = latency.as_secs_f64(),
60            scraped_size,
61            "Scraped metrics using `vise` façade in {latency:?} (scraped size: {scraped_size}B)"
62        );
63        buffer
64    }
65
66    async fn render(&self) -> Response<String> {
67        let content_type = if matches!(self.format, Format::Prometheus) {
68            Format::PROMETHEUS_CONTENT_TYPE
69        } else {
70            Format::OPEN_METRICS_CONTENT_TYPE
71        };
72        Response::builder()
73            .status(StatusCode::OK)
74            .header(header::CONTENT_TYPE, content_type)
75            .body(self.render_body().await)
76            .unwrap()
77    }
78}
79
80/// Metrics exporter to Prometheus.
81///
82/// An exporter scrapes metrics from a [`Registry`]. A [`Default`] exporter will use the registry
83/// of all metrics auto-registered in an app and all its (transitive) dependencies, i.e. one
84/// created using [`MetricsCollection::default()`] [`.collect()`](MetricsCollection::collect()).
85/// To have more granular control over the registry, you can provide it explicitly using [`Self::new()`].
86///
87/// # Examples
88///
89/// See crate-level docs for the examples of usage.
pub struct MetricsExporter<'a> {
    // Registry + format shared with every served connection / push request.
    inner: MetricsExporterInner,
    // Future whose resolution triggers graceful shutdown. Defaults to `future::pending()`
    // (never resolves); replaced via `with_graceful_shutdown()`.
    shutdown_future: Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
}
94
95impl fmt::Debug for MetricsExporter<'_> {
96    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
97        formatter
98            .debug_struct("MetricsExporter")
99            .field("registry", &self.inner.registry)
100            .finish_non_exhaustive()
101    }
102}
103
104/// Creates an exporter based on [`MetricsCollection`]`::default().collect()` output (i.e., with all metrics
105/// registered by the app and libs it depends on).
106impl Default for MetricsExporter<'_> {
107    fn default() -> Self {
108        Self::new(MetricsCollection::default().collect().into())
109    }
110}
111
112impl<'a> MetricsExporter<'a> {
113    /// Creates an exporter based on the provided metrics [`Registry`]. Note that the registry
114    /// is in `Arc`, meaning it can be used elsewhere (e.g., to export data in another format).
115    pub fn new(registry: Arc<Registry>) -> Self {
116        Self::log_metrics_stats(&registry);
117        Self {
118            inner: MetricsExporterInner {
119                registry,
120                format: Format::OpenMetricsForPrometheus,
121            },
122            shutdown_future: Box::pin(future::pending()),
123        }
124    }
125
126    fn log_metrics_stats(registry: &Registry) {
127        const SAMPLED_CRATE_COUNT: usize = 5;
128
129        let groups = registry.descriptors().groups();
130        let group_count = groups.len();
131        let metric_count = registry.descriptors().metric_count();
132
133        let mut unique_crates = HashSet::new();
134        for group in groups {
135            let crate_info = (group.crate_name, group.crate_version);
136            if unique_crates.insert(crate_info) && unique_crates.len() >= SAMPLED_CRATE_COUNT {
137                break;
138            }
139        }
140        let mut crates = String::with_capacity(unique_crates.len() * 16);
141        // ^ 16 chars looks like a somewhat reasonable estimate for crate name + version
142        for (crate_name, crate_version) in unique_crates {
143            write!(crates, "{crate_name} {crate_version}, ").unwrap();
144        }
145        crates.push_str("...");
146
147        tracing::info!(
148            "Created metrics exporter with {metric_count} metrics in {group_count} groups from crates {crates}"
149        );
150    }
151
152    /// Sets the export [`Format`]. By default, [`Format::OpenMetricsForPrometheus`] is used
153    /// (i.e., OpenMetrics text format with minor changes so that it is fully parsed by Prometheus).
154    ///
155    /// See `Format` docs for more details on differences between export formats. Note that using
156    /// [`Format::OpenMetrics`] is not fully supported by Prometheus at the time of writing.
157    #[must_use]
158    pub fn with_format(mut self, format: Format) -> Self {
159        self.inner.format = format;
160        self
161    }
162
163    /// Configures graceful shutdown for the exporter server.
164    #[must_use]
165    pub fn with_graceful_shutdown<F>(mut self, shutdown: F) -> Self
166    where
167        F: Future<Output = ()> + Send + 'a,
168    {
169        self.shutdown_future = Box::pin(shutdown);
170        self
171    }
172
173    /// Starts the server on the specified address. This future resolves when the server is shut down.
174    ///
175    /// The server will expose the following endpoints:
176    ///
177    /// - `GET` on any path: serves the metrics in the text format configured using [`Self::with_format()`]
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if binding to the specified address fails.
182    pub async fn start(self, bind_address: SocketAddr) -> io::Result<()> {
183        tracing::info!("Starting Prometheus exporter web server on {bind_address}");
184        self.bind(bind_address).await?.start().await?;
185        tracing::info!("Prometheus metrics exporter server shut down");
186        Ok(())
187    }
188
189    /// Creates an HTTP exporter server and binds it to the specified address.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if binding to the specified address fails.
194    pub async fn bind(mut self, bind_address: SocketAddr) -> io::Result<MetricsServer<'a>> {
195        let listener = TcpListener::bind(bind_address).await?;
196        let local_addr = listener.local_addr()?;
197        let server = async move {
198            let (started_shutdown_sender, started_shutdown) = watch::channel(());
199            loop {
200                let stream = tokio::select! {
201                    res = listener.accept() => res?.0,
202                    () = &mut self.shutdown_future => break,
203                };
204
205                let io = TokioIo::new(stream);
206                let inner = self.inner.clone();
207                let mut started_shutdown = started_shutdown.clone();
208                tokio::spawn(async move {
209                    let conn = http1::Builder::new().serve_connection(
210                        io,
211                        service_fn(|_| async { Ok::<_, Infallible>(inner.render().await) }),
212                    );
213                    tokio::pin!(conn);
214
215                    let res = tokio::select! {
216                        _ = started_shutdown.changed() => {
217                            conn.as_mut().graceful_shutdown();
218                            conn.await
219                        }
220                        res = conn.as_mut() => res,
221                    };
222                    if let Err(err) = res {
223                        tracing::warn!(%err, "Error serving connection");
224                    }
225                });
226            }
227
228            tracing::info!("Stop signal received, Prometheus metrics exporter is shutting down");
229            // Send the graceful shutdown signal to all alive connections.
230            drop(started_shutdown);
231            started_shutdown_sender.send_replace(());
232            // Wait until all connections are dropped.
233            started_shutdown_sender.closed().await;
234
235            Ok(())
236        };
237
238        Ok(MetricsServer {
239            server: Box::pin(server),
240            local_addr,
241        })
242    }
243
244    /// Starts pushing metrics to the `endpoint` with the specified `interval` between pushes.
245    #[allow(clippy::missing_panics_doc)]
246    pub async fn push_to_gateway(self, endpoint: Uri, interval: Duration) {
247        /// Minimum interval between error logs. Prevents spanning logs at `WARN` / `ERROR` level
248        /// too frequently if `interval` is low (e.g., 1s).
249        const ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
250
251        tracing::info!(
252            "Starting push-based Prometheus exporter to `{endpoint}` with push interval {interval:?}"
253        );
254
255        let client = Client::builder(TokioExecutor::new()).build_http();
256        let mut shutdown = self.shutdown_future;
257        let mut last_error_log_timestamp = None::<Instant>;
258        loop {
259            let mut shutdown_requested = false;
260            if tokio::time::timeout(interval, &mut shutdown).await.is_ok() {
261                tracing::info!(
262                    "Stop signal received, Prometheus metrics exporter is shutting down"
263                );
264                shutdown_requested = true;
265            }
266
267            let request = Request::builder()
268                .method(Method::PUT)
269                .uri(endpoint.clone())
270                .header(header::CONTENT_TYPE, Format::OPEN_METRICS_CONTENT_TYPE)
271                .body(self.inner.render_body().await)
272                .expect("Failed creating Prometheus push gateway request");
273
274            match client.request(request).await {
275                Ok(response) => {
276                    if !response.status().is_success() {
277                        let should_log_error = last_error_log_timestamp
278                            .map_or(true, |timestamp| timestamp.elapsed() >= ERROR_LOG_INTERVAL);
279                        if should_log_error {
280                            // Do not block further pushes during error handling.
281                            tokio::spawn(report_erroneous_response(endpoint.clone(), response));
282                            last_error_log_timestamp = Some(Instant::now());
283                            // ^ This timestamp is somewhat imprecise (we don't wait to handle the response),
284                            // but it seems fine for rate-limiting purposes.
285                        }
286                    }
287                }
288                Err(err) => {
289                    let should_log_error = last_error_log_timestamp
290                        .map_or(true, |timestamp| timestamp.elapsed() >= ERROR_LOG_INTERVAL);
291                    if should_log_error {
292                        tracing::error!(
293                            %err,
294                            %endpoint,
295                            "Error submitting metrics to Prometheus push gateway"
296                        );
297                        last_error_log_timestamp = Some(Instant::now());
298                    }
299                }
300            }
301            if shutdown_requested {
302                break;
303            }
304        }
305    }
306}
307
308async fn report_erroneous_response(endpoint: Uri, response: Response<Incoming>) {
309    let status = response.status();
310
311    let body = match response.into_body().collect().await {
312        Ok(body) => body.to_bytes(),
313        Err(err) => {
314            tracing::error!(
315                %err,
316                %status,
317                %endpoint,
318                "Failed reading erroneous response from Prometheus push gateway"
319            );
320            return;
321        }
322    };
323
324    let err_body: String;
325    let body = match str::from_utf8(&body) {
326        Ok(body) => body,
327        Err(err) => {
328            let body_length = body.len();
329            err_body = format!("(Non UTF-8 body with length {body_length}B: {err})");
330            &err_body
331        }
332    };
333    tracing::warn!(
334        %status,
335        %body,
336        %endpoint,
337        "Error pushing metrics to Prometheus push gateway"
338    );
339}
340
/// Metrics server bound to a certain local address returned by [`MetricsExporter::bind()`].
///
/// Useful e.g. if you need to find out which port the server was bound to if the 0th port was specified.
#[must_use = "Server should be `start()`ed"]
pub struct MetricsServer<'a> {
    // Boxed server loop produced by `MetricsExporter::bind()`; resolves once the server shuts down.
    server: Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
    // Address the TCP listener is actually bound to (relevant when port 0 was requested).
    local_addr: SocketAddr,
}
349
350impl fmt::Debug for MetricsServer<'_> {
351    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
352        formatter
353            .debug_struct("MetricsServer")
354            .field("local_addr", &self.local_addr)
355            .finish_non_exhaustive()
356    }
357}
358
359impl MetricsServer<'_> {
360    /// Returns the local address this server is bound to.
361    pub fn local_addr(&self) -> SocketAddr {
362        self.local_addr
363    }
364
365    /// Starts this server. Resolves once the server is shut down.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if starting the server operation fails.
370    pub async fn start(self) -> io::Result<()> {
371        self.server.await
372    }
373}