diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 77bce0b05dd24..943636a19deae 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -433,7 +433,8 @@ func (kvcodec *tableKVEncoder) Encode( } kvPairs := kvcodec.se.takeKvPairs() for i := 0; i < len(kvPairs.pairs); i++ { - kvPairs.pairs[i].RowID = rowID + var encoded [9]byte // The max length of encoded int64 is 9. + kvPairs.pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID) } kvcodec.recordCache = record[:0] return kvPairs, nil diff --git a/br/pkg/lightning/backend/kv/sql2kv_test.go b/br/pkg/lightning/backend/kv/sql2kv_test.go index 8c14731cf82dd..9d60167f87e6f 100644 --- a/br/pkg/lightning/backend/kv/sql2kv_test.go +++ b/br/pkg/lightning/backend/kv/sql2kv_test.go @@ -113,7 +113,7 @@ func TestEncode(t *testing.T) { { Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, Val: []uint8{0x8, 0x2, 0x8, 0x2}, - RowID: 2, + RowID: common.EncodeIntRowID(2), }, })) @@ -140,7 +140,7 @@ func TestEncode(t *testing.T) { { Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, Val: []uint8{0x8, 0x2, 0x8, 0xfe, 0x1}, - RowID: 1, + RowID: common.EncodeIntRowID(1), }, })) } @@ -274,7 +274,7 @@ func TestEncodeRowFormatV2(t *testing.T) { 0x1, 0x0, // not null offsets = [1] 0x7f, // column version = 127 (10000000 clamped to TINYINT) }, - RowID: 1, + RowID: common.EncodeIntRowID(1), }, })) } @@ -313,7 +313,7 @@ func TestEncodeTimestamp(t *testing.T) { { Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46}, Val: []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19}, - RowID: 70, + RowID: common.EncodeIntRowID(70), }, })) } @@ -346,12 +346,12 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) { { Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46}, Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - RowID: 70, + RowID: common.EncodeIntRowID(70), }, { Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46}, - RowID: 70, + RowID: common.EncodeIntRowID(70), }, }), pairsExpect) @@ -459,7 +459,7 @@ func TestDefaultAutoRandoms(t *testing.T) { { Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46}, Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0}, - RowID: 70, + RowID: common.EncodeIntRowID(70), }, })) require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(70)) @@ -470,7 +470,7 @@ func TestDefaultAutoRandoms(t *testing.T) { { Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47}, Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0}, - RowID: 71, + RowID: common.EncodeIntRowID(71), }, })) require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(71)) diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index fe6cd110a026c..ff8f4f04821cd 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -699,8 +699,8 @@ func (m *DuplicateManager) CollectDuplicateRowsFromDupDB(ctx context.Context, du } // Delete the key range in duplicate DB since we have the duplicates have been collected. - rawStartKey := keyAdapter.Encode(nil, task.StartKey, math.MinInt64) - rawEndKey := keyAdapter.Encode(nil, task.EndKey, math.MinInt64) + rawStartKey := keyAdapter.Encode(nil, task.StartKey, MinRowID) + rawEndKey := keyAdapter.Encode(nil, task.EndKey, MinRowID) err = dupDB.DeleteRange(rawStartKey, rawEndKey, nil) return errors.Trace(err) }) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 395b69ece0800..8e0554a76bed0 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1070,7 +1070,7 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { keyAdapter := w.engine.keyAdapter totalKeySize := 0 for i := 0; i < len(kvs); i++ { - keySize := keyAdapter.EncodedLen(kvs[i].Key) + keySize := keyAdapter.EncodedLen(kvs[i].Key, kvs[i].RowID) w.batchSize += int64(keySize + len(kvs[i].Val)) totalKeySize += keySize } @@ -1107,7 +1107,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er } lastKey = pair.Key w.batchSize += int64(len(pair.Key) + len(pair.Val)) - buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key)) + buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID)) key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID) val := w.kvBuffer.AddBytes(pair.Val) if cnt < l { diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index 29ed50b743773..c55547ea60f66 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -17,7 +17,6 @@ package local import ( "bytes" "context" - "math" "github.com/cockroachdb/pebble" sst "github.com/pingcap/kvproto/pkg/import_sstpb" @@ -91,7 +90,7 @@ type dupDetectOpt struct { } func (d *dupDetectIter) Seek(key []byte) bool { - rawKey := d.keyAdapter.Encode(nil, key, 0) + rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID) if d.err != nil || !d.iter.SeekGE(rawKey) { return false } @@ -209,10 +208,10 @@ func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter, opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt dupDetectOpt) *dupDetectIter { newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter} if len(opts.LowerBound) > 0 { - newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64) + newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID) } if len(opts.UpperBound) > 0 { - newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64) + newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID) } return &dupDetectIter{ ctx: ctx, @@ -232,7 +231,7 @@ type dupDBIter struct { } func (d *dupDBIter) Seek(key []byte) bool { - rawKey := d.keyAdapter.Encode(nil, key, 0) + rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID) if d.err != nil || !d.iter.SeekGE(rawKey) { return false } @@ -296,10 +295,10 @@ var _ Iter = &dupDBIter{} func newDupDBIter(dupDB *pebble.DB, keyAdapter KeyAdapter, opts *pebble.IterOptions) *dupDBIter { newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter} if len(opts.LowerBound) > 0 { - newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64) + newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID) } if len(opts.UpperBound) > 0 { - newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64) + newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID) } return &dupDBIter{ iter: dupDB.NewIter(newOpts), diff --git a/br/pkg/lightning/backend/local/iterator_test.go b/br/pkg/lightning/backend/local/iterator_test.go index c183963443bae..ee643bf42ce0b 100644 --- a/br/pkg/lightning/backend/local/iterator_test.go +++ b/br/pkg/lightning/backend/local/iterator_test.go @@ -37,7 +37,7 @@ func TestDupDetectIterator(t *testing.T) { pairs = append(pairs, common.KvPair{ Key: randBytes(32), Val: randBytes(128), - RowID: prevRowMax, + RowID: common.EncodeIntRowID(prevRowMax), }) prevRowMax++ } @@ -47,13 +47,13 @@ func TestDupDetectIterator(t *testing.T) { pairs = append(pairs, common.KvPair{ Key: key, Val: randBytes(128), - RowID: prevRowMax, + RowID: common.EncodeIntRowID(prevRowMax), }) prevRowMax++ pairs = append(pairs, common.KvPair{ Key: key, Val: randBytes(128), - RowID: prevRowMax, + RowID: common.EncodeIntRowID(prevRowMax), }) prevRowMax++ } @@ -63,19 +63,19 @@ func TestDupDetectIterator(t *testing.T) { pairs = append(pairs, common.KvPair{ Key: key, Val: randBytes(128), - RowID: prevRowMax, + RowID: common.EncodeIntRowID(prevRowMax), }) prevRowMax++ pairs = append(pairs, common.KvPair{ Key: key, Val: randBytes(128), - RowID: prevRowMax, + RowID: common.EncodeIntRowID(prevRowMax), }) prevRowMax++ pairs = append(pairs, common.KvPair{ Key: key, Val: randBytes(128), - RowID: prevRowMax, + RowID: common.EncodeIntRowID(prevRowMax), }) prevRowMax++ } @@ -184,22 +184,22 @@ func TestDupDetectIterSeek(t *testing.T) { { Key: []byte{1, 2, 3, 0}, Val: randBytes(128), - RowID: 1, + RowID: common.EncodeIntRowID(1), }, { Key: []byte{1, 2, 3, 1}, Val: randBytes(128), - RowID: 2, + RowID: common.EncodeIntRowID(2), }, { Key: []byte{1, 2, 3, 1}, Val: randBytes(128), - RowID: 3, + RowID: common.EncodeIntRowID(3), }, { Key: []byte{1, 2, 3, 2}, Val: randBytes(128), - RowID: 4, + RowID: common.EncodeIntRowID(4), }, } @@ -227,3 +227,17 @@ func TestDupDetectIterSeek(t *testing.T) { require.NoError(t, db.Close()) require.NoError(t, dupDB.Close()) } + +func TestKeyAdapterEncoding(t *testing.T) { + keyAdapter := dupDetectKeyAdapter{} + srcKey := []byte{1, 2, 3} + v := keyAdapter.Encode(nil, srcKey, common.EncodeIntRowID(1)) + resKey, err := keyAdapter.Decode(nil, v) + require.NoError(t, err) + require.EqualValues(t, srcKey, resKey) + + v = keyAdapter.Encode(nil, srcKey, []byte("mock_common_handle")) + resKey, err = keyAdapter.Decode(nil, v) + require.NoError(t, err) + require.EqualValues(t, srcKey, resKey) +} diff --git a/br/pkg/lightning/backend/local/key_adapter.go b/br/pkg/lightning/backend/local/key_adapter.go index 7a767cbc04f62..2dacccea2d697 100644 --- a/br/pkg/lightning/backend/local/key_adapter.go +++ b/br/pkg/lightning/backend/local/key_adapter.go @@ -15,9 +15,10 @@ package local import ( - "encoding/binary" + "math" "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/util/codec" ) @@ -25,13 +26,13 @@ import ( type KeyAdapter interface { // Encode encodes the key with its corresponding rowID. It appends the encoded key to dst and returns the // resulting slice. The encoded key is guaranteed to be in ascending order for comparison. - Encode(dst []byte, key []byte, rowID int64) []byte + Encode(dst []byte, key []byte, rowID []byte) []byte // Decode decodes the original key to dst. It appends the encoded key to dst and returns the resulting slice. Decode(dst []byte, data []byte) ([]byte, error) // EncodedLen returns the encoded key length. - EncodedLen(key []byte) int + EncodedLen(key []byte, rowID []byte) int } func reallocBytes(b []byte, n int) []byte { @@ -46,7 +47,7 @@ func reallocBytes(b []byte, n int) []byte { type noopKeyAdapter struct{} -func (noopKeyAdapter) Encode(dst []byte, key []byte, _ int64) []byte { +func (noopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte { return append(dst, key...) } @@ -54,7 +55,7 @@ func (noopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) { return append(dst, data...), nil } -func (noopKeyAdapter) EncodedLen(key []byte) int { +func (noopKeyAdapter) EncodedLen(key []byte, _ []byte) int { return len(key) } @@ -62,20 +63,25 @@ var _ KeyAdapter = noopKeyAdapter{} type dupDetectKeyAdapter struct{} -func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID int64) []byte { +func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte { dst = codec.EncodeBytes(dst, key) - dst = reallocBytes(dst, 8) - n := len(dst) - dst = dst[:n+8] - binary.BigEndian.PutUint64(dst[n:n+8], codec.EncodeIntToCmpUint(rowID)) + dst = reallocBytes(dst, len(rowID)+2) + dst = append(dst, rowID...) + rowIDLen := uint16(len(rowID)) + dst = append(dst, byte(rowIDLen>>8), byte(rowIDLen)) return dst } func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) { - if len(data) < 8 { + if len(data) < 2 { return nil, errors.New("insufficient bytes to decode value") } - _, key, err := codec.DecodeBytes(data[:len(data)-8], dst[len(dst):cap(dst)]) + rowIDLen := uint16(data[len(data)-2])<<8 | uint16(data[len(data)-1]) + tailLen := int(rowIDLen + 2) + if len(data) < tailLen { + return nil, errors.New("insufficient bytes to decode value") + } + _, key, err := codec.DecodeBytes(data[:len(data)-tailLen], dst[len(dst):cap(dst)]) if err != nil { return nil, err } @@ -90,8 +96,13 @@ func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) { return append(dst, key...), nil } -func (dupDetectKeyAdapter) EncodedLen(key []byte) int { - return codec.EncodedBytesLength(len(key)) + 8 +func (dupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int { + return codec.EncodedBytesLength(len(key)) + len(rowID) + 2 } var _ KeyAdapter = dupDetectKeyAdapter{} + +var ( + MinRowID = common.EncodeIntRowID(math.MinInt64) + ZeroRowID = common.EncodeIntRowID(0) +) diff --git a/br/pkg/lightning/backend/local/key_adapter_test.go b/br/pkg/lightning/backend/local/key_adapter_test.go index 4b9abe1c25c3f..d80efa6de2af4 100644 --- a/br/pkg/lightning/backend/local/key_adapter_test.go +++ b/br/pkg/lightning/backend/local/key_adapter_test.go @@ -22,6 +22,7 @@ import ( "testing" "unsafe" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/stretchr/testify/require" ) @@ -34,8 +35,8 @@ func randBytes(n int) []byte { func TestNoopKeyAdapter(t *testing.T) { keyAdapter := noopKeyAdapter{} key := randBytes(32) - require.Len(t, key, keyAdapter.EncodedLen(key)) - encodedKey := keyAdapter.Encode(nil, key, 0) + require.Len(t, key, keyAdapter.EncodedLen(key, ZeroRowID)) + encodedKey := keyAdapter.Encode(nil, key, ZeroRowID) require.Equal(t, key, encodedKey) decodedKey, err := keyAdapter.Decode(nil, encodedKey) @@ -68,8 +69,9 @@ func TestDupDetectKeyAdapter(t *testing.T) { keyAdapter := dupDetectKeyAdapter{} for _, input := range inputs { - result := keyAdapter.Encode(nil, input.key, input.rowID) - require.Equal(t, keyAdapter.EncodedLen(input.key), len(result)) + encodedRowID := common.EncodeIntRowID(input.rowID) + result := keyAdapter.Encode(nil, input.key, encodedRowID) + require.Equal(t, keyAdapter.EncodedLen(input.key, encodedRowID), len(result)) // Decode the result. key, err := keyAdapter.Decode(nil, result) @@ -89,7 +91,7 @@ func TestDupDetectKeyOrder(t *testing.T) { keyAdapter := dupDetectKeyAdapter{} encodedKeys := make([][]byte, 0, len(keys)) for _, key := range keys { - encodedKeys = append(encodedKeys, keyAdapter.Encode(nil, key, 1)) + encodedKeys = append(encodedKeys, keyAdapter.Encode(nil, key, common.EncodeIntRowID(1))) } sorted := sort.SliceIsSorted(encodedKeys, func(i, j int) bool { return bytes.Compare(encodedKeys[i], encodedKeys[j]) < 0 @@ -100,8 +102,8 @@ func TestDupDetectKeyOrder(t *testing.T) { func TestDupDetectEncodeDupKey(t *testing.T) { keyAdapter := dupDetectKeyAdapter{} key := randBytes(32) - result1 := keyAdapter.Encode(nil, key, 10) - result2 := keyAdapter.Encode(nil, key, 20) + result1 := keyAdapter.Encode(nil, key, common.EncodeIntRowID(10)) + result2 := keyAdapter.Encode(nil, key, common.EncodeIntRowID(20)) require.NotEqual(t, result1, result2) } @@ -114,7 +116,7 @@ func TestEncodeKeyToPreAllocatedBuf(t *testing.T) { for _, keyAdapter := range keyAdapters { key := randBytes(32) buf := make([]byte, 256) - buf2 := keyAdapter.Encode(buf[:4], key, 1) + buf2 := keyAdapter.Encode(buf[:4], key, common.EncodeIntRowID(1)) require.True(t, startWithSameMemory(buf, buf2)) // Verify the encoded result first. key2, err := keyAdapter.Decode(nil, buf2[4:]) @@ -126,7 +128,7 @@ func TestEncodeKeyToPreAllocatedBuf(t *testing.T) { func TestDecodeKeyToPreAllocatedBuf(t *testing.T) { data := []byte{ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7, - 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, + 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8, } keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}} for _, keyAdapter := range keyAdapters { @@ -143,7 +145,7 @@ func TestDecodeKeyToPreAllocatedBuf(t *testing.T) { func TestDecodeKeyDstIsInsufficient(t *testing.T) { data := []byte{ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7, - 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, + 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8, } keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}} for _, keyAdapter := range keyAdapters { diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 808832cd7e5a8..48b1c6a9a5385 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//store/driver/error", "//table/tables", "//util", + "//util/codec", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index c92b071502453..cd18d199adac0 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -38,6 +38,7 @@ import ( tmysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" ) @@ -392,7 +393,15 @@ type KvPair struct { // Val is the value of the KV pair Val []byte // RowID is the row id of the KV pair. - RowID int64 + RowID []byte +} + +// EncodeIntRowIDToBuf encodes an int64 row id to a buffer. +var EncodeIntRowIDToBuf = codec.EncodeComparableVarint + +// EncodeIntRowID encodes an int64 row id. +func EncodeIntRowID(rowID int64) []byte { + return codec.EncodeComparableVarint(nil, rowID) } // TableHasAutoRowID return whether table has auto generated row id diff --git a/ddl/index.go b/ddl/index.go index f9bfeba446855..f64e3e49f29c5 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1781,7 +1781,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC if err != nil { return errors.Trace(err) } - err = w.writerCtx.WriteRow(key, idxVal) + err = w.writerCtx.WriteRow(key, idxVal, idxRecord.handle) if err != nil { return errors.Trace(err) } diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index 24779ef9d7718..7898cbbec6d55 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -146,7 +147,7 @@ func (ei *engineInfo) ImportAndClean() error { // WriterContext is used to keep a lightning local writer for each backfill worker. type WriterContext struct { ctx context.Context - rowSeq func() int64 + unique bool lWrite *backend.LocalEngineWriter } @@ -191,13 +192,9 @@ func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContex } wc := &WriterContext{ ctx: ei.ctx, + unique: unique, lWrite: lWrite, } - if unique { - wc.rowSeq = func() int64 { - return ei.rowSeq.Add(1) - } - } return wc, nil } @@ -218,12 +215,12 @@ func (ei *engineInfo) closeWriters() error { } // WriteRow Write one row into local writer buffer. -func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error { +func (wCtx *WriterContext) WriteRow(key, idxVal []byte, handle tidbkv.Handle) error { kvs := make([]common.KvPair, 1) kvs[0].Key = key kvs[0].Val = idxVal - if wCtx.rowSeq != nil { - kvs[0].RowID = wCtx.rowSeq() + if wCtx.unique { + kvs[0].RowID = handle.Encoded() } row := kv.MakeRowsFromKvPairs(kvs) return wCtx.lWrite.WriteRows(wCtx.ctx, nil, row)