diff --git a/br/pkg/lightning/verification/checksum.go b/br/pkg/lightning/verification/checksum.go
index 45d60b52c2e88..d30e143b301ad 100644
--- a/br/pkg/lightning/verification/checksum.go
+++ b/br/pkg/lightning/verification/checksum.go
@@ -35,11 +35,9 @@ type KVChecksum struct {
 	checksum uint64
 }
 
-// NewKVChecksum creates a new KVChecksum with the given checksum.
-func NewKVChecksum(checksum uint64) *KVChecksum {
-	return &KVChecksum{
-		checksum: checksum,
-	}
+// NewKVChecksum creates a pointer to zero KVChecksum.
+func NewKVChecksum() *KVChecksum {
+	return &KVChecksum{}
 }
 
 // NewKVChecksumWithKeyspace creates a new KVChecksum with the given checksum and keyspace.
@@ -135,29 +133,30 @@ type KVGroupChecksum struct {
 	codec tikv.Codec
 }
 
-// DataKVGroupAsIndexID represents the index ID for data KV group.
-const DataKVGroupAsIndexID = -1
+// DataKVGroupID represents the ID for data KV group, as index id starts from 1,
+// so we use -1 to represent data kv group.
+const DataKVGroupID = -1
 
 // NewKVGroupChecksumWithKeyspace creates a new KVGroupChecksum with the given
 // keyspace.
 func NewKVGroupChecksumWithKeyspace(k tikv.Codec) *KVGroupChecksum {
 	m := make(map[int64]*KVChecksum, 8)
-	m[DataKVGroupAsIndexID] = NewKVChecksumWithKeyspace(k)
+	m[DataKVGroupID] = NewKVChecksumWithKeyspace(k)
 	return &KVGroupChecksum{m: m, codec: k}
 }
 
-// NewKVGroupChecksumForAdd creates a new KVGroupChecksum and it can't be used
+// NewKVGroupChecksumForAdd creates a new KVGroupChecksum, and it can't be used
 // with UpdateOneDataKV or UpdateOneIndexKV.
 func NewKVGroupChecksumForAdd() *KVGroupChecksum {
 	m := make(map[int64]*KVChecksum, 8)
-	m[DataKVGroupAsIndexID] = NewKVChecksum(0)
+	m[DataKVGroupID] = NewKVChecksum()
 	return &KVGroupChecksum{m: m}
 }
 
 // UpdateOneDataKV updates the checksum with a single data(record) key-value
 // pair. It will not check the key-value pair's key is a real data key again.
 func (c *KVGroupChecksum) UpdateOneDataKV(kv common.KvPair) {
-	c.m[DataKVGroupAsIndexID].UpdateOne(kv)
+	c.m[DataKVGroupID].UpdateOne(kv)
 }
 
 // UpdateOneIndexKV updates the checksum with a single index key-value pair. It
@@ -175,30 +174,28 @@ func (c *KVGroupChecksum) UpdateOneIndexKV(indexID int64, kv common.KvPair) {
 // Add adds the checksum of another KVGroupChecksum.
 func (c *KVGroupChecksum) Add(other *KVGroupChecksum) {
 	for id, cksum := range other.m {
-		thisCksum := c.m[id]
-		if thisCksum == nil {
-			if c.codec == nil {
-				thisCksum = NewKVChecksum(0)
-			} else {
-				thisCksum = NewKVChecksumWithKeyspace(c.codec)
-			}
-			c.m[id] = thisCksum
-		}
+		thisCksum := c.getOrCreateOneGroup(id)
 		thisCksum.Add(cksum)
 	}
 }
 
+func (c *KVGroupChecksum) getOrCreateOneGroup(id int64) *KVChecksum {
+	cksum, ok := c.m[id]
+	if ok {
+		return cksum
+	}
+	if c.codec == nil {
+		cksum = NewKVChecksum()
+	} else {
+		cksum = NewKVChecksumWithKeyspace(c.codec)
+	}
+	c.m[id] = cksum
+	return cksum
+}
+
 // AddRawGroup adds the raw information of a KV group.
 func (c *KVGroupChecksum) AddRawGroup(id int64, bytes, kvs, checksum uint64) {
-	oneGroup, ok := c.m[id]
-	if !ok {
-		if c.codec == nil {
-			oneGroup = NewKVChecksum(0)
-		} else {
-			oneGroup = NewKVChecksumWithKeyspace(c.codec)
-		}
-		c.m[id] = oneGroup
-	}
+	oneGroup := c.getOrCreateOneGroup(id)
 	tmp := MakeKVChecksum(bytes, kvs, checksum)
 	oneGroup.Add(&tmp)
 }
@@ -206,7 +203,7 @@ func (c *KVGroupChecksum) AddRawGroup(id int64, bytes, kvs, checksum uint64) {
 // DataAndIndexSumSize returns the total size of data KV pairs and index KV pairs.
 func (c *KVGroupChecksum) DataAndIndexSumSize() (dataSize, indexSize uint64) {
 	for id, cksum := range c.m {
-		if id == DataKVGroupAsIndexID {
+		if id == DataKVGroupID {
 			dataSize = cksum.SumSize()
 		} else {
 			indexSize += cksum.SumSize()
@@ -218,7 +215,7 @@ func (c *KVGroupChecksum) DataAndIndexSumSize() (dataSize, indexSize uint64) {
 // DataAndIndexSumKVS returns the total number of data KV pairs and index KV pairs.
 func (c *KVGroupChecksum) DataAndIndexSumKVS() (dataKVS, indexKVS uint64) {
 	for id, cksum := range c.m {
-		if id == DataKVGroupAsIndexID {
+		if id == DataKVGroupID {
 			dataKVS = cksum.SumKVS()
 		} else {
 			indexKVS += cksum.SumKVS()
@@ -242,9 +239,9 @@ func (c *KVGroupChecksum) GetInnerChecksums() map[int64]*KVChecksum {
 	return m
 }
 
-// MergeGroup merges all groups of this checksum into a single KVChecksum.
-func (c *KVGroupChecksum) MergeGroup() KVChecksum {
-	merged := NewKVChecksum(0)
+// MergedChecksum merges all groups of this checksum into a single KVChecksum.
+func (c *KVGroupChecksum) MergedChecksum() KVChecksum {
+	merged := NewKVChecksum()
 	for _, cksum := range c.m {
 		merged.Add(cksum)
 	}
diff --git a/br/pkg/lightning/verification/checksum_test.go b/br/pkg/lightning/verification/checksum_test.go
index 57caca5d1284a..68adbf025cef6 100644
--- a/br/pkg/lightning/verification/checksum_test.go
+++ b/br/pkg/lightning/verification/checksum_test.go
@@ -25,7 +25,7 @@ import (
 )
 
 func TestChecksum(t *testing.T) {
-	checksum := verification.NewKVChecksum(0)
+	checksum := verification.NewKVChecksum()
 	require.Equal(t, uint64(0), checksum.Sum())
 
 	// checksum on nothing
@@ -99,7 +99,7 @@ func TestGroupChecksum(t *testing.T) {
 	inner := c.GetInnerChecksums()
 	require.Equal(t, 2, len(inner))
 	require.Equal(t, uint64(1), inner[1].SumKVS())
-	require.Equal(t, uint64(1), inner[verification.DataKVGroupAsIndexID].SumKVS())
+	require.Equal(t, uint64(1), inner[verification.DataKVGroupID].SumKVS())
 
 	keyspaceCodec := &mockCodec{keyspace: []byte("keyspace")}
 	keyspaceC := verification.NewKVGroupChecksumWithKeyspace(keyspaceCodec)
@@ -116,7 +116,7 @@ func TestGroupChecksum(t *testing.T) {
 	require.Equal(t, 3, len(inner))
 	require.Equal(t, uint64(2), inner[1].SumKVS())
 	require.Equal(t, uint64(1), inner[2].SumKVS())
-	require.Equal(t, uint64(1), inner[verification.DataKVGroupAsIndexID].SumKVS())
+	require.Equal(t, uint64(1), inner[verification.DataKVGroupID].SumKVS())
 
 	dataKVCnt, indexKVCnt := c.DataAndIndexSumKVS()
 	require.Equal(t, uint64(1), dataKVCnt)
@@ -126,7 +126,7 @@ func TestGroupChecksum(t *testing.T) {
 	require.Equal(t, uint64(6), dataSize)
 	require.Equal(t, uint64(22), indexSize)
 
-	merged := c.MergeGroup()
+	merged := c.MergedChecksum()
 	require.Equal(t, uint64(4), merged.SumKVS())
 	require.Equal(t, uint64(28), merged.SumSize())
 }
diff --git a/pkg/disttask/importinto/proto.go b/pkg/disttask/importinto/proto.go
index e83e91e84947c..88a53fbc19986 100644
--- a/pkg/disttask/importinto/proto.go
+++ b/pkg/disttask/importinto/proto.go
@@ -53,7 +53,7 @@ type ImportStepMeta struct {
 	// this is the engine ID, not the id in tidb_background_subtask table.
 	ID int32
 	Chunks []Chunk
-	Checksum map[int64]Checksum
+	Checksum map[int64]Checksum // see KVGroupChecksum for definition of map key.
 	Result Result
 	// MaxIDs stores the max id that have been used during encoding for each allocator type.
 	// the max id is same among all allocator types for now, since we're using same base, see
@@ -94,7 +94,8 @@ type WriteIngestStepMeta struct {
 
 // PostProcessStepMeta is the meta of post process step.
 type PostProcessStepMeta struct {
-	// accumulated checksum of all subtasks in import step.
+	// accumulated checksum of all subtasks in import step. See KVGroupChecksum for
+	// definition of map key.
 	Checksum map[int64]Checksum
 	// MaxIDs of max all max-ids of subtasks in import step.
 	MaxIDs map[autoid.AllocatorType]int64
diff --git a/pkg/disttask/importinto/subtask_executor.go b/pkg/disttask/importinto/subtask_executor.go
index a10bf75fb7351..5202dc70d02d9 100644
--- a/pkg/disttask/importinto/subtask_executor.go
+++ b/pkg/disttask/importinto/subtask_executor.go
@@ -131,8 +131,8 @@ func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProce
 	localChecksum := verify.NewKVGroupChecksumForAdd()
 	for id, cksum := range subtaskMeta.Checksum {
 		callLog.Info(
-			"group checksum",
-			zap.Int64("id", id),
+			"kv group checksum",
+			zap.Int64("groupId", id),
 			zap.Uint64("size", cksum.Size),
 			zap.Uint64("kvs", cksum.KVs),
 			zap.Uint64("checksum", cksum.Sum),
@@ -146,6 +146,6 @@ func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProce
 		return err
 	}
 	return taskManager.WithNewSession(func(se sessionctx.Context) error {
-		return importer.VerifyChecksum(ctx, &taskMeta.Plan, localChecksum.MergeGroup(), se, logger)
+		return importer.VerifyChecksum(ctx, &taskMeta.Plan, localChecksum.MergedChecksum(), se, logger)
 	})
 }
diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go
index 14089f942095d..c13b7ddd56458 100644
--- a/pkg/disttask/importinto/task_executor.go
+++ b/pkg/disttask/importinto/task_executor.go
@@ -232,9 +232,6 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt
 			KVs: c.SumKVS(),
 			Size: c.SumSize(),
 		}
-		if s.tableImporter.IsGlobalSort() && id == verification.DataKVGroupAsIndexID {
-			dataKVCount = c.SumKVS()
-		}
 	}
 	subtaskMeta.Result = Result{
 		LoadedRowCnt: dataKVCount,
diff --git a/pkg/executor/importer/chunk_process.go b/pkg/executor/importer/chunk_process.go
index 45c499ab01143..b456472a27d0b 100644
--- a/pkg/executor/importer/chunk_process.go
+++ b/pkg/executor/importer/chunk_process.go
@@ -169,6 +169,7 @@ func (b *encodedKVGroupBatch) add(kvs *kv.Pairs) error {
 // chunkEncoder encodes data from readFn and sends encoded data to sendFn.
 type chunkEncoder struct {
 	readFn encodeReaderFn
+	offset int64
 	sendFn func(ctx context.Context, batch *encodedKVGroupBatch) error
 
 	chunkName string
@@ -186,6 +187,7 @@ type chunkEncoder struct {
 func newChunkEncoder(
 	chunkName string,
 	readFn encodeReaderFn,
+	offset int64,
 	sendFn func(ctx context.Context, batch *encodedKVGroupBatch) error,
 	logger *zap.Logger,
 	encoder KVEncoder,
@@ -194,6 +196,7 @@ func newChunkEncoder(
 	return &chunkEncoder{
 		chunkName: chunkName,
 		readFn: readFn,
+		offset: offset,
 		sendFn: sendFn,
 		logger: logger,
 		encoder: encoder,
@@ -209,7 +212,7 @@ func (p *chunkEncoder) encodeLoop(ctx context.Context) error {
 		rowCount int
 		rowBatch = make([]*kv.Pairs, 0, MinDeliverRowCnt)
 		rowBatchByteSize uint64
-		currOffset, prevOffset int64
+		currOffset int64
 	)
 	metrics, _ := metric.GetCommonMetric(ctx)
 	if metrics != nil {
@@ -224,8 +227,8 @@ func (p *chunkEncoder) encodeLoop(ctx context.Context) error {
 		}
 
 		if currOffset >= 0 && metrics != nil {
-			delta := currOffset - prevOffset
-			prevOffset = currOffset
+			delta := currOffset - p.offset
+			p.offset = currOffset
 			// if we're using split_file, this metric might larger than total
 			// source file size, as the offset we're using is the reader offset,
 			// not parser offset, and we'll buffer data.
@@ -378,6 +381,7 @@ func NewFileChunkProcessor(
 		enc: newChunkEncoder(
 			chunk.GetKey(),
 			parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey()),
+			chunk.Chunk.Offset,
 			deliver.sendEncodedData,
 			chunkLogger,
 			encoder,
@@ -522,6 +526,7 @@ func newQueryChunkProcessor(
 		enc: newChunkEncoder(
 			chunkName,
 			queryRowEncodeReader(rowCh),
+			-1,
 			deliver.sendEncodedData,
 			chunkLogger,
 			encoder,
diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go
index f8da189ac6e70..7665238bc2916 100644
--- a/pkg/executor/importer/importer_testkit_test.go
+++ b/pkg/executor/importer/importer_testkit_test.go
@@ -225,11 +225,11 @@ func TestPostProcess(t *testing.T) {
 
 	// verify checksum failed
 	localChecksum := verify.NewKVGroupChecksumForAdd()
-	localChecksum.AddRawGroup(verify.DataKVGroupAsIndexID, 1, 2, 1)
+	localChecksum.AddRawGroup(verify.DataKVGroupID, 1, 2, 1)
 	require.ErrorIs(t, importer.PostProcess(ctx, tk.Session(), nil, plan, localChecksum, logger), common.ErrChecksumMismatch)
 	// success
 	localChecksum = verify.NewKVGroupChecksumForAdd()
-	localChecksum.AddRawGroup(verify.DataKVGroupAsIndexID, 1, 1, 1)
+	localChecksum.AddRawGroup(verify.DataKVGroupID, 1, 1, 1)
 	require.NoError(t, importer.PostProcess(ctx, tk.Session(), nil, plan, localChecksum, logger))
 	// get KV store failed
 	importer.GetKVStore = func(path string, tls kvconfig.Security) (kv.Storage, error) {
@@ -328,7 +328,7 @@ func TestProcessChunkWith(t *testing.T) {
 		require.Len(t, progress.GetColSize(), 3)
 		checksumMap := checksum.GetInnerChecksums()
 		require.Len(t, checksumMap, 1)
-		require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupAsIndexID])
+		require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
 	})
 
 	t.Run("query chunk", func(t *testing.T) {
@@ -360,7 +360,7 @@ func TestProcessChunkWith(t *testing.T) {
 		require.Len(t, progress.GetColSize(), 3)
 		checksumMap := checksum.GetInnerChecksums()
 		require.Len(t, checksumMap, 1)
-		require.Equal(t, verify.MakeKVChecksum(111, 3, 14231358899564314836), *checksumMap[verify.DataKVGroupAsIndexID])
+		require.Equal(t, verify.MakeKVChecksum(111, 3, 14231358899564314836), *checksumMap[verify.DataKVGroupID])
 	})
 }
diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go
index 31491d6afeba9..15214346a3b01 100644
--- a/pkg/executor/importer/table_import.go
+++ b/pkg/executor/importer/table_import.go
@@ -801,7 +801,7 @@ func PostProcess(
 		return err
 	}
 
-	return VerifyChecksum(ctx, plan, localChecksum.MergeGroup(), se, logger)
+	return VerifyChecksum(ctx, plan, localChecksum.MergedChecksum(), se, logger)
 }
 
 type autoIDRequirement struct {
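
Below is a minimal usage sketch of the renamed checksum API, based only on the identifiers visible in the hunks above (NewKVGroupChecksumForAdd, AddRawGroup, DataKVGroupID, DataAndIndexSumKVS, DataAndIndexSumSize, MergedChecksum). The import path assumes the usual TiDB module layout, and the byte/KV/checksum values are illustrative only, not taken from this patch.

package main

import (
	"fmt"

	verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
)

func main() {
	// Accumulate checksums without a keyspace codec, as postProcess does when
	// it merges per-subtask results.
	local := verify.NewKVGroupChecksumForAdd()

	// The data KV group is keyed by the sentinel DataKVGroupID (-1); index KV
	// groups are keyed by their index IDs, which start from 1. The
	// (bytes, kvs, checksum) arguments below are made-up sample values.
	local.AddRawGroup(verify.DataKVGroupID, 6, 1, 0x1234)
	local.AddRawGroup(1, 22, 3, 0x5678)

	dataKVs, indexKVs := local.DataAndIndexSumKVS()
	dataSize, indexSize := local.DataAndIndexSumSize()
	fmt.Println(dataKVs, indexKVs, dataSize, indexSize) // 1 3 6 22

	// MergedChecksum (formerly MergeGroup) folds every group into a single
	// KVChecksum for the final verification step.
	merged := local.MergedChecksum()
	fmt.Println(merged.SumKVS(), merged.SumSize()) // 4 28
}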