Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

shardddl/optimistic: use init-schema for table without info when recover lock #1551

Merged
merged 5 commits into from
Apr 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 54 additions & 11 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
}
o.logger.Info("get history shard DDL lock operation", zap.Int64("revision", revOperation))

initSchemas, revInitSchemas, err := optimism.GetAllInitSchemas(o.cli)
if err != nil {
return 0, 0, 0, err
}
o.logger.Info("get all init schemas", zap.Int64("revision", revInitSchemas))

colm, _, err := optimism.GetAllDroppedColumns(o.cli)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
Expand All @@ -255,7 +261,7 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
}

// recover the shard DDL lock based on history shard DDL info & lock operation.
err = o.recoverLocks(ifm, opm, colm)
err = o.recoverLocks(ifm, opm, colm, initSchemas)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
// then these unexpected locks can be handled by the user.
Expand Down Expand Up @@ -287,29 +293,65 @@ func sortInfos(ifm map[string]map[string]map[string]map[string]optimism.Info) []
}

// buildLockJoinedAndTTS build joined table and target table slice for lock by history infos
func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]map[string]optimism.Info) (map[string]schemacmp.Table, map[string][]optimism.TargetTable) {
lockJoined := make(map[string]schemacmp.Table)
lockTTS := make(map[string][]optimism.TargetTable)
func (o *Optimist) buildLockJoinedAndTTS(
ifm map[string]map[string]map[string]map[string]optimism.Info,
initSchemas map[string]map[string]map[string]optimism.InitSchema) (
map[string]schemacmp.Table, map[string][]optimism.TargetTable) {

type infoKey struct {
lockID string
source string
upSchema string
upTable string
}
infos := make(map[infoKey]optimism.Info)
lockTTS := make(map[string][]optimism.TargetTable)
for _, taskInfos := range ifm {
for _, sourceInfos := range taskInfos {
for _, schemaInfos := range sourceInfos {
for _, info := range schemaInfos {
lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable)
if _, ok := lockTTS[lockID]; !ok {
lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
}
infos[infoKey{lockID, info.Source, info.UpSchema, info.UpTable}] = info
}
}
}
}

lockJoined := make(map[string]schemacmp.Table)
for lockID, tts := range lockTTS {
for _, tt := range tts {
for upSchema, tables := range tt.UpTables {
for upTable := range tables {
var table schemacmp.Table
if info, ok := infos[infoKey{lockID, tt.Source, upSchema, upTable}]; ok && info.TableInfoBefore != nil {
table = schemacmp.Encode(info.TableInfoBefore)
} else if initSchema, ok := initSchemas[tt.Task][tt.DownSchema][tt.DownTable]; ok {
// If there is no optimism.Info for a upstream table, it indicates the table structure
// hasn't been changed since last removeLock. So the init schema should be its table info.
table = schemacmp.Encode(initSchema.TableInfo)
} else {
o.logger.Error(
"can not find table info for upstream table",
zap.String("source", tt.Source),
zap.String("up-schema", upSchema),
zap.String("up-table", upTable),
)
continue
}
if joined, ok := lockJoined[lockID]; !ok {
lockJoined[lockID] = schemacmp.Encode(info.TableInfoBefore)
lockJoined[lockID] = table
} else {
newJoined, err := joined.Join(schemacmp.Encode(info.TableInfoBefore))
newJoined, err := joined.Join(table)
// ignore error, will report it in TrySync later
if err != nil {
o.logger.Error(fmt.Sprintf("fail to join table info %s with %s, lockID: %s in recover lock", joined, newJoined, lockID), log.ShortError(err))
} else {
lockJoined[lockID] = newJoined
}
}
if _, ok := lockTTS[lockID]; !ok {
lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
}
}
}
}
Expand All @@ -321,10 +363,11 @@ func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]ma
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation,
colm map[string]map[string]map[string]map[string]map[string]struct{}) error {
colm map[string]map[string]map[string]map[string]map[string]struct{},
initSchemas map[string]map[string]map[string]optimism.InitSchema) error {
// construct joined table based on the shard DDL info.
o.logger.Info("build lock joined and tts")
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, initSchemas)
// build lock and restore table info
o.logger.Info("rebuild locks and tables")
o.lk.RebuildLocksAndTables(o.cli, ifm, colm, lockJoined, lockTTS)
Expand Down
73 changes: 70 additions & 3 deletions dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,8 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

st1.AddTable("db", "tbl-1", downSchema, downTable)
st2.AddTable("db", "tbl-1", downSchema, downTable)
st1.AddTable("foo", "bar-1", downSchema, downTable)
st2.AddTable("foo", "bar-1", downSchema, downTable)

