Skip to content

Commit

Permalink
Use MurmurHash2 for key partition hashing
Browse files Browse the repository at this point in the history
  • Loading branch information
Dana Powers committed Jun 10, 2015
1 parent 8e3cd1c commit 4e339a7
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
5 changes: 3 additions & 2 deletions kafka/partitioner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .roundrobin import RoundRobinPartitioner
from .hashed import HashedPartitioner
from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner

__all__ = [
'RoundRobinPartitioner', 'HashedPartitioner'
'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner',
'LegacyPartitioner'
]
96 changes: 95 additions & 1 deletion kafka/partitioner/hashed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
from .base import Partitioner

class HashedPartitioner(Partitioner):

class Murmur2Partitioner(Partitioner):
"""
Implements a partitioner which selects the target partition based on
the hash of the key. Attempts to apply the same hashing
function as mainline java client.
"""
def partition(self, key, partitions=None):
if not partitions:
partitions = self.partitions

# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
idx = (murmur2(key) & 0x7fffffff) % len(partitions)

return partitions[idx]


class LegacyPartitioner(Partitioner):
"""DEPRECATED -- See Issue 374
Implements a partitioner which selects the target partition based on
the hash of the key
"""
Expand All @@ -12,3 +30,79 @@ def partition(self, key, partitions=None):
idx = hash(key) % size

return partitions[idx]


# Default will change to Murmur2 in 0.10 release
HashedPartitioner = LegacyPartitioner


# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
def murmur2(key):
"""Pure-python Murmur2 implementation.
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
Args:
key: if not a bytearray, converted via bytearray(str(key))
Returns: MurmurHash2 of key bytearray
"""

# Convert key to a bytearray
if not isinstance(key, bytearray):
data = bytearray(str(key))

This comment has been minimized.

Copy link
@rthille

rthille Oct 22, 2015

This will throw an error on Unicode strings... Does anyone know what Java does for Unicode? UTF-8?

This comment has been minimized.

Copy link
@rthille

rthille Oct 22, 2015

And if 'key' is a bytearray, data is never assigned to.

length = len(data)
seed = 0x9747b28c
# 'm' and 'r' are mixing constants generated offline.
# They're not really 'magic', they just happen to work well.
m = 0x5bd1e995
r = 24

# Initialize the hash to a random value
h = seed ^ length
length4 = length / 4

for i in range(length4):
i4 = i * 4
k = ((data[i4 + 0] & 0xff) +
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24))
k &= 0xffffffff
k *= m
k &= 0xffffffff
k ^= (k % 0x100000000) >> r # k ^= k >>> r
k &= 0xffffffff
k *= m
k &= 0xffffffff

h *= m
h &= 0xffffffff
h ^= k
h &= 0xffffffff

# Handle the last few bytes of the input array
extra_bytes = length % 4
if extra_bytes == 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h &= 0xffffffff

if extra_bytes == 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8

This comment has been minimized.

Copy link
@rthille

rthille Oct 22, 2015

Should be if extra_bytes >= 2

h &= 0xffffffff

if extra_bytes == 1:
h ^= (data[length & ~3] & 0xff)

This comment has been minimized.

Copy link
@rthille

rthille Oct 22, 2015

Should be if extra_bytes >= 1

h &= 0xffffffff
h *= m
h &= 0xffffffff

h ^= (h % 0x100000000) >> 13 # h >>> 13;
h &= 0xffffffff
h *= m
h &= 0xffffffff
h ^= (h % 0x100000000) >> 15 # h >>> 15;
h &= 0xffffffff

return h

0 comments on commit 4e339a7

Please sign in to comment.