Skip to content

Commit

Permalink
sql: reuse already allocated memory for the cache in a row container
Browse files Browse the repository at this point in the history
Previously, we would always allocate new memory for every row that
we put in the cache of DiskBackedIndexedRowContainer and simply
discard the memory underlying the row that we remove from the cache.
Now, we're reusing that memory.

Release note: None
  • Loading branch information
yuzefovich committed Feb 11, 2019
1 parent 40e4035 commit a132513
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 35 deletions.
104 changes: 82 additions & 22 deletions pkg/sql/rowcontainer/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,13 @@ type DiskBackedIndexedRowContainer struct {
// [firstCachedRowPos, nextPosToCache).
firstCachedRowPos int
nextPosToCache int
indexedRowsCache ring.Buffer // the cache of up to maxIndexedRowsCacheSize contiguous rows
cacheMemAcc mon.BoundAccount
// indexedRowsCache is the cache of up to maxCacheSize contiguous rows.
indexedRowsCache ring.Buffer
// maxCacheSize indicates the maximum number of rows to be cached. It is
// initialized to maxIndexedRowsCacheSize and is reduced dynamically if an
// out-of-memory error is encountered.
maxCacheSize int
cacheMemAcc mon.BoundAccount
}

// MakeDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer
Expand Down Expand Up @@ -570,6 +575,7 @@ func MakeDiskBackedIndexedRowContainer(
d.scratchEncRow = make(sqlbase.EncDatumRow, len(d.storedTypes))
d.DiskBackedRowContainer = &DiskBackedRowContainer{}
d.DiskBackedRowContainer.Init(ordering, d.storedTypes, evalCtx, engine, memoryMonitor, diskMonitor, rowCapacity)
d.maxCacheSize = maxIndexedRowsCacheSize
d.cacheMemAcc = memoryMonitor.MakeBoundAccount()
return &d
}
Expand Down Expand Up @@ -692,28 +698,38 @@ func (f *DiskBackedIndexedRowContainer) GetRow(
}
row, rowIdx := rowWithIdx[:len(rowWithIdx)-1], rowWithIdx[len(rowWithIdx)-1].Datum
if idx, ok := rowIdx.(*tree.DInt); ok {
if f.indexedRowsCache.Len() == maxIndexedRowsCacheSize {
// The cache size is capped at maxIndexedRowsCacheSize, so we first
// remove the row with the smallest pos and advance
if f.indexedRowsCache.Len() == f.maxCacheSize {
// The cache size is capped at f.maxCacheSize, so we reuse the row
// with the smallest pos, put it as the last row, and advance
// f.firstCachedRowPos.
usage := sizeOfInt + int64(f.indexedRowsCache.GetFirst().(IndexedRow).Row.Size())
// TODO(yuzefovich): extend ring.Buffer to allow for reusing the
// allocated memory instead of allocating new one for every row.
f.indexedRowsCache.RemoveFirst()
// TODO(yuzefovich): investigate whether the pattern of growing and
// shrinking the memory account can be optimized.
f.cacheMemAcc.Shrink(ctx, usage)
f.firstCachedRowPos++
if err := f.reuseFirstRowInCache(ctx, int(*idx), row); err != nil {
return nil, err
}
} else {
// We choose to ignore minor details like IndexedRow overhead and
// the cache overhead.
usage := sizeOfInt + int64(row.Size())
if err := f.cacheMemAcc.Grow(ctx, usage); err != nil {
if sqlbase.IsOutOfMemoryError(err) {
// We hit the memory limit, so we need to cap the cache size
// and reuse the memory underlying first row in the cache.
if f.indexedRowsCache.Len() == 0 {
// The cache is empty, so there is no memory to be reused.
return nil, err
}
f.maxCacheSize = f.indexedRowsCache.Len()
if err := f.reuseFirstRowInCache(ctx, int(*idx), row); err != nil {
return nil, err
}
} else {
return nil, err
}
} else {
// We actually need to copy the row into memory.
ir := IndexedRow{int(*idx), f.rowAlloc.CopyRow(row)}
f.indexedRowsCache.AddLast(ir)
}
}
// We choose to ignore minor details like IndexedRow overhead and
// the cache overhead.
usage := sizeOfInt + int64(row.Size())
if err := f.cacheMemAcc.Grow(ctx, usage); err != nil {
return nil, err
}
// We actually need to copy the row into memory.
ir := IndexedRow{int(*idx), f.rowAlloc.CopyRow(row)}
f.indexedRowsCache.AddLast(ir)
f.nextPosToCache++
} else {
return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", idx)
Expand All @@ -733,6 +749,50 @@ func (f *DiskBackedIndexedRowContainer) GetRow(
return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", rowIdx)
}

// reuseFirstRowInCache reuses the underlying memory of the first row in the
// cache to store 'row' under index 'idx' and moves that entry to the end of
// the cache, advancing f.firstCachedRowPos by one.
//
// The memory account is adjusted by the difference in size between the new
// row and the row being overwritten. If growing the account fails with an
// out-of-memory error, the first row is evicted (decrementing f.maxCacheSize
// and advancing f.firstCachedRowPos) and the next cached row is tried.
//
// An error is returned if the cache is empty on entry, if the cache becomes
// empty while evicting rows (in which case the out-of-memory error is
// returned), or if growing the memory account fails with any other error.
func (f *DiskBackedIndexedRowContainer) reuseFirstRowInCache(
	ctx context.Context, idx int, row sqlbase.EncDatumRow,
) error {
	// Convert the sizes to int64 before subtracting so that the difference is
	// computed in signed arithmetic regardless of the type Size() returns.
	newRowSize := int64(row.Size())
	for {
		if f.indexedRowsCache.Len() == 0 {
			return errors.Errorf("unexpectedly the cache of DiskBackedIndexedRowContainer contains zero rows")
		}
		indexedRowToReuse := f.indexedRowsCache.GetFirst().(IndexedRow)
		oldRowSize := int64(indexedRowToReuse.Row.Size())
		delta := newRowSize - oldRowSize
		if delta > 0 {
			// New row takes up more memory than the old one.
			if err := f.cacheMemAcc.Grow(ctx, delta); err != nil {
				if sqlbase.IsOutOfMemoryError(err) {
					// We need to actually reduce the cache size, so we remove the
					// first row and adjust the memory account, f.maxCacheSize, and
					// f.firstCachedRowPos accordingly.
					//
					// NOTE(review): GetRow charges sizeOfInt plus the row size for
					// every row it adds to the cache, but only the row size is
					// released here; confirm whether sizeOfInt should be released
					// on eviction as well.
					f.indexedRowsCache.RemoveFirst()
					f.cacheMemAcc.Shrink(ctx, oldRowSize)
					f.maxCacheSize--
					f.firstCachedRowPos++
					if f.indexedRowsCache.Len() == 0 {
						// Nothing is left to evict, so the new row cannot be cached.
						return err
					}
					continue
				}
				return err
			}
		} else if delta < 0 {
			// New row is smaller, so the surplus is returned to the account.
			f.cacheMemAcc.Shrink(ctx, -delta)
		}
		// Overwrite the oldest entry in place and move it to the back of the
		// cache; the cached range therefore starts one position later.
		indexedRowToReuse.Idx = idx
		copy(indexedRowToReuse.Row, row)
		f.indexedRowsCache.RemoveFirst()
		f.indexedRowsCache.AddLast(indexedRowToReuse)
		f.firstCachedRowPos++
		return nil
	}
}

// getRowWithoutCache returns the row at the requested position without using
// the cache. It reuses the same disk row iterator across multiple consecutive
// calls and rewinds the iterator only when it has been advanced further than
Expand Down
104 changes: 91 additions & 13 deletions pkg/sql/rowcontainer/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func TestDiskBackedRowContainer(t *testing.T) {
memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1))
defer memoryMonitor.Stop(ctx)
diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1))
defer diskMonitor.Stop(ctx)

