From 306f8edada31e3386e0414eb8a69436b67fd11d8 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 27 Jul 2021 10:21:43 -0700 Subject: [PATCH] rowexec: ask for at least 8MiB in the join reader memory limit The join reader doesn't know how to spill to disk, so previously in some cases (namely, when `distsql_workmem` session variable is low) the queries would error out. Now this is temporarily fixed by requiring the memory limit to be at least 8MiB (to accommodate 4MiB scratch input rows). This shouldn't really matter in the production setting but makes `tpchvec/disk` roachtest happy. Release note: None --- pkg/sql/execinfra/processorsbase.go | 14 -- pkg/sql/execinfra/server_config.go | 8 - pkg/sql/rowexec/BUILD.bazel | 1 - pkg/sql/rowexec/joinreader.go | 15 +- pkg/sql/rowexec/joinreader_test.go | 247 +++++++++++++--------------- 5 files changed, 124 insertions(+), 161 deletions(-) diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 65a7a4383462..0466cf3228f1 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -949,20 +949,6 @@ func NewLimitedMonitor( return limitedMon } -// NewLimitedMonitorNoDiskSpill is a utility function used by processors to -// create a new limited memory monitor with the given name and start it. The -// returned monitor must be closed. The limit is determined by -// SessionData.WorkMemLimit (stored inside of the flowCtx) but overridden to -// ServerConfig.TestingKnobs.MemoryLimitBytes if that knob is set. -// ServerConfig.TestingKnobs.ForceDiskSpill is ignored by this function. -func NewLimitedMonitorNoDiskSpill( - ctx context.Context, parent *mon.BytesMonitor, flowCtx *FlowCtx, name string, -) *mon.BytesMonitor { - limitedMon := mon.NewMonitorInheritWithLimit(name, GetWorkMemLimitNoDiskSpill(flowCtx), parent) - limitedMon.Start(ctx, parent, mon.BoundAccount{}) - return limitedMon -} - // NewLimitedMonitorNoFlowCtx is the same as NewLimitedMonitor and should be // used when the caller doesn't have an access to *FlowCtx. func NewLimitedMonitorNoFlowCtx( diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index d9390b53bf5d..381556bcabed 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -268,14 +268,6 @@ func GetWorkMemLimit(flowCtx *FlowCtx) int64 { if flowCtx.Cfg.TestingKnobs.ForceDiskSpill { return 1 } - return GetWorkMemLimitNoDiskSpill(flowCtx) -} - -// GetWorkMemLimitNoDiskSpill returns the number of bytes determining the amount -// of RAM available to a single processor or operator. This function should be -// used instead of GetWorkMemLimit if the processor cannot spill to disk, -// since ServerConfig.TestingKnobs.ForceDiskSpill is ignored by this function. -func GetWorkMemLimitNoDiskSpill(flowCtx *FlowCtx) int64 { if flowCtx.Cfg.TestingKnobs.MemoryLimitBytes != 0 { return flowCtx.Cfg.TestingKnobs.MemoryLimitBytes } diff --git a/pkg/sql/rowexec/BUILD.bazel b/pkg/sql/rowexec/BUILD.bazel index b64b122c916b..cfdd5927f004 100644 --- a/pkg/sql/rowexec/BUILD.bazel +++ b/pkg/sql/rowexec/BUILD.bazel @@ -158,7 +158,6 @@ go_test( "//pkg/sql/rowcontainer", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", - "//pkg/sql/sqlerrors", "//pkg/sql/sqlutil", "//pkg/sql/stats", "//pkg/sql/types", diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 85b1749587c9..e7265db5b8ff 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -373,10 +373,21 @@ func newJoinReader( } } + // We will create a memory monitor with at least 8MiB of memory limit since + // the join reader doesn't know how to spill to disk. It is most likely that + // if the target limit is below 8MiB, then we're in a test scenario and we + // don't want to error out. + const minMemoryLimit = 8 << 20 + memoryLimit := execinfra.GetWorkMemLimit(flowCtx) + if memoryLimit < minMemoryLimit { + memoryLimit = minMemoryLimit + } + // Initialize memory monitors and bound account for data structures in the joinReader. - jr.MemMonitor = execinfra.NewLimitedMonitorNoDiskSpill( - flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, flowCtx, "joinreader-mem", + jr.MemMonitor = mon.NewMonitorInheritWithLimit( + "joinreader-mem" /* name */, memoryLimit, flowCtx.EvalCtx.Mon, ) + jr.MemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{}) jr.memAcc = jr.MemMonitor.MakeBoundAccount() if err := jr.initJoinReaderStrategy(flowCtx, columnTypes, len(columnIDs), rightCols, readerType); err != nil { diff --git a/pkg/sql/rowexec/joinreader_test.go b/pkg/sql/rowexec/joinreader_test.go index 6183250de3bd..e30d8274be75 100644 --- a/pkg/sql/rowexec/joinreader_test.go +++ b/pkg/sql/rowexec/joinreader_test.go @@ -33,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -851,155 +850,131 @@ func TestJoinReader(t *testing.T) { // paired joins, so do both. for _, smallBatch := range []bool{true, false} { for _, outputContinuation := range []bool{false, true} { - for _, lowMemory := range []bool{false, true} { - if outputContinuation && c.secondJoinInPairedJoin { - // outputContinuation is for the first join in paired-joins, so - // can't do that when this test case is for the second join in - // paired-joins. - continue + if outputContinuation && c.secondJoinInPairedJoin { + // outputContinuation is for the first join in paired-joins, so + // can't do that when this test case is for the second join in + // paired-joins. + continue + } + if outputContinuation && !reqOrdering { + // The first join in paired-joins must preserve ordering. + continue + } + if outputContinuation && len(c.expectedWithContinuation) == 0 { + continue + } + t.Run(fmt.Sprintf("%d/reqOrdering=%t/%s/smallBatch=%t/cont=%t", + i, reqOrdering, c.description, smallBatch, outputContinuation), func(t *testing.T) { + evalCtx := tree.MakeTestingEvalContext(st) + defer evalCtx.Stop(ctx) + flowCtx := execinfra.FlowCtx{ + EvalCtx: &evalCtx, + Cfg: &execinfra.ServerConfig{ + Settings: st, + TempStorage: tempEngine, + }, + Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()), + DiskMonitor: diskMonitor, } - if outputContinuation && !reqOrdering { - // The first join in paired-joins must preserve ordering. - continue + encRows := make(rowenc.EncDatumRows, len(c.input)) + for rowIdx, row := range c.input { + encRow := make(rowenc.EncDatumRow, len(row)) + for i, d := range row { + encRow[i] = rowenc.DatumToEncDatum(c.inputTypes[i], d) + } + encRows[rowIdx] = encRow } - if outputContinuation && len(c.expectedWithContinuation) == 0 { - continue + in := distsqlutils.NewRowBuffer(c.inputTypes, encRows, distsqlutils.RowBufferArgs{}) + + out := &distsqlutils.RowBuffer{} + post := c.post + if outputContinuation { + post.OutputColumns = append(post.OutputColumns, c.outputColumnForContinuation) } - if smallBatch && lowMemory { - continue + jr, err := newJoinReader( + &flowCtx, + 0, /* processorID */ + &execinfrapb.JoinReaderSpec{ + Table: *td.TableDesc(), + IndexIdx: c.indexIdx, + LookupColumns: c.lookupCols, + LookupExpr: execinfrapb.Expression{Expr: c.lookupExpr}, + RemoteLookupExpr: execinfrapb.Expression{Expr: c.remoteLookupExpr}, + OnExpr: execinfrapb.Expression{Expr: c.onExpr}, + Type: c.joinType, + MaintainOrdering: reqOrdering, + LeftJoinWithPairedJoiner: c.secondJoinInPairedJoin, + OutputGroupContinuationForLeftRow: outputContinuation, + }, + in, + &post, + out, + lookupJoinReaderType, + ) + if err != nil { + t.Fatal(err) } - t.Run(fmt.Sprintf("%d/reqOrdering=%t/%s/smallBatch=%t/cont=%t/lowMem=%t", - i, reqOrdering, c.description, smallBatch, outputContinuation, lowMemory), func(t *testing.T) { - evalCtx := tree.MakeTestingEvalContext(st) - defer evalCtx.Stop(ctx) - flowCtx := execinfra.FlowCtx{ - EvalCtx: &evalCtx, - Cfg: &execinfra.ServerConfig{ - Settings: st, - TempStorage: tempEngine, - }, - Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()), - DiskMonitor: diskMonitor, - } - encRows := make(rowenc.EncDatumRows, len(c.input)) - for rowIdx, row := range c.input { - encRow := make(rowenc.EncDatumRow, len(row)) - for i, d := range row { - encRow[i] = rowenc.DatumToEncDatum(c.inputTypes[i], d) - } - encRows[rowIdx] = encRow - } - in := distsqlutils.NewRowBuffer(c.inputTypes, encRows, distsqlutils.RowBufferArgs{}) - - if lowMemory { - flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = int64(encRows[0].Size() * 2) - } - out := &distsqlutils.RowBuffer{} - post := c.post - if outputContinuation { - post.OutputColumns = append(post.OutputColumns, c.outputColumnForContinuation) - } - jr, err := newJoinReader( - &flowCtx, - 0, /* processorID */ - &execinfrapb.JoinReaderSpec{ - Table: *td.TableDesc(), - IndexIdx: c.indexIdx, - LookupColumns: c.lookupCols, - LookupExpr: execinfrapb.Expression{Expr: c.lookupExpr}, - RemoteLookupExpr: execinfrapb.Expression{Expr: c.remoteLookupExpr}, - OnExpr: execinfrapb.Expression{Expr: c.onExpr}, - Type: c.joinType, - MaintainOrdering: reqOrdering, - LeftJoinWithPairedJoiner: c.secondJoinInPairedJoin, - OutputGroupContinuationForLeftRow: outputContinuation, - }, - in, - &post, - out, - lookupJoinReaderType, - ) - if err != nil { - t.Fatal(err) - } - - if smallBatch { - // Set a lower batch size to force multiple batches. - jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 2)) - } - // Else, use the default. + if smallBatch { + // Set a lower batch size to force multiple batches. + jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 2)) + } + // Else, use the default. - jr.Run(ctx) + jr.Run(ctx) - if !in.Done { - t.Fatal("joinReader didn't consume all the rows") - } - if !out.ProducerClosed() { - t.Fatalf("output RowReceiver not closed") - } + if !in.Done { + t.Fatal("joinReader didn't consume all the rows") + } + if !out.ProducerClosed() { + t.Fatalf("output RowReceiver not closed") + } - var res rowenc.EncDatumRows - var gotOutOfMemoryError bool - for { - row, meta := out.Next() - if meta != nil { - if lowMemory && meta.Err != nil { - if !sqlerrors.IsOutOfMemoryError(meta.Err) { - t.Fatalf("unexpected metadata %+v", meta) - } - gotOutOfMemoryError = true - } else if meta.Metrics == nil { - t.Fatalf("unexpected metadata %+v", meta) - } - } - if row == nil { - break - } - res = append(res, row) + var res rowenc.EncDatumRows + for { + row, meta := out.Next() + if meta != nil && meta.Metrics == nil { + t.Fatalf("unexpected metadata %+v", meta) } - - if lowMemory { - if gotOutOfMemoryError { - return - } - t.Fatal("expected out of memory error but it did not occur") + if row == nil { + break } + res = append(res, row) + } - // processOutputRows is a helper function that takes a stringified - // EncDatumRows output (e.g. [[1 2] [3 1]]) and returns a slice of - // stringified rows without brackets (e.g. []string{"1 2", "3 1"}). - processOutputRows := func(output string) []string { - // Comma-separate the rows. - output = strings.ReplaceAll(output, "] [", ",") - // Remove leading and trailing bracket. - output = strings.Trim(output, "[]") - // Split on the commas that were introduced and return that. - return strings.Split(output, ",") - } + // processOutputRows is a helper function that takes a stringified + // EncDatumRows output (e.g. [[1 2] [3 1]]) and returns a slice of + // stringified rows without brackets (e.g. []string{"1 2", "3 1"}). + processOutputRows := func(output string) []string { + // Comma-separate the rows. + output = strings.ReplaceAll(output, "] [", ",") + // Remove leading and trailing bracket. + output = strings.Trim(output, "[]") + // Split on the commas that were introduced and return that. + return strings.Split(output, ",") + } - outputTypes := c.outputTypes - if outputContinuation { - outputTypes = append(outputTypes, types.Bool) - } - result := processOutputRows(res.String(outputTypes)) - var expected []string - if outputContinuation { - expected = processOutputRows(c.expectedWithContinuation) - } else { - expected = processOutputRows(c.expected) - } + outputTypes := c.outputTypes + if outputContinuation { + outputTypes = append(outputTypes, types.Bool) + } + result := processOutputRows(res.String(outputTypes)) + var expected []string + if outputContinuation { + expected = processOutputRows(c.expectedWithContinuation) + } else { + expected = processOutputRows(c.expected) + } - if !reqOrdering { - // An ordering was not required, so sort both the result and - // expected slice to reuse equality comparison. - sort.Strings(result) - sort.Strings(expected) - } + if !reqOrdering { + // An ordering was not required, so sort both the result and + // expected slice to reuse equality comparison. + sort.Strings(result) + sort.Strings(expected) + } - require.Equal(t, expected, result) - }) - } + require.Equal(t, expected, result) + }) } } }