Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: lance6716 <[email protected]>
  • Loading branch information
lance6716 committed Feb 5, 2024
1 parent 827ae5a commit 0fee9cf
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 54 deletions.
65 changes: 31 additions & 34 deletions br/pkg/lightning/verification/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ type KVChecksum struct {
checksum uint64
}

// NewKVChecksum creates a new KVChecksum with the given checksum.
func NewKVChecksum(checksum uint64) *KVChecksum {
return &KVChecksum{
checksum: checksum,
}
// NewKVChecksum creates a pointer to zero KVChecksum.
func NewKVChecksum() *KVChecksum {
return &KVChecksum{}
}

// NewKVChecksumWithKeyspace creates a new KVChecksum with the given checksum and keyspace.
Expand Down Expand Up @@ -135,29 +133,30 @@ type KVGroupChecksum struct {
codec tikv.Codec
}

// DataKVGroupAsIndexID represents the index ID for data KV group.
const DataKVGroupAsIndexID = -1
// DataKVGroupID represents the ID for data KV group, as index id starts from 1,
// so we use -1 to represent data kv group.
const DataKVGroupID = -1

// NewKVGroupChecksumWithKeyspace creates a new KVGroupChecksum with the given
// keyspace.
func NewKVGroupChecksumWithKeyspace(k tikv.Codec) *KVGroupChecksum {
m := make(map[int64]*KVChecksum, 8)
m[DataKVGroupAsIndexID] = NewKVChecksumWithKeyspace(k)
m[DataKVGroupID] = NewKVChecksumWithKeyspace(k)
return &KVGroupChecksum{m: m, codec: k}
}

// NewKVGroupChecksumForAdd creates a new KVGroupChecksum and it can't be used
// NewKVGroupChecksumForAdd creates a new KVGroupChecksum, and it can't be used
// with UpdateOneDataKV or UpdateOneIndexKV.
func NewKVGroupChecksumForAdd() *KVGroupChecksum {
m := make(map[int64]*KVChecksum, 8)
m[DataKVGroupAsIndexID] = NewKVChecksum(0)
m[DataKVGroupID] = NewKVChecksum()
return &KVGroupChecksum{m: m}
}

// UpdateOneDataKV updates the checksum with a single data(record) key-value
// pair. It will not check the key-value pair's key is a real data key again.
func (c *KVGroupChecksum) UpdateOneDataKV(kv common.KvPair) {
c.m[DataKVGroupAsIndexID].UpdateOne(kv)
c.m[DataKVGroupID].UpdateOne(kv)
}

// UpdateOneIndexKV updates the checksum with a single index key-value pair. It
Expand All @@ -175,38 +174,36 @@ func (c *KVGroupChecksum) UpdateOneIndexKV(indexID int64, kv common.KvPair) {
// Add adds the checksum of another KVGroupChecksum.
func (c *KVGroupChecksum) Add(other *KVGroupChecksum) {
for id, cksum := range other.m {
thisCksum := c.m[id]
if thisCksum == nil {
if c.codec == nil {
thisCksum = NewKVChecksum(0)
} else {
thisCksum = NewKVChecksumWithKeyspace(c.codec)
}
c.m[id] = thisCksum
}
thisCksum := c.getOrCreateOneGroup(id)
thisCksum.Add(cksum)
}
}

func (c *KVGroupChecksum) getOrCreateOneGroup(id int64) *KVChecksum {
cksum, ok := c.m[id]
if ok {
return cksum
}
if c.codec == nil {
cksum = NewKVChecksum()
} else {
cksum = NewKVChecksumWithKeyspace(c.codec)
}
c.m[id] = cksum
return cksum
}

// AddRawGroup adds the raw information of a KV group.
func (c *KVGroupChecksum) AddRawGroup(id int64, bytes, kvs, checksum uint64) {
oneGroup, ok := c.m[id]
if !ok {
if c.codec == nil {
oneGroup = NewKVChecksum(0)
} else {
oneGroup = NewKVChecksumWithKeyspace(c.codec)
}
c.m[id] = oneGroup
}
oneGroup := c.getOrCreateOneGroup(id)
tmp := MakeKVChecksum(bytes, kvs, checksum)
oneGroup.Add(&tmp)
}

// DataAndIndexSumSize returns the total size of data KV pairs and index KV pairs.
func (c *KVGroupChecksum) DataAndIndexSumSize() (dataSize, indexSize uint64) {
for id, cksum := range c.m {
if id == DataKVGroupAsIndexID {
if id == DataKVGroupID {
dataSize = cksum.SumSize()
} else {
indexSize += cksum.SumSize()
Expand All @@ -218,7 +215,7 @@ func (c *KVGroupChecksum) DataAndIndexSumSize() (dataSize, indexSize uint64) {
// DataAndIndexSumKVS returns the total number of data KV pairs and index KV pairs.
func (c *KVGroupChecksum) DataAndIndexSumKVS() (dataKVS, indexKVS uint64) {
for id, cksum := range c.m {
if id == DataKVGroupAsIndexID {
if id == DataKVGroupID {
dataKVS = cksum.SumKVS()
} else {
indexKVS += cksum.SumKVS()
Expand All @@ -242,9 +239,9 @@ func (c *KVGroupChecksum) GetInnerChecksums() map[int64]*KVChecksum {
return m
}

// MergeGroup merges all groups of this checksum into a single KVChecksum.
func (c *KVGroupChecksum) MergeGroup() KVChecksum {
merged := NewKVChecksum(0)
// MergedChecksum merges all groups of this checksum into a single KVChecksum.
func (c *KVGroupChecksum) MergedChecksum() KVChecksum {
merged := NewKVChecksum()
for _, cksum := range c.m {
merged.Add(cksum)
}
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/verification/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func TestChecksum(t *testing.T) {
checksum := verification.NewKVChecksum(0)
checksum := verification.NewKVChecksum()
require.Equal(t, uint64(0), checksum.Sum())

// checksum on nothing
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestGroupChecksum(t *testing.T) {
inner := c.GetInnerChecksums()
require.Equal(t, 2, len(inner))
require.Equal(t, uint64(1), inner[1].SumKVS())
require.Equal(t, uint64(1), inner[verification.DataKVGroupAsIndexID].SumKVS())
require.Equal(t, uint64(1), inner[verification.DataKVGroupID].SumKVS())

keyspaceCodec := &mockCodec{keyspace: []byte("keyspace")}
keyspaceC := verification.NewKVGroupChecksumWithKeyspace(keyspaceCodec)
Expand All @@ -116,7 +116,7 @@ func TestGroupChecksum(t *testing.T) {
require.Equal(t, 3, len(inner))
require.Equal(t, uint64(2), inner[1].SumKVS())
require.Equal(t, uint64(1), inner[2].SumKVS())
require.Equal(t, uint64(1), inner[verification.DataKVGroupAsIndexID].SumKVS())
require.Equal(t, uint64(1), inner[verification.DataKVGroupID].SumKVS())

dataKVCnt, indexKVCnt := c.DataAndIndexSumKVS()
require.Equal(t, uint64(1), dataKVCnt)
Expand All @@ -126,7 +126,7 @@ func TestGroupChecksum(t *testing.T) {
require.Equal(t, uint64(6), dataSize)
require.Equal(t, uint64(22), indexSize)

merged := c.MergeGroup()
merged := c.MergedChecksum()
require.Equal(t, uint64(4), merged.SumKVS())
require.Equal(t, uint64(28), merged.SumSize())
}
5 changes: 3 additions & 2 deletions pkg/disttask/importinto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type ImportStepMeta struct {
// this is the engine ID, not the id in tidb_background_subtask table.
ID int32
Chunks []Chunk
Checksum map[int64]Checksum
Checksum map[int64]Checksum // see KVGroupChecksum for definition of map key.
Result Result
// MaxIDs stores the max id that have been used during encoding for each allocator type.
// the max id is same among all allocator types for now, since we're using same base, see
Expand Down Expand Up @@ -94,7 +94,8 @@ type WriteIngestStepMeta struct {

// PostProcessStepMeta is the meta of post process step.
type PostProcessStepMeta struct {
// accumulated checksum of all subtasks in import step.
// accumulated checksum of all subtasks in import step. See KVGroupChecksum for
// definition of map key.
Checksum map[int64]Checksum
// MaxIDs of max all max-ids of subtasks in import step.
MaxIDs map[autoid.AllocatorType]int64
Expand Down
6 changes: 3 additions & 3 deletions pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProce
localChecksum := verify.NewKVGroupChecksumForAdd()
for id, cksum := range subtaskMeta.Checksum {
callLog.Info(
"group checksum",
zap.Int64("id", id),
"kv group checksum",
zap.Int64("groupId", id),
zap.Uint64("size", cksum.Size),
zap.Uint64("kvs", cksum.KVs),
zap.Uint64("checksum", cksum.Sum),
Expand All @@ -146,6 +146,6 @@ func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProce
return err
}
return taskManager.WithNewSession(func(se sessionctx.Context) error {
return importer.VerifyChecksum(ctx, &taskMeta.Plan, localChecksum.MergeGroup(), se, logger)
return importer.VerifyChecksum(ctx, &taskMeta.Plan, localChecksum.MergedChecksum(), se, logger)
})
}
3 changes: 0 additions & 3 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,6 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt
KVs: c.SumKVS(),
Size: c.SumSize(),
}
if s.tableImporter.IsGlobalSort() && id == verification.DataKVGroupAsIndexID {
dataKVCount = c.SumKVS()
}
}
subtaskMeta.Result = Result{
LoadedRowCnt: dataKVCount,
Expand Down
11 changes: 8 additions & 3 deletions pkg/executor/importer/chunk_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (b *encodedKVGroupBatch) add(kvs *kv.Pairs) error {
// chunkEncoder encodes data from readFn and sends encoded data to sendFn.
type chunkEncoder struct {
readFn encodeReaderFn
offset int64
sendFn func(ctx context.Context, batch *encodedKVGroupBatch) error

chunkName string
Expand All @@ -186,6 +187,7 @@ type chunkEncoder struct {
func newChunkEncoder(
chunkName string,
readFn encodeReaderFn,
offset int64,
sendFn func(ctx context.Context, batch *encodedKVGroupBatch) error,
logger *zap.Logger,
encoder KVEncoder,
Expand All @@ -194,6 +196,7 @@ func newChunkEncoder(
return &chunkEncoder{
chunkName: chunkName,
readFn: readFn,
offset: offset,
sendFn: sendFn,
logger: logger,
encoder: encoder,
Expand All @@ -209,7 +212,7 @@ func (p *chunkEncoder) encodeLoop(ctx context.Context) error {
rowCount int
rowBatch = make([]*kv.Pairs, 0, MinDeliverRowCnt)
rowBatchByteSize uint64
currOffset, prevOffset int64
currOffset int64
)
metrics, _ := metric.GetCommonMetric(ctx)
if metrics != nil {
Expand All @@ -224,8 +227,8 @@ func (p *chunkEncoder) encodeLoop(ctx context.Context) error {
}

if currOffset >= 0 && metrics != nil {
delta := currOffset - prevOffset
prevOffset = currOffset
delta := currOffset - p.offset
p.offset = currOffset
// if we're using split_file, this metric might larger than total
// source file size, as the offset we're using is the reader offset,
// not parser offset, and we'll buffer data.
Expand Down Expand Up @@ -378,6 +381,7 @@ func NewFileChunkProcessor(
enc: newChunkEncoder(
chunk.GetKey(),
parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey()),
chunk.Chunk.Offset,
deliver.sendEncodedData,
chunkLogger,
encoder,
Expand Down Expand Up @@ -522,6 +526,7 @@ func newQueryChunkProcessor(
enc: newChunkEncoder(
chunkName,
queryRowEncodeReader(rowCh),
-1,
deliver.sendEncodedData,
chunkLogger,
encoder,
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ func TestPostProcess(t *testing.T) {

// verify checksum failed
localChecksum := verify.NewKVGroupChecksumForAdd()
localChecksum.AddRawGroup(verify.DataKVGroupAsIndexID, 1, 2, 1)
localChecksum.AddRawGroup(verify.DataKVGroupID, 1, 2, 1)
require.ErrorIs(t, importer.PostProcess(ctx, tk.Session(), nil, plan, localChecksum, logger), common.ErrChecksumMismatch)
// success
localChecksum = verify.NewKVGroupChecksumForAdd()
localChecksum.AddRawGroup(verify.DataKVGroupAsIndexID, 1, 1, 1)
localChecksum.AddRawGroup(verify.DataKVGroupID, 1, 1, 1)
require.NoError(t, importer.PostProcess(ctx, tk.Session(), nil, plan, localChecksum, logger))
// get KV store failed
importer.GetKVStore = func(path string, tls kvconfig.Security) (kv.Storage, error) {
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestProcessChunkWith(t *testing.T) {
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupAsIndexID])
require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
})

t.Run("query chunk", func(t *testing.T) {
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestProcessChunkWith(t *testing.T) {
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(111, 3, 14231358899564314836), *checksumMap[verify.DataKVGroupAsIndexID])
require.Equal(t, verify.MakeKVChecksum(111, 3, 14231358899564314836), *checksumMap[verify.DataKVGroupID])
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func PostProcess(
return err
}

return VerifyChecksum(ctx, plan, localChecksum.MergeGroup(), se, logger)
return VerifyChecksum(ctx, plan, localChecksum.MergedChecksum(), se, logger)
}

type autoIDRequirement struct {
Expand Down

0 comments on commit 0fee9cf

Please sign in to comment.