distsql: transition distsql physical planner to use SQLInstanceIDs instead of NodeIDs

Refactors DistSQL and some related functions to use `SQLInstanceID`
instead of `NodeID` for multi-tenant support.

Fixes: #49596

Release note: None
rharding6373 committed Jan 26, 2022
1 parent e1ceeda commit 711e228
Showing 59 changed files with 597 additions and 589 deletions.
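
The gist of the change, illustrated with a minimal, self-contained Go sketch: planning maps and processor placements are keyed by a SQL instance ID rather than a KV `NodeID`, so the same planner code paths can run on multi-tenant SQL pods that have no node ID. The types below are simplified stand-ins for `base.SQLInstanceID`, `execinfrapb.BackupDataSpec`, and `physicalplan.ProcessorCorePlacement`, not the real definitions from this commit.

```go
// Sketch of the keying change: DistSQL planning structures are keyed by a SQL
// instance ID rather than a KV node ID.
package main

import "fmt"

// SQLInstanceID is a simplified stand-in for base.SQLInstanceID, which
// identifies a SQL instance (a tenant SQL pod or the SQL layer of a node).
type SQLInstanceID int32

// BackupDataSpec stands in for execinfrapb.BackupDataSpec.
type BackupDataSpec struct{ Spans int }

// ProcessorCorePlacement stands in for physicalplan.ProcessorCorePlacement
// after the refactor: it records the SQL instance a processor runs on.
type ProcessorCorePlacement struct {
	SQLInstanceID SQLInstanceID
	Spec          *BackupDataSpec
}

// placeProcessors builds one placement per input spec, keyed by SQL instance,
// mirroring the one-stage plans built in the diffs below.
func placeProcessors(specs map[SQLInstanceID]*BackupDataSpec) []ProcessorCorePlacement {
	placements := make([]ProcessorCorePlacement, 0, len(specs))
	for id, spec := range specs {
		placements = append(placements, ProcessorCorePlacement{SQLInstanceID: id, Spec: spec})
	}
	return placements
}

func main() {
	specs := map[SQLInstanceID]*BackupDataSpec{1: {Spans: 3}, 2: {Spans: 5}}
	for _, p := range placeProcessors(specs) {
		fmt.Printf("instance %d gets %d spans\n", p.SQLInstanceID, p.Spec.Spans)
	}
}
```
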
21 changes: 11 additions & 10 deletions pkg/ccl/backupccl/backup_processor_planning.go
@@ -11,6 +11,7 @@ package backupccl
import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
@@ -39,7 +40,7 @@ func distBackupPlanSpecs(
encryption *jobspb.BackupEncryptionOptions,
mvccFilter roachpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
) (map[roachpb.NodeID]*execinfrapb.BackupDataSpec, error) {
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "backup-plan-specs")
_ = ctx // ctx is currently unused, but this new ctx should be used below in the future.
@@ -88,7 +89,7 @@ func distBackupPlanSpecs(

// First construct spans based on span partitions. Then add on
// introducedSpans based on how those partition.
nodeToSpec := make(map[roachpb.NodeID]*execinfrapb.BackupDataSpec)
sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec)
for _, partition := range spanPartitions {
spec := &execinfrapb.BackupDataSpec{
Spans: partition.Spans,
@@ -101,11 +102,11 @@
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
}
nodeToSpec[partition.Node] = spec
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}

for _, partition := range introducedSpanPartitions {
if spec, ok := nodeToSpec[partition.Node]; ok {
if spec, ok := sqlInstanceIDToSpec[partition.SQLInstanceID]; ok {
spec.IntroducedSpans = partition.Spans
} else {
// We may need to introduce a new spec in the case that there is a node
@@ -122,21 +123,21 @@
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
}
nodeToSpec[partition.Node] = spec
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}
}

backupPlanningTraceEvent := BackupProcessorPlanningTraceEvent{
NodeToNumSpans: make(map[int32]int64),
}
for node, spec := range nodeToSpec {
for node, spec := range sqlInstanceIDToSpec {
numSpans := int64(len(spec.Spans) + len(spec.IntroducedSpans))
backupPlanningTraceEvent.NodeToNumSpans[int32(node)] = numSpans
backupPlanningTraceEvent.TotalNumSpans += numSpans
}
span.RecordStructured(&backupPlanningTraceEvent)

return nodeToSpec, nil
return sqlInstanceIDToSpec, nil
}

// distBackup is used to plan the processors for a distributed backup. It
@@ -148,7 +149,7 @@ func distBackup(
planCtx *sql.PlanningCtx,
dsp *sql.DistSQLPlanner,
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupSpecs map[roachpb.NodeID]*execinfrapb.BackupDataSpec,
backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec,
) error {
ctx, span := tracing.ChildSpan(ctx, "backup-distsql")
defer span.Finish()
@@ -164,8 +165,8 @@ func distBackup(
// Setup a one-stage plan with one proc per input spec.
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(backupSpecs))
i := 0
for node, spec := range backupSpecs {
corePlacement[i].NodeID = node
for sqlInstanceID, spec := range backupSpecs {
corePlacement[i].SQLInstanceID = sqlInstanceID
corePlacement[i].Core.BackupData = spec
i++
}
47 changes: 25 additions & 22 deletions pkg/ccl/backupccl/restore_processor_planning.go
@@ -13,6 +13,7 @@ import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
@@ -76,12 +77,12 @@ func distRestore(
fileEncryption = &roachpb.FileEncryptionOptions{Key: encryption.Key}
}

planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
if err != nil {
return err
}

splitAndScatterSpecs, err := makeSplitAndScatterSpecs(nodes, chunks, rekeys)
splitAndScatterSpecs, err := makeSplitAndScatterSpecs(sqlInstanceIDs, chunks, rekeys)
if err != nil {
return err
}
@@ -102,8 +103,8 @@
p := planCtx.NewPhysicalPlan()

// Plan SplitAndScatter in a round-robin fashion.
splitAndScatterStageID := p.NewStageOnNodes(nodes)
splitAndScatterProcs := make(map[roachpb.NodeID]physicalplan.ProcessorIdx)
splitAndScatterStageID := p.NewStageOnNodes(sqlInstanceIDs)
splitAndScatterProcs := make(map[base.SQLInstanceID]physicalplan.ProcessorIdx)

defaultStream := int32(0)
rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{
@@ -116,8 +117,8 @@
},
},
}
for stream, nodeID := range nodes {
startBytes, endBytes, err := routingSpanForNode(nodeID)
for stream, sqlInstanceID := range sqlInstanceIDs {
startBytes, endBytes, err := routingSpanForSQLInstance(sqlInstanceID)
if err != nil {
return err
}
@@ -134,7 +135,7 @@
return bytes.Compare(rangeRouterSpec.Spans[i].Start, rangeRouterSpec.Spans[j].Start) == -1
})

for _, n := range nodes {
for _, n := range sqlInstanceIDs {
spec := splitAndScatterSpecs[n]
if spec == nil {
// We may have fewer chunks than we have nodes for very small imports. In
Expand All @@ -144,7 +145,7 @@ func distRestore(
continue
}
proc := physicalplan.Processor{
Node: n,
SQLInstanceID: n,
Spec: execinfrapb.ProcessorSpec{
Core: execinfrapb.ProcessorCoreUnion{SplitAndScatter: splitAndScatterSpecs[n]},
Post: execinfrapb.PostProcessSpec{},
@@ -163,11 +164,11 @@
}

// Plan RestoreData.
restoreDataStageID := p.NewStageOnNodes(nodes)
restoreDataProcs := make(map[roachpb.NodeID]physicalplan.ProcessorIdx)
for _, n := range nodes {
restoreDataStageID := p.NewStageOnNodes(sqlInstanceIDs)
restoreDataProcs := make(map[base.SQLInstanceID]physicalplan.ProcessorIdx)
for _, sqlInstanceID := range sqlInstanceIDs {
proc := physicalplan.Processor{
Node: n,
SQLInstanceID: sqlInstanceID,
Spec: execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{
{ColumnTypes: splitAndScatterOutputTypes},
@@ -180,17 +181,17 @@
},
}
pIdx := p.AddProcessor(proc)
restoreDataProcs[n] = pIdx
restoreDataProcs[sqlInstanceID] = pIdx
p.ResultRouters = append(p.ResultRouters, pIdx)
}

for _, srcProc := range splitAndScatterProcs {
slot := 0
for _, destNode := range nodes {
for _, destSQLInstanceID := range sqlInstanceIDs {
// Streams were added to the range router in the same order that the
// nodes appeared in `nodes`. Make sure that the `slot`s here are
// ordered the same way.
destProc := restoreDataProcs[destNode]
destProc := restoreDataProcs[destSQLInstanceID]
p.Streams = append(p.Streams, physicalplan.Stream{
SourceProcessor: srcProc,
SourceRouterSlot: slot,
@@ -236,23 +237,25 @@ func distRestore(
// spec that should be planned on that node. Given the chunks of ranges to
// import it round-robin distributes the chunks amongst the given nodes.
func makeSplitAndScatterSpecs(
nodes []roachpb.NodeID, chunks [][]execinfrapb.RestoreSpanEntry, rekeys []execinfrapb.TableRekey,
) (map[roachpb.NodeID]*execinfrapb.SplitAndScatterSpec, error) {
specsByNodes := make(map[roachpb.NodeID]*execinfrapb.SplitAndScatterSpec)
sqlInstanceIDs []base.SQLInstanceID,
chunks [][]execinfrapb.RestoreSpanEntry,
rekeys []execinfrapb.TableRekey,
) (map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec, error) {
specsBySQLInstanceID := make(map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec)
for i, chunk := range chunks {
node := nodes[i%len(nodes)]
if spec, ok := specsByNodes[node]; ok {
sqlInstanceID := sqlInstanceIDs[i%len(sqlInstanceIDs)]
if spec, ok := specsBySQLInstanceID[sqlInstanceID]; ok {
spec.Chunks = append(spec.Chunks, execinfrapb.SplitAndScatterSpec_RestoreEntryChunk{
Entries: chunk,
})
} else {
specsByNodes[node] = &execinfrapb.SplitAndScatterSpec{
specsBySQLInstanceID[sqlInstanceID] = &execinfrapb.SplitAndScatterSpec{
Chunks: []execinfrapb.SplitAndScatterSpec_RestoreEntryChunk{{
Entries: chunk,
}},
Rekeys: rekeys,
}
}
}
return specsByNodes, nil
return specsBySQLInstanceID, nil
}
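
As an aside, the round-robin distribution performed by `makeSplitAndScatterSpecs` above can be sketched on its own. `assignChunks` below is a hypothetical, simplified helper written for illustration, not code from this commit: chunk `i` goes to `sqlInstanceIDs[i % len(sqlInstanceIDs)]`, and chunks landing on the same instance are appended to one spec.

```go
// Standalone sketch of round-robin chunk assignment across SQL instances.
package main

import "fmt"

// SQLInstanceID is a simplified stand-in for base.SQLInstanceID.
type SQLInstanceID int32

// assignChunks distributes chunks to instances round-robin, grouping all
// chunks for the same instance together (analogous to one spec per instance).
func assignChunks(instances []SQLInstanceID, chunks []string) map[SQLInstanceID][]string {
	byInstance := make(map[SQLInstanceID][]string)
	for i, chunk := range chunks {
		id := instances[i%len(instances)]
		byInstance[id] = append(byInstance[id], chunk)
	}
	return byInstance
}

func main() {
	instances := []SQLInstanceID{1, 2, 3}
	chunks := []string{"chunk0", "chunk1", "chunk2", "chunk3", "chunk4"}
	// instance 1 gets chunk0 and chunk3, instance 2 gets chunk1 and chunk4,
	// instance 3 gets chunk2.
	for id, cs := range assignChunks(instances, chunks) {
		fmt.Println(id, cs)
	}
}
```
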
15 changes: 9 additions & 6 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
@@ -13,6 +13,7 @@ import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -291,7 +292,7 @@ func (ssp *splitAndScatterProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr
// The routing datums informs the router which output stream should be used.
routingDatum, ok := ssp.routingDatumCache[scatteredEntry.node]
if !ok {
routingDatum, _ = routingDatumsForNode(scatteredEntry.node)
routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
ssp.routingDatumCache[scatteredEntry.node] = routingDatum
}

@@ -414,18 +415,20 @@ func runSplitAndScatter(
return g.Wait()
}

func routingDatumsForNode(nodeID roachpb.NodeID) (rowenc.EncDatum, rowenc.EncDatum) {
routingBytes := roachpb.Key(fmt.Sprintf("node%d", nodeID))
func routingDatumsForSQLInstance(
sqlInstanceID base.SQLInstanceID,
) (rowenc.EncDatum, rowenc.EncDatum) {
routingBytes := roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID))
startDatum := rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes)))
endDatum := rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next())))
return startDatum, endDatum
}

// routingSpanForNode provides the mapping to be used during distsql planning
// routingSpanForSQLInstance provides the mapping to be used during distsql planning
// when setting up the output router.
func routingSpanForNode(nodeID roachpb.NodeID) ([]byte, []byte, error) {
func routingSpanForSQLInstance(sqlInstanceID base.SQLInstanceID) ([]byte, []byte, error) {
var alloc tree.DatumAlloc
startDatum, endDatum := routingDatumsForNode(nodeID)
startDatum, endDatum := routingDatumsForSQLInstance(sqlInstanceID)

startBytes, endBytes := make([]byte, 0), make([]byte, 0)
startBytes, err := startDatum.Encode(splitAndScatterOutputTypes[0], &alloc, descpb.DatumEncoding_ASCENDING_KEY, startBytes)
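
For reference, a hedged sketch of the routing-span idea behind `routingSpanForSQLInstance` above: each SQL instance owns the key prefix `node<ID>`, and the range router sends rows whose routing datum falls in that span to the processor planned on that instance. The helper below is a simplified stand-in; the real code encodes the bounds as ascending-key datums rather than raw bytes.

```go
// Sketch of mapping a SQL instance ID to a byte span for the range router.
package main

import (
	"bytes"
	"fmt"
)

// SQLInstanceID is a simplified stand-in for base.SQLInstanceID.
type SQLInstanceID int32

// routingSpan returns the half-open span ["node<id>", "node<id>"+0x00),
// analogous to routingBytes and routingBytes.Next() in the diff above.
func routingSpan(id SQLInstanceID) (start, end []byte) {
	start = []byte(fmt.Sprintf("node%d", id))
	end = append(append([]byte{}, start...), 0x00)
	return start, end
}

func main() {
	start, end := routingSpan(2)
	rowKey := []byte("node2")
	// A row tagged with "node2" falls inside instance 2's span, so the range
	// router would deliver it to the processor placed on instance 2.
	inSpan := bytes.Compare(rowKey, start) >= 0 && bytes.Compare(rowKey, end) < 0
	fmt.Println(inSpan) // true
}
```
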
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor_test.go
@@ -198,7 +198,7 @@ func TestSplitAndScatterProcessor(t *testing.T) {
}
for stream := 0; stream < c.numStreams; stream++ {
// In this test, nodes are 1 indexed.
startBytes, endBytes, err := routingSpanForNode(roachpb.NodeID(stream + 1))
startBytes, endBytes, err := routingSpanForSQLInstance(base.SQLInstanceID(stream + 1))
require.NoError(t, err)

span := execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeeddist/distflow.go
@@ -52,7 +52,7 @@ func StartDistChangefeed(
var spanPartitions []sql.SpanPartition
if details.SinkURI == `` {
// Sinkless feeds get one ChangeAggregator on the gateway.
spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}}
spanPartitions = []sql.SpanPartition{{SQLInstanceID: dsp.GatewayID(), Spans: trackedSpans}}
} else {
// All other feeds get a ChangeAggregator local on the leaseholder.
var err error
@@ -86,7 +86,7 @@ func StartDistChangefeed(
UserProto: execCtx.User().EncodeProto(),
JobID: jobID,
}
corePlacement[i].NodeID = sp.Node
corePlacement[i].SQLInstanceID = sp.SQLInstanceID
corePlacement[i].Core.ChangeAggregator = spec
}

1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/importccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/backupccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
17 changes: 9 additions & 8 deletions pkg/ccl/importccl/import_processor_planning.go
@@ -14,6 +14,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
@@ -54,7 +55,7 @@ func distImport(
dsp := execCtx.DistSQLPlanner()
evalCtx := execCtx.ExtendedEvalContext()

planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
if err != nil {
return roachpb.BulkOpSummary{}, err
}
@@ -70,15 +71,15 @@
accumulatedBulkSummary.BulkOpSummary = getLastImportSummary(job)
accumulatedBulkSummary.Unlock()

inputSpecs := makeImportReaderSpecs(job, tables, typeDescs, from, format, nodes, walltime,
inputSpecs := makeImportReaderSpecs(job, tables, typeDescs, from, format, sqlInstanceIDs, walltime,
execCtx.User())

p := planCtx.NewPhysicalPlan()

// Setup a one-stage plan with one proc per input spec.
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(inputSpecs))
for i := range inputSpecs {
corePlacement[i].NodeID = nodes[i]
corePlacement[i].SQLInstanceID = sqlInstanceIDs[i]
corePlacement[i].Core.ReadImport = inputSpecs[i]
}
p.AddNoInputStage(
@@ -237,19 +238,19 @@ func makeImportReaderSpecs(
typeDescs []*descpb.TypeDescriptor,
from []string,
format roachpb.IOFileFormat,
nodes []roachpb.NodeID,
sqlInstanceIDs []base.SQLInstanceID,
walltime int64,
user security.SQLUsername,
) []*execinfrapb.ReadImportDataSpec {
details := job.Details().(jobspb.ImportDetails)
// For each input file, assign it to a node.
inputSpecs := make([]*execinfrapb.ReadImportDataSpec, 0, len(nodes))
inputSpecs := make([]*execinfrapb.ReadImportDataSpec, 0, len(sqlInstanceIDs))
progress := job.Progress()
importProgress := progress.GetImport()
for i, input := range from {
// Round robin assign CSV files to nodes. Files 0 through len(nodes)-1
// Round robin assign CSV files to sqlInstanceIDs. Files 0 through len(sqlInstanceIDs)-1
// creates the spec. Future files just add themselves to the Uris.
if i < len(nodes) {
if i < len(sqlInstanceIDs) {
spec := &execinfrapb.ReadImportDataSpec{
Tables: tables,
Types: typeDescs,
Expand All @@ -266,7 +267,7 @@ func makeImportReaderSpecs(
}
inputSpecs = append(inputSpecs, spec)
}
n := i % len(nodes)
n := i % len(sqlInstanceIDs)
inputSpecs[n].Uri[int32(i)] = input
if importProgress.ResumePos != nil {
inputSpecs[n].ResumePos[int32(i)] = importProgress.ResumePos[int32(i)]
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
6 changes: 3 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -66,20 +66,20 @@ func ingest(
evalCtx := execCtx.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()

planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
if err != nil {
return err
}

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, nodes, initialHighWater, jobID)
streamAddress, topology, sqlInstanceIDs, initialHighWater, jobID)
if err != nil {
return err
}

// Plan and run the DistSQL flow.
return distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
return distStreamIngest(ctx, execCtx, sqlInstanceIDs, jobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec)
}

