Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl: status handle and update crd logic for continuous benchmark tool #1955

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions internal/config/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,26 @@
// Package config providers configuration type and load configuration logic
package config

import "github.com/vdaas/vald/internal/k8s/client"

// BenchmarkJob represents the configuration for the internal benchmark search job.
type BenchmarkJob struct {
Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"`
Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"`
Dimension int `json:"dimension,omitempty" yaml:"dimension"`
Replica int `json:"replica,omitempty" yaml:"replica"`
Repetition int `json:"repetition,omitempty" yaml:"repetition"`
JobType string `json:"job_type,omitempty" yaml:"job_type"`
InsertConfig *InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"`
UpdateConfig *UpdateConfig `json:"update_config,omitempty" yaml:"update_config"`
UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"`
SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"`
RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"`
ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"`
Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"`
Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"`
Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"`
Dimension int `json:"dimension,omitempty" yaml:"dimension"`
Replica int `json:"replica,omitempty" yaml:"replica"`
Repetition int `json:"repetition,omitempty" yaml:"repetition"`
JobType string `json:"job_type,omitempty" yaml:"job_type"`
InsertConfig *InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"`
UpdateConfig *UpdateConfig `json:"update_config,omitempty" yaml:"update_config"`
UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"`
SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"`
RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"`
ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"`
Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"`
BeforeJobName string `json:"before_job_name,omitempty" yaml:"before_job_name"`
BeforeJobNamespace string `json:"before_job_namespace,omitempty" yaml:"before_job_namespace"`
Client client.Client `json:"client,omitempty" yaml:"client"`
}

// BenchmarkScenario represents the configuration for the internal benchmark scenario.
Expand Down
14 changes: 13 additions & 1 deletion internal/errors/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,16 @@
// Package errors provides benchmark error
package errors

var ErrInvalidCoreMode = New("invalid core mode")
var (
ErrInvalidCoreMode = New("invalid core mode")

// ErrFailedToCreateBenchmarkJob represents a function to generate an error that failed to create benchmark job crd.
ErrFailedToCreateBenchmarkJob = func(err error, jn string) error {
return Wrapf(err, "could not create benchmark job resource: %s ", jn)
}

// ErrFailedToCreateJob represents a function to generate an error that failed to create job resource.
ErrFailedToCreateJob = func(err error, jn string) error {
return Wrapf(err, "could not create job: %s ", jn)
}
)
9 changes: 9 additions & 0 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type (
Object = cli.Object
ObjectKey = cli.ObjectKey
DeleteAllOfOptions = cli.DeleteAllOfOptions
ListOptions = cli.ListOptions
MatchingLabels = cli.MatchingLabels
InNamespace = cli.InNamespace
)

type Client interface {
// Get retrieves an obj for the given object key from the Kubernetes Cluster.
// obj must be a struct pointer so that obj can be updated with the response
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/crd/benchmark/valdbenchmarkjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ spec:
description: ValdBenchmarkJobStatus defines the observed state of ValdBenchmarkJob
enum:
- NotReady
- Completed
- Available
- Healthy
type: string
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ spec:
description: ValdBenchmarkScenarioStatus defines the observed state of ValdBenchmarkScenario
enum:
- NotReady
- Completed
- Available
- Healthy
type: string
Expand Down
6 changes: 5 additions & 1 deletion internal/k8s/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/safety"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -35,7 +36,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

type Manager = manager.Manager
type (
Manager = manager.Manager
OwnerReference = v1.OwnerReference
)

type Controller interface {
Start(ctx context.Context) (<-chan error, error)
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/vald/benchmark/api/v1/job_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type BenchmarkJobStatus string

const (
BenchmarkJobNotReady = BenchmarkJobStatus("NotReady")
BenchmarkJobCompleted = BenchmarkJobStatus("Completed")
BenchmarkJobAvailable = BenchmarkJobStatus("Available")
BenchmarkJobHealthy = BenchmarkJobStatus("Healthy")
)
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/vald/benchmark/api/v1/scenario_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ValdBenchmarkScenarioStatus string

const (
BenchmarkScenarioNotReady ValdBenchmarkScenarioStatus = "NotReady"
BenchmarkScenarioCompleted ValdBenchmarkScenarioStatus = "Completed"
BenchmarkScenarioAvailable ValdBenchmarkScenarioStatus = "Available"
BenchmarkScenarioHealthy ValdBenchmarkScenarioStatus = "Healthy"
)
Expand Down
110 changes: 110 additions & 0 deletions internal/k8s/vald/benchmark/job/job_template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//
// Copyright (C) 2019-2022 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package job manages the main logic of benchmark job.
package job

import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
)

type benchmarkJobTemplate = batchv1.Job

const (
SvcAccountName = "vald-benchmark-operator"
ContainerName = "vald-benchmark-job"
ContainerImage = "local-registry:5000/vdaas/vald-benchmark-job:latest"

RestartPolicyAlways corev1.RestartPolicy = "Always"
RestartPolicyOnFailure corev1.RestartPolicy = "OnFailure"
RestartPolicyNever corev1.RestartPolicy = "Never"
)

// NewBenchmarkJobTemplate creates the job template for crating k8s job resource.
func NewBenchmarkJobTemplate(opts ...BenchmarkJobOption) (benchmarkJobTemplate, error) {
jobTmpl := new(benchmarkJobTemplate)
for _, opt := range append(defaultBenchmarkJobOpts, opts...) {
err := opt(jobTmpl)
if err != nil {
return *jobTmpl, err
}
}
jobTmpl.Spec.Template.Spec.Containers = []corev1.Container{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
{
Name: ContainerName,
Image: ContainerImage,
ImagePullPolicy: corev1.PullAlways,
LivenessProbe: &corev1.Probe{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
InitialDelaySeconds: int32(60),
vankichi marked this conversation as resolved.
Show resolved Hide resolved
PeriodSeconds: int32(10),
vankichi marked this conversation as resolved.
Show resolved Hide resolved
TimeoutSeconds: int32(300),
vankichi marked this conversation as resolved.
Show resolved Hide resolved
ProbeHandler: corev1.ProbeHandler{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
Exec: &corev1.ExecAction{
Command: []string{
"/go/bin/job",
"-v",
},
},
},
},
StartupProbe: &corev1.Probe{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
FailureThreshold: int32(30),
vankichi marked this conversation as resolved.
Show resolved Hide resolved
PeriodSeconds: int32(10),
vankichi marked this conversation as resolved.
Show resolved Hide resolved
TimeoutSeconds: int32(300),
vankichi marked this conversation as resolved.
Show resolved Hide resolved
ProbeHandler: corev1.ProbeHandler{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
Exec: &corev1.ExecAction{
Command: []string{
"/go/bin/job",
"-v",
},
},
},
},
Ports: []corev1.ContainerPort{
{
Name: "liveness",
Protocol: corev1.ProtocolTCP,
ContainerPort: int32(3000),
vankichi marked this conversation as resolved.
Show resolved Hide resolved
},
{
Name: "readiness",
Protocol: corev1.ProtocolTCP,
ContainerPort: int32(3001),
vankichi marked this conversation as resolved.
Show resolved Hide resolved
},
},
Env: []corev1.EnvVar{
{
Name: "CRD_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
FieldRef: &corev1.ObjectFieldSelector{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
FieldPath: "metadata.namespace",
},
},
},
{
Name: "CRD_NAME",
ValueFrom: &corev1.EnvVarSource{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
FieldRef: &corev1.ObjectFieldSelector{
vankichi marked this conversation as resolved.
Show resolved Hide resolved
FieldPath: "metadata.labels['job-name']",
},
},
},
},
},
}
return *jobTmpl, nil
}
114 changes: 114 additions & 0 deletions internal/k8s/vald/benchmark/job/job_template_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//
// Copyright (C) 2019-2022 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package job

