From a2b455be6f4e396fdd71c528cfad25f968a6ca20 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 17 Jan 2023 14:21:24 +0000 Subject: [PATCH] server: always enable sql_instances maintenance Previously the system.sql_instances table was only maintained by SQL servers that were operating in "pod" mode, i.e. not in mixed KV and SQL process nodes, where KV-level liveness and gossip provides an alternative means of node discovery that can be used by the SQL layer when searching for other SQL instances. However this inconsistency makes writing correct remote-node discovery and interaction SQL-level code difficult: in some cases such code needs to consult the instances list, and in some the KV liveness store, which when combined with complexities of doing so around initialization, dependency-injection, etc can become hard to maintain. Additionally such a design precludes a cluster where some SQL instances are in mixed KV nodes and some are not, as the non-KV nodes would have no way discover the KV ones. Such deployments are not currently possible but could be in the future. Instead, this change enabled maintenance of the sql_instances table by all SQL servers, whether running in their own processes or embedded in a KV storage node process. This paves the way for making the means of discovery of SQL servers uniform across all SQL server types: they will all be able to simply consult the instances list, to find any other SQL servers, regardless of where those SQL servers are running. A follow-up change could simplify DistSQLPhysicalPlanner, specifically the SetupAllNodesPlanning method that has two different implementations due to the previous inconsistency in the available APIs. Release note: none. Epic: none. --- pkg/server/server_sql.go | 80 ++++++++++--------- .../instancestorage/instancestorage.go | 49 ++++++++++-- 2 files changed, 86 insertions(+), 43 deletions(-) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 5ee5800809bf..53dc22a23e92 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -90,6 +90,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" + "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" @@ -476,11 +477,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { // instance provider without initializing the instance, since this is not a // SQL pod server. _, isMixedSQLAndKVNode := cfg.nodeIDContainer.OptionalNodeID() - isSQLPod := !isMixedSQLAndKVNode sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs) var sessionEventsConsumer slinstance.SessionEventListener - if isSQLPod { + if !isMixedSQLAndKVNode { // For SQL pods, we want the process to shutdown when the session liveness // record is found to be deleted. This is because, if the session is // deleted, the instance ID used by this server may have been stolen by @@ -494,19 +494,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, sessionEventsConsumer, ) - if isSQLPod { - if codec.ForSystemTenant() { - return nil, errors.AssertionFailedf("non-system codec used for SQL pod") - } - - cfg.sqlInstanceStorage = instancestorage.NewStorage( - cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings) - cfg.sqlInstanceReader = instancestorage.NewReader( - cfg.sqlInstanceStorage, - cfg.sqlLivenessProvider, - cfg.rangeFeedFactory, - codec, cfg.clock, cfg.stopper) + cfg.sqlInstanceStorage = instancestorage.NewStorage( + cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings) + cfg.sqlInstanceReader = instancestorage.NewReader( + cfg.sqlInstanceStorage, + cfg.sqlLivenessProvider, + cfg.rangeFeedFactory, + codec, cfg.clock, cfg.stopper) + if isMixedSQLAndKVNode { + cfg.podNodeDialer = cfg.nodeDialer + } else { // In a multi-tenant environment, use the sqlInstanceReader to resolve // SQL pod addresses. addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) { @@ -517,11 +515,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { return &util.UnresolvedAddr{AddressField: info.InstanceRPCAddr}, nil } cfg.podNodeDialer = nodedialer.New(cfg.rpcContext, addressResolver) - } else { - if !codec.ForSystemTenant() { - return nil, errors.AssertionFailedf("system codec used for SQL-only node") - } - cfg.podNodeDialer = cfg.nodeDialer } jobRegistry := cfg.circularJobRegistry @@ -815,8 +808,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { // cluster. var getNodes func(ctx context.Context) ([]roachpb.NodeID, error) var nodeDialer *nodedialer.Dialer - if !isSQLPod { + if isMixedSQLAndKVNode { nodeDialer = cfg.nodeDialer + // TODO(dt): any reason not to just always use the instance reader? And just + // pass it directly instead of making a new closure here? getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) { var ns []roachpb.NodeID ls, err := nodeLiveness.GetLivenessesFromKV(ctx) @@ -1346,26 +1341,35 @@ func (s *SQLServer) preStart( // Start the sql liveness subsystem. We'll need it to get a session. s.sqlLivenessProvider.Start(ctx, regionPhysicalRep) - _, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID() - isTenant := !isMixedSQLAndKVNode + session, err := s.sqlLivenessProvider.Session(ctx) + if err != nil { + return err + } + // Start instance ID reclaim loop. + if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop( + ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration, + ); err != nil { + return err + } + nodeId, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID() - if isTenant { - session, err := s.sqlLivenessProvider.Session(ctx) + var instance sqlinstance.InstanceInfo + if isMixedSQLAndKVNode { + // Write/acquire our instance row. + instance, err = s.sqlInstanceStorage.CreateNodeInstance( + ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality, nodeId) if err != nil { return err } - // Start instance ID reclaim loop. - if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop( - ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration, - ); err != nil { - return err - } - // Acquire our instance row. - instance, err := s.sqlInstanceStorage.CreateInstance( + } else { + instance, err = s.sqlInstanceStorage.CreateInstance( ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality) if err != nil { return err } + } + + if !isMixedSQLAndKVNode { // TODO(andrei): Release the instance ID on server shutdown. It is not trivial // to determine where/when exactly to do that, though. Doing it after stopper // quiescing doesn't work. Doing it too soon, for example as part of draining, @@ -1379,13 +1383,13 @@ func (s *SQLServer) preStart( if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil { return err } - // Start the instance provider. This needs to come after we've allocated our - // instance ID because the instances reader needs to see our own instance; - // we might be the only SQL server available, especially when we have not - // received data from the rangefeed yet, and if the reader doesn't see - // it, we'd be unable to plan any queries. - s.sqlInstanceReader.Start(ctx, instance) } + // Start the instance provider. This needs to come after we've allocated our + // instance ID because the instances reader needs to see our own instance; + // we might be the only SQL server available, especially when we have not + // received data from the rangefeed yet, and if the reader doesn't see + // it, we'd be unable to plan any queries. + s.sqlInstanceReader.Start(ctx, instance) s.execCfg.GCJobNotifier.Start(ctx) s.temporaryObjectCleaner.Start(ctx, stopper) diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage.go b/pkg/sql/sqlinstance/instancestorage/instancestorage.go index b75c3570c9aa..18fb3084a3dd 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancestorage.go +++ b/pkg/sql/sqlinstance/instancestorage/instancestorage.go @@ -135,6 +135,22 @@ func NewStorage( return NewTestingStorage(db, codec, keys.SQLInstancesTableID, slReader, settings) } +// CreateNodeInstance claims a unique instance identifier for the SQL pod, and +// associates it with its SQL address and session information. +func (s *Storage) CreateNodeInstance( + ctx context.Context, + sessionID sqlliveness.SessionID, + sessionExpiration hlc.Timestamp, + rpcAddr string, + sqlAddr string, + locality roachpb.Locality, + nodeID roachpb.NodeID, +) (instance sqlinstance.InstanceInfo, _ error) { + return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, nodeID) +} + +const noNodeID = 0 + // CreateInstance claims a unique instance identifier for the SQL pod, and // associates it with its SQL address and session information. func (s *Storage) CreateInstance( @@ -145,6 +161,19 @@ func (s *Storage) CreateInstance( sqlAddr string, locality roachpb.Locality, ) (instance sqlinstance.InstanceInfo, _ error) { + return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, noNodeID) +} + +func (s *Storage) createInstanceRow( + ctx context.Context, + sessionID sqlliveness.SessionID, + sessionExpiration hlc.Timestamp, + rpcAddr string, + sqlAddr string, + locality roachpb.Locality, + nodeID roachpb.NodeID, +) (instance sqlinstance.InstanceInfo, _ error) { + if len(sqlAddr) == 0 || len(rpcAddr) == 0 { return sqlinstance.InstanceInfo{}, errors.AssertionFailedf("missing sql or rpc address information for instance") } @@ -177,11 +206,21 @@ func (s *Storage) CreateInstance( return err } - // Try to retrieve an available instance ID. This blocks until one - // is available. - availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn) - if err != nil { - return err + // TODO(dt): do we need this at all? this keeps nodeID == instanceID when + // running mixed KV and SQL nodes, but bakes in the assumption that any + // clusters where this happens will contain _only_ mixed KV and SQL nodes + // and thus do not need to worry about finding an _actually_ available ID + // and avoiding conflicts. This is true today but may not be in more + // complex deployments. + if nodeID != noNodeID { + availableID = base.SQLInstanceID(nodeID) + } else { + // Try to retrieve an available instance ID. This blocks until one + // is available. + availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn) + if err != nil { + return err + } } key := s.rowcodec.encodeKey(region, availableID)