diff --git a/pkg/sql/rowcontainer/row_container.go b/pkg/sql/rowcontainer/row_container.go
index 427bdc29d514..d85ab61ec7c9 100644
--- a/pkg/sql/rowcontainer/row_container.go
+++ b/pkg/sql/rowcontainer/row_container.go
@@ -534,8 +534,13 @@ type DiskBackedIndexedRowContainer struct {
 	// [firstCachedRowPos, nextPosToCache).
 	firstCachedRowPos int
 	nextPosToCache    int
-	indexedRowsCache  ring.Buffer // the cache of up to maxIndexedRowsCacheSize contiguous rows
-	cacheMemAcc       mon.BoundAccount
+	// indexedRowsCache is the cache of up to maxCacheSize contiguous rows.
+	indexedRowsCache ring.Buffer
+	// maxCacheSize indicates the maximum number of rows to be cached. It is
+	// initialized to maxIndexedRowsCacheSize and dynamically adjusted if an
+	// OOM error is encountered.
+	maxCacheSize int
+	cacheMemAcc  mon.BoundAccount
 }
 
 // MakeDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer
@@ -570,6 +575,7 @@ func MakeDiskBackedIndexedRowContainer(
 	d.scratchEncRow = make(sqlbase.EncDatumRow, len(d.storedTypes))
 	d.DiskBackedRowContainer = &DiskBackedRowContainer{}
 	d.DiskBackedRowContainer.Init(ordering, d.storedTypes, evalCtx, engine, memoryMonitor, diskMonitor, rowCapacity)
+	d.maxCacheSize = maxIndexedRowsCacheSize
 	d.cacheMemAcc = memoryMonitor.MakeBoundAccount()
 	return &d
 }
@@ -692,28 +698,38 @@ func (f *DiskBackedIndexedRowContainer) GetRow(
 		}
 		row, rowIdx := rowWithIdx[:len(rowWithIdx)-1], rowWithIdx[len(rowWithIdx)-1].Datum
 		if idx, ok := rowIdx.(*tree.DInt); ok {
-			if f.indexedRowsCache.Len() == maxIndexedRowsCacheSize {
-				// The cache size is capped at maxIndexedRowsCacheSize, so we first
-				// remove the row with the smallest pos and advance
+			if f.indexedRowsCache.Len() == f.maxCacheSize {
+				// The cache size is capped at f.maxCacheSize, so we reuse the row
+				// with the smallest pos, put it as the last row, and advance
 				// f.firstCachedRowPos.
-				usage := sizeOfInt + int64(f.indexedRowsCache.GetFirst().(IndexedRow).Row.Size())
-				// TODO(yuzefovich): extend ring.Buffer to allow for reusing the
-				// allocated memory instead of allocating new one for every row.
-				f.indexedRowsCache.RemoveFirst()
-				// TODO(yuzefovich): investigate whether the pattern of growing and
-				// shrinking the memory account can be optimized.
-				f.cacheMemAcc.Shrink(ctx, usage)
-				f.firstCachedRowPos++
+				if err := f.reuseFirstRowInCache(ctx, int(*idx), row); err != nil {
+					return nil, err
+				}
+			} else {
+				// We choose to ignore minor details like IndexedRow overhead and
+				// the cache overhead.
+				usage := sizeOfInt + int64(row.Size())
+				if err := f.cacheMemAcc.Grow(ctx, usage); err != nil {
+					if sqlbase.IsOutOfMemoryError(err) {
+						// We hit the memory limit, so we need to cap the cache size
+						// and reuse the memory underlying the first row in the cache.
+						if f.indexedRowsCache.Len() == 0 {
+							// The cache is empty, so there is no memory to be reused.
+							return nil, err
+						}
+						f.maxCacheSize = f.indexedRowsCache.Len()
+						if err := f.reuseFirstRowInCache(ctx, int(*idx), row); err != nil {
+							return nil, err
+						}
+					} else {
+						return nil, err
+					}
+				} else {
+					// We actually need to copy the row into memory.
+					ir := IndexedRow{int(*idx), f.rowAlloc.CopyRow(row)}
+					f.indexedRowsCache.AddLast(ir)
+				}
 			}
-			// We choose to ignore minor details like IndexedRow overhead and
-			// the cache overhead.
-			usage := sizeOfInt + int64(row.Size())
-			if err := f.cacheMemAcc.Grow(ctx, usage); err != nil {
-				return nil, err
-			}
-			// We actually need to copy the row into memory.
-			ir := IndexedRow{int(*idx), f.rowAlloc.CopyRow(row)}
-			f.indexedRowsCache.AddLast(ir)
 			f.nextPosToCache++
 		} else {
 			return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", idx)
@@ -733,6 +749,50 @@ func (f *DiskBackedIndexedRowContainer) GetRow(
 	return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", rowIdx)
 }
 
+// reuseFirstRowInCache reuses the underlying memory of the first row in the
+// cache to store 'row' and puts it as the last one in the cache. It adjusts
+// the memory account accordingly and, if necessary, removes some first rows.
+func (f *DiskBackedIndexedRowContainer) reuseFirstRowInCache(
+	ctx context.Context, idx int, row sqlbase.EncDatumRow,
+) error {
+	newRowSize := row.Size()
+	for {
+		if f.indexedRowsCache.Len() == 0 {
+			return errors.Errorf("unexpectedly the cache of DiskBackedIndexedRowContainer contains zero rows")
+		}
+		indexedRowToReuse := f.indexedRowsCache.GetFirst().(IndexedRow)
+		oldRowSize := indexedRowToReuse.Row.Size()
+		delta := int64(newRowSize - oldRowSize)
+		if delta > 0 {
+			// New row takes up more memory than the old one.
+			if err := f.cacheMemAcc.Grow(ctx, delta); err != nil {
+				if sqlbase.IsOutOfMemoryError(err) {
+					// We need to actually reduce the cache size, so we remove the first
+					// row and adjust the memory account, maxCacheSize, and
+					// f.firstCachedRowPos accordingly.
+					f.indexedRowsCache.RemoveFirst()
+					f.cacheMemAcc.Shrink(ctx, int64(oldRowSize))
+					f.maxCacheSize--
+					f.firstCachedRowPos++
+					if f.indexedRowsCache.Len() == 0 {
+						return err
+					}
+					continue
+				}
+				return err
+			}
+		} else if delta < 0 {
+			f.cacheMemAcc.Shrink(ctx, -delta)
+		}
+		indexedRowToReuse.Idx = idx
+		copy(indexedRowToReuse.Row, row)
+		f.indexedRowsCache.RemoveFirst()
+		f.indexedRowsCache.AddLast(indexedRowToReuse)
+		f.firstCachedRowPos++
+		return nil
+	}
+}
+
 // getRowWithoutCache returns the row at requested position without using the
 // cache. It utilizes the same disk row iterator along multiple consequent
 // calls and rewinds the iterator only when it has been advanced further than
diff --git a/pkg/sql/rowcontainer/row_container_test.go b/pkg/sql/rowcontainer/row_container_test.go
index 8db929b6a8ef..840a82712a30 100644
--- a/pkg/sql/rowcontainer/row_container_test.go
+++ b/pkg/sql/rowcontainer/row_container_test.go
@@ -316,6 +316,7 @@ func TestDiskBackedRowContainer(t *testing.T) {
 	memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1))
 	defer memoryMonitor.Stop(ctx)
 	diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1))
+	defer diskMonitor.Stop(ctx)
 
 	defer func() {
 		if err := rc.UnsafeReset(ctx); err != nil {
@@ -411,7 +412,6 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 	const numTestRuns = 10
 	const numRows = 10
 	const numCols = 2
-	rows := make([]sqlbase.EncDatumRow, numRows)
 	ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
 	newOrdering := sqlbase.ColumnOrdering{{ColIdx: 1, Direction: encoding.Ascending}}
 
@@ -427,6 +427,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 	// index).
 	t.Run("SpillingHalfway", func(t *testing.T) {
 		for i := 0; i < numTestRuns; i++ {
+			rows := make([]sqlbase.EncDatumRow, numRows)
 			types := sqlbase.RandSortingColumnTypes(rng, numCols)
 			for i := 0; i < numRows; i++ {
 				rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
@@ -482,6 +483,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 	// to be returned. Then, it spills to disk and does the same check again.
t.Run("TestGetRow", func(t *testing.T) { for i := 0; i < numTestRuns; i++ { + rows := make([]sqlbase.EncDatumRow, numRows) sortedRows := indexedRows{rows: make([]IndexedRow, numRows)} types := sqlbase.RandSortingColumnTypes(rng, numCols) for i := 0; i < numRows; i++ { @@ -489,7 +491,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { sortedRows.rows[i] = IndexedRow{Idx: i, Row: rows[i]} } - sorter := sorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering} + sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering} sort.Sort(&sorter) if sorter.err != nil { t.Fatal(sorter.err) @@ -559,6 +561,86 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { } }) + // TestGetRowFromDiskWithLimitedMemory forces the container to spill to disk, + // adds all rows to it, sorts them, and checks that both the index and the + // row are what we expect by GetRow() to be returned. The goal is to test the + // behavior of capping the cache size and reusing the memory of the first + // rows in the cache, so we use the memory budget that accommodates only + // about half of all rows in the cache. + t.Run("TestGetRowWithLimitedMemory", func(t *testing.T) { + for i := 0; i < numTestRuns; i++ { + budget := int64(10240) + memoryUsage := int64(0) + rows := make([]sqlbase.EncDatumRow, 0, numRows) + sortedRows := indexedRows{rows: make([]IndexedRow, 0, numRows)} + types := sqlbase.RandSortingColumnTypes(rng, numCols) + for memoryUsage < 2*budget { + row := sqlbase.RandEncDatumRowOfTypes(rng, types) + memoryUsage += int64(row.Size()) + rows = append(rows, row) + sortedRows.rows = append(sortedRows.rows, IndexedRow{Idx: len(sortedRows.rows), Row: row}) + } + + memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(budget)) + defer memoryMonitor.Stop(ctx) + diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + defer diskMonitor.Stop(ctx) + + sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering} + sort.Sort(&sorter) + if sorter.err != nil { + t.Fatal(sorter.err) + } + + func() { + rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + defer rc.Close(ctx) + if err := rc.spillToDisk(ctx); err != nil { + t.Fatal(err) + } + for _, row := range rows { + if err := rc.AddRow(ctx, row); err != nil { + t.Fatal(err) + } + } + if !rc.Spilled() { + t.Fatal("unexpectedly using memory") + } + rc.Sort(ctx) + + // Check that GetRow returns the row we expect at each position. + for i := 0; i < len(rows); i++ { + readRow, err := rc.GetRow(ctx, i) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + expectedRow := sortedRows.rows[i] + readOrderingDatum, err := readRow.GetDatum(ordering[0].ColIdx) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if readOrderingDatum.Compare(&evalCtx, expectedRow.Row[ordering[0].ColIdx].Datum) != 0 { + // We're skipping comparison if both rows are equal on the ordering + // column since in this case the order of indexed rows after + // sorting is nondeterministic. 
+						if readRow.GetIdx() != expectedRow.GetIdx() {
+							t.Fatalf("read row has a different idx than what we expect")
+						}
+						for col, expectedDatum := range expectedRow.Row {
+							readDatum, err := readRow.GetDatum(col)
+							if err != nil {
+								t.Fatalf("unexpected error: %v", err)
+							}
+							if cmp := readDatum.Compare(&evalCtx, expectedDatum.Datum); cmp != 0 {
+								t.Fatalf("read row is not equal to expected one")
+							}
+						}
+					}
+				}
+			}()
+		}
+	})
+
 	// ReorderingInMemory initializes a DiskBackedIndexedRowContainer with one
 	// ordering, adds all rows to it, sorts it and makes sure that the rows are
 	// sorted as expected. Then, it reorders the container to a different
@@ -566,6 +648,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 	// Only in-memory containers should be used.
 	t.Run("ReorderingInMemory", func(t *testing.T) {
 		for i := 0; i < numTestRuns; i++ {
+			rows := make([]sqlbase.EncDatumRow, numRows)
 			types := sqlbase.RandSortingColumnTypes(rng, numCols)
 			for i := 0; i < numRows; i++ {
 				rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
@@ -606,6 +689,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 	// container is forced to spill to disk right after initialization.
 	t.Run("ReorderingOnDisk", func(t *testing.T) {
 		for i := 0; i < numTestRuns; i++ {
+			rows := make([]sqlbase.EncDatumRow, numRows)
 			types := sqlbase.RandSortingColumnTypes(rng, numCols)
 			for i := 0; i < numRows; i++ {
 				rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
@@ -659,19 +743,19 @@ func (ir indexedRows) Len() int {
 
 // TODO(yuzefovich): this is a duplicate of partitionSorter from windower.go.
 // There are possibly couple of other duplicates as well in other files, so we
 // should refactor it and probably extract the code into a new package.
-type sorter struct {
+type rowsSorter struct {
 	evalCtx  *tree.EvalContext
 	rows     indexedRows
 	ordering sqlbase.ColumnOrdering
 	err      error
 }
 
-func (n *sorter) Len() int { return n.rows.Len() }
+func (n *rowsSorter) Len() int { return n.rows.Len() }
 
-func (n *sorter) Swap(i, j int) {
+func (n *rowsSorter) Swap(i, j int) {
 	n.rows.rows[i], n.rows.rows[j] = n.rows.rows[j], n.rows.rows[i]
 }
 
-func (n *sorter) Less(i, j int) bool {
+func (n *rowsSorter) Less(i, j int) bool {
 	if n.err != nil {
 		// An error occurred in previous calls to Less(). We want to be done with
 		// sorting and to propagate that error to the caller of Sort().
@@ -685,13 +769,7 @@ func (n *sorter) Less(i, j int) bool {
 	return cmp < 0
 }
 
-// sorter implements the tree.PeerGroupChecker interface.
-func (n *sorter) InSameGroup(i, j int) (bool, error) {
-	cmp, err := n.Compare(i, j)
-	return cmp == 0, err
-}
-
-func (n *sorter) Compare(i, j int) (int, error) {
+func (n *rowsSorter) Compare(i, j int) (int, error) {
 	ra, rb := n.rows.rows[i], n.rows.rows[j]
 	for _, o := range n.ordering {
 		da, err := ra.GetDatum(o.ColIdx)
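
For reviewers who want to see the caching strategy of this patch in isolation, the following is a self-contained toy sketch, not CockroachDB code; the names (toyCache, reuseOldest, etc.) and the fixed byte budget are hypothetical. It illustrates the same idea: a FIFO row cache that caps its maximum size the first time the memory budget is exceeded and afterwards reuses the oldest entry's buffer instead of allocating, shrinking further when a new row does not fit.

package main

import "fmt"

// toyCache is a FIFO cache of byte-slice "rows" charged against a fixed
// byte budget, loosely mirroring DiskBackedIndexedRowContainer's cache.
type toyCache struct {
	budget  int64 // total bytes the cache may account for
	used    int64 // bytes currently accounted for
	maxSize int   // capped lazily once the budget is hit
	rows    [][]byte
}

func newToyCache(budget int64, initialMaxSize int) *toyCache {
	return &toyCache{budget: budget, maxSize: initialMaxSize}
}

// add caches row, reusing the oldest entry's memory once the cache is full
// or the budget does not allow another allocation.
func (c *toyCache) add(row []byte) {
	if len(c.rows) == c.maxSize {
		c.reuseOldest(row)
		return
	}
	if c.used+int64(len(row)) > c.budget {
		// "OOM": cap the cache at its current size and fall back to reuse,
		// mirroring how the container sets maxCacheSize on a budget error.
		if len(c.rows) == 0 {
			return // nothing to reuse; a real implementation would return an error
		}
		c.maxSize = len(c.rows)
		c.reuseOldest(row)
		return
	}
	c.rows = append(c.rows, append([]byte(nil), row...))
	c.used += int64(len(row))
}

// reuseOldest overwrites the oldest cached row with a copy of row and moves
// it to the back, shrinking the cache further if the new row is too large.
func (c *toyCache) reuseOldest(row []byte) {
	for len(c.rows) > 0 {
		oldest := c.rows[0]
		delta := int64(len(row)) - int64(len(oldest))
		if delta > 0 && c.used+delta > c.budget {
			// Still over budget: drop the oldest row entirely and retry.
			c.rows = c.rows[1:]
			c.used -= int64(len(oldest))
			c.maxSize--
			continue
		}
		c.used += delta
		c.rows = append(c.rows[1:], append(oldest[:0], row...))
		return
	}
}

func main() {
	c := newToyCache(64 /* bytes */, 4 /* initialMaxSize */)
	for i := 0; i < 8; i++ {
		c.add([]byte(fmt.Sprintf("row-%d-xxxxxxxxxxxxxxxx", i)))
	}
	fmt.Printf("cached %d rows, %d bytes used, maxSize=%d\n", len(c.rows), c.used, c.maxSize)
}

As in reuseFirstRowInCache above, the reuse path may evict additional old entries when the incoming row is larger than the one being recycled, so the cache only ever shrinks after the budget is first exhausted.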