-
Notifications
You must be signed in to change notification settings - Fork 188
bug-fix: use table-info-before always and fix bug for recover lock in optimistic #1518
Conversation
Is this PR still WIP? |
pkg/shardddl/optimism/lock.go
Outdated
@@ -72,7 +72,7 @@ func NewLock(ID, task, downSchema, downTable string, ti *model.TableInfo, tts [] | |||
synced: true, | |||
versions: make(map[string]map[string]map[string]int64), | |||
} | |||
l.addTables(tts) | |||
l.addTables(l.joined, tts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if tts
contains the other tables whose schema didn't reach l.joined
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here l.joined equals tableInfoBefore. If it's a new lock(first table), tableInfoBefore=l.joined. For other newer tables, they will be added in L163
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tts
here is from https://github.com/pingcap/dm/blob/master/dm/master/shardddl/optimist.go#L267. optimistic
will get all source tables from etcd and this tts
may have not only the table in this info, I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
em... That's right, I will fix it
// handle the case where <callerSource, callerSchema, callerTable> | ||
// is not in old source tables and current new source tables. | ||
// duplicate append is not a problem. | ||
tts = append(tts, newTargetTable(l.Task, callerSource, l.DownSchema, l.DownTable, | ||
map[string]map[string]struct{}{callerSchema: {callerTable: struct{}{}}})) | ||
// add any new source tables. | ||
l.addTables(tts) | ||
l.receiveTable(callerSource, callerSchema, callerTable, oldTable) | ||
if val, ok := l.versions[callerSource][callerSchema][callerTable]; !ok || val < infoVersion { | ||
l.versions[callerSource][callerSchema][callerTable] = infoVersion | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now we changed the l.tables
by l.receiveTable
, so in line 179 oldJoined
is not a result of "join every l.tables" 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean join all the tables for old joined?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just not sure if the correctness depends on "old joined should be joined result of all l.tables", or "old joined of previous TrySync and new joined of next TrySync should keep consistent"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we assume user start-task with same table info in all upstream, tableInfoBefore should be same as oldJoined or oldtable. Otherwise, the oldJoined is not collect and we may keep it and hope later join may return an error? cc @lichunzhu
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If dm-master/dm-worker meets a restart the table info in all upstream is more likely to be different. What's the problem here? I don't understand this clearly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But if the tableInfoBefore not equal l.tables[source][schema][table], that means the l.tables[source][schema][table] and oldjoined may wrong, do we need join all the tables with tableInfoBefore as oldjoined?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But if the tableInfoBefore not equal l.tables[source][schema][table], that means the l.tables[source][schema][table] and oldjoined may wrong, do we need join all the tables with tableInfoBefore as oldjoined?
I think carefully about this situation. Could you help me check if I'm wrong?
When this happens, it means we add a table whose table schema is not equal to joined table info now. When we init all the tables, we have three situations:
- This task is new. The pre-check will assure the correctness of the tables' schemas.
- This dm cluster recovers from a failure. The lock is synced now and all the tables have the same schema. If we add a table with a different schema, we should report an error.
- This dm cluster recovers from a failure. The lock is unsynced now. Some workers didn't put their lock Info into etcd before the restart. These tables'
l.tables
will be set tol.joined
at first, and if it's different from thetableInfoBefore
received later I think it's acceptable. Am I right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you mean if we get a info which we have already received the table info before, if their schema is different, we should report an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. But I don't think we should report an error in situation 3.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some workers didn't put their lock Info into etcd before the restart. These tables' l.tables will be set to l.joined at first
That means the table haven't been received. l.received[x][x][x]==false
In unit test, we have some idempotent TrySync, how do we deal with the case? 🤔
tests/shardddl1/run.sh
Outdated
# start=1 | ||
# end=35 | ||
# except=(024 025 029) | ||
# for i in $(seq -f "%03g" ${start} ${end}); do | ||
# if [[ ${except[@]} =~ $i ]]; then | ||
# continue | ||
# fi | ||
# DM_${i} | ||
# sleep 1 | ||
# done | ||
# DM_RENAME_COLUMN_OPTIMISTIC |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for debug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will review later
newJoined, err := joined.Join(schemacmp.Encode(info.TableInfoBefore)) | ||
// ignore error, will report it in TrySync later | ||
if err != nil { | ||
o.logger.Error(fmt.Sprintf("fail to join table info %s with %s, lockID: %s in recover lock", joined, newJoined, lockID), log.ShortError(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could add a alerting rule for errors of Optimist initialization in future
infos = sortInfos(ifm) | ||
c.Assert(len(infos), Equals, 3) | ||
|
||
i11.Version = 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems we could replace Info.Version
by Revision
(could file another PR)
i11 = NewInfo(task, source1, upSchema, upTable, downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}) | ||
i21 = NewInfo(task, source2, upSchema, upTable, downSchema, downTable, DDLs2, ti2, []*model.TableInfo{ti3}) | ||
|
||
tts = []TargetTable{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"target" in TargetTable
makes me think of "source/target", where the "target" means downstream tables. We could rename it to "TableMap" or something in future
pkg/shardddl/optimism/lock_test.go
Outdated
@@ -1580,3 +1584,81 @@ func newInfoWithVersion(task, source, upSchema, upTable, downSchema, downTable s | |||
info.Version = vers[source][upSchema][upTable] | |||
return info | |||
} | |||
|
|||
func (t *testLock) TestLockTrySyncReceived(c *C) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which case does it test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename in 814d7a7 . Now we use table-info-before for all table, create a different index will not be in conflict now.
Co-authored-by: lance6716 <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
run_sql_source2 "insert into ${shardddl1}.${tb2} values(7,7);" | ||
check_log_contain_with_retry "putted a shard DDL.*tb2.*ALTER TABLE .* ADD COLUMN" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | ||
|
||
# recover lock, tb1's info: (a,b,c)->(a,c); tb2's info: (a)->(a,b) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we cover case#4 at this place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think yes? Though the order cannot determine, but we don't depend on the order now
/lgtm |
/lgtm |
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by writing |
/run-compatibility-tests |
Signed-off-by: ti-srebot <[email protected]>
cherry pick to release-2.0 in PR #1536 |
What problem does this PR solve?
#1510 case4
What is changed and how it works?
Check List
Tests