Skip to content

Commit

Permalink
✨ impl status handle of continuous benchmark crds (#1955)
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi committed Sep 25, 2023
1 parent f6479c0 commit 10ecd81
Show file tree
Hide file tree
Showing 19 changed files with 1,036 additions and 417 deletions.
31 changes: 18 additions & 13 deletions internal/config/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,26 @@
// Package config providers configuration type and load configuration logic
package config

import "github.com/vdaas/vald/internal/k8s/client"

// BenchmarkJob represents the configuration for the internal benchmark search job.
type BenchmarkJob struct {
Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"`
Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"`
Dimension int `json:"dimension,omitempty" yaml:"dimension"`
Replica int `json:"replica,omitempty" yaml:"replica"`
Repetition int `json:"repetition,omitempty" yaml:"repetition"`
JobType string `json:"job_type,omitempty" yaml:"job_type"`
InsertConfig *InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"`
UpdateConfig *UpdateConfig `json:"update_config,omitempty" yaml:"update_config"`
UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"`
SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"`
RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"`
ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"`
Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"`
Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"`
Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"`
Dimension int `json:"dimension,omitempty" yaml:"dimension"`
Replica int `json:"replica,omitempty" yaml:"replica"`
Repetition int `json:"repetition,omitempty" yaml:"repetition"`
JobType string `json:"job_type,omitempty" yaml:"job_type"`
InsertConfig *InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"`
UpdateConfig *UpdateConfig `json:"update_config,omitempty" yaml:"update_config"`
UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"`
SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"`
RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"`
ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"`
Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"`
BeforeJobName string `json:"before_job_name,omitempty" yaml:"before_job_name"`
BeforeJobNamespace string `json:"before_job_namespace,omitempty" yaml:"before_job_namespace"`
Client client.Client `json:"client,omitempty" yaml:"client"`
}

// BenchmarkScenario represents the configuration for the internal benchmark scenario.
Expand Down
14 changes: 13 additions & 1 deletion internal/errors/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,16 @@
// Package errors provides benchmark error
package errors

var ErrInvalidCoreMode = New("invalid core mode")
var (
ErrInvalidCoreMode = New("invalid core mode")

// ErrFailedToCreateBenchmarkJob represents a function to generate an error that failed to create benchmark job crd.
ErrFailedToCreateBenchmarkJob = func(err error, jn string) error {
return Wrapf(err, "could not create benchmark job resource: %s ", jn)
}

// ErrFailedToCreateJob represents a function to generate an error that failed to create job resource.
ErrFailedToCreateJob = func(err error, jn string) error {
return Wrapf(err, "could not create job: %s ", jn)
}
)
9 changes: 9 additions & 0 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type (
Object = cli.Object
ObjectKey = cli.ObjectKey
DeleteAllOfOptions = cli.DeleteAllOfOptions
ListOptions = cli.ListOptions
MatchingLabels = cli.MatchingLabels
InNamespace = cli.InNamespace
)

type Client interface {
// Get retrieves an obj for the given object key from the Kubernetes Cluster.
// obj must be a struct pointer so that obj can be updated with the response
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/crd/benchmark/valdbenchmarkjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ spec:
description: ValdBenchmarkJobStatus defines the observed state of ValdBenchmarkJob
enum:
- NotReady
- Completed
- Available
- Healthy
type: string
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ spec:
description: ValdBenchmarkScenarioStatus defines the observed state of ValdBenchmarkScenario
enum:
- NotReady
- Completed
- Available
- Healthy
type: string
Expand Down
6 changes: 5 additions & 1 deletion internal/k8s/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync/errgroup"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -34,7 +35,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type Manager = manager.Manager
type (
Manager = manager.Manager
OwnerReference = v1.OwnerReference
)

type Controller interface {
Start(ctx context.Context) (<-chan error, error)
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/vald/benchmark/api/v1/job_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type BenchmarkJobStatus string

const (
BenchmarkJobNotReady = BenchmarkJobStatus("NotReady")
BenchmarkJobCompleted = BenchmarkJobStatus("Completed")
BenchmarkJobAvailable = BenchmarkJobStatus("Available")
BenchmarkJobHealthy = BenchmarkJobStatus("Healthy")
)
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/vald/benchmark/api/v1/scenario_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ValdBenchmarkScenarioStatus string

const (
BenchmarkScenarioNotReady ValdBenchmarkScenarioStatus = "NotReady"
BenchmarkScenarioCompleted ValdBenchmarkScenarioStatus = "Completed"
BenchmarkScenarioAvailable ValdBenchmarkScenarioStatus = "Available"
BenchmarkScenarioHealthy ValdBenchmarkScenarioStatus = "Healthy"
)
Expand Down
110 changes: 110 additions & 0 deletions internal/k8s/vald/benchmark/job/job_template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//
// Copyright (C) 2019-2022 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package job manages the main logic of benchmark job.
package job

import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
)

type benchmarkJobTemplate = batchv1.Job

const (
SvcAccountName = "vald-benchmark-operator"
ContainerName = "vald-benchmark-job"
ContainerImage = "local-registry:5000/vdaas/vald-benchmark-job:latest"

RestartPolicyAlways corev1.RestartPolicy = "Always"
RestartPolicyOnFailure corev1.RestartPolicy = "OnFailure"
RestartPolicyNever corev1.RestartPolicy = "Never"
)

// NewBenchmarkJobTemplate creates the job template for crating k8s job resource.
func NewBenchmarkJobTemplate(opts ...BenchmarkJobOption) (benchmarkJobTemplate, error) {
jobTmpl := new(benchmarkJobTemplate)
for _, opt := range append(defaultBenchmarkJobOpts, opts...) {
err := opt(jobTmpl)
if err != nil {
return *jobTmpl, err
}
}
jobTmpl.Spec.Template.Spec.Containers = []corev1.Container{
{
Name: ContainerName,
Image: ContainerImage,
ImagePullPolicy: corev1.PullAlways,
LivenessProbe: &corev1.Probe{
InitialDelaySeconds: int32(60),
PeriodSeconds: int32(10),
TimeoutSeconds: int32(300),
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{
"/go/bin/job",
"-v",
},
},
},
},
StartupProbe: &corev1.Probe{
FailureThreshold: int32(30),
PeriodSeconds: int32(10),
TimeoutSeconds: int32(300),
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{
"/go/bin/job",
"-v",
},
},
},
},
Ports: []corev1.ContainerPort{
{
Name: "liveness",
Protocol: corev1.ProtocolTCP,
ContainerPort: int32(3000),
},
{
Name: "readiness",
Protocol: corev1.ProtocolTCP,
ContainerPort: int32(3001),
},
},
Env: []corev1.EnvVar{
{
Name: "CRD_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "CRD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.labels['job-name']",
},
},
},
},
},
}
return *jobTmpl, nil
}
114 changes: 114 additions & 0 deletions internal/k8s/vald/benchmark/job/job_template_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//
// Copyright (C) 2019-2022 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package job

