diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index c7dd92508c40e..4c1cfe7d60bce 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -627,7 +627,14 @@ type indexIngestExternalWorker struct { } func (w *indexIngestExternalWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { - defer w.indexIngestBaseWorker.recoverFromPanic(rs) + defer tidbutil.Recover(metrics.LblAddIndex, "indexIngestExternalWorkerRecover", func() { + w.ctx.onError(errors.New("met panic in indexIngestExternalWorker")) + }, false) + defer func() { + if rs.Chunk != nil { + w.srcChunkPool <- rs.Chunk + } + }() w.indexIngestBaseWorker.HandleTask(rs, send) } @@ -638,7 +645,14 @@ type indexIngestLocalWorker struct { } func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { - defer w.indexIngestBaseWorker.recoverFromPanic(rs) + defer tidbutil.Recover(metrics.LblAddIndex, "indexIngestLocalWorkerRecover", func() { + w.ctx.onError(errors.New("met panic in indexIngestLocalWorker")) + }, false) + defer func() { + if rs.Chunk != nil { + w.srcChunkPool <- rs.Chunk + } + }() w.indexIngestBaseWorker.HandleTask(rs, send) // needs to flush and import to avoid too much use of disk. _, _, _, err := ingest.TryFlushAllIndexes(w.backendCtx, ingest.FlushModeAuto, w.indexIDs) @@ -664,15 +678,6 @@ type indexIngestBaseWorker struct { srcChunkPool chan *chunk.Chunk } -func (w *indexIngestBaseWorker) recoverFromPanic(rs IndexRecordChunk) { - tidbutil.Recover(metrics.LblAddIndex, "handleIndexIngtestTaskWithRecover", func() { - w.ctx.onError(errors.New("met panic in indexIngestBaseWorker")) - }, false) - if rs.Chunk != nil { - w.srcChunkPool <- rs.Chunk - } -} - func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { failpoint.Inject("injectPanicForIndexIngest", func() { panic("mock panic")