Skip to content

Commit

Permalink
Merge pull request #108996 from fqazi/backport23.1.9-rc-107491-108525
Browse files Browse the repository at this point in the history
release-23.1.9-rc: fix flakes inside TestRandomSyntaxFunctions
  • Loading branch information
fqazi authored Aug 22, 2023
2 parents 23a2985 + b957951 commit a5cf688
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 37 deletions.
5 changes: 5 additions & 0 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,11 @@ func (p *planner) RevalidateUniqueConstraintsInCurrentDB(ctx context.Context) er
return err
}
return inDB.ForEachDescriptor(func(desc catalog.Descriptor) error {
// If the context is cancelled, then we should bail out, since
// the actual revalidate operation might not check anything.
if err := ctx.Err(); err != nil {
return err
}
tableDesc, err := catalog.AsTableDescriptor(desc)
if err != nil {
return err
Expand Down
44 changes: 8 additions & 36 deletions pkg/sql/create_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func StubTableStats(
}

// createStatsNode is a planNode implemented in terms of a function. The
// startJob function starts a Job during Start, and the remainder of the
// runJob function starts a Job during Start, and the remainder of the
// CREATE STATISTICS planning and execution is performed within the jobs
// framework.
type createStatsNode struct {
Expand All @@ -97,49 +97,23 @@ type createStatsNode struct {
// If it is false, the flow for create statistics is planned directly; this
// is used when the statement is under EXPLAIN or EXPLAIN ANALYZE.
runAsJob bool

run createStatsRun
}

// createStatsRun contains the run-time state of createStatsNode during local
// execution.
type createStatsRun struct {
resultsCh chan tree.Datums
errCh chan error
}

func (n *createStatsNode) startExec(params runParams) error {
telemetry.Inc(sqltelemetry.SchemaChangeCreateCounter("stats"))
n.run.resultsCh = make(chan tree.Datums)
n.run.errCh = make(chan error)
go func() {
err := n.startJob(params.ctx, n.run.resultsCh)
select {
case <-params.ctx.Done():
case n.run.errCh <- err:
}
close(n.run.errCh)
close(n.run.resultsCh)
}()
return nil
return n.runJob(params.ctx)
}

func (n *createStatsNode) Next(params runParams) (bool, error) {
select {
case <-params.ctx.Done():
return false, params.ctx.Err()
case err := <-n.run.errCh:
return false, err
case <-n.run.resultsCh:
return true, nil
}
return false, nil
}

func (*createStatsNode) Close(context.Context) {}
func (*createStatsNode) Values() tree.Datums { return nil }

// startJob starts a CreateStats job to plan and execute statistics creation.
func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Datums) error {
// runJob starts a CreateStats job synchronously to plan and execute
// statistics creation and then waits for the job to complete.
func (n *createStatsNode) runJob(ctx context.Context) error {
record, err := n.makeJobRecord(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -172,8 +146,7 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
if err := job.Start(ctx); err != nil {
return err
}

if err := job.AwaitCompletion(ctx); err != nil {
if err = job.AwaitCompletion(ctx); err != nil {
if errors.Is(err, stats.ConcurrentCreateStatsError) {
// Delete the job so users don't see it and get confused by the error.
const stmt = `DELETE FROM system.jobs WHERE id = $1`
Expand All @@ -183,9 +156,8 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
log.Warningf(ctx, "failed to delete job: %v", delErr)
}
}
return err
}
return nil
return err
}

// makeJobRecord creates a CreateStats job record which can be used to plan and
Expand Down
52 changes: 52 additions & 0 deletions pkg/sql/stats/create_stats_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestCreateStatsControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
Expand Down Expand Up @@ -122,6 +125,55 @@ func TestCreateStatsControlJob(t *testing.T) {
})
}

func TestCreateStatisticsCanBeCancelled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t, "test can be slow to quiesce because of filter")

var allowRequest chan struct{}

var serverArgs base.TestServerArgs
filter, setTableID := createStatsRequestFilter(&allowRequest)
serverArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
serverArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: filter,
}

ctx := context.Background()
tc, conn, _ := serverutils.StartServer(t, serverArgs)
defer tc.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(conn)

sqlDB.Exec(t, `CREATE DATABASE d`)
sqlDB.Exec(t, `CREATE TABLE d.t (x INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO d.t SELECT generate_series(1,1000)`)
var tID descpb.ID
sqlDB.QueryRow(t, `SELECT 'd.t'::regclass::int`).Scan(&tID)
setTableID(tID)

// Run CREATE STATISTICS and wait for to create the job.
allowRequest = make(chan struct{})
errCh := make(chan error)
go func() {
_, err := conn.Exec(`CREATE STATISTICS s1 FROM d.t`)
errCh <- err
}()
allowRequest <- struct{}{}
testutils.SucceedsSoon(t, func() error {
row := conn.QueryRow("SELECT query_id FROM [SHOW CLUSTER STATEMENTS] WHERE query LIKE 'CREATE STATISTICS%';")
var queryID string
if err := row.Scan(&queryID); err != nil {
return err
}
_, err := conn.Exec("CANCEL QUERIES VALUES ((SELECT query_id FROM [SHOW CLUSTER STATEMENTS] WHERE query LIKE 'CREATE STATISTICS%'));")
return err
})
err := <-errCh
allowRequest <- struct{}{}

require.ErrorContains(t, err, "pq: query execution canceled")
}

func TestAtMostOneRunningCreateStats(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
13 changes: 12 additions & 1 deletion pkg/sql/tests/rsg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (db *verifyFormatDB) execWithResettableTimeout(
}()
retry := true
targetDuration := duration
cancellationChannel := ctx.Done()
for retry {
retry = false
err := func() error {
Expand Down Expand Up @@ -202,7 +203,7 @@ func (db *verifyFormatDB) execWithResettableTimeout(
return &nonCrasher{sql: sql, err: err}
}
return nil
case <-ctx.Done():
case <-cancellationChannel:
// Sanity: The context is cancelled when the test is about to
// timeout. We will log whatever statement we're waiting on for
// debugging purposes. Sometimes queries won't respect
Expand All @@ -211,6 +212,7 @@ func (db *verifyFormatDB) execWithResettableTimeout(
// We will intentionally retry, which will us to wait for the
// go routine to complete above to avoid leaking it.
retry = true
cancellationChannel = nil
return nil
case <-time.After(targetDuration):
db.mu.Lock()
Expand Down Expand Up @@ -358,6 +360,14 @@ func TestRandomSyntaxFunctions(t *testing.T) {
switch lower {
case "pg_sleep":
continue
case "crdb_internal.create_sql_schema_telemetry_job":
// We can create a crazy number of telemtry jobs accidentally,
// within the test. Leading to terrible contention.
continue
case "crdb_internal.gen_rand_ident":
// Generates random identifiers, so a large number are dangerous and
// can take a long time.
continue
case "st_frechetdistance", "st_buffer":
// Some spatial function are slow and testing them here
// is not worth it.
Expand Down Expand Up @@ -433,6 +443,7 @@ func TestRandomSyntaxFunctions(t *testing.T) {
// involve schema changes like truncates. In general this should make
// this test more resilient as the timeouts are reset as long progress
// is made on *some* connection.
t.Logf("Running %q", s)
return db.execWithResettableTimeout(t, ctx, s, *flagRSGExecTimeout, *flagRSGGoRoutines)
})
}
Expand Down

0 comments on commit a5cf688

Please sign in to comment.