defer func() {
if err := rc.UnsafeReset(ctx); err != nil {
Expand Down Expand Up @@ -411,7 +412,6 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
const numTestRuns = 10
const numRows = 10
const numCols = 2
rows := make([]sqlbase.EncDatumRow, numRows)
ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
newOrdering := sqlbase.ColumnOrdering{{ColIdx: 1, Direction: encoding.Ascending}}

Expand All @@ -427,6 +427,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
// index).
t.Run("SpillingHalfway", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
rows := make([]sqlbase.EncDatumRow, numRows)
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for i := 0; i < numRows; i++ {
rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
Expand Down Expand Up @@ -482,14 +483,15 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
// to be returned. Then, it spills to disk and does the same check again.
t.Run("TestGetRow", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
rows := make([]sqlbase.EncDatumRow, numRows)
sortedRows := indexedRows{rows: make([]IndexedRow, numRows)}
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for i := 0; i < numRows; i++ {
rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
sortedRows.rows[i] = IndexedRow{Idx: i, Row: rows[i]}
}

sorter := sorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering}
sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering}
sort.Sort(&sorter)
if sorter.err != nil {
t.Fatal(sorter.err)
Expand Down Expand Up @@ -559,13 +561,94 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
}
})

// TestGetRowWithLimitedMemory forces the container to spill to disk, adds
// all rows to it, sorts them, and checks that GetRow() returns both the
// index and the row that we expect. The goal is to test the behavior of
// capping the cache size and reusing the memory of the first rows in the
// cache, so we use a memory budget that accommodates only about half of
// all rows in the cache.
t.Run("TestGetRowWithLimitedMemory", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
budget := int64(10240)
memoryUsage := int64(0)
rows := make([]sqlbase.EncDatumRow, 0, numRows)
sortedRows := indexedRows{rows: make([]IndexedRow, 0, numRows)}
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for memoryUsage < 2*budget {
row := sqlbase.RandEncDatumRowOfTypes(rng, types)
memoryUsage += int64(row.Size())
rows = append(rows, row)
sortedRows.rows = append(sortedRows.rows, IndexedRow{Idx: len(sortedRows.rows), Row: row})
}

memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(budget))
defer memoryMonitor.Stop(ctx)
diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
defer diskMonitor.Stop(ctx)

sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering}
sort.Sort(&sorter)
if sorter.err != nil {
t.Fatal(sorter.err)
}

