From 383313feb9d0b73cc25b65d311183a1d44f1e3b0 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Tue, 14 Jun 2022 08:58:21 -0700 Subject: [PATCH] sql: add locality to system.sql_instances table This PR adds the column `locality` to the `system.sql_instances` table that contains the locality (e.g., region) of a SQL instance. The encoded locality is a JSONB representing the `roachpb.Locality` that may have been provided when the instance was created. This change also pipes the locality through `InstanceInfo`. This will allow us to determine and use locality information of other SQL instances, e.g. in DistSQL for multi-tenant locality-awareness distribution planning. Informs: #80678 Release note (sql change): Table `system.sql_instances` has a new column, `locality`, that stores the locality of a SQL instance if it was provided when the instance was started. This exposes a SQL instance's locality to other instances in the cluster for query planning. --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 7 + pkg/clusterversion/key_string.go | 5 +- pkg/server/server_sql.go | 2 +- pkg/sql/catalog/systemschema/system.go | 10 +- .../systemschema_test/testdata/bootstrap | 1 + .../logic_test/distsql_tenant_locality | 10 +- .../testdata/logic_test/information_schema | 1 + pkg/sql/sqlinstance/BUILD.bazel | 1 + .../sqlinstance/instanceprovider/BUILD.bazel | 1 + .../instanceprovider/instanceprovider.go | 8 +- .../sqlinstance/instancestorage/BUILD.bazel | 2 + .../instancestorage/instancereader.go | 5 +- .../instancestorage/instancereader_test.go | 40 +++-- .../instancestorage/instancestorage.go | 11 +- .../instancestorage/instancestorage_test.go | 42 ++++-- .../sqlinstance/instancestorage/row_codec.go | 43 +++++- .../instancestorage/test_helpers.go | 10 +- pkg/sql/sqlinstance/sqlinstance.go | 6 +- pkg/upgrade/upgrades/BUILD.bazel | 3 + .../upgrades/alter_sql_instances_locality.go | 42 ++++++ .../alter_sql_instances_locality_test.go | 141 ++++++++++++++++++ pkg/upgrade/upgrades/upgrades.go | 6 + pkg/util/json/json.go | 44 ++++++ 25 files changed, 402 insertions(+), 43 deletions(-) create mode 100644 pkg/upgrade/upgrades/alter_sql_instances_locality.go create mode 100644 pkg/upgrade/upgrades/alter_sql_instances_locality_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 2d32b70934f0..9b71eb405321 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -281,4 +281,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-24 set the active cluster version in the format '.' +version version 22.1-26 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index afb103b53e7f..149a8db1f51a 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -212,6 +212,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-24set the active cluster version in the format '.' +versionversion22.1-26set the active cluster version in the format '.' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 9fa2513ae98b..a1af43cb2d0f 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -385,6 +385,9 @@ const ( // EnablePredicateProjectionChangefeed indicates that changefeeds support // predicates and projections. EnablePredicateProjectionChangefeed + // AlterSystemSQLInstancesAddLocality adds a locality column to the + // system.sql_instances table. + AlterSystemSQLInstancesAddLocality // ************************************************* // Step (1): Add new versions here. @@ -686,6 +689,10 @@ var versionsSingleton = keyedVersions{ Key: EnablePredicateProjectionChangefeed, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 24}, }, + { + Key: AlterSystemSQLInstancesAddLocality, + Version: roachpb.Version{Major: 22, Minor: 1, Internal: 26}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index aac4fecd241b..6403cde27bac 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -74,11 +74,12 @@ func _() { _ = x[AddSSTableTombstones-63] _ = x[SystemPrivilegesTable-64] _ = x[EnablePredicateProjectionChangefeed-65] + _ = x[AlterSystemSQLInstancesAddLocality-66] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemovalBackupResolutionInJobLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirDateStyleIntervalStyleCastRewriteEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsForecastStatsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeed" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemovalBackupResolutionInJobLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirDateStyleIntervalStyleCastRewriteEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsForecastStatsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocality" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 166, 207, 233, 252, 286, 298, 329, 353, 374, 402, 432, 460, 481, 494, 513, 547, 585, 619, 651, 687, 719, 755, 797, 816, 856, 888, 907, 931, 952, 983, 1001, 1042, 1072, 1083, 1114, 1137, 1170, 1194, 1218, 1240, 1253, 1265, 1291, 1305, 1326, 1344, 1349, 1358, 1373, 1407, 1441, 1463, 1483, 1502, 1535, 1554, 1574, 1595, 1630} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 166, 207, 233, 252, 286, 298, 329, 353, 374, 402, 432, 460, 481, 494, 513, 547, 585, 619, 651, 687, 719, 755, 797, 816, 856, 888, 907, 931, 952, 983, 1001, 1042, 1072, 1083, 1114, 1137, 1170, 1194, 1218, 1240, 1253, 1265, 1291, 1305, 1326, 1344, 1349, 1358, 1373, 1407, 1441, 1463, 1483, 1502, 1535, 1554, 1574, 1595, 1630, 1664} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 92776b5bc797..042d161bf4e7 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -406,7 +406,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, ) cfg.sqlInstanceProvider = instanceprovider.New( - cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.rangeFeedFactory, cfg.clock, + cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock, ) if !codec.ForSystemTenant() { diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 8b2a4ad68a07..5be87b016901 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -613,8 +613,9 @@ CREATE TABLE system.sql_instances ( id INT NOT NULL, addr STRING, session_id BYTES, + locality JSONB, CONSTRAINT "primary" PRIMARY KEY (id), - FAMILY "primary" (id, addr, session_id) + FAMILY "primary" (id, addr, session_id, locality) )` SpanConfigurationsTableSchema = ` @@ -2265,7 +2266,7 @@ var ( // SQLInstancesTable is the descriptor for the sqlinstances table // It stores information about all the SQL instances for a tenant - // and their associated session and address information. + // and their associated session, locality, and address information. SQLInstancesTable = registerSystemTable( SQLInstancesTableSchema, systemTable( @@ -2275,13 +2276,14 @@ var ( {Name: "id", ID: 1, Type: types.Int, Nullable: false}, {Name: "addr", ID: 2, Type: types.String, Nullable: true}, {Name: "session_id", ID: 3, Type: types.Bytes, Nullable: true}, + {Name: "locality", ID: 4, Type: types.Jsonb, Nullable: true}, }, []descpb.ColumnFamilyDescriptor{ { Name: "primary", ID: 0, - ColumnNames: []string{"id", "addr", "session_id"}, - ColumnIDs: []descpb.ColumnID{1, 2, 3}, + ColumnNames: []string{"id", "addr", "session_id", "locality"}, + ColumnIDs: []descpb.ColumnID{1, 2, 3, 4}, DefaultColumnID: 0, }, }, diff --git a/pkg/sql/catalog/systemschema_test/testdata/bootstrap b/pkg/sql/catalog/systemschema_test/testdata/bootstrap index b302e128ca09..6b60f825b8b9 100644 --- a/pkg/sql/catalog/systemschema_test/testdata/bootstrap +++ b/pkg/sql/catalog/systemschema_test/testdata/bootstrap @@ -346,6 +346,7 @@ CREATE TABLE public.sql_instances ( id INT8 NOT NULL, addr STRING NULL, session_id BYTES NULL, + locality STRING NULL, CONSTRAINT "primary" PRIMARY KEY (id ASC) ); CREATE TABLE public.span_configurations ( diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_tenant_locality b/pkg/sql/logictest/testdata/logic_test/distsql_tenant_locality index 2a5566da935e..da2197da5eab 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_tenant_locality +++ b/pkg/sql/logictest/testdata/logic_test/distsql_tenant_locality @@ -31,4 +31,12 @@ SELECT start_pretty, lease_holder FROM crdb_internal.ranges WHERE start_pretty L # TODO(harding): Once locality-aware distribution is implemented, run queries in # the secondary tenant. -#user root +user root + +# Check sql instance locality in the secondary tenant. +query IT +SELECT id, locality FROM system.sql_instances +---- +1 {"Tiers": "region=test"} +2 {"Tiers": "region=test1"} +3 {"Tiers": "region=test2"} diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index e8c47ec5d3f7..ce2eaaaf62b8 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -2151,6 +2151,7 @@ system pg_extension spatial_ref_sys srid system pg_extension spatial_ref_sys srtext 4 system public sql_instances addr 2 system public sql_instances id 1 +system public sql_instances locality 4 system public sql_instances session_id 3 system public sqlliveness expiration 2 system public sqlliveness session_id 1 diff --git a/pkg/sql/sqlinstance/BUILD.bazel b/pkg/sql/sqlinstance/BUILD.bazel index b709a74310ba..5f41e67e443d 100644 --- a/pkg/sql/sqlinstance/BUILD.bazel +++ b/pkg/sql/sqlinstance/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/roachpb", "//pkg/sql/sqlliveness", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/sql/sqlinstance/instanceprovider/BUILD.bazel b/pkg/sql/sqlinstance/instanceprovider/BUILD.bazel index 62dcff3e161a..c83ca28dc194 100644 --- a/pkg/sql/sqlinstance/instanceprovider/BUILD.bazel +++ b/pkg/sql/sqlinstance/instanceprovider/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/rangefeed", + "//pkg/roachpb", "//pkg/sql/sqlinstance", "//pkg/sql/sqlinstance/instancestorage", "//pkg/sql/sqlliveness", diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go index c9abb511f57c..363573bf6583 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" @@ -31,7 +32,7 @@ import ( ) type writer interface { - CreateInstance(ctx context.Context, sessionID sqlliveness.SessionID, sessionExpiration hlc.Timestamp, instanceAddr string) (base.SQLInstanceID, error) + CreateInstance(ctx context.Context, sessionID sqlliveness.SessionID, sessionExpiration hlc.Timestamp, instanceAddr string, locality roachpb.Locality) (base.SQLInstanceID, error) ReleaseInstanceID(ctx context.Context, instanceID base.SQLInstanceID) error } @@ -42,6 +43,7 @@ type provider struct { stopper *stop.Stopper instanceAddr string session sqlliveness.Instance + locality roachpb.Locality initOnce sync.Once initialized chan struct{} instanceID base.SQLInstanceID @@ -60,6 +62,7 @@ func New( codec keys.SQLCodec, slProvider sqlliveness.Provider, addr string, + locality roachpb.Locality, f *rangefeed.Factory, clock *hlc.Clock, ) sqlinstance.Provider { @@ -71,6 +74,7 @@ func New( Reader: reader, session: slProvider, instanceAddr: addr, + locality: locality, initialized: make(chan struct{}), } return p @@ -142,7 +146,7 @@ func (p *provider) initialize(ctx context.Context) error { if err != nil { return errors.Wrap(err, "constructing session") } - instanceID, err := p.storage.CreateInstance(ctx, session.ID(), session.Expiration(), p.instanceAddr) + instanceID, err := p.storage.CreateInstance(ctx, session.ID(), session.Expiration(), p.instanceAddr, p.locality) if err != nil { return err } diff --git a/pkg/sql/sqlinstance/instancestorage/BUILD.bazel b/pkg/sql/sqlinstance/instancestorage/BUILD.bazel index 4623a3531156..ca85716bea0a 100644 --- a/pkg/sql/sqlinstance/instancestorage/BUILD.bazel +++ b/pkg/sql/sqlinstance/instancestorage/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//pkg/util/encoding", "//pkg/util/grpcutil", "//pkg/util/hlc", + "//pkg/util/json", "//pkg/util/log", "//pkg/util/stop", "//pkg/util/syncutil", @@ -48,6 +49,7 @@ go_test( "//pkg/base", "//pkg/keys", "//pkg/kv/kvclient/rangefeed", + "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", diff --git a/pkg/sql/sqlinstance/instancestorage/instancereader.go b/pkg/sql/sqlinstance/instancestorage/instancereader.go index 9093604dd182..916fedc74e2c 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancereader.go +++ b/pkg/sql/sqlinstance/instancestorage/instancereader.go @@ -118,7 +118,7 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed { updateCacheFn := func( ctx context.Context, keyVal *roachpb.RangeFeedValue, ) { - instanceID, addr, sessionID, timestamp, tombstone, err := r.rowcodec.decodeRow(kv.KeyValue{ + instanceID, addr, sessionID, locality, timestamp, tombstone, err := r.rowcodec.decodeRow(kv.KeyValue{ Key: keyVal.Key, Value: &keyVal.Value, }) @@ -131,6 +131,7 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed { addr: addr, sessionID: sessionID, timestamp: timestamp, + locality: locality, } r.updateInstanceMap(instance, tombstone) } @@ -195,6 +196,7 @@ func (r *Reader) GetInstance( InstanceID: instance.instanceID, InstanceAddr: instance.addr, SessionID: instance.sessionID, + Locality: instance.locality, } return instanceInfo, nil } @@ -218,6 +220,7 @@ func (r *Reader) GetAllInstances( InstanceID: liveInstance.instanceID, InstanceAddr: liveInstance.addr, SessionID: liveInstance.sessionID, + Locality: liveInstance.locality, } sqlInstances = append(sqlInstances, instanceInfo) } diff --git a/pkg/sql/sqlinstance/instancestorage/instancereader_test.go b/pkg/sql/sqlinstance/instancestorage/instancereader_test.go index f74beb7b28a8..790711674285 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancereader_test.go +++ b/pkg/sql/sqlinstance/instancestorage/instancereader_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" @@ -73,12 +74,13 @@ func TestReader(t *testing.T) { require.NoError(t, reader.Start(ctx)) const sessionID = sqlliveness.SessionID("session_id") const addr = "addr" + locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}} // Set a high enough expiration to ensure the session stays // live through the test. const expiration = 10 * time.Minute { sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr) + id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality) if err != nil { t.Fatal(err) } @@ -94,6 +96,9 @@ func TestReader(t *testing.T) { if addr != instanceInfo.InstanceAddr { return errors.Newf("expected instance address %s != actual instance address %s", addr, instanceInfo.InstanceAddr) } + if !locality.Equals(instanceInfo.Locality) { + return errors.Newf("expected instance locality %s != actual instance locality %s", locality, instanceInfo.Locality) + } return nil }) } @@ -109,8 +114,13 @@ func TestReader(t *testing.T) { instanceIDs := []base.SQLInstanceID{1, 2, 3} addresses := []string{"addr1", "addr2", "addr3"} sessionIDs := []sqlliveness.SessionID{"session1", "session2", "session3"} + localities := []roachpb.Locality{ + {Tiers: []roachpb.Tier{{Key: "region", Value: "region1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "region2"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "region3"}}}, + } - testOutputFn := func(expectedIDs []base.SQLInstanceID, expectedAddresses []string, expectedSessionIDs []sqlliveness.SessionID, actualInstances []sqlinstance.InstanceInfo) error { + testOutputFn := func(expectedIDs []base.SQLInstanceID, expectedAddresses []string, expectedSessionIDs []sqlliveness.SessionID, expectedLocalities []roachpb.Locality, actualInstances []sqlinstance.InstanceInfo) error { if len(expectedIDs) != len(actualInstances) { return errors.Newf("expected %d instances, got %d instances", len(expectedIDs), len(actualInstances)) } @@ -124,6 +134,9 @@ func TestReader(t *testing.T) { if expectedSessionIDs[index] != instance.SessionID { return errors.Newf("expected session ID %s != actual session ID %s", expectedSessionIDs[index], instance.SessionID) } + if !expectedLocalities[index].Equals(instance.Locality) { + return errors.Newf("expected instance locality %s != actual instance locality %s", expectedLocalities[index], instance.Locality) + } } return nil } @@ -131,7 +144,7 @@ func TestReader(t *testing.T) { // Set up mock data within instance and session storage. for index, addr := range addresses { sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - _, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addr) + _, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addr, localities[index]) if err != nil { t.Fatal(err) } @@ -147,7 +160,7 @@ func TestReader(t *testing.T) { return err } sortInstances(instances) - return testOutputFn(instanceIDs, addresses, sessionIDs, instances) + return testOutputFn(instanceIDs, addresses, sessionIDs, localities, instances) }) } @@ -163,7 +176,7 @@ func TestReader(t *testing.T) { return err } sortInstances(instances) - return testOutputFn(instanceIDs[1:], addresses[1:], sessionIDs[1:], instances) + return testOutputFn(instanceIDs[1:], addresses[1:], sessionIDs[1:], localities[1:], instances) }) } @@ -179,7 +192,7 @@ func TestReader(t *testing.T) { return err } sortInstances(instances) - return testOutputFn(instanceIDs[2:], addresses[2:], sessionIDs[2:], instances) + return testOutputFn(instanceIDs[2:], addresses[2:], sessionIDs[2:], localities[2:], instances) }) } @@ -188,8 +201,9 @@ func TestReader(t *testing.T) { // when instance information isn't released correctly prior to SQL instance shutdown. { sessionID := sqlliveness.SessionID("session4") + locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region4"}}} sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2]) + id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2], locality) if err != nil { t.Fatal(err) } @@ -203,7 +217,7 @@ func TestReader(t *testing.T) { return err } sortInstances(instances) - return testOutputFn([]base.SQLInstanceID{id}, []string{addresses[2]}, []sqlliveness.SessionID{sessionID}, instances) + return testOutputFn([]base.SQLInstanceID{id}, []string{addresses[2]}, []sqlliveness.SessionID{sessionID}, []roachpb.Locality{locality}, instances) }) } }) @@ -217,11 +231,16 @@ func TestReader(t *testing.T) { instanceIDs := [...]base.SQLInstanceID{1, 2, 3} addresses := [...]string{"addr1", "addr2", "addr3"} sessionIDs := [...]sqlliveness.SessionID{"session1", "session2", "session3"} + localities := [...]roachpb.Locality{ + {Tiers: []roachpb.Tier{{Key: "region", Value: "region1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "region2"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "region3"}}}, + } { // Set up mock data within instance and session storage. for index, addr := range addresses { sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - _, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addr) + _, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addr, localities[index]) if err != nil { t.Fatal(err) } @@ -242,6 +261,9 @@ func TestReader(t *testing.T) { if addresses[0] != instanceInfo.InstanceAddr { return errors.Newf("expected instance address %s != actual instance address %s", addresses[0], instanceInfo.InstanceAddr) } + if !localities[0].Equals(instanceInfo.Locality) { + return errors.Newf("expected instance locality %s != actual instance locality %s", localities[0], instanceInfo.Locality) + } return nil }) } diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage.go b/pkg/sql/sqlinstance/instancestorage/instancestorage.go index b89838af4cda..2f32a49d8901 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancestorage.go +++ b/pkg/sql/sqlinstance/instancestorage/instancestorage.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/multitenant" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" @@ -42,6 +43,7 @@ type instancerow struct { instanceID base.SQLInstanceID addr string sessionID sqlliveness.SessionID + locality roachpb.Locality timestamp hlc.Timestamp } @@ -72,6 +74,7 @@ func (s *Storage) CreateInstance( sessionID sqlliveness.SessionID, sessionExpiration hlc.Timestamp, addr string, + locality roachpb.Locality, ) (instanceID base.SQLInstanceID, _ error) { if len(addr) == 0 { return base.SQLInstanceID(0), errors.New("no address information for instance") @@ -92,7 +95,7 @@ func (s *Storage) CreateInstance( return err } instanceID = s.getAvailableInstanceID(ctx, rows) - row, err := s.rowcodec.encodeRow(instanceID, addr, sessionID, s.codec, s.tableID) + row, err := s.rowcodec.encodeRow(instanceID, addr, sessionID, locality, s.codec, s.tableID) if err != nil { log.Warningf(ctx, "failed to encode row for instance id %d: %v", instanceID, err) return err @@ -162,7 +165,7 @@ func (s *Storage) getInstanceData( if row.Value == nil { return instancerow{}, sqlinstance.NonExistentInstanceError } - _, addr, sessionID, timestamp, _, err := s.rowcodec.decodeRow(row) + _, addr, sessionID, locality, timestamp, _, err := s.rowcodec.decodeRow(row) if err != nil { return instancerow{}, errors.Wrapf(err, "could not decode data for instance %d", instanceID) } @@ -171,6 +174,7 @@ func (s *Storage) getInstanceData( addr: addr, sessionID: sessionID, timestamp: timestamp, + locality: locality, } return instanceData, nil } @@ -204,7 +208,7 @@ func (s *Storage) getAllInstanceRows( return nil, err } for i := range rows { - instanceID, addr, sessionID, timestamp, _, err := s.rowcodec.decodeRow(rows[i]) + instanceID, addr, sessionID, locality, timestamp, _, err := s.rowcodec.decodeRow(rows[i]) if err != nil { log.Warningf(ctx, "failed to decode row %v: %v", rows[i].Key, err) return nil, err @@ -214,6 +218,7 @@ func (s *Storage) getAllInstanceRows( addr: addr, sessionID: sessionID, timestamp: timestamp, + locality: locality, } instances = append(instances, curInstance) } diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go index 574cac14a604..f7ec52c2f1b8 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go +++ b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" @@ -73,9 +74,10 @@ func TestStorage(t *testing.T) { const id = base.SQLInstanceID(1) const sessionID = sqlliveness.SessionID("session_id") const addr = "addr" + locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}} const expiration = time.Minute { - instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr) + instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality) require.NoError(t, err) require.Equal(t, id, instanceID) } @@ -88,10 +90,15 @@ func TestStorage(t *testing.T) { instanceIDs := [...]base.SQLInstanceID{1, 2, 3} addresses := [...]string{"addr1", "addr2", "addr3"} sessionIDs := [...]sqlliveness.SessionID{"session1", "session2", "session3"} + localities := [...]roachpb.Locality{ + {Tiers: []roachpb.Tier{{Key: "region", Value: "region1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "region2"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "region3"}}}, + } { for index, addr := range addresses { sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - instanceID, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addr) + instanceID, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addr, localities[index]) require.NoError(t, err) err = slStorage.Insert(ctx, sessionIDs[index], sessionExpiry) if err != nil { @@ -111,6 +118,7 @@ func TestStorage(t *testing.T) { require.Equal(t, instanceIDs[index], instance.InstanceID) require.Equal(t, sessionIDs[index], instance.SessionID) require.Equal(t, addresses[index], instance.InstanceAddr) + require.Equal(t, localities[index], instance.Locality) } } @@ -125,6 +133,7 @@ func TestStorage(t *testing.T) { require.Equal(t, instanceIDs[index+1], instance.InstanceID) require.Equal(t, sessionIDs[index+1], instance.SessionID) require.Equal(t, addresses[index+1], instance.InstanceAddr) + require.Equal(t, localities[index+1], instance.Locality) } } @@ -135,8 +144,9 @@ func TestStorage(t *testing.T) { var instanceID base.SQLInstanceID newSessionID := sqlliveness.SessionID("session4") newAddr := "addr4" + newLocality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region4"}}} newSessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - instanceID, err = storage.CreateInstance(ctx, newSessionID, newSessionExpiry, newAddr) + instanceID, err = storage.CreateInstance(ctx, newSessionID, newSessionExpiry, newAddr, newLocality) require.NoError(t, err) require.Equal(t, instanceIDs[0], instanceID) var instances []sqlinstance.InstanceInfo @@ -149,10 +159,12 @@ func TestStorage(t *testing.T) { if index == 0 { require.Equal(t, newSessionID, instance.SessionID) require.Equal(t, newAddr, instance.InstanceAddr) + require.Equal(t, newLocality, instance.Locality) continue } require.Equal(t, sessionIDs[index], instance.SessionID) require.Equal(t, addresses[index], instance.InstanceAddr) + require.Equal(t, localities[index], instance.Locality) } } @@ -162,8 +174,9 @@ func TestStorage(t *testing.T) { var instanceID base.SQLInstanceID newSessionID := sqlliveness.SessionID("session5") newAddr := "addr5" + newLocality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region5"}}} newSessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - instanceID, err = storage.CreateInstance(ctx, newSessionID, newSessionExpiry, newAddr) + instanceID, err = storage.CreateInstance(ctx, newSessionID, newSessionExpiry, newAddr, newLocality) require.NoError(t, err) require.Equal(t, instanceIDs[0], instanceID) var instances []sqlinstance.InstanceInfo @@ -176,10 +189,12 @@ func TestStorage(t *testing.T) { if index == 0 { require.Equal(t, newSessionID, instance.SessionID) require.Equal(t, newAddr, instance.InstanceAddr) + require.Equal(t, newLocality, instance.Locality) continue } require.Equal(t, sessionIDs[index], instance.SessionID) require.Equal(t, addresses[index], instance.InstanceAddr) + require.Equal(t, localities[index], instance.Locality) } } }) @@ -209,14 +224,19 @@ func TestSQLAccess(t *testing.T) { const ( sessionID = sqlliveness.SessionID("session") addr = "addr" + localityStr = "{\"Tiers\": \"region=test1,zone=test2\"}" expiration = time.Minute - expectedNumCols = 3 + expectedNumCols = 4 ) - instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr) + var locality roachpb.Locality + if err := locality.Set(localityStr); err != nil { + t.Fatal(err) + } + instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality) require.NoError(t, err) // Query the table through SQL and verify the query completes successfully. - rows := tDB.Query(t, fmt.Sprintf("SELECT id, addr, session_id FROM \"%s\".sql_instances", dbName)) + rows := tDB.Query(t, fmt.Sprintf("SELECT id, addr, session_id, locality FROM \"%s\".sql_instances", dbName)) defer rows.Close() columns, err := rows.Columns() require.NoError(t, err) @@ -224,12 +244,14 @@ func TestSQLAccess(t *testing.T) { var parsedInstanceID base.SQLInstanceID var parsedSessionID sqlliveness.SessionID var parsedAddr string + var parsedLocality string rows.Next() - err = rows.Scan(&parsedInstanceID, &parsedAddr, &parsedSessionID) + err = rows.Scan(&parsedInstanceID, &parsedAddr, &parsedSessionID, &parsedLocality) require.NoError(t, err) require.Equal(t, instanceID, parsedInstanceID) require.Equal(t, sessionID, parsedSessionID) require.Equal(t, addr, parsedAddr) + require.Equal(t, localityStr, parsedLocality) // Verify that the table only contains one row as expected. hasAnotherRow := rows.Next() @@ -268,6 +290,7 @@ func TestConcurrentCreateAndRelease(t *testing.T) { addr = "addr" expiration = time.Minute ) + locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test-region"}}} sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) err := slStorage.Insert(ctx, sessionID, sessionExpiry) if err != nil { @@ -292,7 +315,7 @@ func TestConcurrentCreateAndRelease(t *testing.T) { if err != nil { t.Fatal(err) } - instanceID, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr) + instanceID, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality) require.NoError(t, err) if len(state.freeInstances) > 0 { _, free := state.freeInstances[instanceID] @@ -352,6 +375,7 @@ func TestConcurrentCreateAndRelease(t *testing.T) { require.NoError(t, err) require.Equal(t, addr, instanceInfo.InstanceAddr) require.Equal(t, sessionID, instanceInfo.SessionID) + require.Equal(t, locality, instanceInfo.Locality) _, live := state.liveInstances[i] require.True(t, live) } diff --git a/pkg/sql/sqlinstance/instancestorage/row_codec.go b/pkg/sql/sqlinstance/instancestorage/row_codec.go index 956c0d244aec..307d46ad4a8f 100644 --- a/pkg/sql/sqlinstance/instancestorage/row_codec.go +++ b/pkg/sql/sqlinstance/instancestorage/row_codec.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/errors" ) @@ -50,6 +51,7 @@ func (d *rowCodec) encodeRow( instanceID base.SQLInstanceID, addr string, sessionID sqlliveness.SessionID, + locality roachpb.Locality, codec keys.SQLCodec, tableID descpb.ID, ) (kv kv.KeyValue, err error) { @@ -66,6 +68,15 @@ func (d *rowCodec) encodeRow( if err != nil { return kv, err } + // Preserve the ordering of locality.Tiers, even though we convert it to json. + builder := json.NewObjectBuilder(1) + builder.Add("Tiers", json.FromString(locality.String())) + localityDatum := tree.NewDJSON(builder.Build()) + localityColDiff := valueside.MakeColumnIDDelta(d.columns[2].GetID(), d.columns[3].GetID()) + valueBuf, err = valueside.Encode(valueBuf, localityColDiff, localityDatum, []byte(nil)) + if err != nil { + return kv, err + } var v roachpb.Value v.SetTuple(valueBuf) kv.Value = &v @@ -80,6 +91,7 @@ func (d *rowCodec) decodeRow( instanceID base.SQLInstanceID, addr string, sessionID sqlliveness.SessionID, + locality roachpb.Locality, timestamp hlc.Timestamp, tombstone bool, _ error, @@ -91,26 +103,26 @@ func (d *rowCodec) decodeRow( row := make([]rowenc.EncDatum, 1) _, _, err := rowenc.DecodeIndexKey(d.codec, types, row, nil, kv.Key) if err != nil { - return base.SQLInstanceID(0), "", "", hlc.Timestamp{}, false, errors.Wrap(err, "failed to decode key") + return base.SQLInstanceID(0), "", "", roachpb.Locality{}, hlc.Timestamp{}, false, errors.Wrap(err, "failed to decode key") } if err := row[0].EnsureDecoded(types[0], &alloc); err != nil { - return base.SQLInstanceID(0), "", "", hlc.Timestamp{}, false, err + return base.SQLInstanceID(0), "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err } instanceID = base.SQLInstanceID(tree.MustBeDInt(row[0].Datum)) } if !kv.Value.IsPresent() { - return instanceID, "", "", hlc.Timestamp{}, true, nil + return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, true, nil } timestamp = kv.Value.Timestamp // The rest of the columns are stored as a family. bytes, err := kv.Value.GetTuple() if err != nil { - return instanceID, "", "", hlc.Timestamp{}, false, err + return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err } datums, err := d.decoder.Decode(&alloc, bytes) if err != nil { - return instanceID, "", "", hlc.Timestamp{}, false, err + return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err } if addrVal := datums[1]; addrVal != tree.DNull { @@ -119,8 +131,27 @@ func (d *rowCodec) decodeRow( if sessionIDVal := datums[2]; sessionIDVal != tree.DNull { sessionID = sqlliveness.SessionID(tree.MustBeDBytes(sessionIDVal)) } + locality = roachpb.Locality{} + if localityVal := datums[3]; localityVal != tree.DNull { + localityJ := tree.MustBeDJSON(localityVal) + v, err := localityJ.FetchValKey("Tiers") + if err != nil { + return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err + } + if v != nil { + vStr, err := v.AsText() + if err != nil { + return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err + } + if len(*vStr) > 0 { + if err := locality.Set(*vStr); err != nil { + return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err + } + } + } + } - return instanceID, addr, sessionID, timestamp, false, nil + return instanceID, addr, sessionID, locality, timestamp, false, nil } func makeTablePrefix(codec keys.SQLCodec, tableID descpb.ID) roachpb.Key { diff --git a/pkg/sql/sqlinstance/instancestorage/test_helpers.go b/pkg/sql/sqlinstance/instancestorage/test_helpers.go index 5d49d183bc16..b93203cc2dca 100644 --- a/pkg/sql/sqlinstance/instancestorage/test_helpers.go +++ b/pkg/sql/sqlinstance/instancestorage/test_helpers.go @@ -16,6 +16,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -42,7 +43,11 @@ func NewFakeStorage() *FakeStorage { // CreateInstance implements the instanceprovider.writer interface. func (f *FakeStorage) CreateInstance( - _ context.Context, sessionID sqlliveness.SessionID, _ hlc.Timestamp, addr string, + ctx context.Context, + sessionID sqlliveness.SessionID, + sessionExpiration hlc.Timestamp, + addr string, + locality roachpb.Locality, ) (base.SQLInstanceID, error) { f.mu.Lock() defer f.mu.Unlock() @@ -50,6 +55,7 @@ func (f *FakeStorage) CreateInstance( InstanceID: f.mu.instanceIDCtr, InstanceAddr: addr, SessionID: sessionID, + Locality: locality, } f.mu.instances[f.mu.instanceIDCtr] = i f.mu.instanceIDCtr++ @@ -77,6 +83,7 @@ func (s *Storage) GetInstanceDataForTest( InstanceID: i.instanceID, InstanceAddr: i.addr, SessionID: i.sessionID, + Locality: i.locality, } return instanceInfo, nil } @@ -95,6 +102,7 @@ func (s *Storage) GetAllInstancesDataForTest( InstanceID: instance.instanceID, InstanceAddr: instance.addr, SessionID: instance.sessionID, + Locality: instance.locality, } instances = append(instances, instanceInfo) } diff --git a/pkg/sql/sqlinstance/sqlinstance.go b/pkg/sql/sqlinstance/sqlinstance.go index 11247e2718dc..62d726e414b3 100644 --- a/pkg/sql/sqlinstance/sqlinstance.go +++ b/pkg/sql/sqlinstance/sqlinstance.go @@ -19,16 +19,18 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/errors" ) -// InstanceInfo exposes information on a SQL instance such as ID, network address and -// the associated sqlliveness.SessionID. +// InstanceInfo exposes information on a SQL instance such as ID, network +// address, the associated sqlliveness.SessionID, and the instance's locality. type InstanceInfo struct { InstanceID base.SQLInstanceID InstanceAddr string SessionID sqlliveness.SessionID + Locality roachpb.Locality } // AddressResolver exposes API for retrieving the instance address and all live instances for a tenant. diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index c10a70216338..0de9e9b96bfd 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "upgrades", srcs = [ + "alter_sql_instances_locality.go", "alter_table_protected_timestamp_records.go", "alter_table_statistics_avg_size.go", "comment_on_index_migration.go", @@ -67,6 +68,7 @@ go_test( name = "upgrades_test", size = "large", srcs = [ + "alter_sql_instances_locality_test.go", "alter_table_protected_timestamp_records_test.go", "alter_table_statistics_avg_size_test.go", "builtins_test.go", @@ -114,6 +116,7 @@ go_test( "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/privilege", + "//pkg/sql/sem/catconstants", "//pkg/sql/sem/tree", "//pkg/sql/sqlutil", "//pkg/sql/types", diff --git a/pkg/upgrade/upgrades/alter_sql_instances_locality.go b/pkg/upgrade/upgrades/alter_sql_instances_locality.go new file mode 100644 index 000000000000..68f34b57474f --- /dev/null +++ b/pkg/upgrade/upgrades/alter_sql_instances_locality.go @@ -0,0 +1,42 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/upgrade" +) + +const addLocalityCol = ` +ALTER TABLE system.sql_instances +ADD COLUMN IF NOT EXISTS "locality" JSONB +FAMILY "primary" +` + +func alterSystemSQLInstancesAddLocality( + ctx context.Context, cs clusterversion.ClusterVersion, d upgrade.TenantDeps, _ *jobs.Job, +) error { + op := operation{ + name: "add-sql-instances-locality-col", + schemaList: []string{"total_consumption"}, + query: addLocalityCol, + schemaExistsFn: hasColumn, + } + if err := migrateTable(ctx, cs, d, op, keys.SQLInstancesTableID, systemschema.SQLInstancesTable); err != nil { + return err + } + return nil +} diff --git a/pkg/upgrade/upgrades/alter_sql_instances_locality_test.go b/pkg/upgrade/upgrades/alter_sql_instances_locality_test.go new file mode 100644 index 000000000000..3f3aa7d96604 --- /dev/null +++ b/pkg/upgrade/upgrades/alter_sql_instances_locality_test.go @@ -0,0 +1,141 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/privilege" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +func TestAlterSystemSqlInstancesTable(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.AlterSystemSQLInstancesAddLocality - 1), + }, + }, + }, + } + + var ( + ctx = context.Background() + + tc = testcluster.StartTestCluster(t, 1, clusterArgs) + s = tc.Server(0) + sqlDB = tc.ServerConn(0) + ) + defer tc.Stopper().Stop(ctx) + + var ( + validationSchemas = []upgrades.Schema{ + {Name: "locality", ValidationFn: upgrades.HasColumn}, + {Name: "primary", ValidationFn: upgrades.HasColumnFamily}, + } + ) + + // Inject the old copy of the descriptor. + upgrades.InjectLegacyTable(ctx, t, s, systemschema.SQLInstancesTable, getDeprecatedSqlInstancesDescriptor) + // Validate that the table sql_instances has the old schema. + upgrades.ValidateSchemaExists( + ctx, + t, + s, + sqlDB, + keys.SQLInstancesTableID, + systemschema.SQLInstancesTable, + []string{}, + validationSchemas, + false, /* expectExists */ + ) + // Run the upgrade. + upgrades.Upgrade( + t, + sqlDB, + clusterversion.AlterSystemSQLInstancesAddLocality, + nil, /* done */ + false, /* expectError */ + ) + // Validate that the table has new schema. + upgrades.ValidateSchemaExists( + ctx, + t, + s, + sqlDB, + keys.SQLInstancesTableID, + systemschema.SQLInstancesTable, + []string{}, + validationSchemas, + true, /* expectExists */ + ) +} + +// getDeprecatedSqlInstancesDescriptor returns the system.sql_instances +// table descriptor that was being used before adding a new column in the +// current version. +func getDeprecatedSqlInstancesDescriptor() *descpb.TableDescriptor { + return &descpb.TableDescriptor{ + Name: string(catconstants.SQLInstancesTableName), + ID: keys.SQLInstancesTableID, + ParentID: keys.SystemDatabaseID, + UnexposedParentSchemaID: keys.PublicSchemaID, + Version: 1, + Columns: []descpb.ColumnDescriptor{ + {Name: "id", ID: 1, Type: types.Int, Nullable: false}, + {Name: "addr", ID: 2, Type: types.String, Nullable: true}, + {Name: "session_id", ID: 3, Type: types.Bytes, Nullable: true}, + }, + NextColumnID: 4, + Families: []descpb.ColumnFamilyDescriptor{ + { + Name: "primary", + ID: 0, + ColumnNames: []string{"id", "addr", "session_id"}, + ColumnIDs: []descpb.ColumnID{1, 2, 3}, + DefaultColumnID: 0, + }, + }, + NextFamilyID: 1, + PrimaryIndex: descpb.IndexDescriptor{ + Name: "id", + ID: 1, + Unique: true, + KeyColumnNames: []string{"id"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{1}, + }, + NextIndexID: 2, + Privileges: catpb.NewCustomSuperuserPrivilegeDescriptor(privilege.ReadWriteData, username.NodeUserName()), + NextMutationID: 1, + FormatVersion: 3, + } +} diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go index 623e5c34aab2..ef90fe21477e 100644 --- a/pkg/upgrade/upgrades/upgrades.go +++ b/pkg/upgrade/upgrades/upgrades.go @@ -143,6 +143,12 @@ var upgrades = []upgrade.Upgrade{ NoPrecondition, systemPrivilegesTableMigration, ), + upgrade.NewTenantUpgrade( + "add column locality to table system.sql_instances", + toCV(clusterversion.AlterSystemSQLInstancesAddLocality), + NoPrecondition, + alterSystemSQLInstancesAddLocality, + ), } func init() { diff --git a/pkg/util/json/json.go b/pkg/util/json/json.go index b0aff4f817ca..07be38feaf7c 100644 --- a/pkg/util/json/json.go +++ b/pkg/util/json/json.go @@ -389,6 +389,33 @@ func (b *ObjectBuilder) Build() JSON { return jsonObject(sorter.pairs) } +// BuildUnsorted creates a JSON object that uniqueifies the keys, but leaves +// the keys in their original order instead of sorting them by key. +func (b *ObjectBuilder) BuildUnsorted() JSON { + if b.pairs == nil { + panic(errors.AssertionFailedf(msgModifyAfterBuild)) + } + orders := make([]int, len(b.pairs)) + for i := range orders { + orders[i] = i + } + sorter := pairSorter{ + pairs: b.pairs, + orders: orders, + hasNonUnique: false, + } + b.pairs = nil + // Uniqueify the objects, but then sort back into the original order. + sort.Sort(&sorter) + sorter.unique() + orderSorter := pairOrderSorter{ + pairs: sorter.pairs, + orders: sorter.orders, + } + sort.Sort(&orderSorter) + return jsonObject(orderSorter.pairs) +} + // pairSorter sorts and uniqueifies JSON pairs. In order to keep // the last one for pairs with the same key while sort.Sort is // not stable, pairSorter uses []int orders to maintain order and @@ -440,6 +467,23 @@ func (s *pairSorter) unique() { s.pairs = s.pairs[:top+1] } +type pairOrderSorter struct { + pairs []jsonKeyValuePair + orders []int +} + +func (s *pairOrderSorter) Len() int { + return len(s.pairs) +} + +func (s *pairOrderSorter) Less(i, j int) bool { + return s.orders[i] > s.orders[j] +} + +func (s *pairOrderSorter) Swap(i, j int) { + s.pairs[i], s.orders[i], s.pairs[j], s.orders[j] = s.pairs[j], s.orders[j], s.pairs[i], s.orders[i] +} + // jsonObject represents a JSON object as a sorted-by-key list of key-value // pairs, which are unique by key. type jsonObject []jsonKeyValuePair