Skip to content

Commit

Permalink
ddl: check local file existence before resume checkpoint (#53072)
Browse files Browse the repository at this point in the history
close #53009
  • Loading branch information
lance6716 authored May 7, 2024
1 parent 65817ac commit b1b0995
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 206 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ func (s *indexWriteResultSink) flush() error {
})
for _, index := range s.indexes {
idxInfo := index.Meta()
_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal)
_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceFlushAndImport)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (b *ingestBackfillScheduler) close(force bool) {
b.writerPool.ReleaseAndWait()
}
if b.checkpointMgr != nil {
b.checkpointMgr.Sync()
b.checkpointMgr.Flush()
// Get the latest status after all workers are closed so that the result is more accurate.
cnt, nextKey := b.checkpointMgr.Status()
b.sendResult(&backfillResult{
Expand Down Expand Up @@ -585,7 +585,7 @@ func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool
cnt, nextKey := w.checkpointMgr.Status()
result.totalCount = cnt
result.nextKey = nextKey
result.err = w.checkpointMgr.UpdateCurrent(rs.ID, count)
result.err = w.checkpointMgr.UpdateWrittenKeys(rs.ID, count)
} else {
result.addedCount = count
result.scanCount = count
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *copReqSender) run() {
if !ok {
return
}
if p.checkpointMgr != nil && p.checkpointMgr.IsComplete(task.endKey) {
if p.checkpointMgr != nil && p.checkpointMgr.IsKeyProcessed(task.endKey) {
logutil.Logger(p.ctx).Info("checkpoint detected, skip a cop-request task",
zap.Int("task ID", task.id),
zap.String("task end key", hex.EncodeToString(task.endKey)))
Expand Down Expand Up @@ -163,7 +163,7 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
return err
}
if p.checkpointMgr != nil {
p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done)
p.checkpointMgr.UpdateTotalKeys(task.id, srcChk.NumRows(), done)
}
idxRs := IndexRecordChunk{ID: task.id, Chunk: srcChk, Done: done}
rate := float64(srcChk.MemoryUsage()) / 1024.0 / 1024.0 / time.Since(startTime).Seconds()
Expand Down
44 changes: 20 additions & 24 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ type BackendCtx interface {
type FlushMode byte

const (
// FlushModeAuto means flush when the memory table size reaches the threshold.
// FlushModeAuto means caller does not enforce any flush, the implementation can
// decide it.
FlushModeAuto FlushMode = iota
// FlushModeForceLocal means flush all data to local storage.
FlushModeForceLocal
// FlushModeForceLocalAndCheckDiskQuota means flush all data to local storage and check disk quota.
FlushModeForceLocalAndCheckDiskQuota
// FlushModeForceGlobal means import all data in local storage to global storage.
FlushModeForceGlobal
// FlushModeForceFlushNoImport means flush all data to local storage, but don't
// import the data to TiKV.
FlushModeForceFlushNoImport
// FlushModeForceFlushAndImport means flush and import all data to TiKV.
FlushModeForceFlushAndImport
)

// litBackendCtx store a backend info for add index reorg task.
Expand Down Expand Up @@ -183,7 +183,7 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported
return false, false, dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
}

shouldFlush, shouldImport := bc.ShouldSync(mode)
shouldFlush, shouldImport := bc.checkFlush(mode)
if !shouldFlush {
return false, false, nil
}
Expand Down Expand Up @@ -268,28 +268,24 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
// ForceSyncFlagForTest is a flag to force sync only for test.
var ForceSyncFlagForTest = false

func (bc *litBackendCtx) ShouldSync(mode FlushMode) (shouldFlush bool, shouldImport bool) {
if mode == FlushModeForceGlobal || ForceSyncFlagForTest {
func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImport bool) {
if mode == FlushModeForceFlushAndImport || ForceSyncFlagForTest {
return true, true
}
if mode == FlushModeForceLocal {
if mode == FlushModeForceFlushNoImport {
return true, false
}
bc.diskRoot.UpdateUsage()
shouldImport = bc.diskRoot.ShouldImport()
if mode == FlushModeForceLocalAndCheckDiskQuota {
shouldFlush = true
} else {
interval := bc.updateInterval
// This failpoint will be manually set through HTTP status port.
failpoint.Inject("mockSyncIntervalMs", func(val failpoint.Value) {
if v, ok := val.(int); ok {
interval = time.Duration(v) * time.Millisecond
}
})
shouldFlush = shouldImport ||
time.Since(bc.timeOfLastFlush.Load()) >= interval
}
interval := bc.updateInterval
// This failpoint will be manually set through HTTP status port.
failpoint.Inject("mockSyncIntervalMs", func(val failpoint.Value) {
if v, ok := val.(int); ok {
interval = time.Duration(v) * time.Millisecond
}
})
shouldFlush = shouldImport ||
time.Since(bc.timeOfLastFlush.Load()) >= interval
return shouldFlush, shouldImport
}

Expand Down
Loading

0 comments on commit b1b0995

Please sign in to comment.