Skip to content

Commit

Permalink
Merge pull request #20 from tangenta/new_writer-10
Browse files Browse the repository at this point in the history
make system variables take effects
  • Loading branch information
wjhuang2016 authored Jul 14, 2023
2 parents 8ea5d48 + 871cf77 commit ee42d9c
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 77 deletions.
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,9 +1037,9 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) (err error) {
totalKeySize += keySize
}
w.batchCount += len(kvs)
// NoopKeyAdapter doesn't really change the key,
// noopKeyAdapter doesn't really change the key,
// skipping the encoding to avoid unnecessary alloc and copy.
if _, ok := keyAdapter.(NoopKeyAdapter); !ok {
if _, ok := keyAdapter.(noopKeyAdapter); !ok {
if cap(w.sortedKeyBuf) < totalKeySize {
w.sortedKeyBuf = make([]byte, totalKeySize)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: NoopKeyAdapter{},
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/key_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ func reallocBytes(b []byte, n int) []byte {
return b
}

type NoopKeyAdapter struct{}
type noopKeyAdapter struct{}

func (NoopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte {
func (noopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte {
return append(dst, key...)
}

func (NoopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
func (noopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
return append(dst, data...), nil
}

func (NoopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
func (noopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
return len(key)
}

var _ KeyAdapter = NoopKeyAdapter{}
var _ KeyAdapter = noopKeyAdapter{}

type dupDetectKeyAdapter struct{}

Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/local/key_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func randBytes(n int) []byte {
}

func TestNoopKeyAdapter(t *testing.T) {
keyAdapter := NoopKeyAdapter{}
keyAdapter := noopKeyAdapter{}
key := randBytes(32)
require.Len(t, key, keyAdapter.EncodedLen(key, ZeroRowID))
encodedKey := keyAdapter.Encode(nil, key, ZeroRowID)
Expand Down Expand Up @@ -112,7 +112,7 @@ func startWithSameMemory(x []byte, y []byte) bool {
}

func TestEncodeKeyToPreAllocatedBuf(t *testing.T) {
keyAdapters := []KeyAdapter{NoopKeyAdapter{}, dupDetectKeyAdapter{}}
keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
for _, keyAdapter := range keyAdapters {
key := randBytes(32)
buf := make([]byte, 256)
Expand All @@ -130,7 +130,7 @@ func TestDecodeKeyToPreAllocatedBuf(t *testing.T) {
0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7,
0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8,
}
keyAdapters := []KeyAdapter{NoopKeyAdapter{}, dupDetectKeyAdapter{}}
keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
for _, keyAdapter := range keyAdapters {
key, err := keyAdapter.Decode(nil, data)
require.NoError(t, err)
Expand All @@ -147,7 +147,7 @@ func TestDecodeKeyDstIsInsufficient(t *testing.T) {
0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7,
0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8,
}
keyAdapters := []KeyAdapter{NoopKeyAdapter{}, dupDetectKeyAdapter{}}
keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
for _, keyAdapter := range keyAdapters {
key, err := keyAdapter.Decode(nil, data)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func NewBackend(
return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
importClientFactory := NewImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
keyAdapter := KeyAdapter(NoopKeyAdapter{})
keyAdapter := KeyAdapter(noopKeyAdapter{})
if config.DupeDetectEnabled {
keyAdapter = dupDetectKeyAdapter{}
}
Expand Down
14 changes: 7 additions & 7 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestRangeProperties(t *testing.T) {
userProperties := make(map[string]string, 1)
_ = collector.Finish(userProperties)

props, err := decodeRangeProperties(hack.Slice(userProperties[propRangeIndex]), NoopKeyAdapter{})
props, err := decodeRangeProperties(hack.Slice(userProperties[propRangeIndex]), noopKeyAdapter{})
require.NoError(t, err)

// Smallest key in props.
Expand Down Expand Up @@ -333,7 +333,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: NoopKeyAdapter{},
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func TestCheckPeersBusy(t *testing.T) {
ctx: engineCtx,
cancel: cancel2,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: NoopKeyAdapter{},
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
ctx: engineCtx,
cancel: cancel2,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: NoopKeyAdapter{},
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
Expand Down Expand Up @@ -1400,7 +1400,7 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
ctx: engineCtx,
cancel: cancel2,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: NoopKeyAdapter{},
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
Expand Down Expand Up @@ -1507,7 +1507,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
ctx: engineCtx,
cancel: cancel2,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: NoopKeyAdapter{},
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
Expand Down Expand Up @@ -1646,7 +1646,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: NoopKeyAdapter{},
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
Expand Down
29 changes: 21 additions & 8 deletions br/pkg/lightning/backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,7 @@ func (remote *Backend) SetRange(ctx context.Context, start, end kv.Key, dataFile
func (remote *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
switch remote.phase {
case PhaseUpload:
for _, w := range remote.mu.writers {
_, err := w.Close(ctx)
if err != nil {
return err
}
}
// Do nothing for uploading stage.
return nil
case PhaseImport:
if len(remote.startKey) == 0 {
Expand Down Expand Up @@ -332,7 +327,9 @@ func (remote *Backend) LocalWriter(ctx context.Context, cfg *backend.LocalWriter
zap.Uint64("totalSize", s.TotalSize))
}
prefix := filepath.Join(strconv.Itoa(int(remote.jobID)), engineUUID.String())
writer := sharedisk.NewWriter(ctx, remote.externalStorage, prefix, remote.allocWriterID(), onClose)
writer := sharedisk.NewWriter(ctx, remote.externalStorage, prefix,
remote.allocWriterID(), remote.config.MemQuota, remote.config.StatSampleKeys,
remote.config.StatSampleSize, remote.config.WriteBatchSize, onClose)
remote.mu.Lock()
remote.mu.writers = append(remote.mu.writers, writer)
remote.mu.Unlock()
Expand Down Expand Up @@ -362,6 +359,16 @@ func (remote *Backend) handleWriterSummary(s *sharedisk.WriterSummary) {
remote.mu.totalSize += s.TotalSize
}

func (remote *Backend) CloseWriters(ctx context.Context) error {
for _, w := range remote.mu.writers {
_, err := w.Close(ctx)
if err != nil {
return err
}
}
return nil
}

func (remote *Backend) GetSummary() (min, max kv.Key, totalSize uint64) {
remote.mu.Lock()
defer remote.mu.Unlock()
Expand All @@ -383,7 +390,13 @@ func (remote *Backend) GetRangeSplitter(ctx context.Context, totalKVSize uint64,
return nil, err
}
// TODO(tangenta): determine the max key and max ways.
maxSize := totalKVSize / uint64(instanceCnt)
var approxSubtaskCnt uint64
if remote.config.SubtaskCnt == -1 {
approxSubtaskCnt = uint64(instanceCnt)
} else {
approxSubtaskCnt = uint64(remote.config.SubtaskCnt)
}
maxSize := totalKVSize / approxSubtaskCnt
rs := sharedisk.NewRangeSplitter(maxSize, math.MaxUint64, math.MaxUint64, mergePropIter, dataFiles)
return rs, nil
}
Expand Down
62 changes: 22 additions & 40 deletions br/pkg/lightning/backend/sharedisk/sharedisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -135,11 +134,6 @@ type Engine struct {
rc *RangePropertiesCollector
}

var WriteBatchSize = 8 * 1024
var MemQuota = 1024 * 1024 * 1024
var SizeDist = 1024 * 1024
var KeyDist = 8 * 1024

type WriterSummary struct {
WriterID int
Seq int
Expand All @@ -153,44 +147,38 @@ type OnCloseFunc func(summary *WriterSummary)
func DummyOnCloseFunc(*WriterSummary) {}

func NewWriter(ctx context.Context, externalStorage storage.ExternalStorage,
prefix string, writerID int, onClose OnCloseFunc) *Writer {
// TODO(tangenta): make it configurable.
engine := NewEngine(uint64(SizeDist), uint64(KeyDist))
pool := membuf.NewPool()
prefix string, writerID int, memSizeLimit uint64, keyDist int64, sizeDist uint64, writeBatchSize int64,
onClose OnCloseFunc) *Writer {
engine := NewEngine(sizeDist, uint64(keyDist))
filePrefix := filepath.Join(prefix, strconv.Itoa(writerID))
return &Writer{
ctx: ctx,
engine: engine,
memtableSizeLimit: MemQuota,
keyAdapter: &local.NoopKeyAdapter{},
exStorage: externalStorage,
memBufPool: pool,
kvBuffer: pool.NewBuffer(),
writeBatch: make([]common.KvPair, 0, WriteBatchSize),
currentSeq: 0,
tikvCodec: keyspace.CodecV1,
filenamePrefix: filePrefix,
writerID: writerID,
kvStore: nil,
onClose: onClose,
closed: false,
ctx: ctx,
engine: engine,
memSizeLimit: memSizeLimit,
exStorage: externalStorage,
writeBatch: make([]common.KvPair, 0, writeBatchSize),
currentSeq: 0,
tikvCodec: keyspace.CodecV1,
filenamePrefix: filePrefix,
writerID: writerID,
kvStore: nil,
onClose: onClose,
closed: false,
}
}

// Writer is used to write data into external storage.
type Writer struct {
ctx context.Context
sync.Mutex
engine *Engine
memtableSizeLimit int
keyAdapter local.KeyAdapter
exStorage storage.ExternalStorage
engine *Engine
memSizeLimit uint64
exStorage storage.ExternalStorage

// bytes buffer for writeBatch
memBufPool *membuf.Pool
kvBuffer *membuf.Buffer
writeBatch []common.KvPair
batchSize int
batchSize uint64

currentSeq int
onClose OnCloseFunc
Expand Down Expand Up @@ -224,14 +212,10 @@ func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows enco
w.Lock()
defer w.Unlock()

keyAdapter := w.keyAdapter
for _, pair := range kvs {
w.batchSize += len(pair.Key) + len(pair.Val)
buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
val := w.kvBuffer.AddBytes(pair.Val)
w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
if w.batchSize >= w.memtableSizeLimit {
w.batchSize += uint64(len(pair.Key) + len(pair.Val))
w.writeBatch = append(w.writeBatch, common.KvPair{Key: pair.Key, Val: pair.Val})
if w.batchSize >= w.memSizeLimit {
if err := w.flushKVs(ctx); err != nil {
return err
}
Expand All @@ -252,7 +236,6 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
logutil.BgLogger().Info("close writer", zap.Int("writerID", w.writerID),
zap.String("minKey", hex.EncodeToString(w.minKey)), zap.String("maxKey", hex.EncodeToString(w.maxKey)))
w.closed = true
defer w.memBufPool.Destroy()
err := w.flushKVs(ctx)
if err != nil {
return status(false), err
Expand Down Expand Up @@ -363,7 +346,6 @@ func (w *Writer) flushKVs(ctx context.Context) error {
w.recordMinMax(w.writeBatch[0].Key, w.writeBatch[len(w.writeBatch)-1].Key, size)

w.writeBatch = w.writeBatch[:0]
w.kvBuffer.Reset()
w.batchSize = 0
//w.engine.rc.reset()
return nil
Expand Down
24 changes: 15 additions & 9 deletions br/pkg/lightning/backend/sharedisk/sharedisk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ func TestWriter(t *testing.T) {
err = cleanupFiles(ctx, storage, "jobID/engineUUID")
require.NoError(t, err)

writer := NewWriter(context.Background(), storage, "jobID/engineUUID", 0, DummyOnCloseFunc)

pool := membuf.NewPool()
defer pool.Destroy()
writer.kvBuffer = pool.NewBuffer()
const (
memLimit uint64 = 64 * 1024 * 1024
sizeDist uint64 = 1024 * 1024
keyDist = 8 * 1024
writeBatchSize = 8 * 1024
)
writer := NewWriter(context.Background(), storage, "jobID/engineUUID", 0, memLimit,
keyDist, sizeDist, writeBatchSize, DummyOnCloseFunc)

var kvs []common.KvPair
value := make([]byte, 128)
Expand Down Expand Up @@ -157,8 +160,12 @@ func TestWriterPerf(t *testing.T) {
var valueSize = 1000
var rowCnt = 2000
var readBufferSize = 64 * 1024
//var memLimit = 64 * 1024 * 1024
//MemQuota = memLimit
const (
memLimit uint64 = 64 * 1024 * 1024
keyDist = 8 * 1024
sizeDist uint64 = 1024 * 1024
writeBatchSize = 8 * 1024
)

bucket := "globalsorttest"
prefix := "tools_test_data/sharedisk"
Expand All @@ -174,12 +181,11 @@ func TestWriterPerf(t *testing.T) {
storage, err := storage2.New(context.Background(), backend, &storage2.ExternalStorageOptions{})
require.NoError(t, err)

writer := NewWriter(context.Background(), storage, "test", 0, DummyOnCloseFunc)
writer := NewWriter(context.Background(), storage, "test", 0, memLimit, keyDist, sizeDist, writeBatchSize, DummyOnCloseFunc)
writer.filenamePrefix = "test"

pool := membuf.NewPool()
defer pool.Destroy()
writer.kvBuffer = pool.NewBuffer()
defer writer.Close(ctx)

var startMemory runtime.MemStats
Expand Down
4 changes: 4 additions & 0 deletions ddl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ func (b *backfillSchedulerHandle) OnSubtaskFinished(ctx context.Context, meta []
if err != nil {
return nil, err
}
err = bc.CloseWriters(ctx)
if err != nil {
return nil, err
}
subtaskMeta.MinKey, subtaskMeta.MaxKey, subtaskMeta.TotalKVSize = bc.GetSummary()
log.FromContext(ctx).Info("get key boundary on subtask finished",
zap.String("min", hex.EncodeToString(subtaskMeta.MinKey)),
Expand Down

0 comments on commit ee42d9c

Please sign in to comment.