Skip to content

Commit

Permalink
Add some metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
niks3089 committed Apr 5, 2024
1 parent aafd827 commit 0846dc6
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ prometheus = "0.13.2"
prost = "0.12.1"
protobuf-src = "1.1.0"
rdkafka = "0.34.0"
scopeguard = "1.2.0"
serde = "1.0.145"
serde_json = "1.0.86"
serde_yaml = "0.9.25"
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ hyper = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
prometheus = { workspace = true }
scopeguard = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
Expand Down
11 changes: 9 additions & 2 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use {
crate::{
config::{ConfigBlockFailAction, ConfigGrpc},
filters::{Filter, FilterAccountsDataSlice},
prom::{self, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE},
prom::{
self, CONNECTIONS_TOTAL, GEYSER_LOOP_HISTOGRAM, MESSAGE_QUEUE_SIZE,
SNAPSHOT_MESSAGE_QUEUE_SIZE,
},
version::GrpcVersionInfo,
},
log::{error, info},
Expand Down Expand Up @@ -806,6 +809,10 @@ impl GrpcService {
loop {
tokio::select! {
Some(message) = messages_rx.recv() => {
let start_time = Instant::now();
let _guard = scopeguard::guard((), |_| {
GEYSER_LOOP_HISTOGRAM.observe(start_time.elapsed().as_millis() as f64);
});
MESSAGE_QUEUE_SIZE.dec();

// Update blocks info
Expand Down Expand Up @@ -1092,7 +1099,7 @@ impl GrpcService {
while is_alive {
let message = match snapshot_rx.try_recv() {
Ok(message) => {
MESSAGE_QUEUE_SIZE.dec();
SNAPSHOT_MESSAGE_QUEUE_SIZE.dec();
match message {
Some(message) => message,
None => break,
Expand Down
10 changes: 7 additions & 3 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use {
crate::{
config::Config,
grpc::{GrpcService, Message},
prom::{self, PrometheusService, MESSAGE_QUEUE_SIZE},
prom::{
self, PrometheusService, INCOMING_MESSAGES_COUNTER, MESSAGE_QUEUE_SIZE,
SNAPSHOT_MESSAGE_QUEUE_SIZE,
},
},
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
Expand Down Expand Up @@ -34,6 +37,7 @@ pub struct PluginInner {

impl PluginInner {
fn send_message(&self, message: Message) {
INCOMING_MESSAGES_COUNTER.inc();
if self.grpc_channel.send(message).is_ok() {
MESSAGE_QUEUE_SIZE.inc();
}
Expand Down Expand Up @@ -134,7 +138,7 @@ impl GeyserPlugin for Plugin {
if is_startup {
if let Some(channel) = &inner.snapshot_channel {
match channel.send(Some(message)) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Ok(()) => SNAPSHOT_MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
}
}
Expand All @@ -150,7 +154,7 @@ impl GeyserPlugin for Plugin {
self.with_inner(|inner| {
if let Some(channel) = &inner.snapshot_channel {
match channel.send(None) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Ok(()) => SNAPSHOT_MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
}
}
Expand Down
20 changes: 19 additions & 1 deletion yellowstone-grpc-geyser/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use {
Body, Request, Response, Server, StatusCode,
},
log::error,
prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
prometheus::{
Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry,
TextEncoder,
},
solana_geyser_plugin_interface::geyser_plugin_interface::SlotStatus,
std::sync::Once,
tokio::sync::oneshot,
Expand All @@ -31,10 +34,22 @@ lazy_static::lazy_static! {
&["reason"]
).unwrap();

pub static ref INCOMING_MESSAGES_COUNTER: IntCounter = IntCounter::new(
"incoming_messages_counter", "Incoming message counter"
).unwrap();

pub static ref MESSAGE_QUEUE_SIZE: IntGauge = IntGauge::new(
"message_queue_size", "Size of geyser message queue"
).unwrap();

pub static ref SNAPSHOT_MESSAGE_QUEUE_SIZE: IntGauge = IntGauge::new(
"snapshot_message_queue_size", "Size of snapshot message queue"
).unwrap();

pub static ref GEYSER_LOOP_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("geyser_loop_histogram", "Processing loop time")
).unwrap();

pub static ref CONNECTIONS_TOTAL: IntGauge = IntGauge::new(
"connections_total", "Total number of connections to GRPC service"
).unwrap();
Expand All @@ -61,6 +76,9 @@ impl PrometheusService {
register!(INVALID_FULL_BLOCKS);
register!(MESSAGE_QUEUE_SIZE);
register!(CONNECTIONS_TOTAL);
register!(INCOMING_MESSAGES_COUNTER);
register!(SNAPSHOT_MESSAGE_QUEUE_SIZE);
register!(GEYSER_LOOP_HISTOGRAM);

VERSION
.with_label_values(&[
Expand Down

0 comments on commit 0846dc6

Please sign in to comment.