diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go index ecaec7e5d6b03..1c581ae737a21 100644 --- a/ddl/backfilling_scheduler.go +++ b/ddl/backfilling_scheduler.go @@ -296,11 +296,9 @@ func (b *ingestBackfillScheduler) setupWorkers() error { return errors.Trace(errors.New("cannot get lightning backend")) } b.backendCtx = bc - if !b.distribute { - mgr, err := ingest.NewCheckpointManager(b.ctx, bc, b.sessPool, job.ID, b.reorgInfo.currElement.ID) - if err != nil { - return errors.Trace(err) - } + mgr := bc.GetCheckpointManager() + if mgr != nil { + mgr.Reset(b.tbl.GetPhysicalID()) b.checkpointMgr = mgr } copReqSenderPool, err := b.createCopReqSenderPool() @@ -333,7 +331,6 @@ func (b *ingestBackfillScheduler) close(force bool) { b.writerPool.ReleaseAndWait() } if b.checkpointMgr != nil { - b.checkpointMgr.Close() // Get the latest status after all workers are closed so that the result is more accurate. cnt, nextKey := b.checkpointMgr.Status() b.resultCh <- &backfillResult{ diff --git a/ddl/index.go b/ddl/index.go index f188f02b82488..41d3f6c8e7f7b 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -746,7 +746,7 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.R } // The lightning environment is unavailable, but we can still use the txn-merge backfill. logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", - zap.Bool("lightning env initialized", false)) + zap.Bool("lightning env initialized", ingest.LitInitialized)) job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge return model.ReorgTypeTxnMerge, nil } @@ -922,6 +922,13 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, err = tryFallbackToTxnMerge(job, err) return false, ver, errors.Trace(err) } + if bc.GetCheckpointManager() == nil { + mgr, err := ingest.NewCheckpointManager(w.ctx, bc, w.sessPool, job.ID, indexInfo.ID) + if err != nil { + logutil.BgLogger().Warn("[ddl-ingest] create checkpoint manager failed", zap.Error(err)) + } + bc.AttachCheckpointManager(mgr) + } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { ingest.LitBackCtxMgr.Unregister(job.ID) diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel index 77e885b39a5a9..da2afc87b2ac7 100644 --- a/ddl/ingest/BUILD.bazel +++ b/ddl/ingest/BUILD.bazel @@ -58,7 +58,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 8, + shard_count = 9, deps = [ ":ingest", "//config", diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go index 5d018336a6a72..ec654e58f6be6 100644 --- a/ddl/ingest/backend.go +++ b/ddl/ingest/backend.go @@ -41,11 +41,26 @@ type BackendCtx interface { CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error FinishImport(indexID int64, unique bool, tbl table.Table) error ResetWorkers(jobID, indexID int64) - Flush(indexID int64, force bool) (flushed, imported bool, err error) + Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) Done() bool SetDone() + + AttachCheckpointManager(*CheckpointManager) + GetCheckpointManager() *CheckpointManager } +// FlushMode is used to control how to flush. +type FlushMode byte + +const ( + // FlushModeAuto means flush when the memory table size reaches the threshold. + FlushModeAuto FlushMode = iota + // FlushModeForceLocal means flush all data to local storage. + FlushModeForceLocal + // FlushModeForceGlobal means import all data in local storage to global storage. + FlushModeForceGlobal +) + // litBackendCtx store a backend info for add index reorg task. type litBackendCtx struct { generic.SyncMap[int64, *engineInfo] @@ -61,6 +76,7 @@ type litBackendCtx struct { timeOfLastFlush atomicutil.Time updateInterval time.Duration + checkpointMgr *CheckpointManager } // CollectRemoteDuplicateRows collects duplicate rows from remote TiKV. @@ -126,14 +142,14 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl } // Flush checks the disk quota and imports the current key-values in engine to the storage. -func (bc *litBackendCtx) Flush(indexID int64, force bool) (flushed, imported bool, err error) { +func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) { ei, exist := bc.Load(indexID) if !exist { logutil.BgLogger().Error(LitErrGetEngineFail, zap.Int64("index ID", indexID)) return false, false, dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found") } - shouldFlush, shouldImport := bc.ShouldSync(force) + shouldFlush, shouldImport := bc.ShouldSync(mode) if !shouldFlush { return false, false, nil } @@ -164,10 +180,13 @@ func (bc *litBackendCtx) Flush(indexID int64, force bool) (flushed, imported boo return true, true, nil } -func (bc *litBackendCtx) ShouldSync(force bool) (shouldFlush bool, shouldImport bool) { - if force { +func (bc *litBackendCtx) ShouldSync(mode FlushMode) (shouldFlush bool, shouldImport bool) { + if mode == FlushModeForceGlobal { return true, true } + if mode == FlushModeForceLocal { + return true, false + } bc.diskRoot.UpdateUsage() shouldImport = bc.diskRoot.ShouldImport() shouldFlush = shouldImport || @@ -184,3 +203,13 @@ func (bc *litBackendCtx) Done() bool { func (bc *litBackendCtx) SetDone() { bc.done = true } + +// AttachCheckpointManager attaches a checkpoint manager to the backend context. +func (bc *litBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) { + bc.checkpointMgr = mgr +} + +// GetCheckpointManager returns the checkpoint manager attached to the backend context. +func (bc *litBackendCtx) GetCheckpointManager() *CheckpointManager { + return bc.checkpointMgr +} diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index 89f00d6358946..6a5631d66990b 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -150,6 +150,9 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) { } bc.unregisterAll(jobID) bc.backend.Close() + if bc.checkpointMgr != nil { + bc.checkpointMgr.Close() + } m.memRoot.Release(StructSizeBackendCtx) m.Delete(jobID) m.memRoot.ReleaseWithTag(EncodeBackendTag(jobID)) diff --git a/ddl/ingest/checkpoint.go b/ddl/ingest/checkpoint.go index 02c8b6c47ad26..3d6f5c40f125b 100644 --- a/ddl/ingest/checkpoint.go +++ b/ddl/ingest/checkpoint.go @@ -35,11 +35,12 @@ import ( // CheckpointManager is a checkpoint manager implementation that used by non-distributed reorganization. type CheckpointManager struct { - ctx context.Context - flushCtrl FlushController - sessPool *sess.Pool - jobID int64 - indexID int64 + ctx context.Context + flushCtrl FlushController + sessPool *sess.Pool + jobID int64 + indexID int64 + physicalID int64 // Derived and unchanged after the initialization. instanceAddr string @@ -75,7 +76,7 @@ type TaskCheckpoint struct { // FlushController is an interface to control the flush of the checkpoint. type FlushController interface { - Flush(indexID int64, force bool) (flushed, imported bool, err error) + Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) } // NewCheckpointManager creates a new checkpoint manager. @@ -104,6 +105,8 @@ func NewCheckpointManager(ctx context.Context, flushCtrl FlushController, cm.updateCheckpointLoop() cm.updaterWg.Done() }() + logutil.BgLogger().Info("[ddl-ingest] create checkpoint manager", + zap.Int64("jobID", jobID), zap.Int64("indexID", indexID)) return cm, nil } @@ -163,7 +166,7 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error { cp.currentKeys += added s.mu.Unlock() - flushed, imported, err := s.flushCtrl.Flush(s.indexID, false) + flushed, imported, err := s.flushCtrl.Flush(s.indexID, FlushModeAuto) if !flushed || err != nil { return err } @@ -193,11 +196,8 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error { func (s *CheckpointManager) Close() { s.updaterExitCh <- struct{}{} s.updaterWg.Wait() - for id, cp := range s.checkpoints { - s.localCnt += cp.totalKeys - s.globalCnt = s.localCnt - delete(s.checkpoints, id) - } + logutil.BgLogger().Info("[ddl-ingest] close checkpoint manager", + zap.Int64("jobID", s.jobID), zap.Int64("indexID", s.indexID)) } // Sync syncs the checkpoint. @@ -208,6 +208,28 @@ func (s *CheckpointManager) Sync() { wg.Wait() } +// Reset resets the checkpoint manager between two partitions. +func (s *CheckpointManager) Reset(newPhysicalID int64) { + if s.physicalID != 0 { + _, _, err := s.flushCtrl.Flush(s.indexID, FlushModeForceLocal) + if err != nil { + logutil.BgLogger().Warn("[ddl-ingest] flush local engine failed", zap.Error(err)) + } + } + s.mu.Lock() + if s.physicalID != newPhysicalID { + s.minKeySyncLocal = nil + s.minKeySyncGlobal = nil + s.minTaskIDSynced = 0 + s.physicalID = newPhysicalID + for id, cp := range s.checkpoints { + s.localCnt += cp.totalKeys + delete(s.checkpoints, id) + } + } + s.mu.Unlock() +} + // JobReorgMeta is the metadata for a reorg job. type JobReorgMeta struct { Checkpoint *ReorgCheckpoint `json:"reorg_checkpoint"` @@ -220,6 +242,7 @@ type ReorgCheckpoint struct { GlobalSyncKey kv.Key `json:"global_sync_key"` GlobalKeyCount int `json:"global_key_count"` InstanceAddr string `json:"instance_addr"` + PhysicalID int64 `json:"physical_id"` Version int64 `json:"version"` } @@ -260,6 +283,7 @@ func (s *CheckpointManager) resumeCheckpoint() error { s.localDataIsValid = true s.minKeySyncLocal = cp.LocalSyncKey s.localCnt = cp.LocalKeyCount + s.physicalID = cp.PhysicalID } logutil.BgLogger().Info("[ddl-ingest] resume checkpoint", zap.Int64("job ID", s.jobID), zap.Int64("index ID", s.indexID), @@ -303,6 +327,7 @@ func (s *CheckpointManager) updateCheckpoint() error { LocalKeyCount: currentLocalCnt, GlobalKeyCount: currentGlobalCnt, InstanceAddr: s.instanceAddr, + PhysicalID: s.physicalID, Version: JobCheckpointVersionCurrent, } rawReorgMeta, err := json.Marshal(JobReorgMeta{Checkpoint: cp}) diff --git a/ddl/ingest/checkpoint_test.go b/ddl/ingest/checkpoint_test.go index 71a554c30d49e..1e3eff1df5940 100644 --- a/ddl/ingest/checkpoint_test.go +++ b/ddl/ingest/checkpoint_test.go @@ -154,6 +154,6 @@ type dummyFlushCtrl struct { imported bool } -func (d *dummyFlushCtrl) Flush(_ int64, _ bool) (bool, bool, error) { +func (d *dummyFlushCtrl) Flush(_ int64, _ ingest.FlushMode) (bool, bool, error) { return true, d.imported, nil } diff --git a/ddl/ingest/integration_test.go b/ddl/ingest/integration_test.go index d30eaabc393a8..4800ff62e0226 100644 --- a/ddl/ingest/integration_test.go +++ b/ddl/ingest/integration_test.go @@ -114,3 +114,24 @@ func TestIngestCopSenderErr(t *testing.T) { jobTp := rows[0][3].(string) require.True(t, strings.Contains(jobTp, "txn-merge"), jobTp) } + +func TestIngestPartitionRowCount(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + defer injectMockBackendMgr(t, store)() + + tk.MustExec(`create table t (a int, b int, c int as (b+10), d int as (b+c), + primary key (a) clustered) partition by range (a) ( + partition p0 values less than (1), + partition p1 values less than (2), + partition p2 values less than MAXVALUE);`) + tk.MustExec("insert into t (a, b) values (0, 0), (1, 1), (2, 2);") + tk.MustExec("alter table t add index idx(d);") + rows := tk.MustQuery("admin show ddl jobs 1;").Rows() + require.Len(t, rows, 1) + //nolint: forcetypeassert + rowCount := rows[0][7].(string) + require.Equal(t, "3", rowCount) + tk.MustExec("admin check table t;") +} diff --git a/ddl/ingest/mock.go b/ddl/ingest/mock.go index 6a510018969c8..4b13181108ddd 100644 --- a/ddl/ingest/mock.go +++ b/ddl/ingest/mock.go @@ -80,8 +80,9 @@ func (m *MockBackendCtxMgr) Load(jobID int64) (BackendCtx, bool) { // MockBackendCtx is a mock backend context. type MockBackendCtx struct { - sessCtx sessionctx.Context - mu sync.Mutex + sessCtx sessionctx.Context + mu sync.Mutex + checkpointMgr *CheckpointManager } // Register implements BackendCtx.Register interface. @@ -112,7 +113,7 @@ func (*MockBackendCtx) ResetWorkers(_, _ int64) { } // Flush implements BackendCtx.Flush interface. -func (*MockBackendCtx) Flush(_ int64, _ bool) (flushed bool, imported bool, err error) { +func (*MockBackendCtx) Flush(_ int64, _ FlushMode) (flushed bool, imported bool, err error) { return false, false, nil } @@ -125,6 +126,16 @@ func (*MockBackendCtx) Done() bool { func (*MockBackendCtx) SetDone() { } +// AttachCheckpointManager attaches a checkpoint manager to the backend context. +func (m *MockBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) { + m.checkpointMgr = mgr +} + +// GetCheckpointManager returns the checkpoint manager attached to the backend context. +func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager { + return m.checkpointMgr +} + // MockEngineInfo is a mock engine info. type MockEngineInfo struct { sessCtx sessionctx.Context diff --git a/ddl/job_table.go b/ddl/job_table.go index c3e8a1c72117f..33722301a18c6 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -17,6 +17,7 @@ package ddl import ( "bytes" "context" + "encoding/json" "fmt" "strconv" "strings" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/ddl/ingest" sess "github.com/pingcap/tidb/ddl/internal/session" "github.com/pingcap/tidb/ddl/syncer" "github.com/pingcap/tidb/kv" @@ -595,7 +597,7 @@ func updateDDLJob2Table(se *sess.Session, job *model.Job, updateRawArgs bool) er // getDDLReorgHandle gets DDL reorg handle. func getDDLReorgHandle(se *sess.Session, job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) { - sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id from mysql.tidb_ddl_reorg where job_id = %d", job.ID) + sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id, reorg_meta from mysql.tidb_ddl_reorg where job_id = %d", job.ID) ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job.Type)) rows, err := se.Execute(ctx, sql, "get_handle") if err != nil { @@ -613,6 +615,20 @@ func getDDLReorgHandle(se *sess.Session, job *model.Job) (element *meta.Element, startKey = rows[0].GetBytes(2) endKey = rows[0].GetBytes(3) physicalTableID = rows[0].GetInt64(4) + if !rows[0].IsNull(5) { + rawReorgMeta := rows[0].GetBytes(5) + var reorgMeta ingest.JobReorgMeta + err = json.Unmarshal(rawReorgMeta, &reorgMeta) + if err != nil { + return nil, nil, nil, 0, errors.Trace(err) + } + if cp := reorgMeta.Checkpoint; cp != nil { + logutil.BgLogger().Info("[ddl-ingest] resume physical table ID from checkpoint", + zap.Int64("job ID", job.ID), zap.Int64("old physical ID", physicalTableID), + zap.Int64("checkpoint physical ID", cp.PhysicalID)) + physicalTableID = cp.PhysicalID + } + } return } diff --git a/ddl/scheduler.go b/ddl/scheduler.go index 330cb36b97b45..aec4704107a51 100644 --- a/ddl/scheduler.go +++ b/ddl/scheduler.go @@ -199,7 +199,7 @@ func (b *backfillSchedulerHandle) SplitSubtask(ctx context.Context, subtask []by } ingestScheduler.close(false) - _, _, err = b.bc.Flush(b.index.ID, true) + _, _, err = b.bc.Flush(b.index.ID, ingest.FlushModeForceGlobal) if err != nil { if common.ErrFoundDuplicateKeys.Equal(err) { err = convertToKeyExistsErr(err, b.index, b.ptbl.Meta())