Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update go-cruise-control to v0.4.0 #949

Merged
merged 2 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,14 @@ install-kustomize:
# Run tests
test: generate fmt vet manifests bin/setup-envtest
cd api && go test ./...
bin/setup-envtest use -p env ${ENVTEST_K8S_VERSION} > bin/envtest.sh \
&& source bin/envtest.sh; \
go test ./... \
-coverprofile cover.out \
-v \
-failfast \
-test.v \
-test.paniconexit0 \
-timeout 1h
KUBEBUILDER_ASSETS=$$($(BIN_DIR)/setup-envtest --print path --bin-dir $(BIN_DIR) use $(ENVTEST_K8S_VERSION)) \
go test ./... \
-coverprofile cover.out \
-v \
-failfast \
-test.v \
-test.paniconexit0 \
-timeout 1h
cd properties && go test -coverprofile cover.out -cover -failfast -v -covermode=count ./pkg/... ./internal/...

# Build manager binary
Expand Down Expand Up @@ -165,15 +164,17 @@ bin/controller-gen-$(CONTROLLER_GEN_VERSION):
mv bin/controller-gen bin/controller-gen-$(CONTROLLER_GEN_VERSION)

# find or download setup-envtest
bin/setup-envtest:
@ if ! test -x $(PWD)/bin/setup-envtest; then \
set -ex ;\
SETUP_ENVTEST_TMP_DIR=$$(mktemp -d) ;\
cd $$SETUP_ENVTEST_TMP_DIR ;\
go mod init tmp ;\
GOBIN=$(PWD)/bin go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest ;\
rm -rf $$SETUP_ENVTEST_TMP_DIR ;\
fi

# https://github.com/kubernetes-sigs/controller-runtime/commits/main/tools/setup-envtest
SETUP_ENVTEST_VERSION := d4f1e822ca11e9ff149bf2d9b5285f375334eba5

bin/setup-envtest: $(BIN_DIR)/setup-envtest-$(SETUP_ENVTEST_VERSION) ## Install setup-envtest CLI
@ln -sf setup-envtest-$(SETUP_ENVTEST_VERSION) $(BIN_DIR)/setup-envtest

$(BIN_DIR)/setup-envtest-$(SETUP_ENVTEST_VERSION):
@mkdir -p $(BIN_DIR)
@GOBIN=$(BIN_DIR) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@$(SETUP_ENVTEST_VERSION)
@mv $(BIN_DIR)/setup-envtest $(BIN_DIR)/setup-envtest-$(SETUP_ENVTEST_VERSION)

check_release:
@echo "A new tag (${REL_TAG}) will be pushed to Github, and a new Docker image will be released. Are you sure? [y/N] " && read ans && [ $${ans:-N} == y ]
Expand Down
20 changes: 10 additions & 10 deletions controllers/cruisecontroloperation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
return requeueWithError(log, "failed to lookup referenced kafka cluster", err)
}

//Adding finalizer
// Adding finalizer
if err := r.addFinalizer(ctx, currentCCOperation); err != nil {
return requeueWithError(log, "failed to add finalizer to CruiseControlOperation", err)
}
Expand All @@ -144,7 +144,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
}

// Checking Cruise Control health
status, err := r.scaler.Status()
status, err := r.scaler.Status(ctx)
if err != nil {
log.Error(err, "could not get Cruise Control status")
return requeueAfter(defaultRequeueIntervalInSeconds)
Expand Down Expand Up @@ -178,7 +178,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
return requeueAfter(defaultRequeueIntervalInSeconds)
}

