diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index 76227b9708e8e6..1629aaf612fd52 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -25,6 +25,7 @@ use { }, stream_cancel::{Trigger, Tripwire}, thiserror::Error, + tokio::time::Instant, tokio::{net::TcpStream, pin, select, sync::broadcast}, tokio_util::compat::TokioAsyncReadCompatExt, }; @@ -399,11 +400,13 @@ async fn handle_connection( Err(err) => return Err(err.into()), }, result = broadcast_receiver.recv() => { - + let time = Instant::now(); // In both possible error cases (closed or lagged) we disconnect the client. if let Some(json) = broadcast_handler.handle(result?)? { sender.send_text(&*json).await?; } + let send_time = time.elapsed().as_micros(); + datapoint_info!("rpc-pubsub-broadcast-receive-send-us", ("time", send_time, i64)); }, _ = &mut tripwire => { warn!("disconnecting websocket client: shutting down"); diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 538124c6b2e0b7..49807fe3e99d9e 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -316,7 +316,10 @@ impl RpcNotifier { let time = Instant::now(); let _ = self.sender.send(notification); let send_time = time.elapsed().as_micros(); - datapoint_info!("rpc-pubsub-send-time-us", ("time", send_time, i64)); + datapoint_info!( + "rpc-pubsub-broadcast-send-time-us", + ("time", send_time, i64) + ); inc_new_counter_info!("rpc-pubsub-messages", 1); inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len());