Skip to content

Commit

Permalink
Only Time Based Flushes (#161)
Browse files Browse the repository at this point in the history
* Include prometheus metric explorer

* Only xadd flush on a timeer

* Follow prom metric naming conventions

* Logging changes for ping-pong grpc
  • Loading branch information
kespinola authored Oct 9, 2024
1 parent 118baec commit 01bbe99
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 85 deletions.
10 changes: 10 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ services:
- "8001:8001"
- "8899:8899"
- "9900:9900"
prometheus:
image: prom/prometheus:latest
container_name: prometheus
volumes:
- ./prometheus-config.yaml:/etc/prometheus/prometheus-config.yaml
command: ["--config.file=/etc/prometheus/prometheus-config.yaml"]
ports:
- "9091:9090"
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
grafana_data: { }
graphite_data: { }
5 changes: 2 additions & 3 deletions grpc-ingest/config-grpc2redis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@ transactions:
- BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY
redis:
url: redis://localhost:6379
pipeline_max_size: 10
pipeline_max_idle_ms: 10
max_xadd_in_process: 100
pipeline_max_size: 1_000
pipeline_max_idle_ms: 100_000
21 changes: 1 addition & 20 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,36 +181,17 @@ impl ConfigGrpcTransactions {
#[derive(Debug, Clone, Deserialize)]
pub struct ConfigGrpcRedis {
pub url: String,
#[serde(
default = "ConfigGrpcRedis::default_pipeline_max_size",
deserialize_with = "deserialize_usize_str"
)]
pub pipeline_max_size: usize,
#[serde(
default = "ConfigGrpcRedis::default_pipeline_max_idle",
deserialize_with = "deserialize_duration_str",
rename = "pipeline_max_idle_ms"
deserialize_with = "deserialize_duration_str"
)]
pub pipeline_max_idle: Duration,
#[serde(
default = "ConfigGrpcRedis::default_max_xadd_in_process",
deserialize_with = "deserialize_usize_str"
)]
pub max_xadd_in_process: usize,
}

impl ConfigGrpcRedis {
pub const fn default_pipeline_max_size() -> usize {
10
}

pub const fn default_pipeline_max_idle() -> Duration {
Duration::from_millis(10)
}

pub const fn default_max_xadd_in_process() -> usize {
100
}
}

pub fn deserialize_duration_str<'de, D>(deserializer: D) -> Result<Duration, D::Error>
Expand Down
37 changes: 15 additions & 22 deletions grpc-ingest/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
fn handle(
&self,
job: GrpcJob,
handle: topograph::executor::Handle<'a, GrpcJob, Nonblock<Tokio>>,
_handle: topograph::executor::Handle<'a, GrpcJob, Nonblock<Tokio>>,
) -> impl futures::Future<Output = Self::Output> + Send + 'a {
let config = Arc::clone(&self.config);
let connection = self.connection.clone();
Expand All @@ -68,10 +68,9 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
let counts = flush.as_ref().unwrap_or_else(|counts| counts);

for (stream, count) in counts.iter() {
debug!(message = "Redis pipe flushed", ?stream, ?status, ?count);
redis_xadd_status_inc(stream, status, *count);
}

debug!(message = "Redis pipe flushed", ?status, ?counts);
}
GrpcJob::ProcessSubscribeUpdate(update) => {
let accounts_stream = config.accounts.stream.clone();
Expand All @@ -92,8 +91,6 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
"*",
account.encode_to_vec(),
);

debug!(message = "Account update", ?account,);
}
UpdateOneof::Transaction(transaction) => {
pipe.xadd_maxlen(
Expand All @@ -102,39 +99,36 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
"*",
transaction.encode_to_vec(),
);

debug!(message = "Transaction update", ?transaction);
}
UpdateOneof::Ping(_) => {
subscribe_tx
let ping = subscribe_tx
.lock()
.await
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: PING_ID }),
..Default::default()
})
.await
.map_err(|err| {
warn!(message = "Failed to send ping", ?err);
})
.ok();

debug!(message = "Ping", id = PING_ID);
.await;

