Skip to content

Commit

Permalink
✅ add benchmark operator reconcile test
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi committed Dec 11, 2023
1 parent 653430a commit 30d8249
Show file tree
Hide file tree
Showing 11 changed files with 4,644 additions and 6 deletions.
3 changes: 3 additions & 0 deletions internal/k8s/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type reconciler struct {
// Job is a type alias for the k8s job definition.
type Job = batchv1.Job

// JobStatus is a type alias for the k8s job status definition.
type JobStatus = batchv1.JobStatus

// New returns the JobWatcher that implements reconciliation loop, or any errors occurred.
func New(opts ...Option) (JobWatcher, error) {
r := &reconciler{
Expand Down
67 changes: 67 additions & 0 deletions internal/test/mock/controller_runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mock

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type MockSubResourceWriter struct {
client.SubResourceWriter
}

func (s *MockSubResourceWriter) Update(context.Context, client.Object, ...client.SubResourceUpdateOption) error {
return nil
}

type MockClient struct {
client.Client
}

func (*MockClient) Status() client.SubResourceWriter {
s := MockSubResourceWriter{
SubResourceWriter: &MockSubResourceWriter{},
}
return s.SubResourceWriter
}

func (*MockClient) Get(context.Context, client.ObjectKey, client.Object, ...client.GetOption) error {
return nil
}

func (*MockClient) Create(context.Context, client.Object, ...client.CreateOption) error {
return nil
}

func (*MockClient) Delete(context.Context, client.Object, ...client.DeleteOption) error {
return nil
}

func (*MockClient) DeleteAllOf(context.Context, client.Object, ...client.DeleteAllOfOption) error {
return nil
}

type MockManager struct {
manager.Manager
}

func (m *MockManager) GetClient() client.Client {
c := &MockClient{
Client: &MockClient{},
}
return c.Client
}
7 changes: 7 additions & 0 deletions pkg/tools/benchmark/job/service/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (j *job) insert(ctx context.Context, ech chan error) error {
case ech <- err:
}
}
// loopCnt represents the quotient of iter divided by the len(vecs).
// This is to account for when iter exceeds len(vecs).
// It is used to calculate idx to determine which index of vecs to access.
// idx takes between <0, len(vecs)-1>.
loopCnt := math.Floor(float64(iter-1) / float64(len(vecs)))
idx := iter - 1 - (len(vecs) * int(loopCnt))
res, err := j.client.Insert(egctx, &payload.Insert_Request{
Expand All @@ -71,6 +75,9 @@ func (j *job) insert(ctx context.Context, ech chan error) error {
log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err())
return errors.Join(err, egctx.Err())
default:
// TODO: count up error for observe benchmark job
// We should wait for refactoring internal/o11y.
log.Errorf("[benchmark job] err: %s", err.Error())
}
}
// TODO: send metrics to the Prometeus
Expand Down
2 changes: 2 additions & 0 deletions pkg/tools/benchmark/job/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (j *job) Start(ctx context.Context) (<-chan error, error) {
cech, err := j.client.Start(ctx)
if err != nil {
log.Error("[benchmark job] failed to start connection monitor")
close(ech)
return nil, err
}
j.eg.Go(func() error {
Expand All @@ -280,6 +281,7 @@ func (j *job) Start(ctx context.Context) (<-chan error, error) {
case ech <- err:
}
}
close(ech)
if err := p.Signal(syscall.SIGTERM); err != nil {
log.Error(err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/tools/benchmark/job/service/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func (j *job) exists(ctx context.Context, ech chan error) error {
log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err())
return nil
default:
// TODO: count up error for observe benchmark job
// We should wait for refactoring internal/o11y.
log.Errorf("[benchmark job] err: %s", err.Error())
}
}
log.Debugf("[benchmark job] Finish exists: iter= %d \n%v\n", idx, res)
Expand Down Expand Up @@ -115,6 +118,9 @@ func (j *job) getObject(ctx context.Context, ech chan error) error {
log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err())
return nil
default:
// TODO: count up error for observe benchmark job
// We should wait for refactoring internal/o11y.
log.Errorf("[benchmark job] err: %s", err.Error())
}
}
if res != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/tools/benchmark/job/service/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func (j *job) remove(ctx context.Context, ech chan error) error {
log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err())
return errors.Join(err, egctx.Err())
default:
// TODO: count up error for observe benchmark job
// We should wait for refactoring internal/o11y.
log.Errorf("[benchmark job] err: %s", err.Error())
}
}
log.Debugf("[benchmark job] Finish remove: iter= %d \n%v", idx, res)
Expand Down
14 changes: 14 additions & 0 deletions pkg/tools/benchmark/job/service/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (j *job) search(ctx context.Context, ech chan error) error {
case ech <- err:
}
}
// loopCnt represents the quotient of iter divided by the len(vecs).
// This is to account for when iter exceeds len(vecs).
// It is used to calculate idx to determine which index of vecs to access.
// idx takes between <0, len(vecs)-1>.
loopCnt := math.Floor(float64(iter-1) / float64(len(vecs)))
idx := iter - 1 - (len(vecs) * int(loopCnt))
if len(vecs[idx]) != j.dimension {
Expand All @@ -81,6 +85,9 @@ func (j *job) search(ctx context.Context, ech chan error) error {
log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err())
return nil
default:
// TODO: count up error for observe benchmark job
// We should wait for refactoring internal/o11y.
log.Errorf("[benchmark job] err: %s", err.Error())
}
}
if res != nil && j.searchConfig.EnableLinearSearch {
Expand Down Expand Up @@ -116,6 +123,10 @@ func (j *job) search(ctx context.Context, ech chan error) error {
}
}
log.Debugf("[benchmark job] Start linear search: iter = %d", iter)
// loopCnt represents the quotient of iter divided by the len(vecs).
// This is to account for when iter exceeds len(vecs).
// It is used to calculate idx to determine which index of vecs to access.
// idx takes between <0, len(vecs)-1>.
loopCnt := math.Floor(float64(i-1) / float64(len(vecs)))
idx := iter - 1 - (len(vecs) * int(loopCnt))
if len(vecs[idx]) != j.dimension {
Expand All @@ -132,6 +143,9 @@ func (j *job) search(ctx context.Context, ech chan error) error {
log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err())
return errors.Join(err, egctx.Err())
default:
// TODO: count up error for observe benchmark job
// We should wait for refactoring internal/o11y.
log.Errorf("[benchmark job] err: %s", err.Error())
}
}
if res != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/tools/benchmark/job/service/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (j *job) update(ctx context.Context, ech chan error) error {
case ech <- err:
}
}
// loopCnt represents the quotient of iter divided by the len(vecs).
// This is to account for when iter exceeds len(vecs).
// It is used to calculate idx to determine which index of vecs to access.
// idx takes between <0, len(vecs)-1>.
loopCnt := math.Floor(float64(iter-1) / float64(len(vecs)))
idx := iter - 1 - (len(vecs) * int(loopCnt))
res, err := j.client.Update(egctx, &payload.Update_Request{
Expand All @@ -72,6 +76,9 @@ func (j *job) update(ctx context.Context, ech chan error) error {
log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err())
return errors.Join(err, egctx.Err())
default:
// TODO: count up error for observe benchmark job
// We should wait for refactoring internal/o11y.
log.Errorf("[benchmark job] err: %s", err.Error())
}
}
if res != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/tools/benchmark/job/service/upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (j *job) upsert(ctx context.Context, ech chan error) error {
case ech <- err:
}
}
// loopCnt represents the quotient of iter divided by the len(vecs).
// This is to account for when iter exceeds len(vecs).
// It is used to calculate idx to determine which index of vecs to access.
// idx takes between <0, len(vecs)-1>.
loopCnt := math.Floor(float64(iter-1) / float64(len(vecs)))
idx := iter - 1 - (len(vecs) * int(loopCnt))
res, err := j.client.Upsert(egctx, &payload.Upsert_Request{
Expand All @@ -72,6 +76,9 @@ func (j *job) upsert(ctx context.Context, ech chan error) error {
log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err())
return errors.Join(err, egctx.Err())
default:
// TODO: count up error for observe benchmark job
// We should wait for refactoring internal/o11y.
log.Errorf("[benchmark job] err: %s", err.Error())
}
}
if res != nil {
Expand Down
25 changes: 19 additions & 6 deletions pkg/tools/benchmark/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ type operator struct {
jobNamespace string
jobImage string
jobImagePullPolicy string
scenarios atomic.Pointer[map[string]*scenario]
benchjobs atomic.Pointer[map[string]*v1.ValdBenchmarkJob]
jobs atomic.Pointer[map[string]string]
scenarios *atomic.Pointer[map[string]*scenario]
benchjobs *atomic.Pointer[map[string]*v1.ValdBenchmarkJob]
jobs *atomic.Pointer[map[string]string]
rcd time.Duration // reconcile check duration
eg errgroup.Group
ctrl k8s.Controller
Expand Down Expand Up @@ -137,20 +137,32 @@ func (o *operator) initCtrl() error {
}

func (o *operator) getAtomicScenario() map[string]*scenario {
if o.scenarios == nil {
o.scenarios = &atomic.Pointer[map[string]*scenario]{}
return nil
}
if v := o.scenarios.Load(); v != nil {
return *(v)
}
return nil
}

func (o *operator) getAtomicBenchJob() map[string]*v1.ValdBenchmarkJob {
if o.benchjobs == nil {
o.benchjobs = &atomic.Pointer[map[string]*v1.ValdBenchmarkJob]{}
return nil
}
if v := o.benchjobs.Load(); v != nil {
return *(v)
}
return nil
}

func (o *operator) getAtomicJob() map[string]string {
if o.jobs == nil {
o.jobs = &atomic.Pointer[map[string]string]{}
return nil
}
if v := o.jobs.Load(); v != nil {
return *(v)
}
Expand All @@ -171,7 +183,7 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo
if cjobs == nil {
cjobs = map[string]string{}
}
// jobStatus is used for update benchmark job resource status
// benchmarkJobStatus is used for update benchmark job resource status
benchmarkJobStatus := make(map[string]v1.BenchmarkJobStatus)
// jobNames is used for check whether cjobs has delted job.
// If cjobs has the delted job, it will be remove the end of jobReconcile function.
Expand Down Expand Up @@ -215,7 +227,7 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo
log.Debug("[reconcile job] finish")
}

// benchmarkJobReconcile gets the vald benchmark job resource list and create Job for running benchmark job.
// benchJobReconcile gets the vald benchmark job resource list and create Job for running benchmark job.
func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[string]v1.ValdBenchmarkJob) {
log.Debugf("[reconcile benchmark job resource] job list: %#v", benchJobList)
if len(benchJobList) == 0 {
Expand All @@ -228,6 +240,7 @@ func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[strin
if cbjl == nil {
cbjl = make(map[string]*v1.ValdBenchmarkJob, 0)
}
// jobStatus is used for update benchmarkJob CR status if updating is needed.
jobStatus := make(map[string]v1.BenchmarkJobStatus)
for k := range benchJobList {
// update scenario status
Expand Down Expand Up @@ -475,7 +488,7 @@ func (o *operator) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) error
benchjob.WithTTLSecondsAfterFinished(int32(bjr.Spec.TTLSecondsAfterFinished)),
)
if err != nil {
return nil
return err
}
// create job
c := o.ctrl.GetManager().GetClient()
Expand Down
Loading

0 comments on commit 30d8249

Please sign in to comment.