ddl: support distributed adding index for normal table #43289

Merged
Merged 39 commits on May 22, 2023
Changes from 27 commits
Commits (39)
18aa0dd
done
wjhuang2016 Apr 21, 2023
6e633ba
fix
wjhuang2016 Apr 21, 2023
0113673
Merge branch 'master' of github.com:pingcap/tidb into normal_table
wjhuang2016 Apr 24, 2023
8896e88
Merge branch 'master' of github.com:pingcap/tidb into normal_table
wjhuang2016 Apr 24, 2023
63b2f16
merge
wjhuang2016 May 5, 2023
84ccbe3
refine
wjhuang2016 May 5, 2023
b892418
fix
wjhuang2016 May 6, 2023
94cb484
refine
wjhuang2016 May 6, 2023
2f54dd6
refine
wjhuang2016 May 8, 2023
89d798a
refine
wjhuang2016 May 8, 2023
0741cba
refine
wjhuang2016 May 8, 2023
2a12411
try lock
wjhuang2016 May 8, 2023
202133c
fix
wjhuang2016 May 8, 2023
0e6b3a1
done
wjhuang2016 May 8, 2023
b551f37
done
wjhuang2016 May 8, 2023
19569c8
done
wjhuang2016 May 8, 2023
1aeb4b7
fix
wjhuang2016 May 8, 2023
426ee34
merge
wjhuang2016 May 15, 2023
593c50a
done
wjhuang2016 May 15, 2023
cb84597
fmt
wjhuang2016 May 16, 2023
41c2395
bazel
wjhuang2016 May 16, 2023
ff9d619
lint
wjhuang2016 May 16, 2023
aff37b3
Merge branch 'master' of github.com:pingcap/tidb into normal_table
wjhuang2016 May 17, 2023
1d091d2
done
wjhuang2016 May 17, 2023
6cb3e82
fix lint
wjhuang2016 May 17, 2023
d67df9f
clean
wjhuang2016 May 17, 2023
2393cfc
lint
wjhuang2016 May 17, 2023
90919fa
merge master
wjhuang2016 May 18, 2023
352298b
fix
wjhuang2016 May 18, 2023
3f17c9e
fix
wjhuang2016 May 18, 2023
52ebfa0
fix
wjhuang2016 May 18, 2023
3a65cdc
Merge branch 'master' of github.com:pingcap/tidb into normal_table
wjhuang2016 May 18, 2023
610bb5f
fix
wjhuang2016 May 18, 2023
080aca1
fix
wjhuang2016 May 21, 2023
2270efd
fix test
wjhuang2016 May 21, 2023
508c25c
refine
wjhuang2016 May 22, 2023
c9e3b1e
refine
wjhuang2016 May 22, 2023
b4ffb5b
refine
wjhuang2016 May 22, 2023
3729433
fix
wjhuang2016 May 22, 2023
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -149,6 +149,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/rangetask",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
2 changes: 1 addition & 1 deletion ddl/ddl.go
@@ -678,7 +678,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {

scheduler.RegisterSchedulerConstructor("backfill",
func(taskMeta []byte, step int64) (scheduler.Scheduler, error) {
return NewBackfillSchedulerHandle(taskMeta, d)
return NewBackfillSchedulerHandle(taskMeta, d, step == proto.StepTwo)
})

dispatcher.RegisterTaskFlowHandle(BackfillTaskType, NewLitBackfillFlowHandle(d))
112 changes: 90 additions & 22 deletions ddl/disttask_flow.go
@@ -15,15 +15,20 @@
package ddl

import (
"bytes"
"context"
"encoding/json"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/disttask/framework/dispatcher"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
"github.com/tikv/client-go/v2/tikv"
)

type litBackfillFlowHandle struct {
@@ -39,11 +44,6 @@ func NewLitBackfillFlowHandle(d DDL) dispatcher.TaskFlowHandle {

// ProcessNormalFlow processes the normal flow.
func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State != proto.TaskStatePending {
// This flow has only one step, finish task when it is not pending
return nil, nil
}

var globalTaskMeta BackfillGlobalMeta
if err = json.Unmarshal(gTask.Meta, &globalTaskMeta); err != nil {
return nil, err
@@ -56,36 +56,104 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatche

job := &globalTaskMeta.Job
var tblInfo *model.TableInfo
err = kv.RunInNewTxn(d.ctx, d.store, false, func(ctx context.Context, txn kv.Transaction) error {
err = kv.RunInNewTxn(d.ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID)
return err
})
if err != nil {
return nil, err
}

var subTaskMetas [][]byte
if tblInfo.Partition == nil {
return nil, errors.New("Non-partition table not supported yet")
}

defs := tblInfo.Partition.Definitions
physicalIDs := make([]int64, len(defs))
for i := range defs {
physicalIDs[i] = defs[i].ID
}

subTaskMetas := make([][]byte, 0, len(physicalIDs))
for _, physicalID := range physicalIDs {
subTaskMeta := &BackfillSubTaskMeta{
PhysicalTableID: physicalID,
switch gTask.Step {
case proto.StepOne:
serverNodes, err := dispatcher.GenerateSchedulerNodes(d.ctx)
if err != nil {
return nil, err
}
subTaskMetas = make([][]byte, 0, len(serverNodes))
dummyMeta := &BackfillSubTaskMeta{}
metaBytes, err := json.Marshal(dummyMeta)
if err != nil {
return nil, err
}
for range serverNodes {
subTaskMetas = append(subTaskMetas, metaBytes)
}
gTask.Step = proto.StepTwo
return subTaskMetas, nil
case proto.StepTwo:
return nil, nil
default:
}

metaBytes, err := json.Marshal(subTaskMeta)
tbl, err := getTable(d.store, job.SchemaID, tblInfo)
if err != nil {
return nil, err
}
ver, err := getValidCurrentVersion(d.store)
if err != nil {
return nil, errors.Trace(err)
}
startKey, endKey, err := getTableRange(d.jobContext(job.ID), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority)
if err != nil {
return nil, errors.Trace(err)
}
regionCache := d.store.(helper.Storage).GetRegionCache()
recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey)
if err != nil {
return nil, err
}

subTaskMetas = append(subTaskMetas, metaBytes)
subTaskMetas = make([][]byte, 0, 100)
regionBatch := 20
sort.Slice(recordRegionMetas, func(i, j int) bool {
return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
})
for i := 0; i < len(recordRegionMetas); i += regionBatch {
end := i + regionBatch
if end > len(recordRegionMetas) {
end = len(recordRegionMetas)
}
batch := recordRegionMetas[i:end]
subTaskMeta := &BackfillSubTaskMeta{StartKey: batch[0].StartKey(), EndKey: batch[len(batch)-1].EndKey()}
if i == 0 {
subTaskMeta.StartKey = startKey
}
if end == len(recordRegionMetas) {
subTaskMeta.EndKey = endKey
}
metaBytes, err := json.Marshal(subTaskMeta)
if err != nil {
return nil, err
}
subTaskMetas = append(subTaskMetas, metaBytes)
}
} else {
if gTask.State != proto.TaskStatePending {
// The flow for a partition table has only one step; finish the task when it is not pending.
return nil, nil
}

defs := tblInfo.Partition.Definitions
physicalIDs := make([]int64, len(defs))
for i := range defs {
physicalIDs[i] = defs[i].ID
}

subTaskMetas = make([][]byte, 0, len(physicalIDs))
for _, physicalID := range physicalIDs {
subTaskMeta := &BackfillSubTaskMeta{
PhysicalTableID: physicalID,
}

metaBytes, err := json.Marshal(subTaskMeta)
if err != nil {
return nil, err
}

subTaskMetas = append(subTaskMetas, metaBytes)
}
}

gTask.Step = proto.StepOne
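
For reference, a minimal standalone sketch (not taken from this PR) of the batching idea in the non-partitioned branch of ProcessNormalFlow above: sort the region ranges, group them into fixed-size batches, and clamp the first and last batch to the table's overall [startKey, endKey) range. The keyRange type and splitIntoBatches helper are illustrative names only.

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// keyRange is a stand-in for a region's key span (illustrative, not a TiDB type).
type keyRange struct {
	StartKey, EndKey []byte
}

// splitIntoBatches groups sorted region ranges into batches of at most batchSize
// regions each, clamping the first and last batch to the table range, mirroring
// the subtask-splitting loop in ProcessNormalFlow above.
func splitIntoBatches(regions []keyRange, startKey, endKey []byte, batchSize int) []keyRange {
	sort.Slice(regions, func(i, j int) bool {
		return bytes.Compare(regions[i].StartKey, regions[j].StartKey) < 0
	})
	batches := make([]keyRange, 0, len(regions)/batchSize+1)
	for i := 0; i < len(regions); i += batchSize {
		end := i + batchSize
		if end > len(regions) {
			end = len(regions)
		}
		r := keyRange{StartKey: regions[i].StartKey, EndKey: regions[end-1].EndKey}
		if i == 0 {
			r.StartKey = startKey // first subtask starts at the table's start key
		}
		if end == len(regions) {
			r.EndKey = endKey // last subtask ends at the table's end key
		}
		batches = append(batches, r)
	}
	return batches
}

func main() {
	regions := []keyRange{
		{StartKey: []byte("b"), EndKey: []byte("c")},
		{StartKey: []byte("a"), EndKey: []byte("b")},
		{StartKey: []byte("c"), EndKey: []byte("d")},
	}
	for _, r := range splitIntoBatches(regions, []byte("a0"), []byte("z"), 2) {
		fmt.Printf("subtask range [%s, %s)\n", r.StartKey, r.EndKey)
	}
}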
8 changes: 4 additions & 4 deletions ddl/index.go
@@ -667,7 +667,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
job.SnapshotVer = 0
job.SchemaState = model.StateWriteReorganization

if job.MultiSchemaInfo == nil && tblInfo.GetPartitionInfo() != nil {
if job.MultiSchemaInfo == nil {
Collaborator review comment: Could we also remove this @tangenta?

initDistReorg(job.ReorgMeta)
}
case model.StateWriteReorganization:
@@ -736,7 +736,7 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.R
if err != nil {
return model.ReorgTypeNone, err
}
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil)
if err != nil {
return model.ReorgTypeNone, err
}
@@ -905,7 +905,7 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
if ok && bc.Done() {
return true, 0, nil
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID)
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, nil)
if err != nil {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
return false, ver, errors.Trace(err)
@@ -1811,7 +1811,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
return errors.New("unexpected error, can't find index info")
}
if indexInfo.Unique {
bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID)
bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID, nil)
if err != nil {
return err
}
2 changes: 2 additions & 0 deletions ddl/ingest/BUILD.bazel
@@ -43,6 +43,8 @@ go_library(
"//util/size",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
55 changes: 53 additions & 2 deletions ddl/ingest/backend.go
@@ -16,6 +16,7 @@ package ingest

import (
"context"
"fmt"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
@@ -29,6 +30,8 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -57,6 +60,8 @@ const (
FlushModeAuto FlushMode = iota
// FlushModeForceLocal means flush all data to local storage.
FlushModeForceLocal
// FlushModeForceLocalAndCheckDiskQuota means flush all data to local storage and check disk quota.
FlushModeForceLocalAndCheckDiskQuota
// FlushModeForceGlobal means import all data in local storage to global storage.
FlushModeForceGlobal
)
@@ -77,6 +82,7 @@ type litBackendCtx struct {
timeOfLastFlush atomicutil.Time
updateInterval time.Duration
checkpointMgr *CheckpointManager
etcdClient *clientv3.Client
}

// CollectRemoteDuplicateRows collects duplicate rows from remote TiKV.
@@ -141,6 +147,24 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl
return nil
}

func acquireLock(ctx context.Context, se *concurrency.Session, key string) error {
mu := concurrency.NewMutex(se, key)
err := mu.Lock(ctx)
if err != nil {
return err
}
return nil
}

func releaseLock(ctx context.Context, se *concurrency.Session, key string) error {
mu := concurrency.NewMutex(se, key)
err := mu.Unlock(ctx)
if err != nil {
return err
}
return nil
}

// Flush checks the disk quota and imports the current key-values in engine to the storage.
func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) {
ei, exist := bc.Load(indexID)
@@ -169,6 +193,29 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported
if !shouldImport {
return true, false, nil
}

// Use a distributed lock if running in distributed mode.
if bc.etcdClient != nil {
distLockKey := fmt.Sprintf("/tidb/distributeLock/%d/%d", bc.jobID, indexID)
se, _ := concurrency.NewSession(bc.etcdClient)
err := acquireLock(bc.ctx, se, distLockKey)
if err != nil {
return true, false, err
}
logutil.BgLogger().Info("[ddl] acquire lock success")
defer func() {
err = releaseLock(bc.ctx, se, distLockKey)
if err != nil {
logutil.BgLogger().Warn("[ddl] release lock error", zap.Error(err))
}
logutil.BgLogger().Info("[ddl] release lock success")
err = se.Close()
if err != nil {
logutil.BgLogger().Warn("[ddl] close session error", zap.Error(err))
}
}()
}

logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys))
@@ -189,8 +236,12 @@ func (bc *litBackendCtx) ShouldSync(mode FlushMode) (shouldFlush bool, shouldImp
}
bc.diskRoot.UpdateUsage()
shouldImport = bc.diskRoot.ShouldImport()
shouldFlush = shouldImport ||
time.Since(bc.timeOfLastFlush.Load()) >= bc.updateInterval
if mode == FlushModeForceLocalAndCheckDiskQuota {
shouldFlush = true
} else {
shouldFlush = shouldImport ||
time.Since(bc.timeOfLastFlush.Load()) >= bc.updateInterval
}
return shouldFlush, shouldImport
}

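
A standalone sketch of the go.etcd.io/etcd/client/v3/concurrency session-and-mutex pattern that Flush uses above to serialize UnsafeImportAndReset across TiDB nodes. It assumes a reachable etcd endpoint at 127.0.0.1:2379 and uses placeholder job/index IDs in the lock key; in TiDB the client would come from the server's own etcd connection.

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	// The endpoint is a placeholder for this sketch.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// A session owns a lease; if the holder dies, the lease expires and the lock is released.
	se, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer se.Close()

	// Key layout mirrors the PR: /tidb/distributeLock/<jobID>/<indexID> (IDs here are placeholders).
	mu := concurrency.NewMutex(se, "/tidb/distributeLock/1/1")
	ctx := context.Background()
	if err := mu.Lock(ctx); err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := mu.Unlock(ctx); err != nil {
			log.Println("unlock failed:", err)
		}
	}()

	// Critical section: only one node at a time would import local data into TiKV here.
	log.Println("lock held")
}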
11 changes: 6 additions & 5 deletions ddl/ingest/backend_mgr.go
@@ -25,13 +25,14 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// BackendCtxMgr is used to manage the backend context.
type BackendCtxMgr interface {
CheckAvailable() (bool, error)
Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)
}
@@ -77,7 +78,7 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error) {
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if !exist {
m.memRoot.RefreshConsumption()
@@ -96,7 +97,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
return nil, err
}

bcCtx := newBackendContext(ctx, jobID, bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot)
bcCtx := newBackendContext(ctx, jobID, bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
m.Store(jobID, bcCtx)

m.memRoot.Consume(StructSizeBackendCtx)
@@ -126,8 +127,7 @@ func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error

const checkpointUpdateInterval = 10 * time.Minute

func newBackendContext(ctx context.Context, jobID int64, be *local.Backend,
cfg *config.Config, vars map[string]string, memRoot MemRoot, diskRoot DiskRoot) *litBackendCtx {
func newBackendContext(ctx context.Context, jobID int64, be *local.Backend, cfg *config.Config, vars map[string]string, memRoot MemRoot, diskRoot DiskRoot, etcdClient *clientv3.Client) *litBackendCtx {
bCtx := &litBackendCtx{
SyncMap: generic.NewSyncMap[int64, *engineInfo](10),
MemRoot: memRoot,
@@ -139,6 +139,7 @@ func newBackendContext(ctx context.Context, jobID int64, be *local.Backend,
sysVars: vars,
diskRoot: diskRoot,
updateInterval: checkpointUpdateInterval,
etcdClient: etcdClient,
}
bCtx.timeOfLastFlush.Store(time.Now())
return bCtx
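
A hedged sketch of how a caller might use the widened Register signature above: in distributed mode a *clientv3.Client is supplied so the backend context can take the flush lock, while the existing local path keeps passing nil, as the ddl/index.go hunks in this PR show. The package and helper names below are illustrative, not part of the PR.

package ddlutil // illustrative package name

import (
	"context"

	"github.com/pingcap/tidb/ddl/ingest"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// registerBackend wraps BackendCtxMgr.Register and returns a cleanup function.
// Passing a nil etcd client keeps the single-node behaviour; a non-nil client
// enables the distributed flush lock.
func registerBackend(ctx context.Context, mgr ingest.BackendCtxMgr, jobID int64,
	unique bool, etcdCli *clientv3.Client) (ingest.BackendCtx, func(), error) {
	bc, err := mgr.Register(ctx, unique, jobID, etcdCli)
	if err != nil {
		return nil, nil, err
	}
	cleanup := func() { mgr.Unregister(jobID) }
	return bc, cleanup, nil
}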
3 changes: 2 additions & 1 deletion ddl/ingest/mock.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

@@ -46,7 +47,7 @@
}

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64) (BackendCtx, error) {
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error) {
logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil