Skip to content

Commit

Permalink
fix(test): improve flakey async producer test
Browse files Browse the repository at this point in the history
The TestAsyncProducerRecoveryWithRetriesDisabled periodically fails and
then passes on a re-run. Stepping through it I believe this was because
the test had assumed metadata requests would always go to the
seed/bootstrap broker, but they can actually go to any broker. So there
was a 1 in 3 chance that the test would fail because it would await
timeout of metadata requests that were never going to be responded to.

Rewriting the test to use the handler map capability rather than the
expectations interface and configuring the mock metadata responses on
all mock brokers allows the test to pass quickly and reliably.
  • Loading branch information
dnwe committed May 5, 2021
1 parent ae85104 commit 5e1dfc7
Showing 1 changed file with 51 additions and 26 deletions.
77 changes: 51 additions & 26 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,18 +314,30 @@ func TestAsyncProducerFailureRetry(t *testing.T) {

func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
tt := func(t *testing.T, kErr KError) {
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)
seedBroker := NewMockBroker(t, 0)
broker1 := NewMockBroker(t, 1)
broker2 := NewMockBroker(t, 2)

mockLeader := func(leaderID int32) *MockMetadataResponse {
return NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(broker1.Addr(), broker1.BrokerID()).
SetBroker(broker2.Addr(), broker2.BrokerID()).
SetLeader("my_topic", 0, leaderID).
SetLeader("my_topic", 1, leaderID)
}

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader1)
seedBroker.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker1.BrokerID()),
},
)

config := NewTestConfig()
config.ClientID = "TestAsyncProducerRecoveryWithRetriesDisabled"
config.Producer.Flush.Messages = 2
config.Producer.Flush.Frequency = 100 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 0 // disable!
config.Producer.Retry.Backoff = 0
Expand All @@ -335,33 +347,46 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
t.Fatal(err)
}

broker1.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker1.BrokerID()),
"ProduceRequest": NewMockProduceResponse(t).
SetError("my_topic", 0, kErr).
SetError("my_topic", 1, kErr),
},
)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
leader1.Returns(prodNotLeader)
expectResults(t, producer, 0, 2)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
metadataLeader2 := new(MetadataResponse)
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
leader1.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
seedBroker.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker2.BrokerID()),
},
)
broker1.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker2.BrokerID()),
},
)
broker2.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker2.BrokerID()),
"ProduceRequest": NewMockProduceResponse(t).
SetError("my_topic", 0, ErrNoError).
SetError("my_topic", 1, ErrNoError),
},
)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
leader2.Returns(prodSuccess)
expectResults(t, producer, 2, 0)

seedBroker.Close()
leader1.Close()
leader2.Close()
closeProducer(t, producer)
seedBroker.Close()
broker1.Close()
broker2.Close()
}

t.Run("retriable error", func(t *testing.T) {
Expand Down

0 comments on commit 5e1dfc7

Please sign in to comment.