From fcf06ea1fe8c432c0ca492e6d16eb37159e59e67 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Tue, 31 May 2022 12:32:02 +0200 Subject: [PATCH 01/11] Updated Sarama to 1.34.0 (#181) Signed-off-by: Paolo Patierno --- CHANGELOG.md | 2 +- go.mod | 2 +- go.sum | 21 +++++++++------------ 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe8f4fd..cb4a4d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ * Allow to change canary logging verbosity and enable/disable Sarama logger at runtime (#166) * Added Prometheus support via `PodMonitor` and sample Grafana dashboard for exposed metrics * Treat ETIMEDOUT (TCP keep-alive failure) as a disconnection condition too (#159) -* Updated dependency to Sarama 1.33.0 (#176) +* Updated dependency to Sarama 1.34.0 (#180) * Updated default Kafka version used by Sarama library to 3.1.0 ## 0.2.0 diff --git a/go.mod b/go.mod index f6802bb..97f967b 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/strimzi/strimzi-canary go 1.13 require ( - github.com/Shopify/sarama v1.33.0 + github.com/Shopify/sarama v1.34.0 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/prometheus/client_golang v1.8.0 github.com/xdg-go/scram v1.1.1 diff --git a/go.sum b/go.sum index 6ffbd9f..192c438 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= -github.com/Shopify/sarama v1.33.0 h1:2K4mB9M4fo46sAM7t6QTsmSO8dLX1OqznLM7vn3OjZ8= -github.com/Shopify/sarama v1.33.0/go.mod h1:lYO7LwEBkE0iAeTl94UfPSrDaavFzSFlmn+5isARATQ= +github.com/Shopify/sarama v1.34.0 
h1:j4zTaFHFnfvuV2fdLZyXqIg0Tu4Mzl9f064Z5/H+o4o= +github.com/Shopify/sarama v1.34.0/go.mod h1:V2ceE9UupUf4/oP1Z38SI49fAnD0/MtkqDDHvolIeeQ= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= @@ -66,8 +66,6 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= -github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns= -github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -106,9 +104,8 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= -github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= 
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -241,9 +238,10 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= +github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= -github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE= +github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -371,8 +369,8 @@ golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f 
h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y= +golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -431,9 +429,8 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= From 4671a13b3f8d3a00848d23a9bc74a7b675bab40a Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Wed, 1 Jun 2022 16:15:33 +0200 Subject: [PATCH 02/11] Fixed consumer fetch max 
wait time due to Sarama potential bug (#183) Signed-off-by: Paolo Patierno --- cmd/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/main.go b/cmd/main.go index 0fb6a09..ca96c2d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -36,6 +36,7 @@ var ( ) var saramaLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags) + func main() { // get canary configuration canaryConfig := config.NewCanaryConfig() @@ -98,7 +99,9 @@ func newClient(canaryConfig *config.CanaryConfig) (sarama.Client, error) { config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 0 config.Consumer.Return.Errors = true - + // this Sarama fix https://github.com/Shopify/sarama/pull/2227 increases the canary e2e latency + // it shows a potential bug in Sarama. We revert the value back here while waiting for a Sarama fix + config.Consumer.MaxWaitTime = 250 * time.Millisecond if canaryConfig.TLSEnabled { config.Net.TLS.Enable = true From de5514772b8077faea87335d74c0bfa4f5b5956f Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Sun, 5 Jun 2022 16:05:36 +0200 Subject: [PATCH 03/11] Updated CHANGELOG with missing fix for 0.3.0 (#185) * Updated CHANGELOG with missing fix for 0.3.0 Signed-off-by: Paolo Patierno * Used suggested text Signed-off-by: Paolo Patierno --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb4a4d7..25e48cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * Treat ETIMEDOUT (TCP keep-alive failure) as a disconnection condition too (#159) * Updated dependency to Sarama 1.34.0 (#180) * Updated default Kafka version used by Sarama library to 3.1.0 +* Use 250ms as consumer fetch max wait timeout with Sarama 1.34.0 (#184) ## 0.2.0 From b69d21244b01aa3f725b60cc75c4b9bc0f8f6af0 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Sun, 5 Jun 2022 16:40:31 +0200 Subject: [PATCH 04/11] Added clarification on not enough samples in status time window (#182) * Added clarification on not 
enough samples in status time window Signed-off-by: Paolo Patierno * Update README.md Co-authored-by: PaulRMellor <47596553+PaulRMellor@users.noreply.github.com> Co-authored-by: PaulRMellor <47596553+PaulRMellor@users.noreply.github.com> --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 4c5ce96..69bfc59 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,8 @@ The `Consuming` field provides information about the `Percentage` of messages co } ``` +If the time window has not ended, the `/status` endpoint cannot report a percentage of correctly consumed messages. Instead, it returns `Percentage: -1`. The canary also logs `Error processing consumed records percentage: No data samples available in the time window ring`. In this case, wait until the time window has ended so that the sampling can complete. + ## Metrics In order to check how your Apache Kafka cluster is behaving, the Canary provides the following metrics on the corresponding HTTP endpoint. 
From b89af57d02a444fb369bce2ebe6185ac59cdac13 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Sun, 5 Jun 2022 16:44:55 +0200 Subject: [PATCH 05/11] Updated installation file to new 0.3.0 release (#186) Signed-off-by: Paolo Patierno --- install/020-Deployment.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/install/020-Deployment.yaml b/install/020-Deployment.yaml index 899ff4e..6e1f14b 100644 --- a/install/020-Deployment.yaml +++ b/install/020-Deployment.yaml @@ -17,7 +17,7 @@ spec: serviceAccountName: strimzi-canary containers: - name: strimzi-canary - image: quay.io/strimzi/canary:0.2.0 + image: quay.io/strimzi/canary:0.3.0 env: - name: KAFKA_BOOTSTRAP_SERVERS value: my-cluster-kafka-bootstrap:9092 From 2302fe6aeeeada5fadcf647c0030e6ef7e1174c4 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Sun, 5 Jun 2022 16:49:13 +0200 Subject: [PATCH 06/11] Moving to 0.4.0-SNAPSHOT release (#187) Signed-off-by: Paolo Patierno --- release.version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release.version b/release.version index e09dd94..9b3788c 100644 --- a/release.version +++ b/release.version @@ -1 +1 @@ -0.3.0-SNAPSHOT \ No newline at end of file +0.4.0-SNAPSHOT \ No newline at end of file From a5d72fb51135c20422f81baaa0c09b0fdb8f7ae5 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 15 Jun 2022 17:40:56 +0100 Subject: [PATCH 07/11] fix: have requestedassignment respect rack (#191) * fix: have requestedassignment respect rack (#190) Signed-off-by: kwall * typo in test error message. 
Signed-off-by: kwall * describe algorithm to produce the rack alternated broker list Signed-off-by: kwall --- internal/services/topic.go | 71 +++++++++++-- internal/services/topic_test.go | 176 ++++++++++++++++++++++++++++++++ 2 files changed, 239 insertions(+), 8 deletions(-) create mode 100644 internal/services/topic_test.go diff --git a/internal/services/topic.go b/internal/services/topic.go index e99c1aa..ea66c8f 100644 --- a/internal/services/topic.go +++ b/internal/services/topic.go @@ -219,10 +219,12 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) { func (ts *TopicService) Close() { glog.Infof("Closing topic service") - if err := ts.admin.Close(); err != nil { - glog.Fatalf("Error closing the Sarama cluster admin: %v", err) + if ts.admin != nil { + if err := ts.admin.Close(); err != nil { + glog.Fatalf("Error closing the Sarama cluster admin: %v", err) + } + ts.admin = nil } - ts.admin = nil glog.Infof("Topic service closed") } @@ -312,14 +314,67 @@ func (ts *TopicService) requestedAssignments(currentPartitions int, brokers []*s return brokers[i].ID() < brokers[j].ID() }) - assignments := make(map[int32][]int32, int(partitions)) + // now adjust the broker ordering to produce a rack alternated list. + // if the brokers have no rack information (or only some brokers have it) this next part has no effect. 
+ // + // for example: + // assuming: 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1" + // rackMap contains: + // rack1: {0, 5} + // rack2: {3, 4} + // rack3: {1, 2} + // rack alternated broker list: + // 0, 3, 1, 5, 4, 2 + + rackMap := make(map[string][]*sarama.Broker) + var rackNames []string + brokersWithRack := 0 + for _, broker := range brokers { + if broker.Rack() != "" { + brokersWithRack++ + if _, ok := rackMap[broker.Rack()]; !ok { + rackMap[broker.Rack()] = make([]*sarama.Broker, 0) + rackNames = append(rackNames, broker.Rack()) + } + rackMap[broker.Rack()] = append(rackMap[broker.Rack()], broker) + } + } + + if len(brokers) != brokersWithRack { + if brokersWithRack > 0 { + glog.Warningf("Not *all* brokers have rack assignments (%d/%d), topic %s will be created without rack awareness", brokersWithRack, len(brokers), ts.canaryConfig.Topic) + } + } else { + index := 0 + + for ;; { + again := false + + for _, rackName := range rackNames { + brokerList := rackMap[rackName] + if len(brokerList) > 0 { + var head *sarama.Broker + head, rackMap[rackName] = brokerList[0], brokerList[1:] + brokers[index] = head + index++ + again = true + } + } + + if !again { + break + } + } + } + + assignments := make(map[int32][]int32, partitions) for p := 0; p < int(partitions); p++ { - assignments[int32(p)] = make([]int32, int(replicationFactor)) + assignments[int32(p)] = make([]int32, replicationFactor) k := p - for r := 0; r < int(replicationFactor); r++ { + for r := 0; r < replicationFactor; r++ { // get brokers ID for assignment from the brokers list and not using // just a monotonic increasing index because there could be "hole" (a broker down) - assignments[int32(p)][r] = brokers[int32(k%int(brokersNumber))].ID() + assignments[int32(p)][r] = brokers[int32(k%brokersNumber)].ID() k++ } } @@ -338,7 +393,7 @@ func (ts *TopicService) currentAssignments(topicMetadata *sarama.TopicMetadata) // Alter the replica assignment for the partitions // 
-// After the request for the replica assignement, it run a loop for checking if the reassignment is still ongoing +// After the request for the replica assignment, it run a loop for checking if the reassignment is still ongoing // It returns when the reassignment is done or there is an error func (ts *TopicService) alterAssignments(assignments [][]int32) error { if err := ts.admin.AlterPartitionReassignments(ts.canaryConfig.Topic, assignments); err != nil { diff --git a/internal/services/topic_test.go b/internal/services/topic_test.go new file mode 100644 index 0000000..32ed2f7 --- /dev/null +++ b/internal/services/topic_test.go @@ -0,0 +1,176 @@ +// +// Copyright Strimzi authors. +// License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). +// + +// +build unit_test + +// Package services defines an interface for canary services and related implementations + +package services + +import ( + "fmt" + "github.com/Shopify/sarama" + "github.com/strimzi/strimzi-canary/internal/config" + "math/rand" + "reflect" + "testing" + "time" + "unsafe" +) + +func TestRequestedAssignments(t *testing.T) { + var tests = []struct { + name string + numPartitions int + numBrokers int + useRack bool + brokersWithMultipleLeaders []int32 + expectedMinISR int + }{ + {"one broker", 1, 1, false, []int32{}, 1}, + {"three brokers without rack info", 3, 3, false, []int32{}, 2}, + {"fewer brokers than partitions", 3, 2, false, []int32{0}, 1}, + {"six brokers with rack info", 6, 6, true, []int32{}, 2}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &config.CanaryConfig{ + Topic: "test", + } + + brokers, brokerMap := createBrokers(t, tt.numBrokers, tt.useRack) + + ts := NewTopicService(cfg, nil) + + assignments, minISR := ts.requestedAssignments(tt.numPartitions, brokers) + + if tt.expectedMinISR != minISR { + t.Errorf("unexpected minISR, got = %d, want = %d", minISR, tt.expectedMinISR) + } + + for _, brokerIds := range 
assignments { + duplicatedBrokers := make(map[int32]int) + + for _, brokerId := range brokerIds { + if _, ok := duplicatedBrokers[brokerId]; !ok { + duplicatedBrokers[brokerId] = 1 + } else { + duplicatedBrokers[brokerId] = duplicatedBrokers[brokerId] + 1 + } + + } + + for brokerId, count := range duplicatedBrokers { + if count > 1 { + t.Errorf("partition is replicated on same broker (%d) more than once (%d)", brokerId, count) + } + } + } + + leaderBrokers := make(map[int32]int) + for _, brokerIds := range assignments { + + leaderBrokerId := brokerIds[0] + if _, ok := leaderBrokers[leaderBrokerId]; !ok { + leaderBrokers[leaderBrokerId] = 1 + } else { + leaderBrokers[leaderBrokerId] = leaderBrokers[leaderBrokerId] + 1 + } + } + + for brokerId, count := range leaderBrokers { + if count > 1 { + found := false + for _, expectedBrokerId := range tt.brokersWithMultipleLeaders { + if expectedBrokerId == brokerId { + found = true + break + } + } + + if !found { + t.Errorf("broker %d is leader for more than one partition (%d)", brokerId, count) + } + } + } + + if tt.useRack { + for i, brokerIds := range assignments { + rackBrokerId := make(map[string][]int32) + for _, brokerId := range brokerIds { + broker := brokerMap[brokerId] + _, ok := rackBrokerId[broker.Rack()] + if !ok { + rackBrokerId[broker.Rack()] = make([]int32, 0) + } + rackBrokerId[broker.Rack()] = append(rackBrokerId[broker.Rack()], broker.ID()) + } + + for rack, brokerIds := range rackBrokerId { + if len(brokerIds) > 1 { + t.Errorf("partition %d has been assigned to %d brokers %v in rackBrokerId %s", i, len(brokerIds), brokerIds, rack) + } + } + } + } + + ts.Close() + }) + } + +} + +func createBrokers(t *testing.T, num int, rack bool) ([]*sarama.Broker, map[int32]*sarama.Broker) { + brokers := make([]*sarama.Broker, 0) + brokerMap := make(map[int32]*sarama.Broker) + for i := 0; i < num ; i++ { + broker := &sarama.Broker{} + + setBrokerID(t, broker, i) + + brokers = append(brokers, broker) + 
brokerMap[broker.ID()] = broker + } + + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(brokers), func(i, j int) { brokers[i], brokers[j] = brokers[j], brokers[i] }) + + if rack { + rackNames := make([]string, 3) + for i, _ := range rackNames { + rackNames[i] = fmt.Sprintf("useRack%d", i) + } + + for i, broker := range brokers { + rackName := rackNames[i%3] + setBrokerRack(t, broker, rackName) + } + } + + return brokers, brokerMap +} + +func setBrokerID(t *testing.T, broker *sarama.Broker, i int) { + idV := reflect.ValueOf(broker).Elem().FieldByName("id") + setUnexportedField(idV, int32(i)) + if int32(i) != broker.ID() { + t.Errorf("failed to set id by reflection") + } +} + +func setBrokerRack(t *testing.T, broker *sarama.Broker, rackName string) { + rackV := reflect.ValueOf(broker).Elem().FieldByName("rack") + setUnexportedField(rackV, &rackName) + if rackName != broker.Rack() { + t.Errorf("failed to set useRack by reflection") + } +} + +func setUnexportedField(field reflect.Value, value interface{}) { + reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())). + Elem(). 
+ Set(reflect.ValueOf(value)) +} From 6f75785ca2a28b79df37a19b4dbcafd53909fa97 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Thu, 16 Jun 2022 00:15:41 +0200 Subject: [PATCH 08/11] Update CHANGELOG with issue #190 in 0.4.0 (#192) Signed-off-by: Paolo Patierno --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25e48cb..cda3cd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # CHANGELOG +## 0.4.0 + +* Fixed replicas assignment taking rack configuration into account (#190) + ## 0.3.0 * Forced cleanup.policy to "delete" for the canary topic (#173) From e21cfb03df98840def617d9f5eb72bd4b68f6994 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Sun, 26 Jun 2022 22:10:00 +0100 Subject: [PATCH 09/11] include milliseconds is sarama debug log (#189) Signed-off-by: kwall --- cmd/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/main.go b/cmd/main.go index ca96c2d..7f7f757 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -35,7 +35,7 @@ var ( }, nil) ) -var saramaLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags) +var saramaLogger = log.New(io.Discard, "[Sarama] ", log.Ldate | log.Lmicroseconds) func main() { // get canary configuration From 07a6ce314618bafa7b7a9bb8197d6bcce1d26569 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 29 Jun 2022 09:17:41 +0100 Subject: [PATCH 10/11] fix: #188 use separate sarama clients for producer and consumer (#193) Sharing a client meant that the client's await for FetchResponse could skew the latency measurement. This change means that the producer and consumer services create their own clients. 
Signed-off-by: Keith Wall --- CHANGELOG.md | 1 + cmd/main.go | 32 ++++++++++++++++++++++---------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cda3cd9..c5f4010 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 0.4.0 * Fixed replicas assignment taking rack configuration into account (#190) +* Use separate sarama clients for producer/consumer in order to reliably measure message latency (#188) ## 0.3.0 diff --git a/cmd/main.go b/cmd/main.go index 7f7f757..92a3ad1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -63,15 +63,24 @@ func main() { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - client, err := newClient(canaryConfig) + saramaConfig, err := createSaramaConfig(canaryConfig) if err != nil { - glog.Fatalf("Error creating new Sarama client: %v", err) + glog.Fatalf("Error creating Sarama config: %v", err) } - topicService := services.NewTopicService(canaryConfig, client.Config()) - producerService := services.NewProducerService(canaryConfig, client) - consumerService := services.NewConsumerService(canaryConfig, client) - connectionService := services.NewConnectionService(canaryConfig, client.Config()) + producerClient, err := newClientWithRetry(canaryConfig, saramaConfig) + if err != nil { + glog.Fatalf("Error creating producer Sarama client: %v", err) + } + consumerClient, err := newClientWithRetry(canaryConfig, saramaConfig) + if err != nil { + glog.Fatalf("Error creating consumer Sarama client: %v", err) + } + + topicService := services.NewTopicService(canaryConfig, saramaConfig) + producerService := services.NewProducerService(canaryConfig, producerClient) + consumerService := services.NewConsumerService(canaryConfig, consumerClient) + connectionService := services.NewConnectionService(canaryConfig, saramaConfig) canaryManager := workers.NewCanaryManager(canaryConfig, topicService, producerService, consumerService, connectionService, statusService) 
canaryManager.Start() @@ -81,11 +90,13 @@ func main() { canaryManager.Stop() httpServer.Stop() dynamicConfigWatcher.Close() + _ = producerClient.Close() + _ = consumerClient.Close() glog.Infof("Strimzi canary stopped") } -func newClient(canaryConfig *config.CanaryConfig) (sarama.Client, error) { +func createSaramaConfig(canaryConfig *config.CanaryConfig) (*sarama.Config, error) { config := sarama.NewConfig() kafkaVersion, err := sarama.ParseKafkaVersion(canaryConfig.KafkaVersion) if err != nil { @@ -99,9 +110,6 @@ func newClient(canaryConfig *config.CanaryConfig) (sarama.Client, error) { config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 0 config.Consumer.Return.Errors = true - // this Sarama fix https://github.com/Shopify/sarama/pull/2227 increases the canary e2e latency - // it shows a potential bug in Sarama. We revert the value back here while waiting for a Sarama fix - config.Consumer.MaxWaitTime = 250 * time.Millisecond if canaryConfig.TLSEnabled { config.Net.TLS.Enable = true @@ -116,6 +124,10 @@ func newClient(canaryConfig *config.CanaryConfig) (sarama.Client, error) { } } + return config, nil +} + +func newClientWithRetry(canaryConfig *config.CanaryConfig, config *sarama.Config) (sarama.Client, error) { backoff := services.NewBackoff(canaryConfig.BootstrapBackoffMaxAttempts, canaryConfig.BootstrapBackoffScale*time.Millisecond, services.MaxDefault) for { client, clientErr := sarama.NewClient(canaryConfig.BootstrapServers, config) From d4598748f56237c18215b4d897b4a30e49e9a30a Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 29 Jun 2022 11:23:34 +0100 Subject: [PATCH 11/11] fix: adjust default producer/end-to-end latency buckets. (#194) following #188, the default producer/end-to-end histogram buckets used by ihe canary are no longer ideal with all of the obseverations falling into the first (<=100ms) bucket. This change introduces several smaller buckets and drops the largest ones. 
Signed-off-by: kwall --- CHANGELOG.md | 1 + README.md | 4 ++-- internal/config/canary_config.go | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c5f4010..540a129 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * Fixed replicas assignment taking rack configuration into account (#190) * Use separate sarama clients for producer/consumer in order to reliably measure message latency (#188) +* Adjust default histogram bucket sizes (#194) ## 0.3.0 diff --git a/README.md b/README.md index 69bfc59..17fb1ba 100644 --- a/README.md +++ b/README.md @@ -62,8 +62,8 @@ The configuration file described in more detail the next section. | `RECONCILE_INTERVAL_MS` | It defines how often the tool has to send and receive messages (in ms). | `30000` | | | `CLIENT_ID` | The client id used for configuring producer and consumer. | `strimzi-canary-client` | | | `CONSUMER_GROUP_ID` | Group id for the consumer group joined by the canary consumer. | `strimzi-canary-group` | | -| `PRODUCER_LATENCY_BUCKETS` | Buckets of the histogram related to the producer latency metric (in ms). | `100,200,400,800,1600` | | -| `ENDTOEND_LATENCY_BUCKETS` | Buckets of the histogram related to the end to end latency metric between producer and consumer (in ms). | `100,200,400,800,1600` | | +| `PRODUCER_LATENCY_BUCKETS` | Buckets of the histogram related to the producer latency metric (in ms). | `2,5,10,20,50,100,200,400` | | +| `ENDTOEND_LATENCY_BUCKETS` | Buckets of the histogram related to the end to end latency metric between producer and consumer (in ms). | `5,10,20,50,100,200,400,800` | | | `EXPECTED_CLUSTER_SIZE` | Expected number of brokers in the Kafka cluster where the canary connects to. This parameter avoids that the tool runs more partitions reassignment of the topic while the Kafka cluster is starting up and the brokers are coming one by one. `-1` means "dynamic" reassignment as described above. 
When greater than 0, the canary waits for the Kafka cluster having the expected number of brokers running before creating the topic and assigning the partitions | `-1` | | | `KAFKA_VERSION` | Version of the Kafka cluster | `3.1.0` | | | `SARAMA_LOG_ENABLED` | Enables the Sarama client logging. | `false` | `saramaLogEnabled` | diff --git a/internal/config/canary_config.go b/internal/config/canary_config.go index 94e29f9..fe36381 100644 --- a/internal/config/canary_config.go +++ b/internal/config/canary_config.go @@ -57,8 +57,8 @@ const ( ReconcileIntervalDefault = 30000 ClientIDDefault = "strimzi-canary-client" ConsumerGroupIDDefault = "strimzi-canary-group" - ProducerLatencyBucketsDefault = "100,200,400,800,1600" - EndToEndLatencyBucketsDefault = "100,200,400,800,1600" + ProducerLatencyBucketsDefault = "2,5,10,20,50,100,200,400" + EndToEndLatencyBucketsDefault = "5,10,20,50,100,200,400,800" ExpectedClusterSizeDefault = -1 // "dynamic" reassignment is enabled KafkaVersionDefault = "3.1.0" SaramaLogEnabledDefault = false