From 006e9a9622dbb3e0b72e36e044e29f2e4506dde5 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 19 Apr 2023 21:05:41 +0800 Subject: [PATCH 01/12] ddl: limit the concurrent number of ingest jobs to 1 --- ddl/backfilling_test.go | 37 ++++++++++++ ddl/index.go | 60 ++++++++++++------- ddl/ingest/backend_mgr.go | 14 +++-- ddl/ingest/disk_root.go | 14 +++++ ddl/ingest/mock.go | 6 +- sessionctx/variable/sysvar.go | 2 +- sessionctx/variable/tidb_vars.go | 1 + .../addindextest/integration_test.go | 3 +- 8 files changed, 106 insertions(+), 31 deletions(-) diff --git a/ddl/backfilling_test.go b/ddl/backfilling_test.go index 167b809dd4487..451b6c9e80237 100644 --- a/ddl/backfilling_test.go +++ b/ddl/backfilling_test.go @@ -16,9 +16,14 @@ package ddl import ( "bytes" + "context" "testing" + "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/stretchr/testify/require" ) @@ -43,3 +48,35 @@ func TestDoneTaskKeeper(t *testing.T) { n.updateNextKey(6, kv.Key("h")) require.True(t, bytes.Equal(n.nextKey, kv.Key("h"))) } + +func TestPickBackfillType(t *testing.T) { + mockMgr := ingest.NewMockBackendCtxMgr( + func() sessionctx.Context { + return nil + }) + ingest.LitBackCtxMgr = mockMgr + mockCtx := context.Background() + const uk = false + mockJob := &model.Job{ + ID: 1, + ReorgMeta: &model.DDLReorgMeta{ + ReorgTp: model.ReorgTypeTxn, + }, + } + variable.EnableFastReorg.Store(true) + tp, err := pickBackfillType(mockJob, mockCtx, uk) + require.NoError(t, err) + require.Equal(t, tp, model.ReorgTypeTxn) + + mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone + ingest.LitInitialized = false + tp, err = pickBackfillType(mockJob, mockCtx, uk) + require.NoError(t, err) + require.Equal(t, tp, model.ReorgTypeTxnMerge) + + mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone + ingest.LitInitialized = true + tp, err = pickBackfillType(mockJob, mockCtx, uk) + require.NoError(t, err) + require.Equal(t, tp, model.ReorgTypeLitMerge) +} diff --git a/ddl/index.go b/ddl/index.go index d0153c848b37a..486e059c253b4 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -622,7 +622,11 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo switch indexInfo.State { case model.StateNone: // none -> delete only - reorgTp := pickBackfillType(job) + var reorgTp model.ReorgType + reorgTp, err = pickBackfillType(job, w.ctx, indexInfo.Unique) + if err != nil { + break + } if reorgTp.NeedMergeProcess() { // Increase telemetryAddIndexIngestUsage telemetryAddIndexIngestUsage.Inc() @@ -711,39 +715,48 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } // pickBackfillType determines which backfill process will be used. -func pickBackfillType(job *model.Job) model.ReorgType { +func pickBackfillType(job *model.Job, ctx context.Context, unique bool) (model.ReorgType, error) { if job.ReorgMeta.ReorgTp != model.ReorgTypeNone { // The backfill task has been started. - // Don't switch the backfill process. - return job.ReorgMeta.ReorgTp + // Don't change the backfill type. + return job.ReorgMeta.ReorgTp, nil } if IsEnableFastReorg() { - var useIngest bool - if ingest.LitInitialized && ingest.LitBackCtxMgr.Available() { - cleanupSortPath(job.ID) + if ingest.LitInitialized { + err := ingest.LitBackCtxMgr.CheckAvailable() + if err != nil { + return model.ReorgTypeNone, err + } + err = cleanupSortPath(job.ID) + if err != nil { + return model.ReorgTypeNone, err + } + _, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID) + if err != nil { + return model.ReorgTypeNone, err + } job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge - return model.ReorgTypeLitMerge + return model.ReorgTypeLitMerge, nil } // The lightning environment is unavailable, but we can still use the txn-merge backfill. logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", - zap.Bool("lightning env initialized", ingest.LitInitialized), - zap.Bool("can use ingest", useIngest)) + zap.Bool("lightning env initialized", ingest.LitInitialized)) job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge - return model.ReorgTypeTxnMerge + return model.ReorgTypeTxnMerge, nil } job.ReorgMeta.ReorgTp = model.ReorgTypeTxn - return model.ReorgTypeTxn + return model.ReorgTypeTxn, nil } // cleanupSortPath is used to clean up the temp data of the previous jobs. // Because we don't remove all the files after the support of checkpoint, // there maybe some stale files in the sort path if TiDB is killed during the backfill process. -func cleanupSortPath(currentJobID int64) { +func cleanupSortPath(currentJobID int64) error { sortPath := ingest.ConfigSortPath() entries, err := os.ReadDir(sortPath) if err != nil { - logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err)) - return + logutil.BgLogger().Warn("[ddl-ingest] cannot read sort path", zap.Error(err)) + return errors.Trace(err) } for _, entry := range entries { if !entry.IsDir() { @@ -762,10 +775,11 @@ func cleanupSortPath(currentJobID int64) { err := os.RemoveAll(filepath.Join(sortPath, entry.Name())) if err != nil { logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err)) - return + return nil } } } + return nil } // IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed. @@ -824,17 +838,21 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { - bfProcess := pickBackfillType(job) - if !bfProcess.NeedMergeProcess() { + var reorgTp model.ReorgType + reorgTp, err = pickBackfillType(job, w.ctx, indexInfo.Unique) + if err != nil { + return false, ver, err + } + if !reorgTp.NeedMergeProcess() { return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) } switch indexInfo.BackfillState { case model.BackfillStateRunning: logutil.BgLogger().Info("[ddl] index backfill state running", zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), - zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge), + zap.Bool("ingest mode", reorgTp == model.ReorgTypeLitMerge), zap.String("index", indexInfo.Name.O)) - switch bfProcess { + switch reorgTp { case model.ReorgTypeLitMerge: if job.ReorgMeta.IsDistReorg { done, ver, err = runIngestReorgJobDist(w, d, t, job, tbl, indexInfo) @@ -854,7 +872,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O)) indexInfo.BackfillState = model.BackfillStateMerging - if bfProcess == model.ReorgTypeLitMerge { + if reorgTp == model.ReorgTypeLitMerge { ingest.LitBackCtxMgr.Unregister(job.ID) } job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg. diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index 3a712ed4c90dc..ba64403fbb5b5 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -30,7 +30,7 @@ import ( // BackendCtxMgr is used to manage the backend context. type BackendCtxMgr interface { - Available() bool + CheckAvailable() error Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error) Unregister(jobID int64) Load(jobID int64) (BackendCtx, bool) @@ -56,16 +56,20 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr { return mgr } -// Available checks if the ingest backfill is available. -func (m *litBackendCtxMgr) Available() bool { +// CheckAvailable checks if the ingest backfill is available. +func (m *litBackendCtxMgr) CheckAvailable() error { // We only allow one task to use ingest at the same time, in order to limit the CPU usage. activeJobIDs := m.Keys() if len(activeJobIDs) > 0 { logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job", zap.Int64("job ID", activeJobIDs[0])) - return false + return nil } - return true + if err := m.diskRoot.PreCheckUsage(); err != nil { + logutil.BgLogger().Info("[ddl-ingest] ingest backfill is not available", zap.Error(err)) + return err + } + return nil } // Register creates a new backend and registers it to the backend context. diff --git a/ddl/ingest/disk_root.go b/ddl/ingest/disk_root.go index b9713fa26adae..3c50829d77cdf 100644 --- a/ddl/ingest/disk_root.go +++ b/ddl/ingest/disk_root.go @@ -18,6 +18,7 @@ import ( "fmt" "sync" + "github.com/pingcap/errors" lcom "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" @@ -29,6 +30,7 @@ type DiskRoot interface { UpdateUsage() ShouldImport() bool UsageInfo() string + PreCheckUsage() error } const capacityThreshold = 0.9 @@ -87,3 +89,15 @@ func (d *diskRootImpl) UsageInfo() string { defer d.mu.RUnlock() return fmt.Sprintf("disk usage: %d/%d, backend usage: %d", d.used, d.capacity, d.bcUsed) } + +// PreCheckUsage implements DiskRoot interface. +func (d *diskRootImpl) PreCheckUsage() error { + sz, err := lcom.GetStorageSize(d.path) + if err != nil { + return errors.Trace(err) + } + if float64(sz.Available) < (1-capacityThreshold)*float64(sz.Capacity) { + return errors.Errorf("%s, please clean up the disk and retry", d.UsageInfo()) + } + return nil +} diff --git a/ddl/ingest/mock.go b/ddl/ingest/mock.go index a0d5d6794d7f0..5811ff70e6702 100644 --- a/ddl/ingest/mock.go +++ b/ddl/ingest/mock.go @@ -39,9 +39,9 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken } } -// Available implements BackendCtxMgr.Available interface. -func (*MockBackendCtxMgr) Available() bool { - return true +// CheckAvailable implements BackendCtxMgr.Available interface. +func (*MockBackendCtxMgr) CheckAvailable() error { + return nil } // Register implements BackendCtxMgr.Register interface. diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 8b2e86d86dc65..56bb53ff85c53 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -2185,7 +2185,7 @@ var defaultSysVars = []*SysVar{ return nil }}, // This system var is set disk quota for lightning sort dir, from 100 GB to 1PB. - {Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: DefTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) { + {Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: MinTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) { return strconv.FormatUint(DDLDiskQuota.Load(), 10), nil }, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota)) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 49bc83900d6bf..06c7b69550e3b 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1199,6 +1199,7 @@ const ( DefMemoryUsageAlarmKeepRecordNum = 5 DefTiDBEnableFastReorg = true DefTiDBDDLDiskQuota = 100 * 1024 * 1024 * 1024 // 100GB + MinTiDBDDLDiskQuota = 1 * 1024 * 1024 * 1024 // 1GB DefExecutorConcurrency = 5 DefTiDBEnableNonPreparedPlanCache = false DefTiDBNonPreparedPlanCacheSize = 100 diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index c07cdd5ce509d..fd12f740b7445 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -395,7 +395,8 @@ func TestAddIndexIngestCancel(t *testing.T) { tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob) require.True(t, cancelled) dom.DDL().SetHook(defHook) - require.True(t, ingest.LitBackCtxMgr.Available()) + err := ingest.LitBackCtxMgr.CheckAvailable() + require.NoError(t, err) } func TestAddIndexSplitTableRanges(t *testing.T) { From b3a1b2d08c5c48faa8ce0172348de85ba5427e25 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 19 Apr 2023 21:19:31 +0800 Subject: [PATCH 02/12] update bazel and fix linter --- ddl/BUILD.bazel | 1 + ddl/backfilling_test.go | 6 +++--- ddl/index.go | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index e4ce2b5c596f6..8159033d7b4bb 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -219,6 +219,7 @@ go_test( deps = [ "//autoid_service", "//config", + "//ddl/ingest", "//ddl/internal/callback", "//ddl/placement", "//ddl/schematracker", diff --git a/ddl/backfilling_test.go b/ddl/backfilling_test.go index 451b6c9e80237..973561dfe8435 100644 --- a/ddl/backfilling_test.go +++ b/ddl/backfilling_test.go @@ -64,19 +64,19 @@ func TestPickBackfillType(t *testing.T) { }, } variable.EnableFastReorg.Store(true) - tp, err := pickBackfillType(mockJob, mockCtx, uk) + tp, err := pickBackfillType(mockCtx, mockJob, uk) require.NoError(t, err) require.Equal(t, tp, model.ReorgTypeTxn) mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone ingest.LitInitialized = false - tp, err = pickBackfillType(mockJob, mockCtx, uk) + tp, err = pickBackfillType(mockCtx, mockJob, uk) require.NoError(t, err) require.Equal(t, tp, model.ReorgTypeTxnMerge) mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone ingest.LitInitialized = true - tp, err = pickBackfillType(mockJob, mockCtx, uk) + tp, err = pickBackfillType(mockCtx, mockJob, uk) require.NoError(t, err) require.Equal(t, tp, model.ReorgTypeLitMerge) } diff --git a/ddl/index.go b/ddl/index.go index 486e059c253b4..811cc9c294604 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -623,7 +623,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo case model.StateNone: // none -> delete only var reorgTp model.ReorgType - reorgTp, err = pickBackfillType(job, w.ctx, indexInfo.Unique) + reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique) if err != nil { break } @@ -715,7 +715,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } // pickBackfillType determines which backfill process will be used. -func pickBackfillType(job *model.Job, ctx context.Context, unique bool) (model.ReorgType, error) { +func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.ReorgType, error) { if job.ReorgMeta.ReorgTp != model.ReorgTypeNone { // The backfill task has been started. // Don't change the backfill type. @@ -839,7 +839,7 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { var reorgTp model.ReorgType - reorgTp, err = pickBackfillType(job, w.ctx, indexInfo.Unique) + reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique) if err != nil { return false, ver, err } From 64fe65f1637afd8a72882a809e52515a22109932 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 19 Apr 2023 21:31:32 +0800 Subject: [PATCH 03/12] fix TestSetTIDBDiskQuota --- sessionctx/variable/sysvar_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 339f6e1d637a5..7811869a891cc 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -770,6 +770,7 @@ func TestSetTIDBDiskQuota(t *testing.T) { vars.GlobalVarsAccessor = mock diskQuota := GetSysVar(TiDBDDLDiskQuota) var ( + mb int64 = 1024 * 1024 gb int64 = 1024 * 1024 * 1024 pb int64 = 1024 * 1024 * 1024 * 1024 * 1024 err error @@ -778,12 +779,12 @@ func TestSetTIDBDiskQuota(t *testing.T) { // Default 100 GB require.Equal(t, diskQuota.Value, strconv.FormatInt(100*gb, 10)) - // MinValue is 100 GB, set to 50 Gb is not allowed - err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(50*gb, 10)) + // MinValue is 1 GB, set to 500 MB is not allowed + err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(500*mb, 10)) require.NoError(t, err) val, err = mock.GetGlobalSysVar(TiDBDDLDiskQuota) require.NoError(t, err) - require.Equal(t, strconv.FormatInt(100*gb, 10), val) + require.Equal(t, strconv.FormatInt(1*gb, 10), val) // Set to 100 GB err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(100*gb, 10)) From fffc1fb66aa6a771e5b6c0ae0f220c9f43a1a2c8 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 19 Apr 2023 22:03:46 +0800 Subject: [PATCH 04/12] fix check available --- ddl/index.go | 28 +++++++++++++++------------- ddl/ingest/backend_mgr.go | 10 +++++----- ddl/ingest/mock.go | 4 ++-- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 811cc9c294604..8e30ae74c3d82 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -721,12 +721,16 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.R // Don't change the backfill type. return job.ReorgMeta.ReorgTp, nil } - if IsEnableFastReorg() { - if ingest.LitInitialized { - err := ingest.LitBackCtxMgr.CheckAvailable() - if err != nil { - return model.ReorgTypeNone, err - } + if !IsEnableFastReorg() { + job.ReorgMeta.ReorgTp = model.ReorgTypeTxn + return model.ReorgTypeTxn, nil + } + if ingest.LitInitialized { + available, err := ingest.LitBackCtxMgr.CheckAvailable() + if err != nil { + return model.ReorgTypeNone, err + } + if available { err = cleanupSortPath(job.ID) if err != nil { return model.ReorgTypeNone, err @@ -738,14 +742,12 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.R job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge return model.ReorgTypeLitMerge, nil } - // The lightning environment is unavailable, but we can still use the txn-merge backfill. - logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", - zap.Bool("lightning env initialized", ingest.LitInitialized)) - job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge - return model.ReorgTypeTxnMerge, nil } - job.ReorgMeta.ReorgTp = model.ReorgTypeTxn - return model.ReorgTypeTxn, nil + // The lightning environment is unavailable, but we can still use the txn-merge backfill. + logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", + zap.Bool("lightning env initialized", false)) + job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge + return model.ReorgTypeTxnMerge, nil } // cleanupSortPath is used to clean up the temp data of the previous jobs. diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index ba64403fbb5b5..f60360335aacf 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -30,7 +30,7 @@ import ( // BackendCtxMgr is used to manage the backend context. type BackendCtxMgr interface { - CheckAvailable() error + CheckAvailable() (bool, error) Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error) Unregister(jobID int64) Load(jobID int64) (BackendCtx, bool) @@ -57,19 +57,19 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr { } // CheckAvailable checks if the ingest backfill is available. -func (m *litBackendCtxMgr) CheckAvailable() error { +func (m *litBackendCtxMgr) CheckAvailable() (bool, error) { // We only allow one task to use ingest at the same time, in order to limit the CPU usage. activeJobIDs := m.Keys() if len(activeJobIDs) > 0 { logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job", zap.Int64("job ID", activeJobIDs[0])) - return nil + return false, nil } if err := m.diskRoot.PreCheckUsage(); err != nil { logutil.BgLogger().Info("[ddl-ingest] ingest backfill is not available", zap.Error(err)) - return err + return false, err } - return nil + return true, nil } // Register creates a new backend and registers it to the backend context. diff --git a/ddl/ingest/mock.go b/ddl/ingest/mock.go index 5811ff70e6702..ad47ac02f122d 100644 --- a/ddl/ingest/mock.go +++ b/ddl/ingest/mock.go @@ -40,8 +40,8 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken } // CheckAvailable implements BackendCtxMgr.Available interface. -func (*MockBackendCtxMgr) CheckAvailable() error { - return nil +func (*MockBackendCtxMgr) CheckAvailable() (bool, error) { + return true, nil } // Register implements BackendCtxMgr.Register interface. From a09389f807828c56109278a6cde3c0673e631444 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 19 Apr 2023 22:16:48 +0800 Subject: [PATCH 05/12] fix TestAddIndexIngestCancel --- tests/realtikvtest/addindextest/integration_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index fd12f740b7445..5446b316ddb0a 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -395,8 +395,9 @@ func TestAddIndexIngestCancel(t *testing.T) { tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob) require.True(t, cancelled) dom.DDL().SetHook(defHook) - err := ingest.LitBackCtxMgr.CheckAvailable() + ok, err := ingest.LitBackCtxMgr.CheckAvailable() require.NoError(t, err) + require.True(t, ok) } func TestAddIndexSplitTableRanges(t *testing.T) { From ee40a9c742f5630acd3b6de5564e7eaa66aa8b39 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 20 Apr 2023 10:18:24 +0800 Subject: [PATCH 06/12] fix cannot read sort path --- ddl/index.go | 4 ++++ ddl/rollingback.go | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index 8e30ae74c3d82..5bffcac26451a 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -755,6 +755,10 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.R // there maybe some stale files in the sort path if TiDB is killed during the backfill process. func cleanupSortPath(currentJobID int64) error { sortPath := ingest.ConfigSortPath() + err := os.MkdirAll(sortPath, 0700) + if err != nil { + return errors.Trace(err) + } entries, err := os.ReadDir(sortPath) if err != nil { logutil.BgLogger().Warn("[ddl-ingest] cannot read sort path", zap.Error(err)) diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 379846092417c..596941b621c1a 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -88,7 +88,9 @@ func cleanupLocalIndexData(jobID int64) { sortPath := ingest.ConfigSortPath() f := filepath.Join(sortPath, ingest.EncodeBackendTag(jobID)) err := os.RemoveAll(f) - logutil.BgLogger().Error("[ddl-ingest] can not remove local index data", zap.Error(err)) + if err != nil { + logutil.BgLogger().Error("[ddl-ingest] can not remove local index data", zap.Error(err)) + } } // convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, From 0dfff1846178912b2a7ee0ce87ca52e3a12dde2d Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 20 Apr 2023 10:28:51 +0800 Subject: [PATCH 07/12] add ut for PreCheckUsage --- ddl/ingest/disk_root.go | 10 ++++++++-- ddl/ingest/mem_root_test.go | 6 ++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/ddl/ingest/disk_root.go b/ddl/ingest/disk_root.go index 3c50829d77cdf..3dd75fbb2dd2e 100644 --- a/ddl/ingest/disk_root.go +++ b/ddl/ingest/disk_root.go @@ -96,8 +96,14 @@ func (d *diskRootImpl) PreCheckUsage() error { if err != nil { return errors.Trace(err) } - if float64(sz.Available) < (1-capacityThreshold)*float64(sz.Capacity) { - return errors.Errorf("%s, please clean up the disk and retry", d.UsageInfo()) + if RiskOfDiskFull(sz.Available, sz.Capacity) { + sortPath := ConfigSortPath() + return errors.Errorf("sort path: %s, %s, please clean up the disk and retry", sortPath, d.UsageInfo()) } return nil } + +// RiskOfDiskFull checks if the disk has less than 10% space. +func RiskOfDiskFull(available, capacity uint64) bool { + return float64(available) < (1-capacityThreshold)*float64(capacity) +} diff --git a/ddl/ingest/mem_root_test.go b/ddl/ingest/mem_root_test.go index 43bc36d13455e..60ff34d1d2572 100644 --- a/ddl/ingest/mem_root_test.go +++ b/ddl/ingest/mem_root_test.go @@ -58,3 +58,9 @@ func TestMemoryRoot(t *testing.T) { memRoot.Consume(10) // Mix usage of tag and non-tag. require.Equal(t, int64(522), memRoot.CurrentUsage()) } + +func TestRiskOfDiskFull(t *testing.T) { + require.False(t, ingest.RiskOfDiskFull(11, 100)) + require.False(t, ingest.RiskOfDiskFull(10, 100)) + require.True(t, ingest.RiskOfDiskFull(9, 100)) +} From 025e1c6c6c44e88ec4c70bdb404cfbc10614ad13 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 20 Apr 2023 14:22:49 +0800 Subject: [PATCH 08/12] update bazel --- ddl/ingest/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel index e4a212954f2e8..77e885b39a5a9 100644 --- a/ddl/ingest/BUILD.bazel +++ b/ddl/ingest/BUILD.bazel @@ -58,7 +58,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 7, + shard_count = 8, deps = [ ":ingest", "//config", From 6c580fb34e229c2cdc1bc702294a859a4f5ebeee Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 20 Apr 2023 15:13:35 +0800 Subject: [PATCH 09/12] revert tidb_ddl_disk_quota --- sessionctx/variable/sysvar.go | 2 +- sessionctx/variable/sysvar_test.go | 6 +++--- sessionctx/variable/tidb_vars.go | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 56bb53ff85c53..8b2e86d86dc65 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -2185,7 +2185,7 @@ var defaultSysVars = []*SysVar{ return nil }}, // This system var is set disk quota for lightning sort dir, from 100 GB to 1PB. - {Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: MinTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) { + {Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: DefTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) { return strconv.FormatUint(DDLDiskQuota.Load(), 10), nil }, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota)) diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 7811869a891cc..aa741024a1eeb 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -779,12 +779,12 @@ func TestSetTIDBDiskQuota(t *testing.T) { // Default 100 GB require.Equal(t, diskQuota.Value, strconv.FormatInt(100*gb, 10)) - // MinValue is 1 GB, set to 500 MB is not allowed - err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(500*mb, 10)) + // MinValue is 100 GB, set to 50 Gb is not allowed + err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(50*gb, 10)) require.NoError(t, err) val, err = mock.GetGlobalSysVar(TiDBDDLDiskQuota) require.NoError(t, err) - require.Equal(t, strconv.FormatInt(1*gb, 10), val) + require.Equal(t, strconv.FormatInt(100*gb, 10), val) // Set to 100 GB err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(100*gb, 10)) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 06c7b69550e3b..49bc83900d6bf 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1199,7 +1199,6 @@ const ( DefMemoryUsageAlarmKeepRecordNum = 5 DefTiDBEnableFastReorg = true DefTiDBDDLDiskQuota = 100 * 1024 * 1024 * 1024 // 100GB - MinTiDBDDLDiskQuota = 1 * 1024 * 1024 * 1024 // 1GB DefExecutorConcurrency = 5 DefTiDBEnableNonPreparedPlanCache = false DefTiDBNonPreparedPlanCacheSize = 100 From a755259fb65a7943767591d2679d066d8db7c2bc Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 20 Apr 2023 15:54:03 +0800 Subject: [PATCH 10/12] remove unused variable --- sessionctx/variable/sysvar_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index aa741024a1eeb..339f6e1d637a5 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -770,7 +770,6 @@ func TestSetTIDBDiskQuota(t *testing.T) { vars.GlobalVarsAccessor = mock diskQuota := GetSysVar(TiDBDDLDiskQuota) var ( - mb int64 = 1024 * 1024 gb int64 = 1024 * 1024 * 1024 pb int64 = 1024 * 1024 * 1024 * 1024 * 1024 err error From 31db2ffb4b28df321dc8d9798e4665b67c485b56 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 20 Apr 2023 17:16:51 +0800 Subject: [PATCH 11/12] add start up check for disk quota --- ddl/ingest/backend_mgr.go | 4 ++++ ddl/ingest/disk_root.go | 16 ++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index f60360335aacf..89f00d6358946 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -53,6 +53,10 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr { LitMemRoot = mgr.memRoot LitDiskRoot = mgr.diskRoot LitDiskRoot.UpdateUsage() + err := LitDiskRoot.StartupCheck() + if err != nil { + logutil.BgLogger().Warn("[ddl-ingest] ingest backfill may not be available", zap.Error(err)) + } return mgr } diff --git a/ddl/ingest/disk_root.go b/ddl/ingest/disk_root.go index 3dd75fbb2dd2e..2c95c7820131a 100644 --- a/ddl/ingest/disk_root.go +++ b/ddl/ingest/disk_root.go @@ -31,6 +31,7 @@ type DiskRoot interface { ShouldImport() bool UsageInfo() string PreCheckUsage() error + StartupCheck() error } const capacityThreshold = 0.9 @@ -103,6 +104,21 @@ func (d *diskRootImpl) PreCheckUsage() error { return nil } +// StartupCheck implements DiskRoot interface. +func (d *diskRootImpl) StartupCheck() error { + sz, err := lcom.GetStorageSize(d.path) + if err != nil { + return errors.Trace(err) + } + quota := variable.DDLDiskQuota.Load() + if sz.Available < quota { + sortPath := ConfigSortPath() + return errors.Errorf("the available disk space(%d) in %s should be greater than @@tidb_ddl_disk_quota(%d)", + sz.Available, sortPath, quota) + } + return nil +} + // RiskOfDiskFull checks if the disk has less than 10% space. func RiskOfDiskFull(available, capacity uint64) bool { return float64(available) < (1-capacityThreshold)*float64(capacity) From c43a21deb08d2c53f135d60fcac731ffa60bf2e2 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 20 Apr 2023 17:57:51 +0800 Subject: [PATCH 12/12] restore the global variable after testing --- ddl/backfilling_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ddl/backfilling_test.go b/ddl/backfilling_test.go index 973561dfe8435..62e627630cad6 100644 --- a/ddl/backfilling_test.go +++ b/ddl/backfilling_test.go @@ -50,6 +50,14 @@ func TestDoneTaskKeeper(t *testing.T) { } func TestPickBackfillType(t *testing.T) { + originMgr := ingest.LitBackCtxMgr + originInit := ingest.LitInitialized + originFastReorg := variable.EnableFastReorg.Load() + defer func() { + ingest.LitBackCtxMgr = originMgr + ingest.LitInitialized = originInit + variable.EnableFastReorg.Store(originFastReorg) + }() mockMgr := ingest.NewMockBackendCtxMgr( func() sessionctx.Context { return nil