Skip to content

Commit

Permalink
ddl: dynamically adjusting the concurrency and batch size of reorgani…
Browse files Browse the repository at this point in the history
…zation job (#57468)

ref #57229
  • Loading branch information
fzzf678 authored Nov 21, 2024
1 parent fe1b9ed commit 2f8d1f6
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 74 deletions.
98 changes: 86 additions & 12 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,14 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
})

// Change the batch size dynamically.
newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
w.GetCtx().batchCnt = newBatchCnt
currentBatchCnt := w.GetCtx().batchCnt
targetBatchSize := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
if targetBatchSize != currentBatchCnt {
w.GetCtx().batchCnt = targetBatchSize
logger.Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int("current batch size", w.GetCtx().batchCnt))
}
result := w.handleBackfillTask(d, task, bf)
w.sendResult(result)

Expand Down Expand Up @@ -770,7 +776,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe)
err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
Expand All @@ -787,11 +793,59 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline) error {
func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) {
opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
if opR == nil || opW == nil {
logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
return
}
reader, readerOk := opR.(*TableScanOperator)
writer, writerOk := opW.(*IndexIngestOperator)
if !readerOk || !writerOk {
logutil.DDLIngestLogger().Error(
"unexpected operator types, config can't be adjusted",
zap.Int64("jobID", job.ID),
zap.Bool("isReaderValid", readerOk),
zap.Bool("isWriterValid", writerOk),
)
return
}
ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize)
currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
continue
}
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
}
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error {
err := pipe.Execute()
if err != nil {
return err
}

// Adjust worker pool size dynamically.
if job != nil {
go func() {
adjustWorkerPoolSize(ctx, pipe, job, avgRowSize)
}()
}

err = pipe.Close()
if opErr := ctx.OperatorErr(); opErr != nil {
return opErr
Expand Down Expand Up @@ -825,6 +879,9 @@ func (s *localRowCntListener) SetTotal(total int) {
s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + int64(total))
}

// UpdateDDLJobReorgCfgInterval is the interval to check and update reorg configuration.
const UpdateDDLJobReorgCfgInterval = 2 * time.Second

// writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
//
Expand Down Expand Up @@ -929,14 +986,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
// We try to adjust the worker size regularly to reduce
// the overhead of loading the DDL related global variables.
err2 = scheduler.adjustWorkerSize()
if err2 != nil {
logutil.DDLLogger().Warn("cannot adjust backfill worker size",
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
failpoint.InjectCall("afterUpdateReorgMeta")
}
}
Expand Down Expand Up @@ -978,6 +1027,31 @@ func (dc *ddlCtx) writePhysicalTableRecord(
return nil
})

// update the worker cnt goroutine
go func() {
ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
currentWorkerCnt := scheduler.currentWorkerSize()
targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
if currentWorkerCnt != targetWorkerCnt {
err := scheduler.adjustWorkerSize()
if err != nil {
logutil.DDLLogger().Error("adjust ddl job config failed",
zap.Error(err))
} else {
logutil.DDLLogger().Info("adjust ddl job config success",
zap.Int("current worker count", scheduler.currentWorkerSize()))
}
}
}
}
}()

return eg.Wait()
}

Expand Down
61 changes: 38 additions & 23 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"path"
"strconv"
"sync"
"sync/atomic"
"time"

Expand All @@ -41,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -170,11 +172,17 @@ func NewAddIndexIngestPipeline(
if err != nil {
return nil, err
}
srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
srcChkPool := createChunkPool(copCtx, reorgMeta)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
rm := reorgMeta
if rm.IsDistReorg {
// Currently, only the batch size of local ingest mode can be adjusted
rm = nil
}

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr,
reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), rm)
ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool,
tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener)
sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener)
Expand Down Expand Up @@ -221,7 +229,7 @@ func NewWriteIndexToExternalStoragePipeline(
if err != nil {
return nil, err
}
srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
srcChkPool := createChunkPool(copCtx, reorgMeta)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

backend, err := storage.ParseBackend(extStoreURI, nil)
Expand All @@ -239,7 +247,8 @@ func NewWriteIndexToExternalStoragePipeline(
})

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil,
reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), nil)
writeOp := NewWriteExternalStoreOperator(
ctx, copCtx, sessPool, jobID, subtaskID,
tbl, indexes, extStore, srcChkPool, writerCnt,
Expand All @@ -264,14 +273,13 @@ func NewWriteIndexToExternalStoragePipeline(
), nil
}

func createChunkPool(copCtx copr.CopContext, hintConc, hintBatchSize int) chan *chunk.Chunk {
poolSize := ingest.CopReadChunkPoolSize(hintConc)
batchSize := ingest.CopReadBatchSize(hintBatchSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize)
func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *sync.Pool {
return &sync.Pool{
New: func() any {
return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes,
reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())))
},
}
return srcChkPool
}

