This repository has been archived by the owner on Nov 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 188
*: rollback schema in the tracker; fix save table checkpoint in optimistic mode #625
Merged
Merged
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
bd84324
*: rollback schema in the tracker
csuzhangxc 1286e41
*: add GetTable equal ut; fix go vet
csuzhangxc aa14a40
*: add schema tracker revert for pessimistic mode (some cases still n…
csuzhangxc ea14369
*: fix flush jobs for pessimistic shard DDL
csuzhangxc 16b32fc
*: fix for sharding case
csuzhangxc 2217f4d
*: fix for incremental case
csuzhangxc 82af0aa
sycner: try fix checkpoint flush
csuzhangxc e554bf2
*: add interrupt for optimistic mode
csuzhangxc 8e38eef
syncer: refine code
csuzhangxc 98801c3
*: debug sharding case
csuzhangxc 2055fd0
tracker: try don't to change schema lease
csuzhangxc ede77a1
*: add debug log
csuzhangxc 2587136
*: add debug log
csuzhangxc b0c9be6
*: add debug log
csuzhangxc 739f102
*: add debug log
csuzhangxc 271b019
sharding: wait paused before resume
csuzhangxc 930cd71
sharding: fix comment
csuzhangxc ccd88d4
*: refine log and comment
csuzhangxc da2fbf9
*: add startup/exit time for test binaries
csuzhangxc 773d03c
*: remove startup/exit time for dmctl.test
csuzhangxc 4deb977
*: increase watch timeout in unit tests
csuzhangxc 02f5101
Merge branch 'master' into revert-tracker
csuzhangxc 36a3f0d
syncer: fix `saveTablePoint` in optimistic mode
csuzhangxc cce2714
scheduler: increase wait timeout
csuzhangxc 8fffc43
Merge branch 'master' into revert-tracker
csuzhangxc 0c5d865
Merge branch 'master' into revert-tracker
csuzhangxc 846c6b5
sycner: address comments
csuzhangxc File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -96,11 +96,25 @@ func (b *binlogPoint) flush() { | |
b.flushedTI = b.ti | ||
} | ||
|
||
func (b *binlogPoint) rollback() (isSchemaChanged bool) { | ||
func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) { | ||
b.Lock() | ||
defer b.Unlock() | ||
b.location = b.flushedLocation | ||
if isSchemaChanged = b.ti != b.flushedTI; isSchemaChanged { | ||
b.location = b.flushedLocation.Clone() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also |
||
if b.ti == nil { | ||
return // for global checkpoint, no need to rollback the schema. | ||
} | ||
|
||
// NOTE: no `Equal` function for `model.TableInfo` exists now, so we compare `pointer` directly, | ||
// and after a new DDL applied to the schema, the returned pointer of `model.TableInfo` changed now. | ||
trackedTi, _ := schemaTracker.GetTable(schema, b.ti.Name.O) // ignore the returned error, only compare `trackerTi` is enough. | ||
// may three versions of schema exist: | ||
// - the one tracked in the TiDB-with-mockTiKV. | ||
// - the one in the checkpoint but not flushed. | ||
// - the one in the checkpoint and flushed. | ||
// if any of them are not equal, then we rollback them: | ||
// - set the one in the checkpoint but not flushed to the one flushed. | ||
// - set the one tracked to the one in the checkpoint by the caller of this method (both flushed and not flushed are the same now) | ||
if isSchemaChanged = trackedTi != b.ti || b.ti != b.flushedTI; isSchemaChanged { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about adding some There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added in 846c6b5. |
||
b.ti = b.flushedTI | ||
} | ||
return | ||
|
@@ -314,7 +328,7 @@ func (cp *RemoteCheckPoint) saveTablePoint(sourceSchema, sourceTable string, loc | |
} | ||
|
||
// we save table checkpoint while we meet DDL or DML | ||
cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("loaction", location), zap.String("schema", sourceSchema), zap.String("table", sourceTable)) | ||
cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("location", location), zap.String("schema", sourceSchema), zap.String("table", sourceTable)) | ||
mSchema, ok := cp.points[sourceSchema] | ||
if !ok { | ||
mSchema = make(map[string]*binlogPoint) | ||
|
@@ -397,6 +411,7 @@ func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, | |
return true | ||
} | ||
oldLocation := point.MySQLLocation() | ||
cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation)) | ||
|
||
if gte { | ||
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) >= 0 | ||
|
@@ -530,15 +545,17 @@ func (cp *RemoteCheckPoint) CheckGlobalPoint() bool { | |
func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) { | ||
cp.RLock() | ||
defer cp.RUnlock() | ||
cp.globalPoint.rollback() | ||
cp.globalPoint.rollback(schemaTracker, "") | ||
for schema, mSchema := range cp.points { | ||
for table, point := range mSchema { | ||
logger := cp.logCtx.L().WithFields(zap.String("schema", schema), zap.String("table", table)) | ||
logger.Info("rollback checkpoint", log.WrapStringerField("checkpoint", point)) | ||
if point.rollback() { | ||
logger.Debug("try to rollback checkpoint", log.WrapStringerField("checkpoint", point)) | ||
from := point.MySQLLocation() | ||
if point.rollback(schemaTracker, schema) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also see #629. |
||
logger.Info("rollback checkpoint", zap.Stringer("from", from), zap.Stringer("to", point.FlushedMySQLLocation())) | ||
// schema changed | ||
if err := schemaTracker.DropTable(schema, table); err != nil { | ||
logger.Debug("failed to drop table from schema tracker", log.ShortError(err)) | ||
logger.Warn("failed to drop table from schema tracker", log.ShortError(err)) | ||
} | ||
if point.ti != nil { | ||
// TODO: Figure out how to recover from errors. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
package syncer | ||
|
||
import ( | ||
"github.com/pingcap/failpoint" | ||
"github.com/pingcap/parser/ast" | ||
"github.com/pingcap/parser/model" | ||
"github.com/pingcap/tidb-tools/pkg/filter" | ||
|
@@ -48,10 +49,13 @@ func (s *Syncer) handleQueryEventOptimistic( | |
ev *replication.QueryEvent, ec eventContext, | ||
needHandleDDLs []string, needTrackDDLs []trackedDDL, | ||
onlineDDLTableNames map[string]*filter.Table) error { | ||
// wait previous DMLs to be replicated | ||
if err := s.flushJobs(); err != nil { | ||
return err | ||
} | ||
// interrupted after flush old checkpoint and before track DDL. | ||
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { | ||
err := handleFlushCheckpointStage(1, val.(int), "before track DDL") | ||
if err != nil { | ||
failpoint.Return(err) | ||
} | ||
}) | ||
|
||
var ( | ||
upSchema string | ||
|
@@ -92,10 +96,8 @@ func (s *Syncer) handleQueryEventOptimistic( | |
return err | ||
} | ||
|
||
s.tctx.L().Info("save table checkpoint", zap.String("event", "query"), | ||
zap.String("schema", upSchema), zap.String("table", upTable), | ||
zap.Strings("ddls", needHandleDDLs), log.WrapStringerField("location", ec.currentLocation)) | ||
s.saveTablePoint(upSchema, upTable, ec.currentLocation.Clone()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. found a bug relative to #584. |
||
// in optimistic mode, don't `saveTablePoint` before execute DDL, | ||
// because it has no `UnresolvedTables` to prevent the flush of this checkpoint. | ||
|
||
info := s.optimist.ConstructInfo(upSchema, upTable, downSchema, downTable, needHandleDDLs, tiBefore, tiAfter) | ||
|
||
|
@@ -159,6 +161,14 @@ func (s *Syncer) handleQueryEventOptimistic( | |
needHandleDDLs = appliedSQLs // maybe nil | ||
} | ||
|
||
// interrupted after track DDL and before execute DDL. | ||
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { | ||
err = handleFlushCheckpointStage(2, val.(int), "before execute DDL") | ||
if err != nil { | ||
failpoint.Return(err) | ||
} | ||
}) | ||
|
||
ddlInfo := &shardingDDLInfo{ | ||
name: needTrackDDLs[0].tableNames[0][0].String(), | ||
tableNames: needTrackDDLs[0].tableNames, | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#628