From a8f940515d0841ea82b4a029aabb10cab4cc153f Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 18 Apr 2023 21:19:38 -0400
Subject: [PATCH 1/6] kv: deflake and unskip
 TestRangeLocalUncertaintyLimitAfterNewLease

Fixes #99527.
Fixes #100491.

This commit deflakes and unskips
TestRangeLocalUncertaintyLimitAfterNewLease. The test was flaky because
it paused the clock on some but not all nodes in the cluster. The root
cause was that it accidentally started a 3-node cluster instead of the
2-node cluster it intended: the cluster size came from the package-level
numNodes variable rather than the test's local numServers.

Release note: None
---
 pkg/kv/kvserver/client_replica_test.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go
index ad6267dd1e0c..8ac713f81bc5 100644
--- a/pkg/kv/kvserver/client_replica_test.go
+++ b/pkg/kv/kvserver/client_replica_test.go
@@ -1964,8 +1964,6 @@ func TestRangeLocalUncertaintyLimitAfterNewLease(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	skip.WithIssue(t, 100491, "flaky test")
-
 	numServers := 2
 	var manuals []*hlc.HybridManualClock
 	for i := 0; i < numServers; i++ {
@@ -1982,7 +1980,7 @@ func TestRangeLocalUncertaintyLimitAfterNewLease(t *testing.T) {
 		}
 	}
 	ctx := context.Background()
-	tc := testcluster.StartTestCluster(t, numNodes,
+	tc := testcluster.StartTestCluster(t, numServers,
 		base.TestClusterArgs{
 			ReplicationMode:   base.ReplicationManual,
 			ServerArgsPerNode: serverArgs,

From 51df2e1db6d8c06e2a885968e1ca8d530b3dea01 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 18 Apr 2023 21:28:21 -0400
Subject: [PATCH 2/6] kv: remove numNodes global variable

As the TODO asserted, this was a terrible variable name to leak into the
global scope of a test package. At least one test picked it up by
accident, and I should have addressed it the first time I noticed.
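To make the failure mode concrete: a package-level identifier like
numNodes is in scope in every test file of the package, so a test that
means to use a local count can silently pick up the global instead. A
toy, dependency-free sketch of that shadowing hazard (hypothetical
names, not code from this patch):

```go
package main

import "fmt"

// A package-level constant like this is visible in every file of the
// package -- exactly the leak the TODO complained about.
const numNodes = 3

func main() {
	numServers := 2
	clocks := make([]int, numServers) // per-node setup sized for 2 nodes

	// BUG: numNodes still compiles here because the package-level
	// constant is in scope, but it silently disagrees with numServers,
	// so one node is created that the setup loop never touched.
	cluster := make([]int, numNodes)

	fmt.Println(len(clocks), len(cluster)) // 2 3
}
```

Deriving the count from the cluster itself (tc.NumServers()) removes
this whole class of bug, which is why replsForRange below drops its
numNodes parameter.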
Epic: None Release note: None --- pkg/kv/kvserver/client_merge_test.go | 2 +- pkg/kv/kvserver/client_relocate_range_test.go | 2 +- pkg/kv/kvserver/client_replica_test.go | 2 +- pkg/kv/kvserver/closed_timestamp_test.go | 36 +++++++++---------- pkg/kv/kvserver/replica_rangefeed_test.go | 2 +- 5 files changed, 20 insertions(+), 24 deletions(-) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 51c547a413d7..0fe173995273 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -5146,7 +5146,7 @@ func setupClusterWithSubsumedRange( }) require.NoError(t, tc.(*testcluster.TestCluster).WaitForFullReplication()) testutils.SucceedsSoon(t, func() error { - if count := len(replsForRange(ctx, t, tc, newDesc, numNodes)); count != 2 { + if count := len(replsForRange(ctx, t, tc, newDesc)); count != 2 { return errors.Newf("expected %d replicas for range %d; found %d", 2, newDesc.RangeID, count) } return nil diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index 9b11cebd63b5..a49646800fbf 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -359,7 +359,7 @@ func TestAdminRelocateRangeFailsWithDuplicates(t *testing.T) { ReplicationMode: base.ReplicationManual, } - tc := testcluster.StartTestCluster(t, numNodes, args) + tc := testcluster.StartTestCluster(t, 3, args) defer tc.Stopper().Stop(ctx) k := keys.MustAddr(tc.ScratchRange(t)) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 8ac713f81bc5..549438a93d09 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2387,7 +2387,7 @@ func getLeaseInfoOrFatal( func TestRemoveLeaseholder(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - tc := testcluster.StartTestCluster(t, numNodes, + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, }) diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 67f0ede4387c..a25e994bdd78 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -95,7 +95,7 @@ func TestClosedTimestampCanServe(t *testing.T) { tc.Target(2)) } - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) ts := tc.Server(0).Clock().Now() baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { @@ -168,7 +168,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) { // Sleep for a sufficiently long time so that reqTS can be closed. 
time.Sleep(3 * testingTargetDuration) baRead := makeTxnReadBatchForDesc(desc, reqTS) - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -189,7 +189,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { ctx := context.Background() tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) @@ -267,7 +267,7 @@ func TestClosedTimestampCantServeWithConflictingIntent(t *testing.T) { ctx := context.Background() tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) // Write N different intents for the same transaction, where N is the number @@ -373,7 +373,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { ctx := context.Background() tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) // Disable the automatic merging. if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { t.Fatal(err) @@ -410,8 +410,8 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { } // Ensure that we can perform follower reads from all replicas. - lRepls := replsForRange(ctx, t, tc, lr, numNodes) - rRepls := replsForRange(ctx, t, tc, rr, numNodes) + lRepls := replsForRange(ctx, t, tc, lr) + rRepls := replsForRange(ctx, t, tc, rr) // Now immediately query both the ranges and there's 1 value per range. // We need to tolerate RangeNotFound as the split range may not have been // created yet. @@ -426,7 +426,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { // the merged range. merged, err := tc.Server(0).MergeRanges(lr.StartKey.AsRawKey()) require.Nil(t, err) - mergedRepls := replsForRange(ctx, t, tc, merged, numNodes) + mergedRepls := replsForRange(ctx, t, tc, merged) // The hazard here is that a follower is not yet aware of the merge and will // return an error. We'll accept that because a client wouldn't see that error // from distsender. @@ -454,7 +454,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { // drive MaxClosed. 
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) @@ -487,7 +487,7 @@ func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) { ctx := context.Background() tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) @@ -534,7 +534,7 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { ctx := context.Background() tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) @@ -576,7 +576,7 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { ctx := context.Background() tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) @@ -717,6 +717,7 @@ func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) { // Set up the closed timestamp timing such that, when we block a merge and // transfer the RHS lease, the closed timestamp advances over the LHS // lease but not over the RHS lease. + const numNodes = 3 tc, _ := setupTestClusterWithDummyRange(t, clusterArgs, "cttest" /* dbName */, "kv" /* tableName */, numNodes) defer tc.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) @@ -1041,7 +1042,7 @@ func getFollowerReplicas( rangeDesc roachpb.RangeDescriptor, leaseholder roachpb.ReplicationTarget, ) []*kvserver.Replica { - repls := replsForRange(ctx, t, tc, rangeDesc, numNodes) + repls := replsForRange(ctx, t, tc, rangeDesc) followers := make([]*kvserver.Replica, 0, len(repls)-1) for _, repl := range repls { if repl.StoreID() == leaseholder.StoreID && repl.NodeID() == leaseholder.NodeID { @@ -1113,21 +1114,15 @@ const testingTargetDuration = 300 * time.Millisecond const testingSideTransportInterval = 100 * time.Millisecond -// TODO(nvanbenschoten): this is a pretty bad variable name to leak into the -// global scope of the kvserver_test package. At least one test was using it -// unintentionally. Remove it. 
-const numNodes = 3 - func replsForRange( ctx context.Context, t *testing.T, tc serverutils.TestClusterInterface, desc roachpb.RangeDescriptor, - numNodes int, ) (repls []*kvserver.Replica) { testutils.SucceedsSoon(t, func() error { repls = nil - for i := 0; i < numNodes; i++ { + for i := 0; i < tc.NumServers(); i++ { repl, _, err := tc.Server(i).GetStores().(*kvserver.Stores).GetReplicaForRangeID(ctx, desc.RangeID) if err != nil { return err @@ -1199,6 +1194,7 @@ func setupClusterForClosedTSTesting( clusterArgs base.TestClusterArgs, dbName, tableName string, ) (tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor) { + const numNodes = 3 tc, desc := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes) sqlRunner := sqlutils.MakeSQLRunner(tc.ServerConn(0)) sqlRunner.ExecMultiple(t, strings.Split(fmt.Sprintf(` diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 58c002b72331..4999c5831963 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -1118,7 +1118,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) { ctx := context.Background() tc, db, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) - repls := replsForRange(ctx, t, tc, desc, numNodes) + repls := replsForRange(ctx, t, tc, desc) sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) From 5588516af1cf4bb6149353b76ccb818643dc2fe9 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Wed, 19 Apr 2023 18:26:07 -0700 Subject: [PATCH 3/6] go.mod: bump Pebble to 6002e39ce756 6002e39c db: fix bug with restricted checkpoints 7fdab075 objstorage: add mutex in sharedReadable 4bfe993a objstorage: simplify ReadAt for objects ed8f90b0 sstable: don't try to read past end of object bd6e947b vfs: fix MemFS ReadAt EOF behavior 914e8582 objstorage: test shared objects with TestNotExistError 101876aa *: add skip-shared iteration mode to ScanInternal d54c3292 objstorage: fix bug in manifest 20d15bd8 tool: add EnableSharedStorage method 60cfeb46 metamorphic: export package 4e468412 sstable: include SINGLEDELs within NumDeletions table property 691db988 db: create new type for file numbers on disk fa2c2ec6 internal/base: add parsing for human-readable INGESTSST,LOGDATA kinds c0ccd694 internal/base: correct InternalKeyKind prefix on IngestSST key kind Release note: None Epic: none --- DEPS.bzl | 6 +++--- build/bazelutil/distdir_files.bzl | 2 +- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 47a16c7519b8..a85e5c14be36 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1565,10 +1565,10 @@ def go_deps(): patches = [ "@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch", ], - sha256 = "d8be5ce8a593880b4284a00d33f8bd28204b03f0d0979db850893eb76e5aac56", - strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20230411154528-23c3eabc394b", + sha256 = "dd7aa940304f7474dd3daa431a08eb44bb3a5821a9d70d932ad267d4a619815c", + strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20230420011906-6002e39ce756", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20230411154528-23c3eabc394b.zip", + 
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20230420011906-6002e39ce756.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 0f4bd22f17a3..12cb338ca502 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -313,7 +313,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/go-test-teamcity/com_github_cockroachdb_go_test_teamcity-v0.0.0-20191211140407-cff980ad0a55.zip": "bac30148e525b79d004da84d16453ddd2d5cd20528e9187f1d7dac708335674b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20230118201751-21c54148d20b.zip": "ca7776f47e5fecb4c495490a679036bfc29d95bd7625290cfdb9abb0baf97476", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20230411154528-23c3eabc394b.zip": "d8be5ce8a593880b4284a00d33f8bd28204b03f0d0979db850893eb76e5aac56", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20230420011906-6002e39ce756.zip": "dd7aa940304f7474dd3daa431a08eb44bb3a5821a9d70d932ad267d4a619815c", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.3.zip": "7778b1e4485e4f17f35e5e592d87eb99c29e173ac9507801d000ad76dd0c261e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/sentry-go/com_github_cockroachdb_sentry_go-v0.6.1-cockroachdb.2.zip": "fbb2207d02aecfdd411b1357efe1192dbb827959e36b7cab7491731ac55935c9", diff --git a/go.mod b/go.mod index 042c86a558c8..09ec62f0749e 100644 --- a/go.mod +++ b/go.mod @@ -116,7 +116,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.19.0 github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b - github.com/cockroachdb/pebble v0.0.0-20230411154528-23c3eabc394b + github.com/cockroachdb/pebble v0.0.0-20230420011906-6002e39ce756 github.com/cockroachdb/redact v1.1.3 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b diff --git a/go.sum b/go.sum index eade945a8872..792980ef1d92 100644 --- a/go.sum +++ b/go.sum @@ -485,8 +485,8 @@ github.com/cockroachdb/gostdlib v1.19.0/go.mod h1:+dqqpARXbE/gRDEhCak6dm0l14AaTy github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= -github.com/cockroachdb/pebble v0.0.0-20230411154528-23c3eabc394b h1:ZRIAKpkJSYLzRLYqqGVHItpZNJTjDlIZ/aJZKb6rjUQ= -github.com/cockroachdb/pebble v0.0.0-20230411154528-23c3eabc394b/go.mod 
h1:9lRMC4XN3/BLPtIp6kAKwIaHu369NOf2rMucPzipz50=
+github.com/cockroachdb/pebble v0.0.0-20230420011906-6002e39ce756 h1:NgUWWgwtcPD4JjclHKE6GtJGJmZm0mWHfGjomCA4CsY=
+github.com/cockroachdb/pebble v0.0.0-20230420011906-6002e39ce756/go.mod h1:9lRMC4XN3/BLPtIp6kAKwIaHu369NOf2rMucPzipz50=
 github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ=
 github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd h1:KFOt5I9nEKZgCnOSmy8r4Oykh8BYQO8bFOTgHDS8YZA=

From c3cf3e60ea757d65973c3c0fd7b1f69621862902 Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Wed, 19 Apr 2023 18:32:07 -0700
Subject: [PATCH 4/6] storage: enable TestCreateCheckpoint_SpanConstrained

The problem was fixed in Pebble.

Fixes: #100935

Release note: None
---
 pkg/storage/engine_test.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go
index ca2d51e8c5cb..8a9b28166028 100644
--- a/pkg/storage/engine_test.go
+++ b/pkg/storage/engine_test.go
@@ -34,7 +34,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/fs"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1036,7 +1035,6 @@ func TestCreateCheckpoint(t *testing.T) {
 func TestCreateCheckpoint_SpanConstrained(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	skip.WithIssue(t, 100935)
 
 	ctx := context.Background()
 	rng, _ := randutil.NewTestRand()

From 53afe66d3d4d3b1647f3fcda73fe0e66833e81eb Mon Sep 17 00:00:00 2001
From: j82w
Date: Fri, 31 Mar 2023 17:47:51 -0400
Subject: [PATCH 5/6] jobs: add job to populate system activity tables

The system statistics tables grow too large for the UI to return
results quickly. The new activity-tables job performs the aggregation
up front and records only the top 500 rows for each of the 6 sort
columns, so a given hour holds at most 3000 rows. This lets the UI
return results quickly and reliably. If the job detects that there are
fewer than 3000 rows, it simply copies all of the rows to the activity
tables.

Epic: none
closes: #98882

Release note (sql change): Adds a new SQL activity updater job. The job
updates the system transaction_activity and statement_activity tables
based on the statistics tables.
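The 3000-row cap falls out of a simple rule: a fingerprint survives if
it ranks in the top 500 by any one of the six sort columns, so at most
500 * 6 rows can survive per hour. A toy, dependency-free sketch of
that selection rule (hypothetical types; the actual job expresses this
in SQL with one row_number() window per column, as the EXPLAIN plans
below show):

```go
package main

import (
	"fmt"
	"sort"
)

// row stands in for one aggregated fingerprint with its six sort
// metrics (execution count, total latency, p99 latency, and so on).
type row struct {
	id      int
	metrics [6]float64
}

// topNPerMetric keeps a row if it places in the top n by any metric, so
// the result can never exceed n*6 rows -- the job's "top 500 of each of
// the 6 columns, at most 3000 rows per hour" invariant in miniature.
func topNPerMetric(rows []row, n int) []row {
	keep := make(map[int]bool)
	for m := 0; m < 6; m++ {
		sort.Slice(rows, func(i, j int) bool {
			return rows[i].metrics[m] > rows[j].metrics[m]
		})
		for i := 0; i < n && i < len(rows); i++ {
			keep[rows[i].id] = true
		}
	}
	var out []row
	for _, r := range rows {
		if keep[r.id] {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	rows := []row{
		{1, [6]float64{9, 1, 1, 1, 1, 1}}, // tops execution count
		{2, [6]float64{1, 9, 1, 1, 1, 1}}, // tops total latency
		{3, [6]float64{0, 0, 0, 0, 0, 0}}, // tops nothing
	}
	fmt.Println(len(topNPerMetric(rows, 1))) // 2
}
```

When an hour has fewer rows than the cap, every row ranks inside some
top 500, and the job degenerates into the straight copy the commit
message describes.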
--- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/cli/testdata/doctor/test_examine_cluster | 2 +- pkg/clusterversion/cockroach_versions.go | 9 + pkg/jobs/jobs_test.go | 1 + pkg/jobs/jobspb/jobs.proto | 11 +- pkg/jobs/jobspb/wrap.go | 16 +- pkg/jobs/registry.go | 41 + pkg/jobs/registry_test.go | 44 +- pkg/sql/BUILD.bazel | 3 + .../exec/execbuilder/testdata/observability | 710 ++++++++++++++++++ .../execbuilder/tests/local/generated_test.go | 7 + pkg/sql/sql_activity_update_job.go | 669 +++++++++++++++++ pkg/sql/sql_activity_update_job_test.go | 339 +++++++++ .../sqlstats/persistedsqlstats/BUILD.bazel | 1 + .../sqlstats/persistedsqlstats/provider.go | 20 + pkg/ts/catalog/chart_catalog.go | 11 + pkg/upgrade/upgradebase/testing_knobs.go | 5 + pkg/upgrade/upgrades/BUILD.bazel | 2 + .../create_jobs_metrics_polling_job.go | 36 +- .../upgrades/key_visualizer_migration.go | 23 +- .../upgrades/system_activity_update_job.go | 46 ++ .../system_activity_update_job_test.go | 73 ++ pkg/upgrade/upgrades/upgrades.go | 6 + 24 files changed, 2024 insertions(+), 53 deletions(-) create mode 100644 pkg/sql/opt/exec/execbuilder/testdata/observability create mode 100644 pkg/sql/sql_activity_update_job.go create mode 100644 pkg/sql/sql_activity_update_job_test.go create mode 100644 pkg/upgrade/upgrades/system_activity_update_job.go create mode 100644 pkg/upgrade/upgrades/system_activity_update_job_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 8f07ebfe715f..5e3d61fd422b 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -253,6 +253,7 @@ sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if ena sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job tenant-ro sql.show_ranges_deprecated_behavior.enabled boolean true if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES. tenant-rw sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators tenant-rw +sql.stats.activity.persisted_rows.max integer 200000 maximum number of rows of statement and transaction activity that will be persisted in the system tables tenant-rw sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode tenant-rw sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh tenant-rw sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh tenant-rw diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 622d41a2bd76..cdbc7f5a46c0 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -205,6 +205,7 @@
 <tr><td><div id="setting-sql-schema-telemetry-recurrence" class="anchored"><code>sql.schema.telemetry.recurrence</code></div></td><td>string</td><td><code>@weekly</code></td><td>cron-tab recurrence for SQL schema telemetry job</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
 <tr><td><div id="setting-sql-show-ranges-deprecated-behavior-enabled" class="anchored"><code>sql.show_ranges_deprecated_behavior.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-spatial-experimental-box2d-comparison-operators-enabled" class="anchored"><code>sql.spatial.experimental_box2d_comparison_operators.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables the use of certain experimental box2d comparison operators</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-sql-stats-activity-persisted-rows-max" class="anchored"><code>sql.stats.activity.persisted_rows.max</code></div></td><td>integer</td><td><code>200000</code></td><td>maximum number of rows of statement and transaction activity that will be persisted in the system tables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-stats-automatic-collection-enabled" class="anchored"><code>sql.stats.automatic_collection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>automatic statistics collection mode</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-stats-automatic-collection-fraction-stale-rows" class="anchored"><code>sql.stats.automatic_collection.fraction_stale_rows</code></div></td><td>float</td><td><code>0.2</code></td><td>target fraction of stale rows per table that will trigger a statistics refresh</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-stats-automatic-collection-min-stale-rows" class="anchored"><code>sql.stats.automatic_collection.min_stale_rows</code></div></td><td>integer</td><td><code>500</code></td><td>target minimum number of stale rows per table that will trigger a statistics refresh</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
diff --git a/pkg/cli/testdata/doctor/test_examine_cluster b/pkg/cli/testdata/doctor/test_examine_cluster
index d1f9f1bb36e1..dbee8d9b3eac 100644
--- a/pkg/cli/testdata/doctor/test_examine_cluster
+++ b/pkg/cli/testdata/doctor/test_examine_cluster
@@ -3,5 +3,5 @@ debug doctor examine cluster
 debug doctor examine cluster
 Examining 58 descriptors and 57 namespace entries...
 ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
-Examining 18 jobs...
+Examining 20 jobs...
 ERROR: validation failed
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index 573048dbe5d4..09b66ee7830c 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -516,6 +516,11 @@ const (
 	// was introduced.
 	V23_1_TenantIDSequence
 
+	// V23_1CreateSystemActivityUpdateJob is the version at which Cockroach adds
+	// a job that periodically updates the statement_activity and
+	// transaction_activity tables.
+	V23_1CreateSystemActivityUpdateJob
+
 	// **********************************************************
 	// ** If we haven't yet selected a final 23.1 RC candidate **
 	// Step 1a: Add new versions for release-23.1 branch above here.
@@ -912,6 +917,10 @@ var rawVersionsSingleton = keyedVersions{
 		Key:     V23_1_TenantIDSequence,
 		Version: roachpb.Version{Major: 22, Minor: 2, Internal: 100},
 	},
+	{
+		Key:     V23_1CreateSystemActivityUpdateJob,
+		Version: roachpb.Version{Major: 22, Minor: 2, Internal: 102},
+	},
 
 	// **********************************************************
 	// ** If we haven't yet selected a final 23.1 RC candidate **
diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go
index 44201680c664..5f3386ecf33d 100644
--- a/pkg/jobs/jobs_test.go
+++ b/pkg/jobs/jobs_test.go
@@ -240,6 +240,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
 			DontUseJobs:                       true,
 			SkipJobMetricsPollingJobBootstrap: true,
 			SkipAutoConfigRunnerJobBootstrap:  true,
+			SkipUpdateSQLActivityJobBootstrap: true,
 		}
 		args.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true}
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index d3ae6951f08f..cae15f376987 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -1196,6 +1196,12 @@ message AutoConfigTaskDetails {
 message AutoConfigTaskProgress {
 }
 
+message AutoUpdateSQLActivityDetails {
+}
+
+message AutoUpdateSQLActivityProgress {
+}
+
 message Payload {
   string description = 1;
   // If empty, the description is assumed to be the statement.
@@ -1257,6 +1263,7 @@ message Payload {
     AutoConfigRunnerDetails auto_config_runner = 41;
     AutoConfigEnvRunnerDetails auto_config_env_runner = 42;
     AutoConfigTaskDetails auto_config_task = 43;
+    AutoUpdateSQLActivityDetails auto_update_sql_activities = 44;
   }
   reserved 26;
   // PauseReason is used to describe the reason that the job is currently paused
@@ -1284,7 +1291,7 @@ message Payload {
   // specifies how old such record could get before this job is canceled.
int64 maximum_pts_age = 40 [(gogoproto.casttype) = "time.Duration", (gogoproto.customname) = "MaximumPTSAge"]; - // NEXT ID: 44 + // NEXT ID: 45 } message Progress { @@ -1329,6 +1336,7 @@ message Progress { AutoConfigRunnerProgress auto_config_runner = 29; AutoConfigEnvRunnerProgress auto_config_env_runner = 30; AutoConfigTaskProgress auto_config_task = 31; + AutoUpdateSQLActivityProgress update_sql_activity = 32; } uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"]; @@ -1363,6 +1371,7 @@ enum Type { AUTO_CONFIG_RUNNER = 20 [(gogoproto.enumvalue_customname) = "TypeAutoConfigRunner"]; AUTO_CONFIG_ENV_RUNNER = 21 [(gogoproto.enumvalue_customname) = "TypeAutoConfigEnvRunner"]; AUTO_CONFIG_TASK = 22 [(gogoproto.enumvalue_customname) = "TypeAutoConfigTask"]; + AUTO_UPDATE_SQL_ACTIVITY = 23 [(gogoproto.enumvalue_customname) = "TypeAutoUpdateSQLActivity"]; } message Job { diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go index 7855a6035605..bc9c4e999f30 100644 --- a/pkg/jobs/jobspb/wrap.go +++ b/pkg/jobs/jobspb/wrap.go @@ -51,6 +51,7 @@ var ( _ Details = AutoConfigRunnerDetails{} _ Details = AutoConfigEnvRunnerDetails{} _ Details = AutoConfigTaskDetails{} + _ Details = AutoUpdateSQLActivityDetails{} ) // ProgressDetails is a marker interface for job progress details proto structs. @@ -74,6 +75,7 @@ var ( _ ProgressDetails = AutoConfigRunnerProgress{} _ ProgressDetails = AutoConfigEnvRunnerProgress{} _ ProgressDetails = AutoConfigTaskProgress{} + _ ProgressDetails = AutoUpdateSQLActivityProgress{} ) // Type returns the payload's job type and panics if the type is invalid. @@ -156,6 +158,7 @@ var AutomaticJobTypes = [...]Type{ TypeAutoConfigEnvRunner, TypeAutoConfigTask, TypeKeyVisualizer, + TypeAutoUpdateSQLActivity, } // DetailsType returns the type for a payload detail. 
@@ -207,6 +210,8 @@ func DetailsType(d isPayload_Details) (Type, error) { return TypeAutoConfigEnvRunner, nil case *Payload_AutoConfigTask: return TypeAutoConfigTask, nil + case *Payload_AutoUpdateSqlActivities: + return TypeAutoUpdateSQLActivity, nil default: return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d) } @@ -250,6 +255,7 @@ var JobDetailsForEveryJobType = map[Type]Details{ TypeAutoConfigRunner: AutoConfigRunnerDetails{}, TypeAutoConfigEnvRunner: AutoConfigEnvRunnerDetails{}, TypeAutoConfigTask: AutoConfigTaskDetails{}, + TypeAutoUpdateSQLActivity: AutoUpdateSQLActivityDetails{}, } // WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper @@ -303,6 +309,8 @@ func WrapProgressDetails(details ProgressDetails) interface { return &Progress_AutoConfigEnvRunner{AutoConfigEnvRunner: &d} case AutoConfigTaskProgress: return &Progress_AutoConfigTask{AutoConfigTask: &d} + case AutoUpdateSQLActivityProgress: + return &Progress_UpdateSqlActivity{UpdateSqlActivity: &d} default: panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d)) } @@ -354,6 +362,8 @@ func (p *Payload) UnwrapDetails() Details { return *d.AutoConfigEnvRunner case *Payload_AutoConfigTask: return *d.AutoConfigTask + case *Payload_AutoUpdateSqlActivities: + return *d.AutoUpdateSqlActivities default: return nil } @@ -405,6 +415,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails { return *d.AutoConfigEnvRunner case *Progress_AutoConfigTask: return *d.AutoConfigTask + case *Progress_UpdateSqlActivity: + return *d.UpdateSqlActivity default: return nil } @@ -480,6 +492,8 @@ func WrapPayloadDetails(details Details) interface { return &Payload_AutoConfigEnvRunner{AutoConfigEnvRunner: &d} case AutoConfigTaskDetails: return &Payload_AutoConfigTask{AutoConfigTask: &d} + case AutoUpdateSQLActivityDetails: + return &Payload_AutoUpdateSqlActivities{AutoUpdateSqlActivities: &d} default: panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d)) } @@ -515,7 +529,7 @@ const ( func (Type) SafeValue() {} // NumJobTypes is the number of jobs types. -const NumJobTypes = 23 +const NumJobTypes = 24 // ChangefeedDetailsMarshaler allows for dependency injection of // cloud.SanitizeExternalStorageURI to avoid the dependency from this diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 4f876cdb3161..884cf80839fb 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -301,6 +301,9 @@ const ( // AutoConfigRunnerJobID A static job ID is used for the auto config runner job. AutoConfigRunnerJobID = jobspb.JobID(102) + + // SqlActivityUpdaterJobID A static job ID is used for the SQL activity tables. + SqlActivityUpdaterJobID = jobspb.JobID(103) ) // MakeJobID generates a new job ID. @@ -692,6 +695,44 @@ func (r *Registry) CreateJobWithTxn( return j, nil } +// CreateIfNotExistAdoptableJobWithTxn checks if a job already exists in +// the system.jobs table, and if it does not it will create the job. The job +// will be adopted for execution at a later time by some node in the cluster. +func (r *Registry) CreateIfNotExistAdoptableJobWithTxn( + ctx context.Context, record Record, txn isql.Txn, +) error { + if record.JobID == 0 { + return fmt.Errorf("invalid record.JobID value: %d", record.JobID) + } + + if txn == nil { + return fmt.Errorf("txn is required for job: %d", record.JobID) + } + + // Make sure job with id doesn't already exist in system.jobs. 
+ // Use a txn to avoid race conditions + row, err := txn.QueryRowEx( + ctx, + "check if job exists", + txn.KV(), + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "SELECT id FROM system.jobs WHERE id = $1", + record.JobID, + ) + if err != nil { + return err + } + + // If there isn't a row for the job, create the job. + if row == nil { + if _, err = r.CreateAdoptableJobWithTxn(ctx, record, record.JobID, txn); err != nil { + return err + } + } + + return nil +} + // CreateAdoptableJobWithTxn creates a job which will be adopted for execution // at a later time by some node in the cluster. func (r *Registry) CreateAdoptableJobWithTxn( diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index 77f45dec2e84..c0a5b1953813 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -236,22 +236,23 @@ INSERT INTO t."%s" VALUES('a', 'foo'); newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute), StatusCanceled, mutOptions) + sqlActivityJob := fmt.Sprintf("%d", SqlActivityUpdaterJobID) db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{ - {oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob}, + {sqlActivityJob}, {oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob}, {newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}}) if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil { t.Fatal(err) } db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{ - {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob}, - {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}}) + {sqlActivityJob}, {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, + {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}}) if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil { t.Fatal(err) } db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{ - {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}}) + {sqlActivityJob}, {oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}}) // Delete the revert failed, and running jobs for the next run of the // test. 
@@ -280,6 +281,7 @@ func TestRegistryGCPagination(t *testing.T) { DontUseJobs: true, SkipJobMetricsPollingJobBootstrap: true, SkipAutoConfigRunnerJobBootstrap: true, + SkipUpdateSQLActivityJobBootstrap: true, }, KeyVisualizer: &keyvisualizer.TestingKnobs{ SkipJobBootstrap: true, @@ -447,6 +449,39 @@ func TestCreateJobWritesToJobInfo(t *testing.T) { })) runTests(t, createdJob) }) + + t.Run("CreateIfNotExistAdoptableJobWithTxn", func(t *testing.T) { + tempRecord := Record{ + JobID: r.MakeJobID(), + Details: jobspb.ImportDetails{}, + Progress: jobspb.ImportProgress{}, + Username: username.RootUserName(), + } + + // loop to verify no errors if create if not exist is called multiple times + for i := 0; i < 3; i++ { + err := ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return r.CreateIfNotExistAdoptableJobWithTxn(ctx, tempRecord, txn) + }) + require.NoError(t, err) + } + + require.NoError(t, ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + row, err := txn.QueryRowEx( + ctx, + "check if job exists", + txn.KV(), + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "SELECT id FROM system.jobs WHERE id = $1", + tempRecord.JobID, + ) + if err != nil { + return err + } + require.NotNil(t, row) + return nil + })) + }) } func TestBatchJobsCreation(t *testing.T) { @@ -665,6 +700,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { DontUseJobs: true, SkipJobMetricsPollingJobBootstrap: true, SkipAutoConfigRunnerJobBootstrap: true, + SkipUpdateSQLActivityJobBootstrap: true, }, KeyVisualizer: &keyvisualizer.TestingKnobs{ SkipJobBootstrap: true, diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 93521d273d0e..0b828893168a 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -236,6 +236,7 @@ go_library( "sort.go", "split.go", "spool.go", + "sql_activity_update_job.go", "sql_cursor.go", "statement.go", "subquery.go", @@ -674,6 +675,7 @@ go_test( "show_trace_replica_test.go", "sort_test.go", "split_test.go", + "sql_activity_update_job_test.go", "sql_cursor_test.go", "sql_prepare_test.go", "statement_mark_redaction_test.go", @@ -817,6 +819,7 @@ go_test( "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/sqlstats", + "//pkg/sql/sqlstats/persistedsqlstats", "//pkg/sql/sqltestutils", "//pkg/sql/stats", "//pkg/sql/stmtdiagnostics", diff --git a/pkg/sql/opt/exec/execbuilder/testdata/observability b/pkg/sql/opt/exec/execbuilder/testdata/observability new file mode 100644 index 000000000000..d2d0d5735159 --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/observability @@ -0,0 +1,710 @@ +# LogicTest: local + +# Generates the explain plans the sql_activity_update_job uses to update +# transaction_activity and statement_activity tables + +statement ok +set enable_zigzag_join = false + +statement ok +INSERT INTO system.users VALUES ('node', NULL, true, 3) + +statement ok +GRANT node TO root + +# Upsert all transaction_activity +query T retry +EXPLAIN (VERBOSE) UPSERT INTO system.public.transaction_activity + (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, + statistics, query, execution_count, execution_total_seconds, + execution_total_cluster_seconds, contention_time_avg_seconds, + cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) + (SELECT aggregated_ts, + fingerprint_id, + app_name, + agg_interval, + metadata, + statistics, + '' AS query, + (statistics->'execution_statistics'->>'cnt')::int, + 
((statistics->'execution_statistics'->>'cnt')::float)*((statistics->'statistics'->'svcLat'->>'mean')::float), + 100 AS execution_total_cluster_seconds, + COALESCE((statistics->'execution_statistics'->'contentionTime'->>'mean')::float,0), + COALESCE((statistics->'execution_statistics'->'cpu_sql_nanos'->>'mean')::float,0), + (statistics->'statistics'->'svcLat'->>'mean')::float, + COALESCE((statistics->'statistics'->'latencyInfo'->>'p99')::float, 0) + FROM (SELECT + max(aggregated_ts) AS aggregated_ts, + app_name, + fingerprint_id, + agg_interval, + crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, + crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics + FROM system.public.transaction_statistics + WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00' + and app_name not like '$ internal%' + GROUP BY app_name, + fingerprint_id, + agg_interval)); +---- +distribution: local +vectorized: true +· +• upsert +│ columns: () +│ estimated row count: 0 (missing stats) +│ into: transaction_activity(aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) +│ auto commit +│ arbiter indexes: primary +│ +└── • project + │ columns: (max, fingerprint_id, app_name, agg_interval, metadata, statistics, query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds, agg_interval, metadata, statistics, query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts) + │ + └── • lookup join (left outer) + │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics, aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) + │ estimated row count: 3 (missing stats) + │ table: transaction_activity@primary + │ equality: (max, fingerprint_id, app_name) = (aggregated_ts,fingerprint_id,app_name) + │ equality cols are key + │ + └── • distinct + │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics) + │ estimated row count: 3 (missing stats) + │ distinct on: fingerprint_id, app_name, max + │ nulls are distinct + │ error on duplicate + │ + └── • render + │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics) + │ render query: '' + │ render int8: ((statistics->'execution_statistics')->>'cnt')::INT8 + │ render ?column?: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render execution_total_cluster_seconds: 100.0 + │ render coalesce: 
COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0) + │ render coalesce: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0) + │ render float8: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render coalesce: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0) + │ render fingerprint_id: fingerprint_id + │ render app_name: app_name + │ render agg_interval: agg_interval + │ render max: max + │ render metadata: metadata + │ render statistics: statistics + │ + └── • render + │ columns: (metadata, statistics, fingerprint_id, app_name, agg_interval, max) + │ render metadata: crdb_internal.merge_stats_metadata(array_agg) + │ render statistics: crdb_internal.merge_transaction_stats(array_agg) + │ render fingerprint_id: fingerprint_id + │ render app_name: app_name + │ render agg_interval: agg_interval + │ render max: max + │ + └── • group (hash) + │ columns: (fingerprint_id, app_name, agg_interval, max, array_agg, array_agg) + │ estimated row count: 3 (missing stats) + │ aggregate 0: max(aggregated_ts) + │ aggregate 1: array_agg(metadata) + │ aggregate 2: array_agg(statistics) + │ group by: fingerprint_id, app_name, agg_interval + │ + └── • index join + │ columns: (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics) + │ estimated row count: 3 (missing stats) + │ table: transaction_statistics@primary + │ key columns: crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id + │ + └── • scan + columns: (aggregated_ts, fingerprint_id, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8) + estimated row count: 3 (missing stats) + table: transaction_statistics@execution_count_idx (partial index) + spans: /2023-04-10T16:00:00Z-/2023-04-10T16:00:00.000000001Z + +# Upsert all statement_activity +query T retry +EXPLAIN (VERBOSE) UPSERT + INTO system.public.statement_activity (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, + agg_interval, metadata, statistics, plan, index_recommendations, execution_count, + execution_total_seconds, execution_total_cluster_seconds, + contention_time_avg_seconds, + cpu_sql_avg_nanos, + service_latency_avg_seconds, service_latency_p99_seconds) + (SELECT aggregated_ts, + fingerprint_id, + transaction_fingerprint_id, + plan_hash, + app_name, + agg_interval, + metadata, + statistics, + plan, + index_recommendations, + (statistics -> 'execution_statistics' ->> 'cnt')::int, + ((statistics -> 'execution_statistics' ->> 'cnt')::float) * + ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float), + 100 AS execution_total_cluster_seconds, + COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0), + COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0), + (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float, + COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0) + FROM (SELECT max(aggregated_ts) AS aggregated_ts, + fingerprint_id, + transaction_fingerprint_id, + plan_hash, + app_name, + agg_interval, + crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, + crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, + plan, + index_recommendations + FROM system.public.statement_statistics + WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00' + and app_name not like '$ internal%' + GROUP BY 
app_name, + fingerprint_id, + transaction_fingerprint_id, + plan_hash, + agg_interval, + plan, + index_recommendations)); +---- +distribution: local +vectorized: true +· +• upsert +│ columns: () +│ estimated row count: 0 (missing stats) +│ into: statement_activity(aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) +│ auto commit +│ arbiter indexes: primary +│ +└── • project + │ columns: (max, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds, agg_interval, metadata, statistics, plan, index_recommendations, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts) + │ + └── • lookup join (left outer) + │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics, aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) + │ estimated row count: 3 (missing stats) + │ table: statement_activity@primary + │ equality: (max, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name) = (aggregated_ts,fingerprint_id,transaction_fingerprint_id,plan_hash,app_name) + │ equality cols are key + │ + └── • distinct + │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics) + │ estimated row count: 3 (missing stats) + │ distinct on: fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, max + │ nulls are distinct + │ error on duplicate + │ + └── • render + │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics) + │ render int8: ((statistics->'execution_statistics')->>'cnt')::INT8 + │ render ?column?: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render execution_total_cluster_seconds: 100.0 + │ render coalesce: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0) + │ render coalesce: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0) + │ render float8: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render coalesce: 
COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0) + │ render fingerprint_id: fingerprint_id + │ render transaction_fingerprint_id: transaction_fingerprint_id + │ render plan_hash: plan_hash + │ render app_name: app_name + │ render agg_interval: agg_interval + │ render plan: plan + │ render index_recommendations: index_recommendations + │ render max: max + │ render metadata: metadata + │ render statistics: statistics + │ + └── • render + │ columns: (metadata, statistics, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max) + │ render metadata: crdb_internal.merge_stats_metadata(array_agg) + │ render statistics: crdb_internal.merge_statement_stats(array_agg) + │ render fingerprint_id: fingerprint_id + │ render transaction_fingerprint_id: transaction_fingerprint_id + │ render plan_hash: plan_hash + │ render app_name: app_name + │ render agg_interval: agg_interval + │ render plan: plan + │ render index_recommendations: index_recommendations + │ render max: max + │ + └── • group (hash) + │ columns: (fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, array_agg, array_agg) + │ estimated row count: 3 (missing stats) + │ aggregate 0: max(aggregated_ts) + │ aggregate 1: array_agg(metadata) + │ aggregate 2: array_agg(statistics) + │ group by: fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations + │ + └── • index join + │ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations) + │ estimated row count: 3 (missing stats) + │ table: statement_statistics@primary + │ key columns: crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8, aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id + │ + └── • scan + columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8) + estimated row count: 3 (missing stats) + table: statement_statistics@execution_count_idx (partial index) + spans: /2023-04-10T16:00:00Z-/2023-04-10T16:00:00.000000001Z + +# Upsert top 500 statement_activity including all statements in the top 500 transactions +query T retry +EXPLAIN (VERBOSE) UPSERT + INTO system.public.statement_activity + (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, + agg_interval, metadata, statistics, plan, index_recommendations, execution_count, + execution_total_seconds, execution_total_cluster_seconds, + contention_time_avg_seconds, + cpu_sql_avg_nanos, + service_latency_avg_seconds, service_latency_p99_seconds) + (SELECT aggregated_ts, + fingerprint_id, + transaction_fingerprint_id, + plan_hash, + app_name, + agg_interval, + metadata, + statistics, + plan, + index_recommendations, + (statistics -> 'execution_statistics' ->> 'cnt')::int, + ((statistics -> 'execution_statistics' ->> 'cnt')::float) * + ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float), + 100 AS execution_total_cluster_seconds, + COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0), + COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0), + (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float, + 
COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0) + FROM (SELECT max(ss.aggregated_ts) AS aggregated_ts, + ss.fingerprint_id, + ss.transaction_fingerprint_id, + ss.plan_hash, + ss.app_name, + ss.agg_interval, + crdb_internal.merge_stats_metadata(array_agg(ss.metadata)) AS metadata, + crdb_internal.merge_statement_stats(array_agg(ss.statistics)) AS statistics, + ss.plan, + ss.index_recommendations + FROM system.public.statement_statistics ss + inner join (SELECT fingerprint_id, app_name + FROM (SELECT fingerprint_id, app_name, + row_number() + OVER (ORDER BY (statistics -> 'execution_statistics' ->> 'cnt')::int desc) AS ePos, + row_number() + OVER (ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float desc) AS sPos, + row_number() OVER (ORDER BY + ((statistics -> 'execution_statistics' ->> 'cnt')::float) * + ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float) desc) AS tPos, + row_number() OVER (ORDER BY COALESCE( + (statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, + 0) desc) AS cPos, + row_number() OVER (ORDER BY COALESCE( + (statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, + 0) desc) AS uPos, + row_number() OVER (ORDER BY COALESCE( + (statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, + 0) desc) AS lPos + FROM (SELECT fingerprint_id, + app_name, + crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics + FROM system.public.statement_statistics + WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00' and + app_name not like '$ internal%' + GROUP BY app_name, + fingerprint_id)) + WHERE ePos < 500 + or sPos < 500 + or tPos < 500 + or cPos < 500 + or uPos < 500 + or lPos < 500) agg on agg.app_name = ss.app_name and agg.fingerprint_id = ss.fingerprint_id + WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00' + GROUP BY ss.app_name, + ss.fingerprint_id, + ss.transaction_fingerprint_id, + ss.plan_hash, + ss.agg_interval, + ss.plan, + ss.index_recommendations)); +---- +distribution: local +vectorized: true +· +• upsert +│ columns: () +│ estimated row count: 0 (missing stats) +│ into: statement_activity(aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) +│ auto commit +│ arbiter indexes: primary +│ +└── • project + │ columns: (max, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds, agg_interval, metadata, statistics, plan, index_recommendations, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts) + │ + └── • lookup join (left outer) + │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, 
index_recommendations, max, metadata, statistics, aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) + │ estimated row count: 0 (missing stats) + │ table: statement_activity@primary + │ equality: (max, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name) = (aggregated_ts,fingerprint_id,transaction_fingerprint_id,plan_hash,app_name) + │ equality cols are key + │ + └── • distinct + │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics) + │ estimated row count: 0 (missing stats) + │ distinct on: fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, max + │ nulls are distinct + │ error on duplicate + │ + └── • render + │ columns: (int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, metadata, statistics) + │ render int8: ((statistics->'execution_statistics')->>'cnt')::INT8 + │ render ?column?: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render execution_total_cluster_seconds: 100.0 + │ render coalesce: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0) + │ render coalesce: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0) + │ render float8: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render coalesce: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0) + │ render fingerprint_id: fingerprint_id + │ render transaction_fingerprint_id: transaction_fingerprint_id + │ render plan_hash: plan_hash + │ render app_name: app_name + │ render agg_interval: agg_interval + │ render plan: plan + │ render index_recommendations: index_recommendations + │ render max: max + │ render metadata: metadata + │ render statistics: statistics + │ + └── • render + │ columns: (metadata, statistics, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max) + │ render metadata: crdb_internal.merge_stats_metadata(array_agg) + │ render statistics: crdb_internal.merge_statement_stats(array_agg) + │ render fingerprint_id: fingerprint_id + │ render transaction_fingerprint_id: transaction_fingerprint_id + │ render plan_hash: plan_hash + │ render app_name: app_name + │ render agg_interval: agg_interval + │ render plan: plan + │ render index_recommendations: index_recommendations + │ render max: max + │ + └── • group (hash) + │ columns: (fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations, max, array_agg, array_agg) + │ estimated row count: 0 (missing stats) + │ aggregate 0: max(aggregated_ts) + │ aggregate 1: array_agg(metadata) + │ aggregate 2: array_agg(statistics) + │ group by: fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, plan, index_recommendations + │ + └── • project + │ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, 
agg_interval, metadata, statistics, plan, index_recommendations) + │ + └── • hash join (inner) + │ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations, fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number) + │ estimated row count: 0 (missing stats) + │ equality: (app_name, fingerprint_id) = (app_name, fingerprint_id) + │ right cols are key + │ + ├── • scan + │ columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, agg_interval, metadata, statistics, plan, index_recommendations) + │ estimated row count: 10 (missing stats) + │ table: statement_statistics@primary + │ spans: /0/2023-04-10T16:00:00Z-/0/2023-04-10T16:00:00.000000001Z /1/2023-04-10T16:00:00Z-/1/2023-04-10T16:00:00.000000001Z /2/2023-04-10T16:00:00Z-/2/2023-04-10T16:00:00.000000001Z /3/2023-04-10T16:00:00Z-/3/2023-04-10T16:00:00.000000001Z /4/2023-04-10T16:00:00Z-/4/2023-04-10T16:00:00.000000001Z /5/2023-04-10T16:00:00Z-/5/2023-04-10T16:00:00.000000001Z /6/2023-04-10T16:00:00Z-/6/2023-04-10T16:00:00.000000001Z /7/2023-04-10T16:00:00Z-/7/2023-04-10T16:00:00.000000001Z + │ + └── • filter + │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number) + │ estimated row count: 3 (missing stats) + │ filter: (((((row_number < 500) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500) + │ + └── • window + │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_6_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_5_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_4_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_3_orderby_1_1 DESC RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_2_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_1_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • render + │ columns: (fingerprint_id, app_name, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ render row_number_1_orderby_1_1: ((statistics->'execution_statistics')->>'cnt')::INT8 + │ render row_number_2_orderby_1_1: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render row_number_3_orderby_1_1: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render row_number_4_orderby_1_1: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0) + │ render row_number_5_orderby_1_1: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0) + │ render row_number_6_orderby_1_1: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0) + │ render fingerprint_id: fingerprint_id + │ render app_name: app_name + │ + └── • render + │ columns: (statistics, fingerprint_id, app_name) + │ render statistics: crdb_internal.merge_statement_stats(array_agg) + │ render fingerprint_id: fingerprint_id + │ render app_name: app_name + │ + └── • group (hash) + │ columns: (fingerprint_id, app_name, array_agg) + │ estimated row count: 3 (missing stats) + │ aggregate 0: array_agg(statistics) + │ group by: fingerprint_id, app_name + │ + └── • project + │ columns: (fingerprint_id, app_name, statistics) + │ + └── • index join + │ columns: (aggregated_ts, fingerprint_id, app_name, statistics) + │ estimated row count: 3 (missing stats) + │ table: statement_statistics@primary + │ key columns: crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8, aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id + │ + └── • scan + columns: (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8) + estimated row count: 3 (missing stats) + table: statement_statistics@execution_count_idx (partial index) + spans: /2023-04-10T16:00:00Z-/2023-04-10T16:00:00.000000001Z + +# Upsert top 500 transactions +query T retry +EXPLAIN (VERBOSE) UPSERT + INTO system.public.transaction_activity + (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, + statistics, query, execution_count, execution_total_seconds, + execution_total_cluster_seconds, contention_time_avg_seconds, + cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) + (SELECT aggregated_ts, + fingerprint_id, + app_name, + 
agg_interval, + metadata, + statistics, + '' AS query, + (statistics -> 'execution_statistics' ->> 'cnt')::int, + ((statistics -> 'execution_statistics' ->> 'cnt')::float) * + ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float), + 100 AS execution_total_cluster_seconds, + COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0), + COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0), + (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float, + COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0) + FROM (SELECT max(ts.aggregated_ts) AS aggregated_ts, + ts.app_name, + ts.fingerprint_id, + ts.agg_interval, + crdb_internal.merge_stats_metadata(array_agg(ts.metadata)) AS metadata, + crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics + FROM system.public.transaction_statistics ts + inner join (SELECT fingerprint_id, app_name, agg_interval + FROM (SELECT fingerprint_id, app_name, agg_interval, + row_number() + OVER (ORDER BY (statistics -> 'execution_statistics' ->> 'cnt')::int desc) AS ePos, + row_number() + OVER (ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float desc) AS sPos, + row_number() + OVER (ORDER BY ((statistics -> 'execution_statistics' ->> 'cnt')::float) * + ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float) desc) AS tPos, + row_number() OVER (ORDER BY COALESCE( + (statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, + 0) desc) AS cPos, + row_number() OVER (ORDER BY COALESCE( + (statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, + 0) desc) AS uPos, + row_number() OVER (ORDER BY COALESCE( + (statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, + 0) desc) AS lPos + FROM (SELECT fingerprint_id, app_name, agg_interval, + crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics + FROM system.public.transaction_statistics + WHERE aggregated_ts = '2023-04-10 16:00:00.000000 +00:00' and + app_name not like '$ internal%' + GROUP BY app_name, + fingerprint_id, + agg_interval)) + WHERE ePos < 500 + or sPos < 500 + or tPos < 500 + or cPos < 500 + or uPos < 500 + or lPos < 500) agg + on agg.app_name = ts.app_name and agg.fingerprint_id = ts.fingerprint_id and + agg.agg_interval = ts.agg_interval + GROUP BY ts.app_name, + ts.fingerprint_id, + ts.agg_interval)); +---- +distribution: local +vectorized: true +· +• upsert +│ columns: () +│ estimated row count: 0 (missing stats) +│ into: transaction_activity(aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) +│ auto commit +│ arbiter indexes: primary +│ +└── • project + │ columns: (max, fingerprint_id, app_name, agg_interval, metadata, statistics, query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds, agg_interval, metadata, statistics, query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", aggregated_ts) + │ + └── • lookup join (left outer) + │ columns: (query, 
int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics, aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, query, execution_count, execution_total_seconds, execution_total_cluster_seconds, contention_time_avg_seconds, cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds) + │ estimated row count: 0 (missing stats) + │ table: transaction_activity@primary + │ equality: (max, fingerprint_id, app_name) = (aggregated_ts,fingerprint_id,app_name) + │ equality cols are key + │ + └── • distinct + │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics) + │ estimated row count: 0 (missing stats) + │ distinct on: fingerprint_id, app_name, max + │ nulls are distinct + │ error on duplicate + │ + └── • render + │ columns: (query, int8, "?column?", execution_total_cluster_seconds, "coalesce", "coalesce", float8, "coalesce", fingerprint_id, app_name, agg_interval, max, metadata, statistics) + │ render query: '' + │ render int8: ((statistics->'execution_statistics')->>'cnt')::INT8 + │ render ?column?: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render execution_total_cluster_seconds: 100.0 + │ render coalesce: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0) + │ render coalesce: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0) + │ render float8: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render coalesce: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0) + │ render fingerprint_id: fingerprint_id + │ render app_name: app_name + │ render agg_interval: agg_interval + │ render max: max + │ render metadata: metadata + │ render statistics: statistics + │ + └── • render + │ columns: (metadata, statistics, fingerprint_id, app_name, agg_interval, max) + │ render metadata: crdb_internal.merge_stats_metadata(array_agg) + │ render statistics: crdb_internal.merge_transaction_stats(array_agg) + │ render fingerprint_id: fingerprint_id + │ render app_name: app_name + │ render agg_interval: agg_interval + │ render max: max + │ + └── • group (hash) + │ columns: (fingerprint_id, app_name, agg_interval, max, array_agg, array_agg) + │ estimated row count: 0 (missing stats) + │ aggregate 0: max(aggregated_ts) + │ aggregate 1: array_agg(metadata) + │ aggregate 2: array_agg(statistics) + │ group by: fingerprint_id, app_name, agg_interval + │ + └── • project + │ columns: (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics) + │ + └── • project + │ columns: (aggregated_ts, fingerprint_id, app_name, agg_interval, metadata, statistics, fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ + └── • lookup join (inner) + │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number, aggregated_ts, fingerprint_id, 
app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, agg_interval, metadata, statistics) + │ estimated row count: 0 (missing stats) + │ table: transaction_statistics@primary + │ equality: (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id) = (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8,aggregated_ts,fingerprint_id,app_name,node_id) + │ equality cols are key + │ pred: agg_interval = agg_interval + │ + └── • lookup join (inner) + │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number, aggregated_ts, fingerprint_id, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8) + │ estimated row count: 0 (missing stats) + │ table: transaction_statistics@fingerprint_stats_idx + │ lookup condition: (fingerprint_id = fingerprint_id) AND (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 IN (0, 1, 2, 3, 4, 5, 6, 7)) + │ pred: app_name = app_name + │ + └── • filter + │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number) + │ estimated row count: 3 (missing stats) + │ filter: (((((row_number < 500) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500)) OR (row_number < 500) + │ + └── • window + │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1, row_number) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_6_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_5_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_4_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_3_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + 
│ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_2_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • window + │ columns: (fingerprint_id, app_name, agg_interval, row_number, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ estimated row count: 3 (missing stats) + │ window 0: row_number() OVER (ORDER BY row_number_1_orderby_1_1 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + │ + └── • render + │ columns: (fingerprint_id, app_name, agg_interval, row_number_1_orderby_1_1, row_number_2_orderby_1_1, row_number_3_orderby_1_1, row_number_4_orderby_1_1, row_number_5_orderby_1_1, row_number_6_orderby_1_1) + │ render row_number_1_orderby_1_1: ((statistics->'execution_statistics')->>'cnt')::INT8 + │ render row_number_2_orderby_1_1: (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render row_number_3_orderby_1_1: ((statistics->'execution_statistics')->>'cnt')::FLOAT8 * (((statistics->'statistics')->'svcLat')->>'mean')::FLOAT8 + │ render row_number_4_orderby_1_1: COALESCE((((statistics->'execution_statistics')->'contentionTime')->>'mean')::FLOAT8, 0.0) + │ render row_number_5_orderby_1_1: COALESCE((((statistics->'execution_statistics')->'cpu_sql_nanos')->>'mean')::FLOAT8, 0.0) + │ render row_number_6_orderby_1_1: COALESCE((((statistics->'statistics')->'latencyInfo')->>'p99')::FLOAT8, 0.0) + │ render fingerprint_id: fingerprint_id + │ render app_name: app_name + │ render agg_interval: agg_interval + │ + └── • render + │ columns: (statistics, fingerprint_id, app_name, agg_interval) + │ render statistics: crdb_internal.merge_transaction_stats(array_agg) + │ render fingerprint_id: fingerprint_id + │ render app_name: app_name + │ render agg_interval: agg_interval + │ + └── • group (hash) + │ columns: (fingerprint_id, app_name, agg_interval, array_agg) + │ estimated row count: 3 (missing stats) + │ aggregate 0: array_agg(statistics) + │ group by: fingerprint_id, app_name, agg_interval + │ + └── • project + │ columns: (fingerprint_id, app_name, agg_interval, statistics) + │ + └── • index join + │ columns: (aggregated_ts, fingerprint_id, app_name, agg_interval, statistics) + │ estimated row count: 3 (missing stats) + │ table: transaction_statistics@primary + │ key columns: crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id + │ + └── • scan + columns: (aggregated_ts, fingerprint_id, app_name, node_id, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8) + estimated row count: 3 (missing stats) + table: transaction_statistics@execution_count_idx (partial index) + spans: /2023-04-10T16:00:00Z-/2023-04-10T16:00:00.000000001Z diff --git a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go index 83fb197ec1cc..4237d46387bb 100644 --- a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go +++ b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go @@ -399,6 +399,13 @@ func TestExecBuild_not_visible_index( runExecBuildLogicTest(t, "not_visible_index") } +func TestExecBuild_observability( + t *testing.T, +) { + defer 
leaktest.AfterTest(t)()
+	runExecBuildLogicTest(t, "observability")
+}
+
 func TestExecBuild_orderby(
 	t *testing.T,
 ) {
diff --git a/pkg/sql/sql_activity_update_job.go b/pkg/sql/sql_activity_update_job.go
new file mode 100644
index 000000000000..9db8e76fc083
--- /dev/null
+++ b/pkg/sql/sql_activity_update_job.go
@@ -0,0 +1,669 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/metric"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+	io_prometheus_client "github.com/prometheus/client_model/go"
+)
+
+// enabled is the cluster setting that enables the stats activity flush job.
+var enabled = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"sql.stats.activity.flush.enabled",
+	"enable the flush to the system statement and transaction activity tables",
+	true)
+
+// sqlStatsActivityTopCount is the cluster setting that controls the number of
+// rows selected to be inserted into the activity tables.
+var sqlStatsActivityTopCount = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"sql.stats.activity.top.max",
+	"the limit per column for the top number of statistics to be flushed "+
+		"to the activity tables",
+	500,
+	settings.NonNegativeInt,
+)
+
+// sqlStatsActivityMaxPersistedRows specifies the maximum number of rows that
+// will be retained in system.statement_activity and
+// system.transaction_activity. The default is computed as
+// 500 (top limit) * 6 (num columns) * 24 (hrs) * 3 (days) = 216,000, rounded
+// down to 200,000 for an even number, giving a minimum of 3 days of history.
+// The top 500 (controlled by sql.stats.activity.top.max) are likely the same
+// for several columns, so the default settings should still yield roughly
+// 3 days of history.
+var sqlStatsActivityMaxPersistedRows = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"sql.stats.activity.persisted_rows.max",
+	"maximum number of rows of statement and transaction"+
+		" activity that will be persisted in the system tables",
+	200000, /* defaultValue */
+	settings.NonNegativeInt,
+).WithPublic()
+
+const numberOfTopColumns = 6
+
+type sqlActivityUpdateJob struct {
+	job *jobs.Job
+}
+
+// Resume implements the jobs.Resumer interface.
+// The SQL activity job runs as a forever-running background job and runs the
+// SqlActivityUpdater each time the SQL stats subsystem completes a flush.
+func (j *sqlActivityUpdateJob) Resume(ctx context.Context, execCtxI interface{}) (jobErr error) {
+	log.Infof(ctx, "starting sql stats activity flush job")
+	// The sql activity update job is a forever-running background job.
+	// It's always safe to wind the SQL pod down whenever it's
+	// running, something we indicate through the job's idle
+	// status.
+	j.job.MarkIdle(true)
+
+	execCtx := execCtxI.(JobExecContext)
+	stopper := execCtx.ExecCfg().DistSQLSrv.Stopper
+	settings := execCtx.ExecCfg().Settings
+	statsFlush := execCtx.ExecCfg().InternalDB.server.sqlStats
+	metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoUpdateSQLActivity].(ActivityUpdaterMetrics)
+
+	flushDoneSignal := make(chan struct{})
+	defer func() {
+		statsFlush.SetFlushDoneCallback(nil)
+		close(flushDoneSignal)
+	}()
+
+	for {
+		statsFlush.SetFlushDoneCallback(func() {
+			flushDoneSignal <- struct{}{}
+		})
+		select {
+		case <-flushDoneSignal:
+			// A flush completed. Run the updater, if enabled, to transfer
+			// the newly flushed stats to the activity tables.
+			if enabled.Get(&settings.SV) {
+				updater := NewSqlActivityUpdater(settings, execCtx.ExecCfg().InternalDB)
+				if err := updater.TransferStatsToActivity(ctx); err != nil {
+					log.Warningf(ctx, "error running sql activity updater job: %v", err)
+					metrics.numErrors.Inc(1)
+				}
+			}
+		case <-ctx.Done():
+			return nil
+		case <-stopper.ShouldQuiesce():
+			return nil
+		}
+	}
+}
+
+type ActivityUpdaterMetrics struct {
+	numErrors *metric.Counter
+}
+
+func (m ActivityUpdaterMetrics) MetricStruct() {}
+
+func newActivityUpdaterMetrics() metric.Struct {
+	return ActivityUpdaterMetrics{
+		numErrors: metric.NewCounter(metric.Metadata{
+			Name:        "jobs.metrics.task_failed",
+			Help:        "Number of SQL activity updater tasks that failed",
+			Measurement: "errors",
+			Unit:        metric.Unit_COUNT,
+			MetricType:  io_prometheus_client.MetricType_COUNTER,
+		}),
+	}
+}
+
+// OnFailOrCancel implements the jobs.Resumer interface.
+// No action needs to be taken on our part. There's no state to clean up.
+func (r *sqlActivityUpdateJob) OnFailOrCancel(
+	ctx context.Context, _ interface{}, jobErr error,
+) error {
+	if jobs.HasErrJobCanceled(jobErr) {
+		err := errors.NewAssertionErrorWithWrappedErrf(jobErr,
+			"sql activity is not cancelable")
+		log.Errorf(ctx, "%v", err)
+	}
+	return nil
+}
+
+func init() {
+	jobs.RegisterConstructor(jobspb.TypeAutoUpdateSQLActivity,
+		func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
+			return &sqlActivityUpdateJob{job: job}
+		},
+		jobs.DisablesTenantCostControl,
+		jobs.WithJobMetrics(newActivityUpdaterMetrics()),
+	)
+}
+
+// NewSqlActivityUpdater returns a new instance of SqlActivityUpdater.
+func NewSqlActivityUpdater(setting *cluster.Settings, db isql.DB) *SqlActivityUpdater {
+	return &SqlActivityUpdater{
+		st: setting,
+		db: db,
+	}
+}
+
+type SqlActivityUpdater struct {
+	st *cluster.Settings
+	db isql.DB
+}
+
+func (u *SqlActivityUpdater) TransferStatsToActivity(ctx context.Context) error {
+	// Get the config and pass it around to avoid any issue of it changing
+	// in the middle of the execution.
+	maxRowPersistedRows := sqlStatsActivityMaxPersistedRows.Get(&u.st.SV)
+	topLimit := sqlStatsActivityTopCount.Get(&u.st.SV)
+	aggTs := u.computeAggregatedTs(&u.st.SV)
+
+	// The counts are read AS OF SYSTEM TIME, so the values may be slightly
+	// stale. This is an acceptable trade-off for better performance.
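+	// (follower_read_timestamp() reads several seconds in the past, so rows
+	// from a flush that only just completed may not be visible yet; the next
+	// run of the job picks them up.)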
+	stmtRowCount, txnRowCount, totalStmtClusterExecCount, totalTxnClusterExecCount, err := u.getAostExecutionCount(ctx, aggTs)
+	if err != nil {
+		return err
+	}
+
+	// No need to continue since there are no rows to transfer.
+	if stmtRowCount == 0 && txnRowCount == 0 {
+		log.Infof(ctx, "sql stats activity found no rows at %s", aggTs)
+		return nil
+	}
+
+	// Create space in the tables before adding new rows to avoid
+	// going over the row limit. If the compaction fails, it will not
+	// add any new rows.
+	err = u.compactActivityTables(ctx, maxRowPersistedRows-stmtRowCount)
+	if err != nil {
+		return err
+	}
+
+	// There are fewer rows than the filtered top-N queries would return.
+	// Just transfer all the stats to avoid the overhead of computing
+	// the tops.
+	if stmtRowCount < (topLimit*numberOfTopColumns) && txnRowCount < (topLimit*numberOfTopColumns) {
+		return u.transferAllStats(ctx, aggTs, totalStmtClusterExecCount, totalTxnClusterExecCount)
+	}
+
+	// Only transfer the top sql.stats.activity.top.max rows for each of
+	// the 6 ranked columns.
+	err = u.transferTopStats(ctx, aggTs, topLimit, totalStmtClusterExecCount, totalTxnClusterExecCount)
+	return err
+}
+
+// transferAllStats transfers all the stats from
+// system.statement_statistics and system.transaction_statistics
+// to system.statement_activity and system.transaction_activity.
+func (u *SqlActivityUpdater) transferAllStats(
+	ctx context.Context,
+	aggTs time.Time,
+	totalStmtClusterExecCount int64,
+	totalTxnClusterExecCount int64,
+) error {
+	_, err := u.db.Executor().ExecEx(ctx,
+		"activity-flush-txn-transfer-all",
+		nil, /* txn */
+		sessiondata.NodeUserSessionDataOverride,
+		`
+  UPSERT INTO system.public.transaction_activity
+(aggregated_ts, fingerprint_id, app_name, agg_interval, metadata,
+ statistics, query, execution_count, execution_total_seconds,
+ execution_total_cluster_seconds, contention_time_avg_seconds,
+ cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+    (SELECT aggregated_ts,
+            fingerprint_id,
+            app_name,
+            agg_interval,
+            metadata,
+            statistics,
+            '' AS query,
+            (statistics->'execution_statistics'->>'cnt')::int,
+            ((statistics->'execution_statistics'->>'cnt')::float)*((statistics->'statistics'->'svcLat'->>'mean')::float),
+            $1 AS execution_total_cluster_seconds,
+            COALESCE((statistics->'execution_statistics'->'contentionTime'->>'mean')::float,0),
+            COALESCE((statistics->'execution_statistics'->'cpu_sql_nanos'->>'mean')::float,0),
+            (statistics->'statistics'->'svcLat'->>'mean')::float,
+            COALESCE((statistics->'statistics'->'latencyInfo'->>'p99')::float, 0)
+     FROM (SELECT
+             max(aggregated_ts) AS aggregated_ts,
+             app_name,
+             fingerprint_id,
+             agg_interval,
+             crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
+             crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+           FROM system.public.transaction_statistics
+           WHERE aggregated_ts = $2
+             and app_name not like '$ internal%'
+           GROUP BY app_name,
+                    fingerprint_id,
+                    agg_interval));
+`,
+		totalTxnClusterExecCount,
+		aggTs,
+	)
+
+	if err != nil {
+		return err
+	}
+
+	_, err = u.db.Executor().ExecEx(ctx,
+		"activity-flush-stmt-transfer-all",
+		nil, /* txn */
+		sessiondata.NodeUserSessionDataOverride,
+		`
+  UPSERT
+INTO system.public.statement_activity (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name,
+                                       agg_interval, metadata, statistics, plan, index_recommendations, execution_count,
+                                       execution_total_seconds, execution_total_cluster_seconds,
+                                       contention_time_avg_seconds,
+                                       cpu_sql_avg_nanos,
+                                       service_latency_avg_seconds, service_latency_p99_seconds)
+    (SELECT aggregated_ts,
+            fingerprint_id,
+            transaction_fingerprint_id,
+            plan_hash,
+            app_name,
+            agg_interval,
+            metadata,
+            statistics,
+            plan,
+            index_recommendations,
+            (statistics -> 'execution_statistics' ->> 'cnt')::int,
+            ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+            ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+            $1 AS execution_total_cluster_seconds,
+            COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+            COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+            (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+            COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+     FROM (SELECT max(aggregated_ts) AS aggregated_ts,
+                  fingerprint_id,
+                  transaction_fingerprint_id,
+                  plan_hash,
+                  app_name,
+                  agg_interval,
+                  crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
+                  crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
+                  plan,
+                  index_recommendations
+           FROM system.public.statement_statistics
+           WHERE aggregated_ts = $2
+             and app_name not like '$ internal%'
+           GROUP BY app_name,
+                    fingerprint_id,
+                    transaction_fingerprint_id,
+                    plan_hash,
+                    agg_interval,
+                    plan,
+                    index_recommendations));
+`,
+		totalStmtClusterExecCount,
+		aggTs,
+	)
+
+	return err
+}
+
+// transferTopStats transfers the top N stats from
+// system.statement_statistics and system.transaction_statistics
+// to system.statement_activity and system.transaction_activity.
+func (u *SqlActivityUpdater) transferTopStats(
+	ctx context.Context,
+	aggTs time.Time,
+	topLimit int64,
+	totalStmtClusterExecCount int64,
+	totalTxnClusterExecCount int64,
+) (retErr error) {
+	// Select the top 500 (controlled by sql.stats.activity.top.max) for each
+	// of execution_count, total execution time, service_latency,
+	// cpu_sql_nanos, contention_time, and p99_latency, and insert them into
+	// the transaction_activity table. Up to 3000 rows
+	// (sql.stats.activity.top.max * 6) may be added to transaction_activity.
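+	// The query below ranks each (app_name, fingerprint_id, agg_interval)
+	// group once per metric using a row_number() window (ePos..lPos) and
+	// keeps a group if any one of its six ranks is under the limit, e.g. a
+	// transaction that is only in the top N by contention time still
+	// qualifies.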
+	_, err := u.db.Executor().ExecEx(ctx,
+		"activity-flush-txn-transfer-tops",
+		nil, /* txn */
+		sessiondata.NodeUserSessionDataOverride,
+		`
+UPSERT
+INTO system.public.transaction_activity
+(aggregated_ts, fingerprint_id, app_name, agg_interval, metadata,
+ statistics, query, execution_count, execution_total_seconds,
+ execution_total_cluster_seconds, contention_time_avg_seconds,
+ cpu_sql_avg_nanos, service_latency_avg_seconds, service_latency_p99_seconds)
+    (SELECT aggregated_ts,
+            fingerprint_id,
+            app_name,
+            agg_interval,
+            metadata,
+            statistics,
+            '' AS query,
+            (statistics -> 'execution_statistics' ->> 'cnt')::int,
+            ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+            ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+            $1 AS execution_total_cluster_seconds,
+            COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+            COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+            (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+            COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+     FROM (SELECT max(ts.aggregated_ts) AS aggregated_ts,
+                  ts.app_name,
+                  ts.fingerprint_id,
+                  ts.agg_interval,
+                  crdb_internal.merge_stats_metadata(array_agg(ts.metadata)) AS metadata,
+                  crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+           FROM system.public.transaction_statistics ts
+                  inner join (SELECT fingerprint_id, app_name, agg_interval
+                              FROM (SELECT fingerprint_id, app_name, agg_interval,
+                                           row_number()
+                                           OVER (ORDER BY (statistics -> 'execution_statistics' ->> 'cnt')::int desc) AS ePos,
+                                           row_number()
+                                           OVER (ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float desc) AS sPos,
+                                           row_number()
+                                           OVER (ORDER BY ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+                                                          ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float) desc) AS tPos,
+                                           row_number() OVER (ORDER BY COALESCE(
+                                               (statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float,
+                                               0) desc) AS cPos,
+                                           row_number() OVER (ORDER BY COALESCE(
+                                               (statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float,
+                                               0) desc) AS uPos,
+                                           row_number() OVER (ORDER BY COALESCE(
+                                               (statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float,
+                                               0) desc) AS lPos
+                                    FROM (SELECT fingerprint_id, app_name, agg_interval,
+                                                 crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+                                          FROM system.public.transaction_statistics
+                                          WHERE aggregated_ts = $2 and
+                                              app_name not like '$ internal%'
+                                          GROUP BY app_name,
+                                                   fingerprint_id,
+                                                   agg_interval))
+                              WHERE ePos < $3
+                                 or sPos < $3
+                                 or tPos < $3
+                                 or cPos < $3
+                                 or uPos < $3
+                                 or lPos < $3) agg
+                             on agg.app_name = ts.app_name and agg.fingerprint_id = ts.fingerprint_id and
+                                agg.agg_interval = ts.agg_interval
+           GROUP BY ts.app_name,
+                    ts.fingerprint_id,
+                    ts.agg_interval));
+`,
+		totalTxnClusterExecCount,
+		aggTs,
+		topLimit,
+	)
+
+	if err != nil {
+		return err
+	}
+
+	// Select the top 500 (controlled by sql.stats.activity.top.max) for each of
+	// execution_count, total execution time, service_latency, cpu_sql_nanos,
+	// contention_time, p99_latency. Also include all statements that are in the
+	// top N transactions. This is needed so the statement information is
+	// available for the UI, letting a user see what is in the transaction.
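+	// Unlike the transaction query above, the statement ranking joins back
+	// on (app_name, fingerprint_id) only, so every plan and transaction
+	// fingerprint of a top statement is carried over to statement_activity.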
+	_, err = u.db.Executor().ExecEx(ctx,
+		"activity-flush-stmt-transfer-tops",
+		nil, /* txn */
+		sessiondata.NodeUserSessionDataOverride,
+		`
+UPSERT
+INTO system.public.statement_activity
+(aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name,
+ agg_interval, metadata, statistics, plan, index_recommendations, execution_count,
+ execution_total_seconds, execution_total_cluster_seconds,
+ contention_time_avg_seconds,
+ cpu_sql_avg_nanos,
+ service_latency_avg_seconds, service_latency_p99_seconds)
+    (SELECT aggregated_ts,
+            fingerprint_id,
+            transaction_fingerprint_id,
+            plan_hash,
+            app_name,
+            agg_interval,
+            metadata,
+            statistics,
+            plan,
+            index_recommendations,
+            (statistics -> 'execution_statistics' ->> 'cnt')::int,
+            ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+            ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float),
+            $1 AS execution_total_cluster_seconds,
+            COALESCE((statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float, 0),
+            COALESCE((statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float, 0),
+            (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float,
+            COALESCE((statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float, 0)
+     FROM (SELECT max(ss.aggregated_ts) AS aggregated_ts,
+                  ss.fingerprint_id,
+                  ss.transaction_fingerprint_id,
+                  ss.plan_hash,
+                  ss.app_name,
+                  ss.agg_interval,
+                  crdb_internal.merge_stats_metadata(array_agg(ss.metadata)) AS metadata,
+                  crdb_internal.merge_statement_stats(array_agg(ss.statistics)) AS statistics,
+                  ss.plan,
+                  ss.index_recommendations
+           FROM system.public.statement_statistics ss
+                  inner join (SELECT fingerprint_id, app_name
+                              FROM (SELECT fingerprint_id, app_name,
+                                           row_number()
+                                           OVER (ORDER BY (statistics -> 'execution_statistics' ->> 'cnt')::int desc) AS ePos,
+                                           row_number()
+                                           OVER (ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float desc) AS sPos,
+                                           row_number() OVER (ORDER BY
+                                             ((statistics -> 'execution_statistics' ->> 'cnt')::float) *
+                                             ((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float) desc) AS tPos,
+                                           row_number() OVER (ORDER BY COALESCE(
+                                               (statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::float,
+                                               0) desc) AS cPos,
+                                           row_number() OVER (ORDER BY COALESCE(
+                                               (statistics -> 'execution_statistics' -> 'cpu_sql_nanos' ->> 'mean')::float,
+                                               0) desc) AS uPos,
+                                           row_number() OVER (ORDER BY COALESCE(
+                                               (statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::float,
+                                               0) desc) AS lPos
+                                    FROM (SELECT fingerprint_id,
+                                                 app_name,
+                                                 crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
+                                          FROM system.public.statement_statistics
+                                          WHERE aggregated_ts = $2 and
+                                              app_name not like '$ internal%'
+                                          GROUP BY app_name,
+                                                   fingerprint_id))
+                              WHERE ePos < $3
+                                 or sPos < $3
+                                 or tPos < $3
+                                 or cPos < $3
+                                 or uPos < $3
+                                 or lPos < $3) agg on agg.app_name = ss.app_name and agg.fingerprint_id = ss.fingerprint_id
+           WHERE aggregated_ts = $2
+           GROUP BY ss.app_name,
+                    ss.fingerprint_id,
+                    ss.transaction_fingerprint_id,
+                    ss.plan_hash,
+                    ss.agg_interval,
+                    ss.plan,
+                    ss.index_recommendations));
+`,
+		totalStmtClusterExecCount,
+		aggTs,
+		topLimit,
+	)
+
+	return err
+}
+
+// getAostExecutionCount returns the row counts of both
+// system.statement_statistics and system.transaction_statistics, along with
+// the total execution count of each, for the specified aggregated timestamp.
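+// Both counts come from one UNION ALL query so the two tables are read at a
+// consistent follower-read timestamp.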
+func (u *SqlActivityUpdater) getAostExecutionCount(
+	ctx context.Context, aggTs time.Time,
+) (
+	stmtRowCount int64,
+	txnRowCount int64,
+	totalStmtClusterExecCount int64,
+	totalTxnClusterExecCount int64,
+	retErr error,
+) {
+	it, err := u.db.Executor().QueryIteratorEx(ctx,
+		"activity-flush-count",
+		nil, /* txn */
+		sessiondata.NodeUserSessionDataOverride,
+		`
+  SELECT row_count, ex_sum FROM (SELECT
+    count_rows():::int AS row_count,
+    COALESCE(sum(execution_count)::int, 0) AS ex_sum
+  FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()
+  WHERE app_name not like '$ internal%' and aggregated_ts = $1
+  union all
+  SELECT
+    count_rows():::int AS row_count,
+    COALESCE(sum(execution_count)::int, 0) AS ex_sum
+  FROM system.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()
+  WHERE app_name not like '$ internal%' and aggregated_ts = $1) AS OF SYSTEM TIME follower_read_timestamp()`,
+		aggTs,
+	)
+
+	if err != nil {
+		return -1, -1, -1, -1, err
+	}
+
+	defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()
+
+	stmtRowCount, totalStmtClusterExecCount, err = u.getExecutionCountFromRow(ctx, it)
+	if err != nil {
+		return -1, -1, -1, -1, err
+	}
+
+	txnRowCount, totalTxnClusterExecCount, err = u.getExecutionCountFromRow(ctx, it)
+	return stmtRowCount, txnRowCount, totalStmtClusterExecCount, totalTxnClusterExecCount, err
+}
+
+func (u *SqlActivityUpdater) getExecutionCountFromRow(
+	ctx context.Context, iter isql.Rows,
+) (rowCount int64, totalExecutionCount int64, err error) {
+	ok, err := iter.Next(ctx)
+	if err != nil {
+		return -1, -1, err
+	}
+
+	if !ok {
+		return -1, -1, fmt.Errorf("no rows in activity-flush-count")
+	}
+
+	row := iter.Cur()
+	if row[0] == tree.DNull || row[1] == tree.DNull {
+		return 0, 0, nil
+	}
+
+	return int64(tree.MustBeDInt(row[0])), int64(tree.MustBeDInt(row[1])), nil
+}
+
+// computeAggregatedTs returns the aggregation timestamp to assign to
+// in-memory SQL stats during storage or aggregation.
+func (u *SqlActivityUpdater) computeAggregatedTs(sv *settings.Values) time.Time {
+	interval := persistedsqlstats.SQLStatsAggregationInterval.Get(sv)
+
+	now := timeutil.Now()
+	aggTs := now.Truncate(interval)
+	return aggTs
+}
+
+// compactActivityTables is used to delete rows from the activity tables
+// to keep the tables under the configured row limit.
+func (u *SqlActivityUpdater) compactActivityTables(ctx context.Context, maxRowCount int64) error {
+	rowCount, err := u.getTableRowCount(ctx, "system.statement_activity")
+	if err != nil {
+		return err
+	}
+
+	if rowCount < maxRowCount {
+		return nil
+	}
+
+	// Delete entire aggregated_ts buckets, oldest first, to avoid
+	// showing partial data for a time range.
+	_, err = u.db.Executor().ExecEx(ctx,
+		"activity-stmt-compaction",
+		nil, /* txn */
+		sessiondata.NodeUserSessionDataOverride,
+		`
+  DELETE
+FROM system.statement_activity
+WHERE aggregated_ts IN (SELECT DISTINCT aggregated_ts FROM (SELECT aggregated_ts FROM system.statement_activity ORDER BY aggregated_ts ASC limit $1));`,
+		rowCount-maxRowCount,
+	)
+
+	if err != nil {
+		return err
+	}
+
+	// Delete all transaction_activity rows whose aggregated_ts no longer
+	// appears in statement_activity. This keeps the two tables in sync.
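+	// statement_activity was compacted first, so any aggregated_ts missing
+	// from it below has already been deleted and can be removed here too.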
+	_, err = u.db.Executor().ExecEx(ctx,
+		"activity-txn-compaction",
+		nil, /* txn */
+		sessiondata.NodeUserSessionDataOverride,
+		`
+  DELETE
+FROM system.transaction_activity
+WHERE aggregated_ts not in (SELECT distinct aggregated_ts FROM system.statement_activity);`,
+	)
+
+	return err
+}
+
+// getTableRowCount returns the current row count of the given table, read
+// AS OF SYSTEM TIME follower_read_timestamp().
+func (u *SqlActivityUpdater) getTableRowCount(
+	ctx context.Context, tableName string,
+) (rowCount int64, retErr error) {
+	query := fmt.Sprintf(`
+  SELECT
+    count_rows()::int
+  FROM %s AS OF SYSTEM TIME follower_read_timestamp()`, tableName)
+	datums, err := u.db.Executor().QueryRowEx(ctx,
+		"activity-total-count",
+		nil, /* txn */
+		sessiondata.NodeUserSessionDataOverride,
+		query,
+	)
+
+	if err != nil {
+		return 0, err
+	}
+
+	if datums == nil {
+		return 0, nil
+	}
+
+	if datums[0] == tree.DNull {
+		return 0, nil
+	}
+
+	return int64(tree.MustBeDInt(datums[0])), nil
+}
diff --git a/pkg/sql/sql_activity_update_job_test.go b/pkg/sql/sql_activity_update_job_test.go
new file mode 100644
index 000000000000..674740d7e4af
--- /dev/null
+++ b/pkg/sql/sql_activity_update_job_test.go
@@ -0,0 +1,339 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
+	"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+// TestSqlActivityUpdateJob verifies that TransferStatsToActivity moves rows
+// from the statistics tables to the activity tables.
+func TestSqlActivityUpdateJob(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderStressRace(t, "test is too slow to run under race")
+
+	// Start the cluster. (One node is sufficient.)
+	// Disable the job; the updater is invoked manually from a new instance
+	// below to avoid any race conditions.
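+	// SkipUpdateSQLActivityJobBootstrap prevents the upgrade from creating
+	// the AUTO UPDATE SQL ACTIVITY job, which is why the jobs-table check
+	// below expects zero rows.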
+	ctx := context.Background()
+	srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true,
+		Knobs: base.TestingKnobs{UpgradeManager: &upgradebase.TestingKnobs{
+			DontUseJobs:                       true,
+			SkipUpdateSQLActivityJobBootstrap: true,
+		}}})
+	defer srv.Stopper().Stop(context.Background())
+	defer db.Close()
+
+	var count int
+	row := db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.transaction_activity")
+	err := row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "transaction_activity: expect:0, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.statement_activity")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "statement_activity: expect:0, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.jobs WHERE job_type = 'AUTO UPDATE SQL ACTIVITY' and id = 103 ")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "jobs: expect:0, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.transaction_statistics")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "transaction_statistics: expect:0, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.statement_statistics")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "statement_statistics: expect:0, actual:%d", count)
+
+	execCfg := srv.ExecutorConfig().(ExecutorConfig)
+	st := cluster.MakeTestingClusterSettings()
+	updater := NewSqlActivityUpdater(st, execCfg.InternalDB)
+
+	// Transient failures from AOST queries: https://github.com/cockroachdb/cockroach/issues/97840
+	testutils.SucceedsWithin(t, func() error {
+		// Verify no error with empty stats.
+		return updater.TransferStatsToActivity(ctx)
+	}, 30*time.Second)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.transaction_activity")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "transaction_activity: expect:0, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.statement_activity")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "statement_activity: expect:0, actual:%d", count)
+
+	appName := "TestSqlActivityUpdateJob"
+	_, err = db.ExecContext(ctx, "SET SESSION application_name=$1", appName)
+	require.NoError(t, err)
+
+	_, err = db.ExecContext(ctx, "SELECT 1;")
+	require.NoError(t, err)
+	// Need to call it twice to actually cause a flush.
+	srv.SQLServer().(*Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+	srv.SQLServer().(*Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+	_, err = db.ExecContext(ctx, "SET SESSION application_name=$1", "randomIgnore")
+	require.NoError(t, err)
+
+	// The row-count check below reads at follower_read_timestamp, so wait
+	// until the flushed rows are visible at that timestamp; otherwise the
+	// transfer would see zero rows and skip the upsert.
+	testutils.SucceedsWithin(t, func() error {
+		var txnAggTs time.Time
+		row = db.QueryRowContext(ctx, `SELECT count_rows(), aggregated_ts
+      FROM system.public.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()
+      WHERE app_name = $1
+      GROUP BY aggregated_ts`, appName)
+		err = row.Scan(&count, &txnAggTs)
+		if err != nil {
+			return err
+		}
+		if count <= 0 {
+			return errors.New("need to wait for rows to be visible at follower_read_timestamp")
+		}
+
+		var stmtAggTs time.Time
+		row = db.QueryRowContext(ctx, `SELECT count_rows(), aggregated_ts
+      FROM system.public.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()
+      WHERE app_name = $1
+      GROUP BY aggregated_ts`, appName)
+		err = row.Scan(&count, &stmtAggTs)
+		if err != nil {
+			return err
+		}
+		if count <= 0 {
+			return errors.New("need to wait for rows to be visible at follower_read_timestamp")
+		}
+		require.Equal(t, stmtAggTs, txnAggTs)
+		return nil
+	}, 30*time.Second)
+
+	// Run the updater to add rows to the activity tables.
+	// With only a few rows, this exercises the transfer-all path.
+	err = updater.TransferStatsToActivity(ctx)
+	require.NoError(t, err)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.transaction_activity WHERE app_name = $1", appName)
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 1, count, "transaction_activity after transfer: expect:1, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.statement_activity WHERE app_name = $1", appName)
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 1, count, "statement_activity after transfer: expect:1, actual:%d", count)
+}
+
+// TestSqlActivityUpdateTopLimitJob verifies that the number of rows
+// transferred to the activity tables is capped by the top-limit setting.
+func TestSqlActivityUpdateTopLimitJob(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderStressRace(t, "test is too slow to run under race")
+
+	// Start the cluster. (One node is sufficient.)
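+	// The top limit is lowered from its default of 500 via the settings
+	// updater below so that the 100 generated rows exceed it.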
+	ctx := context.Background()
+	srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true,
+		Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}})
+	defer srv.Stopper().Stop(context.Background())
+	defer db.Close()
+
+	// Verify all the tables are empty initially.
+	var count int
+	row := db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.transaction_activity")
+	err := row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "transaction_activity: expect:0, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.statement_activity")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "statement_activity: expect:0, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.transaction_statistics")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "transaction_statistics: expect:0, actual:%d", count)
+
+	row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+		"FROM system.public.statement_statistics")
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.Equal(t, 0, count, "statement_statistics: expect:0, actual:%d", count)
+
+	execCfg := srv.ExecutorConfig().(ExecutorConfig)
+	st := cluster.MakeTestingClusterSettings()
+	su := st.MakeUpdater()
+	topLimit := 5
+	err = su.Set(ctx, "sql.stats.activity.top.max", settings.EncodedValue{
+		Value: settings.EncodeInt(int64(topLimit)),
+		Type:  "i",
+	})
+	require.NoError(t, err)
+
+	updater := NewSqlActivityUpdater(st, execCfg.InternalDB)
+
+	appNamePrefix := "TestSqlActivityUpdateJobLoop"
+	// Generate 100 unique rows for the statistics tables.
+	for i := 0; i < 100; i++ {
+		tempAppName := fmt.Sprintf("%s%d", appNamePrefix, i)
+		_, err = db.ExecContext(ctx, "SET SESSION application_name=$1", tempAppName)
+		require.NoError(t, err)
+
+		_, err = db.ExecContext(ctx, "SELECT 1;")
+		require.NoError(t, err)
+	}
+
+	// Need to call it twice to actually cause a flush.
+	srv.SQLServer().(*Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+	srv.SQLServer().(*Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+	_, err = db.ExecContext(ctx, "SET SESSION application_name=$1", "randomIgnore")
+	require.NoError(t, err)
+
+	// The row-count check below reads at follower_read_timestamp, so wait
+	// until the flushed rows are visible at that timestamp; otherwise the
+	// transfer would see zero rows and skip the upsert.
+	testutils.SucceedsWithin(t, func() error {
+		var txnAggTs time.Time
+		row = db.QueryRowContext(ctx, `SELECT count_rows(), aggregated_ts
+      FROM system.public.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()
+      WHERE app_name LIKE 'TestSqlActivityUpdateJobLoop%'
+      GROUP BY aggregated_ts`)
+		err = row.Scan(&count, &txnAggTs)
+		if err != nil {
+			return err
+		}
+		if count < 100 {
+			return errors.New("need to wait for rows to be visible at follower_read_timestamp")
+		}
+
+		var stmtAggTs time.Time
+		row = db.QueryRowContext(ctx, `SELECT count_rows(), aggregated_ts
+      FROM system.public.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()
+      WHERE app_name LIKE 'TestSqlActivityUpdateJobLoop%'
+      GROUP BY aggregated_ts`)
+		err = row.Scan(&count, &stmtAggTs)
+		if err != nil {
+			return err
+		}
+		if count < 100 {
+			return errors.New("need to wait for rows to be visible at follower_read_timestamp")
+		}
+		require.Equal(t, stmtAggTs, txnAggTs)
+		return nil
+	}, 30*time.Second)
+
+	// Run the updater to add rows to the activity tables.
+	// With 100 rows and a top limit of 5, this exercises the top-N path.
+	err = updater.TransferStatsToActivity(ctx)
+	require.NoError(t, err)
+
+	maxRows := topLimit * 6 // Number of top columns to select from
+	row = db.QueryRowContext(ctx, `SELECT count_rows()
+      FROM system.public.transaction_activity
+      WHERE app_name LIKE 'TestSqlActivityUpdateJobLoop%'`)
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.LessOrEqual(t, count, maxRows, "transaction_activity after transfer: actual:%d, max:%d", count, maxRows)
+
+	row = db.QueryRowContext(ctx, `SELECT count_rows()
+      FROM system.public.statement_activity
+      WHERE app_name LIKE 'TestSqlActivityUpdateJobLoop%'`)
+	err = row.Scan(&count)
+	require.NoError(t, err)
+	require.LessOrEqual(t, count, maxRows, "statement_activity after transfer: actual:%d, max:%d", count, maxRows)
+}
+
+// TestScheduledSQLStatsCompaction verifies that the background activity
+// update job populates the activity tables after the scheduled stats flush.
+func TestScheduledSQLStatsCompaction(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderStressRace(t, "test is too slow to run under race")
+
+	// Start the cluster. (One node is sufficient.)
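+	// Unlike the tests above, this test does not invoke the updater
+	// manually; it relies on the background job (with shortened intervals)
+	// to populate the activity tables after each 100ms stats flush.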
+	ctx := context.Background()
+	st := cluster.MakeTestingClusterSettings()
+	srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true,
+		Settings: st,
+		Knobs:    base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}})
+	defer srv.Stopper().Stop(context.Background())
+	defer db.Close()
+	_, err := db.ExecContext(ctx, "SET CLUSTER SETTING sql.stats.flush.interval = '100ms'")
+	require.NoError(t, err)
+	appName := "TestScheduledSQLStatsCompaction"
+	_, err = db.ExecContext(ctx, "SET SESSION application_name=$1", appName)
+	require.NoError(t, err)
+
+	testutils.SucceedsWithin(t, func() error {
+		_, err = db.ExecContext(ctx, "SELECT 1;")
+		require.NoError(t, err)
+
+		row := db.QueryRowContext(ctx, "SELECT count_rows() "+
+			"FROM system.public.transaction_activity WHERE app_name = $1", appName)
+		var count int
+		err = row.Scan(&count)
+		if err != nil {
+			return err
+		}
+		if count <= 0 {
+			return fmt.Errorf("transaction_activity is empty: %d", count)
+		}
+
+		row = db.QueryRowContext(ctx, "SELECT count_rows() "+
+			"FROM system.public.statement_activity WHERE app_name = $1", appName)
+		err = row.Scan(&count)
+		if err != nil {
+			return err
+		}
+		if count <= 0 {
+			return fmt.Errorf("statement_activity is empty: %d", count)
+		}
+
+		return nil
+	}, 1*time.Minute)
+}
diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
index ad4ad83bca8d..4aca0403cbc5 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
+++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
@@ -47,6 +47,7 @@ go_library(
         "//pkg/util/mon",
         "//pkg/util/retry",
         "//pkg/util/stop",
+        "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_gogo_protobuf//types",
diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go
index 774f7e707106..9652dbb36360 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/provider.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go
@@ -31,6 +31,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )
 
@@ -67,6 +68,10 @@ type PersistedSQLStats struct {
 	// exceeded.
 	memoryPressureSignal chan struct{}
 
+	// flushDoneCallback is used to signal that a flush completed.
+	flushDoneCallback func()
+	flushMutex        syncutil.Mutex
+
 	lastFlushStarted time.Time
 	jobMonitor       jobMonitor
 	atomic           struct {
@@ -89,6 +94,7 @@ func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats {
 		cfg:                  cfg,
 		memoryPressureSignal: make(chan struct{}),
 		drain:                make(chan struct{}),
+		flushDoneCallback:    nil,
 	}
 
 	p.jobMonitor = jobMonitor{
@@ -128,6 +134,12 @@ func (s *PersistedSQLStats) Stop(ctx context.Context) {
 	s.tasksDoneWG.Wait()
 }
 
+// SetFlushDoneCallback sets the callback that is invoked after each flush
+// completes.
+func (s *PersistedSQLStats) SetFlushDoneCallback(callBackFunc func()) {
+	s.flushMutex.Lock()
+	defer s.flushMutex.Unlock()
+	s.flushDoneCallback = callBackFunc
+}
+
 // GetController returns the controller of the PersistedSQLStats.
 func (s *PersistedSQLStats) GetController(server serverpb.SQLStatusServer) *Controller {
 	return NewController(s, server, s.cfg.DB)
@@ -173,6 +185,14 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper
 			}
 
 			s.Flush(ctx)
+
+			func() {
+				s.flushMutex.Lock()
+				defer s.flushMutex.Unlock()
+				if s.flushDoneCallback != nil {
+					s.flushDoneCallback()
+				}
+			}()
 		}
 	})
 	if err != nil {
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 1902e065ffae..d64ddbf2893b 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -3715,6 +3715,17 @@ var charts = []sectionDescription{
 				"jobs.key_visualizer.resume_retry_error",
 			},
 		},
+		{
+			Title: "SQL Activity Updater",
+			Metrics: []string{
+				"jobs.auto_update_sql_activity.fail_or_cancel_completed",
+				"jobs.auto_update_sql_activity.fail_or_cancel_failed",
+				"jobs.auto_update_sql_activity.fail_or_cancel_retry_error",
+				"jobs.auto_update_sql_activity.resume_completed",
+				"jobs.auto_update_sql_activity.resume_failed",
+				"jobs.auto_update_sql_activity.resume_retry_error",
+			},
+		},
 		{
 			Title: "Jobs Stats Polling Job",
 			Metrics: []string{
diff --git a/pkg/upgrade/upgradebase/testing_knobs.go b/pkg/upgrade/upgradebase/testing_knobs.go
index a52b5df92b6d..d52147390bb5 100644
--- a/pkg/upgrade/upgradebase/testing_knobs.go
+++ b/pkg/upgrade/upgradebase/testing_knobs.go
@@ -71,6 +71,11 @@ type TestingKnobs struct {
 	// AfterRunPermanentUpgrades is called after each call to
 	// RunPermanentUpgrades.
 	AfterRunPermanentUpgrades func()
+
+	// SkipUpdateSQLActivityJobBootstrap, if set, skips the
+	// clusterversion.V23_1CreateSystemActivityUpdateJob upgrade, which
+	// prevents the SQL activity update job from being created.
+	SkipUpdateSQLActivityJobBootstrap bool
 }
 
 // ModuleTestingKnobs makes TestingKnobs a base.ModuleTestingKnobs.
diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel
index 5c5e41adcd24..403fd2d1bc56 100644
--- a/pkg/upgrade/upgrades/BUILD.bazel
+++ b/pkg/upgrade/upgrades/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
         "schema_changes.go",
         "schemachanger_elements.go",
         "sql_stats_ttl.go",
+        "system_activity_update_job.go",
         "system_external_connections.go",
         "system_job_info.go",
         "system_privileges_index_migration.go",
@@ -115,6 +116,7 @@ go_test(
         "schema_changes_helpers_test.go",
         "schemachanger_elements_test.go",
         "sql_stats_ttl_test.go",
+        "system_activity_update_job_test.go",
         "system_job_info_test.go",
         "system_privileges_index_migration_test.go",
         "system_privileges_user_id_migration_test.go",
diff --git a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go
index 83d5cc2ecca8..1acb656c206d 100644
--- a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go
+++ b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go
@@ -19,7 +19,6 @@ import (
 	_ "github.com/cockroachdb/cockroach/pkg/jobs/metricspoller" // Ensure job implementation is linked.
"github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/isql" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/upgrade" ) @@ -30,32 +29,15 @@ func createJobsMetricsPollingJob( return nil } return d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - row, err := d.DB.Executor().QueryRowEx( - ctx, - "check for existing job metrics polling job", - nil, - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - "SELECT * FROM system.jobs WHERE id = $1", - jobs.JobMetricsPollerJobID, - ) - if err != nil { - return err + jr := jobs.Record{ + JobID: jobs.JobMetricsPollerJobID, + Description: jobspb.TypePollJobsStats.String(), + Details: jobspb.PollJobsStatsDetails{}, + Progress: jobspb.PollJobsStatsProgress{}, + CreatedBy: &jobs.CreatedByInfo{Name: username.RootUser, ID: username.RootUserID}, + Username: username.RootUserName(), + NonCancelable: true, } - - if row == nil { - jr := jobs.Record{ - JobID: jobs.JobMetricsPollerJobID, - Description: jobspb.TypePollJobsStats.String(), - Details: jobspb.PollJobsStatsDetails{}, - Progress: jobspb.PollJobsStatsProgress{}, - CreatedBy: &jobs.CreatedByInfo{Name: username.RootUser, ID: username.RootUserID}, - Username: username.RootUserName(), - NonCancelable: true, - } - if _, err := d.JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobs.JobMetricsPollerJobID, txn); err != nil { - return err - } - } - return nil + return d.JobRegistry.CreateIfNotExistAdoptableJobWithTxn(ctx, jr, txn) }) } diff --git a/pkg/upgrade/upgrades/key_visualizer_migration.go b/pkg/upgrade/upgrades/key_visualizer_migration.go index 11e77e2878e1..c14130f51358 100644 --- a/pkg/upgrade/upgrades/key_visualizer_migration.go +++ b/pkg/upgrade/upgrades/key_visualizer_migration.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/upgrade" ) @@ -81,25 +82,9 @@ func keyVisualizerTablesMigration( NonCancelable: true, // The job can't be canceled, but it can be paused. } - // Make sure job with id doesn't already exist in system.jobs. - row, err := d.DB.Executor().QueryRowEx( - ctx, - "check for existing key visualizer job", - nil, - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - "SELECT * FROM system.jobs WHERE id = $1", - record.JobID, - ) - if err != nil { - return err - } - - // If there isn't a row for the key visualizer job, create the job. - if row == nil { - if _, err := d.JobRegistry.CreateAdoptableJobWithTxn(ctx, record, record.JobID, nil); err != nil { - return err - } - } + return d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return d.JobRegistry.CreateIfNotExistAdoptableJobWithTxn(ctx, record, txn) + }) } return nil diff --git a/pkg/upgrade/upgrades/system_activity_update_job.go b/pkg/upgrade/upgrades/system_activity_update_job.go new file mode 100644 index 000000000000..e47251aee7b5 --- /dev/null +++ b/pkg/upgrade/upgrades/system_activity_update_job.go @@ -0,0 +1,46 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package upgrades
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/upgrade"
+)
+
+// createActivityUpdateJobMigration creates the job to update the
+// system.statement_activity and system.transaction_activity tables.
+func createActivityUpdateJobMigration(
+	ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps,
+) error {
+
+	if d.TestingKnobs != nil && d.TestingKnobs.SkipUpdateSQLActivityJobBootstrap {
+		return nil
+	}
+
+	record := jobs.Record{
+		JobID:         jobs.SqlActivityUpdaterJobID,
+		Description:   "sql activity job",
+		Username:      username.NodeUserName(),
+		Details:       jobspb.AutoUpdateSQLActivityDetails{},
+		Progress:      jobspb.AutoUpdateSQLActivityProgress{},
+		NonCancelable: true, // The job can't be canceled, but it can be paused.
+	}
+
+	return d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		return d.JobRegistry.CreateIfNotExistAdoptableJobWithTxn(ctx, record, txn)
+	})
+}
diff --git a/pkg/upgrade/upgrades/system_activity_update_job_test.go b/pkg/upgrade/upgrades/system_activity_update_job_test.go
new file mode 100644
index 000000000000..3d509f246f49
--- /dev/null
+++ b/pkg/upgrade/upgrades/system_activity_update_job_test.go
@@ -0,0 +1,73 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package upgrades_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestCreateActivityUpdateJobMigration(t *testing.T) {
+	skip.UnderStressRace(t)
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+
+	settings := cluster.MakeTestingClusterSettingsWithVersions(
+		clusterversion.TestingBinaryVersion,
+		clusterversion.TestingBinaryMinSupportedVersion,
+		false,
+	)
+
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			Settings: settings,
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					DisableAutomaticVersionUpgrade: make(chan struct{}),
+					BinaryVersionOverride:          clusterversion.TestingBinaryMinSupportedVersion,
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	db := tc.ServerConn(0)
+	defer db.Close()
+
+	// NB: the permanent upgrade already ran during cluster bootstrap, so this
+	// call mainly demonstrates that the migration is idempotent, in line with
+	// the other tests of permanent upgrades.
+	upgrades.Upgrade(
+		t,
+		db,
+		clusterversion.V23_1CreateSystemActivityUpdateJob,
+		nil,
+		false,
+	)
+
+	row := db.QueryRow("SELECT count(*) FROM system.public.jobs WHERE id = 103") // 103 = jobs.SqlActivityUpdaterJobID
+	assert.NotNil(t, row)
+	assert.NoError(t, row.Err())
+	var count int
+	err := row.Scan(&count)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, count)
+}
diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go
index 09a037ff9ca5..1d2faddea175 100644
--- a/pkg/upgrade/upgrades/upgrades.go
+++ b/pkg/upgrade/upgrades/upgrades.go
@@ -304,6 +304,12 @@ var upgrades = []upgradebase.Upgrade{
 		toCV(clusterversion.V23_1_TenantIDSequence),
 		tenantIDSequenceForSystemTenant,
 	),
+	upgrade.NewPermanentTenantUpgrade(
+		"create sql activity updater job",
+		toCV(clusterversion.V23_1CreateSystemActivityUpdateJob),
+		createActivityUpdateJobMigration,
+		"create statement_activity and transaction_activity job",
+	),
 }
 
 func init() {

From 39c0cec9cbceec7c22a269b1e8e085e2955a76b2 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Thu, 20 Apr 2023 19:02:50 +0000
Subject: [PATCH 6/6] kvserver: always bump epoch when acquiring expired epoch lease

Previously, a lease acquisition only bumped the liveness epoch of an
expired epoch leaseholder if the new lease was also an epoch lease.
However, the epoch must be bumped regardless of the new lease type in
order to properly invalidate the old leaseholder's leases, so this
patch does exactly that.
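To make the change concrete, here is a minimal sketch of the new gating
predicate as a hypothetical standalone helper (the real condition is
inlined in requestLease below; kvserverpb and roachpb are the packages
already used there):

    // shouldBumpPriorEpoch sketches the post-patch condition: the prior
    // leaseholder's epoch is bumped whenever its epoch-based lease has
    // expired, regardless of the type of lease being acquired next.
    func shouldBumpPriorEpoch(status kvserverpb.LeaseStatus) bool {
    	return status.Lease.Type() == roachpb.LeaseEpoch &&
    		status.State == kvserverpb.LeaseState_EXPIRED
    }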
Epic: none Release note: None --- pkg/kv/kvserver/client_lease_test.go | 74 ++++++++++++++++++++++++++ pkg/kv/kvserver/replica_range_lease.go | 18 +++---- 2 files changed, 81 insertions(+), 11 deletions(-) diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index 3a70e77f5616..7192670ba842 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -31,6 +31,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1572,3 +1574,75 @@ func TestLeaseUpgradeVersionGate(t *testing.T) { }) tc.WaitForLeaseUpgrade(ctx, t, desc) } + +// TestLeaseRequestBumpsEpoch tests that a non-cooperative lease acquisition of +// an expired epoch lease always bumps the epoch of the outgoing leaseholder, +// regardless of the type of lease being acquired. +func TestLeaseRequestBumpsEpoch(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "expLease", func(t *testing.T, expLease bool) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) + + manual := hlc.NewHybridManualClock() + args := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // Required by TestCluster.MoveRangeLeaseNonCooperatively. + AllowLeaseRequestProposalsWhenNotLeader: true, + DisableAutomaticLeaseRenewal: true, + }, + Server: &server.TestingKnobs{ + WallClock: manual, + }, + }, + }, + } + tc := testcluster.StartTestCluster(t, 2, args) + defer tc.Stopper().Stop(ctx) + + // Create range and upreplicate. + key := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, key, tc.Target(1)) + desc := tc.LookupRangeOrFatal(t, key) + + // Make sure n1 has an epoch lease. + t0 := tc.Target(0) + tc.TransferRangeLeaseOrFatal(t, desc, t0) + prevLease, _, err := tc.FindRangeLease(desc, &t0) + require.NoError(t, err) + require.Equal(t, roachpb.LeaseEpoch, prevLease.Type()) + + // Non-cooperatively move the lease to n2. + kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, expLease) + t1 := tc.Target(1) + newLease, err := tc.MoveRangeLeaseNonCooperatively(ctx, desc, t1, manual) + require.NoError(t, err) + require.NotNil(t, newLease) + if expLease { + require.Equal(t, roachpb.LeaseExpiration, newLease.Type()) + } else { + require.Equal(t, roachpb.LeaseEpoch, newLease.Type()) + } + + // Check that n1's liveness epoch was bumped. 
+		l0 := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness)
+		livenesses, err := l0.GetLivenessesFromKV(ctx)
+		require.NoError(t, err)
+		var n1Liveness livenesspb.Liveness
+		for _, l := range livenesses {
+			if l.NodeID == 1 {
+				n1Liveness = l
+				break
+			}
+		}
+		require.NotZero(t, n1Liveness)
+		require.Greater(t, n1Liveness.Epoch, prevLease.Epoch)
+	})
+}
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 0efa3f352070..5697297348b1 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -313,7 +313,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 		}
 	}
 
-	if err := p.requestLeaseAsync(ctx, nextLeaseHolder, reqLease, status, leaseReq); err != nil {
+	if err := p.requestLeaseAsync(ctx, nextLeaseHolder, status, leaseReq); err != nil {
 		// We failed to start the asynchronous task. Send a blank NotLeaseHolderError
 		// back to indicate that we have no idea who the range lease holder might
 		// be; we've withdrawn from active duty.
@@ -336,11 +336,10 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 // specified replica. The request is sent in an async task.
 //
 // The status argument is used as the expected value for liveness operations.
-// reqLease and leaseReq must be consistent with the LeaseStatus.
+// leaseReq must be consistent with the LeaseStatus.
 func (p *pendingLeaseRequest) requestLeaseAsync(
 	parentCtx context.Context,
 	nextLeaseHolder roachpb.ReplicaDescriptor,
-	reqLease roachpb.Lease,
 	status kvserverpb.LeaseStatus,
 	leaseReq kvpb.Request,
 ) error {
@@ -382,7 +381,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 		func(ctx context.Context) {
 			defer sp.Finish()
 
-			err := p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
+			err := p.requestLease(ctx, nextLeaseHolder, status, leaseReq)
 			// Error will be handled below.
 
 			// We reset our state below regardless of whether we've gotten an error or
@@ -423,7 +422,6 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 func (p *pendingLeaseRequest) requestLease(
 	ctx context.Context,
 	nextLeaseHolder roachpb.ReplicaDescriptor,
-	reqLease roachpb.Lease,
 	status kvserverpb.LeaseStatus,
 	leaseReq kvpb.Request,
 ) error {
@@ -432,12 +430,10 @@ func (p *pendingLeaseRequest) requestLease(
 		p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds())
 	}()
 
-	// If requesting an epoch-based lease & current state is expired,
-	// potentially heartbeat our own liveness or increment epoch of
-	// prior owner. Note we only do this if the previous lease was
-	// epoch-based.
-	if reqLease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED &&
-		status.Lease.Type() == roachpb.LeaseEpoch {
+	// If we're replacing an expired epoch-based lease, we must increment the
+	// epoch of the prior owner to invalidate its leases. If we were the owner,
+	// then we instead heartbeat to become live.
+	if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED {
 		var err error
 		// If this replica is previous & next lease holder, manually heartbeat to become live.
 		if status.OwnedBy(nextLeaseHolder.StoreID) &&