From 0108fe70bb35b705f2d2be78692ef96e6d3a56b5 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Fri, 26 Jul 2024 19:37:34 +0800 Subject: [PATCH] tests/e2e: add test cases related to HashKV Signed-off-by: Wei Fu (cherry picked from commit 6f93af85d2aacbc85948f1db07e07f1a86698366) Signed-off-by: Wei Fu --- tests/e2e/hashkv_test.go | 241 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100644 tests/e2e/hashkv_test.go diff --git a/tests/e2e/hashkv_test.go b/tests/e2e/hashkv_test.go new file mode 100644 index 00000000000..e771c233b04 --- /dev/null +++ b/tests/e2e/hashkv_test.go @@ -0,0 +1,241 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/client/pkg/v3/fileutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +func TestVerifyHashKVAfterCompact(t *testing.T) { + scenarios := []struct { + clusterVersion e2e.ClusterVersion + keys []string // used for data generators + }{ + { + clusterVersion: e2e.CurrentVersion, + keys: []string{"key0"}, + }, + { + clusterVersion: e2e.CurrentVersion, + keys: []string{"key0", "key1"}, + }, + { + clusterVersion: e2e.QuorumLastVersion, + keys: []string{"key0"}, + }, + { + clusterVersion: e2e.QuorumLastVersion, + keys: []string{"key0", "key1"}, + }, + } + + for _, compactedOnTombstoneRev := range []bool{false, true} { + for _, scenario := range scenarios { + t.Run(fmt.Sprintf("compactedOnTombstone=%v - %s - Keys=%v", compactedOnTombstoneRev, scenario.clusterVersion, scenario.keys), func(t *testing.T) { + e2e.BeforeTest(t) + + if scenario.clusterVersion != e2e.CurrentVersion { + if !fileutil.Exist(e2e.BinPathLastRelease) { + t.Skipf("%q does not exist", e2e.BinPathLastRelease) + } + } + + ctx := context.Background() + + cfg := e2e.EtcdProcessClusterConfig{ + ClusterSize: 3, + IsClientAutoTLS: true, + ClientTLS: e2e.ClientTLS, + Version: scenario.clusterVersion, + } + + clus, err := e2e.NewEtcdProcessCluster(t, &cfg) + require.NoError(t, err) + defer clus.Close() + + tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, &cfg, scenario.keys) + + compactedOnRev := tombstoneRevs[0] + + // If compaction revision isn't a tombstone, select a revision in the middle of two tombstones. + if !compactedOnTombstoneRev { + compactedOnRev = (tombstoneRevs[0] + tombstoneRevs[1]) / 2 + require.Greater(t, tombstoneRevs[1], compactedOnRev) + } + + cli := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS) + defer cli.Close() + + t.Logf("COMPACT on rev=%d", compactedOnRev) + _, err = cli.Compact(ctx, compactedOnRev, clientv3.WithCompactPhysical()) + require.NoError(t, err) + + for rev := compactedOnRev; rev <= lastestRev; rev++ { + verifyConsistentHashKVAcrossAllMembers(t, cli, rev) + } + }) + } + } +} + +func TestVerifyHashKVAfterTwoCompactionsOnTombstone_MixVersions(t *testing.T) { + e2e.BeforeTest(t) + + if !fileutil.Exist(e2e.BinPathLastRelease) { + t.Skipf("%q does not exist", e2e.BinPathLastRelease) + } + + ctx := context.Background() + + cfg := e2e.EtcdProcessClusterConfig{ + ClusterSize: 3, + IsClientAutoTLS: true, + ClientTLS: e2e.ClientTLS, + Version: e2e.QuorumLastVersion, + } + + clus, err := e2e.NewEtcdProcessCluster(t, &cfg) + require.NoError(t, err) + defer clus.Close() + + t.Cleanup(func() { clus.Close() }) + + tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, &cfg, []string{"key0"}) + + cli := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS) + defer cli.Close() + + firstCompactOnRev := tombstoneRevs[0] + t.Logf("COMPACT rev=%d", firstCompactOnRev) + _, err = cli.Compact(ctx, firstCompactOnRev, clientv3.WithCompactPhysical()) + require.NoError(t, err) + + secondCompactOnRev := tombstoneRevs[1] + t.Logf("COMPACT rev=%d", secondCompactOnRev) + _, err = cli.Compact(ctx, secondCompactOnRev, clientv3.WithCompactPhysical()) + require.NoError(t, err) + + for rev := secondCompactOnRev; rev <= lastestRev; rev++ { + verifyConsistentHashKVAcrossAllMembers(t, cli, rev) + } +} + +func TestVerifyHashKVAfterCompactionOnLastTombstone_MixVersions(t *testing.T) { + e2e.BeforeTest(t) + + if !fileutil.Exist(e2e.BinPathLastRelease) { + t.Skipf("%q does not exist", e2e.BinPathLastRelease) + } + + for _, keys := range [][]string{ + []string{"key0"}, + []string{"key0", "key1"}, + } { + t.Run(fmt.Sprintf("#%v", keys), func(t *testing.T) { + ctx := context.Background() + + cfg := e2e.EtcdProcessClusterConfig{ + ClusterSize: 3, + IsClientAutoTLS: true, + ClientTLS: e2e.ClientTLS, + Version: e2e.QuorumLastVersion, + } + + clus, err := e2e.NewEtcdProcessCluster(t, &cfg) + require.NoError(t, err) + defer clus.Close() + + tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, &cfg, keys) + + cli := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS) + defer cli.Close() + + compactOnRev := tombstoneRevs[len(tombstoneRevs)-1] + t.Logf("COMPACT rev=%d", compactOnRev) + _, err = cli.Compact(ctx, compactOnRev, clientv3.WithCompactPhysical()) + require.NoError(t, err) + + for rev := compactOnRev; rev <= lastestRev; rev++ { + verifyConsistentHashKVAcrossAllMembers(t, cli, rev) + } + + }) + } +} + +// populateDataForHashKV populates some sample data, and return a slice of tombstone +// revisions and the latest revision +func populateDataForHashKV(t *testing.T, clus *e2e.EtcdProcessCluster, clusCfg *e2e.EtcdProcessClusterConfig, keys []string) ([]int64, int64) { + c := newClient(t, clus.EndpointsGRPC(), clusCfg.ClientTLS, clusCfg.IsClientAutoTLS) + defer c.Close() + + ctx := context.Background() + totalOperations := 40 + + var ( + tombStoneRevs []int64 + latestRev int64 + ) + + deleteStep := 10 // submit a delete operation on every 10 operations + for i := 1; i <= totalOperations; i++ { + if i%deleteStep == 0 { + t.Logf("Deleting key=%s", keys[0]) // Only delete the first key for simplicity + resp, derr := c.Delete(ctx, keys[0]) + require.NoError(t, derr) + latestRev = resp.Header.Revision + tombStoneRevs = append(tombStoneRevs, resp.Header.Revision) + continue + } + + value := fmt.Sprintf("%d", i) + var ops []clientv3.Op + for _, key := range keys { + ops = append(ops, clientv3.OpPut(key, value)) + } + + t.Logf("Writing keys: %v, value: %s", keys, value) + resp, terr := c.Txn(ctx).Then(ops...).Commit() + require.NoError(t, terr) + require.True(t, resp.Succeeded) + require.Len(t, resp.Responses, len(ops)) + latestRev = resp.Header.Revision + } + return tombStoneRevs, latestRev +} + +func verifyConsistentHashKVAcrossAllMembers(t *testing.T, cli *clientv3.Client, hashKVOnRev int64) { + t.Logf("HashKV on rev=%d", hashKVOnRev) + resp, err := hashKVs(cli.Endpoints(), cli) + require.NoError(t, err) + + require.Greater(t, len(resp), 1) + require.True(t, resp[0].Hash != 0) + t.Logf("One Hash value is %d", resp[0].Hash) + + for i := 1; i < len(resp); i++ { + require.Equal(t, resp[0].Hash, resp[i].Hash) + } +}