Skip to content

Commit

Permalink
syncer(dm): discard checkpoint snapshot when syncer reset (pingcap#8378
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 12, 2023
1 parent 3ad19a3 commit 416ed67
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
16 changes: 14 additions & 2 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ type CheckPoint interface {
// Snapshot make a snapshot of current checkpoint. If returns nil, it means nothing has changed since last call.
Snapshot(isSyncFlush bool) *SnapshotInfo

// DiscardPendingSnapshots discards all pending snapshots. It's used when we create a snapshot but are unable to
// call FlushPointsExcept() to flush the snapshot due to some error.
DiscardPendingSnapshots()

// FlushPointsExcept flushes the global checkpoint and tables'
// checkpoints except exceptTables, it also flushes SQLs with Args providing
// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
Expand Down Expand Up @@ -369,7 +373,7 @@ type RemoteCheckPoint struct {
needFlushSafeModeExitPoint atomic.Bool

logCtx *tcontext.Context
// these fields are used for async flush checkpoint

snapshots []*remoteCheckpointSnapshot
snapshotSeq int
}
Expand Down Expand Up @@ -456,6 +460,14 @@ func (cp *RemoteCheckPoint) Snapshot(isSyncFlush bool) *SnapshotInfo {
}
}

// DiscardPendingSnapshots discard all pending snapshots.
func (cp *RemoteCheckPoint) DiscardPendingSnapshots() {
cp.Lock()
defer cp.Unlock()

cp.snapshots = nil
}

// Init implements CheckPoint.Init.
func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) (err error) {
var db *conn.BaseDB
Expand Down Expand Up @@ -700,7 +712,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(
cp.Lock()

if len(cp.snapshots) == 0 || cp.snapshots[0].id != snapshotID {
cp.logCtx.Logger.DPanic("snapshot not found", zap.Int("id", snapshotID))
return errors.Errorf("snapshot %d not found", snapshotID)
}
snapshotCp := cp.snapshots[0]
cp.snapshots = cp.snapshots[1:]
Expand Down
10 changes: 6 additions & 4 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/go-mysql-org/go-mysql/mysql"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -170,8 +169,11 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectBegin()
s.mock.ExpectExec("(162)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", "", 0, "", "null", true).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.FlushPointsExcept(tctx, cp.Snapshot(true).id, nil, nil, nil)
c.Log(errors.ErrorStack(err))
// Create a new snapshot, and discard it, then create a new snapshot again.
cp.Snapshot(true)
cp.DiscardPendingSnapshots()
snap := cp.Snapshot(true)
err = cp.FlushPointsExcept(tctx, snap.id, nil, nil, nil)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos1)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1)
Expand Down Expand Up @@ -268,7 +270,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {

// should flush because checkpoint hasn't been updated before (cp.globalPointCheckOrSaveTime.IsZero() == true).
snapshot := cp.Snapshot(true)
c.Assert(snapshot.id, Equals, 3)
c.Assert(snapshot.id, Equals, 4)

s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx)
Expand Down
1 change: 1 addition & 0 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ func (s *Syncer) reset() {
}
// create new job chans
s.newJobChans()
s.checkpoint.DiscardPendingSnapshots()
s.checkpointFlushWorker = &checkpointFlushWorker{
input: make(chan *checkpointFlushTask, 16),
cp: s.checkpoint,
Expand Down

0 comments on commit 416ed67

Please sign in to comment.