
Messages getting dropped unintentionally on kafka sink #22026

Closed
heshanperera-alert opened this issue Dec 12, 2024 · 3 comments · Fixed by #22041
Labels
sink: kafka (Anything `kafka` sink related) · type: bug (A code related bug.)

Comments


heshanperera-alert commented Dec 12, 2024

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

I am using the Kafka sink to send messages to Azure Event Hub. I have noticed that messages get dropped when Azure Event Hub hits its throttling limits. When I checked Vector's internal metrics, I can see that these messages get dropped unintentionally. Is there a way to determine whether these events are retried automatically by Vector?

I have gone through the documentation on the Kafka sink, but there doesn't seem to be a way or a flag to retry these events upon failure.

Version

0.37.1

Debug Output

vector_common::internal_event::service: Service call failed. No retries or retries exhausted. error=Some(KafkaError (Message production error: PolicyViolation (Broker: Policy violation)))

@heshanperera-alert heshanperera-alert added the type: bug A code related bug. label Dec 12, 2024
@pront pront added the sink: kafka Anything `kafka` sink related label Dec 12, 2024
@jszwedko
Member

Thanks @heshanperera-alert . It does seem like this error should be retried. I think the change would need to be somewhere in here if anyone feels motivated:

match this.kafka_producer.send_result(record) {
    // Record was successfully enqueued on the producer.
    Ok(fut) => {
        // Drop the blocked state (if any), as the producer is no longer blocked.
        drop(blocked_state.take());
        return fut
            .await
            .expect("producer unexpectedly dropped")
            .map(|_| KafkaResponse {
                event_byte_size,
                raw_byte_size,
                event_status: EventStatus::Delivered,
            })
            .map_err(|(err, _)| err);
    }
    // Producer queue is full.
    Err((
        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
        original_record,
    )) => {
        if blocked_state.is_none() {
            blocked_state =
                Some(BlockedRecordState::new(Arc::clone(&this.records_blocked)));
        }
        record = original_record;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    // A final/non-retriable error occurred.
    Err((
        err @ KafkaError::MessageProduction(
            RDKafkaErrorCode::InvalidMessage
            | RDKafkaErrorCode::InvalidMessageSize
            | RDKafkaErrorCode::MessageSizeTooLarge
            | RDKafkaErrorCode::UnknownTopicOrPartition
            | RDKafkaErrorCode::InvalidRecord
            | RDKafkaErrorCode::InvalidRequiredAcks
            | RDKafkaErrorCode::TopicAuthorizationFailed
            | RDKafkaErrorCode::UnsupportedForMessageFormat
            | RDKafkaErrorCode::ClusterAuthorizationFailed,
        ),
        _,
    )) => return Err(err),
    // A different error occurred. Set event status to Errored not Rejected.
    Err(_) => {
        return Ok(KafkaResponse {
            event_byte_size: config::telemetry().create_request_count_byte_size(),
            raw_byte_size: 0,
            event_status: EventStatus::Errored,
        })
    }
};
}
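For readers unfamiliar with this control flow: the `QueueFull` arm is the retry path, and the idea being discussed is to route more error codes through it. A minimal, dependency-free sketch of that retry shape (all names here are illustrative, not Vector's actual API):

```rust
use std::time::Duration;

// Simplified model of the send loop: a transient error causes a fixed
// 100 ms sleep and another attempt, while a fatal error aborts at once.
enum SendOutcome {
    Delivered,
    Transient, // e.g. QueueFull: retry after a short, fixed delay
    Fatal,     // e.g. InvalidMessage: return the error to the caller
}

// Returns Ok(number_of_attempts) on delivery, Err(()) on a fatal error
// or when max_tries is exhausted.
fn send_with_retry<F: FnMut() -> SendOutcome>(mut attempt: F, max_tries: u32) -> Result<u32, ()> {
    let delay = Duration::from_millis(100);
    for tries in 1..=max_tries {
        match attempt() {
            SendOutcome::Delivered => return Ok(tries),
            SendOutcome::Transient => {
                // In the real sink this is `tokio::time::sleep(delay).await`;
                // a blocking sleep keeps this sketch dependency-free.
                std::thread::sleep(delay);
            }
            SendOutcome::Fatal => return Err(()),
        }
    }
    Err(())
}

fn main() {
    // Succeeds on the third attempt after two transient failures.
    let mut outcomes =
        vec![SendOutcome::Transient, SendOutcome::Transient, SendOutcome::Delivered].into_iter();
    let tries = send_with_retry(|| outcomes.next().unwrap(), 10);
    println!("{:?}", tries); // Ok(3)
}
```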

@PriceHiller
Contributor

So would a simple fix be to shove the RDKafkaErrorCode::PolicyViolation into that area like so?

diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs
index 087795864..4772833e9 100644
--- a/src/sinks/kafka/service.rs
+++ b/src/sinks/kafka/service.rs
@@ -161,7 +161,7 @@ impl Service<KafkaRequest> for KafkaService {
                     }
                     // Producer queue is full.
                     Err((
-                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
+                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull | RDKafkaErrorCode::PolicyViolation),
                         original_record,
                     )) => {
                         if blocked_state.is_none() {

I'm not entirely certain how that chunk of code works; it appears to just delay a bit and retry the request after 100ms, if I'm understanding it right.
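The one-line diff above relies on Rust "or patterns": a single match arm can cover several enum variants joined with `|`, so both error codes take the same retry branch. A standalone illustration with a hypothetical error enum (not Vector's types):

```rust
// Hypothetical, simplified error codes for illustration only.
enum ErrorCode {
    QueueFull,
    PolicyViolation,
    InvalidMessage,
}

// Both QueueFull and PolicyViolation match the same arm via `|`,
// mirroring the patched match arm in the sink.
fn is_retriable(code: &ErrorCode) -> bool {
    matches!(code, ErrorCode::QueueFull | ErrorCode::PolicyViolation)
}

fn main() {
    assert!(is_retriable(&ErrorCode::QueueFull));
    assert!(is_retriable(&ErrorCode::PolicyViolation));
    assert!(!is_retriable(&ErrorCode::InvalidMessage));
    println!("ok");
}
```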

@jszwedko
Member

> So would a simple fix be to shove the RDKafkaErrorCode::PolicyViolation into that area like so?
>
> diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs
> index 087795864..4772833e9 100644
> --- a/src/sinks/kafka/service.rs
> +++ b/src/sinks/kafka/service.rs
> @@ -161,7 +161,7 @@ impl Service<KafkaRequest> for KafkaService {
>                      }
>                      // Producer queue is full.
>                      Err((
> -                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
> +                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull | RDKafkaErrorCode::PolicyViolation),
>                          original_record,
>                      )) => {
>                          if blocked_state.is_none() {
>
> I'm not entirely certain how that chunk of code works, it appears to just delay by a bit and retry the request after 100ms if I'm understanding it right.

Yeah, I think that would work. And yes, it just waits 100ms and retries.

PriceHiller added a commit to PriceHiller/vector that referenced this issue Dec 16, 2024
Problem: Some messages were getting dropped by Vector due to Kafka
throwing `PolicyViolation` errors. These should be retried, as a policy
violation can be caused by something as simple as an aggressive rate limit.

----------

Solution: Retry any messages that had the `RDKafkaErrorCode::PolicyViolation` error.

----------

Note: A dynamic backoff may be better, as some rate limits may
require more than 100ms between retries.

----------

See the original issue at vectordotdev#22026

Closes vectordotdev#22026
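The "dynamic backoff" idea in the commit note could look like capped exponential backoff: double the delay on each consecutive transient failure up to a maximum. A sketch with illustrative constants (not what Vector implements):

```rust
use std::time::Duration;

// Capped exponential backoff: 100ms, 200ms, 400ms, ... up to 10s.
// BASE_MS and MAX_MS are illustrative values, not Vector's.
fn backoff_delay(attempt: u32) -> Duration {
    const BASE_MS: u64 = 100;
    const MAX_MS: u64 = 10_000;
    // Clamp the shift and use saturating_mul so very large attempt
    // counts cannot overflow; the min() applies the cap.
    let exp = attempt.min(63);
    let ms = BASE_MS.saturating_mul(1u64 << exp).min(MAX_MS);
    Duration::from_millis(ms)
}

fn main() {
    for attempt in 0..8 {
        println!("attempt {attempt}: {:?}", backoff_delay(attempt));
    }
}
```

A production version would usually also add random jitter so that many blocked producers do not all retry in lockstep.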
github-merge-queue bot pushed a commit that referenced this issue Dec 19, 2024
* fix(kafka sink): retry messages that result in kafka policy violations


* Update changelog.d/22026-retry-kafka-policy-violations.fix.md

---------

Co-authored-by: Jesse Szwedko <[email protected]>