ddl: correct ingest row count for partition tables (#43585) (#43606)
close #43586
ti-chi-bot authored May 15, 2023
1 parent 2fe8a35 commit 8612338
Showing 11 changed files with 140 additions and 31 deletions.
9 changes: 3 additions & 6 deletions ddl/backfilling_scheduler.go
@@ -296,11 +296,9 @@ func (b *ingestBackfillScheduler) setupWorkers() error {
return errors.Trace(errors.New("cannot get lightning backend"))
}
b.backendCtx = bc
if !b.distribute {
mgr, err := ingest.NewCheckpointManager(b.ctx, bc, b.sessPool, job.ID, b.reorgInfo.currElement.ID)
if err != nil {
return errors.Trace(err)
}
mgr := bc.GetCheckpointManager()
if mgr != nil {
mgr.Reset(b.tbl.GetPhysicalID())
b.checkpointMgr = mgr
}
copReqSenderPool, err := b.createCopReqSenderPool()
@@ -333,7 +331,6 @@ func (b *ingestBackfillScheduler) close(force bool) {
b.writerPool.ReleaseAndWait()
}
if b.checkpointMgr != nil {
b.checkpointMgr.Close()
// Get the latest status after all workers are closed so that the result is more accurate.
cnt, nextKey := b.checkpointMgr.Status()
b.resultCh <- &backfillResult{
9 changes: 8 additions & 1 deletion ddl/index.go
@@ -746,7 +746,7 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.R
}
// The lightning environment is unavailable, but we can still use the txn-merge backfill.
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process",
zap.Bool("lightning env initialized", false))
zap.Bool("lightning env initialized", ingest.LitInitialized))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
return model.ReorgTypeTxnMerge, nil
}
@@ -922,6 +922,13 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
err = tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
}
if bc.GetCheckpointManager() == nil {
mgr, err := ingest.NewCheckpointManager(w.ctx, bc, w.sessPool, job.ID, indexInfo.ID)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] create checkpoint manager failed", zap.Error(err))
}
bc.AttachCheckpointManager(mgr)
}
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
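With this change the checkpoint manager is created at most once per job and attached to the backend context, so the ingest scheduler (previous file) can fetch it with `GetCheckpointManager` and re-target it per partition instead of building and closing its own. The sketch below shows that create-once/attach pattern with simplified stand-in types; it is not the TiDB implementation — the real constructor also takes the DDL context, session pool, job ID and index ID, and the backend context manager closes the attached manager in `Unregister`.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the real ddl/ingest types.
type checkpointManager struct{ jobID, indexID int64 }

func newCheckpointManager(jobID, indexID int64) (*checkpointManager, error) {
	if jobID == 0 {
		return nil, errors.New("invalid job ID")
	}
	return &checkpointManager{jobID: jobID, indexID: indexID}, nil
}

type backendCtx struct{ mgr *checkpointManager }

func (bc *backendCtx) GetCheckpointManager() *checkpointManager { return bc.mgr }

func (bc *backendCtx) AttachCheckpointManager(m *checkpointManager) { bc.mgr = m }

// ensureCheckpointManager creates and attaches a manager only if nothing is
// attached yet, so later partitions (and job retries) reuse the same instance.
func ensureCheckpointManager(bc *backendCtx, jobID, indexID int64) {
	if bc.GetCheckpointManager() != nil {
		return
	}
	mgr, err := newCheckpointManager(jobID, indexID)
	if err != nil {
		// The real code only logs a warning here; this sketch simply skips attaching.
		fmt.Println("create checkpoint manager failed:", err)
		return
	}
	bc.AttachCheckpointManager(mgr)
}

func main() {
	bc := &backendCtx{}
	ensureCheckpointManager(bc, 1, 2)
	ensureCheckpointManager(bc, 1, 2) // no-op: the manager is already attached
	fmt.Printf("attached: %+v\n", bc.GetCheckpointManager())
}
```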
2 changes: 1 addition & 1 deletion ddl/ingest/BUILD.bazel
@@ -58,7 +58,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 8,
shard_count = 9,
deps = [
":ingest",
"//config",
39 changes: 34 additions & 5 deletions ddl/ingest/backend.go
@@ -41,11 +41,26 @@ type BackendCtx interface {
CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error
FinishImport(indexID int64, unique bool, tbl table.Table) error
ResetWorkers(jobID, indexID int64)
Flush(indexID int64, force bool) (flushed, imported bool, err error)
Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error)
Done() bool
SetDone()

AttachCheckpointManager(*CheckpointManager)
GetCheckpointManager() *CheckpointManager
}

// FlushMode is used to control how to flush.
type FlushMode byte

const (
// FlushModeAuto means flush when the memory table size reaches the threshold.
FlushModeAuto FlushMode = iota
// FlushModeForceLocal means flush all data to local storage.
FlushModeForceLocal
// FlushModeForceGlobal means import all data in local storage to global storage.
FlushModeForceGlobal
)

// litBackendCtx stores the backend info for an add index reorg task.
type litBackendCtx struct {
generic.SyncMap[int64, *engineInfo]
@@ -61,6 +76,7 @@ type litBackendCtx struct {

timeOfLastFlush atomicutil.Time
updateInterval time.Duration
checkpointMgr *CheckpointManager
}

// CollectRemoteDuplicateRows collects duplicate rows from remote TiKV.
@@ -126,14 +142,14 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl
}

// Flush checks the disk quota and imports the current key-values in engine to the storage.
func (bc *litBackendCtx) Flush(indexID int64, force bool) (flushed, imported bool, err error) {
func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) {
ei, exist := bc.Load(indexID)
if !exist {
logutil.BgLogger().Error(LitErrGetEngineFail, zap.Int64("index ID", indexID))
return false, false, dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
}

shouldFlush, shouldImport := bc.ShouldSync(force)
shouldFlush, shouldImport := bc.ShouldSync(mode)
if !shouldFlush {
return false, false, nil
}
@@ -164,10 +180,13 @@ func (bc *litBackendCtx) Flush(indexID int64, force bool) (flushed, imported boo
return true, true, nil
}

func (bc *litBackendCtx) ShouldSync(force bool) (shouldFlush bool, shouldImport bool) {
if force {
func (bc *litBackendCtx) ShouldSync(mode FlushMode) (shouldFlush bool, shouldImport bool) {
if mode == FlushModeForceGlobal {
return true, true
}
if mode == FlushModeForceLocal {
return true, false
}
bc.diskRoot.UpdateUsage()
shouldImport = bc.diskRoot.ShouldImport()
shouldFlush = shouldImport ||
@@ -184,3 +203,13 @@ func (bc *litBackendCtx) Done() bool {
func (bc *litBackendCtx) SetDone() {
bc.done = true
}

// AttachCheckpointManager attaches a checkpoint manager to the backend context.
func (bc *litBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) {
bc.checkpointMgr = mgr
}

// GetCheckpointManager returns the checkpoint manager attached to the backend context.
func (bc *litBackendCtx) GetCheckpointManager() *CheckpointManager {
return bc.checkpointMgr
}
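The `force bool` argument of `Flush` is replaced by a three-valued `FlushMode`: auto (threshold-driven), force-local (flush into the local engine without importing, used when switching partitions), and force-global (flush and import into TiKV, used at the end of a distributed subtask). A minimal, self-contained sketch of the resulting decision logic follows; the `overThreshold` flag is only a stand-in for the real disk-quota and flush-interval checks in `ShouldSync`, not the actual implementation.

```go
package main

import "fmt"

// FlushMode mirrors the three modes defined in the diff above.
type FlushMode byte

const (
	// FlushModeAuto flushes only when the buffered data hits a threshold.
	FlushModeAuto FlushMode = iota
	// FlushModeForceLocal always flushes into the local engine but never imports.
	FlushModeForceLocal
	// FlushModeForceGlobal flushes and imports everything into global (TiKV) storage.
	FlushModeForceGlobal
)

// shouldSync reproduces the mode-based part of the decision; overThreshold
// stands in for the disk-usage and interval checks of the real code.
func shouldSync(mode FlushMode, overThreshold bool) (shouldFlush, shouldImport bool) {
	switch mode {
	case FlushModeForceGlobal:
		return true, true
	case FlushModeForceLocal:
		return true, false
	default: // FlushModeAuto
		return overThreshold, overThreshold
	}
}

func main() {
	for _, mode := range []FlushMode{FlushModeAuto, FlushModeForceLocal, FlushModeForceGlobal} {
		flush, imp := shouldSync(mode, false)
		fmt.Printf("mode=%d flush=%v import=%v\n", mode, flush, imp)
	}
}
```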
3 changes: 3 additions & 0 deletions ddl/ingest/backend_mgr.go
@@ -150,6 +150,9 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) {
}
bc.unregisterAll(jobID)
bc.backend.Close()
if bc.checkpointMgr != nil {
bc.checkpointMgr.Close()
}
m.memRoot.Release(StructSizeBackendCtx)
m.Delete(jobID)
m.memRoot.ReleaseWithTag(EncodeBackendTag(jobID))
49 changes: 37 additions & 12 deletions ddl/ingest/checkpoint.go
@@ -35,11 +35,12 @@ import (

// CheckpointManager is a checkpoint manager implementation that is used by non-distributed reorganization.
type CheckpointManager struct {
ctx context.Context
flushCtrl FlushController
sessPool *sess.Pool
jobID int64
indexID int64
ctx context.Context
flushCtrl FlushController
sessPool *sess.Pool
jobID int64
indexID int64
physicalID int64

// Derived and unchanged after the initialization.
instanceAddr string
@@ -75,7 +76,7 @@ type TaskCheckpoint struct {

// FlushController is an interface to control the flush of the checkpoint.
type FlushController interface {
Flush(indexID int64, force bool) (flushed, imported bool, err error)
Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error)
}

// NewCheckpointManager creates a new checkpoint manager.
@@ -104,6 +105,8 @@ func NewCheckpointManager(ctx context.Context, flushCtrl FlushController,
cm.updateCheckpointLoop()
cm.updaterWg.Done()
}()
logutil.BgLogger().Info("[ddl-ingest] create checkpoint manager",
zap.Int64("jobID", jobID), zap.Int64("indexID", indexID))
return cm, nil
}

@@ -163,7 +166,7 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error {
cp.currentKeys += added
s.mu.Unlock()

flushed, imported, err := s.flushCtrl.Flush(s.indexID, false)
flushed, imported, err := s.flushCtrl.Flush(s.indexID, FlushModeAuto)
if !flushed || err != nil {
return err
}
@@ -193,11 +196,8 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error {
func (s *CheckpointManager) Close() {
s.updaterExitCh <- struct{}{}
s.updaterWg.Wait()
for id, cp := range s.checkpoints {
s.localCnt += cp.totalKeys
s.globalCnt = s.localCnt
delete(s.checkpoints, id)
}
logutil.BgLogger().Info("[ddl-ingest] close checkpoint manager",
zap.Int64("jobID", s.jobID), zap.Int64("indexID", s.indexID))
}

// Sync syncs the checkpoint.
@@ -208,6 +208,28 @@ func (s *CheckpointManager) Sync() {
wg.Wait()
}

// Reset resets the checkpoint manager between two partitions.
func (s *CheckpointManager) Reset(newPhysicalID int64) {
if s.physicalID != 0 {
_, _, err := s.flushCtrl.Flush(s.indexID, FlushModeForceLocal)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] flush local engine failed", zap.Error(err))
}
}
s.mu.Lock()
if s.physicalID != newPhysicalID {
s.minKeySyncLocal = nil
s.minKeySyncGlobal = nil
s.minTaskIDSynced = 0
s.physicalID = newPhysicalID
for id, cp := range s.checkpoints {
s.localCnt += cp.totalKeys
delete(s.checkpoints, id)
}
}
s.mu.Unlock()
}

// JobReorgMeta is the metadata for a reorg job.
type JobReorgMeta struct {
Checkpoint *ReorgCheckpoint `json:"reorg_checkpoint"`
@@ -220,6 +242,7 @@ type ReorgCheckpoint struct {
GlobalSyncKey kv.Key `json:"global_sync_key"`
GlobalKeyCount int `json:"global_key_count"`
InstanceAddr string `json:"instance_addr"`
PhysicalID int64 `json:"physical_id"`
Version int64 `json:"version"`
}

@@ -260,6 +283,7 @@ func (s *CheckpointManager) resumeCheckpoint() error {
s.localDataIsValid = true
s.minKeySyncLocal = cp.LocalSyncKey
s.localCnt = cp.LocalKeyCount
s.physicalID = cp.PhysicalID
}
logutil.BgLogger().Info("[ddl-ingest] resume checkpoint",
zap.Int64("job ID", s.jobID), zap.Int64("index ID", s.indexID),
@@ -303,6 +327,7 @@ func (s *CheckpointManager) updateCheckpoint() error {
LocalKeyCount: currentLocalCnt,
GlobalKeyCount: currentGlobalCnt,
InstanceAddr: s.instanceAddr,
PhysicalID: s.physicalID,
Version: JobCheckpointVersionCurrent,
}
rawReorgMeta, err := json.Marshal(JobReorgMeta{Checkpoint: cp})
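The row-count fix itself lives in `Reset`: when the backfill moves on to the next partition, the finished partition's per-task key counts are folded into the running local count instead of being thrown away along with a per-partition manager. Below is a minimal model of just that bookkeeping, with made-up counts; it omits keys, flushing, and persistence, and the trailing `Reset` stands in for the final flush that settles the last partition in the real code.

```go
package main

import "fmt"

// taskCheckpoint and checkpointManager model only the counting side of the
// real types.
type taskCheckpoint struct{ totalKeys int }

type checkpointManager struct {
	physicalID  int64
	localCnt    int
	checkpoints map[int]*taskCheckpoint
}

// Reset folds the finished partition's per-task counts into the running total
// before re-targeting the manager (the real Reset also forces a local flush first).
func (s *checkpointManager) Reset(newPhysicalID int64) {
	if s.physicalID == newPhysicalID {
		return
	}
	s.physicalID = newPhysicalID
	for id, cp := range s.checkpoints {
		s.localCnt += cp.totalKeys
		delete(s.checkpoints, id)
	}
}

func main() {
	s := &checkpointManager{checkpoints: map[int]*taskCheckpoint{}}
	// Three partitions with one row each, as in the integration test below.
	for pid := int64(1); pid <= 3; pid++ {
		s.Reset(pid)
		s.checkpoints[int(pid)] = &taskCheckpoint{totalKeys: 1}
	}
	s.Reset(4) // stand-in for the final flush of the last partition
	fmt.Println("total row count:", s.localCnt) // 3, not just the last partition's 1
}
```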
2 changes: 1 addition & 1 deletion ddl/ingest/checkpoint_test.go
@@ -154,6 +154,6 @@ type dummyFlushCtrl struct {
imported bool
}

func (d *dummyFlushCtrl) Flush(_ int64, _ bool) (bool, bool, error) {
func (d *dummyFlushCtrl) Flush(_ int64, _ ingest.FlushMode) (bool, bool, error) {
return true, d.imported, nil
}
21 changes: 21 additions & 0 deletions ddl/ingest/integration_test.go
@@ -114,3 +114,24 @@ func TestIngestCopSenderErr(t *testing.T) {
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "txn-merge"), jobTp)
}

func TestIngestPartitionRowCount(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer injectMockBackendMgr(t, store)()

tk.MustExec(`create table t (a int, b int, c int as (b+10), d int as (b+c),
primary key (a) clustered) partition by range (a) (
partition p0 values less than (1),
partition p1 values less than (2),
partition p2 values less than MAXVALUE);`)
tk.MustExec("insert into t (a, b) values (0, 0), (1, 1), (2, 2);")
tk.MustExec("alter table t add index idx(d);")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
require.Len(t, rows, 1)
//nolint: forcetypeassert
rowCount := rows[0][7].(string)
require.Equal(t, "3", rowCount)
tk.MustExec("admin check table t;")
}
17 changes: 14 additions & 3 deletions ddl/ingest/mock.go
@@ -80,8 +80,9 @@ func (m *MockBackendCtxMgr) Load(jobID int64) (BackendCtx, bool) {

// MockBackendCtx is a mock backend context.
type MockBackendCtx struct {
sessCtx sessionctx.Context
mu sync.Mutex
sessCtx sessionctx.Context
mu sync.Mutex
checkpointMgr *CheckpointManager
}

// Register implements BackendCtx.Register interface.
@@ -112,7 +113,7 @@ func (*MockBackendCtx) ResetWorkers(_, _ int64) {
}

// Flush implements BackendCtx.Flush interface.
func (*MockBackendCtx) Flush(_ int64, _ bool) (flushed bool, imported bool, err error) {
func (*MockBackendCtx) Flush(_ int64, _ FlushMode) (flushed bool, imported bool, err error) {
return false, false, nil
}

@@ -125,6 +126,16 @@ func (*MockBackendCtx) Done() bool {
func (*MockBackendCtx) SetDone() {
}

// AttachCheckpointManager attaches a checkpoint manager to the backend context.
func (m *MockBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) {
m.checkpointMgr = mgr
}

// GetCheckpointManager returns the checkpoint manager attached to the backend context.
func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager {
return m.checkpointMgr
}

// MockEngineInfo is a mock engine info.
type MockEngineInfo struct {
sessCtx sessionctx.Context
18 changes: 17 additions & 1 deletion ddl/job_table.go
@@ -17,6 +17,7 @@ package ddl
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
@@ -25,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/ddl/ingest"
sess "github.com/pingcap/tidb/ddl/internal/session"
"github.com/pingcap/tidb/ddl/syncer"
"github.com/pingcap/tidb/kv"
@@ -595,7 +597,7 @@ func updateDDLJob2Table(se *sess.Session, job *model.Job, updateRawArgs bool) er

// getDDLReorgHandle gets DDL reorg handle.
func getDDLReorgHandle(se *sess.Session, job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) {
sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id, reorg_meta from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job.Type))
rows, err := se.Execute(ctx, sql, "get_handle")
if err != nil {
@@ -613,6 +615,20 @@ func getDDLReorgHandle(se *sess.Session, job *model.Job) (element *meta.Element,
startKey = rows[0].GetBytes(2)
endKey = rows[0].GetBytes(3)
physicalTableID = rows[0].GetInt64(4)
if !rows[0].IsNull(5) {
rawReorgMeta := rows[0].GetBytes(5)
var reorgMeta ingest.JobReorgMeta
err = json.Unmarshal(rawReorgMeta, &reorgMeta)
if err != nil {
return nil, nil, nil, 0, errors.Trace(err)
}
if cp := reorgMeta.Checkpoint; cp != nil {
logutil.BgLogger().Info("[ddl-ingest] resume physical table ID from checkpoint",
zap.Int64("job ID", job.ID), zap.Int64("old physical ID", physicalTableID),
zap.Int64("checkpoint physical ID", cp.PhysicalID))
physicalTableID = cp.PhysicalID
}
}
return
}

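`getDDLReorgHandle` now also decodes the `reorg_meta` column and, if it carries a checkpoint, resumes from the `physical_id` recorded there rather than the value stored in `mysql.tidb_ddl_reorg`. The round-trip below sketches the persisted JSON with the new field; the struct tags not visible in this diff (e.g. `local_key_count`), the omitted key fields, and the concrete version number are assumptions for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed copies of the checkpoint structs from ddl/ingest/checkpoint.go.
type ReorgCheckpoint struct {
	LocalKeyCount  int    `json:"local_key_count"`
	GlobalKeyCount int    `json:"global_key_count"`
	InstanceAddr   string `json:"instance_addr"`
	PhysicalID     int64  `json:"physical_id"`
	Version        int64  `json:"version"`
}

type JobReorgMeta struct {
	Checkpoint *ReorgCheckpoint `json:"reorg_checkpoint"`
}

func main() {
	meta := JobReorgMeta{Checkpoint: &ReorgCheckpoint{
		LocalKeyCount:  3,
		GlobalKeyCount: 3,
		InstanceAddr:   "127.0.0.1:4000",
		PhysicalID:     102, // partition being backfilled when the checkpoint was taken
		Version:        1,   // assumed value of JobCheckpointVersionCurrent
	}}
	raw, err := json.Marshal(meta)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	// On restart, getDDLReorgHandle decodes reorg_meta and, when a checkpoint is
	// present, prefers its physical_id over the physical_id column of tidb_ddl_reorg.
	var restored JobReorgMeta
	if err := json.Unmarshal(raw, &restored); err != nil {
		panic(err)
	}
	fmt.Println("resume backfill at physical table", restored.Checkpoint.PhysicalID)
}
```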
2 changes: 1 addition & 1 deletion ddl/scheduler.go
@@ -199,7 +199,7 @@ func (b *backfillSchedulerHandle) SplitSubtask(ctx context.Context, subtask []by
}
ingestScheduler.close(false)

_, _, err = b.bc.Flush(b.index.ID, true)
_, _, err = b.bc.Flush(b.index.ID, ingest.FlushModeForceGlobal)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, b.index, b.ptbl.Meta())