Skip to content

Commit

Permalink
jobs: add job to populate system activity tables
Browse files Browse the repository at this point in the history
The system statistics table grow too large for the
ui to quickly return results. The new activity
tables job does the aggregation and only records the top
500 of each of the 6 columns. This means for a given
hour there is a limit of 3000 rows. This allows
the ui to return results fast and reliably.

If the job detects there are less than 3k
rows it will just copy all the rows to
the activity tables.

Epic: none
closes: #98882

Release note (sql change): Adds a new sql activity
updater job. The job updates the system
transaction_activity and statement_activity
tables based on the statistics tables.
  • Loading branch information
j82w committed Apr 20, 2023
1 parent 3042c4f commit 53afe66
Show file tree
Hide file tree
Showing 24 changed files with 2,024 additions and 53 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if ena
sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job tenant-ro
sql.show_ranges_deprecated_behavior.enabled boolean true if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES. tenant-rw
sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators tenant-rw
sql.stats.activity.persisted_rows.max integer 200000 maximum number of rows of statement and transaction activity that will be persisted in the system tables tenant-rw
sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode tenant-rw
sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh tenant-rw
sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh tenant-rw
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@
<tr><td><div id="setting-sql-schema-telemetry-recurrence" class="anchored"><code>sql.schema.telemetry.recurrence</code></div></td><td>string</td><td><code>@weekly</code></td><td>cron-tab recurrence for SQL schema telemetry job</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-sql-show-ranges-deprecated-behavior-enabled" class="anchored"><code>sql.show_ranges_deprecated_behavior.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-spatial-experimental-box2d-comparison-operators-enabled" class="anchored"><code>sql.spatial.experimental_box2d_comparison_operators.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables the use of certain experimental box2d comparison operators</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-activity-persisted-rows-max" class="anchored"><code>sql.stats.activity.persisted_rows.max</code></div></td><td>integer</td><td><code>200000</code></td><td>maximum number of rows of statement and transaction activity that will be persisted in the system tables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-automatic-collection-enabled" class="anchored"><code>sql.stats.automatic_collection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>automatic statistics collection mode</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-automatic-collection-fraction-stale-rows" class="anchored"><code>sql.stats.automatic_collection.fraction_stale_rows</code></div></td><td>float</td><td><code>0.2</code></td><td>target fraction of stale rows per table that will trigger a statistics refresh</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-automatic-collection-min-stale-rows" class="anchored"><code>sql.stats.automatic_collection.min_stale_rows</code></div></td><td>integer</td><td><code>500</code></td><td>target minimum number of stale rows per table that will trigger a statistics refresh</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 58 descriptors and 57 namespace entries...
ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
Examining 18 jobs...
Examining 20 jobs...
ERROR: validation failed
9 changes: 9 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,11 @@ const (
// was introduced.
V23_1_TenantIDSequence

// V23_1CreateSystemActivityUpdateJob is the version at which Cockroach adds a
// job that periodically updates the statement_activity and transaction_activity.
// tables.
V23_1CreateSystemActivityUpdateJob

// **********************************************************
// ** If we haven't yet selected a final 23.1 RC candidate **
// Step 1a: Add new versions for release-23.1 branch above here.
Expand Down Expand Up @@ -912,6 +917,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_TenantIDSequence,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 100},
},
{
Key: V23_1CreateSystemActivityUpdateJob,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 102},
},

// **********************************************************
// ** If we haven't yet selected a final 23.1 RC candidate **
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
SkipAutoConfigRunnerJobBootstrap: true,
SkipUpdateSQLActivityJobBootstrap: true,
}
args.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true}

