Skip to content

Commit

Permalink
sql: use disk-backed container in bufferNode
Browse files Browse the repository at this point in the history
This commit refactors several `planNode`s that need to buffer rows to
use a disk-backed row container instead of a purely in-memory one. To
achieve that, a couple of light wrappers are introduced on top of the
corresponding container and an iterator over it.

Still, one - probably important - case is not fixed properly: namely, if
a subquery is executed in AllRows or AllRowsNormalized mode, then we
first buffer the rows into the disk-backed container only to materialize
them later into a single tuple. Addressing this is left as a TODO.

Release note (sql change): CockroachDB should now be more stable when
executing queries with subqueries that produce many rows (previously such
queries could crash with an OOM; now they use temporary disk storage).
  • Loading branch information
yuzefovich committed Apr 20, 2021
1 parent defd143 commit ae7992f
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 174 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"authorization.go",
"backfill.go",
"buffer.go",
"buffer_util.go",
"cancel_queries.go",
"cancel_sessions.go",
"check.go",
Expand Down
113 changes: 64 additions & 49 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand All @@ -43,10 +43,10 @@ type applyJoinNode struct {
// columns contains the metadata for the results of this node.
columns colinfo.ResultColumns

// rightCols contains the metadata for the result of the right side of this
// apply join, as built in the optimization phase. Later on, every re-planning
// of the right side will emit these same columns.
rightCols colinfo.ResultColumns
// rightTypes is the schema of the rows produced by the right side of the
// join, as built in the optimization phase. Later on, every re-planning of
// the right side will emit these same columns.
rightTypes []*types.T

planRightSideFn exec.ApplyJoinPlanRightSideFn

Expand All @@ -61,18 +61,16 @@ type applyJoinNode struct {
leftRowFoundAMatch bool
// rightRows will be populated with the result of the right side of the join
// each time it's run.
rightRows *rowcontainer.RowContainer
// curRightRow is the index into rightRows of the current right row being
// processed.
curRightRow int
rightRows rowContainerHelper
// rightRowsIterator, if non-nil, is the iterator into rightRows.
rightRowsIterator *rowContainerIterator
// out is the full result row, populated on each call to Next.
out tree.Datums
// done is true if the left side has been exhausted.
done bool
}
}

// newApplyJoinNode constructs the applyJoinNode for an apply join over left.
func newApplyJoinNode(
joinType descpb.JoinType,
left planDataSource,
Expand All @@ -93,7 +91,7 @@ func newApplyJoinNode(
joinType: joinType,
input: left,
pred: pred,
rightCols: rightCols,
rightTypes: getTypesFromResultColumns(rightCols),
planRightSideFn: planRightSideFn,
columns: pred.cols,
}, nil
Expand All @@ -103,15 +101,13 @@ func (a *applyJoinNode) startExec(params runParams) error {
// If needed, pre-allocate a right row of NULL tuples for when the
// join predicate fails to match.
if a.joinType == descpb.LeftOuterJoin {
a.run.emptyRight = make(tree.Datums, len(a.rightCols))
a.run.emptyRight = make(tree.Datums, len(a.rightTypes))
for i := range a.run.emptyRight {
a.run.emptyRight[i] = tree.DNull
}
}
a.run.out = make(tree.Datums, len(a.columns))
ci := colinfo.ColTypeInfoFromResCols(a.rightCols)
acc := params.EvalContext().Mon.MakeBoundAccount()
a.run.rightRows = rowcontainer.NewRowContainer(acc, ci)
a.run.rightRows.init(a.rightTypes, params.extendedEvalCtx, "apply-join" /* opName */)
return nil
}

Expand All @@ -121,37 +117,44 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) {
}

