
Merge branch 'main' of github.com:strimzi/strimzi-canary into add_otlp
Signed-off-by: Melchior Moulin <[email protected]>
melchiormoulin committed Jun 29, 2022
2 parents 0ef6989 + d459874 commit b8e5bac
Showing 8 changed files with 276 additions and 23 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -1,13 +1,20 @@
# CHANGELOG

## 0.4.0

* Fixed replicas assignment taking rack configuration into account (#190)
* Use separate sarama clients for producer/consumer in order to reliably measure message latency (#188)
* Adjust default histogram bucket sizes (#194)

## 0.3.0

* Forced cleanup.policy to "delete" for the canary topic (#173)
* Allow to change canary logging verbosity and enable/disable Sarama logger at runtime (#166)
* Added Prometheus support via `PodMonitor` and sample Grafana dashboard for exposed metrics
* Treat ETIMEDOUT (TCP keep-alive failure) as a disconnection condition too (#159)
* Updated dependency to Sarama 1.33.0 (#176)
* Updated dependency to Sarama 1.34.0 (#180)
* Updated default Kafka version used by Sarama library to 3.1.0
* Use 250ms as consumer fetch max wait timeout with Sarama 1.34.0 (#184)

## 0.2.0

4 changes: 2 additions & 2 deletions README.md
@@ -62,8 +62,8 @@ The configuration file described in more detail the next section.
| `RECONCILE_INTERVAL_MS` | It defines how often the tool has to send and receive messages (in ms). | `30000` | |
| `CLIENT_ID` | The client id used for configuring producer and consumer. | `strimzi-canary-client` | |
| `CONSUMER_GROUP_ID` | Group id for the consumer group joined by the canary consumer. | `strimzi-canary-group` | |
| `PRODUCER_LATENCY_BUCKETS` | Buckets of the histogram related to the producer latency metric (in ms). | `100,200,400,800,1600` | |
| `ENDTOEND_LATENCY_BUCKETS` | Buckets of the histogram related to the end to end latency metric between producer and consumer (in ms). | `100,200,400,800,1600` | |
| `PRODUCER_LATENCY_BUCKETS` | Buckets of the histogram related to the producer latency metric (in ms). | `2,5,10,20,50,100,200,400` | |
| `ENDTOEND_LATENCY_BUCKETS` | Buckets of the histogram related to the end to end latency metric between producer and consumer (in ms). | `5,10,20,50,100,200,400,800` | |
| `EXPECTED_CLUSTER_SIZE` | Expected number of brokers in the Kafka cluster the canary connects to. This parameter prevents the tool from running repeated partition reassignments of the topic while the Kafka cluster is starting up and the brokers are coming up one by one. `-1` means "dynamic" reassignment as described above. When greater than 0, the canary waits for the Kafka cluster to have the expected number of brokers running before creating the topic and assigning the partitions. | `-1` | |
| `KAFKA_VERSION` | Version of the Kafka cluster | `3.1.0` | |
| `SARAMA_LOG_ENABLED` | Enables the Sarama client logging. | `false` | `saramaLogEnabled` |
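The two latency bucket settings are plain comma-separated millisecond values that end up as Prometheus histogram buckets. Below is a minimal sketch, assuming the standard prometheus/client_golang API, of how such a string could be parsed and wired into a histogram; the metric name is hypothetical and this is not the canary's actual metrics code.

package main

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
)

// parseBuckets turns a value like "2,5,10,20,50,100,200,400" into histogram bucket bounds.
func parseBuckets(s string) ([]float64, error) {
	parts := strings.Split(s, ",")
	buckets := make([]float64, 0, len(parts))
	for _, p := range parts {
		v, err := strconv.ParseFloat(strings.TrimSpace(p), 64)
		if err != nil {
			return nil, fmt.Errorf("invalid bucket %q: %w", p, err)
		}
		buckets = append(buckets, v)
	}
	return buckets, nil
}

func main() {
	buckets, err := parseBuckets("2,5,10,20,50,100,200,400") // new PRODUCER_LATENCY_BUCKETS default
	if err != nil {
		panic(err)
	}
	// Hypothetical metric name, for illustration only.
	hist := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "producer_latency_ms",
		Help:    "Producer latency in milliseconds",
		Buckets: buckets,
	})
	prometheus.MustRegister(hist)
	hist.Observe(42) // record a 42 ms sample
	fmt.Println("buckets:", buckets)
}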
31 changes: 23 additions & 8 deletions cmd/main.go
@@ -43,7 +43,6 @@ var (
}, nil)
)

var saramaLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)

func initTracerProvider(exporterType string) *sdktrace.TracerProvider {
resources, _ := resource.New(context.Background(),
@@ -76,6 +75,7 @@ func exporterTracing(exporterType string) sdktrace.SpanExporter {
}
return exporter
}
var saramaLogger = log.New(io.Discard, "[Sarama] ", log.Ldate | log.Lmicroseconds)

func main() {

@@ -112,15 +112,24 @@ func main() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

client, err := newClient(canaryConfig)
saramaConfig, err := createSaramaConfig(canaryConfig)
if err != nil {
glog.Fatalf("Error creating new Sarama client: %v", err)
glog.Fatalf("Error creating Sarama config: %v", err)
}

topicService := services.NewTopicService(canaryConfig, client.Config())
producerService := services.NewProducerService(canaryConfig, client)
consumerService := services.NewConsumerService(canaryConfig, client)
connectionService := services.NewConnectionService(canaryConfig, client.Config())
producerClient, err := newClientWithRetry(canaryConfig, saramaConfig)
if err != nil {
glog.Fatalf("Error creating producer Sarama client: %v", err)
}
consumerClient, err := newClientWithRetry(canaryConfig, saramaConfig)
if err != nil {
glog.Fatalf("Error creating consumer Sarama client: %v", err)
}

topicService := services.NewTopicService(canaryConfig, saramaConfig)
producerService := services.NewProducerService(canaryConfig, producerClient)
consumerService := services.NewConsumerService(canaryConfig, consumerClient)
connectionService := services.NewConnectionService(canaryConfig, saramaConfig)

canaryManager := workers.NewCanaryManager(canaryConfig, topicService, producerService, consumerService, connectionService, statusService)
canaryManager.Start()
@@ -130,11 +139,13 @@ func main() {
canaryManager.Stop()
httpServer.Stop()
dynamicConfigWatcher.Close()
_ = producerClient.Close()
_ = consumerClient.Close()

glog.Infof("Strimzi canary stopped")
}

func newClient(canaryConfig *config.CanaryConfig) (sarama.Client, error) {
func createSaramaConfig(canaryConfig *config.CanaryConfig) (*sarama.Config, error) {
config := sarama.NewConfig()
kafkaVersion, err := sarama.ParseKafkaVersion(canaryConfig.KafkaVersion)
if err != nil {
@@ -162,6 +173,10 @@ func newClient(canaryConfig *config.CanaryConfig) (sarama.Client, error) {
}
}

return config, nil
}

func newClientWithRetry(canaryConfig *config.CanaryConfig, config *sarama.Config) (sarama.Client, error) {
backoff := services.NewBackoff(canaryConfig.BootstrapBackoffMaxAttempts, canaryConfig.BootstrapBackoffScale*time.Millisecond, services.MaxDefault)
for {
client, clientErr := sarama.NewClient(canaryConfig.BootstrapServers, config)
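The body of newClientWithRetry is cut off in this view; its job is to create a Sarama client with a bounded, backed-off retry, and main now calls it twice so the producer and consumer get separate clients (per the changelog, this is what makes the latency measurement reliable). A rough standalone sketch of the retry idea, using a simple exponential delay rather than the canary's Backoff helper:

package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

// newClientWithSimpleRetry mirrors the intent of the canary's newClientWithRetry,
// not its exact implementation: try to connect, back off, and give up after maxAttempts.
func newClientWithSimpleRetry(brokers []string, cfg *sarama.Config, maxAttempts int) (sarama.Client, error) {
	delay := 500 * time.Millisecond
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		client, err := sarama.NewClient(brokers, cfg)
		if err == nil {
			return client, nil
		}
		lastErr = err
		fmt.Printf("connection attempt %d failed: %v, retrying in %s\n", attempt, err, delay)
		time.Sleep(delay)
		delay *= 2
	}
	return nil, fmt.Errorf("could not connect after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	cfg := sarama.NewConfig()
	producerClient, err := newClientWithSimpleRetry([]string{"my-cluster-kafka-bootstrap:9092"}, cfg, 5)
	if err != nil {
		panic(err)
	}
	defer producerClient.Close()
	// A second, independent client would be created the same way for the consumer.
}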
2 changes: 1 addition & 1 deletion install/020-Deployment.yaml
@@ -17,7 +17,7 @@ spec:
serviceAccountName: strimzi-canary
containers:
- name: strimzi-canary
image: quay.io/strimzi/canary:0.2.0
image: quay.io/strimzi/canary:0.3.0
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: my-cluster-kafka-bootstrap:9092
4 changes: 2 additions & 2 deletions internal/config/canary_config.go
@@ -58,8 +58,8 @@ const (
ReconcileIntervalDefault = 30000
ClientIDDefault = "strimzi-canary-client"
ConsumerGroupIDDefault = "strimzi-canary-group"
ProducerLatencyBucketsDefault = "100,200,400,800,1600"
EndToEndLatencyBucketsDefault = "100,200,400,800,1600"
ProducerLatencyBucketsDefault = "2,5,10,20,50,100,200,400"
EndToEndLatencyBucketsDefault = "5,10,20,50,100,200,400,800"
ExpectedClusterSizeDefault = -1 // "dynamic" reassignment is enabled
KafkaVersionDefault = "3.1.0"
SaramaLogEnabledDefault = false
71 changes: 63 additions & 8 deletions internal/services/topic.go
@@ -219,10 +219,12 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
func (ts *TopicService) Close() {
glog.Infof("Closing topic service")

if err := ts.admin.Close(); err != nil {
glog.Fatalf("Error closing the Sarama cluster admin: %v", err)
if ts.admin != nil {
if err := ts.admin.Close(); err != nil {
glog.Fatalf("Error closing the Sarama cluster admin: %v", err)
}
ts.admin = nil
}
ts.admin = nil
glog.Infof("Topic service closed")
}

@@ -312,14 +314,67 @@ func (ts *TopicService) requestedAssignments(currentPartitions int, brokers []*s
return brokers[i].ID() < brokers[j].ID()
})

assignments := make(map[int32][]int32, int(partitions))
// now adjust the broker ordering to produce a rack alternated list.
// if the brokers have no rack information (or only some brokers have it) this next part has no effect.
//
// for example:
// assuming: 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
// rackMap contains:
// rack1: {0, 5}
// rack2: {3, 4}
// rack3: {1, 2}
// rack alternated broker list:
// 0, 3, 1, 5, 4, 2

rackMap := make(map[string][]*sarama.Broker)
var rackNames []string
brokersWithRack := 0
for _, broker := range brokers {
if broker.Rack() != "" {
brokersWithRack++
if _, ok := rackMap[broker.Rack()]; !ok {
rackMap[broker.Rack()] = make([]*sarama.Broker, 0)
rackNames = append(rackNames, broker.Rack())
}
rackMap[broker.Rack()] = append(rackMap[broker.Rack()], broker)
}
}

if len(brokers) != brokersWithRack {
if brokersWithRack > 0 {
glog.Warningf("Not *all* brokers have rack assignments (%d/%d), topic %s will be created without rack awareness", brokersWithRack, len(brokers), ts.canaryConfig.Topic)
}
} else {
index := 0

for ;; {
again := false

for _, rackName := range rackNames {
brokerList := rackMap[rackName]
if len(brokerList) > 0 {
var head *sarama.Broker
head, rackMap[rackName] = brokerList[0], brokerList[1:]
brokers[index] = head
index++
again = true
}
}

if !again {
break
}
}
}

assignments := make(map[int32][]int32, partitions)
for p := 0; p < int(partitions); p++ {
assignments[int32(p)] = make([]int32, int(replicationFactor))
assignments[int32(p)] = make([]int32, replicationFactor)
k := p
for r := 0; r < int(replicationFactor); r++ {
for r := 0; r < replicationFactor; r++ {
// get brokers ID for assignment from the brokers list and not using
// just a monotonic increasing index because there could be "hole" (a broker down)
assignments[int32(p)][r] = brokers[int32(k%int(brokersNumber))].ID()
assignments[int32(p)][r] = brokers[int32(k%brokersNumber)].ID()
k++
}
}
@@ -338,7 +393,7 @@ func (ts *TopicService) currentAssignments(topicMetadata *sarama.TopicMetadata)

// Alter the replica assignment for the partitions
//
// After the request for the replica assignement, it run a loop for checking if the reassignment is still ongoing
// After requesting the replica assignment, it runs a loop to check whether the reassignment is still ongoing
// It returns when the reassignment is done or there is an error
func (ts *TopicService) alterAssignments(assignments [][]int32) error {
if err := ts.admin.AlterPartitionReassignments(ts.canaryConfig.Topic, assignments); err != nil {
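The new rack-awareness code in requestedAssignments first reorders the broker list so that consecutive entries come from different racks, then assigns partition replicas round-robin over that list. Here is a small self-contained sketch of the interleaving step applied to the example in the comment above, using plain broker IDs and an explicit rack order instead of *sarama.Broker values, so it illustrates the idea rather than reproducing the exact code:

package main

import "fmt"

func main() {
	// Example from the comment: 0 -> rack1, 1 -> rack3, 2 -> rack3,
	// 3 -> rack2, 4 -> rack2, 5 -> rack1.
	rackMap := map[string][]int32{
		"rack1": {0, 5},
		"rack2": {3, 4},
		"rack3": {1, 2},
	}
	rackNames := []string{"rack1", "rack2", "rack3"}

	ordered := make([]int32, 0, 6)
	// Repeatedly take the head of each rack's queue until every queue is empty.
	for {
		again := false
		for _, rack := range rackNames {
			if len(rackMap[rack]) > 0 {
				ordered = append(ordered, rackMap[rack][0])
				rackMap[rack] = rackMap[rack][1:]
				again = true
			}
		}
		if !again {
			break
		}
	}
	fmt.Println(ordered) // [0 3 1 5 4 2]
}

With this ordering, the replicas of each partition land on brokers in different racks whenever the replication factor does not exceed the number of racks.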
176 changes: 176 additions & 0 deletions internal/services/topic_test.go
@@ -0,0 +1,176 @@
//
// Copyright Strimzi authors.
// License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
//

// +build unit_test

// Package services defines an interface for canary services and related implementations

package services

import (
"fmt"
"github.com/Shopify/sarama"
"github.com/strimzi/strimzi-canary/internal/config"
"math/rand"
"reflect"
"testing"
"time"
"unsafe"
)

func TestRequestedAssignments(t *testing.T) {
var tests = []struct {
name string
numPartitions int
numBrokers int
useRack bool
brokersWithMultipleLeaders []int32
expectedMinISR int
}{
{"one broker", 1, 1, false, []int32{}, 1},
{"three brokers without rack info", 3, 3, false, []int32{}, 2},
{"fewer brokers than partitions", 3, 2, false, []int32{0}, 1},
{"six brokers with rack info", 6, 6, true, []int32{}, 2},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &config.CanaryConfig{
Topic: "test",
}

brokers, brokerMap := createBrokers(t, tt.numBrokers, tt.useRack)

ts := NewTopicService(cfg, nil)

assignments, minISR := ts.requestedAssignments(tt.numPartitions, brokers)

if tt.expectedMinISR != minISR {
t.Errorf("unexpected minISR, got = %d, want = %d", minISR, tt.expectedMinISR)
}

for _, brokerIds := range assignments {
duplicatedBrokers := make(map[int32]int)

for _, brokerId := range brokerIds {
if _, ok := duplicatedBrokers[brokerId]; !ok {
duplicatedBrokers[brokerId] = 1
} else {
duplicatedBrokers[brokerId] = duplicatedBrokers[brokerId] + 1
}

}

for brokerId, count := range duplicatedBrokers {
if count > 1 {
t.Errorf("partition is replicated on same broker (%d) more than once (%d)", brokerId, count)
}
}
}

leaderBrokers := make(map[int32]int)
for _, brokerIds := range assignments {

leaderBrokerId := brokerIds[0]
if _, ok := leaderBrokers[leaderBrokerId]; !ok {
leaderBrokers[leaderBrokerId] = 1
} else {
leaderBrokers[leaderBrokerId] = leaderBrokers[leaderBrokerId] + 1
}
}

for brokerId, count := range leaderBrokers {
if count > 1 {
found := false
for _, expectedBrokerId := range tt.brokersWithMultipleLeaders {
if expectedBrokerId == brokerId {
found = true
break
}
}

if !found {
t.Errorf("broker %d is leader for more than one partition (%d)", brokerId, count)
}
}
}

if tt.useRack {
for i, brokerIds := range assignments {
rackBrokerId := make(map[string][]int32)
for _, brokerId := range brokerIds {
broker := brokerMap[brokerId]
_, ok := rackBrokerId[broker.Rack()]
if !ok {
rackBrokerId[broker.Rack()] = make([]int32, 0)
}
rackBrokerId[broker.Rack()] = append(rackBrokerId[broker.Rack()], broker.ID())
}

for rack, brokerIds := range rackBrokerId {
if len(brokerIds) > 1 {
t.Errorf("partition %d has been assigned to %d brokers %v in rackBrokerId %s", i, len(brokerIds), brokerIds, rack)
}
}
}
}

ts.Close()
})
}

}

func createBrokers(t *testing.T, num int, rack bool) ([]*sarama.Broker, map[int32]*sarama.Broker) {
brokers := make([]*sarama.Broker, 0)
brokerMap := make(map[int32]*sarama.Broker)
for i := 0; i < num ; i++ {
broker := &sarama.Broker{}

setBrokerID(t, broker, i)

brokers = append(brokers, broker)
brokerMap[broker.ID()] = broker
}

rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(brokers), func(i, j int) { brokers[i], brokers[j] = brokers[j], brokers[i] })

if rack {
rackNames := make([]string, 3)
for i, _ := range rackNames {
rackNames[i] = fmt.Sprintf("useRack%d", i)
}

for i, broker := range brokers {
rackName := rackNames[i%3]
setBrokerRack(t, broker, rackName)
}
}

return brokers, brokerMap
}

func setBrokerID(t *testing.T, broker *sarama.Broker, i int) {
idV := reflect.ValueOf(broker).Elem().FieldByName("id")
setUnexportedField(idV, int32(i))
if int32(i) != broker.ID() {
t.Errorf("failed to set id by reflection")
}
}

func setBrokerRack(t *testing.T, broker *sarama.Broker, rackName string) {
rackV := reflect.ValueOf(broker).Elem().FieldByName("rack")
setUnexportedField(rackV, &rackName)
if rackName != broker.Rack() {
t.Errorf("failed to set useRack by reflection")
}
}

func setUnexportedField(field reflect.Value, value interface{}) {
reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).
Elem().
Set(reflect.ValueOf(value))
}
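Note that the new tests are guarded by the `unit_test` build tag, so presumably they are run with something like `go test -tags unit_test ./internal/services/...`; the exact test target used by the project is not shown in this diff.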