diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 8f07ebfe715f..5e3d61fd422b 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -253,6 +253,7 @@ sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if ena
sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job tenant-ro
sql.show_ranges_deprecated_behavior.enabled boolean true if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES. tenant-rw
sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators tenant-rw
+sql.stats.activity.persisted_rows.max integer 200000 maximum number of rows of statement and transaction activity that will be persisted in the system tables tenant-rw
sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode tenant-rw
sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh tenant-rw
sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 622d41a2bd76..cdbc7f5a46c0 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -205,6 +205,7 @@
sql.schema.telemetry.recurrence
| string | @weekly | cron-tab recurrence for SQL schema telemetry job | Serverless/Dedicated/Self-Hosted (read-only) |
sql.show_ranges_deprecated_behavior.enabled
| boolean | true | if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES. | Serverless/Dedicated/Self-Hosted |
sql.spatial.experimental_box2d_comparison_operators.enabled
| boolean | false | enables the use of certain experimental box2d comparison operators | Serverless/Dedicated/Self-Hosted |
+sql.stats.activity.persisted_rows.max
| integer | 200000 | maximum number of rows of statement and transaction activity that will be persisted in the system tables | Serverless/Dedicated/Self-Hosted |
sql.stats.automatic_collection.enabled
| boolean | true | automatic statistics collection mode | Serverless/Dedicated/Self-Hosted |
sql.stats.automatic_collection.fraction_stale_rows
| float | 0.2 | target fraction of stale rows per table that will trigger a statistics refresh | Serverless/Dedicated/Self-Hosted |
sql.stats.automatic_collection.min_stale_rows
| integer | 500 | target minimum number of stale rows per table that will trigger a statistics refresh | Serverless/Dedicated/Self-Hosted |
diff --git a/pkg/cli/testdata/doctor/test_examine_cluster b/pkg/cli/testdata/doctor/test_examine_cluster
index d1f9f1bb36e1..dbee8d9b3eac 100644
--- a/pkg/cli/testdata/doctor/test_examine_cluster
+++ b/pkg/cli/testdata/doctor/test_examine_cluster
@@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 58 descriptors and 57 namespace entries...
ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
-Examining 18 jobs...
+Examining 20 jobs...
ERROR: validation failed
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index 573048dbe5d4..09b66ee7830c 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -516,6 +516,11 @@ const (
// was introduced.
V23_1_TenantIDSequence
+ // V23_1CreateSystemActivityUpdateJob is the version at which Cockroach adds a
+ // job that periodically updates the statement_activity and transaction_activity
+ // tables.
+ V23_1CreateSystemActivityUpdateJob
+
// **********************************************************
// ** If we haven't yet selected a final 23.1 RC candidate **
// Step 1a: Add new versions for release-23.1 branch above here.
@@ -912,6 +917,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_TenantIDSequence,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 100},
},
+ {
+ Key: V23_1CreateSystemActivityUpdateJob,
+ Version: roachpb.Version{Major: 22, Minor: 2, Internal: 102},
+ },
// **********************************************************
// ** If we haven't yet selected a final 23.1 RC candidate **
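For context, code that bootstraps the new job is expected to consult this version gate first, so that nodes still running the previous binary never encounter an unknown job type. Below is a minimal sketch of such a check, assuming the standard clusterversion handle on cluster.Settings; the helper name is illustrative and not part of this change.

package sql

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// activityUpdateJobGateActive reports whether the cluster has finalized the
// version that introduces the SQL activity update job. Bootstrap code would
// skip creating the job until this returns true.
func activityUpdateJobGateActive(ctx context.Context, st *cluster.Settings) bool {
	return st.Version.IsActive(ctx, clusterversion.V23_1CreateSystemActivityUpdateJob)
}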
diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go
index 44201680c664..5f3386ecf33d 100644
--- a/pkg/jobs/jobs_test.go
+++ b/pkg/jobs/jobs_test.go
@@ -240,6 +240,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
SkipAutoConfigRunnerJobBootstrap: true,
+ SkipUpdateSQLActivityJobBootstrap: true,
}
args.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true}
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index d3ae6951f08f..cae15f376987 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -1196,6 +1196,12 @@ message AutoConfigTaskDetails {
message AutoConfigTaskProgress {
}
+message AutoUpdateSQLActivityDetails {
+}
+
+message AutoUpdateSQLActivityProgress {
+}
+
message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
@@ -1257,6 +1263,7 @@ message Payload {
AutoConfigRunnerDetails auto_config_runner = 41;
AutoConfigEnvRunnerDetails auto_config_env_runner = 42;
AutoConfigTaskDetails auto_config_task = 43;
+ AutoUpdateSQLActivityDetails auto_update_sql_activities = 44;
}
reserved 26;
// PauseReason is used to describe the reason that the job is currently paused
@@ -1284,7 +1291,7 @@ message Payload {
// specifies how old such record could get before this job is canceled.
int64 maximum_pts_age = 40 [(gogoproto.casttype) = "time.Duration", (gogoproto.customname) = "MaximumPTSAge"];
- // NEXT ID: 44
+ // NEXT ID: 45
}
message Progress {
@@ -1329,6 +1336,7 @@ message Progress {
AutoConfigRunnerProgress auto_config_runner = 29;
AutoConfigEnvRunnerProgress auto_config_env_runner = 30;
AutoConfigTaskProgress auto_config_task = 31;
+ AutoUpdateSQLActivityProgress update_sql_activity = 32;
}
uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
@@ -1363,6 +1371,7 @@ enum Type {
AUTO_CONFIG_RUNNER = 20 [(gogoproto.enumvalue_customname) = "TypeAutoConfigRunner"];
AUTO_CONFIG_ENV_RUNNER = 21 [(gogoproto.enumvalue_customname) = "TypeAutoConfigEnvRunner"];
AUTO_CONFIG_TASK = 22 [(gogoproto.enumvalue_customname) = "TypeAutoConfigTask"];
+ AUTO_UPDATE_SQL_ACTIVITY = 23 [(gogoproto.enumvalue_customname) = "TypeAutoUpdateSQLActivity"];
}
message Job {
diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go
index 7855a6035605..bc9c4e999f30 100644
--- a/pkg/jobs/jobspb/wrap.go
+++ b/pkg/jobs/jobspb/wrap.go
@@ -51,6 +51,7 @@ var (
_ Details = AutoConfigRunnerDetails{}
_ Details = AutoConfigEnvRunnerDetails{}
_ Details = AutoConfigTaskDetails{}
+ _ Details = AutoUpdateSQLActivityDetails{}
)
// ProgressDetails is a marker interface for job progress details proto structs.
@@ -74,6 +75,7 @@ var (
_ ProgressDetails = AutoConfigRunnerProgress{}
_ ProgressDetails = AutoConfigEnvRunnerProgress{}
_ ProgressDetails = AutoConfigTaskProgress{}
+ _ ProgressDetails = AutoUpdateSQLActivityProgress{}
)
// Type returns the payload's job type and panics if the type is invalid.
@@ -156,6 +158,7 @@ var AutomaticJobTypes = [...]Type{
TypeAutoConfigEnvRunner,
TypeAutoConfigTask,
TypeKeyVisualizer,
+ TypeAutoUpdateSQLActivity,
}
// DetailsType returns the type for a payload detail.
@@ -207,6 +210,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
return TypeAutoConfigEnvRunner, nil
case *Payload_AutoConfigTask:
return TypeAutoConfigTask, nil
+ case *Payload_AutoUpdateSqlActivities:
+ return TypeAutoUpdateSQLActivity, nil
default:
return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
}
@@ -250,6 +255,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
TypeAutoConfigRunner: AutoConfigRunnerDetails{},
TypeAutoConfigEnvRunner: AutoConfigEnvRunnerDetails{},
TypeAutoConfigTask: AutoConfigTaskDetails{},
+ TypeAutoUpdateSQLActivity: AutoUpdateSQLActivityDetails{},
}
// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
@@ -303,6 +309,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_AutoConfigEnvRunner{AutoConfigEnvRunner: &d}
case AutoConfigTaskProgress:
return &Progress_AutoConfigTask{AutoConfigTask: &d}
+ case AutoUpdateSQLActivityProgress:
+ return &Progress_UpdateSqlActivity{UpdateSqlActivity: &d}
default:
panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
}
@@ -354,6 +362,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.AutoConfigEnvRunner
case *Payload_AutoConfigTask:
return *d.AutoConfigTask
+ case *Payload_AutoUpdateSqlActivities:
+ return *d.AutoUpdateSqlActivities
default:
return nil
}
@@ -405,6 +415,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.AutoConfigEnvRunner
case *Progress_AutoConfigTask:
return *d.AutoConfigTask
+ case *Progress_UpdateSqlActivity:
+ return *d.UpdateSqlActivity
default:
return nil
}
@@ -480,6 +492,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_AutoConfigEnvRunner{AutoConfigEnvRunner: &d}
case AutoConfigTaskDetails:
return &Payload_AutoConfigTask{AutoConfigTask: &d}
+ case AutoUpdateSQLActivityDetails:
+ return &Payload_AutoUpdateSqlActivities{AutoUpdateSqlActivities: &d}
default:
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
@@ -515,7 +529,7 @@ const (
func (Type) SafeValue() {}
// NumJobTypes is the number of jobs types.
-const NumJobTypes = 23
+const NumJobTypes = 24
// ChangefeedDetailsMarshaler allows for dependency injection of
// cloud.SanitizeExternalStorageURI to avoid the dependency from this
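Wiring the new job type into jobspb and wrap.go is only half the story: a resumer must also be registered for TypeAutoUpdateSQLActivity so the registry can adopt and run the job. The real resumer lives in sql_activity_update_job.go, whose body is truncated at the end of this diff; the following is only a hedged placeholder sketch of that registration using the jobs.RegisterConstructor API, with the resumer type and its behavior assumed rather than taken from this change.

package sql

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// sqlActivityUpdateResumer is a placeholder; the actual implementation would
// perform the UPSERTs exercised in the observability logic test below.
type sqlActivityUpdateResumer struct {
	job *jobs.Job
}

func (r *sqlActivityUpdateResumer) Resume(ctx context.Context, execCtx interface{}) error {
	// Merge recent rows from system.{statement,transaction}_statistics into the
	// corresponding *_activity tables, then wait for the next update interval.
	return nil
}

func (r *sqlActivityUpdateResumer) OnFailOrCancel(
	ctx context.Context, execCtx interface{}, jobErr error,
) error {
	return nil
}

func init() {
	jobs.RegisterConstructor(
		jobspb.TypeAutoUpdateSQLActivity,
		func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
			return &sqlActivityUpdateResumer{job: job}
		},
		jobs.UsesTenantCostControl,
	)
}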
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 4f876cdb3161..884cf80839fb 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -301,6 +301,9 @@ const (
// AutoConfigRunnerJobID A static job ID is used for the auto config runner job.
AutoConfigRunnerJobID = jobspb.JobID(102)
+
+ // SqlActivityUpdaterJobID is the static job ID used for the job that updates the SQL activity tables.
+ SqlActivityUpdaterJobID = jobspb.JobID(103)
)
// MakeJobID generates a new job ID.
@@ -692,6 +695,44 @@ func (r *Registry) CreateJobWithTxn(
return j, nil
}
+// CreateIfNotExistAdoptableJobWithTxn checks whether a job with the given ID
+// already exists in the system.jobs table and creates it if it does not. The
+// job will be adopted for execution at a later time by some node in the cluster.
+func (r *Registry) CreateIfNotExistAdoptableJobWithTxn(
+ ctx context.Context, record Record, txn isql.Txn,
+) error {
+ if record.JobID == 0 {
+ return fmt.Errorf("invalid record.JobID value: %d", record.JobID)
+ }
+
+ if txn == nil {
+ return fmt.Errorf("txn is required for job: %d", record.JobID)
+ }
+
+ // Make sure a job with this ID doesn't already exist in system.jobs.
+ // Use the caller's txn to avoid race conditions.
+ row, err := txn.QueryRowEx(
+ ctx,
+ "check if job exists",
+ txn.KV(),
+ sessiondata.InternalExecutorOverride{User: username.RootUserName()},
+ "SELECT id FROM system.jobs WHERE id = $1",
+ record.JobID,
+ )
+ if err != nil {
+ return err
+ }
+
+ // If there isn't a row for the job, create the job.
+ if row == nil {
+ if _, err = r.CreateAdoptableJobWithTxn(ctx, record, record.JobID, txn); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
// CreateAdoptableJobWithTxn creates a job which will be adopted for execution
// at a later time by some node in the cluster.
func (r *Registry) CreateAdoptableJobWithTxn(
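A hedged usage sketch of the new helper follows, showing how a caller might idempotently bootstrap the activity update job under its static ID inside a transaction. The wrapper function and the Record field values are illustrative assumptions, not taken from this change.

package sql

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/security/username"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
)

// createActivityUpdateJobIfNotExists idempotently creates the SQL activity
// update job under its static ID. Illustrative only.
func createActivityUpdateJobIfNotExists(
	ctx context.Context, db isql.DB, jr *jobs.Registry,
) error {
	return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
		record := jobs.Record{
			JobID:         jobs.SqlActivityUpdaterJobID,
			Description:   "sql activity update job",
			Username:      username.NodeUserName(),
			Details:       jobspb.AutoUpdateSQLActivityDetails{},
			Progress:      jobspb.AutoUpdateSQLActivityProgress{},
			NonCancelable: true,
		}
		// Safe to call repeatedly: the helper checks system.jobs first and only
		// writes the record when no row with this ID exists.
		return jr.CreateIfNotExistAdoptableJobWithTxn(ctx, record, txn)
	})
}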
diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go
index 77f45dec2e84..c0a5b1953813 100644
--- a/pkg/jobs/registry_test.go
+++ b/pkg/jobs/registry_test.go
@@ -236,22 +236,23 @@ INSERT INTO t."%s" VALUES('a', 'foo');
newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
StatusCanceled, mutOptions)
+ sqlActivityJob := fmt.Sprintf("%d", SqlActivityUpdaterJobID)
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
- {oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
+ {sqlActivityJob}, {oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
- {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
- {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
+ {sqlActivityJob}, {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob},
+ {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
- {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
+ {sqlActivityJob}, {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
// Delete the revert failed, and running jobs for the next run of the
// test.
@@ -280,6 +281,7 @@ func TestRegistryGCPagination(t *testing.T) {
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
SkipAutoConfigRunnerJobBootstrap: true,
+ SkipUpdateSQLActivityJobBootstrap: true,
},
KeyVisualizer: &keyvisualizer.TestingKnobs{
SkipJobBootstrap: true,
@@ -447,6 +449,39 @@ func TestCreateJobWritesToJobInfo(t *testing.T) {
}))
runTests(t, createdJob)
})
+
+ t.Run("CreateIfNotExistAdoptableJobWithTxn", func(t *testing.T) {
+ tempRecord := Record{
+ JobID: r.MakeJobID(),
+ Details: jobspb.ImportDetails{},
+ Progress: jobspb.ImportProgress{},
+ Username: username.RootUserName(),
+ }
+
+ // Loop to verify that calling create-if-not-exists multiple times for the same job does not error.
+ for i := 0; i < 3; i++ {
+ err := ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+ return r.CreateIfNotExistAdoptableJobWithTxn(ctx, tempRecord, txn)
+ })
+ require.NoError(t, err)
+ }
+
+ require.NoError(t, ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+ row, err := txn.QueryRowEx(
+ ctx,
+ "check if job exists",
+ txn.KV(),
+ sessiondata.InternalExecutorOverride{User: username.RootUserName()},
+ "SELECT id FROM system.jobs WHERE id = $1",
+ tempRecord.JobID,
+ )
+ if err != nil {
+ return err
+ }
+ require.NotNil(t, row)
+ return nil
+ }))
+ })
}
func TestBatchJobsCreation(t *testing.T) {
@@ -665,6 +700,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
SkipAutoConfigRunnerJobBootstrap: true,
+ SkipUpdateSQLActivityJobBootstrap: true,
},
KeyVisualizer: &keyvisualizer.TestingKnobs{
SkipJobBootstrap: true,
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 93521d273d0e..0b828893168a 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -236,6 +236,7 @@ go_library(
"sort.go",
"split.go",
"spool.go",
+ "sql_activity_update_job.go",
"sql_cursor.go",
"statement.go",
"subquery.go",
@@ -674,6 +675,7 @@ go_test(
"show_trace_replica_test.go",
"sort_test.go",
"split_test.go",
+ "sql_activity_update_job_test.go",
"sql_cursor_test.go",
"sql_prepare_test.go",
"statement_mark_redaction_test.go",
@@ -817,6 +819,7 @@ go_test(
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/sqlstats",
+ "//pkg/sql/sqlstats/persistedsqlstats",
"//pkg/sql/sqltestutils",
"//pkg/sql/stats",
"//pkg/sql/stmtdiagnostics",
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/observability b/pkg/sql/opt/exec/execbuilder/testdata/observability
new file mode 100644
index 000000000000..d2d0d5735159
--- /dev/null
+++ b/pkg/sql/opt/exec/execbuilder/testdata/observability
@@ -0,0 +1,710 @@
+# LogicTest: local
+
+# Generates the explain plans the sql_activity_update_job uses to update the
+# transaction_activity and statement_activity tables.
+
+statement ok
+set enable_zigzag_join = false
+
+statement ok
+INSERT INTO system.users VALUES ('node', NULL, true, 3)
+
+statement ok
+GRANT node TO root
+
+# Upsert all transaction_activity
+query T retry
+EXPLAIN (VERBOSE) UPSERT INTO system.public.transaction_activity
+ (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata,
+ statistics, query, execution_count, execution_total_seconds,
+ execution_total_cluster_seconds, contention_time_avg_seconds,
+ cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+ (SELECT aggregated_ts,
+ fingerprint_id,
+ app_name,
+ agg_interval,
+ metadata,
+ statistics,
+ '' AS query,
+ (statistics->'execution_statistics'->>'cnt')::int,
+ ((statistics->'execution_statistics'->>'cnt')::float)*((statistics->'statistics'->'svcLat'->>'mean')::float),
+ 100 AS execution_total_cluster_seconds,
+ COALESCE((statistics->'execution_statistics'->'contentionTime'->>'mean')::float,0),
+ COALESCE((statistics->'execution_statistics'->'cpu_sql_nanos'->>'mean')::float,0),
+ (statistics->'statistics'->'svcLat'->>'mean')::float,
+ COALESCE((statistics->'statistics'->'latencyInfo'->>'p99')::float, 0)
+ FROM (SELECT
+ max(aggregated_ts) AS aggregated_ts,
+ app_name,
+ fingerprint_id,
+ agg_interval,
+ crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
+ crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+ FROM system.public.transaction_statistics
+ WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00'
+ and app_name not like '$ internal%'
+ GROUP BY app_name,
+ fingerprint_id,
+ agg_interval));
+----
+distribution: local
+vectorized: true
+·
+• upsert
+│ columns: ()
+│ estimated row count: 0 (missing stats)
+│ into: transaction_activity(aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+│ auto commit
+│ arbiter indexes: primary
+│
+└── • project
+ │ columns: (max, fingerprint_id, app_name, agg_interval, metadata, statistics, query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds, agg_interval, metadata, statistics, query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts)
+ │
+ └── • lookup join (left outer)
+ │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics, aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+ │ estimated row count: 3 (missing stats)
+ │ table: transaction_activity@primary
+ │ equality: (max, fingerprint_id, app_name) = (aggregated_ts,fingerprint_id,app_name)
+ │ equality cols are key
+ │
+ └── • distinct
+ │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics)
+ │ estimated row count: 3 (missing stats)
+ │ distinct on: fingerprint_id, app_name, max
+ │ nulls are distinct
+ │ error on duplicate
+ │
+ └── • render
+ │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics)
+ │ render query: ''
+ │ render int8: ((statistics->'execution_statistics')->>'cnt')::INT8
+ │ render ?column?: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render execution_total_cluster_seconds: 100.0
+ │ render coalesce: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0)
+ │ render coalesce: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0)
+ │ render float8: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render coalesce: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0)
+ │ render fingerprint_id: fingerprint_id
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │ render max: max
+ │ render metadata: metadata
+ │ render statistics: statistics
+ │
+ └── • render
+ │ columns: (metadata, statistics, fingerprint_id, app_name, agg_interval, max)
+ │ render metadata: crdb_internal.merge_stats_metadata(array_agg)
+ │ render statistics: crdb_internal.merge_transaction_stats(array_agg)
+ │ render fingerprint_id: fingerprint_id
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │ render max: max
+ │
+ └── • group (hash)
+ │ columns: (fingerprint_id, app_name, agg_interval, max, array_agg, array_agg)
+ │ estimated row count: 3 (missing stats)
+ │ aggregate 0: max(aggregated_ts)
+ │ aggregate 1: array_agg(metadata)
+ │ aggregate 2: array_agg(statistics)
+ │ group by: fingerprint_id, app_name, agg_interval
+ │
+ └── • index join
+ │ columns: (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics)
+ │ estimated row count: 3 (missing stats)
+ │ table: transaction_statistics@primary
+ │ key columns: crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id
+ │
+ └── • scan
+ columns: (aggregated_ts, fingerprint_id, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8)
+ estimated row count: 3 (missing stats)
+ table: transaction_statistics@execution_count_idx (partial index)
+ spans: /2023-04-10T16:00:00Z-/2023-04-10T16:00:00.000000001Z
+
+# Upsert all statement_activity
+query T retry
+EXPLAIN (VERBOSE) UPSERT
+ INTO system.public.statement_activity (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name,
+ agg_interval, metadata, statistics, plan, index_recommendations, execution_count,
+ execution_total_seconds, execution_total_cluster_seconds,
+ contention_time_avg_seconds,
+ cpu_sql_avg_nanos,
+ service_latency_avg_seconds, service_latency_p99_seconds)
+ (SELECT aggregated_ts,
+ fingerprint_id,
+ transaction_fingerprint_id,
+ plan_hash,
+ app_name,
+ agg_interval,
+ metadata,
+ statistics,
+ plan,
+ index_recommendations,
+ (statistics -> 'execution_statistics' ->> 'cnt')::int,
+ ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+ 100 AS execution_total_cluster_seconds,
+ COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+ COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+ (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+ COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+ FROM (SELECT max(aggregated_ts) AS aggregated_ts,
+ fingerprint_id,
+ transaction_fingerprint_id,
+ plan_hash,
+ app_name,
+ agg_interval,
+ crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
+ crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
+ plan,
+ index_recommendations
+ FROM system.public.statement_statistics
+ WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00'
+ and app_name not like '$ internal%'
+ GROUP BY app_name,
+ fingerprint_id,
+ transaction_fingerprint_id,
+ plan_hash,
+ agg_interval,
+ plan,
+ index_recommendations));
+----
+distribution: local
+vectorized: true
+·
+• upsert
+│ columns: ()
+│ estimated row count: 0 (missing stats)
+│ into: statement_activity(aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+│ auto commit
+│ arbiter indexes: primary
+│
+└── • project
+ │ columns: (max, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds, agg_interval, metadata, statistics, plan, index_recommendations, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts)
+ │
+ └── • lookup join (left outer)
+ │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics, aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+ │ estimated row count: 3 (missing stats)
+ │ table: statement_activity@primary
+ │ equality: (max, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name) = (aggregated_ts,fingerprint_id,transaction_fingerprint_id,plan_hash,app_name)
+ │ equality cols are key
+ │
+ └── • distinct
+ │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics)
+ │ estimated row count: 3 (missing stats)
+ │ distinct on: fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, max
+ │ nulls are distinct
+ │ error on duplicate
+ │
+ └── • render
+ │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics)
+ │ render int8: ((statistics->'execution_statistics')->>'cnt')::INT8
+ │ render ?column?: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render execution_total_cluster_seconds: 100.0
+ │ render coalesce: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0)
+ │ render coalesce: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0)
+ │ render float8: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render coalesce: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0)
+ │ render fingerprint_id: fingerprint_id
+ │ render transaction_fingerprint_id: transaction_fingerprint_id
+ │ render plan_hash: plan_hash
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │ render plan: plan
+ │ render index_recommendations: index_recommendations
+ │ render max: max
+ │ render metadata: metadata
+ │ render statistics: statistics
+ │
+ └── • render
+ │ columns: (metadata, statistics, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max)
+ │ render metadata: crdb_internal.merge_stats_metadata(array_agg)
+ │ render statistics: crdb_internal.merge_statement_stats(array_agg)
+ │ render fingerprint_id: fingerprint_id
+ │ render transaction_fingerprint_id: transaction_fingerprint_id
+ │ render plan_hash: plan_hash
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │ render plan: plan
+ │ render index_recommendations: index_recommendations
+ │ render max: max
+ │
+ └── • group (hash)
+ │ columns: (fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, array_agg, array_agg)
+ │ estimated row count: 3 (missing stats)
+ │ aggregate 0: max(aggregated_ts)
+ │ aggregate 1: array_agg(metadata)
+ │ aggregate 2: array_agg(statistics)
+ │ group by: fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations
+ │
+ └── • index join
+ │ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations)
+ │ estimated row count: 3 (missing stats)
+ │ table: statement_statistics@primary
+ │ key columns: crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8, aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id
+ │
+ └── • scan
+ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8)
+ estimated row count: 3 (missing stats)
+ table: statement_statistics@execution_count_idx (partial index)
+ spans: /2023-04-10T16:00:00Z-/2023-04-10T16:00:00.000000001Z
+
+# Upsert top 500 statement_activity including all statements in the top 500 transactions
+query T retry
+EXPLAIN (VERBOSE) UPSERT
+ INTO system.public.statement_activity
+ (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name,
+ agg_interval, metadata, statistics, plan, index_recommendations, execution_count,
+ execution_total_seconds, execution_total_cluster_seconds,
+ contention_time_avg_seconds,
+ cpu_sql_avg_nanos,
+ service_latency_avg_seconds, service_latency_p99_seconds)
+ (SELECT aggregated_ts,
+ fingerprint_id,
+ transaction_fingerprint_id,
+ plan_hash,
+ app_name,
+ agg_interval,
+ metadata,
+ statistics,
+ plan,
+ index_recommendations,
+ (statistics -> 'execution_statistics' ->> 'cnt')::int,
+ ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+ 100 AS execution_total_cluster_seconds,
+ COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+ COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+ (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+ COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+ FROM (SELECT max(ss.aggregated_ts) AS aggregated_ts,
+ ss.fingerprint_id,
+ ss.transaction_fingerprint_id,
+ ss.plan_hash,
+ ss.app_name,
+ ss.agg_interval,
+ crdb_internal.merge_stats_metadata(array_agg(ss.metadata)) AS metadata,
+ crdb_internal.merge_statement_stats(array_agg(ss.statistics)) AS statistics,
+ ss.plan,
+ ss.index_recommendations
+ FROM system.public.statement_statistics ss
+ inner join (SELECT fingerprint_id, app_name
+ FROM (SELECT fingerprint_id, app_name,
+ row_number()
+ OVER (ORDER BY (statistics -> 'execution_statistics' ->> 'cnt')::int desc) AS ePos,
+ row_number()
+ OVER (ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float desc) AS sPos,
+ row_number() OVER (ORDER BY
+ ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float) desc) AS tPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float,
+ 0) desc) AS cPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float,
+ 0) desc) AS uPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float,
+ 0) desc) AS lPos
+ FROM (SELECT fingerprint_id,
+ app_name,
+ crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
+ FROM system.public.statement_statistics
+ WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00' and
+ app_name not like '$ internal%'
+ GROUP BY app_name,
+ fingerprint_id))
+ WHERE ePos < 500
+ or sPos < 500
+ or tPos < 500
+ or cPos < 500
+ or uPos < 500
+ or lPos < 500) agg on agg.app_name = ss.app_name and agg.fingerprint_id = ss.fingerprint_id
+ WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00'
+ GROUP BY ss.app_name,
+ ss.fingerprint_id,
+ ss.transaction_fingerprint_id,
+ ss.plan_hash,
+ ss.agg_interval,
+ ss.plan,
+ ss.index_recommendations));
+----
+distribution: local
+vectorized: true
+·
+• upsert
+│ columns: ()
+│ estimated row count: 0 (missing stats)
+│ into: statement_activity(aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+│ auto commit
+│ arbiter indexes: primary
+│
+└── • project
+ │ columns: (max, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds, agg_interval, metadata, statistics, plan, index_recommendations, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts)
+ │
+ └── • lookup join (left outer)
+ │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics, aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+ │ estimated row count: 0 (missing stats)
+ │ table: statement_activity@primary
+ │ equality: (max, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name) = (aggregated_ts,fingerprint_id,transaction_fingerprint_id,plan_hash,app_name)
+ │ equality cols are key
+ │
+ └── • distinct
+ │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics)
+ │ estimated row count: 0 (missing stats)
+ │ distinct on: fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, max
+ │ nulls are distinct
+ │ error on duplicate
+ │
+ └── • render
+ │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics)
+ │ render int8: ((statistics->'execution_statistics')->>'cnt')::INT8
+ │ render ?column?: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render execution_total_cluster_seconds: 100.0
+ │ render coalesce: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0)
+ │ render coalesce: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0)
+ │ render float8: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render coalesce: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0)
+ │ render fingerprint_id: fingerprint_id
+ │ render transaction_fingerprint_id: transaction_fingerprint_id
+ │ render plan_hash: plan_hash
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │ render plan: plan
+ │ render index_recommendations: index_recommendations
+ │ render max: max
+ │ render metadata: metadata
+ │ render statistics: statistics
+ │
+ └── • render
+ │ columns: (metadata, statistics, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max)
+ │ render metadata: crdb_internal.merge_stats_metadata(array_agg)
+ │ render statistics: crdb_internal.merge_statement_stats(array_agg)
+ │ render fingerprint_id: fingerprint_id
+ │ render transaction_fingerprint_id: transaction_fingerprint_id
+ │ render plan_hash: plan_hash
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │ render plan: plan
+ │ render index_recommendations: index_recommendations
+ │ render max: max
+ │
+ └── • group (hash)
+ │ columns: (fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, array_agg, array_agg)
+ │ estimated row count: 0 (missing stats)
+ │ aggregate 0: max(aggregated_ts)
+ │ aggregate 1: array_agg(metadata)
+ │ aggregate 2: array_agg(statistics)
+ │ group by: fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations
+ │
+ └── • project
+ │ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations)
+ │
+ └── • hash join (inner)
+ │ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number)
+ │ estimated row count: 0 (missing stats)
+ │ equality: (app_name, fingerprint_id) = (app_name, fingerprint_id)
+ │ right cols are key
+ │
+ ├── • scan
+ │ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations)
+ │ estimated row count: 10 (missing stats)
+ │ table: statement_statistics@primary
+ │ spans: /0/2023-04-10T16:00:00Z-/0/2023-04-10T16:00:00.000000001Z /1/2023-04-10T16:00:00Z-/1/2023-04-10T16:00:00.000000001Z /2/2023-04-10T16:00:00Z-/2/2023-04-10T16:00:00.000000001Z /3/2023-04-10T16:00:00Z-/3/2023-04-10T16:00:00.000000001Z /4/2023-04-10T16:00:00Z-/4/2023-04-10T16:00:00.000000001Z /5/2023-04-10T16:00:00Z-/5/2023-04-10T16:00:00.000000001Z /6/2023-04-10T16:00:00Z-/6/2023-04-10T16:00:00.000000001Z /7/2023-04-10T16:00:00Z-/7/2023-04-10T16:00:00.000000001Z
+ │
+ └── • filter
+ │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number)
+ │ estimated row count: 3 (missing stats)
+ │ filter: (((((row_number < 500) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_6_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_5_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_4_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_3_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_2_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_1_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • render
+ │ columns: (fingerprint_id, app_name, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ render row_number_1_orderby_1_1: ((statistics->'execution_statistics')->>'cnt')::INT8
+ │ render row_number_2_orderby_1_1: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render row_number_3_orderby_1_1: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render row_number_4_orderby_1_1: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0)
+ │ render row_number_5_orderby_1_1: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0)
+ │ render row_number_6_orderby_1_1: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0)
+ │ render fingerprint_id: fingerprint_id
+ │ render app_name: app_name
+ │
+ └── • render
+ │ columns: (statistics, fingerprint_id, app_name)
+ │ render statistics: crdb_internal.merge_statement_stats(array_agg)
+ │ render fingerprint_id: fingerprint_id
+ │ render app_name: app_name
+ │
+ └── • group (hash)
+ │ columns: (fingerprint_id, app_name, array_agg)
+ │ estimated row count: 3 (missing stats)
+ │ aggregate 0: array_agg(statistics)
+ │ group by: fingerprint_id, app_name
+ │
+ └── • project
+ │ columns: (fingerprint_id, app_name, statistics)
+ │
+ └── • index join
+ │ columns: (aggregated_ts, fingerprint_id, app_name, statistics)
+ │ estimated row count: 3 (missing stats)
+ │ table: statement_statistics@primary
+ │ key columns: crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8, aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id
+ │
+ └── • scan
+ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8)
+ estimated row count: 3 (missing stats)
+ table: statement_statistics@execution_count_idx (partial index)
+ spans: /2023-04-10T16:00:00Z-/2023-04-10T16:00:00.000000001Z
+
+# Upsert top 500 transactions
+query T retry
+EXPLAIN (VERBOSE) UPSERT
+ INTO system.public.transaction_activity
+ (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata,
+ statistics, query, execution_count, execution_total_seconds,
+ execution_total_cluster_seconds, contention_time_avg_seconds,
+ cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+ (SELECT aggregated_ts,
+ fingerprint_id,
+ app_name,
+ agg_interval,
+ metadata,
+ statistics,
+ '' AS query,
+ (statistics -> 'execution_statistics' ->> 'cnt')::int,
+ ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+ 100 AS execution_total_cluster_seconds,
+ COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+ COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+ (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+ COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+ FROM (SELECT max(ts.aggregated_ts) AS aggregated_ts,
+ ts.app_name,
+ ts.fingerprint_id,
+ ts.agg_interval,
+ crdb_internal.merge_stats_metadata(array_agg(ts.metadata)) AS metadata,
+ crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+ FROM system.public.transaction_statistics ts
+ inner join (SELECT fingerprint_id, app_name, agg_interval
+ FROM (SELECT fingerprint_id, app_name, agg_interval,
+ row_number()
+ OVER (ORDER BY (statistics -> 'execution_statistics' ->> 'cnt')::int desc) AS ePos,
+ row_number()
+ OVER (ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float desc) AS sPos,
+ row_number()
+ OVER (ORDER BY ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float) desc) AS tPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float,
+ 0) desc) AS cPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float,
+ 0) desc) AS uPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float,
+ 0) desc) AS lPos
+ FROM (SELECT fingerprint_id, app_name, agg_interval,
+ crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+ FROM system.public.transaction_statistics
+ WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00' and
+ app_name not like '$ internal%'
+ GROUP BY app_name,
+ fingerprint_id,
+ agg_interval))
+ WHERE ePos < 500
+ or sPos < 500
+ or tPos < 500
+ or cPos < 500
+ or uPos < 500
+ or lPos < 500) agg
+ on agg.app_name = ts.app_name and agg.fingerprint_id = ts.fingerprint_id and
+ agg.agg_interval = ts.agg_interval
+ GROUP BY ts.app_name,
+ ts.fingerprint_id,
+ ts.agg_interval));
+----
+distribution: local
+vectorized: true
+·
+• upsert
+│ columns: ()
+│ estimated row count: 0 (missing stats)
+│ into: transaction_activity(aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+│ auto commit
+│ arbiter indexes: primary
+│
+└── • project
+ │ columns: (max, fingerprint_id, app_name, agg_interval, metadata, statistics, query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds, agg_interval, metadata, statistics, query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts)
+ │
+ └── • lookup join (left outer)
+ │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics, aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+ │ estimated row count: 0 (missing stats)
+ │ table: transaction_activity@primary
+ │ equality: (max, fingerprint_id, app_name) = (aggregated_ts,fingerprint_id,app_name)
+ │ equality cols are key
+ │
+ └── • distinct
+ │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics)
+ │ estimated row count: 0 (missing stats)
+ │ distinct on: fingerprint_id, app_name, max
+ │ nulls are distinct
+ │ error on duplicate
+ │
+ └── • render
+ │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics)
+ │ render query: ''
+ │ render int8: ((statistics->'execution_statistics')->>'cnt')::INT8
+ │ render ?column?: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render execution_total_cluster_seconds: 100.0
+ │ render coalesce: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0)
+ │ render coalesce: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0)
+ │ render float8: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render coalesce: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0)
+ │ render fingerprint_id: fingerprint_id
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │ render max: max
+ │ render metadata: metadata
+ │ render statistics: statistics
+ │
+ └── • render
+ │ columns: (metadata, statistics, fingerprint_id, app_name, agg_interval, max)
+ │ render metadata: crdb_internal.merge_stats_metadata(array_agg)
+ │ render statistics: crdb_internal.merge_transaction_stats(array_agg)
+ │ render fingerprint_id: fingerprint_id
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │ render max: max
+ │
+ └── • group (hash)
+ │ columns: (fingerprint_id, app_name, agg_interval, max, array_agg, array_agg)
+ │ estimated row count: 0 (missing stats)
+ │ aggregate 0: max(aggregated_ts)
+ │ aggregate 1: array_agg(metadata)
+ │ aggregate 2: array_agg(statistics)
+ │ group by: fingerprint_id, app_name, agg_interval
+ │
+ └── • project
+ │ columns: (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics)
+ │
+ └── • project
+ │ columns: (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │
+ └── • lookup join (inner)
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number, aggregated_ts, fingerprint_id, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, agg_interval, metadata, statistics)
+ │ estimated row count: 0 (missing stats)
+ │ table: transaction_statistics@primary
+ │ equality: (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id) = (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8,aggregated_ts,fingerprint_id,app_name,node_id)
+ │ equality cols are key
+ │ pred: agg_interval = agg_interval
+ │
+ └── • lookup join (inner)
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number, aggregated_ts, fingerprint_id, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8)
+ │ estimated row count: 0 (missing stats)
+ │ table: transaction_statistics@fingerprint_stats_idx
+ │ lookup condition: (fingerprint_id = fingerprint_id) AND (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 IN (0, 1, 2, 3, 4, 5, 6, 7))
+ │ pred: app_name = app_name
+ │
+ └── • filter
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number)
+ │ estimated row count: 3 (missing stats)
+ │ filter: (((((row_number < 500) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_6_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_5_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_4_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_3_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_2_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • window
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ estimated row count: 3 (missing stats)
+ │ window 0: row_number() OVER (ORDER BY row_number_1_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ │
+ └── • render
+ │ columns: (fingerprint_id, app_name, agg_interval, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1)
+ │ render row_number_1_orderby_1_1: ((statistics->'execution_statistics')->>'cnt')::INT8
+ │ render row_number_2_orderby_1_1: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render row_number_3_orderby_1_1: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8
+ │ render row_number_4_orderby_1_1: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0)
+ │ render row_number_5_orderby_1_1: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0)
+ │ render row_number_6_orderby_1_1: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0)
+ │ render fingerprint_id: fingerprint_id
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │
+ └── • render
+ │ columns: (statistics, fingerprint_id, app_name, agg_interval)
+ │ render statistics: crdb_internal.merge_transaction_stats(array_agg)
+ │ render fingerprint_id: fingerprint_id
+ │ render app_name: app_name
+ │ render agg_interval: agg_interval
+ │
+ └── • group (hash)
+ │ columns: (fingerprint_id, app_name, agg_interval, array_agg)
+ │ estimated row count: 3 (missing stats)
+ │ aggregate 0: array_agg(statistics)
+ │ group by: fingerprint_id, app_name, agg_interval
+ │
+ └── • project
+ │ columns: (fingerprint_id, app_name, agg_interval, statistics)
+ │
+ └── • index join
+ │ columns: (aggregated_ts, fingerprint_id, app_name, agg_interval, statistics)
+ │ estimated row count: 3 (missing stats)
+ │ table: transaction_statistics@primary
+ │ key columns: crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id
+ │
+ └── • scan
+ columns: (aggregated_ts, fingerprint_id, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8)
+ estimated row count: 3 (missing stats)
+ table: transaction_statistics@execution_count_idx (partial index)
+ spans: /2023-04-10T16:00:00Z-/2023-04-10T16:00:00.000000001Z
diff --git a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go
index 83fb197ec1cc..4237d46387bb 100644
--- a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go
+++ b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go
@@ -399,6 +399,13 @@ func TestExecBuild_not_visible_index(
runExecBuildLogicTest(t, "not_visible_index")
}
+func TestExecBuild_observability(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runExecBuildLogicTest(t, "observability")
+}
+
func TestExecBuild_orderby(
t *testing.T,
) {
diff --git a/pkg/sql/sql_activity_update_job.go b/pkg/sql/sql_activity_update_job.go
new file mode 100644
index 000000000000..9db8e76fc083
--- /dev/null
+++ b/pkg/sql/sql_activity_update_job.go
@@ -0,0 +1,669 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/jobs"
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+ "github.com/cockroachdb/cockroach/pkg/settings"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/metric"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
+ io_prometheus_client "github.com/prometheus/client_model/go"
+)
+
+// enabled controls whether the stats activity flush job runs.
+var enabled = settings.RegisterBoolSetting(
+ settings.SystemOnly,
+ "sql.stats.activity.flush.enabled",
+ "enable the flush to the system statement and transaction activity tables",
+ true)
+
+// sqlStatsActivityTopCount is the cluster setting that controls the number of
+// rows selected per column to be inserted into the activity tables.
+var sqlStatsActivityTopCount = settings.RegisterIntSetting(
+ settings.TenantWritable,
+ "sql.stats.activity.top.max",
+ "the limit per column for the top number of statistics to be flushed "+
+ "to the activity tables",
+ 500,
+ settings.NonNegativeInt,
+)
+
+// sqlStatsActivityMaxPersistedRows specifies the maximum number of rows that
+// will be retained in system.statement_activity and system.transaction_activity.
+// The default is computed as 500 (top limit) * 6 (num columns) * 24 (hrs) * 3 (days)
+// = 216000, to give a minimum of 3 days of history; it is rounded down to 200000
+// for an even number. The top 500 rows (controlled by sql.stats.activity.top.max)
+// largely overlap across the columns, so the default settings should still
+// retain roughly 3 days of history.
+var sqlStatsActivityMaxPersistedRows = settings.RegisterIntSetting(
+ settings.TenantWritable,
+ "sql.stats.activity.persisted_rows.max",
+ "maximum number of rows of statement and transaction"+
+ " activity that will be persisted in the system tables",
+ 200000, /* defaultValue*/
+ settings.NonNegativeInt,
+).WithPublic()
+
+const numberOfTopColumns = 6
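
The sizing comment above can be reproduced with a few constants; a minimal, self-contained sketch (the constant names are illustrative, not part of this change):

    package main

    import "fmt"

    func main() {
        // Sketch of the arithmetic behind the 200000 default for
        // sql.stats.activity.persisted_rows.max.
        const topPerColumn = 500           // default of sql.stats.activity.top.max
        const topColumns = 6               // numberOfTopColumns above
        const hourlyIntervalsKept = 24 * 3 // one aggregation per hour, three days of history
        fmt.Println(topPerColumn * topColumns * hourlyIntervalsKept) // 216000, rounded down to 200000
    }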
+
+type sqlActivityUpdateJob struct {
+ job *jobs.Job
+}
+
+// Resume implements the jobs.Resumer interface.
+// The SQL activity job runs as a forever-running background job
+// and runs the SqlActivityUpdater after each SQL stats flush completes.
+func (j *sqlActivityUpdateJob) Resume(ctx context.Context, execCtxI interface{}) (jobErr error) {
+ log.Infof(ctx, "starting sql stats activity flush job")
+ // The sql activity update job is a forever running background job.
+ // It's always safe to wind the SQL pod down whenever it's
+ // running, something we indicate through the job's idle
+ // status.
+ j.job.MarkIdle(true)
+
+ execCtx := execCtxI.(JobExecContext)
+ stopper := execCtx.ExecCfg().DistSQLSrv.Stopper
+ settings := execCtx.ExecCfg().Settings
+ statsFlush := execCtx.ExecCfg().InternalDB.server.sqlStats
+ metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoUpdateSQLActivity].(ActivityUpdaterMetrics)
+
+ flushDoneSignal := make(chan struct{})
+ defer func() {
+ statsFlush.SetFlushDoneCallback(nil)
+ close(flushDoneSignal)
+ }()
+
+ for {
+ statsFlush.SetFlushDoneCallback(func() {
+ flushDoneSignal <- struct{}{}
+ })
+ select {
+ case <-flushDoneSignal:
+ // A flush completed. Run the activity updater if the setting is enabled.
+ if enabled.Get(&settings.SV) {
+ updater := NewSqlActivityUpdater(settings, execCtx.ExecCfg().InternalDB)
+ if err := updater.TransferStatsToActivity(ctx); err != nil {
+ log.Warningf(ctx, "error running sql activity updater job: %v", err)
+ metrics.numErrors.Inc(1)
+ }
+ }
+ case <-ctx.Done():
+ return nil
+ case <-stopper.ShouldQuiesce():
+ return nil
+ }
+ }
+}
+
+type ActivityUpdaterMetrics struct {
+ numErrors *metric.Counter
+}
+
+func (m ActivityUpdaterMetrics) MetricStruct() {}
+
+func newActivityUpdaterMetrics() metric.Struct {
+ return ActivityUpdaterMetrics{
+ numErrors: metric.NewCounter(metric.Metadata{
+ Name: "jobs.metrics.task_failed",
+ Help: "Number of metrics sql activity updater tasks that failed",
+ Measurement: "errors",
+ Unit: metric.Unit_COUNT,
+ MetricType: io_prometheus_client.MetricType_COUNTER,
+ }),
+ }
+}
+
+// OnFailOrCancel implements the jobs.Resumer interface.
+// No action needs to be taken on our part. There's no state to clean up.
+func (r *sqlActivityUpdateJob) OnFailOrCancel(
+ ctx context.Context, _ interface{}, jobErr error,
+) error {
+ if jobs.HasErrJobCanceled(jobErr) {
+ err := errors.NewAssertionErrorWithWrappedErrf(jobErr,
+ "sql activity is not cancelable")
+ log.Errorf(ctx, "%v", err)
+ }
+ return nil
+}
+
+func init() {
+ jobs.RegisterConstructor(jobspb.TypeAutoUpdateSQLActivity,
+ func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
+ return &sqlActivityUpdateJob{job: job}
+ },
+ jobs.DisablesTenantCostControl,
+ jobs.WithJobMetrics(newActivityUpdaterMetrics()),
+ )
+}
+
+// NewSqlActivityUpdater returns a new instance of SqlActivityUpdater.
+func NewSqlActivityUpdater(setting *cluster.Settings, db isql.DB) *SqlActivityUpdater {
+ return &SqlActivityUpdater{
+ st: setting,
+ db: db,
+ }
+}
+
+type SqlActivityUpdater struct {
+ st *cluster.Settings
+ db isql.DB
+}
+
+func (u *SqlActivityUpdater) TransferStatsToActivity(ctx context.Context) error {
+ // Get the config and pass it around to avoid any issue of it changing
+ // in the middle of the execution.
+ maxRowPersistedRows := sqlStatsActivityMaxPersistedRows.Get(&u.st.SV)
+ topLimit := sqlStatsActivityTopCount.Get(&u.st.SV)
+ aggTs := u.computeAggregatedTs(&u.st.SV)
+
+ // The counts are computed AS OF SYSTEM TIME, so the values may be slightly
+ // stale. This is an acceptable trade-off for better performance.
+ stmtRowCount, txnRowCount, totalStmtClusterExecCount, totalTxnClusterExecCount, err := u.getAostExecutionCount(ctx, aggTs)
+ if err != nil {
+ return err
+ }
+
+ // No need to continue since there are no rows to transfer
+ if stmtRowCount == 0 && txnRowCount == 0 {
+ log.Infof(ctx, "sql stats activity found no rows at %s", aggTs)
+ return nil
+ }
+
+ // Create space in the table before adding new rows to avoid
+ // going over the row limit. If the compaction fails, no new
+ // rows are added.
+ err = u.compactActivityTables(ctx, maxRowPersistedRows-stmtRowCount)
+ if err != nil {
+ return err
+ }
+
+ // There are fewer rows than the filtered top-N queries would return.
+ // Just transfer all the stats to avoid the overhead of computing
+ // the top rows.
+ if stmtRowCount < (topLimit*numberOfTopColumns) && txnRowCount < (topLimit*numberOfTopColumns) {
+ return u.transferAllStats(ctx, aggTs, totalStmtClusterExecCount, totalTxnClusterExecCount)
+ }
+
+ // Only transfer the top sql.stats.activity.top.max rows for each of
+ // the 6 most popular columns.
+ err = u.transferTopStats(ctx, aggTs, topLimit, totalStmtClusterExecCount, totalTxnClusterExecCount)
+ return err
+}
+
+// transferAllStats transfers all the stats from
+// system.statement_statistics and system.transaction_statistics
+// to system.statement_activity and system.transaction_activity.
+func (u *SqlActivityUpdater) transferAllStats(
+ ctx context.Context,
+ aggTs time.Time,
+ totalStmtClusterExecCount int64,
+ totalTxnClusterExecCount int64,
+) error {
+ _, err := u.db.Executor().ExecEx(ctx,
+ "activity-flush-txn-transfer-all",
+ nil, /* txn */
+ sessiondata.NodeUserSessionDataOverride,
+ `
+ UPSERT INTO system.public.transaction_activity
+(aggregated_ts, fingerprint_id, app_name, agg_interval, metadata,
+ statistics, query, execution_count, execution_total_seconds,
+ execution_total_cluster_seconds, contention_time_avg_seconds,
+ cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+ (SELECT aggregated_ts,
+ fingerprint_id,
+ app_name,
+ agg_interval,
+ metadata,
+ statistics,
+ '' AS query,
+ (statistics->'execution_statistics'->>'cnt')::int,
+ ((statistics->'execution_statistics'->>'cnt')::float)*((statistics->'statistics'->'svcLat'->>'mean')::float),
+ $1 AS execution_total_cluster_seconds,
+ COALESCE((statistics->'execution_statistics'->'contentionTime'->>'mean')::float,0),
+ COALESCE((statistics->'execution_statistics'->'cpu_sql_nanos'->>'mean')::float,0),
+ (statistics->'statistics'->'svcLat'->>'mean')::float,
+ COALESCE((statistics->'statistics'->'latencyInfo'->>'p99')::float, 0)
+ FROM (SELECT
+ max(aggregated_ts) AS aggregated_ts,
+ app_name,
+ fingerprint_id,
+ agg_interval,
+ crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
+ crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+ FROM system.public.transaction_statistics
+ WHERE aggregated_ts = $2
+ and app_name not like '$ internal%'
+ GROUP BY app_name,
+ fingerprint_id,
+ agg_interval));
+`,
+ totalTxnClusterExecCount,
+ aggTs,
+ )
+
+ if err != nil {
+ return err
+ }
+
+ _, err = u.db.Executor().ExecEx(ctx,
+ "activity-flush-stmt-transfer-all",
+ nil, /* txn */
+ sessiondata.NodeUserSessionDataOverride,
+ `
+ UPSERT
+INTO system.public.statement_activity (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name,
+ agg_interval, metadata, statistics, plan, index_recommendations, execution_count,
+ execution_total_seconds, execution_total_cluster_seconds,
+ contention_time_avg_seconds,
+ cpu_sql_avg_nanos,
+ service_latency_avg_seconds, service_latency_p99_seconds)
+ (SELECT aggregated_ts,
+ fingerprint_id,
+ transaction_fingerprint_id,
+ plan_hash,
+ app_name,
+ agg_interval,
+ metadata,
+ statistics,
+ plan,
+ index_recommendations,
+ (statistics -> 'execution_statistics' ->> 'cnt')::int,
+ ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+ $1 AS execution_total_cluster_seconds,
+ COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+ COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+ (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+ COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+ FROM (SELECT max(aggregated_ts) AS aggregated_ts,
+ fingerprint_id,
+ transaction_fingerprint_id,
+ plan_hash,
+ app_name,
+ agg_interval,
+ crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
+ crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
+ plan,
+ index_recommendations
+ FROM system.public.statement_statistics
+ WHERE aggregated_ts = $2
+ and app_name not like '$ internal%'
+ GROUP BY app_name,
+ fingerprint_id,
+ transaction_fingerprint_id,
+ plan_hash,
+ agg_interval,
+ plan,
+ index_recommendations));
+`,
+ totalStmtClusterExecCount,
+ aggTs,
+ )
+
+ return err
+}
+
+// transferTopStats transfers the top N stats from
+// system.statement_statistics and system.transaction_statistics
+// to system.statement_activity and system.transaction_activity.
+func (u *SqlActivityUpdater) transferTopStats(
+ ctx context.Context,
+ aggTs time.Time,
+ topLimit int64,
+ totalStmtClusterExecCount int64,
+ totalTxnClusterExecCount int64,
+) (retErr error) {
+ // Select the top 500 (controlled by sql.stats.activity.top.max) for
+ // each of execution_count, total execution time, service_latency, cpu_sql_nanos,
+ // contention_time, and p99_latency, and insert them into the transaction_activity
+ // table. Up to 3000 rows (sql.stats.activity.top.max * 6) may be added to
+ // transaction_activity.
+ _, err := u.db.Executor().ExecEx(ctx,
+ "activity-flush-txn-transfer-tops",
+ nil, /* txn */
+ sessiondata.NodeUserSessionDataOverride,
+ `
+UPSERT
+INTO system.public.transaction_activity
+(aggregated_ts, fingerprint_id, app_name, agg_interval, metadata,
+ statistics, query, execution_count, execution_total_seconds,
+ execution_total_cluster_seconds, contention_time_avg_seconds,
+ cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+ (SELECT aggregated_ts,
+ fingerprint_id,
+ app_name,
+ agg_interval,
+ metadata,
+ statistics,
+ '' AS query,
+ (statistics -> 'execution_statistics' ->> 'cnt')::int,
+ ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+ $1 AS execution_total_cluster_seconds,
+ COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+ COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+ (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+ COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+ FROM (SELECT max(ts.aggregated_ts) AS aggregated_ts,
+ ts.app_name,
+ ts.fingerprint_id,
+ ts.agg_interval,
+ crdb_internal.merge_stats_metadata(array_agg(ts.metadata)) AS metadata,
+ crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+ FROM system.public.transaction_statistics ts
+ inner join (SELECT fingerprint_id, app_name, agg_interval
+ FROM (SELECT fingerprint_id, app_name, agg_interval,
+ row_number()
+ OVER (ORDER BY (statistics -> 'execution_statistics' ->> 'cnt')::int desc) AS ePos,
+ row_number()
+ OVER (ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float desc) AS sPos,
+ row_number()
+ OVER (ORDER BY ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float) desc) AS tPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float,
+ 0) desc) AS cPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float,
+ 0) desc) AS uPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float,
+ 0) desc) AS lPos
+ FROM (SELECT fingerprint_id, app_name, agg_interval,
+ crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+ FROM system.public.transaction_statistics
+ WHERE aggregated_ts = $2 and
+ app_name not like '$ internal%'
+ GROUP BY app_name,
+ fingerprint_id,
+ agg_interval))
+ WHERE ePos < $3
+ or sPos < $3
+ or tPos < $3
+ or cPos < $3
+ or uPos < $3
+ or lPos < $3) agg
+ on agg.app_name = ts.app_name and agg.fingerprint_id = ts.fingerprint_id and
+ agg.agg_interval = ts.agg_interval
+ GROUP BY ts.app_name,
+ ts.fingerprint_id,
+ ts.agg_interval));
+`,
+ totalTxnClusterExecCount, // $1: execution_total_cluster_seconds for transaction_activity
+ aggTs,
+ topLimit,
+ )
+
+ if err != nil {
+ return err
+ }
+
+ // Select the top 500 (controlled by sql.stats.activity.top.max) for each of
+ // execution_count, total execution time, service_latency, cpu_sql_nanos,
+ // contention_time, and p99_latency. Also include all statements that are in
+ // the top N transactions, so the statement information is available in the
+ // UI and a user can see what is in each transaction.
+ _, err = u.db.Executor().ExecEx(ctx,
+ "activity-flush-stmt-transfer-tops",
+ nil, /* txn */
+ sessiondata.NodeUserSessionDataOverride,
+ `
+UPSERT
+INTO system.public.statement_activity
+(aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name,
+ agg_interval, metadata, statistics, plan, index_recommendations, execution_count,
+ execution_total_seconds, execution_total_cluster_seconds,
+ contention_time_avg_seconds,
+ cpu_sql_avg_nanos,
+ service_latency_avg_seconds, service_latency_p99_seconds)
+ (SELECT aggregated_ts,
+ fingerprint_id,
+ transaction_fingerprint_id,
+ plan_hash,
+ app_name,
+ agg_interval,
+ metadata,
+ statistics,
+ plan,
+ index_recommendations,
+ (statistics -> 'execution_statistics' ->> 'cnt')::int,
+ ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+ $1 AS execution_total_cluster_seconds,
+ COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+ COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+ (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+ COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+ FROM (SELECT max(ss.aggregated_ts) AS aggregated_ts,
+ ss.fingerprint_id,
+ ss.transaction_fingerprint_id,
+ ss.plan_hash,
+ ss.app_name,
+ ss.agg_interval,
+ crdb_internal.merge_stats_metadata(array_agg(ss.metadata)) AS metadata,
+ crdb_internal.merge_statement_stats(array_agg(ss.statistics)) AS statistics,
+ ss.plan,
+ ss.index_recommendations
+ FROM system.public.statement_statistics ss
+ inner join (SELECT fingerprint_id, app_name
+ FROM (SELECT fingerprint_id, app_name,
+ row_number()
+ OVER (ORDER BY (statistics -> 'execution_statistics' ->> 'cnt')::int desc) AS ePos,
+ row_number()
+ OVER (ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float desc) AS sPos,
+ row_number() OVER (ORDER BY
+ ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+ ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float) desc) AS tPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float,
+ 0) desc) AS cPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float,
+ 0) desc) AS uPos,
+ row_number() OVER (ORDER BY COALESCE(
+ (statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float,
+ 0) desc) AS lPos
+ FROM (SELECT fingerprint_id,
+ app_name,
+ crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
+ FROM system.public.statement_statistics
+ WHERE aggregated_ts = $2 and
+ app_name not like '$ internal%'
+ GROUP BY app_name,
+ fingerprint_id))
+ WHERE ePos < $3
+ or sPos < $3
+ or tPos < $3
+ or cPos < $3
+ or uPos < $3
+ or lPos < $3) agg on agg.app_name = ss.app_name and agg.fingerprint_id = ss.fingerprint_id
+ WHERE aggregated_ts = $2
+ GROUP BY ss.app_name,
+ ss.fingerprint_id,
+ ss.transaction_fingerprint_id,
+ ss.plan_hash,
+ ss.agg_interval,
+ ss.plan,
+ ss.index_recommendations));
+`,
+ totalStmtClusterExecCount, // $1: execution_total_cluster_seconds for statement_activity
+ aggTs,
+ topLimit,
+ )
+
+ return err
+}
+
+// getAostExecutionCount returns the row counts of both
+// system.statement_statistics and system.transaction_statistics,
+// along with the total execution counts, for the specified aggregated
+// timestamp, using AS OF SYSTEM TIME follower_read_timestamp().
+func (u *SqlActivityUpdater) getAostExecutionCount(
+ ctx context.Context, aggTs time.Time,
+) (
+ stmtRowCount int64,
+ txnRowCount int64,
+ totalStmtClusterExecCount int64,
+ totalTxnClusterExecCount int64,
+ retErr error,
+) {
+ it, err := u.db.Executor().QueryIteratorEx(ctx,
+ "activity-flush-count",
+ nil, /* txn */
+ sessiondata.NodeUserSessionDataOverride,
+ `
+ SELECT row_count, ex_sum FROM (SELECT
+ count_rows():::int AS row_count,
+ COALESCE(sum(execution_count)::int, 0) AS ex_sum
+ FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()
+ WHERE app_name not like '$ internal%' and aggregated_ts = $1
+ union all
+ SELECT
+ count_rows():::int AS row_count,
+ COALESCE(sum(execution_count)::int, 0) AS ex_sum
+ FROM system.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()
+ WHERE app_name not like '$ internal%' and aggregated_ts = $1) AS OF SYSTEM TIME follower_read_timestamp()`,
+ aggTs,
+ )
+
+ if err != nil {
+ return -1, -1, -1, -1, err
+ }
+
+ defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()
+
+ stmtRowCount, totalStmtClusterExecCount, err = u.getExecutionCountFromRow(ctx, it)
+ if err != nil {
+ return -1, -1, -1, -1, err
+ }
+
+ txnRowCount, totalTxnClusterExecCount, err = u.getExecutionCountFromRow(ctx, it)
+ return stmtRowCount, txnRowCount, totalStmtClusterExecCount, totalTxnClusterExecCount, err
+}
+
+func (u *SqlActivityUpdater) getExecutionCountFromRow(
+ ctx context.Context, iter isql.Rows,
+) (rowCount int64, totalExecutionCount int64, err error) {
+ ok, err := iter.Next(ctx)
+ if err != nil {
+ return -1, -1, err
+ }
+
+ if !ok {
+ return -1, -1, fmt.Errorf("no rows in activity-flush-count")
+ }
+
+ row := iter.Cur()
+ if row[0] == tree.DNull || row[1] == tree.DNull {
+ return 0, 0, nil
+ }
+
+ return int64(tree.MustBeDInt(row[0])), int64(tree.MustBeDInt(row[1])), nil
+}
+
+// computeAggregatedTs returns the aggregation timestamp to assign to
+// in-memory SQL stats during storage or aggregation.
+func (u *SqlActivityUpdater) computeAggregatedTs(sv *settings.Values) time.Time {
+ interval := persistedsqlstats.SQLStatsAggregationInterval.Get(sv)
+
+ now := timeutil.Now()
+ aggTs := now.Truncate(interval)
+ return aggTs
+}
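
A worked example of the truncation above, assuming the default one-hour aggregation interval (sketch only; the timestamp matches the span seen in the plan output earlier):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // computeAggregatedTs truncates "now" down to the start of the current
        // aggregation interval; with a 1h interval, 16:37:12 maps to 16:00:00.
        interval := time.Hour
        now := time.Date(2023, 4, 10, 16, 37, 12, 0, time.UTC)
        fmt.Println(now.Truncate(interval)) // 2023-04-10 16:00:00 +0000 UTC
    }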
+
+// compactActivityTables deletes rows from the activity tables
+// to keep them under the specified row limit.
+func (u *SqlActivityUpdater) compactActivityTables(ctx context.Context, maxRowCount int64) error {
+ rowCount, err := u.getTableRowCount(ctx, "system.statement_activity")
+ if err != nil {
+ return err
+ }
+
+ if rowCount < maxRowCount {
+ return nil
+ }
+
+ // Delete all the rows that share the oldest aggregated_ts values to
+ // avoid showing partial data for a time range.
+ _, err = u.db.Executor().ExecEx(ctx,
+ "activity-stmt-compaction",
+ nil, /* txn */
+ sessiondata.NodeUserSessionDataOverride,
+ `
+ DELETE
+FROM system.statement_activity
+WHERE aggregated_ts IN (SELECT DISTINCT aggregated_ts FROM (SELECT aggregated_ts FROM system.statement_activity ORDER BY aggregated_ts ASC limit $1));`,
+ rowCount-maxRowCount,
+ )
+
+ if err != nil {
+ return err
+ }
+
+ // Delete any transaction_activity rows whose aggregated_ts no longer appears
+ // in statement_activity. This keeps the two tables in sync.
+ _, err = u.db.Executor().ExecEx(ctx,
+ "activity-txn-compaction",
+ nil, /* txn */
+ sessiondata.NodeUserSessionDataOverride,
+ `
+ DELETE
+FROM system.transaction_activity
+WHERE aggregated_ts not in (SELECT distinct aggregated_ts FROM system.statement_activity);`,
+ )
+
+ return err
+}
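
To make the compaction arithmetic concrete, a small sketch with illustrative numbers (not taken from the patch): the caller passes maxRowCount as persisted_rows.max minus the incoming statement rows, and the DELETE targets the oldest surplus rows plus anything sharing their aggregated_ts.

    package main

    import "fmt"

    func main() {
        // Illustrative numbers only.
        const persistedMax = 200000   // sql.stats.activity.persisted_rows.max
        const incomingStmtRows = 1200 // rows about to be transferred
        const existingRows = 199500   // current system.statement_activity row count
        maxRowCount := persistedMax - incomingStmtRows
        if existingRows >= maxRowCount {
            // 700 oldest rows (and any rows sharing their aggregated_ts) are deleted.
            fmt.Println(existingRows - maxRowCount)
        }
    }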
+
+// getTableRowCount returns the current row count of the given table,
+// reading AS OF SYSTEM TIME follower_read_timestamp() so the count is
+// cheap to compute. The value may therefore lag the latest writes
+// slightly, which is acceptable for sizing decisions.
+func (u *SqlActivityUpdater) getTableRowCount(
+ ctx context.Context, tableName string,
+) (rowCount int64, retErr error) {
+ query := fmt.Sprintf(`
+ SELECT
+ count_rows()::int
+ FROM %s AS OF SYSTEM TIME follower_read_timestamp()`, tableName)
+ datums, err := u.db.Executor().QueryRowEx(ctx,
+ "activity-total-count",
+ nil, /* txn */
+ sessiondata.NodeUserSessionDataOverride,
+ query,
+ )
+
+ if err != nil {
+ return 0, err
+ }
+
+ if datums == nil {
+ return 0, nil
+ }
+
+ if datums[0] == tree.DNull {
+ return 0, nil
+ }
+
+ return int64(tree.MustBeDInt(datums[0])), nil
+}
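
Operationally, the new setting and tables introduced by this file can be exercised from any SQL client; a minimal sketch using database/sql (the connection string and driver choice are placeholders, not part of the change):

    package main

    import (
        "database/sql"
        "fmt"
        "log"

        _ "github.com/lib/pq" // assumed Postgres-wire driver; any compatible driver works
    )

    func main() {
        db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // Cap the activity tables (the new public setting registered above).
        if _, err := db.Exec("SET CLUSTER SETTING sql.stats.activity.persisted_rows.max = 200000"); err != nil {
            log.Fatal(err)
        }

        // Inspect what the updater has materialized so far.
        var stmtRows, txnRows int
        if err := db.QueryRow("SELECT count(*) FROM system.statement_activity").Scan(&stmtRows); err != nil {
            log.Fatal(err)
        }
        if err := db.QueryRow("SELECT count(*) FROM system.transaction_activity").Scan(&txnRows); err != nil {
            log.Fatal(err)
        }
        fmt.Println("statement_activity rows:", stmtRows, "transaction_activity rows:", txnRows)
    }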
diff --git a/pkg/sql/sql_activity_update_job_test.go b/pkg/sql/sql_activity_update_job_test.go
new file mode 100644
index 000000000000..674740d7e4af
--- /dev/null
+++ b/pkg/sql/sql_activity_update_job_test.go
@@ -0,0 +1,339 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/jobs"
+ "github.com/cockroachdb/cockroach/pkg/settings"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
+ "github.com/cockroachdb/cockroach/pkg/testutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/skip"
+ "github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/errors"
+ "github.com/stretchr/testify/require"
+)
+
+// TestSqlActivityUpdateJob verifies that the updater transfers rows from the
+// statistics tables to the activity tables.
+func TestSqlActivityUpdateJob(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ skip.UnderStressRace(t, "test is too slow to run under race")
+
+ // Start the cluster. (One node is sufficient.)
+ // Disable the activity update job, since the updater is invoked manually from
+ // a new instance to avoid any race conditions.
+ ctx := context.Background()
+ srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true,
+ Knobs: base.TestingKnobs{UpgradeManager: &upgradebase.TestingKnobs{
+ DontUseJobs: true,
+ SkipUpdateSQLActivityJobBootstrap: true,
+ }}})
+ defer srv.Stopper().Stop(context.Background())
+ defer db.Close()
+
+ var count int
+ row := db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.transaction_activity")
+ err := row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "transaction_activity: expect:0, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.statement_activity")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "statement_activity: expect:0, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.jobs WHERE job_type = 'AUTO UPDATE SQL ACTIVITY' and id = 103 ")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "jobs: expect:0, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.transaction_statistics")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "transaction_statistics: expect:0, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.statement_statistics")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "statement_statistics: expect:0, actual:%d", count)
+
+ execCfg := srv.ExecutorConfig().(ExecutorConfig)
+ st := cluster.MakeTestingClusterSettings()
+ updater := NewSqlActivityUpdater(st, execCfg.InternalDB)
+
+ // Transient failures from AOST queries: https://github.com/cockroachdb/cockroach/issues/97840
+ testutils.SucceedsWithin(t, func() error {
+ // Verify no error with empty stats
+ return updater.TransferStatsToActivity(ctx)
+ }, 30*time.Second)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.transaction_activity")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "transaction_activity: expect:0, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.statement_activity")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "statement_activity: expect:0, actual:%d", count)
+
+ appName := "TestSqlActivityUpdateJob"
+ _, err = db.ExecContext(ctx, "SET SESSION application_name=$1", appName)
+ require.NoError(t, err)
+
+ _, err = db.ExecContext(ctx, "SELECT 1;")
+ require.NoError(t, err)
+ srv.SQLServer().(*Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+ srv.SQLServer().(*Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+ _, err = db.ExecContext(ctx, "SET SESSION application_name=$1", "randomIgnore")
+ require.NoError(t, err)
+
+ // The row-count check uses follower_read_timestamp, so the transfer is
+ // skipped until the follower read can see the flushed rows.
+ testutils.SucceedsWithin(t, func() error {
+ var txnAggTs time.Time
+ row = db.QueryRowContext(ctx, `SELECT count_rows(), aggregated_ts
+ FROM system.public.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()
+ WHERE app_name = $1
+ GROUP BY aggregated_ts`, appName)
+ err = row.Scan(&count, &txnAggTs)
+ if err != nil {
+ return err
+ }
+ if count <= 0 {
+ return errors.New("Need to wait for row to populate with follower_read_timestamp.")
+ }
+
+ var stmtAggTs time.Time
+ row = db.QueryRowContext(ctx, `SELECT count_rows(), aggregated_ts
+ FROM system.public.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()
+ WHERE app_name = $1
+ GROUP BY aggregated_ts`, appName)
+ err = row.Scan(&count, &stmtAggTs)
+ if err != nil {
+ return err
+ }
+ if count <= 0 {
+ return errors.New("Need to wait for row to populate with follower_read_timestamp.")
+ }
+ require.Equal(t, stmtAggTs, txnAggTs)
+ return nil
+ }, 30*time.Second)
+
+ // Run the updater to add rows to the activity tables.
+ // With only a few rows this exercises the transfer-all
+ // path rather than the top-N path.
+ err = updater.TransferStatsToActivity(ctx)
+ require.NoError(t, err)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.transaction_activity WHERE app_name = $1", appName)
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 1, count, "transaction_activity after transfer: expect:1, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.statement_activity WHERE app_name = $1", appName)
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 1, count, "statement_activity after transfer: expect:1, actual:%d", count)
+}
+
+// TestSqlActivityUpdateTopLimitJob verifies that only the top rows, limited by
+// sql.stats.activity.top.max per column, are transferred to the activity tables.
+func TestSqlActivityUpdateTopLimitJob(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ skip.UnderStressRace(t, "test is too slow to run under race")
+
+ // Start the cluster. (One node is sufficient.)
+ ctx := context.Background()
+ srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true,
+ Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}})
+ defer srv.Stopper().Stop(context.Background())
+ defer db.Close()
+
+ // Verify all the tables are empty initially
+ var count int
+ row := db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.transaction_activity")
+ err := row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "transaction_activity: expect:0, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.statement_activity")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "statement_activity: expect:0, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.transaction_statistics")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "transaction_statistics: expect:0, actual:%d", count)
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.statement_statistics")
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.Equal(t, 0, count, "statement_statistics: expect:0, actual:%d", count)
+
+ execCfg := srv.ExecutorConfig().(ExecutorConfig)
+ st := cluster.MakeTestingClusterSettings()
+ su := st.MakeUpdater()
+ topLimit := 5
+ err = su.Set(ctx, "sql.stats.activity.top.max", settings.EncodedValue{
+ Value: settings.EncodeInt(int64(topLimit)),
+ Type: "i",
+ })
+ require.NoError(t, err)
+
+ updater := NewSqlActivityUpdater(st, execCfg.InternalDB)
+
+ appNamePrefix := "TestSqlActivityUpdateJobLoop"
+ // Generate 100 unique rows for statistics tables
+ for i := 0; i < 100; i++ {
+ tempAppName := fmt.Sprintf("%s%d", appNamePrefix, i)
+ _, err = db.ExecContext(ctx, "SET SESSION application_name=$1", tempAppName)
+ require.NoError(t, err)
+
+ _, err = db.ExecContext(ctx, "SELECT 1;")
+ require.NoError(t, err)
+ }
+
+ // Need to call it twice to actually cause a flush
+ srv.SQLServer().(*Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+ srv.SQLServer().(*Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+ _, err = db.ExecContext(ctx, "SET SESSION application_name=$1", "randomIgnore")
+ require.NoError(t, err)
+
+ // The row-count check uses follower_read_timestamp, so the transfer is
+ // skipped until the follower read can see the flushed rows.
+ testutils.SucceedsWithin(t, func() error {
+ var txnAggTs time.Time
+ row = db.QueryRowContext(ctx, `SELECT count_rows(), aggregated_ts
+ FROM system.public.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()
+ WHERE app_name LIKE 'TestSqlActivityUpdateJobLoop%'
+ GROUP BY aggregated_ts`)
+ err = row.Scan(&count, &txnAggTs)
+ if err != nil {
+ return err
+ }
+ if count < 100 {
+ return errors.New("Need to wait for row to populate with follower_read_timestamp.")
+ }
+
+ var stmtAggTs time.Time
+ row = db.QueryRowContext(ctx, `SELECT count_rows(), aggregated_ts
+ FROM system.public.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()
+ WHERE app_name LIKE 'TestSqlActivityUpdateJobLoop%'
+ GROUP BY aggregated_ts`)
+ err = row.Scan(&count, &stmtAggTs)
+ if err != nil {
+ return err
+ }
+ if count < 100 {
+ return errors.New("Need to wait for row to populate with follower_read_timestamp.")
+ }
+ require.Equal(t, stmtAggTs, txnAggTs)
+ return nil
+ }, 30*time.Second)
+
+ // Run the updater to add rows to the activity tables.
+ // With 100 distinct apps and a top limit of 5, this exercises
+ // the top-N transfer path.
+ err = updater.TransferStatsToActivity(ctx)
+ require.NoError(t, err)
+
+ maxRows := topLimit * 6 // Number of top columns to select from
+ row = db.QueryRowContext(ctx, `SELECT count_rows()
+ FROM system.public.transaction_activity
+ WHERE app_name LIKE 'TestSqlActivityUpdateJobLoop%'`)
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.LessOrEqual(t, count, maxRows, "transaction_activity after transfer: actual:%d, max:%d", count, maxRows)
+
+ row = db.QueryRowContext(ctx, `SELECT count_rows()
+ FROM system.public.statement_activity
+ WHERE app_name LIKE 'TestSqlActivityUpdateJobLoop%'`)
+ err = row.Scan(&count)
+ require.NoError(t, err)
+ require.LessOrEqual(t, count, maxRows, "statement_activity after transfer: actual:%d, max:%d", count, maxRows)
+}
+
+func TestScheduledSQLStatsCompaction(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ skip.UnderStressRace(t, "test is too slow to run under race")
+
+ // Start the cluster. (One node is sufficient.)
+ ctx := context.Background()
+ st := cluster.MakeTestingClusterSettings()
+ srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true,
+ Settings: st,
+ Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}})
+ defer srv.Stopper().Stop(context.Background())
+ defer db.Close()
+ _, err := db.ExecContext(ctx, "SET CLUSTER SETTING sql.stats.flush.interval = '100ms'")
+ require.NoError(t, err)
+ appName := "TestScheduledSQLStatsCompaction"
+ _, err = db.ExecContext(ctx, "SET SESSION application_name=$1", appName)
+ require.NoError(t, err)
+
+ testutils.SucceedsWithin(t, func() error {
+ _, err = db.ExecContext(ctx, "SELECT 1;")
+ require.NoError(t, err)
+
+ row := db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.transaction_activity WHERE app_name = $1", appName)
+ var count int
+ err = row.Scan(&count)
+ if err != nil {
+ return err
+ }
+ if count <= 0 {
+ return fmt.Errorf("transaction_activity is empty: %d", count)
+ }
+
+ row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+ "FROM system.public.statement_activity WHERE app_name = $1", appName)
+ err = row.Scan(&count)
+ if err != nil {
+ return err
+ }
+ if count <= 0 {
+ return fmt.Errorf("statement_activity is empty: %d", count)
+ }
+
+ return nil
+ }, 1*time.Minute)
+}
diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
index ad4ad83bca8d..4aca0403cbc5 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
+++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
@@ -47,6 +47,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/retry",
"//pkg/util/stop",
+ "//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//types",
diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go
index 774f7e707106..9652dbb36360 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/provider.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
@@ -67,6 +68,10 @@ type PersistedSQLStats struct {
// exceeded.
memoryPressureSignal chan struct{}
+ // Used to signal that the flush completed.
+ flushDoneCallback func()
+ flushMutex syncutil.Mutex
+
lastFlushStarted time.Time
jobMonitor jobMonitor
atomic struct {
@@ -89,6 +94,7 @@ func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats {
cfg: cfg,
memoryPressureSignal: make(chan struct{}),
drain: make(chan struct{}),
+ flushDoneCallback: nil,
}
p.jobMonitor = jobMonitor{
@@ -128,6 +134,12 @@ func (s *PersistedSQLStats) Stop(ctx context.Context) {
s.tasksDoneWG.Wait()
}
+func (s *PersistedSQLStats) SetFlushDoneCallback(callBackFunc func()) {
+ s.flushMutex.Lock()
+ defer s.flushMutex.Unlock()
+ s.flushDoneCallback = callBackFunc
+}
+
// GetController returns the controller of the PersistedSQLStats.
func (s *PersistedSQLStats) GetController(server serverpb.SQLStatusServer) *Controller {
return NewController(s, server, s.cfg.DB)
@@ -173,6 +185,14 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper
}
s.Flush(ctx)
+
+ func() {
+ s.flushMutex.Lock()
+ defer s.flushMutex.Unlock()
+ if s.flushDoneCallback != nil {
+ s.flushDoneCallback()
+ }
+ }()
}
})
if err != nil {
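
The callback handoff added here pairs with the select loop in sql_activity_update_job.go above; the pattern in isolation looks like the following standalone sketch (generic names, not CockroachDB APIs):

    package main

    import (
        "fmt"
        "sync"
    )

    // flusher mimics PersistedSQLStats: a mutex-guarded callback invoked after each flush.
    type flusher struct {
        mu       sync.Mutex
        callback func()
    }

    func (f *flusher) SetFlushDoneCallback(cb func()) {
        f.mu.Lock()
        defer f.mu.Unlock()
        f.callback = cb
    }

    func (f *flusher) flush() {
        // ... persist in-memory stats ...
        f.mu.Lock()
        defer f.mu.Unlock()
        if f.callback != nil {
            f.callback()
        }
    }

    func main() {
        // The consumer registers a callback that feeds a channel, then waits on it,
        // as the activity update job does with flushDoneSignal.
        done := make(chan struct{}, 1)
        f := &flusher{}
        f.SetFlushDoneCallback(func() { done <- struct{}{} })
        f.flush()
        <-done
        fmt.Println("flush observed")
    }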
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 1902e065ffae..d64ddbf2893b 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -3715,6 +3715,17 @@ var charts = []sectionDescription{
"jobs.key_visualizer.resume_retry_error",
},
},
+ {
+ Title: "SQL Activity Updater",
+ Metrics: []string{
+ "jobs.auto_update_sql_activity.fail_or_cancel_completed",
+ "jobs.auto_update_sql_activity.fail_or_cancel_failed",
+ "jobs.auto_update_sql_activity.fail_or_cancel_retry_error",
+ "jobs.auto_update_sql_activity.resume_completed",
+ "jobs.auto_update_sql_activity.resume_failed",
+ "jobs.auto_update_sql_activity.resume_retry_error",
+ },
+ },
{
Title: "Jobs Stats Polling Job",
Metrics: []string{
diff --git a/pkg/upgrade/upgradebase/testing_knobs.go b/pkg/upgrade/upgradebase/testing_knobs.go
index a52b5df92b6d..d52147390bb5 100644
--- a/pkg/upgrade/upgradebase/testing_knobs.go
+++ b/pkg/upgrade/upgradebase/testing_knobs.go
@@ -71,6 +71,11 @@ type TestingKnobs struct {
// AfterRunPermanentUpgrades is called after each call to
// RunPermanentUpgrades.
AfterRunPermanentUpgrades func()
+
+ // SkipUpdateSQLActivityJobBootstrap, if set, skips the
+ // clusterversion.V23_1CreateSystemActivityUpdateJob upgrade, which prevents
+ // the SQL activity update job from being created.
+ SkipUpdateSQLActivityJobBootstrap bool
}
// ModuleTestingKnobs makes TestingKnobs a base.ModuleTestingKnobs.
diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel
index 5c5e41adcd24..403fd2d1bc56 100644
--- a/pkg/upgrade/upgrades/BUILD.bazel
+++ b/pkg/upgrade/upgrades/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
"schema_changes.go",
"schemachanger_elements.go",
"sql_stats_ttl.go",
+ "system_activity_update_job.go",
"system_external_connections.go",
"system_job_info.go",
"system_privileges_index_migration.go",
@@ -115,6 +116,7 @@ go_test(
"schema_changes_helpers_test.go",
"schemachanger_elements_test.go",
"sql_stats_ttl_test.go",
+ "system_activity_update_job_test.go",
"system_job_info_test.go",
"system_privileges_index_migration_test.go",
"system_privileges_user_id_migration_test.go",
diff --git a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go
index 83d5cc2ecca8..1acb656c206d 100644
--- a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go
+++ b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go
@@ -19,7 +19,6 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/jobs/metricspoller" // Ensure job implementation is linked.
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
- "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/upgrade"
)
@@ -30,32 +29,15 @@ func createJobsMetricsPollingJob(
return nil
}
return d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
- row, err := d.DB.Executor().QueryRowEx(
- ctx,
- "check for existing job metrics polling job",
- nil,
- sessiondata.InternalExecutorOverride{User: username.RootUserName()},
- "SELECT * FROM system.jobs WHERE id = $1",
- jobs.JobMetricsPollerJobID,
- )
- if err != nil {
- return err
+ jr := jobs.Record{
+ JobID: jobs.JobMetricsPollerJobID,
+ Description: jobspb.TypePollJobsStats.String(),
+ Details: jobspb.PollJobsStatsDetails{},
+ Progress: jobspb.PollJobsStatsProgress{},
+ CreatedBy: &jobs.CreatedByInfo{Name: username.RootUser, ID: username.RootUserID},
+ Username: username.RootUserName(),
+ NonCancelable: true,
}
-
- if row == nil {
- jr := jobs.Record{
- JobID: jobs.JobMetricsPollerJobID,
- Description: jobspb.TypePollJobsStats.String(),
- Details: jobspb.PollJobsStatsDetails{},
- Progress: jobspb.PollJobsStatsProgress{},
- CreatedBy: &jobs.CreatedByInfo{Name: username.RootUser, ID: username.RootUserID},
- Username: username.RootUserName(),
- NonCancelable: true,
- }
- if _, err := d.JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobs.JobMetricsPollerJobID, txn); err != nil {
- return err
- }
- }
- return nil
+ return d.JobRegistry.CreateIfNotExistAdoptableJobWithTxn(ctx, jr, txn)
})
}
diff --git a/pkg/upgrade/upgrades/key_visualizer_migration.go b/pkg/upgrade/upgrades/key_visualizer_migration.go
index 11e77e2878e1..c14130f51358 100644
--- a/pkg/upgrade/upgrades/key_visualizer_migration.go
+++ b/pkg/upgrade/upgrades/key_visualizer_migration.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/upgrade"
)
@@ -81,25 +82,9 @@ func keyVisualizerTablesMigration(
NonCancelable: true, // The job can't be canceled, but it can be paused.
}
- // Make sure job with id doesn't already exist in system.jobs.
- row, err := d.DB.Executor().QueryRowEx(
- ctx,
- "check for existing key visualizer job",
- nil,
- sessiondata.InternalExecutorOverride{User: username.RootUserName()},
- "SELECT * FROM system.jobs WHERE id = $1",
- record.JobID,
- )
- if err != nil {
- return err
- }
-
- // If there isn't a row for the key visualizer job, create the job.
- if row == nil {
- if _, err := d.JobRegistry.CreateAdoptableJobWithTxn(ctx, record, record.JobID, nil); err != nil {
- return err
- }
- }
+ return d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+ return d.JobRegistry.CreateIfNotExistAdoptableJobWithTxn(ctx, record, txn)
+ })
}
return nil
diff --git a/pkg/upgrade/upgrades/system_activity_update_job.go b/pkg/upgrade/upgrades/system_activity_update_job.go
new file mode 100644
index 000000000000..e47251aee7b5
--- /dev/null
+++ b/pkg/upgrade/upgrades/system_activity_update_job.go
@@ -0,0 +1,46 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package upgrades
+
+import (
+ "context"
+
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
+ "github.com/cockroachdb/cockroach/pkg/jobs"
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+ "github.com/cockroachdb/cockroach/pkg/security/username"
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
+ "github.com/cockroachdb/cockroach/pkg/upgrade"
+)
+
+// createActivityUpdateJobMigration creates the job to update the
+// system.statement_activity and system.transaction_activity tables.
+func createActivityUpdateJobMigration(
+ ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps,
+) error {
+
+ if d.TestingKnobs != nil && d.TestingKnobs.SkipUpdateSQLActivityJobBootstrap {
+ return nil
+ }
+
+ record := jobs.Record{
+ JobID: jobs.SqlActivityUpdaterJobID,
+ Description: "sql activity job",
+ Username: username.NodeUserName(),
+ Details: jobspb.AutoUpdateSQLActivityDetails{},
+ Progress: jobspb.AutoConfigRunnerProgress{},
+ NonCancelable: true, // The job can't be canceled, but it can be paused.
+ }
+
+ return d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+ return d.JobRegistry.CreateIfNotExistAdoptableJobWithTxn(ctx, record, txn)
+ })
+}
diff --git a/pkg/upgrade/upgrades/system_activity_update_job_test.go b/pkg/upgrade/upgrades/system_activity_update_job_test.go
new file mode 100644
index 000000000000..3d509f246f49
--- /dev/null
+++ b/pkg/upgrade/upgrades/system_activity_update_job_test.go
@@ -0,0 +1,73 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package upgrades_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
+ "github.com/cockroachdb/cockroach/pkg/server"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/testutils/skip"
+ "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+ "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCreateActivityUpdateJobMigration(t *testing.T) {
+ skip.UnderStressRace(t)
+ defer leaktest.AfterTest(t)()
+ ctx := context.Background()
+
+ settings := cluster.MakeTestingClusterSettingsWithVersions(
+ clusterversion.TestingBinaryVersion,
+ clusterversion.TestingBinaryMinSupportedVersion,
+ false,
+ )
+
+ tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{
+ Settings: settings,
+ Knobs: base.TestingKnobs{
+ Server: &server.TestingKnobs{
+ DisableAutomaticVersionUpgrade: make(chan struct{}),
+ BinaryVersionOverride: clusterversion.TestingBinaryMinSupportedVersion,
+ },
+ },
+ },
+ })
+ defer tc.Stopper().Stop(ctx)
+
+ db := tc.ServerConn(0)
+ defer db.Close()
+
+ // NB: this may not actually be doing anything, since the job is created when
+ // the cluster bootstraps, so this is really just showing the upgrade is
+ // idempotent, in line with the other tests of job-creation upgrades.
+ upgrades.Upgrade(
+ t,
+ db,
+ clusterversion.V23_1CreateSystemActivityUpdateJob,
+ nil,
+ false,
+ )
+
+ row := db.QueryRow("SELECT count(*) FROM system.public.jobs WHERE id = 103")
+ assert.NotNil(t, row)
+ assert.NoError(t, row.Err())
+ var count int
+ err := row.Scan(&count)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, count)
+}
diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go
index 09a037ff9ca5..1d2faddea175 100644
--- a/pkg/upgrade/upgrades/upgrades.go
+++ b/pkg/upgrade/upgrades/upgrades.go
@@ -304,6 +304,12 @@ var upgrades = []upgradebase.Upgrade{
toCV(clusterversion.V23_1_TenantIDSequence),
tenantIDSequenceForSystemTenant,
),
+ upgrade.NewPermanentTenantUpgrade(
+ "create sql activity updater job",
+ toCV(clusterversion.V23_1CreateSystemActivityUpdateJob),
+ createActivityUpdateJobMigration,
+ "create statement_activity and transaction_activity job",
+ ),
}
func init() {