Skip to content

Commit

Permalink
sql: Updated CTAS logic to utilize the BulkRowWriter processor.
Browse files Browse the repository at this point in the history
This change integrates the BulkRowWriter into the distsql
PlanAndRun phase for CTAS statements.

Release Note: None
  • Loading branch information
adityamaru27 committed Jul 8, 2019
1 parent 63d832c commit 3e1f7f7
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 128 deletions.
6 changes: 3 additions & 3 deletions pkg/sql/logictest/testdata/planner_test/show_trace
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ statement ok
SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off

query TT
SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message
SELECT operation, message
FROM [SHOW KV TRACE FOR SESSION]
WHERE message NOT LIKE '%Z/%'
AND message NOT SIMILAR TO '%(PushTxn|ResolveIntent|SystemConfigSpan)%'
Expand All @@ -145,8 +145,8 @@ WHERE message NOT LIKE '%Z/%'
----
[async] kv.DistSender: sending pre-commit query intents r7: sending batch 1 QueryIntent to (n1,s1):1
table reader Scan /Table/55/{1-2}
table reader fetched: /kv2/primary/...PK.../k/v -> /1/2
flow Put /Table/55/1/...PK.../0 -> /TUPLE/1:1:Int/1/1:2:Int/4
table reader fetched: /kv2/primary/1/k/v -> /1/2
flow Put /Table/55/1/1/0 -> /TUPLE/1:1:Int/1/1:2:Int/4
flow fast path completed
exec cmd: exec stmt rows affected: 1

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/opt/exec/execbuilder/testdata/show_trace
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ statement ok
SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off

query TT
SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message
SELECT operation, message
FROM [SHOW KV TRACE FOR SESSION]
WHERE message NOT LIKE '%Z/%'
AND message NOT SIMILAR TO '%(PushTxn|ResolveIntent|SystemConfigSpan)%'
Expand All @@ -141,8 +141,8 @@ WHERE message NOT LIKE '%Z/%'
----
[async] kv.DistSender: sending pre-commit query intents r7: sending batch 1 QueryIntent to (n1,s1):1
table reader Scan /Table/55/{1-2}
table reader fetched: /kv2/primary/...PK.../k/v -> /1/2
flow Put /Table/55/1/...PK.../0 -> /TUPLE/1:1:Int/1/1:2:Int/4
table reader fetched: /kv2/primary/1/k/v -> /1/2
flow Put /Table/55/1/1/0 -> /TUPLE/1:1:Int/1/1:2:Int/4
flow fast path completed
exec cmd: exec stmt rows affected: 1

Expand Down
153 changes: 31 additions & 122 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,20 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -144,9 +143,7 @@ type SchemaChanger struct {
clock *hlc.Clock
settings *cluster.Settings
execCfg *ExecutorConfig
// Placeholder information used by CTAS execution in the SchemaChanger.
placeholders *tree.PlaceholderInfo
ieFactory sqlutil.SessionBoundInternalExecutorFactory
ieFactory sqlutil.SessionBoundInternalExecutorFactory
}

