Skip to content

Commit

Permalink
sql: rename two DistributionType options
Browse files Browse the repository at this point in the history
This commit does the following renaming to better represent the options:
- `DistributionTypeNone` -> `LocalDistribution`
- `DistributionTypeAlways` -> `FullDistribution`.

Release note: None
  • Loading branch information
yuzefovich committed Mar 13, 2024
1 parent 6f8cfd7 commit 5633695
Show file tree
Hide file tree
Showing 19 changed files with 61 additions and 53 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,10 @@ func makePlan(
maybeCfKnobs, haveKnobs := execCtx.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs)
var blankTxn *kv.Txn

distMode := sql.DistributionTypeAlways
distMode := sql.FullDistribution
if details.SinkURI == `` {
// Sinkless feeds get one ChangeAggregator on this node.
distMode = sql.DistributionTypeNone
distMode = sql.LocalDistribution
}

var locFilter roachpb.Locality
Expand All @@ -393,7 +393,7 @@ func makePlan(
log.Infof(ctx, "spans returned by DistSQL: %s", spanPartitions)
}
switch {
case distMode == sql.DistributionTypeNone || rangeDistribution == int64(defaultDistribution):
case distMode == sql.LocalDistribution || rangeDistribution == int64(defaultDistribution):
case rangeDistribution == int64(balancedSimpleDistribution):
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "rebalancing ranges using balanced simple distribution")
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,9 @@ func buildReplicationStreamSpec(

// Partition the spans with SQLPlanner
dsp := jobExecCtx.DistSQLPlanner()
planCtx := dsp.NewPlanningCtx(ctx, jobExecCtx.ExtendedEvalContext(),
nil /* planner */, nil /* txn */, sql.DistributionTypeAlways)
planCtx := dsp.NewPlanningCtx(
ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution,
)

spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,9 @@ func runPlanInsidePlan(
ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
plannerCopy.SessionData().DistSQLMode, plan.main,
)
distributeType := DistributionType(DistributionTypeNone)
distributeType := DistributionType(LocalDistribution)
if distributePlan.WillDistribute() {
distributeType = DistributionTypeAlways
distributeType = FullDistribution
}
evalCtx := evalCtxFactory()
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType)
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,8 +1044,9 @@ func (sc *SchemaChanger) distIndexBackfill(
}
sd := NewInternalSessionData(ctx, sc.execCfg.Settings, "dist-index-backfill")
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.KV().ReadTimestamp(), txn.Descriptors())
planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, /* planner */
txn.KV(), DistributionTypeAlways)
planCtx = sc.distSQLPlanner.NewPlanningCtx(
ctx, &evalCtx, nil /* planner */, txn.KV(), FullDistribution,
)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
chunkSize := sc.getChunkSize(indexBatchSize)
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes)
Expand Down Expand Up @@ -1351,8 +1352,9 @@ func (sc *SchemaChanger) distColumnBackfill(
)
defer recv.Release()

planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn.KV(),
DistributionTypeAlways)
planCtx := sc.distSQLPlanner.NewPlanningCtx(
ctx, &evalCtx, nil /* planner */, txn.KV(), FullDistribution,
)
spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3148,7 +3148,7 @@ func (ex *connExecutor) execCopyIn(
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
defer p.curPlan.close(ctx)
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, LocalDistribution, nil /* progressAtomic */)
return err
},
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1991,9 +1991,9 @@ func (ex *connExecutor) dispatchToExecutionEngine(
if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.state.mu.autoRetryReason != nil {
ex.server.cfg.TestingKnobs.OnTxnRetry(ex.state.mu.autoRetryReason, planner.EvalContext())
}
distribute := DistributionType(DistributionTypeNone)
distribute := DistributionType(LocalDistribution)
if distributePlan.WillDistribute() {
distribute = DistributionTypeAlways
distribute = FullDistribution
}
ex.sessionTracing.TraceExecStart(ctx, "distributed")
stats, err = ex.execWithDistSQLEngine(
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/create_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,7 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er
}
}

planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn.KV(),
DistributionTypeAlways)
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn.KV(), FullDistribution)
// CREATE STATS flow doesn't produce any rows and only emits the
// metadata, so we can use a nil rowContainerHelper.
resultWriter := NewRowResultWriter(nil /* rowContainer */)
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,12 @@ type DistSQLPlanner struct {
type DistributionType int

const (
// DistributionTypeNone does not distribute a plan across multiple instances.
DistributionTypeNone = iota
// DistributionTypeAlways distributes a plan across multiple instances whether
// LocalDistribution does not distribute a plan across multiple SQL
// instances.
LocalDistribution = iota
// FullDistribution distributes a plan across multiple SQL instances whether
// it is a system tenant or non-system tenant.
DistributionTypeAlways
FullDistribution
)

// ReplicaOraclePolicy controls which policy the physical planner uses to choose
Expand Down Expand Up @@ -4837,7 +4838,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
oracle replicaoracle.Oracle,
localityFiler roachpb.Locality,
) *PlanningCtx {
distribute := distributionType == DistributionTypeAlways
distribute := distributionType == FullDistribution
infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), dsp.gatewaySQLInstanceID)
planCtx := &PlanningCtx{
ExtendedEvalCtx: evalCtx,
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/distsql_physical_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,9 +1196,10 @@ func TestPartitionSpans(t *testing.T) {
},
}),
}
planCtx := dsp.NewPlanningCtxWithOracle(ctx, &extendedEvalContext{
Context: *evalCtx,
}, nil, nil, DistributionTypeAlways, physicalplan.DefaultReplicaChooser, locFilter)
planCtx := dsp.NewPlanningCtxWithOracle(
ctx, &extendedEvalContext{Context: *evalCtx}, nil, /* planner */
nil /* txn */, FullDistribution, physicalplan.DefaultReplicaChooser, locFilter,
)
planCtx.spanPartitionState.testingOverrideRandomSelection = tc.partitionState.testingOverrideRandomSelection
var spans []roachpb.Span
for _, s := range tc.spans {
Expand Down Expand Up @@ -1513,9 +1514,10 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
}

ctx := context.Background()
planCtx := dsp.NewPlanningCtx(ctx, &extendedEvalContext{
Context: eval.Context{Codec: keys.SystemSQLCodec},
}, nil, nil, DistributionTypeAlways)
planCtx := dsp.NewPlanningCtx(
ctx, &extendedEvalContext{Context: eval.Context{Codec: keys.SystemSQLCodec}},
nil /* planner */, nil /* txn */, FullDistribution,
)
partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span})
if err != nil {
t.Fatal(err)
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/distsql_plan_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func (dsp *DistSQLPlanner) setupAllNodesPlanningSystem(
oracle replicaoracle.Oracle,
localityFilter roachpb.Locality,
) (*PlanningCtx, []base.SQLInstanceID, error) {
planCtx := dsp.NewPlanningCtxWithOracle(ctx, evalCtx, nil /* planner */, nil, /* txn */
DistributionTypeAlways, oracle, localityFilter)
planCtx := dsp.NewPlanningCtxWithOracle(
ctx, evalCtx, nil /* planner */, nil /* txn */, FullDistribution, oracle, localityFilter,
)

ss, err := execCfg.NodesStatusServer.OptionalNodesStatusServer()
if err != nil {
Expand Down Expand Up @@ -99,8 +100,9 @@ func (dsp *DistSQLPlanner) setupAllNodesPlanningTenant(
oracle replicaoracle.Oracle,
localityFilter roachpb.Locality,
) (*PlanningCtx, []base.SQLInstanceID, error) {
planCtx := dsp.NewPlanningCtxWithOracle(ctx, evalCtx, nil /* planner */, nil, /* txn */
DistributionTypeAlways, oracle, localityFilter)
planCtx := dsp.NewPlanningCtxWithOracle(
ctx, evalCtx, nil /* planner */, nil /* txn */, FullDistribution, oracle, localityFilter,
)
pods, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_plan_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func PlanCDCExpression(
return cdcPlan, errors.AssertionFailedf("unexpected query structure")
}

planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, DistributionTypeNone)
planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, LocalDistribution)

return CDCExpressionPlan{
Plan: p.curPlan.main,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql_plan_ctas.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func PlanAndRunCTAS(
out execinfrapb.ProcessorCoreUnion,
recv *DistSQLReceiver,
) {
distribute := DistributionType(DistributionTypeNone)
distribute := DistributionType(LocalDistribution)
if !isLocal {
distribute = DistributionTypeAlways
distribute = FullDistribution
}
planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), planner,
txn, distribute)
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -1784,9 +1784,9 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
ctx, planner.Descriptors().HasUncommittedTypes(),
planner.SessionData().DistSQLMode, subqueryPlan.plan,
).WillDistribute()
distribute := DistributionType(DistributionTypeNone)
distribute := DistributionType(LocalDistribution)
if distributeSubquery {
distribute = DistributionTypeAlways
distribute = FullDistribution
}
subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
subqueryPlanCtx.stmtType = tree.Rows
Expand Down Expand Up @@ -2030,7 +2030,7 @@ func (dsp *DistSQLPlanner) PlanAndRun(
// is no point in providing the locality filter since it will be ignored
// anyway, so we don't use NewPlanningCtxWithOracle constructor.
localPlanCtx := dsp.NewPlanningCtx(
ctx, evalCtx, planCtx.planner, evalCtx.Txn, DistributionTypeNone,
ctx, evalCtx, planCtx.planner, evalCtx.Txn, LocalDistribution,
)
localPlanCtx.setUpForMainQuery(ctx, planCtx.planner, recv)
localPhysPlan, localPhysPlanCleanup, err := dsp.createPhysPlan(ctx, localPlanCtx, plan)
Expand Down Expand Up @@ -2280,9 +2280,9 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
ctx, planner.Descriptors().HasUncommittedTypes(),
planner.SessionData().DistSQLMode, postqueryPlan,
).WillDistribute()
distribute := DistributionType(DistributionTypeNone)
distribute := DistributionType(LocalDistribution)
if distributePostquery {
distribute = DistributionTypeAlways
distribute = FullDistribution
}
postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
postqueryPlanCtx.stmtType = tree.Rows
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
// We need distribute = true so that executing the plan involves marshaling
// the root txn meta to leaf txns. Local flows can start in aborted txns
// because they just use the root txn.
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, nil,
DistributionTypeAlways)
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, nil /* txn */, FullDistribution)
planCtx.stmtType = recv.stmtType

