Skip to content

Commit

Permalink
rowexec: ask for at least 8MiB in the join reader memory limit
Browse files Browse the repository at this point in the history
The join reader doesn't know how to spill to disk, so previously in some
cases (namely, when `distsql_workmem` session variable is low) the
queries would error out. Now this is temporarily fixed by requiring the
memory limit to be at least 8MiB (to accommodate 4MiB scratch input
rows). This shouldn't really matter in the production setting but makes
`tpchvec/disk` roachtest happy.

Release note: None
  • Loading branch information
yuzefovich committed Jul 27, 2021
1 parent 4718beb commit 306f8ed
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 161 deletions.
14 changes: 0 additions & 14 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,20 +949,6 @@ func NewLimitedMonitor(
return limitedMon
}

// NewLimitedMonitorNoDiskSpill is a utility function used by processors to
// create a new limited memory monitor with the given name and start it. The
// returned monitor must be closed. The limit is determined by
// SessionData.WorkMemLimit (stored inside of the flowCtx) but overridden to
// ServerConfig.TestingKnobs.MemoryLimitBytes if that knob is set.
// ServerConfig.TestingKnobs.ForceDiskSpill is ignored by this function.
func NewLimitedMonitorNoDiskSpill(
ctx context.Context, parent *mon.BytesMonitor, flowCtx *FlowCtx, name string,
) *mon.BytesMonitor {
limitedMon := mon.NewMonitorInheritWithLimit(name, GetWorkMemLimitNoDiskSpill(flowCtx), parent)
limitedMon.Start(ctx, parent, mon.BoundAccount{})
return limitedMon
}