// NewSchemaChangerForTesting only for tests.
Expand Down Expand Up @@ -574,10 +571,7 @@ func (sc *SchemaChanger) maybeDropTable(
// maybe backfill a created table by executing the AS query. Return nil if
// successfully backfilled.
func (sc *SchemaChanger) maybeBackfillCreateTableAs(
ctx context.Context,
table *sqlbase.TableDescriptor,
evalCtx *extendedEvalContext,
placeholders *tree.PlaceholderInfo,
ctx context.Context, table *sqlbase.TableDescriptor, evalCtx *extendedEvalContext,
) error {
if table.Adding() && table.IsAs() {
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
Expand Down Expand Up @@ -612,21 +606,21 @@ func (sc *SchemaChanger) maybeBackfillCreateTableAs(
}
defer localPlanner.curPlan.close(ctx)

colTypes := make([]types.T, len(table.VisibleColumns()))
for i, t := range table.VisibleColumns() {
colTypes[i] = t.Type
}
ci := sqlbase.ColTypeInfoFromColTypes(colTypes)
rows := rowcontainer.NewRowContainer(
localPlanner.EvalContext().Mon.MakeBoundAccount(), ci, 0,
)
defer rows.Close(ctx)

rw := NewRowResultWriter(rows)
res := roachpb.BulkOpSummary{}
rw := newCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
// TODO(adityamaru): Use the BulkOpSummary for either telemetry or to
// return to user.
var counts roachpb.BulkOpSummary
if err := protoutil.Unmarshal([]byte(*row[0].(*tree.DBytes)), &counts); err != nil {
return err
}
res.Add(counts)
return nil
})
recv := MakeDistSQLReceiver(
ctx,
rw,
stmt.AST.StatementType(),
tree.Rows,
sc.execCfg.RangeDescriptorCache,
sc.execCfg.LeaseHolderCache,
txn,
Expand All @@ -637,125 +631,40 @@ func (sc *SchemaChanger) maybeBackfillCreateTableAs(
)
defer recv.Release()

planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, localPlanner.ExtendedEvalContext(), txn)
rec, err := sc.distSQLPlanner.checkSupportForNode(localPlanner.curPlan.plan)
planCtx.isLocal = err != nil || rec == cannotDistribute
planCtx.planner = localPlanner
planCtx.stmtType = recv.stmtType

var planAndRunErr error
localPlanner.runWithOptions(resolveFlags{skipCache: true}, func() {
// Resolve subqueries before running the queries' physical plan.
if len(localPlanner.curPlan.subqueryPlans) != 0 {
if !sc.distSQLPlanner.PlanAndRunSubqueries(
planCtx.ctx, localPlanner, localPlanner.ExtendedEvalContextCopy,
ctx, localPlanner, localPlanner.ExtendedEvalContextCopy,
localPlanner.curPlan.subqueryPlans, recv, rec == canDistribute,
) {
if planAndRunErr = rw.Err(); err != nil {
if planAndRunErr = rw.Err(); planAndRunErr != nil {
return
}
if recv.commErr != nil {
planAndRunErr = recv.commErr
if planAndRunErr = recv.commErr; planAndRunErr != nil {
return
}
}
}
// Copy the evalCtx as it might be modified.
evalCtxCopy := localPlanner.ExtendedEvalContextCopy()

sc.distSQLPlanner.PlanAndRun(ctx, evalCtxCopy, planCtx, txn, localPlanner.curPlan.plan, recv)
if recv.commErr != nil {
planAndRunErr = recv.commErr
isLocal := err != nil || rec == cannotDistribute
out := distsqlpb.ProcessorCoreUnion{BulkRowWriter: &distsqlpb.BulkRowWriterSpec{
Table: *table,
}}

PlanAndRunCTAS(ctx, sc.distSQLPlanner, localPlanner,
txn, isLocal, localPlanner.curPlan.plan, out, recv)
if planAndRunErr = rw.Err(); planAndRunErr != nil {
return
}
if rw.Err() != nil {
planAndRunErr = rw.Err()
if planAndRunErr = recv.commErr; planAndRunErr != nil {
return
}
})

if planAndRunErr != nil {
return planAndRunErr
}

// This is a very simplified version of the INSERT logic: no CHECK
// expressions, no FK checks, no arbitrary insertion order, no
// RETURNING, etc.

// Instantiate a row inserter and table writer.
ri, err := row.MakeInserter(
txn,
sqlbase.NewImmutableTableDescriptor(*table),
nil,
table.Columns,
row.SkipFKs,
&localPlanner.alloc)
if err != nil {
return err
}
ti := tableInserterPool.Get().(*tableInserter)
*ti = tableInserter{ri: ri}
tw := tableWriter(ti)
defer func() {
tw.close(ctx)
*ti = tableInserter{}
tableInserterPool.Put(ti)
}()
if err := tw.init(txn, localPlanner.EvalContext()); err != nil {
return err
}

// Prepare the buffer for row values. At this point, one more
// column has been added by ensurePrimaryKey() to the list of
// columns stored in table.Columns.
rowBuffer := make(tree.Datums, len(table.Columns))
pkColIdx := len(table.Columns) - 1

// The optimizer includes the rowID expression as part of the input
// expression. But the heuristic planner does not do this, so construct a
// rowID expression to be evaluated separately.
//
// TODO(adityamaru): This could be redundant as it is only required when
// the heuristic planner is used, but currently there is no way of knowing
// this from the SchemaChanger.
var defTypedExpr tree.TypedExpr
// Prepare the rowID expression.
defExprSQL := *table.Columns[pkColIdx].DefaultExpr
defExpr, err := parser.ParseExpr(defExprSQL)
if err != nil {
return err
}
defTypedExpr, err = localPlanner.analyzeExpr(
ctx,
defExpr,
nil, /*sources*/
tree.IndexedVarHelper{},
types.Any,
false, /*requireType*/
"CREATE TABLE AS")
if err != nil {
return err
}

for i := 0; i < rows.Len(); i++ {
copy(rowBuffer, rows.At(i))

rowBuffer[pkColIdx], err = defTypedExpr.Eval(localPlanner.EvalContext())
if err != nil {
return err
}

err := tw.row(ctx, rowBuffer, evalCtx.Tracing.KVTracingEnabled())
if err != nil {
return err
}
}

_, err = tw.finalize(
ctx, evalCtx.Tracing.KVTracingEnabled())
if err != nil {
return err
}
return rw.Err()
return planAndRunErr
}); err != nil {
return err
}
Expand Down Expand Up @@ -1018,7 +927,7 @@ func (sc *SchemaChanger) exec(
return err
}

if err := sc.maybeBackfillCreateTableAs(ctx, tableDesc, evalCtx, sc.placeholders); err != nil {
if err := sc.maybeBackfillCreateTableAs(ctx, tableDesc, evalCtx); err != nil {
return err
}

Expand Down

0 comments on commit 3e1f7f7

Please sign in to comment.