Merge #61366

61366: backfill: move indexbackfiller Fetcher to operate on a per-batch basis r=adityamaru a=adityamaru

Previously, we would initialize and close the fetcher at the beginning
and end of the index backfill DistSQL flow. Prior to the refactor in
#55468, this worked because we would tear down the flow after
backfilling for a threshold number of seconds.

Since #55468 adds a streaming-based approach where we use a single
flow to build all the indexes, we started seeing OOMs because of
objects accumulating in the `pebbleResults` across all the
ScanRequests. I have not been able to pinpoint the exact reference we
were holding that prevented these results from being GC'ed, but more
information about the profiles can be found in #58933.
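
To make the lifecycle change concrete, here is a minimal sketch of the
before/after pattern, assuming a simplified stand-in for `row.Fetcher`
(the types and method bodies below are illustrative, not the real API):

```go
package main

import "context"

// fetcher stands in for row.Fetcher: Init allocates scan state and
// Close releases it. These names and bodies are illustrative only.
type fetcher struct{ buf []byte }

func (f *fetcher) Init() error {
	f.buf = make([]byte, 1<<20) // pretend scan buffers
	return nil
}

func (f *fetcher) Close(ctx context.Context) {
	f.buf = nil
}

// before: a single long-lived fetcher for the whole flow. Anything its
// scan results reference stays reachable until the flow is torn down,
// which was fine for short flows but accumulates in a streaming flow.
type indexBackfiller struct {
	fetcher fetcher
}

// after: each chunk creates, uses, and closes its own fetcher, so
// per-chunk scan state becomes garbage as soon as the chunk finishes.
func buildIndexEntriesChunk(ctx context.Context) error {
	var f fetcher
	if err := f.Init(); err != nil {
		return err
	}
	defer f.Close(ctx) // release this chunk's scan state promptly
	// ... scan the chunk's span and construct index entries ...
	return nil
}

func main() {
	_ = indexBackfiller{}
	_ = buildIndexEntriesChunk(context.Background())
}
```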

Release justification: fixes for high-priority or high-severity bugs in
existing functionality (fixing an OOM in an index backfill)

Release note: None

Resolves: #58933

Co-authored-by: Aditya Maru <[email protected]>
craig[bot] and adityamaru committed Mar 6, 2021
2 parents 2b11985 + e4f9742 commit 0ba3da4
Showing 5 changed files with 63 additions and 66 deletions.
30 changes: 14 additions & 16 deletions pkg/sql/backfill.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
@@ -70,24 +71,21 @@ const (
// over many ranges.
indexTxnBackfillChunkSize = 100

// indexBackfillBatchSize is the maximum number of index entries we attempt to
// fill in a single index batch before queueing it up for ingestion and
// progress reporting in the index backfiller processor.
//
// TODO(adityamaru): This should live with the index backfiller processor
// logic once the column backfiller is reworked. The only reason this variable
// is initialized here is to maintain a single testing knob
// `BackfillChunkSize` to control both the index and column backfill chunking
// behavior, and minimize test complexity. Should this be a cluster setting? I
// would hope we can do a dynamic memory based adjustment of this number in
// the processor.
indexBackfillBatchSize = 5000

// checkpointInterval is the interval after which a checkpoint of the
// schema change is posted.
checkpointInterval = 2 * time.Minute
)

// indexBackfillBatchSize is the maximum number of rows for which we construct
// index entries in a single batch, before that batch is queued up for
// ingestion and progress reporting in the index backfiller processor.
var indexBackfillBatchSize = settings.RegisterIntSetting(
"bulkio.index_backfill.batch_size",
"the number of rows for which we construct index entries in a single batch",
50000,
settings.NonNegativeInt, /* validateFn */
)

var _ sort.Interface = columnsByID{}
var _ sort.Interface = indexesByID{}

@@ -949,7 +947,6 @@ func (sc *SchemaChanger) distIndexBackfill(
targetSpans []roachpb.Span,
addedIndexes []descpb.IndexID,
filter backfill.MutationFilter,
indexBackfillBatchSize int64,
) error {
readAsOf := sc.clock.Now()

@@ -1025,7 +1022,8 @@
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory)
planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
chunkSize := sc.getChunkSize(indexBackfillBatchSize)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
chunkSize := sc.getChunkSize(indexBatchSize)
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), readAsOf, chunkSize, addedIndexes)
if err != nil {
return err
@@ -1778,7 +1776,7 @@ func (sc *SchemaChanger) backfillIndexes(
}

if err := sc.distIndexBackfill(
ctx, version, addingSpans, addedIndexes, backfill.IndexMutationFilter, indexBackfillBatchSize,
ctx, version, addingSpans, addedIndexes, backfill.IndexMutationFilter,
); err != nil {
return err
}
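
A usage note on the hunks above: replacing the hard-coded
`indexBackfillBatchSize` constant with a registered cluster setting means
operators can tune the batch size at runtime (e.g. `SET CLUSTER SETTING
bulkio.index_backfill.batch_size = 10000;`) rather than recompiling. Below is
a self-contained sketch of the registration pattern, with a toy registry
standing in for CockroachDB's real `settings` package:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// intSetting is a toy stand-in for the settings.RegisterIntSetting
// pattern: a named int64 with a default and a validation function.
type intSetting struct {
	name     string
	validate func(int64) error
	val      atomic.Int64
}

func registerIntSetting(name string, def int64, validate func(int64) error) *intSetting {
	s := &intSetting{name: name, validate: validate}
	s.val.Store(def)
	return s
}

// Set validates and atomically swaps the value, mirroring how a
// cluster setting update propagates to running nodes.
func (s *intSetting) Set(v int64) error {
	if err := s.validate(v); err != nil {
		return err
	}
	s.val.Store(v)
	return nil
}

func (s *intSetting) Get() int64 { return s.val.Load() }

func nonNegativeInt(v int64) error {
	if v < 0 {
		return errors.New("value must be non-negative")
	}
	return nil
}

var indexBackfillBatchSize = registerIntSetting(
	"bulkio.index_backfill.batch_size", 50000, nonNegativeInt)

func main() {
	fmt.Println(indexBackfillBatchSize.Get()) // 50000 (the default)
	_ = indexBackfillBatchSize.Set(10000)     // e.g. via SET CLUSTER SETTING
	fmt.Println(indexBackfillBatchSize.Get()) // 10000
}
```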
86 changes: 44 additions & 42 deletions pkg/sql/backfill/backfill.go
@@ -52,17 +52,9 @@ func IndexMutationFilter(m descpb.DescriptorMutation) bool {
return m.GetIndex() != nil && m.Direction == descpb.DescriptorMutation_ADD
}

// backfiller is common to a ColumnBackfiller or an IndexBackfiller.
type backfiller struct {
fetcher row.Fetcher
alloc rowenc.DatumAlloc
}

// ColumnBackfiller is capable of running a column backfill for all
// updateCols.
type ColumnBackfiller struct {
backfiller

added []descpb.ColumnDescriptor
dropped []descpb.ColumnDescriptor

@@ -71,6 +63,9 @@ type ColumnBackfiller struct {
updateExprs []tree.TypedExpr
evalCtx *tree.EvalContext

fetcher row.Fetcher
alloc rowenc.DatumAlloc

// mon is a memory monitor linked with the ColumnBackfiller on creation.
mon *mon.BytesMonitor
}
@@ -409,8 +404,6 @@ type muBoundAccount struct {

// IndexBackfiller is capable of backfilling all the added indexes.
type IndexBackfiller struct {
backfiller

added []*descpb.IndexDescriptor
// colIdxMap maps ColumnIDs to indices into desc.Columns and desc.Mutations.
colIdxMap catalog.TableColMap
@@ -444,6 +437,10 @@ type IndexBackfiller struct {
// backfilled.
indexesToEncode []*descpb.IndexDescriptor

valNeededForCol util.FastIntSet

alloc rowenc.DatumAlloc

// mon is a memory monitor linked with the IndexBackfiller on creation.
mon *mon.BytesMonitor
muBoundAccount muBoundAccount
@@ -473,7 +470,7 @@ func (ib *IndexBackfiller) InitForLocalUse(
ib.initCols(desc)

// Initialize ib.added.
valNeededForCol := ib.initIndexes(desc)
ib.valNeededForCol = ib.initIndexes(desc)

predicates, colExprs, referencedColumns, err := constructExprs(
ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, semaCtx,
@@ -485,10 +482,10 @@
// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
referencedColumns.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
ib.valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
return ib.init(evalCtx, predicates, colExprs, mon)
}

// constructExprs is a helper to construct the index and column expressions
@@ -594,7 +591,7 @@ func (ib *IndexBackfiller) InitForDistributedUse(
ib.initCols(desc)

// Initialize ib.added.
valNeededForCol := ib.initIndexes(desc)
ib.valNeededForCol = ib.initIndexes(desc)

evalCtx := flowCtx.NewEvalCtx()
var predicates map[descpb.IndexID]tree.TypedExpr
@@ -630,15 +627,14 @@
// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
referencedColumns.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
ib.valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
return ib.init(evalCtx, predicates, colExprs, mon)
}

// Close releases the resources used by the IndexBackfiller.
func (ib *IndexBackfiller) Close(ctx context.Context) {
ib.fetcher.Close(ctx)
if ib.mon != nil {
ib.muBoundAccount.Lock()
ib.muBoundAccount.boundAccount.Close(ctx)
@@ -728,8 +724,6 @@ func (ib *IndexBackfiller) init(
evalCtx *tree.EvalContext,
predicateExprs map[descpb.IndexID]tree.TypedExpr,
colExprs map[descpb.ColumnID]tree.TypedExpr,
valNeededForCol util.FastIntSet,
desc catalog.TableDescriptor,
mon *mon.BytesMonitor,
) error {
ib.evalCtx = evalCtx
@@ -750,32 +744,13 @@
ib.types[i] = ib.cols[i].Type
}

tableArgs := row.FetcherTableArgs{
Desc: desc,
Index: desc.GetPrimaryIndex().IndexDesc(),
ColIdxMap: ib.colIdxMap,
Cols: ib.cols,
ValNeededForCol: valNeededForCol,
}

// Create a bound account associated with the index backfiller monitor.
if mon == nil {
return errors.AssertionFailedf("no memory monitor linked to IndexBackfiller during init")
}
ib.mon = mon
ib.muBoundAccount.boundAccount = mon.MakeBoundAccount()

return ib.fetcher.Init(
evalCtx.Context,
evalCtx.Codec,
false, /* reverse */
descpb.ScanLockingStrength_FOR_NONE,
descpb.ScanLockingWaitPolicy_BLOCK,
false, /* isCheck */
&ib.alloc,
mon,
tableArgs,
)
return nil
}

// BuildIndexEntriesChunk reads a chunk of rows from a table using the span sp
@@ -816,7 +791,29 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
// during the scan. Index entries in the new index are being
// populated and deleted by the OLTP commands but not otherwise
// read or used
if err := ib.fetcher.StartScan(
tableArgs := row.FetcherTableArgs{
Desc: tableDesc,
Index: tableDesc.GetPrimaryIndex().IndexDesc(),
ColIdxMap: ib.colIdxMap,
Cols: ib.cols,
ValNeededForCol: ib.valNeededForCol,
}
var fetcher row.Fetcher
if err := fetcher.Init(
ib.evalCtx.Context,
ib.evalCtx.Codec,
false, /* reverse */
descpb.ScanLockingStrength_FOR_NONE,
descpb.ScanLockingWaitPolicy_BLOCK,
false, /* isCheck */
&ib.alloc,
ib.mon,
tableArgs,
); err != nil {
return nil, nil, 0, err
}
defer fetcher.Close(ctx)
if err := fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, true /* limitBatches */, initBufferSize,
traceKV, false, /* forceProductionKVBatchSize */
); err != nil {
@@ -860,7 +857,7 @@
return nil
}
for i := int64(0); i < chunkSize; i++ {
encRow, _, _, err := ib.fetcher.NextRow(ctx)
encRow, _, _, err := fetcher.NextRow(ctx)
if err != nil {
return nil, nil, 0, err
}
@@ -962,7 +959,12 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
ib.ShrinkBoundAccount(ctx, shrinkSize)
memUsedPerChunk -= shrinkSize

return entries, ib.fetcher.Key(), memUsedPerChunk, nil
var resumeKey roachpb.Key
if fetcher.Key() != nil {
resumeKey = make(roachpb.Key, len(fetcher.Key()))
copy(resumeKey, fetcher.Key())
}
return entries, resumeKey, memUsedPerChunk, nil
}

// RunIndexBackfillChunk runs an index backfill over a chunk of the table
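
One detail in the final hunk is easy to miss: returning `fetcher.Key()`
directly would hand the caller a slice aliasing the fetcher's internal
buffers, keeping them (and potentially the accumulated batch results)
reachable after the per-chunk fetcher is closed, which is why the resume key
is copied out. A self-contained sketch of that defensive-copy pattern, with
illustrative names:

```go
package main

import "fmt"

// scanBuffer stands in for a fetcher's internal batch storage; key()
// returns a sub-slice, so the result aliases the whole backing array.
type scanBuffer struct{ data []byte }

func (b *scanBuffer) key() []byte { return b.data[:8] }

// aliasedKey pins the entire 1 MiB buffer for as long as the caller
// holds the returned slice, even though only 8 bytes are needed.
func aliasedKey(b *scanBuffer) []byte { return b.key() }

// copiedKey returns just the bytes the caller needs, letting the
// buffer be collected once the scan that produced it is done.
func copiedKey(b *scanBuffer) []byte {
	k := make([]byte, len(b.key()))
	copy(k, b.key())
	return k
}

func main() {
	b := &scanBuffer{data: make([]byte, 1<<20)}
	fmt.Printf("aliased=%d copied=%d bytes\n", len(aliasedKey(b)), len(copiedKey(b)))
}
```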
4 changes: 2 additions & 2 deletions pkg/sql/index_backfiller.go
@@ -130,8 +130,8 @@ func (ib *IndexBackfillPlanner) plan(
true /* distribute */)
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
// batch size. Also plumb in a testing knob.
spec, err := initIndexBackfillerSpec(
*td.TableDesc(), readAsOf, indexBackfillBatchSize, indexesToBackfill)
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
spec, err := initIndexBackfillerSpec(*td.TableDesc(), readAsOf, chunkSize, indexesToBackfill)
if err != nil {
return err
}
5 changes: 2 additions & 3 deletions pkg/sql/indexbackfiller_test.go
@@ -522,9 +522,8 @@ INSERT INTO foo VALUES (1), (10), (100);
tableID, 1, execCfg.NodeID.SQLInstanceID(), s0.DB(), lm, jr, &execCfg, settings)
changer.SetJob(j)
spans := []roachpb.Span{table.IndexSpan(keys.SystemSQLCodec, test.indexToBackfill)}
require.NoError(t, changer.TestingDistIndexBackfill(
ctx, table.GetVersion(), spans, []descpb.IndexID{test.indexToBackfill}, backfill.IndexMutationFilter, 10,
))
require.NoError(t, changer.TestingDistIndexBackfill(ctx, table.GetVersion(), spans,
[]descpb.IndexID{test.indexToBackfill}, backfill.IndexMutationFilter))

// Make the mutation complete, then read the index and validate that it
// has the expected contents.
4 changes: 1 addition & 3 deletions pkg/sql/schema_changer_helpers_test.go
@@ -27,10 +27,8 @@ func (sc *SchemaChanger) TestingDistIndexBackfill(
targetSpans []roachpb.Span,
addedIndexes []descpb.IndexID,
filter backfill.MutationFilter,
indexBackfillBatchSize int64,
) error {
return sc.distIndexBackfill(
ctx, version, targetSpans, addedIndexes, filter, indexBackfillBatchSize)
return sc.distIndexBackfill(ctx, version, targetSpans, addedIndexes, filter)
}

// SetJob sets the job.
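
A side note on the helper above: `TestingDistIndexBackfill` follows a common
Go pattern of re-exporting an unexported method from a test-only file, so
tests in other packages can drive internals without widening the production
API. A minimal compilable sketch of the pattern, with the real types and
signatures elided:

```go
package sql

import "context"

// SchemaChanger is sketched here only so the example compiles on its
// own; the real type carries the schema-change state.
type SchemaChanger struct{}

// distIndexBackfill is the unexported production entry point; the real
// implementation is elided.
func (sc *SchemaChanger) distIndexBackfill(ctx context.Context) error {
	_ = ctx
	return nil
}

// TestingDistIndexBackfill would live in a *_helpers_test.go file, so
// it is compiled only into test binaries. It re-exports the unexported
// method for use by tests.
func (sc *SchemaChanger) TestingDistIndexBackfill(ctx context.Context) error {
	return sc.distIndexBackfill(ctx)
}
```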
