sql: Do not check existence of stats compaction job.
The scheduled jobs system, by default, ensures that only one instance of
a job is executing for a given schedule at any time. As such, it is not
necessary to verify that a compaction job does not already exist when
starting the stats compaction job from the schedule.

Furthermore, due to the interaction with the scheduling system, such a
check results in a wide scan of the system.jobs table, which causes the
scheduled job execution to be restarted whenever any other job modifies
system.jobs.

Fixes #78465

Release Notes (sql): The stats compaction scheduled job no longer causes intent buildup.
Release Justification: important stability fix to ensure jobs and scheduled
jobs do not lock up when running the stats compaction job.
Yevgeniy Miretskiy committed Mar 28, 2022
1 parent 3cfa6e6 commit ac8fac9
Showing 6 changed files with 7 additions and 161 deletions.
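
As a quick illustration of the resulting call shape before reading the diff, here is a minimal sketch. It is not part of the commit; the helper name, its parameters, and the standalone package are assumptions, while the CreateCompactionJob signature and CreatedByInfo fields are taken from the changes below.

package example

// Sketch only: mirrors what the scheduled-job executor does after this
// change. The schedule itself guarantees at most one executing instance,
// so no check against system.jobs is made before creating the job record.

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
)

// startScheduledCompaction creates the SQL stats compaction job record for
// the given schedule inside the supplied transaction.
func startScheduledCompaction(
	ctx context.Context, scheduleID int64, txn *kv.Txn, registry *jobs.Registry,
) (jobspb.JobID, error) {
	return persistedsqlstats.CreateCompactionJob(ctx, &jobs.CreatedByInfo{
		ID:   scheduleID,
		Name: jobs.CreatedByScheduledJobs,
	}, txn, registry)
}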
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
@@ -21,6 +21,7 @@ go_library(
"doc.go",
"drain.go",
"grpc_server.go",
"import_ts.go",
"index_usage_stats.go",
"init.go",
"init_handshake.go",
@@ -404,6 +405,7 @@ go_test(
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_golang_x_crypto//bcrypt",
],
11 changes: 1 addition & 10 deletions pkg/sql/compact_sql_stats.go
@@ -68,15 +68,6 @@ func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interfac
return err
}

// We check for concurrently running SQL Stats compaction jobs. We only allow
// one job to be running at the same time.
if err := persistedsqlstats.CheckExistingCompactionJob(ctx, r.job, ie, nil /* txn */); err != nil {
if errors.Is(err, persistedsqlstats.ErrConcurrentSQLStatsCompaction) {
log.Infof(ctx, "exiting due to a running sql stats compaction job")
}
return err
}

statsCompactor := persistedsqlstats.NewStatsCompactor(
r.st,
ie,
@@ -202,7 +193,7 @@ func (e *scheduledSQLStatsCompactionExecutor) createSQLStatsCompactionJob(
persistedsqlstats.CreateCompactionJob(ctx, &jobs.CreatedByInfo{
ID: sj.ScheduleID(),
Name: jobs.CreatedByScheduledJobs,
}, txn, cfg.InternalExecutor, p.(*planner).ExecCfg().JobRegistry)
}, txn, p.(*planner).ExecCfg().JobRegistry)

if err != nil {
return err
3 changes: 0 additions & 3 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
@@ -72,16 +72,13 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb:with-mocks",
"//pkg/scheduledjobs",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats",
"//pkg/sql/sqlutil",
36 changes: 4 additions & 32 deletions pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go
@@ -79,20 +79,12 @@ func CreateSQLStatsCompactionScheduleIfNotYetExist(
return compactionSchedule, nil
}

// CreateCompactionJob creates a system.jobs record if there is no other
// SQL Stats compaction job running. This is invoked by the scheduled job
// Executor.
// CreateCompactionJob creates a system.jobs record.
// We do not need to worry about checking whether the job already exists;
// at-most-one job semantics are enforced by the scheduled jobs system.
func CreateCompactionJob(
ctx context.Context,
createdByInfo *jobs.CreatedByInfo,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
jobRegistry *jobs.Registry,
ctx context.Context, createdByInfo *jobs.CreatedByInfo, txn *kv.Txn, jobRegistry *jobs.Registry,
) (jobspb.JobID, error) {
if err := CheckExistingCompactionJob(ctx, nil /* job */, ie, txn); err != nil {
return jobspb.InvalidJobID, err
}

record := jobs.Record{
Description: "automatic SQL Stats compaction",
Username: security.NodeUserName(),
@@ -108,26 +100,6 @@ func CreateCompactionJob(
return jobID, nil
}

// CheckExistingCompactionJob checks for existing SQL Stats Compaction job
// that are either PAUSED, CANCELED, or RUNNING. If so, it returns a
// ErrConcurrentSQLStatsCompaction.
func CheckExistingCompactionJob(
ctx context.Context, job *jobs.Job, ie sqlutil.InternalExecutor, txn *kv.Txn,
) error {
jobID := jobspb.InvalidJobID
if job != nil {
jobID = job.ID()
}
exists, err := jobs.RunningJobExists(ctx, jobID, ie, txn, func(payload *jobspb.Payload) bool {
return payload.Type() == jobspb.TypeAutoSQLStatsCompaction
})

if err == nil && exists {
err = ErrConcurrentSQLStatsCompaction
}
return err
}

func checkExistingCompactionSchedule(
ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn,
) (exists bool, _ error) {
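
For reference, a sketch of the concurrency check that this commit deletes, reconstructed from the removed lines above and wrapped in a standalone package for readability (the local error variable is an assumption standing in for the removed ErrConcurrentSQLStatsCompaction). The payload predicate forces jobs.RunningJobExists to inspect system.jobs rows, which is the wide scan the commit message identifies as the source of the intent buildup.

package example

// Reconstruction of the removed check, shown only to make the
// "wide system.jobs table scan" in the commit message concrete.

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
	"github.com/cockroachdb/errors"
)

// errConcurrentCompaction stands in for the removed
// ErrConcurrentSQLStatsCompaction sentinel.
var errConcurrentCompaction = errors.New("another sql stats compaction job is already running")

// checkExistingCompactionJob mirrors the deleted CheckExistingCompactionJob:
// it asks the jobs subsystem whether any other in-flight job with an
// auto-SQL-stats-compaction payload exists, excluding the caller's own job.
func checkExistingCompactionJob(
	ctx context.Context, job *jobs.Job, ie sqlutil.InternalExecutor, txn *kv.Txn,
) error {
	jobID := jobspb.InvalidJobID
	if job != nil {
		jobID = job.ID()
	}
	exists, err := jobs.RunningJobExists(ctx, jobID, ie, txn, func(payload *jobspb.Payload) bool {
		return payload.Type() == jobspb.TypeAutoSQLStatsCompaction
	})
	if err == nil && exists {
		err = errConcurrentCompaction
	}
	return err
}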
111 changes: 0 additions & 111 deletions pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
@@ -20,19 +20,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
@@ -202,76 +197,6 @@ func TestSQLStatsCompactor(t *testing.T) {
}
}

func TestAtMostOneSQLStatsCompactionJob(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var serverArgs base.TestServerArgs
var allowRequest chan struct{}

serverArgs.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{
AOSTClause: "AS OF SYSTEM TIME '-1us'",
}

params := base.TestClusterArgs{ServerArgs: serverArgs}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()

allowRequest = make(chan struct{})
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: createStatsRequestFilter(t, allowRequest),
}

ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 3 /* numNodes */, params)
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0 /* idx */)
sqlDB := sqlutils.MakeSQLRunner(conn)
server := tc.Server(0 /* idx */)

jobID, err := launchSQLStatsCompactionJob(server)
require.NoError(t, err)

// We wait until the job appears in the system.jobs table.
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d`, jobID),
[][]string{{"1"}},
)

allowRequest <- struct{}{}

// Launching a second job should fail here since we are still blocking
// the first job's execution through allowRequest. (We need to send
// struct{}{} twice into the channel to fully unblock it.)
_, err = launchSQLStatsCompactionJob(server)
expected := persistedsqlstats.ErrConcurrentSQLStatsCompaction.Error()
if !testutils.IsError(err, expected) {
t.Fatalf("expected '%s' error, but got %+v", expected, err)
}

allowRequest <- struct{}{}
close(allowRequest)

// We wait until the first job finishes.
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID),
[][]string{{"1"}},
)

// Launching the job now should succeed.
jobID, err = launchSQLStatsCompactionJob(server)
require.NoError(t, err)

// Wait for the second job to finish as a sanity check.
server.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID),
[][]string{{"1"}},
)
}

func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -310,7 +235,6 @@ func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) {
func launchSQLStatsCompactionJob(server serverutils.TestServerInterface) (jobspb.JobID, error) {
return persistedsqlstats.CreateCompactionJob(
context.Background(), nil /* createdByInfo */, nil, /* txn */
server.InternalExecutor().(sqlutil.InternalExecutor),
server.JobRegistry().(*jobs.Registry),
)
}
@@ -367,38 +291,3 @@ ORDER BY aggregated_ts`

return stmtFingerprints, txnFingerprints
}

func createStatsRequestFilter(
t *testing.T, allowToProgress chan struct{},
) kvserverbase.ReplicaRequestFilter {
// Start a test server here, so we can get the descriptor ID for the system
// table. This allows us to not hardcode the descriptor ID.
s, sqlConn, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer func() {
s.Stopper().Stop(context.Background())
err := sqlConn.Close()
require.NoError(t, err)
}()
sqlDB := sqlutils.MakeSQLRunner(sqlConn)

stmtStatsTableID, txnStatsTableID := getStatsTablesIDs(t, sqlDB)
return func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if req, ok := ba.GetArg(roachpb.Scan); ok {
_, tableID, _ := encoding.DecodeUvarintAscending(req.(*roachpb.ScanRequest).Key)
if descpb.ID(tableID) == stmtStatsTableID || descpb.ID(tableID) == txnStatsTableID {
<-allowToProgress
<-allowToProgress
}
}
return nil
}
}

func getStatsTablesIDs(
t *testing.T, sqlDB *sqlutils.SQLRunner,
) (stmtStatsTableID, txnStatsTableID descpb.ID) {
stmt :=
"select 'system.statement_statistics'::regclass::oid, 'system.transaction_statistics'::regclass::oid"
sqlDB.QueryRow(t, stmt).Scan(&stmtStatsTableID, &txnStatsTableID)
return stmtStatsTableID, txnStatsTableID
}
5 changes: 0 additions & 5 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
@@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

const (
@@ -42,10 +41,6 @@ const(
dummyPlanHash = int64(0)
)

// ErrConcurrentSQLStatsCompaction is reported when two sql stats compaction
// jobs are issued concurrently. This is a sentinel error.
var ErrConcurrentSQLStatsCompaction = errors.New("another sql stats compaction job is already running")

// Config is a configuration struct for the persisted SQL stats subsystem.
type Config struct {
Settings *cluster.Settings