Expand Down
11 changes: 10 additions & 1 deletion pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,12 @@ message AutoConfigTaskDetails {
message AutoConfigTaskProgress {
}

message AutoUpdateSQLActivityDetails {
}

message AutoUpdateSQLActivityProgress {
}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -1257,6 +1263,7 @@ message Payload {
AutoConfigRunnerDetails auto_config_runner = 41;
AutoConfigEnvRunnerDetails auto_config_env_runner = 42;
AutoConfigTaskDetails auto_config_task = 43;
AutoUpdateSQLActivityDetails auto_update_sql_activities = 44;
}
reserved 26;
// PauseReason is used to describe the reason that the job is currently paused
Expand Down Expand Up @@ -1284,7 +1291,7 @@ message Payload {
// specifies how old such record could get before this job is canceled.
int64 maximum_pts_age = 40 [(gogoproto.casttype) = "time.Duration", (gogoproto.customname) = "MaximumPTSAge"];

// NEXT ID: 44
// NEXT ID: 45
}

message Progress {
Expand Down Expand Up @@ -1329,6 +1336,7 @@ message Progress {
AutoConfigRunnerProgress auto_config_runner = 29;
AutoConfigEnvRunnerProgress auto_config_env_runner = 30;
AutoConfigTaskProgress auto_config_task = 31;
AutoUpdateSQLActivityProgress update_sql_activity = 32;
}

uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
Expand Down Expand Up @@ -1363,6 +1371,7 @@ enum Type {
AUTO_CONFIG_RUNNER = 20 [(gogoproto.enumvalue_customname) = "TypeAutoConfigRunner"];
AUTO_CONFIG_ENV_RUNNER = 21 [(gogoproto.enumvalue_customname) = "TypeAutoConfigEnvRunner"];
AUTO_CONFIG_TASK = 22 [(gogoproto.enumvalue_customname) = "TypeAutoConfigTask"];
AUTO_UPDATE_SQL_ACTIVITY = 23 [(gogoproto.enumvalue_customname) = "TypeAutoUpdateSQLActivity"];
}

message Job {
Expand Down
16 changes: 15 additions & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
_ Details = AutoConfigRunnerDetails{}
_ Details = AutoConfigEnvRunnerDetails{}
_ Details = AutoConfigTaskDetails{}
_ Details = AutoUpdateSQLActivityDetails{}
)

// ProgressDetails is a marker interface for job progress details proto structs.
Expand All @@ -74,6 +75,7 @@ var (
_ ProgressDetails = AutoConfigRunnerProgress{}
_ ProgressDetails = AutoConfigEnvRunnerProgress{}
_ ProgressDetails = AutoConfigTaskProgress{}
_ ProgressDetails = AutoUpdateSQLActivityProgress{}
)

// Type returns the payload's job type and panics if the type is invalid.
Expand Down Expand Up @@ -156,6 +158,7 @@ var AutomaticJobTypes = [...]Type{
TypeAutoConfigEnvRunner,
TypeAutoConfigTask,
TypeKeyVisualizer,
TypeAutoUpdateSQLActivity,
}

// DetailsType returns the type for a payload detail.
Expand Down Expand Up @@ -207,6 +210,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
return TypeAutoConfigEnvRunner, nil
case *Payload_AutoConfigTask:
return TypeAutoConfigTask, nil
case *Payload_AutoUpdateSqlActivities:
return TypeAutoUpdateSQLActivity, nil
default:
return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
}
Expand Down Expand Up @@ -250,6 +255,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
TypeAutoConfigRunner: AutoConfigRunnerDetails{},
TypeAutoConfigEnvRunner: AutoConfigEnvRunnerDetails{},
TypeAutoConfigTask: AutoConfigTaskDetails{},
TypeAutoUpdateSQLActivity: AutoUpdateSQLActivityDetails{},
}

// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
Expand Down Expand Up @@ -303,6 +309,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_AutoConfigEnvRunner{AutoConfigEnvRunner: &d}
case AutoConfigTaskProgress:
return &Progress_AutoConfigTask{AutoConfigTask: &d}
case AutoUpdateSQLActivityProgress:
return &Progress_UpdateSqlActivity{UpdateSqlActivity: &d}
default:
panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
}
Expand Down Expand Up @@ -354,6 +362,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.AutoConfigEnvRunner
case *Payload_AutoConfigTask:
return *d.AutoConfigTask
case *Payload_AutoUpdateSqlActivities:
return *d.AutoUpdateSqlActivities
default:
return nil
}
Expand Down Expand Up @@ -405,6 +415,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.AutoConfigEnvRunner
case *Progress_AutoConfigTask:
return *d.AutoConfigTask
case *Progress_UpdateSqlActivity:
return *d.UpdateSqlActivity
default:
return nil
}
Expand Down Expand Up @@ -480,6 +492,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_AutoConfigEnvRunner{AutoConfigEnvRunner: &d}
case AutoConfigTaskDetails:
return &Payload_AutoConfigTask{AutoConfigTask: &d}
case AutoUpdateSQLActivityDetails:
return &Payload_AutoUpdateSqlActivities{AutoUpdateSqlActivities: &d}
default:
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -515,7 +529,7 @@ const (
func (Type) SafeValue() {}

// NumJobTypes is the number of jobs types.
const NumJobTypes = 23
const NumJobTypes = 24

// ChangefeedDetailsMarshaler allows for dependency injection of
// cloud.SanitizeExternalStorageURI to avoid the dependency from this
Expand Down
41 changes: 41 additions & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ const (

// AutoConfigRunnerJobID A static job ID is used for the auto config runner job.
AutoConfigRunnerJobID = jobspb.JobID(102)

// SqlActivityUpdaterJobID A static job ID is used for the SQL activity tables.
SqlActivityUpdaterJobID = jobspb.JobID(103)
)

// MakeJobID generates a new job ID.
Expand Down Expand Up @@ -692,6 +695,44 @@ func (r *Registry) CreateJobWithTxn(
return j, nil
}

// CreateIfNotExistAdoptableJobWithTxn checks if a job already exists in
// the system.jobs table, and if it does not it will create the job. The job
// will be adopted for execution at a later time by some node in the cluster.
func (r *Registry) CreateIfNotExistAdoptableJobWithTxn(
ctx context.Context, record Record, txn isql.Txn,
) error {
if record.JobID == 0 {
return fmt.Errorf("invalid record.JobID value: %d", record.JobID)
}

if txn == nil {
return fmt.Errorf("txn is required for job: %d", record.JobID)
}

// Make sure job with id doesn't already exist in system.jobs.
// Use a txn to avoid race conditions
row, err := txn.QueryRowEx(
ctx,
"check if job exists",
txn.KV(),
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
"SELECT id FROM system.jobs WHERE id = $1",
record.JobID,
)
if err != nil {
return err
}

// If there isn't a row for the job, create the job.
if row == nil {
if _, err = r.CreateAdoptableJobWithTxn(ctx, record, record.JobID, txn); err != nil {
return err
}
}

return nil
}

// CreateAdoptableJobWithTxn creates a job which will be adopted for execution
// at a later time by some node in the cluster.
func (r *Registry) CreateAdoptableJobWithTxn(
Expand Down
44 changes: 40 additions & 4 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,22 +236,23 @@ INSERT INTO t."%s" VALUES('a', 'foo');
newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
StatusCanceled, mutOptions)

sqlActivityJob := fmt.Sprintf("%d", SqlActivityUpdaterJobID)
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{sqlActivityJob}, {oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
{sqlActivityJob}, {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob},
{newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
{sqlActivityJob}, {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})

// Delete the revert failed, and running jobs for the next run of the
// test.
Expand Down Expand Up @@ -280,6 +281,7 @@ func TestRegistryGCPagination(t *testing.T) {
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
SkipAutoConfigRunnerJobBootstrap: true,
SkipUpdateSQLActivityJobBootstrap: true,
},
KeyVisualizer: &keyvisualizer.TestingKnobs{
SkipJobBootstrap: true,
Expand Down Expand Up @@ -447,6 +449,39 @@ func TestCreateJobWritesToJobInfo(t *testing.T) {
}))
runTests(t, createdJob)
})

t.Run("CreateIfNotExistAdoptableJobWithTxn", func(t *testing.T) {
tempRecord := Record{
JobID: r.MakeJobID(),
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.RootUserName(),
}

// loop to verify no errors if create if not exist is called multiple times
for i := 0; i < 3; i++ {
err := ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return r.CreateIfNotExistAdoptableJobWithTxn(ctx, tempRecord, txn)
})
require.NoError(t, err)
}

require.NoError(t, ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
row, err := txn.QueryRowEx(
ctx,
"check if job exists",
txn.KV(),
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
"SELECT id FROM system.jobs WHERE id = $1",
tempRecord.JobID,
)
if err != nil {
return err
}
require.NotNil(t, row)
return nil
}))
})
}

func TestBatchJobsCreation(t *testing.T) {
Expand Down Expand Up @@ -665,6 +700,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
SkipAutoConfigRunnerJobBootstrap: true,
SkipUpdateSQLActivityJobBootstrap: true,
},
KeyVisualizer: &keyvisualizer.TestingKnobs{
SkipJobBootstrap: true,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ go_library(
"sort.go",
"split.go",
"spool.go",
"sql_activity_update_job.go",
"sql_cursor.go",
"statement.go",
"subquery.go",
Expand Down Expand Up @@ -674,6 +675,7 @@ go_test(
"show_trace_replica_test.go",
"sort_test.go",
"split_test.go",
"sql_activity_update_job_test.go",
"sql_cursor_test.go",
"sql_prepare_test.go",
"statement_mark_redaction_test.go",
Expand Down Expand Up @@ -817,6 +819,7 @@ go_test(
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/sqlstats",
"//pkg/sql/sqlstats/persistedsqlstats",
"//pkg/sql/sqltestutils",
"//pkg/sql/stats",
"//pkg/sql/stmtdiagnostics",
Expand Down
Loading

0 comments on commit 53afe66

Please sign in to comment.