Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: FNV1a 32-Bit Partitioner #2300

Open
wants to merge 47 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ce5138c
FNV1a_32 bit partitioner class implementation
kemalty Jan 27, 2022
7c8c878
Added tests and improved docs
kemalty Jan 27, 2022
b95e46d
Rename project from kafka-python to kafka-python-ng (#1)
wbarnha Mar 7, 2024
78c74c0
Fix artifact downloads for release
wbarnha Mar 7, 2024
e796019
Fix badge links in README.rst
wbarnha Mar 8, 2024
38e159a
Reconfigure tests to complete in a more timely manner and skip some i…
wbarnha Mar 8, 2024
93c6e34
Merge branch 'master' into feat/fnv1a-partitioner
wbarnha Mar 8, 2024
e762321
Test Kafka 0.8.2.2 using Python 3.10 in the meantime (#161)
wbarnha Mar 9, 2024
00750aa
Remove support for EOL'ed versions of Python (#160)
wbarnha Mar 9, 2024
5bd1323
Stop testing Python 3.13 in python-package.yml (#162)
wbarnha Mar 9, 2024
cda8f81
Avoid 100% CPU usage while socket is closed (#156)
wbarnha Mar 9, 2024
c02df08
Fix DescribeConfigsResponse_v1 config_source (#150)
wbarnha Mar 9, 2024
f867312
Merge branch 'master' into feat/fnv1a-partitioner
wbarnha Mar 10, 2024
65eacfb
Fix base class of DescribeClientQuotasResponse_v0 (#144)
wbarnha Mar 10, 2024
e0ebe5d
Update license_file to license_files (#131)
wbarnha Mar 10, 2024
26bb3eb
Update some RST documentation syntax (#130)
wbarnha Mar 10, 2024
88763da
Fix crc32c's __main__ for Python 3 (#142)
wbarnha Mar 10, 2024
b1a4c53
Strip trailing dot off hostname. (#133)
wbarnha Mar 10, 2024
18eaa2d
Handle OSError to properly recycle SSL connection, fix infinite loop …
wbarnha Mar 10, 2024
54cbd63
client_async: Allow throwing an exception upon socket error during (#…
wbarnha Mar 10, 2024
eb6fd9b
Log connection errors at ERROR level (#139)
wbarnha Mar 12, 2024
b1f844f
Merge branch 'master' into feat/fnv1a-partitioner
wbarnha Mar 14, 2024
6ad79a4
Support custom SASL mechanisms including AWS MSK (#170)
wbarnha Mar 18, 2024
deeccfa
Update python-package.yml to have 15m as timeout
wbarnha Mar 18, 2024
fcca556
Run pyupgrade on everything. (#171)
wbarnha Mar 18, 2024
a856dc4
Remove all vendoring (#169)
s-t-e-v-e-n-k Mar 19, 2024
339de91
Merge branch 'master' into feat/fnv1a-partitioner
wbarnha Mar 19, 2024
2f2ccb1
Support Describe log dirs (#145)
wbarnha Mar 19, 2024
0259502
Update conftest.py to use request.node.originalname instead for legal…
wbarnha Mar 20, 2024
3c124b2
KIP-345 Static membership implementation (#137)
wbarnha Mar 20, 2024
1f4d8e1
Merge branch 'master' into feat/fnv1a-partitioner
wbarnha Mar 22, 2024
56065da
Use monkeytype to create some semblance of typing (#173)
wbarnha Mar 26, 2024
cbf317b
Add zstd support on legacy record and ensure no variable is referred …
wbarnha Mar 26, 2024
af1a5f0
Update __init__.py of SASL to catch ImportErrors in case botocore is …
wbarnha Mar 27, 2024
aba153f
Add botocore to extras in setup.py
wbarnha Mar 27, 2024
6c9eb37
Add connection_timeout_ms and reset the timeout counter more often (#…
wbarnha Apr 3, 2024
6756974
add validate_config function for msk module (#176)
Sharu95 Apr 4, 2024
ec9cae5
Merge branch 'master' into feat/fnv1a-partitioner
wbarnha Apr 9, 2024
611471f
Fix ssl connection (#178)
dingxiong Apr 10, 2024
deebd8f
Fix badge typo in README.rst
wbarnha Apr 23, 2024
448017e
Merge branch 'dpkp:master' into master
wbarnha May 14, 2024
5e461a7
Patch pylint warnings so tests pass again (#184)
wbarnha Jul 12, 2024
401896b
Update README.rst to close #179
wbarnha Jul 17, 2024
31a6b92
Avoid busy retry (#192)
orange-kao Aug 9, 2024
9bee9fc
fix scram username character escape (#196)
debuggings Aug 15, 2024
6104623
Improve test/test_consumer_integration.py in GitHub runner (#194)
orange-kao Oct 3, 2024
4aff793
Merge branch 'master' into feat/fnv1a-partitioner
wbarnha Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions kafka/partitioner/fnv1a_32.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from __future__ import absolute_import

import random

from kafka.vendor import six


class FNV1a32Partitioner(object):
"""Partitioner with FNV1a 32-bit hash algorithm.

Hashes key to partition using FNV1a 32-bit hashing.
If key is None, selects partition randomly from available,
or from all partitions if none are currently available.
"""
@classmethod
def __call__(cls, key, all_partitions, available):
"""
Partitioner implementation using FNV1a 32-bit hashing function and
decimal conversion with two's complement. If key is passed with None
value, the selection of the partition is random.

The implementation details are selected to make sure the same key
is mapped to the same partition in Goka/Sarama. It is confirmed
that this implementation works the same as the partitioner of
github.com/lovoo/goka v1.0.5 with Go version 1.16.

Algorithm details:
http://www.isthe.com/chongo/tech/comp/fnv/#FNV-param

:param key: partitioning key
:param all_partitions: list of all partitions sorted by partition ID
:param available: list of available partitions in no particular order
:return: one of the values from all_partitions or available
"""
if key is None:
if available:
return random.choice(available)
return random.choice(all_partitions)

key_hash = _get_fnv1a_32(key)
key_hash = _get_twos_complement_32bit(key_hash)
key_hash = abs(key_hash)
idx = key_hash % len(all_partitions)
return all_partitions[idx]


def _get_twos_complement_32bit(value: int) -> int:
"""
Returns the signed two's complement decimal conversion.

Algorithm details:
http://sandbox.mc.edu/~bennet/cs110/tc/tctod.html

Taken from:
https://stackoverflow.com/questions/1604464/twos-complement-in-python
"""
bit_base = 32
if (value & (1 << (bit_base - 1))) != 0:
value = value - (1 << bit_base)
return value


def _get_fnv1a_32(key: bytes) -> int:
"""
Returns the FNV1a 32bit hash of the given key.

Algorithm details:
http://www.isthe.com/chongo/tech/comp/fnv/#FNV-param

Taken from:
https://github.com/znerol/py-fnvhash/blob/master/fnvhash/__init__.py
"""
# We set the same init_offset and prime for the FNV hasher as
# defined in Golang FNV package. The Go FNV is the package Sarama
# uses for its hashing calculations under the hood.
# References:
# https://cs.opensource.google/go/go/+/refs/tags/go1.17.3:src/hash/fnv/fnv.go;l=31
# https://cs.opensource.google/go/go/+/refs/tags/go1.17.3:src/hash/fnv/fnv.go;l=35
init_offset = 0x811c9dc5
prime = 0x01000193
hash_size = 2 ** 32

# Python2 bytes is really a str, causing the bitwise operations below to fail
# so convert to bytearray.
if six.PY2:
key = bytearray(bytes(key))

key_hash = init_offset
for byte in key:
key_hash = key_hash ^ byte
key_hash = (key_hash * prime) % hash_size
return key_hash
15 changes: 15 additions & 0 deletions test/test_partitioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from kafka.partitioner import DefaultPartitioner, murmur2
from kafka.partitioner.fnv1a_32 import FNV1a32Partitioner, _get_twos_complement_32bit, _get_fnv1a_32


def test_default_partitioner():
Expand Down Expand Up @@ -36,3 +37,17 @@ def test_murmur2_not_ascii():
# Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode
murmur2(b'\xa4')
murmur2(b'\x81' * 1000)


@pytest.mark.parametrize("key,partitions,available,expected", [
(b"123", [0, 1, 2], [0, 1, 2], 2),
(b"123", [0, 1], [0, 1], 1),
(b"123", [0], [0], 0),
(b"f232oo3232", [0, 1, 2, 3], [0, 1, 2, 3], 2),
(b"f232oo3232", [0, 1], [0, 1], 0),
(b"f232oo3232", [0], [0], 0),
])
def test_fnv1a_32_partitioner(key, partitions, available, expected):
partitioner = FNV1a32Partitioner()
out = partitioner(key, partitions, available)
assert out == expected