KafkaConsumer topic/partition fixes #365

Merged: 4 commits, Apr 6, 2015

Changes from all commits
60 changes: 32 additions & 28 deletions kafka/consumer/kafka.py
@@ -194,10 +194,10 @@ def set_topic_partitions(self, *topics):
             elif isinstance(arg, tuple):
                 topic = kafka_bytestring(arg[0])
                 partition = arg[1]
+                self._consume_topic_partition(topic, partition)
                 if len(arg) == 3:
                     offset = arg[2]
                     self._offsets.fetch[(topic, partition)] = offset
-                self._consume_topic_partition(topic, partition)

             # { topic: partitions, ... } dict
             elif isinstance(arg, dict):
@@ -224,7 +224,7 @@ def set_topic_partitions(self, *topics):
                         topic = kafka_bytestring(key[0])
                         partition = key[1]
                         self._consume_topic_partition(topic, partition)
-                        self._offsets.fetch[key] = value
+                        self._offsets.fetch[(topic, partition)] = value

                     else:
                         raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
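Note on the argument handling patched above: set_topic_partitions() accepts plain topic names, (topic, partition) and (topic, partition, offset) tuples, and {(topic, partition): offset} dicts. A minimal usage sketch of those forms; the broker address and topic names are placeholders, not part of this PR:

```python
from kafka import KafkaConsumer

# Hypothetical wiring -- broker address and topic names are examples only.
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

# The same argument forms handled by the branches patched above.
consumer.set_topic_partitions(
    'topic-a',                 # str: consume all partitions of the topic
    ('topic-b', 0),            # (topic, partition) tuple
    ('topic-c', 1, 42),        # (topic, partition, offset) tuple
    {('topic-d', 2): 100},     # {(topic, partition): offset} dict
)
```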
@@ -312,16 +312,16 @@ def fetch_messages(self):
         max_wait_time = self._config['fetch_wait_max_ms']
         min_bytes = self._config['fetch_min_bytes']

-        # Get current fetch offsets
-        offsets = self._offsets.fetch
-        if not offsets:
-            if not self._topics:
-                raise KafkaConfigurationError('No topics or partitions configured')
+        if not self._topics:
+            raise KafkaConfigurationError('No topics or partitions configured')

+        if not self._offsets.fetch:
+            raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')

-        fetches = []
-        for topic_partition, offset in six.iteritems(offsets):
-            fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
+        fetches = [FetchRequest(topic, partition,
+                                self._offsets.fetch[(topic, partition)],
+                                max_bytes)
+                   for (topic, partition) in self._topics]

         # client.send_fetch_request will collect topic/partition requests by leader
         # and send each group as a single FetchRequest to the correct broker
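For illustration, the comprehension above builds one FetchRequest per configured (topic, partition) pair from the current fetch offsets. A standalone sketch with hypothetical values (topic names, offsets, and max_bytes are made up):

```python
from kafka.common import FetchRequest

# Hypothetical stand-ins for self._topics, self._offsets.fetch and max_bytes.
topics = [(b'my-topic', 0), (b'my-topic', 1)]
fetch_offsets = {(b'my-topic', 0): 10, (b'my-topic', 1): 25}
max_bytes = 1024 * 1024

fetches = [FetchRequest(topic, partition,
                        fetch_offsets[(topic, partition)],
                        max_bytes)
           for (topic, partition) in topics]
# fetches[0] == FetchRequest(topic=b'my-topic', partition=0, offset=10, max_bytes=1048576)
```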
@@ -336,49 +336,53 @@ def fetch_messages(self):
             return

         for resp in responses:
-            topic_partition = (resp.topic, resp.partition)
+            topic = kafka_bytestring(resp.topic)
+            partition = resp.partition
             try:
                 check_error(resp)
             except OffsetOutOfRangeError:
-                logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
-                               '(Highwatermark: %d)',
-                               resp.topic, resp.partition,
-                               offsets[topic_partition], resp.highwaterMark)
+                logger.warning('OffsetOutOfRange: topic %s, partition %d, '
+                               'offset %d (Highwatermark: %d)',
+                               topic, partition,
+                               self._offsets.fetch[(topic, partition)],
+                               resp.highwaterMark)
                 # Reset offset
-                self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+                self._offsets.fetch[(topic, partition)] = (
+                    self._reset_partition_offset((topic, partition))
+                )
                 continue

             except NotLeaderForPartitionError:
                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 self._refresh_metadata_on_error()
                 continue

             except RequestTimedOutError:
                 logger.warning("RequestTimedOutError for %s - %d",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 continue

             # Track server highwater mark
-            self._offsets.highwater[topic_partition] = resp.highwaterMark
+            self._offsets.highwater[(topic, partition)] = resp.highwaterMark

             # Yield each message
             # Kafka-python could raise an exception during iteration
             # we are not catching -- user will need to address
             for (offset, message) in resp.messages:
                 # deserializer_class could raise an exception here
-                msg = KafkaMessage(resp.topic,
-                                   resp.partition,
-                                   offset, message.key,
-                                   self._config['deserializer_class'](message.value))
-
-                if offset < self._offsets.fetch[topic_partition]:
-                    logger.debug('Skipping message %s because its offset is less than the consumer offset',
-                                 msg)
+                val = self._config['deserializer_class'](message.value)
+                msg = KafkaMessage(topic, partition, offset, message.key, val)
+
+                # in some cases the server will return earlier messages
+                # than we requested. skip them per kafka spec
+                if offset < self._offsets.fetch[(topic, partition)]:
+                    logger.debug('message offset less than fetched offset '
+                                 'skipping: %s', msg)
                     continue
                 # Only increment fetch offset if we safely got the message and deserialized
-                self._offsets.fetch[topic_partition] = offset + 1
+                self._offsets.fetch[(topic, partition)] = offset + 1

                 # Then yield to user
                 yield msg
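End-to-end, fetch_messages() is a generator that yields KafkaMessage(topic, partition, offset, key, value) tuples and advances the fetch offset past each yielded message. A minimal consumption sketch under the same placeholder assumptions as above (broker address, topic name, and starting offset are examples only):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(('my-topic', 0, 123),   # start partition 0 at offset 123
                         bootstrap_servers=['localhost:9092'])

for msg in consumer.fetch_messages():
    # msg is a KafkaMessage namedtuple: topic, partition, offset, key, value
    print(msg.topic, msg.partition, msg.offset, msg.value)
```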