From 1275f1ad67ea02cb0ba4ca476695268611d1f185 Mon Sep 17 00:00:00 2001 From: Price Hiller Date: Wed, 18 Dec 2024 17:37:25 -0600 Subject: [PATCH] fix(kafka sink): retry messages that result in kafka policy violations (#22041) * fix(kafka sink): retry messages that result in kafka policy violations Problem: Some messages were getting dropped by Vector due to Kafka throwing `PolicyViolation` errors. These should be retried as a policy can be as simple as a more aggressive rate limit. ---------- Solution: Retry any messages that had the `RDKafkaErrorCode::PolicyViolation` error. ---------- Note: A dynamic back off may be better, as there may be a rate limit out there that still needs more than 100ms to back off on requests. ---------- See the original issue at https://github.com/vectordotdev/vector/issues/22026 Closes https://github.com/vectordotdev/vector/issues/22026 * Update changelog.d/22026-retry-kafka-policy-violations.fix.md --------- Co-authored-by: Jesse Szwedko --- changelog.d/22026-retry-kafka-policy-violations.fix.md | 3 +++ src/sinks/kafka/service.rs | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 changelog.d/22026-retry-kafka-policy-violations.fix.md diff --git a/changelog.d/22026-retry-kafka-policy-violations.fix.md b/changelog.d/22026-retry-kafka-policy-violations.fix.md new file mode 100644 index 0000000000000..80f5652f886d2 --- /dev/null +++ b/changelog.d/22026-retry-kafka-policy-violations.fix.md @@ -0,0 +1,3 @@ +Retry Kafka messages that error with a policy violation so messages are not lost. + +authors: PriceHiller diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 087795864b594..94017e22e7f36 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -159,9 +159,12 @@ impl Service for KafkaService { }) .map_err(|(err, _)| err); } - // Producer queue is full. + // Producer queue is full or a policy has been violated and the request should + // be retried Err(( - KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), + KafkaError::MessageProduction( + RDKafkaErrorCode::QueueFull | RDKafkaErrorCode::PolicyViolation, + ), original_record, )) => { if blocked_state.is_none() {