sql: update CTAS logic to use the BulkRowWriter processor
This change integrates the BulkRowWriter into the distsql
PlanAndRun phase for CTAS statements.

Release Note: None
adityamaru27 committed Jul 1, 2019
1 parent 1c87d20 commit 8c52887
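For context, CTAS statements are those of the form CREATE TABLE t AS SELECT ... FROM src; with this change, the rows produced by the AS query are converted to KVs and ingested through the new BulkRowWriter processor shown below.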
Showing 5 changed files with 259 additions and 132 deletions.
204 changes: 204 additions & 0 deletions pkg/sql/create_table.go
@@ -17,10 +17,14 @@ import (
"math"
"sort"
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -29,9 +33,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
@@ -1639,3 +1648,198 @@ func MakeCheckConstraint(
ColumnIDs: colIDs,
}, nil
}

func newBulkRowWriterProcessor(
flowCtx *distsqlrun.FlowCtx,
processorID int32,
spec distsqlpb.BulkRowWriterSpec,
input distsqlrun.RowSource,
output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
c := &bulkRowWriter{
flowCtx: flowCtx,
processorID: processorID,
batchIdxAtomic: 0,
spec: spec,
input: input,
output: output,
}
if err := c.out.Init(&distsqlpb.PostProcessSpec{}, CTASPlanResultTypes,
flowCtx.NewEvalCtx(), output); err != nil {
return nil, err
}
return c, nil
}

type bulkRowWriter struct {
flowCtx *distsqlrun.FlowCtx
processorID int32
batchIdxAtomic int64
spec distsqlpb.BulkRowWriterSpec
input distsqlrun.RowSource
out distsqlrun.ProcOutputHelper
output distsqlrun.RowReceiver
}

var _ distsqlrun.Processor = &bulkRowWriter{}

func (sp *bulkRowWriter) OutputTypes() []types.T {
return CTASPlanResultTypes
}

// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
func ingestKvs(
ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue,
) error {
for kvBatch := range kvCh {
for _, kv := range kvBatch {
if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
if _, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}
}
}

if err := adder.Flush(ctx); err != nil {
if err, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}
return nil
}

func (sp *bulkRowWriter) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "bulkRowWriter")
defer tracing.FinishSpan(span)

var kvCh chan []roachpb.KeyValue
var g ctxgroup.Group
err := func() error {
typs := sp.input.OutputTypes()
sp.input.Start(ctx)
input := distsqlrun.MakeNoMetadataRowSource(sp.input, sp.output)

alloc := &sqlbase.DatumAlloc{}

// Create a new evalCtx per converter so each goroutine gets its own
// collation environment, which can't be accessed in parallel.
evalCtx := sp.flowCtx.EvalCtx.Copy()
kvCh = make(chan []roachpb.KeyValue, 10)

conv, err := NewRowConverter(&sp.spec.Table, evalCtx, kvCh)
if err != nil {
return err
}
if conv.EvalCtx.SessionData == nil {
panic("uninitialized session data")
}

done := false

g = ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
writeTS := sp.spec.Table.CreateAsOfTime
const bufferSize, flushSize = 64 << 20, 16 << 20
adder, err := sp.flowCtx.BulkAdder(ctx, sp.flowCtx.ClientDB,
bufferSize, flushSize, writeTS)
if err != nil {
return err
}
defer adder.Close(ctx)

// Drain the kvCh using the BulkAdder until it closes.
if err := ingestKvs(ctx, adder, kvCh); err != nil {
return err
}

added := adder.GetSummary()
countsBytes, err := protoutil.Marshal(&added)
if err != nil {
return err
}
cs, err := sp.out.EmitRow(ctx, sqlbase.EncDatumRow{
sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
})
if err != nil {
return err
}

if cs != distsqlrun.NeedMoreRows {
return errors.New("unexpected closure of consumer")
}

return nil
})

for {
var rows int64
for {
row, err := input.NextRow()
if err != nil {
return err
}
if row == nil {
done = true
break
}
rows++

for i, ed := range row {
if ed.IsNull() {
conv.Datums[i] = tree.DNull
continue
}
if err := ed.EnsureDecoded(&typs[i], alloc); err != nil {
return err
}
conv.Datums[i] = ed.Datum
}

// `conv.Row` uses these as arguments to GenerateUniqueID to generate
// hidden primary keys, when necessary. We want them to be ascending per
// processor to reduce overlap in the resulting kvs, and non-conflicting
// (because of primary key uniqueness). The ids that come out of
// GenerateUniqueID are sorted by (fileIndex, rowIndex) and unique as long
// as the two inputs are a unique combo, so using the processor ID and a
// monotonically increasing batch index should do what we want.
if err := conv.Row(ctx, sp.processorID, sp.batchIdxAtomic); err != nil {
return err
}
atomic.AddInt64(&sp.batchIdxAtomic, 1)
}
if rows < 1 {
break
}

if err := conv.SendBatch(ctx); err != nil {
return err
}

if done {
break
}
}

return nil
}()
close(kvCh)

writerErr := g.Wait()
if err == nil {
err = writerErr
}
if writerErr != nil {
log.Error(ctx, writerErr)
}

distsqlrun.DrainAndClose(
ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input)
}
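
The comment in Run above explains why the (processor ID, batch index) pair yields hidden primary keys that are unique across processors and ascending within one processor. A minimal, runnable sketch of that idea; the bit layout here is illustrative, not the actual GenerateUniqueID implementation:

package main

import "fmt"

// uniqueRowID is a toy stand-in for GenerateUniqueID: the monotonically
// increasing batch index goes in the high bits (so IDs from one processor
// ascend), and the processor ID goes in the low bits (so IDs from two
// processors can never collide as long as their processor IDs differ).
func uniqueRowID(processorID int32, batchIdx int64) int64 {
	return batchIdx<<15 | int64(processorID)&0x7fff
}

func main() {
	for batch := int64(0); batch < 3; batch++ {
		fmt.Println(uniqueRowID(7, batch)) // prints 7, 32775, 65543: ascending
	}
}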

func init() {
distsqlrun.NewBulkRowWriterProcessor = newBulkRowWriterProcessor
}
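
The init hook just above is the standard Go pattern for breaking an import cycle: distsqlrun declares a package-level constructor variable (NewBulkRowWriterProcessor), and package sql assigns it at init time, so distsqlrun can construct a processor implemented here without importing pkg/sql. A self-contained sketch of the pattern, with illustrative names:

package main

import "fmt"

// In the lower-level package: a hook left nil until someone registers it.
var newWriterHook func(name string) string

// In the higher-level package: fill in the hook at init time, so the
// lower-level package can call the constructor without importing us.
func init() {
	newWriterHook = func(name string) string { return "bulk writer for " + name }
}

func main() {
	fmt.Println(newWriterHook("ctas")) // prints "bulk writer for ctas"
}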
22 changes: 18 additions & 4 deletions pkg/sql/distsql_plan_ctas.go
@@ -1,10 +1,21 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/pkg/errors"
)
@@ -18,16 +29,17 @@ var CTASPlanResultTypes = []types.T{
func PlanAndRunCTAS(
ctx context.Context,
dsp *DistSQLPlanner,
evalCtx *extendedEvalContext,
planner *planner,
txn *client.Txn,
canDistribute bool,
isLocal bool,
in planNode,
out distsqlpb.ProcessorCoreUnion,
recv *DistSQLReceiver,
) {
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, txn)
planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), txn)
planCtx.isLocal = isLocal
planCtx.planner = planner
planCtx.stmtType = tree.Rows

p, err := dsp.createPlanForNode(planCtx, in)
if err != nil {
@@ -43,6 +55,8 @@ func PlanAndRunCTAS(
p.PlanToStreamColMap = []int{0}
p.ResultTypes = CTASPlanResultTypes

// Make copy of evalCtx as Run might modify it.
evalCtxCopy := planner.ExtendedEvalContextCopy()
dsp.FinalizePlan(planCtx, &p)
dsp.Run(planCtx, txn, &p, recv, evalCtx, nil /* finishedSetupFn */)
dsp.Run(planCtx, txn, &p, recv, evalCtxCopy, nil /* finishedSetupFn */)
}
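
For orientation, a sketch of how a caller might drive the updated entry point; tableDesc, sourcePlan, recv, and the BulkRowWriter field on ProcessorCoreUnion are assumptions for illustration, not code from this diff:

// Build a BulkRowWriter core targeting the new table; the spec's Table
// field is the descriptor the writer converts rows for (see bulkRowWriter
// above). All surrounding variables are assumed to exist at the call site.
out := distsqlpb.ProcessorCoreUnion{
	BulkRowWriter: &distsqlpb.BulkRowWriterSpec{Table: *tableDesc},
}
PlanAndRunCTAS(ctx, dsp, planner, txn, true /* isLocal */, sourcePlan, out, recv)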
6 changes: 3 additions & 3 deletions pkg/sql/logictest/testdata/planner_test/show_trace
@@ -134,7 +134,7 @@ statement ok
SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off

query TT
SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message
SELECT operation, message
FROM [SHOW KV TRACE FOR SESSION]
WHERE message NOT LIKE '%Z/%'
AND message NOT SIMILAR TO '%(PushTxn|ResolveIntent|SystemConfigSpan)%'
@@ -145,8 +145,8 @@
----
[async] kv.DistSender: sending pre-commit query intents r7: sending batch 1 QueryIntent to (n1,s1):1
table reader Scan /Table/55/{1-2}
table reader fetched: /kv2/primary/...PK.../k/v -> /1/2
flow Put /Table/55/1/...PK.../0 -> /TUPLE/1:1:Int/1/1:2:Int/4
table reader fetched: /kv2/primary/1/k/v -> /1/2
flow Put /Table/55/1/1/0 -> /TUPLE/1:1:Int/1/1:2:Int/4
flow fast path completed
exec cmd: exec stmt rows affected: 1

6 changes: 3 additions & 3 deletions pkg/sql/opt/exec/execbuilder/testdata/show_trace
@@ -130,7 +130,7 @@ statement ok
SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off

query TT
SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message
SELECT operation, message
FROM [SHOW KV TRACE FOR SESSION]
WHERE message NOT LIKE '%Z/%'
AND message NOT SIMILAR TO '%(PushTxn|ResolveIntent|SystemConfigSpan)%'
@@ -141,8 +141,8 @@
----
[async] kv.DistSender: sending pre-commit query intents r7: sending batch 1 QueryIntent to (n1,s1):1
table reader Scan /Table/55/{1-2}
table reader fetched: /kv2/primary/...PK.../k/v -> /1/2
flow Put /Table/55/1/...PK.../0 -> /TUPLE/1:1:Int/1/1:2:Int/4
table reader fetched: /kv2/primary/1/k/v -> /1/2
flow Put /Table/55/1/1/0 -> /TUPLE/1:1:Int/1/1:2:Int/4
flow fast path completed
exec cmd: exec stmt rows affected: 1

