Skip to content

Commit

Permalink
importinto: add more logs to external backend, disable parallel sort (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Oct 17, 2023
1 parent 91a8023 commit 1978479
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 25 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"//pkg/util/mathutil",
"//pkg/util/size",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_docker_go_units//:go-units",
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_sync//errgroup",
Expand Down
76 changes: 53 additions & 23 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"context"
"encoding/hex"
"path/filepath"
"slices"
"strconv"
"sync"
"time"

"github.com/docker/go-units"
"github.com/jfcg/sorty/v2"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
Expand Down Expand Up @@ -385,12 +387,25 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
return err
}

ts := time.Now()
savedBytes := w.batchSize

var (
savedBytes uint64
statSize int
sortDuration, writeDuration time.Duration
writeStartTime time.Time
)
savedBytes = w.batchSize
startTs := time.Now()
var startTsForWrite time.Time

// getSpeed renders the rate n/dur as a human-readable string for logging.
// The callers in this function pass dur as a duration converted to seconds.
// When isBytes is true the rate is formatted with units.BytesSize, otherwise
// with units.HumanSize.
getSpeed := func(n uint64, dur float64, isBytes bool) string {
	// A zero duration gives no meaningful rate; emit a placeholder rather
	// than dividing by zero.
	if dur == 0 {
		return "-"
	}
	rate := float64(n) / dur
	if !isBytes {
		return units.HumanSize(rate)
	}
	return units.BytesSize(rate)
}
kvCnt := len(w.writeBatch)
defer func() {
w.currentSeq++
err1, err2 := dataWriter.Close(ctx), statWriter.Close(ctx)
Expand All @@ -407,31 +422,45 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
err = err2
return
}
writeDuration = time.Since(writeStartTime)
logger.Info("flush kv",
zap.Duration("time", time.Since(ts)),
zap.Uint64("bytes", savedBytes),
zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds()))
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(time.Since(startTsForWrite).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTsForWrite).Seconds())
zap.Int("kv-cnt", kvCnt),
zap.Int("stat-size", statSize),
zap.Duration("sort-time", sortDuration),
zap.Duration("write-time", writeDuration),
zap.String("sort-speed(kv/s)", getSpeed(uint64(kvCnt), sortDuration.Seconds(), false)),
zap.String("write-speed(bytes/s)", getSpeed(savedBytes, writeDuration.Seconds(), true)),
zap.String("writer-id", w.writerID),
)
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(writeDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / writeDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort_and_write").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort_and_write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
}()

sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter()))
sorty.Sort(len(w.writeBatch), func(i, j, r, s int) bool {
if bytes.Compare(w.writeBatch[i].Key, w.writeBatch[j].Key) < 0 {
if r != s {
w.writeBatch[r], w.writeBatch[s] = w.writeBatch[s], w.writeBatch[r]
sortStart := time.Now()
if w.shareMu != nil {
sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter()))
sorty.Sort(len(w.writeBatch), func(i, j, r, s int) bool {
if bytes.Compare(w.writeBatch[i].Key, w.writeBatch[j].Key) < 0 {
if r != s {
w.writeBatch[r], w.writeBatch[s] = w.writeBatch[s], w.writeBatch[r]
}
return true
}
return true
}
return false
})

startTsForWrite = time.Now()
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
return false
})
} else {
slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) int {
return bytes.Compare(i.Key, j.Key)
})
}
sortDuration = time.Since(sortStart)

writeStartTime = time.Now()
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(sortDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(savedBytes) / 1024.0 / 1024.0 / sortDuration.Seconds())
w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc)
if err != nil {
return err
Expand All @@ -447,7 +476,9 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
}

w.kvStore.Close()
_, err = statWriter.Write(ctx, w.rc.encode())
encodedStat := w.rc.encode()
statSize = len(encodedStat)
_, err = statWriter.Write(ctx, encodedStat)
if err != nil {
return err
}
Expand All @@ -474,7 +505,6 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
w.writeBatch = w.writeBatch[:0]
w.rc.reset()
w.kvBuffer.Reset()
savedBytes = w.batchSize
w.batchSize = 0
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/disttask/importinto/encode_and_sort_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemoryS
builder := external.NewWriterBuilder().
SetOnCloseFunc(func(summary *external.WriterSummary) {
op.sharedVars.mergeIndexSummary(indexID, summary)
}).SetMemorySizeLimit(indexMemorySizeLimit).SetMutex(&op.sharedVars.ShareMu)
}).SetMemorySizeLimit(indexMemorySizeLimit)
prefix := subtaskPrefix(op.taskID, op.subtaskID)
// writer id for index: index/{indexID}/{workerID}
writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID)
Expand All @@ -168,7 +168,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemoryS

// sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID}
builder := external.NewWriterBuilder().
SetOnCloseFunc(op.sharedVars.mergeDataSummary).SetMutex(&op.sharedVars.ShareMu)
SetOnCloseFunc(op.sharedVars.mergeDataSummary)
prefix := subtaskPrefix(op.taskID, op.subtaskID)
// writer id for data: data/{workerID}
writerID := path.Join("data", workerUUID)
Expand Down

0 comments on commit 1978479

Please sign in to comment.