Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

checkpoint: flush global checkpoint when first time flush checkpoint #758

Merged
merged 4 commits into from
Jun 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
}

cp.globalPoint = newBinlogPoint(binlog.NewLocation(cp.cfg.Flavor), binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID)
cp.globalPointSaveTime = time.Time{}
cp.points = make(map[string]map[string]*binlogPoint)

return nil
Expand Down Expand Up @@ -452,7 +453,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl
sqls := make([]string, 0, 100)
args := make([][]interface{}, 0, 100)

if cp.globalPoint.outOfDate() {
if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() {
locationG := cp.GlobalPoint()
sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, nil, true)
sqls = append(sqls, sqlG)
Expand Down
39 changes: 27 additions & 12 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,10 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
s.cfg.Dir = oldDir
}()

// try load from mydumper's output
pos1 := mysql.Position{
Name: "mysql-bin.000003",
Pos: 1943,
}
dir, err := ioutil.TempDir("", "test_global_checkpoint")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

filename := filepath.Join(dir, "metadata")
err = ioutil.WriteFile(filename, []byte(
fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %d\n\tGTID:\n\nSHOW SLAVE STATUS:\n\tHost: %s\n\tLog: %s\n\tPos: %d\n\tGTID:\n\n", pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000)),
0644)
c.Assert(err, IsNil)
s.cfg.Mode = config.ModeAll
s.cfg.Dir = dir

s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
Expand Down Expand Up @@ -252,6 +240,33 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, binlog.MinPosition)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, binlog.MinPosition)

// try load from mydumper's output
dir, err := ioutil.TempDir("", "test_global_checkpoint")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

filename := filepath.Join(dir, "metadata")
err = ioutil.WriteFile(filename, []byte(
fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %d\n\tGTID:\n\nSHOW SLAVE STATUS:\n\tHost: %s\n\tLog: %s\n\tPos: %d\n\tGTID:\n\n", pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000)),
0644)
c.Assert(err, IsNil)
s.cfg.Mode = config.ModeAll
s.cfg.Dir = dir
cp.LoadMeta()

// should flush because globalPointSaveTime is zero
s.mock.ExpectBegin()
s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos1)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1)

}

func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
Expand Down
15 changes: 15 additions & 0 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,10 +1061,25 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
c.Assert(key, Equals, "b")

// will detect casuality and add a flush job
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
dbConn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
syncer.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background())

mock.ExpectBegin()
mock.ExpectExec(".*INSERT INTO .* VALUES.* ON DUPLICATE KEY UPDATE.*").WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
key, err = syncer.resolveCasuality([]string{"a", "b"})
c.Assert(err, IsNil)
c.Assert(key, Equals, "a")

if err := mock.ExpectationsWereMet(); err != nil {
c.Errorf("checkpoint db unfulfilled expectations: %s", err)
}

wg.Wait()
}

Expand Down