Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/lightning: change KvPair's row ID type from int64 to []bytes #41787

Merged
merged 14 commits into from
Mar 1, 2023
Merged
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ func (kvcodec *tableKVEncoder) Encode(
}
kvPairs := kvcodec.se.takeKvPairs()
for i := 0; i < len(kvPairs.pairs); i++ {
kvPairs.pairs[i].RowID = rowID
var encoded [8]byte
tangenta marked this conversation as resolved.
Show resolved Hide resolved
kvPairs.pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID)
}
kvcodec.recordCache = record[:0]
return kvPairs, nil
Expand Down
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestEncode(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0x2},
RowID: 2,
RowID: common.EncodeIntRowID(2),
},
}))

Expand All @@ -140,7 +140,7 @@ func TestEncode(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
RowID: 1,
RowID: common.EncodeIntRowID(1),
},
}))
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestEncodeRowFormatV2(t *testing.T) {
0x1, 0x0, // not null offsets = [1]
0x7f, // column version = 127 (10000000 clamped to TINYINT)
},
RowID: 1,
RowID: common.EncodeIntRowID(1),
},
}))
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestEncodeTimestamp(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
RowID: 70,
RowID: common.EncodeIntRowID(70),
},
}))
}
Expand Down Expand Up @@ -346,12 +346,12 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
RowID: 70,
RowID: common.EncodeIntRowID(70),
},
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
RowID: 70,
RowID: common.EncodeIntRowID(70),
},
}), pairsExpect)

Expand Down Expand Up @@ -459,7 +459,7 @@ func TestDefaultAutoRandoms(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 70,
RowID: common.EncodeIntRowID(70),
},
}))
require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(70))
Expand All @@ -470,7 +470,7 @@ func TestDefaultAutoRandoms(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 71,
RowID: common.EncodeIntRowID(71),
},
}))
require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(71))
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,8 @@ func (m *DuplicateManager) CollectDuplicateRowsFromDupDB(ctx context.Context, du
}

