Skip to content

Commit

Permalink
sql,backupccl: add replica oracle that prefers non-leaseholders, use …
Browse files Browse the repository at this point in the history
…in backup

Following cockroachdb#91405 which enables followers to serve ExportRequests, this patch
introduces an oracle that prefers non-potential-leaseholders when selecting a
replica. This oracle is then used in backups so that ExportRequests during
backups have a preference to be served by non-leaseholders.

Release note: None
  • Loading branch information
Rui Hu committed Dec 16, 2022
1 parent 62cfe1a commit 7a32704
Show file tree
Hide file tree
Showing 31 changed files with 171 additions and 67 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ go_library(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/physicalplan",
"//pkg/sql/physicalplan/replicaoracle",
"//pkg/sql/privilege",
"//pkg/sql/protoreflect",
"//pkg/sql/roleoption",
Expand Down
14 changes: 12 additions & 2 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
Expand Down Expand Up @@ -164,10 +165,19 @@ func backup(

evalCtx := execCtx.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()
oracleCfg := replicaoracle.Config{
NodeDescs: execCtx.ExecCfg().NodeDescs,
NodeID: 0, // Planning node ID not important.
Locality: roachpb.Locality{}, // Planning node locality not important.
Settings: execCtx.ExecCfg().Settings,
Clock: execCtx.ExecCfg().Clock,
RPCContext: execCtx.ExecCfg().RPCContext,
}
oracle := replicaoracle.NewOracle(replicaoracle.PreferFollowerChoice, oracleCfg)

// We don't return the compatible nodes here since PartitionSpans will
// filter out incompatible nodes.
planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg(), oracle)
if err != nil {
return roachpb.RowCount{}, errors.Wrap(err, "failed to determine nodes on which to run")
}
Expand Down Expand Up @@ -198,7 +208,7 @@ func backup(
}

numTotalSpans := 0
for _, spec := range backupSpecs {
for nodeID, spec := range backupSpecs {
numTotalSpans += len(spec.IntroducedSpans) + len(spec.Spans)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func distRestore(

makePlan := func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {

planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg())
planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg(), nil)
if err != nil {
return nil, nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ func makePlan(
distMode = sql.DistributionTypeNone
}

planCtx := dsp.NewPlanningCtx(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn,
sql.DistributionType(distMode))
planCtx := dsp.NewPlanningCtx(ctx, execCtx.ExtendedEvalContext(), nil, blankTxn, sql.DistributionType(distMode), nil)
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
evalCtx := execCtx.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()

planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg(), nil)

if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ func getReplicationStreamSpec(
// Partition the spans with SQLPlanner
var noTxn *kv.Txn
dsp := jobExecCtx.DistSQLPlanner()
planCtx := dsp.NewPlanningCtx(ctx, jobExecCtx.ExtendedEvalContext(),
nil /* planner */, noTxn, sql.DistributionTypeSystemTenantOnly)
planCtx := dsp.NewPlanningCtx(ctx, jobExecCtx.ExtendedEvalContext(), nil, noTxn, sql.DistributionTypeSystemTenantOnly, nil)

details, ok := j.Details().(jobspb.StreamReplicationDetails)
if !ok {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ go_test(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/sql/physicalplan",
"//pkg/sql/physicalplan/replicaoracle",
"//pkg/sql/privilege",
"//pkg/sql/querycache",
"//pkg/sql/randgen",
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,7 @@ func runPlanInsidePlan(
if distributePlan.WillDistribute() {
distributeType = DistributionTypeAlways
}
planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.NewPlanningCtx(
ctx, evalCtx, &plannerCopy, params.p.txn, distributeType)
planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, params.p.txn, distributeType, nil)
planCtx.planner.curPlan.planComponents = *plan
planCtx.ExtendedEvalCtx.Planner = &plannerCopy
planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy
Expand Down
10 changes: 4 additions & 6 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ func numRangesInSpans(
ctx context.Context, db *kv.DB, distSQLPlanner *DistSQLPlanner, spans []roachpb.Span,
) (int, error) {
txn := db.NewTxn(ctx, "num-ranges-in-spans")
spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn)
spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn, nil)
rangeIds := make(map[int64]struct{})
for _, span := range spans {
// For each span, iterate the spanResolver until it's exhausted, storing
Expand Down Expand Up @@ -896,7 +896,7 @@ func NumRangesInSpanContainedBy(
containedBy []roachpb.Span,
) (total, inContainedBy int, _ error) {
txn := db.NewTxn(ctx, "num-ranges-in-spans")
spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn)
spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn, nil)
// For each span, iterate the spanResolver until it's exhausted, storing
// the found range ids in the map to de-duplicate them.
spanResolver.Seek(ctx, outerSpan, kvcoord.Ascending)
Expand Down Expand Up @@ -1026,8 +1026,7 @@ func (sc *SchemaChanger) distIndexBackfill(
}
sd := NewFakeSessionData(sc.execCfg.SV())
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.ReadTimestamp(), descriptors)
planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, /* planner */
txn, DistributionTypeSystemTenantOnly)
planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
chunkSize := sc.getChunkSize(indexBatchSize)
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes)
Expand Down Expand Up @@ -1334,8 +1333,7 @@ func (sc *SchemaChanger) distColumnBackfill(
)
defer recv.Release()

planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
DistributionTypeSystemTenantOnly)
planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil)
spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1585,8 +1585,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
defer recv.Release()

