From af0d07603ff503ef5c8454251c8f0216e3f106fb Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Sat, 3 Apr 2021 09:19:24 +0800 Subject: [PATCH] cherry pick #1551 to release-2.0 Signed-off-by: ti-srebot --- dm/master/shardddl/optimist.go | 65 ++++++++++++++++++++----- dm/master/shardddl/optimist_test.go | 73 ++++++++++++++++++++++++++-- pkg/shardddl/optimism/schema.go | 28 +++++++++++ pkg/shardddl/optimism/schema_test.go | 30 ++++++++++-- tests/_utils/test_prepare | 4 +- tests/shardddl3/run.sh | 18 ++++--- 6 files changed, 190 insertions(+), 28 deletions(-) diff --git a/dm/master/shardddl/optimist.go b/dm/master/shardddl/optimist.go index 9fabc1494e..b499b24f0f 100644 --- a/dm/master/shardddl/optimist.go +++ b/dm/master/shardddl/optimist.go @@ -247,6 +247,12 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e } o.logger.Info("get history shard DDL lock operation", zap.Int64("revision", revOperation)) + initSchemas, revInitSchemas, err := optimism.GetAllInitSchemas(o.cli) + if err != nil { + return 0, 0, 0, err + } + o.logger.Info("get all init schemas", zap.Int64("revision", revInitSchemas)) + colm, _, err := optimism.GetAllDroppedColumns(o.cli) if err != nil { // only log the error, and don't return it to forbid the startup of the DM-master leader. @@ -255,7 +261,7 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e } // recover the shard DDL lock based on history shard DDL info & lock operation. - err = o.recoverLocks(ifm, opm, colm) + err = o.recoverLocks(ifm, opm, colm, initSchemas) if err != nil { // only log the error, and don't return it to forbid the startup of the DM-master leader. // then these unexpected locks can be handled by the user. @@ -287,19 +293,58 @@ func sortInfos(ifm map[string]map[string]map[string]map[string]optimism.Info) [] } // buildLockJoinedAndTTS build joined table and target table slice for lock by history infos -func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]map[string]optimism.Info) (map[string]schemacmp.Table, map[string][]optimism.TargetTable) { - lockJoined := make(map[string]schemacmp.Table) - lockTTS := make(map[string][]optimism.TargetTable) +func (o *Optimist) buildLockJoinedAndTTS( + ifm map[string]map[string]map[string]map[string]optimism.Info, + initSchemas map[string]map[string]map[string]optimism.InitSchema) ( + map[string]schemacmp.Table, map[string][]optimism.TargetTable) { + type infoKey struct { + lockID string + source string + upSchema string + upTable string + } + infos := make(map[infoKey]optimism.Info) + lockTTS := make(map[string][]optimism.TargetTable) for _, taskInfos := range ifm { for _, sourceInfos := range taskInfos { for _, schemaInfos := range sourceInfos { for _, info := range schemaInfos { lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable) + if _, ok := lockTTS[lockID]; !ok { + lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable) + } + infos[infoKey{lockID, info.Source, info.UpSchema, info.UpTable}] = info + } + } + } + } + + lockJoined := make(map[string]schemacmp.Table) + for lockID, tts := range lockTTS { + for _, tt := range tts { + for upSchema, tables := range tt.UpTables { + for upTable := range tables { + var table schemacmp.Table + if info, ok := infos[infoKey{lockID, tt.Source, upSchema, upTable}]; ok && info.TableInfoBefore != nil { + table = schemacmp.Encode(info.TableInfoBefore) + } else if initSchema, ok := initSchemas[tt.Task][tt.DownSchema][tt.DownTable]; ok { + // If there is no optimism.Info for a upstream table, it indicates the table structure + // hasn't been changed since last removeLock. So the init schema should be its table info. + table = schemacmp.Encode(initSchema.TableInfo) + } else { + o.logger.Error( + "can not find table info for upstream table", + zap.String("source", tt.Source), + zap.String("up-schema", upSchema), + zap.String("up-table", upTable), + ) + continue + } if joined, ok := lockJoined[lockID]; !ok { - lockJoined[lockID] = schemacmp.Encode(info.TableInfoBefore) + lockJoined[lockID] = table } else { - newJoined, err := joined.Join(schemacmp.Encode(info.TableInfoBefore)) + newJoined, err := joined.Join(table) // ignore error, will report it in TrySync later if err != nil { o.logger.Error(fmt.Sprintf("fail to join table info %s with %s, lockID: %s in recover lock", joined, newJoined, lockID), log.ShortError(err)) @@ -307,9 +352,6 @@ func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]ma lockJoined[lockID] = newJoined } } - if _, ok := lockTTS[lockID]; !ok { - lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable) - } } } } @@ -321,10 +363,11 @@ func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]ma func (o *Optimist) recoverLocks( ifm map[string]map[string]map[string]map[string]optimism.Info, opm map[string]map[string]map[string]map[string]optimism.Operation, - colm map[string]map[string]map[string]map[string]map[string]struct{}) error { + colm map[string]map[string]map[string]map[string]map[string]struct{}, + initSchemas map[string]map[string]map[string]optimism.InitSchema) error { // construct joined table based on the shard DDL info. o.logger.Info("build lock joined and tts") - lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm) + lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, initSchemas) // build lock and restore table info o.logger.Info("rebuild locks and tables") o.lk.RebuildLocksAndTables(o.cli, ifm, colm, lockJoined, lockTTS) diff --git a/dm/master/shardddl/optimist_test.go b/dm/master/shardddl/optimist_test.go index ea1b6360d1..6d00b7c64a 100644 --- a/dm/master/shardddl/optimist_test.go +++ b/dm/master/shardddl/optimist_test.go @@ -1139,8 +1139,8 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - st1.AddTable("db", "tbl-1", downSchema, downTable) - st2.AddTable("db", "tbl-1", downSchema, downTable) + st1.AddTable("foo", "bar-1", downSchema, downTable) + st2.AddTable("foo", "bar-1", downSchema, downTable) c.Assert(o.Start(ctx, etcdTestCli), IsNil) _, err := optimism.PutSourceTables(etcdTestCli, st1) @@ -1153,10 +1153,14 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) { _, err = optimism.PutInfo(etcdTestCli, i11) c.Assert(err, IsNil) + stm, _, err := optimism.GetAllSourceTables(etcdTestCli) + c.Assert(err, IsNil) + o.tk.Init(stm) + ifm, _, err := optimism.GetAllInfo(etcdTestCli) c.Assert(err, IsNil) - lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm) + lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, nil) c.Assert(len(lockJoined), Equals, 1) c.Assert(len(lockTTS), Equals, 1) joined, ok := lockJoined[utils.GenDDLLockID(task, downSchema, downTable)] @@ -1165,3 +1169,66 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) { c.Assert(err, IsNil) c.Assert(cmp, Equals, 0) } + +func (t *testOptimist) TestBuildLockWithInitSchema(c *C) { + defer clearOptimistTestSourceInfoOperation(c) + + var ( + logger = log.L() + o = NewOptimist(&logger) + task = "task" + source1 = "mysql-replica-1" + source2 = "mysql-replica-2" + downSchema = "db" + downTable = "tbl" + st1 = optimism.NewSourceTables(task, source1) + st2 = optimism.NewSourceTables(task, source2) + p = parser.New() + se = mock.NewContext() + tblID = int64(111) + + ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY, b INT, c INT)`) + ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`) + ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY)`) + + ddlDropB = "ALTER TABLE bar DROP COLUMN b" + ddlDropC = "ALTER TABLE bar DROP COLUMN c" + infoDropB = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, []string{ddlDropC}, ti0, []*model.TableInfo{ti1}) + infoDropC = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, []string{ddlDropB}, ti1, []*model.TableInfo{ti2}) + initSchema = optimism.NewInitSchema(task, downSchema, downTable, ti0) + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + st1.AddTable("foo", "bar-1", downSchema, downTable) + st2.AddTable("foo", "bar-1", downSchema, downTable) + + c.Assert(o.Start(ctx, etcdTestCli), IsNil) + _, err := optimism.PutSourceTables(etcdTestCli, st1) + c.Assert(err, IsNil) + _, err = optimism.PutSourceTables(etcdTestCli, st2) + c.Assert(err, IsNil) + + _, err = optimism.PutInfo(etcdTestCli, infoDropB) + c.Assert(err, IsNil) + _, err = optimism.PutInfo(etcdTestCli, infoDropC) + c.Assert(err, IsNil) + + stm, _, err := optimism.GetAllSourceTables(etcdTestCli) + c.Assert(err, IsNil) + o.tk.Init(stm) + + ifm, _, err := optimism.GetAllInfo(etcdTestCli) + c.Assert(err, IsNil) + + initSchemas := map[string]map[string]map[string]optimism.InitSchema{task: {downSchema: {downTable: initSchema}}} + lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, initSchemas) + c.Assert(len(lockJoined), Equals, 1) + c.Assert(len(lockTTS), Equals, 1) + joined, ok := lockJoined[utils.GenDDLLockID(task, downSchema, downTable)] + c.Assert(ok, IsTrue) + cmp, err := joined.Compare(schemacmp.Encode(ti0)) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, 0) +} diff --git a/pkg/shardddl/optimism/schema.go b/pkg/shardddl/optimism/schema.go index 328d278cb5..2d81368718 100644 --- a/pkg/shardddl/optimism/schema.go +++ b/pkg/shardddl/optimism/schema.go @@ -89,6 +89,34 @@ func GetInitSchema(cli *clientv3.Client, task, downSchema, downTable string) (In return is, rev, nil } +// GetAllInitSchemas gets all init schemas from etcd. +// This function should often be called by DM-master. +// k/k/k/v: task-name -> downstream-schema-name -> downstream-table-name -> InitSchema. +func GetAllInitSchemas(cli *clientv3.Client) (map[string]map[string]map[string]InitSchema, int64, error) { + initSchemas := make(map[string]map[string]map[string]InitSchema) + op := clientv3.OpGet(common.ShardDDLOptimismInitSchemaKeyAdapter.Path(), clientv3.WithPrefix()) + respTxn, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, op) + if err != nil { + return nil, 0, err + } + resp := respTxn.Responses[0].GetResponseRange() + + for _, kv := range resp.Kvs { + schema, err := initSchemaFromJSON(string(kv.Value)) + if err != nil { + return nil, 0, err + } + if _, ok := initSchemas[schema.Task]; !ok { + initSchemas[schema.Task] = make(map[string]map[string]InitSchema) + } + if _, ok := initSchemas[schema.Task][schema.DownSchema]; !ok { + initSchemas[schema.Task][schema.DownSchema] = make(map[string]InitSchema) + } + initSchemas[schema.Task][schema.DownSchema][schema.DownTable] = schema + } + return initSchemas, rev, nil +} + // PutInitSchemaIfNotExist puts the InitSchema into ectd if no previous one exists. func PutInitSchemaIfNotExist(cli *clientv3.Client, is InitSchema) (rev int64, putted bool, err error) { value, err := is.toJSON() diff --git a/pkg/shardddl/optimism/schema_test.go b/pkg/shardddl/optimism/schema_test.go index 9fb0628f87..1fc7a97d2c 100644 --- a/pkg/shardddl/optimism/schema_test.go +++ b/pkg/shardddl/optimism/schema_test.go @@ -38,13 +38,16 @@ func (t *testForEtcd) TestInitSchemaEtcd(c *C) { task = "test-init-schema-etcd" downSchema = "foo" downTable = "bar" + downTable2 = "bar2" p = parser.New() se = mock.NewContext() tblID int64 = 111 tblI1 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar (id INT PRIMARY KEY)") tblI2 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)") + tblI3 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar2 (id INT PRIMARY KEY, c INT)") is1 = NewInitSchema(task, downSchema, downTable, tblI1) is2 = NewInitSchema(task, downSchema, downTable, tblI2) + is3 = NewInitSchema(task, downSchema, downTable2, tblI3) ) // try to get, but no one exists. @@ -75,15 +78,32 @@ func (t *testForEtcd) TestInitSchemaEtcd(c *C) { c.Assert(rev3, Equals, rev1) c.Assert(putted, IsFalse) - // delete the schema. - rev4, deleted, err := DeleteInitSchema(etcdTestCli, is1.Task, is1.DownSchema, is1.DownTable) + // put new init schema with different downstream info. + rev4, putted, err := PutInitSchemaIfNotExist(etcdTestCli, is3) c.Assert(err, IsNil) c.Assert(rev4, Greater, rev3) + c.Assert(putted, IsTrue) + + // get all init schemas. + initSchemas, rev5, err := GetAllInitSchemas(etcdTestCli) + c.Assert(err, IsNil) + c.Assert(rev5, Equals, rev4) + c.Assert(initSchemas[is1.Task][is1.DownSchema][is1.DownTable], DeepEquals, is1) + c.Assert(initSchemas[is3.Task][is3.DownSchema][is3.DownTable], DeepEquals, is3) + + // delete the schema. + rev6, deleted, err := DeleteInitSchema(etcdTestCli, is1.Task, is1.DownSchema, is1.DownTable) + c.Assert(err, IsNil) + c.Assert(rev6, Greater, rev5) + c.Assert(deleted, IsTrue) + rev7, deleted, err := DeleteInitSchema(etcdTestCli, is3.Task, is3.DownSchema, is3.DownTable) + c.Assert(err, IsNil) + c.Assert(rev7, Greater, rev6) c.Assert(deleted, IsTrue) // not exist now. - isc, rev5, err := GetInitSchema(etcdTestCli, task, downSchema, downTable) + initSchemas, rev8, err := GetAllInitSchemas(etcdTestCli) c.Assert(err, IsNil) - c.Assert(rev5, Equals, rev4) - c.Assert(isc.IsEmpty(), IsTrue) + c.Assert(rev8, Equals, rev7) + c.Assert(initSchemas, HasLen, 0) } diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index 34a9163e94..ebf00b6fb3 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -208,7 +208,7 @@ function check_log_contain_with_retry() { echo "check log contain failed $k-th time (file not exist), retry later" continue fi - got=`grep "$text" $log1 | wc -l` + got=`grep -a "$text" $log1 | wc -l` if [[ $got -ne 0 ]]; then rc=1 break @@ -219,7 +219,7 @@ function check_log_contain_with_retry() { echo "check log contain failed $k-th time (file not exist), retry later" continue fi - got=`grep "$text" $log2 | wc -l` + got=`grep -a "$text" $log2 | wc -l` if [[ $got -ne 0 ]]; then rc=1 break diff --git a/tests/shardddl3/run.sh b/tests/shardddl3/run.sh index af55e6a59c..30d4949436 100644 --- a/tests/shardddl3/run.sh +++ b/tests/shardddl3/run.sh @@ -1020,20 +1020,24 @@ function DM_DropAddColumn_CASE() { restart_master_on_pos $reset "1" - run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;" - run_sql_source1 "insert into ${shardddl1}.${tb1} values(4);" - - restart_master_on_pos $reset "2" - run_sql_source2 "alter table ${shardddl1}.${tb1} drop column c;" - run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(4,4);" - restart_master_on_pos $reset "3" + restart_master_on_pos $reset "2" # make sure column c is fully dropped in the downstream check_log_contain_with_retry 'finish to handle ddls in optimistic shard mode' $WORK_DIR/worker1/log/dm-worker.log check_log_contain_with_retry 'finish to handle ddls in optimistic shard mode' $WORK_DIR/worker2/log/dm-worker.log + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "show-ddl-locks" \ + "no DDL lock exists" 1 + + run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(5);" + + restart_master_on_pos $reset "3" + run_sql_source1 "alter table ${shardddl1}.${tb1} add column c int;" run_sql_source1 "insert into ${shardddl1}.${tb1} values(6,6);"