Skip to content

Commit

Permalink
Add more metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
niks3089 committed Apr 8, 2024
1 parent 0846dc6 commit 12c0c0e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 52 deletions.
126 changes: 74 additions & 52 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use {
config::{ConfigBlockFailAction, ConfigGrpc},
filters::{Filter, FilterAccountsDataSlice},
prom::{
self, CONNECTIONS_TOTAL, GEYSER_LOOP_HISTOGRAM, MESSAGE_QUEUE_SIZE,
SNAPSHOT_MESSAGE_QUEUE_SIZE,
self, BLOCK_UPDATE_HISTOGRAM, CONNECTIONS_TOTAL, GEYSER_LOOP_HISTOGRAM,
MESSAGE_QUEUE_SIZE, REMOVE_OUTDATED_HISTOGRAM, SNAPSHOT_MESSAGE_QUEUE_SIZE,
UPDATE_RECONSTRUCTION_HISTOGRAM,
},
version::GrpcVersionInfo,
},
Expand Down Expand Up @@ -811,77 +812,97 @@ impl GrpcService {
Some(message) = messages_rx.recv() => {
let start_time = Instant::now();
let _guard = scopeguard::guard((), |_| {
GEYSER_LOOP_HISTOGRAM.observe(start_time.elapsed().as_millis() as f64);
GEYSER_LOOP_HISTOGRAM.observe(start_time.elapsed().as_secs() as f64);
});
MESSAGE_QUEUE_SIZE.dec();

// Update blocks info
if let Some(blocks_meta_tx) = &blocks_meta_tx {
if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.clone());
{
let block_update_start = Instant::now();
let _block_update_guard = scopeguard::guard((), |_| {
BLOCK_UPDATE_HISTOGRAM.observe(block_update_start.elapsed().as_secs() as f64);
});

// Update blocks info
if let Some(blocks_meta_tx) = &blocks_meta_tx {
if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.clone());
}
}
}

// Remove outdated block reconstruction info
match &message {
// On startup we can receive few Confirmed/Finalized slots without BlockMeta message
// With saved first Processed slot we can ignore errors caused by startup process
Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => {
processed_first_slot = Some(msg.slot);
}
Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => {
// keep extra 10 slots
if let Some(msg_slot) = msg.slot.checked_sub(10) {
loop {
match messages.keys().next().cloned() {
Some(slot) if slot < msg_slot => {
if let Some(slot_messages) = messages.remove(&slot) {
match processed_first_slot {
Some(processed_first) if slot <= processed_first => continue,
None => continue,
_ => {}
}

if !slot_messages.sealed && slot_messages.finalized_at.is_some() {
let mut reasons = vec![];
if let Some(block_meta) = slot_messages.block_meta {
let block_txn_count = block_meta.executed_transaction_count as usize;
let msg_txn_count = slot_messages.transactions.len();
if block_txn_count != msg_txn_count {
reasons.push("InvalidTxnCount");
error!("failed to reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}");
}
let block_entries_count = block_meta.entries_count as usize;
let msg_entries_count = slot_messages.entries.len();
if block_entries_count != msg_entries_count {
reasons.push("InvalidEntriesCount");
error!("failed to reconstruct #{slot} -- entries count: {block_entries_count} vs {msg_entries_count}");
}
} else {
reasons.push("NoBlockMeta");
{
let remove_outdated_start = Instant::now();
let _remove_outdated_guard = scopeguard::guard((), |_| {
REMOVE_OUTDATED_HISTOGRAM.observe(remove_outdated_start.elapsed().as_secs() as f64);
});

match &message {
// On startup we can receive few Confirmed/Finalized slots without BlockMeta message
// With saved first Processed slot we can ignore errors caused by startup process
Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => {
processed_first_slot = Some(msg.slot);
}
Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => {
// keep extra 10 slots
if let Some(msg_slot) = msg.slot.checked_sub(10) {
loop {
match messages.keys().next().cloned() {
Some(slot) if slot < msg_slot => {
if let Some(slot_messages) = messages.remove(&slot) {
match processed_first_slot {
Some(processed_first) if slot <= processed_first => continue,
None => continue,
_ => {}
}
let reason = reasons.join(",");

prom::update_invalid_blocks(format!("failed reconstruct {reason}"));
match block_fail_action {
ConfigBlockFailAction::Log => {
error!("failed reconstruct #{slot} {reason}");
if !slot_messages.sealed && slot_messages.finalized_at.is_some() {
let mut reasons = vec![];
if let Some(block_meta) = slot_messages.block_meta {
let block_txn_count = block_meta.executed_transaction_count as usize;
let msg_txn_count = slot_messages.transactions.len();
if block_txn_count != msg_txn_count {
reasons.push("InvalidTxnCount");
error!("failed to reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}");
}
let block_entries_count = block_meta.entries_count as usize;
let msg_entries_count = slot_messages.entries.len();
if block_entries_count != msg_entries_count {
reasons.push("InvalidEntriesCount");
error!("failed to reconstruct #{slot} -- entries count: {block_entries_count} vs {msg_entries_count}");
}
} else {
reasons.push("NoBlockMeta");
}
ConfigBlockFailAction::Panic => {
panic!("failed reconstruct #{slot} {reason}");
let reason = reasons.join(",");

prom::update_invalid_blocks(format!("failed reconstruct {reason}"));
match block_fail_action {
ConfigBlockFailAction::Log => {
error!("failed reconstruct #{slot} {reason}");
}
ConfigBlockFailAction::Panic => {
panic!("failed reconstruct #{slot} {reason}");
}
}
}
}
}
_ => break,
}
_ => break,
}
}
}
_ => {}
}
_ => {}
}

{
let update_reconstruction_start = Instant::now();
let _update_reconstruction_guard = scopeguard::guard((), |_| {
UPDATE_RECONSTRUCTION_HISTOGRAM.observe(update_reconstruction_start.elapsed().as_secs() as f64);
});

// Update block reconstruction info
let slot_messages = messages.entry(message.get_slot()).or_default();
if !matches!(message, Message::Slot(_)) {
Expand Down Expand Up @@ -1043,6 +1064,7 @@ impl GrpcService {
}
}
}
}
() = &mut processed_sleep => {
if !processed_messages.is_empty() {
let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
Expand Down
13 changes: 13 additions & 0 deletions yellowstone-grpc-geyser/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ lazy_static::lazy_static! {
pub static ref CONNECTIONS_TOTAL: IntGauge = IntGauge::new(
"connections_total", "Total number of connections to GRPC service"
).unwrap();

pub static ref BLOCK_UPDATE_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("block_update_histogram", "Processing loop time")
).unwrap();
pub static ref REMOVE_OUTDATED_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("remove_outdated_histogram", "Processing loop time")
).unwrap();
pub static ref UPDATE_RECONSTRUCTION_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("update_reconstruction_histogram", "Processing loop time")
).unwrap();
}

#[derive(Debug)]
Expand All @@ -79,6 +89,9 @@ impl PrometheusService {
register!(INCOMING_MESSAGES_COUNTER);
register!(SNAPSHOT_MESSAGE_QUEUE_SIZE);
register!(GEYSER_LOOP_HISTOGRAM);
register!(BLOCK_UPDATE_HISTOGRAM);
register!(REMOVE_OUTDATED_HISTOGRAM);
register!(UPDATE_RECONSTRUCTION_HISTOGRAM);

VERSION
.with_label_values(&[
Expand Down

0 comments on commit 12c0c0e

Please sign in to comment.