Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

shardddl/optimistic: warn add not fully dropped columns #1510

Merged
merged 34 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
05c18b1
add unit tests and partially dropped columns without etcd
lichunzhu Mar 11, 2021
d774402
add etcd info and integration test
lichunzhu Mar 15, 2021
233fe73
Merge branch 'master' into warnDropAddColumn
lichunzhu Mar 15, 2021
01f6292
fix hound
lichunzhu Mar 15, 2021
5f0cb3a
Merge branch 'warnDropAddColumn' of https://github.com/lichunzhu/dm i…
lichunzhu Mar 15, 2021
1677848
fix lint
lichunzhu Mar 15, 2021
4bcda8a
fix integration tests and unit tests
lichunzhu Mar 16, 2021
521520d
Merge branch 'master' into warnDropAddColumn
GMHDBJD Mar 17, 2021
cc29652
fix unit test and integration test
lichunzhu Mar 18, 2021
bc42b83
merge master and resolve conflicts
lichunzhu Mar 18, 2021
7f28b7d
merge master and resolve conflicts
lichunzhu Mar 18, 2021
9d165de
fix ut
lichunzhu Mar 18, 2021
86686e2
Merge branch 'warnDropAddColumn' of https://github.com/lichunzhu/dm i…
lichunzhu Mar 18, 2021
bd6cc21
fix lint
lichunzhu Mar 18, 2021
2fdb868
fix test
lichunzhu Mar 18, 2021
fb5857a
address more comments
lichunzhu Mar 18, 2021
baee6fa
address comment
lichunzhu Mar 18, 2021
10f3dcf
address comments
lichunzhu Mar 19, 2021
62309ad
fix uts
lichunzhu Mar 19, 2021
229e3df
Merge branch 'master' into warnDropAddColumn
lichunzhu Mar 19, 2021
048f627
Apply suggestions from code review
lichunzhu Mar 22, 2021
f354a16
Merge branch 'master' into warnDropAddColumn
GMHDBJD Mar 23, 2021
814ea09
address comments
lichunzhu Mar 24, 2021
129813b
Merge branch 'master' into warnDropAddColumn
lichunzhu Mar 24, 2021
d982ac4
fix terror
lichunzhu Mar 24, 2021
c247b17
Merge branch 'master' into warnDropAddColumn
GMHDBJD Mar 25, 2021
56fb28b
change ModRevision to Revision
lichunzhu Mar 26, 2021
f331643
merge master and resolve conflicts
lichunzhu Mar 26, 2021
eb54917
Merge branch 'warnDropAddColumn' of https://github.com/lichunzhu/dm i…
lichunzhu Mar 26, 2021
bfb7a17
delete colm in lockKeeper
lichunzhu Mar 26, 2021
235776f
refine some logs
lichunzhu Mar 26, 2021
2efb0cf
Update pkg/shardddl/optimism/lock.go
lichunzhu Mar 26, 2021
05b2c20
fix unit test etcd cluster race usage
lichunzhu Mar 26, 2021
213e07f
fix again
lichunzhu Mar 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:l
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high], "Message: process unit not waiting for sharding DDL to sync"
ErrSyncerUnitGenBAList,[code=36060:class=sync-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high], "Message: fail to handle ddl job for %s"
ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high], "Message: fail to handle shard ddl %v in optimistic mode, because schema conflict detected, Workaround: Please use show-ddl-locks command for more details."
ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high], "Message: fail to handle shard ddl %v in optimistic mode, because schema conflict detected, conflict error: %s, Workaround: Please use show-ddl-locks command for more details."
ErrSyncerFailpoint,[code=36063:class=sync-unit:scope=internal:level=low], "Message: failpoint specified error"
ErrSyncerReplaceEvent,[code=36064:class=sync-unit:scope=internal:level=high]
ErrSyncerOperatorNotExist,[code=36065:class=sync-unit:scope=internal:level=low], "Message: error operator not exist, position: %s"
Expand Down
12 changes: 11 additions & 1 deletion dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ var (
// ShardDDLOptimismInitSchemaKeyAdapter is used to store the initial schema (before constructed the lock) of merged tables.
// k/v: Encode(task-name, downstream-schema-name, downstream-table-name) -> table schema.
ShardDDLOptimismInitSchemaKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/init-schema/")
// ShardDDLOptimismDroppedColumnsKeyAdapter is used to store the columns that are not fully dropped
// k/v: Encode(task-name, downstream-schema-name, downstream-table-name, column-name, source-id, upstream-schema-name, upstream-table-name) -> empty
// If we don't identify different upstream tables, we may report an error for tb2 in the following case.
// Time series: (+a/-a means add/drop column a)
// older ----------------> newer
// tb1: +a +b +c -c
// tb2: +a +b +c
// tb3: +a +b +c
ShardDDLOptimismDroppedColumnsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/dropped-columns/")
)

