Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): adjust verify checksum method to get the checksum version #8820

Merged
merged 2 commits into from
Apr 20, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,14 @@ func datum2Column(
// return true if the checksum is matched and the checksum is the matched one.
func (m *mounter) verifyChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
) (uint32, bool, error) {
) (uint32, int, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
return 0, 0, true, nil
}

// when the old value is not enabled, no previous columns so that no need to verify the checksum.
if isPreRow && !m.enableOldValue {
return 0, true, nil
return 0, 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
Expand All @@ -416,14 +416,15 @@ func (m *mounter) verifyChecksum(
decoder = m.decoder
}
if decoder == nil {
return 0, false, errors.New("cannot found the decoder to get the checksum")
return 0, 0, false, errors.New("cannot found the decoder to get the checksum")
}

version := decoder.ChecksumVersion()
// if the checksum cannot be found, which means the upstream TiDB checksum is not enabled,
// so return matched as true to skip check the event.
first, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
return 0, version, true, nil
}

columns := make([]rowcodec.ColData, 0, len(rawColumns))
Expand All @@ -440,12 +441,12 @@ func (m *mounter) verifyChecksum(
checksum, err := calculator.Checksum()
if err != nil {
log.Error("failed to calculate the checksum", zap.Error(err))
return 0, false, errors.Trace(err)
return 0, version, false, errors.Trace(err)
}

// the first checksum matched, it hits in the most case.
if checksum == first {
return checksum, true, nil
return checksum, version, true, nil
}

extra, ok := decoder.GetExtraChecksum()
Expand All @@ -454,22 +455,23 @@ func (m *mounter) verifyChecksum(
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, false, errors.New("cannot found the extra checksum from the event")
return checksum, version,
false, errors.New("cannot found the extra checksum from the event")
}

if checksum == extra {
log.Warn("extra checksum matched, this may happen the upstream TiDB is during the DDL"+
"execution phase",
zap.Uint32("checksum", checksum),
zap.Uint32("extra", extra))
return checksum, true, nil
return checksum, version, true, nil
}

log.Error("checksum mismatch",
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, false, nil
return checksum, version, false, nil
}

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
Expand Down Expand Up @@ -502,11 +504,10 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
preChecksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
checksumVersion = m.encoder.ChecksumVersion()

if !matched {
log.Error("previous columns checksum mismatch",
Expand Down Expand Up @@ -543,7 +544,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

checksum, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
checksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down