From e99a751081f59189ed818220100a1db2d8a13b6f Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sat, 4 Apr 2015 13:57:15 -0700
Subject: [PATCH 1/4] Fix python3 str/bytes bug in
 KafkaConsumer.set_topic_partitions({(topic, partition): offset, })

---
 kafka/consumer/kafka.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 6f5bcdd94..59597d9d5 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -224,7 +224,7 @@ def set_topic_partitions(self, *topics):
                         topic = kafka_bytestring(key[0])
                         partition = key[1]
                         self._consume_topic_partition(topic, partition)
-                        self._offsets.fetch[key] = value
+                        self._offsets.fetch[(topic, partition)] = value
 
                     else:
                         raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))

From 87b2ca8e60832170a3c4ab3e391509ce40cb6faa Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sat, 4 Apr 2015 20:19:40 -0700
Subject: [PATCH 2/4] call _consume_topic_partition() before storing offset in
 KafkaConsumer.set_topic_partitions((topic, partition, offset))

---
 kafka/consumer/kafka.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 59597d9d5..423ba63fc 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -194,10 +194,10 @@ def set_topic_partitions(self, *topics):
             elif isinstance(arg, tuple):
                 topic = kafka_bytestring(arg[0])
                 partition = arg[1]
+                self._consume_topic_partition(topic, partition)
                 if len(arg) == 3:
                     offset = arg[2]
                     self._offsets.fetch[(topic, partition)] = offset
-                self._consume_topic_partition(topic, partition)
 
             # { topic: partitions, ... } dict
             elif isinstance(arg, dict):

From 811fd4cbb903064e3961c60a0b39c43b9473c322 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sat, 4 Apr 2015 21:04:58 -0700
Subject: [PATCH 3/4] Use list comprehension on _topics in
 KafkaConsumer.fetch_messages

---
 kafka/consumer/kafka.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 423ba63fc..79cee28d6 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -312,16 +312,16 @@ def fetch_messages(self):
         max_wait_time = self._config['fetch_wait_max_ms']
         min_bytes = self._config['fetch_min_bytes']
 
-        # Get current fetch offsets
-        offsets = self._offsets.fetch
-        if not offsets:
-            if not self._topics:
-                raise KafkaConfigurationError('No topics or partitions configured')
+        if not self._topics:
+            raise KafkaConfigurationError('No topics or partitions configured')
+
+        if not self._offsets.fetch:
             raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
 
-        fetches = []
-        for topic_partition, offset in six.iteritems(offsets):
-            fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
+        fetches = [FetchRequest(topic, partition,
+                                self._offsets.fetch[(topic, partition)],
+                                max_bytes)
+                   for (topic, partition) in self._topics]
 
         # client.send_fetch_request will collect topic/partition requests by leader
        # and send each group as a single FetchRequest to the correct broker
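Note: the three patches above all turn on the same invariant: topic names must
be normalized to bytes (via kafka_bytestring) before being used in a
(topic, partition) key of the fetch-offset dict, because on Python 3 a str
topic and its bytes form are distinct dict keys. A minimal sketch of the
failure mode fixed in PATCH 1/4 -- the kafka_bytestring stand-in below mirrors
the behavior of the real helper in kafka.util, and the topic name and offset
are hypothetical:

    def kafka_bytestring(s):
        # mirror kafka.util.kafka_bytestring: encode str topics to utf-8,
        # pass bytes through unchanged
        if isinstance(s, str):
            return s.encode('utf-8')
        if isinstance(s, bytes):
            return s
        raise TypeError(s)

    fetch_offsets = {}
    key = ('my-topic', 0)               # user passes a str topic on Python 3
    topic = kafka_bytestring(key[0])    # b'my-topic'

    fetch_offsets[key] = 100            # old code: stored under the str key...
    print((topic, 0) in fetch_offsets)  # ...False: bytes-keyed lookups miss it

    fetch_offsets[(topic, 0)] = 100     # fixed code: key matches later lookups
    print((topic, 0) in fetch_offsets)  # True
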
index 79cee28d6..47a5b00b4 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -336,49 +336,53 @@ def fetch_messages(self): return for resp in responses: - topic_partition = (resp.topic, resp.partition) + topic = kafka_bytestring(resp.topic) + partition = resp.partition try: check_error(resp) except OffsetOutOfRangeError: - logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d ' - '(Highwatermark: %d)', - resp.topic, resp.partition, - offsets[topic_partition], resp.highwaterMark) + logger.warning('OffsetOutOfRange: topic %s, partition %d, ' + 'offset %d (Highwatermark: %d)', + topic, partition, + self.offsets._fetch[(topic, partition)], + resp.highwaterMark) # Reset offset - self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition) + self._offsets.fetch[(topic, partition)] = ( + self._reset_partition_offset((topic, partition)) + ) continue except NotLeaderForPartitionError: logger.warning("NotLeaderForPartitionError for %s - %d. " "Metadata may be out of date", - resp.topic, resp.partition) + topic, partition) self._refresh_metadata_on_error() continue except RequestTimedOutError: logger.warning("RequestTimedOutError for %s - %d", - resp.topic, resp.partition) + topic, partition) continue # Track server highwater mark - self._offsets.highwater[topic_partition] = resp.highwaterMark + self._offsets.highwater[(topic, partition)] = resp.highwaterMark # Yield each message # Kafka-python could raise an exception during iteration # we are not catching -- user will need to address for (offset, message) in resp.messages: # deserializer_class could raise an exception here - msg = KafkaMessage(resp.topic, - resp.partition, - offset, message.key, - self._config['deserializer_class'](message.value)) - - if offset < self._offsets.fetch[topic_partition]: - logger.debug('Skipping message %s because its offset is less than the consumer offset', - msg) + val = self._config['deserializer_class'](message.value) + msg = KafkaMessage(topic, partition, offset, message.key, val) + + # in some cases the server will return earlier messages + # than we requested. skip them per kafka spec + if offset < self._offsets.fetch[(topic, partition)]: + logger.debug('message offset less than fetched offset ' + 'skipping: %s', msg) continue # Only increment fetch offset if we safely got the message and deserialized - self._offsets.fetch[topic_partition] = offset + 1 + self._offsets.fetch[(topic, partition)] = offset + 1 # Then yield to user yield msg