Skip to content

Commit

Permalink
Address PR comments - RackAwareDistributionGoal
Browse files Browse the repository at this point in the history
  • Loading branch information
ctrlaltluc committed Jul 27, 2023
1 parent 68d9f7b commit 4538862
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 24 deletions.
14 changes: 7 additions & 7 deletions controllers/tests/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is an add_broker operation for execution", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock1())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock1())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -105,7 +105,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("add_broker operation is finished with completedWithError and 30s has not elapsed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock2())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock2())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -132,7 +132,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("add_broker operation is finished with completedWithError and 30s has elapsed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock5())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock5())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -161,7 +161,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is an errored remove_broker and an add_broker operation", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock3())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock3())
// First operation will get completedWithError
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expand Down Expand Up @@ -208,7 +208,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is a new remove_broker and an errored remove_broker operation with pause annotation", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
operation.Labels["pause"] = "true"
err := k8sClient.Create(ctx, &operation)
Expand Down Expand Up @@ -257,7 +257,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is a new remove_broker and an errored remove_broker operation with ignore ErrorPolicy", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4())
// Creating first operation
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
operation.Spec.ErrorPolicy = v1alpha1.ErrorPolicyIgnore
Expand Down Expand Up @@ -307,7 +307,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("Cruise Control makes the Status operation async", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock7())
operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down
16 changes: 5 additions & 11 deletions controllers/tests/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
When("new storage is added", Serial, func() {

JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))

err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -147,7 +147,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new storage is added but there is a not JBOD capacityConfig for that", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -227,7 +227,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new storage is added and one broker is JBOD and another is not JBOD", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -312,7 +312,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new broker is added", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1())
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1())
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -400,7 +400,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("a broker is removed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1())
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1())
err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Expand Down Expand Up @@ -447,12 +447,6 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
})

// NewMockScaleFactory builds a scaler factory for tests. The returned function
// ignores both of its arguments and always hands back the supplied mock with a
// nil error.
func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
	factory := func(_ context.Context, _ *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
		return mock, nil
	}
	return factory
}

func getScaleMockCCTask1() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
Expand Down
28 changes: 28 additions & 0 deletions controllers/tests/mocks/scale_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mocks

import (
"context"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/scale"
)

// NewMockScaleFactory returns a Cruise Control scaler factory suitable for
// injecting into reconcilers under test. The produced function disregards the
// context and KafkaCluster it is called with and always yields the given mock
// together with a nil error.
func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
	alwaysMock := func(_ context.Context, _ *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
		return mock, nil
	}
	return alwaysMock
}
41 changes: 36 additions & 5 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"strings"

"emperror.dev/errors"
types2 "github.com/banzaicloud/go-cruise-control/pkg/types"
properties "github.com/banzaicloud/koperator/properties/pkg"
"github.com/go-logr/logr"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -39,6 +41,7 @@ import (

"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
"github.com/banzaicloud/koperator/pkg/jmxextractor"
"github.com/banzaicloud/koperator/pkg/k8sutil"
Expand Down Expand Up @@ -98,7 +101,8 @@ var (
// Reconciler implements the Component Reconciler
type Reconciler struct {
resources.Reconciler
kafkaClientProvider kafkaclient.Provider
kafkaClientProvider kafkaclient.Provider
CruiseControlScalerFactory func(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster) (scale.CruiseControlScaler, error)
}

// New creates a new reconciler for Kafka
Expand All @@ -109,7 +113,8 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk
DirectClient: directClient,
KafkaCluster: cluster,
},
kafkaClientProvider: kafkaClientProvider,
kafkaClientProvider: kafkaClientProvider,
CruiseControlScalerFactory: scale.ScaleFactoryFn(),
}
}

Expand Down Expand Up @@ -471,12 +476,11 @@ func (r *Reconciler) reconcileKafkaPodDelete(ctx context.Context, log logr.Logge

if len(podsDeletedFromSpec) > 0 {
if !arePodsAlreadyDeleted(podsDeletedFromSpec, log) {
cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster)
// FIXME: we should reuse the context of the Kafka Controller
cc, err := scale.NewCruiseControlScaler(context.TODO(), scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster))
cc, err := r.CruiseControlScalerFactory(context.TODO(), r.KafkaCluster)
if err != nil {
return errorfactory.New(errorfactory.CruiseControlNotReady{}, err,
"failed to initialize Cruise Control Scaler", "cruise control url", cruiseControlURL)
"failed to initialize Cruise Control Scaler", "cruise control url", scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster))
}

brokerStates := []scale.KafkaBrokerState{
Expand Down Expand Up @@ -923,6 +927,12 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
if len(terminatingOrPendingPods) >= r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress")
}
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && len(terminatingOrPendingPods) > 0 {
err = r.isCruiseControlRackAwareDistributionGoalViolated()
if err != nil {
return err
}
}
kafkaBrokerAvailabilityZoneMap := getBrokerAzMap(r.KafkaCluster)
currentPodAz, _ := r.getBrokerAz(currentPod, kafkaBrokerAvailabilityZoneMap)
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods, kafkaBrokerAvailabilityZoneMap) {
Expand Down Expand Up @@ -989,6 +999,27 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
return nil
}

// isCruiseControlRackAwareDistributionGoalViolated queries Cruise Control for
// its status and returns nil only when RackAwareDistributionGoal is listed
// among the analyzer's ready goals and does not appear in any recent goal
// violation (fixable or unfixable).
//
// Returned errors:
//   - errorfactory.CruiseControlNotReady when the scaler cannot be created or
//     the status request fails; the underlying error is preserved as the cause.
//   - errorfactory.ReconcileRollingUpgrade when the goal is not ready yet or is
//     reported as violated.
func (r *Reconciler) isCruiseControlRackAwareDistributionGoalViolated() error {
	cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster)
	// FIXME: we should reuse the context of the Kafka Controller
	ctx := context.TODO()

	cc, err := r.CruiseControlScalerFactory(ctx, r.KafkaCluster)
	if err != nil {
		return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, "failed to initialize Cruise Control", "cruise control url", cruiseControlURL)
	}

	status, err := cc.Status(ctx)
	if err != nil {
		// Keep the original error as the cause so the reason for the failed
		// status call is not lost to callers and logs.
		return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, "failed to get status from Cruise Control", "cruise control url", cruiseControlURL)
	}

	// NOTE(review): status.Result is dereferenced without a guard; confirm it is
	// always populated when Status returns a nil error.
	if !slices.Contains(status.Result.AnalyzerState.ReadyGoals, types2.RackAwareDistributionGoal) {
		return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is not ready"), "rolling upgrade in progress")
	}

	for _, anomaly := range status.Result.AnomalyDetectorState.RecentGoalViolations {
		if slices.Contains(anomaly.FixableViolatedGoals, types2.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, types2.RackAwareDistributionGoal) {
			return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is violated"), "rolling upgrade in progress")
		}
	}
	return nil
}

func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}, kafkaBrokerAvailabilityZoneMap map[int32]string) bool {
if currentPodAz == "" && len(impactedReplicas) > 0 {
return true
Expand Down
Loading

0 comments on commit 4538862

Please sign in to comment.