Skip to content

Commit

Permalink
Allow concurrent broker restarts from same AZ (broker rack) (adobe#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
amuraru committed Dec 12, 2023
1 parent f023d28 commit 514fa07
Show file tree
Hide file tree
Showing 14 changed files with 625 additions and 115 deletions.
2 changes: 2 additions & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ require (
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)
// remove once https://github.com/cert-manager/cert-manager/issues/5953 is fixed
replace github.com/Venafi/vcert/v4 => github.com/jetstack/vcert/v4 v4.9.6-0.20230127103832-3aa3dfd6613d
8 changes: 3 additions & 5 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,15 @@ type RollingUpgradeConfig struct {
// alerts with 'rollingupgrade'
FailureThreshold int `json:"failureThreshold"`

// ConcurrentBrokerRestartCountPerRack controls how many brokers can be restarted in parallel during a rolling upgrade. If
// ConcurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
// it is set to a value greater than 1, the operator will restart up to that amount of brokers in parallel, if the
// brokers are within the same rack (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker
// racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than
// 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
// This is a safe way to speed up the rolling upgrade. Note that for the rack distribution explained above, Cruise Control
// requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal` to be configured. Default value is 1.
// +kubebuilder:validation:Minimum=1
// +kubebuilder:default=1
// requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal` to be configured.
// +optional
ConcurrentBrokerRestartCountPerRack int `json:"concurrentBrokerRestartCountPerRack,omitempty"`
ConcurrentBrokerRestartsAllowed int `json:"concurrentBrokerRestartsAllowed,omitempty"`
}

// DisruptionBudget defines the configuration for PodDisruptionBudget where the workload is managed by the kafka-operator
Expand Down
10 changes: 4 additions & 6 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21556,10 +21556,9 @@ spec:
description: RollingUpgradeConfig defines the desired config of the
RollingUpgrade
properties:
concurrentBrokerRestartCountPerRack:
default: 1
description: ConcurrentBrokerRestartCountPerRack controls how
many brokers can be restarted in parallel during a rolling upgrade.
concurrentBrokerRestartsAllowed:
description: ConcurrentBrokerRestartsAllowed controls how many
brokers can be restarted in parallel during a rolling upgrade.
If it is set to a value greater than 1, the operator will restart
up to that amount of brokers in parallel, if the brokers are
within the same rack (as specified by "broker.rack" in broker
Expand All @@ -21570,8 +21569,7 @@ spec:
N is the number of racks used. This is a safe way to speed up
the rolling upgrade. Note that for the rack distribution explained
above, Cruise Control requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal`
to be configured. Default value is 1.
minimum: 1
to be configured.
type: integer
failureThreshold:
description: FailureThreshold controls how many failures the cluster
Expand Down
10 changes: 4 additions & 6 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21556,10 +21556,9 @@ spec:
description: RollingUpgradeConfig defines the desired config of the
RollingUpgrade
properties:
concurrentBrokerRestartCountPerRack:
default: 1
description: ConcurrentBrokerRestartCountPerRack controls how
many brokers can be restarted in parallel during a rolling upgrade.
concurrentBrokerRestartsAllowed:
description: ConcurrentBrokerRestartsAllowed controls how many
brokers can be restarted in parallel during a rolling upgrade.
If it is set to a value greater than 1, the operator will restart
up to that amount of brokers in parallel, if the brokers are
within the same rack (as specified by "broker.rack" in broker
Expand All @@ -21570,8 +21569,7 @@ spec:
N is the number of racks used. This is a safe way to speed up
the rolling upgrade. Note that for the rack distribution explained
above, Cruise Control requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal`
to be configured. Default value is 1.
minimum: 1
to be configured.
type: integer
failureThreshold:
description: FailureThreshold controls how many failures the cluster
Expand Down
18 changes: 11 additions & 7 deletions controllers/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/mock/gomock"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/pkg/scale"

Expand Down Expand Up @@ -370,8 +371,11 @@ func TestCreateCCOperation(t *testing.T) {
},
}

mockCtrl := gomock.NewController(t)

for _, testCase := range testCases {
mockClient := new(mocks.Client)
mockClient := mocks.NewMockClient(mockCtrl)
mockSubResourceClient := mocks.NewMockSubResourceClient(mockCtrl)
scheme := runtime.NewScheme()
_ = v1beta1.AddToScheme(scheme)
_ = corev1.AddToScheme(scheme)
Expand All @@ -389,17 +393,17 @@ func TestCreateCCOperation(t *testing.T) {

// Mock the Create call and capture the operation
var createdOperation *banzaiv1alpha1.CruiseControlOperation
mockClient.On("Create", ctx, mock.IsType(&banzaiv1alpha1.CruiseControlOperation{})).Run(func(args mock.Arguments) {
createdOperation = args.Get(1).(*banzaiv1alpha1.CruiseControlOperation)
mockClient.EXPECT().Create(ctx, gomock.AssignableToTypeOf(&banzaiv1alpha1.CruiseControlOperation{})).Do(func(ctx context.Context, obj client.Object, opts ...client.CreateOption) {
createdOperation = obj.(*banzaiv1alpha1.CruiseControlOperation)
createdOperation.ObjectMeta.Name = "generated-name"
}).Return(nil)

// Mock the Status call
mockClient.On("Status").Return(mockClient)
mockClient.EXPECT().Status().Return(mockSubResourceClient)

// Mock the Update call
mockClient.On("Update", ctx, mock.IsType(&banzaiv1alpha1.CruiseControlOperation{})).Run(func(args mock.Arguments) {
arg := args.Get(1).(*banzaiv1alpha1.CruiseControlOperation)
mockSubResourceClient.EXPECT().Update(ctx, gomock.AssignableToTypeOf(&banzaiv1alpha1.CruiseControlOperation{})).Do(func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) {
arg := obj.(*banzaiv1alpha1.CruiseControlOperation)
createdOperation.Status = arg.Status
}).Return(nil)

Expand Down
2 changes: 1 addition & 1 deletion controllers/tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
CCJMXExporterConfig: "custom_property: custom_value",
},
ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true",
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartCountPerRack: 1},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartsAllowed: 1},
},
}
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/tests/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is an errored remove_disks and a rebalance disks operation for the same broker", Serial, func() {
JustBeforeEach(func() {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock6())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock6())
// Remove_disk operation - errored
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(context.Background(), &operation)
Expand Down Expand Up @@ -361,7 +361,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("Cruise Control makes the Status operation async", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock7())
operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down
21 changes: 12 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ require (
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-logr/logr v1.2.4
github.com/imdario/mergo v0.3.13
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.6
github.com/onsi/ginkgo/v2 v2.9.7
github.com/onsi/gomega v1.27.8
github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1
github.com/prometheus/common v0.37.0
github.com/stretchr/testify v1.8.1
go.uber.org/mock v0.2.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
google.golang.org/protobuf v1.28.1
gopkg.in/inf.v0 v0.9.1
gotest.tools v2.2.0+incompatible
Expand All @@ -40,7 +40,7 @@ require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/tools v0.9.1 // indirect
)

require (
Expand Down Expand Up @@ -113,12 +113,12 @@ require (
github.com/wayneashleyberry/terminal-dimensions v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand All @@ -143,3 +143,6 @@ replace (
github.com/gogo/protobuf => github.com/waynz0r/protobuf v1.3.3-0.20210811122234-64636cae0910
github.com/golang/protobuf => github.com/luciferinlove/protobuf v0.0.0-20220913214010-c63936d75066
)

// remove once https://github.com/cert-manager/cert-manager/issues/5953 is fixed
replace github.com/Venafi/vcert/v4 => github.com/jetstack/vcert/v4 v4.9.6-0.20230127103832-3aa3dfd6613d
Loading

0 comments on commit 514fa07

Please sign in to comment.