From 999412243cddb95cc0a9571dd98c87da61edfd01 Mon Sep 17 00:00:00 2001 From: Shiranka Miskin Date: Mon, 29 Aug 2022 14:45:30 +0000 Subject: [PATCH 1/4] streamingccl: reduce server count in multinode tests While these tests would pass under stress locally they would fail CI stress, which may be because we were starting more server processes than ever before with 4 source nodes, 4 source tenant pods, and 4 destination nodes. This PR reduces the node count to 3 (any lower and scatter doesn't correctly distribute ranges) and only starts a single tenant pod for the source cluster. Release justification: test-only change Release note: None --- .../stream_replication_e2e_test.go | 39 ++++++------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index d48c16bd03dc..eef01b619f89 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -214,8 +214,6 @@ func createTenantStreamingClusters( // Start the source cluster. srcCluster, srcURL, srcCleanup := startTestCluster(ctx, t, serverArgs, args.srcNumNodes) - // Start the src cluster tenant with tenant pods on every node in the cluster, - // ensuring they're all active beofre proceeding. tenantArgs := base.TestTenantArgs{ TenantID: args.srcTenantID, TestingKnobs: base.TestingKnobs{ @@ -223,17 +221,8 @@ func createTenantStreamingClusters( AllowSplitAndScatter: true, }}, } - tenantConns := make([]*gosql.DB, 0) srcTenantServer, srcTenantConn := serverutils.StartTenant(t, srcCluster.Server(0), tenantArgs) - tenantConns = append(tenantConns, srcTenantConn) - for i := 1; i < args.srcNumNodes; i++ { - tenantPodArgs := tenantArgs - tenantPodArgs.DisableCreateTenant = true - tenantPodArgs.SkipTenantCheck = true - _, srcTenantPodConn := serverutils.StartTenant(t, srcCluster.Server(i), tenantPodArgs) - tenantConns = append(tenantConns, srcTenantPodConn) - } - waitForTenantPodsActive(t, srcTenantServer, args.srcNumNodes) + waitForTenantPodsActive(t, srcTenantServer, 1) // Start the destination cluster. destCluster, _, destCleanup := startTestCluster(ctx, t, serverArgs, args.destNumNodes) @@ -265,11 +254,7 @@ func createTenantStreamingClusters( // Enable stream replication on dest by default. 
tsc.destSysSQL.Exec(t, `SET enable_experimental_stream_replication = true;`) return tsc, func() { - for _, tenantConn := range tenantConns { - if tenantConn != nil { - require.NoError(t, tenantConn.Close()) - } - } + require.NoError(t, srcTenantConn.Close()) destCleanup() srcCleanup() } @@ -279,7 +264,7 @@ func (c *tenantStreamingClusters) srcExec(exec srcInitExecFunc) { exec(c.t, c.srcSysSQL, c.srcTenantSQL) } -func createScatteredTable(t *testing.T, c *tenantStreamingClusters) { +func createScatteredTable(t *testing.T, c *tenantStreamingClusters, numNodes int) { // Create a source table with multiple ranges spread across multiple nodes numRanges := 50 rowsPerRange := 20 @@ -289,7 +274,7 @@ func createScatteredTable(t *testing.T, c *tenantStreamingClusters) { ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d)); ALTER TABLE d.scattered SCATTER; `, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange)) - c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{"4"}}) + c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{fmt.Sprint(numNodes)}}) } var defaultSrcClusterSetting = map[string]string{ @@ -743,16 +728,17 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { defer log.Scope(t).Close(t) skip.UnderRace(t, "takes too long with multiple nodes") - skip.WithIssue(t, 86287) ctx := context.Background() args := defaultTenantStreamingClustersArgs - args.srcNumNodes = 4 - args.destNumNodes = 4 + + args.srcNumNodes = 3 + args.destNumNodes = 3 + c, cleanup := createTenantStreamingClusters(ctx, t, args) defer cleanup() - createScatteredTable(t, c) + createScatteredTable(t, c, 3) srcScatteredData := c.srcTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key") producerJobID, ingestionJobID := c.startStreamReplication() @@ -931,12 +917,11 @@ func TestTenantStreamingMultipleNodes(t *testing.T) { defer log.Scope(t).Close(t) skip.UnderRace(t, "takes too long with multiple nodes") - skip.WithIssue(t, 86206) ctx := context.Background() args := defaultTenantStreamingClustersArgs - args.srcNumNodes = 4 - args.destNumNodes = 4 + args.srcNumNodes = 3 + args.destNumNodes = 3 // Track the number of unique addresses that were connected to clientAddresses := make(map[string]struct{}) @@ -952,7 +937,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) { c, cleanup := createTenantStreamingClusters(ctx, t, args) defer cleanup() - createScatteredTable(t, c) + createScatteredTable(t, c, 3) producerJobID, ingestionJobID := c.startStreamReplication() jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) From 99718193119382e8fd1ce5bb60968eabd5e37a47 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Tue, 6 Sep 2022 15:44:39 +0200 Subject: [PATCH 2/4] cli,server: fix `--sql-advertise-addr` when `--sql-addr` is not specified Release justification: bug fix Release note (bug fix): The flag `--sql-advertise-addr` now properly works even when the SQL and RPC ports are shared (because `--sql-addr` was not specified). Note that this port sharing is a deprecated feature in v22.2. 
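
For illustration, using the addresses from the test cases added to
pkg/cli/flags_test.go in this patch (example addresses only): with a shared
SQL/RPC port, i.e. without `--sql-addr`,

    cockroach start --listen-addr 127.0.0.1 --port 54321 --advertise-sql-addr 192.168.0.111:12345

the SQL advertised address was previously overridden by the RPC advertised
address (here 127.0.0.1:54321); with this fix the server advertises
192.168.0.111:12345 to SQL clients while continuing to listen on and advertise
127.0.0.1:54321 for RPC.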
--- pkg/cli/flags.go | 10 ++++++---- pkg/cli/flags_test.go | 10 ++++++++++ pkg/server/start_listen.go | 12 ++++++++---- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go index 55819e1ac9ff..5e228bb40221 100644 --- a/pkg/cli/flags.go +++ b/pkg/cli/flags.go @@ -1049,17 +1049,19 @@ func extraServerFlagInit(cmd *cobra.Command) error { serverCfg.SplitListenSQL = changed(fs, cliflags.ListenSQLAddr.Name) // Fill in the defaults for --advertise-sql-addr, if the flag exists on `cmd`. - advSpecified := changed(fs, cliflags.AdvertiseAddr.Name) || + advHostSpecified := changed(fs, cliflags.AdvertiseAddr.Name) || changed(fs, cliflags.AdvertiseHost.Name) + advPortSpecified := changed(fs, cliflags.AdvertiseAddr.Name) || + changed(fs, cliflags.AdvertisePort.Name) if serverSQLAdvertiseAddr == "" { - if advSpecified { + if advHostSpecified { serverSQLAdvertiseAddr = serverAdvertiseAddr } else { serverSQLAdvertiseAddr = serverSQLAddr } } if serverSQLAdvertisePort == "" { - if advSpecified && !serverCfg.SplitListenSQL { + if advPortSpecified && !serverCfg.SplitListenSQL { serverSQLAdvertisePort = serverAdvertisePort } else { serverSQLAdvertisePort = serverSQLPort @@ -1097,7 +1099,7 @@ func extraServerFlagInit(cmd *cobra.Command) error { serverCfg.HTTPAddr = net.JoinHostPort(serverHTTPAddr, serverHTTPPort) if serverHTTPAdvertiseAddr == "" { - if advSpecified { + if advHostSpecified || advPortSpecified { serverHTTPAdvertiseAddr = serverAdvertiseAddr } else { serverHTTPAdvertiseAddr = serverHTTPAddr diff --git a/pkg/cli/flags_test.go b/pkg/cli/flags_test.go index 72a8a0c43cf2..ee7891cabf6b 100644 --- a/pkg/cli/flags_test.go +++ b/pkg/cli/flags_test.go @@ -714,6 +714,16 @@ func TestServerConnSettings(t *testing.T) { ":54321", "192.168.0.111:12345", ":54321", "192.168.0.111:12345", }, + {[]string{"start", "--listen-addr", "127.0.0.1", "--advertise-sql-addr", "192.168.0.111", "--port", "54321", "--advertise-port", "12345"}, + "127.0.0.1:54321", "127.0.0.1:12345", + "127.0.0.1:54321", "192.168.0.111:12345", + "127.0.0.1:54321", "192.168.0.111:12345", + }, + {[]string{"start", "--listen-addr", "127.0.0.1", "--advertise-sql-addr", "192.168.0.111:12345", "--port", "54321"}, + "127.0.0.1:54321", "127.0.0.1:54321", + "127.0.0.1:54321", "192.168.0.111:12345", + "127.0.0.1:54321", "192.168.0.111:12345", + }, } for i, td := range testData { diff --git a/pkg/server/start_listen.go b/pkg/server/start_listen.go index 6e0a21d7cdb2..a6072668b86a 100644 --- a/pkg/server/start_listen.go +++ b/pkg/server/start_listen.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" ) // startListenRPCAndSQL starts the RPC and SQL listeners. @@ -89,11 +90,14 @@ func startListenRPCAndSQL( pgL = m.Match(func(r io.Reader) bool { return pgwire.Match(r) }) - // Also if the pg port is not split, the actual listen - // and advertise addresses for SQL become equal to that - // of RPC, regardless of what was configured. + // Also if the pg port is not split, the actual listen address for + // SQL become equal to that of RPC. cfg.SQLAddr = cfg.Addr - cfg.SQLAdvertiseAddr = cfg.AdvertiseAddr + // Then we update the advertised addr with the right port, if + // the port had been auto-allocated. 
+ if err := UpdateAddrs(ctx, &cfg.SQLAddr, &cfg.SQLAdvertiseAddr, ln.Addr()); err != nil { + return nil, nil, errors.Wrapf(err, "internal error") + } } anyL := m.Match(cmux.Any()) From 52eda079b6e7ec2b6f6ca244e11917474a13093f Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 6 Sep 2022 18:03:01 -0400 Subject: [PATCH 3/4] upgrade/upgrades: allow CreatedAtNanos to be set when validating migration Schema change upgrade migrations to system tables are made idempotent by checking that the descriptor reaches some expected state. In order to ensure that it is in that expected state, some volatile fields need to be masked. We forgot to mask CreatedAtNanos. We also lost the testing which came with these helper functions we use. The vast majority of this PR is reviving testing from #66889. Fixes #85228. Release justification: Import bug fix for backport Release note (bug fix): Some upgrade migrations perform schema changes on system tables. Those upgrades which added indexes could, in some cases, get caught retrying because they failed to detect that the migration had already occurred due to the existence of a populated field. When that happens, the finalization of the new version can hang indefinitely and require manual intervention. This bug has been fixed. --- pkg/upgrade/upgrades/BUILD.bazel | 6 + pkg/upgrade/upgrades/helpers_test.go | 12 +- pkg/upgrade/upgrades/schema_changes.go | 8 +- .../upgrades/schema_changes_external_test.go | 492 ++++++++++++++++++ .../upgrades/schema_changes_helpers_test.go | 78 +++ 5 files changed, 594 insertions(+), 2 deletions(-) create mode 100644 pkg/upgrade/upgrades/schema_changes_external_test.go create mode 100644 pkg/upgrade/upgrades/schema_changes_helpers_test.go diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index 8476a08299ee..fac437053e7d 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -76,6 +76,8 @@ go_test( "role_id_migration_test.go", "role_options_migration_test.go", "sampled_stmt_diagnostics_requests_test.go", + "schema_changes_external_test.go", + "schema_changes_helpers_test.go", "system_privileges_test.go", "update_invalid_column_ids_in_sequence_back_references_external_test.go", "upgrade_sequence_to_be_referenced_by_ID_external_test.go", @@ -94,6 +96,7 @@ go_test( "//pkg/jobs/jobstest", "//pkg/keys", "//pkg/kv", + "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/security/username", @@ -116,16 +119,19 @@ go_test( "//pkg/sql/sem/tree", "//pkg/sql/tests", "//pkg/sql/types", + "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/upgrade", "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", diff --git a/pkg/upgrade/upgrades/helpers_test.go b/pkg/upgrade/upgrades/helpers_test.go index b1dacb8603f8..d48e4e9ccd55 100644 --- a/pkg/upgrade/upgrades/helpers_test.go +++ b/pkg/upgrade/upgrades/helpers_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" @@ -45,6 +46,12 @@ 
type Schema struct { // Upgrade runs cluster upgrade by changing the 'version' cluster setting. func Upgrade( t *testing.T, sqlDB *gosql.DB, key clusterversion.Key, done chan struct{}, expectError bool, +) { + UpgradeToVersion(t, sqlDB, clusterversion.ByKey(key), done, expectError) +} + +func UpgradeToVersion( + t *testing.T, sqlDB *gosql.DB, v roachpb.Version, done chan struct{}, expectError bool, ) { defer func() { if done != nil { @@ -52,7 +59,7 @@ func Upgrade( } }() _, err := sqlDB.Exec(`SET CLUSTER SETTING version = $1`, - clusterversion.ByKey(key).String()) + v.String()) if expectError { assert.Error(t, err) return @@ -147,3 +154,6 @@ func GetTable( require.NoError(t, err) return table } + +// WaitForJobStatement is exported so that it can be detected by a testing knob. +const WaitForJobStatement = waitForJobStatement diff --git a/pkg/upgrade/upgrades/schema_changes.go b/pkg/upgrade/upgrades/schema_changes.go index cda89ec997e9..a4d25773a4ab 100644 --- a/pkg/upgrade/upgrades/schema_changes.go +++ b/pkg/upgrade/upgrades/schema_changes.go @@ -43,6 +43,10 @@ type operation struct { schemaExistsFn func(catalog.TableDescriptor, catalog.TableDescriptor, string) (bool, error) } +// waitForJobStatement is the statement used to wait for an ongoing job to +// complete. +const waitForJobStatement = "SHOW JOBS WHEN COMPLETE VALUES ($1)" + // migrateTable is run during an upgrade to a new version and changes an existing // table's schema based on schemaChangeQuery. The schema-change is ignored if the // table already has the required changes. @@ -92,7 +96,7 @@ func migrateTable( for _, mutation := range mutations { log.Infof(ctx, "waiting for the mutation job %v to complete", mutation.JobID) if _, err := d.InternalExecutor.Exec(ctx, "migration-mutations-wait", - nil, "SHOW JOB WHEN COMPLETE $1", mutation.JobID); err != nil { + nil, waitForJobStatement, mutation.JobID); err != nil { return err } } @@ -242,6 +246,8 @@ func hasIndex(storedTable, expectedTable catalog.TableDescriptor, indexName stri expectedCopy.StoreColumnNames = []string{} storedCopy.StoreColumnIDs = []descpb.ColumnID{0, 0, 0} expectedCopy.StoreColumnIDs = []descpb.ColumnID{0, 0, 0} + storedCopy.CreatedAtNanos = 0 + expectedCopy.CreatedAtNanos = 0 if err = ensureProtoMessagesAreEqual(&expectedCopy, &storedCopy); err != nil { return false, err diff --git a/pkg/upgrade/upgrades/schema_changes_external_test.go b/pkg/upgrade/upgrades/schema_changes_external_test.go new file mode 100644 index 000000000000..75b039208eb7 --- /dev/null +++ b/pkg/upgrade/upgrades/schema_changes_external_test.go @@ -0,0 +1,492 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package upgrades_test + +import ( + "context" + "math" + "regexp" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/upgrade" + "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestMigrationWithFailures tests modification of a table during +// migration with different failures. It tests the system behavior with failure +// combinations of the migration job and schema-change jobs at different stages +// in their progress. +// +// This test was originally written in support of the migration which added +// exponential backoff to the system.jobs table, but was retrofitted to prevent +// regressions. +func TestMigrationWithFailures(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t, "very slow") + + // We're going to be migrating from startCV to endCV. + startCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 2041}} + endCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 2042}} + + // The tests follows the following procedure. + // + // Inject the old table descriptor and ensure that the system is using the + // deprecated jobs-table. + // + // Start migration, which initiates two schema-change jobs one by one. Test + // the system for each schema-change job separately. Later on, we inject + // failure in this migration, causing it to fail. + // + // Depending on the test setting, intercept the target schema-change job, + // preventing the job from progressing. We may cancel this schema-change or + // let it succeed to test different scenarios. + // + // Cancel the migration, causing the migration to revert and fail. + // + // Wait for the canceled migration-job to finish, expecting its failure. The + // schema-change job is still not progressing to control what the restarted + // migration will observe. + // + // Restart the migration, expecting it to succeed. Depending on the test setting, + // the intercepted schema-change job may wail for the migration job to resume. + // If it does, the migration job is expected to observe the ongoing schema-change. + // The ongoing schema-change is canceled or not, depending on the test case. + // In either case, we expect the correct number of mutations to be skipped + // during the migration. 
+ // + // If we canceled the schema-job, expect it to rerun + // as part of the migration. Otherwise, expect the schema-change to be ignored + // during the migration. + // + // Finally, we validate that the schema changes are in effect by reading the new + // columns and the index, and by running a job that is failed and retried to + // practice exponential-backoff machinery. + + const createTableBefore = ` +CREATE TABLE test.test_table ( + id INT8 DEFAULT unique_rowid() PRIMARY KEY, + status STRING NOT NULL, + created TIMESTAMP NOT NULL DEFAULT now(), + payload BYTES NOT NULL, + progress BYTES, + created_by_type STRING, + created_by_id INT, + claim_session_id BYTES, + claim_instance_id INT8, + INDEX (status, created), + INDEX (created_by_type, created_by_id) STORING (status), + FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id), + FAMILY progress (progress), + FAMILY claim (claim_session_id, claim_instance_id) +); +` + const createTableAfter = ` +CREATE TABLE test.test_table ( + id INT8 DEFAULT unique_rowid() PRIMARY KEY, + status STRING NOT NULL, + created TIMESTAMP NOT NULL DEFAULT now(), + payload BYTES NOT NULL, + progress BYTES, + created_by_type STRING, + created_by_id INT, + claim_session_id BYTES, + claim_instance_id INT8, + num_runs INT8, + last_run TIMESTAMP, + INDEX (status, created), + INDEX (created_by_type, created_by_id) STORING (status), + INDEX jobs_run_stats_idx ( + claim_session_id, + status, + created + ) STORING(last_run, num_runs, claim_instance_id) + WHERE ` + systemschema.JobsRunStatsIdxPredicate + `, + FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id), + FAMILY progress (progress), + FAMILY claim (claim_session_id, claim_instance_id, num_runs, last_run) +); +` + + for _, test := range []struct { + // Test identifier. + name string + // Job status when the job is intercepted while transitioning to the intercepted status. + query string + // Whether the schema-change job should wait for the migration to restart + // after failure before proceeding. + waitForMigrationRestart bool + // Cancel the intercepted schema-change to inject a failure during migration. + cancelSchemaJob bool + // Expected number of schema-changes that are skipped during migration. + expectedSkipped int + }{ + { + name: "adding columns", + query: upgrades.TestingAddColsQuery, + waitForMigrationRestart: false, // Does not matter. + cancelSchemaJob: false, // Does not matter. + expectedSkipped: 0, // Will be ignored. + }, + { + name: "adding index", + query: upgrades.TestingAddIndexQuery, + waitForMigrationRestart: false, // Does not matter. + cancelSchemaJob: false, // Does not matter. + expectedSkipped: 0, // Will be ignored. + }, + { + name: "fail adding columns", + query: upgrades.TestingAddColsQuery, + waitForMigrationRestart: true, // Need to wait to observe failing schema change. + cancelSchemaJob: true, // To fail adding columns. + expectedSkipped: 0, + }, + { + name: "fail adding index", + query: upgrades.TestingAddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe failing schema change. + cancelSchemaJob: true, // To fail adding index. + expectedSkipped: 1, // Columns must not be added again. + }, + { + name: "skip none", + query: upgrades.TestingAddColsQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: true, // To fail adding index and skip adding column. 
+ expectedSkipped: 0, // Both columns and index must be added. + }, + { + name: "skip adding columns", + query: upgrades.TestingAddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: true, // To fail adding index and skip adding column. + expectedSkipped: 1, // Columns must not be added again. + }, + { + name: "skip adding columns and index", + query: upgrades.TestingAddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: false, // To fail adding index and skip adding column. + expectedSkipped: 2, // Both columns and index must not be added again. + }, + } { + t.Run(test.name, func(t *testing.T) { + scope := log.Scope(t) + defer scope.Close(t) + + type updateEvent struct { + orig, updated jobs.JobMetadata + errChan chan error + } + + ctx := context.Background() + cancelCtx, cancel := context.WithCancel(ctx) + // To intercept the schema-change and the migration job. + updateEventChan := make(chan updateEvent) + var enableUpdateEventCh syncutil.AtomicBool + enableUpdateEventCh.Set(false) + beforeUpdate := func(orig, updated jobs.JobMetadata) error { + if !enableUpdateEventCh.Get() { + return nil + } + ue := updateEvent{ + orig: orig, + updated: updated, + errChan: make(chan error), + } + select { + case updateEventChan <- ue: + case <-cancelCtx.Done(): + return cancelCtx.Err() + } + select { + case err := <-ue.errChan: + return err + case <-cancelCtx.Done(): + return cancelCtx.Err() + } + } + var schemaEvent updateEvent + migrationWaitCh := make(chan struct{}) + + // Number of schema-change jobs that are skipped. + settings := cluster.MakeTestingClusterSettingsWithVersions( + endCV.Version, startCV.Version, false, /* initializeVersion */ + ) + require.NoError(t, clusterversion.Initialize( + ctx, startCV.Version, &settings.SV, + )) + jobsKnobs := jobs.NewTestingKnobsWithShortIntervals() + jobsKnobs.BeforeUpdate = beforeUpdate + migrationFunc, expectedDescriptor := upgrades. + MakeFakeMigrationForTestMigrationWithFailures() + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: startCV.Version, + }, + JobsTestingKnobs: jobsKnobs, + SQLExecutor: &sql.ExecutorTestingKnobs{ + BeforeExecute: func(ctx context.Context, stmt string) { + if stmt == upgrades.WaitForJobStatement { + select { + case migrationWaitCh <- struct{}{}: + case <-ctx.Done(): + } + } + }, + }, + UpgradeManager: &upgrade.TestingKnobs{ + ListBetweenOverride: func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion { + return []clusterversion.ClusterVersion{ + endCV, + } + }, + RegistryOverride: func(cv clusterversion.ClusterVersion) (upgrade.Upgrade, bool) { + if cv.Equal(endCV) { + return upgrade.NewTenantUpgrade("testing", + endCV, + upgrades.NoPrecondition, + migrationFunc, + ), true + } + panic("unexpected version") + }}, + }, + }, + } + tc := testcluster.StartTestCluster(t, 1, clusterArgs) + defer tc.Stopper().Stop(ctx) + defer cancel() + s := tc.Server(0) + sqlDB := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(sqlDB) + + // Build the expected table descriptor, inject it into the + // migration function, drop it, and then add the descriptor + // in the pre-migration state. 
+ tdb.Exec(t, "CREATE DATABASE test") + tdb.Exec(t, createTableAfter) + var desc catalog.TableDescriptor + require.NoError(t, s.CollectionFactory().(*descs.CollectionFactory).Txn(ctx, s.DB(), func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) (err error) { + tn := tree.MakeTableNameWithSchema("test", "public", "test_table") + _, desc, err = descriptors.GetImmutableTableByName(ctx, txn, &tn, tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + Required: true, + AvoidLeased: true, + }, + }) + return err + })) + tdb.Exec(t, "DROP TABLE test.test_table") + tdb.Exec(t, createTableBefore) + expectedDescriptor.Store(desc) + enableUpdateEventCh.Set(true) + + // Run the migration, expecting failure. + t.Log("trying migration, expecting to fail") + // Channel to wait for the migration job to complete. + finishChan := make(chan struct{}) + go upgrades.UpgradeToVersion( + t, sqlDB, endCV.Version, finishChan, true, /* expectError */ + ) + + var migJobID jobspb.JobID + // Intercept the target schema-change job and get migration-job's ID. + t.Log("intercepting the schema job") + for { + e := <-updateEventChan + // The migration job creates schema-change jobs. Therefore, we are guaranteed + // to get the migration-job's ID before canceling the job later on. + if e.orig.Payload.Type() == jobspb.TypeMigration { + migJobID = e.orig.ID + e.errChan <- nil + continue + } + schemaQuery := strings.Replace(e.orig.Payload.Description, "test.public.test_table", "test.test_table", -1) + testQuery := removeSpaces(test.query) + testQuery = strings.ReplaceAll(testQuery, ":::STRING", "") + if testQuery == schemaQuery { + // Intercepted the target schema-change. + schemaEvent = e + t.Logf("intercepted schema change job: %v", e.orig.ID) + break + } + // Ignore all other job updates. + e.errChan <- nil + } + // Cancel the migration job. + t.Log("canceling the migration job") + go cancelJob(t, ctx, s, migJobID) + + // Wait for the migration job to finish while preventing the intercepted + // schema-change job from progressing. + t.Log("waiting for the migration job to finish.") + testutils.SucceedsSoon(t, func() error { + for { + select { + case <-finishChan: + return nil + case e := <-updateEventChan: + e.errChan <- nil + default: + return errors.Errorf("waiting for the migration job to finish.") + } + } + }) + + // Let all jobs to continue until test's completion, except the intercepted + // schema-change job that we resume later on. + go func() { + for { + var e updateEvent + select { + case e = <-updateEventChan: + close(e.errChan) + case <-cancelCtx.Done(): + return + } + } + }() + + // Restart the migration job. + t.Log("retrying migration, expecting to succeed") + go upgrades.UpgradeToVersion(t, sqlDB, endCV.Version, finishChan, false /* expectError */) + + // Wait until the new migration job observes an existing mutation job. + if test.waitForMigrationRestart { + t.Log("waiting for the migration job to observe a mutation") + <-migrationWaitCh + } + + t.Log("resuming the schema change job") + // If configured so, mark the schema-change job to cancel. + if test.cancelSchemaJob { + cancelJob(t, ctx, s, schemaEvent.orig.ID) + } + // Resume the schema-change job and all other jobs. + schemaEvent.errChan <- nil + + // If canceled the job, wait for the job to finish. + if test.cancelSchemaJob { + t.Log("waiting for the schema job to reach the cancel status") + waitUntilState(t, tdb, schemaEvent.orig.ID, jobs.StatusCanceled) + } + // Ensure all migrations complete. 
+ go func() { + for { + select { + case <-migrationWaitCh: + case <-cancelCtx.Done(): + return + } + } + }() + + // Wait for the migration to complete, expecting success. + t.Logf("waiting for the new migration job to complete.") + testutils.SucceedsSoon(t, func() error { + select { + case <-finishChan: + return nil + default: + } + return errors.Errorf("waiting for the migration job to finish.") + }) + if test.waitForMigrationRestart { + // Ensure that we have observed the expected number of ignored schema change jobs. + log.Flush() + entries, err := log.FetchEntriesFromFiles( + 0, math.MaxInt64, 10000, + regexp.MustCompile("skipping.*operation as the schema change already exists."), + log.WithFlattenedSensitiveData, + ) + require.NoError(t, err) + require.Len(t, entries, test.expectedSkipped) + } + }) + } +} + +// cancelJob marks the given job as cancel-requested, leading the job to be +// canceled. +func cancelJob( + t *testing.T, ctx context.Context, s serverutils.TestServerInterface, jobID jobspb.JobID, +) { + err := s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // Using this way of canceling because the migration job us non-cancelable. + // Canceling in this way skips the check. + return s.JobRegistry().(*jobs.Registry).UpdateJobWithTxn( + ctx, jobID, txn, false /* useReadLock */, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, + ) error { + ju.UpdateStatus(jobs.StatusCancelRequested) + return nil + }) + }) + assert.NoError(t, err) +} + +// waitUntilState waits until the specified job reaches to given state. +func waitUntilState( + t *testing.T, tdb *sqlutils.SQLRunner, jobID jobspb.JobID, expectedStatus jobs.Status, +) { + testutils.SucceedsSoon(t, func() error { + var status jobs.Status + tdb.QueryRow(t, + "SELECT status FROM system.jobs WHERE id = $1", jobID, + ).Scan(&status) + if status == expectedStatus { + return nil + } + return errors.Errorf( + "waiting for job %v to reach status %v, current status is %v", + jobID, expectedStatus, status) + }) +} + +func removeSpaces(stmt string) string { + stmt = strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(stmt, " ")) + stmt = strings.ReplaceAll(stmt, "( ", "(") + stmt = strings.ReplaceAll(stmt, " )", ")") + return stmt +} diff --git a/pkg/upgrade/upgrades/schema_changes_helpers_test.go b/pkg/upgrade/upgrades/schema_changes_helpers_test.go new file mode 100644 index 000000000000..c337086cc7ad --- /dev/null +++ b/pkg/upgrade/upgrades/schema_changes_helpers_test.go @@ -0,0 +1,78 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades + +import ( + "context" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/upgrade" +) + +const ( + // TestingAddColsQuery is used by TestMigrationWithFailures. 
+ TestingAddColsQuery = ` +ALTER TABLE test.test_table + ADD COLUMN num_runs INT8 FAMILY claim, + ADD COLUMN last_run TIMESTAMP FAMILY claim` + + // TestingAddIndexQuery is used by TestMigrationWithFailures. + TestingAddIndexQuery = ` +CREATE INDEX jobs_run_stats_idx + ON test.test_table (claim_session_id, status, created) + STORING (last_run, num_runs, claim_instance_id) + WHERE ` + systemschema.JobsRunStatsIdxPredicate +) + +// MakeFakeMigrationForTestMigrationWithFailures makes the migration function +// used in the +func MakeFakeMigrationForTestMigrationWithFailures() ( + m upgrade.TenantUpgradeFunc, + expectedTableDescriptor *atomic.Value, +) { + expectedTableDescriptor = &atomic.Value{} + return func( + ctx context.Context, cs clusterversion.ClusterVersion, d upgrade.TenantDeps, _ *jobs.Job, + ) error { + row, err := d.InternalExecutor.QueryRow(ctx, "look-up-id", nil, /* txn */ + `select id from system.namespace where name = $1`, "test_table") + if err != nil { + return err + } + tableID := descpb.ID(tree.MustBeDInt(row[0])) + for _, op := range []operation{ + { + name: "jobs-add-columns", + schemaList: []string{"num_runs", "last_run"}, + query: TestingAddColsQuery, + schemaExistsFn: hasColumn, + }, + { + name: "jobs-add-index", + schemaList: []string{"jobs_run_stats_idx"}, + query: TestingAddIndexQuery, + schemaExistsFn: hasIndex, + }, + } { + expected := expectedTableDescriptor.Load().(catalog.TableDescriptor) + if err := migrateTable(ctx, cs, d, op, tableID, expected); err != nil { + return err + } + } + return nil + }, expectedTableDescriptor +} From 653f0521f68531331048b4bab092a765008a5991 Mon Sep 17 00:00:00 2001 From: Eric Harmeling Date: Tue, 6 Sep 2022 09:35:30 -0400 Subject: [PATCH 4/4] ui: update txn contention insights to use waiting txns as event This commit updates the transaction workload insights pages to use the waiting contended transaction as the primary contention event, rather than the blocking transaction. Fixes #87284. Release justification: bug fixes and low-risk updates to new functionality Release note: None --- .../cluster-ui/src/api/insightsApi.ts | 121 +++++++++--------- .../detailsPanels/waitTimeInsightsPanel.tsx | 6 +- .../cluster-ui/src/insights/types.ts | 6 +- .../transactionInsightDetails.tsx | 12 +- .../statementInsightsView.tsx | 4 +- .../transactionInsightsView.tsx | 4 +- 6 files changed, 79 insertions(+), 74 deletions(-) diff --git a/pkg/ui/workspaces/cluster-ui/src/api/insightsApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/insightsApi.ts index 16dbcb122edf..be568981d64f 100644 --- a/pkg/ui/workspaces/cluster-ui/src/api/insightsApi.ts +++ b/pkg/ui/workspaces/cluster-ui/src/api/insightsApi.ts @@ -52,14 +52,14 @@ export type TransactionContentionEventsResponse = // txnContentionQuery selects all relevant transaction contention events. 
const txnContentionQuery = `SELECT * - FROM (SELECT blocking_txn_id, + FROM (SELECT waiting_txn_id, encode( - blocking_txn_fingerprint_id, 'hex' - ) AS blocking_txn_fingerprint_id, + waiting_txn_fingerprint_id, 'hex' + ) AS waiting_txn_fingerprint_id, collection_ts, contention_duration, row_number() over ( - PARTITION BY blocking_txn_fingerprint_id + PARTITION BY waiting_txn_fingerprint_id ORDER BY collection_ts DESC ) AS rank, @@ -67,7 +67,7 @@ const txnContentionQuery = `SELECT * FROM (SELECT "sql.insights.latency_threshold" :: INTERVAL AS threshold FROM [SHOW CLUSTER SETTING sql.insights.latency_threshold]), - (SELECT DISTINCT ON (blocking_txn_id) * + (SELECT DISTINCT ON (waiting_txn_id) * FROM crdb_internal.transaction_contention_events tce), (SELECT txn_id FROM crdb_internal.cluster_execution_insights) WHERE contention_duration > threshold @@ -75,8 +75,8 @@ const txnContentionQuery = `SELECT * WHERE rank = 1`; type TransactionContentionResponseColumns = { - blocking_txn_id: string; - blocking_txn_fingerprint_id: string; + waiting_txn_id: string; + waiting_txn_fingerprint_id: string; collection_ts: string; contention_duration: string; threshold: string; @@ -84,15 +84,15 @@ type TransactionContentionResponseColumns = { function transactionContentionResultsToEventState( response: SqlExecutionResponse, -): TransactionContentionEventState[] { +): TransactionContentionEventsResponse { if (!response.execution.txn_results[0].rows) { // No transaction contention events. return []; } return response.execution.txn_results[0].rows.map(row => ({ - transactionID: row.blocking_txn_id, - fingerprintID: row.blocking_txn_fingerprint_id, + transactionID: row.waiting_txn_id, + fingerprintID: row.waiting_txn_fingerprint_id, startTime: moment(row.collection_ts), contentionDuration: moment.duration(row.contention_duration), contentionThreshold: moment.duration(row.threshold).asMilliseconds(), @@ -196,7 +196,7 @@ export function getTransactionInsightEventState(): Promise row.blocking_txn_fingerprint_id); + const txnFingerprintIDs = res.map(row => row.waiting_txn_fingerprint_id); const txnFingerprintRequest: SqlExecutionRequest = { statements: [ { @@ -209,10 +209,12 @@ export function getTransactionInsightEventState(): Promise( txnFingerprintRequest, ).then(txnStmtFingerprintResults => { - const stmtFingerprintIDs = - txnStmtFingerprintResults.execution.txn_results[0].rows?.map( - row => row.query_ids, - ); + const txnStmtRes = + txnStmtFingerprintResults.execution.txn_results[0].rows; + if (!txnStmtRes || txnStmtRes.length < 1) { + return; + } + const stmtFingerprintIDs = txnStmtRes.map(row => row.query_ids); const fingerprintStmtsRequest: SqlExecutionRequest = { statements: [ { @@ -238,7 +240,7 @@ export function getTransactionInsightEventState(): Promise; export type TransactionContentionEventDetailsResponse = @@ -324,17 +326,17 @@ const txnContentionDetailsQuery = (id: string) => `SELECT collection_ts, LEFT OUTER JOIN crdb_internal.ranges AS ranges ON tce.contending_key BETWEEN ranges.start_key AND ranges.end_key - WHERE blocking_txn_id = '${id}' + WHERE waiting_txn_id = '${id}' `; type TxnContentionDetailsResponseColumns = { - blocking_txn_id: string; + waiting_txn_id: string; + waiting_txn_fingerprint_id: string; collection_ts: string; contention_duration: string; threshold: string; + blocking_txn_id: string; blocking_txn_fingerprint_id: string; - waiting_txn_id: string; - waiting_txn_fingerprint_id: string; schema_name: string; database_name: string; table_name: string; @@ -351,13 +353,13 @@ function 
transactionContentionDetailsResultsToEventState( } const row = response.execution.txn_results[0].rows[0]; return { - executionID: row.blocking_txn_id, + executionID: row.waiting_txn_id, + fingerprintID: row.waiting_txn_fingerprint_id, startTime: moment(row.collection_ts), elapsedTime: moment.duration(row.contention_duration).asMilliseconds(), contentionThreshold: moment.duration(row.threshold).asMilliseconds(), - fingerprintID: row.blocking_txn_fingerprint_id, - waitingExecutionID: row.waiting_txn_id, - waitingFingerprintID: row.waiting_txn_fingerprint_id, + blockingExecutionID: row.blocking_txn_id, + blockingFingerprintID: row.blocking_txn_fingerprint_id, schemaName: row.schema_name, databaseName: row.database_name, tableName: row.table_name, @@ -388,64 +390,63 @@ export function getTransactionInsightEventDetailsState( if (!res || res.length < 1) { return; } - const blockingTxnFingerprintId = res[0].blocking_txn_fingerprint_id; - const blockingTxnFingerprintRequest: SqlExecutionRequest = { + const waitingTxnFingerprintId = res[0].waiting_txn_fingerprint_id; + const waitingTxnFingerprintRequest: SqlExecutionRequest = { statements: [ { - sql: `${txnStmtFingerprintsQuery(blockingTxnFingerprintId)}`, + sql: `${txnStmtFingerprintsQuery(waitingTxnFingerprintId)}`, }, ], execute: true, max_result_size: 50000, // 50 kib }; return executeInternalSql( - blockingTxnFingerprintRequest, - ).then(blockingTxnStmtFingerprintIDs => { - const blockingStmtFingerprintIDs = - blockingTxnStmtFingerprintIDs.execution.txn_results[0].rows[0] - .query_ids; - const blockingFingerprintStmtsRequest: SqlExecutionRequest = { + waitingTxnFingerprintRequest, + ).then(waitingTxnStmtFingerprintIDs => { + const waitingStmtFingerprintIDs = + waitingTxnStmtFingerprintIDs.execution.txn_results[0].rows[0].query_ids; + const waitingFingerprintStmtsRequest: SqlExecutionRequest = { statements: [ { - sql: `${fingerprintStmtsQuery(blockingStmtFingerprintIDs)}`, + sql: `${fingerprintStmtsQuery(waitingStmtFingerprintIDs)}`, }, ], execute: true, max_result_size: 50000, // 50 kib }; return executeInternalSql( - blockingFingerprintStmtsRequest, - ).then(blockingTxnStmtQueries => { - const waitingTxnFingerprintId = + waitingFingerprintStmtsRequest, + ).then(waitingTxnStmtQueries => { + const blockingTxnFingerprintId = contentionResults.execution.txn_results[0].rows[0] - .waiting_txn_fingerprint_id; - const waitingTxnFingerprintRequest: SqlExecutionRequest = { + .blocking_txn_fingerprint_id; + const blockingTxnFingerprintRequest: SqlExecutionRequest = { statements: [ { - sql: `${txnStmtFingerprintsQuery(waitingTxnFingerprintId)}`, + sql: `${txnStmtFingerprintsQuery(blockingTxnFingerprintId)}`, }, ], execute: true, max_result_size: 50000, // 50 kib }; return executeInternalSql( - waitingTxnFingerprintRequest, - ).then(waitingTxnStmtFingerprintIDs => { - const waitingStmtFingerprintIDs = - waitingTxnStmtFingerprintIDs.execution.txn_results[0].rows[0] + blockingTxnFingerprintRequest, + ).then(blockingTxnStmtFingerprintIDs => { + const blockingStmtFingerprintIDs = + blockingTxnStmtFingerprintIDs.execution.txn_results[0].rows[0] .query_ids; - const waitingFingerprintStmtsRequest: SqlExecutionRequest = { + const blockingFingerprintStmtsRequest: SqlExecutionRequest = { statements: [ { - sql: `${fingerprintStmtsQuery(waitingStmtFingerprintIDs)}`, + sql: `${fingerprintStmtsQuery(blockingStmtFingerprintIDs)}`, }, ], execute: true, max_result_size: 50000, // 50 kib }; return executeInternalSql( - waitingFingerprintStmtsRequest, - 
).then(waitingTxnStmtQueries => { + blockingFingerprintStmtsRequest, + ).then(blockingTxnStmtQueries => { return combineTransactionInsightEventDetailsState( transactionContentionDetailsResultsToEventState( contentionResults, @@ -454,10 +455,10 @@ export function getTransactionInsightEventDetailsState( blockingTxnStmtFingerprintIDs, ), txnStmtFingerprintsResultsToEventState( - waitingTxnStmtFingerprintIDs, + blockingTxnStmtFingerprintIDs, ), - fingerprintStmtsResultsToEventState(blockingTxnStmtQueries), fingerprintStmtsResultsToEventState(waitingTxnStmtQueries), + fingerprintStmtsResultsToEventState(blockingTxnStmtQueries), ); }); }); @@ -468,31 +469,31 @@ export function getTransactionInsightEventDetailsState( export function combineTransactionInsightEventDetailsState( txnContentionDetailsState: TransactionContentionEventDetailsResponse, - blockingTxnFingerprintState: TxnStmtFingerprintEventsResponse, waitingTxnFingerprintState: TxnStmtFingerprintEventsResponse, - blockingFingerprintStmtState: FingerprintStmtsEventsResponse, + blockingTxnFingerprintState: TxnStmtFingerprintEventsResponse, waitingFingerprintStmtState: FingerprintStmtsEventsResponse, + blockingFingerprintStmtState: FingerprintStmtsEventsResponse, ): TransactionInsightEventDetailsState { let res: TransactionInsightEventDetailsState; if ( txnContentionDetailsState && - blockingTxnFingerprintState && waitingTxnFingerprintState && - blockingFingerprintStmtState && - waitingFingerprintStmtState + blockingTxnFingerprintState && + waitingFingerprintStmtState && + blockingFingerprintStmtState ) { res = { ...txnContentionDetailsState, application: blockingTxnFingerprintState[0].application, - queries: blockingTxnFingerprintState[0].queryIDs.map( + queries: waitingTxnFingerprintState[0].queryIDs.map( id => - blockingFingerprintStmtState.find( + waitingFingerprintStmtState.find( stmt => stmt.stmtFingerprintID === id, )?.query, ), - waitingQueries: waitingTxnFingerprintState[0].queryIDs.map( + blockingQueries: blockingTxnFingerprintState[0].queryIDs.map( id => - waitingFingerprintStmtState.find( + blockingFingerprintStmtState.find( stmt => stmt.stmtFingerprintID === id, )?.query, ), diff --git a/pkg/ui/workspaces/cluster-ui/src/detailsPanels/waitTimeInsightsPanel.tsx b/pkg/ui/workspaces/cluster-ui/src/detailsPanels/waitTimeInsightsPanel.tsx index e16679d49613..8fcb5114efee 100644 --- a/pkg/ui/workspaces/cluster-ui/src/detailsPanels/waitTimeInsightsPanel.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/detailsPanels/waitTimeInsightsPanel.tsx @@ -37,8 +37,12 @@ export const WaitTimeInsightsLabels = { `${capitalize(execType)} ID: ${id} waiting on`, WAITING_TXNS_TABLE_TITLE: (id: string, execType: ExecutionType): string => `${capitalize(execType)}s waiting for ID: ${id}`, + BLOCKED_TXNS_TABLE_TITLE: (id: string, execType: ExecutionType): string => + `${capitalize(execType)} with ID ${id} waited on`, WAITED_TXNS_TABLE_TITLE: (id: string, execType: ExecutionType): string => - `${capitalize(execType)}s that waited for ID: ${id}`, + `${capitalize(execType)}s that waited for ${capitalize( + execType, + )}s with ID ${id}`, }; type WaitTimeInsightsPanelProps = { diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/types.ts b/pkg/ui/workspaces/cluster-ui/src/insights/types.ts index bbe76ceb1d26..97d22ac78347 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/types.ts +++ b/pkg/ui/workspaces/cluster-ui/src/insights/types.ts @@ -47,9 +47,9 @@ export type TransactionInsightEventDetails = { contentionThreshold: number; application: string; 
fingerprintID: string; - waitingExecutionID: string; - waitingFingerprintID: string; - waitingQueries: string[]; + blockingExecutionID: string; + blockingFingerprintID: string; + blockingQueries: string[]; contendedKey: string; schemaName: string; databaseName: string; diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetails.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetails.tsx index a098561c1ad3..9ce58c547686 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetails.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetails.tsx @@ -119,11 +119,11 @@ export class TransactionInsightDetails extends React.Component - {WaitTimeInsightsLabels.WAITED_TXNS_TABLE_TITLE( + {WaitTimeInsightsLabels.BLOCKED_TXNS_TABLE_TITLE( insightDetails.executionID, insightDetails.execType, )}
diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsView.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsView.tsx index 680f99b96259..ee61bcc88700 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsView.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsView.tsx @@ -251,7 +251,7 @@ export const StatementInsightsView: React.FC = (
InsightsError()} @@ -280,7 +280,7 @@ export const StatementInsightsView: React.FC = ( renderNoResult={ 0 && filteredStatements?.length > 0 + search?.length > 0 && filteredStatements?.length === 0 } /> } diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsView.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsView.tsx index 8cc59f44056e..3cad40750804 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsView.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsView.tsx @@ -211,7 +211,7 @@ export const TransactionInsightsView: React.FC = (
InsightsError()} @@ -235,7 +235,7 @@ export const TransactionInsightsView: React.FC = ( renderNoResult={ 0 && filteredTransactions?.length == 0 + search?.length > 0 && filteredTransactions?.length === 0 } /> }