Skip to content

Commit

Permalink
import into: log checksum for each KV group (#50871)
Browse files Browse the repository at this point in the history
ref #50832
  • Loading branch information
lance6716 authored Feb 18, 2024
1 parent 464a126 commit 65d57c0
Show file tree
Hide file tree
Showing 20 changed files with 662 additions and 462 deletions.
31 changes: 30 additions & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ type Pairs struct {
MemBuf *MemBuf
}

// GroupedPairs is a map from index ID to KvPairs. Each map value is the slice
// of common.KvPair collected for that ID.
// NOTE(review): presumably the data (record) KV group is stored under a
// dedicated non-index ID as well — confirm against the producer of this map.
type GroupedPairs map[int64][]common.KvPair

// SplitIntoChunks exists only so that GroupedPairs satisfies the encode.Rows
// interface. It is not expected to be invoked and always panics.
func (GroupedPairs) SplitIntoChunks(_ int) []encode.Rows {
	panic("not implemented")
}

// Clear exists only so that GroupedPairs satisfies the encode.Rows interface.
// It is not expected to be invoked and always panics.
func (_ GroupedPairs) Clear() encode.Rows {
	panic("not implemented")
}

// MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is
// mainly used for testing only. The resulting Rows instance should only be used
// for the importer backend.
Expand All @@ -171,7 +186,21 @@ func MakeRowFromKvPairs(pairs []common.KvPair) encode.Row {
// back into a slice of KvPair. This method panics if the Rows is not
// constructed in such way.
func Rows2KvPairs(rows encode.Rows) []common.KvPair {
	// Dispatch on the concrete Rows implementation. An unconditional
	// rows.(*Pairs) assertion must not precede this switch: it would make the
	// GroupedPairs branch unreachable and panic on any non-*Pairs input.
	switch v := rows.(type) {
	case *Pairs:
		// The underlying slice is returned as-is, not copied.
		return v.Pairs
	case GroupedPairs:
		// Flatten all groups into one freshly allocated slice. Count first so
		// the result is allocated exactly once. Map iteration order is
		// unspecified, so the relative order of the groups in the result is
		// not deterministic.
		total := 0
		for _, group := range v {
			total += len(group)
		}
		flat := make([]common.KvPair, 0, total)
		for _, group := range v {
			flat = append(flat, group...)
		}
		return flat
	default:
		// Any other implementation is a programmer error.
		panic(fmt.Sprintf("unknown Rows type %T", rows))
	}
}

// Row2KvPairs converts a Row instance constructed from MakeRowFromKvPairs
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 29,
shard_count = 28,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
Expand Down
20 changes: 5 additions & 15 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,16 @@ func KillMySelf() error {
return errors.Trace(err)
}

// KvPair contains a key-value pair and other fields that can be used to ingest
// KV pairs into TiKV.
type KvPair struct {
// Key is the key of the KV pair.
Key []byte
// Val is the value of the KV pair.
Val []byte
// RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It's
// often set to the file offset of the KvPair in the source file or the record
// handle.
RowID []byte
}

Expand All @@ -482,19 +485,6 @@ func TableHasAutoID(info *model.TableInfo) bool {
return TableHasAutoRowID(info) || info.GetAutoIncrementColInfo() != nil || info.ContainsAutoRandomBits()
}

// StringSliceEqual reports whether a and b have the same length and hold the
// same strings in the same order. A nil slice and an empty slice compare equal.
func StringSliceEqual(a, b []string) bool {
	n := len(a)
	if n != len(b) {
		return false
	}
	for i := 0; i < n; i++ {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it.
// todo: better put in ddl package, but this will cause import cycle since ddl package import lightning
func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
Expand Down
13 changes: 0 additions & 13 deletions br/pkg/lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,6 @@ func TestSQLWithRetry(t *testing.T) {
require.Nil(t, mock.ExpectationsWereMet())
}

func TestStringSliceEqual(t *testing.T) {
assert.True(t, common.StringSliceEqual(nil, nil))
assert.True(t, common.StringSliceEqual(nil, []string{}))
assert.False(t, common.StringSliceEqual(nil, []string{"a"}))
assert.False(t, common.StringSliceEqual([]string{"a"}, nil))
assert.True(t, common.StringSliceEqual([]string{"a"}, []string{"a"}))
assert.False(t, common.StringSliceEqual([]string{"a"}, []string{"b"}))
assert.True(t, common.StringSliceEqual([]string{"a", "b", "c"}, []string{"a", "b", "c"}))
assert.False(t, common.StringSliceEqual([]string{"a"}, []string{"a", "b", "c"}))
assert.False(t, common.StringSliceEqual([]string{"a", "b", "c"}, []string{"a", "b"}))
assert.False(t, common.StringSliceEqual([]string{"a", "x", "y"}, []string{"a", "y", "x"}))
}

func TestInterpolateMySQLString(t *testing.T) {
assert.Equal(t, "'123'", common.InterpolateMySQLString("123"))
assert.Equal(t, "'1''23'", common.InterpolateMySQLString("1'23"))
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ go_test(
flaky = True,
shard_count = 48,
deps = [
"//br/pkg/lightning/common",
"@com_github_burntsushi_toml//:toml",
"@com_github_stretchr_testify//require",
],
Expand Down
30 changes: 8 additions & 22 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1221,35 +1220,22 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
[mydumper]
filter = ["db1.tbl1", "db2.*", "!db2.tbl1"]
`)))
require.Equal(t, 3, len(cfg1.Mydumper.Filter))
require.True(t, common.StringSliceEqual(
cfg1.Mydumper.Filter,
[]string{"db1.tbl1", "db2.*", "!db2.tbl1"},
))
require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg))
require.Equal(t, []string{"db1.tbl1", "db2.*", "!db2.tbl1"}, cfg1.Mydumper.Filter)
require.Equal(t, GetDefaultFilter(), originalDefaultCfg)

cfg2 := NewConfig()
require.True(t, common.StringSliceEqual(
cfg2.Mydumper.Filter,
originalDefaultCfg,
))
require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg))
require.Equal(t, originalDefaultCfg, cfg2.Mydumper.Filter)
require.Equal(t, GetDefaultFilter(), originalDefaultCfg)

gCfg1, err := LoadGlobalConfig([]string{"-f", "db1.tbl1", "-f", "db2.*", "-f", "!db2.tbl1"}, nil)
require.NoError(t, err)
require.True(t, common.StringSliceEqual(
gCfg1.Mydumper.Filter,
[]string{"db1.tbl1", "db2.*", "!db2.tbl1"},
))
require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg))
require.Equal(t, []string{"db1.tbl1", "db2.*", "!db2.tbl1"}, gCfg1.Mydumper.Filter)
require.Equal(t, GetDefaultFilter(), originalDefaultCfg)

gCfg2, err := LoadGlobalConfig([]string{}, nil)
require.NoError(t, err)
require.True(t, common.StringSliceEqual(
gCfg2.Mydumper.Filter,
originalDefaultCfg,
))
require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg))
require.Equal(t, originalDefaultCfg, gCfg2.Mydumper.Filter)
require.Equal(t, GetDefaultFilter(), originalDefaultCfg)
}

func TestCompressionType(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/verification/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ go_test(
timeout = "short",
srcs = ["checksum_test.go"],
flaky = True,
shard_count = 3,
deps = [
":verification",
"//br/pkg/lightning/common",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
],
)
143 changes: 137 additions & 6 deletions br/pkg/lightning/verification/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (

// ecmaTable is the CRC-64 lookup table built once from the ECMA polynomial.
var ecmaTable = crc64.MakeTable(crc64.ECMA)

// KVChecksum is the checksum of a collection of key-value pairs. The zero value
// of KVChecksum is a checksum for empty content and zero keyspace.
type KVChecksum struct {
base uint64
prefixLen int
Expand All @@ -34,11 +35,9 @@ type KVChecksum struct {
checksum uint64
}

// NewKVChecksum creates a new KVChecksum with the given checksum.
func NewKVChecksum(checksum uint64) *KVChecksum {
return &KVChecksum{
checksum: checksum,
}
// NewKVChecksum creates a pointer to zero KVChecksum.
func NewKVChecksum() *KVChecksum {
return &KVChecksum{}
}

// NewKVChecksumWithKeyspace creates a new KVChecksum with the given keyspace.
Expand Down Expand Up @@ -127,3 +126,135 @@ func (c *KVChecksum) MarshalJSON() ([]byte, error) {
result := fmt.Sprintf(`{"checksum":%d,"size":%d,"kvs":%d}`, c.checksum, c.bytes, c.kvs)
return []byte(result), nil
}

// KVGroupChecksum holds one KVChecksum per KV group: the data KV group is keyed
// by DataKVGroupID and every index KV group is keyed by its index ID.
type KVGroupChecksum struct {
// m maps a KV group ID to the checksum accumulated for that group.
m map[int64]*KVChecksum
// codec is passed to NewKVChecksumWithKeyspace when a new group is created. It
// is left nil by NewKVGroupChecksumForAdd.
codec tikv.Codec
}

// DataKVGroupID is the ID of the data KV group. Index IDs start from 1, so -1
// is used to represent the data KV group.
const DataKVGroupID = -1

// NewKVGroupChecksumWithKeyspace returns a KVGroupChecksum bound to the
// keyspace codec k. The data KV group is created up front with that codec; k is
// also stored so that later groups can be created with it.
func NewKVGroupChecksumWithKeyspace(k tikv.Codec) *KVGroupChecksum {
	group := &KVGroupChecksum{
		m:     make(map[int64]*KVChecksum, 8),
		codec: k,
	}
	group.m[DataKVGroupID] = NewKVChecksumWithKeyspace(k)
	return group
}

// NewKVGroupChecksumForAdd returns a KVGroupChecksum without a keyspace codec,
// pre-populated with a zero checksum for the data KV group. It can't be used
// with UpdateOneDataKV or UpdateOneIndexKV.
func NewKVGroupChecksumForAdd() *KVGroupChecksum {
	group := &KVGroupChecksum{m: make(map[int64]*KVChecksum, 8)}
	group.m[DataKVGroupID] = NewKVChecksum()
	return group
}

// UpdateOneDataKV folds one data (record) key-value pair into the checksum of
// the data KV group. The key is not verified to be a real data key.
func (c *KVGroupChecksum) UpdateOneDataKV(kv common.KvPair) {
	dataSum := c.m[DataKVGroupID]
	dataSum.UpdateOne(kv)
}

// UpdateOneIndexKV folds one index key-value pair into the checksum of the
// group identified by indexID, creating that group with the stored codec when
// it is absent. The key is not verified to belong to that index.
func (c *KVGroupChecksum) UpdateOneIndexKV(indexID int64, kv common.KvPair) {
	if existing := c.m[indexID]; existing != nil {
		existing.UpdateOne(kv)
		return
	}
	// First KV seen for this index: create and register its checksum.
	created := NewKVChecksumWithKeyspace(c.codec)
	c.m[indexID] = created
	created.UpdateOne(kv)
}

// Add merges every group of other into the group with the same ID in c,
// creating groups in c that do not exist yet.
func (c *KVGroupChecksum) Add(other *KVGroupChecksum) {
	for groupID, otherSum := range other.m {
		c.getOrCreateOneGroup(groupID).Add(otherSum)
	}
}

// getOrCreateOneGroup returns the checksum registered under id. When no entry
// exists, a new one is created (with the stored codec if it is non-nil,
// otherwise a zero checksum), stored in the map and returned.
func (c *KVGroupChecksum) getOrCreateOneGroup(id int64) *KVChecksum {
	if existing, found := c.m[id]; found {
		return existing
	}
	var created *KVChecksum
	if c.codec != nil {
		created = NewKVChecksumWithKeyspace(c.codec)
	} else {
		created = NewKVChecksum()
	}
	c.m[id] = created
	return created
}

// AddRawGroup merges raw checksum figures (total bytes, KV count and checksum
// value) into the group identified by id, creating the group if needed.
func (c *KVGroupChecksum) AddRawGroup(id int64, bytes, kvs, checksum uint64) {
	target := c.getOrCreateOneGroup(id)
	raw := MakeKVChecksum(bytes, kvs, checksum)
	target.Add(&raw)
}

// DataAndIndexSumSize returns the size of the data KV group and the summed
// size of all other (index) KV groups.
func (c *KVGroupChecksum) DataAndIndexSumSize() (dataSize, indexSize uint64) {
	for groupID, groupSum := range c.m {
		size := groupSum.SumSize()
		switch groupID {
		case DataKVGroupID:
			dataSize = size
		default:
			indexSize += size
		}
	}
	return dataSize, indexSize
}

// DataAndIndexSumKVS returns the KV count of the data KV group and the summed
// KV count of all other (index) KV groups.
func (c *KVGroupChecksum) DataAndIndexSumKVS() (dataKVS, indexKVS uint64) {
	for groupID, groupSum := range c.m {
		count := groupSum.SumKVS()
		switch groupID {
		case DataKVGroupID:
			dataKVS = count
		default:
			indexKVS += count
		}
	}
	return dataKVS, indexKVS
}

// GetInnerChecksums returns a new map from group ID to a copy of that group's
// KVChecksum, so callers can modify the result without touching c. The data KV
// group appears under DataKVGroupID alongside the index groups.
func (c *KVGroupChecksum) GetInnerChecksums() map[int64]*KVChecksum {
	cloned := make(map[int64]*KVChecksum, len(c.m))
	for id := range c.m {
		src := c.m[id]
		dup := new(KVChecksum)
		dup.base = src.base
		dup.prefixLen = src.prefixLen
		dup.bytes = src.bytes
		dup.kvs = src.kvs
		dup.checksum = src.checksum
		cloned[id] = dup
	}
	return cloned
}

// MergedChecksum folds every group of c into one KVChecksum and returns it by
// value.
func (c *KVGroupChecksum) MergedChecksum() KVChecksum {
	total := NewKVChecksum()
	for id := range c.m {
		total.Add(c.m[id])
	}
	return *total
}

// MarshalLogObject implements the zapcore.ObjectMarshaler interface. Each group
// is added as a nested object under the key "id=<group ID>"; the first encoder
// error stops the iteration and is returned.
func (c *KVGroupChecksum) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
	for groupID, groupSum := range c.m {
		key := fmt.Sprintf("id=%d", groupID)
		if err := encoder.AddObject(key, groupSum); err != nil {
			return err
		}
	}
	return nil
}
Loading

0 comments on commit 65d57c0

Please sign in to comment.