Skip to content

Commit

Permalink
ddl: flush index records for local distributed sort (#52641)
Browse files Browse the repository at this point in the history
close #52640
  • Loading branch information
tangenta committed May 10, 2024
1 parent f907137 commit 0442470
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 54 deletions.
117 changes: 80 additions & 37 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func NewAddIndexIngestPipeline(

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
ingestOp := NewIndexIngestOperator(ctx, copCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta)
ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta)
sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, totalRowCount, metricCounter)

operator.Compose[TableScanTask](srcOp, scanOp)
Expand Down Expand Up @@ -518,16 +518,18 @@ func NewWriteExternalStoreOperator(
writers = append(writers, writer)
}

return &indexIngestWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
copCtx: copCtx,
se: nil,
sessPool: sessPool,
writers: writers,
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
return &indexIngestExternalWorker{
indexIngestBaseWorker: indexIngestBaseWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
copCtx: copCtx,
se: nil,
sessPool: sessPool,
writers: writers,
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
},
}
})
return &WriteExternalStoreOperator{
Expand All @@ -552,6 +554,7 @@ type IndexIngestOperator struct {
func NewIndexIngestOperator(
ctx *OperatorCtx,
copCtx copr.CopContext,
backendCtx ingest.BackendCtx,
sessPool opSessPool,
tbl table.PhysicalTable,
indexes []table.Index,
Expand All @@ -577,24 +580,73 @@ func NewIndexIngestOperator(
writers = append(writers, writer)
}

return &indexIngestWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
copCtx: copCtx,
se: nil,
sessPool: sessPool,
writers: writers,
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
indexIDs := make([]int64, len(indexes))
for i := 0; i < len(indexes); i++ {
indexIDs[i] = indexes[i].Meta().ID
}
return &indexIngestLocalWorker{
indexIngestBaseWorker: indexIngestBaseWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
copCtx: copCtx,

se: nil,
sessPool: sessPool,
writers: writers,
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
},
indexIDs: indexIDs,
backendCtx: backendCtx,
}
})
return &IndexIngestOperator{
AsyncOperator: operator.NewAsyncOperator[IndexRecordChunk, IndexWriteResult](ctx, pool),
}
}

type indexIngestWorker struct {
// indexIngestExternalWorker is an index-ingest worker that embeds
// indexIngestBaseWorker and adds nothing but its own HandleTask wrapper
// (panic reporting and returning the source chunk to srcChunkPool).
type indexIngestExternalWorker struct {
indexIngestBaseWorker
}

// HandleTask runs one index record chunk through the embedded base worker.
//
// Two deferred actions wrap the call:
//   - tidbutil.Recover is deferred with a callback that reports
//     "met panic in indexIngestExternalWorker" to the operator context.
//   - If rs carries a chunk, that chunk is sent back to srcChunkPool.
//
// rs is the chunk of index records to handle; send is forwarded unchanged to
// the base worker.
func (w *indexIngestExternalWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) {
	reportPanic := func() {
		w.ctx.onError(errors.New("met panic in indexIngestExternalWorker"))
	}
	// Registered first, therefore executed last: the chunk is recycled before
	// the recover helper runs.
	defer tidbutil.Recover(metrics.LblAddIndex, "indexIngestExternalWorkerRecover", reportPanic, false)

	recycleChunk := func() {
		if rs.Chunk == nil {
			return
		}
		w.srcChunkPool <- rs.Chunk
	}
	defer recycleChunk()

	w.indexIngestBaseWorker.HandleTask(rs, send)
}

// indexIngestLocalWorker is an index-ingest worker that embeds
// indexIngestBaseWorker and, in its HandleTask, additionally tries to flush
// the indexes listed in indexIDs after each task.
type indexIngestLocalWorker struct {
indexIngestBaseWorker
// indexIDs holds the index IDs handed to ingest.TryFlushAllIndexes.
indexIDs []int64
// backendCtx is passed to ingest.TryFlushAllIndexes as the flush controller.
backendCtx ingest.BackendCtx
}

// HandleTask runs one index record chunk through the embedded base worker and
// then tries to flush every index in w.indexIDs.
//
// Two deferred actions wrap the call:
//   - tidbutil.Recover is deferred with a callback that reports
//     "met panic in indexIngestLocalWorker" to the operator context.
//   - If rs carries a chunk, that chunk is sent back to srcChunkPool.
//
// A flush error is reported through the operator context; nothing is returned
// to the caller. The flush is attempted whether or not the base worker
// reported an error for this task.
func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) {
	reportPanic := func() {
		w.ctx.onError(errors.New("met panic in indexIngestLocalWorker"))
	}
	// Registered first, therefore executed last: the chunk is recycled before
	// the recover helper runs.
	defer tidbutil.Recover(metrics.LblAddIndex, "indexIngestLocalWorkerRecover", reportPanic, false)

	recycleChunk := func() {
		if rs.Chunk == nil {
			return
		}
		w.srcChunkPool <- rs.Chunk
	}
	defer recycleChunk()

	w.indexIngestBaseWorker.HandleTask(rs, send)

	// Flush and import after every task so local disk usage does not grow
	// too much.
	if _, _, _, flushErr := ingest.TryFlushAllIndexes(w.backendCtx, ingest.FlushModeAuto, w.indexIDs); flushErr != nil {
		w.ctx.onError(flushErr)
	}
}

