From 8c528870bdbc83caf174b904bcf17573a12f8bac Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Fri, 21 Jun 2019 16:58:02 -0400
Subject: [PATCH] sql: Updated CTAS logic to utilize the BulkRowWriter processor.

This change integrates the BulkRowWriter into the distsql PlanAndRun
phase for CTAS statements.

Release Note: None
---
 pkg/sql/create_table.go                      | 204 ++++++++++++++++++
 pkg/sql/distsql_plan_ctas.go                 |  22 +-
 .../testdata/planner_test/show_trace         |   6 +-
 .../opt/exec/execbuilder/testdata/show_trace |   6 +-
 pkg/sql/schema_changer.go                    | 153 +++----------
 5 files changed, 259 insertions(+), 132 deletions(-)

diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go
index 9f761ec9bf4d..8e7a759e98a2 100644
--- a/pkg/sql/create_table.go
+++ b/pkg/sql/create_table.go
@@ -17,10 +17,14 @@ import (
 	"math"
 	"sort"
 	"strings"
+	"sync/atomic"
 
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -29,9 +33,14 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 	"github.com/lib/pq/oid"
 )
@@ -1639,3 +1648,198 @@ func MakeCheckConstraint(
 		ColumnIDs: colIDs,
 	}, nil
 }
+
+func newBulkRowWriterProcessor(
+	flowCtx *distsqlrun.FlowCtx,
+	processorID int32,
+	spec distsqlpb.BulkRowWriterSpec,
+	input distsqlrun.RowSource,
+	output distsqlrun.RowReceiver,
+) (distsqlrun.Processor, error) {
+	c := &bulkRowWriter{
+		flowCtx:        flowCtx,
+		processorID:    processorID,
+		batchIdxAtomic: 0,
+		spec:           spec,
+		input:          input,
+		output:         output,
+	}
+	if err := c.out.Init(&distsqlpb.PostProcessSpec{}, CTASPlanResultTypes,
+		flowCtx.NewEvalCtx(), output); err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+type bulkRowWriter struct {
+	flowCtx        *distsqlrun.FlowCtx
+	processorID    int32
+	batchIdxAtomic int64
+	spec           distsqlpb.BulkRowWriterSpec
+	input          distsqlrun.RowSource
+	out            distsqlrun.ProcOutputHelper
+	output         distsqlrun.RowReceiver
+}
+
+var _ distsqlrun.Processor = &bulkRowWriter{}
+
+func (sp *bulkRowWriter) OutputTypes() []types.T {
+	return CTASPlanResultTypes
+}
+
+// ingestKvs drains kvs from the channel until it closes, ingesting them using
+// the BulkAdder. It handles the required buffering/sorting/etc.
+func ingestKvs(
+	ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue,
+) error {
+	for kvBatch := range kvCh {
+		for _, kv := range kvBatch {
+			if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
+				if _, ok := err.(storagebase.DuplicateKeyError); ok {
+					return errors.WithStack(err)
+				}
+				return err
+			}
+		}
+	}
+
+	if err := adder.Flush(ctx); err != nil {
+		if _, ok := err.(storagebase.DuplicateKeyError); ok {
+			return errors.WithStack(err)
+		}
+		return err
+	}
+	return nil
+}
+
+func (sp *bulkRowWriter) Run(ctx context.Context) {
+	ctx, span := tracing.ChildSpan(ctx, "bulkRowWriter")
+	defer tracing.FinishSpan(span)
+
+	var kvCh chan []roachpb.KeyValue
+	var g ctxgroup.Group
+	err := func() error {
+		typs := sp.input.OutputTypes()
+		sp.input.Start(ctx)
+		input := distsqlrun.MakeNoMetadataRowSource(sp.input, sp.output)
+
+		alloc := &sqlbase.DatumAlloc{}
+
+		// Create a new evalCtx per converter so each goroutine gets its own
+		// collation environment, which can't be accessed in parallel.
+		evalCtx := sp.flowCtx.EvalCtx.Copy()
+		kvCh = make(chan []roachpb.KeyValue, 10)
+
+		conv, err := NewRowConverter(&sp.spec.Table, evalCtx, kvCh)
+		if err != nil {
+			return err
+		}
+		if conv.EvalCtx.SessionData == nil {
+			panic("uninitialized session data")
+		}
+
+		done := false
+
+		g = ctxgroup.WithContext(ctx)
+		g.GoCtx(func(ctx context.Context) error {
+			writeTS := sp.spec.Table.CreateAsOfTime
+			const bufferSize, flushSize = 64 << 20, 16 << 20
+			adder, err := sp.flowCtx.BulkAdder(ctx, sp.flowCtx.ClientDB,
+				bufferSize, flushSize, writeTS)
+			if err != nil {
+				return err
+			}
+			defer adder.Close(ctx)
+
+			// Drain the kvCh using the BulkAdder until it closes.
+			if err := ingestKvs(ctx, adder, kvCh); err != nil {
+				return err
+			}
+
+			added := adder.GetSummary()
+			countsBytes, err := protoutil.Marshal(&added)
+			if err != nil {
+				return err
+			}
+			cs, err := sp.out.EmitRow(ctx, sqlbase.EncDatumRow{
+				sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
+			})
+			if err != nil {
+				return err
+			}
+
+			if cs != distsqlrun.NeedMoreRows {
+				return errors.New("unexpected closure of consumer")
+			}
+
+			return nil
+		})
+
+		for {
+			var rows int64
+			for {
+				row, err := input.NextRow()
+				if err != nil {
+					return err
+				}
+				if row == nil {
+					done = true
+					break
+				}
+				rows++
+
+				for i, ed := range row {
+					if ed.IsNull() {
+						conv.Datums[i] = tree.DNull
+						continue
+					}
+					if err := ed.EnsureDecoded(&typs[i], alloc); err != nil {
+						return err
+					}
+					conv.Datums[i] = ed.Datum
+				}
+
+				// `conv.Row` uses these as arguments to GenerateUniqueID to generate
+				// hidden primary keys, when necessary. We want them to be ascending
+				// (to reduce overlap in the resulting kvs) and non-conflicting
+				// (because of primary key uniqueness). The ids that come out of
+				// GenerateUniqueID are sorted by (fileIndex, rowIndex) and unique as
+				// long as the two inputs are a unique combo, so using the processor
+				// ID and a monotonically increasing batch index should do what we
+				// want.
+				if err := conv.Row(ctx, sp.processorID, sp.batchIdxAtomic); err != nil {
+					return err
+				}
+				atomic.AddInt64(&sp.batchIdxAtomic, 1)
+			}
+			if rows < 1 {
+				break
+			}
+
+			if err := conv.SendBatch(ctx); err != nil {
+				return err
+			}
+
+			if done {
+				break
+			}
+		}
+
+		return nil
+	}()
+	close(kvCh)
+
+	writerErr := g.Wait()
+	if err == nil {
+		err = writerErr
+	}
+	if writerErr != nil {
+		log.Error(ctx, writerErr)
+	}
+
+	distsqlrun.DrainAndClose(
+		ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input)
+}
+
+func init() {
+	distsqlrun.NewBulkRowWriterProcessor = newBulkRowWriterProcessor
+}
diff --git a/pkg/sql/distsql_plan_ctas.go b/pkg/sql/distsql_plan_ctas.go
index 4edbba702db6..f347b15a57f7 100644
--- a/pkg/sql/distsql_plan_ctas.go
+++ b/pkg/sql/distsql_plan_ctas.go
@@ -1,3 +1,13 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
 package sql
 
 import (
@@ -5,6 +15,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/pkg/errors"
 )
@@ -18,16 +29,17 @@ var CTASPlanResultTypes = []types.T{
 func PlanAndRunCTAS(
 	ctx context.Context,
 	dsp *DistSQLPlanner,
-	evalCtx *extendedEvalContext,
+	planner *planner,
 	txn *client.Txn,
-	canDistribute bool,
 	isLocal bool,
 	in planNode,
 	out distsqlpb.ProcessorCoreUnion,
 	recv *DistSQLReceiver,
 ) {
-	planCtx := dsp.NewPlanningCtx(ctx, evalCtx, txn)
+	planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), txn)
 	planCtx.isLocal = isLocal
+	planCtx.planner = planner
+	planCtx.stmtType = tree.Rows
 
 	p, err := dsp.createPlanForNode(planCtx, in)
 	if err != nil {
@@ -43,6 +55,8 @@ func PlanAndRunCTAS(
 	p.PlanToStreamColMap = []int{0}
 	p.ResultTypes = CTASPlanResultTypes
 
+	// Make a copy of the evalCtx, as Run might modify it.
+	evalCtxCopy := planner.ExtendedEvalContextCopy()
 	dsp.FinalizePlan(planCtx, &p)
-	dsp.Run(planCtx, txn, &p, recv, evalCtx, nil /* finishedSetupFn */)
+	dsp.Run(planCtx, txn, &p, recv, evalCtxCopy, nil /* finishedSetupFn */)
 }
diff --git a/pkg/sql/logictest/testdata/planner_test/show_trace b/pkg/sql/logictest/testdata/planner_test/show_trace
index 9f386e4d72a6..d5444499241a 100644
--- a/pkg/sql/logictest/testdata/planner_test/show_trace
+++ b/pkg/sql/logictest/testdata/planner_test/show_trace
@@ -134,7 +134,7 @@ statement ok
 SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off
 
 query TT
-SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message
+SELECT operation, message
 FROM [SHOW KV TRACE FOR SESSION]
 WHERE message NOT LIKE '%Z/%'
   AND message NOT SIMILAR TO '%(PushTxn|ResolveIntent|SystemConfigSpan)%'
 ----
 [async] kv.DistSender: sending pre-commit query intents  r7: sending batch 1 QueryIntent to (n1,s1):1
 table reader  Scan /Table/55/{1-2}
-table reader  fetched: /kv2/primary/...PK.../k/v -> /1/2
-flow          Put /Table/55/1/...PK.../0 -> /TUPLE/1:1:Int/1/1:2:Int/4
+table reader  fetched: /kv2/primary/1/k/v -> /1/2
+flow          Put /Table/55/1/1/0 -> /TUPLE/1:1:Int/1/1:2:Int/4
 flow          fast path completed
 exec cmd: exec stmt  rows affected: 1
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace b/pkg/sql/opt/exec/execbuilder/testdata/show_trace
index aadc7c90c27e..d7326868a465 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace
+++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace
@@ -130,7 +130,7 @@ statement ok
 SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off
 
 query TT
-SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message
+SELECT operation, message
 FROM [SHOW KV TRACE FOR SESSION]
 WHERE message NOT LIKE '%Z/%'
   AND message NOT SIMILAR TO '%(PushTxn|ResolveIntent|SystemConfigSpan)%'
 ----
 [async] kv.DistSender: sending pre-commit query intents  r7: sending batch 1 QueryIntent to (n1,s1):1
 table reader  Scan /Table/55/{1-2}
-table reader  fetched: /kv2/primary/...PK.../k/v -> /1/2
-flow          Put /Table/55/1/...PK.../0 -> /TUPLE/1:1:Int/1/1:2:Int/4
+table reader  fetched: /kv2/primary/1/k/v -> /1/2
+flow          Put /Table/55/1/1/0 -> /TUPLE/1:1:Int/1/1:2:Int/4
 flow          fast path completed
 exec cmd: exec stmt  rows affected: 1
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index 2c381cf120c6..bad74cd1406b 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -29,21 +29,20 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
-	"github.com/cockroachdb/cockroach/pkg/sql/row"
-	"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
-	"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -144,9 +143,7 @@ type SchemaChanger struct { clock *hlc.Clock settings *cluster.Settings execCfg *ExecutorConfig - // Placeholder information used by CTAS execution in the SchemaChanger. - placeholders *tree.PlaceholderInfo - ieFactory sqlutil.SessionBoundInternalExecutorFactory + ieFactory sqlutil.SessionBoundInternalExecutorFactory } // NewSchemaChangerForTesting only for tests. @@ -574,10 +571,7 @@ func (sc *SchemaChanger) maybeDropTable( // maybe backfill a created table by executing the AS query. Return nil if // successfully backfilled. func (sc *SchemaChanger) maybeBackfillCreateTableAs( - ctx context.Context, - table *sqlbase.TableDescriptor, - evalCtx *extendedEvalContext, - placeholders *tree.PlaceholderInfo, + ctx context.Context, table *sqlbase.TableDescriptor, evalCtx *extendedEvalContext, ) error { if table.Adding() && table.IsAs() { if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { @@ -612,21 +606,21 @@ func (sc *SchemaChanger) maybeBackfillCreateTableAs( } defer localPlanner.curPlan.close(ctx) - colTypes := make([]types.T, len(table.VisibleColumns())) - for i, t := range table.VisibleColumns() { - colTypes[i] = t.Type - } - ci := sqlbase.ColTypeInfoFromColTypes(colTypes) - rows := rowcontainer.NewRowContainer( - localPlanner.EvalContext().Mon.MakeBoundAccount(), ci, 0, - ) - defer rows.Close(ctx) - - rw := NewRowResultWriter(rows) + res := roachpb.BulkOpSummary{} + rw := newCallbackResultWriter(func(ctx context.Context, row tree.Datums) error { + // TODO(adityamaru): Use the BulkOpSummary for either telemetry or to + // return to user. + var counts roachpb.BulkOpSummary + if err := protoutil.Unmarshal([]byte(*row[0].(*tree.DBytes)), &counts); err != nil { + return err + } + res.Add(counts) + return nil + }) recv := MakeDistSQLReceiver( ctx, rw, - stmt.AST.StatementType(), + tree.Rows, sc.execCfg.RangeDescriptorCache, sc.execCfg.LeaseHolderCache, txn, @@ -637,125 +631,40 @@ func (sc *SchemaChanger) maybeBackfillCreateTableAs( ) defer recv.Release() - planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, localPlanner.ExtendedEvalContext(), txn) rec, err := sc.distSQLPlanner.checkSupportForNode(localPlanner.curPlan.plan) - planCtx.isLocal = err != nil || rec == cannotDistribute - planCtx.planner = localPlanner - planCtx.stmtType = recv.stmtType - var planAndRunErr error localPlanner.runWithOptions(resolveFlags{skipCache: true}, func() { + // Resolve subqueries before running the queries' physical plan. if len(localPlanner.curPlan.subqueryPlans) != 0 { if !sc.distSQLPlanner.PlanAndRunSubqueries( - planCtx.ctx, localPlanner, localPlanner.ExtendedEvalContextCopy, + ctx, localPlanner, localPlanner.ExtendedEvalContextCopy, localPlanner.curPlan.subqueryPlans, recv, rec == canDistribute, ) { - if planAndRunErr = rw.Err(); err != nil { + if planAndRunErr = rw.Err(); planAndRunErr != nil { return } - if recv.commErr != nil { - planAndRunErr = recv.commErr + if planAndRunErr = recv.commErr; planAndRunErr != nil { return } } } - // Copy the evalCtx as it might be modified. 
- evalCtxCopy := localPlanner.ExtendedEvalContextCopy() - sc.distSQLPlanner.PlanAndRun(ctx, evalCtxCopy, planCtx, txn, localPlanner.curPlan.plan, recv) - if recv.commErr != nil { - planAndRunErr = recv.commErr + isLocal := err != nil || rec == cannotDistribute + out := distsqlpb.ProcessorCoreUnion{BulkRowWriter: &distsqlpb.BulkRowWriterSpec{ + Table: *table, + }} + + PlanAndRunCTAS(ctx, sc.distSQLPlanner, localPlanner, + txn, isLocal, localPlanner.curPlan.plan, out, recv) + if planAndRunErr = rw.Err(); planAndRunErr != nil { return } - if rw.Err() != nil { - planAndRunErr = rw.Err() + if planAndRunErr = recv.commErr; planAndRunErr != nil { return } }) - if planAndRunErr != nil { - return planAndRunErr - } - - // This is a very simplified version of the INSERT logic: no CHECK - // expressions, no FK checks, no arbitrary insertion order, no - // RETURNING, etc. - - // Instantiate a row inserter and table writer. - ri, err := row.MakeInserter( - txn, - sqlbase.NewImmutableTableDescriptor(*table), - nil, - table.Columns, - row.SkipFKs, - &localPlanner.alloc) - if err != nil { - return err - } - ti := tableInserterPool.Get().(*tableInserter) - *ti = tableInserter{ri: ri} - tw := tableWriter(ti) - defer func() { - tw.close(ctx) - *ti = tableInserter{} - tableInserterPool.Put(ti) - }() - if err := tw.init(txn, localPlanner.EvalContext()); err != nil { - return err - } - - // Prepare the buffer for row values. At this point, one more - // column has been added by ensurePrimaryKey() to the list of - // columns stored in table.Columns. - rowBuffer := make(tree.Datums, len(table.Columns)) - pkColIdx := len(table.Columns) - 1 - - // The optimizer includes the rowID expression as part of the input - // expression. But the heuristic planner does not do this, so construct a - // rowID expression to be evaluated separately. - // - // TODO(adityamaru): This could be redundant as it is only required when - // the heuristic planner is used, but currently there is no way of knowing - // this from the SchemaChanger. - var defTypedExpr tree.TypedExpr - // Prepare the rowID expression. - defExprSQL := *table.Columns[pkColIdx].DefaultExpr - defExpr, err := parser.ParseExpr(defExprSQL) - if err != nil { - return err - } - defTypedExpr, err = localPlanner.analyzeExpr( - ctx, - defExpr, - nil, /*sources*/ - tree.IndexedVarHelper{}, - types.Any, - false, /*requireType*/ - "CREATE TABLE AS") - if err != nil { - return err - } - - for i := 0; i < rows.Len(); i++ { - copy(rowBuffer, rows.At(i)) - - rowBuffer[pkColIdx], err = defTypedExpr.Eval(localPlanner.EvalContext()) - if err != nil { - return err - } - - err := tw.row(ctx, rowBuffer, evalCtx.Tracing.KVTracingEnabled()) - if err != nil { - return err - } - } - - _, err = tw.finalize( - ctx, evalCtx.Tracing.KVTracingEnabled()) - if err != nil { - return err - } - return rw.Err() + return planAndRunErr }); err != nil { return err } @@ -1018,7 +927,7 @@ func (sc *SchemaChanger) exec( return err } - if err := sc.maybeBackfillCreateTableAs(ctx, tableDesc, evalCtx, sc.placeholders); err != nil { + if err := sc.maybeBackfillCreateTableAs(ctx, tableDesc, evalCtx); err != nil { return err }
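
Note for reviewers (not part of the patch): the heart of this change is the
producer/consumer pipeline that bulkRowWriter.Run sets up. The processor's
main loop converts input rows into KV batches pushed onto a channel, a second
goroutine drains that channel into the BulkAdder, and closing the channel is
what terminates the consumer. Below is a minimal, self-contained sketch of
that shape. The names kv, fakeAdder, and ingest are illustrative stand-ins,
not CockroachDB APIs; only errgroup is a real library (used here in place of
CRDB's ctxgroup wrapper).

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// kv is a stand-in for roachpb.KeyValue.
type kv struct{ key, value string }

// fakeAdder stands in for storagebase.BulkAdder: it buffers KVs and
// "flushes" them by printing a summary.
type fakeAdder struct{ buffered []kv }

func (a *fakeAdder) add(_ context.Context, k kv) error {
	a.buffered = append(a.buffered, k)
	return nil
}

func (a *fakeAdder) flush(_ context.Context) error {
	fmt.Printf("flushed %d kvs\n", len(a.buffered))
	a.buffered = a.buffered[:0]
	return nil
}

// ingest mirrors ingestKvs: it drains kvCh until the channel closes, then
// performs a final flush.
func ingest(ctx context.Context, adder *fakeAdder, kvCh <-chan []kv) error {
	for batch := range kvCh {
		for _, k := range batch {
			if err := adder.add(ctx, k); err != nil {
				return err
			}
		}
	}
	return adder.flush(ctx)
}

func main() {
	kvCh := make(chan []kv, 10)
	g, ctx := errgroup.WithContext(context.Background())

	// Consumer: drains the channel, like the g.GoCtx goroutine in Run.
	g.Go(func() error { return ingest(ctx, &fakeAdder{}, kvCh) })

	// Producer: converts "rows" into KV batches, like the NextRow loop.
	for i, v := range []string{"a", "b", "c"} {
		kvCh <- []kv{{key: fmt.Sprintf("/table/1/%d", i), value: v}}
	}

	// Closing the channel ends the consumer's range loop; Run does the same
	// with close(kvCh) before g.Wait(), on both success and error paths.
	close(kvCh)

	if err := g.Wait(); err != nil {
		fmt.Println("ingest failed:", err)
	}
}

The same ordering matters in the patch: Run closes kvCh after the conversion
loop returns (whether or not it errored) so the ingest goroutine can finish,
and only then combines the two error values before DrainAndClose.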