Skip to content

Commit

Permalink
mounter(ticdc): adjust verify checksum method to get the checksum ver…
Browse files Browse the repository at this point in the history
…sion (#8820) (#8825)

close #8819
  • Loading branch information
ti-chi-bot authored Apr 20, 2023
1 parent b20e1ca commit 4d633bf
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,14 @@ func datum2Column(
// return true if the checksum is matched and the checksum is the matched one.
func (m *mounter) verifyChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
) (uint32, bool, error) {
) (uint32, int, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
return 0, 0, true, nil
}

// when the old value is not enabled, no previous columns so that no need to verify the checksum.
if isPreRow && !m.enableOldValue {
return 0, true, nil
return 0, 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
Expand All @@ -416,14 +416,15 @@ func (m *mounter) verifyChecksum(
decoder = m.decoder
}
if decoder == nil {
return 0, false, errors.New("cannot found the decoder to get the checksum")
return 0, 0, false, errors.New("cannot found the decoder to get the checksum")
}

version := decoder.ChecksumVersion()
// if the checksum cannot be found, which means the upstream TiDB checksum is not enabled,
// so return matched as true to skip check the event.
first, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
return 0, version, true, nil
}

columns := make([]rowcodec.ColData, 0, len(rawColumns))
Expand All @@ -440,12 +441,12 @@ func (m *mounter) verifyChecksum(
checksum, err := calculator.Checksum()
if err != nil {
log.Error("failed to calculate the checksum", zap.Error(err))
return 0, false, errors.Trace(err)
return 0, version, false, errors.Trace(err)
}

// the first checksum matched, it hits in the most case.
if checksum == first {
return checksum, true, nil
return checksum, version, true, nil
}

extra, ok := decoder.GetExtraChecksum()
Expand All @@ -454,22 +455,23 @@ func (m *mounter) verifyChecksum(
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, false, errors.New("cannot found the extra checksum from the event")
return checksum, version,
false, errors.New("cannot found the extra checksum from the event")
}

if checksum == extra {
log.Warn("extra checksum matched, this may happen the upstream TiDB is during the DDL"+
"execution phase",
zap.Uint32("checksum", checksum),
zap.Uint32("extra", extra))
return checksum, true, nil
return checksum, version, true, nil
}

log.Error("checksum mismatch",
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, false, nil
return checksum, version, false, nil
}

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
Expand Down Expand Up @@ -502,11 +504,10 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
preChecksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
checksumVersion = m.encoder.ChecksumVersion()

if !matched {
log.Error("previous columns checksum mismatch",
Expand Down Expand Up @@ -543,7 +544,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

checksum, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
checksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand All @@ -558,7 +559,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
}
corrupted = true
}
checksumVersion = m.encoder.ChecksumVersion()
}

schemaName := tableInfo.TableName.Schema
Expand Down

0 comments on commit 4d633bf

Please sign in to comment.