Skip to content

Commit

Permalink
Manage prometheus registry in Oak Runtime (project-oak#941)
Browse files Browse the repository at this point in the history
- Instantiates and manages prometheus registry in the Runtime,
  instead of using a static instance
- Uses labels for finer grained collection of metrics
- Better instrumentation for gRPC server pseudo-node
- Exposes metrics on `/metrics` endpoint
  • Loading branch information
rbehjati authored May 14, 2020
1 parent f9a54ea commit b5322ac
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 64 deletions.
143 changes: 108 additions & 35 deletions oak/server/rust/oak_runtime/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,126 @@
//

use prometheus::{
opts, register_histogram, register_int_counter, register_int_gauge, Histogram, IntCounter,
IntGauge,
proto::MetricFamily, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry,
};

pub mod server;

/// Helper struct with functions for creating and registering metrics.
///
/// Owns the `Registry` that every metric passed to `register` is added to.
struct MetricsBuilder {
/// Registry that collects the metrics registered through this builder.
pub registry: Registry,
}

/// Metrics collected for the gRPC server pseudo-node.
///
/// Mostly copied from https://github.com/grpc-ecosystem/java-grpc-prometheus
#[derive(Clone)]
pub struct GrpcServerMetrics {
/// Total number of RPCs started on the server.
///
/// Labelled by `method_name`.
pub grpc_server_started_total: IntCounterVec,
/// Total number of RPCs completed on the server, regardless of success or failure.
///
/// Labelled by `method_name` and `status_code`.
pub grpc_server_handled_total: IntCounterVec,
/// Histogram of response latency of RPCs handled by the server, in seconds.
///
/// Labelled by `method_name`.
pub grpc_server_handled_latency_seconds: HistogramVec,
/// Histogram of response sizes of RPCs handled by the server.
///
/// Labelled by `method_name`.
pub grpc_response_size_bytes: HistogramVec,
}

/// Struct that collects all metrics for monitoring the Oak Runtime.
#[derive(Clone)]
pub struct RuntimeMetrics {
/// Gauge registered under the name `runtime_nodes_total`: number of nodes in the runtime.
pub runtime_nodes_total: IntGauge,
}

/// Struct that collects all the metrics in one place
#[derive(Clone)]
pub struct Metrics {
pub grpc_request_duration: Histogram,
pub grpc_requests_total: IntCounter,
pub grpc_response_size: Histogram,
pub runtime_nodes_count: IntGauge,
registry: Registry,
pub runtime_metrics: RuntimeMetrics,
pub grpc_server_metrics: GrpcServerMetrics,
}

// TODO(#899): For testability implement a trait with methods for updating the metrics.
// TODO(#899): Instead of using a global Registry, the Runtime should instantiate and manage the
// Registry
impl Metrics {
fn new() -> Self {
impl MetricsBuilder {
pub fn new() -> Self {
Self {
grpc_request_duration: register_histogram!(
"grpc_request_duration_seconds",
"The gRPC request latencies in seconds."
)
.expect("Creating grpc_request_duration_seconds metric failed."),

grpc_requests_total: register_int_counter!(opts!(
"grpc_requests_total",
"Total number of gRPC requests received."
))
.expect("Creating grpc_requests_total metric failed."),

grpc_response_size: register_histogram!(
registry: Registry::new(),
}
}

/// Registers a boxed clone of `metric` in this builder's registry and returns `metric` itself,
/// so the caller can keep updating it.
///
/// # Panics
///
/// Panics if the registry returns an error for the metric. The panic message lists the
/// fully-qualified name(s) described by the metric together with the registry error, so a
/// failed registration can be traced back to the offending metric.
fn register<T: 'static + prometheus::core::Collector + Clone>(&self, metric: T) -> T {
    self.registry
        .register(Box::new(metric.clone()))
        .unwrap_or_else(|error| {
            // Only computed on the failure path; the boxed clone was moved into the registry,
            // so the names are read from the original `metric`.
            let names: Vec<&str> = metric
                .desc()
                .into_iter()
                .map(|desc| desc.fq_name.as_str())
                .collect();
            panic!("Could not register metric(s) {:?}: {:?}", names, error)
        });
    metric
}
}

/// Builds an (unregistered) `IntCounterVec` called `metric_name` with the given `labels` and
/// `help` text.
///
/// Panics if `IntCounterVec::new` returns an error.
fn counter_vec(metric_name: &str, labels: &[&str], help: &str) -> IntCounterVec {
    IntCounterVec::new(Opts::new(metric_name, help), labels).unwrap()
}

/// Builds an (unregistered) `HistogramVec` called `metric_name` with the given `labels` and
/// `help` text.
///
/// Panics if `HistogramVec::new` returns an error.
fn histogram_vec(metric_name: &str, labels: &[&str], help: &str) -> HistogramVec {
    HistogramVec::new(HistogramOpts::new(metric_name, help), labels).unwrap()
}

/// Builds an (unregistered) `IntGauge` called `metric_name` with the given `help` text.
///
/// Panics if `IntGauge::with_opts` returns an error.
fn int_gauge(metric_name: &str, help: &str) -> IntGauge {
    IntGauge::with_opts(Opts::new(metric_name, help)).unwrap()
}

impl GrpcServerMetrics {
fn new(builder: &MetricsBuilder) -> Self {
GrpcServerMetrics {
grpc_server_started_total: builder.register(counter_vec(
"grpc_server_started_total",
&["method_name"],
"Total number of RPCs started on the server.",
)),
grpc_server_handled_total: builder.register(counter_vec(
"grpc_server_handled_total",
&["method_name", "status_code"],
"Total number of RPCs completed on the server, regardless of success or failure.",
)),
grpc_server_handled_latency_seconds: builder.register(histogram_vec(
"grpc_server_handled_latency_seconds",
&["method_name"],
"Histogram of response latency of RPCs handled by the server.",
)),
grpc_response_size_bytes: builder.register(histogram_vec(
"grpc_response_size_bytes",
"The gRPC response sizes in bytes."
)
.expect("Creating grpc_response_size_bytes metric failed."),

runtime_nodes_count: register_int_gauge!(opts!(
"runtime_nodes_count",
"Number of nodes in the runtime."
))
.expect("Creating runtime_nodes_count metric failed."),
&["method_name"],
"Histogram of response sizes of RPCs handled by the server.",
)),
}
}
}

impl RuntimeMetrics {
    /// Creates the runtime metrics and registers each of them through `builder`.
    fn new(builder: &MetricsBuilder) -> Self {
        let nodes_gauge = int_gauge("runtime_nodes_total", "Number of nodes in the runtime.");
        Self {
            runtime_nodes_total: builder.register(nodes_gauge),
        }
    }
}

impl Metrics {
pub fn new() -> Self {
let builder = MetricsBuilder::new();
Self {
runtime_metrics: RuntimeMetrics::new(&builder),
grpc_server_metrics: GrpcServerMetrics::new(&builder),
registry: builder.registry,
}
}

pub fn gather(&self) -> Vec<MetricFamily> {
self.registry.gather()
}
}

lazy_static::lazy_static! {
pub static ref METRICS: Metrics = Metrics::new();
impl Default for Metrics {
fn default() -> Self {
Metrics::new()
}
}
45 changes: 34 additions & 11 deletions oak/server/rust/oak_runtime/src/metrics/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.
//

use http::{method::Method, StatusCode};
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Expand Down Expand Up @@ -42,9 +43,9 @@ impl std::fmt::Display for MetricsServerError {

impl std::error::Error for MetricsServerError {}

async fn serve_metrics(_req: Request<Body>) -> Result<Response<Body>, MetricsServerError> {
async fn handle_metrics_request(runtime: &Runtime) -> Result<Response<Body>, MetricsServerError> {
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
let metric_families = runtime.gather_metrics();
let mut buffer = vec![];
encoder.encode(&metric_families, &mut buffer).map_err(|e| {
MetricsServerError::EncodingError(format!("Could not encode metrics data: {}", e))
Expand All @@ -53,25 +54,47 @@ async fn serve_metrics(_req: Request<Body>) -> Result<Response<Body>, MetricsSer
info!("Metrics size: {}", buffer.len());

Response::builder()
.status(http::StatusCode::OK)
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.map_err(|e| {
MetricsServerError::ResponseError(format!("Could not build the response: {}", e))
})
}

async fn make_server(port: u16, notify_receiver: tokio::sync::oneshot::Receiver<()>) {
/// Routes an incoming HTTP request of the metrics server.
///
/// `GET /metrics` is answered by `handle_metrics_request`; every other method/path
/// combination gets a `404 Not Found` response with a short plain-text body.
async fn serve_metrics(
    runtime: Arc<Runtime>,
    req: Request<Body>,
) -> Result<Response<Body>, MetricsServerError> {
    let is_metrics_get = *req.method() == Method::GET && req.uri().path() == "/metrics";
    if is_metrics_get {
        return handle_metrics_request(&runtime).await;
    }

    let not_found = Response::builder()
        .status(StatusCode::NOT_FOUND)
        .body(Body::from("Not Found!\n"))
        .unwrap();
    Ok(not_found)
}

async fn make_server(
port: u16,
runtime: Arc<Runtime>,
notify_receiver: tokio::sync::oneshot::Receiver<()>,
) {
let addr = SocketAddr::from(([0, 0, 0, 0], port));

// A `Service` is needed for every connection, so this
// creates one from the `process_metrics` function.
let make_svc = make_service_fn(|_conn| async {
// service_fn converts our function into a `Service`
Ok::<_, hyper::Error>(service_fn(serve_metrics))
// creates one from the `serve_metrics` function.
let make_service = make_service_fn(move |_conn| {
let runtime = runtime.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req| {
let runtime = runtime.clone();
serve_metrics(runtime, req)
}))
}
});

let server = Server::bind(&addr).serve(make_svc);
let server = Server::bind(&addr).serve(make_service);
let graceful = server.with_graceful_shutdown(async {
// Treat notification failure the same as a notification.
let _ = notify_receiver.await;
Expand All @@ -91,9 +114,9 @@ async fn make_server(port: u16, notify_receiver: tokio::sync::oneshot::Receiver<
// triggered.
pub fn start_metrics_server(
port: u16,
_runtime: Arc<Runtime>,
runtime: Arc<Runtime>,
notify_receiver: tokio::sync::oneshot::Receiver<()>,
) {
let mut tokio_runtime = tokio::runtime::Runtime::new().expect("Couldn't create Tokio runtime");
tokio_runtime.block_on(make_server(port, notify_receiver));
tokio_runtime.block_on(make_server(port, runtime, notify_receiver));
}
29 changes: 25 additions & 4 deletions oak/server/rust/oak_runtime/src/node/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//

use crate::{
metrics::METRICS,
node::{grpc::codec::VecCodec, Node},
runtime::RuntimeProxy,
};
Expand Down Expand Up @@ -185,11 +184,19 @@ impl Service<http::Request<hyper::Body>> for HttpRequestHandler {
request.uri().path().to_string(),
);

let method_name = request.uri().path().to_string();
let metrics_data = self.runtime.metrics_data();
let future = async move {
debug!("Processing an HTTP/2 request: {:?}", request);
let mut grpc_service = Grpc::new(VecCodec::default());
let response = grpc_service.unary(grpc_handler, request).await;
debug!("Sending an HTTP/2 response: {:?}", response);
let stc = format!("{}", response.status());
metrics_data
.grpc_server_metrics
.grpc_server_handled_total
.with_label_values(&[&method_name, &stc])
.inc();
Ok(response)
};

Expand All @@ -214,10 +221,19 @@ impl UnaryService<Vec<u8>> for GrpcRequestHandler {

fn call(&mut self, request: tonic::Request<Vec<u8>>) -> Self::Future {
let handler = self.clone();
let metrics_data = handler.runtime.metrics_data();
let future = async move {
debug!("Processing a gRPC request: {:?}", request);
METRICS.grpc_requests_total.inc();
let timer = METRICS.grpc_request_duration.start_timer();
metrics_data
.grpc_server_metrics
.grpc_server_started_total
.with_label_values(&[&handler.method_name])
.inc();
let timer = metrics_data
.grpc_server_metrics
.grpc_server_handled_latency_seconds
.with_label_values(&[&handler.method_name])
.start_timer();

// Create a gRPC request.
// TODO(#953): Add streaming support.
Expand Down Expand Up @@ -329,7 +345,12 @@ impl GrpcRequestHandler {
.map(|message| {
// Return an empty HTTP body if the `message` is None.
message.map_or(vec![], |m| {
METRICS.grpc_response_size.observe(m.data.len() as f64);
self.runtime
.metrics_data()
.grpc_server_metrics
.grpc_response_size_bytes
.with_label_values(&[&self.method_name])
.observe(m.data.len() as f64);
m.data
})
})
Expand Down
Loading

0 comments on commit b5322ac

Please sign in to comment.