evalCtx := planner.ExtendedEvalContext()
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
planner.txn, distribute)
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute, nil)
planCtx.stmtType = recv.stmtType
// Skip the diagram generation since on this "main" query path we can get it
// via the statement bundle.
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/create_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,7 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er
}
}

planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn,
DistributionTypeSystemTenantOnly)
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil)
// CREATE STATS flow doesn't produce any rows and only emits the
// metadata, so we can use a nil rowContainerHelper.
resultWriter := NewRowResultWriter(nil /* rowContainer */)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4228,6 +4228,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtx(
planner *planner,
txn *kv.Txn,
distributionType DistributionType,
oracle replicaoracle.Oracle,
) *PlanningCtx {
distribute := distributionType == DistributionTypeAlways || (distributionType == DistributionTypeSystemTenantOnly && evalCtx.Codec.ForSystemTenant())
planCtx := &PlanningCtx{
Expand All @@ -4254,7 +4255,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtx(
// we still need to instantiate a full planning context.
planCtx.parallelizeScansIfLocal = true
}
planCtx.spanIter = dsp.spanResolver.NewSpanResolverIterator(txn)
planCtx.spanIter = dsp.spanResolver.NewSpanResolverIterator(txn, oracle)
planCtx.nodeStatuses = make(map[base.SQLInstanceID]NodeStatus)
planCtx.nodeStatuses[dsp.gatewaySQLInstanceID] = NodeOK
return planCtx
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/distsql_physical_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -578,7 +579,9 @@ type testSpanResolver struct {
}

// NewSpanResolverIterator is part of the SpanResolver interface.
func (tsr *testSpanResolver) NewSpanResolverIterator(_ *kv.Txn) physicalplan.SpanResolverIterator {
func (tsr *testSpanResolver) NewSpanResolverIterator(
txn *kv.Txn, optionalOracle replicaoracle.Oracle,
) physicalplan.SpanResolverIterator {
return &testSpanResolverIterator{tsr: tsr}
}

Expand Down Expand Up @@ -856,7 +859,7 @@ func TestPartitionSpans(t *testing.T) {
ctx := context.Background()
planCtx := dsp.NewPlanningCtx(ctx, &extendedEvalContext{
Context: eval.Context{Codec: keys.SystemSQLCodec},
}, nil, nil, DistributionTypeSystemTenantOnly)
}, nil, nil, DistributionTypeSystemTenantOnly, nil)
var spans []roachpb.Span
for _, s := range tc.spans {
spans = append(spans, roachpb.Span{Key: roachpb.Key(s[0]), EndKey: roachpb.Key(s[1])})
Expand Down Expand Up @@ -1042,7 +1045,7 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
ctx := context.Background()
planCtx := dsp.NewPlanningCtx(ctx, &extendedEvalContext{
Context: eval.Context{Codec: keys.SystemSQLCodec},
}, nil, nil, DistributionTypeSystemTenantOnly)
}, nil, nil, DistributionTypeSystemTenantOnly, nil)
partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1143,7 +1146,7 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
ctx := context.Background()
planCtx := dsp.NewPlanningCtx(ctx, &extendedEvalContext{
Context: eval.Context{Codec: keys.SystemSQLCodec},
}, nil, nil, DistributionTypeSystemTenantOnly)
}, nil, nil, DistributionTypeSystemTenantOnly, nil)
partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span})
if err != nil {
t.Fatal(err)
Expand Down
26 changes: 17 additions & 9 deletions pkg/sql/distsql_plan_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,34 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// SetupAllNodesPlanning creates a planCtx and sets up the planCtx.nodeStatuses
// map for all nodes. It returns all nodes that can be used for planning.
func (dsp *DistSQLPlanner) SetupAllNodesPlanning(
ctx context.Context, evalCtx *extendedEvalContext, execCfg *ExecutorConfig,
ctx context.Context,
evalCtx *extendedEvalContext,
execCfg *ExecutorConfig,
oracle replicaoracle.Oracle,
) (*PlanningCtx, []base.SQLInstanceID, error) {
if dsp.codec.ForSystemTenant() {
return dsp.setupAllNodesPlanningSystem(ctx, evalCtx, execCfg)
return dsp.setupAllNodesPlanningSystem(ctx, evalCtx, execCfg, oracle)
}
return dsp.setupAllNodesPlanningTenant(ctx, evalCtx, execCfg)
return dsp.setupAllNodesPlanningTenant(ctx, evalCtx, execCfg, oracle)
}

// setupAllNodesPlanningSystem creates a planCtx and returns all nodes available
// in a system tenant.
func (dsp *DistSQLPlanner) setupAllNodesPlanningSystem(
ctx context.Context, evalCtx *extendedEvalContext, execCfg *ExecutorConfig,
ctx context.Context,
evalCtx *extendedEvalContext,
execCfg *ExecutorConfig,
oracle replicaoracle.Oracle,
) (*PlanningCtx, []base.SQLInstanceID, error) {
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, nil, /* txn */
DistributionTypeAlways)
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil, nil, DistributionTypeAlways, oracle)