import (
"github.com/vdaas/vald/internal/k8s"
corev1 "k8s.io/api/core/v1"
)

// BenchmarkJobOption represents the option for create benchmark job template.
type BenchmarkJobOption func(b *benchmarkJobTemplate) error

var defaultBenchmarkJobOpts = []BenchmarkJobOption{
WithSvcAccountName(SvcAccountName),
WithRestartPolicy(RestartPolicyNever),
}

// WithSvcAccountName sets the service account name for benchmark job.
func WithSvcAccountName(name string) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if len(name) > 0 {
b.Spec.Template.Spec.ServiceAccountName = name
}
return nil
}
}

// WithRestartPolicy sets the job restart policy for benchmark job.
func WithRestartPolicy(rp corev1.RestartPolicy) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if len(rp) > 0 {
b.Spec.Template.Spec.RestartPolicy = rp
}
return nil
}
}

// WithBackoffLimit sets the job backoff limit for benchmark job.
func WithBackoffLimit(bo int32) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
b.Spec.BackoffLimit = &bo
return nil
}
}

// WithName sets the job name of benchmark job.
func WithName(name string) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
b.Name = name
return nil
}
}

// WithNamespace specify namespace where job will execute.
func WithNamespace(ns string) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
b.Namespace = ns
return nil
}
}

// WithOwnerRef sets the OwnerReference to the job resource.
func WithOwnerRef(refs []k8s.OwnerReference) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if len(refs) > 0 {
b.OwnerReferences = refs
}
return nil
}
}

// WithCompletions sets the job completion.
func WithCompletions(com int32) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if com > 1 {
b.Spec.Completions = &com
}
return nil
}
}

// WithParallelism sets the job parallelism.
func WithParallelism(parallelism int32) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if parallelism > 1 {
b.Spec.Parallelism = &parallelism
}
return nil
}
}

// WithLabel sets the label to the job resource.
func WithLabel(label map[string]string) BenchmarkJobOption {
return func(b *benchmarkJobTemplate) error {
if len(label) > 0 {
b.Labels = label
}
return nil
}
}
16 changes: 12 additions & 4 deletions pkg/tools/benchmark/job/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,15 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) {

// Get config from applied ValdBenchmarkJob custom resource
var jobResource v1.ValdBenchmarkJob
c, err := client.New(client.WithSchemeBuilder(*v1.SchemeBuilder))
if err != nil {
log.Warn(err.Error())
if cfg.Job.Client == nil {
c, err := client.New(client.WithSchemeBuilder(*v1.SchemeBuilder))
if err != nil {
log.Error(err.Error())
return nil, err
}
cfg.Job.Client = c
}
err = c.Get(ctx, NAME, NAMESPACE, &jobResource)
err = cfg.Job.Client.Get(ctx, NAME, NAMESPACE, &jobResource)
if err != nil {
log.Warn(err.Error())
} else {
Expand All @@ -105,6 +109,10 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) {
cfg.Job.SearchConfig = jobResource.Spec.SearchConfig
cfg.Job.RemoveConfig = jobResource.Spec.RemoveConfig
cfg.Job.ClientConfig = jobResource.Spec.ClientConfig
if annotations := jobResource.GetAnnotations(); annotations != nil {
cfg.Job.BeforeJobName = annotations["before-job-name"]
cfg.Job.BeforeJobNamespace = annotations["before-job-namespace"]
}
}

return cfg, nil
Expand Down
Loading