diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index f916e12ddff2..4443f195c9ee 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -283,4 +283,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-30 set the active cluster version in the format '.' +version version 22.1-32 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 31db96f77148..9376a4c93c4f 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -214,6 +214,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-30set the active cluster version in the format '.' +versionversion22.1-32set the active cluster version in the format '.' diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 47211ec35631..85fae2a501f7 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -996,6 +996,195 @@ func TestChangefeedUserDefinedTypes(t *testing.T) { cdcTest(t, testFn) } +// If the schema_change_policy is 'stop' and we drop columns which are not +// targeted by the changefeed, it should not stop. +func TestNoStopAfterNonTargetColumnDrop(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH schema_change_policy='stop'`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + }) + + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`) + + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`, + }) + + // Check that dropping a watched column still stops the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) + if _, err := cf.Next(); !testutils.IsError(err, `schema change occurred at`) { + t.Errorf(`expected "schema change occurred at ..." got: %+v`, err.Error()) + } + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +// If we drop columns which are not targeted by the changefeed, it should not backfill. 
+func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + }) + + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`, + }) + + // Check that dropping a watched column still backfills. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN c`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b"}}`, + `hasfams.b_and_c: [1]->{"after": {"b": "b1"}}`, + }) + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `CREATE TABLE nofams (id int primary key, a string, b string, c string)`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + sqlDB.Exec(t, `INSERT INTO nofams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE nofams`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + `nofams: [0]->{"after": {"a": "a", "b": "b", "c": "c", "id": 0}}`, + }) + + // Dropping an unwatched column from hasfams does not affect the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`) + sqlDB.Exec(t, `INSERT INTO nofams VALUES (1, 'a1', 'b1', 'c1')`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`, + `nofams: [1]->{"after": {"a": "a1", "b": "b1", "c": "c1", "id": 1}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"c": "c"}}`, + `hasfams.b_and_c: [1]->{"after": {"c": "c1"}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. 
+ sqlDB.Exec(t, `ALTER TABLE nofams DROP COLUMN b`) + assertPayloads(t, cf, []string{ + `nofams: [0]->{"after": {"a": "a", "c": "c", "id": 0}}`, + `nofams: [1]->{"after": {"a": "a1", "c": "c1", "id": 1}}`, + }) + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `CREATE TABLE alsohasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + sqlDB.Exec(t, `INSERT INTO alsohasfams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE alsohasfams FAMILY id_a`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + `alsohasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`, + }) + + // Dropping an unwatched column from hasfams does not affect the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`) + sqlDB.Exec(t, `INSERT INTO alsohasfams VALUES (1, 'a1', 'b1', 'c1')`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`, + `alsohasfams.id_a: [1]->{"after": {"a": "a1", "id": 1}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE alsohasfams DROP COLUMN a`) + assertPayloads(t, cf, []string{ + `alsohasfams.id_a: [0]->{"after": {"id": 0}}`, + `alsohasfams.id_a: [1]->{"after": {"id": 1}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"c": "c"}}`, + `hasfams.b_and_c: [1]->{"after": {"c": "c1"}}`, + }) + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + `hasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + assertPayloads(t, cf, []string{ + `hasfams.id_a: [0]->{"after": {"id": 0}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. 
+ sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"c": "c"}}`, + }) + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + func TestChangefeedExternalIODisabled(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/changefeedbase/target.go b/pkg/ccl/changefeedccl/changefeedbase/target.go index 0e6591f3859b..3c02d1ecc613 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/target.go +++ b/pkg/ccl/changefeedccl/changefeedbase/target.go @@ -26,7 +26,11 @@ type Target struct { type StatementTimeName string type targetsByTable struct { - wholeTable *Target + // wholeTable is a single target that is set when the target has no specified + // column families. + wholeTable *Target + // byFamilyName is populated only if there are multiple column family targets + // for the table. byFamilyName map[string]Target } @@ -83,6 +87,21 @@ func (ts *Targets) EachTarget(f func(Target) error) error { return nil } +// GetSpecifiedColumnFamilies returns a set of watched families +// belonging to the table. +func (ts *Targets) GetSpecifiedColumnFamilies(tableID descpb.ID) map[string]struct{} { + target, exists := ts.m[tableID] + if !exists { + return make(map[string]struct{}) + } + + families := make(map[string]struct{}, len(target.byFamilyName)) + for family := range target.byFamilyName { + families[family] = struct{}{} + } + return families +} + // EachTableID iterates over unique TableIDs referenced in Targets. func (ts *Targets) EachTableID(f func(descpb.ID) error) error { for id := range ts.m { diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 3d4ed2ebcd05..ca5a3b1b88f1 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -287,11 +287,18 @@ func (f *kvFeed) run(ctx context.Context) (err error) { if err != nil { return err } + // Detect whether the event corresponds to a primary index change. Also - // detect whether that primary index change corresponds to any change in - // the primary key or in the set of visible columns. If it corresponds to - // no such change, than it may be a column being dropped physically and - // should not trigger a failure in the `stop` policy. + // detect whether the change corresponds to any change in the set of visible + // primary key columns. + // + // If a primary key is being changed and there are no changes in the + // primary key's columns, this may be because a column that was + // dropped logically earlier is now being dropped physically. + // + // If there is no change in the primary key columns, then a primary key change + // should not trigger a failure in the `stop` policy because this change is + // effectively invisible to consumers.
primaryIndexChange, noColumnChanges := isPrimaryKeyChange(events) if primaryIndexChange && (noColumnChanges || f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyStop) { diff --git a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel index a01b601a4ed0..99f0bc4552cb 100644 --- a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", "//pkg/storage", + "//pkg/util", "//pkg/util/contextutil", "//pkg/util/encoding", "//pkg/util/hlc", diff --git a/pkg/ccl/changefeedccl/schemafeed/helpers_test.go b/pkg/ccl/changefeedccl/schemafeed/helpers_test.go index 91f61db019a1..7f1f045a3f84 100644 --- a/pkg/ccl/changefeedccl/schemafeed/helpers_test.go +++ b/pkg/ccl/changefeedccl/schemafeed/helpers_test.go @@ -8,7 +8,12 @@ package schemafeed -import "strings" +import ( + "strings" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" +) const TestingAllEventFilter = "testing" @@ -35,3 +40,9 @@ func PrintTableEventType(t tableEventType) string { } return strings.Join(strs, "|") } + +func CreateChangefeedTargets(tableID descpb.ID) changefeedbase.Targets { + targets := changefeedbase.Targets{} + targets.Add(changefeedbase.Target{TableID: tableID}) + return targets +} diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 9437f9935da4..ef01d8087116 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -560,7 +560,7 @@ func (tf *schemaFeed) validateDescriptor( Before: lastVersion, After: desc, } - shouldFilter, err := tf.filter.shouldFilter(ctx, e) + shouldFilter, err := tf.filter.shouldFilter(ctx, e, tf.targets) log.VEventf(ctx, 1, "validate shouldFilter %v %v", formatEvent(e), shouldFilter) if err != nil { return err diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go index b7f3c519e387..38a5c5e2e746 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go @@ -14,7 +14,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" ) @@ -122,7 +124,9 @@ func classifyTableEvent(e TableEvent) tableEventTypeSet { // permitted by the filter. type tableEventFilter map[tableEventType]bool -func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (bool, error) { +func (filter tableEventFilter) shouldFilter( + ctx context.Context, e TableEvent, targets changefeedbase.Targets, +) (bool, error) { et := classifyTableEvent(e) // Truncation events are not ignored and return an error. @@ -141,7 +145,19 @@ func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) ( shouldFilter := true for filterEvent, filterPolicy := range filter { if et.Contains(filterEvent) && !filterPolicy { - shouldFilter = false + // Apply changefeed target-specific filters. + // In some cases, a drop column event should be filtered out. + // For example, we may be dropping a column which is not + // monitored by the changefeed. 
+ if filterEvent == tableEventDropColumn { + sf, err := shouldFilterDropColumnEvent(e, targets) + if err != nil { + return false, err + } + shouldFilter = sf + } else { + shouldFilter = false + } } et = et.Clear(filterEvent) } @@ -151,6 +167,48 @@ return shouldFilter, nil } +// shouldFilterDropColumnEvent decides if we should filter out a drop column event. +func shouldFilterDropColumnEvent(e TableEvent, targets changefeedbase.Targets) (bool, error) { + if watched, err := droppedColumnIsWatched(e, targets); err != nil { + return false, err + } else if watched { + return false, nil + } + return true, nil +} + +// droppedColumnIsWatched returns true if the changefeed targets a column that has a drop mutation inside the table event. +func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool, error) { + // If no column families are specified, then all columns are targeted. + specifiedColumnFamiliesForTable := targets.GetSpecifiedColumnFamilies(e.Before.GetID()) + if len(specifiedColumnFamiliesForTable) == 0 { + return true, nil + } + + var watchedColumnIDs util.FastIntSet + if err := e.Before.ForeachFamily(func(family *descpb.ColumnFamilyDescriptor) error { + if _, ok := specifiedColumnFamiliesForTable[family.Name]; ok { + for _, columnID := range family.ColumnIDs { + watchedColumnIDs.Add(int(columnID)) + } + } + return nil + }); err != nil { + return false, err + } + + for _, m := range e.After.AllMutations() { + if m.AsColumn() == nil || m.AsColumn().IsHidden() { + continue + } + if m.Dropped() && m.WriteAndDeleteOnly() && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) { + return true, nil + } + } + + return false, nil +} + func hasNewVisibleColumnDropBackfillMutation(e TableEvent) (res bool) { // Make sure that the old descriptor *doesn't* have the same mutation to avoid adding // the same scan boundary more than once.
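For intuition, the drop-column filtering above reduces to set logic over column families. Below is a minimal, runnable Go sketch of the same decision; the family type, the shouldFilterDrop helper, and the sample column IDs are simplified stand-ins invented for illustration, not APIs from this patch.

package main

import "fmt"

// family is a stand-in for descpb.ColumnFamilyDescriptor: a column-family
// name plus the IDs of the columns it contains.
type family struct {
	name      string
	columnIDs []int
}

// shouldFilterDrop mirrors shouldFilterDropColumnEvent: an event is filtered
// out (true) only when the dropped column belongs to no watched family. An
// empty target set means the whole table is watched.
func shouldFilterDrop(families []family, targeted map[string]struct{}, droppedID int) bool {
	if len(targeted) == 0 {
		return false // no families specified: every column is watched
	}
	watched := make(map[int]struct{})
	for _, f := range families {
		if _, ok := targeted[f.name]; ok {
			for _, id := range f.columnIDs {
				watched[id] = struct{}{}
			}
		}
	}
	_, isWatched := watched[droppedID]
	return !isWatched // filter only drops of unwatched columns
}

func main() {
	fams := []family{{"id_a", []int{1, 2}}, {"b_and_c", []int{3, 4}}}
	targets := map[string]struct{}{"b_and_c": {}}
	fmt.Println(shouldFilterDrop(fams, targets, 2)) // true: column 2 (family id_a) is unwatched
	fmt.Println(shouldFilterDrop(fams, targets, 3)) // false: column 3 is watched
}

This is the behavior the tests earlier in the patch exercise: with only b_and_c watched, dropping column a is filtered out, so the feed neither stops nor backfills, while dropping b or c does trigger the usual handling.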
diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go index c6bb55ab805b..3061af4508d8 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go @@ -254,15 +254,18 @@ func TestTableEventFilterErrorsWithIncompletePolicy(t *testing.T) { Before: mkTableDesc(42, 1, ts(2), 2, 1), After: dropColBackfill(mkTableDesc(42, 2, ts(3), 1, 1)), } - _, err := incompleteFilter.shouldFilter(context.Background(), dropColEvent) + changefeedTargets := CreateChangefeedTargets(42) + + _, err := incompleteFilter.shouldFilter(context.Background(), dropColEvent, changefeedTargets) require.Error(t, err) unknownEvent := TableEvent{ Before: mkTableDesc(42, 1, ts(2), 2, 1), After: mkTableDesc(42, 1, ts(2), 2, 1), } - _, err = incompleteFilter.shouldFilter(context.Background(), unknownEvent) + _, err = incompleteFilter.shouldFilter(context.Background(), unknownEvent, changefeedTargets) require.Error(t, err) + } func TestTableEventFilter(t *testing.T) { @@ -388,7 +391,7 @@ func TestTableEventFilter(t *testing.T) { }, } { t.Run(c.name, func(t *testing.T) { - shouldFilter, err := c.p.shouldFilter(context.Background(), c.e) + shouldFilter, err := c.p.shouldFilter(context.Background(), c.e, CreateChangefeedTargets(42)) require.NoError(t, err) require.Equalf(t, c.exp, shouldFilter, "event %v", c.e) }) diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index 9b80cf000c7a..c94dbcbf5523 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -195,10 +195,10 @@ SELECT * FROM crdb_internal.leases WHERE node_id < 0 ---- node_id table_id name parent_id expiration deleted -query ITTTTTIIITRRRRRRRRRRRRRRRRRRRRRRRRRRBBTTTT colnames +query ITTTTTIIITRRRRRRRRRRRRRRRRRRRRRRRRRRBBTTTTT colnames SELECT * FROM crdb_internal.node_statement_statistics WHERE node_id < 0 ---- -node_id application_name flags statement_id key anonymized count first_attempt_count max_retries last_error rows_avg rows_var parse_lat_avg parse_lat_var plan_lat_avg plan_lat_var run_lat_avg run_lat_var service_lat_avg service_lat_var overhead_lat_avg overhead_lat_var bytes_read_avg bytes_read_var rows_read_avg rows_read_var network_bytes_avg network_bytes_var network_msgs_avg network_msgs_var max_mem_usage_avg max_mem_usage_var max_disk_usage_avg max_disk_usage_var contention_time_avg contention_time_var implicit_txn full_scan sample_plan database_name exec_node_ids txn_fingerprint_id +node_id application_name flags statement_id key anonymized count first_attempt_count max_retries last_error rows_avg rows_var parse_lat_avg parse_lat_var plan_lat_avg plan_lat_var run_lat_avg run_lat_var service_lat_avg service_lat_var overhead_lat_avg overhead_lat_var bytes_read_avg bytes_read_var rows_read_avg rows_read_var network_bytes_avg network_bytes_var network_msgs_avg network_msgs_var max_mem_usage_avg max_mem_usage_var max_disk_usage_avg max_disk_usage_var contention_time_avg contention_time_var implicit_txn full_scan sample_plan database_name exec_node_ids txn_fingerprint_id index_recommendations query ITTTIIRRRRRRRRRRRRRRRRRR colnames SELECT * FROM crdb_internal.node_transaction_statistics WHERE node_id < 0 diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index d4bb66fa63ba..497cc32ac7be 100644 
--- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -331,6 +331,9 @@ const ( AlterSystemSQLInstancesAddLocality // SystemExternalConnectionsTable adds system.external_connections table. SystemExternalConnectionsTable + // AlterSystemStatementStatisticsAddIndexRecommendations adds an + // index_recommendations column to the system.statement_statistics table. + AlterSystemStatementStatisticsAddIndexRecommendations // ************************************************* // Step (1): Add new versions here. @@ -572,6 +575,10 @@ var versionsSingleton = keyedVersions{ Key: SystemExternalConnectionsTable, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 30}, }, + { + Key: AlterSystemStatementStatisticsAddIndexRecommendations, + Version: roachpb.Version{Major: 22, Minor: 1, Internal: 32}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 2e253b5f9d57..73c5f7405ace 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -59,11 +59,12 @@ func _() { _ = x[EnablePredicateProjectionChangefeed-48] _ = x[AlterSystemSQLInstancesAddLocality-49] _ = x[SystemExternalConnectionsTable-50] + _ = x[AlterSystemStatementStatisticsAddIndexRecommendations-51] } -const _Key_name = "V21_2Start22_1PebbleFormatBlockPropertyCollectorProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersEnablePebbleFormatVersionBlockPropertiesMVCCIndexBackfillerEnableLeaseHolderRemovalLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTable" +const _Key_name = 
"V21_2Start22_1PebbleFormatBlockPropertyCollectorProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersEnablePebbleFormatVersionBlockPropertiesMVCCIndexBackfillerEnableLeaseHolderRemovalLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendations" -var _Key_index = [...]uint16{0, 5, 14, 48, 60, 88, 118, 146, 167, 186, 220, 258, 292, 324, 360, 392, 428, 470, 510, 529, 553, 584, 602, 643, 673, 684, 715, 738, 762, 786, 808, 820, 846, 860, 881, 899, 904, 913, 928, 968, 1002, 1036, 1058, 1078, 1097, 1130, 1149, 1169, 1190, 1225, 1259, 1289} +var _Key_index = [...]uint16{0, 5, 14, 48, 60, 88, 118, 146, 167, 186, 220, 258, 292, 324, 360, 392, 428, 470, 510, 529, 553, 584, 602, 643, 673, 684, 715, 738, 762, 786, 808, 820, 846, 860, 881, 899, 904, 913, 928, 968, 1002, 1036, 1058, 1078, 1097, 1130, 1149, 1169, 1190, 1225, 1259, 1289, 1342} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/roachpb/app_stats.go b/pkg/roachpb/app_stats.go index fd511eb1790f..4078aa35cfb0 100644 --- a/pkg/roachpb/app_stats.go +++ b/pkg/roachpb/app_stats.go @@ -155,6 +155,7 @@ func (s *StatementStatistics) Add(other *StatementStatistics) { s.RowsWritten.Add(other.RowsWritten, s.Count, other.Count) s.Nodes = util.CombineUniqueInt64(s.Nodes, other.Nodes) s.PlanGists = util.CombineUniqueString(s.PlanGists, other.PlanGists) + s.IndexRecommendations = util.CombineUniqueString(s.IndexRecommendations, other.IndexRecommendations) s.ExecStats.Add(other.ExecStats) diff --git a/pkg/roachpb/app_stats.proto b/pkg/roachpb/app_stats.proto index 3b1b4f1653b9..a75efcf4e90e 100644 --- a/pkg/roachpb/app_stats.proto +++ b/pkg/roachpb/app_stats.proto @@ -104,12 +104,15 @@ message StatementStatistics { // Nodes is the ordered list of nodes ids on which the statement was executed. repeated int64 nodes = 24; - // plan_gists is list of a compressed version of plan that can be converted (lossily) + // plan_gists is the list of a compressed version of plan that can be converted (lossily) // back into a logical plan. // Each statement contain only one plan gist, but the same statement fingerprint id // can contain more than one value. repeated string plan_gists = 26; + // index_recommendations is the list of index recommendations generated for the statement fingerprint. + repeated string index_recommendations = 27; + // Note: be sure to update `sql/app_stats.go` when adding/removing fields here! 
reserved 13, 14, 17, 18, 19, 20; diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go index 84515819822c..9874c6e9c5d7 100644 --- a/pkg/sql/apply_join.go +++ b/pkg/sql/apply_join.go @@ -12,6 +12,7 @@ package sql import ( "context" + "strconv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -19,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -50,6 +52,10 @@ type applyJoinNode struct { rightTypes []*types.T planRightSideFn exec.ApplyJoinPlanRightSideFn + // iterationCount tracks the number of times planRightSideFn has been + // invoked (in other words, the number of left rows for which we have + // performed the apply join so far). + iterationCount int run struct { // emptyRight is a cached, all-NULL slice that's used for left outer joins @@ -206,18 +212,13 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) { // from the latest input row. leftRow := a.input.plan.Values() a.run.leftRow = leftRow + a.iterationCount++ // At this point, it's time to do the major lift of apply join: re-planning // the right side of the join using the optimizer, with all outer columns // in the right side replaced by the bindings that were defined by the most // recently read left row. - p, err := a.planRightSideFn(newExecFactory(params.p), leftRow) - if err != nil { - return false, err - } - plan := p.(*planComponents) - - if err := a.runRightSidePlan(params, plan); err != nil { + if err := a.runNextRightSideIteration(params, leftRow); err != nil { return false, err } @@ -237,25 +238,35 @@ func (a *applyJoinNode) clearRightRows(params runParams) error { return nil } -// runRightSidePlan runs a planTop that's been generated based on the -// re-optimized right hand side of the apply join, stashing the result in -// a.run.rightRows, ready for retrieval. An error indicates that something went -// wrong during execution of the right hand side of the join, and that we should -// completely give up on the outer join. -func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planComponents) error { +// runNextRightSideIteration generates a planTop based on the re-optimized right +// hand side of the apply join given the next left row and runs the plan to +// completion, stashing the result in a.run.rightRows, ready for retrieval. An +// error indicates that something went wrong during execution of the right hand +// side of the join, and that we should completely give up on the outer join. 
+func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree.Datums) error { + opName := "apply-join-iteration-" + strconv.Itoa(a.iterationCount) + ctx, sp := tracing.ChildSpan(params.ctx, opName) + defer sp.Finish() + p, err := a.planRightSideFn(newExecFactory(params.p), leftRow) + if err != nil { + return err + } + plan := p.(*planComponents) rowResultWriter := NewRowResultWriter(&a.run.rightRows) - if err := runPlanInsidePlan(params, plan, rowResultWriter); err != nil { + if err := runPlanInsidePlan(ctx, params, plan, rowResultWriter); err != nil { return err } - a.run.rightRowsIterator = newRowContainerIterator(params.ctx, a.run.rightRows, a.rightTypes) + a.run.rightRowsIterator = newRowContainerIterator(ctx, a.run.rightRows, a.rightTypes) return nil } // runPlanInsidePlan is used to run a plan and gather the results in the // resultWriter, as part of the execution of an "outer" plan. -func runPlanInsidePlan(params runParams, plan *planComponents, resultWriter rowResultWriter) error { +func runPlanInsidePlan( + ctx context.Context, params runParams, plan *planComponents, resultWriter rowResultWriter, +) error { recv := MakeDistSQLReceiver( - params.ctx, resultWriter, tree.Rows, + ctx, resultWriter, tree.Rows, params.ExecCfg().RangeDescriptorCache, params.p.Txn(), params.ExecCfg().Clock, @@ -287,9 +298,9 @@ func runPlanInsidePlan(params runParams, plan *planComponents, resultWriter rowR // Note that we intentionally defer the closure of the account until we // return from this method (after the main query is executed). subqueryResultMemAcc := params.p.EvalContext().Mon.MakeBoundAccount() - defer subqueryResultMemAcc.Close(params.ctx) + defer subqueryResultMemAcc.Close(ctx) if !params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRunSubqueries( - params.ctx, + ctx, params.p, params.extendedEvalCtx.copy, plan.subqueryPlans, @@ -304,20 +315,20 @@ func runPlanInsidePlan(params runParams, plan *planComponents, resultWriter rowR evalCtx := params.p.ExtendedEvalContextCopy() plannerCopy := *params.p distributePlan := getPlanDistribution( - params.ctx, &plannerCopy, plannerCopy.execCfg.NodeInfo.NodeID, plannerCopy.SessionData().DistSQLMode, plan.main, + ctx, &plannerCopy, plannerCopy.execCfg.NodeInfo.NodeID, plannerCopy.SessionData().DistSQLMode, plan.main, ) distributeType := DistributionType(DistributionTypeNone) if distributePlan.WillDistribute() { distributeType = DistributionTypeAlways } planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.NewPlanningCtx( - params.ctx, evalCtx, &plannerCopy, params.p.txn, distributeType) + ctx, evalCtx, &plannerCopy, params.p.txn, distributeType) planCtx.planner.curPlan.planComponents = *plan planCtx.ExtendedEvalCtx.Planner = &plannerCopy planCtx.stmtType = recv.stmtType params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun( - params.ctx, evalCtx, planCtx, params.p.Txn(), plan.main, recv, + ctx, evalCtx, planCtx, params.p.Txn(), plan.main, recv, )() return resultWriter.Err() } diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 02f3b090d2e2..36c7761c3aea 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -481,6 +481,8 @@ CREATE TABLE system.statement_statistics ( mod(fnv32(crdb_internal.datums_to_bytes(aggregated_ts, app_name, fingerprint_id, node_id, plan_hash, transaction_fingerprint_id)), 8:::INT8) ) STORED, + index_recommendations STRING[] NOT NULL DEFAULT (array[]::STRING[]), + CONSTRAINT "primary" PRIMARY KEY 
(aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id) USING HASH WITH (bucket_count=8), INDEX "fingerprint_stats_idx" (fingerprint_id, transaction_fingerprint_id), @@ -495,7 +497,8 @@ CREATE TABLE system.statement_statistics ( agg_interval, metadata, statistics, - plan + plan, + index_recommendations ) ) ` @@ -1983,6 +1986,8 @@ var ( }, )) + defaultIndexRec = "ARRAY[]:::STRING[]" + // StatementStatisticsTable is the descriptor for the SQL statement stats table. // It stores statistics for statement fingerprints. StatementStatisticsTable = registerSystemTable( @@ -2009,6 +2014,7 @@ var ( ComputeExpr: &sqlStmtHashComputeExpr, Hidden: true, }, + {Name: "index_recommendations", ID: 12, Type: types.StringArray, Nullable: false, DefaultExpr: &defaultIndexRec}, }, []descpb.ColumnFamilyDescriptor{ { @@ -2017,9 +2023,9 @@ var ( ColumnNames: []string{ "crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8", "aggregated_ts", "fingerprint_id", "transaction_fingerprint_id", "plan_hash", "app_name", "node_id", - "agg_interval", "metadata", "statistics", "plan", + "agg_interval", "metadata", "statistics", "plan", "index_recommendations", }, - ColumnIDs: []descpb.ColumnID{11, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + ColumnIDs: []descpb.ColumnID{11, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12}, DefaultColumnID: 0, }, }, diff --git a/pkg/sql/catalog/systemschema_test/testdata/bootstrap b/pkg/sql/catalog/systemschema_test/testdata/bootstrap index 1da4f56312d4..1b1e73c8fb7d 100644 --- a/pkg/sql/catalog/systemschema_test/testdata/bootstrap +++ b/pkg/sql/catalog/systemschema_test/testdata/bootstrap @@ -306,6 +306,7 @@ CREATE TABLE public.statement_statistics ( statistics JSONB NOT NULL, plan JSONB NOT NULL, crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 INT4 NOT VISIBLE NOT NULL AS (mod(fnv32(crdb_internal.datums_to_bytes(aggregated_ts, app_name, fingerprint_id, node_id, plan_hash, transaction_fingerprint_id)), 8:::INT8)) STORED, + index_recommendations STRING[] NOT NULL DEFAULT ARRAY[]:::STRING[], CONSTRAINT "primary" PRIMARY KEY (aggregated_ts ASC, fingerprint_id ASC, transaction_fingerprint_id ASC, plan_hash ASC, app_name ASC, node_id ASC) USING HASH WITH (bucket_count=8), INDEX fingerprint_stats_idx (fingerprint_id ASC, transaction_fingerprint_id ASC) ); diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 6b114da8a10c..2114133485e6 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1103,7 +1103,8 @@ CREATE TABLE crdb_internal.node_statement_statistics ( sample_plan JSONB, database_name STRING NOT NULL, exec_node_ids INT[] NOT NULL, - txn_fingerprint_id STRING + txn_fingerprint_id STRING, + index_recommendations STRING[] NOT NULL )`, populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { hasViewActivityOrViewActivityRedacted, err := p.HasViewActivityOrViewActivityRedactedRole(ctx) @@ -1156,6 +1157,13 @@ CREATE TABLE crdb_internal.node_statement_statistics ( } + indexRecommendations := tree.NewDArray(types.String) + for _, recommendation := range stats.Stats.IndexRecommendations { + if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil { + return err + } + } + err := addRow( tree.NewDInt(tree.DInt(nodeID)), // node_id tree.NewDString(stats.Key.App), // application_name @@ -1199,6 +1207,7 @@ CREATE TABLE crdb_internal.node_statement_statistics ( 
tree.NewDString(stats.Key.Database), // database_name execNodeIDs, // exec_node_ids txnFingerprintID, // txn_fingerprint_id + indexRecommendations, // index_recommendations ) if err != nil { return err @@ -5371,7 +5380,8 @@ CREATE TABLE crdb_internal.cluster_statement_statistics ( metadata JSONB NOT NULL, statistics JSONB NOT NULL, sampled_plan JSONB NOT NULL, - aggregation_interval INTERVAL NOT NULL + aggregation_interval INTERVAL NOT NULL, + index_recommendations STRING[] NOT NULL );`, generator: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { // TODO(azhng): we want to eventually implement memory accounting within the @@ -5402,7 +5412,7 @@ CREATE TABLE crdb_internal.cluster_statement_statistics ( curAggTs := s.ComputeAggregatedTs() aggInterval := s.GetAggregationInterval() - row := make(tree.Datums, 8 /* number of columns for this virtual table */) + row := make(tree.Datums, 9 /* number of columns for this virtual table */) worker := func(ctx context.Context, pusher rowPusher) error { return memSQLStats.IterateStatementStats(ctx, &sqlstats.IteratorOptions{ SortedAppNames: true, @@ -5437,6 +5447,13 @@ CREATE TABLE crdb_internal.cluster_statement_statistics ( duration.MakeDuration(aggInterval.Nanoseconds(), 0, 0), types.DefaultIntervalTypeMetadata) + indexRecommendations := tree.NewDArray(types.String) + for _, recommendation := range statistics.Stats.IndexRecommendations { + if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil { + return err + } + } + row = row[:0] row = append(row, aggregatedTs, // aggregated_ts @@ -5448,6 +5465,7 @@ CREATE TABLE crdb_internal.cluster_statement_statistics ( tree.NewDJSON(statisticsJSON), // statistics tree.NewDJSON(plan), // plan aggInterval, // aggregation_interval + indexRecommendations, // index_recommendations ) return pusher.pushRow(row...) 
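The view redefinition in the hunk that follows aggregates the per-row index_recommendations arrays via LEFT JOIN LATERAL unnest(index_recommendations) AS index_rec ON true paired with array_remove(array_agg(index_rec), NULL). The pairing matters: unnest of an empty array produces zero rows, the LEFT JOIN keeps such source rows alive with a NULL placeholder, and array_remove strips the placeholders so empty inputs aggregate to an empty array rather than {NULL}. A small Go model of that pipeline for a single GROUP BY bucket (aggregateRecommendations and the sample data are illustrative only):

package main

import "fmt"

// aggregateRecommendations models unnest + LEFT JOIN + array_agg +
// array_remove(..., NULL) for one group of rows, where each row carries a
// (possibly empty) string array.
func aggregateRecommendations(rows [][]string) []string {
	var flattened []*string // nil plays the role of the SQL NULL
	for _, arr := range rows {
		if len(arr) == 0 {
			// unnest yields no rows; LEFT JOIN ... ON true keeps the row
			// alive with a NULL in its place.
			flattened = append(flattened, nil)
			continue
		}
		for i := range arr {
			flattened = append(flattened, &arr[i]) // unnest
		}
	}
	agg := []string{} // array_agg followed by array_remove(..., NULL)
	for _, v := range flattened {
		if v != nil {
			agg = append(agg, *v)
		}
	}
	return agg
}

func main() {
	rows := [][]string{{}, {"CREATE INDEX ON t (a)"}, {}}
	fmt.Println(aggregateRecommendations(rows))
	// [CREATE INDEX ON t (a)]: empty arrays contribute nothing, not NULL
}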
@@ -5473,7 +5491,8 @@ SELECT max(metadata) as metadata, crdb_internal.merge_statement_stats(array_agg(statistics)), max(sampled_plan), - aggregation_interval + aggregation_interval, + array_remove(array_agg(index_rec), NULL) AS index_recommendations FROM ( SELECT aggregated_ts, @@ -5484,7 +5503,8 @@ FROM ( metadata, statistics, sampled_plan, - aggregation_interval + aggregation_interval, + index_recommendations FROM crdb_internal.cluster_statement_statistics UNION ALL @@ -5497,10 +5517,12 @@ FROM ( metadata, statistics, plan, - agg_interval + agg_interval, + index_recommendations FROM system.statement_statistics ) +LEFT JOIN LATERAL unnest(index_recommendations) AS index_rec ON true GROUP BY aggregated_ts, fingerprint_id, @@ -5518,6 +5540,7 @@ GROUP BY {Name: "statistics", Typ: types.Jsonb}, {Name: "sampled_plan", Typ: types.Jsonb}, {Name: "aggregation_interval", Typ: types.Interval}, + {Name: "index_recommendations", Typ: types.StringArray}, }, } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index ea67bd53059e..7aa3f943d549 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -51,7 +51,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" - "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/quotapool" @@ -156,10 +155,6 @@ const ( // during initialization by CCL code to enable follower reads. var ReplicaOraclePolicy = replicaoracle.BinPackingChoice -// If true, the plan diagram (in JSON) is logged for each plan (used for -// debugging). -var logPlanDiagram = envutil.EnvOrDefaultBool("COCKROACH_DISTSQL_LOG_PLAN", false) - // NewDistSQLPlanner initializes a DistSQLPlanner. // // sqlInstanceID is the ID of the node on which this planner runs. 
It is used to diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index cbfaaf3ad6d6..64bafdf62866 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -506,8 +506,7 @@ func (dsp *DistSQLPlanner) Run( } } - if logPlanDiagram { - log.VEvent(ctx, 3, "creating plan diagram for logging") + if sp := tracing.SpanFromContext(ctx); sp != nil { var stmtStr string if planCtx.planner != nil && planCtx.planner.stmt.AST != nil { stmtStr = planCtx.planner.stmt.String() diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 73751375cad1..9f7316663405 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -168,23 +168,24 @@ func (ex *connExecutor) recordStatementSummary( } recordedStmtStats := sqlstats.RecordedStmtStats{ - SessionID: ex.sessionID, - StatementID: planner.stmt.QueryID, - AutoRetryCount: automaticRetryCount, - RowsAffected: rowsAffected, - ParseLatency: parseLat, - PlanLatency: planLat, - RunLatency: runLat, - ServiceLatency: svcLat, - OverheadLatency: execOverhead, - BytesRead: stats.bytesRead, - RowsRead: stats.rowsRead, - RowsWritten: stats.rowsWritten, - Nodes: getNodesFromPlanner(planner), - StatementType: stmt.AST.StatementType(), - Plan: planner.instrumentation.PlanForStats(ctx), - PlanGist: planner.instrumentation.planGist.String(), - StatementError: stmtErr, + SessionID: ex.sessionID, + StatementID: planner.stmt.QueryID, + AutoRetryCount: automaticRetryCount, + RowsAffected: rowsAffected, + ParseLatency: parseLat, + PlanLatency: planLat, + RunLatency: runLat, + ServiceLatency: svcLat, + OverheadLatency: execOverhead, + BytesRead: stats.bytesRead, + RowsRead: stats.rowsRead, + RowsWritten: stats.rowsWritten, + Nodes: getNodesFromPlanner(planner), + StatementType: stmt.AST.StatementType(), + Plan: planner.instrumentation.PlanForStats(ctx), + PlanGist: planner.instrumentation.planGist.String(), + StatementError: stmtErr, + IndexRecommendations: planner.instrumentation.indexRecommendations, } stmtFingerprintID, err := diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index 99d872278024..4b1dff6d2d4f 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -538,8 +538,6 @@ type testClusterConfig struct { allowSplitAndScatter bool // if non-empty, overrides the default vectorize mode. overrideVectorize string - // if non-empty, overrides the default experimental DistSQL planning mode. - overrideExperimentalDistSQLPlanning string // if set, queries using distSQL processors or vectorized operators that can // fall back to disk do so immediately, using only their disk-based // implementation. 
@@ -764,12 +762,6 @@ var logicTestConfigs = []testClusterConfig{ binaryVersion: roachpb.Version{Major: 1, Minor: 1}, disableUpgrade: true, }, - { - name: "local-spec-planning", - numNodes: 1, - overrideDistSQLMode: "off", - overrideExperimentalDistSQLPlanning: "on", - }, { name: "fakedist", numNodes: 3, @@ -791,13 +783,6 @@ var logicTestConfigs = []testClusterConfig{ sqlExecUseDisk: true, skipShort: true, }, - { - name: "fakedist-spec-planning", - numNodes: 3, - useFakeSpanResolver: true, - overrideDistSQLMode: "on", - overrideExperimentalDistSQLPlanning: "on", - }, { name: "5node", numNodes: 5, @@ -815,12 +800,6 @@ var logicTestConfigs = []testClusterConfig{ sqlExecUseDisk: true, skipShort: true, }, - { - name: "5node-spec-planning", - numNodes: 5, - overrideDistSQLMode: "on", - overrideExperimentalDistSQLPlanning: "on", - }, { // 3node-tenant is a config that runs the test as a SQL tenant. This config // can only be run with a CCL binary, so is a noop if run through the normal @@ -1008,18 +987,15 @@ var ( defaultConfigNames = []string{ "local", "local-vec-off", - "local-spec-planning", "fakedist", "fakedist-vec-off", "fakedist-disk", - "fakedist-spec-planning", } // fiveNodeDefaultConfigName is a special alias for all 5 node configs. fiveNodeDefaultConfigName = "5node-default-configs" fiveNodeDefaultConfigNames = []string{ "5node", "5node-disk", - "5node-spec-planning", } // threeNodeTenantDefaultConfigName is a special alias for all 3-node tenant // configs. @@ -2019,14 +1995,6 @@ func (t *logicTest) newCluster( t.Fatal(err) } - if cfg.overrideExperimentalDistSQLPlanning != "" { - if _, err := conn.Exec( - "SET CLUSTER SETTING sql.defaults.experimental_distsql_planning = $1::string", cfg.overrideExperimentalDistSQLPlanning, - ); err != nil { - t.Fatal(err) - } - } - // Update the default AS OF time for querying the system.table_statistics // table to create the crdb_internal.table_row_statistics table. 
if _, err := conn.Exec( diff --git a/pkg/sql/logictest/testdata/logic_test/builtin_function b/pkg/sql/logictest/testdata/logic_test/builtin_function index 13952783daf1..40e9e89b76ec 100644 --- a/pkg/sql/logictest/testdata/logic_test/builtin_function +++ b/pkg/sql/logictest/testdata/logic_test/builtin_function @@ -1,5 +1,3 @@ -# LogicTest: !fakedist-spec-planning - statement ok CREATE TABLE foo (a int) diff --git a/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant b/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant index 479fbe2371ee..696bab428d46 100644 --- a/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant +++ b/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant @@ -1,4 +1,4 @@ -# LogicTest: !3node-tenant-default-configs !fakedist-spec-planning +# LogicTest: !3node-tenant-default-configs subtest check_consistency diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index f0539bfe679a..bd9c74e097d9 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -315,10 +315,10 @@ SELECT * FROM crdb_internal.leases WHERE node_id < 0 ---- node_id table_id name parent_id expiration deleted -query ITTTTTIIITRRRRRRRRRRRRRRRRRRRRRRRRRRBBTTTT colnames +query ITTTTTIIITRRRRRRRRRRRRRRRRRRRRRRRRRRBBTTTTT colnames SELECT * FROM crdb_internal.node_statement_statistics WHERE node_id < 0 ---- -node_id application_name flags statement_id key anonymized count first_attempt_count max_retries last_error rows_avg rows_var parse_lat_avg parse_lat_var plan_lat_avg plan_lat_var run_lat_avg run_lat_var service_lat_avg service_lat_var overhead_lat_avg overhead_lat_var bytes_read_avg bytes_read_var rows_read_avg rows_read_var network_bytes_avg network_bytes_var network_msgs_avg network_msgs_var max_mem_usage_avg max_mem_usage_var max_disk_usage_avg max_disk_usage_var contention_time_avg contention_time_var implicit_txn full_scan sample_plan database_name exec_node_ids txn_fingerprint_id +node_id application_name flags statement_id key anonymized count first_attempt_count max_retries last_error rows_avg rows_var parse_lat_avg parse_lat_var plan_lat_avg plan_lat_var run_lat_avg run_lat_var service_lat_avg service_lat_var overhead_lat_avg overhead_lat_var bytes_read_avg bytes_read_var rows_read_avg rows_read_var network_bytes_avg network_bytes_var network_msgs_avg network_msgs_var max_mem_usage_avg max_mem_usage_var max_disk_usage_avg max_disk_usage_var contention_time_avg contention_time_var implicit_txn full_scan sample_plan database_name exec_node_ids txn_fingerprint_id index_recommendations query ITTTIIRRRRRRRRRRRRRRRRRR colnames SELECT * FROM crdb_internal.node_transaction_statistics WHERE node_id < 0 diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 9e5219eb0e2f..8fb611a52738 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -381,7 +381,8 @@ CREATE TABLE crdb_internal.cluster_statement_statistics ( metadata JSONB NOT NULL, statistics JSONB NOT NULL, sampled_plan JSONB NOT NULL, - aggregation_interval INTERVAL NOT NULL + aggregation_interval INTERVAL NOT NULL, + index_recommendations STRING[] NOT NULL ) CREATE TABLE crdb_internal.cluster_statement_statistics ( aggregated_ts TIMESTAMPTZ NOT NULL, fingerprint_id BYTES NOT NULL, @@ -391,7 +392,8 @@ CREATE TABLE 
crdb_internal.cluster_statement_statistics ( metadata JSONB NOT NULL, statistics JSONB NOT NULL, sampled_plan JSONB NOT NULL, - aggregation_interval INTERVAL NOT NULL + aggregation_interval INTERVAL NOT NULL, + index_recommendations STRING[] NOT NULL ) {} {} CREATE TABLE crdb_internal.cluster_transaction_statistics ( aggregated_ts TIMESTAMPTZ NOT NULL, @@ -1041,7 +1043,8 @@ CREATE TABLE crdb_internal.node_statement_statistics ( sample_plan JSONB NULL, database_name STRING NOT NULL, exec_node_ids INT8[] NOT NULL, - txn_fingerprint_id STRING NULL + txn_fingerprint_id STRING NULL, + index_recommendations STRING[] NOT NULL ) CREATE TABLE crdb_internal.node_statement_statistics ( node_id INT8 NOT NULL, application_name STRING NOT NULL, @@ -1084,7 +1087,8 @@ CREATE TABLE crdb_internal.node_statement_statistics ( sample_plan JSONB NULL, database_name STRING NOT NULL, exec_node_ids INT8[] NOT NULL, - txn_fingerprint_id STRING NULL + txn_fingerprint_id STRING NULL, + index_recommendations STRING[] NOT NULL ) {} {} CREATE TABLE crdb_internal.node_transaction_statistics ( node_id INT8 NOT NULL, @@ -1401,7 +1405,8 @@ CREATE VIEW crdb_internal.statement_statistics ( metadata, statistics, sampled_plan, - aggregation_interval + aggregation_interval, + index_recommendations ) AS SELECT aggregated_ts, fingerprint_id, @@ -1411,7 +1416,8 @@ CREATE VIEW crdb_internal.statement_statistics ( max(metadata) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)), max(sampled_plan), - aggregation_interval + aggregation_interval, + array_remove(array_agg(index_rec), NULL) AS index_recommendations FROM ( SELECT @@ -1423,7 +1429,8 @@ CREATE VIEW crdb_internal.statement_statistics ( metadata, statistics, sampled_plan, - aggregation_interval + aggregation_interval, + index_recommendations FROM crdb_internal.cluster_statement_statistics UNION ALL @@ -1436,10 +1443,12 @@ CREATE VIEW crdb_internal.statement_statistics ( metadata, statistics, plan, - agg_interval + agg_interval, + index_recommendations FROM system.statement_statistics ) + LEFT JOIN LATERAL unnest(index_recommendations) AS index_rec ON true GROUP BY aggregated_ts, fingerprint_id, @@ -1455,7 +1464,8 @@ CREATE VIEW crdb_internal.statement_statistics ( metadata, statistics, sampled_plan, - aggregation_interval + aggregation_interval, + index_recommendations ) AS SELECT aggregated_ts, fingerprint_id, @@ -1465,7 +1475,8 @@ CREATE VIEW crdb_internal.statement_statistics ( max(metadata) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)), max(sampled_plan), - aggregation_interval + aggregation_interval, + array_remove(array_agg(index_rec), NULL) AS index_recommendations FROM ( SELECT @@ -1477,7 +1488,8 @@ CREATE VIEW crdb_internal.statement_statistics ( metadata, statistics, sampled_plan, - aggregation_interval + aggregation_interval, + index_recommendations FROM crdb_internal.cluster_statement_statistics UNION ALL @@ -1490,10 +1502,12 @@ CREATE VIEW crdb_internal.statement_statistics ( metadata, statistics, plan, - agg_interval + agg_interval, + index_recommendations FROM system.statement_statistics ) + LEFT JOIN LATERAL unnest(index_recommendations) AS index_rec ON true GROUP BY aggregated_ts, fingerprint_id, diff --git a/pkg/sql/logictest/testdata/logic_test/explain b/pkg/sql/logictest/testdata/logic_test/explain index 163b23b89f1e..6142ec3bbffd 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain +++ b/pkg/sql/logictest/testdata/logic_test/explain @@ -1,4 +1,4 @@ -# LogicTest: local local-vec-off local-spec-planning +# 
LogicTest: local local-vec-off statement ok CREATE TABLE t (a INT PRIMARY KEY) diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index adbebfb05374..515d7b5e1be0 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -1615,6 +1615,7 @@ system public check_sampling_probability system public primary system public statement_diagnostics_requests PRIMARY KEY NO NO system public 630200280_42_10_not_null system public statement_statistics CHECK NO NO system public 630200280_42_11_not_null system public statement_statistics CHECK NO NO +system public 630200280_42_12_not_null system public statement_statistics CHECK NO NO system public 630200280_42_1_not_null system public statement_statistics CHECK NO NO system public 630200280_42_2_not_null system public statement_statistics CHECK NO NO system public 630200280_42_3_not_null system public statement_statistics CHECK NO NO @@ -1801,6 +1802,7 @@ system public 630200280_41_1_not_null system public 630200280_41_2_not_null secret IS NOT NULL system public 630200280_41_3_not_null expiration IS NOT NULL system public 630200280_42_10_not_null plan IS NOT NULL +system public 630200280_42_12_not_null index_recommendations IS NOT NULL system public 630200280_42_1_not_null aggregated_ts IS NOT NULL system public 630200280_42_2_not_null fingerprint_id IS NOT NULL system public 630200280_42_3_not_null transaction_fingerprint_id IS NOT NULL @@ -2196,6 +2198,7 @@ system public statement_statistics aggregated_ts system public statement_statistics app_name 5 system public statement_statistics crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 11 system public statement_statistics fingerprint_id 2 +system public statement_statistics index_recommendations 12 system public statement_statistics metadata 8 system public statement_statistics node_id 6 system public statement_statistics plan 10 diff --git a/pkg/sql/logictest/testdata/logic_test/privilege_builtins b/pkg/sql/logictest/testdata/logic_test/privilege_builtins index e1102524b136..d0bf9223aaf8 100644 --- a/pkg/sql/logictest/testdata/logic_test/privilege_builtins +++ b/pkg/sql/logictest/testdata/logic_test/privilege_builtins @@ -1,5 +1,3 @@ -# LogicTest: !fakedist-spec-planning - statement ok CREATE USER bar; CREATE USER all_user_db; CREATE USER all_user_schema diff --git a/pkg/sql/opt/exec/execbuilder/testdata/zigzag_join b/pkg/sql/opt/exec/execbuilder/testdata/zigzag_join index 61d86e330eb3..1d158df95a27 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/zigzag_join +++ b/pkg/sql/opt/exec/execbuilder/testdata/zigzag_join @@ -1,4 +1,4 @@ -# LogicTest: local local-spec-planning +# LogicTest: local # Make sure that the zigzag join is used in the regression tests for #71093. 
statement ok diff --git a/pkg/sql/recursive_cte.go b/pkg/sql/recursive_cte.go index d97af43fc686..dd06bb0c831c 100644 --- a/pkg/sql/recursive_cte.go +++ b/pkg/sql/recursive_cte.go @@ -12,10 +12,12 @@ package sql import ( "context" + "strconv" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/tracing" ) // recursiveCTENode implements the logic for a recursive CTE: @@ -32,6 +34,8 @@ type recursiveCTENode struct { initial planNode genIterationFn exec.RecursiveCTEIterationFn + // iterationCount tracks the number of invocations of genIterationFn. + iterationCount int label string @@ -139,7 +143,11 @@ func (n *recursiveCTENode) Next(params runParams) (bool, error) { return false, err } - if err := runPlanInsidePlan(params, newPlan.(*planComponents), rowResultWriter(n)); err != nil { + n.iterationCount++ + opName := "recursive-cte-iteration-" + strconv.Itoa(n.iterationCount) + ctx, sp := tracing.ChildSpan(params.ctx, opName) + defer sp.Finish() + if err := runPlanInsidePlan(ctx, params, newPlan.(*planComponents), rowResultWriter(n)); err != nil { return false, err } diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel index 816f6bc5206f..8238609271e7 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -41,6 +41,8 @@ go_library( "//pkg/sql/sqlstats/sslocal", "//pkg/sql/sqlstats/ssmemstorage", "//pkg/sql/sqlutil", + "//pkg/sql/types", + "//pkg/util", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/mon", diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index 34e9de79f80a..64d0045c4f53 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -403,13 +404,14 @@ func (s *PersistedSQLStats) updateStatementStats( ) error { updateStmt := ` UPDATE system.statement_statistics -SET statistics = $1 -WHERE fingerprint_id = $2 - AND transaction_fingerprint_id = $3 - AND aggregated_ts = $4 - AND app_name = $5 - AND plan_hash = $6 - AND node_id = $7 +SET statistics = $1, +index_recommendations = $2 +WHERE fingerprint_id = $3 + AND transaction_fingerprint_id = $4 + AND aggregated_ts = $5 + AND app_name = $6 + AND plan_hash = $7 + AND node_id = $8 ` statisticsJSON, err := sqlstatsutil.BuildStmtStatisticsJSON(&stats.Stats) @@ -417,6 +419,12 @@ WHERE fingerprint_id = $2 return err } statistics := tree.NewDJSON(statisticsJSON) + indexRecommendations := tree.NewDArray(types.String) + for _, recommendation := range stats.Stats.IndexRecommendations { + if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil { + return err + } + } rowsAffected, err := s.cfg.InternalExecutor.ExecEx( ctx, @@ -427,6 +435,7 @@ WHERE fingerprint_id = $2 }, updateStmt, statistics, // statistics + indexRecommendations, // index_recommendations serializedFingerprintID, // fingerprint_id serializedTransactionFingerprintID, // transaction_fingerprint_id 
diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
index 816f6bc5206f..8238609271e7 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
+++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
@@ -41,6 +41,8 @@ go_library(
        "//pkg/sql/sqlstats/sslocal",
        "//pkg/sql/sqlstats/ssmemstorage",
        "//pkg/sql/sqlutil",
+        "//pkg/sql/types",
+        "//pkg/util",
        "//pkg/util/log",
        "//pkg/util/metric",
        "//pkg/util/mon",
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go
index 34e9de79f80a..64d0045c4f53 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/flush.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go
@@ -21,6 +21,7 @@ import (
	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/errors"
@@ -403,13 +404,14 @@ func (s *PersistedSQLStats) updateStatementStats(
) error {
	updateStmt := `
UPDATE system.statement_statistics
-SET statistics = $1
-WHERE fingerprint_id = $2
-  AND transaction_fingerprint_id = $3
-  AND aggregated_ts = $4
-  AND app_name = $5
-  AND plan_hash = $6
-  AND node_id = $7
+SET statistics = $1,
+index_recommendations = $2
+WHERE fingerprint_id = $3
+  AND transaction_fingerprint_id = $4
+  AND aggregated_ts = $5
+  AND app_name = $6
+  AND plan_hash = $7
+  AND node_id = $8
`

	statisticsJSON, err := sqlstatsutil.BuildStmtStatisticsJSON(&stats.Stats)
@@ -417,6 +419,12 @@ WHERE fingerprint_id = $2
		return err
	}
	statistics := tree.NewDJSON(statisticsJSON)
+	indexRecommendations := tree.NewDArray(types.String)
+	for _, recommendation := range stats.Stats.IndexRecommendations {
+		if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil {
+			return err
+		}
+	}

	rowsAffected, err := s.cfg.InternalExecutor.ExecEx(
		ctx,
@@ -427,6 +435,7 @@ WHERE fingerprint_id = $2
		},
		updateStmt,
		statistics,                         // statistics
+		indexRecommendations,               // index_recommendations
		serializedFingerprintID,            // fingerprint_id
		serializedTransactionFingerprintID, // transaction_fingerprint_id
		aggregatedTs,                       // aggregated_ts
@@ -465,7 +474,7 @@ func (s *PersistedSQLStats) insertStatementStats(
) (rowsAffected int, err error) {
	insertStmt := `
INSERT INTO system.statement_statistics
-VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8,
  aggregated_ts, fingerprint_id, transaction_fingerprint_id, app_name, plan_hash, node_id)
DO NOTHING
@@ -486,6 +495,12 @@ DO NOTHING
	statistics := tree.NewDJSON(statisticsJSON)

	plan := tree.NewDJSON(sqlstatsutil.ExplainTreePlanNodeToJSON(&stats.Stats.SensitiveInfo.MostRecentPlanDescription))
+	indexRecommendations := tree.NewDArray(types.String)
+	for _, recommendation := range stats.Stats.IndexRecommendations {
+		if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil {
+			return 0, err
+		}
+	}

	rowsAffected, err = s.cfg.InternalExecutor.ExecEx(
		ctx,
@@ -505,6 +520,7 @@ DO NOTHING
		metadata,             // metadata
		statistics,           // statistics
		plan,                 // plan
+		indexRecommendations, // index_recommendations
	)

	return rowsAffected, err
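Both write paths above build the STRING[] datum the same way; factored out as a helper (the helper name is ours, not the patch's), the construction looks like this:

```go
package persistedsqlstats // illustrative placement

import (
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// stringsToDArray mirrors how updateStatementStats and insertStatementStats
// turn stats.Stats.IndexRecommendations into the DArray bound to the new
// placeholder for the index_recommendations column. Append validates each
// element against the array's element type, hence the error return.
func stringsToDArray(vals []string) (*tree.DArray, error) {
	arr := tree.NewDArray(types.String)
	for _, v := range vals {
		if err := arr.Append(tree.NewDString(v)); err != nil {
			return nil, err
		}
	}
	return arr, nil
}
```

Note that adding the datum ahead of the WHERE-clause arguments is what forces the `$2`→`$8` renumbering in the UPDATE statement: the positional parameters must match the order in which arguments are passed to ExecEx.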
diff --git a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_encoding_test.go b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_encoding_test.go
index 04b23341bf5c..cb9e98321245 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_encoding_test.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_encoding_test.go
@@ -121,7 +121,8 @@ func TestSQLStatsJsonEncoding(t *testing.T) {
           "mean": {{.Float}},
           "sqDiff": {{.Float}}
         }
-       }
+       },
+       "index_recommendations": [{{joinStrings .StringArray}}]
     }
`
@@ -239,7 +240,8 @@ func TestSQLStatsJsonEncoding(t *testing.T) {
           "mean": {{.Float}},
           "sqDiff": {{.Float}}
         }
-       }
+       },
+       "index_recommendations": [{{joinStrings .StringArray}}]
     }
`
diff --git a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go
index 240dee25204e..19c0b41d61c1 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go
@@ -75,6 +75,7 @@ func (s *stmtStats) jsonFields() jsonFields {
	return jsonFields{
		{"statistics", (*innerStmtStats)(s)},
		{"execution_statistics", (*execStats)(&s.ExecStats)},
+		{"index_recommendations", (*stringArray)(&s.IndexRecommendations)},
	}
}
diff --git a/pkg/sql/sqlstats/persistedsqlstats/stmt_reader.go b/pkg/sql/sqlstats/persistedsqlstats/stmt_reader.go
index b34d60684a12..fce7e1051a6e 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/stmt_reader.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/stmt_reader.go
@@ -23,6 +23,7 @@ import (
	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
+	"github.com/cockroachdb/cockroach/pkg/util"
	"github.com/cockroachdb/errors"
)
@@ -111,6 +112,7 @@ func (s *PersistedSQLStats) getFetchQueryForStmtStatsTable(
		"statistics",
		"plan",
		"agg_interval",
+		"index_recommendations",
	}

	// [1]: selection columns
@@ -188,5 +190,12 @@ func rowToStmtStats(row tree.Datums) (*roachpb.CollectedStatementStatistics, err
	aggInterval := tree.MustBeDInterval(row[8]).Duration
	stats.AggregationInterval = time.Duration(aggInterval.Nanos())

+	recommendations := tree.MustBeDArray(row[9])
+	var indexRecommendations []string
+	for _, s := range recommendations.Array {
+		indexRecommendations = util.CombineUniqueString(indexRecommendations, []string{string(tree.MustBeDString(s))})
+	}
+	stats.Stats.IndexRecommendations = indexRecommendations
+
	return &stats, nil
}
diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
index 631ced0b3088..322c53733058 100644
--- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
+++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
@@ -126,6 +126,8 @@ func (s *Container) RecordStatement(
	stats.mu.data.LastExecTimestamp = s.getTimeNow()
	stats.mu.data.Nodes = util.CombineUniqueInt64(stats.mu.data.Nodes, value.Nodes)
	stats.mu.data.PlanGists = util.CombineUniqueString(stats.mu.data.PlanGists, []string{value.PlanGist})
+	stats.mu.data.IndexRecommendations = util.CombineUniqueString(stats.mu.data.IndexRecommendations, value.IndexRecommendations)
+
	// Note that some fields derived from tracing statements (such as
	// BytesSentOverNetwork) are not updated here because they are collected
	// on-demand.
diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go
index 33a1617ae929..1dc98687b69a 100644
--- a/pkg/sql/sqlstats/ssprovider.go
+++ b/pkg/sql/sqlstats/ssprovider.go
@@ -191,23 +191,24 @@ type Provider interface {

// RecordedStmtStats stores the statistics of a statement to be recorded.
type RecordedStmtStats struct {
-	SessionID       clusterunique.ID
-	StatementID     clusterunique.ID
-	AutoRetryCount  int
-	RowsAffected    int
-	ParseLatency    float64
-	PlanLatency     float64
-	RunLatency      float64
-	ServiceLatency  float64
-	OverheadLatency float64
-	BytesRead       int64
-	RowsRead        int64
-	RowsWritten     int64
-	Nodes           []int64
-	StatementType   tree.StatementType
-	Plan            *roachpb.ExplainTreePlanNode
-	PlanGist        string
-	StatementError  error
+	SessionID            clusterunique.ID
+	StatementID          clusterunique.ID
+	AutoRetryCount       int
+	RowsAffected         int
+	ParseLatency         float64
+	PlanLatency          float64
+	RunLatency           float64
+	ServiceLatency       float64
+	OverheadLatency      float64
+	BytesRead            int64
+	RowsRead             int64
+	RowsWritten          int64
+	Nodes                []int64
+	StatementType        tree.StatementType
+	Plan                 *roachpb.ExplainTreePlanNode
+	PlanGist             string
+	StatementError       error
+	IndexRecommendations []string
}

// RecordedTxnStats stores the statistics of a transaction to be recorded.
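On the in-memory side, RecordStatement folds each execution's recommendations into the accumulated statistics with util.CombineUniqueString, and rowToStmtStats uses the same helper when reading rows back, so a recommendation reported by many executions is stored once. A stand-in with the property the code above relies on (the real helper's ordering and allocation behavior may differ):

```go
package sqlstatsutil // illustrative placement

// combineUnique is a stand-in for util.CombineUniqueString: every string
// appears once in the result no matter how often it occurs in the inputs.
func combineUnique(existing, incoming []string) []string {
	seen := make(map[string]struct{}, len(existing)+len(incoming))
	out := make([]string, 0, len(existing)+len(incoming))
	// Iterate the two inputs separately rather than appending one to the
	// other, to avoid mutating the backing array of `existing`.
	for _, in := range [][]string{existing, incoming} {
		for _, s := range in {
			if _, ok := seen[s]; !ok {
				seen[s] = struct{}{}
				out = append(out, s)
			}
		}
	}
	return out
}
```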
diff --git a/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.fixture.ts
index b1e1d3084d90..47124953d7e0 100644
--- a/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.fixture.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.fixture.ts
@@ -94,6 +94,7 @@ const statementStats: Required<IStatementStatistics> = {
    squared_diffs: 0.005,
  },
  plan_gists: ["Ais="],
+  index_recommendations: [""],
  exec_stats: execStats,
  last_exec_timestamp: {
    seconds: Long.fromInt(1599670292),
diff --git a/pkg/ui/workspaces/cluster-ui/src/util/appStats/appStats.spec.ts b/pkg/ui/workspaces/cluster-ui/src/util/appStats/appStats.spec.ts
index fc1306dc57a7..f35b12c121ce 100644
--- a/pkg/ui/workspaces/cluster-ui/src/util/appStats/appStats.spec.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/util/appStats/appStats.spec.ts
@@ -275,6 +275,7 @@ function randomStats(
    },
    nodes: [Long.fromInt(1), Long.fromInt(3), Long.fromInt(4)],
    plan_gists: ["Ais="],
+    index_recommendations: [""],
  };
}
diff --git a/pkg/ui/workspaces/cluster-ui/src/util/appStats/appStats.ts b/pkg/ui/workspaces/cluster-ui/src/util/appStats/appStats.ts
index e047e04f4474..f579fb161829 100644
--- a/pkg/ui/workspaces/cluster-ui/src/util/appStats/appStats.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/util/appStats/appStats.ts
@@ -139,6 +139,15 @@ export function addStatementStats(
    planGists = b.plan_gists;
  }

+  let indexRec: string[] = [];
+  if (a.index_recommendations && b.index_recommendations) {
+    indexRec = unique(a.index_recommendations.concat(b.index_recommendations));
+  } else if (a.index_recommendations) {
+    indexRec = a.index_recommendations;
+  } else if (b.index_recommendations) {
+    indexRec = b.index_recommendations;
+  }
+
  return {
    count: a.count.add(b.count),
    first_attempt_count: a.first_attempt_count.add(b.first_attempt_count),
@@ -187,6 +196,7 @@ export function addStatementStats(
      : b.last_exec_timestamp,
    nodes: uniqueLong([...a.nodes, ...b.nodes]),
    plan_gists: planGists,
+    index_recommendations: indexRec,
  };
}
diff --git a/pkg/ui/workspaces/db-console/src/views/statements/statements.spec.tsx b/pkg/ui/workspaces/db-console/src/views/statements/statements.spec.tsx
index b6f463a87769..bebb26596b16 100644
--- a/pkg/ui/workspaces/db-console/src/views/statements/statements.spec.tsx
+++ b/pkg/ui/workspaces/db-console/src/views/statements/statements.spec.tsx
@@ -527,6 +527,7 @@ function makeStats(): Required<IStatementStatistics> {
    },
    nodes: [Long.fromInt(1), Long.fromInt(2), Long.fromInt(3)],
    plan_gists: ["Ais="],
+    index_recommendations: [],
  };
}
diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel
index a53bf7c8c4cf..8dbbb9d487e8 100644
--- a/pkg/upgrade/upgrades/BUILD.bazel
+++ b/pkg/upgrade/upgrades/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
    name = "upgrades",
    srcs = [
        "alter_sql_instances_locality.go",
+        "alter_statement_statistics_index_recommendations.go",
        "alter_table_protected_timestamp_records.go",
        "comment_on_index_migration.go",
        "descriptor_utils.go",
@@ -67,6 +68,7 @@ go_test(
    size = "large",
    srcs = [
        "alter_sql_instances_locality_test.go",
+        "alter_statement_statistics_index_recommendations_test.go",
        "alter_table_protected_timestamp_records_test.go",
        "builtins_test.go",
        "comment_on_index_migration_external_test.go",
diff --git a/pkg/upgrade/upgrades/alter_statement_statistics_index_recommendations.go b/pkg/upgrade/upgrades/alter_statement_statistics_index_recommendations.go
new file mode 100644
index 000000000000..6bbcb5e73902
--- /dev/null
+++ b/pkg/upgrade/upgrades/alter_statement_statistics_index_recommendations.go
@@ -0,0 +1,42 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package upgrades
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
+	"github.com/cockroachdb/cockroach/pkg/upgrade"
+)
+
+const addIndexRecommendationsCol = `
+ALTER TABLE system.statement_statistics
+ADD COLUMN IF NOT EXISTS "index_recommendations" STRING[] NOT NULL DEFAULT (array[]::STRING[])
+FAMILY "primary"
+`
+
+func alterSystemStatementStatisticsAddIndexRecommendations(
+	ctx context.Context, cs clusterversion.ClusterVersion, d upgrade.TenantDeps, _ *jobs.Job,
+) error {
+	op := operation{
+		name:           "add-statement-statistics-index-recommendations-col",
+		schemaList:     []string{"index_rec"},
+		query:          addIndexRecommendationsCol,
+		schemaExistsFn: hasColumn,
+	}
+	if err := migrateTable(ctx, cs, d, op, keys.StatementStatisticsTableID, systemschema.StatementStatisticsTable); err != nil {
+		return err
+	}
+	return nil
+}
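Because the column is declared NOT NULL with DEFAULT (array[]::STRING[]), rows written before the upgrade read back as empty arrays rather than NULL, and ADD COLUMN IF NOT EXISTS makes the migration safe to re-run. A hypothetical spot-check, not part of this patch (function name and the caller-supplied db handle are ours):

```go
package upgrades_test

import (
	gosql "database/sql"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
)

// checkNoNullRecommendations asserts that after the migration no row
// surfaces a NULL for the new column, which the NOT NULL/DEFAULT pair above
// guarantees even for pre-existing rows.
func checkNoNullRecommendations(t *testing.T, db *gosql.DB) {
	sqlDB := sqlutils.MakeSQLRunner(db)
	sqlDB.CheckQueryResults(t,
		`SELECT count(*) FROM system.statement_statistics
		  WHERE index_recommendations IS NULL`,
		[][]string{{"0"}},
	)
}
```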
diff --git a/pkg/upgrade/upgrades/alter_statement_statistics_index_recommendations_test.go b/pkg/upgrade/upgrades/alter_statement_statistics_index_recommendations_test.go
new file mode 100644
index 000000000000..893bf951faab
--- /dev/null
+++ b/pkg/upgrade/upgrades/alter_statement_statistics_index_recommendations_test.go
@@ -0,0 +1,193 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package upgrades_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
+	"github.com/cockroachdb/cockroach/pkg/sql/privilege"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+)
+
+func TestAlterSystemStatementStatisticsTable(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	clusterArgs := base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					DisableAutomaticVersionUpgrade: make(chan struct{}),
+					BinaryVersionOverride: clusterversion.ByKey(
+						clusterversion.AlterSystemStatementStatisticsAddIndexRecommendations - 1),
+				},
+			},
+		},
+	}
+
+	var (
+		ctx = context.Background()
+
+		tc    = testcluster.StartTestCluster(t, 1, clusterArgs)
+		s     = tc.Server(0)
+		sqlDB = tc.ServerConn(0)
+	)
+	defer tc.Stopper().Stop(ctx)
+
+	var (
+		validationSchemas = []upgrades.Schema{
+			{Name: "index_recommendations", ValidationFn: upgrades.HasColumn},
+			{Name: "primary", ValidationFn: upgrades.HasColumnFamily},
+		}
+	)
+
+	// Inject the old copy of the descriptor.
+	upgrades.InjectLegacyTable(ctx, t, s, systemschema.StatementStatisticsTable, getDeprecatedStatementStatisticsDescriptor)
+	// Validate that the table statement_statistics has the old schema.
+	upgrades.ValidateSchemaExists(
+		ctx,
+		t,
+		s,
+		sqlDB,
+		keys.StatementStatisticsTableID,
+		systemschema.StatementStatisticsTable,
+		[]string{},
+		validationSchemas,
+		false, /* expectExists */
+	)
+	// Run the upgrade.
+	upgrades.Upgrade(
+		t,
+		sqlDB,
+		clusterversion.AlterSystemStatementStatisticsAddIndexRecommendations,
+		nil,   /* done */
+		false, /* expectError */
+	)
+	// Validate that the table has new schema.
+	upgrades.ValidateSchemaExists(
+		ctx,
+		t,
+		s,
+		sqlDB,
+		keys.StatementStatisticsTableID,
+		systemschema.StatementStatisticsTable,
+		[]string{},
+		validationSchemas,
+		true, /* expectExists */
+	)
+}
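The test pins the cluster just below the new version gate, injects the pre-upgrade descriptor, and verifies the schema before and after running the upgrade. Outside this diff, code that writes the new column is expected to consult the same gate so mixed-version clusters keep working; a hedged sketch of that usual pattern (the gating code itself is not part of this patch):

```go
package persistedsqlstats // illustrative placement

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// canWriteIndexRecommendations reports whether every node has activated the
// version gate exercised by the test above, i.e. whether SQL referencing
// index_recommendations is safe to issue.
func canWriteIndexRecommendations(ctx context.Context, st *cluster.Settings) bool {
	return st.Version.IsActive(ctx, clusterversion.AlterSystemStatementStatisticsAddIndexRecommendations)
}
```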
+
+// getDeprecatedStatementStatisticsDescriptor returns the
+// system.statement_statistics table descriptor that was being used before
+// adding a new column in the current version.
+func getDeprecatedStatementStatisticsDescriptor() *descpb.TableDescriptor {
+	sqlStmtHashComputeExpr := `mod(fnv32(crdb_internal.datums_to_bytes(aggregated_ts, app_name, fingerprint_id, node_id, plan_hash, transaction_fingerprint_id)), 8:::INT8)`
+
+	return &descpb.TableDescriptor{
+		Name:                    string(catconstants.StatementStatisticsTableName),
+		ID:                      keys.StatementStatisticsTableID,
+		ParentID:                keys.SystemDatabaseID,
+		UnexposedParentSchemaID: keys.PublicSchemaID,
+		Version:                 1,
+		Columns: []descpb.ColumnDescriptor{
+			{Name: "aggregated_ts", ID: 1, Type: types.TimestampTZ, Nullable: false},
+			{Name: "fingerprint_id", ID: 2, Type: types.Bytes, Nullable: false},
+			{Name: "transaction_fingerprint_id", ID: 3, Type: types.Bytes, Nullable: false},
+			{Name: "plan_hash", ID: 4, Type: types.Bytes, Nullable: false},
+			{Name: "app_name", ID: 5, Type: types.String, Nullable: false},
+			{Name: "node_id", ID: 6, Type: types.Int, Nullable: false},
+			{Name: "agg_interval", ID: 7, Type: types.Interval, Nullable: false},
+			{Name: "metadata", ID: 8, Type: types.Jsonb, Nullable: false},
+			{Name: "statistics", ID: 9, Type: types.Jsonb, Nullable: false},
+			{Name: "plan", ID: 10, Type: types.Jsonb, Nullable: false},
+			{
+				Name:        "crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8",
+				ID:          11,
+				Type:        types.Int4,
+				Nullable:    false,
+				ComputeExpr: &sqlStmtHashComputeExpr,
+				Hidden:      true,
+			},
+		},
+		NextColumnID: 12,
+		Families: []descpb.ColumnFamilyDescriptor{
+			{
+				Name: "primary",
+				ID:   0,
+				ColumnNames: []string{
+					"crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8",
+					"aggregated_ts", "fingerprint_id", "transaction_fingerprint_id", "plan_hash", "app_name", "node_id",
+					"agg_interval", "metadata", "statistics", "plan",
+				},
+				ColumnIDs:       []descpb.ColumnID{11, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+				DefaultColumnID: 0,
+			},
+		},
+		NextFamilyID: 1,
+		PrimaryIndex: descpb.IndexDescriptor{
+			Name:   tabledesc.LegacyPrimaryKeyIndexName,
+			ID:     1,
+			Unique: true,
+			KeyColumnNames: []string{
+				"crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8",
+				"aggregated_ts",
+				"fingerprint_id",
+				"transaction_fingerprint_id",
+				"plan_hash",
+				"app_name",
+				"node_id",
+			},
+			KeyColumnDirections: []catpb.IndexColumn_Direction{
+				catpb.IndexColumn_ASC,
+				catpb.IndexColumn_ASC,
+				catpb.IndexColumn_ASC,
+				catpb.IndexColumn_ASC,
+				catpb.IndexColumn_ASC,
+				catpb.IndexColumn_ASC,
+				catpb.IndexColumn_ASC,
+			},
+			KeyColumnIDs: []descpb.ColumnID{11, 1, 2, 3, 4, 5, 6},
+			Version:      descpb.StrictIndexColumnIDGuaranteesVersion,
+			Sharded: catpb.ShardedDescriptor{
+				IsSharded:    true,
+				Name:         "crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8",
+				ShardBuckets: 8,
+				ColumnNames: []string{
+					"aggregated_ts",
+					"app_name",
+					"fingerprint_id",
+					"node_id",
+					"plan_hash",
+					"transaction_fingerprint_id",
+				},
+			},
+		},
+		NextIndexID:    3,
+		Privileges:     catpb.NewCustomSuperuserPrivilegeDescriptor(privilege.ReadWriteData, username.NodeUserName()),
+		NextMutationID: 1,
+		FormatVersion:  3,
+	}
+}
diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go
index 627a9f1cdcae..38db772ea6f2 100644
--- a/pkg/upgrade/upgrades/upgrades.go
+++ b/pkg/upgrade/upgrades/upgrades.go
@@ -140,6 +140,12 @@ var upgrades = []upgrade.Upgrade{
		NoPrecondition,
		systemExternalConnectionsTableMigration,
	),
+	upgrade.NewTenantUpgrade(
+		"add column index_recommendations to table system.statement_statistics",
+		toCV(clusterversion.AlterSystemStatementStatisticsAddIndexRecommendations),
+		NoPrecondition,
+		alterSystemStatementStatisticsAddIndexRecommendations,
+	),
}

func init() {