release-23.1: fix flakes inside TestRandomSyntaxFunctions #108995

Merged
merged 4 commits into from
Aug 18, 2023
5 changes: 5 additions & 0 deletions pkg/sql/check.go
@@ -434,6 +434,11 @@ func (p *planner) RevalidateUniqueConstraintsInCurrentDB(ctx context.Context) er
return err
}
return inDB.ForEachDescriptor(func(desc catalog.Descriptor) error {
// If the context is cancelled, then we should bail out, since
// the actual revalidate operation might not check anything.
if err := ctx.Err(); err != nil {
return err
}
tableDesc, err := catalog.AsTableDescriptor(desc)
if err != nil {
return err
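The ctx.Err() guard added above to RevalidateUniqueConstraintsInCurrentDB is a standard early-exit pattern for per-item callbacks. Below is a minimal, self-contained sketch of the same idea outside CockroachDB; the forEach helper and table names are hypothetical stand-ins for inDB.ForEachDescriptor and the real descriptors.

package main

import (
    "context"
    "fmt"
)

// forEach is a stand-in for inDB.ForEachDescriptor: it invokes fn for each
// item and stops at the first error.
func forEach(items []string, fn func(string) error) error {
    for _, it := range items {
        if err := fn(it); err != nil {
            return err
        }
    }
    return nil
}

func revalidateAll(ctx context.Context, tables []string) error {
    return forEach(tables, func(tbl string) error {
        // Bail out if the context is cancelled; otherwise the revalidation
        // below could report "success" without having checked anything.
        if err := ctx.Err(); err != nil {
            return err
        }
        fmt.Println("revalidating", tbl)
        return nil
    })
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // simulate a cancelled query
    fmt.Println(revalidateAll(ctx, []string{"t1", "t2"})) // context canceled
}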
44 changes: 8 additions & 36 deletions pkg/sql/create_stats.go
@@ -84,7 +84,7 @@ func StubTableStats(
}

// createStatsNode is a planNode implemented in terms of a function. The
// startJob function starts a Job during Start, and the remainder of the
// runJob function starts a Job during Start, and the remainder of the
// CREATE STATISTICS planning and execution is performed within the jobs
// framework.
type createStatsNode struct {
@@ -97,49 +97,23 @@ type createStatsNode struct {
// If it is false, the flow for create statistics is planned directly; this
// is used when the statement is under EXPLAIN or EXPLAIN ANALYZE.
runAsJob bool

run createStatsRun
}

// createStatsRun contains the run-time state of createStatsNode during local
// execution.
type createStatsRun struct {
resultsCh chan tree.Datums
errCh chan error
}

func (n *createStatsNode) startExec(params runParams) error {
telemetry.Inc(sqltelemetry.SchemaChangeCreateCounter("stats"))
n.run.resultsCh = make(chan tree.Datums)
n.run.errCh = make(chan error)
go func() {
err := n.startJob(params.ctx, n.run.resultsCh)
select {
case <-params.ctx.Done():
case n.run.errCh <- err:
}
close(n.run.errCh)
close(n.run.resultsCh)
}()
return nil
return n.runJob(params.ctx)
}

func (n *createStatsNode) Next(params runParams) (bool, error) {
select {
case <-params.ctx.Done():
return false, params.ctx.Err()
case err := <-n.run.errCh:
return false, err
case <-n.run.resultsCh:
return true, nil
}
return false, nil
}

func (*createStatsNode) Close(context.Context) {}
func (*createStatsNode) Values() tree.Datums { return nil }

// startJob starts a CreateStats job to plan and execute statistics creation.
func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Datums) error {
// runJob starts a CreateStats job synchronously to plan and execute
// statistics creation and then waits for the job to complete.
func (n *createStatsNode) runJob(ctx context.Context) error {
record, err := n.makeJobRecord(ctx)
if err != nil {
return err
@@ -172,8 +146,7 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
if err := job.Start(ctx); err != nil {
return err
}

if err := job.AwaitCompletion(ctx); err != nil {
if err = job.AwaitCompletion(ctx); err != nil {
if errors.Is(err, stats.ConcurrentCreateStatsError) {
// Delete the job so users don't see it and get confused by the error.
const stmt = `DELETE FROM system.jobs WHERE id = $1`
@@ -183,9 +156,8 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
log.Warningf(ctx, "failed to delete job: %v", delErr)
}
}
return err
}
return nil
return err
}

// makeJobRecord creates a CreateStats job record which can be used to plan and
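The net effect of the create_stats.go change is that createStatsNode no longer forks a goroutine and ferries results over resultsCh/errCh; startExec now calls runJob, which starts the job and blocks until it completes, so a cancelled statement context surfaces as an error directly. A rough sketch of that synchronous "start, then await" shape follows; the fakeJob type and its methods are hypothetical stand-ins for the real jobs API, not CockroachDB code.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// fakeJob is a hypothetical stand-in for the jobs framework: Start launches
// the work, AwaitCompletion blocks until it finishes or the context ends.
type fakeJob struct{ done chan error }

func (j *fakeJob) Start(ctx context.Context) error {
    j.done = make(chan error, 1)
    go func() {
        time.Sleep(10 * time.Millisecond) // pretend to collect stats
        j.done <- nil
    }()
    return nil
}

func (j *fakeJob) AwaitCompletion(ctx context.Context) error {
    select {
    case err := <-j.done:
        return err
    case <-ctx.Done():
        // Statement cancellation surfaces here directly; there are no
        // extra channels for the planNode to drain.
        return ctx.Err()
    }
}

// runJob mirrors the new synchronous shape: start the job, then block on it.
func runJob(ctx context.Context) error {
    var j fakeJob
    if err := j.Start(ctx); err != nil {
        return err
    }
    return j.AwaitCompletion(ctx)
}

func main() {
    fmt.Println(runJob(context.Background())) // <nil>

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    fmt.Println(errors.Is(runJob(ctx), context.Canceled)) // true
}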
52 changes: 52 additions & 0 deletions pkg/sql/stats/create_stats_job_test.go
@@ -29,13 +29,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestCreateStatsControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
@@ -122,6 +125,55 @@ func TestCreateStatsControlJob(t *testing.T) {
})
}

func TestCreateStatisticsCanBeCancelled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t, "test can be slow to quiesce because of filter")

var allowRequest chan struct{}

var serverArgs base.TestServerArgs
filter, setTableID := createStatsRequestFilter(&allowRequest)
serverArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
serverArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: filter,
}

ctx := context.Background()
tc, conn, _ := serverutils.StartServer(t, serverArgs)
defer tc.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(conn)

sqlDB.Exec(t, `CREATE DATABASE d`)
sqlDB.Exec(t, `CREATE TABLE d.t (x INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO d.t SELECT generate_series(1,1000)`)
var tID descpb.ID
sqlDB.QueryRow(t, `SELECT 'd.t'::regclass::int`).Scan(&tID)
setTableID(tID)

// Run CREATE STATISTICS and wait for it to create the job.
allowRequest = make(chan struct{})
errCh := make(chan error)
go func() {
_, err := conn.Exec(`CREATE STATISTICS s1 FROM d.t`)
errCh <- err
}()
allowRequest <- struct{}{}
testutils.SucceedsSoon(t, func() error {
row := conn.QueryRow("SELECT query_id FROM [SHOW CLUSTER STATEMENTS] WHERE query LIKE 'CREATE STATISTICS%';")
var queryID string
if err := row.Scan(&queryID); err != nil {
return err
}
_, err := conn.Exec("CANCEL QUERIES VALUES ((SELECT query_id FROM [SHOW CLUSTER STATEMENTS] WHERE query LIKE 'CREATE STATISTICS%'));")
return err
})
err := <-errCh
allowRequest <- struct{}{}

require.ErrorContains(t, err, "pq: query execution canceled")
}

func TestAtMostOneRunningCreateStats(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
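The new TestCreateStatisticsCanBeCancelled follows a common shape: run the statement in a goroutine that reports its error on a channel, poll until the statement is observably running, cancel it, and assert on the error. Below is a stripped-down sketch of that shape with the database replaced by a plain cancellable function; all names are illustrative, while the real test polls SHOW CLUSTER STATEMENTS and issues CANCEL QUERIES.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    running := make(chan struct{}) // stands in for the row appearing in SHOW CLUSTER STATEMENTS
    errCh := make(chan error, 1)   // same role as errCh in the test

    go func() {
        close(running) // the statement is now "visible"
        <-ctx.Done()   // stand-in for the long-running CREATE STATISTICS
        errCh <- ctx.Err()
    }()

    // Poll until the operation is observably running, then cancel it
    // (the test does this with SucceedsSoon + CANCEL QUERIES).
    select {
    case <-running:
        cancel()
    case <-time.After(5 * time.Second):
        panic("operation never started")
    }

    err := <-errCh
    fmt.Println(errors.Is(err, context.Canceled)) // true: the cancellation reached the statement
}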
13 changes: 12 additions & 1 deletion pkg/sql/tests/rsg_test.go
@@ -175,6 +175,7 @@ func (db *verifyFormatDB) execWithResettableTimeout(
}()
retry := true
targetDuration := duration
cancellationChannel := ctx.Done()
for retry {
retry = false
err := func() error {
@@ -202,7 +203,7 @@
return &nonCrasher{sql: sql, err: err}
}
return nil
case <-ctx.Done():
case <-cancellationChannel:
// Sanity: The context is cancelled when the test is about to
// time out. We will log whatever statement we're waiting on for
// debugging purposes. Sometimes queries won't respect
@@ -211,6 +212,7 @@
// We will intentionally retry, which will allow us to wait for the
// goroutine above to complete and avoid leaking it.
retry = true
cancellationChannel = nil
return nil
case <-time.After(targetDuration):
db.mu.Lock()
Expand Down Expand Up @@ -358,6 +360,14 @@ func TestRandomSyntaxFunctions(t *testing.T) {
switch lower {
case "pg_sleep":
continue
case "crdb_internal.create_sql_schema_telemetry_job":
// We can accidentally create a crazy number of telemetry jobs
// within the test, leading to terrible contention.
continue
case "crdb_internal.gen_rand_ident":
// Generates random identifiers, so a large number are dangerous and
// can take a long time.
continue
case "st_frechetdistance", "st_buffer":
// Some spatial functions are slow and testing them here
// is not worth it.
@@ -433,6 +443,7 @@ func TestRandomSyntaxFunctions(t *testing.T) {
// involve schema changes like truncates. In general this should make
// this test more resilient as the timeouts are reset as long as progress
// is made on *some* connection.
t.Logf("Running %q", s)
return db.execWithResettableTimeout(t, ctx, s, *flagRSGExecTimeout, *flagRSGGoRoutines)
})
}
Expand Down