diff --git a/ddl/index.go b/ddl/index.go
index ea230e14a8305..b93ed738a958e 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -756,10 +756,21 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
 	if err != nil {
 		return model.ReorgTypeNone, err
 	}
+	var pdLeaderAddr string
+	if d != nil {
+		//nolint:forcetypeassert
+		pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
+	}
 	if variable.EnableDistTask.Load() {
+<<<<<<< HEAD:ddl/index.go
 		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli)
 	} else {
 		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil)
+=======
+		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
+	} else {
+		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/index.go
 	}
 	if err != nil {
 		return model.ReorgTypeNone, err
@@ -911,7 +922,17 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
 	if ok && bc.Done() {
 		return true, 0, nil
 	}
+<<<<<<< HEAD:ddl/index.go
 	bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, nil)
+=======
+	ctx := logutil.WithCategory(w.ctx, "ddl-ingest")
+	var pdLeaderAddr string
+	if d != nil {
+		//nolint:forcetypeassert
+		pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
+	}
+	bc, err = ingest.LitBackCtxMgr.Register(ctx, allIndexInfos[0].Unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/index.go
 	if err != nil {
 		ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
 		return false, ver, errors.Trace(err)
@@ -1828,6 +1849,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
 		if err != nil {
 			return err
 		}
+<<<<<<< HEAD:ddl/index.go
 		indexInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID)
 		if indexInfo == nil {
 			return errors.New("unexpected error, can't find index info")
@@ -1841,6 +1863,11 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
 			return bc.CollectRemoteDuplicateRows(indexInfo.ID, t)
 		}
 		return nil
+=======
+		//nolint:forcetypeassert
+		pdLeaderAddr := w.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
+		return checkDuplicateForUniqueIndex(w.ctx, t, reorgInfo, pdLeaderAddr)
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/index.go
 	}
 }
 
@@ -1874,6 +1901,48 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
 		return errors.Trace(err)
 	}
 
+<<<<<<< HEAD:ddl/index.go
+=======
+func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, pdAddr string) error {
+	var bc ingest.BackendCtx
+	var err error
+	defer func() {
+		if bc != nil {
+			ingest.LitBackCtxMgr.Unregister(reorgInfo.ID)
+		}
+	}()
+	for _, elem := range reorgInfo.elements {
+		indexInfo := model.FindIndexInfoByID(t.Meta().Indices, elem.ID)
+		if indexInfo == nil {
+			return errors.New("unexpected error, can't find index info")
+		}
+		if indexInfo.Unique {
+			ctx := logutil.WithCategory(ctx, "ddl-ingest")
+			if bc == nil {
+				bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil, pdAddr, reorgInfo.ReorgMeta.ResourceGroupName)
+				if err != nil {
+					return err
+				}
+			}
+			err = bc.CollectRemoteDuplicateRows(indexInfo.ID, t)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// MockDMLExecutionOnTaskFinished is used to mock DML execution when tasks finished.
+var MockDMLExecutionOnTaskFinished func()
+
+// MockDMLExecutionOnDDLPaused is used to mock DML execution when ddl job paused.
+var MockDMLExecutionOnDDLPaused func()
+
+// TestSyncChan is used to sync the test.
+var TestSyncChan = make(chan struct{})
+
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/index.go
 func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
 	if reorgInfo.mergingTmpIdx {
 		return errors.New("do not support merge index")
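
The hunks above share one pattern: instead of relying on a PD address captured in the TiDB config, the caller asks the store's region cache for the current PD leader address right before registering the lightning backend. A minimal sketch of that lookup as a standalone helper, assuming the master-branch (pkg/) package layout; the helper name and the empty-string fallback are illustrative and not part of the patch:

    package ddl

    import (
        "github.com/pingcap/tidb/pkg/kv"
        "github.com/tikv/client-go/v2/tikv"
    )

    // getPDLeaderAddr shows how the patch derives the current PD leader address
    // from a kv.Storage; the real code inlines this expression at each call site.
    func getPDLeaderAddr(store kv.Storage) string {
        s, ok := store.(tikv.Storage)
        if !ok {
            // Not a TiKV-backed store, so there is no PD to ask.
            return ""
        }
        return s.GetRegionCache().PDClient().GetLeaderAddr()
    }
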
diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go
index 764d61e893a17..d64dcf141c471 100644
--- a/ddl/ingest/backend_mgr.go
+++ b/ddl/ingest/backend_mgr.go
@@ -32,7 +32,11 @@ import (
 // BackendCtxMgr is used to manage the backend context.
 type BackendCtxMgr interface {
 	CheckAvailable() (bool, error)
+<<<<<<< HEAD:ddl/ingest/backend_mgr.go
 	Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error)
+=======
+	Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error)
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/ingest/backend_mgr.go
 	Unregister(jobID int64)
 	Load(jobID int64) (BackendCtx, bool)
 }
@@ -78,7 +82,11 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
 }
 
 // Register creates a new backend and registers it to the backend context.
+<<<<<<< HEAD:ddl/ingest/backend_mgr.go
 func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) {
+=======
+func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error) {
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/ingest/backend_mgr.go
 	bc, exist := m.Load(jobID)
 	if !exist {
 		m.memRoot.RefreshConsumption()
@@ -91,7 +99,12 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
 		logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
 		return nil, err
 	}
+<<<<<<< HEAD:ddl/ingest/backend_mgr.go
 	bd, err := createLocalBackend(ctx, cfg)
+=======
+	cfg.Lightning.TiDB.PdAddr = pdAddr
+	bd, err := createLocalBackend(ctx, cfg, resourceGroupName)
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/ingest/backend_mgr.go
 	if err != nil {
 		logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
 		return nil, err
diff --git a/ddl/ingest/config.go b/ddl/ingest/config.go
index cc69fae5ccb2e..f1b3b51be4977 100644
--- a/ddl/ingest/config.go
+++ b/ddl/ingest/config.go
@@ -57,7 +57,6 @@ func genConfig(memRoot MemRoot, jobID int64, unique bool) (*Config, error) {
 	} else {
 		cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgNone
 	}
-	cfg.TiDB.PdAddr = tidbCfg.Path
 	cfg.TiDB.Host = "127.0.0.1"
 	cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort)
 	// Set TLS related information
diff --git a/ddl/ingest/mock.go b/ddl/ingest/mock.go
index b95a16eb52a32..18633d4c27b32 100644
--- a/ddl/ingest/mock.go
+++ b/ddl/ingest/mock.go
@@ -46,7 +46,11 @@ func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
 }
 
 // Register implements BackendCtxMgr.Register interface.
+<<<<<<< HEAD:ddl/ingest/mock.go
 func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error) {
+=======
+func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string, _ string) (BackendCtx, error) {
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):pkg/ddl/ingest/mock.go
 	logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
 	if mockCtx, ok := m.runningJobs[jobID]; ok {
 		return mockCtx, nil
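
Taken together, the ingest-side changes move the PD address out of genConfig (the deleted cfg.TiDB.PdAddr assignment) and into Register, which now also forwards a resource group name to the local backend. A hedged usage sketch of the extended interface, assuming the master-branch (pkg/) layout; the nil etcd client and empty resource group mirror what the integration test passes:

    package ddl

    import (
        "context"

        "github.com/pingcap/tidb/pkg/ddl/ingest"
        "github.com/pingcap/tidb/pkg/kv"
        "github.com/tikv/client-go/v2/tikv"
    )

    // registerIngestBackend is an illustrative wrapper around the new signature:
    // the caller supplies the PD leader address and a resource group name.
    func registerIngestBackend(ctx context.Context, store kv.Storage, jobID int64, unique bool) (ingest.BackendCtx, error) {
        //nolint:forcetypeassert
        pdAddr := store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
        return ingest.LitBackCtxMgr.Register(ctx, unique, jobID, nil, pdAddr, "")
    }
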
diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go
new file mode 100644
index 0000000000000..a7025ddcf3946
--- /dev/null
+++ b/pkg/ddl/backfilling_dist_scheduler.go
@@ -0,0 +1,174 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
+	"github.com/pingcap/tidb/pkg/ddl/ingest"
+	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
+	"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
+	"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/execute"
+	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/pingcap/tidb/pkg/table"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"github.com/tikv/client-go/v2/tikv"
+	"go.uber.org/zap"
+)
+
+// BackfillGlobalMeta is the global task meta for backfilling index.
+type BackfillGlobalMeta struct {
+	Job model.Job `json:"job"`
+	// EleIDs stands for the index/column IDs to backfill with distributed framework.
+	EleIDs []int64 `json:"ele_ids"`
+	// EleTypeKey is the type of the element to backfill with distributed framework.
+	// For now, only index type is supported.
+	EleTypeKey []byte `json:"ele_type_key"`
+
+	CloudStorageURI string `json:"cloud_storage_uri"`
+	// UseMergeSort indicate whether the backfilling task use merge sort step for global sort.
+	// Merge Sort step aims to support more data.
+	UseMergeSort bool `json:"use_merge_sort"`
+}
+
+// BackfillSubTaskMeta is the sub-task meta for backfilling index.
+type BackfillSubTaskMeta struct {
+	PhysicalTableID int64 `json:"physical_table_id"`
+
+	RangeSplitKeys [][]byte `json:"range_split_keys"`
+	DataFiles      []string `json:"data-files"`
+	StatFiles      []string `json:"stat-files"`
+	external.SortedKVMeta `json:",inline"`
+}
+
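
The two structs above are plain JSON payloads: BackfillGlobalMeta travels in the global task meta and BackfillSubTaskMeta in each subtask. A minimal round-trip sketch of the encoding that the code below relies on (field values are illustrative; the real producer is the backfilling dispatcher, which is not part of this diff):

    package ddl

    import "encoding/json"

    // encodeBackfillMeta shows the JSON round trip that NewBackfillSubtaskExecutor
    // and backfillDistScheduler.Init depend on; the element ID is a made-up value.
    func encodeBackfillMeta() ([]byte, error) {
        bgm := &BackfillGlobalMeta{EleIDs: []int64{1}}
        return json.Marshal(bgm)
    }

    func decodeBackfillMeta(raw []byte) (*BackfillGlobalMeta, error) {
        bgm := &BackfillGlobalMeta{}
        if err := json.Unmarshal(raw, bgm); err != nil {
            return nil, err
        }
        return bgm, nil
    }
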
+// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
+func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
+	bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.SubtaskExecutor, error) {
+	bgm := &BackfillGlobalMeta{}
+	err := json.Unmarshal(taskMeta, bgm)
+	if err != nil {
+		return nil, err
+	}
+	jobMeta := &bgm.Job
+
+	_, tbl, err := d.getTableByTxn((*asAutoIDRequirement)(d.ddlCtx), jobMeta.SchemaID, jobMeta.TableID)
+	if err != nil {
+		return nil, err
+	}
+	indexInfos := make([]*model.IndexInfo, 0, len(bgm.EleIDs))
+	for _, eid := range bgm.EleIDs {
+		indexInfo := model.FindIndexInfoByID(tbl.Meta().Indices, eid)
+		if indexInfo == nil {
+			logutil.BgLogger().Warn("index info not found", zap.String("category", "ddl-ingest"),
+				zap.Int64("table ID", tbl.Meta().ID), zap.Int64("index ID", eid))
+			return nil, errors.Errorf("index info not found: %d", eid)
+		}
+		indexInfos = append(indexInfos, indexInfo)
+	}
+
+	switch stage {
+	case proto.StepOne:
+		jc := d.jobContext(jobMeta.ID, jobMeta.ReorgMeta)
+		d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
+		d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
+		return newReadIndexExecutor(
+			d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil
+	case proto.StepTwo:
+		return newMergeSortExecutor(jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
+	case proto.StepThree:
+		if len(bgm.CloudStorageURI) > 0 {
+			return newCloudImportExecutor(&bgm.Job, jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
+		}
+		return nil, errors.Errorf("local import does not have write & ingest step")
+	default:
+		return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
+	}
+}
+
+type backfillDistScheduler struct {
+	*scheduler.BaseScheduler
+	d          *ddl
+	task       *proto.Task
+	taskTable  scheduler.TaskTable
+	backendCtx ingest.BackendCtx
+	jobID      int64
+}
+
+func newBackfillDistScheduler(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable, d *ddl) scheduler.Scheduler {
+	s := &backfillDistScheduler{
+		BaseScheduler: scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable),
+		d:             d,
+		task:          task,
+		taskTable:     taskTable,
+	}
+	s.BaseScheduler.Extension = s
+	return s
+}
+
+func (s *backfillDistScheduler) Init(ctx context.Context) error {
+	err := s.BaseScheduler.Init(ctx)
+	if err != nil {
+		return err
+	}
+	d := s.d
+
+	bgm := &BackfillGlobalMeta{}
+	err = json.Unmarshal(s.task.Meta, bgm)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	job := &bgm.Job
+	_, tbl, err := d.getTableByTxn((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, job.TableID)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	// We only support adding multiple unique indexes or multiple non-unique indexes,
+	// we use the first index uniqueness here.
+	idx := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleIDs[0])
+	if idx == nil {
+		return errors.Trace(errors.Errorf("index info not found: %d", bgm.EleIDs[0]))
+	}
+	pdLeaderAddr := d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
+	bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	s.backendCtx = bc
+	s.jobID = job.ID
+	return nil
+}
+
+func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) {
+	switch task.Step {
+	case proto.StepOne, proto.StepTwo, proto.StepThree:
+		return NewBackfillSubtaskExecutor(ctx, task.Meta, s.d, s.backendCtx, task.Step, summary)
+	default:
+		return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID)
+	}
+}
+
+func (*backfillDistScheduler) IsIdempotent(*proto.Subtask) bool {
+	return true
+}
+
+func (s *backfillDistScheduler) Close() {
+	if s.backendCtx != nil {
+		ingest.LitBackCtxMgr.Unregister(s.jobID)
+	}
+	s.BaseScheduler.Close()
+}
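
backfillDistScheduler therefore owns one ingest backend per distributed task: Init registers it with the freshly resolved PD leader address, GetSubtaskExecutor hands it to the per-step executors, and Close unregisters it. A hedged summary of the step dispatch; the constructors named in the comments are the ones referenced above, while the string mapping itself is purely illustrative:

    package ddl

    import "github.com/pingcap/tidb/pkg/disttask/framework/proto"

    // backfillStepName is not part of the patch; it only spells out what each
    // backfill step does.
    func backfillStepName(step proto.Step) string {
        switch step {
        case proto.StepOne:
            return "read-index" // newReadIndexExecutor: scan the table and produce index KV
        case proto.StepTwo:
            return "merge-sort" // newMergeSortExecutor: merge sorted KV files for global sort
        case proto.StepThree:
            return "ingest" // newCloudImportExecutor: import the sorted data from cloud storage
        default:
            return "unknown"
        }
    }
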
diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go
index bb6af7e925673..ff73cd5b08d42 100644
--- a/tests/realtikvtest/addindextest/integration_test.go
+++ b/tests/realtikvtest/addindextest/integration_test.go
@@ -84,7 +84,11 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
 	tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")
 
 	// Mock there is a running ingest job.
+<<<<<<< HEAD:tests/realtikvtest/addindextest/integration_test.go
 	_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil)
+=======
+	_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, realtikvtest.PDAddr, "")
+>>>>>>> 6260e66ad8f (ddl: use latest PD address to register lightning (#48687)):tests/realtikvtest/addindextest4/ingest_test.go
 	require.NoError(t, err)
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
diff --git a/tests/realtikvtest/testkit.go b/tests/realtikvtest/testkit.go
index 8b05e1518e3ff..41d044a40e314 100644
--- a/tests/realtikvtest/testkit.go
+++ b/tests/realtikvtest/testkit.go
@@ -48,6 +48,9 @@ var (
 	// TiKVPath is the path of the TiKV Storage.
 	TiKVPath = flag.String("tikv-path", "tikv://127.0.0.1:2379?disableGC=true", "TiKV addr")
 
+	// PDAddr is the address of PD.
+	PDAddr = "127.0.0.1:2379"
+
 	// KeyspaceName is an option to specify the name of keyspace that the tests run on,
 	// this option is only valid while the flag WithRealTiKV is set.
 	KeyspaceName = flag.String("keyspace-name", "", "the name of keyspace that the tests run on")
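
The hard-coded realtikvtest.PDAddr duplicates the host:port already embedded in the default -tikv-path flag ("tikv://127.0.0.1:2379?disableGC=true"). A hypothetical helper that derives it from the flag instead, in case the tests ever point at a different cluster (not part of the patch):

    package realtikvtest

    import "net/url"

    // pdAddrFromTiKVPath extracts "127.0.0.1:2379" from a path such as
    // "tikv://127.0.0.1:2379?disableGC=true". Purely illustrative.
    func pdAddrFromTiKVPath(tikvPath string) (string, error) {
        u, err := url.Parse(tikvPath)
        if err != nil {
            return "", err
        }
        return u.Host, nil
    }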