diff --git a/Makefile b/Makefile index 96f6b711a..7e5f4957f 100644 --- a/Makefile +++ b/Makefile @@ -88,15 +88,14 @@ install-kustomize: # Run tests test: generate fmt vet manifests bin/setup-envtest cd api && go test ./... - bin/setup-envtest use -p env ${ENVTEST_K8S_VERSION} > bin/envtest.sh \ - && source bin/envtest.sh; \ - go test ./... \ - -coverprofile cover.out \ - -v \ - -failfast \ - -test.v \ - -test.paniconexit0 \ - -timeout 1h + KUBEBUILDER_ASSETS=$$($(BIN_DIR)/setup-envtest --print path --bin-dir $(BIN_DIR) use $(ENVTEST_K8S_VERSION)) \ + go test ./... \ + -coverprofile cover.out \ + -v \ + -failfast \ + -test.v \ + -test.paniconexit0 \ + -timeout 1h cd properties && go test -coverprofile cover.out -cover -failfast -v -covermode=count ./pkg/... ./internal/... # Build manager binary @@ -165,15 +164,17 @@ bin/controller-gen-$(CONTROLLER_GEN_VERSION): mv bin/controller-gen bin/controller-gen-$(CONTROLLER_GEN_VERSION) # find or download setup-envtest -bin/setup-envtest: - @ if ! test -x $(PWD)/bin/setup-envtest; then \ - set -ex ;\ - SETUP_ENVTEST_TMP_DIR=$$(mktemp -d) ;\ - cd $$SETUP_ENVTEST_TMP_DIR ;\ - go mod init tmp ;\ - GOBIN=$(PWD)/bin go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest ;\ - rm -rf $$SETUP_ENVTEST_TMP_DIR ;\ - fi + +# https://github.com/kubernetes-sigs/controller-runtime/commits/main/tools/setup-envtest +SETUP_ENVTEST_VERSION := d4f1e822ca11e9ff149bf2d9b5285f375334eba5 + +bin/setup-envtest: $(BIN_DIR)/setup-envtest-$(SETUP_ENVTEST_VERSION) ## Install setup-envtest CLI + @ln -sf setup-envtest-$(SETUP_ENVTEST_VERSION) $(BIN_DIR)/setup-envtest + +$(BIN_DIR)/setup-envtest-$(SETUP_ENVTEST_VERSION): + @mkdir -p $(BIN_DIR) + @GOBIN=$(BIN_DIR) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@$(SETUP_ENVTEST_VERSION) + @mv $(BIN_DIR)/setup-envtest $(BIN_DIR)/setup-envtest-$(SETUP_ENVTEST_VERSION) check_release: @echo "A new tag (${REL_TAG}) will be pushed to Github, and a new Docker image will be released. Are you sure? [y/N] " && read ans && [ $${ans:-N} == y ] diff --git a/controllers/cruisecontroloperation_controller.go b/controllers/cruisecontroloperation_controller.go index 88917edd4..7ffe1d67a 100644 --- a/controllers/cruisecontroloperation_controller.go +++ b/controllers/cruisecontroloperation_controller.go @@ -133,7 +133,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques return requeueWithError(log, "failed to lookup referenced kafka cluster", err) } - //Adding finalizer + // Adding finalizer if err := r.addFinalizer(ctx, currentCCOperation); err != nil { return requeueWithError(log, "failed to add finalizer to CruiseControlOperation", err) } @@ -144,7 +144,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques } // Checking Cruise Control health - status, err := r.scaler.Status() + status, err := r.scaler.Status(ctx) if err != nil { log.Error(err, "could not get Cruise Control status") return requeueAfter(defaultRequeueIntervalInSeconds) @@ -178,7 +178,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques return requeueAfter(defaultRequeueIntervalInSeconds) } - //When the task is not in execution we can remove the finalizer + // When the task is not in execution we can remove the finalizer if isFinalizerNeeded(currentCCOperation) && !currentCCOperation.IsCurrentTaskRunning() { controllerutil.RemoveFinalizer(currentCCOperation, ccOperationFinalizerGroup) if err := r.Update(ctx, currentCCOperation); err != nil { @@ -211,7 +211,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques log.Info("executing Cruise Control task", "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters()) // Executing operation - cruseControlTaskResult, err := r.executeOperation(ccOperationExecution) + cruseControlTaskResult, err := r.executeOperation(ctx, ccOperationExecution) if err != nil { log.Error(err, "Cruise Control task execution got an error", "name", ccOperationExecution.GetName(), "namespace", ccOperationExecution.GetNamespace(), "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters()) @@ -255,18 +255,18 @@ func (r *CruiseControlOperationReconciler) addFinalizer(ctx context.Context, cur return nil } -func (r *CruiseControlOperationReconciler) executeOperation(ccOperationExecution *banzaiv1alpha1.CruiseControlOperation) (*scale.Result, error) { +func (r *CruiseControlOperationReconciler) executeOperation(ctx context.Context, ccOperationExecution *banzaiv1alpha1.CruiseControlOperation) (*scale.Result, error) { var cruseControlTaskResult *scale.Result var err error switch ccOperationExecution.CurrentTaskOperation() { case banzaiv1alpha1.OperationAddBroker: - cruseControlTaskResult, err = r.scaler.AddBrokersWithParams(ccOperationExecution.CurrentTaskParameters()) + cruseControlTaskResult, err = r.scaler.AddBrokersWithParams(ctx, ccOperationExecution.CurrentTaskParameters()) case banzaiv1alpha1.OperationRemoveBroker: - cruseControlTaskResult, err = r.scaler.RemoveBrokersWithParams(ccOperationExecution.CurrentTaskParameters()) + cruseControlTaskResult, err = r.scaler.RemoveBrokersWithParams(ctx, ccOperationExecution.CurrentTaskParameters()) case banzaiv1alpha1.OperationRebalance: - cruseControlTaskResult, err = r.scaler.RebalanceWithParams(ccOperationExecution.CurrentTaskParameters()) + cruseControlTaskResult, err = r.scaler.RebalanceWithParams(ctx, ccOperationExecution.CurrentTaskParameters()) case banzaiv1alpha1.OperationStopExecution: - cruseControlTaskResult, err = r.scaler.StopExecution() + cruseControlTaskResult, err = r.scaler.StopExecution(ctx) default: err = errors.NewWithDetails("Cruise Control operation not supported", "name", ccOperationExecution.GetName(), "namespace", ccOperationExecution.GetNamespace(), "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters()) } @@ -440,7 +440,7 @@ func (r *CruiseControlOperationReconciler) updateCurrentTasks(ctx context.Contex ccOperationsCopy = append(ccOperationsCopy, ccOperation.DeepCopy()) } - tasks, err := r.scaler.UserTasks(userTaskIDs...) + tasks, err := r.scaler.UserTasks(ctx, userTaskIDs...) if err != nil { return errors.WrapIff(err, "could not get user tasks from Cruise Control API") } diff --git a/controllers/cruisecontroltask_controller.go b/controllers/cruisecontroltask_controller.go index c3531cbc8..ca823fbf7 100644 --- a/controllers/cruisecontroltask_controller.go +++ b/controllers/cruisecontroltask_controller.go @@ -129,14 +129,14 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr break } - unavailableBrokers, err := getUnavailableBrokers(scaler, brokerIDs) + unavailableBrokers, err := getUnavailableBrokers(ctx, scaler, brokerIDs) if err != nil { log.Error(err, "could not get unavailable brokers for upscale") return requeueAfter(DefaultRequeueAfterTimeInSec) } if len(unavailableBrokers) > 0 { log.Info("requeue as broker(s) are not ready for upscale", "brokerIDs", unavailableBrokers) - // This requeue is not necessary because the cruisecontrloperation controller retries the errored task + // This requeue is not necessary because the cruisecontroloperation controller retries the errored task // but in this case there will be GracefulUpscaleCompletedWithError status in the kafkaCluster's status. // To avoid that requeue is here until brokers come up. return requeueAfter(DefaultRequeueAfterTimeInSec) @@ -175,7 +175,7 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr brokerIDs = append(brokerIDs, task.BrokerID) } - unavailableBrokerIDs, err := checkBrokerLogDirsAvailability(scaler, tasksAndStates) + unavailableBrokerIDs, err := checkBrokerLogDirsAvailability(ctx, scaler, tasksAndStates) if err != nil { log.Error(err, "failed to get unavailable brokers at rebalance") return requeueAfter(DefaultRequeueAfterTimeInSec) @@ -233,8 +233,8 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr return reconciled() } -func checkBrokerLogDirsAvailability(scaler scale.CruiseControlScaler, tasksAndStates *CruiseControlTasksAndStates) (unavailableBrokerIDs []string, err error) { - logDirsByBroker, err := scaler.LogDirsByBroker() +func checkBrokerLogDirsAvailability(ctx context.Context, scaler scale.CruiseControlScaler, tasksAndStates *CruiseControlTasksAndStates) (unavailableBrokerIDs []string, err error) { + logDirsByBroker, err := scaler.LogDirsByBroker(ctx) if err != nil { return nil, errors.Wrap(err, "failed to get list of volumes per broker from Cruise Control") } @@ -256,10 +256,10 @@ func checkBrokerLogDirsAvailability(scaler scale.CruiseControlScaler, tasksAndSt return unavailableBrokerIDs, nil } -func getUnavailableBrokers(scaler scale.CruiseControlScaler, brokerIDs []string) ([]string, error) { +func getUnavailableBrokers(ctx context.Context, scaler scale.CruiseControlScaler, brokerIDs []string) ([]string, error) { states := []scale.KafkaBrokerState{scale.KafkaBrokerAlive, scale.KafkaBrokerNew} // This can result NullPointerException when the capacity calculation is missing for a broker in the cruisecontrol configmap - availableBrokers, err := scaler.BrokersWithState(states...) + availableBrokers, err := scaler.BrokersWithState(ctx, states...) if err != nil { return nil, errors.WrapIff(err, "failed to retrieve list of available brokers from Cruise Control") } diff --git a/controllers/tests/cruisecontroloperation_controller_test.go b/controllers/tests/cruisecontroloperation_controller_test.go index 2e4000d4e..718036fee 100644 --- a/controllers/tests/cruisecontroloperation_controller_test.go +++ b/controllers/tests/cruisecontroloperation_controller_test.go @@ -320,20 +320,20 @@ func generateCruiseControlOperation(name, namespace, kafkaRef string) v1alpha1.C func getScaleMock2() *mocks.MockCruiseControlScaler { mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) - scaleMock.EXPECT().IsUp().Return(true).AnyTimes() + scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes() userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskCompletedWithError, })} - scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{ + scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ ExecutorReady: true, MonitorReady: true, AnalyzerReady: true, }, nil).AnyTimes() - scaleMock.EXPECT().AddBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{ + scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskActive, @@ -343,20 +343,20 @@ func getScaleMock2() *mocks.MockCruiseControlScaler { func getScaleMock1() *mocks.MockCruiseControlScaler { mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) - scaleMock.EXPECT().IsUp().Return(true).AnyTimes() + scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes() userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskCompleted, })} - scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{ + scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ ExecutorReady: true, MonitorReady: true, AnalyzerReady: true, }, nil).AnyTimes() - scaleMock.EXPECT().AddBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{ + scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskActive, @@ -367,7 +367,7 @@ func getScaleMock1() *mocks.MockCruiseControlScaler { func getScaleMock3() *mocks.MockCruiseControlScaler { mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) - scaleMock.EXPECT().IsUp().Return(true).AnyTimes() + scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes() userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{ TaskID: "12345", @@ -378,18 +378,18 @@ func getScaleMock3() *mocks.MockCruiseControlScaler { StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskCompleted, })} - scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{ + scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ ExecutorReady: true, MonitorReady: true, AnalyzerReady: true, }, nil).AnyTimes() - scaleMock.EXPECT().RemoveBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{ + scaleMock.EXPECT().RemoveBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskActive, }), nil).AnyTimes() - scaleMock.EXPECT().AddBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{ + scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "2", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskActive, @@ -401,7 +401,7 @@ func getScaleMock3() *mocks.MockCruiseControlScaler { func getScaleMock4() *mocks.MockCruiseControlScaler { mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) - scaleMock.EXPECT().IsUp().Return(true).AnyTimes() + scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes() userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{ TaskID: "1", @@ -412,13 +412,13 @@ func getScaleMock4() *mocks.MockCruiseControlScaler { StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskCompletedWithError, })} - scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{ + scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ ExecutorReady: true, MonitorReady: true, AnalyzerReady: true, }, nil).AnyTimes() - scaleMock.EXPECT().RemoveBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{ + scaleMock.EXPECT().RemoveBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "1", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskActive, @@ -429,7 +429,7 @@ func getScaleMock4() *mocks.MockCruiseControlScaler { func getScaleMock5() *mocks.MockCruiseControlScaler { mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) - scaleMock.EXPECT().IsUp().Return(true).AnyTimes() + scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes() userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{ TaskID: "12345", @@ -441,14 +441,14 @@ func getScaleMock5() *mocks.MockCruiseControlScaler { StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskCompleted, })} - first := scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult, nil).Times(1) - scaleMock.EXPECT().UserTasks(gomock.Any()).Return(userTaskResult2, nil).After(first).AnyTimes() - scaleMock.EXPECT().Status().Return(scale.CruiseControlStatus{ + first := scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).Times(1) + scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult2, nil).After(first).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ ExecutorReady: true, MonitorReady: true, AnalyzerReady: true, }, nil).AnyTimes() - scaleMock.EXPECT().AddBrokersWithParams(gomock.All()).Return(scaleResultPointer(scale.Result{ + scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", State: v1beta1.CruiseControlTaskActive, diff --git a/controllers/tests/cruisecontroltask_controller_test.go b/controllers/tests/cruisecontroltask_controller_test.go index 57f94ea17..cf149a991 100644 --- a/controllers/tests/cruisecontroltask_controller_test.go +++ b/controllers/tests/cruisecontroltask_controller_test.go @@ -457,7 +457,7 @@ func getScaleMockCCTask1() *mocks.MockCruiseControlScaler { mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) availableBrokers := []string{"1", "2", "3"} - scaleMock.EXPECT().BrokersWithState(gomock.All()).Return(availableBrokers, nil).AnyTimes() + scaleMock.EXPECT().BrokersWithState(gomock.Any(), gomock.All()).Return(availableBrokers, nil).AnyTimes() return scaleMock } @@ -465,7 +465,7 @@ func getScaleMockCCTask2(onlineLogDirs []string) *mocks.MockCruiseControlScaler mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) availableBrokers := []string{"1", "2", "3"} - scaleMock.EXPECT().BrokersWithState(gomock.All()).Return(availableBrokers, nil).AnyTimes() + scaleMock.EXPECT().BrokersWithState(gomock.Any(), gomock.All()).Return(availableBrokers, nil).AnyTimes() logDirs := make(map[scale.LogDirState][]string) logDirsBrokerRet := make(map[string]map[scale.LogDirState][]string) @@ -473,6 +473,6 @@ func getScaleMockCCTask2(onlineLogDirs []string) *mocks.MockCruiseControlScaler logDirs[scale.LogDirStateOnline] = onlineLogDirs logDirsBrokerRet["0"] = logDirs logDirsBrokerRet["1"] = logDirs - scaleMock.EXPECT().LogDirsByBroker().Return(logDirsBrokerRet, nil).AnyTimes() + scaleMock.EXPECT().LogDirsByBroker(gomock.Any()).Return(logDirsBrokerRet, nil).AnyTimes() return scaleMock } diff --git a/controllers/tests/mocks/scale.go b/controllers/tests/mocks/scale.go index 03505b025..6c9259baa 100644 --- a/controllers/tests/mocks/scale.go +++ b/controllers/tests/mocks/scale.go @@ -20,13 +20,13 @@ package mocks import ( + context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" - api "github.com/banzaicloud/go-cruise-control/pkg/api" types "github.com/banzaicloud/go-cruise-control/pkg/types" scale "github.com/banzaicloud/koperator/pkg/scale" + gomock "github.com/golang/mock/gomock" ) // MockCruiseControlScaler is a mock of CruiseControlScaler interface. @@ -53,9 +53,9 @@ func (m *MockCruiseControlScaler) EXPECT() *MockCruiseControlScalerMockRecorder } // AddBrokers mocks base method. -func (m *MockCruiseControlScaler) AddBrokers(brokerIDs ...string) (*scale.Result, error) { +func (m *MockCruiseControlScaler) AddBrokers(ctx context.Context, brokerIDs ...string) (*scale.Result, error) { m.ctrl.T.Helper() - varargs := []interface{}{} + varargs := []interface{}{ctx} for _, a := range brokerIDs { varargs = append(varargs, a) } @@ -66,45 +66,46 @@ func (m *MockCruiseControlScaler) AddBrokers(brokerIDs ...string) (*scale.Result } // AddBrokers indicates an expected call of AddBrokers. -func (mr *MockCruiseControlScalerMockRecorder) AddBrokers(brokerIDs ...interface{}) *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) AddBrokers(ctx interface{}, brokerIDs ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBrokers", reflect.TypeOf((*MockCruiseControlScaler)(nil).AddBrokers), brokerIDs...) + varargs := append([]interface{}{ctx}, brokerIDs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBrokers", reflect.TypeOf((*MockCruiseControlScaler)(nil).AddBrokers), varargs...) } // AddBrokersWithParams mocks base method. -func (m *MockCruiseControlScaler) AddBrokersWithParams(params map[string]string) (*scale.Result, error) { +func (m *MockCruiseControlScaler) AddBrokersWithParams(ctx context.Context, params map[string]string) (*scale.Result, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddBrokersWithParams", params) + ret := m.ctrl.Call(m, "AddBrokersWithParams", ctx, params) ret0, _ := ret[0].(*scale.Result) ret1, _ := ret[1].(error) return ret0, ret1 } // AddBrokersWithParams indicates an expected call of AddBrokersWithParams. -func (mr *MockCruiseControlScalerMockRecorder) AddBrokersWithParams(params interface{}) *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) AddBrokersWithParams(ctx, params interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBrokersWithParams", reflect.TypeOf((*MockCruiseControlScaler)(nil).AddBrokersWithParams), params) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBrokersWithParams", reflect.TypeOf((*MockCruiseControlScaler)(nil).AddBrokersWithParams), ctx, params) } // BrokerWithLeastPartitionReplicas mocks base method. -func (m *MockCruiseControlScaler) BrokerWithLeastPartitionReplicas() (string, error) { +func (m *MockCruiseControlScaler) BrokerWithLeastPartitionReplicas(ctx context.Context) (string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BrokerWithLeastPartitionReplicas") + ret := m.ctrl.Call(m, "BrokerWithLeastPartitionReplicas", ctx) ret0, _ := ret[0].(string) ret1, _ := ret[1].(error) return ret0, ret1 } // BrokerWithLeastPartitionReplicas indicates an expected call of BrokerWithLeastPartitionReplicas. -func (mr *MockCruiseControlScalerMockRecorder) BrokerWithLeastPartitionReplicas() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) BrokerWithLeastPartitionReplicas(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BrokerWithLeastPartitionReplicas", reflect.TypeOf((*MockCruiseControlScaler)(nil).BrokerWithLeastPartitionReplicas)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BrokerWithLeastPartitionReplicas", reflect.TypeOf((*MockCruiseControlScaler)(nil).BrokerWithLeastPartitionReplicas), ctx) } // BrokersWithState mocks base method. -func (m *MockCruiseControlScaler) BrokersWithState(states ...scale.KafkaBrokerState) ([]string, error) { +func (m *MockCruiseControlScaler) BrokersWithState(ctx context.Context, states ...scale.KafkaBrokerState) ([]string, error) { m.ctrl.T.Helper() - varargs := []interface{}{} + varargs := []interface{}{ctx} for _, a := range states { varargs = append(varargs, a) } @@ -115,103 +116,104 @@ func (m *MockCruiseControlScaler) BrokersWithState(states ...scale.KafkaBrokerSt } // BrokersWithState indicates an expected call of BrokersWithState. -func (mr *MockCruiseControlScalerMockRecorder) BrokersWithState(states ...interface{}) *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) BrokersWithState(ctx interface{}, states ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BrokersWithState", reflect.TypeOf((*MockCruiseControlScaler)(nil).BrokersWithState), states...) + varargs := append([]interface{}{ctx}, states...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BrokersWithState", reflect.TypeOf((*MockCruiseControlScaler)(nil).BrokersWithState), varargs...) } // IsReady mocks base method. -func (m *MockCruiseControlScaler) IsReady() bool { +func (m *MockCruiseControlScaler) IsReady(ctx context.Context) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsReady") + ret := m.ctrl.Call(m, "IsReady", ctx) ret0, _ := ret[0].(bool) return ret0 } // IsReady indicates an expected call of IsReady. -func (mr *MockCruiseControlScalerMockRecorder) IsReady() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) IsReady(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockCruiseControlScaler)(nil).IsReady)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockCruiseControlScaler)(nil).IsReady), ctx) } // IsUp mocks base method. -func (m *MockCruiseControlScaler) IsUp() bool { +func (m *MockCruiseControlScaler) IsUp(ctx context.Context) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsUp") + ret := m.ctrl.Call(m, "IsUp", ctx) ret0, _ := ret[0].(bool) return ret0 } // IsUp indicates an expected call of IsUp. -func (mr *MockCruiseControlScalerMockRecorder) IsUp() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) IsUp(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUp", reflect.TypeOf((*MockCruiseControlScaler)(nil).IsUp)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUp", reflect.TypeOf((*MockCruiseControlScaler)(nil).IsUp), ctx) } // KafkaClusterLoad mocks base method. -func (m *MockCruiseControlScaler) KafkaClusterLoad() (*api.KafkaClusterLoadResponse, error) { +func (m *MockCruiseControlScaler) KafkaClusterLoad(ctx context.Context) (*api.KafkaClusterLoadResponse, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "KafkaClusterLoad") + ret := m.ctrl.Call(m, "KafkaClusterLoad", ctx) ret0, _ := ret[0].(*api.KafkaClusterLoadResponse) ret1, _ := ret[1].(error) return ret0, ret1 } // KafkaClusterLoad indicates an expected call of KafkaClusterLoad. -func (mr *MockCruiseControlScalerMockRecorder) KafkaClusterLoad() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) KafkaClusterLoad(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KafkaClusterLoad", reflect.TypeOf((*MockCruiseControlScaler)(nil).KafkaClusterLoad)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KafkaClusterLoad", reflect.TypeOf((*MockCruiseControlScaler)(nil).KafkaClusterLoad), ctx) } // KafkaClusterState mocks base method. -func (m *MockCruiseControlScaler) KafkaClusterState() (*types.KafkaClusterState, error) { +func (m *MockCruiseControlScaler) KafkaClusterState(ctx context.Context) (*types.KafkaClusterState, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "KafkaClusterState") + ret := m.ctrl.Call(m, "KafkaClusterState", ctx) ret0, _ := ret[0].(*types.KafkaClusterState) ret1, _ := ret[1].(error) return ret0, ret1 } // KafkaClusterState indicates an expected call of KafkaClusterState. -func (mr *MockCruiseControlScalerMockRecorder) KafkaClusterState() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) KafkaClusterState(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KafkaClusterState", reflect.TypeOf((*MockCruiseControlScaler)(nil).KafkaClusterState)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KafkaClusterState", reflect.TypeOf((*MockCruiseControlScaler)(nil).KafkaClusterState), ctx) } // LogDirsByBroker mocks base method. -func (m *MockCruiseControlScaler) LogDirsByBroker() (map[string]map[scale.LogDirState][]string, error) { +func (m *MockCruiseControlScaler) LogDirsByBroker(ctx context.Context) (map[string]map[scale.LogDirState][]string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LogDirsByBroker") + ret := m.ctrl.Call(m, "LogDirsByBroker", ctx) ret0, _ := ret[0].(map[string]map[scale.LogDirState][]string) ret1, _ := ret[1].(error) return ret0, ret1 } // LogDirsByBroker indicates an expected call of LogDirsByBroker. -func (mr *MockCruiseControlScalerMockRecorder) LogDirsByBroker() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) LogDirsByBroker(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LogDirsByBroker", reflect.TypeOf((*MockCruiseControlScaler)(nil).LogDirsByBroker)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LogDirsByBroker", reflect.TypeOf((*MockCruiseControlScaler)(nil).LogDirsByBroker), ctx) } // PartitionReplicasByBroker mocks base method. -func (m *MockCruiseControlScaler) PartitionReplicasByBroker() (map[string]int32, error) { +func (m *MockCruiseControlScaler) PartitionReplicasByBroker(ctx context.Context) (map[string]int32, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PartitionReplicasByBroker") + ret := m.ctrl.Call(m, "PartitionReplicasByBroker", ctx) ret0, _ := ret[0].(map[string]int32) ret1, _ := ret[1].(error) return ret0, ret1 } // PartitionReplicasByBroker indicates an expected call of PartitionReplicasByBroker. -func (mr *MockCruiseControlScalerMockRecorder) PartitionReplicasByBroker() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) PartitionReplicasByBroker(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PartitionReplicasByBroker", reflect.TypeOf((*MockCruiseControlScaler)(nil).PartitionReplicasByBroker)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PartitionReplicasByBroker", reflect.TypeOf((*MockCruiseControlScaler)(nil).PartitionReplicasByBroker), ctx) } // RebalanceDisks mocks base method. -func (m *MockCruiseControlScaler) RebalanceDisks(brokerIDs ...string) (*scale.Result, error) { +func (m *MockCruiseControlScaler) RebalanceDisks(ctx context.Context, brokerIDs ...string) (*scale.Result, error) { m.ctrl.T.Helper() - varargs := []interface{}{} + varargs := []interface{}{ctx} for _, a := range brokerIDs { varargs = append(varargs, a) } @@ -222,30 +224,31 @@ func (m *MockCruiseControlScaler) RebalanceDisks(brokerIDs ...string) (*scale.Re } // RebalanceDisks indicates an expected call of RebalanceDisks. -func (mr *MockCruiseControlScalerMockRecorder) RebalanceDisks(brokerIDs ...interface{}) *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) RebalanceDisks(ctx interface{}, brokerIDs ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceDisks", reflect.TypeOf((*MockCruiseControlScaler)(nil).RebalanceDisks), brokerIDs...) + varargs := append([]interface{}{ctx}, brokerIDs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceDisks", reflect.TypeOf((*MockCruiseControlScaler)(nil).RebalanceDisks), varargs...) } // RebalanceWithParams mocks base method. -func (m *MockCruiseControlScaler) RebalanceWithParams(params map[string]string) (*scale.Result, error) { +func (m *MockCruiseControlScaler) RebalanceWithParams(ctx context.Context, params map[string]string) (*scale.Result, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RebalanceWithParams", params) + ret := m.ctrl.Call(m, "RebalanceWithParams", ctx, params) ret0, _ := ret[0].(*scale.Result) ret1, _ := ret[1].(error) return ret0, ret1 } // RebalanceWithParams indicates an expected call of RebalanceWithParams. -func (mr *MockCruiseControlScalerMockRecorder) RebalanceWithParams(params interface{}) *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) RebalanceWithParams(ctx, params interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceWithParams", reflect.TypeOf((*MockCruiseControlScaler)(nil).RebalanceWithParams), params) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceWithParams", reflect.TypeOf((*MockCruiseControlScaler)(nil).RebalanceWithParams), ctx, params) } // RemoveBrokers mocks base method. -func (m *MockCruiseControlScaler) RemoveBrokers(brokerIDs ...string) (*scale.Result, error) { +func (m *MockCruiseControlScaler) RemoveBrokers(ctx context.Context, brokerIDs ...string) (*scale.Result, error) { m.ctrl.T.Helper() - varargs := []interface{}{} + varargs := []interface{}{ctx} for _, a := range brokerIDs { varargs = append(varargs, a) } @@ -256,60 +259,61 @@ func (m *MockCruiseControlScaler) RemoveBrokers(brokerIDs ...string) (*scale.Res } // RemoveBrokers indicates an expected call of RemoveBrokers. -func (mr *MockCruiseControlScalerMockRecorder) RemoveBrokers(brokerIDs ...interface{}) *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) RemoveBrokers(ctx interface{}, brokerIDs ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveBrokers", reflect.TypeOf((*MockCruiseControlScaler)(nil).RemoveBrokers), brokerIDs...) + varargs := append([]interface{}{ctx}, brokerIDs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveBrokers", reflect.TypeOf((*MockCruiseControlScaler)(nil).RemoveBrokers), varargs...) } // RemoveBrokersWithParams mocks base method. -func (m *MockCruiseControlScaler) RemoveBrokersWithParams(params map[string]string) (*scale.Result, error) { +func (m *MockCruiseControlScaler) RemoveBrokersWithParams(ctx context.Context, params map[string]string) (*scale.Result, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveBrokersWithParams", params) + ret := m.ctrl.Call(m, "RemoveBrokersWithParams", ctx, params) ret0, _ := ret[0].(*scale.Result) ret1, _ := ret[1].(error) return ret0, ret1 } // RemoveBrokersWithParams indicates an expected call of RemoveBrokersWithParams. -func (mr *MockCruiseControlScalerMockRecorder) RemoveBrokersWithParams(params interface{}) *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) RemoveBrokersWithParams(ctx, params interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveBrokersWithParams", reflect.TypeOf((*MockCruiseControlScaler)(nil).RemoveBrokersWithParams), params) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveBrokersWithParams", reflect.TypeOf((*MockCruiseControlScaler)(nil).RemoveBrokersWithParams), ctx, params) } // Status mocks base method. -func (m *MockCruiseControlScaler) Status() (scale.CruiseControlStatus, error) { +func (m *MockCruiseControlScaler) Status(ctx context.Context) (scale.CruiseControlStatus, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Status") + ret := m.ctrl.Call(m, "Status", ctx) ret0, _ := ret[0].(scale.CruiseControlStatus) ret1, _ := ret[1].(error) return ret0, ret1 } // Status indicates an expected call of Status. -func (mr *MockCruiseControlScalerMockRecorder) Status() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) Status(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockCruiseControlScaler)(nil).Status)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockCruiseControlScaler)(nil).Status), ctx) } // StopExecution mocks base method. -func (m *MockCruiseControlScaler) StopExecution() (*scale.Result, error) { +func (m *MockCruiseControlScaler) StopExecution(ctx context.Context) (*scale.Result, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StopExecution") + ret := m.ctrl.Call(m, "StopExecution", ctx) ret0, _ := ret[0].(*scale.Result) ret1, _ := ret[1].(error) return ret0, ret1 } // StopExecution indicates an expected call of StopExecution. -func (mr *MockCruiseControlScalerMockRecorder) StopExecution() *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) StopExecution(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopExecution", reflect.TypeOf((*MockCruiseControlScaler)(nil).StopExecution)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopExecution", reflect.TypeOf((*MockCruiseControlScaler)(nil).StopExecution), ctx) } // UserTasks mocks base method. -func (m *MockCruiseControlScaler) UserTasks(taskIDs ...string) ([]*scale.Result, error) { +func (m *MockCruiseControlScaler) UserTasks(ctx context.Context, taskIDs ...string) ([]*scale.Result, error) { m.ctrl.T.Helper() - varargs := []interface{}{} + varargs := []interface{}{ctx} for _, a := range taskIDs { varargs = append(varargs, a) } @@ -320,7 +324,8 @@ func (m *MockCruiseControlScaler) UserTasks(taskIDs ...string) ([]*scale.Result, } // UserTasks indicates an expected call of UserTasks. -func (mr *MockCruiseControlScalerMockRecorder) UserTasks(taskIDs ...interface{}) *gomock.Call { +func (mr *MockCruiseControlScalerMockRecorder) UserTasks(ctx interface{}, taskIDs ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UserTasks", reflect.TypeOf((*MockCruiseControlScaler)(nil).UserTasks), taskIDs...) + varargs := append([]interface{}{ctx}, taskIDs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UserTasks", reflect.TypeOf((*MockCruiseControlScaler)(nil).UserTasks), varargs...) } diff --git a/go.mod b/go.mod index 065538850..6e1ba07ff 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( emperror.dev/errors v0.8.1 github.com/Masterminds/sprig/v3 v3.2.2 github.com/Shopify/sarama v1.36.0 - github.com/banzaicloud/go-cruise-control v0.2.0 + github.com/banzaicloud/go-cruise-control v0.4.0 github.com/banzaicloud/istio-client-go v0.0.17 github.com/banzaicloud/istio-operator/api/v2 v2.15.1 github.com/banzaicloud/k8s-objectmatcher v1.8.0 @@ -18,8 +18,8 @@ require ( github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/go-logr/logr v1.2.3 github.com/imdario/mergo v0.3.13 - github.com/onsi/ginkgo/v2 v2.5.0 - github.com/onsi/gomega v1.24.0 + github.com/onsi/ginkgo/v2 v2.8.4 + github.com/onsi/gomega v1.27.2 github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.0 github.com/prometheus/common v0.37.0 github.com/stretchr/testify v1.8.0 @@ -35,6 +35,12 @@ require ( sigs.k8s.io/controller-runtime v0.13.0 ) +require ( + github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect + github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect + golang.org/x/tools v0.6.0 // indirect +) + require ( cloud.google.com/go v0.99.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect diff --git a/go.sum b/go.sum index c54e3117d..15a2e1e3f 100644 --- a/go.sum +++ b/go.sum @@ -92,8 +92,8 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= -github.com/banzaicloud/go-cruise-control v0.2.0 h1:g0zsJMuFrw/nUa7f9Y5IZIvB9pR6jF6usOHMbY8+J6Q= -github.com/banzaicloud/go-cruise-control v0.2.0/go.mod h1:TdgOy6tNm8tXAIck6lDBRXClGV+/TWoqR/DD1aKvW0U= +github.com/banzaicloud/go-cruise-control v0.4.0 h1:+RH5D6k+SVe99Hqmbaw2LkHtS1ySRU9nprjai9Z+Ipk= +github.com/banzaicloud/go-cruise-control v0.4.0/go.mod h1:6pfmzhD23At4/QV2capmSquv5hkyDUSDAqjNMoS2+70= github.com/banzaicloud/istio-client-go v0.0.17 h1:wiplbM7FDiIHopujInAnin3zuovtVcphtKy9En39q5I= github.com/banzaicloud/istio-client-go v0.0.17/go.mod h1:rpnEYYGHzisx8nARl2d30Oq38EeCX0/PPaxMaREfE9I= github.com/banzaicloud/istio-operator/api/v2 v2.15.1 h1:BZg8COvoOJtfx/dgN7KpoOnce0LxDrElNHbvxNySs6g= @@ -261,6 +261,7 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng= github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M= github.com/gobuffalo/flect v0.2.3/go.mod h1:vmkQwuZYhN5Pc4ljYQZzP+1sq+NEkK+lh20jmEmX3jc= @@ -328,6 +329,7 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -517,15 +519,15 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108 github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/ginkgo/v2 v2.5.0 h1:TRtrvv2vdQqzkwrQ1ke6vtXf7IK34RBUJafIy1wMwls= -github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw= +github.com/onsi/ginkgo/v2 v2.8.4 h1:gf5mIQ8cLFieruNLAdgijHF1PYfLphKm2dxxcUtcqK0= +github.com/onsi/ginkgo/v2 v2.8.4/go.mod h1:427dEDQZkDKsBvCjc2A/ZPefhKxsTTrsQegMlayL730= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= -github.com/onsi/gomega v1.24.0 h1:+0glovB9Jd6z3VR+ScSwQqXVTIfJcGA9UBM8yzQxhqg= -github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= +github.com/onsi/gomega v1.27.2 h1:SKU0CXeKE/WVgIV1T61kSa3+IRE8Ekrv9rdXDwwTqnY= +github.com/onsi/gomega v1.27.2/go.mod h1:5mR3phAHpkAVIDkHEUBY6HGVsU+cpcEscrGPB4oPlZI= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.0 h1:y9azNmMzvkNBPyczpNRwaV4bm0U6e7Oyrj7gi2/SNFI= github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.0/go.mod h1:lAVhWwbNaveeJmxrxuSTxMgKpF6DjnuVpn6T8WiBwYQ= @@ -830,8 +832,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1000,6 +1002,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/alertmanager/currentalert/current_alerts.go b/internal/alertmanager/currentalert/current_alerts.go index bc69805a8..021e24a21 100644 --- a/internal/alertmanager/currentalert/current_alerts.go +++ b/internal/alertmanager/currentalert/current_alerts.go @@ -15,6 +15,7 @@ package currentalert import ( + "context" "errors" "sync" @@ -29,7 +30,7 @@ type CurrentAlerts interface { AlertGC(AlertState) error DeleteAlert(model.Fingerprint) error ListAlerts() map[model.Fingerprint]*currentAlertStruct - HandleAlert(model.Fingerprint, client.Client, int, logr.Logger) (*currentAlertStruct, error) + HandleAlert(context.Context, model.Fingerprint, client.Client, int, logr.Logger) (*currentAlertStruct, error) GetRollingUpgradeAlertCount() int IgnoreCCStatusCheck(bool) } @@ -116,7 +117,7 @@ func (a *currentAlerts) AlertGC(alert AlertState) error { return nil } -func (a *currentAlerts) HandleAlert(alertFp model.Fingerprint, client client.Client, rollingUpgradeAlertCount int, log logr.Logger) (*currentAlertStruct, error) { +func (a *currentAlerts) HandleAlert(ctx context.Context, alertFp model.Fingerprint, client client.Client, rollingUpgradeAlertCount int, log logr.Logger) (*currentAlertStruct, error) { a.lock.Lock() defer a.lock.Unlock() if _, ok := a.alerts[alertFp]; !ok { @@ -134,7 +135,7 @@ func (a *currentAlerts) HandleAlert(alertFp model.Fingerprint, client client.Cli // - alert has to be skipped because of broker upscale/downscale limits // - unknown command is presented // on every other case examineAlert will throw an error - alertProcessed, err := e.examineAlert(rollingUpgradeAlertCount) + alertProcessed, err := e.examineAlert(ctx, rollingUpgradeAlertCount) if err != nil { return nil, err } diff --git a/internal/alertmanager/currentalert/current_alerts_test.go b/internal/alertmanager/currentalert/current_alerts_test.go index 877675351..c6f4762d3 100644 --- a/internal/alertmanager/currentalert/current_alerts_test.go +++ b/internal/alertmanager/currentalert/current_alerts_test.go @@ -160,7 +160,7 @@ func TestGetCurrentAlerts(t *testing.T) { }, }, } - //Create Namespace first + // Create Namespace first ensureCreated(t, kafkaNamespace, mgr) ensureCreated(t, kafkaCluster, mgr) @@ -213,7 +213,7 @@ func TestGetCurrentAlerts(t *testing.T) { } testRollingUpgradeErrorCount := 5 - currAlert, err := alerts1.HandleAlert(testAlert1.FingerPrint, c, testRollingUpgradeErrorCount, log) + currAlert, err := alerts1.HandleAlert(ctx, testAlert1.FingerPrint, c, testRollingUpgradeErrorCount, log) if err != nil { t.Error("Hanlde alert failed a1 with error", err) } @@ -259,7 +259,7 @@ func TestGetCurrentAlerts(t *testing.T) { t.Error("2222 alert wasn't deleted") } - _, err = alerts3.HandleAlert(model.Fingerprint(2222), c, 0, log) + _, err = alerts3.HandleAlert(ctx, model.Fingerprint(2222), c, 0, log) expected := "alert doesn't exist" if err == nil || err.Error() != expected { t.Error("alert with 2222 isn't the expected", err) diff --git a/internal/alertmanager/currentalert/process.go b/internal/alertmanager/currentalert/process.go index 42a9c6f0b..d4284e3b4 100644 --- a/internal/alertmanager/currentalert/process.go +++ b/internal/alertmanager/currentalert/process.go @@ -87,7 +87,7 @@ func (e *examiner) getKafkaCr() (*v1beta1.KafkaCluster, error) { return cr, nil } -func (e *examiner) examineAlert(rollingUpgradeAlertCount int) (bool, error) { +func (e *examiner) examineAlert(ctx context.Context, rollingUpgradeAlertCount int) (bool, error) { cr, err := e.getKafkaCr() if err != nil { return false, err @@ -115,10 +115,10 @@ func (e *examiner) examineAlert(rollingUpgradeAlertCount int) (bool, error) { } } - return e.processAlert(ds) + return e.processAlert(ctx, ds) } -func (e *examiner) processAlert(ds disableScaling) (bool, error) { +func (e *examiner) processAlert(ctx context.Context, ds disableScaling) (bool, error) { switch e.Alert.Annotations["command"] { case AddPvcCommand: validators := AlertValidators{newAddPvcValidator(e.Alert)} @@ -151,7 +151,7 @@ func (e *examiner) processAlert(ds disableScaling) (bool, error) { e.Log.Info("downscale is skipped due to downscale limit") return false, nil } - err := downScale(e.Log, e.Alert.Labels, e.Client) + err := downScale(ctx, e.Log, e.Alert.Labels, e.Client) if err != nil { return false, err } @@ -293,7 +293,7 @@ func resizePvc(log logr.Logger, labels model.LabelSet, annotiations model.LabelS return nil } -func downScale(log logr.Logger, labels model.LabelSet, client client.Client) error { +func downScale(ctx context.Context, log logr.Logger, labels model.LabelSet, client client.Client) error { cr, err := k8sutil.GetCr(string(labels[v1beta1.KafkaCRLabelKey]), string(labels["namespace"]), client) if err != nil { return err @@ -321,7 +321,7 @@ func downScale(log logr.Logger, labels model.LabelSet, client client.Client) err return errors.WrapIfWithDetails(err, "failed to initialize Cruise Control Scaler", "cruise control url", cruiseControlURL) } - brokerID, err = cc.BrokerWithLeastPartitionReplicas() + brokerID, err = cc.BrokerWithLeastPartitionReplicas(ctx) if err != nil { return err } diff --git a/internal/alertmanager/currentalert/process_test.go b/internal/alertmanager/currentalert/process_test.go index 8f13c710d..0506bd849 100644 --- a/internal/alertmanager/currentalert/process_test.go +++ b/internal/alertmanager/currentalert/process_test.go @@ -599,6 +599,8 @@ func Test_upScale(t *testing.T) { func Test_downScale(t *testing.T) { testClient := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build() + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() testCases := []struct { testName string @@ -665,7 +667,7 @@ func Test_downScale(t *testing.T) { } }() - if err := downScale(logr.Discard(), test.alert.Labels, testClient); err != nil { + if err := downScale(ctx, logr.Discard(), test.alert.Labels, testClient); err != nil { t.Error(err) return } diff --git a/internal/alertmanager/dispatcher/dispatcher.go b/internal/alertmanager/dispatcher/dispatcher.go index 8e0559df1..3019a51a2 100644 --- a/internal/alertmanager/dispatcher/dispatcher.go +++ b/internal/alertmanager/dispatcher/dispatcher.go @@ -15,6 +15,8 @@ package dispatcher import ( + "context" + "github.com/go-logr/logr" "github.com/prometheus/common/model" "sigs.k8s.io/controller-runtime/pkg/client" @@ -23,7 +25,7 @@ import ( ) // Dispatcher calls actors based on alert annotations -func Dispatcher(promAlerts []model.Alert, log logr.Logger, client client.Client) { +func Dispatcher(ctx context.Context, promAlerts []model.Alert, log logr.Logger, client client.Client) { storedAlerts := currentalert.GetCurrentAlerts() for _, promAlert := range alertFilter(promAlerts) { store := currentalert.AlertState{ @@ -41,7 +43,7 @@ func Dispatcher(promAlerts []model.Alert, log logr.Logger, client client.Client) rollingUpgradeAlertCount := storedAlerts.GetRollingUpgradeAlertCount() for key, value := range storedAlerts.ListAlerts() { log.Info("Stored Alert", "key", key, "status", value.Status, "labels", value.Labels, "annotations", value.Annotations, "processed", value.Processed) - _, err := storedAlerts.HandleAlert(key, client, rollingUpgradeAlertCount, log) + _, err := storedAlerts.HandleAlert(ctx, key, client, rollingUpgradeAlertCount, log) if err != nil { log.Error(err, "failed to handle alert", "fingerprint", key) } diff --git a/internal/alertmanager/receiver/http_handler.go b/internal/alertmanager/receiver/http_handler.go index 7e3be85e1..0d9b64668 100644 --- a/internal/alertmanager/receiver/http_handler.go +++ b/internal/alertmanager/receiver/http_handler.go @@ -56,7 +56,7 @@ func (a *HTTPController) reciveAlert(w http.ResponseWriter, r *http.Request) { http.Error(w, "reading request body failed", http.StatusInternalServerError) return } - err = alertReciever(a.Logger, alert, a.Client) + err = alertReciever(r.Context(), a.Logger, alert, a.Client) if err != nil { http.Error(w, "alert receiver error", http.StatusBadRequest) return diff --git a/internal/alertmanager/receiver/receiver.go b/internal/alertmanager/receiver/receiver.go index 77b2a2914..e703e286d 100644 --- a/internal/alertmanager/receiver/receiver.go +++ b/internal/alertmanager/receiver/receiver.go @@ -15,6 +15,7 @@ package receiver import ( + "context" "encoding/json" "github.com/go-logr/logr" @@ -24,13 +25,13 @@ import ( "github.com/banzaicloud/koperator/internal/alertmanager/dispatcher" ) -func alertReciever(log logr.Logger, alert []byte, client client.Client) error { +func alertReciever(ctx context.Context, log logr.Logger, alert []byte, client client.Client) error { promAlerts := make([]model.Alert, 0) err := json.Unmarshal(alert, &promAlerts) if err != nil { return err } - dispatcher.Dispatcher(promAlerts, log, client) + dispatcher.Dispatcher(ctx, promAlerts, log, client) return nil } diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index bd2835b23..81d91a7ff 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -210,7 +210,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { } // Handle Pod delete - err := r.reconcileKafkaPodDelete(log) + err := r.reconcileKafkaPodDelete(ctx, log) if err != nil { return errors.WrapIf(err, "failed to reconcile resource") } @@ -402,7 +402,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return nil } -func (r *Reconciler) reconcileKafkaPodDelete(log logr.Logger) error { +func (r *Reconciler) reconcileKafkaPodDelete(ctx context.Context, log logr.Logger) error { podList := &corev1.PodList{} err := r.Client.List(context.TODO(), podList, client.InNamespace(r.KafkaCluster.Namespace), @@ -446,7 +446,7 @@ func (r *Reconciler) reconcileKafkaPodDelete(log logr.Logger) error { scale.KafkaBrokerDemoted, scale.KafkaBrokerBadDisks, } - availableBrokers, err := cc.BrokersWithState(brokerStates...) + availableBrokers, err := cc.BrokersWithState(ctx, brokerStates...) if err != nil { log.Error(err, "failed to get the list of available brokers from Cruise Control") return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go index aceea2cd5..3e213fbde 100644 --- a/pkg/scale/scale.go +++ b/pkg/scale/scale.go @@ -85,7 +85,7 @@ func createNewDefaultCruiseControlScaler(ctx context.Context, serverURL string) UserAgent: "koperator", } - cruisecontrol, err := client.NewClient(ctx, cfg) + cruisecontrol, err := client.NewClient(cfg) if err != nil { log.Error(err, "creating Cruise Control client failed") return nil, err @@ -104,10 +104,10 @@ type cruiseControlScaler struct { } // Status returns a CruiseControlStatus describing the internal state of Cruise Control. -func (cc *cruiseControlScaler) Status() (CruiseControlStatus, error) { +func (cc *cruiseControlScaler) Status(ctx context.Context) (CruiseControlStatus, error) { req := api.StateRequestWithDefaults() req.Verbose = true - resp, err := cc.client.State(req) + resp, err := cc.client.State(ctx, req) if err != nil { return CruiseControlStatus{}, err } @@ -134,8 +134,8 @@ func (cc *cruiseControlScaler) Status() (CruiseControlStatus, error) { } // IsReady returns true if the Analyzer and Monitor components of Cruise Control are in ready state. -func (cc *cruiseControlScaler) IsReady() bool { - status, err := cc.Status() +func (cc *cruiseControlScaler) IsReady(ctx context.Context) bool { + status, err := cc.Status(ctx) if err != nil { cc.log.Error(err, "could not get Cruise Control status") return false @@ -151,18 +151,18 @@ func (cc *cruiseControlScaler) IsReady() bool { } // IsUp returns true if Cruise Control is online. -func (cc *cruiseControlScaler) IsUp() bool { - _, err := cc.client.State(api.StateRequestWithDefaults()) +func (cc *cruiseControlScaler) IsUp(ctx context.Context) bool { + _, err := cc.client.State(ctx, api.StateRequestWithDefaults()) return err == nil } // UserTasks returns list of Result describing User Tasks from Cruise Control for the provided task IDs. -func (cc *cruiseControlScaler) UserTasks(taskIDs ...string) ([]*Result, error) { +func (cc *cruiseControlScaler) UserTasks(ctx context.Context, taskIDs ...string) ([]*Result, error) { req := &api.UserTasksRequest{ UserTaskIDs: taskIDs, } - resp, err := cc.client.UserTasks(req) + resp, err := cc.client.UserTasks(ctx, req) if err != nil { return nil, err } @@ -196,7 +196,7 @@ func parseBrokerIDtoSlice(brokerid string) ([]int32, error) { // AddBrokersWithParams requests Cruise Control to add the list of provided brokers to the Kafka cluster // by reassigning partition replicas to them. The broker list and operation properties can be added // with the use of the params argument. -func (cc *cruiseControlScaler) AddBrokersWithParams(params map[string]string) (*Result, error) { +func (cc *cruiseControlScaler) AddBrokersWithParams(ctx context.Context, params map[string]string) (*Result, error) { addBrokerReq := &api.AddBrokerRequest{ AllowCapacityEstimation: true, DataFrom: types.ProposalDataSourceValidWindows, @@ -229,7 +229,7 @@ func (cc *cruiseControlScaler) AddBrokersWithParams(params map[string]string) (* } } - addBrokerResp, err := cc.client.AddBroker(addBrokerReq) + addBrokerResp, err := cc.client.AddBroker(ctx, addBrokerReq) if err != nil { return &Result{ TaskID: addBrokerResp.TaskID, @@ -252,9 +252,9 @@ func (cc *cruiseControlScaler) AddBrokersWithParams(params map[string]string) (* } // StopExecution requests Cruise Control to stop running operation gracefully -func (cc *cruiseControlScaler) StopExecution() (*Result, error) { - stopReq := api.StopProposalExecutionRequest{} - stopResp, err := cc.client.StopProposalExecution(&stopReq) +func (cc *cruiseControlScaler) StopExecution(ctx context.Context) (*Result, error) { + stopReq := &api.StopProposalExecutionRequest{} + stopResp, err := cc.client.StopProposalExecution(ctx, stopReq) if err != nil { return &Result{ TaskID: stopResp.TaskID, @@ -273,7 +273,7 @@ func (cc *cruiseControlScaler) StopExecution() (*Result, error) { }, nil } -func (cc *cruiseControlScaler) RemoveBrokersWithParams(params map[string]string) (*Result, error) { +func (cc *cruiseControlScaler) RemoveBrokersWithParams(ctx context.Context, params map[string]string) (*Result, error) { rmBrokerReq := &api.RemoveBrokerRequest{ AllowCapacityEstimation: true, DataFrom: types.ProposalDataSourceValidWindows, @@ -306,7 +306,7 @@ func (cc *cruiseControlScaler) RemoveBrokersWithParams(params map[string]string) } } - rmBrokerResp, err := cc.client.RemoveBroker(rmBrokerReq) + rmBrokerResp, err := cc.client.RemoveBroker(ctx, rmBrokerReq) if err != nil { return &Result{ TaskID: rmBrokerResp.TaskID, @@ -331,7 +331,7 @@ func (cc *cruiseControlScaler) RemoveBrokersWithParams(params map[string]string) // AddBrokers requests Cruise Control to add the list of provided brokers to the Kafka cluster // by reassigning partition replicas to them. // Request returns an error if not all brokers are available in Cruise Control. -func (cc *cruiseControlScaler) AddBrokers(brokerIDs ...string) (*Result, error) { +func (cc *cruiseControlScaler) AddBrokers(ctx context.Context, brokerIDs ...string) (*Result, error) { if len(brokerIDs) == 0 { return nil, errors.New("no broker id(s) provided for add brokers request") } @@ -343,7 +343,7 @@ func (cc *cruiseControlScaler) AddBrokers(brokerIDs ...string) (*Result, error) } states := []KafkaBrokerState{KafkaBrokerAlive, KafkaBrokerNew} - availableBrokers, err := cc.BrokersWithState(states...) + availableBrokers, err := cc.BrokersWithState(ctx, states...) if err != nil { cc.log.Error(err, "failed to retrieve list of available brokers from Cruise Control") return nil, err @@ -368,7 +368,7 @@ func (cc *cruiseControlScaler) AddBrokers(brokerIDs ...string) (*Result, error) DataFrom: types.ProposalDataSourceValidWindows, UseReadyDefaultGoals: true, } - addBrokerResp, err := cc.client.AddBroker(addBrokerReq) + addBrokerResp, err := cc.client.AddBroker(ctx, addBrokerReq) if err != nil { return &Result{ TaskID: addBrokerResp.TaskID, @@ -391,13 +391,13 @@ func (cc *cruiseControlScaler) AddBrokers(brokerIDs ...string) (*Result, error) // RemoveBrokers requests Cruise Control to move partition replicase off from the provided brokers. // The broker list and operation properties can be added with the use of the params argument. -func (cc *cruiseControlScaler) RemoveBrokers(brokerIDs ...string) (*Result, error) { +func (cc *cruiseControlScaler) RemoveBrokers(ctx context.Context, brokerIDs ...string) (*Result, error) { if len(brokerIDs) == 0 { return nil, errors.New("no broker id(s) provided for remove brokers request") } clusterStateReq := api.KafkaClusterStateRequestWithDefaults() - clusterStateResp, err := cc.client.KafkaClusterState(clusterStateReq) + clusterStateResp, err := cc.client.KafkaClusterState(ctx, clusterStateReq) if err != nil { return nil, err } @@ -432,7 +432,7 @@ func (cc *cruiseControlScaler) RemoveBrokers(brokerIDs ...string) (*Result, erro DataFrom: types.ProposalDataSourceValidWindows, UseReadyDefaultGoals: true, } - rmBrokerResp, err := cc.client.RemoveBroker(rmBrokerReq) + rmBrokerResp, err := cc.client.RemoveBroker(ctx, rmBrokerReq) if err != nil { return &Result{ @@ -454,7 +454,7 @@ func (cc *cruiseControlScaler) RemoveBrokers(brokerIDs ...string) (*Result, erro }, nil } -func (cc *cruiseControlScaler) RebalanceWithParams(params map[string]string) (*Result, error) { +func (cc *cruiseControlScaler) RebalanceWithParams(ctx context.Context, params map[string]string) (*Result, error) { rebalanceReq := &api.RebalanceRequest{ AllowCapacityEstimation: true, DataFrom: types.ProposalDataSourceValidWindows, @@ -494,7 +494,7 @@ func (cc *cruiseControlScaler) RebalanceWithParams(params map[string]string) (*R } } - rebalanceResp, err := cc.client.Rebalance(rebalanceReq) + rebalanceResp, err := cc.client.Rebalance(ctx, rebalanceReq) if err != nil { return &Result{ TaskID: rebalanceResp.TaskID, @@ -516,8 +516,8 @@ func (cc *cruiseControlScaler) RebalanceWithParams(params map[string]string) (*R }, nil } -func (cc *cruiseControlScaler) KafkaClusterLoad() (*api.KafkaClusterLoadResponse, error) { - clusterLoadResp, err := cc.client.KafkaClusterLoad(api.KafkaClusterLoadRequestWithDefaults()) +func (cc *cruiseControlScaler) KafkaClusterLoad(ctx context.Context) (*api.KafkaClusterLoadResponse, error) { + clusterLoadResp, err := cc.client.KafkaClusterLoad(ctx, api.KafkaClusterLoadRequestWithDefaults()) if err != nil { return nil, err } @@ -525,8 +525,8 @@ func (cc *cruiseControlScaler) KafkaClusterLoad() (*api.KafkaClusterLoadResponse } // RebalanceDisks performs a disk rebalance via Cruise Control for the provided list of brokers. -func (cc *cruiseControlScaler) RebalanceDisks(brokerIDs ...string) (*Result, error) { - clusterLoadResp, err := cc.client.KafkaClusterLoad(api.KafkaClusterLoadRequestWithDefaults()) +func (cc *cruiseControlScaler) RebalanceDisks(ctx context.Context, brokerIDs ...string) (*Result, error) { + clusterLoadResp, err := cc.client.KafkaClusterLoad(ctx, api.KafkaClusterLoadRequestWithDefaults()) if err != nil { return nil, err } @@ -558,7 +558,7 @@ func (cc *cruiseControlScaler) RebalanceDisks(brokerIDs ...string) (*Result, err UseReadyDefaultGoals: true, ExcludeRecentlyRemovedBrokers: true, } - rebalanceResp, err := cc.client.Rebalance(rebalanceReq) + rebalanceResp, err := cc.client.Rebalance(ctx, rebalanceReq) if err != nil { return &Result{ TaskID: rebalanceResp.TaskID, @@ -581,8 +581,8 @@ func (cc *cruiseControlScaler) RebalanceDisks(brokerIDs ...string) (*Result, err // BrokersWithState returns a list of IDs for Kafka brokers which are available in Cruise Control // and have one of the expected states. -func (cc *cruiseControlScaler) BrokersWithState(states ...KafkaBrokerState) ([]string, error) { - resp, err := cc.client.KafkaClusterLoad(api.KafkaClusterLoadRequestWithDefaults()) +func (cc *cruiseControlScaler) BrokersWithState(ctx context.Context, states ...KafkaBrokerState) ([]string, error) { + resp, err := cc.client.KafkaClusterLoad(ctx, api.KafkaClusterLoadRequestWithDefaults()) if err != nil { switch { case strings.Contains(err.Error(), nullPointerExceptionErrString): @@ -609,19 +609,19 @@ func (cc *cruiseControlScaler) BrokersWithState(states ...KafkaBrokerState) ([]s } // KafkaClusterState returns the state of the Kafka cluster -func (cc *cruiseControlScaler) KafkaClusterState() (*types.KafkaClusterState, error) { +func (cc *cruiseControlScaler) KafkaClusterState(ctx context.Context) (*types.KafkaClusterState, error) { clusterStateReq := api.KafkaClusterStateRequestWithDefaults() - clusterStateResp, err := cc.client.KafkaClusterState(clusterStateReq) + clusterStateResp, err := cc.client.KafkaClusterState(ctx, clusterStateReq) if err != nil { return nil, err } return clusterStateResp.Result, nil } -// PartitionReplicasByBroker returns the number of partition replicas for every broker in the Kafka cluster. -func (cc *cruiseControlScaler) PartitionLeadersReplicasByBroker() (brokerIDReplicaCounts map[string]int32, brokerIDLeaderCounts map[string]int32, err error) { +// PartitionLeadersReplicasByBroker returns the number of partition replicas for every broker in the Kafka cluster. +func (cc *cruiseControlScaler) PartitionLeadersReplicasByBroker(ctx context.Context) (brokerIDReplicaCounts map[string]int32, brokerIDLeaderCounts map[string]int32, err error) { clusterStateReq := api.KafkaClusterStateRequestWithDefaults() - clusterStateResp, err := cc.client.KafkaClusterState(clusterStateReq) + clusterStateResp, err := cc.client.KafkaClusterState(ctx, clusterStateReq) if err != nil { return nil, nil, err } @@ -629,10 +629,10 @@ func (cc *cruiseControlScaler) PartitionLeadersReplicasByBroker() (brokerIDRepli } // BrokerWithLeastPartitionReplicas returns the ID of the broker which host the least partition replicas. -func (cc *cruiseControlScaler) BrokerWithLeastPartitionReplicas() (string, error) { +func (cc *cruiseControlScaler) BrokerWithLeastPartitionReplicas(ctx context.Context) (string, error) { var brokerWithLeastPartitionReplicas string - brokerPartitions, err := cc.PartitionReplicasByBroker() + brokerPartitions, err := cc.PartitionReplicasByBroker(ctx) if err != nil { cc.log.Error(err, "could not retrieve partition map for brokers") return brokerWithLeastPartitionReplicas, err @@ -649,8 +649,8 @@ func (cc *cruiseControlScaler) BrokerWithLeastPartitionReplicas() (string, error } // LogDirsByBroker returns the ID of the broker which host the least partition replicas. -func (cc *cruiseControlScaler) LogDirsByBroker() (map[string]map[LogDirState][]string, error) { - resp, err := cc.client.KafkaClusterState(api.KafkaClusterStateRequestWithDefaults()) +func (cc *cruiseControlScaler) LogDirsByBroker(ctx context.Context) (map[string]map[LogDirState][]string, error) { + resp, err := cc.client.KafkaClusterState(ctx, api.KafkaClusterStateRequestWithDefaults()) if err != nil { cc.log.Error(err, "getting Kafka cluster state from Cruise Control returned an error") return nil, err diff --git a/pkg/scale/types.go b/pkg/scale/types.go index c63becff2..79b1ca188 100644 --- a/pkg/scale/types.go +++ b/pkg/scale/types.go @@ -15,6 +15,8 @@ package scale import ( + "context" + "github.com/banzaicloud/go-cruise-control/pkg/api" "github.com/banzaicloud/go-cruise-control/pkg/types" @@ -22,23 +24,23 @@ import ( ) type CruiseControlScaler interface { - IsReady() bool - Status() (CruiseControlStatus, error) - UserTasks(taskIDs ...string) ([]*Result, error) - IsUp() bool - AddBrokers(brokerIDs ...string) (*Result, error) - AddBrokersWithParams(params map[string]string) (*Result, error) - RemoveBrokersWithParams(params map[string]string) (*Result, error) - RebalanceWithParams(params map[string]string) (*Result, error) - StopExecution() (*Result, error) - RemoveBrokers(brokerIDs ...string) (*Result, error) - RebalanceDisks(brokerIDs ...string) (*Result, error) - BrokersWithState(states ...KafkaBrokerState) ([]string, error) - KafkaClusterState() (*types.KafkaClusterState, error) - PartitionReplicasByBroker() (map[string]int32, error) - BrokerWithLeastPartitionReplicas() (string, error) - LogDirsByBroker() (map[string]map[LogDirState][]string, error) - KafkaClusterLoad() (*api.KafkaClusterLoadResponse, error) + IsReady(ctx context.Context) bool + Status(ctx context.Context) (CruiseControlStatus, error) + UserTasks(ctx context.Context, taskIDs ...string) ([]*Result, error) + IsUp(ctx context.Context) bool + AddBrokers(ctx context.Context, brokerIDs ...string) (*Result, error) + AddBrokersWithParams(ctx context.Context, params map[string]string) (*Result, error) + RemoveBrokersWithParams(ctx context.Context, params map[string]string) (*Result, error) + RebalanceWithParams(ctx context.Context, params map[string]string) (*Result, error) + StopExecution(ctx context.Context) (*Result, error) + RemoveBrokers(ctx context.Context, brokerIDs ...string) (*Result, error) + RebalanceDisks(ctx context.Context, brokerIDs ...string) (*Result, error) + BrokersWithState(ctx context.Context, states ...KafkaBrokerState) ([]string, error) + KafkaClusterState(ctx context.Context) (*types.KafkaClusterState, error) + PartitionReplicasByBroker(ctx context.Context) (map[string]int32, error) + BrokerWithLeastPartitionReplicas(ctx context.Context) (string, error) + LogDirsByBroker(ctx context.Context) (map[string]map[LogDirState][]string, error) + KafkaClusterLoad(ctx context.Context) (*api.KafkaClusterLoadResponse, error) } type Result struct {