Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add global sort related metric and fix cancel add index #47485

Merged
merged 18 commits into from
Oct 17, 2023
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//br/pkg/membuf",
"//br/pkg/storage",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx/variable",
"//pkg/util/hack",
"//pkg/util/logutil",
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -197,12 +198,13 @@ func (e *Engine) loadIngestData(
hex.EncodeToString(start))
}

now := time.Now()
startTs := time.Now()
keys := make([][]byte, 0, 1024)
values := make([][]byte, 0, 1024)
memBuf := e.bufPool.NewBuffer()
cnt := 0
size := 0
totalSize := 0
largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize)
ret := make([]common.DataAndRange, 0, 1)
curStart := start
Expand All @@ -215,6 +217,7 @@ func (e *Engine) loadIngestData(
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
totalSize += len(k) + len(v)
}

for iter.Next() {
Expand All @@ -241,13 +244,16 @@ func (e *Engine) loadIngestData(
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
totalSize += len(k) + len(v)
}
if iter.Error() != nil {
return nil, errors.Trace(iter.Error())
}

metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort").Observe(float64(totalSize) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for hot path we should cache the result of WithLabelValues to avoid the inner lookup.

metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_and_sort").Observe(time.Since(startTs).Seconds())
logutil.Logger(ctx).Info("load data from external storage",
zap.Duration("cost time", time.Since(now)),
zap.Duration("cost time", time.Since(startTs)),
zap.Int("iterated count", cnt))
ret = append(ret, common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Expand Down
14 changes: 13 additions & 1 deletion br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
Expand Down Expand Up @@ -385,7 +386,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
}

ts := time.Now()
var savedBytes uint64
savedBytes := w.batchSize

startTs := time.Now()
var startTsForWrite time.Time

defer func() {
w.currentSeq++
Expand All @@ -407,6 +411,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
zap.Duration("time", time.Since(ts)),
zap.Uint64("bytes", savedBytes),
zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds()))
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(time.Since(startTsForWrite).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTsForWrite).Seconds())
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort_and_write").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort_and_write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
}()

sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter()))
Expand All @@ -420,6 +428,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
return false
})

startTsForWrite = time.Now()
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())

w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//pkg/distsql",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/sessionctx/variable",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -1322,6 +1323,7 @@ func (local *Backend) startWorker(
jobInCh, jobOutCh chan *regionJob,
jobWg *sync.WaitGroup,
) error {
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Set(0)
for {
select {
case <-ctx.Done():
Expand All @@ -1333,7 +1335,9 @@ func (local *Backend) startWorker(
return nil
}

metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Inc()
err := local.executeJob(ctx, job)
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Dec()
switch job.stage {
case regionScanned, wrote, ingested:
jobOutCh <- job
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
util2 "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -442,7 +443,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
for !done {
srcChk := w.getChunk()
done, err = fetchTableScanResult(w.ctx, w.copCtx.GetBase(), rs, srcChk)
if err != nil {
if err != nil || util2.IsContextDone(w.ctx) {
w.recycleChunk(srcChk)
terror.Call(rs.Close)
return err
Expand Down
5 changes: 5 additions & 0 deletions pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/hex"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -146,6 +147,7 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
p.checkpointMgr.Register(task.id, task.endKey)
}
var done bool
startTime := time.Now()
for !done {
srcChk := p.getChunk()
done, err = fetchTableScanResult(p.ctx, p.copCtx.GetBase(), rs, srcChk)
Expand All @@ -158,10 +160,13 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done)
}
idxRs := IndexRecordChunk{ID: task.id, Chunk: srcChk, Done: done}
rate := float64(srcChk.MemoryUsage()) / 1024.0 / 1024.0 / time.Since(startTime).Seconds()
metrics.AddIndexScanRate.WithLabelValues(metrics.LblAddIndex).Observe(rate)
failpoint.Inject("mockCopSenderError", func() {
idxRs.Err = errors.New("mock cop error")
})
p.chunkSender.AddTask(idxRs)
startTime = time.Now()
}
terror.Call(rs.Close)
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"domain.go",
"executor.go",
"gc_worker.go",
"globalsort.go",
"import.go",
"log_backup.go",
"meta.go",
Expand Down
9 changes: 9 additions & 0 deletions pkg/metrics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
BackfillProgressGauge *prometheus.GaugeVec
DDLJobTableDuration *prometheus.HistogramVec
DDLRunningJobCount *prometheus.GaugeVec
AddIndexScanRate *prometheus.HistogramVec
)

// InitDDLMetrics initializes defines DDL metrics.
Expand Down Expand Up @@ -165,6 +166,14 @@ func InitDDLMetrics() {
Name: "running_job_count",
Help: "Running DDL jobs count",
}, []string{LblType})

AddIndexScanRate = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "ddl",
Name: "scan_rate",
Help: "scan rate",
Buckets: prometheus.ExponentialBuckets(0.05, 2, 20),
}, []string{LblType})
}

// Label constants.
Expand Down
72 changes: 72 additions & 0 deletions pkg/metrics/globalsort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import "github.com/prometheus/client_golang/prometheus"

var (
// GlobalSortWriteToCloudStorageDuration records the duration of writing to cloud storage.
GlobalSortWriteToCloudStorageDuration *prometheus.HistogramVec
// GlobalSortWriteToCloudStorageRate records the rate of writing to cloud storage.
GlobalSortWriteToCloudStorageRate *prometheus.HistogramVec
// GlobalSortReadFromCloudStorageDuration records the duration of reading from cloud storage.
GlobalSortReadFromCloudStorageDuration *prometheus.HistogramVec
// GlobalSortReadFromCloudStorageRate records the rate of reading from cloud storage.
GlobalSortReadFromCloudStorageRate *prometheus.HistogramVec
// GlobalSortIngestWorkerCnt records the working number of ingest workers.
GlobalSortIngestWorkerCnt *prometheus.GaugeVec
)

// InitGlobalSortMetrics initializes defines global sort metrics.
func InitGlobalSortMetrics() {
GlobalSortWriteToCloudStorageDuration = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "write_to_cloud_storage_duration",
Help: "write to cloud storage duration",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s
}, []string{LblType})

GlobalSortWriteToCloudStorageRate = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "write_to_cloud_storage_rate",
Help: "write to cloud storage rate",
Buckets: prometheus.ExponentialBuckets(0.05, 2, 20),
}, []string{LblType})

GlobalSortReadFromCloudStorageDuration = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "read_from_cloud_storage_duration",
Help: "read from cloud storage duration",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
}, []string{LblType})

GlobalSortReadFromCloudStorageRate = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "read_from_cloud_storage_rate",
Help: "read from cloud storage rate",
Buckets: prometheus.ExponentialBuckets(0.05, 2, 20),
}, []string{LblType})

GlobalSortIngestWorkerCnt = NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "ingest_worker_cnt",
Help: "ingest worker cnt",
}, []string{LblType})
}
Loading
Loading