match ping {
Ok(_) => {
debug!(message = "Ping sent successfully", id = PING_ID)
}
Err(err) => {
warn!(message = "Failed to send ping", ?err, id = PING_ID)
}
}
}
UpdateOneof::Pong(pong) => {
if pong.id == PING_ID {
debug!(message = "Pong", id = PING_ID);
debug!(message = "Pong received", id = PING_ID);
} else {
warn!(message = "Unknown pong id", id = pong.id);
warn!(message = "Unknown pong id received", id = pong.id);
}
}
var => warn!(message = "Unknown update variant", ?var),
}
}

if pipe.size() >= config.redis.pipeline_max_size {
handle.push(GrpcJob::FlushRedisPipe);
}
}
}
}
Expand Down Expand Up @@ -197,7 +191,6 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
exec.push(GrpcJob::FlushRedisPipe);
}
Some(Ok(msg)) = stream.next() => {
debug!(message = "Received gRPC message", ?msg);
exec.push(GrpcJob::ProcessSubscribeUpdate(Box::new(msg)));
}
_ = shutdown.next() => {
Expand Down
72 changes: 36 additions & 36 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,46 @@ use {
lazy_static::lazy_static! {
static ref REGISTRY: Registry = Registry::new();

static ref VERSION: IntCounterVec = IntCounterVec::new(
Opts::new("version", "Plugin version info"),
static ref VERSION_INFO_METRIC: IntCounterVec = IntCounterVec::new(
Opts::new("version_info", "Plugin version info"),
&["buildts", "git", "package", "proto", "rustc", "solana", "version"]
).unwrap();

static ref REDIS_XLEN_TOTAL: IntGaugeVec = IntGaugeVec::new(
Opts::new("redis_xlen_total", "Length of stream in Redis"),
static ref REDIS_STREAM_LENGTH: IntGaugeVec = IntGaugeVec::new(
Opts::new("redis_stream_length", "Length of stream in Redis"),
&["stream"]
).unwrap();

static ref REDIS_XADD_STATUS: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xadd_status", "Status of messages sent to Redis stream"),
static ref REDIS_XADD_STATUS_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xadd_status_count", "Status of messages sent to Redis stream"),
&["stream", "status"]
).unwrap();

static ref REDIS_XACK_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xack_total", "Total number of processed messages"),
static ref REDIS_XACK_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xack_count", "Total number of processed messages"),
&["stream"]
).unwrap();

static ref PGPOOL_CONNECTIONS_TOTAL: IntGaugeVec = IntGaugeVec::new(
Opts::new("pgpool_connections_total", "Total number of connections in Postgres Pool"),
static ref PGPOOL_CONNECTIONS: IntGaugeVec = IntGaugeVec::new(
Opts::new("pgpool_connections", "Total number of connections in Postgres Pool"),
&["kind"]
).unwrap();

static ref PROGRAM_TRANSFORMER_TASKS_TOTAL: IntGauge = IntGauge::new(
"program_transformer_tasks_total", "Number of tasks spawned for program transform"
static ref PROGRAM_TRANSFORMER_TASKS: IntGauge = IntGauge::new(
"program_transformer_tasks", "Number of tasks spawned for program transform"
).unwrap();

static ref PROGRAM_TRANSFORMER_TASK_STATUS: IntCounterVec = IntCounterVec::new(
Opts::new("program_transformer_task_status", "Status of processed messages"),
static ref PROGRAM_TRANSFORMER_TASK_STATUS_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("program_transformer_task_status_count", "Status of processed messages"),
&["status"],
).unwrap();

static ref DOWNLOAD_METADATA_INSERTED_TOTAL: IntCounter = IntCounter::new(
"download_metadata_inserted_total", "Total number of inserted tasks for download metadata"
static ref DOWNLOAD_METADATA_INSERTED_COUNT: IntCounter = IntCounter::new(
"download_metadata_inserted_count", "Total number of inserted tasks for download metadata"
).unwrap();

static ref INGEST_TASKS_TOTAL: IntGaugeVec = IntGaugeVec::new(
Opts::new("ingest_tasks_total", "Number of tasks spawned for ingest"),
static ref INGEST_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("ingest_tasks", "Number of tasks spawned for ingest"),
&["stream"]
).unwrap();
}
Expand All @@ -69,17 +69,17 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
.expect("collector can't be registered");
};
}
register!(VERSION);
register!(REDIS_XLEN_TOTAL);
register!(REDIS_XADD_STATUS);
register!(REDIS_XACK_TOTAL);
register!(PGPOOL_CONNECTIONS_TOTAL);
register!(PROGRAM_TRANSFORMER_TASKS_TOTAL);
register!(PROGRAM_TRANSFORMER_TASK_STATUS);
register!(DOWNLOAD_METADATA_INSERTED_TOTAL);
register!(INGEST_TASKS_TOTAL);
register!(VERSION_INFO_METRIC);
register!(REDIS_STREAM_LENGTH);
register!(REDIS_XADD_STATUS_COUNT);
register!(REDIS_XACK_COUNT);
register!(PGPOOL_CONNECTIONS);
register!(PROGRAM_TRANSFORMER_TASKS);
register!(PROGRAM_TRANSFORMER_TASK_STATUS_COUNT);
register!(DOWNLOAD_METADATA_INSERTED_COUNT);
register!(INGEST_TASKS);

VERSION
VERSION_INFO_METRIC
.with_label_values(&[
VERSION_INFO.buildts,
VERSION_INFO.git,
Expand Down Expand Up @@ -130,19 +130,19 @@ fn not_found_handler() -> Response<Body> {
}

pub fn redis_xlen_set(stream: &str, len: usize) {
REDIS_XLEN_TOTAL
REDIS_STREAM_LENGTH
.with_label_values(&[stream])
.set(len as i64);
}

pub fn redis_xadd_status_inc(stream: &str, status: Result<(), ()>, delta: usize) {
REDIS_XADD_STATUS
REDIS_XADD_STATUS_COUNT
.with_label_values(&[stream, if status.is_ok() { "success" } else { "failed" }])
.inc_by(delta as u64);
}

pub fn redis_xack_inc(stream: &str, delta: usize) {
REDIS_XACK_TOTAL
REDIS_XACK_COUNT
.with_label_values(&[stream])
.inc_by(delta as u64)
}
Expand All @@ -154,7 +154,7 @@ pub enum PgpoolConnectionsKind {
}

pub fn pgpool_connections_set(kind: PgpoolConnectionsKind, size: usize) {
PGPOOL_CONNECTIONS_TOTAL
PGPOOL_CONNECTIONS
.with_label_values(&[match kind {
PgpoolConnectionsKind::Total => "total",
PgpoolConnectionsKind::Idle => "idle",
Expand All @@ -163,15 +163,15 @@ pub fn pgpool_connections_set(kind: PgpoolConnectionsKind, size: usize) {
}

pub fn ingest_tasks_total_inc(stream: &str) {
INGEST_TASKS_TOTAL.with_label_values(&[stream]).inc()
INGEST_TASKS.with_label_values(&[stream]).inc()
}

pub fn ingest_tasks_total_dec(stream: &str) {
INGEST_TASKS_TOTAL.with_label_values(&[stream]).dec()
INGEST_TASKS.with_label_values(&[stream]).dec()
}

pub fn ingest_tasks_reset(stream: &str) {
INGEST_TASKS_TOTAL.with_label_values(&[stream]).set(0)
INGEST_TASKS.with_label_values(&[stream]).set(0)
}

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -279,7 +279,7 @@ impl ProgramTransformerTaskStatusKind {
}

pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKind) {
PROGRAM_TRANSFORMER_TASK_STATUS
PROGRAM_TRANSFORMER_TASK_STATUS_COUNT
.with_label_values(&[kind.to_str()])
.inc()
}
4 changes: 0 additions & 4 deletions grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,4 @@ impl TrackedPipeline {
Err(_) => Err(counts),
}
}

pub fn size(&self) -> usize {
self.counts.values().sum()
}
}
9 changes: 9 additions & 0 deletions prometheus-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
global:
scrape_interval: 1s
evaluation_interval: 5s

scrape_configs:
- job_name: "prometheus"
honor_labels: true
static_configs:
- targets: ["host.docker.internal:8873", "host.docker.internal:8875"]

0 comments on commit 01bbe99

Please sign in to comment.