diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 88345cf4dbbf..817354c70892 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -102,6 +102,7 @@ go_library( "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgnotice", "//pkg/sql/physicalplan", + "//pkg/sql/physicalplan/replicaoracle", "//pkg/sql/privilege", "//pkg/sql/protoreflect", "//pkg/sql/roleoption", diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index dec5ac0ee6f2..08b9d6096e4d 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/stats" @@ -164,10 +165,19 @@ func backup( evalCtx := execCtx.ExtendedEvalContext() dsp := execCtx.DistSQLPlanner() + oracleCfg := replicaoracle.Config{ + NodeDescs: execCtx.ExecCfg().NodeDescs, + NodeID: 0, // Planning node ID not important. + Locality: roachpb.Locality{}, // Planning node locality not important. + Settings: execCtx.ExecCfg().Settings, + Clock: execCtx.ExecCfg().Clock, + RPCContext: execCtx.ExecCfg().RPCContext, + } + oracle := replicaoracle.NewOracle(replicaoracle.PreferFollowerChoice, oracleCfg) // We don't return the compatible nodes here since PartitionSpans will // filter out incompatible nodes. - planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) + planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg(), oracle) if err != nil { return roachpb.RowCount{}, errors.Wrap(err, "failed to determine nodes on which to run") } @@ -198,7 +208,7 @@ func backup( } numTotalSpans := 0 - for _, spec := range backupSpecs { + for nodeID, spec := range backupSpecs { numTotalSpans += len(spec.IntroducedSpans) + len(spec.Spans) } diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 3f422f296afd..c4e7163edca5 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -100,7 +100,7 @@ func distRestore( makePlan := func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { - planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg()) + planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg(), nil) if err != nil { return nil, nil, err } diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index ec562bd8f5da..2a1aa447bc6b 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -347,8 +347,7 @@ func makePlan( distMode = sql.DistributionTypeNone } - planCtx := dsp.NewPlanningCtx(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn, - sql.DistributionType(distMode)) + planCtx := dsp.NewPlanningCtx(ctx, execCtx.ExtendedEvalContext(), nil, blankTxn, sql.DistributionType(distMode), nil) spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans) if err != nil { return nil, nil, err diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index b3e1478b159a..7a6d28c4419b 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -260,7 +260,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. evalCtx := execCtx.ExtendedEvalContext() dsp := execCtx.DistSQLPlanner() - planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) + planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg(), nil) if err != nil { return err diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index ce679ac8b9af..23e1ae19e642 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -221,8 +221,7 @@ func getReplicationStreamSpec( // Partition the spans with SQLPlanner var noTxn *kv.Txn dsp := jobExecCtx.DistSQLPlanner() - planCtx := dsp.NewPlanningCtx(ctx, jobExecCtx.ExtendedEvalContext(), - nil /* planner */, noTxn, sql.DistributionTypeSystemTenantOnly) + planCtx := dsp.NewPlanningCtx(ctx, jobExecCtx.ExtendedEvalContext(), nil, noTxn, sql.DistributionTypeSystemTenantOnly, nil) details, ok := j.Details().(jobspb.StreamReplicationDetails) if !ok { diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 80fb709898e9..4a957496347f 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -728,6 +728,7 @@ go_test( "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgwirebase", "//pkg/sql/physicalplan", + "//pkg/sql/physicalplan/replicaoracle", "//pkg/sql/privilege", "//pkg/sql/querycache", "//pkg/sql/randgen", diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go index 4d648255142b..152b65d08280 100644 --- a/pkg/sql/apply_join.go +++ b/pkg/sql/apply_join.go @@ -324,8 +324,7 @@ func runPlanInsidePlan( if distributePlan.WillDistribute() { distributeType = DistributionTypeAlways } - planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.NewPlanningCtx( - ctx, evalCtx, &plannerCopy, params.p.txn, distributeType) + planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, params.p.txn, distributeType, nil) planCtx.planner.curPlan.planComponents = *plan planCtx.ExtendedEvalCtx.Planner = &plannerCopy planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index d33a6dd99808..4e876b1ba823 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -862,7 +862,7 @@ func numRangesInSpans( ctx context.Context, db *kv.DB, distSQLPlanner *DistSQLPlanner, spans []roachpb.Span, ) (int, error) { txn := db.NewTxn(ctx, "num-ranges-in-spans") - spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn) + spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn, nil) rangeIds := make(map[int64]struct{}) for _, span := range spans { // For each span, iterate the spanResolver until it's exhausted, storing @@ -896,7 +896,7 @@ func NumRangesInSpanContainedBy( containedBy []roachpb.Span, ) (total, inContainedBy int, _ error) { txn := db.NewTxn(ctx, "num-ranges-in-spans") - spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn) + spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn, nil) // For each span, iterate the spanResolver until it's exhausted, storing // the found range ids in the map to de-duplicate them. spanResolver.Seek(ctx, outerSpan, kvcoord.Ascending) @@ -1026,8 +1026,7 @@ func (sc *SchemaChanger) distIndexBackfill( } sd := NewFakeSessionData(sc.execCfg.SV()) evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.ReadTimestamp(), descriptors) - planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, /* planner */ - txn, DistributionTypeSystemTenantOnly) + planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil) indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV) chunkSize := sc.getChunkSize(indexBatchSize) spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes) @@ -1334,8 +1333,7 @@ func (sc *SchemaChanger) distColumnBackfill( ) defer recv.Release() - planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn, - DistributionTypeSystemTenantOnly) + planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil) spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf) if err != nil { return err diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 2a236d35e5ee..1c465688c804 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1585,8 +1585,7 @@ func (ex *connExecutor) execWithDistSQLEngine( defer recv.Release() evalCtx := planner.ExtendedEvalContext() - planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, - planner.txn, distribute) + planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute, nil) planCtx.stmtType = recv.stmtType // Skip the diagram generation since on this "main" query path we can get it // via the statement bundle. diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 1afde90b44d1..1c5511b7c2b4 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -618,8 +618,7 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er } } - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn, - DistributionTypeSystemTenantOnly) + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil) // CREATE STATS flow doesn't produce any rows and only emits the // metadata, so we can use a nil rowContainerHelper. resultWriter := NewRowResultWriter(nil /* rowContainer */) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index c58a78f0bf86..401bba742ab0 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -4228,6 +4228,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtx( planner *planner, txn *kv.Txn, distributionType DistributionType, + oracle replicaoracle.Oracle, ) *PlanningCtx { distribute := distributionType == DistributionTypeAlways || (distributionType == DistributionTypeSystemTenantOnly && evalCtx.Codec.ForSystemTenant()) planCtx := &PlanningCtx{ @@ -4254,7 +4255,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtx( // we still need to instantiate a full planning context. planCtx.parallelizeScansIfLocal = true } - planCtx.spanIter = dsp.spanResolver.NewSpanResolverIterator(txn) + planCtx.spanIter = dsp.spanResolver.NewSpanResolverIterator(txn, oracle) planCtx.nodeStatuses = make(map[base.SQLInstanceID]NodeStatus) planCtx.nodeStatuses[dsp.gatewaySQLInstanceID] = NodeOK return planCtx diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 7bab41625173..428f6cbaa78d 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -578,7 +579,9 @@ type testSpanResolver struct { } // NewSpanResolverIterator is part of the SpanResolver interface. -func (tsr *testSpanResolver) NewSpanResolverIterator(_ *kv.Txn) physicalplan.SpanResolverIterator { +func (tsr *testSpanResolver) NewSpanResolverIterator( + txn *kv.Txn, optionalOracle replicaoracle.Oracle, +) physicalplan.SpanResolverIterator { return &testSpanResolverIterator{tsr: tsr} } @@ -856,7 +859,7 @@ func TestPartitionSpans(t *testing.T) { ctx := context.Background() planCtx := dsp.NewPlanningCtx(ctx, &extendedEvalContext{ Context: eval.Context{Codec: keys.SystemSQLCodec}, - }, nil, nil, DistributionTypeSystemTenantOnly) + }, nil, nil, DistributionTypeSystemTenantOnly, nil) var spans []roachpb.Span for _, s := range tc.spans { spans = append(spans, roachpb.Span{Key: roachpb.Key(s[0]), EndKey: roachpb.Key(s[1])}) @@ -1042,7 +1045,7 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) { ctx := context.Background() planCtx := dsp.NewPlanningCtx(ctx, &extendedEvalContext{ Context: eval.Context{Codec: keys.SystemSQLCodec}, - }, nil, nil, DistributionTypeSystemTenantOnly) + }, nil, nil, DistributionTypeSystemTenantOnly, nil) partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span}) if err != nil { t.Fatal(err) @@ -1143,7 +1146,7 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) { ctx := context.Background() planCtx := dsp.NewPlanningCtx(ctx, &extendedEvalContext{ Context: eval.Context{Codec: keys.SystemSQLCodec}, - }, nil, nil, DistributionTypeSystemTenantOnly) + }, nil, nil, DistributionTypeSystemTenantOnly, nil) partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span}) if err != nil { t.Fatal(err) diff --git a/pkg/sql/distsql_plan_bulk.go b/pkg/sql/distsql_plan_bulk.go index ae5f98fbe54d..b95e98c93727 100644 --- a/pkg/sql/distsql_plan_bulk.go +++ b/pkg/sql/distsql_plan_bulk.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -23,21 +24,26 @@ import ( // SetupAllNodesPlanning creates a planCtx and sets up the planCtx.nodeStatuses // map for all nodes. It returns all nodes that can be used for planning. func (dsp *DistSQLPlanner) SetupAllNodesPlanning( - ctx context.Context, evalCtx *extendedEvalContext, execCfg *ExecutorConfig, + ctx context.Context, + evalCtx *extendedEvalContext, + execCfg *ExecutorConfig, + oracle replicaoracle.Oracle, ) (*PlanningCtx, []base.SQLInstanceID, error) { if dsp.codec.ForSystemTenant() { - return dsp.setupAllNodesPlanningSystem(ctx, evalCtx, execCfg) + return dsp.setupAllNodesPlanningSystem(ctx, evalCtx, execCfg, oracle) } - return dsp.setupAllNodesPlanningTenant(ctx, evalCtx, execCfg) + return dsp.setupAllNodesPlanningTenant(ctx, evalCtx, execCfg, oracle) } // setupAllNodesPlanningSystem creates a planCtx and returns all nodes available // in a system tenant. func (dsp *DistSQLPlanner) setupAllNodesPlanningSystem( - ctx context.Context, evalCtx *extendedEvalContext, execCfg *ExecutorConfig, + ctx context.Context, + evalCtx *extendedEvalContext, + execCfg *ExecutorConfig, + oracle replicaoracle.Oracle, ) (*PlanningCtx, []base.SQLInstanceID, error) { - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, nil, /* txn */ - DistributionTypeAlways) + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil, nil, DistributionTypeAlways, oracle) ss, err := execCfg.NodesStatusServer.OptionalNodesStatusServer(47900) if err != nil { @@ -65,13 +71,15 @@ func (dsp *DistSQLPlanner) setupAllNodesPlanningSystem( // setupAllNodesPlanningTenant creates a planCtx and returns all nodes available // in a non-system tenant. func (dsp *DistSQLPlanner) setupAllNodesPlanningTenant( - ctx context.Context, evalCtx *extendedEvalContext, execCfg *ExecutorConfig, + ctx context.Context, + evalCtx *extendedEvalContext, + execCfg *ExecutorConfig, + oracle replicaoracle.Oracle, ) (*PlanningCtx, []base.SQLInstanceID, error) { if dsp.sqlAddressResolver == nil { return nil, nil, errors.New("sql instance provider not available in multi-tenant environment") } - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, nil, /* txn */ - DistributionTypeAlways) + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil, nil, DistributionTypeAlways, oracle) pods, err := dsp.sqlAddressResolver.GetAllInstances(ctx) if err != nil { return nil, nil, err diff --git a/pkg/sql/distsql_plan_changefeed.go b/pkg/sql/distsql_plan_changefeed.go index 5b07d2e2ae96..6834e6b3dacc 100644 --- a/pkg/sql/distsql_plan_changefeed.go +++ b/pkg/sql/distsql_plan_changefeed.go @@ -139,7 +139,7 @@ func PlanCDCExpression( return cdcPlan, errors.AssertionFailedf("unexpected query structure") } - planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, DistributionTypeNone) + planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, DistributionTypeNone, nil) return CDCExpressionPlan{ Plan: p.curPlan.main, diff --git a/pkg/sql/distsql_plan_ctas.go b/pkg/sql/distsql_plan_ctas.go index a4f9df99ede7..cdc4c987885b 100644 --- a/pkg/sql/distsql_plan_ctas.go +++ b/pkg/sql/distsql_plan_ctas.go @@ -35,8 +35,7 @@ func PlanAndRunCTAS( if !isLocal { distribute = DistributionTypeSystemTenantOnly } - planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), planner, - txn, distribute) + planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), planner, txn, distribute, nil) planCtx.stmtType = tree.Rows physPlan, cleanup, err := dsp.createPhysPlan(ctx, planCtx, in) diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 4710a40204c8..61a2a3a43ead 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1606,8 +1606,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( if distributeSubquery { distribute = DistributionTypeAlways } - subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, - distribute) + subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute, nil) subqueryPlanCtx.stmtType = tree.Rows subqueryPlanCtx.skipDistSQLDiagramGeneration = skipDistSQLDiagramGeneration if planner.instrumentation.ShouldSaveFlows() { @@ -1938,7 +1937,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery( if distributePostquery { distribute = DistributionTypeAlways } - postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute) + postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute, nil) postqueryPlanCtx.stmtType = tree.Rows // Postqueries are only executed on the main query path where we skip the // diagram generation. diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index 29b512baa514..3aca7aeceb01 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -177,8 +177,7 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) { // We need distribute = true so that executing the plan involves marshaling // the root txn meta to leaf txns. Local flows can start in aborted txns // because they just use the root txn. - planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, nil, - DistributionTypeSystemTenantOnly) + planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, nil, DistributionTypeSystemTenantOnly, nil) planCtx.stmtType = recv.stmtType execCfg.DistSQLPlanner.PlanAndRun( diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 56e5af9edf31..070907786007 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -78,7 +78,7 @@ func newDistSQLSpecExecFactory( distribute = DistributionTypeSystemTenantOnly } evalCtx := p.ExtendedEvalContext() - e.planCtx = e.dsp.NewPlanningCtx(ctx, evalCtx, e.planner, e.planner.txn, distribute) + e.planCtx = e.dsp.NewPlanningCtx(ctx, evalCtx, e.planner, e.planner.txn, distribute, nil) return e } diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go index 7528200e5115..b8d525652cdc 100644 --- a/pkg/sql/explain_vec.go +++ b/pkg/sql/explain_vec.go @@ -126,8 +126,7 @@ func newPlanningCtxForExplainPurposes( if distribution.WillDistribute() { distribute = DistributionTypeAlways } - planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx, - params.p, params.p.txn, distribute) + planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx, params.p, params.p.txn, distribute, nil) planCtx.planner.curPlan.subqueryPlans = subqueryPlans for i := range planCtx.planner.curPlan.subqueryPlans { p := &planCtx.planner.curPlan.subqueryPlans[i] diff --git a/pkg/sql/importer/import_processor_planning.go b/pkg/sql/importer/import_processor_planning.go index b4e659ae5208..cc31ec66963a 100644 --- a/pkg/sql/importer/import_processor_planning.go +++ b/pkg/sql/importer/import_processor_planning.go @@ -75,7 +75,7 @@ func distImport( makePlan := func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { evalCtx := execCtx.ExtendedEvalContext() - planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) + planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg(), nil) if err != nil { return nil, nil, err } diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index ca31cf02ca58..66b2e1d059ee 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -168,8 +168,7 @@ func (ib *IndexBackfillPlanner) plan( ) error { sd := NewFakeSessionData(ib.execCfg.SV()) evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, sd, nowTimestamp, descriptors) - planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, - nil /* planner */, txn, DistributionTypeSystemTenantOnly) + planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil) // TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the // batch size. Also plumb in a testing knob. chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV) diff --git a/pkg/sql/mvcc_backfiller.go b/pkg/sql/mvcc_backfiller.go index 70555ae4ce46..abd7855a9758 100644 --- a/pkg/sql/mvcc_backfiller.go +++ b/pkg/sql/mvcc_backfiller.go @@ -128,8 +128,7 @@ func (im *IndexBackfillerMergePlanner) plan( ) error { sd := NewFakeSessionData(im.execCfg.SV()) evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, sd, txn.ReadTimestamp(), descriptors) - planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn, - DistributionTypeSystemTenantOnly) + planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, txn, DistributionTypeSystemTenantOnly, nil) spec, err := initIndexBackfillMergerSpec(*tableDesc.TableDesc(), addedIndexes, temporaryIndexes, mergeTimestamp) if err != nil { diff --git a/pkg/sql/physicalplan/fake_span_resolver.go b/pkg/sql/physicalplan/fake_span_resolver.go index a4edc005e684..0da34b939410 100644 --- a/pkg/sql/physicalplan/fake_span_resolver.go +++ b/pkg/sql/physicalplan/fake_span_resolver.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" ) @@ -65,7 +66,9 @@ type fakeSpanResolverIterator struct { } // NewSpanResolverIterator is part of the SpanResolver interface. -func (fsr *fakeSpanResolver) NewSpanResolverIterator(txn *kv.Txn) SpanResolverIterator { +func (fsr *fakeSpanResolver) NewSpanResolverIterator( + txn *kv.Txn, optionalOracle replicaoracle.Oracle, +) SpanResolverIterator { rng, _ := randutil.NewTestRand() return &fakeSpanResolverIterator{fsr: fsr, db: txn.DB(), rng: rng} } diff --git a/pkg/sql/physicalplan/fake_span_resolver_test.go b/pkg/sql/physicalplan/fake_span_resolver_test.go index 153be25e2671..f690c9473c53 100644 --- a/pkg/sql/physicalplan/fake_span_resolver_test.go +++ b/pkg/sql/physicalplan/fake_span_resolver_test.go @@ -54,7 +54,7 @@ func TestFakeSpanResolver(t *testing.T) { db := tc.Server(0).DB() txn := kv.NewTxn(ctx, db, tc.Server(0).NodeID()) - it := resolver.NewSpanResolverIterator(txn) + it := resolver.NewSpanResolverIterator(txn, nil) tableDesc := desctestutils.TestingGetPublicTableDescriptor(db, keys.SystemSQLCodec, "test", "t") primIdxValDirs := catalogkeys.IndexKeyValDirs(tableDesc.GetPrimaryIndex()) diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go index 929df9b8e6c4..c85ef3f2ec80 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle.go @@ -16,6 +16,7 @@ import ( "context" "math" "math/rand" + "sort" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" @@ -38,6 +39,8 @@ var ( BinPackingChoice = RegisterPolicy(newBinPackingOracle) // ClosestChoice chooses the node closest to the current node. ClosestChoice = RegisterPolicy(newClosestOracle) + // PreferFollowerChoice prefers choosing followers over leaseholders. + PreferFollowerChoice = RegisterPolicy(newPreferFollowerOracle) ) // Config is used to construct an OracleFactory. @@ -288,3 +291,40 @@ func latencyFunc(rpcCtx *rpc.Context) kvcoord.LatencyFunc { } return nil } + +type preferFollowerOracle struct { + nodeDescs kvcoord.NodeDescStore +} + +func newPreferFollowerOracle(cfg Config) Oracle { + return &preferFollowerOracle{nodeDescs: cfg.NodeDescs} +} + +func (o preferFollowerOracle) ChoosePreferredReplica( + ctx context.Context, + _ *kv.Txn, + desc *roachpb.RangeDescriptor, + _ *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, + _ QueryState, +) (roachpb.ReplicaDescriptor, error) { + replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc, kvcoord.AllExtantReplicas) + if err != nil { + return roachpb.ReplicaDescriptor{}, err + } + + leaseholders, err := replicaSliceOrErr(ctx, o.nodeDescs, desc, kvcoord.OnlyPotentialLeaseholders) + if err != nil { + return roachpb.ReplicaDescriptor{}, err + } + leaseholderNodeIDs := make(map[roachpb.NodeID]bool) + for i := range leaseholders { + leaseholderNodeIDs[leaseholders[i].NodeID] = true + } + + sort.Slice(replicas, func(i, j int) bool { + return !leaseholderNodeIDs[replicas[i].NodeID] && leaseholderNodeIDs[replicas[j].NodeID] + }) + + return replicas[0].ReplicaDescriptor, nil +} diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go index 1cce2c93e815..ccbfff3bdc79 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go @@ -42,7 +42,7 @@ func TestClosest(t *testing.T) { ctx := context.Background() stopper := stop.NewStopper() defer stopper.Stop(ctx) - g, _ := makeGossip(t, stopper) + g, _ := makeGossip(t, stopper, []int{2, 3}) nd2, err := g.GetNodeDescriptor(2) require.NoError(t, err) o := NewOracle(ClosestChoice, Config{ @@ -83,7 +83,7 @@ func TestClosest(t *testing.T) { }) } -func makeGossip(t *testing.T, stopper *stop.Stopper) (*gossip.Gossip, *hlc.Clock) { +func makeGossip(t *testing.T, stopper *stop.Stopper, nodeIDs []int) (*gossip.Gossip, *hlc.Clock) { clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) ctx := context.Background() rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) @@ -97,7 +97,8 @@ func makeGossip(t *testing.T, stopper *stop.Stopper) (*gossip.Gossip, *hlc.Clock if err := g.AddInfo(gossip.KeySentinel, nil, time.Hour); err != nil { t.Fatal(err) } - for i := roachpb.NodeID(2); i <= 3; i++ { + for _, id := range nodeIDs { + i := roachpb.NodeID(id) err := g.AddInfoProto(gossip.MakeNodeIDKey(i), newNodeDesc(i), gossip.NodeDescriptorTTL) if err != nil { t.Fatal(err) @@ -120,3 +121,48 @@ func newNodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor { }, } } + +func TestPreferFollower(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + g, _ := makeGossip(t, stopper, []int{2, 3, 4, 5, 6}) + o := NewOracle(PreferFollowerChoice, Config{ + NodeDescs: g, + }) + internalReplicas := []roachpb.ReplicaDescriptor{ + {ReplicaID: 2, NodeID: 2, StoreID: 2, Type: roachpb.VOTER_FULL}, + {ReplicaID: 3, NodeID: 3, StoreID: 3, Type: roachpb.VOTER_FULL}, + {ReplicaID: 4, NodeID: 4, StoreID: 4, Type: roachpb.VOTER_FULL}, + {ReplicaID: 5, NodeID: 5, StoreID: 5, Type: roachpb.NON_VOTER}, + {ReplicaID: 6, NodeID: 6, StoreID: 6, Type: roachpb.NON_VOTER}, + } + rand.Shuffle(len(internalReplicas), func(i, j int) { + internalReplicas[i], internalReplicas[j] = internalReplicas[j], internalReplicas[i] + }) + info, err := o.ChoosePreferredReplica( + ctx, + nil, /* txn */ + &roachpb.RangeDescriptor{ + InternalReplicas: internalReplicas, + }, + nil, /* leaseHolder */ + roachpb.LAG_BY_CLUSTER_SETTING, + QueryState{}, + ) + if err != nil { + t.Fatalf("Failed to choose follower replica: %v", err) + } + + fullVoters := make(map[roachpb.NodeID]bool) + for _, r := range internalReplicas { + if r.Type == roachpb.VOTER_FULL { + fullVoters[r.NodeID] = true + } + } + + if fullVoters[info.NodeID] { + t.Fatalf("Chose a VOTER_FULL replica: %d", info.NodeID) + } +} diff --git a/pkg/sql/physicalplan/span_resolver.go b/pkg/sql/physicalplan/span_resolver.go index e876940701e6..28c08ab85364 100644 --- a/pkg/sql/physicalplan/span_resolver.go +++ b/pkg/sql/physicalplan/span_resolver.go @@ -63,7 +63,7 @@ import ( type SpanResolver interface { // NewSpanResolverIterator creates a new SpanResolverIterator. // Txn is used for testing and for determining if follower reads are possible. - NewSpanResolverIterator(txn *kv.Txn) SpanResolverIterator + NewSpanResolverIterator(txn *kv.Txn, optionalOracle replicaoracle.Oracle) SpanResolverIterator } // SpanResolverIterator is used to iterate over the ranges composing a key span. @@ -170,11 +170,17 @@ type spanResolverIterator struct { var _ SpanResolverIterator = &spanResolverIterator{} // NewSpanResolverIterator creates a new SpanResolverIterator. -func (sr *spanResolver) NewSpanResolverIterator(txn *kv.Txn) SpanResolverIterator { +func (sr *spanResolver) NewSpanResolverIterator( + txn *kv.Txn, optionalOracle replicaoracle.Oracle, +) SpanResolverIterator { + oracle := optionalOracle + if optionalOracle == nil { + oracle = sr.oracle + } return &spanResolverIterator{ txn: txn, it: kvcoord.MakeRangeIterator(sr.distSender), - oracle: sr.oracle, + oracle: oracle, queryState: replicaoracle.MakeQueryState(), } } diff --git a/pkg/sql/physicalplan/span_resolver_test.go b/pkg/sql/physicalplan/span_resolver_test.go index 7ab53881ea7b..1f17aa01ae13 100644 --- a/pkg/sql/physicalplan/span_resolver_test.go +++ b/pkg/sql/physicalplan/span_resolver_test.go @@ -111,7 +111,7 @@ func TestSpanResolverUsesCaches(t *testing.T) { // Resolve the spans. Since the range descriptor cache doesn't have any // leases, all the ranges should be grouped and "assigned" to replica 0. - replicas, err := resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil), spans...) + replicas, err := resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil, nil), spans...) if err != nil { t.Fatal(err) } @@ -138,7 +138,7 @@ func TestSpanResolverUsesCaches(t *testing.T) { if err := populateCache(tc.Conns[3], 3 /* expectedNumRows */); err != nil { t.Fatal(err) } - replicas, err = resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil), spans...) + replicas, err = resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil, nil), spans...) if err != nil { t.Fatal(err) } @@ -209,7 +209,7 @@ func TestSpanResolver(t *testing.T) { replicaoracle.BinPackingChoice) ctx := context.Background() - it := lr.NewSpanResolverIterator(nil) + it := lr.NewSpanResolverIterator(nil, nil) testCases := []struct { spans []roachpb.Span @@ -308,7 +308,7 @@ func TestMixedDirections(t *testing.T) { replicaoracle.BinPackingChoice) ctx := context.Background() - it := lr.NewSpanResolverIterator(nil) + it := lr.NewSpanResolverIterator(nil, nil) spans := []spanWithDir{ orient(kvcoord.Ascending, makeSpan(tableDesc, 11, 15))[0], diff --git a/pkg/sql/testutils.go b/pkg/sql/testutils.go index 030caee9ae1c..4ff0641be39b 100644 --- a/pkg/sql/testutils.go +++ b/pkg/sql/testutils.go @@ -153,8 +153,7 @@ func (dsp *DistSQLPlanner) Exec( distributionType = DistributionTypeSystemTenantOnly } evalCtx := p.ExtendedEvalContext() - planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, p.txn, - distributionType) + planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, p.txn, distributionType, nil) planCtx.stmtType = recv.stmtType dsp.PlanAndRun(ctx, evalCtx, planCtx, p.txn, p.curPlan.main, recv) @@ -180,8 +179,7 @@ func (dsp *DistSQLPlanner) ExecLocalAll( distributionType := DistributionType(DistributionTypeNone) evalCtx := p.ExtendedEvalContext() - planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, p.txn, - distributionType) + planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, p.txn, distributionType, nil) planCtx.stmtType = recv.stmtType var evalCtxFactory func() *extendedEvalContext diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index bc86ec6fa419..d7f24f9cedf0 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -207,7 +207,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err // We don't return the compatible nodes here since PartitionSpans will // filter out incompatible nodes. - planCtx, _, err := distSQLPlanner.SetupAllNodesPlanning(ctx, evalCtx, execCfg) + planCtx, _, err := distSQLPlanner.SetupAllNodesPlanning(ctx, evalCtx, execCfg, nil) if err != nil { return err }