Skip to content

Commit

Permalink
Another attempt to fix failover test flapping
Browse files Browse the repository at this point in the history
  • Loading branch information
Dana Powers committed Sep 5, 2014
1 parent b99a9ee commit b0f8593
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions test/test_failover_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ def tearDownClass(cls):

@kafka_versions("all")
def test_switch_leader(self):
key, topic, partition = random_string(5), self.topic, 0
key = random_string(5)
topic = self.topic
partition = 0

# Test the base class Producer -- send_messages to a specific partition
producer = Producer(self.client, async=False)
producer = Producer(self.client, async=False,
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)

# Send 10 random messages
self._send_random_messages(producer, topic, partition, 10)
self._send_random_messages(producer, topic, partition, 100)

# kill leader for partition
broker = self._kill_leader(topic, partition)
Expand All @@ -74,20 +77,19 @@ def test_switch_leader(self):
self.assertTrue(recovered)

# send some more messages to new leader
self._send_random_messages(producer, topic, partition, 10)
self._send_random_messages(producer, topic, partition, 100)

# count number of messages
count = self._count_messages('test_switch_leader group', topic,
partitions=(partition,))

# Should be equal to 10 before + 1 recovery + 10 after
self.assertEquals(count, 21)
self.assert_message_count(topic, 201, partitions=(partition,))


#@kafka_versions("all")
@unittest2.skip("async producer does not support reliable failover yet")
def test_switch_leader_async(self):
key, topic, partition = random_string(5), self.topic, 0
key = random_string(5)
topic = self.topic
partition = 0

# Test the base class Producer -- send_messages to a specific partition
producer = Producer(self.client, async=True)
Expand All @@ -112,11 +114,8 @@ def test_switch_leader_async(self):
producer.stop()

# count number of messages
count = self._count_messages('test_switch_leader_async group', topic,
partitions=(partition,))

# Should be equal to 10 before + 1 recovery + 10 after
self.assertEquals(count, 21)
count = self.assert_message_count(topic, 21, partitions=(partition,))


def _send_random_messages(self, producer, topic, partition, n):
Expand All @@ -133,17 +132,25 @@ def _kill_leader(self, topic, partition):
broker.close()
return broker

def _count_messages(self, group, topic, timeout=1, partitions=None):
def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
hosts = ','.join(['%s:%d' % (broker.host, broker.port)
for broker in self.brokers])

client = KafkaClient(hosts)
group = random_string(10)
consumer = SimpleConsumer(client, group, topic,
partitions=partitions,
auto_commit=False,
iter_timeout=timeout)

count = consumer.pending(partitions)
started_at = time.time()
pending = consumer.pending(partitions)

# Keep checking if it isn't immediately correct, subject to timeout
while pending != check_count and (time.time() - started_at < timeout):
pending = consumer.pending(partitions)

consumer.stop()
client.close()
return count

self.assertEqual(pending, check_count)

0 comments on commit b0f8593

Please sign in to comment.