Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: rollback schema in the tracker; fix save table checkpoint in optimistic mode #625

Merged
merged 27 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
bd84324
*: rollback schema in the tracker
csuzhangxc Apr 21, 2020
1286e41
*: add GetTable equal ut; fix go vet
csuzhangxc Apr 21, 2020
aa14a40
*: add schema tracker revert for pessimistic mode (some cases still n…
csuzhangxc Apr 21, 2020
ea14369
*: fix flush jobs for pessimistic shard DDL
csuzhangxc Apr 22, 2020
16b32fc
*: fix for sharding case
csuzhangxc Apr 22, 2020
2217f4d
*: fix for incremental case
csuzhangxc Apr 22, 2020
82af0aa
sycner: try fix checkpoint flush
csuzhangxc Apr 22, 2020
e554bf2
*: add interrupt for optimistic mode
csuzhangxc Apr 22, 2020
8e38eef
syncer: refine code
csuzhangxc Apr 22, 2020
98801c3
*: debug sharding case
csuzhangxc Apr 22, 2020
2055fd0
tracker: try don't to change schema lease
csuzhangxc Apr 22, 2020
ede77a1
*: add debug log
csuzhangxc Apr 22, 2020
2587136
*: add debug log
csuzhangxc Apr 22, 2020
b0c9be6
*: add debug log
csuzhangxc Apr 22, 2020
739f102
*: add debug log
csuzhangxc Apr 22, 2020
271b019
sharding: wait paused before resume
csuzhangxc Apr 23, 2020
930cd71
sharding: fix comment
csuzhangxc Apr 23, 2020
ccd88d4
*: refine log and comment
csuzhangxc Apr 23, 2020
da2fbf9
*: add startup/exit time for test binaries
csuzhangxc Apr 23, 2020
773d03c
*: remove startup/exit time for dmctl.test
csuzhangxc Apr 23, 2020
4deb977
*: increase watch timeout in unit tests
csuzhangxc Apr 23, 2020
02f5101
Merge branch 'master' into revert-tracker
csuzhangxc Apr 24, 2020
36a3f0d
syncer: fix `saveTablePoint` in optimistic mode
csuzhangxc Apr 24, 2020
cce2714
scheduler: increase wait timeout
csuzhangxc Apr 24, 2020
8fffc43
Merge branch 'master' into revert-tracker
csuzhangxc Apr 24, 2020
0c5d865
Merge branch 'master' into revert-tracker
csuzhangxc Apr 27, 2020
846c6b5
sycner: address comments
csuzhangxc Apr 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:le
ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s"
ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high],"fail to handle shard ddl %v in optimistic mode, because schema conflict detected"
ErrSyncerFailpoint,[code=36063:class=sync-unit:scope=internal:level=low],"failpoint specified error"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
Expand Down
4 changes: 4 additions & 0 deletions cmd/dm-master/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package main
// Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc

import (
"fmt"
"os"
"strings"
"testing"
"time"
)

