-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat: FNV1a 32-Bit Partitioner #2300
base: master
Are you sure you want to change the base?
Conversation
Hey @dpkp - How shall we proceed with this? Any directions? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's give it a try. I'm not willing to make it the default partitioner, but I'm all for giving users the choice to try other partitioning algorithms.
Thanks @wbarnha - Could you please guide me to the next steps? I think someone needs to mark it as "reviewed" in the Reviewable tool. |
I'll include it after the next release of kafka-python. I have a very specific release plan for 2.0.3/2.1.0. This will probably be included in 2.2.0/2.1.0. |
…terations for Kafka 0.8.2 and Python 3.12 (dpkp#159) * skip failing tests for PyPy since they work locally * Reconfigure tests for PyPy and 3.12 * Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2 * Update test_partitioner.py * Update test_producer.py * Timeout tests after ten minutes * Set 0.8.2.2 to be experimental from hereon * Formally support PyPy 3.9
* Test Kafka 0.8.2.2 using Python 3.11 in the meantime * Override PYTHON_LATEST conditionally in python-package.yml * Update python-package.yml * add python annotation to kafka version test matrix * Update python-package.yml * try python 3.10
* Remove support for EOL'ed versions of Python * Update setup.py
Too many MRs to review... so little time.
After stop/start kafka service, kafka-python may use 100% CPU caused by busy-retry while the socket was closed. This fix the issue by unregister the socket if the fd is negative. Co-authored-by: Orange Kao <[email protected]>
Co-authored-by: Ryar Nyah <[email protected]>
Co-authored-by: Denis Otkidach <[email protected]>
The former has been deprecated since setuptools 56 Co-authored-by: micwoj92 <[email protected]>
* docs: Update syntax in README.rst * docs: Update code block syntax in docs/index.rst --------- Co-authored-by: HalfSweet <[email protected]>
* Fix crc32c's __main__ for Python 3 * Remove TODO from _crc32c.py --------- Co-authored-by: Yonatan Goldschmidt <[email protected]>
Co-authored-by: Dave Voutila <[email protected]>
…pkp#155) * handling OSError * better error output * removed traceback logging --------- Co-authored-by: Alexander Sibiryakov <[email protected]>
…pkp#134) wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis <[email protected]> Co-authored-by: shimon-armis <[email protected]>
Co-authored-by: drewdogg <[email protected]>
* Support custom SASL mechanisms There is some interest in supporting various SASL mechanisms not currently included in the library: * dpkp#2110 (DMS) * dpkp#2204 (SSPI) * dpkp#2232 (AWS_MSK_IAM) Adding these mechanisms in the core library may be undesirable due to: * Increased maintenance burden. * Unavailable testing environments. * Vendor specificity. This commit provides a quick prototype for a pluggable SASL system. --- **Example** To define a custom SASL mechanism a module must implement two methods: ```py def validate_config(conn): # Check configuration values, available libraries, etc. assert conn.config['vendor_specific_setting'] is not None, ( 'vendor_specific_setting required when sasl_mechanism=MY_SASL' ) def try_authenticate(conn, future): # Do authentication routine and return resolved Future with failed # or succeeded state. ``` And then the custom mechanism should be registered before initializing a KafkaAdminClient, KafkaConsumer, or KafkaProducer: ```py import kafka.sasl from kafka import KafkaProducer import my_sasl kafka.sasl.register_mechanism('MY_SASL', my_sasl) producer = KafkaProducer(sasl_mechanism='MY_SASL') ``` --- **Notes** **ABCs** This prototype does not implement an ABC for custom SASL mechanisms. Using an ABC would reduce a few of the explicit assertions involved with registering a mechanism and is a viable option. Due to differing feature sets between py2/py3 this option was not explored, but shouldn't be difficult. **Private Methods** This prototype relies on some methods that are currently marked as **private** in `BrokerConnection`. * `._can_send_recv` * `._lock` * `._recv_bytes_blocking` * `._send_bytes_blocking` A pluggable system would require stable interfaces for these actions. **Alternative Approach** If the module-scoped dict modification in `register_mechanism` feels too clunky maybe the addtional mechanisms can be specified via an argument when initializing one of the `Kafka*` classes? * Add test_msk.py by @mattoberle * add msk to __init__ and check for extension in conn.py * rename try_authenticate in msk.py * fix imports * fix imports * add botocore to requirements-dev.txt * add boto3 to requirements-dev.txt * add awscli to requirements-dev.txt * add awscli to workflow since it takes too long to install normally * just install botocore i guess * just install boto3 i guess * force reinstall awscli * try something weird * ok now the dang tests should work and if they don't i'll cry * skip the msk test for now... * Revert "skip the msk test for now..." This reverts commit 1c29667. * skip the msk test for now... * nvm just needed to update tox lol * Update kafka/sasl/gssapi.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> * Update kafka/sasl/oauthbearer.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> * Update kafka/sasl/plain.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> * Update kafka/sasl/scram.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> * Update kafka/sasl/msk.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> --------- Co-authored-by: Matt Oberle <[email protected]> Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Now that the codebase has been modernised by using pyupgrade, we can also remove all backported vendor modules, and all uses of them.
I implemented API KEY 35 from the official Apache Kafka documentation. This functionality is requested in issue # 2163 and this is an implementation proposal. Co-authored-by: chopatate <[email protected]>
… topic naming (dpkp#172) * Update conftest.py to use request.node.originalname instead for legal topic naming Otherwise parametrization doesn't work. * Update test/conftest.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> --------- Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
* KIP-345 Add static consumer membership support * KIP-345 Add examples to docs * KIP-345 Add leave_group_on_close flag https://issues.apache.org/jira/browse/KAFKA-6995 * KIP-345 Add tests for static membership * KIP-345 Update docs for leave_group_on_close option * Update changelog.rst * remove six from base.py * Update base.py * Update base.py * Update base.py * Update changelog.rst * Update README.rst --------- Co-authored-by: Denis Kazakov <[email protected]> Co-authored-by: Denis Kazakov <[email protected]>
* Add typing * define types as Struct for simplicity's sake
…before definition (dpkp#138) * fix if statement logic and add zstd check * fix if statement logic and add zstd uncompress * fix imports * avoid variable be used before definition * Remove unused import from legacy_records.py --------- Co-authored-by: Alexandre Souza <[email protected]>
…not installed (dpkp#175) Closes wbarnha#174.
…pkp#132) * Add connection_timeout_ms and reset the timeout counter more often * Refactor last_attempt -> last_activity This semantically reflects the new usage of the variable better * Make tests work again * Add unit tests of new BrokerConnection functionality The test mocks parts of BrokerConnection in order to assert that the connection state machine allows long-lasting connections as long as the state progresses often enough * Re-introduce last_attempt to avoid breakage --------- Co-authored-by: Liam S. Crouch <[email protected]>
* Fix ssl connection after wrap_ssl * test * refactor * remove global level * test * revert test * address comments
* stop pylint complaint for uncovered conditional flow * add todo to revisit * formatting makes me happy :) * Fix errors raised by new version of Pylint so tests pass again
Test test/test_consumer_group.py::test_group and test/test_admin_integration.py::test_describe_consumer_group_exists busy-retry and this might have caused Java not having enough CPU time on GitHub runner, and result in test failure.
According to [rfc5802](https://datatracker.ietf.org/doc/html/rfc5802), username should escape special characters before sending to the server. > The characters ',' or '=' in usernames are sent as '=2C' and '=3D' respectively. If the server receives a username that contains '=' not followed by either '2C' or '3D', then the server MUST fail the authentication.
test/test_consumer_integration.py::test_kafka_consumer__blocking failed in https://github.com/wbarnha/kafka-python-ng/actions/runs/10361086008/job/28680735389?pr=186 because it took 592ms to finish. Output from the GitHub runner attached This commit increase TIMEOUT_MS so it is less likely to fail on GitHub runner. # Ask for 5 messages, 10 in queue. Get 5 back, no blocking messages = [] with Timer() as t: for i in range(5): msg = next(consumer) messages.append(msg) assert_message_count(messages, 5) > assert t.interval < (TIMEOUT_MS / 1000.0) E assert 0.5929090976715088 < (500 / 1000.0) E + where 0.5929090976715088 = <test.testutil.Timer object at 0x7f6c4b50e960>.interval Co-authored-by: William Barnhart <[email protected]>
Summary
This PR introduces FNV1-a 32-bit hasher as a new partitioner. The tests confirmed that this implementation yields the same hash and partition selection as Goka/Sarama.
Why
Kafka leaves the partition selection up to the producer. This creates a potential discrepancy issue when different libraries are used. As different libraries across different programming languages may use different hashers, or different configurations for them, the messages may arrive at partitions different than expected. When different hashers or different configurations are used, the message with the same key may arrive at an unexpected partition.
This was the case for us at Beat. We realized that messages that are produced by Python services using Kafka-python arrive at partitions that are different than what consumer services expect. We realized that this is because Kafka-python uses murmur2 as the default partitioner, while Goka library uses Sarama which uses FNV1a-32.
What
This PR introduces a new partitioner that is based on FNV1a 32-bit. It is a separate and isolated implementation. Therefore users can select using the default one, or this one easily.
The partitioner is designed to match the partitioner of Goka/Sarama. It uses the "a" variant of FNV based on 32-bit. It also utilizes twos-complement decimal conversions, and the same parameters used for mentioned libraries. As a result, with this partitioner, python-kafka calculates the same hash and selects the same partition as Goka/Sarama does. In our tests, we experimentally proved that this implementation outputs the same hash and partitioner as Goka/Sarama does.
Who
I worked together on this change with Evgenia Martynova at Beat. Beat is a ride-hailing company that has engineering hubs in Athens and Amsterdam. We are always on the look for great engineers! We are hiring!
This change is