diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index fef655348c4a1..4f077b1e1a756 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -153,6 +153,21 @@ type Pairs struct { MemBuf *MemBuf } +// GroupedPairs is a map from index ID to KvPairs. +type GroupedPairs map[int64][]common.KvPair + +// SplitIntoChunks implements the encode.Rows interface. It just satisfies the +// type system and should never be called. +func (GroupedPairs) SplitIntoChunks(int) []encode.Rows { + panic("not implemented") +} + +// Clear implements the encode.Rows interface. It just satisfies the type system +// and should never be called. +func (GroupedPairs) Clear() encode.Rows { + panic("not implemented") +} + // MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is // mainly used for testing only. The resulting Rows instance should only be used // for the importer backend. @@ -171,7 +186,21 @@ func MakeRowFromKvPairs(pairs []common.KvPair) encode.Row { // back into a slice of KvPair. This method panics if the Rows is not // constructed in such way. func Rows2KvPairs(rows encode.Rows) []common.KvPair { - return rows.(*Pairs).Pairs + switch v := rows.(type) { + case *Pairs: + return v.Pairs + case GroupedPairs: + cnt := 0 + for _, pairs := range v { + cnt += len(pairs) + } + res := make([]common.KvPair, 0, cnt) + for _, pairs := range v { + res = append(res, pairs...) + } + return res + } + panic(fmt.Sprintf("unknown Rows type %T", rows)) } // Row2KvPairs converts a Row instance constructed from MakeRowFromKvPairs diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 6fdc16688a869..86cc2f65c5cf8 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -112,7 +112,7 @@ go_test( ], embed = [":common"], flaky = True, - shard_count = 29, + shard_count = 28, deps = [ "//br/pkg/errors", "//br/pkg/lightning/log", diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index e5c9cffd46f59..7ebbcd4e62a3d 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -454,13 +454,16 @@ func KillMySelf() error { return errors.Trace(err) } -// KvPair is a pair of key and value. +// KvPair contains a key-value pair and other fields that can be used to ingest +// KV pairs into TiKV. type KvPair struct { // Key is the key of the KV pair Key []byte // Val is the value of the KV pair Val []byte - // RowID is the row id of the KV pair. + // RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It's + // often set to the file offset of the KvPair in the source file or the record + // handle. RowID []byte } @@ -482,19 +485,6 @@ func TableHasAutoID(info *model.TableInfo) bool { return TableHasAutoRowID(info) || info.GetAutoIncrementColInfo() != nil || info.ContainsAutoRandomBits() } -// StringSliceEqual checks if two string slices are equal. -func StringSliceEqual(a, b []string) bool { - if len(a) != len(b) { - return false - } - for i, v := range a { - if v != b[i] { - return false - } - } - return true -} - // GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it. 
// todo: better put in ddl package, but this will cause import cycle since ddl package import lightning func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo { diff --git a/br/pkg/lightning/common/util_test.go b/br/pkg/lightning/common/util_test.go index 11b126a2ebdd8..774319d7300a7 100644 --- a/br/pkg/lightning/common/util_test.go +++ b/br/pkg/lightning/common/util_test.go @@ -163,19 +163,6 @@ func TestSQLWithRetry(t *testing.T) { require.Nil(t, mock.ExpectationsWereMet()) } -func TestStringSliceEqual(t *testing.T) { - assert.True(t, common.StringSliceEqual(nil, nil)) - assert.True(t, common.StringSliceEqual(nil, []string{})) - assert.False(t, common.StringSliceEqual(nil, []string{"a"})) - assert.False(t, common.StringSliceEqual([]string{"a"}, nil)) - assert.True(t, common.StringSliceEqual([]string{"a"}, []string{"a"})) - assert.False(t, common.StringSliceEqual([]string{"a"}, []string{"b"})) - assert.True(t, common.StringSliceEqual([]string{"a", "b", "c"}, []string{"a", "b", "c"})) - assert.False(t, common.StringSliceEqual([]string{"a"}, []string{"a", "b", "c"})) - assert.False(t, common.StringSliceEqual([]string{"a", "b", "c"}, []string{"a", "b"})) - assert.False(t, common.StringSliceEqual([]string{"a", "x", "y"}, []string{"a", "y", "x"})) -} - func TestInterpolateMySQLString(t *testing.T) { assert.Equal(t, "'123'", common.InterpolateMySQLString("123")) assert.Equal(t, "'1''23'", common.InterpolateMySQLString("1'23")) diff --git a/br/pkg/lightning/config/BUILD.bazel b/br/pkg/lightning/config/BUILD.bazel index 1ae65a73dfd5e..68b51db7b3efd 100644 --- a/br/pkg/lightning/config/BUILD.bazel +++ b/br/pkg/lightning/config/BUILD.bazel @@ -44,7 +44,6 @@ go_test( flaky = True, shard_count = 48, deps = [ - "//br/pkg/lightning/common", "@com_github_burntsushi_toml//:toml", "@com_github_stretchr_testify//require", ], diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index d7047b406c6aa..1605bd6b45acd 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -31,7 +31,6 @@ import ( "time" "github.com/BurntSushi/toml" - "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/stretchr/testify/require" ) @@ -1221,35 +1220,22 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) { [mydumper] filter = ["db1.tbl1", "db2.*", "!db2.tbl1"] `))) - require.Equal(t, 3, len(cfg1.Mydumper.Filter)) - require.True(t, common.StringSliceEqual( - cfg1.Mydumper.Filter, - []string{"db1.tbl1", "db2.*", "!db2.tbl1"}, - )) - require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg)) + require.Equal(t, []string{"db1.tbl1", "db2.*", "!db2.tbl1"}, cfg1.Mydumper.Filter) + require.Equal(t, GetDefaultFilter(), originalDefaultCfg) cfg2 := NewConfig() - require.True(t, common.StringSliceEqual( - cfg2.Mydumper.Filter, - originalDefaultCfg, - )) - require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg)) + require.Equal(t, originalDefaultCfg, cfg2.Mydumper.Filter) + require.Equal(t, GetDefaultFilter(), originalDefaultCfg) gCfg1, err := LoadGlobalConfig([]string{"-f", "db1.tbl1", "-f", "db2.*", "-f", "!db2.tbl1"}, nil) require.NoError(t, err) - require.True(t, common.StringSliceEqual( - gCfg1.Mydumper.Filter, - []string{"db1.tbl1", "db2.*", "!db2.tbl1"}, - )) - require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg)) + require.Equal(t, []string{"db1.tbl1", "db2.*", "!db2.tbl1"}, gCfg1.Mydumper.Filter) + require.Equal(t, GetDefaultFilter(), 
originalDefaultCfg) gCfg2, err := LoadGlobalConfig([]string{}, nil) require.NoError(t, err) - require.True(t, common.StringSliceEqual( - gCfg2.Mydumper.Filter, - originalDefaultCfg, - )) - require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg)) + require.Equal(t, originalDefaultCfg, gCfg2.Mydumper.Filter) + require.Equal(t, GetDefaultFilter(), originalDefaultCfg) } func TestCompressionType(t *testing.T) { diff --git a/br/pkg/lightning/verification/BUILD.bazel b/br/pkg/lightning/verification/BUILD.bazel index 122e4b0e8535f..a28cd05043f0f 100644 --- a/br/pkg/lightning/verification/BUILD.bazel +++ b/br/pkg/lightning/verification/BUILD.bazel @@ -17,9 +17,11 @@ go_test( timeout = "short", srcs = ["checksum_test.go"], flaky = True, + shard_count = 3, deps = [ ":verification", "//br/pkg/lightning/common", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//tikv", ], ) diff --git a/br/pkg/lightning/verification/checksum.go b/br/pkg/lightning/verification/checksum.go index d1219455d3254..d30e143b301ad 100644 --- a/br/pkg/lightning/verification/checksum.go +++ b/br/pkg/lightning/verification/checksum.go @@ -25,7 +25,8 @@ import ( var ecmaTable = crc64.MakeTable(crc64.ECMA) -// KVChecksum is the checksum of a collection of key-value pairs. +// KVChecksum is the checksum of a collection of key-value pairs. The zero value +// of KVChecksum is a checksum for empty content and zero keyspace. type KVChecksum struct { base uint64 prefixLen int @@ -34,11 +35,9 @@ type KVChecksum struct { checksum uint64 } -// NewKVChecksum creates a new KVChecksum with the given checksum. -func NewKVChecksum(checksum uint64) *KVChecksum { - return &KVChecksum{ - checksum: checksum, - } +// NewKVChecksum creates a pointer to zero KVChecksum. +func NewKVChecksum() *KVChecksum { + return &KVChecksum{} } // NewKVChecksumWithKeyspace creates a new KVChecksum with the given checksum and keyspace. @@ -127,3 +126,135 @@ func (c *KVChecksum) MarshalJSON() ([]byte, error) { result := fmt.Sprintf(`{"checksum":%d,"size":%d,"kvs":%d}`, c.checksum, c.bytes, c.kvs) return []byte(result), nil } + +// KVGroupChecksum is KVChecksum(s) each for a data KV group or index KV groups. +type KVGroupChecksum struct { + m map[int64]*KVChecksum + codec tikv.Codec +} + +// DataKVGroupID represents the ID for data KV group, as index id starts from 1, +// so we use -1 to represent data kv group. +const DataKVGroupID = -1 + +// NewKVGroupChecksumWithKeyspace creates a new KVGroupChecksum with the given +// keyspace. +func NewKVGroupChecksumWithKeyspace(k tikv.Codec) *KVGroupChecksum { + m := make(map[int64]*KVChecksum, 8) + m[DataKVGroupID] = NewKVChecksumWithKeyspace(k) + return &KVGroupChecksum{m: m, codec: k} +} + +// NewKVGroupChecksumForAdd creates a new KVGroupChecksum, and it can't be used +// with UpdateOneDataKV or UpdateOneIndexKV. +func NewKVGroupChecksumForAdd() *KVGroupChecksum { + m := make(map[int64]*KVChecksum, 8) + m[DataKVGroupID] = NewKVChecksum() + return &KVGroupChecksum{m: m} +} + +// UpdateOneDataKV updates the checksum with a single data(record) key-value +// pair. It will not check the key-value pair's key is a real data key again. +func (c *KVGroupChecksum) UpdateOneDataKV(kv common.KvPair) { + c.m[DataKVGroupID].UpdateOne(kv) +} + +// UpdateOneIndexKV updates the checksum with a single index key-value pair. It +// will not check the key-value pair's key is a real index key of that index ID +// again. 
+func (c *KVGroupChecksum) UpdateOneIndexKV(indexID int64, kv common.KvPair) { + cksum := c.m[indexID] + if cksum == nil { + cksum = NewKVChecksumWithKeyspace(c.codec) + c.m[indexID] = cksum + } + cksum.UpdateOne(kv) +} + +// Add adds the checksum of another KVGroupChecksum. +func (c *KVGroupChecksum) Add(other *KVGroupChecksum) { + for id, cksum := range other.m { + thisCksum := c.getOrCreateOneGroup(id) + thisCksum.Add(cksum) + } +} + +func (c *KVGroupChecksum) getOrCreateOneGroup(id int64) *KVChecksum { + cksum, ok := c.m[id] + if ok { + return cksum + } + if c.codec == nil { + cksum = NewKVChecksum() + } else { + cksum = NewKVChecksumWithKeyspace(c.codec) + } + c.m[id] = cksum + return cksum +} + +// AddRawGroup adds the raw information of a KV group. +func (c *KVGroupChecksum) AddRawGroup(id int64, bytes, kvs, checksum uint64) { + oneGroup := c.getOrCreateOneGroup(id) + tmp := MakeKVChecksum(bytes, kvs, checksum) + oneGroup.Add(&tmp) +} + +// DataAndIndexSumSize returns the total size of data KV pairs and index KV pairs. +func (c *KVGroupChecksum) DataAndIndexSumSize() (dataSize, indexSize uint64) { + for id, cksum := range c.m { + if id == DataKVGroupID { + dataSize = cksum.SumSize() + } else { + indexSize += cksum.SumSize() + } + } + return +} + +// DataAndIndexSumKVS returns the total number of data KV pairs and index KV pairs. +func (c *KVGroupChecksum) DataAndIndexSumKVS() (dataKVS, indexKVS uint64) { + for id, cksum := range c.m { + if id == DataKVGroupID { + dataKVS = cksum.SumKVS() + } else { + indexKVS += cksum.SumKVS() + } + } + return +} + +// GetInnerChecksums returns a cloned map of index ID to its KVChecksum. +func (c *KVGroupChecksum) GetInnerChecksums() map[int64]*KVChecksum { + m := make(map[int64]*KVChecksum, len(c.m)) + for id, cksum := range c.m { + m[id] = &KVChecksum{ + base: cksum.base, + prefixLen: cksum.prefixLen, + bytes: cksum.bytes, + kvs: cksum.kvs, + checksum: cksum.checksum, + } + } + return m +} + +// MergedChecksum merges all groups of this checksum into a single KVChecksum. +func (c *KVGroupChecksum) MergedChecksum() KVChecksum { + merged := NewKVChecksum() + for _, cksum := range c.m { + merged.Add(cksum) + } + return *merged +} + +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. 
+func (c *KVGroupChecksum) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + for id, cksum := range c.m { + err := encoder.AddObject(fmt.Sprintf("id=%d", id), cksum) + if err != nil { + return err + } + } + return nil +} diff --git a/br/pkg/lightning/verification/checksum_test.go b/br/pkg/lightning/verification/checksum_test.go index 58484eecfea16..68adbf025cef6 100644 --- a/br/pkg/lightning/verification/checksum_test.go +++ b/br/pkg/lightning/verification/checksum_test.go @@ -21,12 +21,11 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" ) -func uint64NotEqual(a uint64, b uint64) bool { return a != b } - -func TestChcksum(t *testing.T) { - checksum := verification.NewKVChecksum(0) +func TestChecksum(t *testing.T) { + checksum := verification.NewKVChecksum() require.Equal(t, uint64(0), checksum.Sum()) // checksum on nothing @@ -64,7 +63,7 @@ func TestChcksum(t *testing.T) { checksum.Update(kvs) require.Equal(t, kvBytes<<1, checksum.SumSize()) require.Equal(t, uint64(len(kvs))<<1, checksum.SumKVS()) - require.True(t, uint64NotEqual(checksum.Sum(), excpectChecksum)) + require.NotEqual(t, excpectChecksum, checksum.Sum()) } func TestChecksumJSON(t *testing.T) { @@ -79,3 +78,55 @@ func TestChecksumJSON(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte(`{"Checksum":{"checksum":7890,"size":123,"kvs":456}}`), res) } + +type mockCodec struct { + tikv.Codec + keyspace []byte +} + +func (m *mockCodec) GetKeyspace() []byte { + return m.keyspace +} + +func TestGroupChecksum(t *testing.T) { + codec := &mockCodec{} + kvPair := common.KvPair{Key: []byte("key"), Val: []byte("val")} + kvPair2 := common.KvPair{Key: []byte("key2"), Val: []byte("val2")} + + c := verification.NewKVGroupChecksumWithKeyspace(codec) + c.UpdateOneDataKV(kvPair) + c.UpdateOneIndexKV(1, kvPair2) + inner := c.GetInnerChecksums() + require.Equal(t, 2, len(inner)) + require.Equal(t, uint64(1), inner[1].SumKVS()) + require.Equal(t, uint64(1), inner[verification.DataKVGroupID].SumKVS()) + + keyspaceCodec := &mockCodec{keyspace: []byte("keyspace")} + keyspaceC := verification.NewKVGroupChecksumWithKeyspace(keyspaceCodec) + keyspaceC.UpdateOneDataKV(kvPair) + keyspaceC.UpdateOneIndexKV(1, kvPair2) + keyspaceInner := keyspaceC.GetInnerChecksums() + require.NotEqual(t, inner, keyspaceInner) + + c2 := verification.NewKVGroupChecksumWithKeyspace(codec) + c2.UpdateOneIndexKV(1, kvPair) + c2.UpdateOneIndexKV(2, kvPair2) + c.Add(c2) + inner = c.GetInnerChecksums() + require.Equal(t, 3, len(inner)) + require.Equal(t, uint64(2), inner[1].SumKVS()) + require.Equal(t, uint64(1), inner[2].SumKVS()) + require.Equal(t, uint64(1), inner[verification.DataKVGroupID].SumKVS()) + + dataKVCnt, indexKVCnt := c.DataAndIndexSumKVS() + require.Equal(t, uint64(1), dataKVCnt) + require.Equal(t, uint64(3), indexKVCnt) + + dataSize, indexSize := c.DataAndIndexSumSize() + require.Equal(t, uint64(6), dataSize) + require.Equal(t, uint64(22), indexSize) + + merged := c.MergedChecksum() + require.Equal(t, uint64(4), merged.SumKVS()) + require.Equal(t, uint64(28), merged.SumSize()) +} diff --git a/pkg/disttask/importinto/planner.go b/pkg/disttask/importinto/planner.go index e2a2810c071e7..9224f6aa355e2 100644 --- a/pkg/disttask/importinto/planner.go +++ b/pkg/disttask/importinto/planner.go @@ -205,11 +205,12 @@ func (*PostProcessSpec) ToSubtaskMeta(planCtx planner.PlanCtx) ([]byte, error) { } subtaskMetas = 
append(subtaskMetas, &subtaskMeta) } - var localChecksum verify.KVChecksum + localChecksum := verify.NewKVGroupChecksumForAdd() maxIDs := make(map[autoid.AllocatorType]int64, 3) for _, subtaskMeta := range subtaskMetas { - checksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum) - localChecksum.Add(&checksum) + for id, c := range subtaskMeta.Checksum { + localChecksum.AddRawGroup(id, c.Size, c.KVs, c.Sum) + } for key, val := range subtaskMeta.MaxIDs { if maxIDs[key] < val { @@ -217,13 +218,17 @@ func (*PostProcessSpec) ToSubtaskMeta(planCtx planner.PlanCtx) ([]byte, error) { } } } + c := localChecksum.GetInnerChecksums() postProcessStepMeta := &PostProcessStepMeta{ - Checksum: Checksum{ - Size: localChecksum.SumSize(), - KVs: localChecksum.SumKVS(), - Sum: localChecksum.Sum(), - }, - MaxIDs: maxIDs, + Checksum: make(map[int64]Checksum, len(c)), + MaxIDs: maxIDs, + } + for id, cksum := range c { + postProcessStepMeta.Checksum[id] = Checksum{ + Size: cksum.SumSize(), + KVs: cksum.SumKVS(), + Sum: cksum.Sum(), + } } return json.Marshal(postProcessStepMeta) } diff --git a/pkg/disttask/importinto/planner_test.go b/pkg/disttask/importinto/planner_test.go index f5b8c1e4c3286..28981ccc57e24 100644 --- a/pkg/disttask/importinto/planner_test.go +++ b/pkg/disttask/importinto/planner_test.go @@ -97,7 +97,7 @@ func TestToPhysicalPlan(t *testing.T) { require.NoError(t, err) require.Equal(t, [][]byte{bs}, subtaskMetas1) - subtaskMeta1.Checksum = Checksum{Size: 1, KVs: 2, Sum: 3} + subtaskMeta1.Checksum = map[int64]Checksum{-1: {Size: 1, KVs: 2, Sum: 3}} bs, err = json.Marshal(subtaskMeta1) require.NoError(t, err) planCtx = planner.PlanCtx{ @@ -112,7 +112,7 @@ func TestToPhysicalPlan(t *testing.T) { }, proto.ImportStepPostProcess) require.NoError(t, err) subtaskMeta2 := PostProcessStepMeta{ - Checksum: Checksum{Size: 1, KVs: 2, Sum: 3}, + Checksum: map[int64]Checksum{-1: {Size: 1, KVs: 2, Sum: 3}}, MaxIDs: map[autoid.AllocatorType]int64{}, } bs, err = json.Marshal(subtaskMeta2) diff --git a/pkg/disttask/importinto/proto.go b/pkg/disttask/importinto/proto.go index 129433504e84a..88a53fbc19986 100644 --- a/pkg/disttask/importinto/proto.go +++ b/pkg/disttask/importinto/proto.go @@ -53,7 +53,7 @@ type ImportStepMeta struct { // this is the engine ID, not the id in tidb_background_subtask table. ID int32 Chunks []Chunk - Checksum Checksum + Checksum map[int64]Checksum // see KVGroupChecksum for definition of map key. Result Result // MaxIDs stores the max id that have been used during encoding for each allocator type. // the max id is same among all allocator types for now, since we're using same base, see @@ -94,8 +94,9 @@ type WriteIngestStepMeta struct { // PostProcessStepMeta is the meta of post process step. type PostProcessStepMeta struct { - // accumulated checksum of all subtasks in import step. - Checksum Checksum + // accumulated checksum of all subtasks in import step. See KVGroupChecksum for + // definition of map key. + Checksum map[int64]Checksum // MaxIDs of max all max-ids of subtasks in import step. MaxIDs map[autoid.AllocatorType]int64 } @@ -110,7 +111,7 @@ type SharedVars struct { Progress *importer.Progress mu sync.Mutex - Checksum *verification.KVChecksum + Checksum *verification.KVGroupChecksum SortedDataMeta *external.SortedKVMeta // SortedIndexMetas is a map from index id to its sorted kv meta. 
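Reviewer note (not part of the patch): a minimal sketch of how the new per-group checksum metadata is meant to be consumed. Group ID -1 (verify.DataKVGroupID) carries the data KVs and every positive ID is an index ID; the post-process step folds all groups back into a single KVChecksum before comparing with the cluster checksum. The Checksum struct below only mirrors the importinto.Checksum fields so the sketch stays self-contained.

package example

import (
	verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
)

// Checksum mirrors importinto.Checksum (Sum/KVs/Size); redefined here only to
// keep the sketch self-contained.
type Checksum struct {
	Sum, KVs, Size uint64
}

// mergeSubtaskChecksums accumulates the per-group checksums reported by the
// import-step subtasks and collapses them into the single KVChecksum that the
// post-process step verifies.
func mergeSubtaskChecksums(subtasks []map[int64]Checksum) verify.KVChecksum {
	acc := verify.NewKVGroupChecksumForAdd() // no keyspace needed when only adding raw sums
	for _, groups := range subtasks {
		for id, c := range groups {
			acc.AddRawGroup(id, c.Size, c.KVs, c.Sum)
		}
	}
	return acc.MergedChecksum()
}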
diff --git a/pkg/disttask/importinto/subtask_executor.go b/pkg/disttask/importinto/subtask_executor.go index eaab3656d0b85..5202dc70d02d9 100644 --- a/pkg/disttask/importinto/subtask_executor.go +++ b/pkg/disttask/importinto/subtask_executor.go @@ -70,19 +70,38 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr }) chunkCheckpoint := toChunkCheckpoint(e.mTtask.Chunk) sharedVars := e.mTtask.SharedVars + checksum := verify.NewKVGroupChecksumWithKeyspace(sharedVars.TableImporter.GetCodec()) if sharedVars.TableImporter.IsLocalSort() { - if err := importer.ProcessChunk(ctx, &chunkCheckpoint, sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, sharedVars.Progress, logger); err != nil { + if err := importer.ProcessChunk( + ctx, + &chunkCheckpoint, + sharedVars.TableImporter, + sharedVars.DataEngine, + sharedVars.IndexEngine, + sharedVars.Progress, + logger, + checksum, + ); err != nil { return err } } else { - if err := importer.ProcessChunkWith(ctx, &chunkCheckpoint, sharedVars.TableImporter, dataWriter, indexWriter, sharedVars.Progress, logger); err != nil { + if err := importer.ProcessChunkWithWriter( + ctx, + &chunkCheckpoint, + sharedVars.TableImporter, + dataWriter, + indexWriter, + sharedVars.Progress, + logger, + checksum, + ); err != nil { return err } } sharedVars.mu.Lock() defer sharedVars.mu.Unlock() - sharedVars.Checksum.Add(&chunkCheckpoint.Checksum) + sharedVars.Checksum.Add(checksum) return nil } @@ -109,13 +128,24 @@ func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProce // err = multierr.Append(err, err2) // }() - localChecksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum) + localChecksum := verify.NewKVGroupChecksumForAdd() + for id, cksum := range subtaskMeta.Checksum { + callLog.Info( + "kv group checksum", + zap.Int64("groupId", id), + zap.Uint64("size", cksum.Size), + zap.Uint64("kvs", cksum.KVs), + zap.Uint64("checksum", cksum.Sum), + ) + localChecksum.AddRawGroup(id, cksum.Size, cksum.KVs, cksum.Sum) + } + taskManager, err := storage.GetTaskManager() ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask) if err != nil { return err } return taskManager.WithNewSession(func(se sessionctx.Context) error { - return importer.VerifyChecksum(ctx, &taskMeta.Plan, localChecksum, se, logger) + return importer.VerifyChecksum(ctx, &taskMeta.Plan, localChecksum.MergedChecksum(), se, logger) }) } diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 181c4ca4c2d1b..24b86b2e82454 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -148,7 +148,7 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt DataEngine: dataEngine, IndexEngine: indexEngine, Progress: importer.NewProgress(), - Checksum: &verification.KVChecksum{}, + Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetCodec()), SortedDataMeta: &external.SortedKVMeta{}, SortedIndexMetas: make(map[int64]*external.SortedKVMeta), } @@ -201,7 +201,7 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt return errors.Errorf("sharedVars %d not found", subtaskMeta.ID) } - var dataKVCount int64 + var dataKVCount uint64 if s.tableImporter.IsLocalSort() { // TODO: we should close and cleanup engine in all case, since there's no checkpoint. 
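// Illustrative sketch (not part of the patch) of the per-chunk checksum flow in
// importMinimalTaskExecutor.Run above: each chunk encodes into its own
// keyspace-aware KVGroupChecksum, so encoding stays lock-free and only the
// final merge into SharedVars takes the mutex. Assumed to live in the
// importinto package, since SharedVars.mu is unexported; addChunkChecksum is a
// hypothetical helper name.
func addChunkChecksum(sv *SharedVars, codec tikv.Codec, encodeChunk func(*verify.KVGroupChecksum) error) error {
	chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(codec)
	if err := encodeChunk(chunkChecksum); err != nil {
		return err
	}
	sv.mu.Lock()
	defer sv.mu.Unlock()
	sv.Checksum.Add(chunkChecksum)
	return nil
}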
s.logger.Info("import data engine", zap.Int32("engine-id", subtaskMeta.ID)) @@ -209,10 +209,11 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt if err != nil { return err } - dataKVCount, err = s.tableImporter.ImportAndCleanup(ctx, closedDataEngine) + dataKVCount2, err := s.tableImporter.ImportAndCleanup(ctx, closedDataEngine) if err != nil { return err } + dataKVCount = uint64(dataKVCount2) s.logger.Info("import index engine", zap.Int32("engine-id", subtaskMeta.ID)) if closedEngine, err := sharedVars.IndexEngine.Close(ctx); err != nil { @@ -225,11 +226,16 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt sharedVars.mu.Lock() defer sharedVars.mu.Unlock() - subtaskMeta.Checksum.Sum = sharedVars.Checksum.Sum() - subtaskMeta.Checksum.KVs = sharedVars.Checksum.SumKVS() - subtaskMeta.Checksum.Size = sharedVars.Checksum.SumSize() + subtaskMeta.Checksum = map[int64]Checksum{} + for id, c := range sharedVars.Checksum.GetInnerChecksums() { + subtaskMeta.Checksum[id] = Checksum{ + Sum: c.Sum(), + KVs: c.SumKVS(), + Size: c.SumSize(), + } + } subtaskMeta.Result = Result{ - LoadedRowCnt: uint64(dataKVCount), + LoadedRowCnt: dataKVCount, ColSizeMap: sharedVars.Progress.GetColSize(), } allocators := sharedVars.TableImporter.Allocators() diff --git a/pkg/disttask/importinto/task_executor_testkit_test.go b/pkg/disttask/importinto/task_executor_testkit_test.go index 3458dbeef69a0..49924751fb5de 100644 --- a/pkg/disttask/importinto/task_executor_testkit_test.go +++ b/pkg/disttask/importinto/task_executor_testkit_test.go @@ -46,10 +46,12 @@ func TestPostProcessStepExecutor(t *testing.T) { tk.MustExec("insert into t values (1, 2), (3, 4)") res := tk.MustQuery("admin checksum table t").Rows() stepMeta := &importinto.PostProcessStepMeta{ - Checksum: importinto.Checksum{ - Sum: uint64(asInt(res[0][2].(string))), - KVs: uint64(asInt(res[0][3].(string))), - Size: uint64(asInt(res[0][4].(string))), + Checksum: map[int64]importinto.Checksum{ + -1: { + Sum: uint64(asInt(res[0][2].(string))), + KVs: uint64(asInt(res[0][3].(string))), + Size: uint64(asInt(res[0][4].(string))), + }, }, } @@ -72,7 +74,9 @@ func TestPostProcessStepExecutor(t *testing.T) { err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes}) require.NoError(t, err) - stepMeta.Checksum.Sum += 1 + tmp := stepMeta.Checksum[-1] + tmp.Sum += 1 + stepMeta.Checksum[-1] = tmp bytes, err = json.Marshal(stepMeta) require.NoError(t, err) executor = importinto.NewPostProcessStepExecutor(1, taskMeta, zap.NewExample()) diff --git a/pkg/executor/importer/chunk_process.go b/pkg/executor/importer/chunk_process.go index 636f1048212f9..609e010dc9d19 100644 --- a/pkg/executor/importer/chunk_process.go +++ b/pkg/executor/importer/chunk_process.go @@ -48,101 +48,172 @@ var ( MinDeliverRowCnt = 4096 ) -type deliveredRow struct { - kvs *kv.Pairs // if kvs is nil, this indicated we've got the last message. - // offset is the end offset in data file after encode this row. - offset int64 +type rowToEncode struct { + row []types.Datum + rowID int64 + // endOffset represents the offset after the current row in encode reader. + // it will be negative if the data source is not file. 
+ endOffset int64 + resetFn func() } -type deliverKVBatch struct { - dataKVs kv.Pairs - indexKVs kv.Pairs +type encodeReaderFn func(ctx context.Context) (data rowToEncode, closed bool, err error) - dataChecksum *verify.KVChecksum - indexChecksum *verify.KVChecksum +// parserEncodeReader wraps a mydump.Parser as a encodeReaderFn. +func parserEncodeReader(parser mydump.Parser, endOffset int64, filename string) encodeReaderFn { + return func(context.Context) (data rowToEncode, closed bool, err error) { + readPos, _ := parser.Pos() + if readPos >= endOffset { + closed = true + return + } - codec tikv.Codec + err = parser.ReadRow() + // todo: we can implement a ScannedPos which don't return error, will change it later. + currOffset, _ := parser.ScannedPos() + switch errors.Cause(err) { + case nil: + case io.EOF: + closed = true + err = nil + return + default: + err = common.ErrEncodeKV.Wrap(err).GenWithStackByArgs(filename, currOffset) + return + } + lastRow := parser.LastRow() + data = rowToEncode{ + row: lastRow.Row, + rowID: lastRow.RowID, + endOffset: currOffset, + resetFn: func() { parser.RecycleRow(lastRow) }, + } + return + } } -func newDeliverKVBatch(codec tikv.Codec) *deliverKVBatch { - return &deliverKVBatch{ - dataChecksum: verify.NewKVChecksumWithKeyspace(codec), - indexChecksum: verify.NewKVChecksumWithKeyspace(codec), - codec: codec, +// queryRowEncodeReader wraps a QueryRow channel as a encodeReaderFn. +func queryRowEncodeReader(rowCh <-chan QueryRow) encodeReaderFn { + return func(ctx context.Context) (data rowToEncode, closed bool, err error) { + select { + case <-ctx.Done(): + err = ctx.Err() + return + case row, ok := <-rowCh: + if !ok { + closed = true + return + } + data = rowToEncode{ + row: row.Data, + rowID: row.ID, + endOffset: -1, + resetFn: func() {}, + } + return + } } } -func (b *deliverKVBatch) reset() { - b.dataKVs.Clear() - b.indexKVs.Clear() - b.dataChecksum = verify.NewKVChecksumWithKeyspace(b.codec) - b.indexChecksum = verify.NewKVChecksumWithKeyspace(b.codec) +type encodedKVGroupBatch struct { + dataKVs []common.KvPair + indexKVs map[int64][]common.KvPair // indexID -> pairs + + bytesBuf *kv.BytesBuf + memBuf *kv.MemBuf + + groupChecksum *verify.KVGroupChecksum } -func (b *deliverKVBatch) size() uint64 { - return b.dataChecksum.SumSize() + b.indexChecksum.SumSize() +func (b *encodedKVGroupBatch) reset() { + if b.memBuf == nil { + return + } + // mimic kv.Pairs.Clear + b.memBuf.Recycle(b.bytesBuf) + b.bytesBuf = nil + b.memBuf = nil } -func (b *deliverKVBatch) add(kvs *kv.Pairs) { +func newEncodedKVGroupBatch(codec tikv.Codec) *encodedKVGroupBatch { + return &encodedKVGroupBatch{ + indexKVs: make(map[int64][]common.KvPair, 8), + groupChecksum: verify.NewKVGroupChecksumWithKeyspace(codec), + } +} + +// add must be called with `kvs` from the same session for a encodedKVGroupBatch. 
+func (b *encodedKVGroupBatch) add(kvs *kv.Pairs) error { for _, pair := range kvs.Pairs { if tablecodec.IsRecordKey(pair.Key) { - b.dataKVs.Pairs = append(b.dataKVs.Pairs, pair) - b.dataChecksum.UpdateOne(pair) + b.dataKVs = append(b.dataKVs, pair) + b.groupChecksum.UpdateOneDataKV(pair) } else { - b.indexKVs.Pairs = append(b.indexKVs.Pairs, pair) - b.indexChecksum.UpdateOne(pair) + indexID, err := tablecodec.DecodeIndexID(pair.Key) + if err != nil { + return errors.Trace(err) + } + b.indexKVs[indexID] = append(b.indexKVs[indexID], pair) + b.groupChecksum.UpdateOneIndexKV(indexID, pair) } } - // the related buf is shared, so we only need to set it into one of the kvs so it can be released - if kvs.BytesBuf != nil { - b.dataKVs.BytesBuf = kvs.BytesBuf - b.dataKVs.MemBuf = kvs.MemBuf + // the related buf is shared, so we only need to record any one of them. + if b.bytesBuf == nil && kvs.BytesBuf != nil { + b.bytesBuf = kvs.BytesBuf + b.memBuf = kvs.MemBuf } + return nil } -type chunkEncoder interface { - init() error - encodeLoop(ctx context.Context) error - summaryFields() []zap.Field -} +// chunkEncoder encodes data from readFn and sends encoded data to sendFn. +type chunkEncoder struct { + readFn encodeReaderFn + offset int64 + sendFn func(ctx context.Context, batch *encodedKVGroupBatch) error -// fileChunkEncoder encode data chunk(either a data file or part of a file). -type fileChunkEncoder struct { - parser mydump.Parser - chunkInfo *checkpoints.ChunkCheckpoint + chunkName string logger *zap.Logger encoder KVEncoder kvCodec tikv.Codec - sendFn func(ctx context.Context, kvs []deliveredRow) error - // startOffset is the offset of current source file reader. it might be - // larger than the pos that has been parsed due to reader buffering. - // some rows before startOffset might be skipped if skip_rows > 0. - startOffset int64 - // total duration takes by read/encode/deliver. + // total duration takes by read/encode. 
readTotalDur time.Duration encodeTotalDur time.Duration -} -var _ chunkEncoder = (*fileChunkEncoder)(nil) + groupChecksum *verify.KVGroupChecksum +} -func (p *fileChunkEncoder) init() error { - // we might skip N rows or start from checkpoint - offset, err := p.parser.ScannedPos() - if err != nil { - return errors.Trace(err) +func newChunkEncoder( + chunkName string, + readFn encodeReaderFn, + offset int64, + sendFn func(ctx context.Context, batch *encodedKVGroupBatch) error, + logger *zap.Logger, + encoder KVEncoder, + kvCodec tikv.Codec, +) *chunkEncoder { + return &chunkEncoder{ + chunkName: chunkName, + readFn: readFn, + offset: offset, + sendFn: sendFn, + logger: logger, + encoder: encoder, + kvCodec: kvCodec, + groupChecksum: verify.NewKVGroupChecksumWithKeyspace(kvCodec), } - p.startOffset = offset - return nil } -func (p *fileChunkEncoder) encodeLoop(ctx context.Context) error { - var err error - reachEOF := false - prevOffset, currOffset := p.startOffset, p.startOffset - - var encodedBytesCounter, encodedRowsCounter prometheus.Counter +func (p *chunkEncoder) encodeLoop(ctx context.Context) error { + var ( + encodedBytesCounter, encodedRowsCounter prometheus.Counter + readDur, encodeDur time.Duration + rowCount int + rowBatch = make([]*kv.Pairs, 0, MinDeliverRowCnt) + rowBatchByteSize uint64 + currOffset int64 + ) metrics, _ := metric.GetCommonMetric(ctx) if metrics != nil { encodedBytesCounter = metrics.BytesCounter.WithLabelValues(metric.StateRestored) @@ -150,85 +221,90 @@ func (p *fileChunkEncoder) encodeLoop(ctx context.Context) error { encodedRowsCounter = metrics.RowsCounter.WithLabelValues(metric.StateRestored, "") } - for !reachEOF { - readPos, _ := p.parser.Pos() - if readPos >= p.chunkInfo.Chunk.EndOffset { - break + recordSendReset := func() error { + if len(rowBatch) == 0 { + return nil } - var readDur, encodeDur time.Duration - canDeliver := false - rowBatch := make([]deliveredRow, 0, MinDeliverRowCnt) - var rowCount, kvSize uint64 - outLoop: - for !canDeliver { - readDurStart := time.Now() - err = p.parser.ReadRow() - readPos, _ = p.parser.Pos() - // todo: we can implement a ScannedPos which don't return error, will change it later. - currOffset, _ = p.parser.ScannedPos() - - switch errors.Cause(err) { - case nil: - case io.EOF: - reachEOF = true - break outLoop - default: - return common.ErrEncodeKV.Wrap(err).GenWithStackByArgs(p.chunkInfo.GetKey(), currOffset) - } - readDur += time.Since(readDurStart) - encodeDurStart := time.Now() - lastRow := p.parser.LastRow() - // sql -> kv - kvs, encodeErr := p.encoder.Encode(lastRow.Row, lastRow.RowID) - encodeDur += time.Since(encodeDurStart) - - if encodeErr != nil { - // todo: record and ignore encode error if user set max-errors param - err = common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(p.chunkInfo.GetKey(), currOffset) - } - p.parser.RecycleRow(lastRow) - if err != nil { - return err - } - - rowBatch = append(rowBatch, deliveredRow{kvs: kvs, offset: currOffset}) - kvSize += kvs.Size() - rowCount++ - // pebble cannot allow > 4.0G kv in one batch. - // we will meet pebble panic when import sql file and each kv has the size larger than 4G / maxKvPairsCnt. - // so add this check. 
- if kvSize >= MinDeliverBytes || len(rowBatch) >= MinDeliverRowCnt || readPos == p.chunkInfo.Chunk.EndOffset { - canDeliver = true - } + if currOffset >= 0 && metrics != nil { + delta := currOffset - p.offset + p.offset = currOffset + // if we're using split_file, this metric might larger than total + // source file size, as the offset we're using is the reader offset, + // not parser offset, and we'll buffer data. + encodedBytesCounter.Add(float64(delta)) } - delta := currOffset - prevOffset - prevOffset = currOffset - - p.encodeTotalDur += encodeDur - p.readTotalDur += readDur if metrics != nil { metrics.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds()) metrics.RowReadSecondsHistogram.Observe(readDur.Seconds()) - // if we're using split_file, this metric might larger than total - // source file size, as the offset we're using is the reader offset, - // not parser offset, and we'll buffer data. - encodedBytesCounter.Add(float64(delta)) encodedRowsCounter.Add(float64(rowCount)) } + p.encodeTotalDur += encodeDur + p.readTotalDur += readDur - if len(rowBatch) > 0 { - if err = p.sendFn(ctx, rowBatch); err != nil { + kvGroupBatch := newEncodedKVGroupBatch(p.kvCodec) + + for _, kvs := range rowBatch { + if err := kvGroupBatch.add(kvs); err != nil { + return errors.Trace(err) + } + } + + p.groupChecksum.Add(kvGroupBatch.groupChecksum) + + if err := p.sendFn(ctx, kvGroupBatch); err != nil { + return err + } + + // the ownership of rowBatch is transferred to the receiver of sendFn, we should + // not touch it anymore. + rowBatch = make([]*kv.Pairs, 0, MinDeliverRowCnt) + rowBatchByteSize = 0 + rowCount = 0 + readDur = 0 + encodeDur = 0 + return nil + } + + for { + readDurStart := time.Now() + data, closed, err := p.readFn(ctx) + if err != nil { + return errors.Trace(err) + } + if closed { + break + } + readDur += time.Since(readDurStart) + + encodeDurStart := time.Now() + kvs, encodeErr := p.encoder.Encode(data.row, data.rowID) + currOffset = data.endOffset + data.resetFn() + if encodeErr != nil { + // todo: record and ignore encode error if user set max-errors param + return common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(p.chunkName, data.endOffset) + } + encodeDur += time.Since(encodeDurStart) + + rowCount++ + rowBatch = append(rowBatch, kvs) + rowBatchByteSize += kvs.Size() + // pebble cannot allow > 4.0G kv in one batch. + // we will meet pebble panic when import sql file and each kv has the size larger than 4G / maxKvPairsCnt. + // so add this check. + if rowBatchByteSize >= MinDeliverBytes || len(rowBatch) >= MinDeliverRowCnt { + if err := recordSendReset(); err != nil { return err } } } - return nil + return recordSendReset() } -func (p *fileChunkEncoder) summaryFields() []zap.Field { +func (p *chunkEncoder) summaryFields() []zap.Field { return []zap.Field{ zap.Duration("readDur", p.readTotalDur), zap.Duration("encodeDur", p.encodeTotalDur), @@ -242,26 +318,23 @@ type ChunkProcessor interface { } type baseChunkProcessor struct { - sourceType DataSourceType - enc chunkEncoder - deliver *dataDeliver - logger *zap.Logger - chunkInfo *checkpoints.ChunkCheckpoint + sourceType DataSourceType + enc *chunkEncoder + deliver *dataDeliver + logger *zap.Logger + groupChecksum *verify.KVGroupChecksum } func (p *baseChunkProcessor) Process(ctx context.Context) (err error) { task := log.BeginTask(p.logger, "process chunk") defer func() { - logFields := append(p.enc.summaryFields(), p.deliver.logFields()...) + logFields := append(p.enc.summaryFields(), p.deliver.summaryFields()...) 
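// Illustrative sketch (not part of the patch): the encodeReaderFn/rowToEncode
// pair above decouples chunkEncoder.encodeLoop from its data source, so adding
// a new source only means yielding rows and reporting closed=true when drained.
// For example, an in-memory slice could be wrapped like this (assumed to sit in
// the importer package because the reader types are unexported):
func sliceEncodeReader(rows [][]types.Datum) encodeReaderFn {
	next := 0
	return func(context.Context) (data rowToEncode, closed bool, err error) {
		if next >= len(rows) {
			closed = true
			return
		}
		data = rowToEncode{
			row:       rows[next],
			rowID:     int64(next + 1),
			endOffset: -1, // same convention as queryRowEncodeReader: no file offset
			resetFn:   func() {},
		}
		next++
		return
	}
}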
logFields = append(logFields, zap.Stringer("type", p.sourceType)) task.End(zap.ErrorLevel, err, logFields...) if metrics, ok := metric.GetCommonMetric(ctx); ok && err == nil { metrics.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Inc() } }() - if err2 := p.enc.init(); err2 != nil { - return err2 - } group, gCtx := errgroup.WithContext(ctx) group.Go(func() error { @@ -273,7 +346,10 @@ func (p *baseChunkProcessor) Process(ctx context.Context) (err error) { }) err2 := group.Wait() - p.chunkInfo.Checksum.Add(&p.deliver.checksum) + // in some unit tests it's nil + if c := p.groupChecksum; c != nil { + c.Add(p.enc.groupChecksum) + } return err2 } @@ -288,51 +364,52 @@ func NewFileChunkProcessor( diskQuotaLock *syncutil.RWMutex, dataWriter backend.EngineWriter, indexWriter backend.EngineWriter, + groupChecksum *verify.KVGroupChecksum, ) ChunkProcessor { chunkLogger := logger.With(zap.String("key", chunk.GetKey())) deliver := &dataDeliver{ logger: chunkLogger, kvCodec: kvCodec, diskQuotaLock: diskQuotaLock, - kvsCh: make(chan []deliveredRow, maxKVQueueSize), + kvBatch: make(chan *encodedKVGroupBatch, maxKVQueueSize), dataWriter: dataWriter, indexWriter: indexWriter, } return &baseChunkProcessor{ sourceType: DataSourceTypeFile, deliver: deliver, - enc: &fileChunkEncoder{ - parser: parser, - chunkInfo: chunk, - logger: chunkLogger, - encoder: encoder, - kvCodec: kvCodec, - sendFn: deliver.sendEncodedData, - }, - logger: chunkLogger, - chunkInfo: chunk, + enc: newChunkEncoder( + chunk.GetKey(), + parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey()), + chunk.Chunk.Offset, + deliver.sendEncodedData, + chunkLogger, + encoder, + kvCodec, + ), + logger: chunkLogger, + groupChecksum: groupChecksum, } } type dataDeliver struct { logger *zap.Logger kvCodec tikv.Codec - kvsCh chan []deliveredRow + kvBatch chan *encodedKVGroupBatch diskQuotaLock *syncutil.RWMutex dataWriter backend.EngineWriter indexWriter backend.EngineWriter - checksum verify.KVChecksum deliverTotalDur time.Duration } func (p *dataDeliver) encodeDone() { - close(p.kvsCh) + close(p.kvBatch) } -func (p *dataDeliver) sendEncodedData(ctx context.Context, kvs []deliveredRow) error { +func (p *dataDeliver) sendEncodedData(ctx context.Context, batch *encodedKVGroupBatch) error { select { - case p.kvsCh <- kvs: + case p.kvBatch <- batch: return nil case <-ctx.Done(): return ctx.Err() @@ -340,8 +417,6 @@ func (p *dataDeliver) sendEncodedData(ctx context.Context, kvs []deliveredRow) e } func (p *dataDeliver) deliverLoop(ctx context.Context) error { - kvBatch := newDeliverKVBatch(p.kvCodec) - var ( dataKVBytesHist, indexKVBytesHist prometheus.Observer dataKVPairsHist, indexKVPairsHist prometheus.Observer @@ -358,23 +433,18 @@ func (p *dataDeliver) deliverLoop(ctx context.Context) error { } for { - outer: - for kvBatch.size() < MinDeliverBytes { - select { - case kvPacket, ok := <-p.kvsCh: - if !ok { - break outer - } - for _, row := range kvPacket { - kvBatch.add(row.kvs) - } - case <-ctx.Done(): - return ctx.Err() + var ( + kvBatch *encodedKVGroupBatch + ok bool + ) + + select { + case kvBatch, ok = <-p.kvBatch: + if !ok { + return nil } - } - - if kvBatch.size() == 0 { - break + case <-ctx.Done(): + return ctx.Err() } err := func() error { @@ -382,13 +452,13 @@ func (p *dataDeliver) deliverLoop(ctx context.Context) error { defer p.diskQuotaLock.RUnlock() start := time.Now() - if err := p.dataWriter.AppendRows(ctx, nil, &kvBatch.dataKVs); err != nil { + if err := p.dataWriter.AppendRows(ctx, nil, 
kv.MakeRowsFromKvPairs(kvBatch.dataKVs)); err != nil { if !common.IsContextCanceledError(err) { p.logger.Error("write to data engine failed", log.ShortError(err)) } return errors.Trace(err) } - if err := p.indexWriter.AppendRows(ctx, nil, &kvBatch.indexKVs); err != nil { + if err := p.indexWriter.AppendRows(ctx, nil, kv.GroupedPairs(kvBatch.indexKVs)); err != nil { if !common.IsContextCanceledError(err) { p.logger.Error("write to index engine failed", log.ShortError(err)) } @@ -399,10 +469,14 @@ func (p *dataDeliver) deliverLoop(ctx context.Context) error { p.deliverTotalDur += deliverDur if metrics != nil { metrics.BlockDeliverSecondsHistogram.Observe(deliverDur.Seconds()) - dataKVBytesHist.Observe(float64(kvBatch.dataChecksum.SumSize())) - indexKVBytesHist.Observe(float64(kvBatch.indexChecksum.SumSize())) - dataKVPairsHist.Observe(float64(kvBatch.dataChecksum.SumKVS())) - indexKVPairsHist.Observe(float64(kvBatch.indexChecksum.SumKVS())) + + dataSize, indexSize := kvBatch.groupChecksum.DataAndIndexSumSize() + dataKVCnt, indexKVCnt := kvBatch.groupChecksum.DataAndIndexSumKVS() + dataKVBytesHist.Observe(float64(dataSize)) + dataKVPairsHist.Observe(float64(dataKVCnt)) + indexKVBytesHist.Observe(float64(indexSize)) + indexKVPairsHist.Observe(float64(indexKVCnt)) + deliverBytesCounter.Add(float64(dataSize + indexSize)) } return nil }() @@ -410,124 +484,13 @@ func (p *dataDeliver) deliverLoop(ctx context.Context) error { return err } - if metrics != nil { - deliverBytesCounter.Add(float64(kvBatch.size())) - } - - p.checksum.Add(kvBatch.dataChecksum) - p.checksum.Add(kvBatch.indexChecksum) - kvBatch.reset() } - - return nil } -func (p *dataDeliver) logFields() []zap.Field { +func (p *dataDeliver) summaryFields() []zap.Field { return []zap.Field{ zap.Duration("deliverDur", p.deliverTotalDur), - zap.Object("checksum", &p.checksum), - } -} - -// fileChunkEncoder encode data chunk(either a data file or part of a file). -type queryChunkEncoder struct { - rowCh chan QueryRow - chunkInfo *checkpoints.ChunkCheckpoint - logger *zap.Logger - encoder KVEncoder - sendFn func(ctx context.Context, kvs []deliveredRow) error - - // total duration takes by read/encode/deliver. - readTotalDur time.Duration - encodeTotalDur time.Duration -} - -var _ chunkEncoder = (*queryChunkEncoder)(nil) - -func (*queryChunkEncoder) init() error { - return nil -} - -// TODO logic is very similar to fileChunkEncoder, consider merge them. -func (e *queryChunkEncoder) encodeLoop(ctx context.Context) error { - var err error - reachEOF := false - var encodedRowsCounter prometheus.Counter - metrics, _ := metric.GetCommonMetric(ctx) - if metrics != nil { - // table name doesn't matter here, all those metrics will have task-id label. 
- encodedRowsCounter = metrics.RowsCounter.WithLabelValues(metric.StateRestored, "") - } - - for !reachEOF { - var readDur, encodeDur time.Duration - canDeliver := false - rowBatch := make([]deliveredRow, 0, MinDeliverRowCnt) - var rowCount, kvSize uint64 - outLoop: - for !canDeliver { - readDurStart := time.Now() - var ( - lastRow QueryRow - rowID int64 - ok bool - ) - select { - case lastRow, ok = <-e.rowCh: - if !ok { - reachEOF = true - break outLoop - } - case <-ctx.Done(): - return ctx.Err() - } - readDur += time.Since(readDurStart) - encodeDurStart := time.Now() - // sql -> kv - kvs, encodeErr := e.encoder.Encode(lastRow.Data, lastRow.ID) - encodeDur += time.Since(encodeDurStart) - - if encodeErr != nil { - err = common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(e.chunkInfo.GetKey(), rowID) - } - if err != nil { - return err - } - - rowBatch = append(rowBatch, deliveredRow{kvs: kvs}) - kvSize += kvs.Size() - rowCount++ - // pebble cannot allow > 4.0G kv in one batch. - // we will meet pebble panic when import sql file and each kv has the size larger than 4G / maxKvPairsCnt. - // so add this check. - if kvSize >= MinDeliverBytes || len(rowBatch) >= MinDeliverRowCnt { - canDeliver = true - } - } - - e.encodeTotalDur += encodeDur - e.readTotalDur += readDur - if metrics != nil { - metrics.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds()) - metrics.RowReadSecondsHistogram.Observe(readDur.Seconds()) - encodedRowsCounter.Add(float64(rowCount)) - } - - if len(rowBatch) > 0 { - if err = e.sendFn(ctx, rowBatch); err != nil { - return err - } - } - } - - return nil -} - -func (e *queryChunkEncoder) summaryFields() []zap.Field { - return []zap.Field{ - zap.Duration("readDur", e.readTotalDur), - zap.Duration("encodeDur", e.encodeTotalDur), } } @@ -541,44 +504,42 @@ func newQueryChunkProcessor( rowCh chan QueryRow, encoder KVEncoder, kvCodec tikv.Codec, - chunk *checkpoints.ChunkCheckpoint, logger *zap.Logger, diskQuotaLock *syncutil.RWMutex, dataWriter backend.EngineWriter, indexWriter backend.EngineWriter, + groupChecksum *verify.KVGroupChecksum, ) ChunkProcessor { - chunkLogger := logger.With(zap.String("key", chunk.GetKey())) + chunkName := "import-from-select" + chunkLogger := logger.With(zap.String("key", chunkName)) deliver := &dataDeliver{ logger: chunkLogger, kvCodec: kvCodec, diskQuotaLock: diskQuotaLock, - kvsCh: make(chan []deliveredRow, maxKVQueueSize), + kvBatch: make(chan *encodedKVGroupBatch, maxKVQueueSize), dataWriter: dataWriter, indexWriter: indexWriter, } return &baseChunkProcessor{ sourceType: DataSourceTypeQuery, deliver: deliver, - enc: &queryChunkEncoder{ - rowCh: rowCh, - chunkInfo: chunk, - logger: chunkLogger, - encoder: encoder, - sendFn: deliver.sendEncodedData, - }, - logger: chunkLogger, - chunkInfo: chunk, + enc: newChunkEncoder( + chunkName, + queryRowEncodeReader(rowCh), + -1, + deliver.sendEncodedData, + chunkLogger, + encoder, + kvCodec, + ), + logger: chunkLogger, + groupChecksum: groupChecksum, } } // IndexRouteWriter is a writer for index when using global sort. // we route kvs of different index to different writer in order to make // merge sort easier, else kv data of all subtasks will all be overlapped. -// -// drawback of doing this is that the number of writers need to open will be -// index-count * encode-concurrency, when the table has many indexes, and each -// writer will take 256MiB buffer on default. -// this will take a lot of memory, or even OOM. 
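// Reviewer note (illustrative, not part of the patch): after this change the
// deliver loop has already grouped index KVs by index ID, so AppendRows below
// routes on the map key instead of decoding every key. sendBatch is a
// hypothetical helper showing the expected call shape on the sender side.
func sendBatch(ctx context.Context, dataWriter, indexWriter backend.EngineWriter, batch *encodedKVGroupBatch) error {
	// data KVs stay a flat slice; index KVs are handed over as kv.GroupedPairs.
	if err := dataWriter.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(batch.dataKVs)); err != nil {
		return err
	}
	return indexWriter.AppendRows(ctx, nil, kv.GroupedPairs(batch.indexKVs))
}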
type IndexRouteWriter struct { writers map[int64]*external.Writer logger *zap.Logger @@ -596,24 +557,23 @@ func NewIndexRouteWriter(logger *zap.Logger, writerFactory func(int64) *external // AppendRows implements backend.EngineWriter interface. func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { - kvs := kv.Rows2KvPairs(rows) - if len(kvs) == 0 { - return nil + groupedKvs, ok := rows.(kv.GroupedPairs) + if !ok { + return errors.Errorf("invalid kv pairs type for IndexRouteWriter: %T", rows) } - for _, item := range kvs { - indexID, err := tablecodec.DecodeIndexID(item.Key) - if err != nil { - return errors.Trace(err) - } - writer, ok := w.writers[indexID] - if !ok { - writer = w.writerFactory(indexID) - w.writers[indexID] = writer - } - if err = writer.WriteRow(ctx, item.Key, item.Val, nil); err != nil { - return errors.Trace(err) + for indexID, kvs := range groupedKvs { + for _, item := range kvs { + writer, ok := w.writers[indexID] + if !ok { + writer = w.writerFactory(indexID) + w.writers[indexID] = writer + } + if err := writer.WriteRow(ctx, item.Key, item.Val, nil); err != nil { + return errors.Trace(err) + } } } + return nil } diff --git a/pkg/executor/importer/chunk_process_testkit_test.go b/pkg/executor/importer/chunk_process_testkit_test.go index f24fc70ee6b8d..7a5f8605d583e 100644 --- a/pkg/executor/importer/chunk_process_testkit_test.go +++ b/pkg/executor/importer/chunk_process_testkit_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/mydump" + verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/executor/importer" @@ -132,21 +133,28 @@ func TestFileChunkProcess(t *testing.T) { }).AnyTimes() indexWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, columnNames []string, rows encode.Rows) error { - indexKVCnt += len(rows.(*kv.Pairs).Pairs) + group := rows.(kv.GroupedPairs) + for _, pairs := range group { + indexKVCnt += len(pairs) + } return nil }).AnyTimes() chunkInfo := &checkpoints.ChunkCheckpoint{ Chunk: mydump.Chunk{EndOffset: int64(len(sourceData)), RowIDMax: 10000}, } + checksum := verify.NewKVGroupChecksumWithKeyspace(codec) processor := importer.NewFileChunkProcessor( csvParser, encoder, codec, - chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, + chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, checksum, ) require.NoError(t, processor.Process(ctx)) require.True(t, ctrl.Satisfied()) - require.Equal(t, uint64(9), chunkInfo.Checksum.SumKVS()) - require.Equal(t, uint64(348), chunkInfo.Checksum.SumSize()) + checksumDataKVCnt, checksumIndexKVCnt := checksum.DataAndIndexSumKVS() + require.Equal(t, uint64(3), checksumDataKVCnt) + require.Equal(t, uint64(6), checksumIndexKVCnt) + dataKVSize, indexKVSize := checksum.DataAndIndexSumSize() + require.Equal(t, uint64(348), dataKVSize+indexKVSize) require.Equal(t, 3, dataKVCnt) require.Equal(t, 6, indexKVCnt) require.Equal(t, float64(len(sourceData)), metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateRestored))) @@ -174,7 +182,7 @@ func TestFileChunkProcess(t *testing.T) { } processor := importer.NewFileChunkProcessor( csvParser, encoder, codec, - chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, + chunkInfo, 
logger.Logger, diskQuotaLock, dataWriter, indexWriter, nil, ) require.ErrorIs(t, processor.Process(ctx), common.ErrEncodeKV) require.True(t, ctrl.Satisfied()) @@ -199,7 +207,7 @@ func TestFileChunkProcess(t *testing.T) { } processor := importer.NewFileChunkProcessor( csvParser, encoder, codec, - chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, + chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, nil, ) require.ErrorIs(t, processor.Process(ctx), common.ErrEncodeKV) require.True(t, ctrl.Satisfied()) @@ -223,7 +231,7 @@ func TestFileChunkProcess(t *testing.T) { } processor := importer.NewFileChunkProcessor( csvParser, encoder, codec, - chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, + chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, nil, ) require.ErrorContains(t, processor.Process(ctx), "data write error") require.True(t, ctrl.Satisfied()) @@ -248,7 +256,7 @@ func TestFileChunkProcess(t *testing.T) { } processor := importer.NewFileChunkProcessor( csvParser, encoder, codec, - chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, + chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, nil, ) require.ErrorContains(t, processor.Process(ctx), "index write error") require.True(t, ctrl.Satisfied()) diff --git a/pkg/executor/importer/engine_process.go b/pkg/executor/importer/engine_process.go index 51803023ac8fd..6720bdb8e45fe 100644 --- a/pkg/executor/importer/engine_process.go +++ b/pkg/executor/importer/engine_process.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/lightning/verification" "go.uber.org/zap" ) @@ -29,10 +29,10 @@ func ProcessChunk( ctx context.Context, chunk *checkpoints.ChunkCheckpoint, tableImporter *TableImporter, - dataEngine, - indexEngine *backend.OpenedEngine, + dataEngine, indexEngine *backend.OpenedEngine, progress *Progress, logger *zap.Logger, + groupChecksum *verification.KVGroupChecksum, ) error { // if the key are ordered, LocalWrite can optimize the writing. // table has auto-incremented _tidb_rowid must satisfy following restrictions: @@ -66,33 +66,19 @@ func ProcessChunk( } }() - return ProcessChunkWith(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger) + return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum) } -// ProcessChunkWith processes a chunk, and write kv pairs to dataWriter and indexWriter. -func ProcessChunkWith( +// ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter. 
+func ProcessChunkWithWriter( ctx context.Context, chunk *checkpoints.ChunkCheckpoint, tableImporter *TableImporter, dataWriter, indexWriter backend.EngineWriter, progress *Progress, logger *zap.Logger, + groupChecksum *verification.KVGroupChecksum, ) error { - var ( - err error - parser mydump.Parser - ) - if tableImporter.DataSourceType == DataSourceTypeFile { - parser, err = tableImporter.getParser(ctx, chunk) - if err != nil { - return err - } - defer func() { - if err2 := parser.Close(); err2 != nil { - logger.Warn("close parser failed", zap.Error(err2)) - } - }() - } encoder, err := tableImporter.getKVEncoder(chunk) if err != nil { return err @@ -106,15 +92,25 @@ func ProcessChunkWith( // TODO: right now we use this chunk processor for global sort too, will // impl another one for it later. var cp ChunkProcessor - if tableImporter.DataSourceType == DataSourceTypeQuery { - cp = newQueryChunkProcessor( - tableImporter.rowCh, encoder, tableImporter.kvStore.GetCodec(), chunk, logger, - tableImporter.diskQuotaLock, dataWriter, indexWriter, - ) - } else { + switch tableImporter.DataSourceType { + case DataSourceTypeFile: + parser, err := tableImporter.getParser(ctx, chunk) + if err != nil { + return err + } + defer func() { + if err2 := parser.Close(); err2 != nil { + logger.Warn("close parser failed", zap.Error(err2)) + } + }() cp = NewFileChunkProcessor( - parser, encoder, tableImporter.kvStore.GetCodec(), chunk, logger, - tableImporter.diskQuotaLock, dataWriter, indexWriter, + parser, encoder, tableImporter.GetCodec(), chunk, logger, + tableImporter.diskQuotaLock, dataWriter, indexWriter, groupChecksum, + ) + case DataSourceTypeQuery: + cp = newQueryChunkProcessor( + tableImporter.rowCh, encoder, tableImporter.GetCodec(), logger, + tableImporter.diskQuotaLock, dataWriter, indexWriter, groupChecksum, ) } err = cp.Process(ctx) diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go index 1495b8334cbcc..7665238bc2916 100644 --- a/pkg/executor/importer/importer_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -224,10 +224,12 @@ func TestPostProcess(t *testing.T) { logger := zap.NewExample() // verify checksum failed - localChecksum := verify.MakeKVChecksum(1, 2, 1) + localChecksum := verify.NewKVGroupChecksumForAdd() + localChecksum.AddRawGroup(verify.DataKVGroupID, 1, 2, 1) require.ErrorIs(t, importer.PostProcess(ctx, tk.Session(), nil, plan, localChecksum, logger), common.ErrChecksumMismatch) // success - localChecksum = verify.MakeKVChecksum(1, 1, 1) + localChecksum = verify.NewKVGroupChecksumForAdd() + localChecksum.AddRawGroup(verify.DataKVGroupID, 1, 1, 1) require.NoError(t, importer.PostProcess(ctx, tk.Session(), nil, plan, localChecksum, logger)) // get KV store failed importer.GetKVStore = func(path string, tls kvconfig.Security) (kv.Storage, error) { @@ -320,10 +322,13 @@ func TestProcessChunkWith(t *testing.T) { kvWriter := mock.NewMockEngineWriter(ctrl) kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() progress := importer.NewProgress() - err := importer.ProcessChunkWith(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample()) + checksum := verify.NewKVGroupChecksumWithKeyspace(store.GetCodec()) + err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum) require.NoError(t, err) require.Len(t, progress.GetColSize(), 3) - require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), 
chunkInfo.Checksum) + checksumMap := checksum.GetInnerChecksums() + require.Len(t, checksumMap, 1) + require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID]) }) t.Run("query chunk", func(t *testing.T) { @@ -349,10 +354,13 @@ func TestProcessChunkWith(t *testing.T) { kvWriter := mock.NewMockEngineWriter(ctrl) kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() progress := importer.NewProgress() - err := importer.ProcessChunkWith(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample()) + checksum := verify.NewKVGroupChecksumWithKeyspace(store.GetCodec()) + err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum) require.NoError(t, err) require.Len(t, progress.GetColSize(), 3) - require.Equal(t, verify.MakeKVChecksum(111, 3, 14231358899564314836), chunkInfo.Checksum) + checksumMap := checksum.GetInnerChecksums() + require.Len(t, checksumMap, 1) + require.Equal(t, verify.MakeKVChecksum(111, 3, 14231358899564314836), *checksumMap[verify.DataKVGroupID]) }) } diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index 9dd11d1f72e28..15214346a3b01 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -61,6 +61,7 @@ import ( "github.com/pingcap/tidb/pkg/util/sqlkiller" "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/util" pdhttp "github.com/tikv/pd/client/http" clientv3 "go.etcd.io/etcd/client/v3" @@ -315,6 +316,11 @@ func NewTableImporterForTest(param *JobImportParam, e *LoadDataController, id st }, nil } +// GetCodec gets the codec of the kv store. 
+func (ti *TableImporter) GetCodec() tikv.Codec { + return ti.kvStore.GetCodec() +} + func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) { info := LoadDataReaderInfo{ Opener: func(ctx context.Context) (io.ReadSeekCloser, error) { @@ -690,23 +696,24 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C var ( mu sync.Mutex - checksum verify.KVChecksum + checksum = verify.NewKVGroupChecksumWithKeyspace(ti.GetCodec()) colSizeMap = make(map[int64]int64) ) eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx) for i := 0; i < ti.ThreadCnt; i++ { eg.Go(func() error { chunkCheckpoint := checkpoints.ChunkCheckpoint{} + chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.GetCodec()) progress := NewProgress() defer func() { mu.Lock() defer mu.Unlock() - checksum.Add(&chunkCheckpoint.Checksum) + checksum.Add(chunkChecksum) for k, v := range progress.GetColSize() { colSizeMap[k] += v } }() - return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger) + return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger, chunkChecksum) }) } if err = eg.Wait(); err != nil { @@ -782,10 +789,10 @@ func PostProcess( se sessionctx.Context, maxIDs map[autoid.AllocatorType]int64, plan *Plan, - localChecksum verify.KVChecksum, + localChecksum *verify.KVGroupChecksum, logger *zap.Logger, ) (err error) { - callLog := log.BeginTask(logger.With(zap.Any("checksum", localChecksum)), "post process") + callLog := log.BeginTask(logger.With(zap.Object("checksum", localChecksum)), "post process") defer func() { callLog.End(zap.ErrorLevel, err) }() @@ -794,7 +801,7 @@ func PostProcess( return err } - return VerifyChecksum(ctx, plan, localChecksum, se, logger) + return VerifyChecksum(ctx, plan, localChecksum.MergedChecksum(), se, logger) } type autoIDRequirement struct {
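For completeness, a small self-contained illustration (not part of the patch) of the two row representations the importer backend now accepts: a flat KvPair slice for data KVs and a kv.GroupedPairs keyed by index ID for index KVs; kv.Rows2KvPairs flattens either form back into one plain slice for ingestion.

package example

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
	"github.com/pingcap/tidb/br/pkg/lightning/common"
)

func main() {
	grouped := kv.GroupedPairs{
		1: []common.KvPair{{Key: []byte("idx1-key"), Val: []byte("v1")}},
		2: []common.KvPair{{Key: []byte("idx2-key"), Val: []byte("v2")}},
	}
	// Rows2KvPairs accepts both *kv.Pairs and the new GroupedPairs form and
	// returns one flat slice; SplitIntoChunks/Clear on GroupedPairs exist only
	// to satisfy the encode.Rows interface and must not be called.
	flat := kv.Rows2KvPairs(grouped)
	fmt.Println(len(flat)) // 2
}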