diff --git a/.github/actions/detect-docker-image-tags/action.yaml b/.github/actions/detect-docker-image-tags/action.yaml index 18dc766362..e38088b07a 100644 --- a/.github/actions/detect-docker-image-tags/action.yaml +++ b/.github/actions/detect-docker-image-tags/action.yaml @@ -23,7 +23,7 @@ inputs: images: description: "Image names" required: false - default: "vdaas/vald-agent-ngt vdaas/vald-agent-faiss vdaas/vald-agent-sidecar vdaas/vald-discoverer-k8s vdaas/vald-lb-gateway vdaas/vald-filter-gateway vdaas/vald-mirror-gateway vdaas/vald-manager-index" + default: "vdaas/vald-agent-ngt vdaas/vald-agent-faiss vdaas/vald-agent-sidecar vdaas/vald-discoverer-k8s vdaas/vald-lb-gateway vdaas/vald-filter-gateway vdaas/vald-mirror-gateway vdaas/vald-manager-index vdaas/vald-index-operator vdaas/vald-readreplica-rotate" outputs: HELM_EXTRA_OPTIONS: description: "Helm extra options that specifies image tags" @@ -49,8 +49,10 @@ runs: ["vdaas/vald-manager-index"]="manager.index.image.tag" ["vdaas/vald-index-creation"]="manager.index.creator.image.tag" ["vdaas/vald-index-save"]="manager.index.saver.image.tag" + ["vdaas/vald-readreplica-rotate"]="manager.index.readreplica.rotator.image.tag" ["vdaas/vald-helm-operator"]="image.tag" ["vdaas/vald-ci-container"]="" + ["vdaas/vald-index-operator"]="manager.index.operator.image.tag" ) for image in ${IMAGES}; do diff --git a/.github/actions/setup-e2e/action.yaml b/.github/actions/setup-e2e/action.yaml index 295f7f815a..f2c11c0aec 100644 --- a/.github/actions/setup-e2e/action.yaml +++ b/.github/actions/setup-e2e/action.yaml @@ -43,7 +43,7 @@ inputs: target_images: description: "Image names" required: false - default: "vdaas/vald-agent-ngt vdaas/vald-agent-faiss vdaas/vald-agent-sidecar vdaas/vald-discoverer-k8s vdaas/vald-lb-gateway vdaas/vald-filter-gateway vdaas/vald-mirror-gateway vdaas/vald-manager-index" + default: "vdaas/vald-agent-ngt vdaas/vald-agent-faiss vdaas/vald-agent-sidecar vdaas/vald-discoverer-k8s vdaas/vald-lb-gateway 
vdaas/vald-filter-gateway vdaas/vald-mirror-gateway vdaas/vald-manager-index vdaas/vald-index-operator vdaas/vald-readreplica-rotate" outputs: HELM_EXTRA_OPTIONS: description: "Helm extra options that specifies E2E target image tags" diff --git a/.github/actions/wait-for-docker-image/action.yaml b/.github/actions/wait-for-docker-image/action.yaml index 8877419eae..546a46d805 100644 --- a/.github/actions/wait-for-docker-image/action.yaml +++ b/.github/actions/wait-for-docker-image/action.yaml @@ -19,7 +19,7 @@ inputs: images: description: "image names" required: false - default: "vdaas/vald-agent-ngt vdaas/vald-agent-faiss vdaas/vald-agent-sidecar vdaas/vald-discoverer-k8s vdaas/vald-lb-gateway vdaas/vald-filter-gateway vdaas/vald-mirror-gateway vdaas/vald-manager-index" + default: "vdaas/vald-agent-ngt vdaas/vald-agent-faiss vdaas/vald-agent-sidecar vdaas/vald-discoverer-k8s vdaas/vald-lb-gateway vdaas/vald-filter-gateway vdaas/vald-mirror-gateway vdaas/vald-manager-index vdaas/vald-index-operator vdaas/vald-readreplica-rotate" outputs: {} runs: using: "composite" diff --git a/.github/helm/values/values-readreplica.yaml b/.github/helm/values/values-readreplica.yaml index b664de67b9..6cf28a3de1 100644 --- a/.github/helm/values/values-readreplica.yaml +++ b/.github/helm/values/values-readreplica.yaml @@ -48,6 +48,7 @@ agent: dimension: 784 index_path: /var/ngt/index enable_in_memory_mode: false + enable_export_index_info_to_k8s: true persistentVolume: enabled: true accessMode: ReadWriteOnce @@ -80,3 +81,6 @@ manager: readreplica: rotator: enabled: true + operator: + enabled: true + rotation_job_concurrency: 10 diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 15bcedbe8f..780ddd8b66 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -279,9 +279,6 @@ jobs: e2e-stream-crud-with-readreplica: name: "E2E test (Stream CRUD) with read replica" needs: [dump-contexts-to-log] - # FIXME: This job is disabled because it is not working 
properly for a moment.
-    # Needs to fix TestE2EReadReplica not to use CronJob since there is no CronJob for read replica anymore.
-    if: false
     runs-on: ubuntu-latest
     timeout-minutes: 60
     steps:
diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go
index b788ef327b..10a91a5224 100644
--- a/internal/k8s/client/client.go
+++ b/internal/k8s/client/client.go
@@ -67,6 +67,7 @@ type (
 	EnvVar = corev1.EnvVar
 	Job = batchv1.Job
 	JobList = batchv1.JobList
+	JobStatus = batchv1.JobStatus
 	CronJob = batchv1.CronJob
 	Result = reconcile.Result
 )
diff --git a/internal/test/testify/testify.go b/internal/test/testify/testify.go
index 49a7a6193b..4b85ddaa71 100644
--- a/internal/test/testify/testify.go
+++ b/internal/test/testify/testify.go
@@ -24,3 +24,7 @@ type (
 const (
 	Anything = mock.Anything
 )
+
+// AnythingOfType re-exports mock.AnythingOfType, which matches a mock argument by its
+// type name (for example "*v1.JobList").
+var AnythingOfType = mock.AnythingOfType
diff --git a/pkg/index/operator/service/operator_test.go b/pkg/index/operator/service/operator_test.go
new file mode 100644
index 0000000000..f96fe89fa6
--- /dev/null
+++ b/pkg/index/operator/service/operator_test.go
@@ -0,0 +1,330 @@
+//
+// Copyright (C) 2019-2024 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package service
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"github.com/vdaas/vald/internal/k8s/client"
+	"github.com/vdaas/vald/internal/k8s/vald"
+	"github.com/vdaas/vald/internal/test/mock/k8s"
+	"github.com/vdaas/vald/internal/test/testify"
+)
+
+// Test_operator_podOnReconcile drives operator.podOnReconcile against a mocked k8s client.
+// Each case checks the returned client.Result and error and, when createCalled is set,
+// that Create is invoked exactly once on the mock.
+func Test_operator_podOnReconcile(t *testing.T) {
+	t.Parallel()
+
+	type want struct {
+		res          client.Result
+		createCalled bool
+		err          error
+	}
+	type test struct {
+		name                   string
+		agentPod               *client.Pod
+		readReplicaEnabled     bool
+		readReplicaDeployment  *client.Deployment
+		runningJobs            []client.Job
+		rotationJobConcurrency uint
+		want                   want
+	}
+
+	tests := []test{
+		{
+			name:               "returns client.Result{} when read replica is not enabled",
+			readReplicaEnabled: false,
+			want: want{
+				res:          client.Result{},
+				createCalled: false,
+				err:          nil,
+			},
+		},
+		{
+			name:               "returns client.Result{} when pod is not a statefulset",
+			readReplicaEnabled: true,
+			agentPod:           &client.Pod{},
+			want: want{
+				res:          client.Result{},
+				createCalled: false,
+				err:          nil,
+			},
+		},
+		func() test {
+			saveTime := time.Now()
+			rotateTime := saveTime.Add(1 * time.Second)
+			return test{
+				name:               "returns requeue: false when last snapshot time is after the last save time",
+				readReplicaEnabled: true,
+				agentPod: &client.Pod{
+					ObjectMeta: client.ObjectMeta{
+						Labels: map[string]string{
+							client.PodIndexLabel: "0",
+						},
+						Annotations: map[string]string{
+							vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				readReplicaDeployment: &client.Deployment{
+					ObjectMeta: client.ObjectMeta{
+						Name: "deploymentName",
+						Annotations: map[string]string{
+							vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				want: want{
+					res: client.Result{
+						Requeue: false,
+					},
+					createCalled: false,
+					err:          nil,
+				},
+			}
+		}(),
+		func() test {
+			saveTime := time.Now()
+			rotateTime := saveTime.Add(-1 * time.Second)
+			return test{
+				name:               "returns requeue: false and calls client.Create once when last snapshot time is before the last save time",
+				readReplicaEnabled: true,
+				agentPod: &client.Pod{
+					ObjectMeta: client.ObjectMeta{
+						Labels: map[string]string{
+							client.PodIndexLabel: "0",
+						},
+						Annotations: map[string]string{
+							vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				readReplicaDeployment: &client.Deployment{
+					ObjectMeta: client.ObjectMeta{
+						Name: "deploymentName",
+						Annotations: map[string]string{
+							vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				want: want{
+					res: client.Result{
+						Requeue: false,
+					},
+					createCalled: true,
+					err:          nil,
+				},
+			}
+		}(),
+		func() test {
+			saveTime := time.Now()
+			rotateTime := saveTime.Add(-1 * time.Second)
+			return test{
+				name:               "returns requeue: true when there is already one running job when rotation job concurrency is 1",
+				readReplicaEnabled: true,
+				agentPod: &client.Pod{
+					ObjectMeta: client.ObjectMeta{
+						Labels: map[string]string{
+							client.PodIndexLabel: "0",
+						},
+						Annotations: map[string]string{
+							vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				readReplicaDeployment: &client.Deployment{
+					ObjectMeta: client.ObjectMeta{
+						Name: "deploymentName",
+						Annotations: map[string]string{
+							vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				runningJobs: []client.Job{
+					{
+						ObjectMeta: client.ObjectMeta{
+							Name: "already running job1",
+						},
+						Status: client.JobStatus{
+							Active: 1,
+						},
+					},
+				},
+				rotationJobConcurrency: 1,
+				want: want{
+					res: client.Result{
+						Requeue: true,
+					},
+					createCalled: false,
+					err:          nil,
+				},
+			}
+		}(),
+		func() test {
+			saveTime := time.Now()
+			rotateTime := saveTime.Add(-1 * time.Second)
+			return test{
+				name:               "returns requeue: false and create job when there is one running job when rotation job concurrency is 2",
+				readReplicaEnabled: true,
+				agentPod: &client.Pod{
+					ObjectMeta: client.ObjectMeta{
+						Labels: map[string]string{
+							client.PodIndexLabel: "0",
+						},
+						Annotations: map[string]string{
+							vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				readReplicaDeployment: &client.Deployment{
+					ObjectMeta: client.ObjectMeta{
+						Name: "deploymentName",
+						Annotations: map[string]string{
+							vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				runningJobs: []client.Job{
+					{
+						ObjectMeta: client.ObjectMeta{
+							Name: "already running job1",
+						},
+						Status: client.JobStatus{
+							Active: 1,
+						},
+					},
+				},
+				rotationJobConcurrency: 2,
+				want: want{
+					res: client.Result{
+						Requeue: false,
+					},
+					createCalled: true,
+					err:          nil,
+				},
+			}
+		}(),
+		func() test {
+			saveTime := time.Now()
+			rotateTime := saveTime.Add(-1 * time.Second)
+			return test{
+				name:               "returns requeue: true when there are two running jobs when rotation job concurrency is 2",
+				readReplicaEnabled: true,
+				agentPod: &client.Pod{
+					ObjectMeta: client.ObjectMeta{
+						Labels: map[string]string{
+							client.PodIndexLabel: "0",
+						},
+						Annotations: map[string]string{
+							vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				readReplicaDeployment: &client.Deployment{
+					ObjectMeta: client.ObjectMeta{
+						Name: "deploymentName",
+						Annotations: map[string]string{
+							vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat),
+						},
+					},
+				},
+				runningJobs: []client.Job{
+					{
+						ObjectMeta: client.ObjectMeta{
+							Name: "already running job1",
+						},
+						Status: client.JobStatus{
+							Active: 1,
+						},
+					},
+					{
+						ObjectMeta: client.ObjectMeta{
+							Name: "already running job2",
+						},
+						Status: client.JobStatus{
+							Active: 1,
+						},
+					},
+				},
+				rotationJobConcurrency: 2,
+				want: want{
+					res: client.Result{
+						Requeue: true,
+					},
+					createCalled: false,
+					err:          nil,
+				},
+			}
+		}(),
+	}
+
+	for _, tc := range tests {
+		test := tc
+		t.Run(test.name, func(tt *testing.T) {
+			tt.Parallel()
+
+			mock := &k8s.ValdK8sClientMock{}
+			mock.On("LabelSelector", testify.Anything, testify.Anything, testify.Anything).Return(client.NewSelector(), nil).Maybe()
+			mock.On("List", testify.Anything, testify.AnythingOfType("*v1.DeploymentList"), testify.Anything).Run(func(args testify.Arguments) {
+				arg, ok := args.Get(1).(*client.DeploymentList)
+				// Report on tt, not the parent t: this callback runs inside the subtest.
+				require.True(tt, ok)
+
+				arg.Items = []client.Deployment{*test.readReplicaDeployment}
+			}).Return(nil).Maybe()
+
+			mock.On("List", testify.Anything, testify.AnythingOfType("*v1.JobList"), testify.Anything).Run(func(args testify.Arguments) {
+				arg, ok := args.Get(1).(*client.JobList)
+				require.True(tt, ok)
+
+				arg.Items = test.runningJobs
+			}).Return(nil).Maybe()
+
+			// testify/mock does not accept Times(0), so .Return(nil).Times(calledTimes) is not an option; register Create only when a call is expected.
+			// ref: https://github.com/stretchr/testify/issues/566
+			if test.want.createCalled {
+				mock.On("Create", testify.Anything, testify.Anything, testify.Anything).Return(nil).Once()
+			}
+			defer mock.AssertExpectations(tt)
+
+			concurrency := uint(1)
+			if test.rotationJobConcurrency != 0 {
+				concurrency = test.rotationJobConcurrency
+			}
+			op := operator{
+				client:                 mock,
+				readReplicaEnabled:     test.readReplicaEnabled,
+				rotationJobConcurrency: concurrency,
+			}
+
+			op.rotatorJob = &client.Job{
+				ObjectMeta: client.ObjectMeta{
+					Name: "foo job",
+				},
+			}
+
+			res, err := op.podOnReconcile(context.Background(), test.agentPod)
+			require.Equal(tt, test.want.err, err)
+			require.Equal(tt, test.want.res, res)
+		})
+	}
+}
diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go
index 4c5411a5d0..ca99e43859 100644
--- a/tests/e2e/crud/crud_test.go
+++ b/tests/e2e/crud/crud_test.go
@@ -25,7 +25,6 @@ import (
 	"fmt"
 	"os"
 	"os/exec"
-	"strconv"
 	"testing"
 	"time"
 
@@ -834,6 +833,7 @@ func TestE2EIndexJobCorrection(t *testing.T) {
 	}
 }
 
+// TestE2EReadReplica tests that search requests succeed with read replica resources.
func TestE2EReadReplica(t *testing.T) { t.Cleanup(teardown) @@ -861,29 +861,15 @@ func TestE2EReadReplica(t *testing.T) { sleep(t, waitAfterInsertDuration) - t.Log("starting to restart all the agent pods to make it backup index to pvc...") - if err := kubectl.RolloutResource(ctx, t, "statefulsets/vald-agent"); err != nil { - t.Fatalf("failed to restart all the agent pods: %s", err) - } - - t.Log("starting to create read replica rotators...") - pods, err := kubeClient.GetPods(ctx, namespace, "app=vald-agent") - if err != nil { - t.Fatalf("GetPods failed: %s", err) - } - cronJobs, err := kubeClient.ListCronJob(ctx, namespace, "app=vald-readreplica-rotate") - if err != nil { - t.Fatalf("ListCronJob failed: %s", err) - } - cronJob := cronJobs[0] - for id := 0; id < len(pods); id++ { - // the annotation key comes from `manager.index.readreplica.rotator.target_read_replica_id_annotations_key` - cronJob.Spec.JobTemplate.Spec.Template.GetObjectMeta().SetAnnotations(map[string]string{"vald.vdaas.org/target-read-replica-id": strconv.Itoa(id)}) - kubeClient.CreateJobFromCronJob(ctx, "vald-readreplica-rotate-"+strconv.Itoa(id), namespace, &cronJob) - } - + t.Log("index operator should be creating read replica rotator jobs") t.Log("waiting for read replica rotator jobs to complete...") - if err := kubectl.WaitResources(ctx, t, "job", "app=vald-readreplica-rotate", "complete", "120s"); err != nil { + if err := kubectl.WaitResources(ctx, t, "job", "app=vald-readreplica-rotate", "complete", "60s"); err != nil { + t.Log("wait failed. printing yaml of vald-readreplica-rotate") + kubectl.KubectlCmd(ctx, t, "get", "pod", "-l", "app=vald-readreplica-rotate", "-oyaml") + t.Log("wait failed. printing log of vald-index-operator") + kubectl.DebugLog(ctx, t, "app=vald-index-operator") + t.Log("wait failed. 
printing log of vald-readreplica-rotate")
+		kubectl.DebugLog(ctx, t, "app=vald-readreplica-rotate")
 		t.Fatalf("failed to wait for read replica rotator jobs to complete: %s", err)
 	}
 
diff --git a/tests/e2e/kubernetes/kubectl/kubectl.go b/tests/e2e/kubernetes/kubectl/kubectl.go
index 2237507fb1..a09725a5f1 100644
--- a/tests/e2e/kubernetes/kubectl/kubectl.go
+++ b/tests/e2e/kubernetes/kubectl/kubectl.go
@@ -48,6 +48,22 @@ func WaitResources(ctx context.Context, t *testing.T, resource, labelSelector, c
 	return runCmd(t, cmd)
 }
 
+// DebugLog runs `kubectl logs -l <label> --tail=-1` through runCmd and returns its error.
+// --tail=-1 asks kubectl for the whole log; with a label selector it would otherwise
+// print only the last few lines of each pod.
+func DebugLog(ctx context.Context, t *testing.T, label string) error {
+	t.Helper()
+	cmd := exec.CommandContext(ctx, "kubectl", "logs", "-l", label, "--tail=-1")
+	return runCmd(t, cmd)
+}
+
+// KubectlCmd runs `kubectl <subcmds...>` through runCmd and returns its error.
+func KubectlCmd(ctx context.Context, t *testing.T, subcmds ...string) error {
+	t.Helper()
+	cmd := exec.CommandContext(ctx, "kubectl", subcmds...)
+	return runCmd(t, cmd)
+}
+
 func runCmd(t *testing.T, cmd *exec.Cmd) error {
 	t.Helper()
 	out, err := cmd.Output()