From 4d633bfd28c0305109c52a01cd1304b436c0dfba Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 21 Apr 2023 00:41:19 +0800 Subject: [PATCH] mounter(ticdc): adjust verify checksum method to get the checksum version (#8820) (#8825) close pingcap/tiflow#8819 --- cdc/entry/mounter.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index b7315c6af3f..7fdc476f522 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -399,14 +399,14 @@ func datum2Column( // return true if the checksum is matched and the checksum is the matched one. func (m *mounter) verifyChecksum( columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool, -) (uint32, bool, error) { +) (uint32, int, bool, error) { if !m.integrity.Enabled() { - return 0, true, nil + return 0, 0, true, nil } // when the old value is not enabled, no previous columns so that no need to verify the checksum. if isPreRow && !m.enableOldValue { - return 0, true, nil + return 0, 0, true, nil } var decoder *rowcodec.DatumMapDecoder @@ -416,14 +416,15 @@ func (m *mounter) verifyChecksum( decoder = m.decoder } if decoder == nil { - return 0, false, errors.New("cannot found the decoder to get the checksum") + return 0, 0, false, errors.New("cannot found the decoder to get the checksum") } + version := decoder.ChecksumVersion() // if the checksum cannot be found, which means the upstream TiDB checksum is not enabled, // so return matched as true to skip check the event. first, ok := decoder.GetChecksum() if !ok { - return 0, true, nil + return 0, version, true, nil } columns := make([]rowcodec.ColData, 0, len(rawColumns)) @@ -440,12 +441,12 @@ func (m *mounter) verifyChecksum( checksum, err := calculator.Checksum() if err != nil { log.Error("failed to calculate the checksum", zap.Error(err)) - return 0, false, errors.Trace(err) + return 0, version, false, errors.Trace(err) } // the first checksum matched, it hits in the most case. if checksum == first { - return checksum, true, nil + return checksum, version, true, nil } extra, ok := decoder.GetExtraChecksum() @@ -454,7 +455,8 @@ func (m *mounter) verifyChecksum( zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra)) - return checksum, false, errors.New("cannot found the extra checksum from the event") + return checksum, version, + false, errors.New("cannot found the extra checksum from the event") } if checksum == extra { @@ -462,14 +464,14 @@ func (m *mounter) verifyChecksum( "execution phase", zap.Uint32("checksum", checksum), zap.Uint32("extra", extra)) - return checksum, true, nil + return checksum, version, true, nil } log.Error("checksum mismatch", zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra)) - return checksum, false, nil + return checksum, version, false, nil } func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) { @@ -502,11 +504,10 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d return nil, rawRow, errors.Trace(err) } - preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true) + preChecksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, preRawCols, true) if err != nil { return nil, rawRow, errors.Trace(err) } - checksumVersion = m.encoder.ChecksumVersion() if !matched { log.Error("previous columns checksum mismatch", @@ -543,7 +544,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d return nil, rawRow, errors.Trace(err) } - checksum, matched, err = m.verifyChecksum(columnInfos, rawCols, false) + checksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -558,7 +559,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d } corrupted = true } - checksumVersion = m.encoder.ChecksumVersion() } schemaName := tableInfo.TableName.Schema