From 0d9da136d0df8e56e6ba543c0fc1062dd31791f6 Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Sat, 10 Jun 2023 19:31:58 +0300 Subject: [PATCH 01/10] Allow concurrent broker restarts from same AZ (broker rack) --- .../banzaicloud_v1beta1_kafkacluster.yaml | 8 + controllers/tests/common_test.go | 2 +- go.mod | 1 + go.sum | 1 + pkg/kafkaclient/provider.go | 11 + pkg/resources/kafka/kafka.go | 130 ++++++- pkg/resources/kafka/kafka_test.go | 354 ++++++++++++++++++ pkg/resources/kafka/mocks/KafkaClient.go | 138 +++++++ .../kafka/mocks/SubResourceClient.go | 122 ++++++ 9 files changed, 753 insertions(+), 14 deletions(-) create mode 100644 pkg/resources/kafka/mocks/KafkaClient.go create mode 100644 pkg/resources/kafka/mocks/SubResourceClient.go diff --git a/config/samples/banzaicloud_v1beta1_kafkacluster.yaml b/config/samples/banzaicloud_v1beta1_kafkacluster.yaml index 47aefbd0a..24ec8c0b7 100644 --- a/config/samples/banzaicloud_v1beta1_kafkacluster.yaml +++ b/config/samples/banzaicloud_v1beta1_kafkacluster.yaml @@ -55,6 +55,14 @@ spec: # alerts with 'rollingupgrade' # failureThreshold: 1 + # concurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If + # it is set to a value greater than 1, the operator will restart up to that amount of brokers in parallel, if the + # brokers are within the same AZ (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker + # racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than + # 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used. + # This is a safe way to speed up the rolling upgrade. + # concurrentBrokerRestartsAllowed: 1 + # brokerConfigGroups specifies multiple broker configs with unique name brokerConfigGroups: # Specify desired group name (eg., 'default_group') diff --git a/controllers/tests/common_test.go b/controllers/tests/common_test.go index c164a2498..15938c3e9 100644 --- a/controllers/tests/common_test.go +++ b/controllers/tests/common_test.go @@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster { CCJMXExporterConfig: "custom_property: custom_value", }, ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true", - RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartCountPerRack: 1}, }, } } diff --git a/go.mod b/go.mod index 00cbd7668..712619d4d 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect + github.com/stretchr/objx v0.5.0 // indirect golang.org/x/tools v0.7.0 // indirect ) diff --git a/go.sum b/go.sum index ca2457a75..604721f0c 100644 --- a/go.sum +++ b/go.sum @@ -433,6 +433,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= 
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/pkg/kafkaclient/provider.go b/pkg/kafkaclient/provider.go index 4432ad34d..7a71372c8 100644 --- a/pkg/kafkaclient/provider.go +++ b/pkg/kafkaclient/provider.go @@ -15,6 +15,7 @@ package kafkaclient import ( + "github.com/stretchr/testify/mock" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/banzaicloud/koperator/api/v1beta1" @@ -45,3 +46,13 @@ func NewDefaultProvider() Provider { func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) { return NewFromCluster(client, cluster) } + +// MockedProvider is a Testify mock for providing Kafka clients that can be mocks too +type MockedProvider struct { + mock.Mock +} + +func (m *MockedProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) { + args := m.Called(client, cluster) + return args.Get(0).(KafkaClient), args.Get(1).(func()), args.Error(2) +} diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 81d91a7ff..e58c0a13c 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "reflect" + "regexp" "sort" "strconv" "strings" @@ -86,12 +87,23 @@ const ( nonControllerBrokerReconcilePriority // controllerBrokerReconcilePriority the priority used for controller broker used to define its priority in the reconciliation order controllerBrokerReconcilePriority + + // defaultConcurrentBrokerRestartsAllowed the default number of brokers that can be restarted in parallel + defaultConcurrentBrokerRestartsAllowed = 1 +) + +var ( + // kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs + kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`) ) // Reconciler implements the Component Reconciler type Reconciler struct { resources.Reconciler + // kafkaClientProvider is used to create a new KafkaClient kafkaClientProvider kafkaclient.Provider + // kafkaBrokerAvailabilityZoneMap is a map of broker id to availability zone used in concurrent broker restarts logic + kafkaBrokerAvailabilityZoneMap map[int32]string } // New creates a new reconciler for Kafka @@ -102,9 +114,39 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk DirectClient: directClient, KafkaCluster: cluster, }, - kafkaClientProvider: kafkaClientProvider, + kafkaClientProvider: kafkaClientProvider, + kafkaBrokerAvailabilityZoneMap: getBrokerAzMap(cluster), + } +} + +func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string { + brokerAzMap := make(map[int32]string) + for _, broker := range cluster.Spec.Brokers { + brokerRack := getBrokerRack(broker.ReadOnlyConfig) + if brokerRack != "" { + brokerAzMap[broker.Id] = brokerRack + } + } + // if incomplete broker AZ information, consider all brokers as being in different AZs + if len(brokerAzMap) != len(cluster.Spec.Brokers) { + for _, broker := range cluster.Spec.Brokers { + brokerAzMap[broker.Id] = fmt.Sprintf("%d", broker.Id) + } + } + return brokerAzMap +} + +func getBrokerRack(readOnlyConfig string) string { + if readOnlyConfig == "" { + return "" + } + match := kafkaConfigBrokerRackRegex.FindStringSubmatch(readOnlyConfig) + if len(match) == 2 { + return match[1] } + return "" } + func getCreatedPvcForBroker( ctx context.Context, c 
client.Reader, @@ -874,18 +916,23 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo if err != nil { return errors.WrapIf(err, "failed to reconcile resource") } - for _, pod := range podList.Items { - pod := pod - if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) { - return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating"), "rolling upgrade in progress") - } - if k8sutil.IsPodContainsPendingContainer(&pod) { - return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still creating"), "rolling upgrade in progress") - } + if len(podList.Items) < len(r.KafkaCluster.Spec.Brokers) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod count differs from brokers spec"), "rolling upgrade in progress") } - errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount + // Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once + concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed() + terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items) + if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") + } + currentPodAz := r.getBrokerAz(currentPod) + if concurrentBrokerRestartsAllowed > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating or creating from another AZ"), "rolling upgrade in progress") + } + // Check broker count with out-of-sync and offline replicas against the rolling upgrade failure threshold + errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster) if err != nil { return errorfactory.New(errorfactory.BrokersUnreachable{}, err, "could not connect to kafka brokers") @@ -905,7 +952,6 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo if len(outOfSyncReplicas) > 0 { log.Info("out-of-sync replicas", "IDs", outOfSyncReplicas) } - impactedReplicas := make(map[int32]struct{}) for _, brokerID := range allOfflineReplicas { impactedReplicas[brokerID] = struct{}{} @@ -913,12 +959,17 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo for _, brokerID := range outOfSyncReplicas { impactedReplicas[brokerID] = struct{}{} } - errorCount += len(impactedReplicas) - if errorCount >= r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("cluster is not healthy"), "rolling upgrade in progress") } + + // If multiple concurrent restarts and broker failures allowed, restart only brokers from the same AZ + if concurrentBrokerRestartsAllowed > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 { + if r.existsFailedBrokerFromAnotherRack(currentPodAz, impactedReplicas) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("broker is not healthy from another AZ"), "rolling upgrade in progress") + } + } } } @@ -940,6 +991,38 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return nil } +func (r *Reconciler) 
existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}) bool { + if currentPodAz == "" && len(impactedReplicas) > 0 { + return true + } + for brokerWithFailure := range impactedReplicas { + if currentPodAz != r.kafkaBrokerAvailabilityZoneMap[brokerWithFailure] { + return true + } + } + return false +} + +func (r *Reconciler) getConcurrentBrokerRestartsAllowed() int { + if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 { + return r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack + } + return defaultConcurrentBrokerRestartsAllowed +} + +func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod) bool { + if currentPodAz == "" && len(terminatingOrPendingPods) > 0 { + return true + } + for _, terminatingOrPendingPod := range terminatingOrPendingPods { + if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod) { + return true + } + } + return false +} + +//nolint:funlen func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error { brokersVolumesState := make(map[string]map[string]v1beta1.VolumeState) var brokerIds []string @@ -1289,6 +1372,27 @@ func (r *Reconciler) determineControllerId() (int32, error) { return controllerID, nil } +func getPodsInTerminatingOrPendingState(items []corev1.Pod) []corev1.Pod { + var pods []corev1.Pod + for _, pod := range items { + if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) { + pods = append(pods, pod) + } + if k8sutil.IsPodContainsPendingContainer(&pod) { + pods = append(pods, pod) + } + } + return pods +} + +func (r *Reconciler) getBrokerAz(pod *corev1.Pod) string { + brokerId, err := strconv.ParseInt(pod.Labels[v1beta1.BrokerIdLabelKey], 10, 32) + if err != nil { + return "" + } + return r.kafkaBrokerAvailabilityZoneMap[int32(brokerId)] +} + func getServiceFromExternalListener(client client.Client, cluster *v1beta1.KafkaCluster, eListenerName string, ingressConfigName string) (*corev1.Service, error) { foundLBService := &corev1.Service{} diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index a1a5cda40..7600fd848 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -18,6 +18,7 @@ import ( "context" "reflect" "testing" + "time" "emperror.dev/errors" "github.com/go-logr/logr" @@ -25,6 +26,9 @@ import ( "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/banzaicloud/koperator/pkg/kafkaclient" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -950,3 +954,353 @@ func TestGetServerPasswordKeysAndUsers(t *testing.T) { //nolint funlen }) } } + +// nolint funlen +func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { + t.Parallel() + testCases := []struct { + testName string + kafkaCluster v1beta1.KafkaCluster + desiredPod *corev1.Pod + currentPod *corev1.Pod + pods []corev1.Pod + allOfflineReplicas []int32 + outOfSyncReplicas []int32 + errorExpected bool + }{ + { + testName: "Pod is not deleted if pod list count different from spec", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 101}, {Id: 201}, {Id: 301}}, + }, + Status: v1beta1.KafkaClusterStatus{State: 
v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{}, + pods: []corev1.Pod{}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if allowed concurrent restarts not specified (default=1) and another pod is restarting", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 101}, {Id: 201}, {Id: 301}}, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if allowed concurrent restarts equals pods restarting", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 101}, {Id: 201}, {Id: 301}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if broker.rack is not set in all read-only configs, if another pod is restarting", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 101}, {Id: 102}, {Id: 201}, {Id: 102}, {Id: 301}, {Id: 302}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if broker.rack is not set in some read-only configs, if another pod is restarting", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: 
v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: ""}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: ""}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: ""}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if allowed concurrent restarts is not specified and failure threshold is reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 1, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + outOfSyncReplicas: []int32{101}, + errorExpected: true, + }, + { + testName: "Pod is deleted if allowed concurrent restarts is not specified and failure threshold is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + 
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 1, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + errorExpected: false, + }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, even if allowed concurrent restarts is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + outOfSyncReplicas: []int32{201}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if failure is in another AZ, even if allowed concurrent restarts is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: 
metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + outOfSyncReplicas: []int32{201}, + errorExpected: true, + }, + { + testName: "Pod is deleted if failure is in same AZ and allowed concurrent restarts is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + outOfSyncReplicas: []int32{101}, + errorExpected: false, + }, + } + + mockCtrl := gomock.NewController(t) + + for _, test := range testCases { + mockClient := mocks.NewMockClient(mockCtrl) + mockKafkaClientProvider := new(kafkaclient.MockedProvider) + + t.Run(test.testName, func(t *testing.T) { + r := New(mockClient, nil, &test.kafkaCluster, mockKafkaClientProvider) + + // Mock client + mockClient.EXPECT().List( + context.TODO(), + gomock.AssignableToTypeOf(&corev1.PodList{}), + client.InNamespace("kafka"), + gomock.Any(), + ).Do(func(ctx context.Context, list *corev1.PodList, opts ...client.ListOption) { + list.Items = test.pods + }).Return(nil) + if !test.errorExpected { + mockClient.EXPECT().Delete(context.TODO(), test.currentPod).Return(nil) + } + + // Mock kafka client + mockedKafkaClient := new(mocks.KafkaClient) + mockedKafkaClient.On("AllOfflineReplicas").Return(test.outOfSyncReplicas, nil) + mockedKafkaClient.On("OutOfSyncReplicas").Return(test.outOfSyncReplicas, nil) + mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil) + + // Call the handleRollingUpgrade 
function with the provided test.desiredPod and test.currentPod + err := r.handleRollingUpgrade(logf.Log, test.desiredPod, test.currentPod, reflect.TypeOf(test.desiredPod)) + + // Test that the expected error is returned + if test.errorExpected { + assert.NotNil(t, err, "Expected an error but got nil") + } else { + assert.Nil(t, err, "Expected no error but got one") + } + }) + } +} diff --git a/pkg/resources/kafka/mocks/KafkaClient.go b/pkg/resources/kafka/mocks/KafkaClient.go new file mode 100644 index 000000000..efcb719e8 --- /dev/null +++ b/pkg/resources/kafka/mocks/KafkaClient.go @@ -0,0 +1,138 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import ( + "github.com/Shopify/sarama" + "github.com/stretchr/testify/mock" + + "github.com/banzaicloud/koperator/api/v1alpha1" + "github.com/banzaicloud/koperator/pkg/kafkaclient" +) + +// KafkaClient is a Testify mock for Kafka client +type KafkaClient struct { + mock.Mock +} + +func (k *KafkaClient) NumBrokers() int { + args := k.Called() + return args.Int(0) +} + +func (k *KafkaClient) ListTopics() (map[string]sarama.TopicDetail, error) { + args := k.Called() + return args.Get(0).(map[string]sarama.TopicDetail), args.Error(1) +} + +func (k *KafkaClient) CreateTopic(createTopicOptions *kafkaclient.CreateTopicOptions) error { + args := k.Called(createTopicOptions) + return args.Error(0) +} + +func (k *KafkaClient) EnsurePartitionCount(topic string, partitionCount int32) (bool, error) { + args := k.Called(topic, partitionCount) + return args.Bool(0), args.Error(1) +} + +func (k *KafkaClient) EnsureTopicConfig(topic string, config map[string]*string) error { + args := k.Called(topic, config) + return args.Error(0) +} + +func (k *KafkaClient) DeleteTopic(topic string, force bool) error { + args := k.Called(topic, force) + return args.Error(0) +} + +func (k *KafkaClient) GetTopic(topic string) (*sarama.TopicDetail, error) { + args := k.Called(topic) + return args.Get(0).(*sarama.TopicDetail), args.Error(1) +} + +func (k *KafkaClient) DescribeTopic(topic string) (*sarama.TopicMetadata, error) { + args := k.Called(topic) + return args.Get(0).(*sarama.TopicMetadata), args.Error(1) +} + +func (k *KafkaClient) CreateUserACLs(accessType v1alpha1.KafkaAccessType, patternType v1alpha1.KafkaPatternType, dn string, topic string) (err error) { + args := k.Called(accessType, patternType, dn, topic) + return args.Error(0) +} + +func (k *KafkaClient) ListUserACLs() (acls []sarama.ResourceAcls, err error) { + args := k.Called() + return args.Get(0).([]sarama.ResourceAcls), args.Error(1) +} + +func (k *KafkaClient) DeleteUserACLs(dn string, patternType v1alpha1.KafkaPatternType) (err error) { + args := k.Called(dn, patternType) + return args.Error(0) +} + +func (k *KafkaClient) Brokers() map[int32]string { + args := k.Called() + return args.Get(0).(map[int32]string) +} + +func (k *KafkaClient) DescribeCluster() ([]*sarama.Broker, int32, error) { + args := k.Called() + return 
args.Get(0).([]*sarama.Broker), args.Get(1).(int32), args.Error(2) +} + +func (k *KafkaClient) AllOfflineReplicas() ([]int32, error) { + args := k.Called() + return args.Get(0).([]int32), args.Error(1) +} + +func (k *KafkaClient) OutOfSyncReplicas() ([]int32, error) { + args := k.Called() + return args.Get(0).([]int32), args.Error(1) +} + +func (k *KafkaClient) AlterPerBrokerConfig(brokerId int32, config map[string]*string, validateOnly bool) error { + args := k.Called(brokerId, config, validateOnly) + return args.Error(0) +} + +func (k *KafkaClient) DescribePerBrokerConfig(brokerId int32, configNames []string) ([]*sarama.ConfigEntry, error) { + args := k.Called(brokerId, configNames) + return args.Get(0).([]*sarama.ConfigEntry), args.Error(1) +} + +func (k *KafkaClient) AlterClusterWideConfig(config map[string]*string, validateOnly bool) error { + args := k.Called(config, validateOnly) + return args.Error(0) +} + +func (k *KafkaClient) DescribeClusterWideConfig() ([]sarama.ConfigEntry, error) { + args := k.Called() + return args.Get(0).([]sarama.ConfigEntry), args.Error(1) +} + +func (k *KafkaClient) TopicMetaToStatus(topicMeta *sarama.TopicMetadata) *v1alpha1.KafkaTopicStatus { + args := k.Called(topicMeta) + return args.Get(0).(*v1alpha1.KafkaTopicStatus) +} + +func (k *KafkaClient) Open() error { + args := k.Called() + return args.Error(0) +} + +func (k *KafkaClient) Close() error { + args := k.Called() + return args.Error(0) +} diff --git a/pkg/resources/kafka/mocks/SubResourceClient.go b/pkg/resources/kafka/mocks/SubResourceClient.go new file mode 100644 index 000000000..a8ff33d76 --- /dev/null +++ b/pkg/resources/kafka/mocks/SubResourceClient.go @@ -0,0 +1,122 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import ( + "context" + "reflect" + + "github.com/golang/mock/gomock" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// MockSubResourceClient is a mock of SubResourceClient interface. +type MockSubResourceClient struct { + ctrl *gomock.Controller + recorder *MockSubResourceClientMockRecorder +} + +// MockSubResourceClientMockRecorder is the mock recorder for MockSubResourceClient. +type MockSubResourceClientMockRecorder struct { + mock *MockSubResourceClient +} + +// NewMockSubResourceClient creates a new mock instance. +func NewMockSubResourceClient(ctrl *gomock.Controller) *MockSubResourceClient { + mock := &MockSubResourceClient{ctrl: ctrl} + mock.recorder = &MockSubResourceClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSubResourceClient) EXPECT() *MockSubResourceClientMockRecorder { + return m.recorder +} + +// Create mocks base method. 
+func (m *MockSubResourceClient) Create(arg0 context.Context, arg1, arg2 client.Object, arg3 ...client.SubResourceCreateOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Create", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Create indicates an expected call of Create. +func (mr *MockSubResourceClientMockRecorder) Create(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockSubResourceClient)(nil).Create), varargs...) +} + +// Get mocks base method. +func (m *MockSubResourceClient) Get(arg0 context.Context, arg1, arg2 client.Object, arg3 ...client.SubResourceGetOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Get", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Get indicates an expected call of Get. +func (mr *MockSubResourceClientMockRecorder) Get(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockSubResourceClient)(nil).Get), varargs...) +} + +// Patch mocks base method. +func (m *MockSubResourceClient) Patch(arg0 context.Context, arg1 client.Object, arg2 client.Patch, arg3 ...client.SubResourcePatchOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Patch", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Patch indicates an expected call of Patch. +func (mr *MockSubResourceClientMockRecorder) Patch(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Patch", reflect.TypeOf((*MockSubResourceClient)(nil).Patch), varargs...) +} + +// Update mocks base method. +func (m *MockSubResourceClient) Update(arg0 context.Context, arg1 client.Object, arg2 ...client.SubResourceUpdateOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Update", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Update indicates an expected call of Update. +func (mr *MockSubResourceClientMockRecorder) Update(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockSubResourceClient)(nil).Update), varargs...) 
+} From 29e0dc3e074011a1d309616fe131bb4a6e70a09d Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Fri, 21 Jul 2023 15:00:00 +0300 Subject: [PATCH 02/10] Address PR comments - part 1 --- pkg/resources/kafka/kafka.go | 53 ++++++------- pkg/resources/kafka/kafka_test.go | 125 +++++++++++++++++++++++++++++- 2 files changed, 144 insertions(+), 34 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index e58c0a13c..48e2da619 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -24,6 +24,7 @@ import ( "strings" "emperror.dev/errors" + properties "github.com/banzaicloud/koperator/properties/pkg" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -87,9 +88,6 @@ const ( nonControllerBrokerReconcilePriority // controllerBrokerReconcilePriority the priority used for controller broker used to define its priority in the reconciliation order controllerBrokerReconcilePriority - - // defaultConcurrentBrokerRestartsAllowed the default number of brokers that can be restarted in parallel - defaultConcurrentBrokerRestartsAllowed = 1 ) var ( @@ -100,10 +98,7 @@ var ( // Reconciler implements the Component Reconciler type Reconciler struct { resources.Reconciler - // kafkaClientProvider is used to create a new KafkaClient kafkaClientProvider kafkaclient.Provider - // kafkaBrokerAvailabilityZoneMap is a map of broker id to availability zone used in concurrent broker restarts logic - kafkaBrokerAvailabilityZoneMap map[int32]string } // New creates a new reconciler for Kafka @@ -114,17 +109,19 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk DirectClient: directClient, KafkaCluster: cluster, }, - kafkaClientProvider: kafkaClientProvider, - kafkaBrokerAvailabilityZoneMap: getBrokerAzMap(cluster), + kafkaClientProvider: kafkaClientProvider, } } func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string { brokerAzMap := make(map[int32]string) for _, broker := range cluster.Spec.Brokers { - brokerRack := getBrokerRack(broker.ReadOnlyConfig) - if brokerRack != "" { - brokerAzMap[broker.Id] = brokerRack + readOnlyConfigs, err := properties.NewFromString(broker.ReadOnlyConfig) + if err == nil { + brokerRack, brokerRackConfigFound := readOnlyConfigs.Get("broker.rack") + if brokerRackConfigFound { + brokerAzMap[broker.Id] = brokerRack.Value() + } } } // if incomplete broker AZ information, consider all brokers as being in different AZs @@ -916,18 +913,19 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo if err != nil { return errors.WrapIf(err, "failed to reconcile resource") } + // Check that all pods are present as in spec, before checking for terminating or pending pods, as we can have absent pods if len(podList.Items) < len(r.KafkaCluster.Spec.Brokers) { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod count differs from brokers spec"), "rolling upgrade in progress") } // Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once - concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed() terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items) - if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed { - return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") 
+ if len(terminatingOrPendingPods) >= r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") } - currentPodAz := r.getBrokerAz(currentPod) - if concurrentBrokerRestartsAllowed > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods) { + kafkaBrokerAvailabilityZoneMap := getBrokerAzMap(r.KafkaCluster) + currentPodAz := r.getBrokerAz(currentPod, kafkaBrokerAvailabilityZoneMap) + if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods, kafkaBrokerAvailabilityZoneMap) { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating or creating from another AZ"), "rolling upgrade in progress") } @@ -965,8 +963,8 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo } // If multiple concurrent restarts and broker failures allowed, restart only brokers from the same AZ - if concurrentBrokerRestartsAllowed > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 { - if r.existsFailedBrokerFromAnotherRack(currentPodAz, impactedReplicas) { + if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 { + if r.existsFailedBrokerFromAnotherRack(currentPodAz, impactedReplicas, kafkaBrokerAvailabilityZoneMap) { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("broker is not healthy from another AZ"), "rolling upgrade in progress") } } @@ -991,31 +989,24 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return nil } -func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}) bool { +func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}, kafkaBrokerAvailabilityZoneMap map[int32]string) bool { if currentPodAz == "" && len(impactedReplicas) > 0 { return true } for brokerWithFailure := range impactedReplicas { - if currentPodAz != r.kafkaBrokerAvailabilityZoneMap[brokerWithFailure] { + if currentPodAz != kafkaBrokerAvailabilityZoneMap[brokerWithFailure] { return true } } return false } -func (r *Reconciler) getConcurrentBrokerRestartsAllowed() int { - if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 { - return r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack - } - return defaultConcurrentBrokerRestartsAllowed -} - -func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod) bool { +func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod, kafkaBrokerAvailabilityZoneMap map[int32]string) bool { if currentPodAz == "" && len(terminatingOrPendingPods) > 0 { return true } for _, terminatingOrPendingPod := range terminatingOrPendingPods { - if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod) { + if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod, kafkaBrokerAvailabilityZoneMap) { return true } } @@ -1385,12 +1376,12 @@ func getPodsInTerminatingOrPendingState(items []corev1.Pod) []corev1.Pod { return 
pods } -func (r *Reconciler) getBrokerAz(pod *corev1.Pod) string { +func (r *Reconciler) getBrokerAz(pod *corev1.Pod, kafkaBrokerAvailabilityZoneMap map[int32]string) string { brokerId, err := strconv.ParseInt(pod.Labels[v1beta1.BrokerIdLabelKey], 10, 32) if err != nil { return "" } - return r.kafkaBrokerAvailabilityZoneMap[int32(brokerId)] + return kafkaBrokerAvailabilityZoneMap[int32(brokerId)] } func getServiceFromExternalListener(client client.Client, cluster *v1beta1.KafkaCluster, diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 7600fd848..817c5064d 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1125,7 +1125,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { errorExpected: true, }, { - testName: "Pod is deleted if allowed concurrent restarts is not specified and failure threshold is not reached", + testName: "Pod is deleted if allowed concurrent restarts is default and failure threshold is not reached", kafkaCluster: v1beta1.KafkaCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "kafka", @@ -1140,7 +1140,8 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ - FailureThreshold: 1, + FailureThreshold: 1, + ConcurrentBrokerRestartCountPerRack: 1, }, }, Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, @@ -1262,6 +1263,124 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { outOfSyncReplicas: []int32{101}, errorExpected: false, }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if there are out-of-sync replicas in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + 
desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + outOfSyncReplicas: []int32{101}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if there are offline replicas in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + allOfflineReplicas: []int32{101}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, if broker rack value contains dashes", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az-1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az-2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az-3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + errorExpected: true, + }, } mockCtrl := gomock.NewController(t) @@ -1288,7 +1407,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { // Mock kafka client mockedKafkaClient := new(mocks.KafkaClient) - mockedKafkaClient.On("AllOfflineReplicas").Return(test.outOfSyncReplicas, nil) + mockedKafkaClient.On("AllOfflineReplicas").Return(test.allOfflineReplicas, nil) mockedKafkaClient.On("OutOfSyncReplicas").Return(test.outOfSyncReplicas, nil) mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil) From 
523d62d40b5b2547ab74c7ce75b782b78db5d95d Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Wed, 26 Jul 2023 09:48:02 +0300 Subject: [PATCH 03/10] Address PR comments - part 2 --- pkg/resources/kafka/kafka.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 48e2da619..3414f9c63 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -924,7 +924,7 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") } kafkaBrokerAvailabilityZoneMap := getBrokerAzMap(r.KafkaCluster) - currentPodAz := r.getBrokerAz(currentPod, kafkaBrokerAvailabilityZoneMap) + currentPodAz, _ := r.getBrokerAz(currentPod, kafkaBrokerAvailabilityZoneMap) if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods, kafkaBrokerAvailabilityZoneMap) { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating or creating from another AZ"), "rolling upgrade in progress") } @@ -1006,7 +1006,8 @@ func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, term return true } for _, terminatingOrPendingPod := range terminatingOrPendingPods { - if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod, kafkaBrokerAvailabilityZoneMap) { + terminatingOrPendingPodAz, err := r.getBrokerAz(&terminatingOrPendingPod, kafkaBrokerAvailabilityZoneMap) + if err != nil || currentPodAz != terminatingOrPendingPodAz { return true } } @@ -1376,12 +1377,12 @@ func getPodsInTerminatingOrPendingState(items []corev1.Pod) []corev1.Pod { return pods } -func (r *Reconciler) getBrokerAz(pod *corev1.Pod, kafkaBrokerAvailabilityZoneMap map[int32]string) string { +func (r *Reconciler) getBrokerAz(pod *corev1.Pod, kafkaBrokerAvailabilityZoneMap map[int32]string) (string, error) { brokerId, err := strconv.ParseInt(pod.Labels[v1beta1.BrokerIdLabelKey], 10, 32) if err != nil { - return "" + return "", err } - return kafkaBrokerAvailabilityZoneMap[int32(brokerId)] + return kafkaBrokerAvailabilityZoneMap[int32(brokerId)], nil } func getServiceFromExternalListener(client client.Client, cluster *v1beta1.KafkaCluster, From 9a397073c2b6e06b8f353f6da7ac1308ca75dc22 Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Wed, 26 Jul 2023 10:57:50 +0300 Subject: [PATCH 04/10] Address PR comments - part 3 --- Makefile | 5 + pkg/resources/kafka/kafka_test.go | 36 +- pkg/resources/kafka/mocks/KafkaClient.go | 381 ++++++++++++++---- .../kafka/mocks/SubResourceClient.go | 122 ------ 4 files changed, 334 insertions(+), 210 deletions(-) delete mode 100644 pkg/resources/kafka/mocks/SubResourceClient.go diff --git a/Makefile b/Makefile index 142b5e3e6..0b12076a5 100644 --- a/Makefile +++ b/Makefile @@ -275,3 +275,8 @@ mock-generate: bin/mockgen -package mocks \ -destination pkg/resources/kafka/mocks/Client.go \ sigs.k8s.io/controller-runtime/pkg/client Client + $(BIN_DIR)/mockgen \ + -copyright_file $(BOILERPLATE_DIR)/header.generated.txt \ + -package mocks \ + -destination pkg/resources/kafka/mocks/KafkaClient.go \ + -source pkg/kafkaclient/client.go diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 
817c5064d..9b557a27c 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1121,8 +1121,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, - outOfSyncReplicas: []int32{101}, - errorExpected: true, + errorExpected: true, }, { testName: "Pod is deleted if allowed concurrent restarts is default and failure threshold is not reached", @@ -1156,7 +1155,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, - errorExpected: false, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{}, + errorExpected: false, }, { testName: "Pod is not deleted if pod is restarting in another AZ, even if allowed concurrent restarts is not reached", @@ -1190,8 +1191,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, - outOfSyncReplicas: []int32{201}, - errorExpected: true, + errorExpected: true, }, { testName: "Pod is not deleted if failure is in another AZ, even if allowed concurrent restarts is not reached", @@ -1225,8 +1225,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, - outOfSyncReplicas: []int32{201}, - errorExpected: true, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{201}, + errorExpected: true, }, { testName: "Pod is deleted if failure is in same AZ and allowed concurrent restarts is not reached", @@ -1260,8 +1261,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, - outOfSyncReplicas: []int32{101}, - errorExpected: false, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{101}, + errorExpected: false, }, { testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures", @@ -1319,8 +1321,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, - outOfSyncReplicas: []int32{101}, - errorExpected: true, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{101}, + errorExpected: true, }, { testName: "Pod is not deleted if there are offline replicas in another AZ, if brokers per AZ < tolerated failures", @@ -1350,6 +1353,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, allOfflineReplicas: []int32{101}, + outOfSyncReplicas: []int32{}, errorExpected: true, }, { @@ -1406,9 +1410,13 @@ func 
TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { } // Mock kafka client - mockedKafkaClient := new(mocks.KafkaClient) - mockedKafkaClient.On("AllOfflineReplicas").Return(test.allOfflineReplicas, nil) - mockedKafkaClient.On("OutOfSyncReplicas").Return(test.outOfSyncReplicas, nil) + mockedKafkaClient := mocks.NewMockKafkaClient(mockCtrl) + if test.allOfflineReplicas != nil { + mockedKafkaClient.EXPECT().AllOfflineReplicas().Return(test.allOfflineReplicas, nil) + } + if test.outOfSyncReplicas != nil { + mockedKafkaClient.EXPECT().OutOfSyncReplicas().Return(test.outOfSyncReplicas, nil) + } mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil) // Call the handleRollingUpgrade function with the provided test.desiredPod and test.currentPod diff --git a/pkg/resources/kafka/mocks/KafkaClient.go b/pkg/resources/kafka/mocks/KafkaClient.go index efcb719e8..e2b10acdd 100644 --- a/pkg/resources/kafka/mocks/KafkaClient.go +++ b/pkg/resources/kafka/mocks/KafkaClient.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// Copyright 2023 Cisco Systems, Inc. and/or its affiliates // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,128 +11,361 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/kafkaclient/client.go +// Package mocks is a generated GoMock package. package mocks import ( - "github.com/Shopify/sarama" - "github.com/stretchr/testify/mock" + reflect "reflect" - "github.com/banzaicloud/koperator/api/v1alpha1" - "github.com/banzaicloud/koperator/pkg/kafkaclient" + sarama "github.com/Shopify/sarama" + v1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1" + kafkaclient "github.com/banzaicloud/koperator/pkg/kafkaclient" + gomock "github.com/golang/mock/gomock" ) -// KafkaClient is a Testify mock for Kafka client -type KafkaClient struct { - mock.Mock +// MockKafkaClient is a mock of KafkaClient interface. +type MockKafkaClient struct { + ctrl *gomock.Controller + recorder *MockKafkaClientMockRecorder +} + +// MockKafkaClientMockRecorder is the mock recorder for MockKafkaClient. +type MockKafkaClientMockRecorder struct { + mock *MockKafkaClient +} + +// NewMockKafkaClient creates a new mock instance. +func NewMockKafkaClient(ctrl *gomock.Controller) *MockKafkaClient { + mock := &MockKafkaClient{ctrl: ctrl} + mock.recorder = &MockKafkaClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockKafkaClient) EXPECT() *MockKafkaClientMockRecorder { + return m.recorder +} + +// AllOfflineReplicas mocks base method. +func (m *MockKafkaClient) AllOfflineReplicas() ([]int32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AllOfflineReplicas") + ret0, _ := ret[0].([]int32) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AllOfflineReplicas indicates an expected call of AllOfflineReplicas. +func (mr *MockKafkaClientMockRecorder) AllOfflineReplicas() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllOfflineReplicas", reflect.TypeOf((*MockKafkaClient)(nil).AllOfflineReplicas)) +} + +// AlterClusterWideConfig mocks base method. 
+func (m *MockKafkaClient) AlterClusterWideConfig(arg0 map[string]*string, arg1 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AlterClusterWideConfig", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AlterClusterWideConfig indicates an expected call of AlterClusterWideConfig. +func (mr *MockKafkaClientMockRecorder) AlterClusterWideConfig(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AlterClusterWideConfig", reflect.TypeOf((*MockKafkaClient)(nil).AlterClusterWideConfig), arg0, arg1) +} + +// AlterPerBrokerConfig mocks base method. +func (m *MockKafkaClient) AlterPerBrokerConfig(arg0 int32, arg1 map[string]*string, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AlterPerBrokerConfig", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AlterPerBrokerConfig indicates an expected call of AlterPerBrokerConfig. +func (mr *MockKafkaClientMockRecorder) AlterPerBrokerConfig(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AlterPerBrokerConfig", reflect.TypeOf((*MockKafkaClient)(nil).AlterPerBrokerConfig), arg0, arg1, arg2) +} + +// Brokers mocks base method. +func (m *MockKafkaClient) Brokers() map[int32]string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Brokers") + ret0, _ := ret[0].(map[int32]string) + return ret0 +} + +// Brokers indicates an expected call of Brokers. +func (mr *MockKafkaClientMockRecorder) Brokers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Brokers", reflect.TypeOf((*MockKafkaClient)(nil).Brokers)) +} + +// Close mocks base method. +func (m *MockKafkaClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockKafkaClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockKafkaClient)(nil).Close)) +} + +// CreateTopic mocks base method. +func (m *MockKafkaClient) CreateTopic(arg0 *kafkaclient.CreateTopicOptions) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateTopic", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateTopic indicates an expected call of CreateTopic. +func (mr *MockKafkaClientMockRecorder) CreateTopic(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTopic", reflect.TypeOf((*MockKafkaClient)(nil).CreateTopic), arg0) +} + +// CreateUserACLs mocks base method. +func (m *MockKafkaClient) CreateUserACLs(arg0 v1alpha1.KafkaAccessType, arg1 v1alpha1.KafkaPatternType, arg2, arg3 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateUserACLs", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateUserACLs indicates an expected call of CreateUserACLs. +func (mr *MockKafkaClientMockRecorder) CreateUserACLs(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateUserACLs", reflect.TypeOf((*MockKafkaClient)(nil).CreateUserACLs), arg0, arg1, arg2, arg3) +} + +// DeleteTopic mocks base method. 
+func (m *MockKafkaClient) DeleteTopic(arg0 string, arg1 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteTopic", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTopic indicates an expected call of DeleteTopic. +func (mr *MockKafkaClientMockRecorder) DeleteTopic(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTopic", reflect.TypeOf((*MockKafkaClient)(nil).DeleteTopic), arg0, arg1) +} + +// DeleteUserACLs mocks base method. +func (m *MockKafkaClient) DeleteUserACLs(arg0 string, arg1 v1alpha1.KafkaPatternType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteUserACLs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteUserACLs indicates an expected call of DeleteUserACLs. +func (mr *MockKafkaClientMockRecorder) DeleteUserACLs(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteUserACLs", reflect.TypeOf((*MockKafkaClient)(nil).DeleteUserACLs), arg0, arg1) +} + +// DescribeCluster mocks base method. +func (m *MockKafkaClient) DescribeCluster() ([]*sarama.Broker, int32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeCluster") + ret0, _ := ret[0].([]*sarama.Broker) + ret1, _ := ret[1].(int32) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// DescribeCluster indicates an expected call of DescribeCluster. +func (mr *MockKafkaClientMockRecorder) DescribeCluster() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeCluster", reflect.TypeOf((*MockKafkaClient)(nil).DescribeCluster)) +} + +// DescribeClusterWideConfig mocks base method. +func (m *MockKafkaClient) DescribeClusterWideConfig() ([]sarama.ConfigEntry, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeClusterWideConfig") + ret0, _ := ret[0].([]sarama.ConfigEntry) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeClusterWideConfig indicates an expected call of DescribeClusterWideConfig. +func (mr *MockKafkaClientMockRecorder) DescribeClusterWideConfig() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeClusterWideConfig", reflect.TypeOf((*MockKafkaClient)(nil).DescribeClusterWideConfig)) } -func (k *KafkaClient) NumBrokers() int { - args := k.Called() - return args.Int(0) +// DescribePerBrokerConfig mocks base method. +func (m *MockKafkaClient) DescribePerBrokerConfig(arg0 int32, arg1 []string) ([]*sarama.ConfigEntry, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribePerBrokerConfig", arg0, arg1) + ret0, _ := ret[0].([]*sarama.ConfigEntry) + ret1, _ := ret[1].(error) + return ret0, ret1 } -func (k *KafkaClient) ListTopics() (map[string]sarama.TopicDetail, error) { - args := k.Called() - return args.Get(0).(map[string]sarama.TopicDetail), args.Error(1) +// DescribePerBrokerConfig indicates an expected call of DescribePerBrokerConfig. +func (mr *MockKafkaClientMockRecorder) DescribePerBrokerConfig(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribePerBrokerConfig", reflect.TypeOf((*MockKafkaClient)(nil).DescribePerBrokerConfig), arg0, arg1) } -func (k *KafkaClient) CreateTopic(createTopicOptions *kafkaclient.CreateTopicOptions) error { - args := k.Called(createTopicOptions) - return args.Error(0) +// DescribeTopic mocks base method. 
+func (m *MockKafkaClient) DescribeTopic(arg0 string) (*sarama.TopicMetadata, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeTopic", arg0) + ret0, _ := ret[0].(*sarama.TopicMetadata) + ret1, _ := ret[1].(error) + return ret0, ret1 } -func (k *KafkaClient) EnsurePartitionCount(topic string, partitionCount int32) (bool, error) { - args := k.Called(topic, partitionCount) - return args.Bool(0), args.Error(1) +// DescribeTopic indicates an expected call of DescribeTopic. +func (mr *MockKafkaClientMockRecorder) DescribeTopic(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeTopic", reflect.TypeOf((*MockKafkaClient)(nil).DescribeTopic), arg0) } -func (k *KafkaClient) EnsureTopicConfig(topic string, config map[string]*string) error { - args := k.Called(topic, config) - return args.Error(0) +// EnsurePartitionCount mocks base method. +func (m *MockKafkaClient) EnsurePartitionCount(arg0 string, arg1 int32) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EnsurePartitionCount", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 } -func (k *KafkaClient) DeleteTopic(topic string, force bool) error { - args := k.Called(topic, force) - return args.Error(0) +// EnsurePartitionCount indicates an expected call of EnsurePartitionCount. +func (mr *MockKafkaClientMockRecorder) EnsurePartitionCount(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsurePartitionCount", reflect.TypeOf((*MockKafkaClient)(nil).EnsurePartitionCount), arg0, arg1) } -func (k *KafkaClient) GetTopic(topic string) (*sarama.TopicDetail, error) { - args := k.Called(topic) - return args.Get(0).(*sarama.TopicDetail), args.Error(1) +// EnsureTopicConfig mocks base method. +func (m *MockKafkaClient) EnsureTopicConfig(arg0 string, arg1 map[string]*string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EnsureTopicConfig", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 } -func (k *KafkaClient) DescribeTopic(topic string) (*sarama.TopicMetadata, error) { - args := k.Called(topic) - return args.Get(0).(*sarama.TopicMetadata), args.Error(1) +// EnsureTopicConfig indicates an expected call of EnsureTopicConfig. +func (mr *MockKafkaClientMockRecorder) EnsureTopicConfig(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureTopicConfig", reflect.TypeOf((*MockKafkaClient)(nil).EnsureTopicConfig), arg0, arg1) } -func (k *KafkaClient) CreateUserACLs(accessType v1alpha1.KafkaAccessType, patternType v1alpha1.KafkaPatternType, dn string, topic string) (err error) { - args := k.Called(accessType, patternType, dn, topic) - return args.Error(0) +// GetTopic mocks base method. +func (m *MockKafkaClient) GetTopic(arg0 string) (*sarama.TopicDetail, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTopic", arg0) + ret0, _ := ret[0].(*sarama.TopicDetail) + ret1, _ := ret[1].(error) + return ret0, ret1 } -func (k *KafkaClient) ListUserACLs() (acls []sarama.ResourceAcls, err error) { - args := k.Called() - return args.Get(0).([]sarama.ResourceAcls), args.Error(1) +// GetTopic indicates an expected call of GetTopic. 
+func (mr *MockKafkaClientMockRecorder) GetTopic(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTopic", reflect.TypeOf((*MockKafkaClient)(nil).GetTopic), arg0) } -func (k *KafkaClient) DeleteUserACLs(dn string, patternType v1alpha1.KafkaPatternType) (err error) { - args := k.Called(dn, patternType) - return args.Error(0) +// ListTopics mocks base method. +func (m *MockKafkaClient) ListTopics() (map[string]sarama.TopicDetail, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListTopics") + ret0, _ := ret[0].(map[string]sarama.TopicDetail) + ret1, _ := ret[1].(error) + return ret0, ret1 } -func (k *KafkaClient) Brokers() map[int32]string { - args := k.Called() - return args.Get(0).(map[int32]string) +// ListTopics indicates an expected call of ListTopics. +func (mr *MockKafkaClientMockRecorder) ListTopics() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTopics", reflect.TypeOf((*MockKafkaClient)(nil).ListTopics)) } -func (k *KafkaClient) DescribeCluster() ([]*sarama.Broker, int32, error) { - args := k.Called() - return args.Get(0).([]*sarama.Broker), args.Get(1).(int32), args.Error(2) +// ListUserACLs mocks base method. +func (m *MockKafkaClient) ListUserACLs() ([]sarama.ResourceAcls, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListUserACLs") + ret0, _ := ret[0].([]sarama.ResourceAcls) + ret1, _ := ret[1].(error) + return ret0, ret1 } -func (k *KafkaClient) AllOfflineReplicas() ([]int32, error) { - args := k.Called() - return args.Get(0).([]int32), args.Error(1) +// ListUserACLs indicates an expected call of ListUserACLs. +func (mr *MockKafkaClientMockRecorder) ListUserACLs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListUserACLs", reflect.TypeOf((*MockKafkaClient)(nil).ListUserACLs)) } -func (k *KafkaClient) OutOfSyncReplicas() ([]int32, error) { - args := k.Called() - return args.Get(0).([]int32), args.Error(1) +// NumBrokers mocks base method. +func (m *MockKafkaClient) NumBrokers() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NumBrokers") + ret0, _ := ret[0].(int) + return ret0 } -func (k *KafkaClient) AlterPerBrokerConfig(brokerId int32, config map[string]*string, validateOnly bool) error { - args := k.Called(brokerId, config, validateOnly) - return args.Error(0) +// NumBrokers indicates an expected call of NumBrokers. +func (mr *MockKafkaClientMockRecorder) NumBrokers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NumBrokers", reflect.TypeOf((*MockKafkaClient)(nil).NumBrokers)) } -func (k *KafkaClient) DescribePerBrokerConfig(brokerId int32, configNames []string) ([]*sarama.ConfigEntry, error) { - args := k.Called(brokerId, configNames) - return args.Get(0).([]*sarama.ConfigEntry), args.Error(1) +// Open mocks base method. +func (m *MockKafkaClient) Open() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open") + ret0, _ := ret[0].(error) + return ret0 } -func (k *KafkaClient) AlterClusterWideConfig(config map[string]*string, validateOnly bool) error { - args := k.Called(config, validateOnly) - return args.Error(0) +// Open indicates an expected call of Open. 
+func (mr *MockKafkaClientMockRecorder) Open() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockKafkaClient)(nil).Open)) } -func (k *KafkaClient) DescribeClusterWideConfig() ([]sarama.ConfigEntry, error) { - args := k.Called() - return args.Get(0).([]sarama.ConfigEntry), args.Error(1) +// OutOfSyncReplicas mocks base method. +func (m *MockKafkaClient) OutOfSyncReplicas() ([]int32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OutOfSyncReplicas") + ret0, _ := ret[0].([]int32) + ret1, _ := ret[1].(error) + return ret0, ret1 } -func (k *KafkaClient) TopicMetaToStatus(topicMeta *sarama.TopicMetadata) *v1alpha1.KafkaTopicStatus { - args := k.Called(topicMeta) - return args.Get(0).(*v1alpha1.KafkaTopicStatus) +// OutOfSyncReplicas indicates an expected call of OutOfSyncReplicas. +func (mr *MockKafkaClientMockRecorder) OutOfSyncReplicas() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OutOfSyncReplicas", reflect.TypeOf((*MockKafkaClient)(nil).OutOfSyncReplicas)) } -func (k *KafkaClient) Open() error { - args := k.Called() - return args.Error(0) +// TopicMetaToStatus mocks base method. +func (m *MockKafkaClient) TopicMetaToStatus(meta *sarama.TopicMetadata) *v1alpha1.KafkaTopicStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TopicMetaToStatus", meta) + ret0, _ := ret[0].(*v1alpha1.KafkaTopicStatus) + return ret0 } -func (k *KafkaClient) Close() error { - args := k.Called() - return args.Error(0) +// TopicMetaToStatus indicates an expected call of TopicMetaToStatus. +func (mr *MockKafkaClientMockRecorder) TopicMetaToStatus(meta interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TopicMetaToStatus", reflect.TypeOf((*MockKafkaClient)(nil).TopicMetaToStatus), meta) } diff --git a/pkg/resources/kafka/mocks/SubResourceClient.go b/pkg/resources/kafka/mocks/SubResourceClient.go deleted file mode 100644 index a8ff33d76..000000000 --- a/pkg/resources/kafka/mocks/SubResourceClient.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mocks - -import ( - "context" - "reflect" - - "github.com/golang/mock/gomock" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// MockSubResourceClient is a mock of SubResourceClient interface. -type MockSubResourceClient struct { - ctrl *gomock.Controller - recorder *MockSubResourceClientMockRecorder -} - -// MockSubResourceClientMockRecorder is the mock recorder for MockSubResourceClient. -type MockSubResourceClientMockRecorder struct { - mock *MockSubResourceClient -} - -// NewMockSubResourceClient creates a new mock instance. 
-func NewMockSubResourceClient(ctrl *gomock.Controller) *MockSubResourceClient { - mock := &MockSubResourceClient{ctrl: ctrl} - mock.recorder = &MockSubResourceClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockSubResourceClient) EXPECT() *MockSubResourceClientMockRecorder { - return m.recorder -} - -// Create mocks base method. -func (m *MockSubResourceClient) Create(arg0 context.Context, arg1, arg2 client.Object, arg3 ...client.SubResourceCreateOption) error { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1, arg2} - for _, a := range arg3 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "Create", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// Create indicates an expected call of Create. -func (mr *MockSubResourceClientMockRecorder) Create(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockSubResourceClient)(nil).Create), varargs...) -} - -// Get mocks base method. -func (m *MockSubResourceClient) Get(arg0 context.Context, arg1, arg2 client.Object, arg3 ...client.SubResourceGetOption) error { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1, arg2} - for _, a := range arg3 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "Get", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// Get indicates an expected call of Get. -func (mr *MockSubResourceClientMockRecorder) Get(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockSubResourceClient)(nil).Get), varargs...) -} - -// Patch mocks base method. -func (m *MockSubResourceClient) Patch(arg0 context.Context, arg1 client.Object, arg2 client.Patch, arg3 ...client.SubResourcePatchOption) error { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1, arg2} - for _, a := range arg3 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "Patch", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// Patch indicates an expected call of Patch. -func (mr *MockSubResourceClientMockRecorder) Patch(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Patch", reflect.TypeOf((*MockSubResourceClient)(nil).Patch), varargs...) -} - -// Update mocks base method. -func (m *MockSubResourceClient) Update(arg0 context.Context, arg1 client.Object, arg2 ...client.SubResourceUpdateOption) error { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "Update", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// Update indicates an expected call of Update. -func (mr *MockSubResourceClientMockRecorder) Update(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockSubResourceClient)(nil).Update), varargs...) 
-} From 9306fcf6ed0ae3310a4f29ba5ce04cfa70874bed Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Wed, 26 Jul 2023 13:18:19 +0300 Subject: [PATCH 05/10] Address PR comments - RackAwareDistributionGoal --- .../cruisecontroloperation_controller_test.go | 14 +-- .../cruisecontroltask_controller_test.go | 16 +-- controllers/tests/mocks/scale_factory.go | 28 ++++++ pkg/resources/kafka/kafka.go | 41 +++++++- pkg/resources/kafka/kafka_test.go | 97 ++++++++++++++++++- pkg/scale/scale.go | 1 + pkg/scale/types.go | 1 + 7 files changed, 174 insertions(+), 24 deletions(-) create mode 100644 controllers/tests/mocks/scale_factory.go diff --git a/controllers/tests/cruisecontroloperation_controller_test.go b/controllers/tests/cruisecontroloperation_controller_test.go index 03f7e14af..11690d62b 100644 --- a/controllers/tests/cruisecontroloperation_controller_test.go +++ b/controllers/tests/cruisecontroloperation_controller_test.go @@ -77,7 +77,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("there is an add_broker operation for execution", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock1()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock1()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) @@ -105,7 +105,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("add_broker operation is finished with completedWithError and 30s has not elapsed", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock2()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock2()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) @@ -132,7 +132,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("add_broker operation is finished with completedWithError and 30s has elapsed", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock5()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock5()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) @@ -161,7 +161,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("there is an errored remove_broker and an add_broker operation", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock3()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock3()) // First operation will get completedWithError operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) @@ -208,7 +208,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("there is a new remove_broker and an errored remove_broker operation with pause annotation", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4()) + cruiseControlOperationReconciler.ScaleFactory = 
mocks.NewMockScaleFactory(getScaleMock4()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) operation.Labels["pause"] = "true" err := k8sClient.Create(ctx, &operation) @@ -257,7 +257,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("there is a new remove_broker and an errored remove_broker operation with ignore ErrorPolicy", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4()) // Creating first operation operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) operation.Spec.ErrorPolicy = v1alpha1.ErrorPolicyIgnore @@ -307,7 +307,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("Cruise Control makes the Status operation async", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock7()) operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) diff --git a/controllers/tests/cruisecontroltask_controller_test.go b/controllers/tests/cruisecontroltask_controller_test.go index 1b99978a2..8547de098 100644 --- a/controllers/tests/cruisecontroltask_controller_test.go +++ b/controllers/tests/cruisecontroltask_controller_test.go @@ -88,7 +88,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { When("new storage is added", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { if err := k8sClient.Get(ctx, types.NamespacedName{ @@ -147,7 +147,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("new storage is added but there is a not JBOD capacityConfig for that", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, @@ -227,7 +227,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("new storage is added and one broker is JBOD and another is not JBOD", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, @@ -312,7 +312,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("new broker is added", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1()) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1()) err := 
k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, @@ -400,7 +400,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("a broker is removed", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1()) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1()) err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { if err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, @@ -447,12 +447,6 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) -func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) { - return func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) { - return mock, nil - } -} - func getScaleMockCCTask1() *mocks.MockCruiseControlScaler { mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) diff --git a/controllers/tests/mocks/scale_factory.go b/controllers/tests/mocks/scale_factory.go new file mode 100644 index 000000000..ca707b402 --- /dev/null +++ b/controllers/tests/mocks/scale_factory.go @@ -0,0 +1,28 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mocks + +import ( + "context" + + "github.com/banzaicloud/koperator/api/v1beta1" + "github.com/banzaicloud/koperator/pkg/scale" +) + +func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) { + return func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) { + return mock, nil + } +} diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 3414f9c63..d58ee5c46 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -24,8 +24,10 @@ import ( "strings" "emperror.dev/errors" + types2 "github.com/banzaicloud/go-cruise-control/pkg/types" properties "github.com/banzaicloud/koperator/properties/pkg" "github.com/go-logr/logr" + "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,6 +41,7 @@ import ( "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" + banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" "github.com/banzaicloud/koperator/pkg/jmxextractor" "github.com/banzaicloud/koperator/pkg/k8sutil" @@ -98,7 +101,8 @@ var ( // Reconciler implements the Component Reconciler type Reconciler struct { resources.Reconciler - kafkaClientProvider kafkaclient.Provider + kafkaClientProvider kafkaclient.Provider + CruiseControlScalerFactory func(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster) (scale.CruiseControlScaler, error) } // New creates a new reconciler for Kafka @@ -109,7 +113,8 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk DirectClient: directClient, KafkaCluster: cluster, }, - kafkaClientProvider: kafkaClientProvider, + kafkaClientProvider: kafkaClientProvider, + CruiseControlScalerFactory: scale.ScaleFactoryFn(), } } @@ -471,12 +476,11 @@ func (r *Reconciler) reconcileKafkaPodDelete(ctx context.Context, log logr.Logge if len(podsDeletedFromSpec) > 0 { if !arePodsAlreadyDeleted(podsDeletedFromSpec, log) { - cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster) // FIXME: we should reuse the context of the Kafka Controller - cc, err := scale.NewCruiseControlScaler(context.TODO(), scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster)) + cc, err := r.CruiseControlScalerFactory(context.TODO(), r.KafkaCluster) if err != nil { return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, - "failed to initialize Cruise Control Scaler", "cruise control url", cruiseControlURL) + "failed to initialize Cruise Control Scaler", "cruise control url", scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster)) } brokerStates := []scale.KafkaBrokerState{ @@ -923,6 +927,12 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo if len(terminatingOrPendingPods) >= r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") } + if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && len(terminatingOrPendingPods) > 0 { + err = r.isCruiseControlRackAwareDistributionGoalViolated() + if err != nil { + return err + } + } 
kafkaBrokerAvailabilityZoneMap := getBrokerAzMap(r.KafkaCluster) currentPodAz, _ := r.getBrokerAz(currentPod, kafkaBrokerAvailabilityZoneMap) if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods, kafkaBrokerAvailabilityZoneMap) { @@ -989,6 +999,27 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return nil } +func (r *Reconciler) isCruiseControlRackAwareDistributionGoalViolated() error { + cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster) + cc, err := r.CruiseControlScalerFactory(context.TODO(), r.KafkaCluster) + if err != nil { + return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, "failed to initialize Cruise Control", "cruise control url", cruiseControlURL) + } + status, err := cc.Status(context.TODO()) + if err != nil { + return errorfactory.New(errorfactory.CruiseControlNotReady{}, errors.New("failed to get status from Cruise Control"), "rolling upgrade in progress") + } + if !slices.Contains(status.Result.AnalyzerState.ReadyGoals, types2.RackAwareDistributionGoal) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is not ready"), "rolling upgrade in progress") + } + for _, anomaly := range status.Result.AnomalyDetectorState.RecentGoalViolations { + if slices.Contains(anomaly.FixableViolatedGoals, types2.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, types2.RackAwareDistributionGoal) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is violated"), "rolling upgrade in progress") + } + } + return nil +} + func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}, kafkaBrokerAvailabilityZoneMap map[int32]string) bool { if currentPodAz == "" && len(impactedReplicas) > 0 { return true diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 9b557a27c..52a9253da 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -21,6 +21,8 @@ import ( "time" "emperror.dev/errors" + ccTypes "github.com/banzaicloud/go-cruise-control/pkg/types" + "github.com/banzaicloud/koperator/pkg/scale" "github.com/go-logr/logr" "github.com/golang/mock/gomock" "github.com/onsi/gomega" @@ -36,6 +38,7 @@ import ( "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" + controllerMocks "github.com/banzaicloud/koperator/controllers/tests/mocks" "github.com/banzaicloud/koperator/pkg/resources" mocks "github.com/banzaicloud/koperator/pkg/resources/kafka/mocks" ) @@ -966,6 +969,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { pods []corev1.Pod allOfflineReplicas []int32 outOfSyncReplicas []int32 + ccStatus *scale.StatusTaskResult errorExpected bool }{ { @@ -1055,6 +1059,14 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, + ccStatus: &scale.StatusTaskResult{ + Result: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: 
[]ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, errorExpected: true, }, { @@ -1088,6 +1100,14 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, + ccStatus: &scale.StatusTaskResult{ + Result: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, errorExpected: true, }, { @@ -1191,6 +1211,14 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, + ccStatus: &scale.StatusTaskResult{ + Result: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, errorExpected: true, }, { @@ -1229,6 +1257,42 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { outOfSyncReplicas: []int32{201}, errorExpected: true, }, + { + testName: "Pod is deleted if all pods are running and CC RackAwareDistributionGoal is not ready and allowed concurrent restarts is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{101}, + errorExpected: false, + }, { testName: "Pod is deleted if failure is in same AZ and allowed concurrent restarts is not reached", kafkaCluster: v1beta1.KafkaCluster{ @@ -1263,7 +1327,15 @@ func 
TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { }, allOfflineReplicas: []int32{}, outOfSyncReplicas: []int32{101}, - errorExpected: false, + ccStatus: &scale.StatusTaskResult{ + Result: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, + errorExpected: false, }, { testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures", @@ -1292,6 +1364,14 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, + ccStatus: &scale.StatusTaskResult{ + Result: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, errorExpected: true, }, { @@ -1383,6 +1463,14 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, + ccStatus: &scale.StatusTaskResult{ + Result: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, errorExpected: true, }, } @@ -1419,6 +1507,13 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { } mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil) + // Mock Cruise Control client + mockCruiseControl := controllerMocks.NewMockCruiseControlScaler(mockCtrl) + if test.ccStatus != nil { + mockCruiseControl.EXPECT().Status(context.TODO()).Return(*test.ccStatus, nil) + } + r.CruiseControlScalerFactory = controllerMocks.NewMockScaleFactory(mockCruiseControl) + // Call the handleRollingUpgrade function with the provided test.desiredPod and test.currentPod err := r.handleRollingUpgrade(logf.Log, test.desiredPod, test.currentPod, reflect.TypeOf(test.desiredPod)) diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go index 5fcac7ea3..9dd4d9743 100644 --- a/pkg/scale/scale.go +++ b/pkg/scale/scale.go @@ -137,6 +137,7 @@ func (cc *cruiseControlScaler) Status(ctx context.Context) (StatusTaskResult, er State: v1beta1.CruiseControlTaskCompleted, }, Status: &status, + Result: resp.Result, }, nil } diff --git a/pkg/scale/types.go b/pkg/scale/types.go index c383e30d5..4ea945a02 100644 --- a/pkg/scale/types.go +++ b/pkg/scale/types.go @@ -64,6 +64,7 @@ const ( type StatusTaskResult struct { TaskResult *Result Status *CruiseControlStatus + Result *types.StateResult } // CruiseControlStatus struct is used to describe internal state of Cruise Control. 
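
A minimal usage sketch of the mock fixtures introduced up to this point, assuming the package paths shown in the diffs above (pkg/resources/kafka/mocks for the regenerated gomock KafkaClient, controllers/tests/mocks for NewMockScaleFactory and the Cruise Control scaler mock). It is not part of the patch series; the test name and the expectation values are invented for illustration, and at this stage of the series StatusTaskResult still exposes the Cruise Control state through the Result field (a later patch renames it to State).

// mocks_usage_sketch_test.go - illustrative only, not part of the patch series.
package kafka_test

import (
	"context"
	"testing"

	ccTypes "github.com/banzaicloud/go-cruise-control/pkg/types"
	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/assert"

	controllerMocks "github.com/banzaicloud/koperator/controllers/tests/mocks"
	kafkamocks "github.com/banzaicloud/koperator/pkg/resources/kafka/mocks"
	"github.com/banzaicloud/koperator/pkg/scale"
)

func TestMockWiringSketch(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	// The regenerated gomock KafkaClient mock: EXPECT()/Return replaces the old
	// testify-style On(...).Return(...) calls.
	kafkaClient := kafkamocks.NewMockKafkaClient(mockCtrl)
	kafkaClient.EXPECT().OutOfSyncReplicas().Return([]int32{101}, nil)

	outOfSync, err := kafkaClient.OutOfSyncReplicas()
	assert.NoError(t, err)
	assert.Equal(t, []int32{101}, outOfSync)

	// Cruise Control scaler mock wrapped in the shared factory helper, so a
	// reconciler's CruiseControlScalerFactory field can be pointed at it.
	ccMock := controllerMocks.NewMockCruiseControlScaler(mockCtrl)
	ccMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{
		// At this point in the series the CC state is still exposed as Result.
		Result: &ccTypes.StateResult{
			AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}},
		},
	}, nil)

	factory := controllerMocks.NewMockScaleFactory(ccMock)
	scaler, err := factory(context.Background(), nil)
	assert.NoError(t, err)

	status, err := scaler.Status(context.Background())
	assert.NoError(t, err)
	assert.Contains(t, status.Result.AnalyzerState.ReadyGoals, ccTypes.RackAwareDistributionGoal)
}
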
From 1ede45523df4b2596d1bdc20cee9a83ee692cc26 Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Thu, 27 Jul 2023 16:56:52 +0300 Subject: [PATCH 06/10] Address PR comments - part 5 --- pkg/resources/kafka/kafka.go | 11 ----- pkg/resources/kafka/kafka_test.go | 82 +++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 11 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index d58ee5c46..379308fde 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -138,17 +138,6 @@ func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string { return brokerAzMap } -func getBrokerRack(readOnlyConfig string) string { - if readOnlyConfig == "" { - return "" - } - match := kafkaConfigBrokerRackRegex.FindStringSubmatch(readOnlyConfig) - if len(match) == 2 { - return match[1] - } - return "" -} - func getCreatedPvcForBroker( ctx context.Context, c client.Reader, diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 52a9253da..3ddd893d0 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1526,3 +1526,85 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { }) } } + +func TestGetBrokerAzMap(t *testing.T) { + t.Parallel() + testCases := []struct { + testName string + kafkaCluster v1beta1.KafkaCluster + expectedAzMap map[int32]string + }{ + { + testName: "Brokers have different AZs if no broker rack value is set", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: ""}, + {Id: 201, ReadOnlyConfig: ""}, + {Id: 301, ReadOnlyConfig: ""}, + }, + }, + }, + expectedAzMap: map[int32]string{101: "101", 201: "201", 301: "301"}, + }, + { + testName: "Brokers have different AZs if one broker has no broker rack value set", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: ""}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}, + }, + }, + }, + expectedAzMap: map[int32]string{101: "101", 102: "102", 201: "201", 202: "202", 301: "301", 302: "302"}, + }, + { + testName: "Brokers have different AZs if read only configs is a corrupted string for one broker", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack;az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + }, + }, + expectedAzMap: map[int32]string{101: "101", 201: "201", 301: "301"}, + }, + { + testName: "Brokers have correct AZs if read only configs is valid for all brokers", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az-1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az-1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az-2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az-2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az-3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az-3"}, + }, + }, + }, + expectedAzMap: map[int32]string{ + 101: "az-1", + 102: "az-1", + 201: "az-2", + 202: "az-2", + 301: "az-3", + 302: "az-3", + }, + }, + } + + for _, test := range testCases { + t.Run(test.testName, func(t *testing.T) { + azMap := 
getBrokerAzMap(&test.kafkaCluster) + assert.Equal(t, test.expectedAzMap, azMap) + }) + } +} From 997f46e83ec3d8270ef9508bdc4ca207291fd444 Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Fri, 28 Jul 2023 10:44:37 +0300 Subject: [PATCH 07/10] Update API dependency --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 712619d4d..2d7ef2c1f 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/banzaicloud/istio-client-go v0.0.17 github.com/banzaicloud/istio-operator/api/v2 v2.15.1 github.com/banzaicloud/k8s-objectmatcher v1.8.0 - github.com/banzaicloud/koperator/api v0.28.6 + github.com/banzaicloud/koperator/api v0.28.7 github.com/banzaicloud/koperator/properties v0.4.1 github.com/cert-manager/cert-manager v1.11.2 github.com/cisco-open/cluster-registry-controller/api v0.2.5 diff --git a/go.sum b/go.sum index 604721f0c..3b2fa7f1d 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg= github.com/banzaicloud/koperator/api v0.28.6 h1:ZsOAXAsg34O78qVCEHx84cdp57HlCje6zjzXHhvtXf4= github.com/banzaicloud/koperator/api v0.28.6/go.mod h1:AGGQ+aTBklaaG8ErotNPlP/nS47MYLc/jFVW7AsDiEE= +github.com/banzaicloud/koperator/api v0.28.7 h1:G6ICLzuz6Tumcsl9ZaqZ46ccwdAc1rXjidP03v6Kqp4= +github.com/banzaicloud/koperator/api v0.28.7/go.mod h1:AGGQ+aTBklaaG8ErotNPlP/nS47MYLc/jFVW7AsDiEE= github.com/banzaicloud/koperator/properties v0.4.1 h1:SB2QgXlcK1Dc7Z1rg65PJifErDa8OQnoWCCJgmC7SGc= github.com/banzaicloud/koperator/properties v0.4.1/go.mod h1:TcL+llxuhW3UeQtVEDYEXGouFLF2P+LuZZVudSb6jyA= github.com/banzaicloud/operator-tools v0.28.0 h1:GSfc0qZr6zo7WrNxdgWZE1LcTChPU8QFYOTDirYVtIM= From e861dc7e03f0b8a4c2ad9c1ed56e0508bc43ac32 Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Fri, 28 Jul 2023 10:55:40 +0300 Subject: [PATCH 08/10] Fix linting errors --- pkg/resources/kafka/kafka.go | 9 ++------- pkg/resources/kafka/kafka_test.go | 3 ++- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 379308fde..698b1e75d 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -18,14 +18,12 @@ import ( "context" "fmt" "reflect" - "regexp" "sort" "strconv" "strings" "emperror.dev/errors" types2 "github.com/banzaicloud/go-cruise-control/pkg/types" - properties "github.com/banzaicloud/koperator/properties/pkg" "github.com/go-logr/logr" "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" @@ -35,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" + properties "github.com/banzaicloud/koperator/properties/pkg" + apiutil "github.com/banzaicloud/koperator/api/util" "github.com/banzaicloud/k8s-objectmatcher/patch" @@ -93,11 +93,6 @@ const ( controllerBrokerReconcilePriority ) -var ( - // kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs - kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`) -) - // Reconciler implements the Component Reconciler type Reconciler struct { resources.Reconciler diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 3ddd893d0..92290400c 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -22,7 +22,6 @@ import ( "emperror.dev/errors" ccTypes 
"github.com/banzaicloud/go-cruise-control/pkg/types" - "github.com/banzaicloud/koperator/pkg/scale" "github.com/go-logr/logr" "github.com/golang/mock/gomock" "github.com/onsi/gomega" @@ -30,6 +29,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/banzaicloud/koperator/pkg/scale" + "github.com/banzaicloud/koperator/pkg/kafkaclient" corev1 "k8s.io/api/core/v1" From 59dcd099a9c1cf65df74585a12fc24af21880c1c Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Fri, 28 Jul 2023 11:07:10 +0300 Subject: [PATCH 09/10] empty commit to retrigger flaky tests From adac7932a9524bfd57b4949693c7c0e558c1a247 Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Thu, 3 Aug 2023 15:01:14 +0300 Subject: [PATCH 10/10] Address PR comments - part 6 --- pkg/resources/kafka/kafka.go | 14 +++++++------- pkg/resources/kafka/kafka_test.go | 14 +++++++------- pkg/scale/scale.go | 2 +- pkg/scale/types.go | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 698b1e75d..a76f196e3 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -23,7 +23,7 @@ import ( "strings" "emperror.dev/errors" - types2 "github.com/banzaicloud/go-cruise-control/pkg/types" + ccTypes "github.com/banzaicloud/go-cruise-control/pkg/types" "github.com/go-logr/logr" "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" @@ -912,7 +912,7 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") } if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && len(terminatingOrPendingPods) > 0 { - err = r.isCruiseControlRackAwareDistributionGoalViolated() + err = r.checkCCRackAwareDistributionGoal() if err != nil { return err } @@ -983,21 +983,21 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return nil } -func (r *Reconciler) isCruiseControlRackAwareDistributionGoalViolated() error { +func (r *Reconciler) checkCCRackAwareDistributionGoal() error { cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster) cc, err := r.CruiseControlScalerFactory(context.TODO(), r.KafkaCluster) if err != nil { return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, "failed to initialize Cruise Control", "cruise control url", cruiseControlURL) } - status, err := cc.Status(context.TODO()) + status, err := cc.Status(context.Background()) if err != nil { return errorfactory.New(errorfactory.CruiseControlNotReady{}, errors.New("failed to get status from Cruise Control"), "rolling upgrade in progress") } - if !slices.Contains(status.Result.AnalyzerState.ReadyGoals, types2.RackAwareDistributionGoal) { + if !slices.Contains(status.State.AnalyzerState.ReadyGoals, ccTypes.RackAwareDistributionGoal) { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is not ready"), "rolling upgrade in progress") } - for _, anomaly := range status.Result.AnomalyDetectorState.RecentGoalViolations { - if slices.Contains(anomaly.FixableViolatedGoals, types2.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, types2.RackAwareDistributionGoal) { + for _, anomaly := range 
status.State.AnomalyDetectorState.RecentGoalViolations { + if slices.Contains(anomaly.FixableViolatedGoals, ccTypes.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, ccTypes.RackAwareDistributionGoal) { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is violated"), "rolling upgrade in progress") } } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 92290400c..3b2de2259 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1061,7 +1061,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1102,7 +1102,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1213,7 +1213,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1329,7 +1329,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { allOfflineReplicas: []int32{}, outOfSyncReplicas: []int32{101}, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1366,7 +1366,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1465,7 +1465,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: 
map[string]string{"brokerId": "301"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1511,7 +1511,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { // Mock Cruise Control client mockCruiseControl := controllerMocks.NewMockCruiseControlScaler(mockCtrl) if test.ccStatus != nil { - mockCruiseControl.EXPECT().Status(context.TODO()).Return(*test.ccStatus, nil) + mockCruiseControl.EXPECT().Status(context.Background()).Return(*test.ccStatus, nil) } r.CruiseControlScalerFactory = controllerMocks.NewMockScaleFactory(mockCruiseControl) diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go index 9dd4d9743..90afd7ff5 100644 --- a/pkg/scale/scale.go +++ b/pkg/scale/scale.go @@ -137,7 +137,7 @@ func (cc *cruiseControlScaler) Status(ctx context.Context) (StatusTaskResult, er State: v1beta1.CruiseControlTaskCompleted, }, Status: &status, - Result: resp.Result, + State: resp.Result, }, nil } diff --git a/pkg/scale/types.go b/pkg/scale/types.go index 4ea945a02..6a7c0bcbf 100644 --- a/pkg/scale/types.go +++ b/pkg/scale/types.go @@ -64,7 +64,7 @@ const ( type StatusTaskResult struct { TaskResult *Result Status *CruiseControlStatus - Result *types.StateResult + State *types.StateResult } // CruiseControlStatus struct is used to describe internal state of Cruise Control.