From 9f2f0063d3908273672197fefa60b6b3aee407d7 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 16 Oct 2023 17:33:04 +0800 Subject: [PATCH 1/3] log and use single thread sort --- br/pkg/lightning/backend/external/writer.go | 61 ++++++++++++++----- .../importinto/encode_and_sort_operator.go | 4 +- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index fc324bb9ee482..bf7ba9c2abbd3 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -19,10 +19,12 @@ import ( "context" "encoding/hex" "path/filepath" + "slices" "strconv" "sync" "time" + "github.com/docker/go-units" "github.com/jfcg/sorty/v2" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" @@ -384,9 +386,23 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { return err } - ts := time.Now() - var savedBytes uint64 + var ( + savedBytes uint64 + statSize int + sortDuration, writeDuration time.Duration + writeStartTime time.Time + ) + getSpeed := func(n uint64, dur float64, isBytes bool) string { + if dur == 0 { + return "-" + } + if isBytes { + return units.BytesSize(float64(n) / dur) + } + return units.HumanSize(float64(n) / dur) + } + kvCnt := len(w.writeBatch) defer func() { w.currentSeq++ err1, err2 := dataWriter.Close(ctx), statWriter.Close(ctx) @@ -403,23 +419,38 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { err = err2 return } + writeDuration = time.Since(writeStartTime) logger.Info("flush kv", - zap.Duration("time", time.Since(ts)), zap.Uint64("bytes", savedBytes), - zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds())) + zap.Int("kv-cnt", kvCnt), + zap.Int("stat-size", statSize), + zap.Duration("sort-time", sortDuration), + zap.Duration("write-time", writeDuration), + zap.String("sort-speed(kv/s)", getSpeed(uint64(kvCnt), sortDuration.Seconds(), false)), + zap.String("write-speed(bytes/s)", getSpeed(savedBytes, writeDuration.Seconds(), true)), + ) }() - sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter())) - sorty.Sort(len(w.writeBatch), func(i, j, r, s int) bool { - if bytes.Compare(w.writeBatch[i].Key, w.writeBatch[j].Key) < 0 { - if r != s { - w.writeBatch[r], w.writeBatch[s] = w.writeBatch[s], w.writeBatch[r] + sortStart := time.Now() + if w.shareMu != nil { + sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter())) + sorty.Sort(len(w.writeBatch), func(i, j, r, s int) bool { + if bytes.Compare(w.writeBatch[i].Key, w.writeBatch[j].Key) < 0 { + if r != s { + w.writeBatch[r], w.writeBatch[s] = w.writeBatch[s], w.writeBatch[r] + } + return true } - return true - } - return false - }) + return false + }) + } else { + slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) int { + return bytes.Compare(i.Key, j.Key) + }) + } + sortDuration = time.Since(sortStart) + writeStartTime = time.Now() w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc) if err != nil { return err @@ -435,7 +466,9 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { } w.kvStore.Close() - _, err = statWriter.Write(ctx, w.rc.encode()) + encodedStat := w.rc.encode() + statSize = len(encodedStat) + _, err = statWriter.Write(ctx, encodedStat) if err != nil { return err } diff --git a/pkg/disttask/importinto/encode_and_sort_operator.go b/pkg/disttask/importinto/encode_and_sort_operator.go index 8d5b36e93d613..2a74a45821920 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator.go +++ b/pkg/disttask/importinto/encode_and_sort_operator.go @@ -158,7 +158,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemoryS builder := external.NewWriterBuilder(). SetOnCloseFunc(func(summary *external.WriterSummary) { op.sharedVars.mergeIndexSummary(indexID, summary) - }).SetMemorySizeLimit(indexMemorySizeLimit).SetMutex(&op.sharedVars.ShareMu) + }).SetMemorySizeLimit(indexMemorySizeLimit) prefix := subtaskPrefix(op.taskID, op.subtaskID) // writer id for index: index/{indexID}/{workerID} writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) @@ -168,7 +168,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemoryS // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} builder := external.NewWriterBuilder(). - SetOnCloseFunc(op.sharedVars.mergeDataSummary).SetMutex(&op.sharedVars.ShareMu) + SetOnCloseFunc(op.sharedVars.mergeDataSummary) prefix := subtaskPrefix(op.taskID, op.subtaskID) // writer id for data: data/{workerID} writerID := path.Join("data", workerUUID) From fde96b1a0ceee829b44cae403d05f7f4c2dee531 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 16 Oct 2023 19:13:07 +0800 Subject: [PATCH 2/3] log writer id --- br/pkg/lightning/backend/external/writer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index bf7ba9c2abbd3..cdbffb2ade083 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -428,6 +428,7 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { zap.Duration("write-time", writeDuration), zap.String("sort-speed(kv/s)", getSpeed(uint64(kvCnt), sortDuration.Seconds(), false)), zap.String("write-speed(bytes/s)", getSpeed(savedBytes, writeDuration.Seconds(), true)), + zap.String("writerID", w.writerID), ) }() From d9426e2d2f4aa8f3350a476ec66a05e4f97dd486 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 17 Oct 2023 00:31:50 +0800 Subject: [PATCH 3/3] change --- br/pkg/lightning/backend/external/BUILD.bazel | 1 + br/pkg/lightning/backend/external/writer.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 07e7bd9747b9b..c5351b13752f6 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//pkg/util/mathutil", "//pkg/util/size", "@com_github_cockroachdb_pebble//:pebble", + "@com_github_docker_go_units//:go-units", "@com_github_jfcg_sorty_v2//:sorty", "@com_github_pingcap_errors//:errors", "@org_golang_x_sync//errgroup", diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index cdbffb2ade083..002e169191c96 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -428,7 +428,7 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { zap.Duration("write-time", writeDuration), zap.String("sort-speed(kv/s)", getSpeed(uint64(kvCnt), sortDuration.Seconds(), false)), zap.String("write-speed(bytes/s)", getSpeed(savedBytes, writeDuration.Seconds(), true)), - zap.String("writerID", w.writerID), + zap.String("writer-id", w.writerID), ) }()