Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#48687
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lance6716 authored and ti-chi-bot committed Sep 9, 2024
1 parent e15afd9 commit b40b6ee
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 1 deletion.
69 changes: 69 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,10 +756,21 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
if err != nil {
return model.ReorgTypeNone, err
}
var pdLeaderAddr string
if d != nil {
//nolint:forcetypeassert
pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
}
if variable.EnableDistTask.Load() {
<<<<<<< HEAD:ddl/index.go
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli)
} else {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil)
=======
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
} else {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/index.go
}
if err != nil {
return model.ReorgTypeNone, err
Expand Down Expand Up @@ -911,7 +922,17 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
if ok && bc.Done() {
return true, 0, nil
}
<<<<<<< HEAD:ddl/index.go
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, nil)
=======
ctx := logutil.WithCategory(w.ctx, "ddl-ingest")
var pdLeaderAddr string
if d != nil {
//nolint:forcetypeassert
pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
}
bc, err = ingest.LitBackCtxMgr.Register(ctx, allIndexInfos[0].Unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/index.go
if err != nil {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1828,6 +1849,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
if err != nil {
return err
}
<<<<<<< HEAD:ddl/index.go
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID)
if indexInfo == nil {
return errors.New("unexpected error, can't find index info")
Expand All @@ -1841,6 +1863,11 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
return bc.CollectRemoteDuplicateRows(indexInfo.ID, t)
}
return nil
=======
//nolint:forcetypeassert
pdLeaderAddr := w.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
return checkDuplicateForUniqueIndex(w.ctx, t, reorgInfo, pdLeaderAddr)
>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/index.go
}
}

Expand Down Expand Up @@ -1874,6 +1901,48 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
return errors.Trace(err)
}

<<<<<<< HEAD:ddl/index.go
=======
// checkDuplicateForUniqueIndex walks the elements recorded in reorgInfo and, for
// every element that resolves to a unique index of t, calls
// CollectRemoteDuplicateRows on the ingest backend context.
//
// The backend context is registered lazily, the first time a unique index is
// encountered, using pdAddr and the resource group name from reorgInfo.ReorgMeta.
// If a backend context was obtained it is unregistered when the function returns.
// An element whose index info cannot be found aborts the check with an error;
// non-unique indexes are skipped.
func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, pdAddr string) error {
	var (
		backend ingest.BackendCtx
		err     error
	)
	defer func() {
		if backend != nil {
			ingest.LitBackCtxMgr.Unregister(reorgInfo.ID)
		}
	}()

	for _, element := range reorgInfo.elements {
		idxInfo := model.FindIndexInfoByID(t.Meta().Indices, element.ID)
		if idxInfo == nil {
			return errors.New("unexpected error, can't find index info")
		}
		if !idxInfo.Unique {
			continue
		}
		if backend == nil {
			// The category-tagged context is only needed for registration.
			ingestCtx := logutil.WithCategory(ctx, "ddl-ingest")
			backend, err = ingest.LitBackCtxMgr.Register(ingestCtx, idxInfo.Unique, reorgInfo.ID, nil, pdAddr, reorgInfo.ReorgMeta.ResourceGroupName)
			if err != nil {
				return err
			}
		}
		if err = backend.CollectRemoteDuplicateRows(idxInfo.ID, t); err != nil {
			return err
		}
	}
	return nil
}

var (
	// MockDMLExecutionOnTaskFinished is a hook used to mock DML execution when
	// tasks finished. It is nil until something assigns it.
	MockDMLExecutionOnTaskFinished func()

	// MockDMLExecutionOnDDLPaused is a hook used to mock DML execution when the
	// ddl job is paused. It is nil until something assigns it.
	MockDMLExecutionOnDDLPaused func()

	// TestSyncChan is an unbuffered channel used to sync the test.
	TestSyncChan = make(chan struct{})
)

