From 4e8295279868bb4e3262ac40fdfc774a9a033dbd Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 22 Sep 2023 18:55:44 +0800 Subject: [PATCH] importinto: use one writer for each kv group for all concurrent encoder (#47185) ref pingcap/tidb#46704 --- br/pkg/lightning/backend/external/writer.go | 7 + .../importinto/encode_and_sort_operator.go | 129 +++++++++++------- .../encode_and_sort_operator_test.go | 11 +- disttask/importinto/scheduler.go | 2 +- executor/importer/BUILD.bazel | 5 +- executor/importer/chunk_process.go | 54 +++++++- executor/importer/chunk_process_test.go | 54 ++++++++ .../importintotest4/global_sort_test.go | 28 ++++ 8 files changed, 225 insertions(+), 65 deletions(-) create mode 100644 executor/importer/chunk_process_test.go diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 401ecb6c8e479..bdffa723a3a6e 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -488,6 +488,9 @@ func (w *Writer) createStorageWriter(ctx context.Context) ( // EngineWriter implements backend.EngineWriter interface. type EngineWriter struct { + // Only 1 writer is used for some kv group(data or some index), no matter + // how many routines are encoding data, so need to sync write to it. + sync.Mutex w *Writer } @@ -498,6 +501,8 @@ func NewEngineWriter(w *Writer) *EngineWriter { // AppendRows implements backend.EngineWriter interface. func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { + e.Lock() + defer e.Unlock() kvs := kv.Rows2KvPairs(rows) if len(kvs) == 0 { return nil @@ -519,5 +524,7 @@ func (e *EngineWriter) IsSynced() bool { // Close implements backend.EngineWriter interface. func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + e.Lock() + defer e.Unlock() return nil, e.w.Close(ctx) } diff --git a/disttask/importinto/encode_and_sort_operator.go b/disttask/importinto/encode_and_sort_operator.go index df501f93267b0..e3a3f55b65ae0 100644 --- a/disttask/importinto/encode_and_sort_operator.go +++ b/disttask/importinto/encode_and_sort_operator.go @@ -20,6 +20,7 @@ import ( "strconv" "time" + "github.com/docker/go-units" "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/external" @@ -63,6 +64,9 @@ type encodeAndSortOperator struct { sharedVars *SharedVars logger *zap.Logger errCh chan error + + dataWriter *external.EngineWriter + indexWriter *importer.IndexRouteWriter } var _ operator.Operator = (*encodeAndSortOperator)(nil) @@ -82,18 +86,64 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, logger: executor.logger, errCh: make(chan error), } + + if op.tableImporter.IsGlobalSort() { + op.initWriters(executor, indexMemorySizeLimit) + } + pool := workerpool.NewWorkerPool( "encodeAndSortOperator", util.ImportInto, int(executor.taskMeta.Plan.ThreadCnt), func() workerpool.Worker[*importStepMinimalTask, workerpool.None] { - return newChunkWorker(ctx, op, indexMemorySizeLimit) + return newChunkWorker(subCtx, op) }, ) op.AsyncOperator = operator.NewAsyncOperator(subCtx, pool) return op } +// with current design of global sort writer, we only create one writer for +// each kv group, and all chunks shares the same writers. +// the writer itself will sort and upload data concurrently. +func (op *encodeAndSortOperator) initWriters(executor *importStepExecutor, indexMemorySizeLimit uint64) { + totalDataKVMemSizeLimit := external.DefaultMemSizeLimit * uint64(executor.taskMeta.Plan.ThreadCnt) + totalMemSizeLimitPerIndexWriter := indexMemorySizeLimit * uint64(executor.taskMeta.Plan.ThreadCnt) + op.logger.Info("init global sort writer with mem limit", + zap.String("data-limit", units.BytesSize(float64(totalDataKVMemSizeLimit))), + zap.String("per-index-limit", units.BytesSize(float64(totalMemSizeLimitPerIndexWriter)))) + + // in case on network partition, 2 nodes might run the same subtask. + // so use uuid to make sure the path is unique. + workerUUID := uuid.New().String() + // sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID} + indexWriterFn := func(indexID int64) *external.Writer { + builder := external.NewWriterBuilder(). + SetOnCloseFunc(func(summary *external.WriterSummary) { + op.sharedVars.mergeIndexSummary(indexID, summary) + }).SetMemorySizeLimit(totalMemSizeLimitPerIndexWriter). + SetMutex(&op.sharedVars.ShareMu) + prefix := subtaskPrefix(op.taskID, op.subtaskID) + // writer id for index: index/{indexID}/{workerID} + writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) + writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) + return writer + } + + // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} + builder := external.NewWriterBuilder(). + SetOnCloseFunc(op.sharedVars.mergeDataSummary). + SetMemorySizeLimit(totalDataKVMemSizeLimit). + SetMutex(&op.sharedVars.ShareMu) + prefix := subtaskPrefix(op.taskID, op.subtaskID) + // writer id for data: data/{workerID} + writerID := path.Join("data", workerUUID) + writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) + + op.dataWriter = external.NewEngineWriter(writer) + op.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn) +} + func (op *encodeAndSortOperator) Open() error { op.wg.Run(func() { for err := range op.errCh { @@ -114,9 +164,31 @@ func (op *encodeAndSortOperator) Close() error { // right now AsyncOperator.Close always returns nil, ok to ignore it. // nolint:errcheck op.AsyncOperator.Close() + + closeCtx := op.ctx + if closeCtx.Err() != nil { + // in case of context canceled, we need to create a new context to close writers. + newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration) + closeCtx = newCtx + defer cancel() + } + if op.dataWriter != nil { + // Note: we cannot ignore close error as we're writing to S3 or GCS. + // ignore error might cause data loss. below too. + if _, err := op.dataWriter.Close(closeCtx); err != nil { + op.onError(errors.Trace(err)) + } + } + if op.indexWriter != nil { + if _, err := op.indexWriter.Close(closeCtx); err != nil { + op.onError(errors.Trace(err)) + } + } + op.cancel() close(op.errCh) op.wg.Wait() + // see comments on interface definition, this Close is actually WaitAndClose. return op.firstErr.Load() } @@ -140,43 +212,13 @@ func (op *encodeAndSortOperator) Done() <-chan struct{} { type chunkWorker struct { ctx context.Context op *encodeAndSortOperator - - dataWriter *external.EngineWriter - indexWriter *importer.IndexRouteWriter } -func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemorySizeLimit uint64) *chunkWorker { +func newChunkWorker(ctx context.Context, op *encodeAndSortOperator) *chunkWorker { w := &chunkWorker{ ctx: ctx, op: op, } - if op.tableImporter.IsGlobalSort() { - // in case on network partition, 2 nodes might run the same subtask. - workerUUID := uuid.New().String() - // sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID} - indexWriterFn := func(indexID int64) *external.Writer { - builder := external.NewWriterBuilder(). - SetOnCloseFunc(func(summary *external.WriterSummary) { - op.sharedVars.mergeIndexSummary(indexID, summary) - }).SetMemorySizeLimit(indexMemorySizeLimit).SetMutex(&op.sharedVars.ShareMu) - prefix := subtaskPrefix(op.taskID, op.subtaskID) - // writer id for index: index/{indexID}/{workerID} - writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) - writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) - return writer - } - - // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} - builder := external.NewWriterBuilder(). - SetOnCloseFunc(op.sharedVars.mergeDataSummary).SetMutex(&op.sharedVars.ShareMu) - prefix := subtaskPrefix(op.taskID, op.subtaskID) - // writer id for data: data/{workerID} - writerID := path.Join("data", workerUUID) - writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) - w.dataWriter = external.NewEngineWriter(writer) - - w.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn) - } return w } @@ -187,31 +229,12 @@ func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool. // we don't use the input send function, it makes workflow more complex // we send result to errCh and handle it here. executor := newImportMinimalTaskExecutor(task) - if err := executor.Run(w.ctx, w.dataWriter, w.indexWriter); err != nil { + if err := executor.Run(w.ctx, w.op.dataWriter, w.op.indexWriter); err != nil { w.op.onError(err) } } -func (w *chunkWorker) Close() { - closeCtx := w.ctx - if closeCtx.Err() != nil { - // in case of context canceled, we need to create a new context to close writers. - newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration) - closeCtx = newCtx - defer cancel() - } - if w.dataWriter != nil { - // Note: we cannot ignore close error as we're writing to S3 or GCS. - // ignore error might cause data loss. below too. - if _, err := w.dataWriter.Close(closeCtx); err != nil { - w.op.onError(errors.Trace(err)) - } - } - if w.indexWriter != nil { - if _, err := w.indexWriter.Close(closeCtx); err != nil { - w.op.onError(errors.Trace(err)) - } - } +func (*chunkWorker) Close() { } func subtaskPrefix(taskID, subtaskID int64) string { diff --git a/disttask/importinto/encode_and_sort_operator_test.go b/disttask/importinto/encode_and_sort_operator_test.go index 3aa2ee0377732..ac6eed53c7492 100644 --- a/disttask/importinto/encode_and_sort_operator_test.go +++ b/disttask/importinto/encode_and_sort_operator_test.go @@ -72,15 +72,20 @@ func TestEncodeAndSortOperator(t *testing.T) { tableImporter: &importer.TableImporter{ LoadDataController: &importer.LoadDataController{ Plan: &importer.Plan{ - CloudStorageURI: "", + CloudStorageURI: "s3://test-bucket/test-path", }, }, }, logger: logger, } + sharedVars := &SharedVars{ + SortedDataMeta: &external.SortedKVMeta{}, + SortedIndexMetas: map[int64]*external.SortedKVMeta{}, + } + source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3, 0) + op := newEncodeAndSortOperator(context.Background(), executorForParam, sharedVars, 3, 0) op.SetSource(source) require.NoError(t, op.Open()) require.Greater(t, len(op.String()), 0) @@ -100,7 +105,7 @@ func TestEncodeAndSortOperator(t *testing.T) { // cancel on error and log other errors mockErr2 := errors.New("mock err 2") source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2, 0) + op = newEncodeAndSortOperator(context.Background(), executorForParam, sharedVars, 2, 0) op.SetSource(source) executor1 := mock.NewMockMiniTaskExecutor(ctrl) executor2 := mock.NewMockMiniTaskExecutor(ctrl) diff --git a/disttask/importinto/scheduler.go b/disttask/importinto/scheduler.go index 0ba3eda154ad6..136f5a1a7e359 100644 --- a/disttask/importinto/scheduler.go +++ b/disttask/importinto/scheduler.go @@ -107,7 +107,7 @@ func (s *importStepExecutor) Init(ctx context.Context) error { }() } s.indexMemorySizeLimit = getWriterMemorySizeLimit(s.tableImporter.Plan) - s.logger.Info("index writer memory size limit", + s.logger.Info("memory size limit per index writer per concurrency", zap.String("limit", units.BytesSize(float64(s.indexMemorySizeLimit)))) return nil } diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 7e1d693ebf0f8..875a3b45459de 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -78,6 +78,7 @@ go_test( name = "importer_test", timeout = "short", srcs = [ + "chunk_process_test.go", "import_test.go", "job_test.go", "precheck_test.go", @@ -86,9 +87,10 @@ go_test( embed = [":importer"], flaky = True, race = "on", - shard_count = 16, + shard_count = 17, deps = [ "//br/pkg/errors", + "//br/pkg/lightning/backend/external", "//br/pkg/lightning/config", "//br/pkg/lightning/mydump", "//br/pkg/streamhelper", @@ -104,6 +106,7 @@ go_test( "//sessionctx/variable", "//testkit", "//types", + "//util", "//util/dbterror/exeerrors", "//util/etcd", "//util/logutil", diff --git a/executor/importer/chunk_process.go b/executor/importer/chunk_process.go index de28449d4933b..f435604bb0e6a 100644 --- a/executor/importer/chunk_process.go +++ b/executor/importer/chunk_process.go @@ -17,6 +17,7 @@ package importer import ( "context" "io" + "sync" "time" "github.com/docker/go-units" @@ -32,6 +33,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/mydump" verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/executor/asyncloaddata" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/syncutil" "github.com/tikv/client-go/v2/tikv" @@ -347,20 +349,60 @@ func (p *chunkProcessor) deliverLoop(ctx context.Context) error { // writer will take 256MiB buffer on default. // this will take a lot of memory, or even OOM. type IndexRouteWriter struct { - writers map[int64]*external.Writer + // this writer and all wrappedWriters are shared by all deliver routines, + // so we need to synchronize them. + sync.RWMutex + writers map[int64]*wrappedWriter logger *zap.Logger writerFactory func(int64) *external.Writer } +type wrappedWriter struct { + sync.Mutex + *external.Writer +} + +func (w *wrappedWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error { + w.Lock() + defer w.Unlock() + return w.Writer.WriteRow(ctx, idxKey, idxVal, handle) +} + +func (w *wrappedWriter) Close(ctx context.Context) error { + w.Lock() + defer w.Unlock() + return w.Writer.Close(ctx) +} + // NewIndexRouteWriter creates a new IndexRouteWriter. func NewIndexRouteWriter(logger *zap.Logger, writerFactory func(int64) *external.Writer) *IndexRouteWriter { return &IndexRouteWriter{ - writers: make(map[int64]*external.Writer), + writers: make(map[int64]*wrappedWriter), logger: logger, writerFactory: writerFactory, } } +func (w *IndexRouteWriter) getWriter(indexID int64) *wrappedWriter { + w.RLock() + writer, ok := w.writers[indexID] + w.RUnlock() + if ok { + return writer + } + + w.Lock() + defer w.Unlock() + writer, ok = w.writers[indexID] + if !ok { + writer = &wrappedWriter{ + Writer: w.writerFactory(indexID), + } + w.writers[indexID] = writer + } + return writer +} + // AppendRows implements backend.EngineWriter interface. func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { kvs := kv.Rows2KvPairs(rows) @@ -372,11 +414,7 @@ func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows enco if err != nil { return errors.Trace(err) } - writer, ok := w.writers[indexID] - if !ok { - writer = w.writerFactory(indexID) - w.writers[indexID] = writer - } + writer := w.getWriter(indexID) if err = writer.WriteRow(ctx, item.Key, item.Val, nil); err != nil { return errors.Trace(err) } @@ -392,6 +430,8 @@ func (*IndexRouteWriter) IsSynced() bool { // Close implements backend.EngineWriter interface. func (w *IndexRouteWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { var firstErr error + w.Lock() + defer w.Unlock() for _, writer := range w.writers { if err := writer.Close(ctx); err != nil { if firstErr == nil { diff --git a/executor/importer/chunk_process_test.go b/executor/importer/chunk_process_test.go new file mode 100644 index 0000000000000..ccf95a484a973 --- /dev/null +++ b/executor/importer/chunk_process_test.go @@ -0,0 +1,54 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importer + +import ( + "math/rand" + "testing" + "time" + + "github.com/pingcap/tidb/br/pkg/lightning/backend/external" + "github.com/pingcap/tidb/util" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestIndexRouteWriter(t *testing.T) { + logger := zap.NewExample() + routeWriter := NewIndexRouteWriter(logger, func(i int64) *external.Writer { + return external.NewWriterBuilder().Build(nil, "", "") + }) + wg := util.WaitGroupWrapper{} + for i := 0; i < 10; i++ { + idx := i + wg.Run(func() { + seed := time.Now().Unix() + logger.Info("seed", zap.Int("idx", idx), zap.Int64("seed", seed)) + r := rand.New(rand.NewSource(seed)) + gotWriters := make(map[int64]*wrappedWriter) + for i := 0; i < 3000; i++ { + indexID := int64(r.Int()) % 100 + writer := routeWriter.getWriter(indexID) + require.NotNil(t, writer) + if got, ok := gotWriters[indexID]; ok { + require.Equal(t, got, writer) + } else { + gotWriters[indexID] = writer + } + } + }) + } + wg.Wait() +} diff --git a/tests/realtikvtest/importintotest4/global_sort_test.go b/tests/realtikvtest/importintotest4/global_sort_test.go index 606ef8dbcef2c..6b173fc0a7604 100644 --- a/tests/realtikvtest/importintotest4/global_sort_test.go +++ b/tests/realtikvtest/importintotest4/global_sort_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "net/url" + "slices" "strconv" "testing" "time" @@ -115,3 +116,30 @@ func (s *mockGCSSuite) TestGlobalSortBasic() { s.NoError(err) s.Len(files, 0) } + +func (s *mockGCSSuite) TestGlobalSortMultiFiles() { + var allData []string + for i := 0; i < 10; i++ { + var content []byte + keyCnt := 1000 + for j := 0; j < keyCnt; j++ { + idx := i*keyCnt + j + content = append(content, []byte(fmt.Sprintf("%d,test-%d\n", idx, idx))...) + allData = append(allData, fmt.Sprintf("%d test-%d", idx, idx)) + } + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gs-multi-files", Name: fmt.Sprintf("t.%d.csv", i)}, + Content: content, + }) + } + slices.Sort(allData) + s.prepareAndUseDB("gs_multi_files") + s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "sorted"}) + s.tk.MustExec("create table t (a bigint primary key , b varchar(100), key(b), key(a,b), key(b,a));") + // 1 subtask, encoding 10 files using 4 threads. + sortStorageURI := fmt.Sprintf("gs://sorted/gs_multi_files?endpoint=%s", gcsEndpoint) + importSQL := fmt.Sprintf(`import into t FROM 'gs://gs-multi-files/t.*.csv?endpoint=%s' + with thread=4, cloud_storage_uri='%s'`, gcsEndpoint, sortStorageURI) + s.tk.MustQuery(importSQL) + s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows(allData...)) +}