diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go
index dabaf890349f7..a7025ddcf3946 100644
--- a/pkg/ddl/backfilling_dist_scheduler.go
+++ b/pkg/ddl/backfilling_dist_scheduler.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/table"
 	"github.com/pingcap/tidb/pkg/util/logutil"
+	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/zap"
 )
 
@@ -142,7 +143,8 @@ func (s *backfillDistScheduler) Init(ctx context.Context) error {
 	if idx == nil {
 		return errors.Trace(errors.Errorf("index info not found: %d", bgm.EleIDs[0]))
 	}
-	bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName)
+	pdLeaderAddr := d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
+	bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go
index c45d535bf0ed9..ed0b11972b064 100644
--- a/pkg/ddl/index.go
+++ b/pkg/ddl/index.go
@@ -770,10 +770,15 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
 	if err != nil {
 		return model.ReorgTypeNone, err
 	}
+	var pdLeaderAddr string
+	if d != nil {
+		//nolint:forcetypeassert
+		pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
+	}
 	if variable.EnableDistTask.Load() {
-		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName)
+		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
 	} else {
-		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, job.ReorgMeta.ResourceGroupName)
+		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
 	}
 	if err != nil {
 		return model.ReorgTypeNone, err
@@ -964,7 +969,12 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
 		return true, 0, nil
 	}
 	ctx := logutil.WithCategory(w.ctx, "ddl-ingest")
-	bc, err = ingest.LitBackCtxMgr.Register(ctx, allIndexInfos[0].Unique, job.ID, nil, job.ReorgMeta.ResourceGroupName)
+	var pdLeaderAddr string
+	if d != nil {
+		//nolint:forcetypeassert
+		pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
+	}
+	bc, err = ingest.LitBackCtxMgr.Register(ctx, allIndexInfos[0].Unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
 	if err != nil {
 		ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), allIndexInfos, err)
 		return false, ver, errors.Trace(err)
@@ -1976,7 +1986,9 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
 			if err != nil {
 				return err
 			}
-			return checkDuplicateForUniqueIndex(w.ctx, t, reorgInfo)
+			//nolint:forcetypeassert
+			pdLeaderAddr := w.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
+			return checkDuplicateForUniqueIndex(w.ctx, t, reorgInfo, pdLeaderAddr)
 		}
 	}
 
@@ -2013,7 +2025,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
 	return errors.Trace(err)
 }
 
-func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo) error {
+func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, pdAddr string) error {
 	var bc ingest.BackendCtx
 	var err error
 	defer func() {
@@ -2029,7 +2041,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
 		if indexInfo.Unique {
 			ctx := logutil.WithCategory(ctx, "ddl-ingest")
 			if bc == nil {
-				bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil, reorgInfo.ReorgMeta.ResourceGroupName)
+				bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil, pdAddr, reorgInfo.ReorgMeta.ResourceGroupName)
 				if err != nil {
 					return err
 				}
diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go
index 150ad813eeeb5..389ca249a7f9e 100644
--- a/pkg/ddl/ingest/backend_mgr.go
+++ b/pkg/ddl/ingest/backend_mgr.go
@@ -33,7 +33,7 @@ import (
 // BackendCtxMgr is used to manage the backend context.
 type BackendCtxMgr interface {
 	CheckAvailable() (bool, error)
-	Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error)
+	Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error)
 	Unregister(jobID int64)
 	Load(jobID int64) (BackendCtx, bool)
 }
@@ -79,7 +79,7 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
 }
 
 // Register creates a new backend and registers it to the backend context.
-func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error) {
+func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error) {
 	bc, exist := m.Load(jobID)
 	if !exist {
 		m.memRoot.RefreshConsumption()
@@ -92,6 +92,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
 			logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
 			return nil, err
 		}
+		cfg.Lightning.TiDB.PdAddr = pdAddr
 		bd, err := createLocalBackend(ctx, cfg, resourceGroupName)
 		if err != nil {
 			logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go
index 9a86ff269a4e9..27351a0668c19 100644
--- a/pkg/ddl/ingest/config.go
+++ b/pkg/ddl/ingest/config.go
@@ -62,7 +62,6 @@ func genConfig(ctx context.Context, memRoot MemRoot, jobID int64, unique bool) (
 	} else {
 		cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgNone
 	}
-	cfg.TiDB.PdAddr = tidbCfg.Path
 	cfg.TiDB.Host = "127.0.0.1"
 	cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort)
 	// Set TLS related information
diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go
index 5a13fad34e976..c2ef78cf1bd86 100644
--- a/pkg/ddl/ingest/mock.go
+++ b/pkg/ddl/ingest/mock.go
@@ -48,7 +48,7 @@ func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
 }
 
 // Register implements BackendCtxMgr.Register interface.
-func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string) (BackendCtx, error) {
+func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string, _ string) (BackendCtx, error) {
 	logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
 	if mockCtx, ok := m.runningJobs[jobID]; ok {
 		return mockCtx, nil
diff --git a/tests/realtikvtest/addindextest4/ingest_test.go b/tests/realtikvtest/addindextest4/ingest_test.go
index 8352ba38c2f84..c95232c5ecc94 100644
--- a/tests/realtikvtest/addindextest4/ingest_test.go
+++ b/tests/realtikvtest/addindextest4/ingest_test.go
@@ -91,7 +91,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
 	tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")
 
 	// Mock there is a running ingest job.
-	_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, "")
+	_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, realtikvtest.PDAddr, "")
 	require.NoError(t, err)
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
diff --git a/tests/realtikvtest/testkit.go b/tests/realtikvtest/testkit.go
index e624172c24e67..1b8fc505ca11c 100644
--- a/tests/realtikvtest/testkit.go
+++ b/tests/realtikvtest/testkit.go
@@ -48,6 +48,9 @@ var (
 	// TiKVPath is the path of the TiKV Storage.
 	TiKVPath = flag.String("tikv-path", "tikv://127.0.0.1:2379?disableGC=true", "TiKV addr")
 
+	// PDAddr is the address of PD.
+	PDAddr = "127.0.0.1:2379"
+
 	// KeyspaceName is an option to specify the name of keyspace that the tests run on,
 	// this option is only valid while the flag WithRealTiKV is set.
 	KeyspaceName = flag.String("keyspace-name", "", "the name of keyspace that the tests run on")
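In summary, the patch stops deriving the lightning PD address from `tidbCfg.Path` inside `genConfig` and instead has every `Register` caller resolve the PD leader address from the store's region cache and pass it down, where it is written into `cfg.Lightning.TiDB.PdAddr`. Below is a minimal sketch of the resulting call pattern, assuming the usual `pkg/kv` Storage interface; the `registerBackend` helper and its guarded type assertion are illustrative only and not part of the patch, which asserts unconditionally at the call sites.

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/pkg/ddl/ingest"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/tikv/client-go/v2/tikv"
)

// registerBackend is a hypothetical helper showing the new call pattern:
// resolve the PD leader address from the store, then pass it to Register.
func registerBackend(ctx context.Context, store kv.Storage, unique bool, jobID int64, rgName string) (ingest.BackendCtx, error) {
	var pdLeaderAddr string
	// Only a TiKV-backed store carries a region cache (and thus a PD client),
	// so guard the assertion here even though the DDL code asserts directly.
	if s, ok := store.(tikv.Storage); ok {
		pdLeaderAddr = s.GetRegionCache().PDClient().GetLeaderAddr()
	}
	// A nil etcd client mirrors the non-dist-task call sites in the diff.
	return ingest.LitBackCtxMgr.Register(ctx, unique, jobID, nil, pdLeaderAddr, rgName)
}
```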