func keyAdapterKeysLen(s KeyAdapter) int {
Expand All @@ -95,7 +104,8 @@ func keyAdapterKeysLen(s KeyAdapter) int {
return 3
case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter:
return 4

case ShardDDLOptimismDroppedColumnsKeyAdapter:
return 7
}
return -1
}
Expand Down
6 changes: 3 additions & 3 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,15 +634,15 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
tiBefore = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
tiAfter1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
info1 = optimism.NewInfo(taskName, sources[0], "foo-1", "bar-1", schema, table, DDLs1, tiBefore, []*model.TableInfo{tiAfter1})
op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, false)
op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, "", false)
)

st1.AddTable("foo-1", "bar-1", schema, table)
_, err = optimism.PutSourceTables(etcdTestCli, st1)
c.Assert(err, check.IsNil)
_, err = optimism.PutInfo(etcdTestCli, info1)
c.Assert(err, check.IsNil)
_, succ, err = optimism.PutOperation(etcdTestCli, false, op1)
_, succ, err = optimism.PutOperation(etcdTestCli, false, op1, 0)
c.Assert(succ, check.IsTrue)
c.Assert(err, check.IsNil)

Expand Down Expand Up @@ -1471,7 +1471,7 @@ func (t *testMaster) TestOfflineMember(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(listResp.Members, check.HasLen, 3)

// make sure s3 is not the leader, otherwise it will take some time to campain a new leader after close s3, and it may cause timeout
// make sure s3 is not the leader, otherwise it will take some time to campaign a new leader after close s3, and it may cause timeout
c.Assert(utils.WaitSomething(20, 500*time.Millisecond, func() bool {
_, leaderID, _, err = s1.election.LeaderInfo(ctx)
if err != nil {
Expand Down
108 changes: 67 additions & 41 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,21 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
}
o.logger.Info("get history shard DDL lock operation", zap.Int64("revision", revOperation))

colm, _, err := optimism.GetAllDroppedColumns(o.cli)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
// then these unexpected columns can be handled by the user.
o.logger.Error("fail to recover colms", log.ShortError(err))
}

// recover the shard DDL lock based on history shard DDL info & lock operation.
err = o.recoverLocks(ifm, opm)
err = o.recoverLocks(ifm, opm, colm)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
// then these unexpected locks can be handled by the user.
o.logger.Error("fail to recover locks", log.ShortError(err))
}

return revSource, revInfo, revOperation, nil
}

Expand Down Expand Up @@ -312,26 +320,30 @@ func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]ma
// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation) error {
opm map[string]map[string]map[string]map[string]optimism.Operation,
colm map[string]map[string]map[string]map[string]map[string]struct{}) error {
// construct joined table based on the shard DDL info.
o.logger.Info("build lock joined and tts")
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
// build lock and restore table info
o.logger.Info("rebuild locks and tables")
o.lk.RebuildLocksAndTables(ifm, lockJoined, lockTTS)
o.lk.RebuildLocksAndTables(o.cli, ifm, colm, lockJoined, lockTTS)
// sort infos by revision
infos := sortInfos(ifm)
var firstErr error
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
setFirstErr := func(err error) {
if firstErr == nil && err != nil {
firstErr = err
}
}

for _, info := range infos {
tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
_, _, err := o.lk.TrySync(info, tts)
if err != nil {
return err
}
// never mark the lock operation from `done` to `not-done` when recovering.
err = o.handleLock(info, tts, true)
err := o.handleInfo(info, true)
if err != nil {
return err
o.logger.Error("fail to handle info while recovering locks", zap.Error(err))
setFirstErr(err)
continue
}
}

Expand All @@ -347,12 +359,17 @@ func (o *Optimist) recoverLocks(
}
if op.Done {
lock.TryMarkDone(op.Source, op.UpSchema, op.UpTable)
err := lock.DeleteColumnsByDDLs(op.DDLs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hope we could left more comment, I think I might forget the logic somedays later

if DM-master sent a DROP COLUMN DDL, all shard tables had dropped that column and got synced. So we delete it from paritially dropped columns

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added under this function's definition.

if err != nil {
o.logger.Error("fail to update lock columns", zap.Error(err))
continue
}
}
}
}
}
}
return nil
return firstErr
}

// watchSourceInfoOperation watches the etcd operation for source tables, shard DDL infos and shard DDL operations.
Expand Down Expand Up @@ -394,7 +411,7 @@ func (o *Optimist) watchSourceInfoOperation(
}()
go func() {
defer wg.Done()
o.handleInfo(ctx, infoCh)
o.handleInfoPut(ctx, infoCh)
}()

// watch for the shard DDL lock operation and handle them.
Expand Down Expand Up @@ -437,8 +454,8 @@ func (o *Optimist) handleSourceTables(ctx context.Context, sourceCh <-chan optim
}
}

// handleInfo handles PUT and DELETE for the shard DDL info.
func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info) {
// handleInfoPut handles PUT and DELETE for the shard DDL info.
func (o *Optimist) handleInfoPut(ctx context.Context, infoCh <-chan optimism.Info) {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -470,35 +487,38 @@ func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info)
continue
}

