From bb9db75367d7bdc94faa369410121a21b6cfaa98 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Thu, 28 Mar 2024 13:51:59 +0800 Subject: [PATCH 1/3] try fix --- pkg/ddl/backfilling_operators.go | 19 ++++++------ pkg/ddl/backfilling_scheduler.go | 7 ++++- pkg/ddl/index.go | 5 ++++ pkg/ddl/ingest/checkpoint.go | 18 ++--------- pkg/ddl/ingest/flush.go | 30 +++++++++++++++++++ pkg/sessionctx/variable/sysvar.go | 5 ++-- .../addindextest/add_index_test.go | 3 ++ 7 files changed, 59 insertions(+), 28 deletions(-) create mode 100644 pkg/ddl/ingest/flush.go diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index bec5126a4cfc6..ab5249790c7bc 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -801,18 +801,19 @@ func (s *indexWriteResultSink) flush() error { failpoint.Inject("mockFlushError", func(_ failpoint.Value) { failpoint.Return(errors.New("mock flush error")) }) + indexIDs := make([]int64, 0, len(s.indexes)) for _, index := range s.indexes { - idxInfo := index.Meta() - _, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal) - if err != nil { - if common.ErrFoundDuplicateKeys.Equal(err) { - err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta()) - return err - } - logutil.Logger(s.ctx).Error("flush error", - zap.String("category", "ddl"), zap.Error(err)) + indexIDs = append(indexIDs, index.Meta().ID) + } + _, _, failedID, err := ingest.TryFlushAllIndexes(s.backendCtx, ingest.FlushModeForceGlobal, indexIDs) + if err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + err = convertToKeyExistsErr(err, s.indexes[failedID].Meta(), s.tbl.Meta()) return err } + logutil.Logger(s.ctx).Error("flush error", + zap.String("category", "ddl"), zap.Error(err)) + return err } return nil } diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index f9f09529e1f45..22f8f0b589ec6 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -464,7 +464,7 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordCh worker, err := newAddIndexIngestWorker( b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID, reorgInfo.SchemaName, indexIDs, b.writerMaxID, - b.copReqSenderPool, sessCtx, b.checkpointMgr, b.distribute) + b.copReqSenderPool, sessCtx, bcCtx, b.checkpointMgr, b.distribute) if err != nil { // Return an error only if it is the first worker. if b.writerMaxID == 0 { @@ -574,6 +574,11 @@ func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool result.addedCount = count result.scanCount = count result.nextKey = nextKey + // needs to flush and import to avoid too much use of disk. + flushed, _, _, err := ingest.TryFlushAllIndexes(w.backendCtx, ingest.FlushModeAuto, w.indexIDs) + if !flushed || err != nil { + result.err = err + } } if ResultCounterForTest != nil && result.err == nil { ResultCounterForTest.Add(1) diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 68dbf356ba521..d19903c83ace3 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1701,8 +1701,10 @@ type addIndexIngestWorker struct { tbl table.PhysicalTable indexes []table.Index + indexIDs []int64 writers []ingest.Writer copReqSenderPool *copReqSenderPool + backendCtx ingest.BackendCtx checkpointMgr *ingest.CheckpointManager resultCh chan *backfillResult @@ -1722,6 +1724,7 @@ func newAddIndexIngestWorker( writerID int, copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context, + backendCtx ingest.BackendCtx, checkpointMgr *ingest.CheckpointManager, distribute bool, ) (*addIndexIngestWorker, error) { @@ -1745,11 +1748,13 @@ func newAddIndexIngestWorker( metricCounter: metrics.BackfillTotalCounter.WithLabelValues( metrics.GenerateReorgLabel("add_idx_rate", schemaName, t.Meta().Name.O)), tbl: t, + indexIDs: indexIDs, indexes: indexes, writers: writers, copReqSenderPool: copReqSenderPool, resultCh: resultCh, jobID: jobID, + backendCtx: backendCtx, checkpointMgr: checkpointMgr, distribute: distribute, }, nil diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 9be82954b39c5..67afd8dfd3a57 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -174,7 +174,7 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error { cp.currentKeys += added s.mu.Unlock() - flushed, imported, err := s.tryFlushAllIndexes(FlushModeAuto) + flushed, imported, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeAuto, s.indexIDs) if !flushed || err != nil { return err } @@ -194,20 +194,6 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error { return nil } -func (s *CheckpointManager) tryFlushAllIndexes(mode FlushMode) (flushed, imported bool, err error) { - allFlushed := true - allImported := true - for _, idxID := range s.indexIDs { - flushed, imported, err := s.flushCtrl.Flush(idxID, mode) - if err != nil { - return false, false, err - } - allFlushed = allFlushed && flushed - allImported = allImported && imported - } - return allFlushed, allImported, nil -} - func (s *CheckpointManager) progressLocalSyncMinKey() { for { cp := s.checkpoints[s.minTaskIDSynced+1] @@ -232,7 +218,7 @@ func (s *CheckpointManager) Close() { // Sync syncs the checkpoint. func (s *CheckpointManager) Sync() { - _, _, err := s.tryFlushAllIndexes(FlushModeForceLocal) + _, _, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeForceLocal, s.indexIDs) if err != nil { logutil.BgLogger().Warn("flush local engine failed", zap.String("category", "ddl-ingest"), zap.Error(err)) } diff --git a/pkg/ddl/ingest/flush.go b/pkg/ddl/ingest/flush.go new file mode 100644 index 0000000000000..4d407f8733eb1 --- /dev/null +++ b/pkg/ddl/ingest/flush.go @@ -0,0 +1,30 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +// TryFlushAllIndexes tries to flush and import all indexes. +func TryFlushAllIndexes(flushCtrl FlushController, mode FlushMode, indexIDs []int64) (flushed, imported bool, failedIdxID int64, err error) { + allFlushed := true + allImported := true + for _, idxID := range indexIDs { + flushed, imported, err := flushCtrl.Flush(idxID, mode) + if err != nil { + return false, false, idxID, err + } + allFlushed = allFlushed && flushed + allImported = allImported && imported + } + return allFlushed, allImported, -1, nil +} diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 190a275b8ff0c..972e9ebc44854 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -2458,10 +2458,11 @@ var defaultSysVars = []*SysVar{ return nil }}, // This system var is set disk quota for lightning sort dir, from 100 GB to 1PB. - {Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: DefTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) { + {Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: 0, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) { return strconv.FormatUint(DDLDiskQuota.Load(), 10), nil }, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { - DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota)) + DDLDiskQuota.Store(TidbOptUint64("10", DefTiDBDDLDiskQuota)) + logutil.BgLogger().Info("ywq test disk quota", zap.Any("val", DDLDiskQuota.Load())) return nil }}, // can't assign validate function here. Because validation function will run after GetGlobal function diff --git a/tests/realtikvtest/addindextest/add_index_test.go b/tests/realtikvtest/addindextest/add_index_test.go index bd234dbf4a429..19affbe9cd119 100644 --- a/tests/realtikvtest/addindextest/add_index_test.go +++ b/tests/realtikvtest/addindextest/add_index_test.go @@ -129,6 +129,9 @@ func TestIssue51162(t *testing.T) { PRIMARY KEY (col_47,col_46(2)) /*T![clustered_index] CLUSTERED */ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;`) + tk.MustExec(`set global tidb_ddl_disk_quota="10";`) + tk.MustExec(`set global tidb_enable_dist_task=on;`) + tk.MustExec(`INSERT INTO tl VALUES ('[\"1\"]',0,'1','[1]','Wxup81','1','10:14:20');`) From 25193206224eab554b896d23cce29f5d8409c4cc Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Thu, 28 Mar 2024 13:53:55 +0800 Subject: [PATCH 2/3] u build --- pkg/ddl/ingest/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index 09c2273707a8c..08509c81b52a3 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "engine.go", "engine_mgr.go", "env.go", + "flush.go", "mem_root.go", "message.go", "mock.go", From 31e6056012b2f078dfec4ff157b36ad3e3441138 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Thu, 28 Mar 2024 15:18:09 +0800 Subject: [PATCH 3/3] fix test --- pkg/ddl/backfilling_operators.go | 19 +++++++++---------- .../addindextest/add_index_test.go | 4 ++++ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index ab5249790c7bc..bec5126a4cfc6 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -801,19 +801,18 @@ func (s *indexWriteResultSink) flush() error { failpoint.Inject("mockFlushError", func(_ failpoint.Value) { failpoint.Return(errors.New("mock flush error")) }) - indexIDs := make([]int64, 0, len(s.indexes)) for _, index := range s.indexes { - indexIDs = append(indexIDs, index.Meta().ID) - } - _, _, failedID, err := ingest.TryFlushAllIndexes(s.backendCtx, ingest.FlushModeForceGlobal, indexIDs) - if err != nil { - if common.ErrFoundDuplicateKeys.Equal(err) { - err = convertToKeyExistsErr(err, s.indexes[failedID].Meta(), s.tbl.Meta()) + idxInfo := index.Meta() + _, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal) + if err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta()) + return err + } + logutil.Logger(s.ctx).Error("flush error", + zap.String("category", "ddl"), zap.Error(err)) return err } - logutil.Logger(s.ctx).Error("flush error", - zap.String("category", "ddl"), zap.Error(err)) - return err } return nil } diff --git a/tests/realtikvtest/addindextest/add_index_test.go b/tests/realtikvtest/addindextest/add_index_test.go index 19affbe9cd119..74639e7d6449a 100644 --- a/tests/realtikvtest/addindextest/add_index_test.go +++ b/tests/realtikvtest/addindextest/add_index_test.go @@ -137,6 +137,7 @@ func TestIssue51162(t *testing.T) { tk.MustExec("alter table tl add index idx_16(`col_48`,(cast(`col_45` as signed array)),`col_46`(5));") tk.MustExec("admin check table tl") + tk.MustExec(`set global tidb_enable_dist_task=off;`) } func TestAddUKWithSmallIntHandles(t *testing.T) { @@ -145,8 +146,11 @@ func TestAddUKWithSmallIntHandles(t *testing.T) { tk.MustExec("drop database if exists small;") tk.MustExec("create database small;") tk.MustExec("use small;") + tk.MustExec(`set global tidb_enable_dist_task=on;`) tk.MustExec(`set global tidb_ddl_enable_fast_reorg=1;`) tk.MustExec("create table t (a bigint, b int, primary key (a) clustered)") tk.MustExec("insert into t values (-9223372036854775808, 1),(-9223372036854775807, 1)") tk.MustContainErrMsg("alter table t add unique index uk(b)", "Duplicate entry '1' for key 't.uk'") + tk.MustExec(`set global tidb_enable_dist_task=off;`) + tk.MustContainErrMsg("alter table t add unique index uk(b)", "Duplicate entry '1' for key 't.uk'") }