Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rowexec: ask for at least 8MiB in the join reader memory limit #68119

Merged
merged 1 commit into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,20 +949,6 @@ func NewLimitedMonitor(
return limitedMon
}

// NewLimitedMonitorNoDiskSpill builds a limited memory monitor named name as
// a child of parent, starts it, and hands it back to the caller, who is
// responsible for closing it. The limit is taken from
// GetWorkMemLimitNoDiskSpill, i.e. SessionData.WorkMemLimit (stored inside of
// the flowCtx) unless ServerConfig.TestingKnobs.MemoryLimitBytes is set, in
// which case that knob wins. ServerConfig.TestingKnobs.ForceDiskSpill has no
// effect on the monitor created here.
func NewLimitedMonitorNoDiskSpill(
	ctx context.Context, parent *mon.BytesMonitor, flowCtx *FlowCtx, name string,
) *mon.BytesMonitor {
	// Resolve the limit through the variant that ignores ForceDiskSpill.
	limit := GetWorkMemLimitNoDiskSpill(flowCtx)
	m := mon.NewMonitorInheritWithLimit(name, limit, parent)
	m.Start(ctx, parent, mon.BoundAccount{})
	return m
}

// NewLimitedMonitorNoFlowCtx is the same as NewLimitedMonitor and should be
// used when the caller doesn't have access to a *FlowCtx.
func NewLimitedMonitorNoFlowCtx(
Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,6 @@ func GetWorkMemLimit(flowCtx *FlowCtx) int64 {
if flowCtx.Cfg.TestingKnobs.ForceDiskSpill {
return 1
}
return GetWorkMemLimitNoDiskSpill(flowCtx)
}

// GetWorkMemLimitNoDiskSpill returns the number of bytes determining the amount
// of RAM available to a single processor or operator. This function should be
// used instead of GetWorkMemLimit if the processor cannot spill to disk,
// since ServerConfig.TestingKnobs.ForceDiskSpill is ignored by this function.
func GetWorkMemLimitNoDiskSpill(flowCtx *FlowCtx) int64 {
if flowCtx.Cfg.TestingKnobs.MemoryLimitBytes != 0 {
return flowCtx.Cfg.TestingKnobs.MemoryLimitBytes
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/rowexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ go_test(
"//pkg/sql/rowcontainer",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/types",
Expand Down
15 changes: 13 additions & 2 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,21 @@ func newJoinReader(
}
}

// Create the memory monitor with a limit of at least 8MiB because the join
// reader doesn't know how to spill to disk. If the target limit is below
// 8MiB, we are most likely in a test scenario and don't want to error out.
const minMemoryLimit = 8 << 20
memoryLimit := execinfra.GetWorkMemLimit(flowCtx)
if memoryLimit < minMemoryLimit {
memoryLimit = minMemoryLimit
}

// Initialize memory monitors and bound account for data structures in the joinReader.
jr.MemMonitor = execinfra.NewLimitedMonitorNoDiskSpill(
flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, flowCtx, "joinreader-mem",
jr.MemMonitor = mon.NewMonitorInheritWithLimit(
"joinreader-mem" /* name */, memoryLimit, flowCtx.EvalCtx.Mon,
)
jr.MemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
jr.memAcc = jr.MemMonitor.MakeBoundAccount()

if err := jr.initJoinReaderStrategy(flowCtx, columnTypes, len(columnIDs), rightCols, readerType); err != nil {
Expand Down
247 changes: 111 additions & 136 deletions pkg/sql/rowexec/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -851,155 +850,131 @@ func TestJoinReader(t *testing.T) {
// paired joins, so do both.
for _, smallBatch := range []bool{true, false} {
for _, outputContinuation := range []bool{false, true} {
for _, lowMemory := range []bool{false, true} {
if outputContinuation && c.secondJoinInPairedJoin {
// outputContinuation is for the first join in paired-joins, so
// can't do that when this test case is for the second join in
// paired-joins.
continue
if outputContinuation && c.secondJoinInPairedJoin {
// outputContinuation is for the first join in paired-joins, so
// can't do that when this test case is for the second join in
// paired-joins.
continue
}
if outputContinuation && !reqOrdering {
// The first join in paired-joins must preserve ordering.
continue
}
if outputContinuation && len(c.expectedWithContinuation) == 0 {
continue
}
t.Run(fmt.Sprintf("%d/reqOrdering=%t/%s/smallBatch=%t/cont=%t",
i, reqOrdering, c.description, smallBatch, outputContinuation), func(t *testing.T) {
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: st,
TempStorage: tempEngine,
},
Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()),
DiskMonitor: diskMonitor,
}
if outputContinuation && !reqOrdering {
// The first join in paired-joins must preserve ordering.
continue
encRows := make(rowenc.EncDatumRows, len(c.input))
for rowIdx, row := range c.input {
encRow := make(rowenc.EncDatumRow, len(row))
for i, d := range row {
encRow[i] = rowenc.DatumToEncDatum(c.inputTypes[i], d)
}
encRows[rowIdx] = encRow
}
if outputContinuation && len(c.expectedWithContinuation) == 0 {
continue
in := distsqlutils.NewRowBuffer(c.inputTypes, encRows, distsqlutils.RowBufferArgs{})

out := &distsqlutils.RowBuffer{}
post := c.post
if outputContinuation {
post.OutputColumns = append(post.OutputColumns, c.outputColumnForContinuation)
}
if smallBatch && lowMemory {
continue
jr, err := newJoinReader(
&flowCtx,
0, /* processorID */
&execinfrapb.JoinReaderSpec{
Table: *td.TableDesc(),
IndexIdx: c.indexIdx,
LookupColumns: c.lookupCols,
LookupExpr: execinfrapb.Expression{Expr: c.lookupExpr},
RemoteLookupExpr: execinfrapb.Expression{Expr: c.remoteLookupExpr},
OnExpr: execinfrapb.Expression{Expr: c.onExpr},
Type: c.joinType,
MaintainOrdering: reqOrdering,
LeftJoinWithPairedJoiner: c.secondJoinInPairedJoin,
OutputGroupContinuationForLeftRow: outputContinuation,
},
in,
&post,
out,
lookupJoinReaderType,
)
if err != nil {
t.Fatal(err)
}
t.Run(fmt.Sprintf("%d/reqOrdering=%t/%s/smallBatch=%t/cont=%t/lowMem=%t",
i, reqOrdering, c.description, smallBatch, outputContinuation, lowMemory), func(t *testing.T) {
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: st,
TempStorage: tempEngine,
},
Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()),
DiskMonitor: diskMonitor,
}
encRows := make(rowenc.EncDatumRows, len(c.input))
for rowIdx, row := range c.input {
encRow := make(rowenc.EncDatumRow, len(row))
for i, d := range row {
encRow[i] = rowenc.DatumToEncDatum(c.inputTypes[i], d)
}
encRows[rowIdx] = encRow
}
in := distsqlutils.NewRowBuffer(c.inputTypes, encRows, distsqlutils.RowBufferArgs{})

if lowMemory {
flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = int64(encRows[0].Size() * 2)
}

out := &distsqlutils.RowBuffer{}
post := c.post
if outputContinuation {
post.OutputColumns = append(post.OutputColumns, c.outputColumnForContinuation)
}
jr, err := newJoinReader(
&flowCtx,
0, /* processorID */
&execinfrapb.JoinReaderSpec{
Table: *td.TableDesc(),
IndexIdx: c.indexIdx,
LookupColumns: c.lookupCols,
LookupExpr: execinfrapb.Expression{Expr: c.lookupExpr},
RemoteLookupExpr: execinfrapb.Expression{Expr: c.remoteLookupExpr},
OnExpr: execinfrapb.Expression{Expr: c.onExpr},
Type: c.joinType,
MaintainOrdering: reqOrdering,
LeftJoinWithPairedJoiner: c.secondJoinInPairedJoin,
OutputGroupContinuationForLeftRow: outputContinuation,
},
in,
&post,
out,
lookupJoinReaderType,
)
if err != nil {
t.Fatal(err)
}

if smallBatch {
// Set a lower batch size to force multiple batches.
jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 2))
}
// Else, use the default.
if smallBatch {
// Set a lower batch size to force multiple batches.
jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 2))
}
// Else, use the default.

jr.Run(ctx)
jr.Run(ctx)

if !in.Done {
t.Fatal("joinReader didn't consume all the rows")
}
if !out.ProducerClosed() {
t.Fatalf("output RowReceiver not closed")
}
if !in.Done {
t.Fatal("joinReader didn't consume all the rows")
}
if !out.ProducerClosed() {
t.Fatalf("output RowReceiver not closed")
}

var res rowenc.EncDatumRows
var gotOutOfMemoryError bool
for {
row, meta := out.Next()
if meta != nil {
if lowMemory && meta.Err != nil {
if !sqlerrors.IsOutOfMemoryError(meta.Err) {
t.Fatalf("unexpected metadata %+v", meta)
}
gotOutOfMemoryError = true
} else if meta.Metrics == nil {
t.Fatalf("unexpected metadata %+v", meta)
}
}
if row == nil {
break
}
res = append(res, row)
var res rowenc.EncDatumRows
for {
row, meta := out.Next()
if meta != nil && meta.Metrics == nil {
t.Fatalf("unexpected metadata %+v", meta)
}

if lowMemory {
if gotOutOfMemoryError {
return
}
t.Fatal("expected out of memory error but it did not occur")
if row == nil {
break
}
res = append(res, row)
}

// processOutputRows is a helper function that takes a stringified
// EncDatumRows output (e.g. [[1 2] [3 1]]) and returns a slice of
// stringified rows without brackets (e.g. []string{"1 2", "3 1"}).
processOutputRows := func(output string) []string {
// Comma-separate the rows.
output = strings.ReplaceAll(output, "] [", ",")
// Remove leading and trailing bracket.
output = strings.Trim(output, "[]")
// Split on the commas that were introduced and return that.
return strings.Split(output, ",")
}
// processOutputRows is a helper function that takes a stringified
// EncDatumRows output (e.g. [[1 2] [3 1]]) and returns a slice of
// stringified rows without brackets (e.g. []string{"1 2", "3 1"}).
processOutputRows := func(output string) []string {
// Comma-separate the rows.
output = strings.ReplaceAll(output, "] [", ",")
// Remove leading and trailing bracket.
output = strings.Trim(output, "[]")
// Split on the commas that were introduced and return that.
return strings.Split(output, ",")
}

outputTypes := c.outputTypes
if outputContinuation {
outputTypes = append(outputTypes, types.Bool)
}
result := processOutputRows(res.String(outputTypes))
var expected []string
if outputContinuation {
expected = processOutputRows(c.expectedWithContinuation)
} else {
expected = processOutputRows(c.expected)
}
outputTypes := c.outputTypes
if outputContinuation {
outputTypes = append(outputTypes, types.Bool)
}
result := processOutputRows(res.String(outputTypes))
var expected []string
if outputContinuation {
expected = processOutputRows(c.expectedWithContinuation)
} else {
expected = processOutputRows(c.expected)
}

if !reqOrdering {
// An ordering was not required, so sort both the result and
// expected slice to reuse equality comparison.
sort.Strings(result)
sort.Strings(expected)
}
if !reqOrdering {
// An ordering was not required, so sort both the result and
// expected slice to reuse equality comparison.
sort.Strings(result)
sort.Strings(expected)
}

require.Equal(t, expected, result)
})
}
require.Equal(t, expected, result)
})
}
}
}
Expand Down