Skip to content

Commit

Permalink
Merge pull request #403 from dpkp/client_request_response_ordering
Browse files Browse the repository at this point in the history
Client request response ordering
  • Loading branch information
dpkp committed Jun 10, 2015
2 parents 4c9a3c6 + 66b6b4a commit 8e3cd1c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 11 deletions.
30 changes: 20 additions & 10 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
Arguments:
payloads: list of object-like entities with a topic (str) and
partition (int) attribute
partition (int) attribute; payloads with duplicate topic-partitions
are not supported.
encoder_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
Expand All @@ -152,6 +153,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
List of response objects in the same order as the supplied payloads
"""
# encoders / decoders do not maintain ordering currently
# so we need to keep this so we can rebuild order before returning
original_ordering = [(p.topic, p.partition) for p in payloads]

# Group the requests by topic+partition
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
Expand All @@ -165,7 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

# For each broker, send the list of request payloads
# and collect the responses and errors
responses_by_broker = collections.defaultdict(list)
responses = {}
broker_failures = []
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
Expand All @@ -184,7 +189,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
'to server %s: %s', requestId, broker, e)

for payload in payloads:
responses_by_broker[broker].append(FailedPayloadsError(payload))
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)

# No exception, try to get response
else:
Expand All @@ -196,7 +202,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
log.debug('Request %s does not expect a response '
'(skipping conn.recv)', requestId)
for payload in payloads:
responses_by_broker[broker].append(None)
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
continue

try:
Expand All @@ -208,12 +215,17 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
requestId, broker, e)

for payload in payloads:
responses_by_broker[broker].append(FailedPayloadsError(payload))
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)

else:
_resps = []
for payload_response in decoder_fn(response):
responses_by_broker[broker].append(payload_response)
log.debug('Response %s: %s', requestId, responses_by_broker[broker])
topic_partition = (payload_response.topic,
payload_response.partition)
responses[topic_partition] = payload_response
_resps.append(payload_response)
log.debug('Response %s: %s', requestId, _resps)

# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request
Expand All @@ -223,9 +235,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
self.reset_all_metadata()

# Return responses in the same order as provided
responses_by_payload = [responses_by_broker[broker].pop(0)
for broker in brokers_for_payloads]
return responses_by_payload
return [responses[tp] for tp in original_ordering]

def __repr__(self):
    """Debug-friendly representation showing the configured client_id."""
    return '<KafkaClient client_id={0}>'.format(self.client_id)
Expand Down
32 changes: 31 additions & 1 deletion test/test_client_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

from kafka.common import (
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
KafkaTimeoutError
KafkaTimeoutError, ProduceRequest
)
from kafka.protocol import create_message

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
Expand Down Expand Up @@ -49,6 +50,35 @@ def test_ensure_topic_exists(self):
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)

@kafka_versions('all')
def test_send_produce_request_maintains_request_response_order(self):
    """Responses must come back in the same order as the request payloads,
    even when the payloads are spread across multiple brokers/partitions."""

    self.client.ensure_topic_exists(b'foo', timeout=1)
    self.client.ensure_topic_exists(b'bar', timeout=1)

    # Interleave topic/partition pairs so broker grouping reorders them
    # internally; the client is expected to restore the original order.
    requests = [
        ProduceRequest(
            topic, partition,
            [create_message(b'a'), create_message(b'b')])
        for (topic, partition) in [
            (b'foo', 0), (b'bar', 1), (b'foo', 1), (b'bar', 0)]
    ]

    responses = self.client.send_produce_request(requests)

    # Each response should line up positionally with its request.
    for request, response in zip(requests, responses):
        self.assertEqual(request.topic, response.topic)
        self.assertEqual(request.partition, response.partition)


####################
# Offset Tests #
####################
Expand Down

0 comments on commit 8e3cd1c

Please sign in to comment.