1use std::{
4 collections::HashSet,
5 convert::Infallible,
6 fmt::{self, Write as _},
7 future::{self, Future},
8 net::SocketAddr,
9 pin::Pin,
10 str,
11 sync::Arc,
12 time::{Duration, Instant},
13};
14
15use http_body_util::BodyExt as _;
16use hyper::{
17 body::Incoming, header, server::conn::http1, service::service_fn, Method, Request, Response,
18 StatusCode, Uri,
19};
20use hyper_util::{
21 client::legacy::Client,
22 rt::{TokioExecutor, TokioIo},
23};
24use tokio::{io, net::TcpListener, sync::watch};
25use vise::{Format, MetricsCollection, Registry};
26
27use crate::metrics::{Facade, EXPORTER_METRICS};
28
29#[cfg(test)]
30mod tests;
31
32#[derive(Clone)]
33struct MetricsExporterInner {
34 registry: Arc<Registry>,
35 format: Format,
36}
37
38impl MetricsExporterInner {
39 async fn render_body(&self) -> String {
40 let latency = EXPORTER_METRICS.scrape_latency[&Facade::Vise].start();
41 let registry = Arc::clone(&self.registry);
42 let format = self.format;
43 let buffer = tokio::task::spawn_blocking(move || {
47 let mut buffer = String::with_capacity(1_024);
48 registry.encode(&mut buffer, format).unwrap();
49 buffer
51 })
52 .await
53 .unwrap(); let latency = latency.observe();
56 let scraped_size = buffer.len();
57 EXPORTER_METRICS.scraped_size[&Facade::Vise].observe(scraped_size);
58 tracing::debug!(
59 latency_sec = latency.as_secs_f64(),
60 scraped_size,
61 "Scraped metrics using `vise` façade in {latency:?} (scraped size: {scraped_size}B)"
62 );
63 buffer
64 }
65
66 async fn render(&self) -> Response<String> {
67 let content_type = if matches!(self.format, Format::Prometheus) {
68 Format::PROMETHEUS_CONTENT_TYPE
69 } else {
70 Format::OPEN_METRICS_CONTENT_TYPE
71 };
72 Response::builder()
73 .status(StatusCode::OK)
74 .header(header::CONTENT_TYPE, content_type)
75 .body(self.render_body().await)
76 .unwrap()
77 }
78}
79
80pub struct MetricsExporter<'a> {
91 inner: MetricsExporterInner,
92 shutdown_future: Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
93}
94
95impl fmt::Debug for MetricsExporter<'_> {
96 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
97 formatter
98 .debug_struct("MetricsExporter")
99 .field("registry", &self.inner.registry)
100 .finish_non_exhaustive()
101 }
102}
103
104impl Default for MetricsExporter<'_> {
107 fn default() -> Self {
108 Self::new(MetricsCollection::default().collect().into())
109 }
110}
111
112impl<'a> MetricsExporter<'a> {
113 pub fn new(registry: Arc<Registry>) -> Self {
116 Self::log_metrics_stats(®istry);
117 Self {
118 inner: MetricsExporterInner {
119 registry,
120 format: Format::OpenMetricsForPrometheus,
121 },
122 shutdown_future: Box::pin(future::pending()),
123 }
124 }
125
126 fn log_metrics_stats(registry: &Registry) {
127 const SAMPLED_CRATE_COUNT: usize = 5;
128
129 let groups = registry.descriptors().groups();
130 let group_count = groups.len();
131 let metric_count = registry.descriptors().metric_count();
132
133 let mut unique_crates = HashSet::new();
134 for group in groups {
135 let crate_info = (group.crate_name, group.crate_version);
136 if unique_crates.insert(crate_info) && unique_crates.len() >= SAMPLED_CRATE_COUNT {
137 break;
138 }
139 }
140 let mut crates = String::with_capacity(unique_crates.len() * 16);
141 for (crate_name, crate_version) in unique_crates {
143 write!(crates, "{crate_name} {crate_version}, ").unwrap();
144 }
145 crates.push_str("...");
146
147 tracing::info!(
148 "Created metrics exporter with {metric_count} metrics in {group_count} groups from crates {crates}"
149 );
150 }
151
152 #[must_use]
158 pub fn with_format(mut self, format: Format) -> Self {
159 self.inner.format = format;
160 self
161 }
162
163 #[must_use]
165 pub fn with_graceful_shutdown<F>(mut self, shutdown: F) -> Self
166 where
167 F: Future<Output = ()> + Send + 'a,
168 {
169 self.shutdown_future = Box::pin(shutdown);
170 self
171 }
172
173 pub async fn start(self, bind_address: SocketAddr) -> io::Result<()> {
183 tracing::info!("Starting Prometheus exporter web server on {bind_address}");
184 self.bind(bind_address).await?.start().await?;
185 tracing::info!("Prometheus metrics exporter server shut down");
186 Ok(())
187 }
188
189 pub async fn bind(mut self, bind_address: SocketAddr) -> io::Result<MetricsServer<'a>> {
195 let listener = TcpListener::bind(bind_address).await?;
196 let local_addr = listener.local_addr()?;
197 let server = async move {
198 let (started_shutdown_sender, started_shutdown) = watch::channel(());
199 loop {
200 let stream = tokio::select! {
201 res = listener.accept() => res?.0,
202 () = &mut self.shutdown_future => break,
203 };
204
205 let io = TokioIo::new(stream);
206 let inner = self.inner.clone();
207 let mut started_shutdown = started_shutdown.clone();
208 tokio::spawn(async move {
209 let conn = http1::Builder::new().serve_connection(
210 io,
211 service_fn(|_| async { Ok::<_, Infallible>(inner.render().await) }),
212 );
213 tokio::pin!(conn);
214
215 let res = tokio::select! {
216 _ = started_shutdown.changed() => {
217 conn.as_mut().graceful_shutdown();
218 conn.await
219 }
220 res = conn.as_mut() => res,
221 };
222 if let Err(err) = res {
223 tracing::warn!(%err, "Error serving connection");
224 }
225 });
226 }
227
228 tracing::info!("Stop signal received, Prometheus metrics exporter is shutting down");
229 drop(started_shutdown);
231 started_shutdown_sender.send_replace(());
232 started_shutdown_sender.closed().await;
234
235 Ok(())
236 };
237
238 Ok(MetricsServer {
239 server: Box::pin(server),
240 local_addr,
241 })
242 }
243
244 #[allow(clippy::missing_panics_doc)]
246 pub async fn push_to_gateway(self, endpoint: Uri, interval: Duration) {
247 const ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
250
251 tracing::info!(
252 "Starting push-based Prometheus exporter to `{endpoint}` with push interval {interval:?}"
253 );
254
255 let client = Client::builder(TokioExecutor::new()).build_http();
256 let mut shutdown = self.shutdown_future;
257 let mut last_error_log_timestamp = None::<Instant>;
258 loop {
259 let mut shutdown_requested = false;
260 if tokio::time::timeout(interval, &mut shutdown).await.is_ok() {
261 tracing::info!(
262 "Stop signal received, Prometheus metrics exporter is shutting down"
263 );
264 shutdown_requested = true;
265 }
266
267 let request = Request::builder()
268 .method(Method::PUT)
269 .uri(endpoint.clone())
270 .header(header::CONTENT_TYPE, Format::OPEN_METRICS_CONTENT_TYPE)
271 .body(self.inner.render_body().await)
272 .expect("Failed creating Prometheus push gateway request");
273
274 match client.request(request).await {
275 Ok(response) => {
276 if !response.status().is_success() {
277 let should_log_error = last_error_log_timestamp
278 .map_or(true, |timestamp| timestamp.elapsed() >= ERROR_LOG_INTERVAL);
279 if should_log_error {
280 tokio::spawn(report_erroneous_response(endpoint.clone(), response));
282 last_error_log_timestamp = Some(Instant::now());
283 }
286 }
287 }
288 Err(err) => {
289 let should_log_error = last_error_log_timestamp
290 .map_or(true, |timestamp| timestamp.elapsed() >= ERROR_LOG_INTERVAL);
291 if should_log_error {
292 tracing::error!(
293 %err,
294 %endpoint,
295 "Error submitting metrics to Prometheus push gateway"
296 );
297 last_error_log_timestamp = Some(Instant::now());
298 }
299 }
300 }
301 if shutdown_requested {
302 break;
303 }
304 }
305 }
306}
307
308async fn report_erroneous_response(endpoint: Uri, response: Response<Incoming>) {
309 let status = response.status();
310
311 let body = match response.into_body().collect().await {
312 Ok(body) => body.to_bytes(),
313 Err(err) => {
314 tracing::error!(
315 %err,
316 %status,
317 %endpoint,
318 "Failed reading erroneous response from Prometheus push gateway"
319 );
320 return;
321 }
322 };
323
324 let err_body: String;
325 let body = match str::from_utf8(&body) {
326 Ok(body) => body,
327 Err(err) => {
328 let body_length = body.len();
329 err_body = format!("(Non UTF-8 body with length {body_length}B: {err})");
330 &err_body
331 }
332 };
333 tracing::warn!(
334 %status,
335 %body,
336 %endpoint,
337 "Error pushing metrics to Prometheus push gateway"
338 );
339}
340
/// Metrics server that has been bound to a local address but not started yet.
#[must_use = "Server should be `start()`ed"]
pub struct MetricsServer<'a> {
    /// Future that drives the server; polled by [`Self::start()`].
    server: Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>,
    /// Address recorded for the server when it was created.
    local_addr: SocketAddr,
}

impl fmt::Debug for MetricsServer<'_> {
    /// Prints only the local address; the server future is elided.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug = formatter.debug_struct("MetricsServer");
        debug.field("local_addr", &self.local_addr);
        debug.finish_non_exhaustive()
    }
}

impl MetricsServer<'_> {
    /// Returns the local address stored in this server.
    pub fn local_addr(&self) -> SocketAddr {
        let Self { local_addr, .. } = self;
        *local_addr
    }

    /// Runs the server to completion.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying server future resolves with.
    pub async fn start(self) -> io::Result<()> {
        let Self { server, .. } = self;
        server.await
    }
}