diff --git a/internal/config/benchmark.go b/internal/config/benchmark.go index 37bcba94b90..83fb41b2dde 100644 --- a/internal/config/benchmark.go +++ b/internal/config/benchmark.go @@ -17,21 +17,26 @@ // Package config providers configuration type and load configuration logic package config +import "github.com/vdaas/vald/internal/k8s/client" + // BenchmarkJob represents the configuration for the internal benchmark search job. type BenchmarkJob struct { - Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` - Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` - Dimension int `json:"dimension,omitempty" yaml:"dimension"` - Replica int `json:"replica,omitempty" yaml:"replica"` - Repetition int `json:"repetition,omitempty" yaml:"repetition"` - JobType string `json:"job_type,omitempty" yaml:"job_type"` - InsertConfig *InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"` - UpdateConfig *UpdateConfig `json:"update_config,omitempty" yaml:"update_config"` - UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"` - SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"` - RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"` - ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"` - Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"` + Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` + Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` + Dimension int `json:"dimension,omitempty" yaml:"dimension"` + Replica int `json:"replica,omitempty" yaml:"replica"` + Repetition int `json:"repetition,omitempty" yaml:"repetition"` + JobType string `json:"job_type,omitempty" yaml:"job_type"` + InsertConfig *InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"` + UpdateConfig *UpdateConfig `json:"update_config,omitempty" yaml:"update_config"` + UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"` + SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"` + RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"` + ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"` + Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"` + BeforeJobName string `json:"before_job_name,omitempty" yaml:"before_job_name"` + BeforeJobNamespace string `json:"before_job_namespace,omitempty" yaml:"before_job_namespace"` + Client client.Client `json:"client,omitempty" yaml:"client"` } // BenchmarkScenario represents the configuration for the internal benchmark scenario. diff --git a/internal/errors/benchmark.go b/internal/errors/benchmark.go index 4223ee8befd..d4fa5cd3c4e 100644 --- a/internal/errors/benchmark.go +++ b/internal/errors/benchmark.go @@ -17,4 +17,16 @@ // Package errors provides benchmark error package errors -var ErrInvalidCoreMode = New("invalid core mode") +var ( + ErrInvalidCoreMode = New("invalid core mode") + + // ErrFailedToCreateBenchmarkJob represents a function to generate an error that failed to create benchmark job crd. + ErrFailedToCreateBenchmarkJob = func(err error, jn string) error { + return Wrapf(err, "could not create benchmark job resource: %s ", jn) + } + + // ErrFailedToCreateJob represents a function to generate an error that failed to create job resource. + ErrFailedToCreateJob = func(err error, jn string) error { + return Wrapf(err, "could not create job: %s ", jn) + } +) diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index 9822d36afe2..8cd8f7f6f87 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -26,6 +26,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" ) +type ( + Object = cli.Object + ObjectKey = cli.ObjectKey + DeleteAllOfOptions = cli.DeleteAllOfOptions + ListOptions = cli.ListOptions + MatchingLabels = cli.MatchingLabels + InNamespace = cli.InNamespace +) + type Client interface { // Get retrieves an obj for the given object key from the Kubernetes Cluster. // obj must be a struct pointer so that obj can be updated with the response diff --git a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml index 34a92c68caa..cdefc9dcaa8 100644 --- a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml +++ b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml @@ -64,6 +64,7 @@ spec: description: ValdBenchmarkJobStatus defines the observed state of ValdBenchmarkJob enum: - NotReady + - Completed - Available - Healthy type: string diff --git a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml index 09a73f17ccd..135dee3c302 100644 --- a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml +++ b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml @@ -61,6 +61,7 @@ spec: description: ValdBenchmarkScenarioStatus defines the observed state of ValdBenchmarkScenario enum: - NotReady + - Completed - Available - Healthy type: string diff --git a/internal/k8s/reconciler.go b/internal/k8s/reconciler.go index 08f3a5df49b..894c9cae119 100644 --- a/internal/k8s/reconciler.go +++ b/internal/k8s/reconciler.go @@ -25,6 +25,7 @@ import ( "github.com/vdaas/vald/internal/net" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync/errgroup" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -34,7 +35,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -type Manager = manager.Manager +type ( + Manager = manager.Manager + OwnerReference = v1.OwnerReference +) type Controller interface { Start(ctx context.Context) (<-chan error, error) diff --git a/internal/k8s/vald/benchmark/api/v1/job_types.go b/internal/k8s/vald/benchmark/api/v1/job_types.go index dc3996dbb02..40c71232d7b 100644 --- a/internal/k8s/vald/benchmark/api/v1/job_types.go +++ b/internal/k8s/vald/benchmark/api/v1/job_types.go @@ -42,6 +42,7 @@ type BenchmarkJobStatus string const ( BenchmarkJobNotReady = BenchmarkJobStatus("NotReady") + BenchmarkJobCompleted = BenchmarkJobStatus("Completed") BenchmarkJobAvailable = BenchmarkJobStatus("Available") BenchmarkJobHealthy = BenchmarkJobStatus("Healthy") ) diff --git a/internal/k8s/vald/benchmark/api/v1/scenario_types.go b/internal/k8s/vald/benchmark/api/v1/scenario_types.go index 0ef2adb341b..8973a1e6b5b 100644 --- a/internal/k8s/vald/benchmark/api/v1/scenario_types.go +++ b/internal/k8s/vald/benchmark/api/v1/scenario_types.go @@ -31,6 +31,7 @@ type ValdBenchmarkScenarioStatus string const ( BenchmarkScenarioNotReady ValdBenchmarkScenarioStatus = "NotReady" + BenchmarkScenarioCompleted ValdBenchmarkScenarioStatus = "Completed" BenchmarkScenarioAvailable ValdBenchmarkScenarioStatus = "Available" BenchmarkScenarioHealthy ValdBenchmarkScenarioStatus = "Healthy" ) diff --git a/internal/k8s/vald/benchmark/job/job_template.go b/internal/k8s/vald/benchmark/job/job_template.go new file mode 100644 index 00000000000..2830e239311 --- /dev/null +++ b/internal/k8s/vald/benchmark/job/job_template.go @@ -0,0 +1,110 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package job manages the main logic of benchmark job. +package job + +import ( + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" +) + +type benchmarkJobTemplate = batchv1.Job + +const ( + SvcAccountName = "vald-benchmark-operator" + ContainerName = "vald-benchmark-job" + ContainerImage = "local-registry:5000/vdaas/vald-benchmark-job:latest" + + RestartPolicyAlways corev1.RestartPolicy = "Always" + RestartPolicyOnFailure corev1.RestartPolicy = "OnFailure" + RestartPolicyNever corev1.RestartPolicy = "Never" +) + +// NewBenchmarkJobTemplate creates the job template for crating k8s job resource. +func NewBenchmarkJobTemplate(opts ...BenchmarkJobOption) (benchmarkJobTemplate, error) { + jobTmpl := new(benchmarkJobTemplate) + for _, opt := range append(defaultBenchmarkJobOpts, opts...) { + err := opt(jobTmpl) + if err != nil { + return *jobTmpl, err + } + } + jobTmpl.Spec.Template.Spec.Containers = []corev1.Container{ + { + Name: ContainerName, + Image: ContainerImage, + ImagePullPolicy: corev1.PullAlways, + LivenessProbe: &corev1.Probe{ + InitialDelaySeconds: int32(60), + PeriodSeconds: int32(10), + TimeoutSeconds: int32(300), + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{ + "/go/bin/job", + "-v", + }, + }, + }, + }, + StartupProbe: &corev1.Probe{ + FailureThreshold: int32(30), + PeriodSeconds: int32(10), + TimeoutSeconds: int32(300), + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{ + "/go/bin/job", + "-v", + }, + }, + }, + }, + Ports: []corev1.ContainerPort{ + { + Name: "liveness", + Protocol: corev1.ProtocolTCP, + ContainerPort: int32(3000), + }, + { + Name: "readiness", + Protocol: corev1.ProtocolTCP, + ContainerPort: int32(3001), + }, + }, + Env: []corev1.EnvVar{ + { + Name: "CRD_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "CRD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.labels['job-name']", + }, + }, + }, + }, + }, + } + return *jobTmpl, nil +} diff --git a/internal/k8s/vald/benchmark/job/job_template_option.go b/internal/k8s/vald/benchmark/job/job_template_option.go new file mode 100644 index 00000000000..79b29a81b85 --- /dev/null +++ b/internal/k8s/vald/benchmark/job/job_template_option.go @@ -0,0 +1,114 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package job + +import ( + "github.com/vdaas/vald/internal/k8s" + corev1 "k8s.io/api/core/v1" +) + +// BenchmarkJobOption represents the option for create benchmark job template. +type BenchmarkJobOption func(b *benchmarkJobTemplate) error + +var defaultBenchmarkJobOpts = []BenchmarkJobOption{ + WithSvcAccountName(SvcAccountName), + WithRestartPolicy(RestartPolicyNever), +} + +// WithSvcAccountName sets the service account name for benchmark job. +func WithSvcAccountName(name string) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + if len(name) > 0 { + b.Spec.Template.Spec.ServiceAccountName = name + } + return nil + } +} + +// WithRestartPolicy sets the job restart policy for benchmark job. +func WithRestartPolicy(rp corev1.RestartPolicy) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + if len(rp) > 0 { + b.Spec.Template.Spec.RestartPolicy = rp + } + return nil + } +} + +// WithBackoffLimit sets the job backoff limit for benchmark job. +func WithBackoffLimit(bo int32) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + b.Spec.BackoffLimit = &bo + return nil + } +} + +// WithName sets the job name of benchmark job. +func WithName(name string) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + b.Name = name + return nil + } +} + +// WithNamespace specify namespace where job will execute. +func WithNamespace(ns string) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + b.Namespace = ns + return nil + } +} + +// WithOwnerRef sets the OwnerReference to the job resource. +func WithOwnerRef(refs []k8s.OwnerReference) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + if len(refs) > 0 { + b.OwnerReferences = refs + } + return nil + } +} + +// WithCompletions sets the job completion. +func WithCompletions(com int32) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + if com > 1 { + b.Spec.Completions = &com + } + return nil + } +} + +// WithParallelism sets the job parallelism. +func WithParallelism(parallelism int32) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + if parallelism > 1 { + b.Spec.Parallelism = ¶llelism + } + return nil + } +} + +// WithLabel sets the label to the job resource. +func WithLabel(label map[string]string) BenchmarkJobOption { + return func(b *benchmarkJobTemplate) error { + if len(label) > 0 { + b.Labels = label + } + return nil + } +} diff --git a/pkg/tools/benchmark/job/config/config.go b/pkg/tools/benchmark/job/config/config.go index 36620e1dbc5..34aa8e34feb 100644 --- a/pkg/tools/benchmark/job/config/config.go +++ b/pkg/tools/benchmark/job/config/config.go @@ -85,11 +85,15 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) { // Get config from applied ValdBenchmarkJob custom resource var jobResource v1.ValdBenchmarkJob - c, err := client.New(client.WithSchemeBuilder(*v1.SchemeBuilder)) - if err != nil { - log.Warn(err.Error()) + if cfg.Job.Client == nil { + c, err := client.New(client.WithSchemeBuilder(*v1.SchemeBuilder)) + if err != nil { + log.Error(err.Error()) + return nil, err + } + cfg.Job.Client = c } - err = c.Get(ctx, NAME, NAMESPACE, &jobResource) + err = cfg.Job.Client.Get(ctx, NAME, NAMESPACE, &jobResource) if err != nil { log.Warn(err.Error()) } else { @@ -105,6 +109,10 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) { cfg.Job.SearchConfig = jobResource.Spec.SearchConfig cfg.Job.RemoveConfig = jobResource.Spec.RemoveConfig cfg.Job.ClientConfig = jobResource.Spec.ClientConfig + if annotations := jobResource.GetAnnotations(); annotations != nil { + cfg.Job.BeforeJobName = annotations["before-job-name"] + cfg.Job.BeforeJobNamespace = annotations["before-job-namespace"] + } } return cfg, nil diff --git a/pkg/tools/benchmark/job/service/job.go b/pkg/tools/benchmark/job/service/job.go index 5287d9c7f1f..69c0d43f0ee 100644 --- a/pkg/tools/benchmark/job/service/job.go +++ b/pkg/tools/benchmark/job/service/job.go @@ -22,13 +22,17 @@ import ( "os" "reflect" "syscall" + "time" "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/client/v1/client/vald" "github.com/vdaas/vald/internal/config" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/k8s/client" + v1 "github.com/vdaas/vald/internal/k8s/vald/benchmark/api/v1" "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/test/data/hdf5" ) @@ -56,18 +60,22 @@ func (jt jobType) String() string { } type job struct { - eg errgroup.Group - dimension int - dataset *config.BenchmarkDataset - jobType jobType - jobFunc func(context.Context, chan error) error - insertConfig *config.InsertConfig - updateConfig *config.UpdateConfig - upsertConfig *config.UpsertConfig - searchConfig *config.SearchConfig - removeConfig *config.RemoveConfig - client vald.Client - hdf5 hdf5.Data + eg errgroup.Group + dimension int + dataset *config.BenchmarkDataset + jobType jobType + jobFunc func(context.Context, chan error) error + insertConfig *config.InsertConfig + updateConfig *config.UpdateConfig + upsertConfig *config.UpsertConfig + searchConfig *config.SearchConfig + removeConfig *config.RemoveConfig + client vald.Client + hdf5 hdf5.Data + beforeJobName string + beforeJobNamespace string + k8sClient client.Client + beforeJobDur time.Duration } func New(opts ...Option) (Job, error) { @@ -93,6 +101,34 @@ func New(opts ...Option) (Job, error) { } func (j *job) PreStart(ctx context.Context) error { + if len(j.beforeJobName) != 0 { + var jobResource v1.ValdBenchmarkJob + log.Info("[benchmark job] check before benchjob is completed or not...") + j.eg.Go(safety.RecoverFunc(func() error { + dt := time.NewTicker(j.beforeJobDur) + defer dt.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-dt.C: + err := j.k8sClient.Get(ctx, j.beforeJobName, j.beforeJobNamespace, &jobResource) + if err != nil { + return err + } + if jobResource.Status == v1.BenchmarkJobCompleted { + log.Infof("[benchmark job ] before job (%s) is completed, job service will start soon.", j.beforeJobName) + return nil + } + log.Infof("[benchmark job] before job (%s) is not completed...", j.beforeJobName) + } + } + })) + if err := j.eg.Wait(); err != nil { + return err + } + } + log.Infof("[benchmark job] start download dataset of %s", j.hdf5.GetName().String()) if err := j.hdf5.Download(); err != nil { return err diff --git a/pkg/tools/benchmark/job/service/option.go b/pkg/tools/benchmark/job/service/option.go index 38013805ebc..998f9f87001 100644 --- a/pkg/tools/benchmark/job/service/option.go +++ b/pkg/tools/benchmark/job/service/option.go @@ -24,16 +24,20 @@ import ( "github.com/vdaas/vald/internal/config" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/test/data/hdf5" + "github.com/vdaas/vald/internal/timeutil" ) type Option func(j *job) error var defaultOpts = []Option{ - WithDimension(748), // TODO: set default config for client + WithDimension(748), + WithBeforeJobDuration("30s"), } +// WithDimension sets the vector's dimension for running benchmark job with dataset. func WithDimension(dim int) Option { return func(j *job) error { if dim > 0 { @@ -43,6 +47,7 @@ func WithDimension(dim int) Option { } } +// WithInsertConfig sets the insert API config for running insert request job. func WithInsertConfig(c *config.InsertConfig) Option { return func(j *job) error { if c != nil { @@ -52,6 +57,7 @@ func WithInsertConfig(c *config.InsertConfig) Option { } } +// WithUpdateConfig sets the update API config for running update request job. func WithUpdateConfig(c *config.UpdateConfig) Option { return func(j *job) error { if c != nil { @@ -61,6 +67,7 @@ func WithUpdateConfig(c *config.UpdateConfig) Option { } } +// WithUpsertConfig sets the upsert API config for running upsert request job. func WithUpsertConfig(c *config.UpsertConfig) Option { return func(j *job) error { if c != nil { @@ -70,6 +77,7 @@ func WithUpsertConfig(c *config.UpsertConfig) Option { } } +// WithSearchConfig sets the search API config for running search request job. func WithSearchConfig(c *config.SearchConfig) Option { return func(j *job) error { if c != nil { @@ -79,6 +87,7 @@ func WithSearchConfig(c *config.SearchConfig) Option { } } +// WithRemoveConfig sets the remove API config for running remove request job. func WithRemoveConfig(c *config.RemoveConfig) Option { return func(j *job) error { if c != nil { @@ -88,6 +97,7 @@ func WithRemoveConfig(c *config.RemoveConfig) Option { } } +// WithValdClient sets the Vald client for sending request to the target Vald cluster. func WithValdClient(c vald.Client) Option { return func(j *job) error { if c == nil { @@ -98,16 +108,18 @@ func WithValdClient(c vald.Client) Option { } } +// WithErrGroup sets the errgroup to the job struct to handle errors. func WithErrGroup(eg errgroup.Group) Option { return func(j *job) error { if eg == nil { - return errors.NewErrInvalidOption("client", eg) + return errors.NewErrInvalidOption("error group", eg) } j.eg = eg return nil } } +// WithHdf5 sets the hdf5.Data which is used for benchmark job dataset. func WithHdf5(d hdf5.Data) Option { return func(j *job) error { if d == nil { @@ -118,6 +130,7 @@ func WithHdf5(d hdf5.Data) Option { } } +// WithDataset sets the config.BenchmarkDataset including benchmakr dataset name, group name of hdf5.Data, the number of index, start range and end range. func WithDataset(d *config.BenchmarkDataset) Option { return func(j *job) error { if d == nil { @@ -128,6 +141,7 @@ func WithDataset(d *config.BenchmarkDataset) Option { } } +// WithJobTypeByString converts given string to JobType. func WithJobTypeByString(t string) Option { var jt jobType switch t { @@ -139,6 +153,7 @@ func WithJobTypeByString(t string) Option { return WithJobType(jt) } +// WithJobType sets the jobType for running benchmark job. func WithJobType(jt jobType) Option { return func(j *job) error { switch jt { @@ -153,6 +168,7 @@ func WithJobType(jt jobType) Option { } } +// WithJobFunc sets the job function. func WithJobFunc(jf func(context.Context, chan error) error) Option { return func(j *job) error { if jf == nil { @@ -162,3 +178,48 @@ func WithJobFunc(jf func(context.Context, chan error) error) Option { return nil } } + +// WithBeforeJobName sets the beforeJobName which we should wait for until finish before running job. +func WithBeforeJobName(bjn string) Option { + return func(j *job) error { + if len(bjn) > 0 { + j.beforeJobName = bjn + } + return nil + } +} + +// WithBeforeJobNamespace sets the beforeJobNamespace of the beforeJobName which we should wait for until finish before running job. +func WithBeforeJobNamespace(bjns string) Option { + return func(j *job) error { + if len(bjns) > 0 { + j.beforeJobNamespace = bjns + } + return nil + } +} + +// WithBeforeJobDuration sets the duration for watching beforeJobName's status. +func WithBeforeJobDuration(dur string) Option { + return func(j *job) error { + if len(dur) == 0 { + return nil + } + dur, err := timeutil.Parse(dur) + if err != nil { + return err + } + j.beforeJobDur = dur + return nil + } +} + +// WithK8sClient binds the k8s client to the job struct which is used for get BenchmarkJobResource from Kubernetes API server. +func WithK8sClient(cli client.Client) Option { + return func(j *job) error { + if cli != nil { + j.k8sClient = cli + } + return nil + } +} diff --git a/pkg/tools/benchmark/job/usecase/benchmarkd.go b/pkg/tools/benchmark/job/usecase/benchmarkd.go index 4ceafcbfa9d..4bd9917b324 100644 --- a/pkg/tools/benchmark/job/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/job/usecase/benchmarkd.go @@ -100,6 +100,9 @@ func New(cfg *config.Config) (r runner.Runner, err error) { service.WithSearchConfig(cfg.Job.SearchConfig), service.WithRemoveConfig(cfg.Job.RemoveConfig), service.WithHdf5(d), + service.WithBeforeJobName(cfg.Job.BeforeJobName), + service.WithBeforeJobNamespace(cfg.Job.BeforeJobNamespace), + service.WithK8sClient(cfg.Job.Client), ) if err != nil { return nil, err diff --git a/pkg/tools/benchmark/operator/handler/grpc/handler.go b/pkg/tools/benchmark/operator/handler/grpc/handler.go index 2ac0cd13465..8120e778b07 100644 --- a/pkg/tools/benchmark/operator/handler/grpc/handler.go +++ b/pkg/tools/benchmark/operator/handler/grpc/handler.go @@ -33,7 +33,7 @@ type Benchmark interface { type server struct { benchmark.UnimplementedJobServer - scenario service.Scenario + operator service.Operator group singleflight.Group } diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go new file mode 100644 index 00000000000..9dc749c386f --- /dev/null +++ b/pkg/tools/benchmark/operator/service/operator.go @@ -0,0 +1,612 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "reflect" + "strconv" + "sync/atomic" + "time" + + "github.com/vdaas/vald/internal/errgroup" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/k8s" + "github.com/vdaas/vald/internal/k8s/client" + "github.com/vdaas/vald/internal/k8s/job" + v1 "github.com/vdaas/vald/internal/k8s/vald/benchmark/api/v1" + benchjob "github.com/vdaas/vald/internal/k8s/vald/benchmark/job" + benchscenario "github.com/vdaas/vald/internal/k8s/vald/benchmark/scenario" + "github.com/vdaas/vald/internal/log" +) + +type Operator interface { + PreStart(context.Context) error + Start(context.Context) (<-chan error, error) +} + +type scenario struct { + Crd *v1.ValdBenchmarkScenario + BenchJobStatus map[string]v1.BenchmarkJobStatus +} + +const ( + Scenario = "scenario" + BenchmarkName = "benchmark-name" + BeforeJobName = "before-job-name" + BeforeJobNamespace = "before-job-namespace" +) + +type operator struct { + jobNamespace string + scenarios atomic.Pointer[map[string]*scenario] + benchjobs atomic.Pointer[map[string]*v1.ValdBenchmarkJob] + jobs atomic.Pointer[map[string]string] + rcd time.Duration // reconcile check duration + eg errgroup.Group + ctrl k8s.Controller +} + +// New creates the new scenario struct to handle vald benchmark job scenario. +// When the input options are invalid, the error will be returned. +func New(opts ...Option) (Operator, error) { + operator := new(operator) + for _, opt := range append(defaultOpts, opts...) { + if err := opt(operator); err != nil { + return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + } + } + + err := operator.initCtrl() + if err != nil { + return nil, err + } + return operator, nil +} + +// initCtrl creates the controller for reconcile k8s objects. +func (o *operator) initCtrl() error { + // watcher of vald benchmark scenario resource + benchScenario, err := benchscenario.New( + benchscenario.WithControllerName("benchmark scenario resource"), + benchscenario.WithNamespaces(o.jobNamespace), + benchscenario.WithOnErrorFunc(func(err error) { + log.Errorf("failed to reconcile benchmark scenario resource:", err) + }), + benchscenario.WithOnReconcileFunc(o.benchScenarioReconcile), + ) + if err != nil { + return err + } + + // watcher of vald benchmark job resource + benchJob, err := benchjob.New( + benchjob.WithControllerName("benchmark job resource"), + benchjob.WithOnErrorFunc(func(err error) { + log.Errorf("failed to reconcile benchmark job resource:", err) + }), + benchjob.WithNamespaces(o.jobNamespace), + benchjob.WithOnErrorFunc(func(err error) { + log.Error(err) + }), + benchjob.WithOnReconcileFunc(o.benchJobReconcile), + ) + if err != nil { + return err + } + + // watcher of job resource + job, err := job.New( + job.WithControllerName("benchmark job"), + job.WithNamespaces(o.jobNamespace), + job.WithOnErrorFunc(func(err error) { + log.Errorf("failed to reconcile job resource:", err) + }), + job.WithOnReconcileFunc(o.jobReconcile), + ) + if err != nil { + return err + } + + // create reconcile controller which watches valdbenchmarkscenario resource, valdbenchmarkjob resource, and job resource. + o.ctrl, err = k8s.New( + k8s.WithControllerName("vald benchmark scenario operator"), + k8s.WithResourceController(benchScenario), + k8s.WithResourceController(benchJob), + k8s.WithResourceController(job), + ) + return err +} + +func (o *operator) getAtomicScenario() map[string]*scenario { + if v := o.scenarios.Load(); v != nil { + return *(v) + } + return nil +} + +func (o *operator) getAtomicBenchJob() map[string]*v1.ValdBenchmarkJob { + if v := o.benchjobs.Load(); v != nil { + return *(v) + } + return nil +} + +func (o *operator) getAtomicJob() map[string]string { + if v := o.jobs.Load(); v != nil { + return *(v) + } + return nil +} + +// jobReconcile gets k8s job list and watches theirs STATUS. +// Then, it processes according STATUS. +func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Job) { + log.Debug("[reconcile job] start") + if len(jobList) == 0 { + log.Info("[reconcile job] no job is founded") + o.jobs.Store(&map[string]string{}) + return + } + cjobs := o.getAtomicJob() + if cjobs == nil { + cjobs = map[string]string{} + } + // jobStatus is used for update benchmark job resource status + benchmarkJobStatus := make(map[string]v1.BenchmarkJobStatus) + // jobNames is used for check whether cjobs has delted job. + // If cjobs has the delted job, it will be remove the end of jobReconcile function. + jobNames := map[string]struct{}{} + for _, jobs := range jobList { + cnt := len(jobs) + var name string + for _, job := range jobs { + if job.GetNamespace() != o.jobNamespace { + continue + } + jobNames[job.GetName()] = struct{}{} + if _, ok := cjobs[job.Name]; !ok && job.Status.CompletionTime == nil { + cjobs[job.GetName()] = job.Namespace + benchmarkJobStatus[job.GetName()] = v1.BenchmarkJobAvailable + continue + } else { + name = job.GetName() + } + if job.Status.Active == 0 && job.Status.Succeeded != 0 { + cnt-- + } + } + if cnt == 0 && len(name) != 0 { + benchmarkJobStatus[name] = v1.BenchmarkJobCompleted + } + } + if len(benchmarkJobStatus) != 0 { + _, err := o.updateBenchmarkJobStatus(ctx, benchmarkJobStatus) + if err != nil { + log.Error(err.Error) + } + } + // delete job which is not be in `jobList` from cj. + for k := range cjobs { + if _, ok := jobNames[k]; !ok { + delete(cjobs, k) + } + } + o.jobs.Store(&cjobs) + log.Debug("[reconcile job] finish") +} + +// benchmarkJobReconcile gets the vald benchmark job resource list and create Job for running benchmark job. +func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[string]v1.ValdBenchmarkJob) { + log.Debugf("[reconcile benchmark job resource] job list: %#v", benchJobList) + if len(benchJobList) == 0 { + o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) + log.Info("[reconcile benchmark job resource] job resource not found") + return + } + cbjl := o.getAtomicBenchJob() + if cbjl == nil { + cbjl = make(map[string]*v1.ValdBenchmarkJob, 0) + } + jobStatus := make(map[string]v1.BenchmarkJobStatus) + for k := range benchJobList { + // update scenario status + job := benchJobList[k] + if scenarios := o.getAtomicScenario(); scenarios != nil { + on := job.GetOwnerReferences()[0].Name + if _, ok := scenarios[on]; ok { + if scenarios[on].BenchJobStatus == nil { + scenarios[on].BenchJobStatus = map[string]v1.BenchmarkJobStatus{} + } + scenarios[on].BenchJobStatus[job.Name] = job.Status + } + o.scenarios.Store(&scenarios) + } + if oldJob := cbjl[k]; oldJob != nil { + if oldJob.GetGeneration() != job.GetGeneration() { + if job.Status != "" && oldJob.Status != v1.BenchmarkJobCompleted { + // delete old version job + err := o.deleteJob(ctx, oldJob.GetName(), oldJob.GetGeneration()) + if err != nil { + log.Warnf("[reconcile benchmark job resource] failed to delete old version job: job name=%s, version=%d\t%s", oldJob.GetName(), oldJob.GetGeneration(), err.Error()) + } + cbjl[k] = &job + } + } else if oldJob.Status == "" { + jobStatus[oldJob.GetName()] = v1.BenchmarkJobAvailable + } + } else if len(job.Status) == 0 || job.Status == v1.BenchmarkJobNotReady { + log.Info("[reconcile benchmark job resource] create job: ", k) + err := o.createJob(ctx, job) + if err != nil { + log.Errorf("[reconcile benchmark job resource] failed to create job: %s", err.Error()) + } + jobStatus[job.Name] = v1.BenchmarkJobAvailable + cbjl[k] = &job + } + } + // delete benchmark job which is not be in `benchJobList` from cbjl. + for k := range cbjl { + if _, ok := benchJobList[k]; !ok { + delete(cbjl, k) + } + } + o.benchjobs.Store(&cbjl) + if len(jobStatus) != 0 { + _, err := o.updateBenchmarkJobStatus(ctx, jobStatus) + if err != nil { + log.Errorf("[reconcile benchmark job resource] failed update job status: %s", err) + } + } + log.Debug("[reconcile benchmark job resource] finish") +} + +// benchScenarioReconcile gets the vald benchmark scenario list and create vald benchmark job resource according to it. +func (o *operator) benchScenarioReconcile(ctx context.Context, scenarioList map[string]v1.ValdBenchmarkScenario) { + log.Debugf("[reconcile benchmark scenario resource] scenario list: %#v", scenarioList) + if len(scenarioList) == 0 { + log.Info("[reconcile benchmark scenario resource]: scenario not found") + o.scenarios.Store(&(map[string]*scenario{})) + o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) + o.jobs.Store(&(map[string]string{})) + return + } + cbsl := o.getAtomicScenario() + if cbsl == nil { + cbsl = map[string]*scenario{} + } + scenarioStatus := make(map[string]v1.ValdBenchmarkScenarioStatus) + for name := range scenarioList { + sc := scenarioList[name] + if oldScenario := cbsl[name]; oldScenario == nil { + // apply new crd which is not set yet. + jobNames, err := o.createBenchmarkJob(ctx, sc) + if err != nil { + log.Errorf("[reconcile benchmark scenario resource] failed to create benchmark job resource: %s", err.Error()) + } + cbsl[name] = &scenario{ + Crd: &sc, + BenchJobStatus: func() map[string]v1.BenchmarkJobStatus { + s := map[string]v1.BenchmarkJobStatus{} + for _, v := range jobNames { + s[v] = v1.BenchmarkJobNotReady + } + return s + }(), + } + scenarioStatus[sc.GetName()] = v1.BenchmarkScenarioHealthy + } else { + // apply updated crd which is already applied. + if oldScenario.Crd.GetGeneration() < sc.GetGeneration() { + // delete old job resource. If it is succeeded, job pod will be deleted automatically because of OwnerReference. + err := o.deleteBenchmarkJob(ctx, oldScenario.Crd.GetName(), oldScenario.Crd.Generation) + if err != nil { + log.Warnf("[reconcile benchmark scenario resource] failed to delete old version benchmark jobs: scenario name=%s, version=%d\t%s", oldScenario.Crd.GetName(), oldScenario.Crd.Generation, err.Error()) + } + // create new benchmark job resources of new version. + jobNames, err := o.createBenchmarkJob(ctx, sc) + if err != nil { + log.Errorf("[reconcile benchmark scenario resource] failed to create benchmark job resource: %s", err.Error()) + } + cbsl[name] = &scenario{ + Crd: &sc, + BenchJobStatus: func() map[string]v1.BenchmarkJobStatus { + s := map[string]v1.BenchmarkJobStatus{} + for _, v := range jobNames { + s[v] = v1.BenchmarkJobNotReady + } + return s + }(), + } + } else { + // only update status + if oldScenario.Crd.Status != sc.Status { + cbsl[name].Crd.Status = sc.Status + } + } + } + } + // delete stored crd which is not be in `scenarioList` from cbsl. + for k := range cbsl { + if _, ok := scenarioList[k]; !ok { + delete(cbsl, k) + } + } + o.scenarios.Store(&cbsl) + // Update scenario status + _, err := o.updateBenchmarkScenarioStatus(ctx, scenarioStatus) + if err != nil { + log.Errorf("[reconcile benchmark scenario resource] failed to update benchmark scenario resource status: %s", err.Error()) + } + log.Debug("[reconcile benchmark scenario resource] finish") +} + +// deleteBenchmarkJob deletes benchmark job resource according to given scenario name and generation. +func (o *operator) deleteBenchmarkJob(ctx context.Context, name string, generation int64) error { + opts := new(client.DeleteAllOfOptions) + client.MatchingLabels(map[string]string{ + Scenario: name + strconv.Itoa(int(generation)), + }).ApplyToDeleteAllOf(opts) + client.InNamespace(o.jobNamespace).ApplyToDeleteAllOf(opts) + return o.ctrl.GetManager().GetClient().DeleteAllOf(ctx, &v1.ValdBenchmarkJob{}, opts) +} + +// deleteJob deletes job resource according to given benchmark job name and generation. +func (o *operator) deleteJob(ctx context.Context, name string, generation int64) error { + opts := new(client.DeleteAllOfOptions) + client.MatchingLabels(map[string]string{ + BenchmarkName: name + strconv.Itoa(int(generation)), + }).ApplyToDeleteAllOf(opts) + client.InNamespace(o.jobNamespace).ApplyToDeleteAllOf(opts) + return o.ctrl.GetManager().GetClient().DeleteAllOf(ctx, &job.Job{}, opts) +} + +// createBenchmarkJob creates the ValdBenchmarkJob crd for running job. +func (o *operator) createBenchmarkJob(ctx context.Context, scenario v1.ValdBenchmarkScenario) ([]string, error) { + ownerRef := []k8s.OwnerReference{ + { + APIVersion: scenario.APIVersion, + Kind: scenario.Kind, + Name: scenario.Name, + UID: scenario.UID, + }, + } + jobNames := make([]string, 0) + var beforeJobName string + for _, job := range scenario.Spec.Jobs { + bj := new(v1.ValdBenchmarkJob) + // set metadata.name, metadata.namespace, OwnerReference + bj.Name = scenario.GetName() + "-" + job.JobType + "-" + strconv.FormatInt(time.Now().UnixNano(), 10) + bj.Namespace = scenario.GetNamespace() + bj.SetOwnerReferences(ownerRef) + // set label + labels := map[string]string{ + Scenario: scenario.GetName() + strconv.Itoa(int(scenario.Generation)), + } + bj.SetLabels(labels) + // set annotations for wating before job + annotations := map[string]string{ + BeforeJobName: beforeJobName, + BeforeJobNamespace: o.jobNamespace, + } + bj.SetAnnotations(annotations) + // set specs + bj.Spec = *job + if bj.Spec.Target == nil { + bj.Spec.Target = scenario.Spec.Target + } + if bj.Spec.Dataset == nil { + bj.Spec.Dataset = scenario.Spec.Dataset + } + // set status + bj.Status = v1.BenchmarkJobNotReady + // create benchmark job resource + c := o.ctrl.GetManager().GetClient() + if err := c.Create(ctx, bj); err != nil { + return nil, errors.ErrFailedToCreateBenchmarkJob(err, bj.GetName()) + } + jobNames = append(jobNames, bj.Name) + beforeJobName = bj.Name + } + return jobNames, nil +} + +// createJob creates benchmark job from benchmark job resource. +func (o *operator) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) error { + label := map[string]string{ + BenchmarkName: bjr.GetName() + strconv.Itoa(int(bjr.Generation)), + } + job, err := benchjob.NewBenchmarkJobTemplate( + benchjob.WithName(bjr.Name), + benchjob.WithNamespace(bjr.Namespace), + benchjob.WithLabel(label), + benchjob.WithCompletions(int32(bjr.Spec.Repetition)), + benchjob.WithParallelism(int32(bjr.Spec.Replica)), + benchjob.WithOwnerRef([]k8s.OwnerReference{ + { + APIVersion: bjr.APIVersion, + Kind: bjr.Kind, + Name: bjr.Name, + UID: bjr.UID, + }, + }), + ) + if err != nil { + return err + } + // create job + c := o.ctrl.GetManager().GetClient() + if err = c.Create(ctx, &job); err != nil { + return errors.ErrFailedToCreateJob(err, job.GetName()) + } + return nil +} + +// updateBenchmarkScenarioStatus updates status of ValdBenchmarkScenarioResource +func (o *operator) updateBenchmarkScenarioStatus(ctx context.Context, ss map[string]v1.ValdBenchmarkScenarioStatus) ([]string, error) { + var sns []string + if cbsl := o.getAtomicScenario(); cbsl != nil { + for name, status := range ss { + if scenario, ok := cbsl[name]; ok { + if scenario.Crd.Status == status { + continue + } + scenario.Crd.Status = status + cli := o.ctrl.GetManager().GetClient() + err := cli.Status().Update(ctx, scenario.Crd) + if err != nil { + log.Error(err.Error()) + continue + } + sns = append(sns, name) + } + } + } + return sns, nil +} + +// updateBenchmarkJobStatus updates status of ValdBenchmarkJobResource +func (o *operator) updateBenchmarkJobStatus(ctx context.Context, js map[string]v1.BenchmarkJobStatus) ([]string, error) { + var jns []string + if cbjl := o.getAtomicBenchJob(); cbjl != nil { + for name, status := range js { + if bjob, ok := cbjl[name]; ok { + if bjob.Status == status { + continue + } + bjob.Status = status + cli := o.ctrl.GetManager().GetClient() + err := cli.Status().Update(ctx, bjob) + if err != nil { + log.Error(err.Error()) + continue + } + jns = append(jns, name) + } + } + } + return jns, nil +} + +func (o *operator) checkJobsStatus(ctx context.Context, jobs map[string]string) error { + cbjl := o.getAtomicBenchJob() + if jobs == nil || cbjl == nil { + log.Infof("[check job status] no job launched") + return nil + } + job := new(job.Job) + c := o.ctrl.GetManager().GetClient() + jobStatus := map[string]v1.BenchmarkJobStatus{} + for name, ns := range jobs { + err := c.Get(ctx, client.ObjectKey{ + Namespace: ns, + Name: name, + }, job) + if err != nil { + return err + } + if job.Status.Active != 0 || job.Status.Failed != 0 { + continue + } + if job.Status.Succeeded != 0 { + if job, ok := cbjl[name]; ok { + if job.Status != v1.BenchmarkJobCompleted { + jobStatus[name] = v1.BenchmarkJobCompleted + } + } + } + + } + _, err := o.updateBenchmarkJobStatus(ctx, jobStatus) + return err +} + +func (o *operator) initAtomics() { + if cbsl := o.getAtomicScenario(); len(cbsl) > 0 { + o.scenarios.Store(&(map[string]*scenario{})) + } + if cbjl := o.getAtomicBenchJob(); len(cbjl) > 0 { + o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) + } + if cjl := o.getAtomicJob(); len(cjl) > 0 { + o.jobs.Store(&(map[string]string{})) + } +} + +func (o *operator) PreStart(ctx context.Context) error { + log.Infof("[benchmark scenario operator] start vald benchmark scenario operator") + return nil +} + +func (o *operator) Start(ctx context.Context) (<-chan error, error) { + scch, err := o.ctrl.Start(ctx) + if err != nil { + return nil, err + } + ech := make(chan error, 2) + o.eg.Go(func() error { + defer close(ech) + rcticker := time.NewTicker(o.rcd) + defer rcticker.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-rcticker.C: + cbsl := o.getAtomicScenario() + if cbsl == nil { + log.Info("benchmark scenario resource is empty") + // clear atomic pointer + o.initAtomics() + continue + } else { + scenarioStatus := make(map[string]v1.ValdBenchmarkScenarioStatus) + for name, scenario := range cbsl { + if scenario.Crd.Status != v1.BenchmarkScenarioCompleted { + cnt := len(scenario.BenchJobStatus) + for _, bjob := range scenario.BenchJobStatus { + if bjob == v1.BenchmarkJobCompleted { + cnt-- + } + } + if cnt == 0 { + scenarioStatus[name] = v1.BenchmarkScenarioCompleted + } + } + } + if _, err := o.updateBenchmarkScenarioStatus(ctx, scenarioStatus); err != nil { + log.Errorf("failed to update benchmark scenario to %s\terror: %s", v1.BenchmarkJobCompleted, err.Error()) + } + + } + // get job and check status + if jobs := o.getAtomicJob(); jobs != nil { + err = o.checkJobsStatus(ctx, jobs) + if err != nil { + log.Error(err.Error()) + } + } + case err = <-scch: + if err != nil { + ech <- err + } + } + } + }) + return ech, nil +} diff --git a/pkg/tools/benchmark/operator/service/option.go b/pkg/tools/benchmark/operator/service/option.go index 43b10ec5813..da106afa7f1 100644 --- a/pkg/tools/benchmark/operator/service/option.go +++ b/pkg/tools/benchmark/operator/service/option.go @@ -25,31 +25,44 @@ import ( ) // Option represents the functional option for scenario struct. -type Option func(sc *scenario) error +type Option func(o *operator) error var defaultOpts = []Option{ WithReconcileCheckDuration("10s"), + WithJobNamespace("default"), } // WithErrGroup sets the error group to scenario. func WithErrGroup(eg errgroup.Group) Option { - return func(sc *scenario) error { + return func(o *operator) error { if eg == nil { return errors.NewErrInvalidOption("client", eg) } - sc.eg = eg + o.eg = eg return nil } } // WithReconcileCheckDuration sets the reconcile check duration from input string. func WithReconcileCheckDuration(ts string) Option { - return func(sc *scenario) error { + return func(o *operator) error { t, err := time.ParseDuration(ts) if err != nil { return err } - sc.rcd = t + o.rcd = t + return nil + } +} + +// WithJobNamespace sets the namespace for running benchmark job. +func WithJobNamespace(ns string) Option { + return func(o *operator) error { + if len(ns) == 0 { + o.jobNamespace = "default" + } else { + o.jobNamespace = ns + } return nil } } diff --git a/pkg/tools/benchmark/operator/service/scenario.go b/pkg/tools/benchmark/operator/service/scenario.go deleted file mode 100644 index dcd15ceab04..00000000000 --- a/pkg/tools/benchmark/operator/service/scenario.go +++ /dev/null @@ -1,372 +0,0 @@ -// -// Copyright (C) 2019-2022 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -// Package service manages the main logic of benchmark job. -package service - -import ( - "context" - "reflect" - "strconv" - "sync/atomic" - "time" - - "github.com/vdaas/vald/internal/errgroup" - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/k8s" - "github.com/vdaas/vald/internal/k8s/job" - v1 "github.com/vdaas/vald/internal/k8s/vald/benchmark/api/v1" - benchjob "github.com/vdaas/vald/internal/k8s/vald/benchmark/job" - benchscenario "github.com/vdaas/vald/internal/k8s/vald/benchmark/scenario" - "github.com/vdaas/vald/internal/log" - corev1 "k8s.io/api/core/v1" - k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -type Scenario interface { - PreStart(context.Context) error - Start(context.Context) (<-chan error, error) -} - -type scenario struct { - jobs atomic.Value - jobName string - jobNamespace string - jobTemplate string // row manifest template data of rebalance job. - jobObject *job.Job // object generated from template. - currentDeviationJobName atomic.Value - - scenarios atomic.Value - benchjobs atomic.Value - - rcd time.Duration // reconcile check duration - eg errgroup.Group - ctrl k8s.Controller -} - -// New creates the new scenario struct to handle vald benchmark job scenario. -// When the input options are invalid, the error will be returned. -func New(opts ...Option) (Scenario, error) { - sc := new(scenario) - for _, opt := range append(defaultOpts, opts...) { - if err := opt(sc); err != nil { - return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt)) - } - } - - err := sc.initCtrl() - if err != nil { - return nil, err - } - return sc, nil -} - -// initCtrl creates the controller for reconcile k8s objects. -func (sc *scenario) initCtrl() (err error) { - // watcher of vald benchmark scenario resource - bs, err := benchscenario.New( - benchscenario.WithControllerName("benchmark scenario resource"), - benchscenario.WithNamespaces(sc.jobNamespace), - benchscenario.WithOnErrorFunc(func(err error) { - log.Errorf("failed to reconcile:", err) - }), - benchscenario.WithOnReconcileFunc(sc.benchScenarioReconcile), - ) - if err != nil { - return - } - - // watcher of vald benchmark job resource - bj, err := benchjob.New( - benchjob.WithControllerName("benchmark job resource"), - benchjob.WithOnErrorFunc(func(err error) { - log.Errorf("failed to reconcile:", err) - }), - benchjob.WithNamespaces(sc.jobNamespace), - benchjob.WithOnErrorFunc(func(err error) { - log.Error(err) - }), - benchjob.WithOnReconcileFunc(sc.benchJobReconcile), - ) - if err != nil { - return - } - - // watcher of job resource - job, err := job.New( - job.WithControllerName("benchmark job"), - job.WithNamespaces(sc.jobNamespace), - job.WithOnErrorFunc(func(err error) { - log.Errorf("failed to reconcile:", err) - }), - job.WithOnReconcileFunc(sc.jobReconcile), - ) - if err != nil { - return - } - - // create reconcile controller which watches valdbenchmarkscenario resource, valdbenchmarkjob resource, and job resource. - sc.ctrl, err = k8s.New( - k8s.WithControllerName("vald benchmark operator"), - k8s.WithResourceController(bs), - k8s.WithResourceController(bj), - k8s.WithResourceController(job), - ) - return -} - -// jobReconcile gets k8s job list and watches theirs STATUS. -// Then, it processes according STATUS. -func (sc *scenario) jobReconcile(ctx context.Context, jobList map[string][]job.Job) { - // TODO: impl logic - // for k, v := range jobList { - // log.Warnf("key: %s, value: %v", k, v) - // } - return -} - -// benchmarkJobReconcile gets the vald benchmark job resource list and create Job for running benchmark job. -func (sc *scenario) benchJobReconcile(ctx context.Context, jobList map[string]v1.ValdBenchmarkJob) { - log.Debugf("[reconcile benchmark job resource] job list: %#v", jobList) - if len(jobList) == 0 { - sc.benchjobs.Store(make(map[string]*v1.ValdBenchmarkJob, 0)) - log.Info("[reconcile benchmark job resource] job resource not found") - return - } - var cbjl map[string]*v1.ValdBenchmarkJob - if ok := sc.benchjobs.Load(); ok == nil { - cbjl = make(map[string]*v1.ValdBenchmarkJob, 0) - } else { - cbjl = ok.(map[string]*v1.ValdBenchmarkJob) - } - for k, job := range jobList { - if oldJob := cbjl[k]; oldJob != nil { - if oldJob.GetGeneration() != job.GetGeneration() { - // TODO: delete old version job - cbjl[k] = &job - } - } else { - log.Info("create job: ", k) - err := sc.createJob(ctx, job) - if err != nil { - log.Errorf("[reconcile benchmark job resource] failed to create job: %s", err.Error()) - } - cbjl[k] = &job - } - } - sc.benchjobs.Store(cbjl) -} - -// benchScenarioReconcile gets the vald benchmark scenario list and create vald benchmark job resource according to it. -func (sc *scenario) benchScenarioReconcile(ctx context.Context, scenarioList map[string]v1.ValdBenchmarkScenario) { - log.Debugf("[reconcile benchmark scenario resource] scenario list: %#v", scenarioList) - if len(scenarioList) == 0 { - sc.scenarios.Store(make(map[string]*v1.ValdBenchmarkScenario, 0)) - sc.benchjobs.Store(make(map[string]*v1.ValdBenchmarkJob, 0)) - log.Info("[reconcile benchmark scenario resource]: scenario not found") - return - } - var cbsl map[string]*v1.ValdBenchmarkScenario - if ok := sc.scenarios.Load(); ok == nil { - cbsl = make(map[string]*v1.ValdBenchmarkScenario, len(scenarioList)) - } else { - cbsl = ok.(map[string]*v1.ValdBenchmarkScenario) - } - for k, scenario := range scenarioList { - if oldScenario := cbsl[k]; oldScenario == nil { - err := sc.createBenchmarkJob(ctx, scenario) - if err != nil { - log.Errorf("[reconcile scenario] failed to create job: %s", err.Error()) - } - cbsl[k] = &scenario - } else { - // TODO delete old jobresource and job - if oldScenario.GetGeneration() != scenario.GetGeneration() { - cbsl[k] = &scenario - } - } - } - sc.scenarios.Store(cbsl) -} - -// createBenchmarkJob creates the ValdBenchmarkJob crd for running job. -func (sc *scenario) createBenchmarkJob(ctx context.Context, scenario v1.ValdBenchmarkScenario) error { - ownerRef := []k8smeta.OwnerReference{ - { - APIVersion: scenario.APIVersion, - Kind: scenario.Kind, - Name: scenario.Name, - UID: scenario.UID, - }, - } - for _, job := range scenario.Spec.Jobs { - bj := new(v1.ValdBenchmarkJob) - // set metadata.name, metadata.namespace - bj.Name = scenario.GetName() + "-" + job.JobType + "-" + strconv.FormatInt(time.Now().UnixNano(), 10) - bj.Namespace = scenario.GetNamespace() - bj.SetOwnerReferences(ownerRef) - - // set specs - bj.Spec = *job - if bj.Spec.Target == nil { - bj.Spec.Target = scenario.Spec.Target - } - if bj.Spec.Dataset == nil { - bj.Spec.Dataset = scenario.Spec.Dataset - } - // create benchmark job resource - c := sc.ctrl.GetManager().GetClient() - if err := c.Create(ctx, bj); err != nil { - // TODO: create new custom error - return err - } - } - return nil -} - -// createJobTemplate creates the job template for crating k8s job resource. -// ns and name are required to set job environment value. -func createJobTemplate(ns, name string) job.Job { - j := new(job.Job) - backoffLimit := int32(0) - j.Spec.BackoffLimit = &backoffLimit - j.Spec.Template.Spec.ServiceAccountName = "vald-benchmark-operator" - j.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever - j.Spec.Template.Spec.Containers = []corev1.Container{ - { - Name: "vald-benchmark-job", - Image: "local-registry:5000/vdaas/vald-benchmark-job:latest", - ImagePullPolicy: corev1.PullAlways, - LivenessProbe: &corev1.Probe{ - InitialDelaySeconds: int32(60), - PeriodSeconds: int32(10), - TimeoutSeconds: int32(300), - ProbeHandler: corev1.ProbeHandler{ - Exec: &corev1.ExecAction{ - Command: []string{ - "/go/bin/job", - "-v", - }, - }, - }, - }, - StartupProbe: &corev1.Probe{ - FailureThreshold: int32(30), - PeriodSeconds: int32(10), - TimeoutSeconds: int32(300), - ProbeHandler: corev1.ProbeHandler{ - Exec: &corev1.ExecAction{ - Command: []string{ - "/go/bin/job", - "-v", - }, - }, - }, - }, - Ports: []corev1.ContainerPort{ - { - Name: "liveness", - Protocol: corev1.ProtocolTCP, - ContainerPort: int32(3000), - }, - { - Name: "readiness", - Protocol: corev1.ProtocolTCP, - ContainerPort: int32(3001), - }, - }, - Env: []corev1.EnvVar{ - { - Name: "CRD_NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - { - Name: "CRD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.labels['job-name']", - }, - }, - }, - }, - }, - } - return *j -} - -// createJob creates benchmark job from benchmark job resource. -func (sc *scenario) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) (err error) { - bj := createJobTemplate(bjr.Namespace, bjr.Name) - bj.Name = bjr.Name - bj.Namespace = bjr.Namespace - bj.SetOwnerReferences(bjr.GetOwnerReferences()) - // create job - c := sc.ctrl.GetManager().GetClient() - if err = c.Create(ctx, &bj); err != nil { - // TODO: create new custom error - return err - } - if ok := sc.jobs.Load(); ok == nil { - sc.jobs.Store([]string{bj.Name}) - return - } else { - jobs := sc.jobs.Load().([]string) - jobs = append(jobs, bj.Name) - sc.jobs.Swap(jobs) - } - return -} - -func (sc *scenario) PreStart(ctx context.Context) error { - log.Infof("[benchmark scenario] start vald benchmark scenario") - return nil -} - -func (sc *scenario) Start(ctx context.Context) (<-chan error, error) { - scch, err := sc.ctrl.Start(ctx) - if err != nil { - return nil, err - } - ech := make(chan error, 2) - sc.eg.Go(func() error { - defer close(ech) - dt := time.NewTicker(sc.rcd) - defer dt.Stop() - for { - select { - case <-ctx.Done(): - return nil - case <-dt.C: - // TODO: Get Resource - _, ok := sc.scenarios.Load().(map[string]*v1.ValdBenchmarkScenario) - if !ok { - log.Info("benchmark scenario resource is empty") - continue - } - case err = <-scch: - if err != nil { - ech <- err - } - } - } - }) - - return ech, nil -} diff --git a/pkg/tools/benchmark/operator/usecase/benchmarkd.go b/pkg/tools/benchmark/operator/usecase/benchmarkd.go index 116dfd6cd79..6aa069b0c0b 100644 --- a/pkg/tools/benchmark/operator/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/operator/usecase/benchmarkd.go @@ -43,7 +43,7 @@ import ( type run struct { eg errgroup.Group cfg *config.Config - scenario service.Scenario + operator service.Operator h handler.Benchmark server starter.Server observability observability.Observability @@ -56,7 +56,7 @@ func New(cfg *config.Config) (r runner.Runner, err error) { log.Info("pkg/tools/benchmark/scenario/cmd success d") - sc, err := service.New( + operator, err := service.New( service.WithErrGroup(eg), ) if err != nil { @@ -125,7 +125,7 @@ func New(cfg *config.Config) (r runner.Runner, err error) { return &run{ eg: eg, cfg: cfg, - scenario: sc, + operator: operator, h: h, server: srv, observability: obs, @@ -138,8 +138,8 @@ func (r *run) PreStart(ctx context.Context) error { return err } } - if r.scenario != nil { - return r.scenario.PreStart(ctx) + if r.operator != nil { + return r.operator.PreStart(ctx) } return nil } @@ -153,7 +153,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { oech = r.observability.Start(ctx) } - dech, err = r.scenario.Start(ctx) + dech, err = r.operator.Start(ctx) if err != nil { ech <- err return err