diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe8f4fd..540a129 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,13 +1,20 @@
 # CHANGELOG

+## 0.4.0
+
+* Fixed replica assignment to take rack configuration into account (#190)
+* Use separate Sarama clients for producer/consumer in order to reliably measure message latency (#188)
+* Adjust default histogram bucket sizes (#194)
+
 ## 0.3.0

 * Forced cleanup.policy to "delete" for the canary topic (#173)
 * Allow to change canary logging verbosity and enable/disable Sarama logger at runtime (#166)
 * Added Prometheus support via `PodMonitor` and sample Grafana dashboard for exposed metrics
 * Treat ETIMEDOUT (TCP keep-alive failure) as a disconnection condition too (#159)
-* Updated dependency to Sarama 1.33.0 (#176)
+* Updated dependency to Sarama 1.34.0 (#180)
 * Updated default Kafka version used by Sarama library to 3.1.0
+* Use 250ms as consumer fetch max wait timeout with Sarama 1.34.0 (#184)

 ## 0.2.0
diff --git a/README.md b/README.md
index 2d90e6b..6db49f9 100644
--- a/README.md
+++ b/README.md
@@ -62,8 +62,8 @@ The configuration file described in more detail the next section.
 | `RECONCILE_INTERVAL_MS` | It defines how often the tool has to send and receive messages (in ms). | `30000` | |
 | `CLIENT_ID` | The client id used for configuring producer and consumer. | `strimzi-canary-client` | |
 | `CONSUMER_GROUP_ID` | Group id for the consumer group joined by the canary consumer. | `strimzi-canary-group` | |
-| `PRODUCER_LATENCY_BUCKETS` | Buckets of the histogram related to the producer latency metric (in ms). | `100,200,400,800,1600` | |
-| `ENDTOEND_LATENCY_BUCKETS` | Buckets of the histogram related to the end to end latency metric between producer and consumer (in ms). | `100,200,400,800,1600` | |
+| `PRODUCER_LATENCY_BUCKETS` | Buckets of the histogram related to the producer latency metric (in ms). | `2,5,10,20,50,100,200,400` | |
+| `ENDTOEND_LATENCY_BUCKETS` | Buckets of the histogram related to the end to end latency metric between producer and consumer (in ms). | `5,10,20,50,100,200,400,800` | |
 | `EXPECTED_CLUSTER_SIZE` | Expected number of brokers in the Kafka cluster where the canary connects to. This parameter avoids that the tool runs more partitions reassignment of the topic while the Kafka cluster is starting up and the brokers are coming one by one. `-1` means "dynamic" reassignment as described above. When greater than 0, the canary waits for the Kafka cluster having the expected number of brokers running before creating the topic and assigning the partitions | `-1` | |
 | `KAFKA_VERSION` | Version of the Kafka cluster | `3.1.0` | |
 | `SARAMA_LOG_ENABLED` | Enables the Sarama client logging. | `false` | `saramaLogEnabled` |
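
The bucket defaults changed above are the upper bounds (in ms) of the Prometheus histograms the canary exposes for producer and end-to-end latency. The sketch below is illustrative only (the `parseBuckets` helper and the metric name are hypothetical, not the canary's actual code); it shows how such a comma-separated string maps onto `client_golang` histogram buckets.

```go
// Illustrative only: turning a PRODUCER_LATENCY_BUCKETS-style string into
// Prometheus histogram buckets. Names here are assumptions, not canary code.
package main

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
)

// parseBuckets turns "2,5,10,20,50,100,200,400" into []float64{2, 5, ...}.
func parseBuckets(s string) ([]float64, error) {
	parts := strings.Split(s, ",")
	buckets := make([]float64, 0, len(parts))
	for _, p := range parts {
		v, err := strconv.ParseFloat(strings.TrimSpace(p), 64)
		if err != nil {
			return nil, fmt.Errorf("invalid bucket %q: %w", p, err)
		}
		buckets = append(buckets, v)
	}
	return buckets, nil
}

func main() {
	buckets, err := parseBuckets("2,5,10,20,50,100,200,400")
	if err != nil {
		panic(err)
	}
	// A histogram in milliseconds using the new default producer latency buckets.
	hist := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "producer_latency_example_ms",
		Help:    "Example histogram using the 0.4.0 default producer latency buckets (ms).",
		Buckets: buckets,
	})
	hist.Observe(12.0) // falls into the "<= 20" bucket
	fmt.Println(buckets)
}
```

The new defaults start at single-digit milliseconds, which gives finer resolution in the range a healthy cluster normally sits in, instead of lumping everything below 100 ms into a single bucket as the old defaults did.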
diff --git a/cmd/main.go b/cmd/main.go
index f18b735..1d06c4e 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -43,7 +43,6 @@ var (
 	}, nil)
 )

-var saramaLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)

 func initTracerProvider(exporterType string) *sdktrace.TracerProvider {
 	resources, _ := resource.New(context.Background(),
@@ -76,6 +75,7 @@ func exporterTracing(exporterType string) sdktrace.SpanExporter {
 	}
 	return exporter
 }
+var saramaLogger = log.New(io.Discard, "[Sarama] ", log.Ldate | log.Lmicroseconds)

 func main() {

@@ -112,15 +112,24 @@
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

-	client, err := newClient(canaryConfig)
+	saramaConfig, err := createSaramaConfig(canaryConfig)
 	if err != nil {
-		glog.Fatalf("Error creating new Sarama client: %v", err)
+		glog.Fatalf("Error creating Sarama config: %v", err)
 	}

-	topicService := services.NewTopicService(canaryConfig, client.Config())
-	producerService := services.NewProducerService(canaryConfig, client)
-	consumerService := services.NewConsumerService(canaryConfig, client)
-	connectionService := services.NewConnectionService(canaryConfig, client.Config())
+	producerClient, err := newClientWithRetry(canaryConfig, saramaConfig)
+	if err != nil {
+		glog.Fatalf("Error creating producer Sarama client: %v", err)
+	}
+	consumerClient, err := newClientWithRetry(canaryConfig, saramaConfig)
+	if err != nil {
+		glog.Fatalf("Error creating consumer Sarama client: %v", err)
+	}
+
+	topicService := services.NewTopicService(canaryConfig, saramaConfig)
+	producerService := services.NewProducerService(canaryConfig, producerClient)
+	consumerService := services.NewConsumerService(canaryConfig, consumerClient)
+	connectionService := services.NewConnectionService(canaryConfig, saramaConfig)

 	canaryManager := workers.NewCanaryManager(canaryConfig, topicService, producerService, consumerService, connectionService, statusService)
 	canaryManager.Start()
@@ -130,11 +139,13 @@
 	canaryManager.Stop()
 	httpServer.Stop()
 	dynamicConfigWatcher.Close()
+	_ = producerClient.Close()
+	_ = consumerClient.Close()

 	glog.Infof("Strimzi canary stopped")
 }

-func newClient(canaryConfig *config.CanaryConfig) (sarama.Client, error) {
+func createSaramaConfig(canaryConfig *config.CanaryConfig) (*sarama.Config, error) {
 	config := sarama.NewConfig()
 	kafkaVersion, err := sarama.ParseKafkaVersion(canaryConfig.KafkaVersion)
 	if err != nil {
@@ -162,6 +173,10 @@ func newClient(canaryConfig *config.CanaryConfig) (sarama.Client, error) {
 		}
 	}

+	return config, nil
+}
+
+func newClientWithRetry(canaryConfig *config.CanaryConfig, config *sarama.Config) (sarama.Client, error) {
 	backoff := services.NewBackoff(canaryConfig.BootstrapBackoffMaxAttempts, canaryConfig.BootstrapBackoffScale*time.Millisecond, services.MaxDefault)
 	for {
 		client, clientErr := sarama.NewClient(canaryConfig.BootstrapServers, config)
diff --git a/install/020-Deployment.yaml b/install/020-Deployment.yaml
index 899ff4e..6e1f14b 100644
--- a/install/020-Deployment.yaml
+++ b/install/020-Deployment.yaml
@@ -17,7 +17,7 @@ spec:
       serviceAccountName: strimzi-canary
       containers:
         - name: strimzi-canary
-          image: quay.io/strimzi/canary:0.2.0
+          image: quay.io/strimzi/canary:0.3.0
           env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: my-cluster-kafka-bootstrap:9092
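
The `cmd/main.go` change above splits client setup into `createSaramaConfig` and `newClientWithRetry`, and creates two independent clients from the same `*sarama.Config` so that producer and consumer traffic no longer share one client. The sketch below shows the same idea in isolation, assuming a placeholder bootstrap address, topic and group id; it is not the canary's code. The 250 ms fetch max wait mentioned in the changelog presumably corresponds to Sarama's `Consumer.MaxWaitTime`, set here only for illustration.

```go
// A minimal sketch (not the canary's code) of one shared *sarama.Config and
// two independent clients, with the producer and the consumer group each
// derived from their own client.
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	if v, err := sarama.ParseKafkaVersion("3.1.0"); err == nil {
		cfg.Version = v
	}
	cfg.Producer.Return.Successes = true              // required by SyncProducer
	cfg.Consumer.MaxWaitTime = 250 * time.Millisecond // fetch max wait, per the 0.4.0 changelog entry

	brokers := []string{"localhost:9092"} // placeholder bootstrap server

	producerClient, err := sarama.NewClient(brokers, cfg)
	if err != nil {
		log.Fatalf("producer client: %v", err)
	}
	defer producerClient.Close()

	consumerClient, err := sarama.NewClient(brokers, cfg)
	if err != nil {
		log.Fatalf("consumer client: %v", err)
	}
	defer consumerClient.Close()

	producer, err := sarama.NewSyncProducerFromClient(producerClient)
	if err != nil {
		log.Fatalf("producer: %v", err)
	}
	defer producer.Close()

	group, err := sarama.NewConsumerGroupFromClient("example-group", consumerClient)
	if err != nil {
		log.Fatalf("consumer group: %v", err)
	}
	defer group.Close()

	// Produce a message on the producer's own connection; the consumer group
	// would run group.Consume in its own loop on the other client.
	_, _, _ = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("ping"),
	})
}
```

Keeping produce and fetch traffic on separate clients means a slow fetch cannot queue behind (or be queued behind) a produce request, which is what makes the measured producer and end-to-end latencies reliable.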
b/internal/config/canary_config.go
@@ -58,8 +58,8 @@ const (
 	ReconcileIntervalDefault      = 30000
 	ClientIDDefault               = "strimzi-canary-client"
 	ConsumerGroupIDDefault        = "strimzi-canary-group"
-	ProducerLatencyBucketsDefault = "100,200,400,800,1600"
-	EndToEndLatencyBucketsDefault = "100,200,400,800,1600"
+	ProducerLatencyBucketsDefault = "2,5,10,20,50,100,200,400"
+	EndToEndLatencyBucketsDefault = "5,10,20,50,100,200,400,800"
 	ExpectedClusterSizeDefault    = -1 // "dynamic" reassignment is enabled
 	KafkaVersionDefault           = "3.1.0"
 	SaramaLogEnabledDefault       = false
diff --git a/internal/services/topic.go b/internal/services/topic.go
index e99c1aa..ea66c8f 100644
--- a/internal/services/topic.go
+++ b/internal/services/topic.go
@@ -219,10 +219,12 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
 func (ts *TopicService) Close() {
 	glog.Infof("Closing topic service")

-	if err := ts.admin.Close(); err != nil {
-		glog.Fatalf("Error closing the Sarama cluster admin: %v", err)
+	if ts.admin != nil {
+		if err := ts.admin.Close(); err != nil {
+			glog.Fatalf("Error closing the Sarama cluster admin: %v", err)
+		}
+		ts.admin = nil
 	}
-	ts.admin = nil

 	glog.Infof("Topic service closed")
 }
@@ -312,14 +314,67 @@ func (ts *TopicService) requestedAssignments(currentPartitions int, brokers []*s
 		return brokers[i].ID() < brokers[j].ID()
 	})

-	assignments := make(map[int32][]int32, int(partitions))
+	// now adjust the broker ordering to produce a rack alternated list.
+	// if the brokers have no rack information (or only some brokers have it) this next part has no effect.
+	//
+	// for example:
+	// assuming: 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
+	// rackMap contains:
+	// rack1: {0, 5}
+	// rack2: {3, 4}
+	// rack3: {1, 2}
+	// rack alternated broker list:
+	// 0, 3, 1, 5, 4, 2
+
+	rackMap := make(map[string][]*sarama.Broker)
+	var rackNames []string
+	brokersWithRack := 0
+	for _, broker := range brokers {
+		if broker.Rack() != "" {
+			brokersWithRack++
+			if _, ok := rackMap[broker.Rack()]; !ok {
+				rackMap[broker.Rack()] = make([]*sarama.Broker, 0)
+				rackNames = append(rackNames, broker.Rack())
+			}
+			rackMap[broker.Rack()] = append(rackMap[broker.Rack()], broker)
+		}
+	}
+
+	if len(brokers) != brokersWithRack {
+		if brokersWithRack > 0 {
+			glog.Warningf("Not *all* brokers have rack assignments (%d/%d), topic %s will be created without rack awareness", brokersWithRack, len(brokers), ts.canaryConfig.Topic)
+		}
+	} else {
+		index := 0
+
+		for {
+			again := false
+
+			for _, rackName := range rackNames {
+				brokerList := rackMap[rackName]
+				if len(brokerList) > 0 {
+					var head *sarama.Broker
+					head, rackMap[rackName] = brokerList[0], brokerList[1:]
+					brokers[index] = head
+					index++
+					again = true
+				}
+			}
+
+			if !again {
+				break
+			}
+		}
+	}
+
+	assignments := make(map[int32][]int32, partitions)
 	for p := 0; p < int(partitions); p++ {
-		assignments[int32(p)] = make([]int32, int(replicationFactor))
+		assignments[int32(p)] = make([]int32, replicationFactor)
 		k := p
-		for r := 0; r < int(replicationFactor); r++ {
+		for r := 0; r < replicationFactor; r++ {
 			// get brokers ID for assignment from the brokers list and not using
 			// just a monotonic increasing index because there could be "hole" (a broker down)
-			assignments[int32(p)][r] = brokers[int32(k%int(brokersNumber))].ID()
+			assignments[int32(p)][r] = brokers[int32(k%brokersNumber)].ID()
 			k++
 		}
 	}
@@ -338,7 +393,7 @@ func (ts *TopicService) currentAssignments(topicMetadata *sarama.TopicMetadata)

 // Alter the replica assignment for the partitions
 //
-// After the request for the replica assignement, it run a loop for checking if the reassignment is still ongoing
+// After the request for the replica assignment, it runs a loop for checking if the reassignment is still ongoing
 // It returns when the reassignment is done or there is an error
 func (ts *TopicService) alterAssignments(assignments [][]int32) error {
 	if err := ts.admin.AlterPartitionReassignments(ts.canaryConfig.Topic, assignments); err != nil {
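
The rack-aware part of `requestedAssignments` above groups brokers by rack and then rebuilds the broker list by taking one broker from each rack in turn, so that consecutive entries (and therefore the replicas assigned to a partition) fall into different racks. Below is a standalone sketch of that interleaving over plain broker ids, using the example mapping from the code comment; it is not the service's code, just the same round-robin idea.

```go
// A standalone sketch of the rack-alternating ordering described in the
// requestedAssignments comment above, working on plain broker ids.
package main

import "fmt"

// rackAlternate interleaves brokers across racks: one broker from each rack in
// turn, preserving the per-rack order, until every rack is exhausted.
func rackAlternate(rackNames []string, rackMap map[string][]int32) []int32 {
	ordered := make([]int32, 0)
	for {
		again := false
		for _, rack := range rackNames {
			if ids := rackMap[rack]; len(ids) > 0 {
				ordered = append(ordered, ids[0])
				rackMap[rack] = ids[1:]
				again = true
			}
		}
		if !again {
			return ordered
		}
	}
}

func main() {
	// 0 -> rack1, 1 -> rack3, 2 -> rack3, 3 -> rack2, 4 -> rack2, 5 -> rack1
	rackNames := []string{"rack1", "rack2", "rack3"} // order of first appearance
	rackMap := map[string][]int32{
		"rack1": {0, 5},
		"rack2": {3, 4},
		"rack3": {1, 2},
	}
	fmt.Println(rackAlternate(rackNames, rackMap)) // [0 3 1 5 4 2]
}
```

With the resulting order `0, 3, 1, 5, 4, 2`, the assignment loop above (effectively `assignments[p][r] = brokers[(p+r) % brokersNumber].ID()`) places any replica set of size up to the number of racks on brokers from distinct racks.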
diff --git a/internal/services/topic_test.go b/internal/services/topic_test.go
new file mode 100644
index 0000000..32ed2f7
--- /dev/null
+++ b/internal/services/topic_test.go
@@ -0,0 +1,176 @@
+//
+// Copyright Strimzi authors.
+// License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+//
+
+// +build unit_test
+
+// Package services defines an interface for canary services and related implementations
+
+package services
+
+import (
+	"fmt"
+	"github.com/Shopify/sarama"
+	"github.com/strimzi/strimzi-canary/internal/config"
+	"math/rand"
+	"reflect"
+	"testing"
+	"time"
+	"unsafe"
+)
+
+func TestRequestedAssignments(t *testing.T) {
+	var tests = []struct {
+		name                       string
+		numPartitions              int
+		numBrokers                 int
+		useRack                    bool
+		brokersWithMultipleLeaders []int32
+		expectedMinISR             int
+	}{
+		{"one broker", 1, 1, false, []int32{}, 1},
+		{"three brokers without rack info", 3, 3, false, []int32{}, 2},
+		{"fewer brokers than partitions", 3, 2, false, []int32{0}, 1},
+		{"six brokers with rack info", 6, 6, true, []int32{}, 2},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cfg := &config.CanaryConfig{
+				Topic: "test",
+			}
+
+			brokers, brokerMap := createBrokers(t, tt.numBrokers, tt.useRack)
+
+			ts := NewTopicService(cfg, nil)
+
+			assignments, minISR := ts.requestedAssignments(tt.numPartitions, brokers)
+
+			if tt.expectedMinISR != minISR {
+				t.Errorf("unexpected minISR, got = %d, want = %d", minISR, tt.expectedMinISR)
+			}
+
+			for _, brokerIds := range assignments {
+				duplicatedBrokers := make(map[int32]int)
+
+				for _, brokerId := range brokerIds {
+					if _, ok := duplicatedBrokers[brokerId]; !ok {
+						duplicatedBrokers[brokerId] = 1
+					} else {
+						duplicatedBrokers[brokerId] = duplicatedBrokers[brokerId] + 1
+					}
+
+				}
+
+				for brokerId, count := range duplicatedBrokers {
+					if count > 1 {
+						t.Errorf("partition is replicated on same broker (%d) more than once (%d)", brokerId, count)
+					}
+				}
+			}
+
+			leaderBrokers := make(map[int32]int)
+			for _, brokerIds := range assignments {
+
+				leaderBrokerId := brokerIds[0]
+				if _, ok := leaderBrokers[leaderBrokerId]; !ok {
+					leaderBrokers[leaderBrokerId] = 1
+				} else {
+					leaderBrokers[leaderBrokerId] = leaderBrokers[leaderBrokerId] + 1
+				}
+			}
+
+			for brokerId, count := range leaderBrokers {
+				if count > 1 {
+					found := false
+					for _, expectedBrokerId := range tt.brokersWithMultipleLeaders {
+						if expectedBrokerId == brokerId {
+							found = true
+							break
+						}
+					}
+
+					if !found {
+						t.Errorf("broker %d is leader for more than one partition (%d)", brokerId, count)
+					}
+				}
+			}
+
+			if tt.useRack {
+				for i, brokerIds := range assignments {
+					rackBrokerId := make(map[string][]int32)
+					for _, brokerId := range brokerIds {
+						broker := brokerMap[brokerId]
+						_, ok := rackBrokerId[broker.Rack()]
+						if !ok {
+							rackBrokerId[broker.Rack()] = make([]int32, 0)
+						}
+						rackBrokerId[broker.Rack()] = append(rackBrokerId[broker.Rack()], broker.ID())
+					}
+
+					for rack, brokerIds := range rackBrokerId {
+						if len(brokerIds) > 1 {
+							t.Errorf("partition %d has been assigned to %d brokers %v in rack %s", i, len(brokerIds), brokerIds, rack)
+						}
+					}
+				}
+			}
+
+			ts.Close()
+		})
+	}
+
+}
+
+func createBrokers(t *testing.T, num int, rack bool) ([]*sarama.Broker, map[int32]*sarama.Broker) {
+	brokers := make([]*sarama.Broker, 0)
+	brokerMap := make(map[int32]*sarama.Broker)
+	for i := 0; i < num; i++ {
+		broker := &sarama.Broker{}
+
+		setBrokerID(t, broker, i)
+
+		brokers = append(brokers, broker)
+		brokerMap[broker.ID()] = broker
+	}
+
+	rand.Seed(time.Now().UnixNano())
+	rand.Shuffle(len(brokers), func(i, j int) { brokers[i], brokers[j] = brokers[j], brokers[i] })
+
+	if rack {
+		rackNames := make([]string, 3)
+		for i := range rackNames {
+			rackNames[i] = fmt.Sprintf("rack%d", i)
+		}
+
+		for i, broker := range brokers {
+			rackName := rackNames[i%3]
+			setBrokerRack(t, broker, rackName)
+		}
+	}
+
+	return brokers, brokerMap
+}
+
+func setBrokerID(t *testing.T, broker *sarama.Broker, i int) {
+	idV := reflect.ValueOf(broker).Elem().FieldByName("id")
+	setUnexportedField(idV, int32(i))
+	if int32(i) != broker.ID() {
+		t.Errorf("failed to set id by reflection")
+	}
+}
+
+func setBrokerRack(t *testing.T, broker *sarama.Broker, rackName string) {
+	rackV := reflect.ValueOf(broker).Elem().FieldByName("rack")
+	setUnexportedField(rackV, &rackName)
+	if rackName != broker.Rack() {
+		t.Errorf("failed to set rack by reflection")
+	}
+}
+
+func setUnexportedField(field reflect.Value, value interface{}) {
+	reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).
+		Elem().
+		Set(reflect.ValueOf(value))
+}
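
The `expectedMinISR` column in the test above ties the broker count to a replication factor and a `min.insync.replicas` value. The relation sketched below is only an assumption inferred from that table (replication factor capped at 3, min ISR one less than it but at least 1), not necessarily the formula `TopicService` actually uses.

```go
// Assumption inferred from the expectedMinISR column above, not necessarily
// the TopicService's actual formula.
package main

import "fmt"

func replicationFactor(brokers int) int {
	if brokers > 3 {
		return 3
	}
	return brokers
}

func minISR(brokers int) int {
	if rf := replicationFactor(brokers); rf > 1 {
		return rf - 1
	}
	return 1
}

func main() {
	for _, n := range []int{1, 2, 3, 6} {
		fmt.Printf("brokers=%d replicationFactor=%d minISR=%d\n", n, replicationFactor(n), minISR(n))
	}
	// brokers=1 -> minISR=1, brokers=2 -> minISR=1, brokers=3 -> minISR=2, brokers=6 -> minISR=2
}
```

Because of the `// +build unit_test` constraint at the top of the file, the test only compiles when that tag is supplied, for example `go test -tags unit_test ./internal/services/...`.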
diff --git a/release.version b/release.version
index e09dd94..9b3788c 100644
--- a/release.version
+++ b/release.version
@@ -1 +1 @@
-0.3.0-SNAPSHOT
\ No newline at end of file
+0.4.0-SNAPSHOT
\ No newline at end of file