import (
"github.com/vdaas/vald/internal/k8s"
corev1 "k8s.io/api/core/v1"
)

// BenchmarkJobOption represents the option for create benchmark job template.
type BenchmarkJobOption func(b *benchmarkJobTemplate) error

var defaultBenchmarkJobOpts = []BenchmarkJobOption{
WithSvcAccountName(SvcAccountName),
WithRestartPolicy(RestartPolicyNever),
}

// WithSvcAccountName sets the service account name for benchmark job.
func WithSvcAccountName(name string) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if len(name) > 0 {
b.Spec.Template.Spec.ServiceAccountName = name
}
return nil
}
}

// WithRestartPolicy sets the job restart policy for benchmark job.
func WithRestartPolicy(rp corev1.RestartPolicy) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if len(rp) > 0 {
b.Spec.Template.Spec.RestartPolicy = rp
}
return nil
}
}

// WithBackoffLimit sets the job backoff limit for benchmark job.
func WithBackoffLimit(bo int32) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
b.Spec.BackoffLimit = &bo
return nil
}
}

// WithName sets the job name of benchmark job.
func WithName(name string) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
b.Name = name
return nil
}
}

// WithNamespace specify namespace where job will execute.
func WithNamespace(ns string) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
b.Namespace = ns
return nil
}
}

// WithOwnerRef sets the OwnerReference to the job resource.
func WithOwnerRef(refs []k8s.OwnerReference) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if len(refs) > 0 {
b.OwnerReferences = refs
}
return nil
}
}

// WithCompletions sets the job completion.
func WithCompletions(com int32) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if com > 1 {
b.Spec.Completions = &com
}
return nil
}
}

// WithParallelism sets the job parallelism.
func WithParallelism(parallelism int32) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if parallelism > 1 {
b.Spec.Parallelism = &parallelism
}
return nil
}
}

// WithLabel sets the label to the job resource.
func WithLabel(label map[string]string) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if len(label) > 0 {
b.Labels = label
}
return nil
}
}
16 changes: 12 additions & 4 deletions pkg/tools/benchmark/job/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,15 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) {

// Get config from applied ValdBenchmarkJob custom resource
var jobResource v1.ValdBenchmarkJob
c, err := client.New(client.WithSchemeBuilder(*v1.SchemeBuilder))
if err != nil {
log.Warn(err.Error())
if cfg.Job.Client == nil {
c, err := client.New(client.WithSchemeBuilder(*v1.SchemeBuilder))
if err != nil {
log.Error(err.Error())
return nil, err
}
cfg.Job.Client = c
}
err = c.Get(ctx, NAME, NAMESPACE, &jobResource)
err = cfg.Job.Client.Get(ctx, NAME, NAMESPACE, &jobResource)
if err != nil {
log.Warn(err.Error())
} else {
Expand All @@ -105,6 +109,10 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) {
cfg.Job.SearchConfig = jobResource.Spec.SearchConfig
cfg.Job.RemoveConfig = jobResource.Spec.RemoveConfig
cfg.Job.ClientConfig = jobResource.Spec.ClientConfig
if annotations := jobResource.GetAnnotations(); annotations != nil {
cfg.Job.BeforeJobName = annotations["before-job-name"]
cfg.Job.BeforeJobNamespace = annotations["before-job-namespace"]
}
}

return cfg, nil
Expand Down
Loading

0 comments on commit 10ecd81

Please sign in to comment.