Buffer Redis Messages #179

Merged
merged 1 commit into from Nov 11, 2024
2 changes: 2 additions & 0 deletions grpc-ingest/config-ingester.yml
@@ -29,6 +29,8 @@ snapshots:
  xack_buffer_size: 10_000
  # maximum size of a batch for redis acknowledgment processing.
  xack_batch_max_size: 500
  # maximum number of redis messages to buffer.
  message_buffer_size: 200

# accounts configuration for the ingester
accounts:
10 changes: 10 additions & 0 deletions grpc-ingest/src/config.rs
@@ -66,12 +66,22 @@ pub struct ConfigIngestStream {
        deserialize_with = "deserialize_usize_str"
    )]
    pub xack_buffer_size: usize,
    #[serde(
        default = "ConfigIngestStream::default_message_buffer_size",
        deserialize_with = "deserialize_usize_str"
    )]
    pub message_buffer_size: usize,
}

impl ConfigIngestStream {
    pub const fn default_xack_buffer_size() -> usize {
        1_000
    }

    pub const fn default_message_buffer_size() -> usize {
        100
    }

    pub const fn default_max_concurrency() -> usize {
        2
    }
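Note that the numeric values in these configs are written with underscore separators (e.g. `10_000`), which is why the fields opt into `deserialize_usize_str` instead of plain integer parsing. The helper's implementation is not part of this diff; a minimal sketch of what such a deserializer might look like, assuming it accepts either a bare integer or an underscore-separated string (the real helper in grpc-ingest may differ):

```rust
use serde::{Deserialize, Deserializer};

/// Hypothetical sketch of `deserialize_usize_str`: accept a bare integer
/// or a string such as "10_000" with underscore separators.
fn deserialize_usize_str<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
    D: Deserializer<'de>,
{
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum Raw {
        Int(usize),
        Str(String),
    }

    match Raw::deserialize(deserializer)? {
        Raw::Int(n) => Ok(n),
        Raw::Str(s) => s.replace('_', "").parse().map_err(serde::de::Error::custom),
    }
}
```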
118 changes: 75 additions & 43 deletions grpc-ingest/src/redis.rs
@@ -299,7 +299,6 @@ impl Acknowledge {

        let count = ids.len();

        ack_tasks_total_inc(&config.name);
        match redis::pipe()
            .atomic()
            .xack(&config.name, &config.group, &ids)
@@ -403,6 +402,68 @@ impl<H: MessageHandler> IngestStream<H> {
        let (ack_tx, mut ack_rx) = tokio::sync::mpsc::channel::<String>(config.xack_buffer_size);
        let (ack_shutdown_tx, mut ack_shutdown_rx) = tokio::sync::oneshot::channel::<()>();

        // Bounded buffer between the Redis reader and message processing.
        let (msg_tx, mut msg_rx) =
            tokio::sync::mpsc::channel::<Vec<StreamId>>(config.message_buffer_size);
        let (msg_shutdown_tx, mut msg_shutdown_rx) = tokio::sync::oneshot::channel::<()>();

        let config_messages = Arc::clone(&config);

        let messages = tokio::spawn(async move {
            let mut tasks = FuturesUnordered::new();
            let config = Arc::clone(&config_messages);
            let handler = handler.clone();

            loop {
                tokio::select! {
                    Some(ids) = msg_rx.recv() => {
                        for StreamId { id, map } in ids {
                            // Cap the number of in-flight handler tasks.
                            if tasks.len() >= config.max_concurrency {
                                tasks.next().await;
                            }

                            let handler = handler.clone();
                            let ack_tx = ack_tx.clone();
                            let config = Arc::clone(&config);

                            ingest_tasks_total_inc(&config.name);

                            tasks.push(tokio::spawn(async move {
                                let result = handler.handle(map).await.map_err(IngestMessageError::into);

                                match result {
                                    Ok(()) => {
                                        program_transformer_task_status_inc(ProgramTransformerTaskStatusKind::Success);
                                    }
                                    Err(IngestMessageError::RedisStreamMessage(e)) => {
                                        error!("Failed to process message: {:?}", e);
                                        program_transformer_task_status_inc(e.into());
                                    }
                                    Err(IngestMessageError::DownloadMetadataJson(e)) => {
                                        program_transformer_task_status_inc(e.into());
                                    }
                                    Err(IngestMessageError::ProgramTransformer(e)) => {
                                        error!("Failed to process message: {:?}", e);
                                        program_transformer_task_status_inc(e.into());
                                    }
                                }

                                if let Err(e) = ack_tx.send(id).await {
                                    error!(target: "ingest_stream", "action=send_ack stream={} error={:?}", &config.name, e);
                                }

                                ingest_tasks_total_dec(&config.name);
                            }));
                        }
                    }
                    _ = &mut msg_shutdown_rx => {
                        break;
                    }
                }
            }

            // Drain any tasks still in flight after shutdown is signaled.
            while (tasks.next().await).is_some() {}
        });

        let ack = tokio::spawn({
            let config = Arc::clone(&config);
            let mut pending = Vec::new();
@@ -426,6 +487,7 @@
                        let ids = std::mem::take(&mut pending);
                        let handler = Arc::clone(&handler);

                        tasks.push(tokio::spawn(async move {
                            handler.handle(ids).await;
                        }));
@@ -439,6 +501,9 @@
                    }
                    let ids = std::mem::take(&mut pending);
                    let handler = Arc::clone(&handler);

                    ack_tasks_total_inc(&config.name);

                    tasks.push(tokio::spawn(async move {
                        handler.handle(ids).await;
                    }));
@@ -482,11 +547,9 @@ impl<H: MessageHandler> IngestStream<H> {

        let control = tokio::spawn({
            let mut connection = connection.clone();
            let mut tasks = FuturesUnordered::new();

            async move {
                let config = Arc::clone(&config);
                let handler = handler.clone();

                debug!(target: "ingest_stream", "action=read_stream_start stream={}", config.name);

@@ -495,7 +558,13 @@

                tokio::select! {
                    _ = &mut shutdown_rx => {
                        while (tasks.next().await).is_some() {}
                        if let Err(e) = msg_shutdown_tx.send(()) {
                            error!(target: "ingest_stream", "action=msg_shutdown stream={} error={:?}", &config.name, e);
                        }

                        if let Err(e) = messages.await {
                            error!(target: "ingest_stream", "action=await_messages stream={} error={:?}", &config.name, e);
                        }

                        if let Err(e) = ack_shutdown_tx.send(()) {
                            error!(target: "ingest_stream", "action=ack_shutdown stream={} error={:?}", &config.name, e);
@@ -517,45 +586,8 @@

                        redis_xread_inc(&config.name, count);

                        for StreamId { id, map } in ids {
                            if tasks.len() >= config.max_concurrency {
                                tasks.next().await;
                            }

                            let handler = handler.clone();
                            let ack_tx = ack_tx.clone();
                            let config = Arc::clone(&config);

                            ingest_tasks_total_inc(&config.name);

                            tasks.push(tokio::spawn(async move {
                                let result = handler.handle(map).await.map_err(IngestMessageError::into);

                                match result {
                                    Ok(()) => {
                                        program_transformer_task_status_inc(ProgramTransformerTaskStatusKind::Success);
                                    }
                                    Err(IngestMessageError::RedisStreamMessage(e)) => {
                                        error!("Failed to process message: {:?}", e);

                                        program_transformer_task_status_inc(e.into());
                                    }
                                    Err(IngestMessageError::DownloadMetadataJson(e)) => {
                                        program_transformer_task_status_inc(e.into());
                                    }
                                    Err(IngestMessageError::ProgramTransformer(e)) => {
                                        error!("Failed to process message: {:?}", e);

                                        program_transformer_task_status_inc(e.into());
                                    }
                                }

                                ingest_tasks_total_dec(&config.name);

                                if let Err(e) = ack_tx.send(id).await {
                                    error!(target: "ingest_stream", "action=send_ack stream={} error={:?}", &config.name, e);
                                }
                            }));
                        if let Err(e) = msg_tx.send(ids).await {
                            error!(target: "ingest_stream", "action=send_ids stream={} error={:?}", &config.name, e);
                        }
                    }
                }
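Taken together, the redis.rs changes split reading from processing: the control task now only pushes each XREAD batch into a bounded channel, and the new `messages` task drains that channel, fanning work out under the existing `max_concurrency` cap. A minimal, self-contained sketch of the same pattern, with illustrative names rather than the crate's actual types:

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let message_buffer_size = 100; // analogous to message_buffer_size
    let max_concurrency = 2; // analogous to max_concurrency

    // Bounded hand-off between the reader and the processor.
    let (msg_tx, mut msg_rx) = mpsc::channel::<Vec<u64>>(message_buffer_size);

    let worker = tokio::spawn(async move {
        let mut tasks = FuturesUnordered::new();
        while let Some(batch) = msg_rx.recv().await {
            for id in batch {
                // Wait for a slot before spawning another handler.
                if tasks.len() >= max_concurrency {
                    tasks.next().await;
                }
                tasks.push(tokio::spawn(async move {
                    // Stand-in for handler.handle(map).await.
                    println!("processed message {id}");
                }));
            }
        }
        // Channel closed: drain whatever is still in flight.
        while tasks.next().await.is_some() {}
    });

    // Reader side: simulated XREAD batches. `send` blocks once the buffer
    // holds `message_buffer_size` unprocessed batches, applying back-pressure
    // to the reader instead of growing memory without bound.
    for batch in [vec![1, 2, 3], vec![4, 5]] {
        msg_tx.send(batch).await.expect("worker alive");
    }
    drop(msg_tx);
    worker.await.unwrap();
}
```

That back-pressure is what the new `message_buffer_size` setting controls: previously the handler tasks were spawned directly inside the XREAD loop, so a full `max_concurrency` gate stalled reads; with the buffer in between, bursts are absorbed while memory stays bounded.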
7 changes: 6 additions & 1 deletion prometheus-config.yaml
@@ -6,4 +6,9 @@ scrape_configs:
  - job_name: "prometheus"
    honor_labels: true
    static_configs:
      - targets: ["host.docker.internal:8873", "host.docker.internal:8875"]
      - targets:
          [
            "host.docker.internal:8873",
            "host.docker.internal:8875",
            "host.docker.internal:8876",
          ]