func() {
rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
if err := rc.spillToDisk(ctx); err != nil {
t.Fatal(err)
}
for _, row := range rows {
if err := rc.AddRow(ctx, row); err != nil {
t.Fatal(err)
}
}
if !rc.Spilled() {
t.Fatal("unexpectedly using memory")
}
rc.Sort(ctx)

// Check that GetRow returns the row we expect at each position.
for i := 0; i < len(rows); i++ {
readRow, err := rc.GetRow(ctx, i)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expectedRow := sortedRows.rows[i]
readOrderingDatum, err := readRow.GetDatum(ordering[0].ColIdx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if readOrderingDatum.Compare(&evalCtx, expectedRow.Row[ordering[0].ColIdx].Datum) != 0 {
// We're skipping comparison if both rows are equal on the ordering
// column since in this case the order of indexed rows after
// sorting is nondeterministic.
if readRow.GetIdx() != expectedRow.GetIdx() {
t.Fatalf("read row has different idx that what we expect")
}
for col, expectedDatum := range expectedRow.Row {
readDatum, err := readRow.GetDatum(col)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if cmp := readDatum.Compare(&evalCtx, expectedDatum.Datum); cmp != 0 {
t.Fatalf("read row is not equal to expected one")
}
}
}
}
}()
}
})

// ReorderingInMemory initializes a DiskBackedIndexedRowContainer with one
// ordering, adds all rows to it, sorts it and makes sure that the rows are
// sorted as expected. Then, it reorders the container to a different
// ordering, sorts it and verifies that the rows are in the order we expect.
// Only in-memory containers should be used.
t.Run("ReorderingInMemory", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
rows := make([]sqlbase.EncDatumRow, numRows)
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for i := 0; i < numRows; i++ {
rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
Expand Down Expand Up @@ -606,6 +689,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
// container is forced to spill to disk right after initialization.
t.Run("ReorderingOnDisk", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
rows := make([]sqlbase.EncDatumRow, numRows)
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for i := 0; i < numRows; i++ {
rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
Expand Down Expand Up @@ -659,19 +743,19 @@ func (ir indexedRows) Len() int {
// TODO(yuzefovich): this is a duplicate of partitionSorter from windower.go.
// There are possibly a couple of other duplicates in other files as well, so
// we should refactor it and probably extract the code into a new package.
type sorter struct {
type rowsSorter struct {
evalCtx *tree.EvalContext
rows indexedRows
ordering sqlbase.ColumnOrdering
err error
}

func (n *sorter) Len() int { return n.rows.Len() }
func (n *rowsSorter) Len() int { return n.rows.Len() }

func (n *sorter) Swap(i, j int) {
func (n *rowsSorter) Swap(i, j int) {
n.rows.rows[i], n.rows.rows[j] = n.rows.rows[j], n.rows.rows[i]
}
func (n *sorter) Less(i, j int) bool {
func (n *rowsSorter) Less(i, j int) bool {
if n.err != nil {
// An error occurred in previous calls to Less(). We want to be done with
// sorting and to propagate that error to the caller of Sort().
Expand All @@ -685,13 +769,7 @@ func (n *sorter) Less(i, j int) bool {
return cmp < 0
}

// sorter implements the tree.PeerGroupChecker interface.
func (n *sorter) InSameGroup(i, j int) (bool, error) {
cmp, err := n.Compare(i, j)
return cmp == 0, err
}

func (n *sorter) Compare(i, j int) (int, error) {
func (n *rowsSorter) Compare(i, j int) (int, error) {
ra, rb := n.rows.rows[i], n.rows.rows[j]
for _, o := range n.ordering {
da, err := ra.GetDatum(o.ColIdx)
Expand Down

0 comments on commit a132513

Please sign in to comment.