From 7ed860e414b7524efa6d23a3ebe0118ec87c4d66 Mon Sep 17 00:00:00 2001
From: Xuecheng Zhang
Date: Tue, 28 Apr 2020 15:35:32 +0800
Subject: [PATCH] *: rollback schema in the tracker; fix save table checkpoint in optimistic mode (#625)

---
 _utils/terror_gen/errors_release.txt      |  1 +
 cmd/dm-master/main_test.go                |  4 ++
 cmd/dm-syncer/main_test.go                |  4 ++
 cmd/dm-tracer/main_test.go                |  4 ++
 cmd/dm-worker/main_test.go                |  4 ++
 dm/master/scheduler/scheduler_test.go     |  4 +-
 dm/master/shardddl/pessimist.go           |  7 ++
 pkg/schema/tracker_test.go                |  9 ++-
 pkg/terror/error_list.go                  |  2 +
 syncer/checkpoint.go                      | 33 ++++++---
 syncer/optimist.go                        | 26 ++++---
 syncer/syncer.go                          | 67 +++++++++++++++++--
 syncer/test_injector.go                   | 47 +++++++++++++
 tests/incremental_mode/conf/source1.toml  |  3 +
 tests/incremental_mode/run.sh             | 51 ++++++++++++++
 .../conf/source1.toml                     |  3 +
 tests/sequence_sharding_optimistic/run.sh | 53 +++++++++++++++
 tests/sharding/conf/source1.toml          |  3 +
 tests/sharding/data/db1.increment.sql     |  2 +-
 tests/sharding/data/db2.increment.sql     |  1 +
 tests/sharding/data/db2.prepare.sql       |  2 +-
 tests/sharding/run.sh                     | 53 +++++++++++++++
 22 files changed, 354 insertions(+), 29 deletions(-)
 create mode 100644 syncer/test_injector.go

diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt
index de802c0261..3b2f0e26ba 100644
--- a/_utils/terror_gen/errors_release.txt
+++ b/_utils/terror_gen/errors_release.txt
@@ -292,6 +292,7 @@ ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:le
 ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
 ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s"
 ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high],"fail to handle shard ddl %v in optimistic mode, because schema conflict detected"
+ErrSyncerFailpoint,[code=36063:class=sync-unit:scope=internal:level=low],"failpoint specified error"
 ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
 ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
 ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
diff --git a/cmd/dm-master/main_test.go b/cmd/dm-master/main_test.go
index b5d63be728..9a7aaa0133 100644
--- a/cmd/dm-master/main_test.go
+++ b/cmd/dm-master/main_test.go
@@ -16,12 +16,15 @@ package main
 // Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc
 
 import (
+	"fmt"
 	"os"
 	"strings"
 	"testing"
+	"time"
 )
 
 func TestRunMain(t *testing.T) {
+	fmt.Println("dm-master startup", time.Now())
 	var args []string
 	for _, arg := range os.Args {
 		switch {
@@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) {
 	os.Args = args
 	main()
+	fmt.Println("dm-master exit", time.Now())
 }
diff --git a/cmd/dm-syncer/main_test.go b/cmd/dm-syncer/main_test.go
index b5d63be728..8103a5abb0 100644
--- a/cmd/dm-syncer/main_test.go
+++ b/cmd/dm-syncer/main_test.go
@@ -16,12 +16,15 @@ package main
 // Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc
 
 import (
+	"fmt"
 	"os"
 	"strings"
 	"testing"
+	"time"
 )
 
 func TestRunMain(t *testing.T) {
+	fmt.Println("dm-syncer startup", time.Now())
 	var args []string
 	for _, arg := range os.Args {
 		switch {
@@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) {
 	os.Args = args
 	main()
fmt.Println("dm-syncer exit", time.Now()) } diff --git a/cmd/dm-tracer/main_test.go b/cmd/dm-tracer/main_test.go index b5d63be728..dd253952c1 100644 --- a/cmd/dm-tracer/main_test.go +++ b/cmd/dm-tracer/main_test.go @@ -16,12 +16,15 @@ package main // Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc import ( + "fmt" "os" "strings" "testing" + "time" ) func TestRunMain(t *testing.T) { + fmt.Println("dm-tracer startup", time.Now()) var args []string for _, arg := range os.Args { switch { @@ -34,4 +37,5 @@ func TestRunMain(t *testing.T) { os.Args = args main() + fmt.Println("dm-tracer exit", time.Now()) } diff --git a/cmd/dm-worker/main_test.go b/cmd/dm-worker/main_test.go index b4e26adf95..e52e76c8c3 100644 --- a/cmd/dm-worker/main_test.go +++ b/cmd/dm-worker/main_test.go @@ -16,6 +16,7 @@ package main // Reference: https://dzone.com/articles/measuring-integration-test-coverage-rate-in-pouchc import ( + "fmt" "os" "strings" "testing" @@ -28,6 +29,7 @@ import ( ) func TestRunMain(t *testing.T) { + fmt.Println("dm-worker startup", time.Now()) var ( args []string exit = make(chan int) @@ -67,8 +69,10 @@ func TestRunMain(t *testing.T) { select { case <-waitCh: + fmt.Println("dm-worker exit", time.Now()) return case <-exit: + fmt.Println("dm-worker exit", time.Now()) return } } diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index c8f80b6b37..b66076e747 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -715,7 +715,7 @@ func (t *testScheduler) TestRestartScheduler(c *C) { return len(bounds) == 1 && bounds[0] == sourceID1 }), IsTrue) checkSourceBoundCh := func() { - time.Sleep(300 * time.Millisecond) + time.Sleep(time.Second) c.Assert(sourceBoundCh, HasLen, 1) sourceBound := <-sourceBoundCh sourceBound.Revision = 0 @@ -857,7 +857,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) { select { case err := <-workerErrCh: c.Assert(err, Equals, etcdErrCompacted) - case <-time.After(300 * time.Millisecond): + case <-time.After(time.Second): c.Fatal("fail to get etcd error compacted") } diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index 784433c36d..929f7ccdc3 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -438,6 +438,13 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task)) if err != nil { // TODO: add & update metrics. + // FIXME: the following case is not supported automatically now, try to support it later. + // - the lock become synced, and `done` for `exec` operation received. + // - put `skip` operation for non-owners and the lock is still not resolved. + // - another new DDL from the old owner received and TrySync again with an error returned. + // after the old lock resolved, the new DDL from the old owner will NOT be handled again, + // then the lock will be block because the Pessimist thinks missing DDL from some sources. + // now, we need to `pause-task` and `resume-task` to let DM-workers put DDL again to trigger the process. 
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err)) continue } else if !synced { diff --git a/pkg/schema/tracker_test.go b/pkg/schema/tracker_test.go index be6528d0ad..a003aa9dac 100644 --- a/pkg/schema/tracker_test.go +++ b/pkg/schema/tracker_test.go @@ -67,14 +67,19 @@ func (s *trackerSuite) TestDDL(c *C) { c.Assert(ti.Columns[2].Name.L, Equals, "c") c.Assert(ti.Columns[2].IsGenerated(), IsFalse) + // Verify the table info not changed (pointer equal) when getting again. + ti2, err := tracker.GetTable("testdb", "foo") + c.Assert(err, IsNil) + c.Assert(ti, Equals, ti2) + // Drop one column from the table. err = tracker.Exec(ctx, "testdb", "alter table foo drop column b") c.Assert(err, IsNil) // Verify that 2 columns remain. - ti2, err := tracker.GetTable("testdb", "foo") + ti2, err = tracker.GetTable("testdb", "foo") c.Assert(err, IsNil) - c.Assert(ti, Not(Equals), ti2) + c.Assert(ti, Not(Equals), ti2) // changed (not pointer equal) after applied DDL. c.Assert(ti2.Columns, HasLen, 2) c.Assert(ti2.Columns[0].Name.L, Equals, "a") c.Assert(ti2.Columns[0].IsGenerated(), IsFalse) diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 0621398ade..e3152adc1b 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -365,6 +365,7 @@ const ( codeSyncerUnitGenBWList codeSyncerUnitHandleDDLFailed codeSyncerShardDDLConflict + codeSyncerFailpoint ) // DM-master error code @@ -877,6 +878,7 @@ var ( ErrSyncerUnitGenBWList = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list") ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s") ErrSyncerShardDDLConflict = New(codeSyncerShardDDLConflict, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle shard ddl %v in optimistic mode, because schema conflict detected") + ErrSyncerFailpoint = New(codeSyncerFailpoint, ClassSyncUnit, ScopeInternal, LevelLow, "failpoint specified error") // DM-master error ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid") diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 9a2cb5d1da..3734fc2c43 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -96,11 +96,25 @@ func (b *binlogPoint) flush() { b.flushedTI = b.ti } -func (b *binlogPoint) rollback() (isSchemaChanged bool) { +func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) { b.Lock() defer b.Unlock() - b.location = b.flushedLocation - if isSchemaChanged = b.ti != b.flushedTI; isSchemaChanged { + b.location = b.flushedLocation.Clone() + if b.ti == nil { + return // for global checkpoint, no need to rollback the schema. + } + + // NOTE: no `Equal` function for `model.TableInfo` exists now, so we compare `pointer` directly, + // and after a new DDL applied to the schema, the returned pointer of `model.TableInfo` changed now. + trackedTi, _ := schemaTracker.GetTable(schema, b.ti.Name.O) // ignore the returned error, only compare `trackerTi` is enough. + // may three versions of schema exist: + // - the one tracked in the TiDB-with-mockTiKV. + // - the one in the checkpoint but not flushed. + // - the one in the checkpoint and flushed. + // if any of them are not equal, then we rollback them: + // - set the one in the checkpoint but not flushed to the one flushed. 
+	// - the caller of this method sets the tracked one to the one in the checkpoint (flushed and not-flushed are the same now).
+	if isSchemaChanged = (trackedTi != b.ti) || (b.ti != b.flushedTI); isSchemaChanged {
 		b.ti = b.flushedTI
 	}
 	return
@@ -314,7 +328,7 @@ func (cp *RemoteCheckPoint) saveTablePoint(sourceSchema, sourceTable string, loc
 	}
 
 	// we save table checkpoint while we meet DDL or DML
-	cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("loaction", location), zap.String("schema", sourceSchema), zap.String("table", sourceTable))
+	cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("location", location), zap.String("schema", sourceSchema), zap.String("table", sourceTable))
 	mSchema, ok := cp.points[sourceSchema]
 	if !ok {
 		mSchema = make(map[string]*binlogPoint)
@@ -397,6 +411,7 @@ func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string,
 		return true
 	}
 	oldLocation := point.MySQLLocation()
+	cp.logCtx.L().Debug("compare table location to check whether it is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation))
 
 	if gte {
 		return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) >= 0
@@ -530,15 +545,17 @@ func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
 func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) {
 	cp.RLock()
 	defer cp.RUnlock()
-	cp.globalPoint.rollback()
+	cp.globalPoint.rollback(schemaTracker, "")
 	for schema, mSchema := range cp.points {
 		for table, point := range mSchema {
 			logger := cp.logCtx.L().WithFields(zap.String("schema", schema), zap.String("table", table))
-			logger.Info("rollback checkpoint", log.WrapStringerField("checkpoint", point))
-			if point.rollback() {
+			logger.Debug("try to rollback checkpoint", log.WrapStringerField("checkpoint", point))
+			from := point.MySQLLocation()
+			if point.rollback(schemaTracker, schema) {
+				logger.Info("rollback checkpoint", zap.Stringer("from", from), zap.Stringer("to", point.FlushedMySQLLocation()))
 				// schema changed
 				if err := schemaTracker.DropTable(schema, table); err != nil {
-					logger.Debug("failed to drop table from schema tracker", log.ShortError(err))
+					logger.Warn("failed to drop table from schema tracker", log.ShortError(err))
 				}
 				if point.ti != nil {
 					// TODO: Figure out how to recover from errors.
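The rollback above only detects drift between the tracked, unflushed, and flushed schemas; restoring the tracker is left to the caller, as the `Rollback` loop in this hunk shows (drop the drifted table, then re-create it from the flushed table info). A minimal sketch of that contract, assuming the tracker exposes a helper that re-creates a table from a saved `*model.TableInfo` (the name `CreateTableIfNotExists` is an assumption for illustration, not confirmed by this patch):

// Sketch only: how one table point is rolled back end-to-end.
func rollbackOneTable(tracker *schema.Tracker, point *binlogPoint, db, tbl string) error {
	if !point.rollback(tracker, db) {
		return nil // tracked, unflushed and flushed schemas already agree.
	}
	// the tracked schema drifted from the flushed checkpoint: drop it...
	if err := tracker.DropTable(db, tbl); err != nil {
		return err
	}
	if point.ti == nil {
		return nil // nothing was ever flushed for this table; leave it dropped.
	}
	// ...and restore the last flushed schema, so all three versions agree again.
	return tracker.CreateTableIfNotExists(db, tbl, point.ti)
}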
+ failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { + err := handleFlushCheckpointStage(1, val.(int), "before track DDL") + if err != nil { + failpoint.Return(err) + } + }) var ( upSchema string @@ -92,10 +96,8 @@ func (s *Syncer) handleQueryEventOptimistic( return err } - s.tctx.L().Info("save table checkpoint", zap.String("event", "query"), - zap.String("schema", upSchema), zap.String("table", upTable), - zap.Strings("ddls", needHandleDDLs), log.WrapStringerField("location", ec.currentLocation)) - s.saveTablePoint(upSchema, upTable, ec.currentLocation.Clone()) + // in optimistic mode, don't `saveTablePoint` before execute DDL, + // because it has no `UnresolvedTables` to prevent the flush of this checkpoint. info := s.optimist.ConstructInfo(upSchema, upTable, downSchema, downTable, needHandleDDLs, tiBefore, tiAfter) @@ -159,6 +161,14 @@ func (s *Syncer) handleQueryEventOptimistic( needHandleDDLs = appliedSQLs // maybe nil } + // interrupted after track DDL and before execute DDL. + failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { + err = handleFlushCheckpointStage(2, val.(int), "before execute DDL") + if err != nil { + failpoint.Return(err) + } + }) + ddlInfo := &shardingDDLInfo{ name: needTrackDDLs[0].tableNames[0][0].String(), tableNames: needTrackDDLs[0].tableNames, diff --git a/syncer/syncer.go b/syncer/syncer.go index a192501b90..c9c28604fa 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -752,6 +752,13 @@ func (s *Syncer) addJob(job *job) error { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ExitAfterDDLBeforeFlush")) utils.OsExit(1) }) + // interrupted after executed DDL and before save checkpoint. + failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { + err := handleFlushCheckpointStage(3, val.(int), "before save checkpoint") + if err != nil { + failpoint.Return(err) + } + }) // only save checkpoint for DDL and XID (see above) s.saveGlobalPoint(job.location) for sourceSchema, tbs := range job.sourceTbl { @@ -771,12 +778,19 @@ func (s *Syncer) addJob(job *job) error { continue } for _, sourceTable := range tbs { - s.saveTablePoint(sourceSchema, sourceTable, job.location) + s.saveTablePoint(sourceSchema, sourceTable, job.currentLocation) } } } if wait { + // interrupted after save checkpoint and before flush checkpoint. + failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { + err := handleFlushCheckpointStage(4, val.(int), "before flush checkpoint") + if err != nil { + failpoint.Return(err) + } + }) return s.flushCheckPoints() } @@ -1719,6 +1733,20 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return s.recordSkipSQLsLocation(*ec.lastLocation) } + // interrupted before flush old checkpoint. + failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { + err = handleFlushCheckpointStage(0, val.(int), "before flush old checkpoint") + if err != nil { + failpoint.Return(err) + } + }) + + // flush previous DMLs and checkpoint if needing to handle the DDL. + // NOTE: do this flush before operations on shard groups which may lead to skip a table caused by `UnresolvedTables`. + if err = s.flushJobs(); err != nil { + return err + } + if s.cfg.ShardMode == "" { s.tctx.L().Info("start to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation)) // try apply SQL operator before addJob. 
diff --git a/syncer/syncer.go b/syncer/syncer.go
index a192501b90..c9c28604fa 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -752,6 +752,13 @@ func (s *Syncer) addJob(job *job) error {
 			s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ExitAfterDDLBeforeFlush"))
 			utils.OsExit(1)
 		})
+		// interrupted after executing the DDL and before saving the checkpoint.
+		failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
+			err := handleFlushCheckpointStage(3, val.(int), "before save checkpoint")
+			if err != nil {
+				failpoint.Return(err)
+			}
+		})
 		// only save checkpoint for DDL and XID (see above)
 		s.saveGlobalPoint(job.location)
 		for sourceSchema, tbs := range job.sourceTbl {
@@ -771,12 +778,19 @@ func (s *Syncer) addJob(job *job) error {
 				continue
 			}
 			for _, sourceTable := range tbs {
-				s.saveTablePoint(sourceSchema, sourceTable, job.location)
+				s.saveTablePoint(sourceSchema, sourceTable, job.currentLocation)
 			}
 		}
 	}
 
 	if wait {
+		// interrupted after saving the checkpoint and before flushing it.
+		failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
+			err := handleFlushCheckpointStage(4, val.(int), "before flush checkpoint")
+			if err != nil {
+				failpoint.Return(err)
+			}
+		})
 		return s.flushCheckPoints()
 	}
@@ -1719,6 +1733,20 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
 		return s.recordSkipSQLsLocation(*ec.lastLocation)
 	}
 
+	// interrupted before flushing the old checkpoint.
+	failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
+		err = handleFlushCheckpointStage(0, val.(int), "before flush old checkpoint")
+		if err != nil {
+			failpoint.Return(err)
+		}
+	})
+
+	// flush previous DMLs and checkpoint when we need to handle the DDL.
+	// NOTE: do this flush before operations on shard groups, otherwise the flush may skip a table because of `UnresolvedTables`.
+	if err = s.flushJobs(); err != nil {
+		return err
+	}
+
 	if s.cfg.ShardMode == "" {
 		s.tctx.L().Info("start to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation))
 		// try to apply the SQL operator before addJob. now, one query event has only one DDL job; if this is updated to multiple DDL jobs, refine this.
@@ -1731,9 +1759,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
 			needHandleDDLs = appliedSQLs // maybe nil
 		}
 
-		if err := s.flushJobs(); err != nil {
-			return err
-		}
+		// interrupted after flushing the old checkpoint and before tracking the DDL.
+		failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
+			err = handleFlushCheckpointStage(1, val.(int), "before track DDL")
+			if err != nil {
+				failpoint.Return(err)
+			}
+		})
 
 		// run trackDDL before adding the ddl job to make sure the checkpoint can be flushed
 		for _, td := range needTrackDDLs {
 			if err = s.trackDDL(usedSchema, td.rawSQL, td.tableNames, td.stmt, &ec); err != nil {
 				return err
 			}
 		}
 
+		// interrupted after tracking the DDL and before executing it.
+		failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
+			err = handleFlushCheckpointStage(2, val.(int), "before execute DDL")
+			if err != nil {
+				failpoint.Return(err)
+			}
+		})
+
 		job := newDDLJob(nil, needHandleDDLs, *ec.lastLocation, *ec.currentLocation, *ec.traceID, sourceTbls)
 		err = s.addJobFunc(job)
 		if err != nil {
@@ -1819,9 +1859,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
 	s.tctx.L().Info(annotate, zap.String("event", "query"), zap.String("source", source), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Bool("in-sharding", needShardingHandle), zap.Stringer("start location", startLocation), zap.Bool("is-synced", synced), zap.Int("unsynced", remain))
 
-	if err := s.flushJobs(); err != nil {
-		return err
-	}
+	// interrupted after flushing the old checkpoint and before tracking the DDL.
+	failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
+		err = handleFlushCheckpointStage(1, val.(int), "before track DDL")
+		if err != nil {
+			failpoint.Return(err)
+		}
+	})
 
 	for _, td := range needTrackDDLs {
 		if err = s.trackDDL(usedSchema, td.rawSQL, td.tableNames, td.stmt, &ec); err != nil {
@@ -1929,6 +1973,15 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
 			s.tctx.L().Info("replace ddls to preset ddls by sql operator in shard mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Stringer("start location", startLocation), log.WrapStringerField("end location", ec.currentLocation))
 			needHandleDDLs = appliedSQLs // maybe nil
 		}
+
+		// interrupted after tracking the DDL and before executing it.
+		failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
+			err = handleFlushCheckpointStage(2, val.(int), "before execute DDL")
+			if err != nil {
+				failpoint.Return(err)
+			}
+		})
+
 		job := newDDLJob(ddlInfo, needHandleDDLs, *ec.lastLocation, *ec.currentLocation, *ec.traceID, nil)
 		err = s.addJobFunc(job)
 		if err != nil {
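For orientation, the five interruption points driven by `FlushCheckpointStage` across optimist.go and syncer.go can be summarized as follows (illustrative constants, not part of the patch; the numbers and stage strings match the `handleFlushCheckpointStage` calls above and the new file below):

const (
	stageBeforeFlushOldCheckpoint = 0 // handleQueryEvent: before flushJobs() flushes the old checkpoint
	stageBeforeTrackDDL           = 1 // before the DDL is tracked in the schema tracker
	stageBeforeExecuteDDL         = 2 // before the DDL job is added for execution
	stageBeforeSaveCheckpoint     = 3 // addJob: after the DDL is executed, before saveGlobalPoint()
	stageBeforeFlushCheckpoint    = 4 // addJob: before flushCheckPoints()
)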
diff --git a/syncer/test_injector.go b/syncer/test_injector.go
new file mode 100644
index 0000000000..5941ecef22
--- /dev/null
+++ b/syncer/test_injector.go
@@ -0,0 +1,47 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncer
+
+import (
+	"go.uber.org/zap"
+
+	"github.com/pingcap/dm/pkg/log"
+	"github.com/pingcap/dm/pkg/terror"
+)
+
+// TestInjector is used to support injecting test cases into the syncer.
+// In some cases, we use failpoint to control the test flow,
+// but we may need to control the flow based on some previous status,
+// so we add this TestInjector to record that status.
+// NOTE: if HTTP support for failpoint works well, we may remove this.
+type TestInjector struct {
+	flushCheckpointStage int
+}
+
+var testInjector = TestInjector{}
+
+// handleFlushCheckpointStage handles the `FlushCheckpointStage` failpoint.
+func handleFlushCheckpointStage(expectStage, maxStage int, stageStr string) error {
+	if testInjector.flushCheckpointStage != expectStage {
+		return nil
+	}
+
+	log.L().Info("set FlushCheckpointStage", zap.String("failpoint", "FlushCheckpointStage"), zap.Int("stage", testInjector.flushCheckpointStage))
+	if testInjector.flushCheckpointStage == maxStage {
+		testInjector.flushCheckpointStage = -1 // disable for following stages.
+	} else {
+		testInjector.flushCheckpointStage++
+	}
+	return terror.ErrSyncerFailpoint.Generatef("failpoint error for FlushCheckpointStage %s", stageStr)
+}
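To see how the injector sequences pauses across resumes, the following self-contained sketch (not part of the patch) simulates `handleFlushCheckpointStage` for the two `GO_FAILPOINTS` values used by the tests below: `return(100)` trips every stage from 0 to 4, while `return(2)` disables itself after stage 2, so the flow is never interrupted once the DDL has been executed.

package main

import "fmt"

// flushCheckpointStage mirrors testInjector.flushCheckpointStage.
var flushCheckpointStage int

// trip mirrors handleFlushCheckpointStage: it fires only when the current
// stage matches the expected one, then advances the stage (or disables
// itself once maxStage is reached).
func trip(expectStage, maxStage int, stageStr string) bool {
	if flushCheckpointStage != expectStage {
		return false
	}
	if flushCheckpointStage == maxStage {
		flushCheckpointStage = -1 // disable for following stages.
	} else {
		flushCheckpointStage++
	}
	fmt.Printf("  pause at stage %d: %s\n", expectStage, stageStr)
	return true
}

func main() {
	stages := []string{
		"before flush old checkpoint", // stage 0
		"before track DDL",            // stage 1
		"before execute DDL",          // stage 2
		"before save checkpoint",      // stage 3
		"before flush checkpoint",     // stage 4
	}
	for _, maxStage := range []int{100, 2} {
		flushCheckpointStage = 0
		fmt.Printf("FlushCheckpointStage=return(%d):\n", maxStage)
		// each pause requires one resume-task; after the resume the syncer
		// re-runs the DDL path and trips the next stage in order.
		for i, s := range stages {
			trip(i, maxStage, s)
		}
	}
}

With `return(2)` the last error is `before execute DDL`, which is why the sharding test at the end of this patch only needs three pause/resume cycles, while incremental_mode and sequence_sharding_optimistic walk through all five stages.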
diff --git a/tests/incremental_mode/conf/source1.toml b/tests/incremental_mode/conf/source1.toml
index 7b08068ef2..8cf0ee574d 100644
--- a/tests/incremental_mode/conf/source1.toml
+++ b/tests/incremental_mode/conf/source1.toml
@@ -11,3 +11,6 @@ host = "127.0.0.1"
 user = "root"
 password = "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="
 port = 3306
+
+[checker]
+check-enable = false # disable auto resume.
diff --git a/tests/incremental_mode/run.sh b/tests/incremental_mode/run.sh
index 91f69cf685..dbf6795fb0 100755
--- a/tests/incremental_mode/run.sh
+++ b/tests/incremental_mode/run.sh
@@ -8,6 +8,8 @@ WORK_DIR=$TEST_DIR/$TEST_NAME
 TASK_NAME="test"
 
 function run() {
+    export GO_FAILPOINTS="github.com/pingcap/dm/syncer/FlushCheckpointStage=return(100)" # for all stages
+
     run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
     check_contains 'Query OK, 2 rows affected'
     run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
@@ -107,7 +109,56 @@ function run() {
     sleep 3
     dmctl_start_task $WORK_DIR/dm-task.yaml
 
+    # the task should be paused by the `FlushCheckpointStage` failpoint before flushing the old checkpoint.
+    # `db2.increment.sql` has no DDL, so we check that the matched content count is `1`.
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "failpoint error for FlushCheckpointStage before flush old checkpoint" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "failpoint error for FlushCheckpointStage before track DDL" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "failpoint error for FlushCheckpointStage before execute DDL" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "failpoint error for FlushCheckpointStage before save checkpoint" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "failpoint error for FlushCheckpointStage before flush checkpoint" 1
+
+    # resume-task to continue the sync
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
     check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+
+    export GO_FAILPOINTS=''
 }
 
 cleanup_data $TEST_NAME
diff --git a/tests/sequence_sharding_optimistic/conf/source1.toml b/tests/sequence_sharding_optimistic/conf/source1.toml
index e9748fb168..c68b7af73a 100644
--- a/tests/sequence_sharding_optimistic/conf/source1.toml
+++ b/tests/sequence_sharding_optimistic/conf/source1.toml
@@ -11,3 +11,6 @@ host = "127.0.0.1"
 user = "root"
 password = "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="
 port = 3306
+
+[checker]
+check-enable = false # disable auto resume.
diff --git a/tests/sequence_sharding_optimistic/run.sh b/tests/sequence_sharding_optimistic/run.sh
index 879c0b1e2a..7554ca49b8 100755
--- a/tests/sequence_sharding_optimistic/run.sh
+++ b/tests/sequence_sharding_optimistic/run.sh
@@ -5,6 +5,7 @@ set -eux
 cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 source $cur/../_utils/test_prepare
 WORK_DIR=$TEST_DIR/$TEST_NAME
+task_name="sequence_sharding_optimistic"
 
 run() {
     run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
@@ -12,8 +13,14 @@ run() {
 
     run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
     check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+
+    # now, for optimistic shard DDL, different sources often do not reach a stage at the same time,
+    # so in order to simplify the check and resume flow, only enable the failpoint for one DM-worker.
+    export GO_FAILPOINTS="github.com/pingcap/dm/syncer/FlushCheckpointStage=return(100)" # for all stages
     run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
     check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    export GO_FAILPOINTS=''
+
     run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
     check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
     # operate mysql config to worker
@@ -43,6 +50,52 @@ run() {
     run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
     run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
 
+    # the task should be paused by the `FlushCheckpointStage` failpoint before flushing the old checkpoint.
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status $task_name" \
+        "failpoint error for FlushCheckpointStage before flush old checkpoint" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task $task_name"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status $task_name" \
+        "failpoint error for FlushCheckpointStage before track DDL" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task $task_name"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status $task_name" \
+        "failpoint error for FlushCheckpointStage before execute DDL" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task $task_name"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status $task_name" \
+        "failpoint error for FlushCheckpointStage before save checkpoint" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task $task_name"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status $task_name" \
+        "failpoint error for FlushCheckpointStage before flush checkpoint" 1
+
+    # resume-task to continue the sync
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task $task_name"\
+        "\"result\": true" 3
+
     # use sync_diff_inspector to check data now!
     check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 }
diff --git a/tests/sharding/conf/source1.toml b/tests/sharding/conf/source1.toml
index 7b08068ef2..8cf0ee574d 100644
--- a/tests/sharding/conf/source1.toml
+++ b/tests/sharding/conf/source1.toml
@@ -11,3 +11,6 @@ host = "127.0.0.1"
 user = "root"
 password = "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="
 port = 3306
+
+[checker]
+check-enable = false # disable auto resume.
diff --git a/tests/sharding/data/db1.increment.sql b/tests/sharding/data/db1.increment.sql
index 5ab23ef40c..ce43200d06 100644
--- a/tests/sharding/data/db1.increment.sql
+++ b/tests/sharding/data/db1.increment.sql
@@ -2,8 +2,8 @@ use sharding1;
 insert into t1 (uid, name) values (10003, 'Buenos Aires');
 update t1 set name = 'Gabriel José de la Concordia García Márquez' where `uid` = 10001;
 update t1 set name = 'One Hundred Years of Solitude' where name = 'Cien años de soledad';
+insert into t2 (uid, name, info) values (20013, 'Colonel', '{}'); # DML to trigger fetching the schema from downstream before the DDL
 alter table t1 add column age int;
-insert into t2 (uid, name, info) values (20013, 'Colonel', '{}');
 insert into t2 (uid, name, info) values (20023, 'Aureliano', '{}');
 insert into t2 (uid, name, info) values (20033, 'Buendía', '{}');
 alter table t2 add column age int;
diff --git a/tests/sharding/data/db2.increment.sql b/tests/sharding/data/db2.increment.sql
index 58e2ab7002..2cc9f2c22e 100644
--- a/tests/sharding/data/db2.increment.sql
+++ b/tests/sharding/data/db2.increment.sql
@@ -1,5 +1,6 @@
 use sharding1;
 delete from t3 where name = 'Santa Sofía de la Piedad';
+insert into t2 (uid, name, info) values (40001, 'Amaranta', '{"age": 0}'); # DML to trigger fetching the schema from downstream before the DDL
 alter table t2 add column age int;
 update t2 set uid = uid + 10000;
 alter table t3 add column age int;
diff --git a/tests/sharding/data/db2.prepare.sql b/tests/sharding/data/db2.prepare.sql
index 3c88744940..4d6ed28b48 100644
--- a/tests/sharding/data/db2.prepare.sql
+++ b/tests/sharding/data/db2.prepare.sql
@@ -4,5 +4,5 @@ create database `sharding1`;
 use `sharding1`;
 create table t2 (id bigint auto_increment, uid int, name varchar(80), info varchar(100), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4;
 create table t3 (id bigint auto_increment, uid int, name varchar(80), info varchar(100), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4;
-insert into t2 (uid, name, info) values (40000, 'Remedios Moscote', '{}'), (40001, 'Amaranta', '{"age": 0}');
+insert into t2 (uid, name, info) values (40000, 'Remedios Moscote', '{}');
 insert into t3 (uid, name, info) values (30001, 'Aureliano José', '{}'), (30002, 'Santa Sofía de la Piedad', '{}'), (30003, '17 Aurelianos', NULL);
diff --git a/tests/sharding/run.sh b/tests/sharding/run.sh
index 031bc061ef..4312b547f7 100755
--- a/tests/sharding/run.sh
+++ b/tests/sharding/run.sh
@@ -24,8 +24,19 @@ function run() {
     run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
     check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+
+    # now, for pessimistic shard DDL, if interrupted after the DDL is executed but before the checkpoint is flushed,
+    # re-syncing this DDL will make the source try to sync the DDL of the previous lock again,
+    # and the replication will need to be recovered manually,
+    # so we do not interrupt the replication after the DDL is executed in this test case.
+    #
+    # now, for pessimistic shard DDL, the owner and non-owners often do not reach a stage at the same time,
+    # so in order to simplify the check and resume flow, only enable the failpoint for one DM-worker.
+    export GO_FAILPOINTS="github.com/pingcap/dm/syncer/FlushCheckpointStage=return(2)"
     run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
     check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    export GO_FAILPOINTS=''
+
     run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
     check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
     # operate mysql config to worker
@@ -47,6 +58,48 @@ function run() {
     run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
     run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
 
+    # the task should be paused by the `FlushCheckpointStage` failpoint before flushing the old checkpoint.
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "failpoint error for FlushCheckpointStage before flush old checkpoint" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "failpoint error for FlushCheckpointStage before track DDL" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "failpoint error for FlushCheckpointStage before execute DDL" 1
+
+    # resume-task to next stage
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
+    # NOTE: the lock may stay blocked for the next DDL; for details, see the following comment in `master/shardddl/pessimist.go`:
+    # `FIXME: the following case is not supported automatically now, try to support it later`
+    # so we do this `pause-task` and `resume-task` in this case for now.
+    sleep 3
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "pause-task test"\
+        "\"result\": true" 3
+    # wait until the task is really paused
+    # FIXME: `if !st.stageCAS(pb.Stage_Running, pb.Stage_Paused)` in `subtask.go` is not enough to indicate the real stage.
+    sleep 2
+    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test"\
+        "\"result\": true" 3
+
     # TODO: check sharding partition id
     # use sync_diff_inspector to check data now!
     echo "check sync diff for the first increment replication"