diff --git a/async_producer_test.go b/async_producer_test.go index ab762ba48..855d6ae2a 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -314,18 +314,30 @@ func TestAsyncProducerFailureRetry(t *testing.T) { func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { tt := func(t *testing.T, kErr KError) { - seedBroker := NewMockBroker(t, 1) - leader1 := NewMockBroker(t, 2) - leader2 := NewMockBroker(t, 3) + seedBroker := NewMockBroker(t, 0) + broker1 := NewMockBroker(t, 1) + broker2 := NewMockBroker(t, 2) + + mockLeader := func(leaderID int32) *MockMetadataResponse { + return NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). + SetBroker(broker1.Addr(), broker1.BrokerID()). + SetBroker(broker2.Addr(), broker2.BrokerID()). + SetLeader("my_topic", 0, leaderID). + SetLeader("my_topic", 1, leaderID) + } - metadataLeader1 := new(MetadataResponse) - metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) - metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) - metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError) - seedBroker.Returns(metadataLeader1) + seedBroker.SetHandlerByMap( + map[string]MockResponse{ + "MetadataRequest": mockLeader(broker1.BrokerID()), + }, + ) config := NewTestConfig() + config.ClientID = "TestAsyncProducerRecoveryWithRetriesDisabled" config.Producer.Flush.Messages = 2 + config.Producer.Flush.Frequency = 100 * time.Millisecond config.Producer.Return.Successes = true config.Producer.Retry.Max = 0 // disable! config.Producer.Retry.Backoff = 0 @@ -335,33 +347,46 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { t.Fatal(err) } + broker1.SetHandlerByMap( + map[string]MockResponse{ + "MetadataRequest": mockLeader(broker1.BrokerID()), + "ProduceRequest": NewMockProduceResponse(t). + SetError("my_topic", 0, kErr). + SetError("my_topic", 1, kErr), + }, + ) + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1} - prodNotLeader := new(ProduceResponse) - prodNotLeader.AddTopicPartition("my_topic", 0, kErr) - prodNotLeader.AddTopicPartition("my_topic", 1, kErr) - leader1.Returns(prodNotLeader) expectResults(t, producer, 0, 2) - producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} - metadataLeader2 := new(MetadataResponse) - metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) - metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) - metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError) - leader1.Returns(metadataLeader2) - leader1.Returns(metadataLeader2) + seedBroker.SetHandlerByMap( + map[string]MockResponse{ + "MetadataRequest": mockLeader(broker2.BrokerID()), + }, + ) + broker1.SetHandlerByMap( + map[string]MockResponse{ + "MetadataRequest": mockLeader(broker2.BrokerID()), + }, + ) + broker2.SetHandlerByMap( + map[string]MockResponse{ + "MetadataRequest": mockLeader(broker2.BrokerID()), + "ProduceRequest": NewMockProduceResponse(t). + SetError("my_topic", 0, ErrNoError). + SetError("my_topic", 1, ErrNoError), + }, + ) + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1} - prodSuccess := new(ProduceResponse) - prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) - prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError) - leader2.Returns(prodSuccess) expectResults(t, producer, 2, 0) - seedBroker.Close() - leader1.Close() - leader2.Close() closeProducer(t, producer) + seedBroker.Close() + broker1.Close() + broker2.Close() } t.Run("retriable error", func(t *testing.T) {