func TestRunMain(t *testing.T) {
fmt.Println("dm-master startup", time.Now())
var args []string
for _, arg := range os.Args {
switch {
Expand All @@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) {

os.Args = args
main()
fmt.Println("dm-master exit", time.Now())
}
4 changes: 4 additions & 0 deletions cmd/dm-syncer/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package main
// Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc

import (
"fmt"
"os"
"strings"
"testing"
"time"
)

func TestRunMain(t *testing.T) {
fmt.Println("dm-syncer startup", time.Now())
var args []string
for _, arg := range os.Args {
switch {
Expand All @@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) {

os.Args = args
main()
fmt.Println("dm-syncer exit", time.Now())
}
4 changes: 4 additions & 0 deletions cmd/dm-tracer/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package main
// Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc

import (
"fmt"
"os"
"strings"
"testing"
"time"
)

func TestRunMain(t *testing.T) {
fmt.Println("dm-tracer startup", time.Now())
var args []string
for _, arg := range os.Args {
switch {
Expand All @@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) {

os.Args = args
main()
fmt.Println("dm-tracer exit", time.Now())
}
4 changes: 4 additions & 0 deletions cmd/dm-worker/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main
// Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc

import (
"fmt"
"os"
"strings"
"testing"
Expand All @@ -28,6 +29,7 @@ import (
)

func TestRunMain(t *testing.T) {
fmt.Println("dm-worker startup", time.Now())
var (
args []string
exit = make(chan int)
Expand Down Expand Up @@ -67,8 +69,10 @@ func TestRunMain(t *testing.T) {

select {
case <-waitCh:
fmt.Println("dm-worker exit", time.Now())
return
case <-exit:
fmt.Println("dm-worker exit", time.Now())
return
}
}
7 changes: 7 additions & 0 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task))
if err != nil {
// TODO: add & update metrics.
// FIXME: the following case is not supported automatically now, try to support it later.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// - the lock becomes synced, and `done` for the `exec` operation is received.
// - the `skip` operation is put for non-owners while the lock is still not resolved.
// - another new DDL from the old owner is received and TrySync is called again, which returns an error.
// after the old lock is resolved, the new DDL from the old owner will NOT be handled again,
// then the lock will be blocked because the Pessimist thinks the DDL from some sources is missing.
// for now, we need to `pause-task` and `resume-task` to let DM-workers put the DDL again to trigger the process.
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
continue
} else if !synced {
Expand Down
9 changes: 7 additions & 2 deletions pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,19 @@ func (s *trackerSuite) TestDDL(c *C) {
c.Assert(ti.Columns[2].Name.L, Equals, "c")
c.Assert(ti.Columns[2].IsGenerated(), IsFalse)

// Verify that the table info is not changed (pointer equal) when getting it again.
ti2, err := tracker.GetTable("testdb", "foo")
c.Assert(err, IsNil)
c.Assert(ti, Equals, ti2)

// Drop one column from the table.
err = tracker.Exec(ctx, "testdb", "alter table foo drop column b")
c.Assert(err, IsNil)

// Verify that 2 columns remain.
ti2, err := tracker.GetTable("testdb", "foo")
ti2, err = tracker.GetTable("testdb", "foo")
c.Assert(err, IsNil)
c.Assert(ti, Not(Equals), ti2)
c.Assert(ti, Not(Equals), ti2) // changed (not pointer equal) after applied DDL.
c.Assert(ti2.Columns, HasLen, 2)
c.Assert(ti2.Columns[0].Name.L, Equals, "a")
c.Assert(ti2.Columns[0].IsGenerated(), IsFalse)
Expand Down
2 changes: 2 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ const (
codeSyncerUnitGenBWList
codeSyncerUnitHandleDDLFailed
codeSyncerShardDDLConflict
codeSyncerFailpoint
)

// DM-master error code
Expand Down Expand Up @@ -877,6 +878,7 @@ var (
ErrSyncerUnitGenBWList = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list")
ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s")
ErrSyncerShardDDLConflict = New(codeSyncerShardDDLConflict, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle shard ddl %v in optimistic mode, because schema conflict detected")
ErrSyncerFailpoint = New(codeSyncerFailpoint, ClassSyncUnit, ScopeInternal, LevelLow, "failpoint specified error")

// DM-master error
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid")
Expand Down
33 changes: 25 additions & 8 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,25 @@ func (b *binlogPoint) flush() {
b.flushedTI = b.ti
}

func (b *binlogPoint) rollback() (isSchemaChanged bool) {
func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) {
b.Lock()
defer b.Unlock()
b.location = b.flushedLocation
if isSchemaChanged = b.ti != b.flushedTI; isSchemaChanged {
b.location = b.flushedLocation.Clone()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also .Clone.

if b.ti == nil {
return // for global checkpoint, no need to rollback the schema.
}

// NOTE: no `Equal` function exists for `model.TableInfo` now, so we compare the pointers directly;
// after a new DDL is applied to the schema, the returned pointer of `model.TableInfo` changes.
trackedTi, _ := schemaTracker.GetTable(schema, b.ti.Name.O) // ignore the returned error, only comparing `trackedTi` is enough.
// three versions of the schema may exist:
// - the one tracked in the TiDB-with-mockTiKV.
// - the one in the checkpoint but not flushed.
// - the one in the checkpoint and flushed.
// if any of them are not equal, then we roll them back:
// - set the one in the checkpoint but not flushed to the flushed one.
// - let the caller of this method set the tracked one to the one in the checkpoint (the flushed and not-flushed ones are the same by then).
if isSchemaChanged = trackedTi != b.ti || b.ti != b.flushedTI; isSchemaChanged {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about adding some ()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in 846c6b5.

b.ti = b.flushedTI
}
return
Expand Down Expand Up @@ -314,7 +328,7 @@ func (cp *RemoteCheckPoint) saveTablePoint(sourceSchema, sourceTable string, loc
}

// we save table checkpoint while we meet DDL or DML
cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("loaction", location), zap.String("schema", sourceSchema), zap.String("table", sourceTable))
cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("location", location), zap.String("schema", sourceSchema), zap.String("table", sourceTable))
mSchema, ok := cp.points[sourceSchema]
if !ok {
mSchema = make(map[string]*binlogPoint)
Expand Down Expand Up @@ -397,6 +411,7 @@ func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string,
return true
}
oldLocation := point.MySQLLocation()
cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation))

if gte {
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) >= 0
Expand Down Expand Up @@ -530,15 +545,17 @@ func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) {
cp.RLock()
defer cp.RUnlock()
cp.globalPoint.rollback()
cp.globalPoint.rollback(schemaTracker, "")
for schema, mSchema := range cp.points {
for table, point := range mSchema {
logger := cp.logCtx.L().WithFields(zap.String("schema", schema), zap.String("table", table))
logger.Info("rollback checkpoint", log.WrapStringerField("checkpoint", point))
if point.rollback() {
logger.Debug("try to rollback checkpoint", log.WrapStringerField("checkpoint", point))
from := point.MySQLLocation()
if point.rollback(schemaTracker, schema) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also see #629.

logger.Info("rollback checkpoint", zap.Stringer("from", from), zap.Stringer("to", point.FlushedMySQLLocation()))
// schema changed
if err := schemaTracker.DropTable(schema, table); err != nil {
logger.Debug("failed to drop table from schema tracker", log.ShortError(err))
logger.Warn("failed to drop table from schema tracker", log.ShortError(err))
}
if point.ti != nil {
// TODO: Figure out how to recover from errors.
Expand Down
20 changes: 16 additions & 4 deletions syncer/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package syncer

import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/filter"
Expand Down Expand Up @@ -48,10 +49,13 @@ func (s *Syncer) handleQueryEventOptimistic(
ev *replication.QueryEvent, ec eventContext,
needHandleDDLs []string, needTrackDDLs []trackedDDL,
onlineDDLTableNames map[string]*filter.Table) error {
// wait previous DMLs to be replicated
if err := s.flushJobs(); err != nil {
return err
}
// interrupted after flush old checkpoint and before track DDL.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err := handleFlushCheckpointStage(1, val.(int), "before track DDL")
if err != nil {
failpoint.Return(err)
}
})

var (
upSchema string
Expand Down Expand Up @@ -159,6 +163,14 @@ func (s *Syncer) handleQueryEventOptimistic(
needHandleDDLs = appliedSQLs // maybe nil
}

// interrupted after track DDL and before execute DDL.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(2, val.(int), "before execute DDL")
if err != nil {
failpoint.Return(err)
}
})

ddlInfo := &shardingDDLInfo{
name: needTrackDDLs[0].tableNames[0][0].String(),
tableNames: needTrackDDLs[0].tableNames,
Expand Down
67 changes: 60 additions & 7 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,13 @@ func (s *Syncer) addJob(job *job) error {
s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ExitAfterDDLBeforeFlush"))
utils.OsExit(1)
})
// interrupted after executed DDL and before save checkpoint.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err := handleFlushCheckpointStage(3, val.(int), "before save checkpoint")
if err != nil {
failpoint.Return(err)
}
})
// only save checkpoint for DDL and XID (see above)
s.saveGlobalPoint(job.location)
for sourceSchema, tbs := range job.sourceTbl {
Expand All @@ -771,12 +778,19 @@ func (s *Syncer) addJob(job *job) error {
continue
}
for _, sourceTable := range tbs {
s.saveTablePoint(sourceSchema, sourceTable, job.location)
s.saveTablePoint(sourceSchema, sourceTable, job.currentLocation)
}
}
}

if wait {
// interrupted after save checkpoint and before flush checkpoint.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err := handleFlushCheckpointStage(4, val.(int), "before flush checkpoint")
if err != nil {
failpoint.Return(err)
}
})
return s.flushCheckPoints()
}

Expand Down Expand Up @@ -1719,6 +1733,20 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
return s.recordSkipSQLsLocation(*ec.lastLocation)
}

// interrupted before flush old checkpoint.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(0, val.(int), "before flush old checkpoint")
if err != nil {
failpoint.Return(err)
}
})

// flush previous DMLs and the checkpoint if the DDL needs to be handled.
// NOTE: do this flush before operations on shard groups, which may lead to skipping a table because of `UnresolvedTables`.
if err = s.flushJobs(); err != nil {
return err
}

if s.cfg.ShardMode == "" {
s.tctx.L().Info("start to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation))
// try apply SQL operator before addJob. now, one query event only has one DDL job, if updating to multi DDL jobs, refine this.
Expand All @@ -1731,9 +1759,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
needHandleDDLs = appliedSQLs // maybe nil
}

if err := s.flushJobs(); err != nil {
return err
}
// interrupted after flush old checkpoint and before track DDL.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(1, val.(int), "before track DDL")
if err != nil {
failpoint.Return(err)
}
})

// run trackDDL before add ddl job to make sure checkpoint can be flushed
for _, td := range needTrackDDLs {
Expand All @@ -1742,6 +1774,14 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
}
}

// interrupted after track DDL and before execute DDL.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(2, val.(int), "before execute DDL")
if err != nil {
failpoint.Return(err)
}
})