// NewLimitedMonitorNoFlowCtx is the same as NewLimitedMonitor and should be
// used when the caller doesn't have an access to *FlowCtx.
func NewLimitedMonitorNoFlowCtx(
Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,6 @@ func GetWorkMemLimit(flowCtx *FlowCtx) int64 {
if flowCtx.Cfg.TestingKnobs.ForceDiskSpill {
return 1
}
return GetWorkMemLimitNoDiskSpill(flowCtx)
}

// GetWorkMemLimitNoDiskSpill returns the number of bytes determining the amount
// of RAM available to a single processor or operator. This function should be
// used instead of GetWorkMemLimit if the processor cannot spill to disk,
// since ServerConfig.TestingKnobs.ForceDiskSpill is ignored by this function.
func GetWorkMemLimitNoDiskSpill(flowCtx *FlowCtx) int64 {
if flowCtx.Cfg.TestingKnobs.MemoryLimitBytes != 0 {
return flowCtx.Cfg.TestingKnobs.MemoryLimitBytes
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/rowexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ go_test(
"//pkg/sql/rowcontainer",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/types",
Expand Down
15 changes: 13 additions & 2 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,21 @@ func newJoinReader(
}
}

// We will create a memory monitor with at least 8MiB of memory limit since
// the join reader doesn't know how to spill to disk. It is most likely that
// if the target limit is below 8MiB, then we're in a test scenario and we
// don't want to error out.
const minMemoryLimit = 8 << 20
memoryLimit := execinfra.GetWorkMemLimit(flowCtx)
if memoryLimit < minMemoryLimit {
memoryLimit = minMemoryLimit
}

// Initialize memory monitors and bound account for data structures in the joinReader.
jr.MemMonitor = execinfra.NewLimitedMonitorNoDiskSpill(
flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, flowCtx, "joinreader-mem",
jr.MemMonitor = mon.NewMonitorInheritWithLimit(
"joinreader-mem" /* name */, memoryLimit, flowCtx.EvalCtx.Mon,
)
jr.MemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
jr.memAcc = jr.MemMonitor.MakeBoundAccount()

if err := jr.initJoinReaderStrategy(flowCtx, columnTypes, len(columnIDs), rightCols, readerType); err != nil {
Expand Down
247 changes: 111 additions & 136 deletions pkg/sql/rowexec/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -851,155 +850,131 @@ func TestJoinReader(t *testing.T) {
// paired joins, so do both.
for _, smallBatch := range []bool{true, false} {
for _, outputContinuation := range []bool{false, true} {
for _, lowMemory := range []bool{false, true} {
if outputContinuation && c.secondJoinInPairedJoin {
// outputContinuation is for the first join in paired-joins, so
// can't do that when this test case is for the second join in
// paired-joins.
continue
if outputContinuation && c.secondJoinInPairedJoin {
// outputContinuation is for the first join in paired-joins, so
// can't do that when this test case is for the second join in
// paired-joins.
continue
}
if outputContinuation && !reqOrdering {
// The first join in paired-joins must preserve ordering.
continue
}
if outputContinuation && len(c.expectedWithContinuation) == 0 {
continue
}
t.Run(fmt.Sprintf("%d/reqOrdering=%t/%s/smallBatch=%t/cont=%t",
i, reqOrdering, c.description, smallBatch, outputContinuation), func(t *testing.T) {
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: st,
TempStorage: tempEngine,
},
Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()),
DiskMonitor: diskMonitor,
}
if outputContinuation && !reqOrdering {
// The first join in paired-joins must preserve ordering.
continue
encRows := make(rowenc.EncDatumRows, len(c.input))
for rowIdx, row := range c.input {
encRow := make(rowenc.EncDatumRow, len(row))
for i, d := range row {
encRow[i] = rowenc.DatumToEncDatum(c.inputTypes[i], d)
}
encRows[rowIdx] = encRow
}
if outputContinuation && len(c.expectedWithContinuation) == 0 {
continue
in := distsqlutils.NewRowBuffer(c.inputTypes, encRows, distsqlutils.RowBufferArgs{})

out := &distsqlutils.RowBuffer{}
post := c.post
if outputContinuation {
post.OutputColumns = append(post.OutputColumns, c.outputColumnForContinuation)
}
if smallBatch && lowMemory {
continue
jr, err := newJoinReader(
&flowCtx,
0, /* processorID */
&execinfrapb.JoinReaderSpec{
Table: *td.TableDesc(),
IndexIdx: c.indexIdx,
LookupColumns: c.lookupCols,
LookupExpr: execinfrapb.Expression{Expr: c.lookupExpr},
RemoteLookupExpr: execinfrapb.Expression{Expr: c.remoteLookupExpr},
OnExpr: execinfrapb.Expression{Expr: c.onExpr},
Type: c.joinType,
MaintainOrdering: reqOrdering,
LeftJoinWithPairedJoiner: c.secondJoinInPairedJoin,
OutputGroupContinuationForLeftRow: outputContinuation,
},
in,
&post,
out,
lookupJoinReaderType,
)
if err != nil {
t.Fatal(err)
}
t.Run(fmt.Sprintf("%d/reqOrdering=%t/%s/smallBatch=%t/cont=%t/lowMem=%t",
i, reqOrdering, c.description, smallBatch, outputContinuation, lowMemory), func(t *testing.T) {
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: st,
TempStorage: tempEngine,
},
Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()),
DiskMonitor: diskMonitor,
}
encRows := make(rowenc.EncDatumRows, len(c.input))
for rowIdx, row := range c.input {
encRow := make(rowenc.EncDatumRow, len(row))
for i, d := range row {
encRow[i] = rowenc.DatumToEncDatum(c.inputTypes[i], d)
}
encRows[rowIdx] = encRow
}
in := distsqlutils.NewRowBuffer(c.inputTypes, encRows, distsqlutils.RowBufferArgs{})

if lowMemory {
flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = int64(encRows[0].Size() * 2)
}

out := &distsqlutils.RowBuffer{}
post := c.post
if outputContinuation {
post.OutputColumns = append(post.OutputColumns, c.outputColumnForContinuation)
}
jr, err := newJoinReader(
&flowCtx,
0, /* processorID */
&execinfrapb.JoinReaderSpec{
Table: *td.TableDesc(),
IndexIdx: c.indexIdx,
LookupColumns: c.lookupCols,
LookupExpr: execinfrapb.Expression{Expr: c.lookupExpr},
RemoteLookupExpr: execinfrapb.Expression{Expr: c.remoteLookupExpr},
OnExpr: execinfrapb.Expression{Expr: c.onExpr},
Type: c.joinType,
MaintainOrdering: reqOrdering,
LeftJoinWithPairedJoiner: c.secondJoinInPairedJoin,
OutputGroupContinuationForLeftRow: outputContinuation,
},
in,
&post,
out,
lookupJoinReaderType,
)
if err != nil {
t.Fatal(err)
}

if smallBatch {
// Set a lower batch size to force multiple batches.
jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 2))
}
// Else, use the default.
if smallBatch {
// Set a lower batch size to force multiple batches.
jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 2))
}
// Else, use the default.

jr.Run(ctx)
jr.Run(ctx)

if !in.Done {
t.Fatal("joinReader didn't consume all the rows")
}
if !out.ProducerClosed() {
t.Fatalf("output RowReceiver not closed")
}
if !in.Done {
t.Fatal("joinReader didn't consume all the rows")
}
if !out.ProducerClosed() {
t.Fatalf("output RowReceiver not closed")
}

var res rowenc.EncDatumRows
var gotOutOfMemoryError bool
for {
row, meta := out.Next()
if meta != nil {
if lowMemory && meta.Err != nil {
if !sqlerrors.IsOutOfMemoryError(meta.Err) {
t.Fatalf("unexpected metadata %+v", meta)
}
gotOutOfMemoryError = true
} else if meta.Metrics == nil {
t.Fatalf("unexpected metadata %+v", meta)
}
}
if row == nil {
break
}
res = append(res, row)
var res rowenc.EncDatumRows
for {
row, meta := out.Next()
if meta != nil && meta.Metrics == nil {
t.Fatalf("unexpected metadata %+v", meta)
}

if lowMemory {
if gotOutOfMemoryError {
return
}
t.Fatal("expected out of memory error but it did not occur")
if row == nil {
break
}
res = append(res, row)
}

// processOutputRows is a helper function that takes a stringified
// EncDatumRows output (e.g. [[1 2] [3 1]]) and returns a slice of
// stringified rows without brackets (e.g. []string{"1 2", "3 1"}).
processOutputRows := func(output string) []string {
// Comma-separate the rows.
output = strings.ReplaceAll(output, "] [", ",")
// Remove leading and trailing bracket.
output = strings.Trim(output, "[]")
// Split on the commas that were introduced and return that.
return strings.Split(output, ",")
}
// processOutputRows is a helper function that takes a stringified
// EncDatumRows output (e.g. [[1 2] [3 1]]) and returns a slice of
// stringified rows without brackets (e.g. []string{"1 2", "3 1"}).
processOutputRows := func(output string) []string {
// Comma-separate the rows.
output = strings.ReplaceAll(output, "] [", ",")
// Remove leading and trailing bracket.
output = strings.Trim(output, "[]")
// Split on the commas that were introduced and return that.
return strings.Split(output, ",")
}

outputTypes := c.outputTypes
if outputContinuation {
outputTypes = append(outputTypes, types.Bool)
}
result := processOutputRows(res.String(outputTypes))
var expected []string
if outputContinuation {
expected = processOutputRows(c.expectedWithContinuation)
} else {
expected = processOutputRows(c.expected)
}
outputTypes := c.outputTypes
if outputContinuation {
outputTypes = append(outputTypes, types.Bool)
}
result := processOutputRows(res.String(outputTypes))
var expected []string
if outputContinuation {
expected = processOutputRows(c.expectedWithContinuation)
} else {
expected = processOutputRows(c.expected)
}

if !reqOrdering {
// An ordering was not required, so sort both the result and
// expected slice to reuse equality comparison.
sort.Strings(result)
sort.Strings(expected)
}
if !reqOrdering {
// An ordering was not required, so sort both the result and
// expected slice to reuse equality comparison.
sort.Strings(result)
sort.Strings(expected)
}

require.Equal(t, expected, result)
})
}
require.Equal(t, expected, result)
})
}
}
}
Expand Down

0 comments on commit 306f8ed

Please sign in to comment.