From 350188b38a4774b6ec37a99f4be4f4d1e3b5eb80 Mon Sep 17 00:00:00 2001
From: rharding6373
Date: Wed, 23 Feb 2022 20:31:25 -0800
Subject: [PATCH] distsql: add support for partition spans in tenant

This PR adds `PartitionSpansTenant`, which assigns SQL instances to spans
when distsql is in tenant mode. The current implementation is a naive
round-robin approach. That is, we find all available instances in the
tenant and assign each span requested by the caller to an instance in
round-robin order. This is reasonable for the initial use case, backup
operations, since spans must completely cover a table's span.

Release note (sql change): Adds support for distributed backup in a
multitenant environment that uses all available SQL pods in the tenant.
---
 pkg/ccl/backupccl/backup_test.go         |  8 +--
 pkg/sql/distsql_physical_planner.go      | 62 ++++++++++++++++++++++++
 pkg/sql/distsql_physical_planner_test.go |  3 ++
 3 files changed, 69 insertions(+), 4 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index 8f7169caffa6..36eed1ba9070 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -7253,11 +7253,11 @@ func TestBackupRestoreInsideTenant(t *testing.T) {
 	})
 }
 
-// TestBackupRestoreInsideMultiPodTenant verifies that backup and restore work inside
-// tenants with multiple SQL pods. Currently, verification
-// that restore is distributed to all pods in the multi-pod tests must be done
+// TestBackupRestoreInsideMultiPodTenant verifies that backup and restore work
+// inside tenants with multiple SQL pods. Currently, verification that restore
+// and backup are distributed to all pods in the multi-pod tests must be done
 // manually and by enabling logging and checking the log for messages containing
-// "starting restore data" for nsql1 and nsql2.
+// "starting restore data" or "starting backup data" for nsql1 and nsql2.
 // TODO(harding): Verify that backup and restore are distributed in test.
 func TestBackupRestoreInsideMultiPodTenant(t *testing.T) {
 	defer leaktest.AfterTest(t)()
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 7e390f702bab..cf80104200b3 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -13,6 +13,7 @@ package sql
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"reflect"
 	"sort"
 
@@ -983,6 +984,17 @@ func (h *distSQLNodeHealth) check(ctx context.Context, sqlInstanceID base.SQLIns
 // such nodes are assigned to the gateway.
 func (dsp *DistSQLPlanner) PartitionSpans(
 	planCtx *PlanningCtx, spans roachpb.Spans,
+) ([]SpanPartition, error) {
+	if dsp.codec.ForSystemTenant() {
+		return dsp.partitionSpansSystem(planCtx, spans)
+	}
+	return dsp.partitionSpansTenant(planCtx, spans)
+}
+
+// partitionSpansSystem finds node owners for ranges touching the given spans
+// for a system tenant.
+func (dsp *DistSQLPlanner) partitionSpansSystem(
+	planCtx *PlanningCtx, spans roachpb.Spans,
 ) ([]SpanPartition, error) {
 	if len(spans) == 0 {
 		panic("no spans")
@@ -1113,6 +1125,56 @@ func (dsp *DistSQLPlanner) PartitionSpans(
 	return partitions, nil
 }
 
+// partitionSpansTenant assigns SQL instances in a tenant to spans. Currently
+// assignments are made to all available instances in a round-robin fashion.
+func (dsp *DistSQLPlanner) partitionSpansTenant(
+	planCtx *PlanningCtx, spans roachpb.Spans,
+) ([]SpanPartition, error) {
+	if len(spans) == 0 {
+		panic("no spans")
+	}
+	ctx := planCtx.ctx
+	partitions := make([]SpanPartition, 0, 1)
+	if planCtx.isLocal {
+		// If we're planning locally, map all spans to the local node.
+		partitions = append(partitions,
+			SpanPartition{dsp.gatewaySQLInstanceID, spans})
+		return partitions, nil
+	}
+	if dsp.sqlInstanceProvider == nil {
+		return nil, errors.New("sql instance provider not available in multi-tenant environment")
+	}
+	// GetAllInstances only returns healthy instances.
+	instances, err := dsp.sqlInstanceProvider.GetAllInstances(ctx)
+	if err != nil {
+		return nil, err
+	}
+	// Randomize the order in which we assign partitions, so that work is
+	// allocated fairly across queries.
+	rand.Shuffle(len(instances), func(i, j int) {
+		instances[i], instances[j] = instances[j], instances[i]
+	})
+
+	// nodeMap maps a SQLInstanceID to an index inside the partitions array.
+	nodeMap := make(map[base.SQLInstanceID]int)
+	for i := range spans {
+		span := spans[i]
+		if log.V(1) {
+			log.Infof(ctx, "partitioning span %s", span)
+		}
+		sqlInstanceID := instances[i%len(instances)].InstanceID
+		partitionIdx, inNodeMap := nodeMap[sqlInstanceID]
+		if !inNodeMap {
+			partitionIdx = len(partitions)
+			partitions = append(partitions, SpanPartition{SQLInstanceID: sqlInstanceID})
+			nodeMap[sqlInstanceID] = partitionIdx
+		}
+		partition := &partitions[partitionIdx]
+		partition.Spans = append(partition.Spans, span)
+	}
+	return partitions, nil
+}
+
 // nodeVersionIsCompatible decides whether a particular node's DistSQL version
 // is compatible with dsp.planVersion. It uses gossip to find out the node's
 // version range.
diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go
index e249c1cee691..17c51847b3ea 100644
--- a/pkg/sql/distsql_physical_planner_test.go
+++ b/pkg/sql/distsql_physical_planner_test.go
@@ -907,6 +907,7 @@ func TestPartitionSpans(t *testing.T) {
 						return true
 					},
 				},
+				codec: keys.SystemSQLCodec,
 			}
 
 			planCtx := dsp.NewPlanningCtx(context.Background(), &extendedEvalContext{
@@ -1091,6 +1092,7 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
 						return true
 					},
 				},
+				codec: keys.SystemSQLCodec,
 			}
 
 			planCtx := dsp.NewPlanningCtx(context.Background(), &extendedEvalContext{
@@ -1190,6 +1192,7 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
 				return true
 			},
 		},
+		codec: keys.SystemSQLCodec,
 	}
 
 	planCtx := dsp.NewPlanningCtx(context.Background(), &extendedEvalContext{
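
For reviewers, below is a minimal, self-contained sketch of the round-robin assignment described in the commit message. It is illustrative only: the instanceID and spanPartition types, the partitionSpansRoundRobin helper, and the string spans are simplified stand-ins for base.SQLInstanceID, SpanPartition, partitionSpansTenant, and roachpb.Spans, and are not part of this patch.

package main

import (
	"fmt"
	"math/rand"
)

// instanceID and spanPartition are simplified stand-ins for the planner's
// base.SQLInstanceID and SpanPartition types.
type instanceID int

type spanPartition struct {
	instance instanceID
	spans    []string
}

// partitionSpansRoundRobin shuffles the available instances once (so repeated
// calls spread work fairly) and then assigns spans to instances in round-robin
// order, grouping all spans for the same instance into a single partition.
func partitionSpansRoundRobin(instances []instanceID, spans []string) []spanPartition {
	if len(instances) == 0 || len(spans) == 0 {
		return nil
	}
	rand.Shuffle(len(instances), func(i, j int) {
		instances[i], instances[j] = instances[j], instances[i]
	})

	partitions := make([]spanPartition, 0, len(instances))
	// idxByInstance maps an instance to its index in partitions, mirroring the
	// nodeMap bookkeeping in partitionSpansTenant.
	idxByInstance := make(map[instanceID]int)
	for i, span := range spans {
		id := instances[i%len(instances)]
		p, ok := idxByInstance[id]
		if !ok {
			p = len(partitions)
			partitions = append(partitions, spanPartition{instance: id})
			idxByInstance[id] = p
		}
		partitions[p].spans = append(partitions[p].spans, span)
	}
	return partitions
}

func main() {
	// Three hypothetical SQL pods and four spans to back up.
	instances := []instanceID{1, 2, 3}
	spans := []string{"span-a", "span-b", "span-c", "span-d"}
	for _, part := range partitionSpansRoundRobin(instances, spans) {
		fmt.Printf("instance %d -> %v\n", part.instance, part.spans)
	}
}

Grouping by instance keeps exactly one partition per instance even when a tenant has fewer SQL pods than spans, which is the same reason partitionSpansTenant maintains its nodeMap.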