c.Assert(o.Start(ctx, etcdTestCli), IsNil)
_, err := optimism.PutSourceTables(etcdTestCli, st1)
Expand All @@ -1153,10 +1153,14 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) {
_, err = optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)

stm, _, err := optimism.GetAllSourceTables(etcdTestCli)
c.Assert(err, IsNil)
o.tk.Init(stm)

ifm, _, err := optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)

lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, nil)
c.Assert(len(lockJoined), Equals, 1)
c.Assert(len(lockTTS), Equals, 1)
joined, ok := lockJoined[utils.GenDDLLockID(task, downSchema, downTable)]
Expand All @@ -1165,3 +1169,66 @@ func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) {
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 0)
}

func (t *testOptimist) TestBuildLockWithInitSchema(c *C) {
defer clearOptimistTestSourceInfoOperation(c)

var (
logger = log.L()
o = NewOptimist(&logger)
task = "task"
source1 = "mysql-replica-1"
source2 = "mysql-replica-2"
downSchema = "db"
downTable = "tbl"
st1 = optimism.NewSourceTables(task, source1)
st2 = optimism.NewSourceTables(task, source2)
p = parser.New()
se = mock.NewContext()
tblID = int64(111)

ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY, b INT, c INT)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (a INT PRIMARY KEY)`)

ddlDropB = "ALTER TABLE bar DROP COLUMN b"
ddlDropC = "ALTER TABLE bar DROP COLUMN c"
infoDropB = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, []string{ddlDropC}, ti0, []*model.TableInfo{ti1})
infoDropC = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, []string{ddlDropB}, ti1, []*model.TableInfo{ti2})
initSchema = optimism.NewInitSchema(task, downSchema, downTable, ti0)
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

st1.AddTable("foo", "bar-1", downSchema, downTable)
st2.AddTable("foo", "bar-1", downSchema, downTable)

c.Assert(o.Start(ctx, etcdTestCli), IsNil)
_, err := optimism.PutSourceTables(etcdTestCli, st1)
c.Assert(err, IsNil)
_, err = optimism.PutSourceTables(etcdTestCli, st2)
c.Assert(err, IsNil)

_, err = optimism.PutInfo(etcdTestCli, infoDropB)
c.Assert(err, IsNil)
_, err = optimism.PutInfo(etcdTestCli, infoDropC)
c.Assert(err, IsNil)

stm, _, err := optimism.GetAllSourceTables(etcdTestCli)
c.Assert(err, IsNil)
o.tk.Init(stm)

ifm, _, err := optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)

initSchemas := map[string]map[string]map[string]optimism.InitSchema{task: {downSchema: {downTable: initSchema}}}
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm, initSchemas)
c.Assert(len(lockJoined), Equals, 1)
c.Assert(len(lockTTS), Equals, 1)
joined, ok := lockJoined[utils.GenDDLLockID(task, downSchema, downTable)]
c.Assert(ok, IsTrue)
cmp, err := joined.Compare(schemacmp.Encode(ti0))
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 0)
}
28 changes: 28 additions & 0 deletions pkg/shardddl/optimism/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ func GetInitSchema(cli *clientv3.Client, task, downSchema, downTable string) (In
return is, rev, nil
}

// GetAllInitSchemas gets all init schemas from etcd.
// This function should often be called by DM-master.
// k/k/k/v: task-name -> downstream-schema-name -> downstream-table-name -> InitSchema.
func GetAllInitSchemas(cli *clientv3.Client) (map[string]map[string]map[string]InitSchema, int64, error) {
initSchemas := make(map[string]map[string]map[string]InitSchema)
op := clientv3.OpGet(common.ShardDDLOptimismInitSchemaKeyAdapter.Path(), clientv3.WithPrefix())
respTxn, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, op)
if err != nil {
return nil, 0, err
}
resp := respTxn.Responses[0].GetResponseRange()

for _, kv := range resp.Kvs {
schema, err := initSchemaFromJSON(string(kv.Value))
if err != nil {
return nil, 0, err
}
if _, ok := initSchemas[schema.Task]; !ok {
initSchemas[schema.Task] = make(map[string]map[string]InitSchema)
}
if _, ok := initSchemas[schema.Task][schema.DownSchema]; !ok {
initSchemas[schema.Task][schema.DownSchema] = make(map[string]InitSchema)
}
initSchemas[schema.Task][schema.DownSchema][schema.DownTable] = schema
}
return initSchemas, rev, nil
}

// PutInitSchemaIfNotExist puts the InitSchema into ectd if no previous one exists.
func PutInitSchemaIfNotExist(cli *clientv3.Client, is InitSchema) (rev int64, putted bool, err error) {
value, err := is.toJSON()
Expand Down
30 changes: 25 additions & 5 deletions pkg/shardddl/optimism/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ func (t *testForEtcd) TestInitSchemaEtcd(c *C) {
task = "test-init-schema-etcd"
downSchema = "foo"
downTable = "bar"
downTable2 = "bar2"
p = parser.New()
se = mock.NewContext()
tblID int64 = 111
tblI1 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar (id INT PRIMARY KEY)")
tblI2 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)")
tblI3 = createTableInfo(c, p, se, tblID, "CREATE TABLE bar2 (id INT PRIMARY KEY, c INT)")
is1 = NewInitSchema(task, downSchema, downTable, tblI1)
is2 = NewInitSchema(task, downSchema, downTable, tblI2)
is3 = NewInitSchema(task, downSchema, downTable2, tblI3)
)

// try to get, but no one exists.
Expand Down Expand Up @@ -75,15 +78,32 @@ func (t *testForEtcd) TestInitSchemaEtcd(c *C) {
c.Assert(rev3, Equals, rev1)
c.Assert(putted, IsFalse)

// delete the schema.
rev4, deleted, err := DeleteInitSchema(etcdTestCli, is1.Task, is1.DownSchema, is1.DownTable)
// put new init schema with different downstream info.
rev4, putted, err := PutInitSchemaIfNotExist(etcdTestCli, is3)
c.Assert(err, IsNil)
c.Assert(rev4, Greater, rev3)
c.Assert(putted, IsTrue)

// get all init schemas.
initSchemas, rev5, err := GetAllInitSchemas(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, rev4)
c.Assert(initSchemas[is1.Task][is1.DownSchema][is1.DownTable], DeepEquals, is1)
c.Assert(initSchemas[is3.Task][is3.DownSchema][is3.DownTable], DeepEquals, is3)

// delete the schema.
rev6, deleted, err := DeleteInitSchema(etcdTestCli, is1.Task, is1.DownSchema, is1.DownTable)
c.Assert(err, IsNil)
c.Assert(rev6, Greater, rev5)
c.Assert(deleted, IsTrue)
rev7, deleted, err := DeleteInitSchema(etcdTestCli, is3.Task, is3.DownSchema, is3.DownTable)
c.Assert(err, IsNil)
c.Assert(rev7, Greater, rev6)
c.Assert(deleted, IsTrue)

// not exist now.
isc, rev5, err := GetInitSchema(etcdTestCli, task, downSchema, downTable)
initSchemas, rev8, err := GetAllInitSchemas(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, rev4)
c.Assert(isc.IsEmpty(), IsTrue)
c.Assert(rev8, Equals, rev7)
c.Assert(initSchemas, HasLen, 0)
}
4 changes: 2 additions & 2 deletions tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ function check_log_contain_with_retry() {
echo "check log contain failed $k-th time (file not exist), retry later"
continue
fi
got=`grep "$text" $log1 | wc -l`
got=`grep -a "$text" $log1 | wc -l`
if [[ $got -ne 0 ]]; then
rc=1
break
Expand All @@ -219,7 +219,7 @@ function check_log_contain_with_retry() {
echo "check log contain failed $k-th time (file not exist), retry later"
continue
fi
got=`grep "$text" $log2 | wc -l`
got=`grep -a "$text" $log2 | wc -l`
if [[ $got -ne 0 ]]; then
rc=1
break
Expand Down
18 changes: 11 additions & 7 deletions tests/shardddl3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1020,20 +1020,24 @@ function DM_DropAddColumn_CASE() {

restart_master_on_pos $reset "1"

run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(4);"

restart_master_on_pos $reset "2"

run_sql_source2 "alter table ${shardddl1}.${tb1} drop column c;"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(4,4);"

restart_master_on_pos $reset "3"
restart_master_on_pos $reset "2"

# make sure column c is fully dropped in the downstream
check_log_contain_with_retry 'finish to handle ddls in optimistic shard mode' $WORK_DIR/worker1/log/dm-worker.log
check_log_contain_with_retry 'finish to handle ddls in optimistic shard mode' $WORK_DIR/worker2/log/dm-worker.log

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we remove above check_log_contain_with_retry?

Copy link
Contributor Author

@sleepymole sleepymole Apr 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems no, no DDL lock exists doesn't guarantee that master received info and synced drop column c to the downstream, probably master didn't received any ddl infos.

"show-ddl-locks" \
"no DDL lock exists" 1

run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(5);"

restart_master_on_pos $reset "3"

run_sql_source1 "alter table ${shardddl1}.${tb1} add column c int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(6,6);"

Expand Down