Skip to content

Commit

Permalink
gevent compatible from several PR
Browse files Browse the repository at this point in the history
  • Loading branch information
quard8 committed Dec 16, 2014
1 parent 3689529 commit ffd141a
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 19 deletions.
1 change: 1 addition & 0 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,

def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"

host_key = (host, port)
if host_key not in self.conns:
self.conns[host_key] = KafkaConnection(
Expand Down
19 changes: 18 additions & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ def collect_hosts(hosts, randomize=True):
randomize the returned list.
"""

if isinstance(hosts, list):
return hosts

if isinstance(hosts, six.string_types):
hosts = hosts.strip().split(',')

Expand All @@ -38,7 +41,21 @@ def collect_hosts(hosts, randomize=True):
return result


class KafkaConnection(local):
class ConnectionBaseClass(type):
"""
Metaclass to make KafkaConnection work with gevent.
"""
def __new__(mcs, name, bases, attrs):
if local.__module__ == 'gevent.local':
# change base to object to avoid using one connection per greenlet
bases = (object,)
else:
bases = (local,)

return type(name, bases, attrs)


class KafkaConnection(six.with_metaclass(ConnectionBaseClass)):
"""
A socket connection to a single Kafka broker
Expand Down
11 changes: 11 additions & 0 deletions kafka/green/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from kafka import *

from kafka.green.producer import _Producer, _SimpleProducer, _KeyedProducer
from kafka.green.conn import _KafkaConnection
from kafka.green.client import _KafkaClient

Producer=_Producer
SimpleProducer=_SimpleProducer
KeyedProducer=_KeyedProducer
KafkaConnection=_KafkaConnection
KafkaClient=_KafkaClient
27 changes: 27 additions & 0 deletions kafka/green/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from kafka.client import KafkaClient, DEFAULT_SOCKET_TIMEOUT_SECONDS

from .conn import _KafkaConnection

class _KafkaClient(KafkaClient):

def __init__(self, hosts, client_id=KafkaClient.CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
super(_KafkaClient, self).__init__(hosts=hosts, client_id=client_id, timeout=timeout)

def copy(self):
# have to override this since copy.deepcopy cannot serialize
# a gevent.socket
return _KafkaClient(self.hosts, self.client_id, self.timeout)

def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"
host_key = (host, port)

if host_key not in self.conns:
self.conns[host_key] = _KafkaConnection(
host,
port,
timeout=self.timeout
)

return self.conns[host_key]
29 changes: 29 additions & 0 deletions kafka/green/conn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import gevent.socket as socket
import logging

from kafka.conn import KafkaConnection

log = logging.getLogger("kafka")

class _KafkaConnection(KafkaConnection):
"""
Gevent version of kafka.KafkaConnection class. Uses
gevent.socket instead of socket.socket.
"""
def __init__(self, host, port, timeout=10):
super(_KafkaConnection, self).__init__(host, port, timeout)

def reinit(self):
"""
Re-initialize the socket connection
"""
log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port))

if self._sock:
self.close()

try:
self._sock = socket.create_connection((self.host, self.port), self.timeout)
except socket.error:
log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port))
self._raise_connection_error()
39 changes: 39 additions & 0 deletions kafka/green/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from kafka.producer.base import Producer, _send_upstream, STOP_ASYNC_PRODUCER
from kafka.producer.simple import SimpleProducer
from kafka.producer.keyed import KeyedProducer

import gevent
from gevent.queue import Queue


class _ProducerMixin(object):

def _setup_async(self, batch_send_every_t, batch_send_every_n):
self.queue = Queue() # Messages are sent through this queue
self.proc = gevent.spawn(_send_upstream,
self.queue,
self.client.copy(),
self.codec,
batch_send_every_t,
batch_send_every_n,
self.req_acks,
self.ack_timeout)

def stop(self, timeout=1):
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None))
self.proc.join(timeout)
if self.proc.dead is False:
self.proc.kill()


class _Producer(_ProducerMixin, Producer):
pass


class _SimpleProducer(_ProducerMixin, SimpleProducer):
pass


class _KeyedProducer(_ProducerMixin, KeyedProducer):
pass
43 changes: 25 additions & 18 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from collections import defaultdict
from multiprocessing import Queue, Process

import gevent

import six

from kafka.common import (
Expand Down Expand Up @@ -49,7 +51,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# timeout is reached
while count > 0 and timeout >= 0:
try:
topic_partition, msg, key = queue.get(timeout=timeout)
topic_partition, msg, key = queue.get_nowait()

except Empty:
break
Expand Down Expand Up @@ -80,6 +82,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
except Exception:
log.exception("Unable to send message")

gevent.sleep(0.1)


class Producer(object):
"""
Expand Down Expand Up @@ -136,22 +140,25 @@ def __init__(self, client, async=False,
self.codec = codec

if self.async:
log.warning("async producer does not guarantee message delivery!")
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
self.proc = Process(target=_send_upstream,
args=(self.queue,
self.client.copy(),
self.codec,
batch_send_every_t,
batch_send_every_n,
self.req_acks,
self.ack_timeout))

# Process will die if main thread exits
self.proc.daemon = True
self.proc.start()
self._setup_async(batch_send_every_t, batch_send_every_n)

def _setup_async(self, batch_send_every_t, batch_send_every_n):
log.warning("async producer does not guarantee message delivery!")
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
self.proc = Process(target=_send_upstream,
args=(self.queue,
self.client.copy(),
self.codec,
batch_send_every_t,
batch_send_every_n,
self.req_acks,
self.ack_timeout))

# Process will die if main thread exits
self.proc.daemon = True
self.proc.start()

def send_messages(self, topic, partition, *msg):
"""
Expand Down Expand Up @@ -188,7 +195,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):

if self.async:
for m in msg:
self.queue.put((TopicAndPartition(topic, partition), m, key))
self.queue.put_nowait((TopicAndPartition(topic, partition), m, key))
resp = []
else:
messages = create_message_set(msg, self.codec, key)
Expand Down

0 comments on commit ffd141a

Please sign in to comment.