Skip to content

Commit

Permalink
add statsd metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
suurkivi committed Nov 25, 2024
1 parent ec81b22 commit b843a35
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 10 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ rand = "0.8.5"
humantime-serde = "1.1.1"
humantime = "2.1.0"
itertools = "0.13.0"
cadence = "1.5.0"

[build-dependencies]
tonic-build = "0.9.2"
Expand Down
3 changes: 0 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ COPY src ./src
ENV RUST_BACKTRACE=full
RUN cargo build

## Pre-generate some configurations we can use
RUN target/debug/setup --propose-value-delay=250ms

#################################################################################

FROM ubuntu:24.04
Expand Down
22 changes: 18 additions & 4 deletions src/bin/setup.rs → src/bin/setup_local_testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ use std::time::Duration;
#[command(author, version, about, long_about = None)]
struct Args {
/// Delay for proposing a value (e.g. "250ms")
#[arg(long, value_parser = parse_duration)]
#[arg(long, value_parser = parse_duration, default_value = "250ms")]
propose_value_delay: Duration,

/// Metrics prefix. note: node ID will be appended before config file written
#[arg(long, default_value = "snapchain")]
metrics_prefix: String,

#[arg(long, default_value = "127.0.0.1:8125")]
metrics_addr: String,
}

fn parse_duration(arg: &str) -> Result<Duration, String> {
Expand All @@ -30,7 +37,7 @@ async fn main() {
let base_gossip_port = 50050;
for i in 1..=nodes {
let id = i;
let db_dir = format!(".rocks");
let db_dir = format!("nodes/{id}/.rocks");

if !std::path::Path::new(format!("nodes/{id}").as_str()).exists() {
std::fs::create_dir(format!("nodes/{id}")).expect("Failed to create node directory");
Expand All @@ -42,23 +49,30 @@ async fn main() {
let secret_key = hex::encode(SecretKey::generate());
let rpc_port = base_rpc_port + i;
let gossip_port = base_gossip_port + i;
let host = format!("172.100.0.1{i}");
let host = format!("127.0.0.1{i}");
let rpc_address = format!("{host}:{rpc_port}");
let gossip_multi_addr = format!("/ip4/{host}/udp/{gossip_port}/quic-v1");
let other_nodes_addresses = (1..=nodes)
.filter(|&x| x != id)
.map(|x| format!("/ip4/172.100.0.1{x}/udp/{:?}/quic-v1", base_gossip_port + x))
.map(|x| format!("/ip4/127.0.0.1{x}/udp/{:?}/quic-v1", base_gossip_port + x))
.collect::<Vec<String>>()
.join(",");

let propose_value_delay = humantime::format_duration(args.propose_value_delay);

let metrics_prefix = format!("{}{}", args.metrics_prefix, id);
let metrics_addr = args.metrics_addr.clone();

let config_file_content = format!(
r#"
id = {id}
rpc_address="{rpc_address}"
rocksdb_dir="{db_dir}"
[metrics]
prefix="{metrics_prefix}"
addr="{metrics_addr}"
[gossip]
address="{gossip_multi_addr}"
bootstrap_peers = "{other_nodes_addresses}"
Expand Down
17 changes: 17 additions & 0 deletions src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ use serde::{Deserialize, Serialize};
use std::error::Error;
use std::path::Path;

#[derive(Debug, Deserialize, Serialize)]
pub struct MetricsConfig {
pub prefix: String,
pub addr: String,
}

impl Default for MetricsConfig {
fn default() -> Self {
Self {
prefix: "".to_string(), //TODO: "snapchain" eventually
addr: "127.0.0.1:8125".to_string(),
}
}
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
pub id: u32,
Expand All @@ -19,6 +34,7 @@ pub struct Config {
pub rpc_address: String,
pub rocksdb_dir: String,
pub clear_db: bool,
pub metrics: MetricsConfig,
}

impl Default for Config {
Expand All @@ -33,6 +49,7 @@ impl Default for Config {
rpc_address: "0.0.0.0:3383".to_string(),
rocksdb_dir: ".rocks".to_string(),
clear_db: false,
metrics: MetricsConfig::default(),
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use cadence::{Counted, CountedExt, Gauged};
use malachite_metrics::{Metrics, SharedRegistry};
use snapchain::proto::snapchain::Block;
use snapchain::storage::store::BlockStore;
use std::collections::HashMap;
use std::error::Error;
use std::net;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -50,6 +52,23 @@ async fn main() -> Result<(), Box<dyn Error>> {
warn!("Cleared db at {:?}", db_dir);
}

if app_config.metrics.prefix == "" {
// TODO: consider removing this check
return Err("metrics prefix must be specified in config".into());
}

// TODO: parsing to SocketAddr only allows for IPs, DNS names won't work
let (metrics_host, metrics_port) = match app_config.metrics.addr.parse::<SocketAddr>() {
Ok(addr) => Ok((addr.ip().to_string(), addr.port())),
Err(e) => Err(format!("invalid metrics address: {}", e)),
}?;

let host = (metrics_host, metrics_port);
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
let sink = cadence::UdpMetricSink::from(host, socket)?;
let client1 = cadence::StatsdClient::builder(app_config.metrics.prefix.as_str(), sink).build();
let client1 = Arc::new(client1);

let addr = app_config.gossip.address.clone();
let grpc_addr = app_config.rpc_address.clone();
let grpc_socket_addr: SocketAddr = grpc_addr.parse()?;
Expand Down Expand Up @@ -127,6 +146,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
None,
block_store.clone(),
app_config.rocksdb_dir,
client1.clone(),
)
.await;

Expand Down
7 changes: 6 additions & 1 deletion src/node/snapchain_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use crate::storage::db::RocksDB;
use crate::storage::store::engine::{BlockEngine, MempoolMessage, ShardEngine};
use crate::storage::store::shard::ShardStore;
use crate::storage::store::BlockStore;
use cadence::StatsdClient;
use libp2p::identity::ed25519::Keypair;
use malachite_config::TimeoutConfig;
use malachite_metrics::Metrics;
use ractor::ActorRef;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::warn;

Expand All @@ -26,6 +28,7 @@ pub struct SnapchainNode {
pub messages_tx_by_shard: HashMap<u32, mpsc::Sender<MempoolMessage>>,
pub shard_stores: HashMap<u32, ShardStore>,
pub address: Address,
metrics_client: Arc<StatsdClient>,
}

impl SnapchainNode {
Expand All @@ -37,6 +40,7 @@ impl SnapchainNode {
block_tx: Option<mpsc::Sender<Block>>,
block_store: BlockStore,
rocksdb_dir: String,
metrics_client: Arc<StatsdClient>,
) -> Self {
let validator_address = Address(keypair.public().to_bytes());

Expand Down Expand Up @@ -79,7 +83,7 @@ impl SnapchainNode {
db.open().unwrap();
let shard_store = ShardStore::new(db);
shard_stores.insert(shard_id, shard_store.clone());
let engine = ShardEngine::new(shard_id, shard_store);
let engine = ShardEngine::new(shard_id, shard_store, metrics_client.clone());

let messages_tx = engine.messages_tx();

Expand Down Expand Up @@ -172,6 +176,7 @@ impl SnapchainNode {
messages_tx_by_shard: shard_messages,
address: validator_address,
shard_stores,
metrics_client,
}
}

Expand Down
47 changes: 46 additions & 1 deletion src/storage/store/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::storage::store::BlockStore;
use crate::storage::trie;
use crate::storage::trie::merkle_trie;
use crate::storage::trie::merkle_trie::TrieKey;
use cadence::{Counted, CountedExt, Gauged, StatsdClient};
use itertools::Itertools;
use message::MessageType;
use snapchain::{Block, ShardChunk, Transaction};
Expand Down Expand Up @@ -91,6 +92,7 @@ pub struct ShardEngine {
cast_store: Store,
pub db: Arc<RocksDB>,
onchain_event_store: OnchainEventStore,
metrics_client: Arc<StatsdClient>,
}

fn encode_vec(data: &[Vec<u8>]) -> String {
Expand All @@ -101,7 +103,11 @@ fn encode_vec(data: &[Vec<u8>]) -> String {
}

impl ShardEngine {
pub fn new(shard_id: u32, shard_store: ShardStore) -> ShardEngine {
pub fn new(
shard_id: u32,
shard_store: ShardStore,
metrics_client: Arc<StatsdClient>,
) -> ShardEngine {
let db = &*shard_store.db;

// TODO: adding the trie here introduces many calls that want to return errors. Rethink unwrap strategy.
Expand All @@ -124,6 +130,7 @@ impl ShardEngine {
cast_store,
db,
onchain_event_store,
metrics_client,
}
}

Expand Down Expand Up @@ -203,13 +210,30 @@ impl ShardEngine {
transactions
}

fn incr(&self, key: &str) {
let key = format!("shard{}.{}", self.shard_id, key);
_ = self.metrics_client.incr(key.as_str())
}

fn count(&self, key: &str, value: u64) {
let key = format!("shard{}.{}", self.shard_id, key);
_ = self.metrics_client.count(key.as_str(), value)
}

fn gauge(&self, key: &str, value: u64) {
let key = format!("shard{}.{}", self.shard_id, key);
_ = self.metrics_client.gauge(key.as_str(), value)
}

pub fn propose_state_change(&mut self, shard: u32) -> ShardStateChange {
let mut txn = RocksDbTransactionBatch::new();
let result = self.prepare_proposal(&mut txn, shard).unwrap(); //TODO: don't unwrap()

// TODO: this should probably operate automatically via drop trait
self.trie.reload(&self.db).unwrap();

self.incr("propose");

result
}

Expand Down Expand Up @@ -373,6 +397,15 @@ impl ShardEngine {
}

self.trie.reload(&*self.shard_store.db).unwrap();

if result {
self.incr("validate.true");
self.count("validate.false", 0)
} else {
self.incr("validate.false");
self.count("validate.true", 0);
}

result
}

Expand All @@ -390,6 +423,18 @@ impl ShardEngine {
self.db.commit(txn).unwrap();
self.trie.reload(&self.db).unwrap();

self.incr("commit");

let block_number = &shard_chunk
.header
.as_ref()
.unwrap()
.height
.unwrap()
.block_number;

self.gauge("block_height", *block_number);

match self.shard_store.put_shard_chunk(shard_chunk) {
Err(err) => {
error!("Unable to write shard chunk to store {}", err)
Expand Down
6 changes: 5 additions & 1 deletion src/storage/store/engine_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ mod tests {
use crate::storage::trie::merkle_trie::TrieKey;
use crate::utils::factory::{events_factory, messages_factory};
use prost::Message as _;
use std::sync::Arc;
use tempfile;
use tracing_subscriber::EnvFilter;

const FID_FOR_TEST: u32 = 1234;

fn new_engine() -> (ShardEngine, tempfile::TempDir) {
let metrics_client =
Arc::new(cadence::StatsdClient::builder("", cadence::NopMetricSink {}).build());

let dir = tempfile::TempDir::new().unwrap();
let db_path = dir.path().join("a.db");

let db = db::RocksDB::new(db_path.to_str().unwrap());
db.open().unwrap();

let shard_store = ShardStore::new(db);
(ShardEngine::new(1, shard_store), dir)
(ShardEngine::new(1, shard_store, metrics_client), dir)
}

fn enable_logging() {
Expand Down
4 changes: 4 additions & 0 deletions tests/consensus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ fn make_tmp_path() -> String {

impl NodeForTest {
pub async fn create(keypair: Keypair, num_shards: u32, grpc_port: u32) -> Self {
let metrics_client =
Arc::new(cadence::StatsdClient::builder("", cadence::NopMetricSink {}).build());

let mut config = snapchain::consensus::consensus::Config::default();
config = config.with_shard_ids((1..=num_shards).collect());

Expand All @@ -72,6 +75,7 @@ impl NodeForTest {
Some(block_tx),
block_store.clone(),
make_tmp_path(),
metrics_client,
)
.await;

Expand Down

0 comments on commit b843a35

Please sign in to comment.