Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1551 to release-2.0 (#1553)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Apr 9, 2021
1 parent 7bcfad5 commit 1e23b34
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 28 deletions.
65 changes: 54 additions & 11 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
}
o.logger.Info("get history shard DDL lock operation", zap.Int64("revision", revOperation))

initSchemas, revInitSchemas, err := optimism.GetAllInitSchemas(o.cli)
if err != nil {
return 0, 0, 0, err
}
o.logger.Info("get all init schemas", zap.Int64("revision", revInitSchemas))

colm, _, err := optimism.GetAllDroppedColumns(o.cli)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
Expand All @@ -255,7 +261,7 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
}

// recover the shard DDL lock based on history shard DDL info & lock operation.
err = o.recoverLocks(ifm, opm, colm)
err = o.recoverLocks(ifm, opm, colm, initSchemas)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
// then these unexpected locks can be handled by the user.
Expand Down Expand Up @@ -287,29 +293,65 @@ func sortInfos(ifm map[string]map[string]map[string]map[string]optimism.Info) []
}

// buildLockJoinedAndTTS builds the joined table and the target table slice for each lock from the history infos.
func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]map[string]optimism.Info) (map[string]schemacmp.Table, map[string][]optimism.TargetTable) {
lockJoined := make(map[string]schemacmp.Table)
lockTTS := make(map[string][]optimism.TargetTable)
func (o *Optimist) buildLockJoinedAndTTS(
ifm map[string]map[string]map[string]map[string]optimism.Info,
initSchemas map[string]map[string]map[string]optimism.InitSchema) (
map[string]schemacmp.Table, map[string][]optimism.TargetTable) {

type infoKey struct {
lockID string
source string
upSchema string
upTable string
}
infos := make(map[infoKey]optimism.Info)
lockTTS := make(map[string][]optimism.TargetTable)
for _, taskInfos := range ifm {
for _, sourceInfos := range taskInfos {
for _, schemaInfos := range sourceInfos {
for _, info := range schemaInfos {
lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable)
if _, ok := lockTTS[lockID]; !ok {
lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
}
infos[infoKey{lockID, info.Source, info.UpSchema, info.UpTable}] = info
}
}
}
}

lockJoined := make(map[string]schemacmp.Table)
for lockID, tts := range lockTTS {
for _, tt := range tts {
for upSchema, tables := range tt.UpTables {
for upTable := range tables {
var table schemacmp.Table
if info, ok := infos[infoKey{lockID, tt.Source, upSchema, upTable}]; ok && info.TableInfoBefore != nil {
table = schemacmp.Encode(info.TableInfoBefore)
} else if initSchema, ok := initSchemas[tt.Task][tt.DownSchema][tt.DownTable]; ok {
// If there is no optimism.Info for an upstream table, it indicates that the table structure
// hasn't been changed since the last removeLock, so the init schema should be its table info.
table = schemacmp.Encode(initSchema.TableInfo)
} else {
o.logger.Error(
"can not find table info for upstream table",
zap.String("source", tt.Source),
zap.String("up-schema", upSchema),
zap.String("up-table", upTable),
)
continue
}
if joined, ok := lockJoined[lockID]; !ok {
lockJoined[lockID] = schemacmp.Encode(info.TableInfoBefore)
lockJoined[lockID] = table
} else {
newJoined, err := joined.Join(schemacmp.Encode(info.TableInfoBefore))
newJoined, err := joined.Join(table)
// ignore error, will report it in TrySync later
if err != nil {
o.logger.Error(fmt.Sprintf("fail to join table info %s with %s, lockID: %s in recover lock", joined, newJoined, lockID), log.ShortError(err))
} else {
lockJoined[lockID] = newJoined
}
}
if _, ok := lockTTS[lockID]; !ok {
lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
}
}
}
}
Expand All @@ -321,10 +363,11 @@ func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]ma
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation,
colm map[string]map[string]map[string]map[string]map[string]struct{}) error {
colm map[string]map[string]map[string]map[string]map[string]struct{},
initSchemas map[string]map[string]map[string]optimism.InitSchema) error {
// construct joined table based on the shard DDL info.
o.logger.Info("build lock joined and tts")
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, initSchemas)
// build lock and restore table info
o.logger.Info("rebuild locks and tables")
o.lk.RebuildLocksAndTables(o.cli, ifm, colm, lockJoined, lockTTS)
Expand Down
73 changes: 70 additions & 3 deletions dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,8 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

st1.AddTable("db", "tbl-1", downSchema, downTable)
st2.AddTable("db", "tbl-1", downSchema, downTable)
st1.AddTable("foo", "bar-1", downSchema, downTable)
st2.AddTable("foo", "bar-1", downSchema, downTable)

c.Assert(o.Start(ctx, etcdTestCli), IsNil)
_, err := optimism.PutSourceTables(etcdTestCli, st1)
Expand All @@ -1153,10 +1153,14 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) {
_, err = optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)

stm, _, err := optimism.GetAllSourceTables(etcdTestCli)
c.Assert(err, IsNil)
o.tk.Init(stm)

ifm, _, err := optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)

lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, nil)
c.Assert(len(lockJoined), Equals, 1)
c.Assert(len(lockTTS), Equals, 1)
joined, ok := lockJoined[utils.GenDDLLockID(task, downSchema, downTable)]
Expand All @@ -1165,3 +1169,66 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) {
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 0)
}

// TestBuildLockWithInitSchema exercises buildLockJoinedAndTTS when an init
// schema is supplied alongside the history infos.
//
// Two sources each register the upstream table foo.bar-1 for the same
// downstream table, but infos are only written for source1. The test expects
// exactly one lock to be rebuilt and its joined table to compare equal to the
// init schema's table info (ti0).
func (t *testOptimist) TestBuildLockWithInitSchema(c *C) {
	defer clearOptimistTestSourceInfoOperation(c)

	const (
		task       = "task"
		source1    = "mysql-replica-1"
		source2    = "mysql-replica-2"
		downSchema = "db"
		downTable  = "tbl"
		ddlDropB   = "ALTER TABLE bar DROP COLUMN b"
		ddlDropC   = "ALTER TABLE bar DROP COLUMN c"

		tblID int64 = 111
	)

	logger := log.L()
	o := NewOptimist(&logger)
	st1 := optimism.NewSourceTables(task, source1)
	st2 := optimism.NewSourceTables(task, source2)
	p := parser.New()
	se := mock.NewContext()

	// Table shapes: ti0 (a, b, c) -> ti1 (a, b) -> ti2 (a).
	ti0 := createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY, b INT, c INT)`)
	ti1 := createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`)
	ti2 := createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY)`)

	// Both infos describe the same upstream table of source1:
	// first column c is dropped (ti0 -> ti1), then column b (ti1 -> ti2).
	dropCInfo := optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, []string{ddlDropC}, ti0, []*model.TableInfo{ti1})
	dropBInfo := optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, []string{ddlDropB}, ti1, []*model.TableInfo{ti2})
	initSchema := optimism.NewInitSchema(task, downSchema, downTable, ti0)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	st1.AddTable("foo", "bar-1", downSchema, downTable)
	st2.AddTable("foo", "bar-1", downSchema, downTable)

	c.Assert(o.Start(ctx, etcdTestCli), IsNil)

	// Persist the source tables of both sources.
	_, err := optimism.PutSourceTables(etcdTestCli, st1)
	c.Assert(err, IsNil)
	_, err = optimism.PutSourceTables(etcdTestCli, st2)
	c.Assert(err, IsNil)

	// Persist the infos in the order the DDLs happened.
	_, err = optimism.PutInfo(etcdTestCli, dropCInfo)
	c.Assert(err, IsNil)
	_, err = optimism.PutInfo(etcdTestCli, dropBInfo)
	c.Assert(err, IsNil)

	// Load the source tables back and feed them to the table keeper.
	stm, _, err := optimism.GetAllSourceTables(etcdTestCli)
	c.Assert(err, IsNil)
	o.tk.Init(stm)

	ifm, _, err := optimism.GetAllInfo(etcdTestCli)
	c.Assert(err, IsNil)

	// task -> downstream schema -> downstream table -> init schema.
	initSchemas := map[string]map[string]map[string]optimism.InitSchema{
		task: {
			downSchema: {
				downTable: initSchema,
			},
		},
	}

	lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, initSchemas)
	c.Assert(len(lockJoined), Equals, 1)
	c.Assert(len(lockTTS), Equals, 1)

	lockID := utils.GenDDLLockID(task, downSchema, downTable)
	joined, ok := lockJoined[lockID]
	c.Assert(ok, IsTrue)

	// The joined table must be identical to the init schema's table.
	cmp, err := joined.Compare(schemacmp.Encode(ti0))
	c.Assert(err, IsNil)
	c.Assert(cmp, Equals, 0)
}
28 changes: 28 additions & 0 deletions pkg/shardddl/optimism/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ func GetInitSchema(cli *clientv3.Client, task, downSchema, downTable string) (In
return is, rev, nil
}

// GetAllInitSchemas reads every InitSchema stored in etcd under the
// ShardDDLOptimismInitSchemaKeyAdapter key prefix.
// This function should often be called by DM-master.
//
// The result is a nested map (k/k/k/v):
// task-name -> downstream-schema-name -> downstream-table-name -> InitSchema.
// The int64 result is the revision returned by etcdutil.DoOpsInOneTxnWithRetry.
// If the etcd request fails, or any stored value cannot be decoded,
// it returns (nil, 0, err).
func GetAllInitSchemas(cli *clientv3.Client) (map[string]map[string]map[string]InitSchema, int64, error) {
	getOp := clientv3.OpGet(common.ShardDDLOptimismInitSchemaKeyAdapter.Path(), clientv3.WithPrefix())
	txnResp, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, getOp)
	if err != nil {
		return nil, 0, err
	}

	all := make(map[string]map[string]map[string]InitSchema)
	for _, kv := range txnResp.Responses[0].GetResponseRange().Kvs {
		is, decodeErr := initSchemaFromJSON(string(kv.Value))
		if decodeErr != nil {
			return nil, 0, decodeErr
		}

		// Lazily create the intermediate levels of the nested map.
		bySchema, found := all[is.Task]
		if !found {
			bySchema = make(map[string]map[string]InitSchema)
			all[is.Task] = bySchema
		}
		byTable, found := bySchema[is.DownSchema]
		if !found {
			byTable = make(map[string]InitSchema)
			bySchema[is.DownSchema] = byTable
		}
		byTable[is.DownTable] = is
	}
	return all, rev, nil
}

// PutInitSchemaIfNotExist puts the InitSchema into etcd if no previous one exists.
func PutInitSchemaIfNotExist(cli *clientv3.Client, is InitSchema) (rev int64, putted bool, err error) {
value, err := is.toJSON()
Expand Down
30 changes: 25 additions & 5 deletions pkg/shardddl/optimism/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ func (t *testForEtcd) TestInitSchemaEtcd(c *C) {
task = "test-init-schema-etcd"
downSchema = "foo"
downTable = "bar"
downTable2 = "bar2"
p = parser.New()
se = mock.NewContext()
tblID int64 = 111
tblI1 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar (id INT PRIMARY KEY)")
tblI2 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)")
tblI3 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar2 (id INT PRIMARY KEY, c INT)")
is1 = NewInitSchema(task, downSchema, downTable, tblI1)
is2 = NewInitSchema(task, downSchema, downTable, tblI2)
is3 = NewInitSchema(task, downSchema, downTable2, tblI3)
)

// try to get, but no one exists.
Expand Down Expand Up @@ -75,15 +78,32 @@ func (t *testForEtcd) TestInitSchemaEtcd(c *C) {
c.Assert(rev3, Equals, rev1)
c.Assert(putted, IsFalse)

// delete the schema.
rev4, deleted, err := DeleteInitSchema(etcdTestCli, is1.Task, is1.DownSchema, is1.DownTable)
// put new init schema with different downstream info.
rev4, putted, err := PutInitSchemaIfNotExist(etcdTestCli, is3)
c.Assert(err, IsNil)
c.Assert(rev4, Greater, rev3)
c.Assert(putted, IsTrue)

// get all init schemas.
initSchemas, rev5, err := GetAllInitSchemas(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, rev4)
c.Assert(initSchemas[is1.Task][is1.DownSchema][is1.DownTable], DeepEquals, is1)
c.Assert(initSchemas[is3.Task][is3.DownSchema][is3.DownTable], DeepEquals, is3)

// delete the schema.
rev6, deleted, err := DeleteInitSchema(etcdTestCli, is1.Task, is1.DownSchema, is1.DownTable)
c.Assert(err, IsNil)
c.Assert(rev6, Greater, rev5)
c.Assert(deleted, IsTrue)
rev7, deleted, err := DeleteInitSchema(etcdTestCli, is3.Task, is3.DownSchema, is3.DownTable)
c.Assert(err, IsNil)
c.Assert(rev7, Greater, rev6)
c.Assert(deleted, IsTrue)

// not exist now.
isc, rev5, err := GetInitSchema(etcdTestCli, task, downSchema, downTable)
initSchemas, rev8, err := GetAllInitSchemas(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, rev4)
c.Assert(isc.IsEmpty(), IsTrue)
c.Assert(rev8, Equals, rev7)
c.Assert(initSchemas, HasLen, 0)
}
4 changes: 2 additions & 2 deletions tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ function check_log_contain_with_retry() {
echo "check log contain failed $k-th time (file not exist), retry later"
continue
fi
got=`grep "$text" $log1 | wc -l`
got=`grep -a "$text" $log1 | wc -l`
if [[ $got -ne 0 ]]; then
rc=1
break
Expand All @@ -219,7 +219,7 @@ function check_log_contain_with_retry() {
echo "check log contain failed $k-th time (file not exist), retry later"
continue
fi
got=`grep "$text" $log2 | wc -l`
got=`grep -a "$text" $log2 | wc -l`
if [[ $got -ne 0 ]]; then
rc=1
break
Expand Down
18 changes: 11 additions & 7 deletions tests/shardddl3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2464,20 +2464,24 @@ function DM_DropAddColumn_CASE() {

restart_master_on_pos $reset "1"

run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(4);"

restart_master_on_pos $reset "2"

run_sql_source2 "alter table ${shardddl1}.${tb1} drop column c;"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(4,4);"

restart_master_on_pos $reset "3"
restart_master_on_pos $reset "2"

# make sure column c is fully dropped in the downstream
check_log_contain_with_retry 'finish to handle ddls in optimistic shard mode' $WORK_DIR/worker1/log/dm-worker.log
check_log_contain_with_retry 'finish to handle ddls in optimistic shard mode' $WORK_DIR/worker2/log/dm-worker.log

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"show-ddl-locks" \
"no DDL lock exists" 1

run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(5);"

restart_master_on_pos $reset "3"

run_sql_source1 "alter table ${shardddl1}.${tb1} add column c int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(6,6);"

Expand Down

0 comments on commit 1e23b34

Please sign in to comment.