diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 5a6bf9e87dc5..a62c1fe45827 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -89,6 +89,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
@@ -443,6 +444,34 @@ func (s *stopperSessionEventListener) OnSessionDeleted(
 	return false
 }
 
+type refreshInstanceSessionListener struct {
+	cfg *sqlServerArgs
+}
+
+var _ slinstance.SessionEventListener = &refreshInstanceSessionListener{}
+
+// OnSessionDeleted implements the slinstance.SessionEventListener interface.
+func (r *refreshInstanceSessionListener) OnSessionDeleted(
+	ctx context.Context,
+) (createAnotherSession bool) {
+	if err := r.cfg.stopper.RunAsyncTask(ctx, "refresh-instance-session", func(context.Context) {
+		nodeID, _ := r.cfg.nodeIDContainer.OptionalNodeID()
+		s, err := r.cfg.sqlLivenessProvider.Session(ctx)
+		if err != nil {
+			log.Errorf(ctx, "failed to get new liveness session ID: %v", err)
+			return
+		}
+		if _, err := r.cfg.sqlInstanceStorage.CreateNodeInstance(
+			ctx, s.ID(), s.Expiration(), r.cfg.AdvertiseAddr, r.cfg.SQLAdvertiseAddr, r.cfg.Locality, nodeID,
+		); err != nil {
+			log.Errorf(ctx, "failed to update instance with new session ID: %v", err)
+		}
+	}); err != nil {
+		log.Errorf(ctx, "failed to run update of instance with new session ID: %v", err)
+	}
+	return true
+}
+
 // newSQLServer constructs a new SQLServer. The caller is responsible for
 // listening to the server's ShutdownRequested() channel (which is the same as
 // cfg.stopTrigger.C()) and stopping cfg.stopper when signaled.
@@ -483,11 +512,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	// instance provider without initializing the instance, since this is not a
 	// SQL pod server.
 	_, isMixedSQLAndKVNode := cfg.nodeIDContainer.OptionalNodeID()
-	isSQLPod := !isMixedSQLAndKVNode
 	sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs)
 	var sessionEventsConsumer slinstance.SessionEventListener
-	if isSQLPod {
+	if !isMixedSQLAndKVNode {
 		// For SQL pods, we want the process to shutdown when the session liveness
 		// record is found to be deleted. This is because, if the session is
 		// deleted, the instance ID used by this server may have been stolen by
 		// another server, or it may be allocated to another server in the future.
@@ -495,25 +523,25 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		// use the instance ID anymore, and there's no mechanism for allocating a
 		// new one after startup.
 		sessionEventsConsumer = &stopperSessionEventListener{trigger: cfg.stopTrigger}
+	} else {
+		sessionEventsConsumer = &refreshInstanceSessionListener{cfg: &cfg}
 	}
 	cfg.sqlLivenessProvider = slprovider.New(
 		cfg.AmbientCtx,
 		cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, sessionEventsConsumer,
 	)
 
-	if isSQLPod {
-		if codec.ForSystemTenant() {
-			return nil, errors.AssertionFailedf("non-system codec used for SQL pod")
-		}
-
-		cfg.sqlInstanceStorage = instancestorage.NewStorage(
-			cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
-		cfg.sqlInstanceReader = instancestorage.NewReader(
-			cfg.sqlInstanceStorage,
-			cfg.sqlLivenessProvider,
-			cfg.rangeFeedFactory,
-			codec, cfg.clock, cfg.stopper)
-
+	cfg.sqlInstanceStorage = instancestorage.NewStorage(
+		cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
+	cfg.sqlInstanceReader = instancestorage.NewReader(
+		cfg.sqlInstanceStorage,
+		cfg.sqlLivenessProvider,
+		cfg.rangeFeedFactory,
+		codec, cfg.clock, cfg.stopper)
+	if isMixedSQLAndKVNode {
+		cfg.podNodeDialer = cfg.nodeDialer
+	} else {
 		// In a multi-tenant environment, use the sqlInstanceReader to resolve
 		// SQL pod addresses.
 		addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) {
@@ -524,11 +552,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 			return &util.UnresolvedAddr{AddressField: info.InstanceRPCAddr}, nil
 		}
 		cfg.podNodeDialer = nodedialer.New(cfg.rpcContext, addressResolver)
-	} else {
-		if !codec.ForSystemTenant() {
-			return nil, errors.AssertionFailedf("system codec used for SQL-only node")
-		}
-		cfg.podNodeDialer = cfg.nodeDialer
 	}
 
 	jobRegistry := cfg.circularJobRegistry
@@ -818,8 +841,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	// cluster.
 	var getNodes func(ctx context.Context) ([]roachpb.NodeID, error)
 	var nodeDialer *nodedialer.Dialer
-	if !isSQLPod {
+	if isMixedSQLAndKVNode {
 		nodeDialer = cfg.nodeDialer
+		// TODO(dt): any reason not to just always use the instance reader? And just
+		// pass it directly instead of making a new closure here?
 		getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) {
 			var ns []roachpb.NodeID
 			ls, err := nodeLiveness.GetLivenessesFromKV(ctx)
@@ -1339,26 +1364,37 @@ func (s *SQLServer) preStart(
 	// Start the sql liveness subsystem. We'll need it to get a session.
 	s.sqlLivenessProvider.Start(ctx, regionPhysicalRep)
 
-	_, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
-	isTenant := !isMixedSQLAndKVNode
+	session, err := s.sqlLivenessProvider.Session(ctx)
+	if err != nil {
+		return err
+	}
+	// Start instance ID reclaim loop.
+	if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
+		ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration,
+	); err != nil {
+		return err
+	}
+	nodeID, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
 
-	if isTenant {
-		session, err := s.sqlLivenessProvider.Session(ctx)
+	var instance sqlinstance.InstanceInfo
+	if isMixedSQLAndKVNode {
+		// Write/acquire our instance row.
+		instance, err = s.sqlInstanceStorage.CreateNodeInstance(
+			ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality, nodeID,
+		)
 		if err != nil {
 			return err
 		}
-		// Start instance ID reclaim loop.
-		if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
-			ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration,
-		); err != nil {
-			return err
-		}
-		// Acquire our instance row.
-		instance, err := s.sqlInstanceStorage.CreateInstance(
-			ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality)
+	} else {
+		instance, err = s.sqlInstanceStorage.CreateInstance(
+			ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality,
+		)
 		if err != nil {
 			return err
 		}
+	}
+
+	if !isMixedSQLAndKVNode {
 		// TODO(andrei): Release the instance ID on server shutdown. It is not trivial
 		// to determine where/when exactly to do that, though. Doing it after stopper
 		// quiescing doesn't work. Doing it too soon, for example as part of draining,
@@ -1372,13 +1408,13 @@ func (s *SQLServer) preStart(
 		if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
 			return err
 		}
-		// Start the instance provider. This needs to come after we've allocated our
-		// instance ID because the instances reader needs to see our own instance;
-		// we might be the only SQL server available, especially when we have not
-		// received data from the rangefeed yet, and if the reader doesn't see
-		// it, we'd be unable to plan any queries.
-		s.sqlInstanceReader.Start(ctx, instance)
 	}
+	// Start the instance provider. This needs to come after we've allocated our
+	// instance ID because the instances reader needs to see our own instance;
+	// we might be the only SQL server available, especially when we have not
+	// received data from the rangefeed yet, and if the reader doesn't see
+	// it, we'd be unable to plan any queries.
+	s.sqlInstanceReader.Start(ctx, instance)
 
 	s.execCfg.GCJobNotifier.Start(ctx)
 	s.temporaryObjectCleaner.Start(ctx, stopper)
diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage.go b/pkg/sql/sqlinstance/instancestorage/instancestorage.go
index 22be76e5a243..7c3da16529d6 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancestorage.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancestorage.go
@@ -135,6 +135,22 @@ func NewStorage(
 	return NewTestingStorage(db, codec, keys.SQLInstancesTableID, slReader, settings)
 }
 
+// CreateNodeInstance claims the node's ID as the instance identifier for a
+// mixed SQL and KV node, and associates it with its SQL address and session.
+func (s *Storage) CreateNodeInstance(
+	ctx context.Context,
+	sessionID sqlliveness.SessionID,
+	sessionExpiration hlc.Timestamp,
+	rpcAddr string,
+	sqlAddr string,
+	locality roachpb.Locality,
+	nodeID roachpb.NodeID,
+) (instance sqlinstance.InstanceInfo, _ error) {
+	return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, nodeID)
+}
+
+const noNodeID = 0
+
 // CreateInstance claims a unique instance identifier for the SQL pod, and
 // associates it with its SQL address and session information.
 func (s *Storage) CreateInstance(
@@ -144,6 +160,18 @@
 	rpcAddr string,
 	sqlAddr string,
 	locality roachpb.Locality,
+) (instance sqlinstance.InstanceInfo, _ error) {
+	return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, noNodeID)
+}
+
+func (s *Storage) createInstanceRow(
+	ctx context.Context,
+	sessionID sqlliveness.SessionID,
+	sessionExpiration hlc.Timestamp,
+	rpcAddr string,
+	sqlAddr string,
+	locality roachpb.Locality,
+	nodeID roachpb.NodeID,
 ) (instance sqlinstance.InstanceInfo, _ error) {
 	if len(sqlAddr) == 0 || len(rpcAddr) == 0 {
 		return sqlinstance.InstanceInfo{}, errors.AssertionFailedf("missing sql or rpc address information for instance")
@@ -177,11 +205,21 @@ func (s *Storage) CreateInstance(
 			return err
 		}
 
-		// Try to retrieve an available instance ID. This blocks until one
-		// is available.
-		availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
-		if err != nil {
-			return err
+		// TODO(dt): do we need this at all? this keeps nodeID == instanceID when
+		// running mixed KV and SQL nodes, but bakes in the assumption that any
+		// clusters where this happens will contain _only_ mixed KV and SQL nodes
+		// and thus do not need to worry about finding an _actually_ available ID
+		// and avoiding conflicts. This is true today but may not be in more
+		// complex deployments.
+		if nodeID != noNodeID {
+			availableID = base.SQLInstanceID(nodeID)
+		} else {
+			// Try to retrieve an available instance ID. This blocks until one
+			// is available.
+			availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
+			if err != nil {
+				return err
+			}
 		}
 
 		key := s.rowcodec.encodeKey(region, availableID)
diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
index 6ff841f18a47..81337c8013ed 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
@@ -339,6 +339,52 @@ func TestSQLAccess(t *testing.T) {
 	require.NoError(t, rows.Err())
 }
 
+func TestRefreshSession(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "abc", Value: "xyz"}}}})
+	defer s.Stopper().Stop(ctx)
+
+	c1 := sqlutils.MakeSQLRunner(sqlDB)
+
+	// Everything but the session should stay the same, so capture the initial row.
+	rowBeforeNoSession := c1.QueryStr(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1")
+	require.Len(t, rowBeforeNoSession, 1)
+
+	// This initial session should go away once we expire it below, but let's
+	// verify it is there for starters and remember it.
+	sess := c1.QueryStr(t, "SELECT encode(session_id, 'hex') FROM system.sql_instances WHERE id = 1")
+	require.Len(t, sess, 1)
+	require.Len(t, sess[0][0], 38)
+
+	// First let's delete the instance AND expire the session; the instance should
+	// reappear when a new session is acquired, with the new session.
+	c1.ExecRowsAffected(t, 1, "DELETE FROM system.sql_instances WHERE session_id = decode($1, 'hex')", sess[0][0])
+	c1.ExecRowsAffected(t, 1, "DELETE FROM system.sqlliveness WHERE session_id = decode($1, 'hex')", sess[0][0])
+
+	// Wait until we see the right row appear.
+	query := fmt.Sprintf(`SELECT count(*) FROM system.sql_instances WHERE id = 1 AND session_id <> decode('%s', 'hex')`, sess[0][0])
+	c1.CheckQueryResultsRetry(t, query, [][]string{{"1"}})
+
+	// Verify that everything else is the same after the recreate.
+	c1.CheckQueryResults(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1", rowBeforeNoSession)
+
+	sess = c1.QueryStr(t, "SELECT encode(session_id, 'hex') FROM system.sql_instances WHERE id = 1")
+	// Now let's just expire the session and leave the row; the instance row
+	// should still become correct once it is updated with the new session.
+	c1.ExecRowsAffected(t, 1, "DELETE FROM system.sqlliveness WHERE session_id = decode($1, 'hex')", sess[0][0])
+
+	// Wait until we see the right row appear.
+	query = fmt.Sprintf(`SELECT count(*) FROM system.sql_instances WHERE id = 1 AND session_id <> decode('%s', 'hex')`, sess[0][0])
+	c1.CheckQueryResultsRetry(t, query, [][]string{{"1"}})
+
+	// Verify everything else is still the same after the update.
+	c1.CheckQueryResults(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1", rowBeforeNoSession)
+
+}
+
 // TestConcurrentCreateAndRelease verifies that concurrent access to instancestorage
 // to create and release SQL instance IDs works as expected.
 func TestConcurrentCreateAndRelease(t *testing.T) {
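Note (reviewer sketch, not part of the patch): the server change above leans on the slinstance.SessionEventListener contract — when the liveness heartbeat loop finds that its session record has been deleted, it calls the listener, and the boolean return value tells the loop whether to create a replacement session. SQL pods keep the old behavior (return false and shut down via the stop trigger); mixed KV/SQL nodes now return true and re-write their system.sql_instances row under the new session. The self-contained Go sketch below illustrates only that contract; every name in it (sessionEventListener, instanceRow, shutdownListener, refreshListener, newSession) is invented for illustration and is not a CockroachDB API.

```go
// Standalone sketch of the session-refresh flow exercised by this patch.
// Invented names throughout; not CockroachDB code.
package main

import (
	"context"
	"fmt"
)

// sessionEventListener mirrors the shape of slinstance.SessionEventListener:
// the return value tells the liveness loop whether to create a replacement
// session after the current one is found to be deleted.
type sessionEventListener interface {
	onSessionDeleted(ctx context.Context) (createAnotherSession bool)
}

// instanceRow stands in for a row of system.sql_instances.
type instanceRow struct {
	id        int    // fixed instance ID (equals the node ID on mixed nodes)
	sessionID string // liveness session the row is currently bound to
}

// shutdownListener models the SQL-pod behavior: stop instead of refreshing.
type shutdownListener struct{ stopped bool }

func (l *shutdownListener) onSessionDeleted(context.Context) bool {
	l.stopped = true // a real server would fire its stop trigger here
	return false     // do not create another session
}

// refreshListener models the mixed KV/SQL-node behavior added by the patch:
// re-bind the existing instance row to a freshly created session.
type refreshListener struct {
	row        *instanceRow
	newSession func() string // stands in for sqlLivenessProvider.Session
}

func (l *refreshListener) onSessionDeleted(context.Context) bool {
	l.row.sessionID = l.newSession() // analogous to CreateNodeInstance above
	return true                      // keep the liveness loop running
}

var _ sessionEventListener = (*shutdownListener)(nil)
var _ sessionEventListener = (*refreshListener)(nil)

func main() {
	ctx := context.Background()
	row := &instanceRow{id: 1, sessionID: "session-1"}
	next := 1
	l := &refreshListener{row: row, newSession: func() string {
		next++
		return fmt.Sprintf("session-%d", next)
	}}
	// Simulate the liveness loop observing two session deletions in a row.
	for i := 0; i < 2; i++ {
		if l.onSessionDeleted(ctx) {
			fmt.Printf("instance %d re-bound to %s\n", row.id, row.sessionID)
		}
	}
}
```

The design choice this mirrors: because a mixed node's instance ID is pinned to its node ID (see CreateNodeInstance and the TODO in createInstanceRow), the row can simply be re-claimed in place under the fresh session rather than allocating a new ID — which is why OnSessionDeleted can fire off an asynchronous re-registration task and return true, where a SQL pod must instead shut down.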