sql: unify physical planning
Previously, the system tenant and secondary tenants used different physical planning
implementations: the system tenant, and only the system tenant, resolved spans to KV
node IDs directly, while other tenants used the instance table. This change unifies the
two implementations so that every tenant uses node IDs when running in mixed KV and SQL
mode and uses the instance table when not.
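
As a rough illustration of the unified dispatch (this is not the actual CockroachDB code: the NodeID, InstanceID, and instanceInfo types, the mixedMode flag, and the nodeLocality lookup below are simplified stand-ins for roachpb.NodeID, base.SQLInstanceID, sqlinstance.InstanceInfo, and the node-descriptor lookup), the resolver now behaves roughly like this:

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// Simplified stand-ins for roachpb.NodeID, base.SQLInstanceID, and
// sqlinstance.InstanceInfo.
type NodeID int
type InstanceID int

type instanceInfo struct {
	ID       InstanceID
	Locality string // empty when the instance reported no locality
}

// In mixed KV-and-SQL mode every in-process SQL instance shares its KV
// node's ID, so resolving a node to an instance is the identity mapping.
func instanceIDForKVNodeHostedInstance(n NodeID) InstanceID {
	return InstanceID(n)
}

// makeInstanceResolver sketches the unified behavior: identity mapping in
// mixed mode, locality-aware assignment when instances report localities,
// and shuffled round-robin otherwise.
func makeInstanceResolver(
	mixedMode bool, instances []instanceInfo, nodeLocality func(NodeID) string,
) (func(NodeID) InstanceID, []instanceInfo, error) {
	if mixedMode {
		return instanceIDForKVNodeHostedInstance, nil, nil
	}
	if len(instances) == 0 {
		return nil, nil, errors.New("no healthy sql instances available for planning")
	}
	haveLocality := false
	for _, in := range instances {
		if in.Locality != "" {
			haveLocality = true
			break
		}
	}
	if haveLocality {
		// Locality-aware: prefer an instance in the same locality as the KV
		// node that holds the range; fall back to the first instance.
		resolver := func(n NodeID) InstanceID {
			for _, in := range instances {
				if in.Locality == nodeLocality(n) {
					return in.ID
				}
			}
			return instances[0].ID
		}
		return resolver, instances, nil
	}
	// No locality info: naive round-robin over a shuffled instance list.
	rand.Shuffle(len(instances), func(i, j int) {
		instances[i], instances[j] = instances[j], instances[i]
	})
	var next int
	resolver := func(NodeID) InstanceID {
		id := instances[next%len(instances)].ID
		next++
		return id
	}
	return resolver, nil, nil
}

func main() {
	// Mixed mode: the instance ID is just the node ID.
	resolver, _, _ := makeInstanceResolver(true, nil, nil)
	fmt.Println(resolver(3)) // prints 3
}
```

In mixed mode the mapping is the identity, matching the invariant that an in-process SQL instance shares its KV node's ID; standalone deployments instead consult the (possibly locality-annotated) instance list, as the diff below implements.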

Release note: none.
Epic: CRDB-16910
dt committed Mar 21, 2023
1 parent 0a72a49 commit 9f9c904
Showing 1 changed file with 63 additions and 42 deletions.
105 changes: 63 additions & 42 deletions pkg/sql/distsql_physical_planner.go
@@ -1154,13 +1154,13 @@ func (dsp *DistSQLPlanner) PartitionSpans(
 		// If we're planning locally, map all spans to the gateway.
 		return []SpanPartition{{dsp.gatewaySQLInstanceID, spans}}, nil
 	}
-	if dsp.codec.ForSystemTenant() {
-		return dsp.partitionSpansSystem(ctx, planCtx, spans)
+	if dsp.useGossipPlanning(ctx, planCtx) {
+		return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans)
 	}
-	return dsp.partitionSpansTenant(ctx, planCtx, spans)
+	return dsp.partitionSpans(ctx, planCtx, spans)
 }
 
-// partitionSpans takes a single span and splits it up according to the owning
+// partitionSpan takes a single span and splits it up according to the owning
 // nodes (if the span touches multiple ranges).
 //
 // - partitions is the set of SpanPartitions so far. The updated set is
@@ -1266,14 +1266,14 @@ func (dsp *DistSQLPlanner) partitionSpan(
 	return partitions, lastPartitionIdx, nil
 }
 
-// partitionSpansSystem finds node owners for ranges touching the given spans
+// deprecatedPartitionSpansSystem finds node owners for ranges touching the given spans
 // for a system tenant.
-func (dsp *DistSQLPlanner) partitionSpansSystem(
+func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
 	ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
 ) (partitions []SpanPartition, _ error) {
 	nodeMap := make(map[base.SQLInstanceID]int)
 	resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID {
-		return dsp.getSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
+		return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
 	}
 	for _, span := range spans {
 		var err error
@@ -1287,14 +1287,16 @@ func (dsp *DistSQLPlanner) partitionSpansSystem(
 	return partitions, nil
 }
 
-// partitionSpansTenant assigns SQL instances in a tenant to spans. It performs
-// region-aware physical planning among all available SQL instances if the
-// region information is available on at least some of the instances, and it
-// falls back to naive round-robin assignment if not.
-func (dsp *DistSQLPlanner) partitionSpansTenant(
+// partitionSpans assigns SQL instances to spans. In mixed sql and KV mode it
+// generally assigns each span to the instance hosted on the KV node chosen by
+// the configured replica oracle, while in clusters operating with standalone
+// SQL instances it performs locality-aware physical planning among all
+// available SQL instances if the locality info is available on at least some of
+// the instances, and it falls back to naive round-robin assignment if not.
+func (dsp *DistSQLPlanner) partitionSpans(
 	ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
 ) (partitions []SpanPartition, _ error) {
-	resolver, instances, hasLocalitySet, err := dsp.makeSQLInstanceIDForKVNodeIDTenantResolver(ctx)
+	resolver, instances, err := dsp.makeInstanceResolver(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -1305,7 +1307,7 @@ func (dsp *DistSQLPlanner) partitionSpansTenant(
 		// Rows with column families may have been split into different spans.
 		// These spans should be assigned the same pod so that the pod can
 		// stitch together the rows correctly. Split rows are in adjacent spans.
-		if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil {
+		if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 {
 			if safeKey.Equal(lastKey) {
 				if log.V(1) {
 					log.Infof(ctx, "partitioning span %s", span)
@@ -1323,17 +1325,17 @@
 			return nil, err
 		}
 	}
-	if err = dsp.maybeReassignToGatewaySQLInstance(partitions, instances, hasLocalitySet); err != nil {
+	if err = dsp.maybeReassignToGatewaySQLInstance(partitions, instances); err != nil {
 		return nil, err
 	}
 	return partitions, nil
 }
 
-// getSQLInstanceIDForKVNodeIDSystem returns the SQL instance ID that should
+// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should
 // handle the range with the given node ID when planning is done on behalf of
 // the system tenant. It ensures that the chosen SQL instance is healthy and of
 // the compatible DistSQL version.
-func (dsp *DistSQLPlanner) getSQLInstanceIDForKVNodeIDSystem(
+func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
 	ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID,
 ) base.SQLInstanceID {
 	sqlInstanceID := base.SQLInstanceID(nodeID)
@@ -1348,42 +1350,55 @@ func (dsp *DistSQLPlanner) getSQLInstanceIDForKVNodeIDSystem(
 	return sqlInstanceID
 }
 
-// makeSQLInstanceIDForKVNodeIDTenantResolver returns a function that can choose
-// the SQL instance ID for a provided node ID on behalf of a tenant. It also
-// returns a list of all healthy instances for the current tenant as well as a
-// boolean indicating whether the locality information is available for at least
-// some of those instances.
-func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver(
+// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an
+// instance that is hosted in the process of a KV node. Currently SQL
+// instances run in KV node processes have IDs fixed to be equal to the KV
+// nodes' IDs, and all of the SQL instances for a given tenant are _either_
+// run in this mixed mode or standalone, meaning if this server is in mixed
+// mode, we can safely assume every other server is as well, and thus has
+// IDs matching node IDs.
+func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID {
+	return base.SQLInstanceID(nodeID)
+}
+
+// makeInstanceResolver returns a function that can choose the SQL instance ID
+// for a provided KV node ID. It also returns a list of all healthy instances if
+// that list was used in choosing an instance, specifically if the localities of
+// those instances were used to decide the assignment, for use by any steps that
+// wish to post-process that assignment (such as adjusting based on localities).
+// If the instance was assigned statically or the instance list had no locality
+// information leading to random assignments then no instance list is returned.
+func (dsp *DistSQLPlanner) makeInstanceResolver(
 	ctx context.Context,
-) (
-	resolver func(roachpb.NodeID) base.SQLInstanceID,
-	_ []sqlinstance.InstanceInfo,
-	hasLocalitySet bool,
-	_ error,
-) {
+) (func(roachpb.NodeID) base.SQLInstanceID, []sqlinstance.InstanceInfo, error) {
+	if _, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID(); mixedProcessMode {
+		return instanceIDForKVNodeHostedInstance, nil, nil
+	}
+
 	// GetAllInstances only returns healthy instances.
 	// TODO(yuzefovich): confirm that all instances are of compatible version.
 	instances, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
 	if err != nil {
-		return nil, nil, false, err
+		return nil, nil, err
 	}
 	if len(instances) == 0 {
-		return nil, nil, false, errors.New("no healthy sql instances available for planning")
+		return nil, nil, errors.New("no healthy sql instances available for planning")
 	}
 
 	rng, _ := randutil.NewPseudoRand()
 
+	instancesHaveLocality := false
 	for i := range instances {
 		if instances[i].Locality.NonEmpty() {
-			hasLocalitySet = true
+			instancesHaveLocality = true
 			break
 		}
 	}
 
 	// If we were able to determine the locality information for at least some
 	// instances, use the region-aware resolver.
-	if hasLocalitySet {
-		resolver = func(nodeID roachpb.NodeID) base.SQLInstanceID {
+	if instancesHaveLocality {
+		resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID {
 			// Lookup the node localities to compare to the instance localities.
 			nodeDesc, err := dsp.nodeDescs.GetNodeDescriptor(nodeID)
 			if err != nil {
@@ -1398,7 +1413,7 @@ func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver(
 			// just return the gateway.
 			return dsp.gatewaySQLInstanceID
 		}
-		return resolver, instances, hasLocalitySet, nil
+		return resolver, instances, nil
 	}
 
 	// If no sql instances have locality information, fallback to a naive
@@ -1409,12 +1424,12 @@
 		instances[i], instances[j] = instances[j], instances[i]
 	})
 	var i int
-	resolver = func(roachpb.NodeID) base.SQLInstanceID {
+	resolver := func(roachpb.NodeID) base.SQLInstanceID {
 		id := instances[i%len(instances)].InstanceID
 		i++
 		return id
 	}
-	return resolver, instances, false, nil
+	return resolver, nil, nil
 }
 
 // closestInstances returns the subset of instances which are closest to the
@@ -1445,15 +1460,15 @@ func closestInstances(
 // the locality information isn't available for the instances, then we assume
 // the assigned instance to be in the same region as the gateway.
 func (dsp *DistSQLPlanner) maybeReassignToGatewaySQLInstance(
-	partitions []SpanPartition, instances []sqlinstance.InstanceInfo, hasLocalitySet bool,
+	partitions []SpanPartition, instances []sqlinstance.InstanceInfo,
 ) error {
 	if len(partitions) != 1 || partitions[0].SQLInstanceID == dsp.gatewaySQLInstanceID {
 		// Keep the existing partitioning if more than one instance is used or
 		// the gateway is already used as the single instance.
 		return nil
 	}
 	var gatewayRegion, assignedRegion string
-	if hasLocalitySet {
+	if len(instances) > 0 {
 		assignedInstance := partitions[0].SQLInstanceID
 		var ok bool
 		for _, instance := range instances {
@@ -1480,6 +1495,7 @@ func (dsp *DistSQLPlanner) maybeReassignToGatewaySQLInstance(
 			}
 		}
 	}
+
 	if gatewayRegion == assignedRegion {
 		partitions[0].SQLInstanceID = dsp.gatewaySQLInstanceID
 	}
@@ -1512,16 +1528,21 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
 		return 0, err
 	}
 
-	if dsp.codec.ForSystemTenant() {
-		return dsp.getSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
+	if dsp.useGossipPlanning(ctx, planCtx) {
+		return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
 	}
-	resolver, _, _, err := dsp.makeSQLInstanceIDForKVNodeIDTenantResolver(ctx)
+	resolver, _, err := dsp.makeInstanceResolver(ctx)
 	if err != nil {
 		return 0, err
 	}
 	return resolver(replDesc.NodeID), nil
 }
 
+func (dsp *DistSQLPlanner) useGossipPlanning(ctx context.Context, _ *PlanningCtx) bool {
+	// TODO(dt): enable this by default, e.g. // && !dsp.distSQLSrv.Settings.Version.IsActive(ctx, clusterversion.V23_1)
+	return dsp.codec.ForSystemTenant()
+}
+
 // convertOrdering maps the columns in props.ordering to the output columns of a
 // processor.
 func (dsp *DistSQLPlanner) convertOrdering(