execCfg.DistSQLPlanner.PlanAndRun(
Expand Down Expand Up @@ -297,7 +296,7 @@ func TestDistSQLRunningParallelFKChecksAfterAbort(t *testing.T) {
defer p.curPlan.close(ctx)

evalCtx := p.ExtendedEvalContext()
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, txn, DistributionTypeNone)
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, txn, LocalDistribution)
planCtx.stmtType = recv.stmtType

evalCtxFactory := func(bool) *extendedEvalContext {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func newDistSQLSpecExecFactory(
planningMode: planningMode,
gatewaySQLInstanceID: p.extendedEvalCtx.DistSQLPlanner.gatewaySQLInstanceID,
}
distribute := DistributionType(DistributionTypeNone)
distribute := DistributionType(LocalDistribution)
if e.planningMode != distSQLLocalOnlyPlanning {
distribute = DistributionTypeAlways
distribute = FullDistribution
}
evalCtx := p.ExtendedEvalContext()
e.planCtx = e.dsp.NewPlanningCtx(ctx, evalCtx, e.planner, e.planner.txn, distribute)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ func newPlanningCtxForExplainPurposes(
subqueryPlans []subquery,
distribution physicalplan.PlanDistribution,
) *PlanningCtx {
distribute := DistributionType(DistributionTypeNone)
distribute := DistributionType(LocalDistribution)
if distribution.WillDistribute() {
distribute = DistributionTypeAlways
distribute = FullDistribution
}
planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx,
params.p, params.p.txn, distribute)
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ func (ib *IndexBackfillPlanner) plan(
) error {
sd := NewInternalSessionData(ctx, ib.execCfg.Settings, "plan-index-backfill")
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, sd, nowTimestamp, descriptors)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx,
nil /* planner */, txn.KV(), DistributionTypeAlways)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(
ctx, &evalCtx, nil /* planner */, txn.KV(), FullDistribution,
)
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
// batch size. Also plumb in a testing knob.
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/mvcc_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ func (im *IndexBackfillerMergePlanner) plan(
) error {
sd := NewInternalSessionData(ctx, im.execCfg.Settings, "plan-index-backfill-merge")
evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, sd, txn.KV().ReadTimestamp(), descriptors)
planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn.KV(),
DistributionTypeAlways)
planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(
ctx, &evalCtx, nil /* planner */, txn.KV(), FullDistribution,
)

spec, err := initIndexBackfillMergerSpec(*tableDesc.TableDesc(), addedIndexes, temporaryIndexes, mergeTimestamp)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ func (dsp *DistSQLPlanner) Exec(
)
defer recv.Release()

distributionType := DistributionType(DistributionTypeNone)
distributionType := DistributionType(LocalDistribution)
if distribute {
distributionType = DistributionTypeAlways
distributionType = FullDistribution
}
evalCtx := p.ExtendedEvalContext()
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, p.txn,
Expand Down Expand Up @@ -181,7 +181,7 @@ func (dsp *DistSQLPlanner) ExecLocalAll(
)
defer recv.Release()

distributionType := DistributionType(DistributionTypeNone)
distributionType := DistributionType(LocalDistribution)
evalCtx := p.ExtendedEvalContext()
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, p.txn,
distributionType)
Expand Down

0 comments on commit 5633695

Please sign in to comment.