From b5322ac6b355a1052127db8b9b7fb6a039694469 Mon Sep 17 00:00:00 2001 From: rbehjati Date: Thu, 14 May 2020 15:02:27 +0100 Subject: [PATCH] Manage prometheus registry in Oak Runtime (#941) - Instantiates and manages prometheus registry in the Runtime, instead of using a static instance - Uses labels for finer grained collection of metrics - Better instrumentation for gRPC server pseudo-node - Exposes metrics on `/metrics` endpoint --- .../rust/oak_runtime/src/metrics/mod.rs | 143 +++++++++++++----- .../rust/oak_runtime/src/metrics/server.rs | 45 ++++-- .../rust/oak_runtime/src/node/grpc/server.rs | 29 +++- .../rust/oak_runtime/src/runtime/mod.rs | 40 +++-- .../oak_runtime/tests/integration_test.rs | 6 +- 5 files changed, 199 insertions(+), 64 deletions(-) diff --git a/oak/server/rust/oak_runtime/src/metrics/mod.rs b/oak/server/rust/oak_runtime/src/metrics/mod.rs index 9f2a378e5ed..5330d0580f1 100644 --- a/oak/server/rust/oak_runtime/src/metrics/mod.rs +++ b/oak/server/rust/oak_runtime/src/metrics/mod.rs @@ -15,53 +15,126 @@ // use prometheus::{ - opts, register_histogram, register_int_counter, register_int_gauge, Histogram, IntCounter, - IntGauge, + proto::MetricFamily, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry, }; pub mod server; +/// Helper struct with functions for creating and registering metrics. +struct MetricsBuilder { + pub registry: Registry, +} + +/// Mostly copied from https://github.com/grpc-ecosystem/java-grpc-prometheus +#[derive(Clone)] +pub struct GrpcServerMetrics { + /// Total number of RPCs started on the server. + pub grpc_server_started_total: IntCounterVec, + /// Total number of RPCs completed on the server, regardless of success or failure. + pub grpc_server_handled_total: IntCounterVec, + /// Histogram of response latency of RPCs handled by the server, in seconds. + pub grpc_server_handled_latency_seconds: HistogramVec, + /// Histogram of response sizes of RPCs handled by the server. + pub grpc_response_size_bytes: HistogramVec, +} + +/// Struct that collects all metrics for monitoring the Oak Runtime. +#[derive(Clone)] +pub struct RuntimeMetrics { + pub runtime_nodes_total: IntGauge, +} + /// Struct that collects all the metrics in one place +#[derive(Clone)] pub struct Metrics { - pub grpc_request_duration: Histogram, - pub grpc_requests_total: IntCounter, - pub grpc_response_size: Histogram, - pub runtime_nodes_count: IntGauge, + registry: Registry, + pub runtime_metrics: RuntimeMetrics, + pub grpc_server_metrics: GrpcServerMetrics, } -// TODO(#899): For testability implement a trait with methods for updating the metrics. -// TODO(#899): Instead of using a global Registry, the Runtime should instantiate and manage the -// Registry -impl Metrics { - fn new() -> Self { +impl MetricsBuilder { + pub fn new() -> Self { Self { - grpc_request_duration: register_histogram!( - "grpc_request_duration_seconds", - "The gRPC request latencies in seconds." - ) - .expect("Creating grpc_request_duration_seconds metric failed."), - - grpc_requests_total: register_int_counter!(opts!( - "grpc_requests_total", - "Total number of gRPC requests received." - )) - .expect("Creating grpc_requests_total metric failed."), - - grpc_response_size: register_histogram!( + registry: Registry::new(), + } + } + + fn register(&self, metric: T) -> T { + self.registry.register(Box::new(metric.clone())).unwrap(); + metric + } +} + +fn counter_vec(metric_name: &str, labels: &[&str], help: &str) -> IntCounterVec { + let opts = Opts::new(metric_name, help); + IntCounterVec::new(opts, labels).unwrap() +} + +fn histogram_vec(metric_name: &str, label: &[&str], help: &str) -> HistogramVec { + let opts = HistogramOpts::new(metric_name, help); + HistogramVec::new(opts, label).unwrap() +} + +fn int_gauge(metric_name: &str, help: &str) -> IntGauge { + let opts = Opts::new(metric_name, help); + IntGauge::with_opts(opts).unwrap() +} + +impl GrpcServerMetrics { + fn new(builder: &MetricsBuilder) -> Self { + GrpcServerMetrics { + grpc_server_started_total: builder.register(counter_vec( + "grpc_server_started_total", + &["method_name"], + "Total number of RPCs started on the server.", + )), + grpc_server_handled_total: builder.register(counter_vec( + "grpc_server_handled_total", + &["method_name", "status_code"], + "Total number of RPCs completed on the server, regardless of success or failure.", + )), + grpc_server_handled_latency_seconds: builder.register(histogram_vec( + "grpc_server_handled_latency_seconds", + &["method_name"], + "Histogram of response latency of RPCs handled by the server.", + )), + grpc_response_size_bytes: builder.register(histogram_vec( "grpc_response_size_bytes", - "The gRPC response sizes in bytes." - ) - .expect("Creating grpc_response_size_bytes metric failed."), - - runtime_nodes_count: register_int_gauge!(opts!( - "runtime_nodes_count", - "Number of nodes in the runtime." - )) - .expect("Creating runtime_nodes_count metric failed."), + &["method_name"], + "Histogram of response sizes of RPCs handled by the server.", + )), + } + } +} + +impl RuntimeMetrics { + fn new(builder: &MetricsBuilder) -> Self { + RuntimeMetrics { + runtime_nodes_total: builder.register(int_gauge( + "runtime_nodes_total", + "Number of nodes in the runtime.", + )), + } + } +} + +impl Metrics { + pub fn new() -> Self { + let builder = MetricsBuilder::new(); + Self { + runtime_metrics: RuntimeMetrics::new(&builder), + grpc_server_metrics: GrpcServerMetrics::new(&builder), + registry: builder.registry, } } + + pub fn gather(&self) -> Vec { + self.registry.gather() + } } -lazy_static::lazy_static! { - pub static ref METRICS: Metrics = Metrics::new(); +impl Default for Metrics { + fn default() -> Self { + Metrics::new() + } } diff --git a/oak/server/rust/oak_runtime/src/metrics/server.rs b/oak/server/rust/oak_runtime/src/metrics/server.rs index aa7d2ec79ab..4d675114456 100644 --- a/oak/server/rust/oak_runtime/src/metrics/server.rs +++ b/oak/server/rust/oak_runtime/src/metrics/server.rs @@ -14,6 +14,7 @@ // limitations under the License. // +use http::{method::Method, StatusCode}; use hyper::{ header::CONTENT_TYPE, service::{make_service_fn, service_fn}, @@ -42,9 +43,9 @@ impl std::fmt::Display for MetricsServerError { impl std::error::Error for MetricsServerError {} -async fn serve_metrics(_req: Request) -> Result, MetricsServerError> { +async fn handle_metrics_request(runtime: &Runtime) -> Result, MetricsServerError> { let encoder = TextEncoder::new(); - let metric_families = prometheus::gather(); + let metric_families = runtime.gather_metrics(); let mut buffer = vec![]; encoder.encode(&metric_families, &mut buffer).map_err(|e| { MetricsServerError::EncodingError(format!("Could not encode metrics data: {}", e)) @@ -53,7 +54,7 @@ async fn serve_metrics(_req: Request) -> Result, MetricsSer info!("Metrics size: {}", buffer.len()); Response::builder() - .status(http::StatusCode::OK) + .status(StatusCode::OK) .header(CONTENT_TYPE, encoder.format_type()) .body(Body::from(buffer)) .map_err(|e| { @@ -61,17 +62,39 @@ async fn serve_metrics(_req: Request) -> Result, MetricsSer }) } -async fn make_server(port: u16, notify_receiver: tokio::sync::oneshot::Receiver<()>) { +async fn serve_metrics( + runtime: Arc, + req: Request, +) -> Result, MetricsServerError> { + match (req.method(), req.uri().path()) { + (&Method::GET, "/metrics") => handle_metrics_request(&runtime).await, + _ => Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not Found!\n")) + .unwrap()), + } +} + +async fn make_server( + port: u16, + runtime: Arc, + notify_receiver: tokio::sync::oneshot::Receiver<()>, +) { let addr = SocketAddr::from(([0, 0, 0, 0], port)); // A `Service` is needed for every connection, so this - // creates one from the `process_metrics` function. - let make_svc = make_service_fn(|_conn| async { - // service_fn converts our function into a `Service` - Ok::<_, hyper::Error>(service_fn(serve_metrics)) + // creates one from the `serve_metrics` function. + let make_service = make_service_fn(move |_conn| { + let runtime = runtime.clone(); + async move { + Ok::<_, hyper::Error>(service_fn(move |req| { + let runtime = runtime.clone(); + serve_metrics(runtime, req) + })) + } }); - let server = Server::bind(&addr).serve(make_svc); + let server = Server::bind(&addr).serve(make_service); let graceful = server.with_graceful_shutdown(async { // Treat notification failure the same as a notification. let _ = notify_receiver.await; @@ -91,9 +114,9 @@ async fn make_server(port: u16, notify_receiver: tokio::sync::oneshot::Receiver< // triggered. pub fn start_metrics_server( port: u16, - _runtime: Arc, + runtime: Arc, notify_receiver: tokio::sync::oneshot::Receiver<()>, ) { let mut tokio_runtime = tokio::runtime::Runtime::new().expect("Couldn't create Tokio runtime"); - tokio_runtime.block_on(make_server(port, notify_receiver)); + tokio_runtime.block_on(make_server(port, runtime, notify_receiver)); } diff --git a/oak/server/rust/oak_runtime/src/node/grpc/server.rs b/oak/server/rust/oak_runtime/src/node/grpc/server.rs index a7442eb789c..7054f667e17 100644 --- a/oak/server/rust/oak_runtime/src/node/grpc/server.rs +++ b/oak/server/rust/oak_runtime/src/node/grpc/server.rs @@ -15,7 +15,6 @@ // use crate::{ - metrics::METRICS, node::{grpc::codec::VecCodec, Node}, runtime::RuntimeProxy, }; @@ -185,11 +184,19 @@ impl Service> for HttpRequestHandler { request.uri().path().to_string(), ); + let method_name = request.uri().path().to_string(); + let metrics_data = self.runtime.metrics_data(); let future = async move { debug!("Processing an HTTP/2 request: {:?}", request); let mut grpc_service = Grpc::new(VecCodec::default()); let response = grpc_service.unary(grpc_handler, request).await; debug!("Sending an HTTP/2 response: {:?}", response); + let stc = format!("{}", response.status()); + metrics_data + .grpc_server_metrics + .grpc_server_handled_total + .with_label_values(&[&method_name, &stc]) + .inc(); Ok(response) }; @@ -214,10 +221,19 @@ impl UnaryService> for GrpcRequestHandler { fn call(&mut self, request: tonic::Request>) -> Self::Future { let handler = self.clone(); + let metrics_data = handler.runtime.metrics_data(); let future = async move { debug!("Processing a gRPC request: {:?}", request); - METRICS.grpc_requests_total.inc(); - let timer = METRICS.grpc_request_duration.start_timer(); + metrics_data + .grpc_server_metrics + .grpc_server_started_total + .with_label_values(&[&handler.method_name]) + .inc(); + let timer = metrics_data + .grpc_server_metrics + .grpc_server_handled_latency_seconds + .with_label_values(&[&handler.method_name]) + .start_timer(); // Create a gRPC request. // TODO(#953): Add streaming support. @@ -329,7 +345,12 @@ impl GrpcRequestHandler { .map(|message| { // Return an empty HTTP body if the `message` is None. message.map_or(vec![], |m| { - METRICS.grpc_response_size.observe(m.data.len() as f64); + self.runtime + .metrics_data() + .grpc_server_metrics + .grpc_response_size_bytes + .with_label_values(&[&self.method_name]) + .observe(m.data.len() as f64); m.data }) }) diff --git a/oak/server/rust/oak_runtime/src/runtime/mod.rs b/oak/server/rust/oak_runtime/src/runtime/mod.rs index 357317c25f8..78245cb7722 100644 --- a/oak/server/rust/oak_runtime/src/runtime/mod.rs +++ b/oak/server/rust/oak_runtime/src/runtime/mod.rs @@ -16,7 +16,7 @@ use crate::{ message::{Message, NodeMessage}, - metrics::METRICS, + metrics::Metrics, node, runtime::channel::{with_reader_channel, with_writer_channel, Channel}, }; @@ -24,6 +24,7 @@ use core::sync::atomic::{AtomicBool, AtomicU64, Ordering::SeqCst}; use itertools::Itertools; use log::{debug, error, info, trace, warn}; use oak_abi::{label::Label, ChannelReadStatus, OakStatus}; +use prometheus::proto::MetricFamily; use rand::RngCore; use std::{ collections::HashMap, @@ -201,6 +202,8 @@ pub struct Runtime { next_node_id: AtomicU64, aux_servers: Mutex>, + + pub metrics_data: Metrics, } // Methods which translate between ABI handles (Node-relative u64 values) and `ChannelHalf` @@ -273,6 +276,10 @@ impl Runtime { ChannelHalfDirection::Write => Ok(half), } } + + pub fn gather_metrics(&self) -> Vec { + self.metrics_data.gather() + } } // Methods on `RuntimeProxy` for managing the core `Runtime` instance. @@ -289,6 +296,8 @@ impl RuntimeProxy { next_node_id: AtomicU64::new(0), aux_servers: Mutex::new(Vec::new()), + + metrics_data: Metrics::new(), }); let proxy = runtime.proxy_for_new_node(); proxy.runtime.node_configure_instance( @@ -1099,16 +1108,6 @@ impl Runtime { self.update_nodes_count_metric(); } - /// Update the node count metric with the current value. - fn update_nodes_count_metric(&self) { - METRICS.runtime_nodes_count.set( - self.node_infos - .read() - .expect("could not acquire lock on node_infos") - .len() as i64, - ); - } - /// Add the [`JoinHandle`] for a running Node to `NodeInfo`. The provided `NodeId` value /// must already be present in `self.node_infos`. fn add_node_join_handle(&self, node_id: NodeId, node_join_handle: JoinHandle<()>) { @@ -1255,6 +1254,21 @@ impl Runtime { node_id: self.new_node_id(), } } + + /// Update the node count metric with the current value. + fn update_nodes_count_metric(&self) { + self.metrics_data + .runtime_metrics + .runtime_nodes_total + .set(self.get_nodes_count()); + } + + fn get_nodes_count(&self) -> i64 { + self.node_infos + .read() + .expect("could not acquire lock on node_infos") + .len() as i64 + } } /// Manual implementation of the [`Drop`] trait to ensure that all components of @@ -1367,4 +1381,8 @@ impl RuntimeProxy { handles_capacity, ) } + + pub fn metrics_data(&self) -> Metrics { + self.runtime.metrics_data.clone() + } } diff --git a/oak/server/rust/oak_runtime/tests/integration_test.rs b/oak/server/rust/oak_runtime/tests/integration_test.rs index eb05ed3069a..4bea2bea087 100644 --- a/oak/server/rust/oak_runtime/tests/integration_test.rs +++ b/oak/server/rust/oak_runtime/tests/integration_test.rs @@ -61,7 +61,7 @@ mod common { info!("Reading the metrics."); let client = Client::new(); - let uri = format!("http://localhost:{}", &super::METRICS_PORT) + let uri = format!("http://localhost:{}/metrics", &super::METRICS_PORT) .parse::() .expect("Could not parse URI."); @@ -92,8 +92,8 @@ fn test_metrics_gives_the_correct_number_of_nodes() { .block_on(common::read_metrics()) .expect("Reading the metrics failed."); - let value = get_int_metric_value(&res, "runtime_nodes_count"); - assert_eq!(value, Some(2)); + let value = get_int_metric_value(&res, "runtime_nodes_total"); + assert_eq!(value, Some(2), "{}", &res); runtime.stop_runtime(); }