diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index fa802478e871..3dab75fe8129 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1154,13 +1154,13 @@ func (dsp *DistSQLPlanner) PartitionSpans( // If we're planning locally, map all spans to the gateway. return []SpanPartition{{dsp.gatewaySQLInstanceID, spans}}, nil } - if dsp.codec.ForSystemTenant() { - return dsp.partitionSpansSystem(ctx, planCtx, spans) + if dsp.useGossipPlanning(ctx, planCtx) { + return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans) } - return dsp.partitionSpansTenant(ctx, planCtx, spans) + return dsp.partitionSpans(ctx, planCtx, spans) } -// partitionSpans takes a single span and splits it up according to the owning +// partitionSpan takes a single span and splits it up according to the owning // nodes (if the span touches multiple ranges). // // - partitions is the set of SpanPartitions so far. The updated set is @@ -1266,14 +1266,14 @@ func (dsp *DistSQLPlanner) partitionSpan( return partitions, lastPartitionIdx, nil } -// partitionSpansSystem finds node owners for ranges touching the given spans +// deprecatedPartitionSpansSystem finds node owners for ranges touching the given spans // for a system tenant. -func (dsp *DistSQLPlanner) partitionSpansSystem( +func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (partitions []SpanPartition, _ error) { nodeMap := make(map[base.SQLInstanceID]int) resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID { - return dsp.getSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID) + return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID) } for _, span := range spans { var err error @@ -1287,14 +1287,16 @@ func (dsp *DistSQLPlanner) partitionSpansSystem( return partitions, nil } -// partitionSpansTenant assigns SQL instances in a tenant to spans. It performs -// region-aware physical planning among all available SQL instances if the -// region information is available on at least some of the instances, and it -// falls back to naive round-robin assignment if not. -func (dsp *DistSQLPlanner) partitionSpansTenant( +// partitionSpans assigns SQL instances to spans. In mixed sql and KV mode it +// generally assigns each span to the instance hosted on the KV node chosen by +// the configured replica oracle, while in clusters operating with standalone +// SQL instances it performs locality-aware physical planning among all +// available SQL instances if the locality info is available on at least some of +// the instances, and it falls back to naive round-robin assignment if not. +func (dsp *DistSQLPlanner) partitionSpans( ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (partitions []SpanPartition, _ error) { - resolver, instances, hasLocalitySet, err := dsp.makeSQLInstanceIDForKVNodeIDTenantResolver(ctx) + resolver, instances, err := dsp.makeInstanceResolver(ctx) if err != nil { return nil, err } @@ -1305,7 +1307,7 @@ func (dsp *DistSQLPlanner) partitionSpansTenant( // Rows with column families may have been split into different spans. // These spans should be assigned the same pod so that the pod can // stitch together the rows correctly. Split rows are in adjacent spans. - if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil { + if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 { if safeKey.Equal(lastKey) { if log.V(1) { log.Infof(ctx, "partitioning span %s", span) @@ -1323,17 +1325,17 @@ func (dsp *DistSQLPlanner) partitionSpansTenant( return nil, err } } - if err = dsp.maybeReassignToGatewaySQLInstance(partitions, instances, hasLocalitySet); err != nil { + if err = dsp.maybeReassignToGatewaySQLInstance(partitions, instances); err != nil { return nil, err } return partitions, nil } -// getSQLInstanceIDForKVNodeIDSystem returns the SQL instance ID that should +// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should // handle the range with the given node ID when planning is done on behalf of // the system tenant. It ensures that the chosen SQL instance is healthy and of // the compatible DistSQL version. -func (dsp *DistSQLPlanner) getSQLInstanceIDForKVNodeIDSystem( +func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem( ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID, ) base.SQLInstanceID { sqlInstanceID := base.SQLInstanceID(nodeID) @@ -1348,42 +1350,55 @@ func (dsp *DistSQLPlanner) getSQLInstanceIDForKVNodeIDSystem( return sqlInstanceID } -// makeSQLInstanceIDForKVNodeIDTenantResolver returns a function that can choose -// the SQL instance ID for a provided node ID on behalf of a tenant. It also -// returns a list of all healthy instances for the current tenant as well as a -// boolean indicating whether the locality information is available for at least -// some of those instances. -func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver( +// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an +// instance that is hosted in the process of a KV node. Currently SQL +// instances run in KV node processes have IDs fixed to be equal to the KV +// nodes' IDs, and all of the SQL instances for a given tenant are _either_ +// run in this mixed mode or standalone, meaning if this server is in mixed +// mode, we can safely assume every other server is as well, and thus has +// IDs matching node IDs. +func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID { + return base.SQLInstanceID(nodeID) +} + +// makeInstanceResolver returns a function that can choose the SQL instance ID +// for a provided KV node ID. It also returns a list of all healthy instances if +// that list was used in choosing an instance, specifically if the localities of +// those instances were used to decide the assignment, for use by any steps that +// wish to post-process that assignment (such as adjusting based on localities). +// If the instance was assigned statically or the instance list had no locality +// information leading to random assignments then no instance list is returned. +func (dsp *DistSQLPlanner) makeInstanceResolver( ctx context.Context, -) ( - resolver func(roachpb.NodeID) base.SQLInstanceID, - _ []sqlinstance.InstanceInfo, - hasLocalitySet bool, - _ error, -) { +) (func(roachpb.NodeID) base.SQLInstanceID, []sqlinstance.InstanceInfo, error) { + if _, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID(); mixedProcessMode { + return instanceIDForKVNodeHostedInstance, nil, nil + } + // GetAllInstances only returns healthy instances. // TODO(yuzefovich): confirm that all instances are of compatible version. instances, err := dsp.sqlAddressResolver.GetAllInstances(ctx) if err != nil { - return nil, nil, false, err + return nil, nil, err } if len(instances) == 0 { - return nil, nil, false, errors.New("no healthy sql instances available for planning") + return nil, nil, errors.New("no healthy sql instances available for planning") } rng, _ := randutil.NewPseudoRand() + instancesHaveLocality := false for i := range instances { if instances[i].Locality.NonEmpty() { - hasLocalitySet = true + instancesHaveLocality = true break } } // If we were able to determine the locality information for at least some // instances, use the region-aware resolver. - if hasLocalitySet { - resolver = func(nodeID roachpb.NodeID) base.SQLInstanceID { + if instancesHaveLocality { + resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID { // Lookup the node localities to compare to the instance localities. nodeDesc, err := dsp.nodeDescs.GetNodeDescriptor(nodeID) if err != nil { @@ -1398,7 +1413,7 @@ func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver( // just return the gateway. return dsp.gatewaySQLInstanceID } - return resolver, instances, hasLocalitySet, nil + return resolver, instances, nil } // If no sql instances have locality information, fallback to a naive @@ -1409,12 +1424,12 @@ func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver( instances[i], instances[j] = instances[j], instances[i] }) var i int - resolver = func(roachpb.NodeID) base.SQLInstanceID { + resolver := func(roachpb.NodeID) base.SQLInstanceID { id := instances[i%len(instances)].InstanceID i++ return id } - return resolver, instances, false, nil + return resolver, nil, nil } // closestInstances returns the subset of instances which are closest to the @@ -1445,7 +1460,7 @@ func closestInstances( // the locality information isn't available for the instances, then we assume // the assigned instance to be in the same region as the gateway. func (dsp *DistSQLPlanner) maybeReassignToGatewaySQLInstance( - partitions []SpanPartition, instances []sqlinstance.InstanceInfo, hasLocalitySet bool, + partitions []SpanPartition, instances []sqlinstance.InstanceInfo, ) error { if len(partitions) != 1 || partitions[0].SQLInstanceID == dsp.gatewaySQLInstanceID { // Keep the existing partitioning if more than one instance is used or @@ -1453,7 +1468,7 @@ func (dsp *DistSQLPlanner) maybeReassignToGatewaySQLInstance( return nil } var gatewayRegion, assignedRegion string - if hasLocalitySet { + if len(instances) > 0 { assignedInstance := partitions[0].SQLInstanceID var ok bool for _, instance := range instances { @@ -1480,6 +1495,7 @@ func (dsp *DistSQLPlanner) maybeReassignToGatewaySQLInstance( } } } + if gatewayRegion == assignedRegion { partitions[0].SQLInstanceID = dsp.gatewaySQLInstanceID } @@ -1512,16 +1528,21 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan( return 0, err } - if dsp.codec.ForSystemTenant() { - return dsp.getSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil + if dsp.useGossipPlanning(ctx, planCtx) { + return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil } - resolver, _, _, err := dsp.makeSQLInstanceIDForKVNodeIDTenantResolver(ctx) + resolver, _, err := dsp.makeInstanceResolver(ctx) if err != nil { return 0, err } return resolver(replDesc.NodeID), nil } +func (dsp *DistSQLPlanner) useGossipPlanning(ctx context.Context, _ *PlanningCtx) bool { + // TODO(dt): enable this by default, e.g. // && !dsp.distSQLSrv.Settings.Version.IsActive(ctx, clusterversion.V23_1) + return dsp.codec.ForSystemTenant() +} + // convertOrdering maps the columns in props.ordering to the output columns of a // processor. func (dsp *DistSQLPlanner) convertOrdering(