// TableScanTask contains the start key and the end key of a region.
Expand Down Expand Up @@ -478,10 +486,11 @@ func NewTableScanOperator(
ctx *OperatorCtx,
sessPool opSessPool,
copCtx copr.CopContext,
srcChkPool chan *chunk.Chunk,
srcChkPool *sync.Pool,
concurrency int,
cpMgr *ingest.CheckpointManager,
hintBatchSize int,
reorgMeta *model.DDLReorgMeta,
) *TableScanOperator {
totalCount := new(atomic.Int64)
pool := workerpool.NewWorkerPool(
Expand All @@ -498,6 +507,7 @@ func NewTableScanOperator(
cpMgr: cpMgr,
hintBatchSize: hintBatchSize,
totalCount: totalCount,
reorgMeta: reorgMeta,
}
})
return &TableScanOperator{
Expand All @@ -518,9 +528,10 @@ type tableScanWorker struct {
copCtx copr.CopContext
sessPool opSessPool
se *session.Session
srcChkPool chan *chunk.Chunk
srcChkPool *sync.Pool

cpMgr *ingest.CheckpointManager
reorgMeta *model.DDLReorgMeta
hintBatchSize int
totalCount *atomic.Int64
}
Expand Down Expand Up @@ -588,17 +599,21 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
}

func (w *tableScanWorker) getChunk() *chunk.Chunk {
chk := <-w.srcChkPool
newCap := ingest.CopReadBatchSize(w.hintBatchSize)
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
targetCap := ingest.CopReadBatchSize(w.hintBatchSize)
if w.reorgMeta != nil {
targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())))
}
chk := w.srcChkPool.Get().(*chunk.Chunk)
if chk.Capacity() != targetCap {
chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, targetCap)
logutil.Logger(w.ctx).Info("adjust ddl job config success", zap.Int("current batch size", chk.Capacity()))
}
chk.Reset()
return chk
}

func (w *tableScanWorker) recycleChunk(chk *chunk.Chunk) {
w.srcChkPool <- chk
w.srcChkPool.Put(chk)
}

// WriteExternalStoreOperator writes index records to external storage.
Expand All @@ -618,7 +633,7 @@ func NewWriteExternalStoreOperator(
tbl table.PhysicalTable,
indexes []table.Index,
store storage.ExternalStorage,
srcChunkPool chan *chunk.Chunk,
srcChunkPool *sync.Pool,
concurrency int,
onClose external.OnCloseFunc,
memoryQuota uint64,
Expand Down Expand Up @@ -704,7 +719,7 @@ func NewIndexIngestOperator(
tbl table.PhysicalTable,
indexes []table.Index,
engines []ingest.Engine,
srcChunkPool chan *chunk.Chunk,
srcChunkPool *sync.Pool,
concurrency int,
reorgMeta *model.DDLReorgMeta,
cpMgr *ingest.CheckpointManager,
Expand Down Expand Up @@ -765,7 +780,7 @@ type indexIngestExternalWorker struct {
func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
defer func() {
if ck.Chunk != nil {
w.srcChunkPool <- ck.Chunk
w.srcChunkPool.Put(ck.Chunk)
}
}()
rs, err := w.indexIngestBaseWorker.HandleTask(ck)
Expand All @@ -787,7 +802,7 @@ type indexIngestLocalWorker struct {
func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
defer func() {
if ck.Chunk != nil {
w.srcChunkPool <- ck.Chunk
w.srcChunkPool.Put(ck.Chunk)
}
}()
rs, err := w.indexIngestBaseWorker.HandleTask(ck)
Expand Down Expand Up @@ -827,7 +842,7 @@ type indexIngestBaseWorker struct {
restore func(sessionctx.Context)

writers []ingest.Writer
srcChunkPool chan *chunk.Chunk
srcChunkPool *sync.Pool
// only available in global sort
totalCount *atomic.Int64
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe)
return executeAndClosePipeline(opCtx, pipe, nil, 0)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe)
err = executeAndClosePipeline(opCtx, pipe, nil, 0)
if err != nil {
// For dist task local based ingest, checkpoint is unsupported.
// If there is an error we should keep local sort dir clean.
Expand Down
34 changes: 34 additions & 0 deletions pkg/ddl/backfilling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package ddl

import (
"bytes"
"context"
"testing"
"time"

"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/errctx"
Expand Down Expand Up @@ -485,3 +487,35 @@ func TestValidateAndFillRanges(t *testing.T) {
err = validateAndFillRanges(ranges, []byte("b"), []byte("f"))
require.Error(t, err)
}

func TestTuneTableScanWorkerBatchSize(t *testing.T) {
reorgMeta := &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 32,
}
copCtx := &copr.CopContextSingleIndex{
CopContextBase: &copr.CopContextBase{
FieldTypes: []*types.FieldType{},
},
}
opCtx, cancel := NewDistTaskOperatorCtx(context.Background(), 1, 1)
w := tableScanWorker{
copCtx: copCtx,
ctx: opCtx,
srcChkPool: createChunkPool(copCtx, reorgMeta),
hintBatchSize: 32,
reorgMeta: reorgMeta,
}
for i := 0; i < 10; i++ {
chk := w.getChunk()
require.Equal(t, 32, chk.Capacity())
w.srcChkPool.Put(chk)
}
reorgMeta.SetBatchSize(64)
for i := 0; i < 10; i++ {
chk := w.getChunk()
require.Equal(t, 64, chk.Capacity())
w.srcChkPool.Put(chk)
}
cancel()
}
Loading

0 comments on commit 2f8d1f6

Please sign in to comment.