>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/index.go
func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
return errors.New("do not support merge index")
Expand Down
13 changes: 13 additions & 0 deletions ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ import (
// BackendCtxMgr is used to manage the backend context.
type BackendCtxMgr interface {
CheckAvailable() (bool, error)
<<<<<<< HEAD:ddl/ingest/backend_mgr.go
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error)
=======
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error)
>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/ingest/backend_mgr.go
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)
}
Expand Down Expand Up @@ -78,7 +82,11 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register creates a new backend and registers it to the backend context.
<<<<<<< HEAD:ddl/ingest/backend_mgr.go
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) {
=======
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error) {
>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/ingest/backend_mgr.go
bc, exist := m.Load(jobID)
if !exist {
m.memRoot.RefreshConsumption()
Expand All @@ -91,7 +99,12 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
<<<<<<< HEAD:ddl/ingest/backend_mgr.go
bd, err := createLocalBackend(ctx, cfg)
=======
cfg.Lightning.TiDB.PdAddr = pdAddr
bd, err := createLocalBackend(ctx, cfg, resourceGroupName)
>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/ingest/backend_mgr.go
if err != nil {
logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
Expand Down
1 change: 0 additions & 1 deletion ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func genConfig(memRoot MemRoot, jobID int64, unique bool) (*Config, error) {
} else {
cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgNone
}
cfg.TiDB.PdAddr = tidbCfg.Path
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort)
// Set TLS related information
Expand Down
4 changes: 4 additions & 0 deletions ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register implements BackendCtxMgr.Register interface.
<<<<<<< HEAD:ddl/ingest/mock.go
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error) {
=======
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string, _ string) (BackendCtx, error) {
>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/ingest/mock.go
logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
Expand Down
174 changes: 174 additions & 0 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/execute"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

// BackfillGlobalMeta is the global task meta for backfilling index.
// It is carried as JSON in the task meta and decoded with json.Unmarshal.
type BackfillGlobalMeta struct {
// Job is the DDL job being backfilled; its SchemaID and TableID are used to
// look up the table, and its ID identifies the ingest backend context.
Job model.Job `json:"job"`
// EleIDs stands for the index/column IDs to backfill with distributed framework.
// The first ID is used to decide index uniqueness when registering the backend.
EleIDs []int64 `json:"ele_ids"`
// EleTypeKey is the type of the element to backfill with distributed framework.
// For now, only index type is supported.
EleTypeKey []byte `json:"ele_type_key"`

// CloudStorageURI is passed through to the subtask executors. When it is
// non-empty, proto.StepThree uses the cloud import executor.
CloudStorageURI string `json:"cloud_storage_uri"`
// UseMergeSort indicates whether the backfilling task uses the merge sort step for global sort.
// The merge sort step aims to support more data.
UseMergeSort bool `json:"use_merge_sort"`
}

// BackfillSubTaskMeta is the sub-task meta for backfilling index.
//
// NOTE(review): the JSON keys mix snake_case ("range_split_keys") and
// kebab-case ("data-files", "stat-files"). They are part of the serialized
// format, so presumably they must not be renamed without a migration — confirm.
type BackfillSubTaskMeta struct {
// PhysicalTableID presumably identifies the physical table (or partition)
// this subtask works on — TODO confirm against the code that fills it.
PhysicalTableID int64 `json:"physical_table_id"`

RangeSplitKeys [][]byte `json:"range_split_keys"`
DataFiles []string `json:"data-files"`
StatFiles []string `json:"stat-files"`
// NOTE(review): encoding/json does not define an "inline" tag option. An
// embedded struct whose tag has no name is flattened into the parent object
// by default, so the option looks informational only — confirm.
external.SortedKVMeta `json:",inline"`
}

// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
//
// taskMeta is the JSON-encoded BackfillGlobalMeta of the task. The table is
// looked up from the job's schema and table IDs, and every ID in EleIDs must
// resolve to an index of that table, otherwise an error is returned.
//
// The executor that is built depends on stage:
//   - proto.StepOne: a read-index executor over all resolved indexes.
//   - proto.StepTwo: a merge-sort executor for the first resolved index.
//   - proto.StepThree: a cloud-import executor for the first resolved index,
//     only when CloudStorageURI is set; otherwise an error is returned.
//
// Any other stage yields an error. StepTwo and StepThree return an error,
// instead of panicking on an out-of-range index, when the meta carries no
// element IDs.
func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
	bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.SubtaskExecutor, error) {
	bgm := &BackfillGlobalMeta{}
	err := json.Unmarshal(taskMeta, bgm)
	if err != nil {
		return nil, err
	}
	jobMeta := &bgm.Job

	_, tbl, err := d.getTableByTxn((*asAutoIDRequirement)(d.ddlCtx), jobMeta.SchemaID, jobMeta.TableID)
	if err != nil {
		return nil, err
	}
	indexInfos := make([]*model.IndexInfo, 0, len(bgm.EleIDs))
	for _, eid := range bgm.EleIDs {
		indexInfo := model.FindIndexInfoByID(tbl.Meta().Indices, eid)
		if indexInfo == nil {
			logutil.BgLogger().Warn("index info not found", zap.String("category", "ddl-ingest"),
				zap.Int64("table ID", tbl.Meta().ID), zap.Int64("index ID", eid))
			return nil, errors.Errorf("index info not found: %d", eid)
		}
		indexInfos = append(indexInfos, indexInfo)
	}

	switch stage {
	case proto.StepOne:
		jc := d.jobContext(jobMeta.ID, jobMeta.ReorgMeta)
		d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
		d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
		return newReadIndexExecutor(
			d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil
	case proto.StepTwo:
		// The merge-sort step works on the first index only; guard the access
		// so malformed meta without element IDs surfaces as an error.
		if len(indexInfos) == 0 {
			return nil, errors.Errorf("no index to backfill for job %d", jobMeta.ID)
		}
		return newMergeSortExecutor(jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
	case proto.StepThree:
		if len(bgm.CloudStorageURI) == 0 {
			return nil, errors.Errorf("local import does not have write & ingest step")
		}
		// Same guard as StepTwo: the import step only uses the first index.
		if len(indexInfos) == 0 {
			return nil, errors.Errorf("no index to backfill for job %d", jobMeta.ID)
		}
		return newCloudImportExecutor(&bgm.Job, jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
	default:
		return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
	}
}

// backfillDistScheduler is the scheduler used for distributed index backfill.
// It embeds scheduler.BaseScheduler and installs itself as that scheduler's
// Extension when constructed.
type backfillDistScheduler struct {
*scheduler.BaseScheduler
// d is the ddl instance used to look up tables and build subtask executors.
d *ddl
// task is the task whose Meta holds the JSON-encoded BackfillGlobalMeta.
task *proto.Task
taskTable scheduler.TaskTable
// backendCtx is the ingest backend context registered in Init; it stays nil
// until Init succeeds.
backendCtx ingest.BackendCtx
// jobID is the DDL job ID recorded in Init and used by Close to unregister
// the backend context.
jobID int64
}

// newBackfillDistScheduler builds a backfillDistScheduler for task on top of a
// fresh BaseScheduler and installs the new scheduler as that base scheduler's
// Extension before returning it.
func newBackfillDistScheduler(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable, d *ddl) scheduler.Scheduler {
	base := scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable)
	sch := &backfillDistScheduler{
		BaseScheduler: base,
		d:             d,
		task:          task,
		taskTable:     taskTable,
	}
	// The base scheduler calls back into the backfill-specific hooks through Extension.
	base.Extension = sch
	return sch
}

// Init initializes the embedded BaseScheduler and then registers the ingest
// backend context for the job described by the task meta.
//
// The task meta is decoded as a BackfillGlobalMeta, the table is looked up from
// the job's schema and table IDs, and the first element ID decides whether the
// backend is registered for a unique index. The backend is registered with the
// current PD leader address and the job's resource group name. On success the
// backend context and job ID are stored on the scheduler for later use.
//
// An error is returned if any step fails, including when the meta carries no
// element IDs or the first element ID does not resolve to an index.
func (s *backfillDistScheduler) Init(ctx context.Context) error {
	err := s.BaseScheduler.Init(ctx)
	if err != nil {
		return err
	}
	d := s.d

	bgm := &BackfillGlobalMeta{}
	err = json.Unmarshal(s.task.Meta, bgm)
	if err != nil {
		return errors.Trace(err)
	}
	job := &bgm.Job
	_, tbl, err := d.getTableByTxn((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, job.TableID)
	if err != nil {
		return errors.Trace(err)
	}
	// Guard the EleIDs[0] access below: malformed meta without element IDs
	// must surface as an error rather than an index-out-of-range panic.
	if len(bgm.EleIDs) == 0 {
		return errors.Errorf("no index to backfill for job %d", job.ID)
	}
	// We only support adding multiple unique indexes or multiple non-unique indexes,
	// we use the first index uniqueness here.
	idx := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleIDs[0])
	if idx == nil {
		return errors.Trace(errors.Errorf("index info not found: %d", bgm.EleIDs[0]))
	}
	//nolint:forcetypeassert
	pdLeaderAddr := d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
	bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
	if err != nil {
		return errors.Trace(err)
	}
	s.backendCtx = bc
	s.jobID = job.ID
	return nil
}

// GetSubtaskExecutor returns the subtask executor for the task's current step.
// Steps one, two and three are delegated to NewBackfillSubtaskExecutor with the
// scheduler's ddl instance and backend context; any other step is an error.
func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) {
	step := task.Step
	known := step == proto.StepOne || step == proto.StepTwo || step == proto.StepThree
	if !known {
		return nil, errors.Errorf("unknown backfill step %d for task %d", step, task.ID)
	}
	return NewBackfillSubtaskExecutor(ctx, task.Meta, s.d, s.backendCtx, step, summary)
}

// IsIdempotent reports every subtask as idempotent: it ignores its argument
// and always returns true.
func (*backfillDistScheduler) IsIdempotent(*proto.Subtask) bool {
return true
}

// Close unregisters the job's ingest backend context, if one was stored on the
// scheduler, and then closes the embedded BaseScheduler.
func (s *backfillDistScheduler) Close() {
	if hasBackend := s.backendCtx != nil; hasBackend {
		ingest.LitBackCtxMgr.Unregister(s.jobID)
	}
	s.BaseScheduler.Close()
}
4 changes: 4 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

// Mock there is a running ingest job.
<<<<<<< HEAD:tests/realtikvtest/addindextest/integration_test.go
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil)
=======
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, realtikvtest.PDAddr, "")
>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):tests/realtikvtest/addindextest4/ingest_test.go
require.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(2)
Expand Down
3 changes: 3 additions & 0 deletions tests/realtikvtest/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ var (
// TiKVPath is the path of the TiKV Storage.
TiKVPath = flag.String("tikv-path", "tikv://127.0.0.1:2379?disableGC=true", "TiKV addr")

// PDAddr is the address of PD.
PDAddr = "127.0.0.1:2379"

// KeyspaceName is an option to specify the name of keyspace that the tests run on,
// this option is only valid while the flag WithRealTiKV is set.
KeyspaceName = flag.String("keyspace-name", "", "the name of keyspace that the tests run on")
Expand Down

0 comments on commit b40b6ee

Please sign in to comment.