//When the task is not in execution we can remove the finalizer
// When the task is not in execution we can remove the finalizer
if isFinalizerNeeded(currentCCOperation) && !currentCCOperation.IsCurrentTaskRunning() {
controllerutil.RemoveFinalizer(currentCCOperation, ccOperationFinalizerGroup)
if err := r.Update(ctx, currentCCOperation); err != nil {
Expand Down Expand Up @@ -211,7 +211,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques

log.Info("executing Cruise Control task", "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters())
// Executing operation
cruseControlTaskResult, err := r.executeOperation(ccOperationExecution)
cruseControlTaskResult, err := r.executeOperation(ctx, ccOperationExecution)

if err != nil {
log.Error(err, "Cruise Control task execution got an error", "name", ccOperationExecution.GetName(), "namespace", ccOperationExecution.GetNamespace(), "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters())
Expand Down Expand Up @@ -255,18 +255,18 @@ func (r *CruiseControlOperationReconciler) addFinalizer(ctx context.Context, cur
return nil
}

func (r *CruiseControlOperationReconciler) executeOperation(ccOperationExecution *banzaiv1alpha1.CruiseControlOperation) (*scale.Result, error) {
func (r *CruiseControlOperationReconciler) executeOperation(ctx context.Context, ccOperationExecution *banzaiv1alpha1.CruiseControlOperation) (*scale.Result, error) {
var cruseControlTaskResult *scale.Result
var err error
switch ccOperationExecution.CurrentTaskOperation() {
case banzaiv1alpha1.OperationAddBroker:
cruseControlTaskResult, err = r.scaler.AddBrokersWithParams(ccOperationExecution.CurrentTaskParameters())
cruseControlTaskResult, err = r.scaler.AddBrokersWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationRemoveBroker:
cruseControlTaskResult, err = r.scaler.RemoveBrokersWithParams(ccOperationExecution.CurrentTaskParameters())
cruseControlTaskResult, err = r.scaler.RemoveBrokersWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationRebalance:
cruseControlTaskResult, err = r.scaler.RebalanceWithParams(ccOperationExecution.CurrentTaskParameters())
cruseControlTaskResult, err = r.scaler.RebalanceWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationStopExecution:
cruseControlTaskResult, err = r.scaler.StopExecution()
cruseControlTaskResult, err = r.scaler.StopExecution(ctx)
default:
err = errors.NewWithDetails("Cruise Control operation not supported", "name", ccOperationExecution.GetName(), "namespace", ccOperationExecution.GetNamespace(), "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters())
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func (r *CruiseControlOperationReconciler) updateCurrentTasks(ctx context.Contex
ccOperationsCopy = append(ccOperationsCopy, ccOperation.DeepCopy())
}

tasks, err := r.scaler.UserTasks(userTaskIDs...)
tasks, err := r.scaler.UserTasks(ctx, userTaskIDs...)
if err != nil {
return errors.WrapIff(err, "could not get user tasks from Cruise Control API")
}
Expand Down
14 changes: 7 additions & 7 deletions controllers/cruisecontroltask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
break
}

unavailableBrokers, err := getUnavailableBrokers(scaler, brokerIDs)
unavailableBrokers, err := getUnavailableBrokers(ctx, scaler, brokerIDs)
if err != nil {
log.Error(err, "could not get unavailable brokers for upscale")
return requeueAfter(DefaultRequeueAfterTimeInSec)
}
if len(unavailableBrokers) > 0 {
log.Info("requeue as broker(s) are not ready for upscale", "brokerIDs", unavailableBrokers)
// This requeue is not necessary because the cruisecontrloperation controller retries the errored task
// This requeue is not necessary because the cruisecontroloperation controller retries the errored task
// but in this case there will be GracefulUpscaleCompletedWithError status in the kafkaCluster's status.
// To avoid that requeue is here until brokers come up.
return requeueAfter(DefaultRequeueAfterTimeInSec)
Expand Down Expand Up @@ -175,7 +175,7 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
brokerIDs = append(brokerIDs, task.BrokerID)
}

unavailableBrokerIDs, err := checkBrokerLogDirsAvailability(scaler, tasksAndStates)
unavailableBrokerIDs, err := checkBrokerLogDirsAvailability(ctx, scaler, tasksAndStates)
if err != nil {
log.Error(err, "failed to get unavailable brokers at rebalance")
return requeueAfter(DefaultRequeueAfterTimeInSec)
Expand Down Expand Up @@ -233,8 +233,8 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
return reconciled()
}

func checkBrokerLogDirsAvailability(scaler scale.CruiseControlScaler, tasksAndStates *CruiseControlTasksAndStates) (unavailableBrokerIDs []string, err error) {
logDirsByBroker, err := scaler.LogDirsByBroker()
func checkBrokerLogDirsAvailability(ctx context.Context, scaler scale.CruiseControlScaler, tasksAndStates *CruiseControlTasksAndStates) (unavailableBrokerIDs []string, err error) {
logDirsByBroker, err := scaler.LogDirsByBroker(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get list of volumes per broker from Cruise Control")
}
Expand All @@ -256,10 +256,10 @@ func checkBrokerLogDirsAvailability(scaler scale.CruiseControlScaler, tasksAndSt
return unavailableBrokerIDs, nil
}

func getUnavailableBrokers(scaler scale.CruiseControlScaler, brokerIDs []string) ([]string, error) {
func getUnavailableBrokers(ctx context.Context, scaler scale.CruiseControlScaler, brokerIDs []string) ([]string, error) {
states := []scale.KafkaBrokerState{scale.KafkaBrokerAlive, scale.KafkaBrokerNew}
// This can result NullPointerException when the capacity calculation is missing for a broker in the cruisecontrol configmap
availableBrokers, err := scaler.BrokersWithState(states...)
availableBrokers, err := scaler.BrokersWithState(ctx, states...)
if err != nil {
return nil, errors.WrapIff(err, "failed to retrieve list of available brokers from Cruise Control")
}
Expand Down
44 changes: 22 additions & 22 deletions controllers/tests/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,20 +320,20 @@ func generateCruiseControlOperation(name, namespace, kafkaRef string) v1alpha1.C
func getScaleMock2() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
scaleMock.EXPECT().IsUp().Return(true).AnyTimes()
scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes()

userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskCompletedWithError,
})}
scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().AddBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{
scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskActive,
Expand All @@ -343,20 +343,20 @@ func getScaleMock2() *mocks.MockCruiseControlScaler {
func getScaleMock1() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
scaleMock.EXPECT().IsUp().Return(true).AnyTimes()
scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes()

userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskCompleted,
})}
scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().AddBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{
scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskActive,
Expand All @@ -367,7 +367,7 @@ func getScaleMock1() *mocks.MockCruiseControlScaler {
func getScaleMock3() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
scaleMock.EXPECT().IsUp().Return(true).AnyTimes()
scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes()

userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{
TaskID: "12345",
Expand All @@ -378,18 +378,18 @@ func getScaleMock3() *mocks.MockCruiseControlScaler {
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskCompleted,
})}
scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().RemoveBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{
scaleMock.EXPECT().RemoveBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskActive,
}), nil).AnyTimes()
scaleMock.EXPECT().AddBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{
scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "2",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskActive,
Expand All @@ -401,7 +401,7 @@ func getScaleMock3() *mocks.MockCruiseControlScaler {
func getScaleMock4() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
scaleMock.EXPECT().IsUp().Return(true).AnyTimes()
scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes()

userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{
TaskID: "1",
Expand All @@ -412,13 +412,13 @@ func getScaleMock4() *mocks.MockCruiseControlScaler {
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskCompletedWithError,
})}
scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().RemoveBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{
scaleMock.EXPECT().RemoveBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "1",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskActive,
Expand All @@ -429,7 +429,7 @@ func getScaleMock4() *mocks.MockCruiseControlScaler {
func getScaleMock5() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
scaleMock.EXPECT().IsUp().Return(true).AnyTimes()
scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes()

userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{
TaskID: "12345",
Expand All @@ -441,14 +441,14 @@ func getScaleMock5() *mocks.MockCruiseControlScaler {
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskCompleted,
})}
first := scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).Times(1)
scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult2, nil).After(first).AnyTimes()
scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{
first := scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).Times(1)
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult2, nil).After(first).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().AddBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{
scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskActive,
Expand Down
6 changes: 3 additions & 3 deletions controllers/tests/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,22 +457,22 @@ func getScaleMockCCTask1() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
availableBrokers := []string{"1", "2", "3"}
scaleMock.EXPECT().BrokersWithState(gomock.All()).Return(availableBrokers, nil).AnyTimes()
scaleMock.EXPECT().BrokersWithState(gomock.Any(), gomock.All()).Return(availableBrokers, nil).AnyTimes()
return scaleMock
}

func getScaleMockCCTask2(onlineLogDirs []string) *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
availableBrokers := []string{"1", "2", "3"}
scaleMock.EXPECT().BrokersWithState(gomock.All()).Return(availableBrokers, nil).AnyTimes()
scaleMock.EXPECT().BrokersWithState(gomock.Any(), gomock.All()).Return(availableBrokers, nil).AnyTimes()

logDirs := make(map[scale.LogDirState][]string)
logDirsBrokerRet := make(map[string]map[scale.LogDirState][]string)

logDirs[scale.LogDirStateOnline] = onlineLogDirs
logDirsBrokerRet["0"] = logDirs
logDirsBrokerRet["1"] = logDirs
scaleMock.EXPECT().LogDirsByBroker().Return(logDirsBrokerRet, nil).AnyTimes()
scaleMock.EXPECT().LogDirsByBroker(gomock.Any()).Return(logDirsBrokerRet, nil).AnyTimes()
return scaleMock
}
Loading