Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: improve the log messages and refine code #39456

Merged
merged 17 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,10 @@ func pickBackfillType(w *worker, job *model.Job) model.ReorgType {
// canUseIngest indicates whether it can use ingest way to backfill index.
func canUseIngest(w *worker) bool {
// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
if len(ingest.LitBackCtxMgr.Keys()) > 0 {
activeJobIDs := ingest.LitBackCtxMgr.Keys()
if len(activeJobIDs) > 0 {
logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job",
zap.Int64("job ID", activeJobIDs[0]))
return false
}
ctx, err := w.sessPool.get()
Expand Down
25 changes: 16 additions & 9 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,19 @@ import (
"go.uber.org/zap"
)

// copReadBatchFactor is the factor of batch size of coprocessor read.
// It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range.
const copReadBatchFactor = 10
// copReadBatchSize is the batch size of coprocessor read.
// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid
// sending too many cop requests for the same handle range.
func copReadBatchSize() int {
return 10 * int(variable.GetDDLReorgBatchSize())
}

// copReadConcurrencyFactor is the factor of concurrency of coprocessor read.
const copReadConcurrencyFactor = 10
// copReadChunkPoolSize is the size of chunk pool, which
// represents the max concurrent ongoing coprocessor requests.
// It multiplies the tidb_ddl_reorg_worker_cnt by 10.
func copReadChunkPoolSize() int {
return 10 * int(variable.GetDDLReorgWorkerCounter())
}

func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, *chunk.Chunk, kv.Key, bool, error) {
ticker := time.NewTicker(500 * time.Millisecond)
Expand Down Expand Up @@ -148,12 +155,12 @@ func (c *copReqSender) run() {
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
poolSize := int(variable.GetDDLReorgWorkerCounter() * copReadConcurrencyFactor)
poolSize := copReadChunkPoolSize()
idxBufPool := make(chan []*indexRecord, poolSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize())
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize()))
idxBufPool <- make([]*indexRecord, 0, copReadBatchSize())
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, copReadBatchSize())
}
return &copReqSenderPool{
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
Expand Down Expand Up @@ -220,7 +227,7 @@ func (c *copReqSenderPool) drainResults() {
func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) {
ir := <-c.idxBufPool
chk := <-c.srcChkPool
newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor
newCap := copReadBatchSize()
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
}
Expand Down
1 change: 0 additions & 1 deletion ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ go_library(
"//util/size",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pkg_errors//:errors",
"@org_uber_go_zap//:zap",
],
)
Expand Down
6 changes: 3 additions & 3 deletions ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type BackendContext struct {
func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Table) error {
ei, exist := bc.EngMgr.Load(indexID)
if !exist {
return errors.New(LitErrGetEngineFail)
return errors.New("ingest engine not found")
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
}

err := ei.ImportAndClean()
Expand All @@ -63,7 +63,7 @@ func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Tab
if err != nil {
logutil.BgLogger().Error(LitInfoRemoteDupCheck, zap.Error(err),
zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID))
return errors.New(LitInfoRemoteDupCheck)
return err
} else if hasDupe {
logutil.BgLogger().Error(LitErrRemoteDupExistErr,
zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID))
Expand All @@ -80,7 +80,7 @@ func (bc *BackendContext) Flush(indexID int64) error {
ei, exist := bc.EngMgr.Load(indexID)
if !exist {
logutil.BgLogger().Error(LitErrGetEngineFail, zap.Int64("index ID", indexID))
return errors.New(LitErrGetEngineFail)
return errors.New("ingest engine not found")
}

err := bc.diskRoot.UpdateUsageAndQuota()
Expand Down
3 changes: 1 addition & 2 deletions ddl/ingest/disk_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package ingest

import (
"github.com/pingcap/errors"
lcom "github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (d *diskRootImpl) UpdateUsageAndQuota() error {
sz, err := lcom.GetStorageSize(d.path)
if err != nil {
logutil.BgLogger().Error(LitErrGetStorageQuota, zap.Error(err))
return errors.New(LitErrGetStorageQuota)
return err
}
d.maxQuota = mathutil.Min(variable.DDLDiskQuota.Load(), uint64(capacityThreshold*float64(sz.Capacity)))
return nil
Expand Down
7 changes: 3 additions & 4 deletions ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
"github.com/pkg/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -99,7 +98,7 @@ func (ei *engineInfo) ImportAndClean() error {
if err1 != nil {
logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err1),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrCloseEngineErr)
return err1
}
ei.openedEngine = nil

