Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
38374: pkg: Add and integrate BulkRowWriter processor into CTAS execution. r=adityamaru27 a=adityamaru27

This change introduces a BulkRowWriter processor which uses
AddSSTable to write rows from a RowSource to the target
table. This is in an attempt to make the CREATE TABLE ... AS
statement more scalable.

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru27 committed Jul 9, 2019
2 parents 20fda8b + 3e1f7f7 commit 8c6fdc6
Show file tree
Hide file tree
Showing 11 changed files with 694 additions and 272 deletions.
57 changes: 57 additions & 0 deletions pkg/sql/distsql_plan_ctas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/pkg/errors"
)

// PlanAndRunCTAS plans and runs the CREATE TABLE AS command.
func PlanAndRunCTAS(
ctx context.Context,
dsp *DistSQLPlanner,
planner *planner,
txn *client.Txn,
isLocal bool,
in planNode,
out distsqlpb.ProcessorCoreUnion,
recv *DistSQLReceiver,
) {
planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), txn)
planCtx.isLocal = isLocal
planCtx.planner = planner
planCtx.stmtType = tree.Rows

p, err := dsp.createPlanForNode(planCtx, in)
if err != nil {
recv.SetError(errors.Wrapf(err, "constructing distSQL plan"))
return
}

p.AddNoGroupingStage(
out, distsqlpb.PostProcessSpec{}, distsqlrun.CTASPlanResultTypes, distsqlpb.Ordering{},
)

// The bulk row writers will emit a binary encoded BulkOpSummary.
p.PlanToStreamColMap = []int{0}
p.ResultTypes = distsqlrun.CTASPlanResultTypes

// Make copy of evalCtx as Run might modify it.
evalCtxCopy := planner.ExtendedEvalContextCopy()
dsp.FinalizePlan(planCtx, &p)
dsp.Run(planCtx, txn, &p, recv, evalCtxCopy, nil /* finishedSetupFn */)
}
5 changes: 5 additions & 0 deletions pkg/sql/distsqlpb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,11 @@ func (s *CSVWriterSpec) summary() (string, []string) {
return "CSVWriter", []string{s.Destination}
}

// summary implements the diagramCellType interface.
func (s *BulkRowWriterSpec) summary() (string, []string) {
return "BulkRowWriterSpec", []string{}
}

// summary implements the diagramCellType interface.
func (w *WindowerSpec) summary() (string, []string) {
details := make([]string, 0, len(w.WindowFns))
Expand Down
208 changes: 132 additions & 76 deletions pkg/sql/distsqlpb/processors.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/sql/distsqlpb/processors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ message ProcessorCoreUnion {
optional ChangeAggregatorSpec changeAggregator = 25;
optional ChangeFrontierSpec changeFrontier = 26;
optional OrdinalitySpec ordinality = 27;
optional BulkRowWriterSpec bulkRowWriter = 28;

reserved 6, 12;
}
Expand Down
Loading

0 comments on commit 8c6fdc6

Please sign in to comment.