diff --git a/kafka/client.py b/kafka/client.py index 48a534e41..0b2a1888e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -2,11 +2,11 @@ import collections import copy import functools -import itertools import logging import time import kafka.common +from ctypes import c_uint32 from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, KafkaTimeoutError, KafkaUnavailableError, @@ -23,7 +23,7 @@ class KafkaClient(object): CLIENT_ID = b"kafka-python" - ID_GEN = itertools.count() + ID_GEN = 0 # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a @@ -101,7 +101,8 @@ def _next_id(self): """ Generate a new correlation id """ - return next(KafkaClient.ID_GEN) + KafkaClient.ID_GEN = c_uint32(KafkaClient.ID_GEN + 1).value + return KafkaClient.ID_GEN def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): """ diff --git a/kafka/protocol.py b/kafka/protocol.py index 2a39de620..1c844791c 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -52,7 +52,7 @@ def _encode_message_header(cls, client_id, correlation_id, request_key): """ Encode the common request envelope """ - return struct.pack('>hhih%ds' % len(client_id), + return struct.pack('>hhIh%ds' % len(client_id), request_key, # ApiKey 0, # ApiVersion correlation_id, # CorrelationId