From b843a358d391db05527ac39360410fdeb5eba278 Mon Sep 17 00:00:00 2001 From: suurkivi <136639517+suurkivi@users.noreply.github.com> Date: Mon, 25 Nov 2024 12:54:31 -0800 Subject: [PATCH] add statsd metrics --- Cargo.lock | 19 ++++++++ Cargo.toml | 1 + Dockerfile | 3 -- src/bin/{setup.rs => setup_local_testnet.rs} | 22 +++++++-- src/cfg.rs | 17 +++++++ src/main.rs | 20 +++++++++ src/node/snapchain_node.rs | 7 ++- src/storage/store/engine.rs | 47 +++++++++++++++++++- src/storage/store/engine_tests.rs | 6 ++- tests/consensus_test.rs | 4 ++ 10 files changed, 136 insertions(+), 10 deletions(-) rename src/bin/{setup.rs => setup_local_testnet.rs} (78%) diff --git a/Cargo.lock b/Cargo.lock index c4bcc4f..393f93f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,6 +1307,15 @@ dependencies = [ "serde", ] +[[package]] +name = "cadence" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62fd689c825a93386a2ac05a46f88342c6df9ec3e79416f665650614e92e7475" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "cc" version = "1.2.1" @@ -1499,6 +1508,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -5145,6 +5163,7 @@ dependencies = [ "alloy-transport", "async-trait", "blake3", + "cadence", "clap", "ed25519-dalek", "figment", diff --git a/Cargo.toml b/Cargo.toml index 12452c1..6215a86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ rand = "0.8.5" humantime-serde = "1.1.1" humantime = "2.1.0" itertools = "0.13.0" +cadence = "1.5.0" [build-dependencies] tonic-build = "0.9.2" diff --git a/Dockerfile b/Dockerfile index 5257c69..7786a9f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,9 +29,6 @@ COPY src ./src ENV RUST_BACKTRACE=full RUN cargo build -## Pre-generate some configurations we can use -RUN target/debug/setup --propose-value-delay=250ms - ################################################################################# FROM ubuntu:24.04 diff --git a/src/bin/setup.rs b/src/bin/setup_local_testnet.rs similarity index 78% rename from src/bin/setup.rs rename to src/bin/setup_local_testnet.rs index 05460d2..d6127d0 100644 --- a/src/bin/setup.rs +++ b/src/bin/setup_local_testnet.rs @@ -6,8 +6,15 @@ use std::time::Duration; #[command(author, version, about, long_about = None)] struct Args { /// Delay for proposing a value (e.g. "250ms") - #[arg(long, value_parser = parse_duration)] + #[arg(long, value_parser = parse_duration, default_value = "250ms")] propose_value_delay: Duration, + + /// Metrics prefix. note: node ID will be appended before config file written + #[arg(long, default_value = "snapchain")] + metrics_prefix: String, + + #[arg(long, default_value = "127.0.0.1:8125")] + metrics_addr: String, } fn parse_duration(arg: &str) -> Result { @@ -30,7 +37,7 @@ async fn main() { let base_gossip_port = 50050; for i in 1..=nodes { let id = i; - let db_dir = format!(".rocks"); + let db_dir = format!("nodes/{id}/.rocks"); if !std::path::Path::new(format!("nodes/{id}").as_str()).exists() { std::fs::create_dir(format!("nodes/{id}")).expect("Failed to create node directory"); @@ -42,23 +49,30 @@ async fn main() { let secret_key = hex::encode(SecretKey::generate()); let rpc_port = base_rpc_port + i; let gossip_port = base_gossip_port + i; - let host = format!("172.100.0.1{i}"); + let host = format!("127.0.0.1{i}"); let rpc_address = format!("{host}:{rpc_port}"); let gossip_multi_addr = format!("/ip4/{host}/udp/{gossip_port}/quic-v1"); let other_nodes_addresses = (1..=nodes) .filter(|&x| x != id) - .map(|x| format!("/ip4/172.100.0.1{x}/udp/{:?}/quic-v1", base_gossip_port + x)) + .map(|x| format!("/ip4/127.0.0.1{x}/udp/{:?}/quic-v1", base_gossip_port + x)) .collect::>() .join(","); let propose_value_delay = humantime::format_duration(args.propose_value_delay); + let metrics_prefix = format!("{}{}", args.metrics_prefix, id); + let metrics_addr = args.metrics_addr.clone(); + let config_file_content = format!( r#" id = {id} rpc_address="{rpc_address}" rocksdb_dir="{db_dir}" +[metrics] +prefix="{metrics_prefix}" +addr="{metrics_addr}" + [gossip] address="{gossip_multi_addr}" bootstrap_peers = "{other_nodes_addresses}" diff --git a/src/cfg.rs b/src/cfg.rs index 62c271d..36fb6ae 100644 --- a/src/cfg.rs +++ b/src/cfg.rs @@ -8,6 +8,21 @@ use serde::{Deserialize, Serialize}; use std::error::Error; use std::path::Path; +#[derive(Debug, Deserialize, Serialize)] +pub struct MetricsConfig { + pub prefix: String, + pub addr: String, +} + +impl Default for MetricsConfig { + fn default() -> Self { + Self { + prefix: "".to_string(), //TODO: "snapchain" eventually + addr: "127.0.0.1:8125".to_string(), + } + } +} + #[derive(Debug, Deserialize, Serialize)] pub struct Config { pub id: u32, @@ -19,6 +34,7 @@ pub struct Config { pub rpc_address: String, pub rocksdb_dir: String, pub clear_db: bool, + pub metrics: MetricsConfig, } impl Default for Config { @@ -33,6 +49,7 @@ impl Default for Config { rpc_address: "0.0.0.0:3383".to_string(), rocksdb_dir: ".rocks".to_string(), clear_db: false, + metrics: MetricsConfig::default(), } } } diff --git a/src/main.rs b/src/main.rs index d9c192a..4a8c945 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,10 @@ +use cadence::{Counted, CountedExt, Gauged}; use malachite_metrics::{Metrics, SharedRegistry}; use snapchain::proto::snapchain::Block; use snapchain::storage::store::BlockStore; use std::collections::HashMap; use std::error::Error; +use std::net; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -50,6 +52,23 @@ async fn main() -> Result<(), Box> { warn!("Cleared db at {:?}", db_dir); } + if app_config.metrics.prefix == "" { + // TODO: consider removing this check + return Err("metrics prefix must be specified in config".into()); + } + + // TODO: parsing to SocketAddr only allows for IPs, DNS names won't work + let (metrics_host, metrics_port) = match app_config.metrics.addr.parse::() { + Ok(addr) => Ok((addr.ip().to_string(), addr.port())), + Err(e) => Err(format!("invalid metrics address: {}", e)), + }?; + + let host = (metrics_host, metrics_port); + let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap(); + let sink = cadence::UdpMetricSink::from(host, socket)?; + let client1 = cadence::StatsdClient::builder(app_config.metrics.prefix.as_str(), sink).build(); + let client1 = Arc::new(client1); + let addr = app_config.gossip.address.clone(); let grpc_addr = app_config.rpc_address.clone(); let grpc_socket_addr: SocketAddr = grpc_addr.parse()?; @@ -127,6 +146,7 @@ async fn main() -> Result<(), Box> { None, block_store.clone(), app_config.rocksdb_dir, + client1.clone(), ) .await; diff --git a/src/node/snapchain_node.rs b/src/node/snapchain_node.rs index 6f6d630..09bd1bb 100644 --- a/src/node/snapchain_node.rs +++ b/src/node/snapchain_node.rs @@ -11,11 +11,13 @@ use crate::storage::db::RocksDB; use crate::storage::store::engine::{BlockEngine, MempoolMessage, ShardEngine}; use crate::storage::store::shard::ShardStore; use crate::storage::store::BlockStore; +use cadence::StatsdClient; use libp2p::identity::ed25519::Keypair; use malachite_config::TimeoutConfig; use malachite_metrics::Metrics; use ractor::ActorRef; use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; use tokio::sync::mpsc; use tracing::warn; @@ -26,6 +28,7 @@ pub struct SnapchainNode { pub messages_tx_by_shard: HashMap>, pub shard_stores: HashMap, pub address: Address, + metrics_client: Arc, } impl SnapchainNode { @@ -37,6 +40,7 @@ impl SnapchainNode { block_tx: Option>, block_store: BlockStore, rocksdb_dir: String, + metrics_client: Arc, ) -> Self { let validator_address = Address(keypair.public().to_bytes()); @@ -79,7 +83,7 @@ impl SnapchainNode { db.open().unwrap(); let shard_store = ShardStore::new(db); shard_stores.insert(shard_id, shard_store.clone()); - let engine = ShardEngine::new(shard_id, shard_store); + let engine = ShardEngine::new(shard_id, shard_store, metrics_client.clone()); let messages_tx = engine.messages_tx(); @@ -172,6 +176,7 @@ impl SnapchainNode { messages_tx_by_shard: shard_messages, address: validator_address, shard_stores, + metrics_client, } } diff --git a/src/storage/store/engine.rs b/src/storage/store/engine.rs index cb0bb5c..13a8539 100644 --- a/src/storage/store/engine.rs +++ b/src/storage/store/engine.rs @@ -12,6 +12,7 @@ use crate::storage::store::BlockStore; use crate::storage::trie; use crate::storage::trie::merkle_trie; use crate::storage::trie::merkle_trie::TrieKey; +use cadence::{Counted, CountedExt, Gauged, StatsdClient}; use itertools::Itertools; use message::MessageType; use snapchain::{Block, ShardChunk, Transaction}; @@ -91,6 +92,7 @@ pub struct ShardEngine { cast_store: Store, pub db: Arc, onchain_event_store: OnchainEventStore, + metrics_client: Arc, } fn encode_vec(data: &[Vec]) -> String { @@ -101,7 +103,11 @@ fn encode_vec(data: &[Vec]) -> String { } impl ShardEngine { - pub fn new(shard_id: u32, shard_store: ShardStore) -> ShardEngine { + pub fn new( + shard_id: u32, + shard_store: ShardStore, + metrics_client: Arc, + ) -> ShardEngine { let db = &*shard_store.db; // TODO: adding the trie here introduces many calls that want to return errors. Rethink unwrap strategy. @@ -124,6 +130,7 @@ impl ShardEngine { cast_store, db, onchain_event_store, + metrics_client, } } @@ -203,6 +210,21 @@ impl ShardEngine { transactions } + fn incr(&self, key: &str) { + let key = format!("shard{}.{}", self.shard_id, key); + _ = self.metrics_client.incr(key.as_str()) + } + + fn count(&self, key: &str, value: u64) { + let key = format!("shard{}.{}", self.shard_id, key); + _ = self.metrics_client.count(key.as_str(), value) + } + + fn gauge(&self, key: &str, value: u64) { + let key = format!("shard{}.{}", self.shard_id, key); + _ = self.metrics_client.gauge(key.as_str(), value) + } + pub fn propose_state_change(&mut self, shard: u32) -> ShardStateChange { let mut txn = RocksDbTransactionBatch::new(); let result = self.prepare_proposal(&mut txn, shard).unwrap(); //TODO: don't unwrap() @@ -210,6 +232,8 @@ impl ShardEngine { // TODO: this should probably operate automatically via drop trait self.trie.reload(&self.db).unwrap(); + self.incr("propose"); + result } @@ -373,6 +397,15 @@ impl ShardEngine { } self.trie.reload(&*self.shard_store.db).unwrap(); + + if result { + self.incr("validate.true"); + self.count("validate.false", 0) + } else { + self.incr("validate.false"); + self.count("validate.true", 0); + } + result } @@ -390,6 +423,18 @@ impl ShardEngine { self.db.commit(txn).unwrap(); self.trie.reload(&self.db).unwrap(); + self.incr("commit"); + + let block_number = &shard_chunk + .header + .as_ref() + .unwrap() + .height + .unwrap() + .block_number; + + self.gauge("block_height", *block_number); + match self.shard_store.put_shard_chunk(shard_chunk) { Err(err) => { error!("Unable to write shard chunk to store {}", err) diff --git a/src/storage/store/engine_tests.rs b/src/storage/store/engine_tests.rs index 9e7ac0f..e82e8d0 100644 --- a/src/storage/store/engine_tests.rs +++ b/src/storage/store/engine_tests.rs @@ -11,12 +11,16 @@ mod tests { use crate::storage::trie::merkle_trie::TrieKey; use crate::utils::factory::{events_factory, messages_factory}; use prost::Message as _; + use std::sync::Arc; use tempfile; use tracing_subscriber::EnvFilter; const FID_FOR_TEST: u32 = 1234; fn new_engine() -> (ShardEngine, tempfile::TempDir) { + let metrics_client = + Arc::new(cadence::StatsdClient::builder("", cadence::NopMetricSink {}).build()); + let dir = tempfile::TempDir::new().unwrap(); let db_path = dir.path().join("a.db"); @@ -24,7 +28,7 @@ mod tests { db.open().unwrap(); let shard_store = ShardStore::new(db); - (ShardEngine::new(1, shard_store), dir) + (ShardEngine::new(1, shard_store, metrics_client), dir) } fn enable_logging() { diff --git a/tests/consensus_test.rs b/tests/consensus_test.rs index 908c896..9106c5e 100644 --- a/tests/consensus_test.rs +++ b/tests/consensus_test.rs @@ -55,6 +55,9 @@ fn make_tmp_path() -> String { impl NodeForTest { pub async fn create(keypair: Keypair, num_shards: u32, grpc_port: u32) -> Self { + let metrics_client = + Arc::new(cadence::StatsdClient::builder("", cadence::NopMetricSink {}).build()); + let mut config = snapchain::consensus::consensus::Config::default(); config = config.with_shard_ids((1..=num_shards).collect()); @@ -72,6 +75,7 @@ impl NodeForTest { Some(block_tx), block_store.clone(), make_tmp_path(), + metrics_client, ) .await;