Skip to content

Commit

Permalink
sql: use stopper for virtual table row pushing go routine
Browse files Browse the repository at this point in the history
Previously, the row pushing go routine in virtual table
would not go through the stopper. Now it uses the stopper
to run an async task, allowing the stopper to help with
error handling.

Release justification: bug fix and low-risk update

Release note: None
  • Loading branch information
barryhe2000 committed Mar 5, 2021
1 parent 9ccda03 commit 0d894be
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 41 deletions.
25 changes: 11 additions & 14 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -296,7 +297,7 @@ CREATE TABLE crdb_internal.tables (
parent_schema_id INT NOT NULL,
locality TEXT
)`,
generator: func(ctx context.Context, p *planner, dbDesc *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) {
generator: func(ctx context.Context, p *planner, dbDesc *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
row := make(tree.Datums, 14)
worker := func(pusher rowPusher) error {
descs, err := p.Descriptors().GetAllDescriptors(ctx, p.txn)
Expand Down Expand Up @@ -406,8 +407,7 @@ CREATE TABLE crdb_internal.tables (
}
return nil
}
next, cleanup := setupGenerator(ctx, worker)
return next, cleanup, nil
return setupGenerator(ctx, worker, stopper)
},
}

Expand Down Expand Up @@ -616,7 +616,7 @@ CREATE TABLE crdb_internal.jobs (
coordinator_id INT
)`,
comment: `decoded job metadata from system.jobs (KV scan)`,
generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) {
generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
currentUser := p.SessionData().User()
isAdmin, err := p.HasAdminRole(ctx)
if err != nil {
Expand Down Expand Up @@ -2152,7 +2152,7 @@ CREATE TABLE crdb_internal.table_columns (
hidden BOOL NOT NULL
)
`,
generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) {
generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
row := make(tree.Datums, 8)
worker := func(pusher rowPusher) error {
return forEachTableDescAll(ctx, p, dbContext, hideVirtual,
Expand Down Expand Up @@ -2188,8 +2188,7 @@ CREATE TABLE crdb_internal.table_columns (
},
)
}
next, cleanup := setupGenerator(ctx, worker)
return next, cleanup, nil
return setupGenerator(ctx, worker, stopper)
},
}

Expand All @@ -2209,7 +2208,7 @@ CREATE TABLE crdb_internal.table_indexes (
is_inverted BOOL NOT NULL
)
`,
generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) {
generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
primary := tree.NewDString("primary")
secondary := tree.NewDString("secondary")
row := make(tree.Datums, 7)
Expand Down Expand Up @@ -2242,8 +2241,7 @@ CREATE TABLE crdb_internal.table_indexes (
},
)
}
next, cleanup := setupGenerator(ctx, worker)
return next, cleanup, nil
return setupGenerator(ctx, worker, stopper)
},
}

Expand Down Expand Up @@ -2670,7 +2668,7 @@ CREATE TABLE crdb_internal.ranges_no_leases (
split_enforced_until TIMESTAMP
)
`,
generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) {
generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
if err := p.RequireAdminRole(ctx, "read crdb_internal.ranges_no_leases"); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -3613,7 +3611,7 @@ CREATE TABLE crdb_internal.partitions (
subzone_id INT -- references a subzone id in the crdb_internal.zones table
)
`,
generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) {
generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
dbName := ""
if dbContext != nil {
dbName = dbContext.GetName()
Expand All @@ -3629,8 +3627,7 @@ CREATE TABLE crdb_internal.partitions (
})
})
}
next, cleanup := setupGenerator(ctx, worker)
return next, cleanup, nil
return setupGenerator(ctx, worker, stopper)
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_factory_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func constructVirtualScan(
indexDesc := index.(*optVirtualIndex).desc
columns, constructor := virtual.getPlanInfo(
table.(*optVirtualTable).desc,
indexDesc, params.IndexConstraint)
indexDesc, params.IndexConstraint, p.execCfg.DistSQLPlanner.stopper)