// Delete the key range in the duplicate DB, since the duplicates have already been collected.
rawStartKey := keyAdapter.Encode(nil, task.StartKey, math.MinInt64)
rawEndKey := keyAdapter.Encode(nil, task.EndKey, math.MinInt64)
rawStartKey := keyAdapter.Encode(nil, task.StartKey, MinRowID)
rawEndKey := keyAdapter.Encode(nil, task.EndKey, MinRowID)
err = dupDB.DeleteRange(rawStartKey, rawEndKey, nil)
return errors.Trace(err)
})
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
keyAdapter := w.engine.keyAdapter
totalKeySize := 0
for i := 0; i < len(kvs); i++ {
keySize := keyAdapter.EncodedLen(kvs[i].Key)
keySize := keyAdapter.EncodedLen(kvs[i].Key, kvs[i].RowID)
w.batchSize += int64(keySize + len(kvs[i].Val))
totalKeySize += keySize
}
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
}
lastKey = pair.Key
w.batchSize += int64(len(pair.Key) + len(pair.Val))
buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key))
buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
val := w.kvBuffer.AddBytes(pair.Val)
if cnt < l {
Expand Down
13 changes: 6 additions & 7 deletions br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package local
import (
"bytes"
"context"
"math"

"github.com/cockroachdb/pebble"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
Expand Down Expand Up @@ -91,7 +90,7 @@ type dupDetectOpt struct {
}

func (d *dupDetectIter) Seek(key []byte) bool {
rawKey := d.keyAdapter.Encode(nil, key, 0)
rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
Expand Down Expand Up @@ -209,10 +208,10 @@ func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter,
opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt dupDetectOpt) *dupDetectIter {
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
if len(opts.LowerBound) > 0 {
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID)
}
if len(opts.UpperBound) > 0 {
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64)
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID)
}
return &dupDetectIter{
ctx: ctx,
Expand All @@ -232,7 +231,7 @@ type dupDBIter struct {
}

func (d *dupDBIter) Seek(key []byte) bool {
rawKey := d.keyAdapter.Encode(nil, key, 0)
rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
Expand Down Expand Up @@ -296,10 +295,10 @@ var _ Iter = &dupDBIter{}
func newDupDBIter(dupDB *pebble.DB, keyAdapter KeyAdapter, opts *pebble.IterOptions) *dupDBIter {
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
if len(opts.LowerBound) > 0 {
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID)
}
if len(opts.UpperBound) > 0 {
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64)
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID)
}
return &dupDBIter{
iter: dupDB.NewIter(newOpts),
Expand Down
34 changes: 24 additions & 10 deletions br/pkg/lightning/backend/local/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestDupDetectIterator(t *testing.T) {
pairs = append(pairs, common.KvPair{
Key: randBytes(32),
Val: randBytes(128),
RowID: prevRowMax,
RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
}
Expand All @@ -47,13 +47,13 @@ func TestDupDetectIterator(t *testing.T) {
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
RowID: prevRowMax,
RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
RowID: prevRowMax,
RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
}
Expand All @@ -63,19 +63,19 @@ func TestDupDetectIterator(t *testing.T) {
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
RowID: prevRowMax,
RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
RowID: prevRowMax,
RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
RowID: prevRowMax,
RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
}
Expand Down Expand Up @@ -184,22 +184,22 @@ func TestDupDetectIterSeek(t *testing.T) {
{
Key: []byte{1, 2, 3, 0},
Val: randBytes(128),
RowID: 1,
RowID: common.EncodeIntRowID(1),
},
{
Key: []byte{1, 2, 3, 1},
Val: randBytes(128),
RowID: 2,
RowID: common.EncodeIntRowID(2),
},
{
Key: []byte{1, 2, 3, 1},
Val: randBytes(128),
RowID: 3,
RowID: common.EncodeIntRowID(3),
},
{
Key: []byte{1, 2, 3, 2},
Val: randBytes(128),
RowID: 4,
RowID: common.EncodeIntRowID(4),
},
}

Expand Down Expand Up @@ -227,3 +227,17 @@ func TestDupDetectIterSeek(t *testing.T) {
require.NoError(t, db.Close())
require.NoError(t, dupDB.Close())
}

// TestKeyAdapterEncoding verifies that dupDetectKeyAdapter.Decode recovers the
// source key from the output of Encode, first with an integer row ID and then
// with an arbitrary byte-string row ID.
func TestKeyAdapterEncoding(t *testing.T) {
	adapter := dupDetectKeyAdapter{}
	want := []byte{1, 2, 3}

	rowIDs := [][]byte{
		common.EncodeIntRowID(1),
		[]byte("mock_common_handle"),
	}
	for _, rowID := range rowIDs {
		encoded := adapter.Encode(nil, want, rowID)
		got, err := adapter.Decode(nil, encoded)
		require.NoError(t, err)
		require.EqualValues(t, want, got)
	}
}
40 changes: 26 additions & 14 deletions br/pkg/lightning/backend/local/key_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@
package local

import (
"encoding/binary"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/util/codec"
)

// KeyAdapter is used to encode and decode keys.
type KeyAdapter interface {
// Encode encodes the key with its corresponding rowID. It appends the encoded key to dst and returns the
// resulting slice. The encoded key is guaranteed to be in ascending order for comparison.
Encode(dst []byte, key []byte, rowID int64) []byte
Encode(dst []byte, key []byte, rowID []byte) []byte

// Decode decodes the original key to dst. It appends the decoded key to dst and returns the resulting slice.
Decode(dst []byte, data []byte) ([]byte, error)

// EncodedLen returns the encoded key length.
EncodedLen(key []byte) int
EncodedLen(key []byte, rowID []byte) int
}

func reallocBytes(b []byte, n int) []byte {
Expand All @@ -46,36 +47,42 @@ func reallocBytes(b []byte, n int) []byte {

type noopKeyAdapter struct{}

func (noopKeyAdapter) Encode(dst []byte, key []byte, _ int64) []byte {
func (noopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte {
return append(dst, key...)
}

func (noopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
return append(dst, data...), nil
}

func (noopKeyAdapter) EncodedLen(key []byte) int {
func (noopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
return len(key)
}

var _ KeyAdapter = noopKeyAdapter{}

type dupDetectKeyAdapter struct{}

func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID int64) []byte {
func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte {
dst = codec.EncodeBytes(dst, key)
sleepymole marked this conversation as resolved.
Show resolved Hide resolved
dst = reallocBytes(dst, 8)
n := len(dst)
dst = dst[:n+8]
binary.BigEndian.PutUint64(dst[n:n+8], codec.EncodeIntToCmpUint(rowID))
dst = reallocBytes(dst, len(rowID)+2)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
dst = append(dst, key...)
dst = append(dst, rowID...)
rowIDLen := uint16(len(rowID))
dst = append(dst, byte(rowIDLen>>8), byte(rowIDLen))
return dst
}

func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
if len(data) < 8 {
if len(data) < 2 {
return nil, errors.New("insufficient bytes to decode value")
}
_, key, err := codec.DecodeBytes(data[:len(data)-8], dst[len(dst):cap(dst)])
rowIDLen := uint16(data[len(data)-2])<<8 | uint16(data[len(data)-1])
tailLen := int(rowIDLen + 2)
if len(data) < tailLen {
return nil, errors.New("insufficient bytes to decode value")
}
_, key, err := codec.DecodeBytes(data[:len(data)-tailLen], dst[len(dst):cap(dst)])
if err != nil {
return nil, err
}
Expand All @@ -90,8 +97,13 @@ func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
return append(dst, key...), nil
}

func (dupDetectKeyAdapter) EncodedLen(key []byte) int {
return codec.EncodedBytesLength(len(key)) + 8
func (dupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int {
return codec.EncodedBytesLength(len(key)) + len(rowID) + 2
}

var _ KeyAdapter = dupDetectKeyAdapter{}

var (
// MinRowID is the encoded row ID for math.MinInt64, as produced by
// common.EncodeIntRowID. It is passed as the row-ID component when encoding
// range bounds with a KeyAdapter.
MinRowID = common.EncodeIntRowID(math.MinInt64)
// ZeroRowID is the encoded row ID for 0, as produced by
// common.EncodeIntRowID. It is passed as the row-ID component when encoding
// a seek key with a KeyAdapter.
ZeroRowID = common.EncodeIntRowID(0)
)
Loading