From 3bfca14df91a2e0b571fd40e187fdf803cefb177 Mon Sep 17 00:00:00 2001 From: Price Hiller Date: Mon, 16 Dec 2024 13:51:19 -0600 Subject: [PATCH] fix(kafka sink): retry messages that result in kafka policy violations Problem: Some messages were getting dropped by Vector due to Kafka throwing `PolicyViolation` errors. These should be retried as a policy can be as simple as a more aggressive rate limit. ---------- Solution: Retry any messages that had the `RDKafkaErrorCode::PolicyViolation` error. ---------- Note: A dynamic back off may be better, as there may be a rate limit out there that still needs more than 100ms to back off on requests. ---------- See the original issue at https://github.com/vectordotdev/vector/issues/22026 Closes https://github.com/vectordotdev/vector/issues/22026 --- src/sinks/kafka/service.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 087795864b594..94017e22e7f36 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -159,9 +159,12 @@ impl Service for KafkaService { }) .map_err(|(err, _)| err); } - // Producer queue is full. + // Producer queue is full or a policy has been violated and the request should + // be retried Err(( - KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), + KafkaError::MessageProduction( + RDKafkaErrorCode::QueueFull | RDKafkaErrorCode::PolicyViolation, + ), original_record, )) => { if blocked_state.is_none() {