Skip to content

Commit

Permalink
ddl: use latest PD address to register lightning (#48687) (#55936)
Browse files Browse the repository at this point in the history
close #48680
  • Loading branch information
ti-chi-bot authored Nov 25, 2024
1 parent ae4703a commit 5f4110d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
19 changes: 18 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,24 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
job.RowCount = 0
return false, ver, nil
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
var pdLeaderAddr string
if d != nil {
store, ok2 := d.store.(tikv.Storage)
if ok2 {
pdLeaderAddr = store.GetRegionCache().PDClient().GetLeaderAddr()
}
// happens in unit tests
if !ok2 || pdLeaderAddr == "mockpd" {
pdLeaderAddr = config.GetGlobalConfig().Path
}
}
bc, err = ingest.LitBackCtxMgr.Register(
w.ctx,
indexInfo.Unique,
job.ID,
job.ReorgMeta.SQLMode,
pdLeaderAddr,
)
if err != nil {
err = tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
Expand Down
9 changes: 8 additions & 1 deletion ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ func (m *backendCtxManager) init(memRoot MemRoot, diskRoot DiskRoot) {
}

// Register creates a new backend and registers it to the backend context.
func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int64, _ mysql.SQLMode) (*BackendContext, error) {
func (m *backendCtxManager) Register(
ctx context.Context,
unique bool,
jobID int64,
_ mysql.SQLMode,
pdAddr string,
) (*BackendContext, error) {
bc, exist := m.Load(jobID)
if !exist {
m.memRoot.RefreshConsumption()
Expand All @@ -61,6 +67,7 @@ func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int
logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
cfg.TiDB.PdAddr = pdAddr
bd, err := createLocalBackend(ctx, cfg, glueLit{})
if err != nil {
logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
Expand Down
1 change: 0 additions & 1 deletion ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config
} else {
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
}
cfg.TiDB.PdAddr = tidbCfg.Path
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort)
// Set TLS related information
Expand Down

0 comments on commit 5f4110d

Please sign in to comment.