Skip to content

Commit

Permalink
Merge pull request #1935 from Shopify/ref/heads/dnwe/improve-test
Browse files Browse the repository at this point in the history
fix(test): improve flakey async producer test
  • Loading branch information
dnwe authored May 5, 2021
2 parents 66d5c44 + 5e1dfc7 commit 9d205e2
Showing 1 changed file with 51 additions and 26 deletions.
77 changes: 51 additions & 26 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,18 +314,30 @@ func TestAsyncProducerFailureRetry(t *testing.T) {

func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
tt := func(t *testing.T, kErr KError) {
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)
seedBroker := NewMockBroker(t, 0)
broker1 := NewMockBroker(t, 1)
broker2 := NewMockBroker(t, 2)

mockLeader := func(leaderID int32) *MockMetadataResponse {
return NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(broker1.Addr(), broker1.BrokerID()).
SetBroker(broker2.Addr(), broker2.BrokerID()).
SetLeader("my_topic", 0, leaderID).
SetLeader("my_topic", 1, leaderID)
}

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader1)
seedBroker.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker1.BrokerID()),
},
)

config := NewTestConfig()
config.ClientID = "TestAsyncProducerRecoveryWithRetriesDisabled"
config.Producer.Flush.Messages = 2
config.Producer.Flush.Frequency = 100 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 0 // disable!
config.Producer.Retry.Backoff = 0
Expand All @@ -335,33 +347,46 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
t.Fatal(err)
}

broker1.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker1.BrokerID()),
"ProduceRequest": NewMockProduceResponse(t).
SetError("my_topic", 0, kErr).
SetError("my_topic", 1, kErr),
},
)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
leader1.Returns(prodNotLeader)
expectResults(t, producer, 0, 2)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
metadataLeader2 := new(MetadataResponse)
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
leader1.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
seedBroker.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker2.BrokerID()),
},
)
broker1.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker2.BrokerID()),
},
)
broker2.SetHandlerByMap(
map[string]MockResponse{
"MetadataRequest": mockLeader(broker2.BrokerID()),
"ProduceRequest": NewMockProduceResponse(t).
SetError("my_topic", 0, ErrNoError).
SetError("my_topic", 1, ErrNoError),
},
)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
leader2.Returns(prodSuccess)
expectResults(t, producer, 2, 0)

seedBroker.Close()
leader1.Close()
leader2.Close()
closeProducer(t, producer)
seedBroker.Close()
broker1.Close()
broker2.Close()
}

t.Run("retriable error", func(t *testing.T) {
Expand Down

0 comments on commit 9d205e2

Please sign in to comment.