From 389e73c91c6149752050db470c99f238540259e3 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 28 May 2024 12:50:50 +0800 Subject: [PATCH] ddl: make sure put key into ETCD monotonously (#52381) (#52482) close pingcap/tidb#47060, close pingcap/tidb#52335 --- pkg/ddl/syncer/BUILD.bazel | 1 + pkg/ddl/syncer/syncer.go | 2 +- pkg/ddl/syncer/syncer_test.go | 52 +++++++++++++++++++++++++++++++++++ pkg/ddl/util/util.go | 48 ++++++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 1 deletion(-) diff --git a/pkg/ddl/syncer/BUILD.bazel b/pkg/ddl/syncer/BUILD.bazel index 672aca31a4870..a1cc2e9b691dd 100644 --- a/pkg/ddl/syncer/BUILD.bazel +++ b/pkg/ddl/syncer/BUILD.bazel @@ -33,6 +33,7 @@ go_test( "syncer_test.go", ], flaky = True, + shard_count = 3, deps = [ ":syncer", "//pkg/ddl", diff --git a/pkg/ddl/syncer/syncer.go b/pkg/ddl/syncer/syncer.go index da2ce6b438f53..6eb1d849f30a6 100644 --- a/pkg/ddl/syncer/syncer.go +++ b/pkg/ddl/syncer/syncer.go @@ -238,7 +238,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, jobID int64 var path string if variable.EnableMDL.Load() { path = fmt.Sprintf("%s/%d/%s", util.DDLAllSchemaVersionsByJob, jobID, s.ddlID) - err = util.PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver) + err = util.PutKVToEtcdMono(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver) } else { path = s.selfSchemaVerPath err = util.PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, path, ver, diff --git a/pkg/ddl/syncer/syncer_test.go b/pkg/ddl/syncer/syncer_test.go index 0b2afaf87bd49..9f5d2a37dd335 100644 --- a/pkg/ddl/syncer/syncer_test.go +++ b/pkg/ddl/syncer/syncer_test.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "runtime" + "strconv" + "sync/atomic" "testing" "time" @@ -182,3 +184,53 @@ func checkRespKV(t *testing.T, kvCount int, key, val string, kvs ...*mvccpb.KeyV require.Equal(t, key, string(kv.Key)) require.Equal(t, val, string(kv.Value)) } + +func TestPutKVToEtcdMono(t *testing.T) { + integration.BeforeTestExternal(t) + + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + cli := cluster.RandClient() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1)) + require.NoError(t, err) + + err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(2)) + require.NoError(t, err) + + err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(3)) + require.NoError(t, err) + + eg := util.NewWaitGroupEnhancedWrapper("", nil, false) + + var errCount atomic.Int64 + for i := 0; i < 30; i++ { + eg.Run(func() { + err := util2.PutKVToEtcdMono(ctx, cli, 1, "testKey", strconv.Itoa(5)) + if err != nil { + errCount.Add(1) + } + }, fmt.Sprintf("test_%v", i)) + } + // PutKVToEtcdMono should be conflicted and get errors. + eg.Wait() + require.True(t, errCount.Load() > 0) + + errCount.Store(0) + eg = util.NewWaitGroupEnhancedWrapper("", nil, false) + for i := 0; i < 30; i++ { + eg.Run(func() { + err := util2.PutKVToEtcd(ctx, cli, 1, "testKey", strconv.Itoa(5)) + if err != nil { + errCount.Add(1) + } + }, fmt.Sprintf("test_%v", i)) + } + eg.Wait() + require.True(t, errCount.Load() == 0) + + err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1)) + require.NoError(t, err) +} diff --git a/pkg/ddl/util/util.go b/pkg/ddl/util/util.go index 4da6b8318ff69..5e6f0e170cf2f 100644 --- a/pkg/ddl/util/util.go +++ b/pkg/ddl/util/util.go @@ -286,6 +286,54 @@ func DeleteKeyFromEtcd(key string, etcdCli *clientv3.Client, retryCnt int, timeo return errors.Trace(err) } +// PutKVToEtcdMono puts key value to etcd monotonously. +// etcdCli is client of etcd. +// retryCnt is retry time when an error occurs. +// opts are configures of etcd Operations. +func PutKVToEtcdMono(ctx context.Context, etcdCli *clientv3.Client, retryCnt int, key, val string, + opts ...clientv3.OpOption) error { + var err error + for i := 0; i < retryCnt; i++ { + if err = ctx.Err(); err != nil { + return errors.Trace(err) + } + + childCtx, cancel := context.WithTimeout(ctx, KeyOpDefaultTimeout) + var resp *clientv3.GetResponse + resp, err = etcdCli.Get(childCtx, key) + if err != nil { + cancel() + logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i)) + time.Sleep(KeyOpRetryInterval) + continue + } + prevRevision := int64(0) + if len(resp.Kvs) > 0 { + prevRevision = resp.Kvs[0].ModRevision + } + + var txnResp *clientv3.TxnResponse + txnResp, err = etcdCli.Txn(childCtx). + If(clientv3.Compare(clientv3.ModRevision(key), "=", prevRevision)). + Then(clientv3.OpPut(key, val, opts...)). + Commit() + + cancel() + + if err == nil && txnResp.Succeeded { + return nil + } + + if err == nil { + err = errors.New("performing compare-and-swap during PutKVToEtcd failed") + } + + logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i)) + time.Sleep(KeyOpRetryInterval) + } + return errors.Trace(err) +} + // PutKVToEtcd puts key value to etcd. // etcdCli is client of etcd. // retryCnt is retry time when an error occurs.