From 5de8dba37c3a7c131d24d99ea9fb7cf38c19e0c0 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 15 Jan 2020 22:36:57 +0000 Subject: [PATCH 1/2] fix: set DescribeConfigRequest Version field Although fbd8338 had added protocol support for DescribeConfigsRequest v1 and v2, nothing in the admin client was actually setting the Version field to utilise these. Set the Version based on the selected sarama.Config version. Signed-off-by: Dominic Evans --- admin.go | 9 ++++++- admin_test.go | 66 +++++++++++++++++++++++++++++++++++------------- mockresponses.go | 59 ++++++++++++++++++++++++++++++------------- 3 files changed, 98 insertions(+), 36 deletions(-) diff --git a/admin.go b/admin.go index 287e3dee6..3b8cb61e7 100644 --- a/admin.go +++ b/admin.go @@ -452,7 +452,6 @@ func dependsOnSpecificNode(resource ConfigResource) bool { } func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) { - var entries []ConfigEntry var resources []*ConfigResource resources = append(resources, &resource) @@ -461,6 +460,14 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, Resources: resources, } + if ca.conf.Version.IsAtLeast(V1_1_0_0) { + request.Version = 1 + } + + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 2 + } + var ( b *Broker err error diff --git a/admin_test.go b/admin_test.go index 92a5415c0..24f387847 100644 --- a/admin_test.go +++ b/admin_test.go @@ -492,26 +492,58 @@ func TestClusterAdminDescribeConfig(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) - config := NewConfig() - config.Version = V1_0_0_0 - admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) - if err != nil { - t.Fatal(err) - } + var tests = []struct { + saramaVersion KafkaVersion + requestVersion int16 + includeSynonyms bool + }{ + {V1_0_0_0, 0, false}, + {V1_1_0_0, 1, true}, + {V1_1_1_0, 1, true}, + {V2_0_0_0, 2, true}, + } + for _, tt := range tests { + config := NewConfig() + config.Version = tt.saramaVersion + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = admin.Close() + }() + + resource := ConfigResource{ + Name: "r1", + Type: TopicResource, + ConfigNames: []string{"my_topic"}, + } - resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}} - entries, err := admin.DescribeConfig(resource) - if err != nil { - t.Fatal(err) - } + entries, err := admin.DescribeConfig(resource) + if err != nil { + t.Fatal(err) + } - if len(entries) <= 0 { - t.Fatal(errors.New("no resource present")) - } + history := seedBroker.History() + describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest) + if !ok { + t.Fatal("failed to find DescribeConfigsRequest in mockBroker history") + } - err = admin.Close() - if err != nil { - t.Fatal(err) + if describeReq.Version != tt.requestVersion { + t.Fatalf( + "requestVersion %v did not match expected %v", + describeReq.Version, tt.requestVersion) + } + + if len(entries) <= 0 { + t.Fatal(errors.New("no resource present")) + } + if tt.includeSynonyms { + if len(entries[0].Synonyms) == 0 { + t.Fatal("expected synonyms to have been included") + } + } } } diff --git a/mockresponses.go b/mockresponses.go index 72df4b363..984e5aaca 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -731,7 +731,11 @@ func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*DescribeConfigsRequest) - res := &DescribeConfigsResponse{} + res := &DescribeConfigsResponse{ + Version: req.Version, + } + + includeSynonyms := (req.Version > 0) for _, r := range req.Resources { var configEntries []*ConfigEntry @@ -763,23 +767,42 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { Configs: configEntries, }) case TopicResource: - configEntries = append(configEntries, - &ConfigEntry{Name: "max.message.bytes", - Value: "1000000", - ReadOnly: false, - Default: true, - Sensitive: false, - }, &ConfigEntry{Name: "retention.ms", - Value: "5000", - ReadOnly: false, - Default: false, - Sensitive: false, - }, &ConfigEntry{Name: "password", - Value: "12345", - ReadOnly: false, - Default: false, - Sensitive: true, - }) + maxMessageBytes := &ConfigEntry{Name: "max.message.bytes", + Value: "1000000", + ReadOnly: false, + Default: true, + Sensitive: false, + } + if includeSynonyms { + maxMessageBytes.Synonyms = []*ConfigSynonym{ + { + ConfigName: "max.message.bytes", + ConfigValue: "500000", + }, + } + } + retentionMs := &ConfigEntry{Name: "retention.ms", + Value: "5000", + ReadOnly: false, + Default: false, + Sensitive: false, + } + if includeSynonyms { + retentionMs.Synonyms = []*ConfigSynonym{ + { + ConfigName: "log.retention.ms", + ConfigValue: "2500", + }, + } + } + password := &ConfigEntry{Name: "password", + Value: "12345", + ReadOnly: false, + Default: false, + Sensitive: true, + } + configEntries = append( + configEntries, maxMessageBytes, retentionMs, password) res.Resources = append(res.Resources, &ResourceResponse{ Name: r.Name, Configs: configEntries, From 9ba6d8e14f84c29788eb1e0cf8020c396a621182 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 16 Jan 2020 10:31:44 +0000 Subject: [PATCH 2/2] fix: remove Logger inadvertently left in test This Logger modification would occasionally hit a data racein Travis: ``` WARNING: DATA RACE Read at 0x00000130ced0 by goroutine 60: github.com/Shopify/sarama.(*Broker).Close() /home/travis/gopath/src/github.com/Shopify/sarama/broker.go:253 +0x4f0 github.com/Shopify/sarama.safeAsyncClose.func1() /home/travis/gopath/src/github.com/Shopify/sarama/utils.go:52 +0xa3 github.com/Shopify/sarama.withRecover() /home/travis/gopath/src/github.com/Shopify/sarama/utils.go:45 +0x84 Previous write at 0x00000130ced0 by goroutine 75: github.com/Shopify/sarama.TestClusterAdminDescribeBrokerConfig() /home/travis/gopath/src/github.com/Shopify/sarama/admin_test.go:553 +0x220 testing.tRunner() /home/travis/.gimme/versions/go1.12.15.linux.amd64/src/testing/testing.go:865 +0x163 ``` It had inadvertently been included in the test, so just remove it --- admin_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/admin_test.go b/admin_test.go index 24f387847..4ac9ffc72 100644 --- a/admin_test.go +++ b/admin_test.go @@ -2,10 +2,6 @@ package sarama import ( "errors" - "fmt" - "io/ioutil" - "log" - "os" "strings" "testing" ) @@ -550,9 +546,6 @@ func TestClusterAdminDescribeConfig(t *testing.T) { // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config // is sent to the broker in the resource struct, _not_ the controller func TestClusterAdminDescribeBrokerConfig(t *testing.T) { - Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags) - defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }() - controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2)