Commit
fix(kafka source): fix acknowledgement handling during shutdown and rebalance events (vectordotdev#17497)

* test(kafka source): integration tests for acknowledgement handling during shutdown and rebalance events

  When running locally, this is most easily tested using an existing Kafka topic pre-populated with, say, 100k messages:

      ❯ kcat -b $BROKER_ADDR -P -t TestTopic -K : \
          -l <(for i in $(seq 1 100000); do echo "${i}:{\"value\": ${i}}"; done)

  ...then running the tests, targeting that topic through environment variables. This can be a bit finicky with regard to timings, so KAFKA_SHUTDOWN_DELAY controls how long to run the first consumer before shutting down (drain-at-shutdown test), and KAFKA_CONSUMER_DELAY controls the time between starting new consumers during the rebalancing test.

      ❯ KAFKA_SEND_COUNT=0 \
        KAFKA_EXPECT_COUNT=100000 \
        KAFKA_TEST_TOPIC=TestTopic \
        KAFKA_CONSUMER_DELAY=5000 \
        KAFKA_SHUTDOWN_DELAY=5000 \
        KAFKA_HOST=$BROKER_ADDR \
        KAFKA_PORT=9092 \
        cargo test --lib --no-default-features -F sources-kafka -F kafka-integration-tests drains_acknowledgement

* fix(kafka source): drain pending acknowledgements on shutdown and rebalance

* fix(kafka source): performance improvements for acknowledgement handling on many partitions

  Instead of tokio StreamMap, which gets very slow when more than a handful of partitions are involved, use a task and forwarding channel for each partition (see the per-partition sketch at the end of this message). Introduces a little bookkeeping, but scales well to at least hundreds of partitions.

* clippy fixes, and remove unnecessary last_offset tracking
* cargo fmt again
* fmt
* clean up handle_messages loop and add a tracing span for metrics collection
* fixup changes lost after merging master
* clippy warning
* enhancement(kafka source): kafka source uses a dedicated task per partition to consume & acknowledge messages
* make the spelling checker happy, maybe?
* emit a debug log instead of panicking if a shutdown happens during a consumer rebalance
* improved partition EOF handling
* add OptionFuture to drain deadline and EOF handling, and use is_subset to detect when all expected partitions are finished draining (see the drain-coordination sketch at the end of this message)
* replace OnceCell with OnceLock
* cargo fmt
* create clear distinction between consuming and draining states
* add "complete" as a terminal state, and "keep_consuming", "keep_draining", and "complete" methods for descriptive state (non)transitions (see the state sketch at the end of this message)
* use state transition methods consistently for all state transitions
* slightly clearer assertion messages about what is expected
* update obsolete comment, make coordinator loop condition explicit
* use keep_consuming from the drain_timeout while-consuming handler
* rely solely on adding/removing entries in expect_drain to detect when draining is complete
* fix comment :P
* clippy/fmt fixes
* minor cleanup: during shutdown, use is_drain_complete to detect the already-finished state
* integration test uses `FuturesUnordered` for better performance (see the last sketch at the end of this message)

  Co-authored-by: Doug Smith <[email protected]>

* use FuturesUnordered
* use 6 partitions for integration test
* integration test using 125k messages
* add drain_timeout_ms option for kafka source
* enforce drain_timeout_ms < session_timeout_ms when building kafka source (see the validation sketch at the end of this message)
* generate component docs
* use Option::{unzip, map_or} methods

  Co-authored-by: Bruce Guenter <[email protected]>

* remove OnceLock on callback channel sender, and other review cleanups
  - avoid panic in case split_partition_queue returns None
  - remove `pub` markers that are not needed
  - add comments around drain coordination signalling
  - cargo fmt
* members of Keys struct are cloned once per consumed partition, instead of entire config object
* cargo fmt and fix clippy warnings

---------

Co-authored-by: Doug Smith <[email protected]>
Co-authored-by: Bruce Guenter <[email protected]>
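A minimal sketch of the per-partition task-and-forwarding-channel idea mentioned above, assuming a hypothetical Message type and Coordinator struct rather than Vector's actual types: each assigned partition gets its own tokio task that forwards into one merged channel, and dropping the per-partition sender shuts that task down.

    use std::collections::HashMap;

    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;

    // Illustrative message type; the real source forwards decoded Kafka
    // messages plus acknowledgement bookkeeping.
    struct Message {
        partition: i32,
        offset: i64,
        payload: Vec<u8>,
    }

    struct PartitionConsumer {
        // Dropping this sender ends the forwarding task for the partition.
        tx: mpsc::Sender<Message>,
        handle: JoinHandle<()>,
    }

    struct Coordinator {
        partitions: HashMap<i32, PartitionConsumer>,
        // Every per-partition task forwards into this single channel,
        // which the main consumer loop reads from.
        merged_tx: mpsc::Sender<Message>,
    }

    impl Coordinator {
        fn add_partition(&mut self, partition: i32) {
            let (tx, mut rx) = mpsc::channel::<Message>(128);
            let merged_tx = self.merged_tx.clone();
            let handle = tokio::spawn(async move {
                // Forward until the per-partition sender is dropped
                // (partition revoked or EOF reached).
                while let Some(msg) = rx.recv().await {
                    if merged_tx.send(msg).await.is_err() {
                        break; // the main loop has shut down
                    }
                }
            });
            self.partitions
                .insert(partition, PartitionConsumer { tx, handle });
        }

        fn remove_partition(&mut self, partition: i32) {
            // Removing the entry drops the sender, which lets the task exit.
            self.partitions.remove(&partition);
        }
    }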
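The drain coordination can be pictured roughly as below, with hypothetical names (expect_drain, finished_rx) rather than the source's actual ones: partitions that still owe acknowledgements sit in a set, an OptionFuture wraps the optional drain deadline, and draining is complete once the set has emptied out. This follows the later bullet (entries removed from expect_drain as partition tasks report in) rather than the earlier is_subset check.

    use std::collections::HashSet;
    use std::pin::Pin;
    use std::time::Duration;

    use futures::future::OptionFuture;
    use tokio::sync::mpsc;
    use tokio::time::{sleep, Sleep};

    async fn drain_partitions(
        mut expect_drain: HashSet<(String, i32)>,
        mut finished_rx: mpsc::Receiver<(String, i32)>,
        drain_timeout: Option<Duration>,
    ) {
        // OptionFuture lets "maybe there is a deadline" be selected on
        // without a separate code path for the no-deadline case.
        let mut deadline: OptionFuture<Pin<Box<Sleep>>> =
            drain_timeout.map(|d| Box::pin(sleep(d))).into();

        while !expect_drain.is_empty() {
            tokio::select! {
                Some(partition) = finished_rx.recv() => {
                    // A partition task reported that its acknowledgements are
                    // flushed; draining is complete when the set empties out.
                    expect_drain.remove(&partition);
                }
                Some(_) = &mut deadline => {
                    // Deadline elapsed before every partition finished.
                    break;
                }
                else => break,
            }
        }
    }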
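The consuming/draining/complete distinction could look something like the state sketch below; the real states carry per-state data, so this only illustrates the descriptive (non)transition methods named in the list.

    // Illustrative only: the actual source attaches data to each state.
    #[derive(Debug, PartialEq, Eq)]
    enum ConsumerState {
        Consuming,
        Draining,
        Complete,
    }

    impl ConsumerState {
        // Descriptive non-transitions: make "stay in the same state"
        // explicit at the call site.
        fn keep_consuming(self) -> Self {
            debug_assert_eq!(self, ConsumerState::Consuming);
            ConsumerState::Consuming
        }

        fn keep_draining(self) -> Self {
            debug_assert_eq!(self, ConsumerState::Draining);
            ConsumerState::Draining
        }

        // Transition into draining, and into the terminal state.
        fn begin_drain(self) -> Self {
            ConsumerState::Draining
        }

        fn complete(self) -> Self {
            ConsumerState::Complete
        }
    }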
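The build-time check that drain_timeout_ms stays below session_timeout_ms amounts to something like this validation sketch; the field names and error wording are assumptions, not the source's actual config type.

    struct KafkaSourceConfig {
        session_timeout_ms: u64,
        drain_timeout_ms: Option<u64>,
    }

    fn validate_drain_timeout(config: &KafkaSourceConfig) -> Result<(), String> {
        match config.drain_timeout_ms {
            // Rejecting the config at build time avoids a drain window that
            // outlives the consumer's session with the broker.
            Some(drain) if drain >= config.session_timeout_ms => Err(format!(
                "drain_timeout_ms ({drain}) must be less than session_timeout_ms ({})",
                config.session_timeout_ms
            )),
            _ => Ok(()),
        }
    }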
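For the integration test change, the FuturesUnordered usage boils down to pushing all of the produce futures into one collection and awaiting completions in whatever order they arrive; produce_record here is a stand-in for the test's real send future, not its actual helper.

    use futures::stream::{FuturesUnordered, StreamExt};

    async fn send_all(count: usize) {
        let mut in_flight = FuturesUnordered::new();
        for i in 0..count {
            in_flight.push(produce_record(i));
        }
        // Completions are polled concurrently instead of one send at a time.
        while let Some(result) = in_flight.next().await {
            result.expect("record should be delivered");
        }
    }

    // Stand-in for the real Kafka produce call used by the test.
    async fn produce_record(_i: usize) -> Result<(), String> {
        Ok(())
    }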