Skip to content

Commit

Permalink
Buffer redis messages (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola authored Nov 11, 2024
1 parent f311e14 commit f8bdef2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 44 deletions.
2 changes: 2 additions & 0 deletions grpc-ingest/config-ingester.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ snapshots:
xack_buffer_size: 10_000
# maximum size of a batch for redis acknowledgment processing.
xack_batch_max_size: 500
# maximum number of redis messages to keep to buffer.
messages_buffer_size: 200

# accounts configuration for the ingester
accounts:
Expand Down
10 changes: 10 additions & 0 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,22 @@ pub struct ConfigIngestStream {
deserialize_with = "deserialize_usize_str"
)]
pub xack_buffer_size: usize,
#[serde(
default = "ConfigIngestStream::default_message_buffer_size",
deserialize_with = "deserialize_usize_str"
)]
pub message_buffer_size: usize,
}

impl ConfigIngestStream {
pub const fn default_xack_buffer_size() -> usize {
1_000
}

pub const fn default_message_buffer_size() -> usize {
100
}

pub const fn default_max_concurrency() -> usize {
2
}
Expand Down
118 changes: 75 additions & 43 deletions grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ impl Acknowledge {

let count = ids.len();

ack_tasks_total_inc(&config.name);
match redis::pipe()
.atomic()
.xack(&config.name, &config.group, &ids)
Expand Down Expand Up @@ -403,6 +402,68 @@ impl<H: MessageHandler> IngestStream<H> {
let (ack_tx, mut ack_rx) = tokio::sync::mpsc::channel::<String>(config.xack_buffer_size);
let (ack_shutdown_tx, mut ack_shutdown_rx) = tokio::sync::oneshot::channel::<()>();

let (msg_tx, mut msg_rx) =
tokio::sync::mpsc::channel::<Vec<StreamId>>(config.message_buffer_size);
let (msg_shutdown_tx, mut msg_shutdown_rx) = tokio::sync::oneshot::channel::<()>();

let config_messages = Arc::clone(&config);

let messages = tokio::spawn(async move {
let mut tasks = FuturesUnordered::new();
let config = Arc::clone(&config_messages);
let handler = handler.clone();

loop {
tokio::select! {
Some(ids) = msg_rx.recv() => {
for StreamId { id, map } in ids {
if tasks.len() >= config.max_concurrency {
tasks.next().await;
}

let handler = handler.clone();
let ack_tx = ack_tx.clone();
let config = Arc::clone(&config);

ingest_tasks_total_inc(&config.name);

tasks.push(tokio::spawn(async move {
let result = handler.handle(map).await.map_err(IngestMessageError::into);

match result {
Ok(()) => {
program_transformer_task_status_inc(ProgramTransformerTaskStatusKind::Success);
}
Err(IngestMessageError::RedisStreamMessage(e)) => {
error!("Failed to process message: {:?}", e);
program_transformer_task_status_inc(e.into());
}
Err(IngestMessageError::DownloadMetadataJson(e)) => {
program_transformer_task_status_inc(e.into());
}
Err(IngestMessageError::ProgramTransformer(e)) => {
error!("Failed to process message: {:?}", e);
program_transformer_task_status_inc(e.into());
}
}

if let Err(e) = ack_tx.send(id).await {
error!(target: "ingest_stream", "action=send_ack stream={} error={:?}", &config.name, e);
}

ingest_tasks_total_dec(&config.name);
}));
}
}
_ = &mut msg_shutdown_rx => {
break;
}
}
}

while (tasks.next().await).is_some() {}
});

let ack = tokio::spawn({
let config = Arc::clone(&config);
let mut pending = Vec::new();
Expand All @@ -426,6 +487,7 @@ impl<H: MessageHandler> IngestStream<H> {
let ids = std::mem::take(&mut pending);
let handler = Arc::clone(&handler);


tasks.push(tokio::spawn(async move {
handler.handle(ids).await;
}));
Expand All @@ -439,6 +501,9 @@ impl<H: MessageHandler> IngestStream<H> {
}
let ids = std::mem::take(&mut pending);
let handler = Arc::clone(&handler);

ack_tasks_total_inc(&config.name);

tasks.push(tokio::spawn(async move {
handler.handle(ids).await;
}));
Expand Down Expand Up @@ -482,11 +547,9 @@ impl<H: MessageHandler> IngestStream<H> {

let control = tokio::spawn({
let mut connection = connection.clone();
let mut tasks = FuturesUnordered::new();

async move {
let config = Arc::clone(&config);
let handler = handler.clone();

debug!(target: "ingest_stream", "action=read_stream_start stream={}", config.name);

Expand All @@ -495,7 +558,13 @@ impl<H: MessageHandler> IngestStream<H> {

tokio::select! {
_ = &mut shutdown_rx => {
while (tasks.next().await).is_some() {}
if let Err(e) = msg_shutdown_tx.send(()) {
error!(target: "ingest_stream", "action=msg_shutdown stream={} error={:?}", &config.name, e);
}

if let Err(e) = messages.await {
error!(target: "ingest_stream", "action=await_messages stream={} error={:?}", &config.name, e);
}

if let Err(e) = ack_shutdown_tx.send(()) {
error!(target: "ingest_stream", "action=ack_shutdown stream={} error={:?}", &config.name, e);
Expand All @@ -517,45 +586,8 @@ impl<H: MessageHandler> IngestStream<H> {

redis_xread_inc(&config.name, count);

for StreamId { id, map } in ids {
if tasks.len() >= config.max_concurrency {
tasks.next().await;
}

let handler = handler.clone();
let ack_tx = ack_tx.clone();
let config = Arc::clone(&config);

ingest_tasks_total_inc(&config.name);

tasks.push(tokio::spawn(async move {
let result = handler.handle(map).await.map_err(IngestMessageError::into);

match result {
Ok(()) => {
program_transformer_task_status_inc(ProgramTransformerTaskStatusKind::Success);
}
Err(IngestMessageError::RedisStreamMessage(e)) => {
error!("Failed to process message: {:?}", e);

program_transformer_task_status_inc(e.into());
}
Err(IngestMessageError::DownloadMetadataJson(e)) => {
program_transformer_task_status_inc(e.into());
}
Err(IngestMessageError::ProgramTransformer(e)) => {
error!("Failed to process message: {:?}", e);

program_transformer_task_status_inc(e.into());
}
}

ingest_tasks_total_dec(&config.name);

if let Err(e) = ack_tx.send(id).await {
error!(target: "ingest_stream", "action=send_ack stream={} error={:?}", &config.name, e);
}
}));
if let Err(e) = msg_tx.send(ids).await {
error!(target: "ingest_stream", "action=send_ids stream={} error={:?}", &config.name, e);
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion prometheus-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ scrape_configs:
- job_name: "prometheus"
honor_labels: true
static_configs:
- targets: ["host.docker.internal:8873", "host.docker.internal:8875"]
- targets:
[
"host.docker.internal:8873",
"host.docker.internal:8875",
"host.docker.internal:8876",
]

0 comments on commit f8bdef2

Please sign in to comment.