type indexIngestBaseWorker struct {
ctx *OperatorCtx

tbl table.PhysicalTable
Expand All @@ -610,16 +662,7 @@ type indexIngestWorker struct {
srcChunkPool chan *chunk.Chunk
}

func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) {
defer func() {
if rs.Chunk != nil {
w.srcChunkPool <- rs.Chunk
}
}()
defer tidbutil.Recover(metrics.LblAddIndex, "handleIndexIngtestTaskWithRecover", func() {
w.ctx.onError(errors.New("met panic in indexIngestWorker"))
}, false)

func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) {
failpoint.Inject("injectPanicForIndexIngest", func() {
panic("mock panic")
})
Expand All @@ -628,7 +671,7 @@ func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWrite
ID: rs.ID,
}
w.initSessCtx()
count, nextKey, err := w.WriteLocal(&rs)
count, nextKey, err := w.WriteChunk(&rs)
if err != nil {
w.ctx.onError(err)
return
Expand All @@ -646,7 +689,7 @@ func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWrite
send(result)
}

func (w *indexIngestWorker) initSessCtx() {
func (w *indexIngestBaseWorker) initSessCtx() {
if w.se == nil {
sessCtx, err := w.sessPool.Get()
if err != nil {
Expand All @@ -665,7 +708,7 @@ func (w *indexIngestWorker) initSessCtx() {
}
}

func (w *indexIngestWorker) Close() {
func (w *indexIngestBaseWorker) Close() {
for _, writer := range w.writers {
err := writer.Close(w.ctx)
if err != nil {
Expand All @@ -678,8 +721,8 @@ func (w *indexIngestWorker) Close() {
}
}

// WriteLocal will write index records to lightning engine.
func (w *indexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
// WriteChunk will write index records to lightning engine.
func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
failpoint.Inject("mockWriteLocalError", func(_ failpoint.Value) {
failpoint.Return(0, nil, errors.New("mock write local error"))
})
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"engine.go",
"engine_mgr.go",
"env.go",
"flush.go",
"mem_root.go",
"message.go",
"mock.go",
Expand Down
18 changes: 2 additions & 16 deletions pkg/ddl/ingest/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error {
cp.currentKeys += added
s.mu.Unlock()

flushed, imported, err := s.tryFlushAllIndexes(FlushModeAuto)
flushed, imported, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeAuto, s.indexIDs)
if !flushed || err != nil {
return err
}
Expand All @@ -194,20 +194,6 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error {
return nil
}

// tryFlushAllIndexes calls s.flushCtrl.Flush with mode for every ID in
// s.indexIDs, in order.
//
// flushed and imported are true only when Flush reported true for every index
// (both are true when there are no indexes). The first Flush error stops the
// loop and is returned together with false, false.
func (s *CheckpointManager) tryFlushAllIndexes(mode FlushMode) (flushed, imported bool, err error) {
	flushed, imported = true, true
	for _, id := range s.indexIDs {
		didFlush, didImport, flushErr := s.flushCtrl.Flush(id, mode)
		if flushErr != nil {
			return false, false, flushErr
		}
		if !didFlush {
			flushed = false
		}
		if !didImport {
			imported = false
		}
	}
	return flushed, imported, nil
}

func (s *CheckpointManager) progressLocalSyncMinKey() {
for {
cp := s.checkpoints[s.minTaskIDSynced+1]
Expand All @@ -232,7 +218,7 @@ func (s *CheckpointManager) Close() {

// Sync syncs the checkpoint.
func (s *CheckpointManager) Sync() {
_, _, err := s.tryFlushAllIndexes(FlushModeForceLocal)
_, _, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeForceLocal, s.indexIDs)
if err != nil {
logutil.BgLogger().Warn("flush local engine failed", zap.String("category", "ddl-ingest"), zap.Error(err))
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/ddl/ingest/flush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingest

// TryFlushAllIndexes calls flushCtrl.Flush with mode for every ID in
// indexIDs, in order.
//
// flushed and imported are true only when Flush reported true for every index
// (both are true when indexIDs is empty). Every index is visited even if an
// earlier one reported false. The first Flush error stops the loop: the
// function then returns false, false, the ID of the index whose flush failed,
// and that error. On success failedIdxID is -1.
func TryFlushAllIndexes(flushCtrl FlushController, mode FlushMode, indexIDs []int64) (flushed, imported bool, failedIdxID int64, err error) {
	flushed, imported = true, true
	for _, id := range indexIDs {
		didFlush, didImport, flushErr := flushCtrl.Flush(id, mode)
		if flushErr != nil {
			return false, false, id, flushErr
		}
		if !didFlush {
			flushed = false
		}
		if !didImport {
			imported = false
		}
	}
	return flushed, imported, -1, nil
}
3 changes: 2 additions & 1 deletion tests/realtikvtest/addindextest3/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,14 @@ func TestBackfillOperators(t *testing.T) {
srcChkPool := make(chan *chunk.Chunk, regionCnt*2)
pTbl := tbl.(table.PhysicalTable)
index := tables.NewIndex(pTbl.GetPhysicalID(), tbl.Meta(), idxInfo)
mockBackendCtx := &ingest.MockBackendCtx{}
mockEngine := ingest.NewMockEngineInfo(nil)
mockEngine.SetHook(onWrite)

src := newTestSource(chunkResults...)
reorgMeta := ddl.NewDDLReorgMeta(tk.Session())
ingestOp := ddl.NewIndexIngestOperator(
opCtx, copCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, srcChkPool, 3, reorgMeta)
opCtx, copCtx, mockBackendCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, srcChkPool, 3, reorgMeta)
sink := newTestSink[ddl.IndexWriteResult]()

operator.Compose[ddl.IndexRecordChunk](src, ingestOp)
Expand Down

0 comments on commit 0442470

Please sign in to comment.