Skip to content

Commit

Permalink
sql: Add a BulkRowWriter processor to be initially used by CTAS.
Browse files Browse the repository at this point in the history
This change introduces a BulkRowWriter processor which uses the
AddSSTable method to write rows from a RowSource to the target
table. The processor is required to make the CTAS statement
scalable.

Release note: None
  • Loading branch information
adityamaru27 committed Jul 1, 2019
1 parent 0e9bd94 commit 1c87d20
Show file tree
Hide file tree
Showing 7 changed files with 427 additions and 144 deletions.
48 changes: 48 additions & 0 deletions pkg/sql/distsql_plan_ctas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/pkg/errors"
)

// CTASPlanResultTypes is the result types for EXPORT plans.
var CTASPlanResultTypes = []types.T{
*types.Bytes, // rows
}

// PlanAndRunCTAS plans and runs the CREATE TABLE AS command.
func PlanAndRunCTAS(
ctx context.Context,
dsp *DistSQLPlanner,
evalCtx *extendedEvalContext,
txn *client.Txn,
canDistribute bool,
isLocal bool,
in planNode,
out distsqlpb.ProcessorCoreUnion,
recv *DistSQLReceiver,
) {
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, txn)
planCtx.isLocal = isLocal

p, err := dsp.createPlanForNode(planCtx, in)
if err != nil {
recv.SetError(errors.Wrapf(err, "constructing distSQL plan"))
return
}

p.AddNoGroupingStage(
out, distsqlpb.PostProcessSpec{}, CTASPlanResultTypes, distsqlpb.Ordering{},
)

// The bulk row writers will emit a binary encoded BulkOpSummary.
p.PlanToStreamColMap = []int{0}
p.ResultTypes = CTASPlanResultTypes

dsp.FinalizePlan(planCtx, &p)
dsp.Run(planCtx, txn, &p, recv, evalCtx, nil /* finishedSetupFn */)
}
5 changes: 5 additions & 0 deletions pkg/sql/distsqlpb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,11 @@ func (s *CSVWriterSpec) summary() (string, []string) {
return "CSVWriter", []string{s.Destination}
}

// summary implements the diagramCellType interface.
func (s *BulkRowWriterSpec) summary() (string, []string) {
return "BulkRowWriterSpec", []string{}
}

// summary implements the diagramCellType interface.
func (w *WindowerSpec) summary() (string, []string) {
details := make([]string, 0, len(w.WindowFns))
Expand Down
208 changes: 132 additions & 76 deletions pkg/sql/distsqlpb/processors.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/sql/distsqlpb/processors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ message ProcessorCoreUnion {
optional ChangeAggregatorSpec changeAggregator = 25;
optional ChangeFrontierSpec changeFrontier = 26;
optional OrdinalitySpec ordinality = 27;
optional BulkRowWriterSpec bulkRowWriter = 28;

reserved 6, 12;
}
Expand Down
Loading

0 comments on commit 1c87d20

Please sign in to comment.