diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 2bc61430fb881..999b8460fd9e3 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "//pkg/util/mathutil", "//pkg/util/size", "@com_github_cockroachdb_pebble//:pebble", + "@com_github_docker_go_units//:go-units", "@com_github_jfcg_sorty_v2//:sorty", "@com_github_pingcap_errors//:errors", "@org_golang_x_sync//errgroup", diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index b651e01b19c59..9612e8a3d11f5 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -19,10 +19,12 @@ import ( "context" "encoding/hex" "path/filepath" + "slices" "strconv" "sync" "time" + "github.com/docker/go-units" "github.com/jfcg/sorty/v2" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" @@ -385,12 +387,25 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { return err } - ts := time.Now() - savedBytes := w.batchSize - + var ( + savedBytes uint64 + statSize int + sortDuration, writeDuration time.Duration + writeStartTime time.Time + ) + savedBytes = w.batchSize startTs := time.Now() - var startTsForWrite time.Time + getSpeed := func(n uint64, dur float64, isBytes bool) string { + if dur == 0 { + return "-" + } + if isBytes { + return units.BytesSize(float64(n) / dur) + } + return units.HumanSize(float64(n) / dur) + } + kvCnt := len(w.writeBatch) defer func() { w.currentSeq++ err1, err2 := dataWriter.Close(ctx), statWriter.Close(ctx) @@ -407,31 +422,45 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { err = err2 return } + writeDuration = time.Since(writeStartTime) logger.Info("flush kv", - zap.Duration("time", time.Since(ts)), zap.Uint64("bytes", savedBytes), - zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds())) - metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(time.Since(startTsForWrite).Seconds()) - metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTsForWrite).Seconds()) + zap.Int("kv-cnt", kvCnt), + zap.Int("stat-size", statSize), + zap.Duration("sort-time", sortDuration), + zap.Duration("write-time", writeDuration), + zap.String("sort-speed(kv/s)", getSpeed(uint64(kvCnt), sortDuration.Seconds(), false)), + zap.String("write-speed(bytes/s)", getSpeed(savedBytes, writeDuration.Seconds(), true)), + zap.String("writer-id", w.writerID), + ) + metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(writeDuration.Seconds()) + metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / writeDuration.Seconds()) metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort_and_write").Observe(time.Since(startTs).Seconds()) metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort_and_write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds()) }() - sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter())) - sorty.Sort(len(w.writeBatch), func(i, j, r, s int) bool { - if bytes.Compare(w.writeBatch[i].Key, w.writeBatch[j].Key) < 0 { - if r != s { - w.writeBatch[r], w.writeBatch[s] = w.writeBatch[s], w.writeBatch[r] + sortStart := time.Now() + if w.shareMu != nil { + sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter())) + sorty.Sort(len(w.writeBatch), func(i, j, r, s int) bool { + if bytes.Compare(w.writeBatch[i].Key, w.writeBatch[j].Key) < 0 { + if r != s { + w.writeBatch[r], w.writeBatch[s] = w.writeBatch[s], w.writeBatch[r] + } + return true } - return true - } - return false - }) - - startTsForWrite = time.Now() - metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(time.Since(startTs).Seconds()) - metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds()) + return false + }) + } else { + slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) int { + return bytes.Compare(i.Key, j.Key) + }) + } + sortDuration = time.Since(sortStart) + writeStartTime = time.Now() + metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(sortDuration.Seconds()) + metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(savedBytes) / 1024.0 / 1024.0 / sortDuration.Seconds()) w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc) if err != nil { return err @@ -447,7 +476,9 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { } w.kvStore.Close() - _, err = statWriter.Write(ctx, w.rc.encode()) + encodedStat := w.rc.encode() + statSize = len(encodedStat) + _, err = statWriter.Write(ctx, encodedStat) if err != nil { return err } @@ -474,7 +505,6 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { w.writeBatch = w.writeBatch[:0] w.rc.reset() w.kvBuffer.Reset() - savedBytes = w.batchSize w.batchSize = 0 return nil } diff --git a/pkg/disttask/importinto/encode_and_sort_operator.go b/pkg/disttask/importinto/encode_and_sort_operator.go index 8d5b36e93d613..2a74a45821920 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator.go +++ b/pkg/disttask/importinto/encode_and_sort_operator.go @@ -158,7 +158,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemoryS builder := external.NewWriterBuilder(). SetOnCloseFunc(func(summary *external.WriterSummary) { op.sharedVars.mergeIndexSummary(indexID, summary) - }).SetMemorySizeLimit(indexMemorySizeLimit).SetMutex(&op.sharedVars.ShareMu) + }).SetMemorySizeLimit(indexMemorySizeLimit) prefix := subtaskPrefix(op.taskID, op.subtaskID) // writer id for index: index/{indexID}/{workerID} writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) @@ -168,7 +168,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemoryS // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} builder := external.NewWriterBuilder(). - SetOnCloseFunc(op.sharedVars.mergeDataSummary).SetMutex(&op.sharedVars.ShareMu) + SetOnCloseFunc(op.sharedVars.mergeDataSummary) prefix := subtaskPrefix(op.taskID, op.subtaskID) // writer id for data: data/{workerID} writerID := path.Join("data", workerUUID)