importinto: use one writer for each kv group for all concurrent encod…
D3Hunter authored Sep 22, 2023
1 parent db4ebd7 commit 4e82952
Showing 8 changed files with 225 additions and 65 deletions.
7 changes: 7 additions & 0 deletions br/pkg/lightning/backend/external/writer.go
@@ -488,6 +488,9 @@ func (w *Writer) createStorageWriter(ctx context.Context) (

// EngineWriter implements backend.EngineWriter interface.
type EngineWriter struct {
// Only one writer is used for each kv group (data or a particular index), no matter
// how many routines are encoding data, so writes to it must be synchronized.
sync.Mutex
w *Writer
}

@@ -498,6 +501,8 @@ func NewEngineWriter(w *Writer) *EngineWriter {

// AppendRows implements backend.EngineWriter interface.
func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
e.Lock()
defer e.Unlock()
kvs := kv.Rows2KvPairs(rows)
if len(kvs) == 0 {
return nil
@@ -519,5 +524,7 @@ func (e *EngineWriter) IsSynced() bool {

// Close implements backend.EngineWriter interface.
func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
e.Lock()
defer e.Unlock()
return nil, e.w.Close(ctx)
}
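
The single EngineWriter per kv group is now shared by every encoding routine, and the embedded mutex is what makes that sharing safe: AppendRows and Close each take the lock before touching the underlying external.Writer. A minimal sketch of the intended concurrent usage; the worker count, rows channel, and error callback below are assumed names, not code from this patch:

// Sketch only: several encoder goroutines funnel encoded rows into one shared
// EngineWriter; its embedded sync.Mutex serializes the AppendRows calls.
// threadCnt, encodedRowsCh, and onError are illustrative names.
func appendConcurrently(
	ctx context.Context,
	w *external.EngineWriter,
	encodedRowsCh <-chan encode.Rows,
	threadCnt int,
	onError func(error),
) {
	var wg sync.WaitGroup
	for i := 0; i < threadCnt; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for rows := range encodedRowsCh {
				// Safe from many goroutines: AppendRows locks the shared writer.
				if err := w.AppendRows(ctx, nil, rows); err != nil {
					onError(err)
					return
				}
			}
		}()
	}
	wg.Wait()
}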
129 changes: 76 additions & 53 deletions disttask/importinto/encode_and_sort_operator.go
@@ -20,6 +20,7 @@ import (
"strconv"
"time"

"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
@@ -63,6 +64,9 @@ type encodeAndSortOperator struct {
sharedVars *SharedVars
logger *zap.Logger
errCh chan error

dataWriter *external.EngineWriter
indexWriter *importer.IndexRouteWriter
}

var _ operator.Operator = (*encodeAndSortOperator)(nil)
@@ -82,18 +86,64 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor,
logger: executor.logger,
errCh: make(chan error),
}

if op.tableImporter.IsGlobalSort() {
op.initWriters(executor, indexMemorySizeLimit)
}

pool := workerpool.NewWorkerPool(
"encodeAndSortOperator",
util.ImportInto,
int(executor.taskMeta.Plan.ThreadCnt),
func() workerpool.Worker[*importStepMinimalTask, workerpool.None] {
return newChunkWorker(ctx, op, indexMemorySizeLimit)
return newChunkWorker(subCtx, op)
},
)
op.AsyncOperator = operator.NewAsyncOperator(subCtx, pool)
return op
}

// with the current design of the global sort writer, we only create one writer
// for each kv group, and all chunks share the same writers.
// the writer itself sorts and uploads data concurrently.
func (op *encodeAndSortOperator) initWriters(executor *importStepExecutor, indexMemorySizeLimit uint64) {
totalDataKVMemSizeLimit := external.DefaultMemSizeLimit * uint64(executor.taskMeta.Plan.ThreadCnt)
totalMemSizeLimitPerIndexWriter := indexMemorySizeLimit * uint64(executor.taskMeta.Plan.ThreadCnt)
op.logger.Info("init global sort writer with mem limit",
zap.String("data-limit", units.BytesSize(float64(totalDataKVMemSizeLimit))),
zap.String("per-index-limit", units.BytesSize(float64(totalMemSizeLimitPerIndexWriter))))

// in case of a network partition, 2 nodes might run the same subtask,
// so use a uuid to make sure the path is unique.
workerUUID := uuid.New().String()
// sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID}
indexWriterFn := func(indexID int64) *external.Writer {
builder := external.NewWriterBuilder().
SetOnCloseFunc(func(summary *external.WriterSummary) {
op.sharedVars.mergeIndexSummary(indexID, summary)
}).SetMemorySizeLimit(totalMemSizeLimitPerIndexWriter).
SetMutex(&op.sharedVars.ShareMu)
prefix := subtaskPrefix(op.taskID, op.subtaskID)
// writer id for index: index/{indexID}/{workerID}
writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID)
writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
return writer
}

// sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID}
builder := external.NewWriterBuilder().
SetOnCloseFunc(op.sharedVars.mergeDataSummary).
SetMemorySizeLimit(totalDataKVMemSizeLimit).
SetMutex(&op.sharedVars.ShareMu)
prefix := subtaskPrefix(op.taskID, op.subtaskID)
// writer id for data: data/{workerID}
writerID := path.Join("data", workerUUID)
writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)

op.dataWriter = external.NewEngineWriter(writer)
op.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn)
}
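
The sizing above gives each shared writer the whole budget that previously would have been split across per-chunk writers: the single data writer gets external.DefaultMemSizeLimit times the thread count, and each index writer gets indexMemorySizeLimit times the thread count. A back-of-the-envelope sketch with assumed numbers (the 256 MiB default comes from the writer comment in chunk_process.go; the thread count and per-index limit are illustrative):

// Illustrative sizing only; the constants below are assumed example values,
// not taken from TiDB configuration.
func exampleWriterBudgets() (dataLimit, perIndexLimit uint64) {
	const (
		defaultMemSizeLimit   = 256 << 20 // assumed external.DefaultMemSizeLimit (256 MiB)
		threadCnt             = 8         // hypothetical Plan.ThreadCnt
		indexMemSizePerThread = 32 << 20  // hypothetical indexMemorySizeLimit
	)
	dataLimit = uint64(defaultMemSizeLimit) * threadCnt       // 2 GiB for the single shared data writer
	perIndexLimit = uint64(indexMemSizePerThread) * threadCnt // 256 MiB for each shared index writer
	return dataLimit, perIndexLimit
}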

func (op *encodeAndSortOperator) Open() error {
op.wg.Run(func() {
for err := range op.errCh {
@@ -114,9 +164,31 @@ func (op *encodeAndSortOperator) Close() error {
// right now AsyncOperator.Close always returns nil, ok to ignore it.
// nolint:errcheck
op.AsyncOperator.Close()

closeCtx := op.ctx
if closeCtx.Err() != nil {
// in case the context is canceled, we need to create a new context to close the writers.
newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration)
closeCtx = newCtx
defer cancel()
}
if op.dataWriter != nil {
// Note: we cannot ignore the close error since we're writing to S3 or GCS;
// ignoring it might cause data loss. Same below.
if _, err := op.dataWriter.Close(closeCtx); err != nil {
op.onError(errors.Trace(err))
}
}
if op.indexWriter != nil {
if _, err := op.indexWriter.Close(closeCtx); err != nil {
op.onError(errors.Trace(err))
}
}

op.cancel()
close(op.errCh)
op.wg.Wait()

// see the comments on the interface definition; this Close is actually WaitAndClose.
return op.firstErr.Load()
}
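
Because the shared writers flush to external storage during Close, the operator builds a fresh bounded context for cleanup when its own context has already been canceled; otherwise the final S3/GCS uploads would be rejected immediately. A generic sketch of that pattern; the interface name and the 30-second bound are illustrative stand-ins, not the patch's exact types:

// Sketch of the "fresh context for cleanup" pattern used in Close above.
// ctxCloser and the 30-second bound are assumed, for illustration only.
type ctxCloser interface {
	Close(ctx context.Context) error
}

func closeWithUsableCtx(opCtx context.Context, c ctxCloser) error {
	closeCtx := opCtx
	if closeCtx.Err() != nil {
		// The operator context is already canceled; give the writer a bounded
		// background context so its final flush/upload can still complete.
		var cancel context.CancelFunc
		closeCtx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
	}
	return c.Close(closeCtx)
}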
@@ -140,43 +212,13 @@ func (op *encodeAndSortOperator) Done() <-chan struct{} {
type chunkWorker struct {
ctx context.Context
op *encodeAndSortOperator

dataWriter *external.EngineWriter
indexWriter *importer.IndexRouteWriter
}

func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemorySizeLimit uint64) *chunkWorker {
func newChunkWorker(ctx context.Context, op *encodeAndSortOperator) *chunkWorker {
w := &chunkWorker{
ctx: ctx,
op: op,
}
if op.tableImporter.IsGlobalSort() {
// in case on network partition, 2 nodes might run the same subtask.
workerUUID := uuid.New().String()
// sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID}
indexWriterFn := func(indexID int64) *external.Writer {
builder := external.NewWriterBuilder().
SetOnCloseFunc(func(summary *external.WriterSummary) {
op.sharedVars.mergeIndexSummary(indexID, summary)
}).SetMemorySizeLimit(indexMemorySizeLimit).SetMutex(&op.sharedVars.ShareMu)
prefix := subtaskPrefix(op.taskID, op.subtaskID)
// writer id for index: index/{indexID}/{workerID}
writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID)
writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
return writer
}

// sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID}
builder := external.NewWriterBuilder().
SetOnCloseFunc(op.sharedVars.mergeDataSummary).SetMutex(&op.sharedVars.ShareMu)
prefix := subtaskPrefix(op.taskID, op.subtaskID)
// writer id for data: data/{workerID}
writerID := path.Join("data", workerUUID)
writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
w.dataWriter = external.NewEngineWriter(writer)

w.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn)
}
return w
}

@@ -187,31 +229,12 @@ func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool.
// we don't use the input send function, it makes workflow more complex
// we send result to errCh and handle it here.
executor := newImportMinimalTaskExecutor(task)
if err := executor.Run(w.ctx, w.dataWriter, w.indexWriter); err != nil {
if err := executor.Run(w.ctx, w.op.dataWriter, w.op.indexWriter); err != nil {
w.op.onError(err)
}
}

func (w *chunkWorker) Close() {
closeCtx := w.ctx
if closeCtx.Err() != nil {
// in case of context canceled, we need to create a new context to close writers.
newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration)
closeCtx = newCtx
defer cancel()
}
if w.dataWriter != nil {
// Note: we cannot ignore close error as we're writing to S3 or GCS.
// ignore error might cause data loss. below too.
if _, err := w.dataWriter.Close(closeCtx); err != nil {
w.op.onError(errors.Trace(err))
}
}
if w.indexWriter != nil {
if _, err := w.indexWriter.Close(closeCtx); err != nil {
w.op.onError(errors.Trace(err))
}
}
func (*chunkWorker) Close() {
}

func subtaskPrefix(taskID, subtaskID int64) string {
11 changes: 8 additions & 3 deletions disttask/importinto/encode_and_sort_operator_test.go
@@ -72,15 +72,20 @@ func TestEncodeAndSortOperator(t *testing.T) {
tableImporter: &importer.TableImporter{
LoadDataController: &importer.LoadDataController{
Plan: &importer.Plan{
CloudStorageURI: "",
CloudStorageURI: "s3://test-bucket/test-path",
},
},
},
logger: logger,
}

sharedVars := &SharedVars{
SortedDataMeta: &external.SortedKVMeta{},
SortedIndexMetas: map[int64]*external.SortedKVMeta{},
}

source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3, 0)
op := newEncodeAndSortOperator(context.Background(), executorForParam, sharedVars, 3, 0)
op.SetSource(source)
require.NoError(t, op.Open())
require.Greater(t, len(op.String()), 0)
@@ -100,7 +105,7 @@
// cancel on error and log other errors
mockErr2 := errors.New("mock err 2")
source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2, 0)
op = newEncodeAndSortOperator(context.Background(), executorForParam, sharedVars, 2, 0)
op.SetSource(source)
executor1 := mock.NewMockMiniTaskExecutor(ctrl)
executor2 := mock.NewMockMiniTaskExecutor(ctrl)
2 changes: 1 addition & 1 deletion disttask/importinto/scheduler.go
@@ -107,7 +107,7 @@ func (s *importStepExecutor) Init(ctx context.Context) error {
}()
}
s.indexMemorySizeLimit = getWriterMemorySizeLimit(s.tableImporter.Plan)
s.logger.Info("index writer memory size limit",
s.logger.Info("memory size limit per index writer per concurrency",
zap.String("limit", units.BytesSize(float64(s.indexMemorySizeLimit))))
return nil
}
5 changes: 4 additions & 1 deletion executor/importer/BUILD.bazel
@@ -78,6 +78,7 @@ go_test(
name = "importer_test",
timeout = "short",
srcs = [
"chunk_process_test.go",
"import_test.go",
"job_test.go",
"precheck_test.go",
@@ -86,9 +87,10 @@ go_test(
embed = [":importer"],
flaky = True,
race = "on",
shard_count = 16,
shard_count = 17,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/backend/external",
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//br/pkg/streamhelper",
@@ -104,6 +106,7 @@ go_test(
"//sessionctx/variable",
"//testkit",
"//types",
"//util",
"//util/dbterror/exeerrors",
"//util/etcd",
"//util/logutil",
54 changes: 47 additions & 7 deletions executor/importer/chunk_process.go
@@ -17,6 +17,7 @@ package importer
import (
"context"
"io"
"sync"
"time"

"github.com/docker/go-units"
@@ -32,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/executor/asyncloaddata"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/syncutil"
"github.com/tikv/client-go/v2/tikv"
@@ -347,20 +349,60 @@ func (p *chunkProcessor) deliverLoop(ctx context.Context) error {
// each writer takes a 256MiB buffer by default.
// this can consume a lot of memory, or even cause OOM.
type IndexRouteWriter struct {
writers map[int64]*external.Writer
// this writer and all wrappedWriters are shared by all deliver routines,
// so we need to synchronize them.
sync.RWMutex
writers map[int64]*wrappedWriter
logger *zap.Logger
writerFactory func(int64) *external.Writer
}

type wrappedWriter struct {
sync.Mutex
*external.Writer
}

func (w *wrappedWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error {
w.Lock()
defer w.Unlock()
return w.Writer.WriteRow(ctx, idxKey, idxVal, handle)
}

func (w *wrappedWriter) Close(ctx context.Context) error {
w.Lock()
defer w.Unlock()
return w.Writer.Close(ctx)
}

// NewIndexRouteWriter creates a new IndexRouteWriter.
func NewIndexRouteWriter(logger *zap.Logger, writerFactory func(int64) *external.Writer) *IndexRouteWriter {
return &IndexRouteWriter{
writers: make(map[int64]*external.Writer),
writers: make(map[int64]*wrappedWriter),
logger: logger,
writerFactory: writerFactory,
}
}

func (w *IndexRouteWriter) getWriter(indexID int64) *wrappedWriter {
w.RLock()
writer, ok := w.writers[indexID]
w.RUnlock()
if ok {
return writer
}

w.Lock()
defer w.Unlock()
writer, ok = w.writers[indexID]
if !ok {
writer = &wrappedWriter{
Writer: w.writerFactory(indexID),
}
w.writers[indexID] = writer
}
return writer
}
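
getWriter takes the classic double-checked route: a read lock on the fast path, then a re-check under the write lock, so concurrent deliver routines never build two writers for the same index ID. The same get-or-create shape written generically; the type and names below are illustrative, not part of the patch:

// Generic sketch of the double-checked get-or-create used by getWriter above.
type lazyMap[K comparable, V any] struct {
	sync.RWMutex
	m     map[K]V
	build func(K) V
}

func (l *lazyMap[K, V]) get(k K) V {
	// Fast path: most calls find an existing value under the read lock.
	l.RLock()
	v, ok := l.m[k]
	l.RUnlock()
	if ok {
		return v
	}
	// Slow path: re-check under the write lock so only one caller builds the value.
	l.Lock()
	defer l.Unlock()
	if v, ok = l.m[k]; !ok {
		v = l.build(k)
		l.m[k] = v
	}
	return v
}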

// AppendRows implements backend.EngineWriter interface.
func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
kvs := kv.Rows2KvPairs(rows)
@@ -372,11 +414,7 @@ func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows enco
if err != nil {
return errors.Trace(err)
}
writer, ok := w.writers[indexID]
if !ok {
writer = w.writerFactory(indexID)
w.writers[indexID] = writer
}
writer := w.getWriter(indexID)
if err = writer.WriteRow(ctx, item.Key, item.Val, nil); err != nil {
return errors.Trace(err)
}
@@ -392,6 +430,8 @@ func (*IndexRouteWriter) IsSynced() bool {
// Close implements backend.EngineWriter interface.
func (w *IndexRouteWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
var firstErr error
w.Lock()
defer w.Unlock()
for _, writer := range w.writers {
if err := writer.Close(ctx); err != nil {
if firstErr == nil {