From 61172651082365a8d3dc244d531d0c02e888a138 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 00:48:19 -0700 Subject: [PATCH 1/2] Add KafkaClient test to show that request / response ordering is broken --- test/test_client_integration.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 585123ba9..a6ea8f70c 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -2,8 +2,9 @@ from kafka.common import ( FetchRequest, OffsetCommitRequest, OffsetFetchRequest, - KafkaTimeoutError + KafkaTimeoutError, ProduceRequest ) +from kafka.protocol import create_message from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import KafkaIntegrationTestCase, kafka_versions @@ -49,6 +50,35 @@ def test_ensure_topic_exists(self): with self.assertRaises(KafkaTimeoutError): self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0) + @kafka_versions('all') + def test_send_produce_request_maintains_request_response_order(self): + + self.client.ensure_topic_exists(b'foo', timeout=1) + self.client.ensure_topic_exists(b'bar', timeout=1) + + requests = [ + ProduceRequest( + b'foo', 0, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'bar', 1, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'foo', 1, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'bar', 0, + [create_message(b'a'), create_message(b'b')]), + ] + + responses = self.client.send_produce_request(requests) + while len(responses): + request = requests.pop() + response = responses.pop() + self.assertEqual(request.topic, response.topic) + self.assertEqual(request.partition, response.partition) + + #################### # Offset Tests # #################### From 66b6b4aa6ee7c4461a4e43b2512e76ba3f04230f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 00:48:36 
-0700 Subject: [PATCH 2/2] Fix KafkaClient request / response ordering --- kafka/client.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index da8617559..1bd8587b2 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -138,7 +138,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): Arguments: payloads: list of object-like entities with a topic (str) and - partition (int) attribute + partition (int) attribute; payloads with duplicate topic-partitions + are not supported. encode_fn: a method to encode the list of payloads to a request body, must accept client_id, correlation_id, and payloads as @@ -152,6 +153,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): List of response objects in the same order as the supplied payloads """ + # encoders / decoders do not maintain ordering currently + # so we need to keep this so we can rebuild order before returning + original_ordering = [(p.topic, p.partition) for p in payloads] + # Group the requests by topic+partition brokers_for_payloads = [] payloads_by_broker = collections.defaultdict(list) @@ -165,7 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): # For each broker, send the list of request payloads # and collect the responses and errors - responses_by_broker = collections.defaultdict(list) + responses = {} broker_failures = [] for broker, payloads in payloads_by_broker.items(): requestId = self._next_id() @@ -184,7 +189,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): 'to server %s: %s', requestId, broker, e) for payload in payloads: - responses_by_broker[broker].append(FailedPayloadsError(payload)) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) # No exception, try to get response else: @@ -196,7 +202,8 @@ def _send_broker_aware_request(self, payloads, 
encoder_fn, decoder_fn): log.debug('Request %s does not expect a response ' '(skipping conn.recv)', requestId) for payload in payloads: - responses_by_broker[broker].append(None) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = None continue try: @@ -208,12 +215,17 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): requestId, broker, e) for payload in payloads: - responses_by_broker[broker].append(FailedPayloadsError(payload)) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) else: + _resps = [] for payload_response in decoder_fn(response): - responses_by_broker[broker].append(payload_response) - log.debug('Response %s: %s', requestId, responses_by_broker[broker]) + topic_partition = (payload_response.topic, + payload_response.partition) + responses[topic_partition] = payload_response + _resps.append(payload_response) + log.debug('Response %s: %s', requestId, _resps) # Connection errors generally mean stale metadata # although sometimes it means incorrect api request @@ -223,9 +235,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): self.reset_all_metadata() # Return responses in the same order as provided - responses_by_payload = [responses_by_broker[broker].pop(0) - for broker in brokers_for_payloads] - return responses_by_payload + return [responses[tp] for tp in original_ordering] def __repr__(self): return '<KafkaClient client_id=%s>' % (self.client_id)