ss, err := execCfg.NodesStatusServer.OptionalNodesStatusServer(47900)
if err != nil {
Expand Down Expand Up @@ -65,13 +71,15 @@ func (dsp *DistSQLPlanner) setupAllNodesPlanningSystem(
// setupAllNodesPlanningTenant creates a planCtx and returns all nodes available
// in a non-system tenant.
func (dsp *DistSQLPlanner) setupAllNodesPlanningTenant(
ctx context.Context, evalCtx *extendedEvalContext, execCfg *ExecutorConfig,
ctx context.Context,
evalCtx *extendedEvalContext,
execCfg *ExecutorConfig,
oracle replicaoracle.Oracle,
) (*PlanningCtx, []base.SQLInstanceID, error) {
if dsp.sqlAddressResolver == nil {
return nil, nil, errors.New("sql instance provider not available in multi-tenant environment")
}
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, nil, /* txn */
DistributionTypeAlways)
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil, nil, DistributionTypeAlways, oracle)
pods, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_plan_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func PlanCDCExpression(
return cdcPlan, errors.AssertionFailedf("unexpected query structure")
}

planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, DistributionTypeNone)
planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, DistributionTypeNone, nil)

return CDCExpressionPlan{
Plan: p.curPlan.main,
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/distsql_plan_ctas.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func PlanAndRunCTAS(
if !isLocal {
distribute = DistributionTypeSystemTenantOnly
}
planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), planner,
txn, distribute)
planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), planner, txn, distribute, nil)
planCtx.stmtType = tree.Rows

