distsql: add support for partition spans in tenant
This PR adds `partitionSpansTenant`, which assigns SQL instances to
spans when distsql runs in tenant mode. The current implementation is a
naive round-robin approach: we find all available instances in the
tenant and assign each span requested by the caller to an instance in
round-robin order. This is reasonable for the initial use case, backup
operations, since the requested spans must completely cover a table's
span.

Release note (sql change): Adds support for distributed backup in a
multitenant environment, using all available SQL pods in the tenant.
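
The round-robin scheme described in the commit message can be sketched in a few lines of standalone Go. The types below (`instanceID`, `spanPartition`, string spans) are simplified stand-ins for the planner's `base.SQLInstanceID`, `SpanPartition`, and `roachpb.Spans`, so this is an illustration of the idea rather than the actual implementation:

```go
package main

import "fmt"

// instanceID and spanPartition are simplified stand-ins for the planner's
// base.SQLInstanceID and SpanPartition types; this sketch only illustrates
// the round-robin assignment idea.
type instanceID int

type spanPartition struct {
	instance instanceID
	spans    []string
}

// partitionRoundRobin assigns the i-th span to instances[i%len(instances)],
// grouping all spans that land on the same instance into one partition.
func partitionRoundRobin(instances []instanceID, spans []string) []spanPartition {
	partitions := make([]spanPartition, 0, len(instances))
	idx := make(map[instanceID]int) // instance -> index into partitions
	for i, sp := range spans {
		inst := instances[i%len(instances)]
		p, ok := idx[inst]
		if !ok {
			p = len(partitions)
			partitions = append(partitions, spanPartition{instance: inst})
			idx[inst] = p
		}
		partitions[p].spans = append(partitions[p].spans, sp)
	}
	return partitions
}

func main() {
	instances := []instanceID{1, 2, 3}
	spans := []string{"a-b", "b-c", "c-d", "d-e", "e-f"}
	for _, p := range partitionRoundRobin(instances, spans) {
		fmt.Printf("instance %d gets %v\n", p.instance, p.spans)
	}
	// instance 1 gets [a-b d-e]
	// instance 2 gets [b-c e-f]
	// instance 3 gets [c-d]
}
```

With three instances and five spans, two instances receive two spans and one receives a single span, mirroring how the planner groups every span assigned to an instance into one SpanPartition.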
rharding6373 committed Feb 26, 2022
1 parent 87bd590 commit 350188b
Showing 3 changed files with 69 additions and 4 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/backup_test.go
@@ -7253,11 +7253,11 @@ func TestBackupRestoreInsideTenant(t *testing.T) {
 	})
 }
 
-// TestBackupRestoreInsideMultiPodTenant verifies that backup and restore work inside
-// tenants with multiple SQL pods. Currently, verification
-// that restore is distributed to all pods in the multi-pod tests must be done
+// TestBackupRestoreInsideMultiPodTenant verifies that backup and restore work
+// inside tenants with multiple SQL pods. Currently, verification that restore
+// and backup are distributed to all pods in the multi-pod tests must be done
 // manually and by enabling logging and checking the log for messages containing
-// "starting restore data" for nsql1 and nsql2.
+// "starting restore data" or "starting backup data" for nsql1 and nsql2.
 // TODO(harding): Verify that backup and restore are distributed in test.
 func TestBackupRestoreInsideMultiPodTenant(t *testing.T) {
 	defer leaktest.AfterTest(t)()
62 changes: 62 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -13,6 +13,7 @@ package sql
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"reflect"
 	"sort"
 
@@ -983,6 +984,17 @@ func (h *distSQLNodeHealth) check(ctx context.Context, sqlInstanceID base.SQLIns
 // such nodes are assigned to the gateway.
 func (dsp *DistSQLPlanner) PartitionSpans(
 	planCtx *PlanningCtx, spans roachpb.Spans,
 ) ([]SpanPartition, error) {
+	if dsp.codec.ForSystemTenant() {
+		return dsp.partitionSpansSystem(planCtx, spans)
+	}
+	return dsp.partitionSpansTenant(planCtx, spans)
+}
+
+// partitionSpansSystem finds node owners for ranges touching the given spans
+// for a system tenant.
+func (dsp *DistSQLPlanner) partitionSpansSystem(
+	planCtx *PlanningCtx, spans roachpb.Spans,
+) ([]SpanPartition, error) {
 	if len(spans) == 0 {
 		panic("no spans")
@@ -1113,6 +1125,56 @@ func (dsp *DistSQLPlanner) PartitionSpans(
 	return partitions, nil
 }
 
+// partitionSpansTenant assigns SQL instances in a tenant to spans. Currently
+// assignments are made to all available instances in a round-robin fashion.
+func (dsp *DistSQLPlanner) partitionSpansTenant(
+	planCtx *PlanningCtx, spans roachpb.Spans,
+) ([]SpanPartition, error) {
+	if len(spans) == 0 {
+		panic("no spans")
+	}
+	ctx := planCtx.ctx
+	partitions := make([]SpanPartition, 0, 1)
+	if planCtx.isLocal {
+		// If we're planning locally, map all spans to the local node.
+		partitions = append(partitions,
+			SpanPartition{dsp.gatewaySQLInstanceID, spans})
+		return partitions, nil
+	}
+	if dsp.sqlInstanceProvider == nil {
+		return nil, errors.New("sql instance provider not available in multi-tenant environment")
+	}
+	// GetAllInstances only returns healthy instances.
+	instances, err := dsp.sqlInstanceProvider.GetAllInstances(ctx)
+	if err != nil {
+		return nil, err
+	}
+	// Randomize the order in which we assign partitions, so that work is
+	// allocated fairly across queries.
+	rand.Shuffle(len(instances), func(i, j int) {
+		instances[i], instances[j] = instances[j], instances[i]
+	})
+
+	// nodeMap maps a SQLInstanceID to an index inside the partitions array.
+	nodeMap := make(map[base.SQLInstanceID]int)
+	for i := range spans {
+		span := spans[i]
+		if log.V(1) {
+			log.Infof(ctx, "partitioning span %s", span)
+		}
+		sqlInstanceID := instances[i%len(instances)].InstanceID
+		partitionIdx, inNodeMap := nodeMap[sqlInstanceID]
+		if !inNodeMap {
+			partitionIdx = len(partitions)
+			partitions = append(partitions, SpanPartition{SQLInstanceID: sqlInstanceID})
+			nodeMap[sqlInstanceID] = partitionIdx
+		}
+		partition := &partitions[partitionIdx]
+		partition.Spans = append(partition.Spans, span)
+	}
+	return partitions, nil
+}
+
 // nodeVersionIsCompatible decides whether a particular node's DistSQL version
 // is compatible with dsp.planVersion. It uses gossip to find out the node's
 // version range.
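
One detail worth calling out in `partitionSpansTenant` above: the instance slice is shuffled before the round-robin walk, so when the number of spans is not an even multiple of the number of instances, the instances that receive an extra span change from query to query. A small, self-contained sketch of that effect, using plain ints and a seeded `math/rand` source (purely illustrative, not the planner's API):

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	const numSpans = 5
	instances := []int{1, 2, 3}

	// Repeat the shuffle-then-round-robin pass with different seeds to show
	// how the shuffle changes which instances receive the extra span.
	for seed := int64(0); seed < 3; seed++ {
		r := rand.New(rand.NewSource(seed))
		order := append([]int(nil), instances...)
		r.Shuffle(len(order), func(i, j int) { order[i], order[j] = order[j], order[i] })

		counts := make(map[int]int)
		for i := 0; i < numSpans; i++ {
			counts[order[i%len(order)]]++
		}
		fmt.Printf("seed %d: order %v, spans per instance %v\n", seed, order, counts)
	}
}
```

Each seed stands in for a different query: the per-instance counts always differ by at most one, but which instance carries the extra span depends on the shuffle.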
3 changes: 3 additions & 0 deletions pkg/sql/distsql_physical_planner_test.go
@@ -907,6 +907,7 @@ func TestPartitionSpans(t *testing.T) {
 				return true
 			},
 		},
+		codec: keys.SystemSQLCodec,
 	}
 
 	planCtx := dsp.NewPlanningCtx(context.Background(), &extendedEvalContext{
@@ -1091,6 +1092,7 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
 				return true
 			},
 		},
+		codec: keys.SystemSQLCodec,
 	}
 
 	planCtx := dsp.NewPlanningCtx(context.Background(), &extendedEvalContext{
@@ -1190,6 +1192,7 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
 				return true
 			},
 		},
+		codec: keys.SystemSQLCodec,
 	}
 
 	planCtx := dsp.NewPlanningCtx(context.Background(), &extendedEvalContext{