diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index 6137503074e5..2c01f52d2f30 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/backfill"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
@@ -70,24 +71,21 @@ const (
 	// over many ranges.
 	indexTxnBackfillChunkSize = 100
 
-	// indexBackfillBatchSize is the maximum number of index entries we attempt to
-	// fill in a single index batch before queueing it up for ingestion and
-	// progress reporting in the index backfiller processor.
-	//
-	// TODO(adityamaru): This should live with the index backfiller processor
-	// logic once the column backfiller is reworked. The only reason this variable
-	// is initialized here is to maintain a single testing knob
-	// `BackfillChunkSize` to control both the index and column backfill chunking
-	// behavior, and minimize test complexity. Should this be a cluster setting? I
-	// would hope we can do a dynamic memory based adjustment of this number in
-	// the processor.
-	indexBackfillBatchSize = 5000
-
 	// checkpointInterval is the interval after which a checkpoint of the
 	// schema change is posted.
 	checkpointInterval = 2 * time.Minute
 )
 
+// indexBackfillBatchSize is the maximum number of rows for which we construct
+// index entries in a single batch before queueing the batch up for ingestion
+// and progress reporting in the index backfiller processor.
+var indexBackfillBatchSize = settings.RegisterIntSetting(
+	"bulkio.index_backfill.batch_size",
+	"the number of rows for which we construct index entries in a single batch",
+	50000,
+	settings.NonNegativeInt, /* validateFn */
+)
+
 var _ sort.Interface = columnsByID{}
 var _ sort.Interface = indexesByID{}
 
@@ -949,7 +947,6 @@ func (sc *SchemaChanger) distIndexBackfill(
 	targetSpans []roachpb.Span,
 	addedIndexes []descpb.IndexID,
 	filter backfill.MutationFilter,
-	indexBackfillBatchSize int64,
 ) error {
 	readAsOf := sc.clock.Now()
 
@@ -1025,7 +1022,8 @@ func (sc *SchemaChanger) distIndexBackfill(
 			evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory)
 			planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
 				true /* distribute */)
-			chunkSize := sc.getChunkSize(indexBackfillBatchSize)
+			indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
+			chunkSize := sc.getChunkSize(indexBatchSize)
 			spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), readAsOf, chunkSize, addedIndexes)
 			if err != nil {
 				return err
@@ -1778,7 +1776,7 @@ func (sc *SchemaChanger) backfillIndexes(
 	}
 
 	if err := sc.distIndexBackfill(
-		ctx, version, addingSpans, addedIndexes, backfill.IndexMutationFilter, indexBackfillBatchSize,
+		ctx, version, addingSpans, addedIndexes, backfill.IndexMutationFilter,
 	); err != nil {
 		return err
 	}
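The hunks above replace a compile-time constant with a cluster setting that is read at each use site, so the batch size can be retuned on a live cluster (for example, SET CLUSTER SETTING bulkio.index_backfill.batch_size = 10000) without rebuilding or restarting nodes. Below is a minimal sketch of that register-once, read-dynamically pattern, assuming the settings API used in this patch; the setting key and values are illustrative only, not part of the change:

    package main

    import (
    	"fmt"

    	"github.com/cockroachdb/cockroach/pkg/settings"
    	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
    )

    // exampleBatchSize mirrors indexBackfillBatchSize above: registered once at
    // package init, validated to be non-negative, and read wherever the value
    // is needed. The key here is hypothetical.
    var exampleBatchSize = settings.RegisterIntSetting(
    	"bulkio.example.batch_size",
    	"illustrative batch size setting",
    	50000,
    	settings.NonNegativeInt,
    )

    func main() {
    	st := cluster.MakeTestingClusterSettings()
    	// Get reads the current value from the settings container on every
    	// call, so a later SET CLUSTER SETTING is picked up the next time an
    	// in-flight backfill consults the setting.
    	fmt.Println(exampleBatchSize.Get(&st.SV))
    }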
diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go
index 67b2675e9040..769177169999 100644
--- a/pkg/sql/backfill/backfill.go
+++ b/pkg/sql/backfill/backfill.go
@@ -52,17 +52,9 @@ func IndexMutationFilter(m descpb.DescriptorMutation) bool {
 	return m.GetIndex() != nil && m.Direction == descpb.DescriptorMutation_ADD
 }
 
-// backfiller is common to a ColumnBackfiller or an IndexBackfiller.
-type backfiller struct {
-	fetcher row.Fetcher
-	alloc   rowenc.DatumAlloc
-}
-
 // ColumnBackfiller is capable of running a column backfill for all
 // updateCols.
 type ColumnBackfiller struct {
-	backfiller
-
 	added   []descpb.ColumnDescriptor
 	dropped []descpb.ColumnDescriptor
 
@@ -71,6 +63,9 @@ type ColumnBackfiller struct {
 	updateExprs []tree.TypedExpr
 	evalCtx     *tree.EvalContext
 
+	fetcher row.Fetcher
+	alloc   rowenc.DatumAlloc
+
 	// mon is a memory monitor linked with the ColumnBackfiller on creation.
 	mon *mon.BytesMonitor
 }
@@ -409,8 +404,6 @@ type muBoundAccount struct {
 
 // IndexBackfiller is capable of backfilling all the added index.
 type IndexBackfiller struct {
-	backfiller
-
 	added []*descpb.IndexDescriptor
 	// colIdxMap maps ColumnIDs to indices into desc.Columns and desc.Mutations.
 	colIdxMap catalog.TableColMap
@@ -444,6 +437,10 @@ type IndexBackfiller struct {
 	// backfilled.
 	indexesToEncode []*descpb.IndexDescriptor
 
+	valNeededForCol util.FastIntSet
+
+	alloc rowenc.DatumAlloc
+
 	// mon is a memory monitor linked with the IndexBackfiller on creation.
 	mon *mon.BytesMonitor
 	muBoundAccount muBoundAccount
@@ -473,7 +470,7 @@ func (ib *IndexBackfiller) InitForLocalUse(
 	ib.initCols(desc)
 
 	// Initialize ib.added.
-	valNeededForCol := ib.initIndexes(desc)
+	ib.valNeededForCol = ib.initIndexes(desc)
 
 	predicates, colExprs, referencedColumns, err := constructExprs(
 		ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, semaCtx,
@@ -485,10 +482,10 @@ func (ib *IndexBackfiller) InitForLocalUse(
 	// Add the columns referenced in the predicate to valNeededForCol so that
 	// columns necessary to evaluate the predicate expression are fetched.
 	referencedColumns.ForEach(func(col descpb.ColumnID) {
-		valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
+		ib.valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
 	})
 
-	return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
+	return ib.init(evalCtx, predicates, colExprs, mon)
 }
 
 // constructExprs is a helper to construct the index and column expressions
@@ -594,7 +591,7 @@ func (ib *IndexBackfiller) InitForDistributedUse(
 	ib.initCols(desc)
 
 	// Initialize ib.added.
-	valNeededForCol := ib.initIndexes(desc)
+	ib.valNeededForCol = ib.initIndexes(desc)
 
 	evalCtx := flowCtx.NewEvalCtx()
 	var predicates map[descpb.IndexID]tree.TypedExpr
@@ -630,15 +627,14 @@ func (ib *IndexBackfiller) InitForDistributedUse(
 	// Add the columns referenced in the predicate to valNeededForCol so that
 	// columns necessary to evaluate the predicate expression are fetched.
 	referencedColumns.ForEach(func(col descpb.ColumnID) {
-		valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
+		ib.valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
 	})
 
-	return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
+	return ib.init(evalCtx, predicates, colExprs, mon)
 }
 
 // Close releases the resources used by the IndexBackfiller.
 func (ib *IndexBackfiller) Close(ctx context.Context) {
-	ib.fetcher.Close(ctx)
 	if ib.mon != nil {
 		ib.muBoundAccount.Lock()
 		ib.muBoundAccount.boundAccount.Close(ctx)
@@ -728,8 +724,6 @@ func (ib *IndexBackfiller) init(
 	evalCtx *tree.EvalContext,
 	predicateExprs map[descpb.IndexID]tree.TypedExpr,
 	colExprs map[descpb.ColumnID]tree.TypedExpr,
-	valNeededForCol util.FastIntSet,
-	desc catalog.TableDescriptor,
 	mon *mon.BytesMonitor,
 ) error {
 	ib.evalCtx = evalCtx
@@ -750,32 +744,13 @@ func (ib *IndexBackfiller) init(
 		ib.types[i] = ib.cols[i].Type
 	}
 
-	tableArgs := row.FetcherTableArgs{
-		Desc:            desc,
-		Index:           desc.GetPrimaryIndex().IndexDesc(),
-		ColIdxMap:       ib.colIdxMap,
-		Cols:            ib.cols,
-		ValNeededForCol: valNeededForCol,
-	}
-
 	// Create a bound account associated with the index backfiller monitor.
 	if mon == nil {
 		return errors.AssertionFailedf("no memory monitor linked to IndexBackfiller during init")
 	}
 	ib.mon = mon
 	ib.muBoundAccount.boundAccount = mon.MakeBoundAccount()
-
-	return ib.fetcher.Init(
-		evalCtx.Context,
-		evalCtx.Codec,
-		false, /* reverse */
-		descpb.ScanLockingStrength_FOR_NONE,
-		descpb.ScanLockingWaitPolicy_BLOCK,
-		false, /* isCheck */
-		&ib.alloc,
-		mon,
-		tableArgs,
-	)
+	return nil
 }
 
 // BuildIndexEntriesChunk reads a chunk of rows from a table using the span sp
@@ -816,7 +791,29 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 	// during the scan. Index entries in the new index are being
 	// populated and deleted by the OLTP commands but not otherwise
 	// read or used
-	if err := ib.fetcher.StartScan(
+	tableArgs := row.FetcherTableArgs{
+		Desc:            tableDesc,
+		Index:           tableDesc.GetPrimaryIndex().IndexDesc(),
+		ColIdxMap:       ib.colIdxMap,
+		Cols:            ib.cols,
+		ValNeededForCol: ib.valNeededForCol,
+	}
+	var fetcher row.Fetcher
+	if err := fetcher.Init(
+		ib.evalCtx.Context,
+		ib.evalCtx.Codec,
+		false, /* reverse */
+		descpb.ScanLockingStrength_FOR_NONE,
+		descpb.ScanLockingWaitPolicy_BLOCK,
+		false, /* isCheck */
+		&ib.alloc,
+		ib.mon,
+		tableArgs,
+	); err != nil {
+		return nil, nil, 0, err
+	}
+	defer fetcher.Close(ctx)
+	if err := fetcher.StartScan(
 		ctx, txn, []roachpb.Span{sp}, true /* limitBatches */, initBufferSize, traceKV,
 		false, /* forceProductionKVBatchSize */
 	); err != nil {
@@ -860,7 +857,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 		return nil
 	}
 	for i := int64(0); i < chunkSize; i++ {
-		encRow, _, _, err := ib.fetcher.NextRow(ctx)
+		encRow, _, _, err := fetcher.NextRow(ctx)
 		if err != nil {
 			return nil, nil, 0, err
 		}
@@ -962,7 +959,12 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 	ib.ShrinkBoundAccount(ctx, shrinkSize)
 	memUsedPerChunk -= shrinkSize
 
-	return entries, ib.fetcher.Key(), memUsedPerChunk, nil
+	var resumeKey roachpb.Key
+	if fetcher.Key() != nil {
+		resumeKey = make(roachpb.Key, len(fetcher.Key()))
+		copy(resumeKey, fetcher.Key())
+	}
+	return entries, resumeKey, memUsedPerChunk, nil
 }
 
 // RunIndexBackfillChunk runs an index backfill over a chunk of the table
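Two parts of this file change in concert: the row.Fetcher moves from a long-lived struct field to a per-chunk local that a defer closes before BuildIndexEntriesChunk returns, and the resume key is therefore copied out, since the slice returned by fetcher.Key() presumably references memory the fetcher owns and may reuse or release on Close. A stdlib-only sketch of why that copy matters; the Key type and "buffer reuse" below are stand-ins for roachpb.Key and the fetcher's internals, not the real implementation:

    package main

    import "fmt"

    // Key stands in for roachpb.Key: a []byte that may alias a buffer owned
    // by whoever produced it.
    type Key []byte

    // copyKey detaches a key from its producer's buffer, mirroring the
    // resumeKey copy added to BuildIndexEntriesChunk above.
    func copyKey(k Key) Key {
    	if k == nil {
    		return nil // preserve nil: "no resume key" and "empty key" differ
    	}
    	out := make(Key, len(k))
    	copy(out, k)
    	return out
    }

    func main() {
    	buf := Key("/Table/53/1/42") // imagine the fetcher's internal buffer
    	resume := copyKey(buf)
    	buf[1] = '!'                // the producer recycles its buffer...
    	fmt.Println(string(resume)) // ...but the copied key still reads /Table/53/1/42
    }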
diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go
index a87b5677fe07..860a8d3ffd19 100644
--- a/pkg/sql/index_backfiller.go
+++ b/pkg/sql/index_backfiller.go
@@ -130,8 +130,8 @@ func (ib *IndexBackfillPlanner) plan(
 		true /* distribute */)
 	// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
 	// batch size. Also plumb in a testing knob.
-	spec, err := initIndexBackfillerSpec(
-		*td.TableDesc(), readAsOf, indexBackfillBatchSize, indexesToBackfill)
+	chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
+	spec, err := initIndexBackfillerSpec(*td.TableDesc(), readAsOf, chunkSize, indexesToBackfill)
 	if err != nil {
 		return err
 	}
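The TODO in this hunk references util.ConstantWithMetamorphicTestRange, CockroachDB's mechanism for giving a constant a randomized value in test builds so that chunking edge cases get exercised without per-test plumbing. Here is a self-contained sketch of the idea only, not the actual util API; the function and flag below are hypothetical:

    package main

    import (
    	"fmt"
    	"math/rand"
    )

    // metamorphicBuild would be toggled by the test harness; a plain variable
    // keeps this sketch self-contained.
    var metamorphicBuild = true

    // constantWithTestRange returns defaultValue in production builds and a
    // random value in [min, max) under metamorphic testing, so off-by-one and
    // small-batch paths are hit by ordinary test runs.
    func constantWithTestRange(defaultValue, min, max int) int {
    	if metamorphicBuild {
    		return min + rand.Intn(max-min)
    	}
    	return defaultValue
    }

    func main() {
    	batchSize := constantWithTestRange(50000, 1, 64)
    	fmt.Println("batch size for this build:", batchSize)
    }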
diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go
index 2d16297f45f1..f2db0947cc47 100644
--- a/pkg/sql/indexbackfiller_test.go
+++ b/pkg/sql/indexbackfiller_test.go
@@ -522,9 +522,8 @@ INSERT INTO foo VALUES (1), (10), (100);
 			tableID, 1, execCfg.NodeID.SQLInstanceID(), s0.DB(), lm, jr, &execCfg, settings)
 		changer.SetJob(j)
 		spans := []roachpb.Span{table.IndexSpan(keys.SystemSQLCodec, test.indexToBackfill)}
-		require.NoError(t, changer.TestingDistIndexBackfill(
-			ctx, table.GetVersion(), spans, []descpb.IndexID{test.indexToBackfill}, backfill.IndexMutationFilter, 10,
-		))
+		require.NoError(t, changer.TestingDistIndexBackfill(ctx, table.GetVersion(), spans,
+			[]descpb.IndexID{test.indexToBackfill}, backfill.IndexMutationFilter))
 
 		// Make the mutation complete, then read the index and validate that it
 		// has the expected contents.
diff --git a/pkg/sql/schema_changer_helpers_test.go b/pkg/sql/schema_changer_helpers_test.go
index 7dde63c130f3..43af4d03de10 100644
--- a/pkg/sql/schema_changer_helpers_test.go
+++ b/pkg/sql/schema_changer_helpers_test.go
@@ -27,10 +27,8 @@ func (sc *SchemaChanger) TestingDistIndexBackfill(
 	targetSpans []roachpb.Span,
 	addedIndexes []descpb.IndexID,
 	filter backfill.MutationFilter,
-	indexBackfillBatchSize int64,
 ) error {
-	return sc.distIndexBackfill(
-		ctx, version, targetSpans, addedIndexes, filter, indexBackfillBatchSize)
+	return sc.distIndexBackfill(ctx, version, targetSpans, addedIndexes, filter)
 }
 
 // SetJob sets the job.
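With the explicit batch-size parameter gone from TestingDistIndexBackfill, a test that previously passed 10 would instead control batching through the cluster setting before kicking off the backfill. A hedged sketch of that override, assuming the Override signature of the same-era settings package; the key is re-registered under an illustrative name so the snippet stands alone:

    package main

    import (
    	"fmt"

    	"github.com/cockroachdb/cockroach/pkg/settings"
    	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
    )

    // testBatchSize stands in for indexBackfillBatchSize; settings keys are
    // global, so a self-contained example must register its own.
    var testBatchSize = settings.RegisterIntSetting(
    	"bulkio.example_test.batch_size",
    	"illustrative stand-in for bulkio.index_backfill.batch_size",
    	50000,
    	settings.NonNegativeInt,
    )

    func main() {
    	st := cluster.MakeTestingClusterSettings()
    	// Force tiny batches, as the old explicit `10` argument did, so a
    	// backfill exercises multiple chunks even on a three-row table.
    	testBatchSize.Override(&st.SV, 10)
    	fmt.Println(testBatchSize.Get(&st.SV))
    }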