Skip to content

Commit

Permalink
ddl: make sure put key into ETCD monotonously (pingcap#52381) (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 28, 2024
1 parent cbda206 commit 389e73c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/ddl/syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_test(
"syncer_test.go",
],
flaky = True,
shard_count = 3,
deps = [
":syncer",
"//pkg/ddl",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, jobID int64
var path string
if variable.EnableMDL.Load() {
path = fmt.Sprintf("%s/%d/%s", util.DDLAllSchemaVersionsByJob, jobID, s.ddlID)
err = util.PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver)
err = util.PutKVToEtcdMono(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver)
} else {
path = s.selfSchemaVerPath
err = util.PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, path, ver,
Expand Down
52 changes: 52 additions & 0 deletions pkg/ddl/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"runtime"
"strconv"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -182,3 +184,53 @@ func checkRespKV(t *testing.T, kvCount int, key, val string, kvs ...*mvccpb.KeyV
require.Equal(t, key, string(kv.Key))
require.Equal(t, val, string(kv.Value))
}

func TestPutKVToEtcdMono(t *testing.T) {
integration.BeforeTestExternal(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
cli := cluster.RandClient()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1))
require.NoError(t, err)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(2))
require.NoError(t, err)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(3))
require.NoError(t, err)

eg := util.NewWaitGroupEnhancedWrapper("", nil, false)

var errCount atomic.Int64
for i := 0; i < 30; i++ {
eg.Run(func() {
err := util2.PutKVToEtcdMono(ctx, cli, 1, "testKey", strconv.Itoa(5))
if err != nil {
errCount.Add(1)
}
}, fmt.Sprintf("test_%v", i))
}
// PutKVToEtcdMono should be conflicted and get errors.
eg.Wait()
require.True(t, errCount.Load() > 0)

errCount.Store(0)
eg = util.NewWaitGroupEnhancedWrapper("", nil, false)
for i := 0; i < 30; i++ {
eg.Run(func() {
err := util2.PutKVToEtcd(ctx, cli, 1, "testKey", strconv.Itoa(5))
if err != nil {
errCount.Add(1)
}
}, fmt.Sprintf("test_%v", i))
}
eg.Wait()
require.True(t, errCount.Load() == 0)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1))
require.NoError(t, err)
}
48 changes: 48 additions & 0 deletions pkg/ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,54 @@ func DeleteKeyFromEtcd(key string, etcdCli *clientv3.Client, retryCnt int, timeo
return errors.Trace(err)
}

// PutKVToEtcdMono puts key value to etcd monotonously.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
// opts are configures of etcd Operations.
func PutKVToEtcdMono(ctx context.Context, etcdCli *clientv3.Client, retryCnt int, key, val string,
opts ...clientv3.OpOption) error {
var err error
for i := 0; i < retryCnt; i++ {
if err = ctx.Err(); err != nil {
return errors.Trace(err)
}

childCtx, cancel := context.WithTimeout(ctx, KeyOpDefaultTimeout)
var resp *clientv3.GetResponse
resp, err = etcdCli.Get(childCtx, key)
if err != nil {
cancel()
logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i))
time.Sleep(KeyOpRetryInterval)
continue
}
prevRevision := int64(0)
if len(resp.Kvs) > 0 {
prevRevision = resp.Kvs[0].ModRevision
}

var txnResp *clientv3.TxnResponse
txnResp, err = etcdCli.Txn(childCtx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", prevRevision)).
Then(clientv3.OpPut(key, val, opts...)).
Commit()

cancel()

if err == nil && txnResp.Succeeded {
return nil
}

if err == nil {
err = errors.New("performing compare-and-swap during PutKVToEtcd failed")
}

logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i))
time.Sleep(KeyOpRetryInterval)
}
return errors.Trace(err)
}

// PutKVToEtcd puts key value to etcd.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
Expand Down

0 comments on commit 389e73c

Please sign in to comment.