ddl: improve the log messages and refine code (#39456)
tangenta authored Dec 2, 2022
1 parent f799e72 commit cabc018
Showing 14 changed files with 61 additions and 45 deletions.
5 changes: 4 additions & 1 deletion ddl/index.go
@@ -711,7 +711,10 @@ func pickBackfillType(w *worker, job *model.Job) model.ReorgType {
 // canUseIngest indicates whether it can use ingest way to backfill index.
 func canUseIngest(w *worker) bool {
 	// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
-	if len(ingest.LitBackCtxMgr.Keys()) > 0 {
+	activeJobIDs := ingest.LitBackCtxMgr.Keys()
+	if len(activeJobIDs) > 0 {
+		logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job",
+			zap.Int64("job ID", activeJobIDs[0]))
 		return false
 	}
 	ctx, err := w.sessPool.get()
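Note: for readers outside the TiDB tree, here is a minimal, self-contained sketch of the single-job gate this hunk implements. The manager type below is a hypothetical stand-in for ingest.LitBackCtxMgr, not its real API.

package main

import "fmt"

// backCtxMgr stands in for ingest.LitBackCtxMgr: it tracks the DDL jobs
// that currently hold an ingest backend context.
type backCtxMgr struct{ active map[int64]struct{} }

// Keys returns the IDs of all jobs currently using ingest.
func (m *backCtxMgr) Keys() []int64 {
	ids := make([]int64, 0, len(m.active))
	for id := range m.active {
		ids = append(ids, id)
	}
	return ids
}

// canUseIngest mirrors the gate above: only one job may use ingest at a
// time, and the blocking job's ID is reported so operators can see who
// holds the slot instead of getting a silent fallback.
func canUseIngest(m *backCtxMgr) bool {
	if ids := m.Keys(); len(ids) > 0 {
		fmt.Printf("ingest backfill is already in use by job %d\n", ids[0])
		return false
	}
	return true
}

func main() {
	m := &backCtxMgr{active: map[int64]struct{}{101: {}}}
	fmt.Println(canUseIngest(m)) // job 101 holds the slot, so this prints false
}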
26 changes: 16 additions & 10 deletions ddl/index_cop.go
@@ -43,12 +43,19 @@ import (
 	"go.uber.org/zap"
 )

-// copReadBatchFactor is the factor of batch size of coprocessor read.
-// It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range.
-const copReadBatchFactor = 10
+// copReadBatchSize is the batch size of coprocessor read.
+// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid
+// sending too many cop requests for the same handle range.
+func copReadBatchSize() int {
+	return 10 * int(variable.GetDDLReorgBatchSize())
+}

-// copReadConcurrencyFactor is the factor of concurrency of coprocessor read.
-const copReadConcurrencyFactor = 10
+// copReadChunkPoolSize is the size of chunk pool, which
+// represents the max concurrent ongoing coprocessor requests.
+// It multiplies the tidb_ddl_reorg_worker_cnt by 10.
+func copReadChunkPoolSize() int {
+	return 10 * int(variable.GetDDLReorgWorkerCounter())
+}

 func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, *chunk.Chunk, kv.Key, bool, error) {
 	ticker := time.NewTicker(500 * time.Millisecond)
@@ -137,7 +144,6 @@ func (c *copReqSender) run() {
 			p.resultsCh <- idxRecResult{id: task.id, err: err}
 			p.recycleIdxRecordsAndChunk(idxRec, srcChk)
 			terror.Call(rs.Close)
-			_ = rs.Close()
 			return
 		}
 		total += len(idxRec)
@@ -148,12 +154,12 @@
 }

 func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
-	poolSize := int(variable.GetDDLReorgWorkerCounter() * copReadConcurrencyFactor)
+	poolSize := copReadChunkPoolSize()
 	idxBufPool := make(chan []*indexRecord, poolSize)
 	srcChkPool := make(chan *chunk.Chunk, poolSize)
 	for i := 0; i < poolSize; i++ {
-		idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize())
-		srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize()))
+		idxBufPool <- make([]*indexRecord, 0, copReadBatchSize())
+		srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, copReadBatchSize())
 	}
 	return &copReqSenderPool{
 		tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
@@ -220,7 +226,7 @@ func (c *copReqSenderPool) drainResults() {
 func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) {
 	ir := <-c.idxBufPool
 	chk := <-c.srcChkPool
-	newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor
+	newCap := copReadBatchSize()
 	if chk.Capacity() != newCap {
 		chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
 	}
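Note: as a worked example of the factors above, assuming the stock defaults of tidb_ddl_reorg_batch_size = 256 and tidb_ddl_reorg_worker_cnt = 4 (your deployment may override them), the new helpers work out to:

package main

import "fmt"

func main() {
	// Assumed default values of the two system variables; the real helpers
	// read the live values through variable.GetDDLReorgBatchSize and
	// variable.GetDDLReorgWorkerCounter.
	const ddlReorgBatchSize = 256
	const ddlReorgWorkerCnt = 4

	copReadBatchSize := 10 * ddlReorgBatchSize     // 2560 rows fetched per cop request batch
	copReadChunkPoolSize := 10 * ddlReorgWorkerCnt // 40 pooled chunks, i.e. max in-flight requests

	fmt.Println(copReadBatchSize, copReadChunkPoolSize) // 2560 40
}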
2 changes: 1 addition & 1 deletion ddl/ingest/BUILD.bazel
@@ -33,13 +33,13 @@ go_library(
         "//sessionctx/variable",
         "//table",
         "//util",
+        "//util/dbterror",
         "//util/generic",
         "//util/logutil",
         "//util/mathutil",
         "//util/size",
         "@com_github_google_uuid//:uuid",
         "@com_github_pingcap_errors//:errors",
-        "@com_github_pkg_errors//:errors",
         "@org_uber_go_zap//:zap",
     ],
 )
8 changes: 4 additions & 4 deletions ddl/ingest/backend.go
@@ -17,13 +17,13 @@ package ingest
 import (
 	"context"

-	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	tikv "github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/pingcap/tidb/util/logutil"
 	"go.uber.org/zap"
 )
@@ -45,7 +45,7 @@ type BackendContext struct {
 func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Table) error {
 	ei, exist := bc.EngMgr.Load(indexID)
 	if !exist {
-		return errors.New(LitErrGetEngineFail)
+		return dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
 	}

 	err := ei.ImportAndClean()
@@ -63,7 +63,7 @@ func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Tab
 		if err != nil {
 			logutil.BgLogger().Error(LitInfoRemoteDupCheck, zap.Error(err),
 				zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID))
-			return errors.New(LitInfoRemoteDupCheck)
+			return err
 		} else if hasDupe {
 			logutil.BgLogger().Error(LitErrRemoteDupExistErr,
 				zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID))
@@ -80,7 +80,7 @@ func (bc *BackendContext) Flush(indexID int64) error {
 	ei, exist := bc.EngMgr.Load(indexID)
 	if !exist {
 		logutil.BgLogger().Error(LitErrGetEngineFail, zap.Int64("index ID", indexID))
-		return errors.New(LitErrGetEngineFail)
+		return dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
 	}

 	err := bc.diskRoot.UpdateUsageAndQuota()
3 changes: 1 addition & 2 deletions ddl/ingest/disk_root.go
@@ -15,7 +15,6 @@
 package ingest

 import (
-	"github.com/pingcap/errors"
 	lcom "github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/util/logutil"
@@ -64,7 +63,7 @@ func (d *diskRootImpl) UpdateUsageAndQuota() error {
 	sz, err := lcom.GetStorageSize(d.path)
 	if err != nil {
 		logutil.BgLogger().Error(LitErrGetStorageQuota, zap.Error(err))
-		return errors.New(LitErrGetStorageQuota)
+		return err
 	}
 	d.maxQuota = mathutil.Min(variable.DDLDiskQuota.Load(), uint64(capacityThreshold*float64(sz.Capacity)))
 	return nil
7 changes: 3 additions & 4 deletions ddl/ingest/engine.go
@@ -25,7 +25,6 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/util/generic"
 	"github.com/pingcap/tidb/util/logutil"
-	"github.com/pkg/errors"
 	"go.uber.org/zap"
 )

@@ -99,7 +98,7 @@ func (ei *engineInfo) ImportAndClean() error {
 	if err1 != nil {
 		logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err1),
 			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-		return errors.New(LitErrCloseEngineErr)
+		return err1
 	}
 	ei.openedEngine = nil

@@ -118,15 +117,15 @@
 	if err != nil {
 		logutil.BgLogger().Error(LitErrIngestDataErr, zap.Error(err),
 			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-		return errors.New(LitErrIngestDataErr)
+		return err
 	}

 	// Clean up the engine local workspace.
 	err = closeEngine.Cleanup(ei.ctx)
 	if err != nil {
 		logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err),
 			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-		return errors.New(LitErrCloseEngineErr)
+		return err
 	}
 	return nil
 }
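Note: several hunks in this commit (backend.go, disk_root.go, engine.go) apply the same refinement: log the message constant once, then return the underlying err instead of errors.New(messageConstant), so callers keep the root cause. A minimal before/after sketch of that pattern (function names hypothetical, not from the TiDB tree):

package main

import (
	"errors"
	"fmt"
)

var errDiskFull = errors.New("disk full: need 2GiB, have 0B")

func ingestData() error { return errDiskFull }

// importBefore mimics the old style: the constant log message replaces the
// root cause, so callers can never see what actually went wrong.
func importBefore() error {
	if err := ingestData(); err != nil {
		return errors.New("[ddl-ingest] ingest data into storage error")
	}
	return nil
}

// importAfter mimics the new style: log once, return the original error.
func importAfter() error {
	if err := ingestData(); err != nil {
		fmt.Println("[ddl-ingest] ingest data into storage error:", err)
		return err
	}
	return nil
}

func main() {
	fmt.Println(importBefore()) // [ddl-ingest] ingest data into storage error
	fmt.Println(importAfter())  // disk full: need 2GiB, have 0B
}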
3 changes: 2 additions & 1 deletion ddl/ingest/engine_mgr.go
@@ -19,6 +19,7 @@ import (

 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/pingcap/tidb/util/generic"
 	"github.com/pingcap/tidb/util/logutil"
 	"go.uber.org/zap"
@@ -73,7 +74,7 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int
 			logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.Int64("job ID", job.ID),
 				zap.Int64("index ID", indexID),
 				zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency))
-			return nil, errors.New(LitErrExceedConcurrency)
+			return nil, dbterror.ErrIngestFailed.FastGenByArgs("concurrency quota exceeded")
 		}
 		en.writerCount++
 		info = LitInfoAddWriter
4 changes: 1 addition & 3 deletions ddl/ingest/env.go
@@ -53,6 +53,7 @@ func InitGlobalLightningEnv() {
 			zap.String("storage limitation", "only support TiKV storage"),
 			zap.String("current storage", globalCfg.Store),
 			zap.Bool("lightning is initialized", LitInitialized))
+		return
 	}
 	sPath, err := genLightningDataDir()
 	if err != nil {
@@ -109,8 +110,5 @@ func genLightningDataDir() (string, error) {
 	return sortPath, nil
 }

-// GenRLimitForTest is only used for test.
-var GenRLimitForTest = util.GenRLimit()
-
 // GenLightningDataDirForTest is only used for test.
 var GenLightningDataDirForTest = genLightningDataDir
35 changes: 16 additions & 19 deletions ddl/ingest/message.go
@@ -15,36 +15,34 @@
 package ingest

 import (
-	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/pingcap/tidb/util/logutil"
 	"go.uber.org/zap"
 )

 // Message const text
 const (
 	LitErrAllocMemFail string = "[ddl-ingest] allocate memory failed"
-	LitErrOutMaxMem string = "[ddl-ingest] memory used up for lightning add index"
-	LitErrCreateDirFail string = "[ddl-ingest] create lightning sort path error"
-	LitErrStatDirFail string = "[ddl-ingest] stat lightning sort path error"
-	LitErrDeleteDirFail string = "[ddl-ingest] delete lightning sort path error"
-	LitErrCreateBackendFail string = "[ddl-ingest] build lightning backend failed, will use kernel index reorg method to backfill the index"
-	LitErrGetBackendFail string = "[ddl-ingest] can not get cached backend"
-	LitErrCreateEngineFail string = "[ddl-ingest] build lightning engine failed, will use kernel index reorg method to backfill the index"
-	LitErrCreateContextFail string = "[ddl-ingest] build lightning worker context failed, will use kernel index reorg method to backfill the index"
-	LitErrGetEngineFail string = "[ddl-ingest] can not get cached engine info"
+	LitErrCreateDirFail string = "[ddl-ingest] create ingest sort path error"
+	LitErrStatDirFail string = "[ddl-ingest] stat ingest sort path error"
+	LitErrDeleteDirFail string = "[ddl-ingest] delete ingest sort path error"
+	LitErrCreateBackendFail string = "[ddl-ingest] build ingest backend failed"
+	LitErrGetBackendFail string = "[ddl-ingest] cannot get ingest backend"
+	LitErrCreateEngineFail string = "[ddl-ingest] build ingest engine failed"
+	LitErrCreateContextFail string = "[ddl-ingest] build ingest writer context failed"
+	LitErrGetEngineFail string = "[ddl-ingest] can not get ingest engine info"
 	LitErrGetStorageQuota string = "[ddl-ingest] get storage quota error"
 	LitErrCloseEngineErr string = "[ddl-ingest] close engine error"
 	LitErrCleanEngineErr string = "[ddl-ingest] clean engine error"
 	LitErrFlushEngineErr string = "[ddl-ingest] flush engine data err"
 	LitErrIngestDataErr string = "[ddl-ingest] ingest data into storage error"
 	LitErrRemoteDupExistErr string = "[ddl-ingest] remote duplicate index key exist"
-	LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than lightning limit(tikv-importer.range-concurrency)"
+	LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than ingest limit"
 	LitErrUpdateDiskStats string = "[ddl-ingest] update disk usage error"
 	LitWarnEnvInitFail string = "[ddl-ingest] initialize environment failed"
 	LitWarnConfigError string = "[ddl-ingest] build config for backend failed"
-	LitWarnGenMemLimit string = "[ddl-ingest] generate memory max limitation"
-	LitInfoEnvInitSucc string = "[ddl-ingest] init global lightning backend environment finished"
-	LitInfoSortDir string = "[ddl-ingest] the lightning sorted dir"
+	LitInfoEnvInitSucc string = "[ddl-ingest] init global ingest backend environment finished"
+	LitInfoSortDir string = "[ddl-ingest] the ingest sorted directory"
 	LitInfoCreateBackend string = "[ddl-ingest] create one backend for an DDL job"
 	LitInfoCloseBackend string = "[ddl-ingest] close one backend for DDL job"
 	LitInfoOpenEngine string = "[ddl-ingest] open an engine for index reorg task"
@@ -53,23 +51,22 @@ const (
 	LitInfoCloseEngine string = "[ddl-ingest] flush all writer and get closed engine"
 	LitInfoRemoteDupCheck string = "[ddl-ingest] start remote duplicate checking"
 	LitInfoStartImport string = "[ddl-ingest] start to import data"
-	LitInfoSetMemLimit string = "[ddl-ingest] set max memory limitation"
-	LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for lightning"
-	LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for lightning"
+	LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for ingest"
+	LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for ingest"
 	LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage"
 )

 func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error {
 	logutil.BgLogger().Warn(LitErrAllocMemFail, zap.Int64("job ID", jobID),
 		zap.Int64("current memory usage", memRoot.CurrentUsage()),
 		zap.Int64("max memory quota", memRoot.MaxMemoryQuota()))
-	return errors.New(LitErrOutMaxMem)
+	return dbterror.ErrIngestFailed.FastGenByArgs("memory used up")
 }

 func genEngineAllocMemFailedErr(memRoot MemRoot, jobID, idxID int64) error {
 	logutil.BgLogger().Warn(LitErrAllocMemFail, zap.Int64("job ID", jobID),
 		zap.Int64("index ID", idxID),
 		zap.Int64("current memory usage", memRoot.CurrentUsage()),
 		zap.Int64("max memory quota", memRoot.MaxMemoryQuota()))
-	return errors.New(LitErrOutMaxMem)
+	return dbterror.ErrIngestFailed.FastGenByArgs("memory used up")
 }
1 change: 1 addition & 0 deletions errno/errcode.go
@@ -1086,6 +1086,7 @@ const (
 	ErrPartitionColumnStatsMissing = 8244
 	ErrColumnInChange = 8245
 	ErrDDLSetting = 8246
+	ErrIngestFailed = 8247

 	// TiKV/PD/TiFlash errors.
 	ErrPDServerTimeout = 9001
1 change: 1 addition & 0 deletions errno/errname.go
@@ -1079,6 +1079,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
 	ErrPartitionStatsMissing: mysql.Message("Build global-level stats failed due to missing partition-level stats: %s", nil),
 	ErrPartitionColumnStatsMissing: mysql.Message("Build global-level stats failed due to missing partition-level column stats: %s, please run analyze table to refresh columns of all partitions", nil),
 	ErrDDLSetting: mysql.Message("Error happened when enable/disable DDL: %s", nil),
+	ErrIngestFailed: mysql.Message("Ingest failed: %s", nil),
 	ErrNotSupportedWithSem: mysql.Message("Feature '%s' is not supported when security enhanced mode is enabled", nil),

 	ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil),
5 changes: 5 additions & 0 deletions errors.toml
@@ -1361,6 +1361,11 @@ error = '''
 Error happened when enable/disable DDL: %s
 '''

+["ddl:8247"]
+error = '''
+Ingest failed: %s
+'''
+
 ["domain:8027"]
 error = '''
 Information schema is out of date: schema failed to update in 1 lease, please make sure TiDB can connect to TiKV
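Note: a hedged sketch of how the new error class is expected to surface to clients, assuming dbterror's usual [class:code] rendering; the argument string is one of the messages used elsewhere in this commit:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/dbterror"
)

func main() {
	// FastGenByArgs fills the "Ingest failed: %s" template without
	// attaching a stack trace, which suits these expected failure paths.
	err := dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
	fmt.Println(err) // expected: [ddl:8247]Ingest failed: ingest engine not found
}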
4 changes: 4 additions & 0 deletions tests/realtikvtest/addindextest/main_test.go
@@ -18,6 +18,7 @@ import (
 	"flag"
 	"testing"

+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/tests/realtikvtest"
 )

@@ -26,5 +27,8 @@ import (
 var FullMode = flag.Bool("full-mode", false, "whether tests run in full mode")

 func TestMain(m *testing.M) {
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.Store = "tikv"
+	})
 	realtikvtest.RunTestMain(m)
 }
2 changes: 2 additions & 0 deletions util/dbterror/ddl_terror.go
@@ -391,6 +391,8 @@ var (
 	ErrCannotCancelDDLJob = ClassDDL.NewStd(mysql.ErrCannotCancelDDLJob)
 	// ErrDDLSetting returns when failing to enable/disable DDL
 	ErrDDLSetting = ClassDDL.NewStd(mysql.ErrDDLSetting)
+	// ErrIngestFailed returns when the DDL ingest job fails.
+	ErrIngestFailed = ClassDDL.NewStd(mysql.ErrIngestFailed)

 	// ErrColumnInChange indicates there is modification on the column in parallel.
 	ErrColumnInChange = ClassDDL.NewStd(mysql.ErrColumnInChange)
