diff --git a/kafka/client.py b/kafka/client.py index 8c786944e..b9e28a523 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -49,6 +49,7 @@ def __init__(self, hosts, client_id=CLIENT_ID, def _get_conn(self, host, port): "Get or create a connection to a broker using host and port" + host_key = (host, port) if host_key not in self.conns: self.conns[host_key] = KafkaConnection( diff --git a/kafka/conn.py b/kafka/conn.py index ddfee8bb7..f98bc52aa 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -21,6 +21,9 @@ def collect_hosts(hosts, randomize=True): randomize the returned list. """ + if isinstance(hosts, list): + return hosts + if isinstance(hosts, six.string_types): hosts = hosts.strip().split(',') @@ -38,7 +41,21 @@ def collect_hosts(hosts, randomize=True): return result -class KafkaConnection(local): +class ConnectionBaseClass(type): + """ + Metaclass to make KafkaConnection work with gevent. + """ + def __new__(mcs, name, bases, attrs): + if local.__module__ == 'gevent.local': + # change base to object to avoid using one connection per greenlet + bases = (object,) + else: + bases = (local,) + + return type(name, bases, attrs) + + +class KafkaConnection(six.with_metaclass(ConnectionBaseClass)): """ A socket connection to a single Kafka broker diff --git a/kafka/green/__init__.py b/kafka/green/__init__.py new file mode 100644 index 000000000..798ba5108 --- /dev/null +++ b/kafka/green/__init__.py @@ -0,0 +1,11 @@ +from kafka import * + +from kafka.green.producer import _Producer, _SimpleProducer, _KeyedProducer +from kafka.green.conn import _KafkaConnection +from kafka.green.client import _KafkaClient + +Producer=_Producer +SimpleProducer=_SimpleProducer +KeyedProducer=_KeyedProducer +KafkaConnection=_KafkaConnection +KafkaClient=_KafkaClient diff --git a/kafka/green/client.py b/kafka/green/client.py new file mode 100644 index 000000000..e0392588e --- /dev/null +++ b/kafka/green/client.py @@ -0,0 +1,27 @@ +from kafka.client import KafkaClient, DEFAULT_SOCKET_TIMEOUT_SECONDS + +from .conn import _KafkaConnection + +class _KafkaClient(KafkaClient): + + def __init__(self, hosts, client_id=KafkaClient.CLIENT_ID, + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + super(_KafkaClient, self).__init__(hosts=hosts, client_id=client_id, timeout=timeout) + + def copy(self): + # have to override this since copy.deepcopy cannot serialize + # a gevent.socket + return _KafkaClient(self.hosts, self.client_id, self.timeout) + + def _get_conn(self, host, port): + "Get or create a connection to a broker using host and port" + host_key = (host, port) + + if host_key not in self.conns: + self.conns[host_key] = _KafkaConnection( + host, + port, + timeout=self.timeout + ) + + return self.conns[host_key] diff --git a/kafka/green/conn.py b/kafka/green/conn.py new file mode 100644 index 000000000..0160159f2 --- /dev/null +++ b/kafka/green/conn.py @@ -0,0 +1,29 @@ +import gevent.socket as socket +import logging + +from kafka.conn import KafkaConnection + +log = logging.getLogger("kafka") + +class _KafkaConnection(KafkaConnection): + """ + Gevent version of kafka.KafkaConnection class. Uses + gevent.socket instead of socket.socket. + """ + def __init__(self, host, port, timeout=10): + super(_KafkaConnection, self).__init__(host, port, timeout) + + def reinit(self): + """ + Re-initialize the socket connection + """ + log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port)) + + if self._sock: + self.close() + + try: + self._sock = socket.create_connection((self.host, self.port), self.timeout) + except socket.error: + log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port)) + self._raise_connection_error() diff --git a/kafka/green/producer.py b/kafka/green/producer.py new file mode 100644 index 000000000..0791123b6 --- /dev/null +++ b/kafka/green/producer.py @@ -0,0 +1,39 @@ +from kafka.producer.base import Producer, _send_upstream, STOP_ASYNC_PRODUCER +from kafka.producer.simple import SimpleProducer +from kafka.producer.keyed import KeyedProducer + +import gevent +from gevent.queue import Queue + + +class _ProducerMixin(object): + + def _setup_async(self, batch_send_every_t, batch_send_every_n): + self.queue = Queue() # Messages are sent through this queue + self.proc = gevent.spawn(_send_upstream, + self.queue, + self.client.copy(), + self.codec, + batch_send_every_t, + batch_send_every_n, + self.req_acks, + self.ack_timeout) + + def stop(self, timeout=1): + if self.async: + self.queue.put((STOP_ASYNC_PRODUCER, None)) + self.proc.join(timeout) + if self.proc.dead is False: + self.proc.kill() + + +class _Producer(_ProducerMixin, Producer): + pass + + +class _SimpleProducer(_ProducerMixin, SimpleProducer): + pass + + +class _KeyedProducer(_ProducerMixin, KeyedProducer): + pass diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6e19b92c9..3fac8e7bb 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -10,6 +10,8 @@ from collections import defaultdict from multiprocessing import Queue, Process +import gevent + import six from kafka.common import ( @@ -49,7 +51,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # timeout is reached while count > 0 and timeout >= 0: try: - topic_partition, msg, key = queue.get(timeout=timeout) + topic_partition, msg, key = queue.get_nowait() except Empty: break @@ -80,6 +82,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, except Exception: log.exception("Unable to send message") + gevent.sleep(0.1) + class Producer(object): """ @@ -136,22 +140,25 @@ def __init__(self, client, async=False, self.codec = codec if self.async: - log.warning("async producer does not guarantee message delivery!") - log.warning("Current implementation does not retry Failed messages") - log.warning("Use at your own risk! (or help improve with a PR!)") - self.queue = Queue() # Messages are sent through this queue - self.proc = Process(target=_send_upstream, - args=(self.queue, - self.client.copy(), - self.codec, - batch_send_every_t, - batch_send_every_n, - self.req_acks, - self.ack_timeout)) - - # Process will die if main thread exits - self.proc.daemon = True - self.proc.start() + self._setup_async(batch_send_every_t, batch_send_every_n) + + def _setup_async(self, batch_send_every_t, batch_send_every_n): + log.warning("async producer does not guarantee message delivery!") + log.warning("Current implementation does not retry Failed messages") + log.warning("Use at your own risk! (or help improve with a PR!)") + self.queue = Queue() # Messages are sent through this queue + self.proc = Process(target=_send_upstream, + args=(self.queue, + self.client.copy(), + self.codec, + batch_send_every_t, + batch_send_every_n, + self.req_acks, + self.ack_timeout)) + + # Process will die if main thread exits + self.proc.daemon = True + self.proc.start() def send_messages(self, topic, partition, *msg): """ @@ -188,7 +195,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs): if self.async: for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m, key)) + self.queue.put_nowait((TopicAndPartition(topic, partition), m, key)) resp = [] else: messages = create_message_set(msg, self.codec, key)