Skip to content

Commit

Permalink
Add KafkaClient test to show that request / response ordering is broken
Browse files Browse the repository at this point in the history
  • Loading branch information
Dana Powers committed Jun 10, 2015
1 parent 4c9a3c6 commit 6117265
Showing 1 changed file with 31 additions and 1 deletion.
32 changes: 31 additions & 1 deletion test/test_client_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

from kafka.common import (
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
KafkaTimeoutError
KafkaTimeoutError, ProduceRequest
)
from kafka.protocol import create_message

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
Expand Down Expand Up @@ -49,6 +50,35 @@ def test_ensure_topic_exists(self):
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)

@kafka_versions('all')
def test_send_produce_request_maintains_request_response_order(self):

self.client.ensure_topic_exists(b'foo', timeout=1)
self.client.ensure_topic_exists(b'bar', timeout=1)

requests = [
ProduceRequest(
b'foo', 0,
[create_message(b'a'), create_message(b'b')]),
ProduceRequest(
b'bar', 1,
[create_message(b'a'), create_message(b'b')]),
ProduceRequest(
b'foo', 1,
[create_message(b'a'), create_message(b'b')]),
ProduceRequest(
b'bar', 0,
[create_message(b'a'), create_message(b'b')]),
]

responses = self.client.send_produce_request(requests)
while len(responses):
request = requests.pop()
response = responses.pop()
self.assertEqual(request.topic, response.topic)
self.assertEqual(request.partition, response.partition)


####################
# Offset Tests #
####################
Expand Down

0 comments on commit 6117265

Please sign in to comment.