Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

import into: log checksum for each KV group #50871

Merged
merged 17 commits into from
Feb 18, 2024
31 changes: 30 additions & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,21 @@ type Pairs struct {
MemBuf *MemBuf
}

// GroupedPairs is a map from index ID to KvPairs.
type GroupedPairs map[int64][]common.KvPair

// SplitIntoChunks implements the encode.Rows interface. It just satisfies the
// type system and should never be called.
func (GroupedPairs) SplitIntoChunks(int) []encode.Rows {
panic("not implemented")
}

// Clear implements the encode.Rows interface. It just satisfies the type system
// and should never be called.
func (GroupedPairs) Clear() encode.Rows {
panic("not implemented")
}

// MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is
// mainly used for testing only. The resulting Rows instance should only be used
// for the importer backend.
Expand All @@ -166,7 +181,21 @@ func MakeRowFromKvPairs(pairs []common.KvPair) encode.Row {
// back into a slice of KvPair. This method panics if the Rows is not
// constructed in such way.
func Rows2KvPairs(rows encode.Rows) []common.KvPair {
return rows.(*Pairs).Pairs
switch v := rows.(type) {
case *Pairs:
return v.Pairs
case GroupedPairs:
cnt := 0
for _, pairs := range v {
cnt += len(pairs)
}
res := make([]common.KvPair, 0, cnt)
for _, pairs := range v {
res = append(res, pairs...)
}
return res
}
panic(fmt.Sprintf("unknown Rows type %T", rows))
}

// Row2KvPairs converts a Row instance constructed from MakeRowFromKvPairs
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 29,
shard_count = 28,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
Expand Down
20 changes: 5 additions & 15 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,16 @@ func KillMySelf() error {
return errors.Trace(err)
}

// KvPair is a pair of key and value.
// KvPair contains a key-value pair and other fields that can be used to ingest
// KV pairs into TiKV.
type KvPair struct {
// Key is the key of the KV pair
Key []byte
// Val is the value of the KV pair
Val []byte
// RowID is the row id of the KV pair.
// RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It's
// often set to the file offset of the KvPair in the source file or the record
// handle.
RowID []byte
}

Expand All @@ -482,19 +485,6 @@ func TableHasAutoID(info *model.TableInfo) bool {
return TableHasAutoRowID(info) || info.GetAutoIncrementColInfo() != nil || info.ContainsAutoRandomBits()
}

// StringSliceEqual checks if two string slices are equal.
func StringSliceEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}

// GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it.
// todo: better put in ddl package, but this will cause import cycle since ddl package import lightning
func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
Expand Down
13 changes: 0 additions & 13 deletions br/pkg/lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,6 @@ func TestSQLWithRetry(t *testing.T) {
require.Nil(t, mock.ExpectationsWereMet())
}

func TestStringSliceEqual(t *testing.T) {
assert.True(t, common.StringSliceEqual(nil, nil))
assert.True(t, common.StringSliceEqual(nil, []string{}))
assert.False(t, common.StringSliceEqual(nil, []string{"a"}))
assert.False(t, common.StringSliceEqual([]string{"a"}, nil))
assert.True(t, common.StringSliceEqual([]string{"a"}, []string{"a"}))
assert.False(t, common.StringSliceEqual([]string{"a"}, []string{"b"}))
assert.True(t, common.StringSliceEqual([]string{"a", "b", "c"}, []string{"a", "b", "c"}))
assert.False(t, common.StringSliceEqual([]string{"a"}, []string{"a", "b", "c"}))
assert.False(t, common.StringSliceEqual([]string{"a", "b", "c"}, []string{"a", "b"}))
assert.False(t, common.StringSliceEqual([]string{"a", "x", "y"}, []string{"a", "y", "x"}))
}

