diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index a766142977243..16a760379b035 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -145,7 +145,7 @@ func NewAddIndexIngestPipeline( srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt) - ingestOp := NewIndexIngestOperator(ctx, copCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta) + ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta) sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, totalRowCount, metricCounter) operator.Compose[TableScanTask](srcOp, scanOp) @@ -518,16 +518,18 @@ func NewWriteExternalStoreOperator( writers = append(writers, writer) } - return &indexIngestWorker{ - ctx: ctx, - tbl: tbl, - indexes: indexes, - copCtx: copCtx, - se: nil, - sessPool: sessPool, - writers: writers, - srcChunkPool: srcChunkPool, - reorgMeta: reorgMeta, + return &indexIngestExternalWorker{ + indexIngestBaseWorker: indexIngestBaseWorker{ + ctx: ctx, + tbl: tbl, + indexes: indexes, + copCtx: copCtx, + se: nil, + sessPool: sessPool, + writers: writers, + srcChunkPool: srcChunkPool, + reorgMeta: reorgMeta, + }, } }) return &WriteExternalStoreOperator{ @@ -552,6 +554,7 @@ type IndexIngestOperator struct { func NewIndexIngestOperator( ctx *OperatorCtx, copCtx copr.CopContext, + backendCtx ingest.BackendCtx, sessPool opSessPool, tbl table.PhysicalTable, indexes []table.Index, @@ -577,16 +580,25 @@ func NewIndexIngestOperator( writers = append(writers, writer) } - return &indexIngestWorker{ - ctx: ctx, - tbl: tbl, - indexes: indexes, - copCtx: copCtx, - se: nil, - sessPool: sessPool, - writers: writers, - srcChunkPool: srcChunkPool, - reorgMeta: reorgMeta, + indexIDs := make([]int64, len(indexes)) + for i := 0; i < len(indexes); i++ { + indexIDs[i] = 
indexes[i].Meta().ID + } + return &indexIngestLocalWorker{ + indexIngestBaseWorker: indexIngestBaseWorker{ + ctx: ctx, + tbl: tbl, + indexes: indexes, + copCtx: copCtx, + + se: nil, + sessPool: sessPool, + writers: writers, + srcChunkPool: srcChunkPool, + reorgMeta: reorgMeta, + }, + indexIDs: indexIDs, + backendCtx: backendCtx, } }) return &IndexIngestOperator{ @@ -594,7 +606,47 @@ func NewIndexIngestOperator( } } -type indexIngestWorker struct { +type indexIngestExternalWorker struct { + indexIngestBaseWorker +} + +func (w *indexIngestExternalWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { + defer tidbutil.Recover(metrics.LblAddIndex, "indexIngestExternalWorkerRecover", func() { + w.ctx.onError(errors.New("met panic in indexIngestExternalWorker")) + }, false) + defer func() { + if rs.Chunk != nil { + w.srcChunkPool <- rs.Chunk + } + }() + w.indexIngestBaseWorker.HandleTask(rs, send) +} + +type indexIngestLocalWorker struct { + indexIngestBaseWorker + indexIDs []int64 + backendCtx ingest.BackendCtx +} + +func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { + defer tidbutil.Recover(metrics.LblAddIndex, "indexIngestLocalWorkerRecover", func() { + w.ctx.onError(errors.New("met panic in indexIngestLocalWorker")) + }, false) + defer func() { + if rs.Chunk != nil { + w.srcChunkPool <- rs.Chunk + } + }() + w.indexIngestBaseWorker.HandleTask(rs, send) + // needs to flush and import to avoid too much use of disk. 
+ _, _, _, err := ingest.TryFlushAllIndexes(w.backendCtx, ingest.FlushModeAuto, w.indexIDs) + if err != nil { + w.ctx.onError(err) + return + } +} + +type indexIngestBaseWorker struct { ctx *OperatorCtx tbl table.PhysicalTable @@ -610,16 +662,7 @@ type indexIngestWorker struct { srcChunkPool chan *chunk.Chunk } -func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { - defer func() { - if rs.Chunk != nil { - w.srcChunkPool <- rs.Chunk - } - }() - defer tidbutil.Recover(metrics.LblAddIndex, "handleIndexIngtestTaskWithRecover", func() { - w.ctx.onError(errors.New("met panic in indexIngestWorker")) - }, false) - +func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { failpoint.Inject("injectPanicForIndexIngest", func() { panic("mock panic") }) @@ -628,7 +671,7 @@ func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWrite ID: rs.ID, } w.initSessCtx() - count, nextKey, err := w.WriteLocal(&rs) + count, nextKey, err := w.WriteChunk(&rs) if err != nil { w.ctx.onError(err) return @@ -646,7 +689,7 @@ func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWrite send(result) } -func (w *indexIngestWorker) initSessCtx() { +func (w *indexIngestBaseWorker) initSessCtx() { if w.se == nil { sessCtx, err := w.sessPool.Get() if err != nil { @@ -665,7 +708,7 @@ func (w *indexIngestWorker) initSessCtx() { } } -func (w *indexIngestWorker) Close() { +func (w *indexIngestBaseWorker) Close() { for _, writer := range w.writers { err := writer.Close(w.ctx) if err != nil { @@ -678,8 +721,8 @@ func (w *indexIngestWorker) Close() { } } -// WriteLocal will write index records to lightning engine. -func (w *indexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) { +// WriteChunk will write index records to lightning engine. 
+func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
 	failpoint.Inject("mockWriteLocalError", func(_ failpoint.Value) {
 		failpoint.Return(0, nil, errors.New("mock write local error"))
 	})
diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel
index 979b3050bcd93..c8a9f76646349 100644
--- a/pkg/ddl/ingest/BUILD.bazel
+++ b/pkg/ddl/ingest/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
         "engine.go",
         "engine_mgr.go",
         "env.go",
+        "flush.go",
         "mem_root.go",
         "message.go",
         "mock.go",
diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go
index ff5b9d5b58106..4332f6cbe5118 100644
--- a/pkg/ddl/ingest/checkpoint.go
+++ b/pkg/ddl/ingest/checkpoint.go
@@ -174,7 +174,7 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error {
 	cp.currentKeys += added
 	s.mu.Unlock()
 
-	flushed, imported, err := s.flushCtrl.Flush(s.indexID, FlushModeAuto)
+	flushed, imported, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeAuto, s.indexIDs)
 	if !flushed || err != nil {
 		return err
 	}
@@ -218,7 +218,7 @@ func (s *CheckpointManager) Close() {
 // Sync syncs the checkpoint.
 func (s *CheckpointManager) Sync() {
-	_, _, err := s.flushCtrl.Flush(s.indexID, FlushModeForceLocal)
+	_, _, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeForceLocal, s.indexIDs)
 	if err != nil {
 		logutil.BgLogger().Warn("flush local engine failed",
 			zap.String("category", "ddl-ingest"), zap.Error(err))
 	}
diff --git a/pkg/ddl/ingest/flush.go b/pkg/ddl/ingest/flush.go
new file mode 100644
index 0000000000000..4d407f8733eb1
--- /dev/null
+++ b/pkg/ddl/ingest/flush.go
@@ -0,0 +1,30 @@
+// Copyright 2024 PingCAP, Inc.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +// TryFlushAllIndexes tries to flush and import all indexes. +func TryFlushAllIndexes(flushCtrl FlushController, mode FlushMode, indexIDs []int64) (flushed, imported bool, failedIdxID int64, err error) { + allFlushed := true + allImported := true + for _, idxID := range indexIDs { + flushed, imported, err := flushCtrl.Flush(idxID, mode) + if err != nil { + return false, false, idxID, err + } + allFlushed = allFlushed && flushed + allImported = allImported && imported + } + return allFlushed, allImported, -1, nil +} diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index cbb26852b039c..3150624b1b32a 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -135,13 +135,14 @@ func TestBackfillOperators(t *testing.T) { srcChkPool := make(chan *chunk.Chunk, regionCnt*2) pTbl := tbl.(table.PhysicalTable) index := tables.NewIndex(pTbl.GetPhysicalID(), tbl.Meta(), idxInfo) + mockBackendCtx := &ingest.MockBackendCtx{} mockEngine := ingest.NewMockEngineInfo(nil) mockEngine.SetHook(onWrite) src := newTestSource(chunkResults...) 
reorgMeta := ddl.NewDDLReorgMeta(tk.Session()) ingestOp := ddl.NewIndexIngestOperator( - opCtx, copCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, srcChkPool, 3, reorgMeta) + opCtx, copCtx, mockBackendCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, srcChkPool, 3, reorgMeta) sink := newTestSink[ddl.IndexWriteResult]() operator.Compose[ddl.IndexRecordChunk](src, ingestOp)