physPlan, cleanup, err := dsp.createPhysPlan(ctx, planCtx, in)
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -1606,8 +1606,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
if distributeSubquery {
distribute = DistributionTypeAlways
}
subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn,
distribute)
subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute, nil)
subqueryPlanCtx.stmtType = tree.Rows
subqueryPlanCtx.skipDistSQLDiagramGeneration = skipDistSQLDiagramGeneration
if planner.instrumentation.ShouldSaveFlows() {
Expand Down Expand Up @@ -1938,7 +1937,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
if distributePostquery {
distribute = DistributionTypeAlways
}
postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute, nil)
postqueryPlanCtx.stmtType = tree.Rows
// Postqueries are only executed on the main query path where we skip the
// diagram generation.
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
// We need distribute = true so that executing the plan involves marshaling
// the root txn meta to leaf txns. Local flows can start in aborted txns
// because they just use the root txn.
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, nil,
DistributionTypeSystemTenantOnly)
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, nil, DistributionTypeSystemTenantOnly, nil)
planCtx.stmtType = recv.stmtType

execCfg.DistSQLPlanner.PlanAndRun(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func newDistSQLSpecExecFactory(
distribute = DistributionTypeSystemTenantOnly
}
evalCtx := p.ExtendedEvalContext()
e.planCtx = e.dsp.NewPlanningCtx(ctx, evalCtx, e.planner, e.planner.txn, distribute)
e.planCtx = e.dsp.NewPlanningCtx(ctx, evalCtx, e.planner, e.planner.txn, distribute, nil)
return e
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ func newPlanningCtxForExplainPurposes(
if distribution.WillDistribute() {
distribute = DistributionTypeAlways
}
planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx,
params.p, params.p.txn, distribute)
planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx, params.p, params.p.txn, distribute, nil)
planCtx.planner.curPlan.subqueryPlans = subqueryPlans
for i := range planCtx.planner.curPlan.subqueryPlans {
p := &planCtx.planner.curPlan.subqueryPlans[i]
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/importer/import_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func distImport(
makePlan := func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
evalCtx := execCtx.ExtendedEvalContext()

planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg(), nil)
if err != nil {
return nil, nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ func (ib *IndexBackfillPlanner) plan(
) error {
sd := NewFakeSessionData(ib.execCfg.SV())
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, sd, nowTimestamp, descriptors)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx,
nil /* planner */, txn, DistributionTypeSystemTenantOnly)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil)
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
// batch size. Also plumb in a testing knob.
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/mvcc_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ func (im *IndexBackfillerMergePlanner) plan(
) error {
sd := NewFakeSessionData(im.execCfg.SV())
evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, sd, txn.ReadTimestamp(), descriptors)
planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
DistributionTypeSystemTenantOnly)
planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil)

spec, err := initIndexBackfillMergerSpec(*tableDesc.TableDesc(), addedIndexes, temporaryIndexes, mergeTimestamp)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/physicalplan/fake_span_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)
Expand Down Expand Up @@ -65,7 +66,9 @@ type fakeSpanResolverIterator struct {
}

// NewSpanResolverIterator is part of the SpanResolver interface.
func (fsr *fakeSpanResolver) NewSpanResolverIterator(txn *kv.Txn) SpanResolverIterator {
func (fsr *fakeSpanResolver) NewSpanResolverIterator(
txn *kv.Txn, optionalOracle replicaoracle.Oracle,
) SpanResolverIterator {
rng, _ := randutil.NewTestRand()
return &fakeSpanResolverIterator{fsr: fsr, db: txn.DB(), rng: rng}
}
Expand Down
Loading

0 comments on commit 7a32704

Please sign in to comment.