diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 1b8156409f8d..1015e2e4759a 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -63,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -296,7 +297,7 @@ CREATE TABLE crdb_internal.tables ( parent_schema_id INT NOT NULL, locality TEXT )`, - generator: func(ctx context.Context, p *planner, dbDesc *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbDesc *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { row := make(tree.Datums, 14) worker := func(pusher rowPusher) error { descs, err := p.Descriptors().GetAllDescriptors(ctx, p.txn) @@ -406,8 +407,7 @@ CREATE TABLE crdb_internal.tables ( } return nil } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -616,7 +616,7 @@ CREATE TABLE crdb_internal.jobs ( coordinator_id INT )`, comment: `decoded job metadata from system.jobs (KV scan)`, - generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { currentUser := p.SessionData().User() isAdmin, err := p.HasAdminRole(ctx) if err != nil { @@ -2152,7 +2152,7 @@ CREATE TABLE crdb_internal.table_columns ( hidden BOOL NOT NULL ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { row := make(tree.Datums, 8) worker := func(pusher rowPusher) error { return forEachTableDescAll(ctx, p, dbContext, hideVirtual, @@ -2188,8 +2188,7 @@ CREATE TABLE crdb_internal.table_columns ( }, ) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -2209,7 +2208,7 @@ CREATE TABLE crdb_internal.table_indexes ( is_inverted BOOL NOT NULL ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { primary := tree.NewDString("primary") secondary := tree.NewDString("secondary") row := make(tree.Datums, 7) @@ -2242,8 +2241,7 @@ CREATE TABLE crdb_internal.table_indexes ( }, ) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -2670,7 +2668,7 @@ CREATE TABLE crdb_internal.ranges_no_leases ( split_enforced_until TIMESTAMP ) `, - generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { if err := p.RequireAdminRole(ctx, "read crdb_internal.ranges_no_leases"); err != nil { return nil, nil, err } @@ -3613,7 +3611,7 @@ CREATE TABLE crdb_internal.partitions ( subzone_id INT -- references a subzone id in the crdb_internal.zones table ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { dbName := "" if dbContext != nil { dbName = dbContext.GetName() @@ -3629,8 +3627,7 @@ CREATE TABLE crdb_internal.partitions ( }) }) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } diff --git a/pkg/sql/exec_factory_util.go b/pkg/sql/exec_factory_util.go index 89653f62fc26..7be30f4c606d 100644 --- a/pkg/sql/exec_factory_util.go +++ b/pkg/sql/exec_factory_util.go @@ -225,7 +225,7 @@ func constructVirtualScan( indexDesc := index.(*optVirtualIndex).desc columns, constructor := virtual.getPlanInfo( table.(*optVirtualTable).desc, - indexDesc, params.IndexConstraint) + indexDesc, params.IndexConstraint, p.execCfg.DistSQLPlanner.stopper) n, err := delayedNodeCallback(&delayedNode{ name: fmt.Sprintf("%s@%s", table.Name(), index.Name()), diff --git a/pkg/sql/virtual_schema.go b/pkg/sql/virtual_schema.go index 0a2fa7bb8071..2c9f36437e93 100644 --- a/pkg/sql/virtual_schema.go +++ b/pkg/sql/virtual_schema.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -109,7 +110,7 @@ type virtualSchemaTable struct { // generator, if non-nil, is a function that is used when creating a // virtualTableNode. This function returns a virtualTableGenerator function // which generates the next row of the virtual table when called. - generator func(ctx context.Context, p *planner, db *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) + generator func(ctx context.Context, p *planner, db *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) } // virtualSchemaView represents a view within a virtualSchema @@ -448,6 +449,7 @@ func (e *virtualDefEntry) getPlanInfo( table catalog.TableDescriptor, index *descpb.IndexDescriptor, idxConstraint *constraint.Constraint, + stopper *stop.Stopper, ) (colinfo.ResultColumns, virtualTableConstructor) { var columns colinfo.ResultColumns for _, col := range e.desc.PublicColumns() { @@ -483,7 +485,7 @@ func (e *virtualDefEntry) getPlanInfo( } if def.generator != nil { - next, cleanup, err := def.generator(ctx, p, dbDesc) + next, cleanup, err := def.generator(ctx, p, dbDesc, stopper) if err != nil { return nil, err } @@ -492,14 +494,17 @@ func (e *virtualDefEntry) getPlanInfo( constrainedScan := idxConstraint != nil && !idxConstraint.IsUnconstrained() if !constrainedScan { - generator, cleanup := setupGenerator(ctx, func(pusher rowPusher) error { + generator, cleanup, setupError := setupGenerator(ctx, func(pusher rowPusher) error { return def.populate(ctx, p, dbDesc, func(row ...tree.Datum) error { if err := e.validateRow(row, columns); err != nil { return err } return pusher.pushRow(row...) }) - }) + }, stopper) + if setupError != nil { + return nil, setupError + } return p.newVirtualTableNode(columns, generator, cleanup), nil } @@ -514,8 +519,11 @@ func (e *virtualDefEntry) getPlanInfo( columnIdxMap := catalog.ColumnIDToOrdinalMap(table.PublicColumns()) indexKeyDatums := make([]tree.Datum, len(index.ColumnIDs)) - generator, cleanup := setupGenerator(ctx, e.makeConstrainedRowsGenerator( - ctx, p, dbDesc, index, indexKeyDatums, columnIdxMap, idxConstraint, columns)) + generator, cleanup, setupError := setupGenerator(ctx, e.makeConstrainedRowsGenerator( + ctx, p, dbDesc, index, indexKeyDatums, columnIdxMap, idxConstraint, columns), stopper) + if setupError != nil { + return nil, setupError + } return p.newVirtualTableNode(columns, generator, cleanup), nil default: diff --git a/pkg/sql/virtual_table.go b/pkg/sql/virtual_table.go index 34247a8ed39f..77e597c66966 100644 --- a/pkg/sql/virtual_table.go +++ b/pkg/sql/virtual_table.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -67,8 +68,8 @@ type virtualTableGeneratorResponse struct { // * cleanup: Performs all cleanup. This function must be called exactly once // to ensure that resources are cleaned up. func setupGenerator( - ctx context.Context, worker func(pusher rowPusher) error, -) (next virtualTableGenerator, cleanup cleanupFunc) { + ctx context.Context, worker func(pusher rowPusher) error, stopper *stop.Stopper, +) (next virtualTableGenerator, cleanup cleanupFunc, setupError error) { var cancel func() ctx, cancel = context.WithCancel(ctx) var wg sync.WaitGroup @@ -82,14 +83,12 @@ func setupGenerator( // computation through comm, and the generator places rows to consume // back into comm. comm := make(chan virtualTableGeneratorResponse) - addRow := func(datums ...tree.Datum) error { select { case <-ctx.Done(): return cancelchecker.QueryCanceledError case comm <- virtualTableGeneratorResponse{datums: datums}: } - // Block until the next call to cleanup() or next(). This allows us to // avoid issues with concurrent transaction usage if the worker is using // a transaction. Otherwise, worker could proceed running operations after @@ -107,7 +106,7 @@ func setupGenerator( } wg.Add(1) - go func() { + if setupError = stopper.RunAsyncTask(ctx, "sql.rowPusher: send rows", func(ctx context.Context) { defer wg.Done() // We wait until a call to next before starting the worker. This prevents // concurrent transaction usage during the startup phase. We also have to @@ -125,14 +124,19 @@ func setupGenerator( if errors.Is(err, cancelchecker.QueryCanceledError) { return } - // Notify that we are done sending rows. select { case <-ctx.Done(): return case comm <- virtualTableGeneratorResponse{err: err}: } - }() + }); setupError != nil { + // The presence of an error means the goroutine never started, + // thus wg.Done() is never called, which can result in + // cleanup() being blocked indefinitely on wg.Wait(). We call + // wg.Done() manually here to account for this case. + wg.Done() + } next = func() (tree.Datums, error) { // Notify the worker to begin computing a row. @@ -141,7 +145,6 @@ func setupGenerator( case <-ctx.Done(): return nil, cancelchecker.QueryCanceledError } - // Wait for the row to be sent. select { case <-ctx.Done(): @@ -150,7 +153,7 @@ func setupGenerator( return resp.datums, resp.err } } - return next, cleanup + return next, cleanup, setupError } // virtualTableNode is a planNode that constructs its rows by repeatedly diff --git a/pkg/sql/virtual_table_test.go b/pkg/sql/virtual_table_test.go index 4a08042eb08f..3b30dccd1534 100644 --- a/pkg/sql/virtual_table_test.go +++ b/pkg/sql/virtual_table_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -25,8 +26,10 @@ import ( func TestVirtualTableGenerators(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) t.Run("test cleanup", func(t *testing.T) { - ctx := context.Background() worker := func(pusher rowPusher) error { if err := pusher.pushRow(tree.NewDInt(1)); err != nil { return err @@ -36,8 +39,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return nil } - - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) d, err := next() if err != nil { t.Fatal(err) @@ -50,7 +53,6 @@ func TestVirtualTableGenerators(t *testing.T) { t.Run("test worker error", func(t *testing.T) { // Test that if the worker returns an error we catch it. - ctx := context.Background() worker := func(pusher rowPusher) error { if err := pusher.pushRow(tree.NewDInt(1)); err != nil { return err @@ -60,7 +62,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return errors.New("dummy error") } - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) _, err := next() require.NoError(t, err) _, err = next() @@ -71,12 +74,12 @@ func TestVirtualTableGenerators(t *testing.T) { }) t.Run("test no next", func(t *testing.T) { - ctx := context.Background() // Test we don't leak anything if we call cleanup before next. worker := func(pusher rowPusher) error { return nil } - _, cleanup := setupGenerator(ctx, worker) + _, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) cleanup() }) @@ -92,7 +95,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return nil } - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) cancel() _, err := next() // There is a small chance that we race and don't return @@ -105,7 +109,8 @@ func TestVirtualTableGenerators(t *testing.T) { // Test cancellation after asking for a row. ctx, cancel = context.WithCancel(context.Background()) - next, cleanup = setupGenerator(ctx, worker) + next, cleanup, setupError = setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) row, err := next() require.NoError(t, err) require.Equal(t, tree.Datums{tree.NewDInt(1)}, row) @@ -116,7 +121,8 @@ func TestVirtualTableGenerators(t *testing.T) { // Test cancellation after asking for all the rows. ctx, cancel = context.WithCancel(context.Background()) - next, cleanup = setupGenerator(ctx, worker) + next, cleanup, setupError = setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) _, err = next() require.NoError(t, err) _, err = next() @@ -129,7 +135,9 @@ func TestVirtualTableGenerators(t *testing.T) { func BenchmarkVirtualTableGenerators(b *testing.B) { defer leaktest.AfterTest(b)() defer log.Scope(b).Close(b) + stopper := stop.NewStopper() ctx := context.Background() + defer stopper.Stop(ctx) worker := func(pusher rowPusher) error { for { if err := pusher.pushRow(tree.NewDInt(tree.DInt(1))); err != nil { @@ -138,7 +146,8 @@ func BenchmarkVirtualTableGenerators(b *testing.B) { } } b.Run("bench read", func(b *testing.B) { - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(b, setupError) b.ResetTimer() for i := 0; i < b.N; i++ { _, err := next()