for {
for a.run.curRightRow < a.run.rightRows.Len() {
// We have right rows set up - check the next one for a match.
var rrow tree.Datums
if len(a.rightCols) != 0 {
rrow = a.run.rightRows.At(a.run.curRightRow)
}
a.run.curRightRow++
// Compute join.
predMatched, err := a.pred.eval(params.EvalContext(), a.run.leftRow, rrow)
if err != nil {
return false, err
}
if !predMatched {
// Didn't match? Try with the next right-side row.
continue
}
if a.run.rightRowsIterator != nil {
// We might have right rows set up - check the next one for a match.
for {
var rrow tree.Datums
if len(a.rightTypes) != 0 {
var err error
rrow, err = a.run.rightRowsIterator.next()
if err != nil {
return false, err
}
if rrow == nil {
// We have exhausted all rows from the right side.
break
}
}
// Compute join.
predMatched, err := a.pred.eval(params.EvalContext(), a.run.leftRow, rrow)
if err != nil {
return false, err
}
if !predMatched {
// Didn't match? Try with the next right-side row.
continue
}

a.run.leftRowFoundAMatch = true
if a.joinType == descpb.LeftAntiJoin ||
a.joinType == descpb.LeftSemiJoin {
// We found a match, but we're doing an anti or semi join, so we're
// done with this left row.
break
a.run.leftRowFoundAMatch = true
if a.joinType == descpb.LeftAntiJoin ||
a.joinType == descpb.LeftSemiJoin {
// We found a match, but we're doing an anti or semi join,
// so we're done with this left row.
break
}
// We're doing an ordinary join, so prep the row and emit it.
a.pred.prepareRow(a.run.out, a.run.leftRow, rrow)
return true, nil
}
// We're doing an ordinary join, so prep the row and emit it.
a.pred.prepareRow(a.run.out, a.run.leftRow, rrow)
return true, nil
}
// We're out of right side rows. Clear them, and reset the match state for
// next time.
a.run.rightRows.Clear(params.ctx)
// We're out of right side rows. Reset the match state for next time.
foundAMatch := a.run.leftRowFoundAMatch
a.run.leftRowFoundAMatch = false

Expand Down Expand Up @@ -222,15 +225,25 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) {
// wrong during execution of the right hand side of the join, and that we should
// completely give up on the outer join.
func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planComponents) error {
a.run.curRightRow = 0
a.run.rightRows.Clear(params.ctx)
return runPlanInsidePlan(params, plan, a.run.rightRows)
// Prepare rightRows state for reuse.
if err := a.run.rightRows.clear(params.ctx); err != nil {
return err
}
if a.run.rightRowsIterator != nil {
a.run.rightRowsIterator.close()
a.run.rightRowsIterator = nil
}
if err := runPlanInsidePlan(params, plan, &a.run.rightRows); err != nil {
return err
}
a.run.rightRowsIterator = newRowContainerIterator(params.ctx, a.run.rightRows, a.rightTypes)
return nil
}

// runPlanInsidePlan is used to run a plan and gather the results in a row
// container, as part of the execution of an "outer" plan.
func runPlanInsidePlan(
params runParams, plan *planComponents, rowContainer *rowcontainer.RowContainer,
params runParams, plan *planComponents, rowContainer *rowContainerHelper,
) error {
rowResultWriter := NewRowResultWriter(rowContainer)
recv := MakeDistSQLReceiver(
Expand Down Expand Up @@ -279,7 +292,9 @@ func (a *applyJoinNode) Values() tree.Datums {

func (a *applyJoinNode) Close(ctx context.Context) {
a.input.plan.Close(ctx)
if a.run.rightRows != nil {
a.run.rightRows.Close(ctx)
a.run.rightRows.close(ctx)
if a.run.rightRowsIterator != nil {
a.run.rightRowsIterator.close()
a.run.rightRowsIterator = nil
}
}
46 changes: 26 additions & 20 deletions pkg/sql/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// bufferNode consumes its input one row at a time, stores it in the buffer,
Expand All @@ -24,21 +23,18 @@ import (
type bufferNode struct {
plan planNode

// TODO(yuzefovich): the buffer should probably be backed by disk. If so, the
// comments about TempStorage suggest that it should be used by DistSQL
// processors, but this node is local.
bufferedRows *rowcontainer.RowContainer
passThruNextRowIdx int
// typs is the schema of rows buffered by this node.
typs []*types.T
rows rowContainerHelper
currentRow tree.Datums

// label is a string used to describe the node in an EXPLAIN plan.
label string
}

func (n *bufferNode) startExec(params runParams) error {
n.bufferedRows = rowcontainer.NewRowContainer(
params.EvalContext().Mon.MakeBoundAccount(),
colinfo.ColTypeInfoFromResCols(getPlanColumns(n.plan, false /* mut */)),
)
n.typs = planTypes(n.plan)
n.rows.init(n.typs, params.extendedEvalCtx, n.label)
return nil
}

Expand All @@ -53,20 +49,20 @@ func (n *bufferNode) Next(params runParams) (bool, error) {
if !ok {
return false, nil
}
if _, err = n.bufferedRows.AddRow(params.ctx, n.plan.Values()); err != nil {
n.currentRow = n.plan.Values()
if err = n.rows.addRow(params.ctx, n.currentRow); err != nil {
return false, err
}
n.passThruNextRowIdx++
return true, nil
}

func (n *bufferNode) Values() tree.Datums {
return n.bufferedRows.At(n.passThruNextRowIdx - 1)
return n.currentRow
}

func (n *bufferNode) Close(ctx context.Context) {
n.plan.Close(ctx)
n.bufferedRows.Close(ctx)
n.rows.close(ctx)
}

// scanBufferNode behaves like an iterator into the bufferNode it is
Expand All @@ -75,24 +71,34 @@ func (n *bufferNode) Close(ctx context.Context) {
type scanBufferNode struct {
buffer *bufferNode

nextRowIdx int
iterator *rowContainerIterator
currentRow tree.Datums

// label is a string used to describe the node in an EXPLAIN plan.
label string
}

func (n *scanBufferNode) startExec(runParams) error {
func (n *scanBufferNode) startExec(params runParams) error {
n.iterator = newRowContainerIterator(params.ctx, n.buffer.rows, n.buffer.typs)
return nil
}

func (n *scanBufferNode) Next(runParams) (bool, error) {
n.nextRowIdx++
return n.nextRowIdx <= n.buffer.bufferedRows.Len(), nil
var err error
n.currentRow, err = n.iterator.next()
if n.currentRow == nil || err != nil {
return false, err
}
return true, nil
}

func (n *scanBufferNode) Values() tree.Datums {
return n.buffer.bufferedRows.At(n.nextRowIdx - 1)
return n.currentRow
}

// Close closes the node's iterator, if one is set, and clears the field so
// that a subsequent call finds nothing left to close. The context argument
// is unused.
func (n *scanBufferNode) Close(context.Context) {
	if n.iterator == nil {
		// No iterator is set; nothing to do.
		return
	}
	n.iterator.close()
	n.iterator = nil
}
Loading

0 comments on commit ae7992f

Please sign in to comment.