This repository has been archived by the owner on Nov 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 188
shardddl/optimistic: warn add not fully dropped columns #1510
Merged
Merged
Changes from 17 commits
Commits
Show all changes
34 commits
Select commit
Hold shift + click to select a range
05c18b1
add unit tests and partially dropped columns without etcd
lichunzhu d774402
add etcd info and integration test
lichunzhu 233fe73
Merge branch 'master' into warnDropAddColumn
lichunzhu 01f6292
fix hound
lichunzhu 5f0cb3a
Merge branch 'warnDropAddColumn' of https://github.com/lichunzhu/dm i…
lichunzhu 1677848
fix lint
lichunzhu 4bcda8a
fix integration tests and unit tests
lichunzhu 521520d
Merge branch 'master' into warnDropAddColumn
GMHDBJD cc29652
fix unit test and integration test
lichunzhu bc42b83
merge master and resolve conflicts
lichunzhu 7f28b7d
merge master and resolve conflicts
lichunzhu 9d165de
fix ut
lichunzhu 86686e2
Merge branch 'warnDropAddColumn' of https://github.com/lichunzhu/dm i…
lichunzhu bd6cc21
fix lint
lichunzhu 2fdb868
fix test
lichunzhu fb5857a
address more comments
lichunzhu baee6fa
address comment
lichunzhu 10f3dcf
address comments
lichunzhu 62309ad
fix uts
lichunzhu 229e3df
Merge branch 'master' into warnDropAddColumn
lichunzhu 048f627
Apply suggestions from code review
lichunzhu f354a16
Merge branch 'master' into warnDropAddColumn
GMHDBJD 814ea09
address comments
lichunzhu 129813b
Merge branch 'master' into warnDropAddColumn
lichunzhu d982ac4
fix terror
lichunzhu c247b17
Merge branch 'master' into warnDropAddColumn
GMHDBJD 56fb28b
change ModRevision to Revision
lichunzhu f331643
merge master and resolve conflicts
lichunzhu eb54917
Merge branch 'warnDropAddColumn' of https://github.com/lichunzhu/dm i…
lichunzhu bfb7a17
delete colm in lockKeeper
lichunzhu 235776f
refine some logs
lichunzhu 2efb0cf
Update pkg/shardddl/optimism/lock.go
lichunzhu 05b2c20
fix unit test etcd cluster race usage
lichunzhu 213e07f
fix again
lichunzhu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -245,35 +245,49 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e | |
} | ||
o.logger.Info("get history shard DDL lock operation", zap.Int64("revision", revOperation)) | ||
|
||
colm, _, err := optimism.GetAllDroppedColumns(o.cli) | ||
if err != nil { | ||
// only log the error, and don't return it to forbid the startup of the DM-master leader. | ||
// then these unexpected columns can be handled by the user. | ||
o.logger.Error("fail to recover colms", log.ShortError(err)) | ||
} | ||
|
||
// recover the shard DDL lock based on history shard DDL info & lock operation. | ||
err = o.recoverLocks(ifm, opm) | ||
err = o.recoverLocks(ifm, opm, colm) | ||
if err != nil { | ||
// only log the error, and don't return it to forbid the startup of the DM-master leader. | ||
// then these unexpected locks can be handled by the user. | ||
o.logger.Error("fail to recover locks", log.ShortError(err)) | ||
} | ||
|
||
return revSource, revInfo, revOperation, nil | ||
} | ||
|
||
// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation. | ||
func (o *Optimist) recoverLocks( | ||
ifm map[string]map[string]map[string]map[string]optimism.Info, | ||
opm map[string]map[string]map[string]map[string]optimism.Operation) error { | ||
opm map[string]map[string]map[string]map[string]optimism.Operation, | ||
colm map[string]map[string]map[string]map[string]map[string]interface{}) error { | ||
// construct locks based on the shard DDL info. | ||
for task, ifTask := range ifm { | ||
o.lk.SetColumnMap(colm) | ||
defer o.lk.SetColumnMap(nil) | ||
var firstErr error | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
setFirstErr := func(err error) { | ||
if firstErr == nil && err != nil { | ||
firstErr = err | ||
} | ||
} | ||
|
||
for _, ifTask := range ifm { | ||
for _, ifSource := range ifTask { | ||
for _, ifSchema := range ifSource { | ||
for _, info := range ifSchema { | ||
tts := o.tk.FindTables(task, info.DownSchema, info.DownTable) | ||
_, _, err := o.lk.TrySync(info, tts) | ||
if err != nil { | ||
return err | ||
} | ||
// never mark the lock operation from `done` to `not-done` when recovering. | ||
err = o.handleLock(info, tts, true) | ||
if err != nil { | ||
return err | ||
} | ||
// We should return err after all infos are set up. | ||
// If we stopped recovering locks once we meet an error, | ||
// dm-master leader may not have the full information for the other "normal" locks, | ||
// which will cause the sync error in dm-worker. | ||
err := o.handleInfo(info) | ||
setFirstErr(err) | ||
} | ||
} | ||
} | ||
|
@@ -291,6 +305,11 @@ func (o *Optimist) recoverLocks( | |
} | ||
if op.Done { | ||
lock.TryMarkDone(op.Source, op.UpSchema, op.UpTable) | ||
err := lock.DeleteColumnsByDDLs(op.DDLs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hope we could left more comment, I think I might forget the logic somedays later
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added under this function's definition. |
||
if err != nil { | ||
o.logger.Error("fail to update lock columns", zap.Error(err)) | ||
continue | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -338,7 +357,7 @@ func (o *Optimist) watchSourceInfoOperation( | |
}() | ||
go func() { | ||
defer wg.Done() | ||
o.handleInfo(ctx, infoCh) | ||
o.handleInfoPut(ctx, infoCh) | ||
}() | ||
|
||
// watch for the shard DDL lock operation and handle them. | ||
|
@@ -381,8 +400,8 @@ func (o *Optimist) handleSourceTables(ctx context.Context, sourceCh <-chan optim | |
} | ||
} | ||
|
||
// handleInfo handles PUT and DELETE for the shard DDL info. | ||
func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info) { | ||
// handleInfoPut handles PUT and DELETE for the shard DDL info. | ||
func (o *Optimist) handleInfoPut(ctx context.Context, infoCh <-chan optimism.Info) { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
|
@@ -414,35 +433,38 @@ func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info) | |
continue | ||
} | ||
|
||
added := o.tk.AddTable(info.Task, info.Source, info.UpSchema, info.UpTable, info.DownSchema, info.DownTable) | ||
o.logger.Debug("a table added for info", zap.Bool("added", added), zap.Stringer("info", info)) | ||
|
||
tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable) | ||
if tts == nil { | ||
// WATCH for SourceTables may fall behind WATCH for Info although PUT earlier, | ||
// so we try to get SourceTables again. | ||
// NOTE: check SourceTables for `info.Source` if needed later. | ||
stm, _, err := optimism.GetAllSourceTables(o.cli) | ||
if err != nil { | ||
o.logger.Error("fail to get source tables", log.ShortError(err)) | ||
} else if tts2 := optimism.TargetTablesForTask(info.Task, info.DownSchema, info.DownTable, stm); tts2 != nil { | ||
tts = tts2 | ||
} | ||
} | ||
// put operation for the table. we don't set `skipDone=true` now, | ||
// because in optimism mode, one table may execute/done multiple DDLs but other tables may do nothing. | ||
err := o.handleLock(info, tts, false) | ||
if err != nil { | ||
o.logger.Error("fail to handle the shard DDL lock", zap.Stringer("info", info), log.ShortError(err)) | ||
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock) | ||
o.mu.Unlock() | ||
continue | ||
} | ||
_ = o.handleInfo(info) | ||
o.mu.Unlock() | ||
} | ||
} | ||
} | ||
|
||
func (o *Optimist) handleInfo(info optimism.Info) error { | ||
added := o.tk.AddTable(info.Task, info.Source, info.UpSchema, info.UpTable, info.DownSchema, info.DownTable) | ||
o.logger.Debug("a table added for info", zap.Bool("added", added), zap.Stringer("info", info)) | ||
|
||
tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable) | ||
if tts == nil { | ||
// WATCH for SourceTables may fall behind WATCH for Info although PUT earlier, | ||
// so we try to get SourceTables again. | ||
// NOTE: check SourceTables for `info.Source` if needed later. | ||
stm, _, err := optimism.GetAllSourceTables(o.cli) | ||
if err != nil { | ||
o.logger.Error("fail to get source tables", log.ShortError(err)) | ||
} else if tts2 := optimism.TargetTablesForTask(info.Task, info.DownSchema, info.DownTable, stm); tts2 != nil { | ||
tts = tts2 | ||
} | ||
} | ||
// put operation for the table. we don't set `skipDone=true` now, | ||
// because in optimism mode, one table may execute/done multiple DDLs but other tables may do nothing. | ||
err := o.handleLock(info, tts, false) | ||
if err != nil { | ||
o.logger.Error("fail to handle the shard DDL lock", zap.Stringer("info", info), log.ShortError(err)) | ||
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock) | ||
} | ||
return err | ||
} | ||
|
||
// handleOperationPut handles PUT for the shard DDL lock operations. | ||
func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.Operation) { | ||
for { | ||
|
@@ -469,6 +491,10 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism. | |
continue | ||
} | ||
|
||
err := lock.DeleteColumnsByDDLs(op.DDLs) | ||
if err != nil { | ||
o.logger.Error("fail to update lock columns", zap.Error(err)) | ||
} | ||
// in optimistic mode, we always try to mark a table as done after received the `done` status of the DDLs operation. | ||
// NOTE: even all tables have done their previous DDLs operations, the lock may still not resolved, | ||
// because these tables may have different schemas. | ||
|
@@ -497,7 +523,7 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism. | |
|
||
// handleLock handles a single shard DDL lock. | ||
func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, skipDone bool) error { | ||
lockID, newDDLs, err := o.lk.TrySync(info, tts) | ||
lockID, newDDLs, err := o.lk.TrySync(o.cli, info, tts) | ||
var cfStage = optimism.ConflictNone | ||
if info.IgnoreConflict { | ||
o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected", | ||
|
@@ -544,7 +570,7 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk | |
} | ||
|
||
op := optimism.NewOperation(lockID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, false) | ||
rev, succ, err := optimism.PutOperation(o.cli, skipDone, op) | ||
rev, succ, err := optimism.PutOperation(o.cli, skipDone, op, info.ModRevision) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -621,7 +647,7 @@ func (o *Optimist) deleteInfosOps(lock *optimism.Lock) (bool, error) { | |
} | ||
// NOTE: we rely on only `task`, `downSchema`, and `downTable` used for deletion. | ||
initSchema := optimism.NewInitSchema(lock.Task, lock.DownSchema, lock.DownTable, nil) | ||
rev, deleted, err := optimism.DeleteInfosOperationsSchema(o.cli, infos, ops, initSchema) | ||
rev, deleted, err := optimism.DeleteInfosOperationsSchemaColumn(o.cli, infos, ops, initSchema) | ||
if err != nil { | ||
return deleted, err | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I guess this does not effect correctness? OK it may release some memory)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this will affect correctness. We only use this column map when we recover lock tables.