diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go
index b5bd90b67e..5a1fca4a34 100644
--- a/pkg/index/job/readreplica/rotate/service/rotator.go
+++ b/pkg/index/job/readreplica/rotate/service/rotator.go
@@ -22,7 +22,7 @@ import (
 
 	snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
 	"github.com/vdaas/vald/internal/errors"
-	client "github.com/vdaas/vald/internal/k8s/client"
+	"github.com/vdaas/vald/internal/k8s/client"
 	"github.com/vdaas/vald/internal/log"
 	"github.com/vdaas/vald/internal/observability/trace"
 	"github.com/vdaas/vald/internal/sync/errgroup"
diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go
index 4ec8689837..7726d04ecf 100644
--- a/tests/e2e/crud/crud_test.go
+++ b/tests/e2e/crud/crud_test.go
@@ -105,14 +105,12 @@ func init() {
 	flag.Parse()
 
 	var err error
+	kubeClient, err = client.New(*kubeConfig)
+	if err != nil {
+		panic(err)
+	}
 	if *pf {
-		kubeClient, err = client.New(*kubeConfig)
-		if err != nil {
-			panic(err)
-		}
-
 		forwarder = kubeClient.Portforward(namespace, *pfPodName, port, *pfPodPort)
-
 		err = forwarder.Start()
 		if err != nil {
 			panic(err)
@@ -843,46 +841,37 @@ func TestE2EReadReplica(t *testing.T) {
 	sleep(t, waitAfterInsertDuration)
 
 	t.Log("starting to restart all the agent pods to make it backup index to pvc...")
-	cmd := exec.CommandContext(ctx, "sh", "-c",
-		"kubectl delete pod -l app=vald-agent-ngt && kubectl wait --timeout=120s --for=condition=Ready pod -l app=vald-agent-ngt")
-	out, err := cmd.Output()
-	if err != nil {
-		parseCmdErrorAndFail(t, out, err)
+	if err := kubeClient.RolloutResource(ctx, "statefulsets/vald-agent-ngt"); err != nil {
+		t.Fatalf("failed to restart all the agent pods: %s", err)
 	}
-	t.Log(string(out))
 
-	t.Log("getting agent statefulset replicas...")
-	cmd = exec.CommandContext(ctx, "sh", "-c", "kubectl get statefulset vald-agent-ngt -o=jsonpath='{.spec.replicas}'")
-	out, err = cmd.Output()
+	t.Log("starting to create read replica rotators...")
+	pods, err := kubeClient.GetPods(ctx, namespace, "app=vald-agent-ngt")
 	if err != nil {
-		parseCmdErrorAndFail(t, out, err)
+		t.Fatalf("GetPods failed: %s", err)
 	}
-	replicasStr := string(out)
-	replicas, err := strconv.Atoi(replicasStr)
+	cronJobs, err := kubeClient.ListCronJob(ctx, namespace, "app=vald-readreplica-rotate")
 	if err != nil {
-		t.Fatalf("failed to parse replicas: %s", err)
+		t.Fatalf("ListCronJob failed: %s", err)
 	}
-	t.Log("statefulset replicas found as:" + string(out))
-
-	t.Log("starting to create read replica rotators...")
-	for id := 0; id < replicas; id++ {
-		patchCmd := fmt.Sprintf(`kubectl patch cronjob vald-readreplica-rotate --namespace=default --type='json' -p='[{"op": "replace", "path": "/spec/jobTemplate/spec/template/spec/containers/0/env/0/value", "value": "%d"}]'`, id)
-		createCmd := fmt.Sprintf("kubectl create job vald-readreplica-rotate-%d --from=cronjob/vald-readreplica-rotate", id)
-		cmd := exec.CommandContext(ctx, "sh", "-c", patchCmd+" && "+createCmd)
-		out, err := cmd.Output()
-		if err != nil {
-			parseCmdErrorAndFail(t, out, err)
-		}
-		t.Log(string(out))
+	if len(cronJobs) == 0 {
+		t.Fatal("no cronjob found for label app=vald-readreplica-rotate")
+	}
+	// Like the former kubectl patch, put the replica index into the first env
+	// var of the first container before creating each job.
+	cronJob := cronJobs[0]
+	for id := 0; id < len(pods); id++ {
+		name := "vald-readreplica-rotate-" + strconv.Itoa(id)
+		cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env[0].Value = strconv.Itoa(id)
+		if err := kubeClient.CreateJobFromCronJob(ctx, name, namespace, &cronJob); err != nil {
+			t.Fatalf("failed to create job %s: %s", name, err)
+		}
 	}
 
 	t.Log("waiting for read replica rotator jobs to complete...")
-	cmd = exec.CommandContext(ctx, "sh", "-c", "kubectl wait --timeout=120s --for=condition=complete job -l app=vald-readreplica-rotate")
-	out, err = cmd.Output()
-	if err != nil {
-		parseCmdErrorAndFail(t, out, err)
+	if err := kubeClient.WaitResources(ctx, "job", "app=vald-readreplica-rotate", "complete", "120s"); err != nil {
+		t.Fatalf("failed to wait for read replica rotator jobs to complete: %s", err)
 	}
-	t.Log(string(out))
 
 	err = op.Search(t, ctx, operation.Dataset{
 		Test: ds.Test[searchFrom : searchFrom+searchNum],
diff --git a/tests/e2e/kubernetes/client/client.go b/tests/e2e/kubernetes/client/client.go
index 2ced2f3cfa..0aeb05228d 100644
--- a/tests/e2e/kubernetes/client/client.go
+++ b/tests/e2e/kubernetes/client/client.go
@@ -21,12 +21,16 @@ package client
 
 import (
 	"context"
+	"fmt"
 	"os"
+	"os/exec"
 	"time"
 
+	"github.com/vdaas/vald/internal/errors"
 	"github.com/vdaas/vald/internal/file"
 	"github.com/vdaas/vald/internal/strings"
 	"github.com/vdaas/vald/tests/e2e/kubernetes/portforward"
+	v1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
@@ -58,6 +62,29 @@ type Client interface {
 		namespace, name string,
 		timeout time.Duration,
 	) (ok bool, err error)
+	WaitForPodsReady(
+		ctx context.Context,
+		namespace, labelSelector, timeout string,
+	) error
+	ListCronJob(
+		ctx context.Context,
+		namespace, labelSelector string,
+	) ([]v1.CronJob, error)
+	CreateJob(
+		ctx context.Context,
+		namespace string,
+		job *v1.Job,
+	) error
+	CreateJobFromCronJob(
+		ctx context.Context,
+		name, namespace string,
+		cronJob *v1.CronJob,
+	) error
+	RolloutResource(
+		ctx context.Context,
+		resource string,
+	) error
+	WaitResources(ctx context.Context, resource, labelSelector, condition, timeout string) error
 }
 
 type client struct {
@@ -109,7 +136,6 @@ func (cli *client) GetPod(
 	if err != nil {
 		return nil, err
 	}
-
 	return pod, nil
 }
 
@@ -174,3 +200,95 @@ func (cli *client) WaitForPodReady(
 		}
 	}
 }
+
+// runKubectl runs kubectl with args and prints its standard output.
+// The arguments are passed to kubectl directly instead of being formatted into
+// an "sh -c" string, so selectors and names are never re-parsed by a shell.
+// If kubectl exits with a non-zero status, the error carries its stderr output.
+func runKubectl(ctx context.Context, args ...string) error {
+	out, err := exec.CommandContext(ctx, "kubectl", args...).Output()
+	if err != nil {
+		if exitErr, ok := err.(*exec.ExitError); ok {
+			return errors.New(string(exitErr.Stderr))
+		}
+		return fmt.Errorf("unexpected error: %w", err)
+	}
+	fmt.Println(string(out))
+	return nil
+}
+
+// copyStringMap returns a copy of m, or nil when m is empty.
+func copyStringMap(m map[string]string) map[string]string {
+	if len(m) == 0 {
+		return nil
+	}
+	c := make(map[string]string, len(m))
+	for k, v := range m {
+		c[k] = v
+	}
+	return c
+}
+
+// RolloutResource runs "kubectl rollout restart" for resource and then
+// "kubectl rollout status" for it; the status step is skipped when the restart fails.
+// resource is passed as a single argument, e.g. "statefulsets/vald-agent-ngt".
+func (cli *client) RolloutResource(ctx context.Context, resource string) error {
+	if err := runKubectl(ctx, "rollout", "restart", resource); err != nil {
+		return err
+	}
+	return runKubectl(ctx, "rollout", "status", resource)
+}
+
+// WaitResources runs "kubectl wait" on the resources of the given kind that match
+// labelSelector, using condition for --for=condition and timeout for --timeout.
+func (cli *client) WaitResources(ctx context.Context, resource, labelSelector, condition, timeout string) error {
+	return runKubectl(ctx, "wait", "--for=condition="+condition, resource, "-l", labelSelector, "--timeout", timeout)
+}
+
+// WaitForPodsReady runs "kubectl wait --for=condition=Ready" on the pods in
+// namespace that match labelSelector, with timeout passed as --timeout.
+func (cli *client) WaitForPodsReady(ctx context.Context, namespace, labelSelector, timeout string) error {
+	// use kubectl wait because it's complicated to implement this with client-go
+	return runKubectl(ctx, "wait", "--timeout="+timeout, "--for=condition=Ready", "pod", "-n", namespace, "-l", labelSelector)
+}
+
+// ListCronJob returns the CronJobs in namespace that match labelSelector.
+func (cli *client) ListCronJob(ctx context.Context, namespace, labelSelector string) ([]v1.CronJob, error) {
+	cronJobs, err := cli.clientset.BatchV1().CronJobs(namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: labelSelector,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return cronJobs.Items, nil
+}
+
+// CreateJob creates job in namespace.
+func (cli *client) CreateJob(ctx context.Context, namespace string, job *v1.Job) error {
+	_, err := cli.clientset.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{})
+	return err
+}
+
+// CreateJobFromCronJob creates a Job called name in namespace from the job
+// template of cronJob. The template's labels and annotations are copied to the
+// Job, as "kubectl create job --from=cronjob/..." does, so label selectors such
+// as the one used with WaitResources can match it.
+func (cli *client) CreateJobFromCronJob(ctx context.Context, name, namespace string, cronJob *v1.CronJob) error {
+	if cronJob == nil {
+		return errors.New("cronJob must not be nil")
+	}
+	tmpl := &cronJob.Spec.JobTemplate
+	job := &v1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        name,
+			Namespace:   namespace,
+			Labels:      copyStringMap(tmpl.Labels),
+			Annotations: copyStringMap(tmpl.Annotations),
+		},
+		// Deep copy so the Job does not share slices with cronJob, which callers may keep mutating.
+		Spec: *tmpl.Spec.DeepCopy(),
+	}
+
+	return cli.CreateJob(ctx, namespace, job)
+}