Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#45330
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lance6716 authored and ti-chi-bot committed Jul 14, 2023
1 parent 905a155 commit ed9193b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 3 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//errs",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
Expand Down
32 changes: 29 additions & 3 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,37 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
return nil, err
}

<<<<<<< HEAD:br/pkg/lightning/restore/checksum.go
func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
=======
var retryGetTSInterval = time.Second

// Checksum implements the ChecksumManager interface.
func (e *TiKVChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
>>>>>>> 04f6570f1a7 (lightning: retry for leader change error when GetTS (#44478) (#44856) (#45330)):br/pkg/lightning/backend/local/checksum.go
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
var (
physicalTS, logicalTS int64
err error
retryTime int
)
physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
for err != nil {
if !pd.IsLeaderChange(errors.Cause(err)) {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
retryTime++
if retryTime%60 == 0 {
log.FromContext(ctx).Warn("fetch tso from pd failed and retrying",
zap.Int("retryTime", retryTime),
zap.Error(err))
}
select {
case <-ctx.Done():
err = ctx.Err()
case <-time.After(retryGetTSInterval):
physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
}
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
Expand Down
17 changes: 17 additions & 0 deletions br/pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -198,6 +199,18 @@ func TestDoChecksumWithTikv(t *testing.T) {
require.Zero(t, checksumExec.manager.currentTS)
require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS))
}

// test PD leader change error
backup := retryGetTSInterval
retryGetTSInterval = time.Millisecond
t.Cleanup(func() {
retryGetTSInterval = backup
})
pdClient.leaderChanging = true
kvClient.maxErrCount = 0
checksumExec := &TiKVChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
_, err := checksumExec.Checksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
require.NoError(t, err)
}

func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
Expand Down Expand Up @@ -233,6 +246,7 @@ type testPDClient struct {
count atomic.Int32
gcSafePoint []safePointTTL
logicalTSCounter atomic.Uint64
leaderChanging bool
}

func (c *testPDClient) currentSafePoint() uint64 {
Expand All @@ -249,6 +263,9 @@ func (c *testPDClient) currentSafePoint() uint64 {

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
physicalTS := time.Now().UnixMilli()
if c.leaderChanging && physicalTS%2 == 0 {
return 0, 0, errors.WithStack(errs.ErrClientTSOStreamClosed)
}
logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
return physicalTS, logicalTS, nil
}
Expand Down

0 comments on commit ed9193b

Please sign in to comment.