From 5bcbdcd1c099e52abaead4f42a1e2c4581d044ed Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 13 Mar 2023 13:16:27 +0000 Subject: [PATCH] sql: unify physical planning Previously system tenants and secondary tenants used different physical planning implementations, with the system tenant and only the system tenant using nodeIDs while other tenants used the instance table. This unifies those implementations such that all tenants use NodeIDs if running in mixed mode and use the instance table if not. Release note: none. Epic: CRDB-16910 --- pkg/sql/distsql_physical_planner.go | 147 +++++++++++++---------- pkg/sql/distsql_physical_planner_test.go | 6 +- 2 files changed, 86 insertions(+), 67 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 9c023ad9887a..3f696c2fcbd4 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -1152,13 +1153,13 @@ func (dsp *DistSQLPlanner) PartitionSpans( // If we're planning locally, map all spans to the gateway. return []SpanPartition{{dsp.gatewaySQLInstanceID, spans}}, nil } - if dsp.codec.ForSystemTenant() { - return dsp.partitionSpansSystem(ctx, planCtx, spans) + if dsp.codec.ForSystemTenant() && !dsp.st.Version.IsActive(ctx, clusterversion.V23_1) { + return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans) } - return dsp.partitionSpansTenant(ctx, planCtx, spans) + return dsp.partitionSpans(ctx, planCtx, spans) } -// partitionSpans takes a single span and splits it up according to the owning +// partitionSpan takes a single span and splits it up according to the owning // nodes (if the span touches multiple ranges). 
// // - partitions is the set of SpanPartitions so far. The updated set is @@ -1264,14 +1265,14 @@ func (dsp *DistSQLPlanner) partitionSpan( return partitions, lastPartitionIdx, nil } -// partitionSpansSystem finds node owners for ranges touching the given spans +// deprecatedPartitionSpansSystem finds node owners for ranges touching the given spans // for a system tenant. -func (dsp *DistSQLPlanner) partitionSpansSystem( +func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (partitions []SpanPartition, _ error) { nodeMap := make(map[base.SQLInstanceID]int) resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID { - return dsp.getSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID) + return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID) } for _, span := range spans { var err error @@ -1285,14 +1286,16 @@ func (dsp *DistSQLPlanner) partitionSpansSystem( return partitions, nil } -// partitionSpansTenant assigns SQL instances in a tenant to spans. It performs -// region-aware physical planning among all available SQL instances if the -// region information is available on at least some of the instances, and it -// falls back to naive round-robin assignment if not. -func (dsp *DistSQLPlanner) partitionSpansTenant( +// partitionSpans assigns SQL instances to spans. In mixed sql and KV mode it +// generally assigns each span to the instance hosted on the KV node chosen by +// the configured replica oracle, while in clusters operating with standalone +// SQL instances it performs locality-aware physical planning among all +// available SQL instances if the locality info is available on at least some of +// the instances, and it falls back to naive round-robin assignment if not. 
+func (dsp *DistSQLPlanner) partitionSpans( ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (partitions []SpanPartition, _ error) { - resolver, instances, hasLocalitySet, err := dsp.makeSQLInstanceIDForKVNodeIDTenantResolver(ctx) + resolver, instances, err := dsp.makeInstanceResolver(ctx) if err != nil { return nil, err } @@ -1303,7 +1306,7 @@ func (dsp *DistSQLPlanner) partitionSpansTenant( // Rows with column families may have been split into different spans. // These spans should be assigned the same pod so that the pod can // stitch together the rows correctly. Split rows are in adjacent spans. - if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil { + if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 { if safeKey.Equal(lastKey) { if log.V(1) { log.Infof(ctx, "partitioning span %s", span) @@ -1321,17 +1324,17 @@ func (dsp *DistSQLPlanner) partitionSpansTenant( return nil, err } } - if err = dsp.maybeReassignToGatewaySQLInstance(partitions, instances, hasLocalitySet); err != nil { + if err = dsp.maybeReassignToGatewaySQLInstance(partitions, instances); err != nil { return nil, err } return partitions, nil } -// getSQLInstanceIDForKVNodeIDSystem returns the SQL instance ID that should +// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should // handle the range with the given node ID when planning is done on behalf of // the system tenant. It ensures that the chosen SQL instance is healthy and of // the compatible DistSQL version. 
-func (dsp *DistSQLPlanner) getSQLInstanceIDForKVNodeIDSystem( +func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem( ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID, ) base.SQLInstanceID { sqlInstanceID := base.SQLInstanceID(nodeID) @@ -1346,42 +1349,55 @@ func (dsp *DistSQLPlanner) getSQLInstanceIDForKVNodeIDSystem( return sqlInstanceID } -// makeSQLInstanceIDForKVNodeIDTenantResolver returns a function that can choose -// the SQL instance ID for a provided node ID on behalf of a tenant. It also -// returns a list of all healthy instances for the current tenant as well as a -// boolean indicating whether the locality information is available for at least -// some of those instances. -func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver( +// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an +// instance that is hosted in the process of a KV node. Currently SQL +// instances run in KV node processes have IDs fixed to be equal to the KV +// nodes' IDs, and all of the SQL instances for a given tenant are _either_ +// run in this mixed mode or standalone, meaning if this server is in mixed +// mode, we can safely assume every other server is as well, and thus has +// IDs matching node IDs. +func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID { + return base.SQLInstanceID(nodeID) +} + +// makeInstanceResolver returns a function that can choose the SQL instance ID +// for a provided KV node ID. It also returns a list of all healthy instances if +// that list was used in choosing an instance, specifically if the localities of +// those instances were used to decide the assignment, for use by any steps that +// wish to post-process that assignment (such as adjusting based on localities). +// If the instance was assigned statically or the instance list had no locality +// information leading to random assignments then no instance list is returned. 
+func (dsp *DistSQLPlanner) makeInstanceResolver( ctx context.Context, -) ( - resolver func(roachpb.NodeID) base.SQLInstanceID, - _ []sqlinstance.InstanceInfo, - hasLocalitySet bool, - _ error, -) { +) (func(roachpb.NodeID) base.SQLInstanceID, []sqlinstance.InstanceInfo, error) { + if _, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID(); mixedProcessMode { + return instanceIDForKVNodeHostedInstance, nil, nil + } + // GetAllInstances only returns healthy instances. // TODO(yuzefovich): confirm that all instances are of compatible version. instances, err := dsp.sqlAddressResolver.GetAllInstances(ctx) if err != nil { - return nil, nil, false, err + return nil, nil, err } if len(instances) == 0 { - return nil, nil, false, errors.New("no healthy sql instances available for planning") + return nil, nil, errors.New("no healthy sql instances available for planning") } rng, _ := randutil.NewPseudoRand() + instancesHaveLocality := false for i := range instances { if instances[i].Locality.NonEmpty() { - hasLocalitySet = true + instancesHaveLocality = true break } } // If we were able to determine the locality information for at least some // instances, use the region-aware resolver. - if hasLocalitySet { - resolver = func(nodeID roachpb.NodeID) base.SQLInstanceID { + if instancesHaveLocality { + resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID { // Lookup the node localities to compare to the instance localities. nodeDesc, err := dsp.nodeDescs.GetNodeDescriptor(nodeID) if err != nil { @@ -1396,7 +1412,7 @@ func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver( // just return the gateway. 
return dsp.gatewaySQLInstanceID } - return resolver, instances, hasLocalitySet, nil + return resolver, instances, nil } // If no sql instances have locality information, fallback to a naive @@ -1407,12 +1423,12 @@ func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver( instances[i], instances[j] = instances[j], instances[i] }) var i int - resolver = func(roachpb.NodeID) base.SQLInstanceID { + resolver := func(roachpb.NodeID) base.SQLInstanceID { id := instances[i%len(instances)].InstanceID i++ return id } - return resolver, instances, false, nil + return resolver, nil, nil } // closestInstances returns the subset of instances which are closest to the @@ -1443,41 +1459,40 @@ func closestInstances( // the locality information isn't available for the instances, then we assume // the assigned instance to be in the same region as the gateway. func (dsp *DistSQLPlanner) maybeReassignToGatewaySQLInstance( - partitions []SpanPartition, instances []sqlinstance.InstanceInfo, hasLocalitySet bool, + partitions []SpanPartition, instances []sqlinstance.InstanceInfo, ) error { - if len(partitions) != 1 || partitions[0].SQLInstanceID == dsp.gatewaySQLInstanceID { + if len(partitions) != 1 || partitions[0].SQLInstanceID == dsp.gatewaySQLInstanceID || len(instances) < 1 { // Keep the existing partitioning if more than one instance is used or // the gateway is already used as the single instance. return nil } var gatewayRegion, assignedRegion string - if hasLocalitySet { - assignedInstance := partitions[0].SQLInstanceID - var ok bool - for _, instance := range instances { - if instance.InstanceID == dsp.gatewaySQLInstanceID { - gatewayRegion, ok = instance.Locality.Find("region") - if !ok { - // If we can't determine the region of the gateway, keep the - // spans assigned to the other instance. 
- break - } - } else if instance.InstanceID == assignedInstance { - assignedRegion, ok = instance.Locality.Find("region") - if !ok { - // We couldn't determine the region of the assigned instance - // but it shouldn't be possible since we wouldn't have used - // the instance in the planning (since we wouldn't include - // it into regionToSQLInstanceIDs map in - // makeSQLInstanceIDForKVNodeIDTenantResolver). - return errors.AssertionFailedf( - "unexpectedly planned all spans on a SQL instance %s "+ - "which we could not find region for", instance, - ) - } + assignedInstance := partitions[0].SQLInstanceID + var ok bool + for _, instance := range instances { + if instance.InstanceID == dsp.gatewaySQLInstanceID { + gatewayRegion, ok = instance.Locality.Find("region") + if !ok { + // If we can't determine the region of the gateway, keep the + // spans assigned to the other instance. + break + } + } else if instance.InstanceID == assignedInstance { + assignedRegion, ok = instance.Locality.Find("region") + if !ok { + // We couldn't determine the region of the assigned instance + // but it shouldn't be possible since we wouldn't have used + // the instance in the planning (since we wouldn't include + // it into regionToSQLInstanceIDs map in + // makeInstanceResolver). 
+ return errors.AssertionFailedf( + "unexpectedly planned all spans on a SQL instance %s "+ + "which we could not find region for", instance, + ) } } } + if gatewayRegion == assignedRegion { partitions[0].SQLInstanceID = dsp.gatewaySQLInstanceID } @@ -1510,10 +1525,10 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan( return 0, err } - if dsp.codec.ForSystemTenant() { - return dsp.getSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil + if dsp.codec.ForSystemTenant() && !dsp.distSQLSrv.Settings.Version.IsActive(ctx, clusterversion.V23_1) { + return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil } - resolver, _, _, err := dsp.makeSQLInstanceIDForKVNodeIDTenantResolver(ctx) + resolver, _, err := dsp.makeInstanceResolver(ctx) if err != nil { return 0, err } diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 6f0c4f22a2e9..1cff492d082d 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -1019,6 +1019,9 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) { ranges: ranges, } + nID := &base.NodeIDContainer{} + nID.Reset(tsp.nodes[gatewayNode-1].NodeID) + gw := gossip.MakeOptionalGossip(mockGossip) dsp := DistSQLPlanner{ planVersion: tc.planVersion, @@ -1037,7 +1040,8 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) { return true }, }, - codec: keys.SystemSQLCodec, + codec: keys.SystemSQLCodec, + distSQLSrv: &distsql.ServerImpl{ServerConfig: execinfra.ServerConfig{NodeID: base.NewSQLIDContainerForNode(nID)}}, } ctx := context.Background()