added := o.tk.AddTable(info.Task, info.Source, info.UpSchema, info.UpTable, info.DownSchema, info.DownTable)
o.logger.Debug("a table added for info", zap.Bool("added", added), zap.Stringer("info", info))

tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
if tts == nil {
// WATCH for SourceTables may fall behind WATCH for Info although PUT earlier,
// so we try to get SourceTables again.
// NOTE: check SourceTables for `info.Source` if needed later.
stm, _, err := optimism.GetAllSourceTables(o.cli)
if err != nil {
o.logger.Error("fail to get source tables", log.ShortError(err))
} else if tts2 := optimism.TargetTablesForTask(info.Task, info.DownSchema, info.DownTable, stm); tts2 != nil {
tts = tts2
}
}
// put operation for the table. we don't set `skipDone=true` now,
// because in optimism mode, one table may execute/done multiple DDLs but other tables may do nothing.
err := o.handleLock(info, tts, false)
if err != nil {
o.logger.Error("fail to handle the shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
o.mu.Unlock()
continue
}
_ = o.handleInfo(info, false)
o.mu.Unlock()
}
}
}

func (o *Optimist) handleInfo(info optimism.Info, skipDone bool) error {
added := o.tk.AddTable(info.Task, info.Source, info.UpSchema, info.UpTable, info.DownSchema, info.DownTable)
o.logger.Debug("a table added for info", zap.Bool("added", added), zap.Stringer("info", info))

tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
if tts == nil {
// WATCH for SourceTables may fall behind WATCH for Info although PUT earlier,
// so we try to get SourceTables again.
// NOTE: check SourceTables for `info.Source` if needed later.
stm, _, err := optimism.GetAllSourceTables(o.cli)
if err != nil {
o.logger.Error("fail to get source tables", log.ShortError(err))
} else if tts2 := optimism.TargetTablesForTask(info.Task, info.DownSchema, info.DownTable, stm); tts2 != nil {
tts = tts2
}
}
err := o.handleLock(info, tts, skipDone)
if err != nil {
o.logger.Error("fail to handle the shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
}
return err
}

// handleOperationPut handles PUT for the shard DDL lock operations.
func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.Operation) {
for {
Expand All @@ -525,6 +545,10 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.
continue
}

err := lock.DeleteColumnsByDDLs(op.DDLs)
if err != nil {
o.logger.Error("fail to update lock columns", zap.Error(err))
}
// in optimistic mode, we always try to mark a table as done after received the `done` status of the DDLs operation.
// NOTE: even all tables have done their previous DDLs operations, the lock may still not resolved,
// because these tables may have different schemas.
Expand Down Expand Up @@ -553,13 +577,15 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.

// handleLock handles a single shard DDL lock.
func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, skipDone bool) error {
lockID, newDDLs, err := o.lk.TrySync(info, tts)
lockID, newDDLs, err := o.lk.TrySync(o.cli, info, tts)
var cfStage = optimism.ConflictNone
var cfMsg = ""
if info.IgnoreConflict {
o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected",
zap.String("lock", lockID), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted), log.ShortError(err))
} else if err != nil {
cfStage = optimism.ConflictDetected // we treat any errors returned from `TrySync` as conflict detected now.
cfMsg = err.Error()
o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected",
zap.String("lock", lockID), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted), log.ShortError(err))
} else {
Expand Down Expand Up @@ -599,8 +625,8 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk
return nil
}

op := optimism.NewOperation(lockID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, false)
rev, succ, err := optimism.PutOperation(o.cli, skipDone, op)
op := optimism.NewOperation(lockID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, cfMsg, false)
rev, succ, err := optimism.PutOperation(o.cli, skipDone, op, info.Revision)
if err != nil {
return err
}
Expand Down Expand Up @@ -671,13 +697,13 @@ func (o *Optimist) deleteInfosOps(lock *optimism.Lock) (bool, error) {
info := optimism.NewInfo(lock.Task, source, schema, table, lock.DownSchema, lock.DownTable, nil, nil, nil)
info.Version = lock.GetVersion(source, schema, table)
infos = append(infos, info)
ops = append(ops, optimism.NewOperation(lock.ID, lock.Task, source, schema, table, nil, optimism.ConflictNone, false))
ops = append(ops, optimism.NewOperation(lock.ID, lock.Task, source, schema, table, nil, optimism.ConflictNone, "", false))
}
}
}
// NOTE: we rely on only `task`, `downSchema`, and `downTable` used for deletion.
initSchema := optimism.NewInitSchema(lock.Task, lock.DownSchema, lock.DownTable, nil)
rev, deleted, err := optimism.DeleteInfosOperationsSchema(o.cli, infos, ops, initSchema)
rev, deleted, err := optimism.DeleteInfosOperationsSchemaColumn(o.cli, infos, ops, initSchema)
if err != nil {
return deleted, err
}
Expand Down
Loading