diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index 09bc5cab46..5d9db696f6 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -25,6 +25,7 @@ import ( "fmt" "os" "os/exec" + "strings" "testing" "time" @@ -802,8 +803,19 @@ func TestE2EIndexJobCorrection(t *testing.T) { } t.Log("Test case 2: execute index correction after one agent removed") - t.Log("removing vald-agent-0...") - cmd := exec.CommandContext(ctx, "sh", "-c", "kubectl delete pod vald-agent-0 && kubectl wait --for=condition=Ready pod/vald-agent-0") + detail, err := op.IndexDetail(t, ctx) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + var target string + for a, c := range detail.Counts { + if c.Stored > 0 { + target = strings.Split(a, ":")[0] + break + } + } + + cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("kubectl get pods -o custom-columns=:metadata.name --no-headers=true --field-selector=\"status.podIP=%s\"", target)) out, err := cmd.Output() if err != nil { if exitErr, ok := err.(*exec.ExitError); ok { @@ -812,6 +824,18 @@ func TestE2EIndexJobCorrection(t *testing.T) { t.Fatalf("unexpected error on creating job: %v", err) } } + agent := strings.TrimRight(string(out), "\n") + + t.Logf("removing %s...", agent) + cmd = exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("kubectl delete pod %s && kubectl wait --for=condition=Ready pod/%s", agent, agent)) + out, err = cmd.Output() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + t.Fatalf("%s, %s, %v", string(out), string(exitErr.Stderr), err) + } else { + t.Fatalf("unexpected error on creating job: %v", err) + } + } t.Log(string(out)) // correct the deleted index diff --git a/tests/e2e/operation/operation.go b/tests/e2e/operation/operation.go index 9ee24d79e9..9cda2a7a54 100644 --- a/tests/e2e/operation/operation.go +++ b/tests/e2e/operation/operation.go @@ -133,6 +133,7 @@ type Client interface { CreateIndex(t *testing.T, ctx context.Context) error SaveIndex(t *testing.T, ctx context.Context) error IndexInfo(t *testing.T, ctx context.Context) (*payload.Info_Index_Count, error) + IndexDetail(t *testing.T, ctx context.Context) (*payload.Info_Index_Detail, error) } type client struct { @@ -182,6 +183,15 @@ func (c *client) IndexInfo(t *testing.T, ctx context.Context) (*payload.Info_Ind return client.IndexInfo(ctx, &payload.Empty{}) } +func (c *client) IndexDetail(t *testing.T, ctx context.Context) (*payload.Info_Index_Detail, error) { + client, err := c.getClient() + if err != nil { + return nil, err + } + + return client.IndexDetail(ctx, &payload.Empty{}) +} + func (c *client) getGRPCConn() (*grpc.ClientConn, error) { return grpc.NewClient( c.host+":"+strconv.Itoa(c.port),