From 8cdc6b94e9a47e93b8c4e8b8b834fcf1452a1636 Mon Sep 17 00:00:00 2001 From: Adam Janikowski <12255597+ajanikow@users.noreply.github.com> Date: Thu, 23 Nov 2023 10:58:34 +0100 Subject: [PATCH] [Feature] [ML] Deployment Handler (#1500) --- chart/kube-arangodb/templates/ml/role.yaml | 8 ++ cmd/cmd.go | 9 ++ docs/api/ArangoMLExtension.V1Alpha1.md | 6 ++ pkg/apis/backup/v1/backup.go | 8 ++ pkg/apis/ml/v1alpha1/extension.go | 8 ++ pkg/apis/ml/v1alpha1/extension_conditions.go | 27 +++++ pkg/apis/ml/v1alpha1/extension_status.go | 5 + pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go | 10 +- pkg/debug_package/generator.go | 1 + .../kubernetes/arango_deployments.go | 10 +- .../generators/kubernetes/arango_ml.go | 62 +++++++++++ .../kubernetes/arango_ml_batch_job.go | 73 +++++++++++++ .../kubernetes/arango_ml_cron_job.go | 73 +++++++++++++ .../kubernetes/arango_ml_extension.go | 73 +++++++++++++ .../kubernetes/arango_ml_storage.go | 73 +++++++++++++ pkg/handlers/backup/handler.go | 26 +---- pkg/operatorV2/errors_stop.go | 49 +++++++++ pkg/operatorV2/handle.go | 102 +++++++++++++++--- pkg/operatorV2/update.go | 86 +++++++++++++++ pkg/operatorV2/update_wraps.go | 38 +++++++ pkg/util/context.go | 39 +++++++ pkg/util/errors/execute.go | 33 ++++++ pkg/util/globals/global.go | 33 ++++++ 23 files changed, 804 insertions(+), 48 deletions(-) create mode 100644 pkg/apis/ml/v1alpha1/extension_conditions.go create mode 100644 pkg/debug_package/generators/kubernetes/arango_ml.go create mode 100644 pkg/debug_package/generators/kubernetes/arango_ml_batch_job.go create mode 100644 pkg/debug_package/generators/kubernetes/arango_ml_cron_job.go create mode 100644 pkg/debug_package/generators/kubernetes/arango_ml_extension.go create mode 100644 pkg/debug_package/generators/kubernetes/arango_ml_storage.go create mode 100644 pkg/operatorV2/errors_stop.go create mode 100644 pkg/operatorV2/update.go create mode 100644 pkg/operatorV2/update_wraps.go create mode 100644 pkg/util/context.go create mode 100644 pkg/util/errors/execute.go diff --git a/chart/kube-arangodb/templates/ml/role.yaml b/chart/kube-arangodb/templates/ml/role.yaml index 3c9384f24..784fab921 100644 --- a/chart/kube-arangodb/templates/ml/role.yaml +++ b/chart/kube-arangodb/templates/ml/role.yaml @@ -26,5 +26,13 @@ rules: - "arangomlstorages/status" verbs: - "*" + - apiGroups: + - "database.arangodb.com" + resources: + - "arangodeployments" + verbs: + - "get" + - "list" + - "watch" {{- end }} {{- end }} \ No newline at end of file diff --git a/cmd/cmd.go b/cmd/cmd.go index 721c6c115..bb129d7c3 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -160,6 +160,10 @@ var ( backupArangoD time.Duration backupUploadArangoD time.Duration } + operatorReconciliationRetry struct { + delay time.Duration + count int + } chaosOptions struct { allowed bool } @@ -222,6 +226,8 @@ func init() { f.DurationVar(&operatorTimeouts.backupUploadArangoD, "timeout.backup-upload", globals.BackupUploadArangoClientTimeout, "The request timeout to the ArangoDB during uploading files") f.DurationVar(&shutdownOptions.delay, "shutdown.delay", defaultShutdownDelay, "The delay before running shutdown handlers") f.DurationVar(&shutdownOptions.timeout, "shutdown.timeout", defaultShutdownTimeout, "Timeout for shutdown handlers") + f.DurationVar(&operatorReconciliationRetry.delay, "operator.reconciliation.retry.delay", globals.DefaultOperatorUpdateRetryDelay, "Delay between Object Update operations in the Reconciliation loop") + f.IntVar(&operatorReconciliationRetry.count, "operator.reconciliation.retry.count", globals.DefaultOperatorUpdateRetryCount, "Count of retries during Object Update operations in the Reconciliation loop") f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration") f.DurationVar(&operatorOptions.reconciliationDelay, "reconciliation.delay", 0, "Delay between reconciliation loops (<= 0 -> Disabled)") f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read") @@ -281,6 +287,9 @@ func executeMain(cmd *cobra.Command, args []string) { globals.GetGlobalTimeouts().BackupArangoClientTimeout().Set(operatorTimeouts.backupArangoD) globals.GetGlobalTimeouts().BackupArangoClientUploadTimeout().Set(operatorTimeouts.backupUploadArangoD) + globals.GetGlobals().Retry().OperatorUpdateRetryDelay().Set(operatorReconciliationRetry.delay) + globals.GetGlobals().Retry().OperatorUpdateRetryCount().Set(operatorReconciliationRetry.count) + globals.GetGlobals().Kubernetes().RequestBatchSize().Set(operatorKubernetesOptions.maxBatchSize) globals.GetGlobals().Backup().ConcurrentUploads().Set(operatorBackup.concurrentUploads) diff --git a/docs/api/ArangoMLExtension.V1Alpha1.md b/docs/api/ArangoMLExtension.V1Alpha1.md index 16838c257..93e0d1dfd 100644 --- a/docs/api/ArangoMLExtension.V1Alpha1.md +++ b/docs/api/ArangoMLExtension.V1Alpha1.md @@ -4,3 +4,9 @@ ## Status +### .status.conditions + +Type: `api.Conditions` [\[ref\]](https://github.com/arangodb/kube-arangodb/blob/1.2.35/pkg/apis/ml/v1alpha1/extension_status.go#L28) + +Conditions specific to the entire extension + diff --git a/pkg/apis/backup/v1/backup.go b/pkg/apis/backup/v1/backup.go index 4f1868ea2..16c254ca7 100644 --- a/pkg/apis/backup/v1/backup.go +++ b/pkg/apis/backup/v1/backup.go @@ -61,3 +61,11 @@ type ArangoBackup struct { Spec ArangoBackupSpec `json:"spec"` Status ArangoBackupStatus `json:"status"` } + +func (a *ArangoBackup) GetStatus() ArangoBackupStatus { + return a.Status +} + +func (a *ArangoBackup) SetStatus(status ArangoBackupStatus) { + a.Status = status +} diff --git a/pkg/apis/ml/v1alpha1/extension.go b/pkg/apis/ml/v1alpha1/extension.go index 3a5a2dcbb..0aafe68e5 100644 --- a/pkg/apis/ml/v1alpha1/extension.go +++ b/pkg/apis/ml/v1alpha1/extension.go @@ -45,3 +45,11 @@ type ArangoMLExtension struct { Spec ArangoMLExtensionSpec `json:"spec"` Status ArangoMLExtensionStatus `json:"status"` } + +func (a *ArangoMLExtension) GetStatus() ArangoMLExtensionStatus { + return a.Status +} + +func (a *ArangoMLExtension) SetStatus(status ArangoMLExtensionStatus) { + a.Status = status +} diff --git a/pkg/apis/ml/v1alpha1/extension_conditions.go b/pkg/apis/ml/v1alpha1/extension_conditions.go new file mode 100644 index 000000000..4d725fc3a --- /dev/null +++ b/pkg/apis/ml/v1alpha1/extension_conditions.go @@ -0,0 +1,27 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1alpha1 + +import api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + +const ( + ExtensionDeploymentFoundCondition api.ConditionType = "DeploymentFound" +) diff --git a/pkg/apis/ml/v1alpha1/extension_status.go b/pkg/apis/ml/v1alpha1/extension_status.go index f240308b1..2a0209982 100644 --- a/pkg/apis/ml/v1alpha1/extension_status.go +++ b/pkg/apis/ml/v1alpha1/extension_status.go @@ -20,5 +20,10 @@ package v1alpha1 +import api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + type ArangoMLExtensionStatus struct { + // Conditions specific to the entire extension + // +doc/type: api.Conditions + Conditions api.ConditionList `json:"conditions,omitempty"` } diff --git a/pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go index 844d04057..5e818c669 100644 --- a/pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go @@ -26,6 +26,7 @@ package v1alpha1 import ( + v1 "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -221,7 +222,7 @@ func (in *ArangoMLExtension) DeepCopyInto(out *ArangoMLExtension) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } @@ -295,6 +296,13 @@ func (in *ArangoMLExtensionSpec) DeepCopy() *ArangoMLExtensionSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ArangoMLExtensionStatus) DeepCopyInto(out *ArangoMLExtensionStatus) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make(v1.ConditionList, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } diff --git a/pkg/debug_package/generator.go b/pkg/debug_package/generator.go index 9b82553a9..c2dbbc5cd 100644 --- a/pkg/debug_package/generator.go +++ b/pkg/debug_package/generator.go @@ -42,6 +42,7 @@ var rootFactories = []shared.Factory{ kubernetes.Services(), kubernetes.Deployments(), kubernetes.AgencyDump(), + kubernetes.ML(), } func InitCommand(cmd *cobra.Command) { diff --git a/pkg/debug_package/generators/kubernetes/arango_deployments.go b/pkg/debug_package/generators/kubernetes/arango_deployments.go index dd412af71..ae446b484 100644 --- a/pkg/debug_package/generators/kubernetes/arango_deployments.go +++ b/pkg/debug_package/generators/kubernetes/arango_deployments.go @@ -64,13 +64,7 @@ func deployments(logger zerolog.Logger, files chan<- shared.File) error { return err } - errDeployments := make([]error, len(deploymentList)) - - for id := range deploymentList { - errDeployments[id] = deployment(k, deploymentList[id], files) - } - - if err := errors.Errors(errDeployments...); err != nil { + if err := errors.ExecuteWithErrorArrayP2(deployment, k, files, deploymentList...); err != nil { logger.Err(err).Msgf("Error while collecting arango deployments") return err } @@ -78,7 +72,7 @@ func deployments(logger zerolog.Logger, files chan<- shared.File) error { return nil } -func deployment(client kclient.Client, depl *api.ArangoDeployment, files chan<- shared.File) error { +func deployment(client kclient.Client, files chan<- shared.File, depl *api.ArangoDeployment) error { files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/deployments/%s.yaml", depl.GetName()), func() ([]interface{}, error) { return []interface{}{depl}, nil }) diff --git a/pkg/debug_package/generators/kubernetes/arango_ml.go b/pkg/debug_package/generators/kubernetes/arango_ml.go new file mode 100644 index 000000000..d726ab834 --- /dev/null +++ b/pkg/debug_package/generators/kubernetes/arango_ml.go @@ -0,0 +1,62 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kubernetes + +import ( + "github.com/rs/zerolog" + + "github.com/arangodb/kube-arangodb/pkg/debug_package/shared" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" +) + +func ML() shared.Factory { + return shared.NewFactory("ml", true, ml) +} + +func ml(logger zerolog.Logger, files chan<- shared.File) error { + k, ok := kclient.GetDefaultFactory().Client() + if !ok { + return errors.Newf("Client is not initialised") + } + + if err := mlExtensions(logger, files, k); err != nil { + logger.Err(err).Msgf("Error while collecting arango ml extension") + return err + } + + if err := mlStorages(logger, files, k); err != nil { + logger.Err(err).Msgf("Error while collecting arango ml storage") + return err + } + + if err := mlBatchJobs(logger, files, k); err != nil { + logger.Err(err).Msgf("Error while collecting arango ml batch jobs") + return err + } + + if err := mlCronJobs(logger, files, k); err != nil { + logger.Err(err).Msgf("Error while collecting arango ml cron jobs") + return err + } + + return nil +} diff --git a/pkg/debug_package/generators/kubernetes/arango_ml_batch_job.go b/pkg/debug_package/generators/kubernetes/arango_ml_batch_job.go new file mode 100644 index 000000000..4622d5f29 --- /dev/null +++ b/pkg/debug_package/generators/kubernetes/arango_ml_batch_job.go @@ -0,0 +1,73 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kubernetes + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + + mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1" + "github.com/arangodb/kube-arangodb/pkg/debug_package/cli" + "github.com/arangodb/kube-arangodb/pkg/debug_package/shared" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" +) + +func mlBatchJobs(logger zerolog.Logger, files chan<- shared.File, client kclient.Client) error { + batchjobs, err := listMLBatchJobs(client) + if err != nil { + if kerrors.IsForbiddenOrNotFound(err) { + return nil + } + + return err + } + + if err := errors.ExecuteWithErrorArrayP2(mlBatchJob, client, files, batchjobs...); err != nil { + logger.Err(err).Msgf("Error while collecting arango ml batchjobs") + return err + } + + return nil +} + +func mlBatchJob(client kclient.Client, files chan<- shared.File, ext *mlApi.ArangoMLBatchJob) error { + files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/ml/batchjobs/%s.yaml", ext.GetName()), func() ([]interface{}, error) { + return []interface{}{ext}, nil + }) + + return nil +} + +func listMLBatchJobs(client kclient.Client) ([]*mlApi.ArangoMLBatchJob, error) { + return ListObjects[*mlApi.ArangoMLBatchJobList, *mlApi.ArangoMLBatchJob](context.Background(), client.Arango().MlV1alpha1().ArangoMLBatchJobs(cli.GetInput().Namespace), func(result *mlApi.ArangoMLBatchJobList) []*mlApi.ArangoMLBatchJob { + q := make([]*mlApi.ArangoMLBatchJob, len(result.Items)) + + for id, e := range result.Items { + q[id] = e.DeepCopy() + } + + return q + }) +} diff --git a/pkg/debug_package/generators/kubernetes/arango_ml_cron_job.go b/pkg/debug_package/generators/kubernetes/arango_ml_cron_job.go new file mode 100644 index 000000000..f49c81378 --- /dev/null +++ b/pkg/debug_package/generators/kubernetes/arango_ml_cron_job.go @@ -0,0 +1,73 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kubernetes + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + + mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1" + "github.com/arangodb/kube-arangodb/pkg/debug_package/cli" + "github.com/arangodb/kube-arangodb/pkg/debug_package/shared" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" +) + +func mlCronJobs(logger zerolog.Logger, files chan<- shared.File, client kclient.Client) error { + cronjobs, err := listMLCronJobs(client) + if err != nil { + if kerrors.IsForbiddenOrNotFound(err) { + return nil + } + + return err + } + + if err := errors.ExecuteWithErrorArrayP2(mlCronJob, client, files, cronjobs...); err != nil { + logger.Err(err).Msgf("Error while collecting arango ml cronjobs") + return err + } + + return nil +} + +func mlCronJob(client kclient.Client, files chan<- shared.File, ext *mlApi.ArangoMLCronJob) error { + files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/ml/cronjobs/%s.yaml", ext.GetName()), func() ([]interface{}, error) { + return []interface{}{ext}, nil + }) + + return nil +} + +func listMLCronJobs(client kclient.Client) ([]*mlApi.ArangoMLCronJob, error) { + return ListObjects[*mlApi.ArangoMLCronJobList, *mlApi.ArangoMLCronJob](context.Background(), client.Arango().MlV1alpha1().ArangoMLCronJobs(cli.GetInput().Namespace), func(result *mlApi.ArangoMLCronJobList) []*mlApi.ArangoMLCronJob { + q := make([]*mlApi.ArangoMLCronJob, len(result.Items)) + + for id, e := range result.Items { + q[id] = e.DeepCopy() + } + + return q + }) +} diff --git a/pkg/debug_package/generators/kubernetes/arango_ml_extension.go b/pkg/debug_package/generators/kubernetes/arango_ml_extension.go new file mode 100644 index 000000000..2c894c344 --- /dev/null +++ b/pkg/debug_package/generators/kubernetes/arango_ml_extension.go @@ -0,0 +1,73 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kubernetes + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + + mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1" + "github.com/arangodb/kube-arangodb/pkg/debug_package/cli" + "github.com/arangodb/kube-arangodb/pkg/debug_package/shared" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" +) + +func mlExtensions(logger zerolog.Logger, files chan<- shared.File, client kclient.Client) error { + extensions, err := listMLExtensions(client) + if err != nil { + if kerrors.IsForbiddenOrNotFound(err) { + return nil + } + + return err + } + + if err := errors.ExecuteWithErrorArrayP2(mlExtension, client, files, extensions...); err != nil { + logger.Err(err).Msgf("Error while collecting arango ml extensions") + return err + } + + return nil +} + +func mlExtension(client kclient.Client, files chan<- shared.File, ext *mlApi.ArangoMLExtension) error { + files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/ml/extensions/%s.yaml", ext.GetName()), func() ([]interface{}, error) { + return []interface{}{ext}, nil + }) + + return nil +} + +func listMLExtensions(client kclient.Client) ([]*mlApi.ArangoMLExtension, error) { + return ListObjects[*mlApi.ArangoMLExtensionList, *mlApi.ArangoMLExtension](context.Background(), client.Arango().MlV1alpha1().ArangoMLExtensions(cli.GetInput().Namespace), func(result *mlApi.ArangoMLExtensionList) []*mlApi.ArangoMLExtension { + q := make([]*mlApi.ArangoMLExtension, len(result.Items)) + + for id, e := range result.Items { + q[id] = e.DeepCopy() + } + + return q + }) +} diff --git a/pkg/debug_package/generators/kubernetes/arango_ml_storage.go b/pkg/debug_package/generators/kubernetes/arango_ml_storage.go new file mode 100644 index 000000000..d68d8c04a --- /dev/null +++ b/pkg/debug_package/generators/kubernetes/arango_ml_storage.go @@ -0,0 +1,73 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kubernetes + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + + mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1" + "github.com/arangodb/kube-arangodb/pkg/debug_package/cli" + "github.com/arangodb/kube-arangodb/pkg/debug_package/shared" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" +) + +func mlStorages(logger zerolog.Logger, files chan<- shared.File, client kclient.Client) error { + storages, err := listMLStorages(client) + if err != nil { + if kerrors.IsForbiddenOrNotFound(err) { + return nil + } + + return err + } + + if err := errors.ExecuteWithErrorArrayP2(mlStorage, client, files, storages...); err != nil { + logger.Err(err).Msgf("Error while collecting arango ml storages") + return err + } + + return nil +} + +func mlStorage(client kclient.Client, files chan<- shared.File, ext *mlApi.ArangoMLStorage) error { + files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/ml/storages/%s.yaml", ext.GetName()), func() ([]interface{}, error) { + return []interface{}{ext}, nil + }) + + return nil +} + +func listMLStorages(client kclient.Client) ([]*mlApi.ArangoMLStorage, error) { + return ListObjects[*mlApi.ArangoMLStorageList, *mlApi.ArangoMLStorage](context.Background(), client.Arango().MlV1alpha1().ArangoMLStorages(cli.GetInput().Namespace), func(result *mlApi.ArangoMLStorageList) []*mlApi.ArangoMLStorage { + q := make([]*mlApi.ArangoMLStorage, len(result.Items)) + + for id, e := range result.Items { + q[id] = e.DeepCopy() + } + + return q + }) +} diff --git a/pkg/handlers/backup/handler.go b/pkg/handlers/backup/handler.go index a2077f5cf..0cab65d19 100644 --- a/pkg/handlers/backup/handler.go +++ b/pkg/handlers/backup/handler.go @@ -39,7 +39,6 @@ import ( backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" database "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" arangoClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" - "github.com/arangodb/kube-arangodb/pkg/handlers/utils" "github.com/arangodb/kube-arangodb/pkg/logging" operator "github.com/arangodb/kube-arangodb/pkg/operatorV2" "github.com/arangodb/kube-arangodb/pkg/operatorV2/event" @@ -51,9 +50,6 @@ import ( var logger = logging.Global().RegisterAndGetLogger("backup-operator", logging.Info) const ( - retryCount = 25 - retryDelay = time.Second - // StateChange name of the event send when state changed StateChange = "StateChange" @@ -181,9 +177,7 @@ func (h *handler) refreshDeploymentBackup(deployment *database.ArangoDeployment, updateStatusBackup(backupMeta), updateStatusBackupImported(util.NewType[bool](true))) - backup.Status = *status - - err = h.updateBackupStatus(backup) + _, err = operator.WithArangoBackupUpdateStatusInterfaceRetry(context.Background(), h.client.BackupV1().ArangoBackups(backup.GetNamespace()), backup, *status, meta.UpdateOptions{}) if err != nil { return err } @@ -195,20 +189,6 @@ func (h *handler) Name() string { return backup.ArangoBackupResourceKind } -func (h *handler) updateBackupStatus(b *backupApi.ArangoBackup) error { - return utils.Retry(retryCount, retryDelay, func() error { - backup, err := h.client.BackupV1().ArangoBackups(b.Namespace).Get(context.Background(), b.Name, meta.GetOptions{}) - if err != nil { - return err - } - - backup.Status = b.Status - - _, err = h.client.BackupV1().ArangoBackups(b.Namespace).UpdateStatus(context.Background(), backup, meta.UpdateOptions{}) - return err - }) -} - func (h *handler) getDeploymentMutex(namespace, deployment string) *sync.Mutex { h.lock.Lock() defer h.lock.Unlock() @@ -346,15 +326,13 @@ func (h *handler) Handle(_ context.Context, item operation.Item) error { } } - b.Status = *status - logger.Debug("Updating %s %s/%s", item.Kind, item.Namespace, item.Name) // Update status on object - if err := h.updateBackupStatus(b); err != nil { + if _, err := operator.WithArangoBackupUpdateStatusInterfaceRetry(context.Background(), h.client.BackupV1().ArangoBackups(b.GetNamespace()), b, *status, meta.UpdateOptions{}); err != nil { return err } diff --git a/pkg/operatorV2/errors_stop.go b/pkg/operatorV2/errors_stop.go new file mode 100644 index 000000000..35da7d698 --- /dev/null +++ b/pkg/operatorV2/errors_stop.go @@ -0,0 +1,49 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package operator + +import "fmt" + +func Stop(msg string, args ...interface{}) error { + return stop{ + message: fmt.Sprintf(msg, args...), + } +} + +type stop struct { + message string +} + +func (r stop) Error() string { + return r.message +} + +func IsStop(err error) bool { + if err == nil { + return false + } + + if _, ok := err.(stop); ok { + return true + } + + return false +} diff --git a/pkg/operatorV2/handle.go b/pkg/operatorV2/handle.go index 3513abbd9..7c2377ad8 100644 --- a/pkg/operatorV2/handle.go +++ b/pkg/operatorV2/handle.go @@ -22,38 +22,110 @@ package operator import "context" -type HandleP0Func func(tx context.Context) error +type HandleP0Func func(ctx context.Context) (bool, error) -type HandleP1Func[P1 interface{}] func(tx context.Context, p1 P1) error +type HandleP1Func[P1 interface{}] func(ctx context.Context, p1 P1) (bool, error) -type HandleP2Func[P1, P2 interface{}] func(tx context.Context, p1 P1, p2 P2) error +type HandleP2Func[P1, P2 interface{}] func(ctx context.Context, p1 P1, p2 P2) (bool, error) -func HandleP0(ctx context.Context, handler ...HandleP0Func) error { +type HandleP3Func[P1, P2, P3 interface{}] func(ctx context.Context, p1 P1, p2 P2, p3 P3) (bool, error) + +type HandleP4Func[P1, P2, P3, P4 interface{}] func(ctx context.Context, p1 P1, p2 P2, p3 P3, p4 P4) (bool, error) + +type HandleP9Func[P1, P2, P3, P4, P5, P6, P7, P8, P9 interface{}] func(ctx context.Context, p1 P1, p2 P2, p3 P3, p4 P4, p5 P5, p6 P6, p7 P7, p8 P8, p9 P9) (bool, error) + +func HandleP0(ctx context.Context, handler ...HandleP0Func) (bool, error) { + isChanged := false for _, h := range handler { - if err := h(ctx); err != nil { - return err + changed, err := h(ctx) + if changed { + isChanged = true + } + + if err != nil { + return isChanged, err } } - return nil + return isChanged, nil } -func HandleP1[P1 interface{}](ctx context.Context, p1 P1, handler ...HandleP1Func[P1]) error { +func HandleP1[P1 interface{}](ctx context.Context, p1 P1, handler ...HandleP1Func[P1]) (bool, error) { + isChanged := false for _, h := range handler { - if err := h(ctx, p1); err != nil { - return err + changed, err := h(ctx, p1) + if changed { + isChanged = true + } + + if err != nil { + return isChanged, err } } - return nil + return isChanged, nil } -func HandleP2[P1, P2 interface{}](ctx context.Context, p1 P1, p2 P2, handler ...HandleP2Func[P1, P2]) error { +func HandleP2[P1, P2 interface{}](ctx context.Context, p1 P1, p2 P2, handler ...HandleP2Func[P1, P2]) (bool, error) { + isChanged := false for _, h := range handler { - if err := h(ctx, p1, p2); err != nil { - return err + changed, err := h(ctx, p1, p2) + if changed { + isChanged = true + } + + if err != nil { + return isChanged, err + } + } + + return isChanged, nil +} + +func HandleP3[P1, P2, P3 interface{}](ctx context.Context, p1 P1, p2 P2, p3 P3, handler ...HandleP3Func[P1, P2, P3]) (bool, error) { + isChanged := false + for _, h := range handler { + changed, err := h(ctx, p1, p2, p3) + if changed { + isChanged = true + } + + if err != nil { + return isChanged, err + } + } + + return isChanged, nil +} + +func HandleP4[P1, P2, P3, P4 interface{}](ctx context.Context, p1 P1, p2 P2, p3 P3, p4 P4, handler ...HandleP4Func[P1, P2, P3, P4]) (bool, error) { + isChanged := false + for _, h := range handler { + changed, err := h(ctx, p1, p2, p3, p4) + if changed { + isChanged = true + } + + if err != nil { + return isChanged, err + } + } + + return isChanged, nil +} + +func HandleP9[P1, P2, P3, P4, P5, P6, P7, P8, P9 interface{}](ctx context.Context, p1 P1, p2 P2, p3 P3, p4 P4, p5 P5, p6 P6, p7 P7, p8 P8, p9 P9, handler ...HandleP9Func[P1, P2, P3, P4, P5, P6, P7, P8, P9]) (bool, error) { + isChanged := false + for _, h := range handler { + changed, err := h(ctx, p1, p2, p3, p4, p5, p6, p7, p8, p9) + if changed { + isChanged = true + } + + if err != nil { + return isChanged, err } } - return nil + return isChanged, nil } diff --git a/pkg/operatorV2/update.go b/pkg/operatorV2/update.go new file mode 100644 index 000000000..3e55ac15e --- /dev/null +++ b/pkg/operatorV2/update.go @@ -0,0 +1,86 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package operator + +import ( + "context" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/globals" + "github.com/arangodb/kube-arangodb/pkg/util/timer" +) + +type Object[T interface{}] interface { + meta.Object + + GetStatus() T + SetStatus(T) +} + +type GetInterface[S interface{}, T Object[S]] interface { + Get(ctx context.Context, name string, options meta.GetOptions) (T, error) +} + +type UpdateStatusInterfaceClient[S interface{}, T Object[S]] func(namespace string) UpdateStatusInterface[S, T] + +type UpdateStatusInterface[S interface{}, T Object[S]] interface { + GetInterface[S, T] + + UpdateStatus(ctx context.Context, in T, options meta.UpdateOptions) (T, error) +} + +func WithUpdateStatusInterfaceRetry[S interface{}, T Object[S]](ctx context.Context, client UpdateStatusInterface[S, T], obj T, status S, opts meta.UpdateOptions) (T, error) { + for id := 0; id < globals.GetGlobals().Retry().OperatorUpdateRetryCount().Get(); id++ { + // Let's try to make a call + if nObj, err := WithUpdateStatusInterface(ctx, client, obj, status, opts); err == nil { + return nObj, nil + } + + select { + case <-timer.After(globals.GetGlobals().Retry().OperatorUpdateRetryDelay().Get()): + continue + case <-ctx.Done(): + return util.Default[T](), context.DeadlineExceeded + } + } + + return util.Default[T](), errors.Newf("Unable to save Object %s/%s, retries exceeded", obj.GetNamespace(), obj.GetName()) +} + +func WithUpdateStatusInterface[S interface{}, T Object[S]](ctx context.Context, client UpdateStatusInterface[S, T], obj T, status S, opts meta.UpdateOptions) (T, error) { + cCtx, c := globals.GetGlobals().Timeouts().Kubernetes().WithTimeout(ctx) + defer c() + + currentObj, err := client.Get(cCtx, obj.GetName(), meta.GetOptions{}) + if err != nil { + return util.Default[T](), err + } + + currentObj.SetStatus(status) + + nCtx, c := globals.GetGlobals().Timeouts().Kubernetes().WithTimeout(ctx) + defer c() + + return client.UpdateStatus(nCtx, currentObj, opts) +} diff --git a/pkg/operatorV2/update_wraps.go b/pkg/operatorV2/update_wraps.go new file mode 100644 index 000000000..c9d4cd394 --- /dev/null +++ b/pkg/operatorV2/update_wraps.go @@ -0,0 +1,38 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package operator + +import ( + "context" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + + backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" + mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1" +) + +func WithArangoBackupUpdateStatusInterfaceRetry(ctx context.Context, client UpdateStatusInterface[backupApi.ArangoBackupStatus, *backupApi.ArangoBackup], obj *backupApi.ArangoBackup, status backupApi.ArangoBackupStatus, opts meta.UpdateOptions) (*backupApi.ArangoBackup, error) { + return WithUpdateStatusInterfaceRetry[backupApi.ArangoBackupStatus, *backupApi.ArangoBackup](ctx, client, obj, status, opts) +} + +func WithArangoExtensionUpdateStatusInterfaceRetry(ctx context.Context, client UpdateStatusInterface[mlApi.ArangoMLExtensionStatus, *mlApi.ArangoMLExtension], obj *mlApi.ArangoMLExtension, status mlApi.ArangoMLExtensionStatus, opts meta.UpdateOptions) (*mlApi.ArangoMLExtension, error) { + return WithUpdateStatusInterfaceRetry[mlApi.ArangoMLExtensionStatus, *mlApi.ArangoMLExtension](ctx, client, obj, status, opts) +} diff --git a/pkg/util/context.go b/pkg/util/context.go new file mode 100644 index 000000000..807de1a93 --- /dev/null +++ b/pkg/util/context.go @@ -0,0 +1,39 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package util + +import ( + "context" + "time" + + "github.com/arangodb/kube-arangodb/pkg/util/globals" +) + +func WithKubernetesContextTimeoutP2A2[P1, P2, A1, A2 interface{}](ctx context.Context, f func(context.Context, A1, A2) (P1, P2), a1 A1, a2 A2) (P1, P2) { + return WithContextTimeoutP2A2(ctx, globals.GetGlobals().Timeouts().Kubernetes().Get(), f, a1, a2) +} + +func WithContextTimeoutP2A2[P1, P2, A1, A2 interface{}](ctx context.Context, timeout time.Duration, f func(context.Context, A1, A2) (P1, P2), a1 A1, a2 A2) (P1, P2) { + nCtx, c := context.WithTimeout(ctx, timeout) + defer c() + + return f(nCtx, a1, a2) +} diff --git a/pkg/util/errors/execute.go b/pkg/util/errors/execute.go new file mode 100644 index 000000000..1cb2d3fca --- /dev/null +++ b/pkg/util/errors/execute.go @@ -0,0 +1,33 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package errors + +type WithErrorArrayP2[IN, P1, P2 any] func(p1 P1, p2 P2, in IN) error + +func ExecuteWithErrorArrayP2[IN, P1, P2 any](caller WithErrorArrayP2[IN, P1, P2], p1 P1, p2 P2, elements ...IN) error { + errors := make([]error, len(elements)) + + for id := range elements { + errors[id] = caller(p1, p2, elements[id]) + } + + return Errors(errors...) +} diff --git a/pkg/util/globals/global.go b/pkg/util/globals/global.go index add59f7d2..e9952f11a 100644 --- a/pkg/util/globals/global.go +++ b/pkg/util/globals/global.go @@ -41,6 +41,11 @@ const ( DefaultKubernetesRequestBatchSize = 256 DefaultBackupConcurrentUploads = 4 + + // Retry + + DefaultOperatorUpdateRetryCount = 25 + DefaultOperatorUpdateRetryDelay = time.Second ) var globalObj = &globals{ @@ -61,6 +66,10 @@ var globalObj = &globals{ backup: &globalBackup{ concurrentUploads: NewInt(DefaultBackupConcurrentUploads), }, + retry: &globalRetry{ + operatorUpdateRetryCount: NewInt(DefaultOperatorUpdateRetryCount), + operatorUpdateRetryDelay: NewTimeout(DefaultOperatorUpdateRetryDelay), + }, } func GetGlobals() Globals { @@ -75,12 +84,18 @@ type Globals interface { Timeouts() GlobalTimeouts Kubernetes() GlobalKubernetes Backup() GlobalBackup + Retry() GlobalRetry } type globals struct { timeouts *globalTimeouts kubernetes *globalKubernetes backup *globalBackup + retry *globalRetry +} + +func (g globals) Retry() GlobalRetry { + return g.retry } func (g globals) Backup() GlobalBackup { @@ -174,3 +189,21 @@ func (g *globalTimeouts) BackupArangoClientTimeout() Timeout { func (g *globalTimeouts) BackupArangoClientUploadTimeout() Timeout { return g.backupArangoClientUploadTimeout } + +type GlobalRetry interface { + OperatorUpdateRetryCount() Int + OperatorUpdateRetryDelay() Timeout +} + +type globalRetry struct { + operatorUpdateRetryCount Int + operatorUpdateRetryDelay Timeout +} + +func (g *globalRetry) OperatorUpdateRetryCount() Int { + return g.operatorUpdateRetryCount +} + +func (g *globalRetry) OperatorUpdateRetryDelay() Timeout { + return g.operatorUpdateRetryDelay +}