diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 5ee5800809bf..53dc22a23e92 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -90,6 +90,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
@@ -476,11 +477,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	// instance provider without initializing the instance, since this is not a
 	// SQL pod server.
 	_, isMixedSQLAndKVNode := cfg.nodeIDContainer.OptionalNodeID()
-	isSQLPod := !isMixedSQLAndKVNode
 
 	sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs)
 	var sessionEventsConsumer slinstance.SessionEventListener
-	if isSQLPod {
+	if !isMixedSQLAndKVNode {
 		// For SQL pods, we want the process to shutdown when the session liveness
 		// record is found to be deleted. This is because, if the session is
 		// deleted, the instance ID used by this server may have been stolen by
@@ -494,19 +494,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, sessionEventsConsumer,
 	)
 
-	if isSQLPod {
-		if codec.ForSystemTenant() {
-			return nil, errors.AssertionFailedf("non-system codec used for SQL pod")
-		}
-
-		cfg.sqlInstanceStorage = instancestorage.NewStorage(
-			cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
-		cfg.sqlInstanceReader = instancestorage.NewReader(
-			cfg.sqlInstanceStorage,
-			cfg.sqlLivenessProvider,
-			cfg.rangeFeedFactory,
-			codec, cfg.clock, cfg.stopper)
+	cfg.sqlInstanceStorage = instancestorage.NewStorage(
+		cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
+	cfg.sqlInstanceReader = instancestorage.NewReader(
+		cfg.sqlInstanceStorage,
+		cfg.sqlLivenessProvider,
+		cfg.rangeFeedFactory,
+		codec, cfg.clock, cfg.stopper)
+
+	if isMixedSQLAndKVNode {
+		cfg.podNodeDialer = cfg.nodeDialer
+	} else {
 		// In a multi-tenant environment, use the sqlInstanceReader to resolve
 		// SQL pod addresses.
 		addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) {
@@ -517,11 +515,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 			return &util.UnresolvedAddr{AddressField: info.InstanceRPCAddr}, nil
 		}
 		cfg.podNodeDialer = nodedialer.New(cfg.rpcContext, addressResolver)
-	} else {
-		if !codec.ForSystemTenant() {
-			return nil, errors.AssertionFailedf("system codec used for SQL-only node")
-		}
-		cfg.podNodeDialer = cfg.nodeDialer
 	}
 
 	jobRegistry := cfg.circularJobRegistry
@@ -815,8 +808,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	// cluster.
 	var getNodes func(ctx context.Context) ([]roachpb.NodeID, error)
 	var nodeDialer *nodedialer.Dialer
-	if !isSQLPod {
+	if isMixedSQLAndKVNode {
 		nodeDialer = cfg.nodeDialer
+		// TODO(dt): any reason not to just always use the instance reader? And just
+		// pass it directly instead of making a new closure here?
 		getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) {
 			var ns []roachpb.NodeID
 			ls, err := nodeLiveness.GetLivenessesFromKV(ctx)
@@ -1346,26 +1341,35 @@ func (s *SQLServer) preStart(
 	// Start the sql liveness subsystem. We'll need it to get a session.
 	s.sqlLivenessProvider.Start(ctx, regionPhysicalRep)
 
-	_, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
-	isTenant := !isMixedSQLAndKVNode
+	session, err := s.sqlLivenessProvider.Session(ctx)
+	if err != nil {
+		return err
+	}
+	// Start instance ID reclaim loop.
+	if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
+		ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration,
+	); err != nil {
+		return err
+	}
+	nodeID, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
 
-	if isTenant {
-		session, err := s.sqlLivenessProvider.Session(ctx)
+	var instance sqlinstance.InstanceInfo
+	if isMixedSQLAndKVNode {
+		// Write/acquire our instance row.
+		instance, err = s.sqlInstanceStorage.CreateNodeInstance(
+			ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality, nodeID)
 		if err != nil {
 			return err
 		}
-		// Start instance ID reclaim loop.
-		if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
-			ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration,
-		); err != nil {
-			return err
-		}
-		// Acquire our instance row.
-		instance, err := s.sqlInstanceStorage.CreateInstance(
+	} else {
+		instance, err = s.sqlInstanceStorage.CreateInstance(
 			ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality)
 		if err != nil {
 			return err
 		}
+	}
+
+	if !isMixedSQLAndKVNode {
 		// TODO(andrei): Release the instance ID on server shutdown. It is not trivial
 		// to determine where/when exactly to do that, though. Doing it after stopper
 		// quiescing doesn't work. Doing it too soon, for example as part of draining,
@@ -1379,13 +1383,13 @@ func (s *SQLServer) preStart(
 		if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
 			return err
 		}
-		// Start the instance provider. This needs to come after we've allocated our
-		// instance ID because the instances reader needs to see our own instance;
-		// we might be the only SQL server available, especially when we have not
-		// received data from the rangefeed yet, and if the reader doesn't see
-		// it, we'd be unable to plan any queries.
-		s.sqlInstanceReader.Start(ctx, instance)
 	}
+	// Start the instance provider. This needs to come after we've allocated our
+	// instance ID because the instances reader needs to see our own instance;
+	// we might be the only SQL server available, especially when we have not
+	// received data from the rangefeed yet, and if the reader doesn't see
+	// it, we'd be unable to plan any queries.
+	s.sqlInstanceReader.Start(ctx, instance)
 
 	s.execCfg.GCJobNotifier.Start(ctx)
 	s.temporaryObjectCleaner.Start(ctx, stopper)
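
Note on the podNodeDialer change in newSQLServer above: mixed KV/SQL nodes now reuse the ordinary nodeDialer, while SQL-only pods keep dialing through a resolver closure backed by the instance reader. Below is a minimal, self-contained sketch of that resolver shape; the types and functions here (instanceInfo, makeResolver) are stand-ins for illustration, not the real sqlinstance or nodedialer APIs.

    package main

    import (
        "fmt"
        "net"
    )

    // instanceInfo stands in for sqlinstance.InstanceInfo; only the advertised
    // RPC address matters for dialing.
    type instanceInfo struct {
        rpcAddr string
    }

    // makeResolver mirrors the shape of the addressResolver closure above:
    // given an instance ID, look it up and return a dialable address, or an
    // error if the instance is unknown.
    func makeResolver(instances map[int32]instanceInfo) func(int32) (net.Addr, error) {
        return func(id int32) (net.Addr, error) {
            info, ok := instances[id]
            if !ok {
                return nil, fmt.Errorf("no instance with ID %d", id)
            }
            // CockroachDB wraps the raw string in util.UnresolvedAddr; plain
            // TCP resolution stands in for that here.
            return net.ResolveTCPAddr("tcp", info.rpcAddr)
        }
    }

    func main() {
        resolve := makeResolver(map[int32]instanceInfo{1: {rpcAddr: "127.0.0.1:26257"}})
        addr, err := resolve(1)
        fmt.Println(addr, err) // 127.0.0.1:26257 <nil>
    }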
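The preStart change follows the same split: the liveness session and the instance-ID reclaim loop are hoisted out of the former tenant-only branch and now run on every server, and the value-plus-flag result of OptionalNodeID picks between CreateNodeInstance and CreateInstance. A toy sketch of that dispatch, assuming invented container and creation functions rather than the real SQLServer machinery:

    package main

    import "fmt"

    // optionalNodeID mimics the "value plus present flag" shape of
    // nodeIDContainer.OptionalNodeID(): the flag doubles as the
    // mixed-KV/SQL-node test, and the value feeds createNodeInstance.
    type optionalNodeID struct {
        id  int32
        set bool
    }

    func (o optionalNodeID) Get() (int32, bool) { return o.id, o.set }

    func createNodeInstance(nodeID int32) int32 { return nodeID } // reuse the node ID
    func createInstance() int32                 { return 101 }    // allocate a fresh ID

    func main() {
        for _, c := range []optionalNodeID{{id: 7, set: true}, {}} {
            var instanceID int32
            if nodeID, isMixedSQLAndKVNode := c.Get(); isMixedSQLAndKVNode {
                instanceID = createNodeInstance(nodeID)
            } else {
                instanceID = createInstance()
            }
            fmt.Println("instance ID:", instanceID)
        }
    }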
diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage.go b/pkg/sql/sqlinstance/instancestorage/instancestorage.go
index b75c3570c9aa..18fb3084a3dd 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancestorage.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancestorage.go
@@ -135,6 +135,22 @@ func NewStorage(
 	return NewTestingStorage(db, codec, keys.SQLInstancesTableID, slReader, settings)
 }
 
+// CreateNodeInstance claims an instance identifier for a SQL server embedded
+// in a KV node, using the node's ID as the instance ID, and associates it
+// with its SQL address and session information.
+func (s *Storage) CreateNodeInstance(
+	ctx context.Context,
+	sessionID sqlliveness.SessionID,
+	sessionExpiration hlc.Timestamp,
+	rpcAddr string,
+	sqlAddr string,
+	locality roachpb.Locality,
+	nodeID roachpb.NodeID,
+) (instance sqlinstance.InstanceInfo, _ error) {
+	return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, nodeID)
+}
+
+const noNodeID = 0
+
 // CreateInstance claims a unique instance identifier for the SQL pod, and
 // associates it with its SQL address and session information.
 func (s *Storage) CreateInstance(
@@ -145,6 +161,19 @@ func (s *Storage) CreateInstance(
 	sqlAddr string,
 	locality roachpb.Locality,
 ) (instance sqlinstance.InstanceInfo, _ error) {
+	return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, noNodeID)
+}
+
+func (s *Storage) createInstanceRow(
+	ctx context.Context,
+	sessionID sqlliveness.SessionID,
+	sessionExpiration hlc.Timestamp,
+	rpcAddr string,
+	sqlAddr string,
+	locality roachpb.Locality,
+	nodeID roachpb.NodeID,
+) (instance sqlinstance.InstanceInfo, _ error) {
+
 	if len(sqlAddr) == 0 || len(rpcAddr) == 0 {
 		return sqlinstance.InstanceInfo{}, errors.AssertionFailedf("missing sql or rpc address information for instance")
 	}
@@ -177,11 +206,21 @@ func (s *Storage) CreateInstance(
 		return err
 	}
 
-	// Try to retrieve an available instance ID. This blocks until one
-	// is available.
-	availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
-	if err != nil {
-		return err
+	// TODO(dt): do we need this at all? this keeps nodeID == instanceID when
+	// running mixed KV and SQL nodes, but bakes in the assumption that any
+	// clusters where this happens will contain _only_ mixed KV and SQL nodes
+	// and thus do not need to worry about finding an _actually_ available ID
+	// and avoiding conflicts. This is true today but may not be in more
+	// complex deployments.
+	if nodeID != noNodeID {
+		availableID = base.SQLInstanceID(nodeID)
+	} else {
+		// Try to retrieve an available instance ID. This blocks until one
+		// is available.
+		availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
+		if err != nil {
+			return err
+		}
 	}
 
 	key := s.rowcodec.encodeKey(region, availableID)
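
The noNodeID sentinel is what lets a single row-writing path serve both entry points. A toy sketch of that dispatch follows; the registry type and its counter-based allocation are invented for illustration, whereas the real code allocates inside a KV transaction via getAvailableInstanceIDForRegion.

    package main

    import "fmt"

    // noNodeID plays the same sentinel role as in the diff: a zero node ID
    // means "allocate an instance ID" rather than "reuse the node ID".
    const noNodeID = 0

    type registry struct {
        nextID int32
        rows   map[int32]string // instance ID -> advertised SQL address
    }

    // createInstanceRow mirrors the shape of Storage.createInstanceRow: one
    // path serves both CreateInstance (nodeID == noNodeID) and
    // CreateNodeInstance (nodeID != noNodeID).
    func (r *registry) createInstanceRow(sqlAddr string, nodeID int32) int32 {
        var id int32
        if nodeID != noNodeID {
            // Mixed KV/SQL node: keep instance ID == node ID. As the TODO(dt)
            // notes, this assumes such clusters contain only mixed nodes, so
            // the reused ID cannot collide with a pod-allocated one.
            id = nodeID
        } else {
            // SQL-only pod: take the next free ID.
            r.nextID++
            id = r.nextID
        }
        r.rows[id] = sqlAddr
        return id
    }

    func main() {
        r := &registry{rows: map[int32]string{}}
        fmt.Println(r.createInstanceRow("pod-a:26257", noNodeID)) // 1, allocated
        fmt.Println(r.createInstanceRow("node-7:26257", 7))       // 7, node ID reused
    }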