func TestInterpolateMySQLString(t *testing.T) {
assert.Equal(t, "'123'", common.InterpolateMySQLString("123"))
assert.Equal(t, "'1''23'", common.InterpolateMySQLString("1'23"))
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ go_test(
flaky = True,
shard_count = 48,
deps = [
"//br/pkg/lightning/common",
"@com_github_burntsushi_toml//:toml",
"@com_github_stretchr_testify//require",
],
Expand Down
30 changes: 8 additions & 22 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1221,35 +1220,22 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
[mydumper]
filter = ["db1.tbl1", "db2.*", "!db2.tbl1"]
`)))
require.Equal(t, 3, len(cfg1.Mydumper.Filter))
require.True(t, common.StringSliceEqual(
cfg1.Mydumper.Filter,
[]string{"db1.tbl1", "db2.*", "!db2.tbl1"},
))
require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg))
require.Equal(t, []string{"db1.tbl1", "db2.*", "!db2.tbl1"}, cfg1.Mydumper.Filter)
require.Equal(t, GetDefaultFilter(), originalDefaultCfg)

cfg2 := NewConfig()
require.True(t, common.StringSliceEqual(
cfg2.Mydumper.Filter,
originalDefaultCfg,
))
require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg))
require.Equal(t, originalDefaultCfg, cfg2.Mydumper.Filter)
require.Equal(t, GetDefaultFilter(), originalDefaultCfg)

gCfg1, err := LoadGlobalConfig([]string{"-f", "db1.tbl1", "-f", "db2.*", "-f", "!db2.tbl1"}, nil)
require.NoError(t, err)
require.True(t, common.StringSliceEqual(
gCfg1.Mydumper.Filter,
[]string{"db1.tbl1", "db2.*", "!db2.tbl1"},
))
require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg))
require.Equal(t, []string{"db1.tbl1", "db2.*", "!db2.tbl1"}, gCfg1.Mydumper.Filter)
require.Equal(t, GetDefaultFilter(), originalDefaultCfg)

gCfg2, err := LoadGlobalConfig([]string{}, nil)
require.NoError(t, err)
require.True(t, common.StringSliceEqual(
gCfg2.Mydumper.Filter,
originalDefaultCfg,
))
require.True(t, common.StringSliceEqual(GetDefaultFilter(), originalDefaultCfg))
require.Equal(t, originalDefaultCfg, gCfg2.Mydumper.Filter)
require.Equal(t, GetDefaultFilter(), originalDefaultCfg)
}

func TestCompressionType(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/verification/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ go_test(
timeout = "short",
srcs = ["checksum_test.go"],
flaky = True,
shard_count = 3,
deps = [
":verification",
"//br/pkg/lightning/common",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
],
)
143 changes: 137 additions & 6 deletions br/pkg/lightning/verification/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (

var ecmaTable = crc64.MakeTable(crc64.ECMA)

// KVChecksum is the checksum of a collection of key-value pairs.
// KVChecksum is the checksum of a collection of key-value pairs. The zero value
// of KVChecksum is a checksum for empty content and zero keyspace.
type KVChecksum struct {
base uint64
prefixLen int
Expand All @@ -34,11 +35,9 @@ type KVChecksum struct {
checksum uint64
}

// NewKVChecksum creates a new KVChecksum with the given checksum.
func NewKVChecksum(checksum uint64) *KVChecksum {
return &KVChecksum{
checksum: checksum,
}
// NewKVChecksum creates a pointer to zero KVChecksum.
func NewKVChecksum() *KVChecksum {
return &KVChecksum{}
}

// NewKVChecksumWithKeyspace creates a new KVChecksum with the given checksum and keyspace.
Expand Down Expand Up @@ -127,3 +126,135 @@ func (c *KVChecksum) MarshalJSON() ([]byte, error) {
result := fmt.Sprintf(`{"checksum":%d,"size":%d,"kvs":%d}`, c.checksum, c.bytes, c.kvs)
return []byte(result), nil
}

// KVGroupChecksum is KVChecksum(s) each for a data KV group or index KV groups.
type KVGroupChecksum struct {
m map[int64]*KVChecksum
codec tikv.Codec
}

// DataKVGroupID represents the ID for data KV group, as index id starts from 1,
// so we use -1 to represent data kv group.
const DataKVGroupID = -1

// NewKVGroupChecksumWithKeyspace creates a new KVGroupChecksum with the given
// keyspace.
func NewKVGroupChecksumWithKeyspace(k tikv.Codec) *KVGroupChecksum {
m := make(map[int64]*KVChecksum, 8)
m[DataKVGroupID] = NewKVChecksumWithKeyspace(k)
return &KVGroupChecksum{m: m, codec: k}
}

// NewKVGroupChecksumForAdd creates a new KVGroupChecksum, and it can't be used
// with UpdateOneDataKV or UpdateOneIndexKV.
func NewKVGroupChecksumForAdd() *KVGroupChecksum {
m := make(map[int64]*KVChecksum, 8)
m[DataKVGroupID] = NewKVChecksum()
return &KVGroupChecksum{m: m}
}

// UpdateOneDataKV updates the checksum with a single data(record) key-value
// pair. It will not check the key-value pair's key is a real data key again.
func (c *KVGroupChecksum) UpdateOneDataKV(kv common.KvPair) {
c.m[DataKVGroupID].UpdateOne(kv)
}

// UpdateOneIndexKV updates the checksum with a single index key-value pair. It
// will not check the key-value pair's key is a real index key of that index ID
// again.
func (c *KVGroupChecksum) UpdateOneIndexKV(indexID int64, kv common.KvPair) {
cksum := c.m[indexID]
if cksum == nil {
cksum = NewKVChecksumWithKeyspace(c.codec)
c.m[indexID] = cksum
}
cksum.UpdateOne(kv)
}

// Add adds the checksum of another KVGroupChecksum.
func (c *KVGroupChecksum) Add(other *KVGroupChecksum) {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
for id, cksum := range other.m {
thisCksum := c.getOrCreateOneGroup(id)
thisCksum.Add(cksum)
}
}

func (c *KVGroupChecksum) getOrCreateOneGroup(id int64) *KVChecksum {
cksum, ok := c.m[id]
if ok {
return cksum
}
if c.codec == nil {
cksum = NewKVChecksum()
} else {
cksum = NewKVChecksumWithKeyspace(c.codec)
}
c.m[id] = cksum
return cksum
}

// AddRawGroup adds the raw information of a KV group.
func (c *KVGroupChecksum) AddRawGroup(id int64, bytes, kvs, checksum uint64) {
oneGroup := c.getOrCreateOneGroup(id)
tmp := MakeKVChecksum(bytes, kvs, checksum)
oneGroup.Add(&tmp)
}

// DataAndIndexSumSize returns the total size of data KV pairs and index KV pairs.
func (c *KVGroupChecksum) DataAndIndexSumSize() (dataSize, indexSize uint64) {
for id, cksum := range c.m {
if id == DataKVGroupID {
dataSize = cksum.SumSize()
} else {
indexSize += cksum.SumSize()
}
}
return
}

// DataAndIndexSumKVS returns the total number of data KV pairs and index KV pairs.
func (c *KVGroupChecksum) DataAndIndexSumKVS() (dataKVS, indexKVS uint64) {
for id, cksum := range c.m {
if id == DataKVGroupID {
dataKVS = cksum.SumKVS()
} else {
indexKVS += cksum.SumKVS()
}
}
return
}

// GetInnerChecksums returns a cloned map of index ID to its KVChecksum.
func (c *KVGroupChecksum) GetInnerChecksums() map[int64]*KVChecksum {
m := make(map[int64]*KVChecksum, len(c.m))
for id, cksum := range c.m {
m[id] = &KVChecksum{
base: cksum.base,
prefixLen: cksum.prefixLen,
bytes: cksum.bytes,
kvs: cksum.kvs,
checksum: cksum.checksum,
}
}
return m
}

// MergedChecksum merges all groups of this checksum into a single KVChecksum.
func (c *KVGroupChecksum) MergedChecksum() KVChecksum {
merged := NewKVChecksum()
for _, cksum := range c.m {
merged.Add(cksum)
}
return *merged
}

// MarshalLogObject implements the zapcore.ObjectMarshaler interface.
func (c *KVGroupChecksum) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
for id, cksum := range c.m {
err := encoder.AddObject(fmt.Sprintf("id=%d", id), cksum)
if err != nil {
return err
}
}
return nil
}
Loading
Loading