Skip to content

Commit

Permalink
lightning: local backend use external engine part 1 (#46406)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
lance6716 authored Aug 28, 2023
1 parent 2163271 commit 5ecf980
Show file tree
Hide file tree
Showing 22 changed files with 595 additions and 272 deletions.
3 changes: 1 addition & 2 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ go_library(
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
Expand All @@ -33,6 +32,7 @@ go_library(
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -54,7 +54,6 @@ go_test(
shard_count = 22,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/common",
"//br/pkg/storage",
"//kv",
Expand Down
140 changes: 129 additions & 11 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ import (
"context"
"encoding/hex"
"sort"
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -35,6 +38,108 @@ type Engine struct {
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
bufPool *membuf.Pool

iter *MergeKVIter

keyAdapter common.KeyAdapter
duplicateDetection bool
duplicateDB *pebble.DB
dupDetectOpt common.DupDetectOpt
ts uint64

importedKVSize *atomic.Int64
importedKVCount *atomic.Int64
}

// NewExternalEngine creates an (external) engine.
func NewExternalEngine(
storage storage.ExternalStorage,
dataFiles []string,
statsFiles []string,
keyAdapter common.KeyAdapter,
duplicateDetection bool,
duplicateDB *pebble.DB,
dupDetectOpt common.DupDetectOpt,
ts uint64,
) common.Engine {
return &Engine{
storage: storage,
dataFiles: dataFiles,
statsFiles: statsFiles,
bufPool: membuf.NewPool(),
keyAdapter: keyAdapter,
duplicateDetection: duplicateDetection,
duplicateDB: duplicateDB,
dupDetectOpt: dupDetectOpt,
ts: ts,
importedKVSize: atomic.NewInt64(0),
importedKVCount: atomic.NewInt64(0),
}
}

// LoadIngestData loads the data from the external storage to memory in [start,
// end) range, so local backend can ingest it. The used byte slice of ingest data
// are allocated from Engine.bufPool and must be released by
// MemoryIngestData.Finish(). For external.Engine, LoadIngestData must be called
// with strictly increasing start / end key.
func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common.IngestData, error) {
if bytes.Equal(start, end) {
return nil, errors.Errorf("start key and end key must not be the same: %s",
hex.EncodeToString(start))
}

now := time.Now()
keys := make([][]byte, 0, 1024)
values := make([][]byte, 0, 1024)
memBuf := e.bufPool.NewBuffer()

if e.iter == nil {
iter, err := e.createMergeIter(ctx, start)
if err != nil {
return nil, errors.Trace(err)
}
e.iter = iter
} else {
// there should be a key that just exceeds the end key in last LoadIngestData
// invocation.
k, v := e.iter.Key(), e.iter.Value()
keys = append(keys, memBuf.AddBytes(k))
values = append(values, memBuf.AddBytes(v))
}

cnt := 0
for e.iter.Next() {
cnt++
k, v := e.iter.Key(), e.iter.Value()
if bytes.Compare(k, start) < 0 {
continue
}
if bytes.Compare(k, end) >= 0 {
break
}
keys = append(keys, memBuf.AddBytes(k))
values = append(values, memBuf.AddBytes(v))
}
if e.iter.Error() != nil {
return nil, errors.Trace(e.iter.Error())
}

logutil.Logger(ctx).Info("load data from external storage",
zap.Duration("cost time", time.Since(now)),
zap.Int("iterated count", cnt))
return &MemoryIngestData{
keyAdapter: e.keyAdapter,
duplicateDetection: e.duplicateDetection,
duplicateDB: e.duplicateDB,
dupDetectOpt: e.dupDetectOpt,
keys: keys,
values: values,
ts: e.ts,
memBuf: memBuf,
importedKVSize: e.importedKVSize,
importedKVCount: e.importedKVCount,
}, nil
}

func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) {
Expand Down Expand Up @@ -64,24 +169,36 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte
return iter, nil
}

// Close releases the resources of the engine.
func (e *Engine) Close() error {
if e.iter == nil {
return nil
}
return errors.Trace(e.iter.Close())
}

// MemoryIngestData is the in-memory implementation of IngestData.
type MemoryIngestData struct {
keyAdapter local.KeyAdapter
keyAdapter common.KeyAdapter
duplicateDetection bool
duplicateDB *pebble.DB
dupDetectOpt local.DupDetectOpt
dupDetectOpt common.DupDetectOpt

keys [][]byte
values [][]byte
ts uint64

memBuf *membuf.Buffer
importedKVSize *atomic.Int64
importedKVCount *atomic.Int64
}

var _ local.IngestData = (*MemoryIngestData)(nil)
var _ common.IngestData = (*MemoryIngestData)(nil)

func (m *MemoryIngestData) firstAndLastKeyIndex(lowerBound, upperBound []byte) (int, int) {
firstKeyIdx := 0
if len(lowerBound) > 0 {
lowerBound = m.keyAdapter.Encode(nil, lowerBound, local.MinRowID)
lowerBound = m.keyAdapter.Encode(nil, lowerBound, common.MinRowID)
firstKeyIdx = sort.Search(len(m.keys), func(i int) bool {
return bytes.Compare(lowerBound, m.keys[i]) <= 0
})
Expand All @@ -92,7 +209,7 @@ func (m *MemoryIngestData) firstAndLastKeyIndex(lowerBound, upperBound []byte) (

lastKeyIdx := len(m.keys) - 1
if len(upperBound) > 0 {
upperBound = m.keyAdapter.Encode(nil, upperBound, local.MinRowID)
upperBound = m.keyAdapter.Encode(nil, upperBound, common.MinRowID)
i := sort.Search(len(m.keys), func(i int) bool {
reverseIdx := len(m.keys) - 1 - i
return bytes.Compare(upperBound, m.keys[reverseIdx]) > 0
Expand Down Expand Up @@ -174,7 +291,7 @@ func (m *memoryDataIter) Error() error {

type memoryDataDupDetectIter struct {
iter *memoryDataIter
dupDetector *local.DupDetector
dupDetector *common.DupDetector
err error
curKey, curVal []byte
}
Expand Down Expand Up @@ -231,7 +348,7 @@ func (m *memoryDataDupDetectIter) Error() error {
}

// NewIter implements IngestData.NewIter.
func (m *MemoryIngestData) NewIter(ctx context.Context, lowerBound, upperBound []byte) local.ForwardIter {
func (m *MemoryIngestData) NewIter(ctx context.Context, lowerBound, upperBound []byte) common.ForwardIter {
firstKeyIdx, lastKeyIdx := m.firstAndLastKeyIndex(lowerBound, upperBound)
iter := &memoryDataIter{
keys: m.keys,
Expand All @@ -243,7 +360,7 @@ func (m *MemoryIngestData) NewIter(ctx context.Context, lowerBound, upperBound [
return iter
}
logger := log.FromContext(ctx)
detector := local.NewDupDetector(m.keyAdapter, m.duplicateDB.NewBatch(), logger, m.dupDetectOpt)
detector := common.NewDupDetector(m.keyAdapter, m.duplicateDB.NewBatch(), logger, m.dupDetectOpt)
return &memoryDataDupDetectIter{
iter: iter,
dupDetector: detector,
Expand All @@ -257,6 +374,7 @@ func (m *MemoryIngestData) GetTS() uint64 {

// Finish implements IngestData.Finish.
func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) {
//TODO implement me
panic("implement me")
m.importedKVSize.Add(totalBytes)
m.importedKVCount.Add(totalCount)
m.memBuf.Destroy()
}
9 changes: 4 additions & 5 deletions br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -114,7 +113,7 @@ func TestIter(t *testing.T) {

func testGetFirstAndLastKey(
t *testing.T,
data local.IngestData,
data common.IngestData,
lowerBound, upperBound []byte,
expectedFirstKey, expectedLastKey []byte,
) {
Expand All @@ -126,7 +125,7 @@ func testGetFirstAndLastKey(

func testNewIter(
t *testing.T,
data local.IngestData,
data common.IngestData,
lowerBound, upperBound []byte,
expectedKeys, expectedValues [][]byte,
) {
Expand Down Expand Up @@ -186,7 +185,7 @@ func TestMemoryIngestData(t *testing.T) {
[]byte("value5"),
}
data := &MemoryIngestData{
keyAdapter: local.NoopKeyAdapter{},
keyAdapter: common.NoopKeyAdapter{},
keys: keys,
values: values,
ts: 123,
Expand All @@ -212,7 +211,7 @@ func TestMemoryIngestData(t *testing.T) {
dir := t.TempDir()
db, err := pebble.Open(path.Join(dir, "duplicate"), nil)
require.NoError(t, err)
keyAdapter := local.DupDetectKeyAdapter{}
keyAdapter := common.DupDetectKeyAdapter{}
data = &MemoryIngestData{
keyAdapter: keyAdapter,
duplicateDetection: true,
Expand Down
36 changes: 33 additions & 3 deletions br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import (
"sort"
"strings"

"github.com/pingcap/tidb/br/pkg/utils"
"golang.org/x/sync/errgroup"

kv2 "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// prettyFileNames removes the directory prefix except the last level from the
Expand Down Expand Up @@ -156,3 +157,32 @@ func CleanUpFiles(ctx context.Context,
}
return eg.Wait()
}

// MockExternalEngine generates an external engine with the given keys and values.
func MockExternalEngine(
storage storage.ExternalStorage,
keys [][]byte,
values [][]byte,
) (dataFiles []string, statsFiles []string, err error) {
ctx := context.Background()
writer := NewWriterBuilder().
SetMemorySizeLimit(128).
SetPropSizeDistance(32).
SetPropKeysDistance(4).
Build(storage, "/mock-test", 0)
kvs := make([]common.KvPair, len(keys))
for i := range keys {
kvs[i].Key = keys[i]
kvs[i].Val = values[i]
}
rows := kv2.MakeRowsFromKvPairs(kvs)
err = writer.AppendRows(ctx, nil, rows)
if err != nil {
return nil, nil, err
}
_, err = writer.Close(ctx)
if err != nil {
return nil, nil, err
}
return GetAllFileNames(ctx, storage, "/mock-test")
}
7 changes: 3 additions & 4 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -152,9 +151,9 @@ func (b *WriterBuilder) Build(
bp = membuf.NewPool()
}
filenamePrefix := filepath.Join(prefix, strconv.Itoa(writerID))
keyAdapter := local.KeyAdapter(local.NoopKeyAdapter{})
keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
if b.dupeDetectEnabled {
keyAdapter = local.DupDetectKeyAdapter{}
keyAdapter = common.DupDetectKeyAdapter{}
}
return &Writer{
rc: &rangePropertiesCollector{
Expand Down Expand Up @@ -183,7 +182,7 @@ type Writer struct {
writerID int
currentSeq int
filenamePrefix string
keyAdapter local.KeyAdapter
keyAdapter common.KeyAdapter

kvStore *KeyValueStore
rc *rangePropertiesCollector
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/lightning/backend/external/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
dbkv "github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -195,12 +194,12 @@ func TestWriterDuplicateDetect(t *testing.T) {
dir := t.TempDir()
db, err := pebble.Open(path.Join(dir, "duplicate"), nil)
require.NoError(t, err)
keyAdapter := local.DupDetectKeyAdapter{}
keyAdapter := common.DupDetectKeyAdapter{}
data := &MemoryIngestData{
keyAdapter: keyAdapter,
duplicateDetection: true,
duplicateDB: db,
dupDetectOpt: local.DupDetectOpt{ReportErrOnDup: true},
dupDetectOpt: common.DupDetectOpt{ReportErrOnDup: true},
keys: keys,
values: values,
ts: 123,
Expand Down
Loading

0 comments on commit 5ecf980

Please sign in to comment.