
Commit
Merge pull request #134 from wizzat/conn_refactor
conn.py performance improvements, make examples work, add another example
dpkp committed Mar 22, 2014
2 parents e937e3f + a6fc260 commit 9bed11d
Showing 5 changed files with 133 additions and 39 deletions.
51 changes: 38 additions & 13 deletions example.py
100644 → 100755
@@ -1,23 +1,48 @@
-import logging
+#!/usr/bin/env python
+import threading, logging, time
 
-from kafka.client import KafkaClient, FetchRequest, ProduceRequest
+from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
 from kafka.producer import SimpleProducer
 
-def produce_example(client):
-    producer = SimpleProducer(client, "my-topic")
-    producer.send_messages("test")
+class Producer(threading.Thread):
+    daemon = True
 
-def consume_example(client):
-    consumer = SimpleConsumer(client, "test-group", "my-topic")
-    for message in consumer:
-        print(message)
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        producer = SimpleProducer(client)
+
+        while True:
+            producer.send_messages('my-topic', "test")
+            producer.send_messages('my-topic', "\xc2Hola, mundo!")
+
+            time.sleep(1)
+
+
+class Consumer(threading.Thread):
+    daemon = True
+
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        consumer = SimpleConsumer(client, "test-group", "my-topic")
+
+        for message in consumer:
+            print(message)
 
 def main():
-    client = KafkaClient("localhost:9092")
-    produce_example(client)
-    consume_example(client)
+    threads = [
+        Producer(),
+        Consumer()
+    ]
+
+    for t in threads:
+        t.start()
+
+    time.sleep(5)
 
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.DEBUG
+    )
     main()
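
One API change worth noting in the hunk above: the topic has moved from the SimpleProducer constructor to the send_messages call, so a single producer can presumably serve any number of topics. A minimal before/after sketch, assuming a broker reachable at localhost:9092:

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    client = KafkaClient("localhost:9092")

    # Old call shape: the topic was bound at construction time
    #   producer = SimpleProducer(client, "my-topic")
    #   producer.send_messages("test")

    # New call shape (this commit): the topic is passed per call
    producer = SimpleProducer(client)
    producer.send_messages('my-topic', "test")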
6 changes: 3 additions & 3 deletions kafka/client.py
@@ -5,7 +5,7 @@
 from functools import partial
 from itertools import count
 
-from kafka.common import (ErrorMapping, TopicAndPartition,
+from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
                           ConnectionError, FailedPayloadsError,
                           BrokerResponseError, PartitionUnavailableError,
                           LeaderUnavailableError,
@@ -199,8 +199,8 @@ def _raise_on_response_error(self, resp):
         self.reset_topic_metadata(resp.topic)
 
         raise BrokerResponseError(
-            "Request for %s failed with errorcode=%d" %
-            (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            "Request for %s failed with errorcode=%d (%s)" %
+            (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
 
 #################
 #   Public API  #
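
The client.py change is purely diagnostic: broker error codes in BrokerResponseError messages are now annotated with their symbolic names via the ErrorStrings table added to kafka/common.py below. A sketch of the resulting message; the field values are illustrative, not taken from a real response:

    from kafka.common import ErrorStrings, TopicAndPartition

    topic, partition, error = 'my-topic', 0, 5  # hypothetical response fields

    print("Request for %s failed with errorcode=%d (%s)" %
          (TopicAndPartition(topic, partition), error, ErrorStrings[error]))
    # Prints (wrapped here for readability):
    # Request for TopicAndPartition(topic='my-topic', partition=0)
    # failed with errorcode=5 (LEADER_NOT_AVAILABLE)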
36 changes: 21 additions & 15 deletions kafka/common.py
@@ -48,22 +48,28 @@
 TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
 
 
+ErrorStrings = {
+    -1 : 'UNKNOWN',
+     0 : 'NO_ERROR',
+     1 : 'OFFSET_OUT_OF_RANGE',
+     2 : 'INVALID_MESSAGE',
+     3 : 'UNKNOWN_TOPIC_OR_PARTITON',
+     4 : 'INVALID_FETCH_SIZE',
+     5 : 'LEADER_NOT_AVAILABLE',
+     6 : 'NOT_LEADER_FOR_PARTITION',
+     7 : 'REQUEST_TIMED_OUT',
+     8 : 'BROKER_NOT_AVAILABLE',
+     9 : 'REPLICA_NOT_AVAILABLE',
+    10 : 'MESSAGE_SIZE_TOO_LARGE',
+    11 : 'STALE_CONTROLLER_EPOCH',
+    12 : 'OFFSET_METADATA_TOO_LARGE',
+}
+
 class ErrorMapping(object):
-    # Many of these are not actually used by the client
-    UNKNOWN = -1
-    NO_ERROR = 0
-    OFFSET_OUT_OF_RANGE = 1
-    INVALID_MESSAGE = 2
-    UNKNOWN_TOPIC_OR_PARTITON = 3
-    INVALID_FETCH_SIZE = 4
-    LEADER_NOT_AVAILABLE = 5
-    NOT_LEADER_FOR_PARTITION = 6
-    REQUEST_TIMED_OUT = 7
-    BROKER_NOT_AVAILABLE = 8
-    REPLICA_NOT_AVAILABLE = 9
-    MESSAGE_SIZE_TO_LARGE = 10
-    STALE_CONTROLLER_EPOCH = 11
-    OFFSET_METADATA_TOO_LARGE = 12
+    pass
+
+for k, v in ErrorStrings.items():
+    setattr(ErrorMapping, v, k)
 
 #################
 #   Exceptions  #
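
The pattern here keeps ErrorMapping as the public name while deriving its attributes from the new ErrorStrings dict, so existing code such as ErrorMapping.NO_ERROR keeps working and the symbolic name of any code can be recovered for error messages. A standalone sketch of the same setattr pattern, trimmed to three codes:

    # Reduced reproduction of the pattern above.
    ErrorStrings = {
        -1: 'UNKNOWN',
         0: 'NO_ERROR',
         5: 'LEADER_NOT_AVAILABLE',
    }

    class ErrorMapping(object):
        pass

    # Each numeric code becomes a class attribute named after its string.
    for code, name in ErrorStrings.items():
        setattr(ErrorMapping, name, code)

    assert ErrorMapping.LEADER_NOT_AVAILABLE == 5              # name -> code
    assert ErrorStrings[ErrorMapping.NO_ERROR] == 'NO_ERROR'   # code -> name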
19 changes: 11 additions & 8 deletions kafka/conn.py
@@ -54,11 +54,10 @@ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((host, port))
         self.timeout = timeout
-        self._sock.settimeout(self.timeout)
-        self._dirty = False
+        self._sock = None
+
+        self.reinit()
 
     def __repr__(self):
         return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -73,24 +72,28 @@ def _raise_connection_error(self):
 
     def _read_bytes(self, num_bytes):
         bytes_left = num_bytes
-        resp = ''
+        responses = []
+
         log.debug("About to read %d bytes from Kafka", num_bytes)
+        if self._dirty:
+            self.reinit()
+
         while bytes_left:
             try:
-                data = self._sock.recv(bytes_left)
+                data = self._sock.recv(min(bytes_left, 4096))
             except socket.error:
                 log.exception('Unable to receive data from Kafka')
                 self._raise_connection_error()
 
             if data == '':
                 log.error("Not enough data to read this response")
                 self._raise_connection_error()
 
             bytes_left -= len(data)
             log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
-            resp += data
+            responses.append(data)
 
-        return resp
+        return ''.join(responses)
 
 ##################
 #   Public API   #
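
The performance work in _read_bytes is twofold: each recv call is capped at 4096 bytes, and chunks are accumulated in a list and joined once at the end, instead of growing an immutable string with += on every iteration, which can turn quadratic for large responses. A socket-free sketch of the accumulation pattern; read_exact and the injected recv callable are illustrative names, not part of the library:

    def read_exact(recv, num_bytes, chunk_size=4096):
        # `recv` stands in for socket.recv; an empty string means
        # the peer closed the connection before num_bytes arrived.
        bytes_left = num_bytes
        responses = []
        while bytes_left:
            data = recv(min(bytes_left, chunk_size))  # bounded read per call
            if data == '':
                raise IOError("Not enough data to read this response")
            bytes_left -= len(data)
            responses.append(data)   # O(1) append per chunk...
        return ''.join(responses)    # ...one O(n) join at the end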
60 changes: 60 additions & 0 deletions load_example.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+import threading, logging, time, collections
+
+from kafka.client import KafkaClient
+from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
+
+msg_size = 524288
+
+class Producer(threading.Thread):
+    daemon = True
+    big_msg = "1" * msg_size
+
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        producer = SimpleProducer(client)
+        self.sent = 0
+
+        while True:
+            producer.send_messages('my-topic', self.big_msg)
+            self.sent += 1
+
+
+class Consumer(threading.Thread):
+    daemon = True
+
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        consumer = SimpleConsumer(client, "test-group", "my-topic",
+                                  max_buffer_size = None,
+                                  )
+        self.valid = 0
+        self.invalid = 0
+
+        for message in consumer:
+            if len(message.message.value) == msg_size:
+                self.valid += 1
+            else:
+                self.invalid += 1
+
+def main():
+    threads = [
+        Producer(),
+        Consumer()
+    ]
+
+    for t in threads:
+        t.start()
+
+    time.sleep(10)
+    print 'Messages sent: %d' % threads[0].sent
+    print 'Messages recvd: %d' % threads[1].valid
+    print 'Messages invalid: %d' % threads[1].invalid
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.DEBUG
+    )
+    main()
