diff --git a/grpc-ingest/src/ingester.rs b/grpc-ingest/src/ingester.rs index 1dbdb42e5..a3d23d029 100644 --- a/grpc-ingest/src/ingester.rs +++ b/grpc-ingest/src/ingester.rs @@ -2,7 +2,7 @@ use { crate::{ config::{ConfigIngester, REDIS_STREAM_DATA_KEY}, postgres::{create_pool as pg_create_pool, report_pgpool}, - prom::redis_xack_inc, + prom::{redis_xadd_status_inc}, redis::{AccountHandle, DownloadMetadataJsonHandle, IngestStream, TransactionHandle}, util::create_shutdown, }, @@ -12,7 +12,7 @@ use { redis::aio::MultiplexedConnection, std::sync::Arc, tokio::time::{sleep, Duration}, - tracing::{error, warn}, + tracing::{warn}, }; fn download_metadata_notifier_v2( @@ -32,7 +32,7 @@ fn download_metadata_notifier_v2( let info_bytes = serde_json::to_vec(&info)?; - redis::cmd("XADD") + let xadd = redis::cmd("XADD") .arg(&stream) .arg("MAXLEN") .arg("~") @@ -40,10 +40,12 @@ fn download_metadata_notifier_v2( .arg("*") .arg(REDIS_STREAM_DATA_KEY) .arg(info_bytes) - .query_async(&mut connection) - .await?; + .query_async::<_, redis::Value>(&mut connection) + .await; - redis_xack_inc(&stream, 1); + let status = xadd.map(|_| ()).map_err(|_| ()); + + redis_xadd_status_inc(&stream, status, 1); Ok(()) }) @@ -127,27 +129,14 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { report.abort(); - let (accounts, transactions, snapshots, download_metadatas) = futures::future::join4( + futures::future::join_all(vec![ accounts.stop(), transactions.stop(), snapshots.stop(), download_metadatas.stop(), - ) + ]) .await; - if let Err(e) = accounts { - error!(target: "ingester", action = "stop_accounts", message = "Failed to stop accounts stream cleanly", error = ?e); - } - if let Err(e) = transactions { - error!(target: "ingester", action = "stop_transactions", message = "Failed to stop transactions stream cleanly", error = ?e); - } - if let Err(e) = snapshots { - error!(target: "ingester", action = "stop_snapshots", message = "Failed to stop snapshots stream cleanly", error = ?e); - } - if let Err(e) = download_metadatas { - error!(target: "ingester", action = "stop_download_metadatas", message = "Failed to stop download_metadatas stream cleanly", error = ?e); - } - pool.close().await; Ok::<(), anyhow::Error>(()) diff --git a/grpc-ingest/src/redis.rs b/grpc-ingest/src/redis.rs index 614067eec..58aab463e 100644 --- a/grpc-ingest/src/redis.rs +++ b/grpc-ingest/src/redis.rs @@ -607,7 +607,7 @@ impl IngestStream { break; }, - result = self.read(&mut connection), if ack_tx.capacity() >= config.batch_size => { + result = self.read(&mut connection) => { match result { Ok(reply) => { for StreamKey { key: _, ids } in reply.keys {