
Merge pull request dpkp#227 from wizzat-feature/py3
Python 3 Support

Conflicts:
	kafka/producer.py
	test/test_client.py
	test/test_client_integration.py
	test/test_codec.py
	test/test_consumer.py
	test/test_consumer_integration.py
	test/test_failover_integration.py
	test/test_producer.py
	test/test_producer_integration.py
	test/test_protocol.py
	test/test_util.py
Dana Powers committed Sep 8, 2014
2 parents a99384f + be23042 commit 715425c
Showing 28 changed files with 444 additions and 344 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -3,6 +3,8 @@ language: python
 python:
 - 2.6
 - 2.7
+- 3.3
+- 3.4
 - pypy

 env:
17 changes: 9 additions & 8 deletions kafka/client.py
@@ -1,3 +1,4 @@
+import binascii
 import collections
 import copy
 import functools
@@ -20,7 +21,7 @@

 class KafkaClient(object):

-    CLIENT_ID = "kafka-python"
+    CLIENT_ID = b"kafka-python"
     ID_GEN = itertools.count()

     # NOTE: The timeout given to the client should always be greater than the
@@ -81,7 +82,7 @@ def _next_id(self):
        """
        Generate a new correlation id
        """
-        return KafkaClient.ID_GEN.next()
+        return next(KafkaClient.ID_GEN)

    def _send_broker_unaware_request(self, requestId, request):
        """
@@ -96,7 +97,7 @@ def _send_broker_unaware_request(self, requestId, request):
                return response
            except Exception as e:
                log.warning("Could not send request [%r] to server %s:%i, "
-                            "trying next server: %s" % (request, host, port, e))
+                            "trying next server: %s" % (binascii.b2a_hex(request), host, port, e))

        raise KafkaUnavailableError("All servers failed to process request")

@@ -145,7 +146,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

        # For each broker, send the list of request payloads
        for broker, payloads in payloads_by_broker.items():
-            conn = self._get_conn(broker.host, broker.port)
+            conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
            requestId = self._next_id()
            request = encoder_fn(client_id=self.client_id,
                                 correlation_id=requestId, payloads=payloads)
@@ -160,11 +161,11 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                response = conn.recv(requestId)
            except ConnectionError as e:
                log.warning("Could not receive response to request [%s] "
-                            "from server %s: %s", request, conn, e)
+                            "from server %s: %s", binascii.b2a_hex(request), conn, e)
                failed = True
            except ConnectionError as e:
                log.warning("Could not send request [%s] to server %s: %s",
-                            request, conn, e)
+                            binascii.b2a_hex(request), conn, e)
                failed = True

            if failed:
@@ -233,8 +234,8 @@ def copy(self):
        A reinit() has to be done on the copy before it can be used again
        """
        c = copy.deepcopy(self)
-        for k, v in c.conns.items():
-            c.conns[k] = v.copy()
+        for key in c.conns:
+            c.conns[key] = self.conns[key].copy()
        return c

    def reinit(self):
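Both recurring changes in kafka/client.py — the next() builtin replacing the Python-2-only .next() method, and hex-encoding raw request bytes before logging — can be sketched in isolation (illustrative values, not client code):

import binascii
import itertools

ID_GEN = itertools.count()

# Iterator .next() was removed in Python 3 (renamed __next__);
# the next() builtin works on 2.6+ and 3.x alike.
correlation_id = next(ID_GEN)

# Raw Kafka request bytes are not printable on Python 3, so the
# log messages above hex-encode them first.
request = b'\x00\x00\x00\x01'
print(binascii.b2a_hex(request))  # b'00000001'

The same next() substitution recurs below in kafka/partitioner.py and kafka/producer.py.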
19 changes: 11 additions & 8 deletions kafka/codec.py
@@ -1,8 +1,11 @@
-from cStringIO import StringIO
+from io import BytesIO
 import gzip
 import struct

-_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
+import six
+from six.moves import xrange
+
+_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'

 try:
@@ -21,7 +24,7 @@ def has_snappy():


 def gzip_encode(payload):
-    buffer = StringIO()
+    buffer = BytesIO()
     handle = gzip.GzipFile(fileobj=buffer, mode="w")
     handle.write(payload)
     handle.close()
@@ -32,7 +35,7 @@


 def gzip_decode(payload):
-    buffer = StringIO(payload)
+    buffer = BytesIO(payload)
     handle = gzip.GzipFile(fileobj=buffer, mode='r')
     result = handle.read()
     handle.close()
@@ -68,9 +71,9 @@ def _chunker():
        for i in xrange(0, len(payload), xerial_blocksize):
            yield payload[i:i+xerial_blocksize]

-    out = StringIO()
+    out = BytesIO()

-    header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
+    header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
                      in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])

    out.write(header)
@@ -121,8 +124,8 @@ def snappy_decode(payload):

    if _detect_xerial_stream(payload):
        # TODO ? Should become a fileobj ?
-        out = StringIO()
-        byt = buffer(payload[16:])
+        out = BytesIO()
+        byt = payload[16:]
        length = len(byt)
        cursor = 0

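cStringIO.StringIO is gone in Python 3 and only ever held str; io.BytesIO holds bytes on both majors, which is what compressed payloads are. A self-contained round-trip sketch of the gzip pattern used above:

import gzip
from io import BytesIO

def gzip_encode(payload):
    buf = BytesIO()  # bytes-oriented in-memory file, py2 and py3
    handle = gzip.GzipFile(fileobj=buf, mode="w")
    handle.write(payload)
    handle.close()
    return buf.getvalue()

def gzip_decode(payload):
    handle = gzip.GzipFile(fileobj=BytesIO(payload), mode='r')
    result = handle.read()
    handle.close()
    return result

assert gzip_decode(gzip_encode(b"kafka message")) == b"kafka message"

Note also that the py2-only buffer() builtin is replaced by a plain bytes slice, and the Xerial header is joined with b''.join, since mixing str and bytes raises TypeError on Python 3.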
14 changes: 10 additions & 4 deletions kafka/conn.py
@@ -5,6 +5,8 @@
 import struct
 from threading import local

+import six
+
 from kafka.common import ConnectionError

 log = logging.getLogger("kafka")
@@ -19,7 +21,7 @@ def collect_hosts(hosts, randomize=True):
    randomize the returned list.
    """

-    if isinstance(hosts, basestring):
+    if isinstance(hosts, six.string_types):
        hosts = hosts.strip().split(',')

    result = []
@@ -92,7 +94,7 @@ def _read_bytes(self, num_bytes):
                # Receiving empty string from recv signals
                # that the socket is in error. we will never get
                # more data from this socket
-                if data == '':
+                if data == b'':
                    raise socket.error("Not enough data to read message -- did server kill socket?")

            except socket.error:
@@ -103,7 +105,7 @@ def _read_bytes(self, num_bytes):
            log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
            responses.append(data)

-        return ''.join(responses)
+        return b''.join(responses)

    ##################
    #   Public API   #
@@ -144,7 +146,7 @@ def recv(self, request_id):

        # Read the remainder of the response
        resp = self._read_bytes(size)
-        return str(resp)
+        return resp

    def copy(self):
        """
@@ -153,6 +155,10 @@ def copy(self):
        return a new KafkaConnection object
        """
        c = copy.deepcopy(self)
+        # Python 3 doesn't copy custom attributes of the threadlocal subclass
+        c.host = copy.copy(self.host)
+        c.port = copy.copy(self.port)
+        c.timeout = copy.copy(self.timeout)
        c._sock = None
        return c

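basestring does not exist on Python 3; six.string_types is (str, unicode) on 2 and (str,) on 3. A sketch of the host-parsing idiom, using a hypothetical helper name rather than kafka.conn's actual collect_hosts signature:

import six

def parse_hosts(hosts, default_port=9092):  # hypothetical helper
    # Accept "host1:9092,host2" as well as an already-split list.
    if isinstance(hosts, six.string_types):
        hosts = hosts.strip().split(',')
    result = []
    for host_port in hosts:
        host, _, port = host_port.partition(':')
        result.append((host, int(port or default_port)))
    return result

assert parse_hosts("kafka01:9093,kafka02") == [('kafka01', 9093), ('kafka02', 9092)]

The b''.join(responses) and data == b'' changes follow the same rule: socket data is bytes on Python 3, and '' == b'' is False there.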
17 changes: 13 additions & 4 deletions kafka/consumer.py
@@ -1,12 +1,21 @@
 from __future__ import absolute_import

-from itertools import izip_longest, repeat
+try:
+    from itertools import zip_longest as izip_longest, repeat  # pylint: disable-msg=E0611
+except ImportError:  # python 2
+    from itertools import izip_longest as izip_longest, repeat
 import logging
 import time
 import numbers
 from threading import Lock
 from multiprocessing import Process, Queue as MPQueue, Event, Value
-from Queue import Empty, Queue

+import six
+
+try:
+    from Queue import Empty, Queue
+except ImportError:  # python 3
+    from queue import Empty, Queue

 import kafka.common
 from kafka.common import (
@@ -420,7 +429,7 @@ def _fetch(self):
                         for p in self.fetch_offsets.keys())
        while partitions:
            requests = []
-            for partition, buffer_size in partitions.iteritems():
+            for partition, buffer_size in six.iteritems(partitions):
                requests.append(FetchRequest(self.topic, partition,
                                             self.fetch_offsets[partition],
                                             buffer_size))
@@ -582,7 +591,7 @@ def __init__(self, client, group, topic, auto_commit=True,
        for chunk in chunks:
            chunk = filter(lambda x: x is not None, chunk)
            args = (client.copy(),
-                    group, topic, chunk,
+                    group, topic, list(chunk),
                    self.queue, self.start, self.exit,
                    self.pause, self.size)

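Two changes here defend against Python 3's lazy iterators: dict.iteritems() no longer exists (six.iteritems bridges both majors), and filter() now returns a one-shot iterator, so the partition chunk must be materialized with list() before being handed to a worker process. A minimal sketch:

import six

partitions = {0: 4096, 1: 4096}
for partition, buffer_size in six.iteritems(partitions):
    pass  # .iteritems() on py2, items() view iterator on py3

chunk = filter(lambda x: x is not None, [0, None, 2])
args = list(chunk)  # py3: filter() is consumed once; freeze it first
assert args == [0, 2]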
2 changes: 1 addition & 1 deletion kafka/partitioner.py
@@ -43,7 +43,7 @@ def partition(self, key, partitions):
        if self.partitions != partitions:
            self._set_partitions(partitions)

-        return self.iterpart.next()
+        return next(self.iterpart)


class HashedPartitioner(Partitioner):
16 changes: 11 additions & 5 deletions kafka/producer.py
@@ -4,11 +4,17 @@
 import time
 import random

-from Queue import Empty
+try:
+    from queue import Empty
+except ImportError:
+    from Queue import Empty
 from collections import defaultdict
 from itertools import cycle
 from multiprocessing import Queue, Process

+import six
+from six.moves import xrange
+
 from kafka.common import (
     ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
 )
@@ -173,7 +179,7 @@ def send_messages(self, topic, partition, *msg):
            raise TypeError("msg is not a list or tuple!")

        # Raise TypeError if any message is not encoded as bytes
-        if any(not isinstance(m, bytes) for m in msg):
+        if any(not isinstance(m, six.binary_type) for m in msg):
            raise TypeError("all produce message payloads must be type bytes")

        if self.async:
@@ -221,7 +227,7 @@ class SimpleProducer(Producer):
    batch_send_every_t - If set, messages are send after this timeout
    random_start - If true, randomize the initial partition which the
                   the first message block will be published to, otherwise
-                   if false, the first message block will always publish
+                   if false, the first message block will always publish
                   to partition 0 before cycling through each partition
    """
    def __init__(self, client, async=False,
@@ -252,9 +258,9 @@ def _next_partition(self, topic):
            if self.random_start:
                num_partitions = len(self.client.topic_partitions[topic])
                for _ in xrange(random.randint(0, num_partitions-1)):
-                    self.partition_cycles[topic].next()
+                    next(self.partition_cycles[topic])

-        return self.partition_cycles[topic].next()
+        return next(self.partition_cycles[topic])

    def send_messages(self, topic, *msg):
        partition = self._next_partition(topic)
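six.binary_type is str on Python 2 and bytes on Python 3, so the check above rejects unicode payloads identically on both majors. A sketch with a hypothetical standalone helper:

import six

def check_payloads(*msg):  # hypothetical helper, not the producer API
    if any(not isinstance(m, six.binary_type) for m in msg):
        raise TypeError("all produce message payloads must be type bytes")

check_payloads(b"ok", b"also ok")   # passes on 2 and 3
# check_payloads(u"text")          # raises TypeError on both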
17 changes: 10 additions & 7 deletions kafka/protocol.py
@@ -1,6 +1,9 @@
 import logging
 import struct
-import zlib

+import six
+
+from six.moves import xrange
+
 from kafka.codec import (
     gzip_encode, gzip_decode, snappy_encode, snappy_decode
@@ -13,7 +16,7 @@
    UnsupportedCodecError
)
from kafka.util import (
-    read_short_string, read_int_string, relative_unpack,
+    crc32, read_short_string, read_int_string, relative_unpack,
    write_short_string, write_int_string, group_by_topic_and_partition
)

@@ -67,7 +70,7 @@ def _encode_message_set(cls, messages):
        Offset => int64
        MessageSize => int32
        """
-        message_set = ""
+        message_set = b""
        for message in messages:
            encoded_message = KafkaProtocol._encode_message(message)
            message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
@@ -94,8 +97,8 @@ def _encode_message(cls, message):
            msg = struct.pack('>BB', message.magic, message.attributes)
            msg += write_int_string(message.key)
            msg += write_int_string(message.value)
-            crc = zlib.crc32(msg)
-            msg = struct.pack('>i%ds' % len(msg), crc, msg)
+            crc = crc32(msg)
+            msg = struct.pack('>I%ds' % len(msg), crc, msg)
        else:
            raise ProtocolError("Unexpected magic number: %d" % message.magic)
        return msg
@@ -145,8 +148,8 @@ def _decode_message(cls, data, offset):
        The offset is actually read from decode_message_set_iter (it is part
        of the MessageSet payload).
        """
-        ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
-        if crc != zlib.crc32(data[4:]):
+        ((crc, magic, att), cur) = relative_unpack('>IBB', data, 0)
+        if crc != crc32(data[4:]):
            raise ChecksumError("Message checksum failed")

        (key, cur) = read_int_string(data, cur)
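The CRC change is easy to miss: on Python 2, zlib.crc32 returns a signed int, while on Python 3 it is always unsigned, so kafka.util's new crc32 masks with 0xffffffff and the struct format flips from '>i' to '>I'. A sketch showing that encode and decode then agree:

import binascii
import struct

def crc32(data):
    # normalize to an unsigned 32-bit value on both majors
    return binascii.crc32(data) & 0xffffffff

msg = b'\x00\x00some payload'
packed = struct.pack('>I%ds' % len(msg), crc32(msg), msg)
(crc,) = struct.unpack('>I', packed[:4])
assert crc == crc32(packed[4:])  # checksum survives the round trip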
17 changes: 12 additions & 5 deletions kafka/util.py
@@ -1,14 +1,21 @@
+import binascii
 import collections
 import struct
 import sys
 from threading import Thread, Event

+import six
+
 from kafka.common import BufferUnderflowError


+def crc32(data):
+    return binascii.crc32(data) & 0xffffffff
+
+
 def write_int_string(s):
-    if s is not None and not isinstance(s, str):
-        raise TypeError('Expected "%s" to be str\n'
+    if s is not None and not isinstance(s, six.binary_type):
+        raise TypeError('Expected "%s" to be bytes\n'
                         'data=%s' % (type(s), repr(s)))
     if s is None:
         return struct.pack('>i', -1)
@@ -17,12 +24,12 @@ def write_int_string(s):


def write_short_string(s):
-    if s is not None and not isinstance(s, str):
-        raise TypeError('Expected "%s" to be str\n'
+    if s is not None and not isinstance(s, six.binary_type):
+        raise TypeError('Expected "%s" to be bytes\n'
                        'data=%s' % (type(s), repr(s)))
    if s is None:
        return struct.pack('>h', -1)
-    elif len(s) > 32767 and sys.version < (2, 7):
+    elif len(s) > 32767 and sys.version_info < (2, 7):
        # Python 2.6 issues a deprecation warning instead of a struct error
        raise struct.error(len(s))
    else:
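write_int_string frames a bytes payload with a signed 32-bit big-endian length prefix, using -1 to encode None; the diff only tightens its type check to six.binary_type. The non-None branch is folded out of the hunk above, so the sketch below reconstructs it as an assumption:

import struct
import six

def write_int_string(s):
    if s is not None and not isinstance(s, six.binary_type):
        raise TypeError('Expected "%s" to be bytes' % type(s))
    if s is None:
        return struct.pack('>i', -1)                     # null-string marker
    return struct.pack('>i%ds' % len(s), len(s), s)      # assumed branch

assert write_int_string(None) == b'\xff\xff\xff\xff'
assert write_int_string(b'hi') == b'\x00\x00\x00\x02hi'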
(The remaining 19 changed files are not shown.)