job := newDDLJob(nil, needHandleDDLs, *ec.lastLocation, *ec.currentLocation, *ec.traceID, sourceTbls)
err = s.addJobFunc(job)
if err != nil {
Expand Down Expand Up @@ -1819,9 +1859,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

s.tctx.L().Info(annotate, zap.String("event", "query"), zap.String("source", source), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Bool("in-sharding", needShardingHandle), zap.Stringer("start location", startLocation), zap.Bool("is-synced", synced), zap.Int("unsynced", remain))

if err := s.flushJobs(); err != nil {
return err
}
// interrupted after flush old checkpoint and before track DDL.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(1, val.(int), "before track DDL")
if err != nil {
failpoint.Return(err)
}
})

for _, td := range needTrackDDLs {
if err = s.trackDDL(usedSchema, td.rawSQL, td.tableNames, td.stmt, &ec); err != nil {
Expand Down Expand Up @@ -1929,6 +1973,15 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
s.tctx.L().Info("replace ddls to preset ddls by sql operator in shard mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Stringer("start location", startLocation), log.WrapStringerField("end location", ec.currentLocation))
needHandleDDLs = appliedSQLs // maybe nil
}

// interrupted after track DDL and before execute DDL.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(2, val.(int), "before execute DDL")
if err != nil {
failpoint.Return(err)
}
})

job := newDDLJob(ddlInfo, needHandleDDLs, *ec.lastLocation, *ec.currentLocation, *ec.traceID, nil)
err = s.addJobFunc(job)
if err != nil {
Expand Down
Loading