Expand All @@ -118,15 +117,15 @@ func (ei *engineInfo) ImportAndClean() error {
if err != nil {
logutil.BgLogger().Error(LitErrIngestDataErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrIngestDataErr)
return err
}

// Clean up the engine local workspace.
err = closeEngine.Cleanup(ei.ctx)
if err != nil {
logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrCloseEngineErr)
return err
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int
logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.Int64("job ID", job.ID),
zap.Int64("index ID", indexID),
zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency))
return nil, errors.New(LitErrExceedConcurrency)
return nil, errors.New("concurrency quota exceeded")
}
en.writerCount++
info = LitInfoAddWriter
Expand Down
33 changes: 15 additions & 18 deletions ddl/ingest/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,26 @@ import (
// Message const text
const (
LitErrAllocMemFail string = "[ddl-ingest] allocate memory failed"
LitErrOutMaxMem string = "[ddl-ingest] memory used up for lightning add index"
LitErrCreateDirFail string = "[ddl-ingest] create lightning sort path error"
LitErrStatDirFail string = "[ddl-ingest] stat lightning sort path error"
LitErrDeleteDirFail string = "[ddl-ingest] delete lightning sort path error"
LitErrCreateBackendFail string = "[ddl-ingest] build lightning backend failed, will use kernel index reorg method to backfill the index"
LitErrGetBackendFail string = "[ddl-ingest] can not get cached backend"
LitErrCreateEngineFail string = "[ddl-ingest] build lightning engine failed, will use kernel index reorg method to backfill the index"
LitErrCreateContextFail string = "[ddl-ingest] build lightning worker context failed, will use kernel index reorg method to backfill the index"
LitErrGetEngineFail string = "[ddl-ingest] can not get cached engine info"
LitErrCreateDirFail string = "[ddl-ingest] create ingest sort path error"
LitErrStatDirFail string = "[ddl-ingest] stat ingest sort path error"
LitErrDeleteDirFail string = "[ddl-ingest] delete ingest sort path error"
LitErrCreateBackendFail string = "[ddl-ingest] build ingest backend failed"
LitErrGetBackendFail string = "[ddl-ingest] cannot get ingest backend"
LitErrCreateEngineFail string = "[ddl-ingest] build ingest engine failed"
LitErrCreateContextFail string = "[ddl-ingest] build ingest writer context failed"
LitErrGetEngineFail string = "[ddl-ingest] can not get ingest engine info"
LitErrGetStorageQuota string = "[ddl-ingest] get storage quota error"
LitErrCloseEngineErr string = "[ddl-ingest] close engine error"
LitErrCleanEngineErr string = "[ddl-ingest] clean engine error"
LitErrFlushEngineErr string = "[ddl-ingest] flush engine data err"
LitErrIngestDataErr string = "[ddl-ingest] ingest data into storage error"
LitErrRemoteDupExistErr string = "[ddl-ingest] remote duplicate index key exist"
LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than lightning limit(tikv-importer.range-concurrency)"
LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than ingest limit"
LitErrUpdateDiskStats string = "[ddl-ingest] update disk usage error"
LitWarnEnvInitFail string = "[ddl-ingest] initialize environment failed"
LitWarnConfigError string = "[ddl-ingest] build config for backend failed"
LitWarnGenMemLimit string = "[ddl-ingest] generate memory max limitation"
LitInfoEnvInitSucc string = "[ddl-ingest] init global lightning backend environment finished"
LitInfoSortDir string = "[ddl-ingest] the lightning sorted dir"
LitInfoEnvInitSucc string = "[ddl-ingest] init global ingest backend environment finished"
LitInfoSortDir string = "[ddl-ingest] the ingest sorted directory"
LitInfoCreateBackend string = "[ddl-ingest] create one backend for an DDL job"
LitInfoCloseBackend string = "[ddl-ingest] close one backend for DDL job"
LitInfoOpenEngine string = "[ddl-ingest] open an engine for index reorg task"
Expand All @@ -53,23 +51,22 @@ const (
LitInfoCloseEngine string = "[ddl-ingest] flush all writer and get closed engine"
LitInfoRemoteDupCheck string = "[ddl-ingest] start remote duplicate checking"
LitInfoStartImport string = "[ddl-ingest] start to import data"
LitInfoSetMemLimit string = "[ddl-ingest] set max memory limitation"
LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for lightning"
LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for lightning"
LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for ingest"
LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for ingest"
LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage"
)

func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error {
logutil.BgLogger().Warn(LitErrAllocMemFail, zap.Int64("job ID", jobID),
zap.Int64("current memory usage", memRoot.CurrentUsage()),
zap.Int64("max memory quota", memRoot.MaxMemoryQuota()))
return errors.New(LitErrOutMaxMem)
return errors.New("memory used up")
}

func genEngineAllocMemFailedErr(memRoot MemRoot, jobID, idxID int64) error {
logutil.BgLogger().Warn(LitErrAllocMemFail, zap.Int64("job ID", jobID),
zap.Int64("index ID", idxID),
zap.Int64("current memory usage", memRoot.CurrentUsage()),
zap.Int64("max memory quota", memRoot.MaxMemoryQuota()))
return errors.New(LitErrOutMaxMem)
return errors.New("memory used up")
}