From 0d894beb65749613ec8179ca3337f2bfa8a620ae Mon Sep 17 00:00:00 2001 From: Barry He Date: Fri, 5 Mar 2021 04:27:06 -0500 Subject: [PATCH] sql: use stopper for virtual table row pushing go routine Previously, the row pushing go routine in virtual table would not go through the stopper. Now it uses the stopper to run an async task, allowing the stopper to help with error handling. Release justification: bug fix and low-risk update Release note: None --- pkg/sql/crdb_internal.go | 25 +++++++++++-------------- pkg/sql/exec_factory_util.go | 2 +- pkg/sql/virtual_schema.go | 20 ++++++++++++++------ pkg/sql/virtual_table.go | 21 ++++++++++++--------- pkg/sql/virtual_table_test.go | 31 ++++++++++++++++++++----------- 5 files changed, 58 insertions(+), 41 deletions(-) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 1b8156409f8d..1015e2e4759a 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -63,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -296,7 +297,7 @@ CREATE TABLE crdb_internal.tables ( parent_schema_id INT NOT NULL, locality TEXT )`, - generator: func(ctx context.Context, p *planner, dbDesc *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbDesc *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { row := make(tree.Datums, 14) worker := func(pusher rowPusher) error { descs, err := p.Descriptors().GetAllDescriptors(ctx, p.txn) @@ -406,8 +407,7 @@ CREATE TABLE crdb_internal.tables ( } return nil } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -616,7 +616,7 @@ CREATE TABLE crdb_internal.jobs ( coordinator_id INT )`, comment: `decoded job metadata from system.jobs (KV scan)`, - generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { currentUser := p.SessionData().User() isAdmin, err := p.HasAdminRole(ctx) if err != nil { @@ -2152,7 +2152,7 @@ CREATE TABLE crdb_internal.table_columns ( hidden BOOL NOT NULL ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { row := make(tree.Datums, 8) worker := func(pusher rowPusher) error { return forEachTableDescAll(ctx, p, dbContext, hideVirtual, @@ -2188,8 +2188,7 @@ CREATE TABLE crdb_internal.table_columns ( }, ) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -2209,7 +2208,7 @@ CREATE TABLE crdb_internal.table_indexes ( is_inverted BOOL NOT NULL ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { primary := tree.NewDString("primary") secondary := tree.NewDString("secondary") row := make(tree.Datums, 7) @@ -2242,8 +2241,7 @@ CREATE TABLE crdb_internal.table_indexes ( }, ) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -2670,7 +2668,7 @@ CREATE TABLE crdb_internal.ranges_no_leases ( split_enforced_until TIMESTAMP ) `, - generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { if err := p.RequireAdminRole(ctx, "read crdb_internal.ranges_no_leases"); err != nil { return nil, nil, err } @@ -3613,7 +3611,7 @@ CREATE TABLE crdb_internal.partitions ( subzone_id INT -- references a subzone id in the crdb_internal.zones table ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { dbName := "" if dbContext != nil { dbName = dbContext.GetName() @@ -3629,8 +3627,7 @@ CREATE TABLE crdb_internal.partitions ( }) }) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } diff --git a/pkg/sql/exec_factory_util.go b/pkg/sql/exec_factory_util.go index 89653f62fc26..7be30f4c606d 100644 --- a/pkg/sql/exec_factory_util.go +++ b/pkg/sql/exec_factory_util.go @@ -225,7 +225,7 @@ func constructVirtualScan( indexDesc := index.(*optVirtualIndex).desc columns, constructor := virtual.getPlanInfo( table.(*optVirtualTable).desc, - indexDesc, params.IndexConstraint) + indexDesc, params.IndexConstraint, p.execCfg.DistSQLPlanner.stopper) n, err := delayedNodeCallback(&delayedNode{ name: fmt.Sprintf("%s@%s", table.Name(), index.Name()), diff --git a/pkg/sql/virtual_schema.go b/pkg/sql/virtual_schema.go index 0a2fa7bb8071..2c9f36437e93 100644 --- a/pkg/sql/virtual_schema.go +++ b/pkg/sql/virtual_schema.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -109,7 +110,7 @@ type virtualSchemaTable struct { // generator, if non-nil, is a function that is used when creating a // virtualTableNode. This function returns a virtualTableGenerator function // which generates the next row of the virtual table when called. - generator func(ctx context.Context, p *planner, db *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) + generator func(ctx context.Context, p *planner, db *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) } // virtualSchemaView represents a view within a virtualSchema @@ -448,6 +449,7 @@ func (e *virtualDefEntry) getPlanInfo( table catalog.TableDescriptor, index *descpb.IndexDescriptor, idxConstraint *constraint.Constraint, + stopper *stop.Stopper, ) (colinfo.ResultColumns, virtualTableConstructor) { var columns colinfo.ResultColumns for _, col := range e.desc.PublicColumns() { @@ -483,7 +485,7 @@ func (e *virtualDefEntry) getPlanInfo( } if def.generator != nil { - next, cleanup, err := def.generator(ctx, p, dbDesc) + next, cleanup, err := def.generator(ctx, p, dbDesc, stopper) if err != nil { return nil, err } @@ -492,14 +494,17 @@ func (e *virtualDefEntry) getPlanInfo( constrainedScan := idxConstraint != nil && !idxConstraint.IsUnconstrained() if !constrainedScan { - generator, cleanup := setupGenerator(ctx, func(pusher rowPusher) error { + generator, cleanup, setupError := setupGenerator(ctx, func(pusher rowPusher) error { return def.populate(ctx, p, dbDesc, func(row ...tree.Datum) error { if err := e.validateRow(row, columns); err != nil { return err } return pusher.pushRow(row...) }) - }) + }, stopper) + if setupError != nil { + return nil, setupError + } return p.newVirtualTableNode(columns, generator, cleanup), nil } @@ -514,8 +519,11 @@ func (e *virtualDefEntry) getPlanInfo( columnIdxMap := catalog.ColumnIDToOrdinalMap(table.PublicColumns()) indexKeyDatums := make([]tree.Datum, len(index.ColumnIDs)) - generator, cleanup := setupGenerator(ctx, e.makeConstrainedRowsGenerator( - ctx, p, dbDesc, index, indexKeyDatums, columnIdxMap, idxConstraint, columns)) + generator, cleanup, setupError := setupGenerator(ctx, e.makeConstrainedRowsGenerator( + ctx, p, dbDesc, index, indexKeyDatums, columnIdxMap, idxConstraint, columns), stopper) + if setupError != nil { + return nil, setupError + } return p.newVirtualTableNode(columns, generator, cleanup), nil default: diff --git a/pkg/sql/virtual_table.go b/pkg/sql/virtual_table.go index 34247a8ed39f..77e597c66966 100644 --- a/pkg/sql/virtual_table.go +++ b/pkg/sql/virtual_table.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -67,8 +68,8 @@ type virtualTableGeneratorResponse struct { // * cleanup: Performs all cleanup. This function must be called exactly once // to ensure that resources are cleaned up. func setupGenerator( - ctx context.Context, worker func(pusher rowPusher) error, -) (next virtualTableGenerator, cleanup cleanupFunc) { + ctx context.Context, worker func(pusher rowPusher) error, stopper *stop.Stopper, +) (next virtualTableGenerator, cleanup cleanupFunc, setupError error) { var cancel func() ctx, cancel = context.WithCancel(ctx) var wg sync.WaitGroup @@ -82,14 +83,12 @@ func setupGenerator( // computation through comm, and the generator places rows to consume // back into comm. comm := make(chan virtualTableGeneratorResponse) - addRow := func(datums ...tree.Datum) error { select { case <-ctx.Done(): return cancelchecker.QueryCanceledError case comm <- virtualTableGeneratorResponse{datums: datums}: } - // Block until the next call to cleanup() or next(). This allows us to // avoid issues with concurrent transaction usage if the worker is using // a transaction. Otherwise, worker could proceed running operations after @@ -107,7 +106,7 @@ func setupGenerator( } wg.Add(1) - go func() { + if setupError = stopper.RunAsyncTask(ctx, "sql.rowPusher: send rows", func(ctx context.Context) { defer wg.Done() // We wait until a call to next before starting the worker. This prevents // concurrent transaction usage during the startup phase. We also have to @@ -125,14 +124,19 @@ func setupGenerator( if errors.Is(err, cancelchecker.QueryCanceledError) { return } - // Notify that we are done sending rows. select { case <-ctx.Done(): return case comm <- virtualTableGeneratorResponse{err: err}: } - }() + }); setupError != nil { + // The presence of an error means the goroutine never started, + // thus wg.Done() is never called, which can result in + // cleanup() being blocked indefinitely on wg.Wait(). We call + // wg.Done() manually here to account for this case. + wg.Done() + } next = func() (tree.Datums, error) { // Notify the worker to begin computing a row. @@ -141,7 +145,6 @@ func setupGenerator( case <-ctx.Done(): return nil, cancelchecker.QueryCanceledError } - // Wait for the row to be sent. select { case <-ctx.Done(): @@ -150,7 +153,7 @@ func setupGenerator( return resp.datums, resp.err } } - return next, cleanup + return next, cleanup, setupError } // virtualTableNode is a planNode that constructs its rows by repeatedly diff --git a/pkg/sql/virtual_table_test.go b/pkg/sql/virtual_table_test.go index 4a08042eb08f..3b30dccd1534 100644 --- a/pkg/sql/virtual_table_test.go +++ b/pkg/sql/virtual_table_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -25,8 +26,10 @@ import ( func TestVirtualTableGenerators(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) t.Run("test cleanup", func(t *testing.T) { - ctx := context.Background() worker := func(pusher rowPusher) error { if err := pusher.pushRow(tree.NewDInt(1)); err != nil { return err @@ -36,8 +39,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return nil } - - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) d, err := next() if err != nil { t.Fatal(err) @@ -50,7 +53,6 @@ func TestVirtualTableGenerators(t *testing.T) { t.Run("test worker error", func(t *testing.T) { // Test that if the worker returns an error we catch it. - ctx := context.Background() worker := func(pusher rowPusher) error { if err := pusher.pushRow(tree.NewDInt(1)); err != nil { return err @@ -60,7 +62,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return errors.New("dummy error") } - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) _, err := next() require.NoError(t, err) _, err = next() @@ -71,12 +74,12 @@ func TestVirtualTableGenerators(t *testing.T) { }) t.Run("test no next", func(t *testing.T) { - ctx := context.Background() // Test we don't leak anything if we call cleanup before next. worker := func(pusher rowPusher) error { return nil } - _, cleanup := setupGenerator(ctx, worker) + _, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) cleanup() }) @@ -92,7 +95,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return nil } - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) cancel() _, err := next() // There is a small chance that we race and don't return @@ -105,7 +109,8 @@ func TestVirtualTableGenerators(t *testing.T) { // Test cancellation after asking for a row. ctx, cancel = context.WithCancel(context.Background()) - next, cleanup = setupGenerator(ctx, worker) + next, cleanup, setupError = setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) row, err := next() require.NoError(t, err) require.Equal(t, tree.Datums{tree.NewDInt(1)}, row) @@ -116,7 +121,8 @@ func TestVirtualTableGenerators(t *testing.T) { // Test cancellation after asking for all the rows. ctx, cancel = context.WithCancel(context.Background()) - next, cleanup = setupGenerator(ctx, worker) + next, cleanup, setupError = setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) _, err = next() require.NoError(t, err) _, err = next() @@ -129,7 +135,9 @@ func TestVirtualTableGenerators(t *testing.T) { func BenchmarkVirtualTableGenerators(b *testing.B) { defer leaktest.AfterTest(b)() defer log.Scope(b).Close(b) + stopper := stop.NewStopper() ctx := context.Background() + defer stopper.Stop(ctx) worker := func(pusher rowPusher) error { for { if err := pusher.pushRow(tree.NewDInt(tree.DInt(1))); err != nil { @@ -138,7 +146,8 @@ func BenchmarkVirtualTableGenerators(b *testing.B) { } } b.Run("bench read", func(b *testing.B) { - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(b, setupError) b.ResetTimer() for i := 0; i < b.N; i++ { _, err := next()