diff --git a/ddl/index.go b/ddl/index.go index 6b819fb062e1e..9daa30fe93370 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -711,7 +711,10 @@ func pickBackfillType(w *worker, job *model.Job) model.ReorgType { // canUseIngest indicates whether it can use ingest way to backfill index. func canUseIngest(w *worker) bool { // We only allow one task to use ingest at the same time, in order to limit the CPU usage. - if len(ingest.LitBackCtxMgr.Keys()) > 0 { + activeJobIDs := ingest.LitBackCtxMgr.Keys() + if len(activeJobIDs) > 0 { + logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job", + zap.Int64("job ID", activeJobIDs[0])) return false } ctx, err := w.sessPool.get() diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 950cd91404bc6..23b3f5dcc9cd5 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -43,12 +43,19 @@ import ( "go.uber.org/zap" ) -// copReadBatchFactor is the factor of batch size of coprocessor read. -// It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range. -const copReadBatchFactor = 10 +// copReadBatchSize is the batch size of coprocessor read. +// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid +// sending too many cop requests for the same handle range. +func copReadBatchSize() int { + return 10 * int(variable.GetDDLReorgBatchSize()) +} -// copReadConcurrencyFactor is the factor of concurrency of coprocessor read. -const copReadConcurrencyFactor = 10 +// copReadChunkPoolSize is the size of chunk pool, which +// represents the max concurrent ongoing coprocessor requests. +// It multiplies the tidb_ddl_reorg_worker_cnt by 10. +func copReadChunkPoolSize() int { + return 10 * int(variable.GetDDLReorgWorkerCounter()) +} func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, *chunk.Chunk, kv.Key, bool, error) { ticker := time.NewTicker(500 * time.Millisecond) @@ -137,7 +144,6 @@ func (c *copReqSender) run() { p.resultsCh <- idxRecResult{id: task.id, err: err} p.recycleIdxRecordsAndChunk(idxRec, srcChk) terror.Call(rs.Close) - _ = rs.Close() return } total += len(idxRec) @@ -148,12 +154,12 @@ func (c *copReqSender) run() { } func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool { - poolSize := int(variable.GetDDLReorgWorkerCounter() * copReadConcurrencyFactor) + poolSize := copReadChunkPoolSize() idxBufPool := make(chan []*indexRecord, poolSize) srcChkPool := make(chan *chunk.Chunk, poolSize) for i := 0; i < poolSize; i++ { - idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize()) - srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize())) + idxBufPool <- make([]*indexRecord, 0, copReadBatchSize()) + srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, copReadBatchSize()) } return &copReqSenderPool{ tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize), @@ -220,7 +226,7 @@ func (c *copReqSenderPool) drainResults() { func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) { ir := <-c.idxBufPool chk := <-c.srcChkPool - newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor + newCap := copReadBatchSize() if chk.Capacity() != newCap { chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap) } diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel index 3fd286e450b25..962ae4da35637 100644 --- a/ddl/ingest/BUILD.bazel +++ b/ddl/ingest/BUILD.bazel @@ -33,13 +33,13 @@ go_library( "//sessionctx/variable", "//table", "//util", + "//util/dbterror", "//util/generic", "//util/logutil", "//util/mathutil", "//util/size", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", - "@com_github_pkg_errors//:errors", "@org_uber_go_zap//:zap", ], ) diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go index 63034f0be3a22..26344359dd6b9 100644 --- a/ddl/ingest/backend.go +++ b/ddl/ingest/backend.go @@ -17,13 +17,13 @@ package ingest import ( "context" - "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/config" tikv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -45,7 +45,7 @@ type BackendContext struct { func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Table) error { ei, exist := bc.EngMgr.Load(indexID) if !exist { - return errors.New(LitErrGetEngineFail) + return dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found") } err := ei.ImportAndClean() @@ -63,7 +63,7 @@ func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Tab if err != nil { logutil.BgLogger().Error(LitInfoRemoteDupCheck, zap.Error(err), zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) - return errors.New(LitInfoRemoteDupCheck) + return err } else if hasDupe { logutil.BgLogger().Error(LitErrRemoteDupExistErr, zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) @@ -80,7 +80,7 @@ func (bc *BackendContext) Flush(indexID int64) error { ei, exist := bc.EngMgr.Load(indexID) if !exist { logutil.BgLogger().Error(LitErrGetEngineFail, zap.Int64("index ID", indexID)) - return errors.New(LitErrGetEngineFail) + return dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found") } err := bc.diskRoot.UpdateUsageAndQuota() diff --git a/ddl/ingest/disk_root.go b/ddl/ingest/disk_root.go index c1c98f3fe681a..445115333edd1 100644 --- a/ddl/ingest/disk_root.go +++ b/ddl/ingest/disk_root.go @@ -15,7 +15,6 @@ package ingest import ( - "github.com/pingcap/errors" lcom "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" @@ -64,7 +63,7 @@ func (d *diskRootImpl) UpdateUsageAndQuota() error { sz, err := lcom.GetStorageSize(d.path) if err != nil { logutil.BgLogger().Error(LitErrGetStorageQuota, zap.Error(err)) - return errors.New(LitErrGetStorageQuota) + return err } d.maxQuota = mathutil.Min(variable.DDLDiskQuota.Load(), uint64(capacityThreshold*float64(sz.Capacity))) return nil diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index d875d78e346d0..8392674c1eae6 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" - "github.com/pkg/errors" "go.uber.org/zap" ) @@ -99,7 +98,7 @@ func (ei *engineInfo) ImportAndClean() error { if err1 != nil { logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err1), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - return errors.New(LitErrCloseEngineErr) + return err1 } ei.openedEngine = nil @@ -118,7 +117,7 @@ func (ei *engineInfo) ImportAndClean() error { if err != nil { logutil.BgLogger().Error(LitErrIngestDataErr, zap.Error(err), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - return errors.New(LitErrIngestDataErr) + return err } // Clean up the engine local workspace. @@ -126,7 +125,7 @@ func (ei *engineInfo) ImportAndClean() error { if err != nil { logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - return errors.New(LitErrCloseEngineErr) + return err } return nil } diff --git a/ddl/ingest/engine_mgr.go b/ddl/ingest/engine_mgr.go index 4cf1734747ee6..565d0b30d1ab8 100644 --- a/ddl/ingest/engine_mgr.go +++ b/ddl/ingest/engine_mgr.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -73,7 +74,7 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.Int64("job ID", job.ID), zap.Int64("index ID", indexID), zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency)) - return nil, errors.New(LitErrExceedConcurrency) + return nil, dbterror.ErrIngestFailed.FastGenByArgs("concurrency quota exceeded") } en.writerCount++ info = LitInfoAddWriter diff --git a/ddl/ingest/env.go b/ddl/ingest/env.go index 6e482523ad84a..864cc61ae4e02 100644 --- a/ddl/ingest/env.go +++ b/ddl/ingest/env.go @@ -53,6 +53,7 @@ func InitGlobalLightningEnv() { zap.String("storage limitation", "only support TiKV storage"), zap.String("current storage", globalCfg.Store), zap.Bool("lightning is initialized", LitInitialized)) + return } sPath, err := genLightningDataDir() if err != nil { @@ -109,8 +110,5 @@ func genLightningDataDir() (string, error) { return sortPath, nil } -// GenRLimitForTest is only used for test. -var GenRLimitForTest = util.GenRLimit() - // GenLightningDataDirForTest is only used for test. var GenLightningDataDirForTest = genLightningDataDir diff --git a/ddl/ingest/message.go b/ddl/ingest/message.go index 1a68541d68f0c..0828d68796ba4 100644 --- a/ddl/ingest/message.go +++ b/ddl/ingest/message.go @@ -15,7 +15,7 @@ package ingest import ( - "github.com/pingcap/errors" + "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -23,28 +23,26 @@ import ( // Message const text const ( LitErrAllocMemFail string = "[ddl-ingest] allocate memory failed" - LitErrOutMaxMem string = "[ddl-ingest] memory used up for lightning add index" - LitErrCreateDirFail string = "[ddl-ingest] create lightning sort path error" - LitErrStatDirFail string = "[ddl-ingest] stat lightning sort path error" - LitErrDeleteDirFail string = "[ddl-ingest] delete lightning sort path error" - LitErrCreateBackendFail string = "[ddl-ingest] build lightning backend failed, will use kernel index reorg method to backfill the index" - LitErrGetBackendFail string = "[ddl-ingest] can not get cached backend" - LitErrCreateEngineFail string = "[ddl-ingest] build lightning engine failed, will use kernel index reorg method to backfill the index" - LitErrCreateContextFail string = "[ddl-ingest] build lightning worker context failed, will use kernel index reorg method to backfill the index" - LitErrGetEngineFail string = "[ddl-ingest] can not get cached engine info" + LitErrCreateDirFail string = "[ddl-ingest] create ingest sort path error" + LitErrStatDirFail string = "[ddl-ingest] stat ingest sort path error" + LitErrDeleteDirFail string = "[ddl-ingest] delete ingest sort path error" + LitErrCreateBackendFail string = "[ddl-ingest] build ingest backend failed" + LitErrGetBackendFail string = "[ddl-ingest] cannot get ingest backend" + LitErrCreateEngineFail string = "[ddl-ingest] build ingest engine failed" + LitErrCreateContextFail string = "[ddl-ingest] build ingest writer context failed" + LitErrGetEngineFail string = "[ddl-ingest] can not get ingest engine info" LitErrGetStorageQuota string = "[ddl-ingest] get storage quota error" LitErrCloseEngineErr string = "[ddl-ingest] close engine error" LitErrCleanEngineErr string = "[ddl-ingest] clean engine error" LitErrFlushEngineErr string = "[ddl-ingest] flush engine data err" LitErrIngestDataErr string = "[ddl-ingest] ingest data into storage error" LitErrRemoteDupExistErr string = "[ddl-ingest] remote duplicate index key exist" - LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than lightning limit(tikv-importer.range-concurrency)" + LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than ingest limit" LitErrUpdateDiskStats string = "[ddl-ingest] update disk usage error" LitWarnEnvInitFail string = "[ddl-ingest] initialize environment failed" LitWarnConfigError string = "[ddl-ingest] build config for backend failed" - LitWarnGenMemLimit string = "[ddl-ingest] generate memory max limitation" - LitInfoEnvInitSucc string = "[ddl-ingest] init global lightning backend environment finished" - LitInfoSortDir string = "[ddl-ingest] the lightning sorted dir" + LitInfoEnvInitSucc string = "[ddl-ingest] init global ingest backend environment finished" + LitInfoSortDir string = "[ddl-ingest] the ingest sorted directory" LitInfoCreateBackend string = "[ddl-ingest] create one backend for an DDL job" LitInfoCloseBackend string = "[ddl-ingest] close one backend for DDL job" LitInfoOpenEngine string = "[ddl-ingest] open an engine for index reorg task" @@ -53,9 +51,8 @@ const ( LitInfoCloseEngine string = "[ddl-ingest] flush all writer and get closed engine" LitInfoRemoteDupCheck string = "[ddl-ingest] start remote duplicate checking" LitInfoStartImport string = "[ddl-ingest] start to import data" - LitInfoSetMemLimit string = "[ddl-ingest] set max memory limitation" - LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for lightning" - LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for lightning" + LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for ingest" + LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for ingest" LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage" ) @@ -63,7 +60,7 @@ func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error { logutil.BgLogger().Warn(LitErrAllocMemFail, zap.Int64("job ID", jobID), zap.Int64("current memory usage", memRoot.CurrentUsage()), zap.Int64("max memory quota", memRoot.MaxMemoryQuota())) - return errors.New(LitErrOutMaxMem) + return dbterror.ErrIngestFailed.FastGenByArgs("memory used up") } func genEngineAllocMemFailedErr(memRoot MemRoot, jobID, idxID int64) error { @@ -71,5 +68,5 @@ func genEngineAllocMemFailedErr(memRoot MemRoot, jobID, idxID int64) error { zap.Int64("index ID", idxID), zap.Int64("current memory usage", memRoot.CurrentUsage()), zap.Int64("max memory quota", memRoot.MaxMemoryQuota())) - return errors.New(LitErrOutMaxMem) + return dbterror.ErrIngestFailed.FastGenByArgs("memory used up") } diff --git a/errno/errcode.go b/errno/errcode.go index d9c1761887c3c..8c1cd580f3d7e 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1086,6 +1086,7 @@ const ( ErrPartitionColumnStatsMissing = 8244 ErrColumnInChange = 8245 ErrDDLSetting = 8246 + ErrIngestFailed = 8247 // TiKV/PD/TiFlash errors. ErrPDServerTimeout = 9001 diff --git a/errno/errname.go b/errno/errname.go index c851e1767aa08..d14d9a286da14 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1079,6 +1079,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrPartitionStatsMissing: mysql.Message("Build global-level stats failed due to missing partition-level stats: %s", nil), ErrPartitionColumnStatsMissing: mysql.Message("Build global-level stats failed due to missing partition-level column stats: %s, please run analyze table to refresh columns of all partitions", nil), ErrDDLSetting: mysql.Message("Error happened when enable/disable DDL: %s", nil), + ErrIngestFailed: mysql.Message("Ingest failed: %s", nil), ErrNotSupportedWithSem: mysql.Message("Feature '%s' is not supported when security enhanced mode is enabled", nil), ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil), diff --git a/errors.toml b/errors.toml index 2de8170f05f60..3e445687d00e5 100644 --- a/errors.toml +++ b/errors.toml @@ -1361,6 +1361,11 @@ error = ''' Error happened when enable/disable DDL: %s ''' +["ddl:8247"] +error = ''' +Ingest failed: %s +''' + ["domain:8027"] error = ''' Information schema is out of date: schema failed to update in 1 lease, please make sure TiDB can connect to TiKV diff --git a/tests/realtikvtest/addindextest/main_test.go b/tests/realtikvtest/addindextest/main_test.go index 5171b56a48d1a..a308c7831a249 100644 --- a/tests/realtikvtest/addindextest/main_test.go +++ b/tests/realtikvtest/addindextest/main_test.go @@ -18,6 +18,7 @@ import ( "flag" "testing" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/tests/realtikvtest" ) @@ -26,5 +27,8 @@ import ( var FullMode = flag.Bool("full-mode", false, "whether tests run in full mode") func TestMain(m *testing.M) { + config.UpdateGlobal(func(conf *config.Config) { + conf.Store = "tikv" + }) realtikvtest.RunTestMain(m) } diff --git a/util/dbterror/ddl_terror.go b/util/dbterror/ddl_terror.go index cce0cd23c8cb7..7926dc84ebc66 100644 --- a/util/dbterror/ddl_terror.go +++ b/util/dbterror/ddl_terror.go @@ -391,6 +391,8 @@ var ( ErrCannotCancelDDLJob = ClassDDL.NewStd(mysql.ErrCannotCancelDDLJob) // ErrDDLSetting returns when failing to enable/disable DDL ErrDDLSetting = ClassDDL.NewStd(mysql.ErrDDLSetting) + // ErrIngestFailed returns when the DDL ingest job is failed. + ErrIngestFailed = ClassDDL.NewStd(mysql.ErrIngestFailed) // ErrColumnInChange indicates there is modification on the column in parallel. ErrColumnInChange = ClassDDL.NewStd(mysql.ErrColumnInChange)