Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow concurrent broker restarts within same broker rack #1001

Merged
merged 11 commits into from
Aug 4, 2023
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,8 @@ mock-generate: bin/mockgen
-package mocks \
-destination pkg/resources/kafka/mocks/Client.go \
sigs.k8s.io/controller-runtime/pkg/client Client
$(BIN_DIR)/mockgen \
-copyright_file $(BOILERPLATE_DIR)/header.generated.txt \
-package mocks \
-destination pkg/resources/kafka/mocks/KafkaClient.go \
-source pkg/kafkaclient/client.go
8 changes: 8 additions & 0 deletions config/samples/banzaicloud_v1beta1_kafkacluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ spec:
# alerts with 'rollingupgrade'
# failureThreshold: 1

# concurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
# it is set to a value greater than 1, the operator will restart up to that amount of brokers in parallel, if the
# brokers are within the same AZ (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker
# racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than
# 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
# This is a safe way to speed up the rolling upgrade.
# concurrentBrokerRestartsAllowed: 1

# brokerConfigGroups specifies multiple broker configs with unique name
brokerConfigGroups:
# Specify desired group name (eg., 'default_group')
Expand Down
2 changes: 1 addition & 1 deletion controllers/tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
CCJMXExporterConfig: "custom_property: custom_value",
},
ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true",
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartCountPerRack: 1},
},
}
}
Expand Down
14 changes: 7 additions & 7 deletions controllers/tests/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is an add_broker operation for execution", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock1())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock1())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -105,7 +105,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("add_broker operation is finished with completedWithError and 30s has not elapsed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock2())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock2())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -132,7 +132,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("add_broker operation is finished with completedWithError and 30s has elapsed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock5())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock5())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -161,7 +161,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is an errored remove_broker and an add_broker operation", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock3())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock3())
// First operation will get completedWithError
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expand Down Expand Up @@ -208,7 +208,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is a new remove_broker and an errored remove_broker operation with pause annotation", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
operation.Labels["pause"] = "true"
err := k8sClient.Create(ctx, &operation)
Expand Down Expand Up @@ -257,7 +257,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is a new remove_broker and an errored remove_broker operation with ignore ErrorPolicy", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4())
// Creating first operation
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
operation.Spec.ErrorPolicy = v1alpha1.ErrorPolicyIgnore
Expand Down Expand Up @@ -307,7 +307,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("Cruise Control makes the Status operation async", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock7())
operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down
16 changes: 5 additions & 11 deletions controllers/tests/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
When("new storage is added", Serial, func() {

JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))

err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -147,7 +147,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new storage is added but there is a not JBOD capacityConfig for that", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -227,7 +227,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new storage is added and one broker is JBOD and another is not JBOD", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -312,7 +312,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new broker is added", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1())
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1())
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -400,7 +400,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("a broker is removed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1())
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1())
err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Expand Down Expand Up @@ -447,12 +447,6 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
})

func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
return func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
return mock, nil
}
}

func getScaleMockCCTask1() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
Expand Down
28 changes: 28 additions & 0 deletions controllers/tests/mocks/scale_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mocks

import (
"context"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/scale"
)

func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
return func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
return mock, nil
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/banzaicloud/istio-client-go v0.0.17
github.com/banzaicloud/istio-operator/api/v2 v2.15.1
github.com/banzaicloud/k8s-objectmatcher v1.8.0
github.com/banzaicloud/koperator/api v0.28.6
github.com/banzaicloud/koperator/api v0.28.7
github.com/banzaicloud/koperator/properties v0.4.1
github.com/cert-manager/cert-manager v1.11.2
github.com/cisco-open/cluster-registry-controller/api v0.2.5
Expand Down Expand Up @@ -38,6 +38,7 @@ require (
require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/tools v0.7.0 // indirect
)

Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc
github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg=
github.com/banzaicloud/koperator/api v0.28.6 h1:ZsOAXAsg34O78qVCEHx84cdp57HlCje6zjzXHhvtXf4=
github.com/banzaicloud/koperator/api v0.28.6/go.mod h1:AGGQ+aTBklaaG8ErotNPlP/nS47MYLc/jFVW7AsDiEE=
github.com/banzaicloud/koperator/api v0.28.7 h1:G6ICLzuz6Tumcsl9ZaqZ46ccwdAc1rXjidP03v6Kqp4=
github.com/banzaicloud/koperator/api v0.28.7/go.mod h1:AGGQ+aTBklaaG8ErotNPlP/nS47MYLc/jFVW7AsDiEE=
github.com/banzaicloud/koperator/properties v0.4.1 h1:SB2QgXlcK1Dc7Z1rg65PJifErDa8OQnoWCCJgmC7SGc=
github.com/banzaicloud/koperator/properties v0.4.1/go.mod h1:TcL+llxuhW3UeQtVEDYEXGouFLF2P+LuZZVudSb6jyA=
github.com/banzaicloud/operator-tools v0.28.0 h1:GSfc0qZr6zo7WrNxdgWZE1LcTChPU8QFYOTDirYVtIM=
Expand Down Expand Up @@ -433,6 +435,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down
11 changes: 11 additions & 0 deletions pkg/kafkaclient/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package kafkaclient

import (
"github.com/stretchr/testify/mock"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/api/v1beta1"
Expand Down Expand Up @@ -45,3 +46,13 @@ func NewDefaultProvider() Provider {
func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
return NewFromCluster(client, cluster)
}

// MockedProvider is a Testify mock for providing Kafka clients that can be mocks too
type MockedProvider struct {
mock.Mock
}

func (m *MockedProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
args := m.Called(client, cluster)
return args.Get(0).(KafkaClient), args.Get(1).(func()), args.Error(2)
}
Loading