From 7b7c72fdc9901226b3f9b70784a0c0b40e1c38b8 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 23 Jan 2024 07:55:06 +0000 Subject: [PATCH 01/10] Add rotate-all option to rotator --- internal/k8s/client/client.go | 1 + .../job/readreplica/rotate/service/rotator.go | 155 ++++++++++++------ 2 files changed, 106 insertions(+), 50 deletions(-) diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index 65b05c5696..4108bc1a9b 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -52,6 +52,7 @@ const ( DeletePropagationBackground = metav1.DeletePropagationBackground WatchDeletedEvent = watch.Deleted SelectionOpEquals = selection.Equals + SelectionOpExists = selection.Exists ) type Client interface { diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index 5a1fca4a34..b03fd77585 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -25,6 +25,7 @@ import ( "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" + "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync/errgroup" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -33,7 +34,8 @@ import ( ) const ( - apiName = "vald/index/job/readreplica/rotate" + apiName = "vald/index/job/readreplica/rotate" + rotateAllId = "rotate-all" ) // Rotator represents an interface for indexing. @@ -45,9 +47,13 @@ type rotator struct { namespace string volumeName string readReplicaLabelKey string - readReplicaID string - client client.Client - listOpts client.ListOptions + subProcesses []subProcess +} + +type subProcess struct { + listOpts client.ListOptions + client client.Client + volumeName string } // New returns Indexer object if no error occurs. @@ -57,7 +63,6 @@ func New(replicaID string, opts ...Option) (Rotator, error) { if replicaID == "" { return nil, fmt.Errorf("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") } - r.readReplicaID = replicaID for _, opt := range append(defaultOpts, opts...) { if err := opt(r); err != nil { @@ -75,17 +80,44 @@ func New(replicaID string, opts ...Option) (Rotator, error) { if err != nil { return nil, fmt.Errorf("failed to create kubernetes client: %w", err) } - r.client = c - selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{r.readReplicaID}) - if err != nil { - return nil, err - } - r.listOpts = client.ListOptions{ - Namespace: r.namespace, - LabelSelector: selector, - } + if replicaID == rotateAllId { + var deploymentList appsv1.DeploymentList + selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpExists, []string{}) + if err != nil { + return nil, err + } + c.List(context.Background(), &deploymentList, &client.ListOptions{ + Namespace: r.namespace, + LabelSelector: selector, + }) + + deployments := deploymentList.Items + if len(deployments) == 0 { + return nil, fmt.Errorf("no read replica found to rotate") + } + + var ids []string + for _, deployment := range deployments { + ids = append(ids, deployment.Labels[r.readReplicaLabelKey]) + } + + for _, id := range ids { + sub, err := r.newSubprocess(c, id) + if err != nil { + return nil, fmt.Errorf("failed to create rotator subprocess: %w", err) + } + + r.subProcesses = append(r.subProcesses, sub) + } + } else { + sub, err := r.newSubprocess(c, replicaID) + if err != nil { + return nil, fmt.Errorf("failed to create rotator subprocess: %w", err) + } + r.subProcesses = append(r.subProcesses, sub) + } return r, nil } @@ -98,58 +130,81 @@ func (r *rotator) Start(ctx context.Context) error { } }() - if err := r.rotate(ctx); err != nil { - if span != nil { - span.RecordError(err) - span.SetStatus(trace.StatusError, err.Error()) - } - return err + eg, ectx := errgroup.New(ctx) + for _, sub := range r.subProcesses { + s := sub + eg.Go(safety.RecoverFunc(func() (err error) { + if err := s.rotate(ectx); err != nil { + if span != nil { + span.RecordError(err) + span.SetStatus(trace.StatusError, err.Error()) + } + return err + } + return nil + })) } - return nil + return eg.Wait() +} + +func (r *rotator) newSubprocess(c client.Client, replicaID string) (subProcess, error) { + selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{replicaID}) + if err != nil { + return subProcess{}, err + } + sub := subProcess{ + client: c, + listOpts: client.ListOptions{ + Namespace: r.namespace, + LabelSelector: selector, + }, + volumeName: r.volumeName, + } + return sub, nil } -func (r *rotator) rotate(ctx context.Context) error { +func (s *subProcess) rotate(ctx context.Context) error { // get deployment here to pass to create methods of snapshot and pvc // and put it as owner reference of them so that they will be deleted when the deployment is deleted - deployment, err := r.getDeployment(ctx) + deployment, err := s.getDeployment(ctx) if err != nil { log.Errorf("failed to get Deployment.") return err } - newSnap, oldSnap, err := r.createSnapshot(ctx, deployment) + newSnap, oldSnap, err := s.createSnapshot(ctx, deployment) if err != nil { return err } - newPvc, oldPvc, err := r.createPVC(ctx, newSnap.GetName(), deployment) + newPvc, oldPvc, err := s.createPVC(ctx, newSnap.GetName(), deployment) if err != nil { log.Errorf("failed to create PVC. removing the new snapshot(%s)...", newSnap.GetName()) - if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil { + if dserr := s.deleteSnapshot(ctx, newSnap); dserr != nil { errors.Join(err, dserr) } return err } - err = r.updateDeployment(ctx, newPvc.GetName(), deployment) + err = s.updateDeployment(ctx, newPvc.GetName(), deployment) if err != nil { log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName()) - if dperr := r.deletePVC(ctx, newPvc); dperr != nil { + if dperr := s.deletePVC(ctx, newPvc); dperr != nil { errors.Join(err, dperr) } - if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil { + if dserr := s.deleteSnapshot(ctx, newSnap); dserr != nil { errors.Join(err, dserr) } return err } - err = r.deleteSnapshot(ctx, oldSnap) + err = s.deleteSnapshot(ctx, oldSnap) if err != nil { return err } - err = r.deletePVC(ctx, oldPvc) + err = s.deletePVC(ctx, oldPvc) if err != nil { return err } @@ -157,9 +212,9 @@ func (r *rotator) rotate(ctx context.Context) error { return nil } -func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) { +func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) { list := snapshotv1.VolumeSnapshotList{} - if err := r.client.List(ctx, &list, &r.listOpts); err != nil { + if err := s.client.List(ctx, &list, &s.listOpts); err != nil { return nil, nil, fmt.Errorf("failed to get snapshot: %w", err) } if len(list.Items) == 0 { @@ -193,7 +248,7 @@ func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deploym log.Infof("creating new snapshot(%s)...", newSnap.GetName()) log.Debugf("snapshot detail: %#v", newSnap) - err = r.client.Create(ctx, newSnap) + err = s.client.Create(ctx, newSnap) if err != nil { return nil, nil, fmt.Errorf("failed to create snapshot: %w", err) } @@ -201,9 +256,9 @@ func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deploym return newSnap, oldSnap, nil } -func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) { +func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) { list := v1.PersistentVolumeClaimList{} - if err := r.client.List(ctx, &list, &r.listOpts); err != nil { + if err := s.client.List(ctx, &list, &s.listOpts); err != nil { return nil, nil, fmt.Errorf("failed to get PVC: %w", err) } if len(list.Items) == 0 { @@ -248,16 +303,16 @@ func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment log.Infof("creating new pvc(%s)...", newPvc.GetName()) log.Debugf("pvc detail: %#v", newPvc) - if err := r.client.Create(ctx, newPvc); err != nil { + if err := s.client.Create(ctx, newPvc); err != nil { return nil, nil, fmt.Errorf("failed to create PVC(%s): %w", newPvc.GetName(), err) } return newPvc, oldPvc, nil } -func (r *rotator) getDeployment(ctx context.Context) (*appsv1.Deployment, error) { +func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, error) { list := appsv1.DeploymentList{} - if err := r.client.List(ctx, &list, &r.listOpts); err != nil { + if err := s.client.List(ctx, &list, &s.listOpts); err != nil { return nil, fmt.Errorf("failed to get deployment through client: %w", err) } if len(list.Items) == 0 { @@ -267,14 +322,14 @@ func (r *rotator) getDeployment(ctx context.Context) (*appsv1.Deployment, error) return &list.Items[0], nil } -func (r *rotator) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error { +func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error { if deployment.Spec.Template.ObjectMeta.Annotations == nil { deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{} } deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339) for _, vol := range deployment.Spec.Template.Spec.Volumes { - if vol.Name == r.volumeName { + if vol.Name == s.volumeName { vol.PersistentVolumeClaim.ClaimName = newPVC } } @@ -282,19 +337,19 @@ func (r *rotator) updateDeployment(ctx context.Context, newPVC string, deploymen log.Infof("updating deployment(%s)...", deployment.GetName()) log.Debugf("deployment detail: %#v", deployment) - if err := r.client.Update(ctx, deployment); err != nil { + if err := s.client.Update(ctx, deployment); err != nil { return fmt.Errorf("failed to update deployment: %w", err) } return nil } -func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error { - watcher, err := r.client.Watch(ctx, +func (s *subProcess) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error { + watcher, err := s.client.Watch(ctx, &snapshotv1.VolumeSnapshotList{ Items: []snapshotv1.VolumeSnapshot{*snapshot}, }, - &r.listOpts, + &s.listOpts, ) if err != nil { return fmt.Errorf("failed to watch snapshot(%s): %w", snapshot.GetName(), err) @@ -320,18 +375,18 @@ func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.Volum } }) - if err := r.client.Delete(ctx, snapshot); err != nil { + if err := s.client.Delete(ctx, snapshot); err != nil { return fmt.Errorf("failed to delete snapshot: %w", err) } return eg.Wait() } -func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error { - watcher, err := r.client.Watch(ctx, +func (s *subProcess) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error { + watcher, err := s.client.Watch(ctx, &v1.PersistentVolumeClaimList{ Items: []v1.PersistentVolumeClaim{*pvc}, }, - &r.listOpts, + &s.listOpts, ) if err != nil { return fmt.Errorf("failed to watch PVC: %w", err) @@ -357,7 +412,7 @@ func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) } }) - if err := r.client.Delete(ctx, pvc); err != nil { + if err := s.client.Delete(ctx, pvc); err != nil { return fmt.Errorf("failed to delete PVC(%s): %w", pvc.GetName(), err) } From 3492aea244072ea39339c2dc6c58c3484553ccc9 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Wed, 24 Jan 2024 09:49:15 +0900 Subject: [PATCH 02/10] Fix linter warnings --- pkg/index/job/readreplica/rotate/service/rotator.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index b03fd77585..be676f983d 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -35,7 +35,7 @@ import ( const ( apiName = "vald/index/job/readreplica/rotate" - rotateAllId = "rotate-all" + rotateAllID = "rotate-all" ) // Rotator represents an interface for indexing. @@ -81,7 +81,7 @@ func New(replicaID string, opts ...Option) (Rotator, error) { return nil, fmt.Errorf("failed to create kubernetes client: %w", err) } - if replicaID == rotateAllId { + if replicaID == rotateAllID { var deploymentList appsv1.DeploymentList selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpExists, []string{}) if err != nil { @@ -98,7 +98,8 @@ func New(replicaID string, opts ...Option) (Rotator, error) { } var ids []string - for _, deployment := range deployments { + for i := range deployments { + deployment := &deployments[i] ids = append(ids, deployment.Labels[r.readReplicaLabelKey]) } From 73699c18bd850af282ad1c990fc4d75b797ff695 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Sun, 28 Jan 2024 10:09:30 +0000 Subject: [PATCH 03/10] Make it possible to choose multiple rotation target --- .../job/readreplica/rotate/service/rotator.go | 76 ++++++++++--------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index be676f983d..4c8dfccf91 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -57,13 +57,10 @@ type subProcess struct { } // New returns Indexer object if no error occurs. +// replicaID must be a comma separated string of replica id or ${rotateAllID} to rotate all read replica at once. func New(replicaID string, opts ...Option) (Rotator, error) { r := new(rotator) - if replicaID == "" { - return nil, fmt.Errorf("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") - } - for _, opt := range append(defaultOpts, opts...) { if err := opt(r); err != nil { oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) @@ -81,38 +78,13 @@ func New(replicaID string, opts ...Option) (Rotator, error) { return nil, fmt.Errorf("failed to create kubernetes client: %w", err) } - if replicaID == rotateAllID { - var deploymentList appsv1.DeploymentList - selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpExists, []string{}) - if err != nil { - return nil, err - } - c.List(context.Background(), &deploymentList, &client.ListOptions{ - Namespace: r.namespace, - LabelSelector: selector, - }) - - deployments := deploymentList.Items - if len(deployments) == 0 { - return nil, fmt.Errorf("no read replica found to rotate") - } - - var ids []string - for i := range deployments { - deployment := &deployments[i] - ids = append(ids, deployment.Labels[r.readReplicaLabelKey]) - } - - for _, id := range ids { - sub, err := r.newSubprocess(c, id) - if err != nil { - return nil, fmt.Errorf("failed to create rotator subprocess: %w", err) - } + ids, err := r.parseReplicaID(replicaID, c) + if err != nil { + return nil, err + } - r.subProcesses = append(r.subProcesses, sub) - } - } else { - sub, err := r.newSubprocess(c, replicaID) + for _, id := range ids { + sub, err := r.newSubprocess(c, id) if err != nil { return nil, fmt.Errorf("failed to create rotator subprocess: %w", err) } @@ -317,7 +289,7 @@ func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, err return nil, fmt.Errorf("failed to get deployment through client: %w", err) } if len(list.Items) == 0 { - return nil, fmt.Errorf("no deployment found") + return nil, fmt.Errorf("no deployment found with the label(%s)", s.listOpts.LabelSelector) } return &list.Items[0], nil @@ -431,3 +403,35 @@ func getNewBaseName(old string) string { } return newNameBase } + +func (r *rotator) parseReplicaID(replicaID string, c client.Client) ([]string, error) { + if replicaID == "" { + return nil, fmt.Errorf("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") + } + + if replicaID == rotateAllID { + var deploymentList appsv1.DeploymentList + selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpExists, []string{}) + if err != nil { + return nil, err + } + c.List(context.Background(), &deploymentList, &client.ListOptions{ + Namespace: r.namespace, + LabelSelector: selector, + }) + + deployments := deploymentList.Items + if len(deployments) == 0 { + return nil, fmt.Errorf("no read replica found to rotate") + } + + var ids []string + for i := range deployments { + deployment := &deployments[i] + ids = append(ids, deployment.Labels[r.readReplicaLabelKey]) + } + return ids, nil + } + + return strings.Split(replicaID, ","), nil +} From 08de607f2e0d587c4682b00361d95f7bb638dd2d Mon Sep 17 00:00:00 2001 From: "yusuke.kadowaki" Date: Tue, 20 Feb 2024 13:47:12 +0900 Subject: [PATCH 04/10] Add tests for parseReplicaID --- internal/errors/rotator.go | 23 +++++++ .../job/readreplica/rotate/service/rotator.go | 8 ++- .../rotate/service/rotator_test.go | 64 +++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 internal/errors/rotator.go diff --git a/internal/errors/rotator.go b/internal/errors/rotator.go new file mode 100644 index 0000000000..4dc1955fed --- /dev/null +++ b/internal/errors/rotator.go @@ -0,0 +1,23 @@ +// +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package errors provides error types and function +package errors + +var ( + // ErrReadReplicaIDEmpty represents error when trying to rotate agents with empty replicaID + ErrReadReplicaIDEmpty = New("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") +) diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index 84b3dc5b4b..c478e12965 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -406,7 +406,7 @@ func getNewBaseName(old string) string { func (r *rotator) parseReplicaID(replicaID string, c client.Client) ([]string, error) { if replicaID == "" { - return nil, fmt.Errorf("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") + return nil, errors.ErrReadReplicaIDEmpty } if replicaID == rotateAllID { @@ -415,10 +415,12 @@ func (r *rotator) parseReplicaID(replicaID string, c client.Client) ([]string, e if err != nil { return nil, err } - c.List(context.Background(), &deploymentList, &client.ListOptions{ + if err := c.List(context.Background(), &deploymentList, &client.ListOptions{ Namespace: r.namespace, LabelSelector: selector, - }) + }); err != nil { + return nil, fmt.Errorf("failed to List deployments in parseReplicaID: %w", err) + } deployments := deploymentList.Items if len(deployments) == 0 { diff --git a/pkg/index/job/readreplica/rotate/service/rotator_test.go b/pkg/index/job/readreplica/rotate/service/rotator_test.go index 3a975ae0f2..8ed40cb13a 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator_test.go +++ b/pkg/index/job/readreplica/rotate/service/rotator_test.go @@ -15,6 +15,10 @@ package service import ( "testing" + + "github.com/stretchr/testify/require" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/k8s/client" ) func Test_getNewBaseName(t *testing.T) { @@ -77,6 +81,66 @@ func Test_getNewBaseName(t *testing.T) { } } +func Test_parseReplicaID(t *testing.T) { + type args struct { + replicaID string + c client.Client + } + type want struct { + ids []string + err error + } + tests := []struct { + name string + args args + want want + }{ + { + name: "single replicaID", + args: args{ + replicaID: "0", + c: nil, + }, + want: want{ + ids: []string{"0"}, + err: nil, + }, + }, + { + name: "multiple replicaIDs", + args: args{ + replicaID: "0,1", + c: nil, + }, + want: want{ + ids: []string{"0", "1"}, + err: nil, + }, + }, + { + name: "returns error when replicaID is empty", + args: args{ + replicaID: "", + c: nil, + }, + want: want{ + ids: nil, + err: errors.ErrReadReplicaIDEmpty, + }, + }, + } + for _, test := range tests { + tt := test + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + r := &rotator{} + ids, err := r.parseReplicaID(tt.args.replicaID, tt.args.c) + require.Equal(t, tt.want.ids, ids) + require.Equal(t, tt.want.err, err) + }) + } +} + // NOT IMPLEMENTED BELOW // // func TestNew(t *testing.T) { From 034b25b8f70267841b28d9805c2b61f995d86d04 Mon Sep 17 00:00:00 2001 From: "yusuke.kadowaki" Date: Tue, 20 Feb 2024 16:21:09 +0900 Subject: [PATCH 05/10] Add tests for parseReplicaID --- internal/test/mock/k8s/client.go | 59 +++++++++++++++++++ .../rotate/service/rotator_test.go | 53 ++++++++++++++++- 2 files changed, 109 insertions(+), 3 deletions(-) create mode 100644 internal/test/mock/k8s/client.go diff --git a/internal/test/mock/k8s/client.go b/internal/test/mock/k8s/client.go new file mode 100644 index 0000000000..04abcea2a5 --- /dev/null +++ b/internal/test/mock/k8s/client.go @@ -0,0 +1,59 @@ +package k8s + +import ( + "context" + + "github.com/stretchr/testify/mock" + "github.com/vdaas/vald/internal/k8s/client" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/watch" + crclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ValdK8sClientMock struct { + mock.Mock +} + +var _ client.Client = (*ValdK8sClientMock)(nil) + +func (m *ValdK8sClientMock) Get(ctx context.Context, name string, namespace string, obj client.Object, opts ...crclient.GetOption) error { + args := m.Called(ctx, name, namespace, obj, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) List(ctx context.Context, list crclient.ObjectList, opts ...client.ListOption) error { + args := m.Called(ctx, list, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + args := m.Called(ctx, obj, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Delete(ctx context.Context, obj client.Object, opts ...crclient.DeleteOption) error { + args := m.Called(ctx, obj, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Update(ctx context.Context, obj client.Object, opts ...crclient.UpdateOption) error { + args := m.Called(ctx, obj, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Patch(ctx context.Context, obj client.Object, patch crclient.Patch, opts ...crclient.PatchOption) error { + args := m.Called(ctx, obj, patch, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Watch(ctx context.Context, obj crclient.ObjectList, opts ...client.ListOption) (watch.Interface, error) { + args := m.Called(ctx, obj, opts) + return args.Get(0).(watch.Interface), args.Error(1) +} + +func (m *ValdK8sClientMock) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) { + args := m.Called(key, op, vals) + return args.Get(0).(labels.Selector), args.Error(1) +} diff --git a/pkg/index/job/readreplica/rotate/service/rotator_test.go b/pkg/index/job/readreplica/rotate/service/rotator_test.go index 8ed40cb13a..f47e479a57 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator_test.go +++ b/pkg/index/job/readreplica/rotate/service/rotator_test.go @@ -16,9 +16,15 @@ package service import ( "testing" + tmock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/k8s/client" + "github.com/vdaas/vald/internal/test/mock/k8s" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" ) func Test_getNewBaseName(t *testing.T) { @@ -82,6 +88,7 @@ func Test_getNewBaseName(t *testing.T) { } func Test_parseReplicaID(t *testing.T) { + labelKey := "foo" type args struct { replicaID string c client.Client @@ -90,11 +97,12 @@ func Test_parseReplicaID(t *testing.T) { ids []string err error } - tests := []struct { + type test struct { name string args args want want - }{ + } + tests := []test{ { name: "single replicaID", args: args{ @@ -128,12 +136,51 @@ func Test_parseReplicaID(t *testing.T) { err: errors.ErrReadReplicaIDEmpty, }, }, + func() test { + wantId1 := "bar" + wantId2 := "baz" + mock := &k8s.ValdK8sClientMock{} + mock.On("LabelSelector", tmock.Anything, tmock.Anything, tmock.Anything).Return(labels.NewSelector(), nil) + mock.On("List", tmock.Anything, tmock.Anything, tmock.Anything).Run(func(args tmock.Arguments) { + if depList, ok := args.Get(1).(*appsv1.DeploymentList); ok { + depList.Items = []appsv1.Deployment{ + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + labelKey: wantId1, + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + labelKey: wantId2, + }, + }, + }, + } + } + }).Return(nil) + return test{ + name: "returns all ids when rotate-all option is set", + args: args{ + replicaID: rotateAllID, + c: mock, + }, + want: want{ + ids: []string{wantId1, wantId2}, + err: nil, + }, + } + }(), } for _, test := range tests { tt := test t.Run(tt.name, func(t *testing.T) { t.Parallel() - r := &rotator{} + r := &rotator{ + readReplicaLabelKey: labelKey, + } ids, err := r.parseReplicaID(tt.args.replicaID, tt.args.c) require.Equal(t, tt.want.ids, ids) require.Equal(t, tt.want.err, err) From dd84674023b35bcaf954e581b5c2a38baa6a23f9 Mon Sep 17 00:00:00 2001 From: "yusuke.kadowaki" Date: Tue, 20 Feb 2024 16:33:43 +0900 Subject: [PATCH 06/10] Hide testify in internal --- internal/k8s/client/client.go | 5 +++++ internal/test/testify/testify.go | 13 +++++++++++++ .../rotate/service/rotator_test.go | 19 ++++++++----------- 3 files changed, 26 insertions(+), 11 deletions(-) create mode 100644 internal/test/testify/testify.go diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index c2bbd6f96b..1efcdf1915 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -23,6 +23,7 @@ import ( snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" "github.com/vdaas/vald/internal/errors" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,6 +53,9 @@ type ( MatchingLabels = cli.MatchingLabels InNamespace = cli.InNamespace VolumeSnapshot = snapshotv1.VolumeSnapshot + Deployment = appsv1.Deployment + DeploymentList = appsv1.DeploymentList + ObjectMeta = metav1.ObjectMeta ) const ( @@ -64,6 +68,7 @@ const ( var ( ServerSideApply = cli.Apply MergePatch = cli.Merge + NewSelector = labels.NewSelector ) type Client interface { diff --git a/internal/test/testify/testify.go b/internal/test/testify/testify.go new file mode 100644 index 0000000000..dae8031a81 --- /dev/null +++ b/internal/test/testify/testify.go @@ -0,0 +1,13 @@ +package testify + +import ( + "github.com/stretchr/testify/mock" +) + +type ( + Arguments = mock.Arguments +) + +const ( + Anything = mock.Anything +) diff --git a/pkg/index/job/readreplica/rotate/service/rotator_test.go b/pkg/index/job/readreplica/rotate/service/rotator_test.go index f47e479a57..e4558eab41 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator_test.go +++ b/pkg/index/job/readreplica/rotate/service/rotator_test.go @@ -16,15 +16,11 @@ package service import ( "testing" - tmock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/test/mock/k8s" - - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "github.com/vdaas/vald/internal/test/testify" ) func Test_getNewBaseName(t *testing.T) { @@ -140,19 +136,20 @@ func Test_parseReplicaID(t *testing.T) { wantId1 := "bar" wantId2 := "baz" mock := &k8s.ValdK8sClientMock{} - mock.On("LabelSelector", tmock.Anything, tmock.Anything, tmock.Anything).Return(labels.NewSelector(), nil) - mock.On("List", tmock.Anything, tmock.Anything, tmock.Anything).Run(func(args tmock.Arguments) { - if depList, ok := args.Get(1).(*appsv1.DeploymentList); ok { - depList.Items = []appsv1.Deployment{ + + mock.On("LabelSelector", testify.Anything, testify.Anything, testify.Anything).Return(client.NewSelector(), nil) + mock.On("List", testify.Anything, testify.Anything, testify.Anything).Run(func(args testify.Arguments) { + if depList, ok := args.Get(1).(*client.DeploymentList); ok { + depList.Items = []client.Deployment{ { - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: client.ObjectMeta{ Labels: map[string]string{ labelKey: wantId1, }, }, }, { - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: client.ObjectMeta{ Labels: map[string]string{ labelKey: wantId2, }, From a52b887db8c5ed8ceb02b40d17f99b2f130f8af9 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Tue, 20 Feb 2024 07:34:58 +0000 Subject: [PATCH 07/10] style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in dd84674 according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2305 --- internal/errors/rotator.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/errors/rotator.go b/internal/errors/rotator.go index 4dc1955fed..73253cb505 100644 --- a/internal/errors/rotator.go +++ b/internal/errors/rotator.go @@ -17,7 +17,5 @@ // Package errors provides error types and function package errors -var ( - // ErrReadReplicaIDEmpty represents error when trying to rotate agents with empty replicaID - ErrReadReplicaIDEmpty = New("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") -) +// ErrReadReplicaIDEmpty represents error when trying to rotate agents with empty replicaID +var ErrReadReplicaIDEmpty = New("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") From d0f8a62fc0d5b52cd8befae5a0ef610674385cbe Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 20 Feb 2024 15:04:56 +0000 Subject: [PATCH 08/10] Apply format --- internal/test/mock/k8s/client.go | 14 +++++++++++++- internal/test/testify/testify.go | 13 +++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/internal/test/mock/k8s/client.go b/internal/test/mock/k8s/client.go index 04abcea2a5..663b4b7f73 100644 --- a/internal/test/mock/k8s/client.go +++ b/internal/test/mock/k8s/client.go @@ -1,3 +1,16 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package k8s import ( @@ -5,7 +18,6 @@ import ( "github.com/stretchr/testify/mock" "github.com/vdaas/vald/internal/k8s/client" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/watch" diff --git a/internal/test/testify/testify.go b/internal/test/testify/testify.go index dae8031a81..49a7a6193b 100644 --- a/internal/test/testify/testify.go +++ b/internal/test/testify/testify.go @@ -1,3 +1,16 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package testify import ( From b3442b822466f85eb52d25329094b3a6433964c9 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 20 Feb 2024 15:07:20 +0000 Subject: [PATCH 09/10] Lint --- internal/errors/rotator.go | 2 +- pkg/agent/core/faiss/service/option.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/errors/rotator.go b/internal/errors/rotator.go index 73253cb505..ebf498f975 100644 --- a/internal/errors/rotator.go +++ b/internal/errors/rotator.go @@ -17,5 +17,5 @@ // Package errors provides error types and function package errors -// ErrReadReplicaIDEmpty represents error when trying to rotate agents with empty replicaID +// ErrReadReplicaIDEmpty represents error when trying to rotate agents with empty replicaID. var ErrReadReplicaIDEmpty = New("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") diff --git a/pkg/agent/core/faiss/service/option.go b/pkg/agent/core/faiss/service/option.go index 6f06556c35..7cc465683a 100644 --- a/pkg/agent/core/faiss/service/option.go +++ b/pkg/agent/core/faiss/service/option.go @@ -29,7 +29,7 @@ import ( "github.com/vdaas/vald/internal/timeutil" ) -// Option represent the functional option for faiss +// Option represent the functional option for faiss. type Option func(f *faiss) error var defaultOptions = []Option{ From 207a2bc0a0b922e27930d2a168be80ee9e0335fd Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 20 Feb 2024 15:08:12 +0000 Subject: [PATCH 10/10] Lint --- .../job/readreplica/rotate/service/rotator_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/index/job/readreplica/rotate/service/rotator_test.go b/pkg/index/job/readreplica/rotate/service/rotator_test.go index e4558eab41..4369a884fc 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator_test.go +++ b/pkg/index/job/readreplica/rotate/service/rotator_test.go @@ -133,8 +133,8 @@ func Test_parseReplicaID(t *testing.T) { }, }, func() test { - wantId1 := "bar" - wantId2 := "baz" + wantID1 := "bar" + wantID2 := "baz" mock := &k8s.ValdK8sClientMock{} mock.On("LabelSelector", testify.Anything, testify.Anything, testify.Anything).Return(client.NewSelector(), nil) @@ -144,14 +144,14 @@ func Test_parseReplicaID(t *testing.T) { { ObjectMeta: client.ObjectMeta{ Labels: map[string]string{ - labelKey: wantId1, + labelKey: wantID1, }, }, }, { ObjectMeta: client.ObjectMeta{ Labels: map[string]string{ - labelKey: wantId2, + labelKey: wantID2, }, }, }, @@ -165,7 +165,7 @@ func Test_parseReplicaID(t *testing.T) { c: mock, }, want: want{ - ids: []string{wantId1, wantId2}, + ids: []string{wantID1, wantID2}, err: nil, }, }