Skip to content

Commit

Permalink
feat(api): Track params for RPC methods (#1673)
Browse files Browse the repository at this point in the history
## What ❔

Tracks params for the RPC method calls in the metadata middleware, so
that they can be reported for oversized responses, dropped client calls
etc.

## Why ❔

Would allow to debug performance issues more easily.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli authored Apr 24, 2024
1 parent 6cf14a0 commit 1a34c8b
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 52 deletions.
8 changes: 8 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ pub(crate) struct OptionalENConfig {
/// Maximum number of transactions to be stored in the mempool cache. Default is 10000.
#[serde(default = "OptionalENConfig::default_mempool_cache_size")]
pub mempool_cache_size: usize,
/// Enables extended tracing of RPC calls. This may negatively impact performance for nodes under high load
/// (hundreds or thousands RPS).
#[serde(default = "OptionalENConfig::default_extended_api_tracing")]
pub extended_rpc_tracing: bool,

// Health checks
/// Time limit in milliseconds to mark a health check as slow and log the corresponding warning.
Expand Down Expand Up @@ -500,6 +504,10 @@ impl OptionalENConfig {
10_000
}

const fn default_extended_api_tracing() -> bool {
true
}

fn default_main_node_rate_limit_rps() -> NonZeroUsize {
NonZeroUsize::new(100).unwrap()
}
Expand Down
2 changes: 2 additions & 0 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ async fn run_api(
.with_vm_barrier(vm_barrier.clone())
.with_sync_state(sync_state.clone())
.with_mempool_cache(mempool_cache.clone())
.with_extended_tracing(config.optional.extended_rpc_tracing)
.enable_api_namespaces(config.optional.api_namespaces());
if let Some(tree_reader) = &tree_reader {
builder = builder.with_tree_api(tree_reader.clone());
Expand Down Expand Up @@ -549,6 +550,7 @@ async fn run_api(
.with_vm_barrier(vm_barrier)
.with_sync_state(sync_state)
.with_mempool_cache(mempool_cache)
.with_extended_tracing(config.optional.extended_rpc_tracing)
.enable_api_namespaces(config.optional.api_namespaces());
if let Some(tree_reader) = tree_reader {
builder = builder.with_tree_api(tree_reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use zksync_web3_decl::{

#[cfg(test)]
use super::testonly::RecordedMethodCalls;
use crate::api_server::web3::metrics::API_METRICS;
use crate::api_server::web3::metrics::{ObservedRpcParams, API_METRICS};

/// Metadata assigned to a JSON-RPC method call.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -88,9 +88,14 @@ impl MethodTracer {
}
}

pub(super) fn new_call(self: &Arc<Self>, name: &'static str) -> MethodCall {
pub(super) fn new_call<'a>(
self: &Arc<Self>,
name: &'static str,
raw_params: ObservedRpcParams<'a>,
) -> MethodCall<'a> {
MethodCall {
tracer: self.clone(),
params: raw_params,
meta: MethodMetadata::new(name),
is_completed: false,
}
Expand Down Expand Up @@ -118,21 +123,22 @@ impl MethodTracer {
}

#[derive(Debug)]
pub(super) struct MethodCall {
pub(super) struct MethodCall<'a> {
tracer: Arc<MethodTracer>,
meta: MethodMetadata,
params: ObservedRpcParams<'a>,
is_completed: bool,
}

impl Drop for MethodCall {
impl Drop for MethodCall<'_> {
fn drop(&mut self) {
if !self.is_completed {
API_METRICS.observe_dropped_call(&self.meta);
API_METRICS.observe_dropped_call(&self.meta, &self.params);
}
}
}

impl MethodCall {
impl MethodCall<'_> {
pub(super) fn set_as_current(&mut self) -> CurrentMethodGuard<'_> {
let meta = &mut self.meta;
let cell = self.tracer.inner.get_or_default();
Expand All @@ -147,15 +153,21 @@ impl MethodCall {
pub(super) fn observe_response(&mut self, response: &MethodResponse) {
self.is_completed = true;
let meta = &self.meta;
let params = &self.params;
match response.success_or_error {
MethodResponseResult::Success => {
API_METRICS.observe_response_size(meta.name, response.result.len());
API_METRICS.observe_response_size(meta.name, params, response.result.len());
}
MethodResponseResult::Failed(error_code) => {
API_METRICS.observe_protocol_error(meta.name, error_code, meta.has_app_error);
API_METRICS.observe_protocol_error(
meta.name,
params,
error_code,
meta.has_app_error,
);
}
}
API_METRICS.observe_latency(meta);
API_METRICS.observe_latency(meta, params);
#[cfg(test)]
self.tracer.recorder.observe_response(meta, response);
}
Expand Down
135 changes: 107 additions & 28 deletions core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
cell::RefCell,
collections::HashSet,
future::Future,
num::NonZeroU32,
Expand All @@ -16,7 +17,9 @@ use governor::{
};
use once_cell::sync::OnceCell;
use pin_project_lite::pin_project;
use rand::{rngs::SmallRng, RngCore, SeedableRng};
use tokio::sync::watch;
use tracing::instrument::{Instrument, Instrumented};
use vise::{
Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, GaugeGuard, Histogram, Metrics,
};
Expand All @@ -27,7 +30,7 @@ use zksync_web3_decl::jsonrpsee::{
};

use super::metadata::{MethodCall, MethodTracer};
use crate::api_server::web3::metrics::API_METRICS;
use crate::api_server::web3::metrics::{ObservedRpcParams, API_METRICS};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "transport", rename_all = "snake_case")]
Expand Down Expand Up @@ -109,32 +112,23 @@ where
/// as metrics.
///
/// As an example, a method handler can set the requested block ID, which would then be used in relevant metric labels.
///
/// # Implementation notes
///
/// We express `TRACE_PARAMS` as a const param rather than a field so that the Rust compiler has more room for optimizations in case tracing
/// is switched off.
#[derive(Debug)]
pub(crate) struct MetadataMiddleware<S> {
pub(crate) struct MetadataMiddleware<S, const TRACE_PARAMS: bool> {
inner: S,
registered_method_names: Arc<HashSet<&'static str>>,
method_tracer: Arc<MethodTracer>,
}

impl<S> MetadataMiddleware<S> {
pub fn new(
inner: S,
registered_method_names: Arc<HashSet<&'static str>>,
method_tracer: Arc<MethodTracer>,
) -> Self {
Self {
inner,
registered_method_names,
method_tracer,
}
}
}

impl<'a, S> RpcServiceT<'a> for MetadataMiddleware<S>
impl<'a, S, const TRACE_PARAMS: bool> RpcServiceT<'a> for MetadataMiddleware<S, TRACE_PARAMS>
where
S: Send + Sync + RpcServiceT<'a>,
{
type Future = WithMethodCall<S::Future>;
type Future = WithMethodCall<'a, S::Future>;

fn call(&self, request: Request<'a>) -> Self::Future {
// "Normalize" the method name by searching it in the set of all registered methods. This extends the lifetime
Expand All @@ -145,23 +139,32 @@ where
.copied()
.unwrap_or("");

WithMethodCall {
call: self.method_tracer.new_call(method_name),
inner: self.inner.call(request),
}
let observed_params = if TRACE_PARAMS {
ObservedRpcParams::new(request.params.as_ref())
} else {
ObservedRpcParams::Unknown
};
let call = self.method_tracer.new_call(method_name, observed_params);
WithMethodCall::new(self.inner.call(request), call)
}
}

pin_project! {
#[derive(Debug)]
pub(crate) struct WithMethodCall<F> {
call: MethodCall,
pub(crate) struct WithMethodCall<'a, F> {
#[pin]
inner: F,
call: MethodCall<'a>,
}
}

impl<F: Future<Output = MethodResponse>> Future for WithMethodCall<F> {
impl<'a, F> WithMethodCall<'a, F> {
fn new(inner: F, call: MethodCall<'a>) -> Self {
Self { inner, call }
}
}

impl<F: Future<Output = MethodResponse>> Future for WithMethodCall<'_, F> {
type Output = MethodResponse;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -178,6 +181,82 @@ impl<F: Future<Output = MethodResponse>> Future for WithMethodCall<F> {
}
}

/// [`tower`] middleware layer that wraps services into [`MetadataMiddleware`]. Implemented as a named type
/// to simplify call sites.
///
/// # Implementation notes
///
/// We express `TRACE_PARAMS` as a const param rather than a field so that the Rust compiler has more room for optimizations in case tracing
/// is switched off.
#[derive(Debug, Clone)]
pub(crate) struct MetadataLayer<const TRACE_PARAMS: bool> {
registered_method_names: Arc<HashSet<&'static str>>,
method_tracer: Arc<MethodTracer>,
}

impl MetadataLayer<false> {
pub fn new(
registered_method_names: Arc<HashSet<&'static str>>,
method_tracer: Arc<MethodTracer>,
) -> Self {
Self {
registered_method_names,
method_tracer,
}
}

pub fn with_param_tracing(self) -> MetadataLayer<true> {
MetadataLayer {
registered_method_names: self.registered_method_names,
method_tracer: self.method_tracer,
}
}
}

impl<Svc, const TRACE_PARAMS: bool> tower::Layer<Svc> for MetadataLayer<TRACE_PARAMS> {
type Service = MetadataMiddleware<Svc, TRACE_PARAMS>;

fn layer(&self, inner: Svc) -> Self::Service {
MetadataMiddleware {
inner,
registered_method_names: self.registered_method_names.clone(),
method_tracer: self.method_tracer.clone(),
}
}
}

/// Middleware that adds tracing spans to each RPC call, so that logs belonging to the same call
/// can be easily filtered.
#[derive(Debug)]
pub(crate) struct CorrelationMiddleware<S> {
inner: S,
}

impl<S> CorrelationMiddleware<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}

impl<'a, S> RpcServiceT<'a> for CorrelationMiddleware<S>
where
S: RpcServiceT<'a>,
{
type Future = Instrumented<S::Future>;

fn call(&self, request: Request<'a>) -> Self::Future {
thread_local! {
static CORRELATION_ID_RNG: RefCell<SmallRng> = RefCell::new(SmallRng::from_entropy());
}

// Wrap a call into a span with unique correlation ID, so that events occurring in the span can be easily filtered.
// This works as a cheap alternative to Open Telemetry tracing with its trace / span IDs.
let correlation_id = CORRELATION_ID_RNG.with(|rng| rng.borrow_mut().next_u64());
let call_span = tracing::debug_span!("rpc_call", correlation_id);
self.inner.call(request).instrument(call_span)
}
}

/// Tracks the timestamp of the last call to the RPC. Used during server shutdown to start dropping new traffic
/// only after this is coordinated by the external load balancer.
#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -293,10 +372,10 @@ mod tests {
}
};

WithMethodCall {
call: method_tracer.new_call("test"),
WithMethodCall::new(
inner,
}
method_tracer.new_call("test", ObservedRpcParams::None),
)
});

if spawn_tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use zksync_web3_decl::{

pub(crate) use self::{
metadata::{MethodMetadata, MethodTracer},
middleware::{LimitMiddleware, MetadataMiddleware, ShutdownMiddleware, TrafficTracker},
middleware::{
CorrelationMiddleware, LimitMiddleware, MetadataLayer, ShutdownMiddleware, TrafficTracker,
},
};
use crate::api_server::tx_sender::SubmitTxError;

Expand Down
Loading

0 comments on commit 1a34c8b

Please sign in to comment.