
Commit

*: rollback schema in the tracker; fix save table checkpoint in optimistic mode (pingcap#625)
csuzhangxc authored and Kuri-su committed Apr 28, 2020
1 parent 4904aaf commit 7ed860e
Showing 22 changed files with 354 additions and 29 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
@@ -292,6 +292,7 @@ ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:le
ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s"
ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high],"fail to handle shard ddl %v in optimistic mode, because schema conflict detected"
ErrSyncerFailpoint,[code=36063:class=sync-unit:scope=internal:level=low],"failpoint specified error"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
4 changes: 4 additions & 0 deletions cmd/dm-master/main_test.go
@@ -16,12 +16,15 @@ package main
// Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc

import (
"fmt"
"os"
"strings"
"testing"
"time"
)

func TestRunMain(t *testing.T) {
fmt.Println("dm-master startup", time.Now())
var args []string
for _, arg := range os.Args {
switch {
@@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) {

os.Args = args
main()
fmt.Println("dm-master exit", time.Now())
}
4 changes: 4 additions & 0 deletions cmd/dm-syncer/main_test.go
@@ -16,12 +16,15 @@ package main
// Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc

import (
"fmt"
"os"
"strings"
"testing"
"time"
)

func TestRunMain(t *testing.T) {
fmt.Println("dm-syncer startup", time.Now())
var args []string
for _, arg := range os.Args {
switch {
@@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) {

os.Args = args
main()
fmt.Println("dm-syncer exit", time.Now())
}
4 changes: 4 additions & 0 deletions cmd/dm-tracer/main_test.go
@@ -16,12 +16,15 @@ package main
// Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc

import (
"fmt"
"os"
"strings"
"testing"
"time"
)

func TestRunMain(t *testing.T) {
fmt.Println("dm-tracer startup", time.Now())
var args []string
for _, arg := range os.Args {
switch {
@@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) {

os.Args = args
main()
fmt.Println("dm-tracer exit", time.Now())
}
4 changes: 4 additions & 0 deletions cmd/dm-worker/main_test.go
@@ -16,6 +16,7 @@ package main
// Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc

import (
"fmt"
"os"
"strings"
"testing"
@@ -28,6 +29,7 @@ import (
)

func TestRunMain(t *testing.T) {
fmt.Println("dm-worker startup", time.Now())
var (
args []string
exit = make(chan int)
@@ -67,8 +69,10 @@ func TestRunMain(t *testing.T) {

select {
case <-waitCh:
fmt.Println("dm-worker exit", time.Now())
return
case <-exit:
fmt.Println("dm-worker exit", time.Now())
return
}
}
4 changes: 2 additions & 2 deletions dm/master/scheduler/scheduler_test.go
@@ -715,7 +715,7 @@ func (t *testScheduler) TestRestartScheduler(c *C) {
return len(bounds) == 1 && bounds[0] == sourceID1
}), IsTrue)
checkSourceBoundCh := func() {
time.Sleep(300 * time.Millisecond)
time.Sleep(time.Second)
c.Assert(sourceBoundCh, HasLen, 1)
sourceBound := <-sourceBoundCh
sourceBound.Revision = 0
@@ -857,7 +857,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) {
select {
case err := <-workerErrCh:
c.Assert(err, Equals, etcdErrCompacted)
case <-time.After(300 * time.Millisecond):
case <-time.After(time.Second):
c.Fatal("fail to get etcd error compacted")
}

7 changes: 7 additions & 0 deletions dm/master/shardddl/pessimist.go
@@ -438,6 +438,13 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task))
if err != nil {
// TODO: add & update metrics.
// FIXME: the following case is not supported automatically yet, try to support it later.
// - the lock becomes synced, and `done` for the `exec` operation is received.
// - `skip` operations are put for the non-owners, but the lock is still not resolved.
// - another new DDL from the old owner is received, and TrySync is called again with an error returned.
// after the old lock is resolved, the new DDL from the old owner will NOT be handled again,
// so the lock becomes blocked because the Pessimist thinks DDL from some sources is still missing.
// for now, we need to `pause-task` and then `resume-task` so that DM-workers put the DDL again and re-trigger the process.
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
continue
} else if !synced {
9 changes: 7 additions & 2 deletions pkg/schema/tracker_test.go
@@ -67,14 +67,19 @@ func (s *trackerSuite) TestDDL(c *C) {
c.Assert(ti.Columns[2].Name.L, Equals, "c")
c.Assert(ti.Columns[2].IsGenerated(), IsFalse)

// Verify the table info is unchanged (pointer-equal) when fetched again.
ti2, err := tracker.GetTable("testdb", "foo")
c.Assert(err, IsNil)
c.Assert(ti, Equals, ti2)

// Drop one column from the table.
err = tracker.Exec(ctx, "testdb", "alter table foo drop column b")
c.Assert(err, IsNil)

// Verify that 2 columns remain.
ti2, err := tracker.GetTable("testdb", "foo")
ti2, err = tracker.GetTable("testdb", "foo")
c.Assert(err, IsNil)
c.Assert(ti, Not(Equals), ti2)
c.Assert(ti, Not(Equals), ti2) // changed (not pointer-equal) after the DDL is applied.
c.Assert(ti2.Columns, HasLen, 2)
c.Assert(ti2.Columns[0].Name.L, Equals, "a")
c.Assert(ti2.Columns[0].IsGenerated(), IsFalse)
2 changes: 2 additions & 0 deletions pkg/terror/error_list.go
@@ -365,6 +365,7 @@ const (
codeSyncerUnitGenBWList
codeSyncerUnitHandleDDLFailed
codeSyncerShardDDLConflict
codeSyncerFailpoint
)

// DM-master error code
@@ -877,6 +878,7 @@ var (
ErrSyncerUnitGenBWList = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list")
ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s")
ErrSyncerShardDDLConflict = New(codeSyncerShardDDLConflict, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle shard ddl %v in optimistic mode, because schema conflict detected")
ErrSyncerFailpoint = New(codeSyncerFailpoint, ClassSyncUnit, ScopeInternal, LevelLow, "failpoint specified error")

// DM-master error
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid")
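The new ErrSyncerFailpoint code gives failpoint-injected interruptions their own low-severity error instead of overloading an existing one. Below is a minimal sketch of how the handleFlushCheckpointStage helper called from syncer/optimist.go further down might surface it; only the call sites appear in this commit, so the function body, its Sketch name, and the stage-matching rule are assumptions, while terror.ErrSyncerFailpoint.Generate follows the usual DM terror usage.

package syncer

import (
	"github.com/pingcap/dm/pkg/terror"
)

// handleFlushCheckpointStageSketch mirrors the call sites in syncer/optimist.go:
// it returns an error only when the failpoint value selects the current stage.
func handleFlushCheckpointStageSketch(stage, injected int, desc string) error {
	if stage != injected {
		return nil // the failpoint value asked for a different stage; keep going.
	}
	_ = desc // the real helper presumably records the stage description ("before track DDL", ...).
	// surface the injected interruption as the dedicated low-level error code 36063.
	return terror.ErrSyncerFailpoint.Generate()
}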
33 changes: 25 additions & 8 deletions syncer/checkpoint.go
@@ -96,11 +96,25 @@ func (b *binlogPoint) flush() {
b.flushedTI = b.ti
}

func (b *binlogPoint) rollback() (isSchemaChanged bool) {
func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) {
b.Lock()
defer b.Unlock()
b.location = b.flushedLocation
if isSchemaChanged = b.ti != b.flushedTI; isSchemaChanged {
b.location = b.flushedLocation.Clone()
if b.ti == nil {
return // for global checkpoint, no need to rollback the schema.
}

// NOTE: no `Equal` function exists for `model.TableInfo` yet, so we compare pointers directly;
// after a new DDL is applied to the schema, the returned `model.TableInfo` pointer changes.
trackedTi, _ := schemaTracker.GetTable(schema, b.ti.Name.O) // ignore the returned error; comparing `trackedTi` is enough.
// three versions of the schema may exist:
// - the one tracked in the TiDB-with-mockTiKV.
// - the one in the checkpoint but not flushed.
// - the one in the checkpoint and flushed.
// if any of them differ, we roll them back:
// - set the one in the checkpoint but not flushed to the flushed one.
// - the caller of this method sets the tracked one to the one in the checkpoint (flushed and not flushed are now the same).
if isSchemaChanged = (trackedTi != b.ti) || (b.ti != b.flushedTI); isSchemaChanged {
b.ti = b.flushedTI
}
return
@@ -314,7 +328,7 @@ func (cp *RemoteCheckPoint) saveTablePoint(sourceSchema, sourceTable string, loc
}

// we save the table checkpoint when we meet DDL or DML
cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("loaction", location), zap.String("schema", sourceSchema), zap.String("table", sourceTable))
cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("location", location), zap.String("schema", sourceSchema), zap.String("table", sourceTable))
mSchema, ok := cp.points[sourceSchema]
if !ok {
mSchema = make(map[string]*binlogPoint)
@@ -397,6 +411,7 @@ func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string,
return true
}
oldLocation := point.MySQLLocation()
cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation))

if gte {
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) >= 0
@@ -530,15 +545,17 @@ func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) {
cp.RLock()
defer cp.RUnlock()
cp.globalPoint.rollback()
cp.globalPoint.rollback(schemaTracker, "")
for schema, mSchema := range cp.points {
for table, point := range mSchema {
logger := cp.logCtx.L().WithFields(zap.String("schema", schema), zap.String("table", table))
logger.Info("rollback checkpoint", log.WrapStringerField("checkpoint", point))
if point.rollback() {
logger.Debug("try to rollback checkpoint", log.WrapStringerField("checkpoint", point))
from := point.MySQLLocation()
if point.rollback(schemaTracker, schema) {
logger.Info("rollback checkpoint", zap.Stringer("from", from), zap.Stringer("to", point.FlushedMySQLLocation()))
// schema changed
if err := schemaTracker.DropTable(schema, table); err != nil {
logger.Debug("failed to drop table from schema tracker", log.ShortError(err))
logger.Warn("failed to drop table from schema tracker", log.ShortError(err))
}
if point.ti != nil {
// TODO: Figure out how to recover from errors.
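The reworked rollback above hinges on pointer identity because model.TableInfo has no structural Equal helper; the minimal sketch below restates that decision in isolation. It is not the DM implementation, just the comparison rule under the same assumption, using the parser's model package that this commit already imports elsewhere.

package syncer

import (
	"github.com/pingcap/parser/model"
)

// needSchemaRollback restates the check added to binlogPoint.rollback: with no
// Equal helper for model.TableInfo, a changed pointer is the signal that one of
// the schema versions has drifted from the flushed checkpoint.
func needSchemaRollback(tracked, saved, flushed *model.TableInfo) bool {
	// any mismatch among the tracked schema, the saved-but-not-flushed schema and
	// the flushed schema means the caller must drop the tracked table and
	// recreate it from the flushed table info.
	return tracked != saved || saved != flushed
}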
26 changes: 18 additions & 8 deletions syncer/optimist.go
@@ -14,6 +14,7 @@
package syncer

import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/filter"
Expand Down Expand Up @@ -48,10 +49,13 @@ func (s *Syncer) handleQueryEventOptimistic(
ev *replication.QueryEvent, ec eventContext,
needHandleDDLs []string, needTrackDDLs []trackedDDL,
onlineDDLTableNames map[string]*filter.Table) error {
// wait previous DMLs to be replicated
if err := s.flushJobs(); err != nil {
return err
}
// interrupted after flushing the old checkpoint and before tracking the DDL.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err := handleFlushCheckpointStage(1, val.(int), "before track DDL")
if err != nil {
failpoint.Return(err)
}
})

var (
upSchema string
@@ -92,10 +96,8 @@ func (s *Syncer) handleQueryEventOptimistic(
return err
}

s.tctx.L().Info("save table checkpoint", zap.String("event", "query"),
zap.String("schema", upSchema), zap.String("table", upTable),
zap.Strings("ddls", needHandleDDLs), log.WrapStringerField("location", ec.currentLocation))
s.saveTablePoint(upSchema, upTable, ec.currentLocation.Clone())
// in optimistic mode, don't `saveTablePoint` before executing the DDL,
// because there are no `UnresolvedTables` to prevent this checkpoint from being flushed.

info := s.optimist.ConstructInfo(upSchema, upTable, downSchema, downTable, needHandleDDLs, tiBefore, tiAfter)

Expand Down Expand Up @@ -159,6 +161,14 @@ func (s *Syncer) handleQueryEventOptimistic(
needHandleDDLs = appliedSQLs // maybe nil
}

// interrupted after tracking the DDL and before executing it.
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(2, val.(int), "before execute DDL")
if err != nil {
failpoint.Return(err)
}
})

ddlInfo := &shardingDDLInfo{
name: needTrackDDLs[0].tableNames[0][0].String(),
tableNames: needTrackDDLs[0].tableNames,
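The two FlushCheckpointStage injection points above only fire when the failpoint is enabled in a build whose failpoint markers have been rewritten (failpoint-ctl enable). A hedged example of turning the failpoint on from a test follows; failpoint.Enable, failpoint.Disable and the return(...) term are standard github.com/pingcap/failpoint usage, while the failpoint path and the test body are assumptions about the package layout.

package syncer_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestInterruptBeforeTrackDDL(t *testing.T) {
	// assumed failpoint path: the syncer package import path plus the failpoint name.
	fp := "github.com/pingcap/dm/syncer/FlushCheckpointStage"
	// "return(1)" makes failpoint.Inject receive val = 1, which the staged helper
	// presumably maps to stage 1 ("before track DDL"); return(2) would target
	// "before execute DDL".
	if err := failpoint.Enable(fp, "return(1)"); err != nil {
		t.Fatal(err)
	}
	defer failpoint.Disable(fp) //nolint:errcheck

	// ... drive the syncer through a shard DDL here and assert that it stops
	// with ErrSyncerFailpoint ...
}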