Skip to content

Commit

Permalink
Only throttle based on the task semaphore (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola authored Oct 14, 2024
1 parent 6f158f3 commit a347066
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 22 deletions.
31 changes: 10 additions & 21 deletions grpc-ingest/src/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
config::{ConfigIngester, REDIS_STREAM_DATA_KEY},
postgres::{create_pool as pg_create_pool, report_pgpool},
prom::redis_xack_inc,
prom::{redis_xadd_status_inc},
redis::{AccountHandle, DownloadMetadataJsonHandle, IngestStream, TransactionHandle},
util::create_shutdown,
},
Expand All @@ -12,7 +12,7 @@ use {
redis::aio::MultiplexedConnection,
std::sync::Arc,
tokio::time::{sleep, Duration},
tracing::{error, warn},
tracing::{warn},
};

fn download_metadata_notifier_v2(
Expand All @@ -32,18 +32,20 @@ fn download_metadata_notifier_v2(

let info_bytes = serde_json::to_vec(&info)?;

redis::cmd("XADD")
let xadd = redis::cmd("XADD")
.arg(&stream)
.arg("MAXLEN")
.arg("~")
.arg(stream_maxlen)
.arg("*")
.arg(REDIS_STREAM_DATA_KEY)
.arg(info_bytes)
.query_async(&mut connection)
.await?;
.query_async::<_, redis::Value>(&mut connection)
.await;

redis_xack_inc(&stream, 1);
let status = xadd.map(|_| ()).map_err(|_| ());

redis_xadd_status_inc(&stream, status, 1);

Ok(())
})
Expand Down Expand Up @@ -127,27 +129,14 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {

report.abort();

let (accounts, transactions, snapshots, download_metadatas) = futures::future::join4(
futures::future::join_all(vec![
accounts.stop(),
transactions.stop(),
snapshots.stop(),
download_metadatas.stop(),
)
])
.await;

if let Err(e) = accounts {
error!(target: "ingester", action = "stop_accounts", message = "Failed to stop accounts stream cleanly", error = ?e);
}
if let Err(e) = transactions {
error!(target: "ingester", action = "stop_transactions", message = "Failed to stop transactions stream cleanly", error = ?e);
}
if let Err(e) = snapshots {
error!(target: "ingester", action = "stop_snapshots", message = "Failed to stop snapshots stream cleanly", error = ?e);
}
if let Err(e) = download_metadatas {
error!(target: "ingester", action = "stop_download_metadatas", message = "Failed to stop download_metadatas stream cleanly", error = ?e);
}

pool.close().await;

Ok::<(), anyhow::Error>(())
Expand Down
2 changes: 1 addition & 1 deletion grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ impl<H: MessageHandler> IngestStream<H> {

break;
},
result = self.read(&mut connection), if ack_tx.capacity() >= config.batch_size => {
result = self.read(&mut connection) => {
match result {
Ok(reply) => {
for StreamKey { key: _, ids } in reply.keys {
Expand Down

0 comments on commit a347066

Please sign in to comment.