n, err := delayedNodeCallback(&delayedNode{
name: fmt.Sprintf("%s@%s", table.Name(), index.Name()),
Expand Down
20 changes: 14 additions & 6 deletions pkg/sql/virtual_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -109,7 +110,7 @@ type virtualSchemaTable struct {
// generator, if non-nil, is a function that is used when creating a
// virtualTableNode. This function returns a virtualTableGenerator function
// which generates the next row of the virtual table when called.
generator func(ctx context.Context, p *planner, db *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error)
generator func(ctx context.Context, p *planner, db *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error)
}

// virtualSchemaView represents a view within a virtualSchema
Expand Down Expand Up @@ -448,6 +449,7 @@ func (e *virtualDefEntry) getPlanInfo(
table catalog.TableDescriptor,
index *descpb.IndexDescriptor,
idxConstraint *constraint.Constraint,
stopper *stop.Stopper,
) (colinfo.ResultColumns, virtualTableConstructor) {
var columns colinfo.ResultColumns
for _, col := range e.desc.PublicColumns() {
Expand Down Expand Up @@ -483,7 +485,7 @@ func (e *virtualDefEntry) getPlanInfo(
}

if def.generator != nil {
next, cleanup, err := def.generator(ctx, p, dbDesc)
next, cleanup, err := def.generator(ctx, p, dbDesc, stopper)
if err != nil {
return nil, err
}
Expand All @@ -492,14 +494,17 @@ func (e *virtualDefEntry) getPlanInfo(

constrainedScan := idxConstraint != nil && !idxConstraint.IsUnconstrained()
if !constrainedScan {
generator, cleanup := setupGenerator(ctx, func(pusher rowPusher) error {
generator, cleanup, setupError := setupGenerator(ctx, func(pusher rowPusher) error {
return def.populate(ctx, p, dbDesc, func(row ...tree.Datum) error {
if err := e.validateRow(row, columns); err != nil {
return err
}
return pusher.pushRow(row...)
})
})
}, stopper)
if setupError != nil {
return nil, setupError
}
return p.newVirtualTableNode(columns, generator, cleanup), nil
}

Expand All @@ -514,8 +519,11 @@ func (e *virtualDefEntry) getPlanInfo(
columnIdxMap := catalog.ColumnIDToOrdinalMap(table.PublicColumns())
indexKeyDatums := make([]tree.Datum, len(index.ColumnIDs))

generator, cleanup := setupGenerator(ctx, e.makeConstrainedRowsGenerator(
ctx, p, dbDesc, index, indexKeyDatums, columnIdxMap, idxConstraint, columns))
generator, cleanup, setupError := setupGenerator(ctx, e.makeConstrainedRowsGenerator(
ctx, p, dbDesc, index, indexKeyDatums, columnIdxMap, idxConstraint, columns), stopper)
if setupError != nil {
return nil, setupError
}
return p.newVirtualTableNode(columns, generator, cleanup), nil

default:
Expand Down
21 changes: 12 additions & 9 deletions pkg/sql/virtual_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -67,8 +68,8 @@ type virtualTableGeneratorResponse struct {
// * cleanup: Performs all cleanup. This function must be called exactly once
// to ensure that resources are cleaned up.
func setupGenerator(
ctx context.Context, worker func(pusher rowPusher) error,
) (next virtualTableGenerator, cleanup cleanupFunc) {
ctx context.Context, worker func(pusher rowPusher) error, stopper *stop.Stopper,
) (next virtualTableGenerator, cleanup cleanupFunc, setupError error) {
var cancel func()
ctx, cancel = context.WithCancel(ctx)
var wg sync.WaitGroup
Expand All @@ -82,14 +83,12 @@ func setupGenerator(
// computation through comm, and the generator places rows to consume
// back into comm.
comm := make(chan virtualTableGeneratorResponse)

addRow := func(datums ...tree.Datum) error {
select {
case <-ctx.Done():
return cancelchecker.QueryCanceledError
case comm <- virtualTableGeneratorResponse{datums: datums}:
}

// Block until the next call to cleanup() or next(). This allows us to
// avoid issues with concurrent transaction usage if the worker is using
// a transaction. Otherwise, worker could proceed running operations after
Expand All @@ -107,7 +106,7 @@ func setupGenerator(
}

wg.Add(1)
go func() {
if setupError = stopper.RunAsyncTask(ctx, "sql.rowPusher: send rows", func(ctx context.Context) {
defer wg.Done()
// We wait until a call to next before starting the worker. This prevents
// concurrent transaction usage during the startup phase. We also have to
Expand All @@ -125,14 +124,19 @@ func setupGenerator(
if errors.Is(err, cancelchecker.QueryCanceledError) {
return
}

// Notify that we are done sending rows.
select {
case <-ctx.Done():
return
case comm <- virtualTableGeneratorResponse{err: err}:
}
}()
}); setupError != nil {
// The presence of an error means the goroutine never started,
// thus wg.Done() is never called, which can result in
// cleanup() being blocked indefinitely on wg.Wait(). We call
// wg.Done() manually here to account for this case.
wg.Done()
}

next = func() (tree.Datums, error) {
// Notify the worker to begin computing a row.
Expand All @@ -141,7 +145,6 @@ func setupGenerator(
case <-ctx.Done():
return nil, cancelchecker.QueryCanceledError
}

// Wait for the row to be sent.
select {
case <-ctx.Done():
Expand All @@ -150,7 +153,7 @@ func setupGenerator(
return resp.datums, resp.err
}
}
return next, cleanup
return next, cleanup, setupError
}

// virtualTableNode is a planNode that constructs its rows by repeatedly
Expand Down
31 changes: 20 additions & 11 deletions pkg/sql/virtual_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestVirtualTableGenerators(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
stopper := stop.NewStopper()
ctx := context.Background()
defer stopper.Stop(ctx)
t.Run("test cleanup", func(t *testing.T) {
ctx := context.Background()
worker := func(pusher rowPusher) error {
if err := pusher.pushRow(tree.NewDInt(1)); err != nil {
return err
Expand All @@ -36,8 +39,8 @@ func TestVirtualTableGenerators(t *testing.T) {
}
return nil
}

next, cleanup := setupGenerator(ctx, worker)
next, cleanup, setupError := setupGenerator(ctx, worker, stopper)
require.NoError(t, setupError)
d, err := next()
if err != nil {
t.Fatal(err)
Expand All @@ -50,7 +53,6 @@ func TestVirtualTableGenerators(t *testing.T) {

t.Run("test worker error", func(t *testing.T) {
// Test that if the worker returns an error we catch it.
ctx := context.Background()
worker := func(pusher rowPusher) error {
if err := pusher.pushRow(tree.NewDInt(1)); err != nil {
return err
Expand All @@ -60,7 +62,8 @@ func TestVirtualTableGenerators(t *testing.T) {
}
return errors.New("dummy error")
}
next, cleanup := setupGenerator(ctx, worker)
next, cleanup, setupError := setupGenerator(ctx, worker, stopper)
require.NoError(t, setupError)
_, err := next()
require.NoError(t, err)
_, err = next()
Expand All @@ -71,12 +74,12 @@ func TestVirtualTableGenerators(t *testing.T) {
})

t.Run("test no next", func(t *testing.T) {
ctx := context.Background()
// Test we don't leak anything if we call cleanup before next.
worker := func(pusher rowPusher) error {
return nil
}
_, cleanup := setupGenerator(ctx, worker)
_, cleanup, setupError := setupGenerator(ctx, worker, stopper)
require.NoError(t, setupError)
cleanup()
})

Expand All @@ -92,7 +95,8 @@ func TestVirtualTableGenerators(t *testing.T) {
}
return nil
}
next, cleanup := setupGenerator(ctx, worker)
next, cleanup, setupError := setupGenerator(ctx, worker, stopper)
require.NoError(t, setupError)
cancel()
_, err := next()
// There is a small chance that we race and don't return
Expand All @@ -105,7 +109,8 @@ func TestVirtualTableGenerators(t *testing.T) {

// Test cancellation after asking for a row.
ctx, cancel = context.WithCancel(context.Background())
next, cleanup = setupGenerator(ctx, worker)
next, cleanup, setupError = setupGenerator(ctx, worker, stopper)
require.NoError(t, setupError)
row, err := next()
require.NoError(t, err)
require.Equal(t, tree.Datums{tree.NewDInt(1)}, row)
Expand All @@ -116,7 +121,8 @@ func TestVirtualTableGenerators(t *testing.T) {

// Test cancellation after asking for all the rows.
ctx, cancel = context.WithCancel(context.Background())
next, cleanup = setupGenerator(ctx, worker)
next, cleanup, setupError = setupGenerator(ctx, worker, stopper)
require.NoError(t, setupError)
_, err = next()
require.NoError(t, err)
_, err = next()
Expand All @@ -129,7 +135,9 @@ func TestVirtualTableGenerators(t *testing.T) {
func BenchmarkVirtualTableGenerators(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)
stopper := stop.NewStopper()
ctx := context.Background()
defer stopper.Stop(ctx)
worker := func(pusher rowPusher) error {
for {
if err := pusher.pushRow(tree.NewDInt(tree.DInt(1))); err != nil {
Expand All @@ -138,7 +146,8 @@ func BenchmarkVirtualTableGenerators(b *testing.B) {
}
}
b.Run("bench read", func(b *testing.B) {
next, cleanup := setupGenerator(ctx, worker)
next, cleanup, setupError := setupGenerator(ctx, worker, stopper)
require.NoError(b, setupError)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := next()
Expand Down

0 comments on commit 0d894be

Please sign in to comment.