diff --git a/admin.go b/admin.go index 287e3dee6..3b8cb61e7 100644 --- a/admin.go +++ b/admin.go @@ -452,7 +452,6 @@ func dependsOnSpecificNode(resource ConfigResource) bool { } func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) { - var entries []ConfigEntry var resources []*ConfigResource resources = append(resources, &resource) @@ -461,6 +460,14 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, Resources: resources, } + if ca.conf.Version.IsAtLeast(V1_1_0_0) { + request.Version = 1 + } + + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 2 + } + var ( b *Broker err error diff --git a/admin_test.go b/admin_test.go index 92a5415c0..4ac9ffc72 100644 --- a/admin_test.go +++ b/admin_test.go @@ -2,10 +2,6 @@ package sarama import ( "errors" - "fmt" - "io/ioutil" - "log" - "os" "strings" "testing" ) @@ -492,35 +488,64 @@ func TestClusterAdminDescribeConfig(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) - config := NewConfig() - config.Version = V1_0_0_0 - admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) - if err != nil { - t.Fatal(err) - } + var tests = []struct { + saramaVersion KafkaVersion + requestVersion int16 + includeSynonyms bool + }{ + {V1_0_0_0, 0, false}, + {V1_1_0_0, 1, true}, + {V1_1_1_0, 1, true}, + {V2_0_0_0, 2, true}, + } + for _, tt := range tests { + config := NewConfig() + config.Version = tt.saramaVersion + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = admin.Close() + }() + + resource := ConfigResource{ + Name: "r1", + Type: TopicResource, + ConfigNames: []string{"my_topic"}, + } - resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}} - entries, err := admin.DescribeConfig(resource) - if err != nil { - t.Fatal(err) - } + entries, err := admin.DescribeConfig(resource) + if err != nil { + t.Fatal(err) + } - if len(entries) <= 0 { - t.Fatal(errors.New("no resource present")) - } + history := seedBroker.History() + describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest) + if !ok { + t.Fatal("failed to find DescribeConfigsRequest in mockBroker history") + } - err = admin.Close() - if err != nil { - t.Fatal(err) + if describeReq.Version != tt.requestVersion { + t.Fatalf( + "requestVersion %v did not match expected %v", + describeReq.Version, tt.requestVersion) + } + + if len(entries) <= 0 { + t.Fatal(errors.New("no resource present")) + } + if tt.includeSynonyms { + if len(entries[0].Synonyms) == 0 { + t.Fatal("expected synonyms to have been included") + } + } } } // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config // is sent to the broker in the resource struct, _not_ the controller func TestClusterAdminDescribeBrokerConfig(t *testing.T) { - Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags) - defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }() - controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) diff --git a/mockresponses.go b/mockresponses.go index 72df4b363..984e5aaca 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -731,7 +731,11 @@ func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*DescribeConfigsRequest) - res := &DescribeConfigsResponse{} + res := &DescribeConfigsResponse{ + Version: req.Version, + } + + includeSynonyms := (req.Version > 0) for _, r := range req.Resources { var configEntries []*ConfigEntry @@ -763,23 +767,42 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { Configs: configEntries, }) case TopicResource: - configEntries = append(configEntries, - &ConfigEntry{Name: "max.message.bytes", - Value: "1000000", - ReadOnly: false, - Default: true, - Sensitive: false, - }, &ConfigEntry{Name: "retention.ms", - Value: "5000", - ReadOnly: false, - Default: false, - Sensitive: false, - }, &ConfigEntry{Name: "password", - Value: "12345", - ReadOnly: false, - Default: false, - Sensitive: true, - }) + maxMessageBytes := &ConfigEntry{Name: "max.message.bytes", + Value: "1000000", + ReadOnly: false, + Default: true, + Sensitive: false, + } + if includeSynonyms { + maxMessageBytes.Synonyms = []*ConfigSynonym{ + { + ConfigName: "max.message.bytes", + ConfigValue: "500000", + }, + } + } + retentionMs := &ConfigEntry{Name: "retention.ms", + Value: "5000", + ReadOnly: false, + Default: false, + Sensitive: false, + } + if includeSynonyms { + retentionMs.Synonyms = []*ConfigSynonym{ + { + ConfigName: "log.retention.ms", + ConfigValue: "2500", + }, + } + } + password := &ConfigEntry{Name: "password", + Value: "12345", + ReadOnly: false, + Default: false, + Sensitive: true, + } + configEntries = append( + configEntries, maxMessageBytes, retentionMs, password) res.Resources = append(res.Resources, &ResourceResponse{ Name: r.Name, Configs: configEntries,