From a788bfb5af1849214f97643ea15086d87e5877a5 Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Tue, 17 Jan 2023 14:21:24 +0000
Subject: [PATCH 1/2] server: always enable sql_instances maintenance

Previously the system.sql_instances table was only maintained by SQL servers
that were operating in "pod" mode, i.e. not in mixed KV and SQL process nodes,
where KV-level liveness and gossip provide an alternative means of node
discovery that the SQL layer can use when searching for other SQL instances.

However, this inconsistency makes it difficult to write correct SQL-level
remote-node discovery and interaction code: some such code needs to consult
the instances list, and some the KV liveness store, which, combined with the
complexities of doing so around initialization, dependency injection, etc.,
can become hard to maintain.

Additionally, such a design precludes a cluster where some SQL instances run
in mixed KV nodes and some do not, as the non-KV nodes would have no way to
discover the KV ones. Such deployments are not currently possible but could be
in the future.

Instead, this change enables maintenance of the sql_instances table by all SQL
servers, whether running in their own processes or embedded in a KV storage
node process. This paves the way for making the means of discovery of SQL
servers uniform across all SQL server types: they will all be able to simply
consult the instances list to find any other SQL servers, regardless of where
those SQL servers are running.

A follow-up change could simplify DistSQLPhysicalPlanner, specifically the
SetupAllNodesPlanning method that has two different implementations due to the
previous inconsistency in the available APIs.

Release note: none.
Epic: none.
---
 pkg/server/server_sql.go                        | 84 ++++++++++---------
 .../instancestorage/instancestorage.go          | 48 +++++++++---
 2 files changed, 88 insertions(+), 44 deletions(-)
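As a sketch of the follow-up mentioned above (and of the TODO in the getNodes
closure in the diff below), a uniform node-listing helper built on the
instance reader might look like the following. The Reader type and its
GetAllInstances method are assumptions based on the sqlinstance/instancestorage
package; this helper is illustrative only and is not part of the patch.

package server

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
)

// getNodesViaInstances lists the nodes available for DistSQL planning by
// consulting the sql_instances reader instead of KV liveness, which is what a
// unified SetupAllNodesPlanning could do once every SQL server maintains an
// instance row.
func getNodesViaInstances(
	ctx context.Context, reader *instancestorage.Reader,
) ([]roachpb.NodeID, error) {
	instances, err := reader.GetAllInstances(ctx)
	if err != nil {
		return nil, err
	}
	ids := make([]roachpb.NodeID, 0, len(instances))
	for _, in := range instances {
		// On mixed KV/SQL nodes this patch pins instance ID == node ID, so this
		// conversion matches the planner's existing expectations.
		ids = append(ids, roachpb.NodeID(in.InstanceID))
	}
	return ids, nil
}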
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 5a6bf9e87dc5..595f10678cd0 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -89,6 +89,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
@@ -483,11 +484,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	// instance provider without initializing the instance, since this is not a
 	// SQL pod server.
 	_, isMixedSQLAndKVNode := cfg.nodeIDContainer.OptionalNodeID()
-	isSQLPod := !isMixedSQLAndKVNode
 
 	sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs)
 	var sessionEventsConsumer slinstance.SessionEventListener
-	if isSQLPod {
+	if !isMixedSQLAndKVNode {
 		// For SQL pods, we want the process to shutdown when the session liveness
 		// record is found to be deleted. This is because, if the session is
 		// deleted, the instance ID used by this server may have been stolen by
@@ -501,19 +501,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, sessionEventsConsumer,
 	)
 
-	if isSQLPod {
-		if codec.ForSystemTenant() {
-			return nil, errors.AssertionFailedf("non-system codec used for SQL pod")
-		}
-
-		cfg.sqlInstanceStorage = instancestorage.NewStorage(
-			cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
-		cfg.sqlInstanceReader = instancestorage.NewReader(
-			cfg.sqlInstanceStorage,
-			cfg.sqlLivenessProvider,
-			cfg.rangeFeedFactory,
-			codec, cfg.clock, cfg.stopper)
+	cfg.sqlInstanceStorage = instancestorage.NewStorage(
+		cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
+	cfg.sqlInstanceReader = instancestorage.NewReader(
+		cfg.sqlInstanceStorage,
+		cfg.sqlLivenessProvider,
+		cfg.rangeFeedFactory,
+		codec, cfg.clock, cfg.stopper)
 
+	if isMixedSQLAndKVNode {
+		cfg.podNodeDialer = cfg.nodeDialer
+	} else {
 		// In a multi-tenant environment, use the sqlInstanceReader to resolve
 		// SQL pod addresses.
 		addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) {
@@ -524,11 +522,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 			return &util.UnresolvedAddr{AddressField: info.InstanceRPCAddr}, nil
 		}
 		cfg.podNodeDialer = nodedialer.New(cfg.rpcContext, addressResolver)
-	} else {
-		if !codec.ForSystemTenant() {
-			return nil, errors.AssertionFailedf("system codec used for SQL-only node")
-		}
-		cfg.podNodeDialer = cfg.nodeDialer
 	}
 
 	jobRegistry := cfg.circularJobRegistry
@@ -818,8 +811,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	// cluster.
 	var getNodes func(ctx context.Context) ([]roachpb.NodeID, error)
 	var nodeDialer *nodedialer.Dialer
-	if !isSQLPod {
+	if isMixedSQLAndKVNode {
 		nodeDialer = cfg.nodeDialer
+		// TODO(dt): any reason not to just always use the instance reader? And just
+		// pass it directly instead of making a new closure here?
 		getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) {
 			var ns []roachpb.NodeID
 			ls, err := nodeLiveness.GetLivenessesFromKV(ctx)
@@ -1339,26 +1334,37 @@ func (s *SQLServer) preStart(
 	// Start the sql liveness subsystem. We'll need it to get a session.
 	s.sqlLivenessProvider.Start(ctx, regionPhysicalRep)
 
-	_, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
-	isTenant := !isMixedSQLAndKVNode
+	session, err := s.sqlLivenessProvider.Session(ctx)
+	if err != nil {
+		return err
+	}
+	// Start instance ID reclaim loop.
+	if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
+		ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration,
+	); err != nil {
+		return err
+	}
+	nodeID, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
 
-	if isTenant {
-		session, err := s.sqlLivenessProvider.Session(ctx)
+	var instance sqlinstance.InstanceInfo
+	if isMixedSQLAndKVNode {
+		// Write/acquire our instance row.
+		instance, err = s.sqlInstanceStorage.CreateNodeInstance(
+			ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality, nodeID,
+		)
 		if err != nil {
 			return err
 		}
-		// Start instance ID reclaim loop.
-		if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
-			ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration,
-		); err != nil {
-			return err
-		}
-		// Acquire our instance row.
-		instance, err := s.sqlInstanceStorage.CreateInstance(
-			ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality)
+	} else {
+		instance, err = s.sqlInstanceStorage.CreateInstance(
+			ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality,
+		)
 		if err != nil {
 			return err
 		}
+	}
+
+	if !isMixedSQLAndKVNode {
 		// TODO(andrei): Release the instance ID on server shutdown. It is not trivial
 		// to determine where/when exactly to do that, though. Doing it after stopper
 		// quiescing doesn't work. Doing it too soon, for example as part of draining,
@@ -1372,13 +1378,13 @@ func (s *SQLServer) preStart(
 		if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
 			return err
 		}
-		// Start the instance provider. This needs to come after we've allocated our
-		// instance ID because the instances reader needs to see our own instance;
-		// we might be the only SQL server available, especially when we have not
-		// received data from the rangefeed yet, and if the reader doesn't see
-		// it, we'd be unable to plan any queries.
-		s.sqlInstanceReader.Start(ctx, instance)
 	}
+	// Start the instance provider. This needs to come after we've allocated our
+	// instance ID because the instances reader needs to see our own instance;
+	// we might be the only SQL server available, especially when we have not
+	// received data from the rangefeed yet, and if the reader doesn't see
+	// it, we'd be unable to plan any queries.
+	s.sqlInstanceReader.Start(ctx, instance)
 
 	s.execCfg.GCJobNotifier.Start(ctx)
 	s.temporaryObjectCleaner.Start(ctx, stopper)
diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage.go b/pkg/sql/sqlinstance/instancestorage/instancestorage.go
index 22be76e5a243..7c3da16529d6 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancestorage.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancestorage.go
@@ -135,6 +135,22 @@ func NewStorage(
 	return NewTestingStorage(db, codec, keys.SQLInstancesTableID, slReader, settings)
 }
 
+// CreateNodeInstance claims a unique instance identifier for a SQL server
+// embedded in a KV node, and associates it with its SQL address and session
+// information.
+func (s *Storage) CreateNodeInstance(
+	ctx context.Context,
+	sessionID sqlliveness.SessionID,
+	sessionExpiration hlc.Timestamp,
+	rpcAddr string,
+	sqlAddr string,
+	locality roachpb.Locality,
+	nodeID roachpb.NodeID,
+) (instance sqlinstance.InstanceInfo, _ error) {
+	return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, nodeID)
+}
+
+const noNodeID = 0
+
 // CreateInstance claims a unique instance identifier for the SQL pod, and
 // associates it with its SQL address and session information.
 func (s *Storage) CreateInstance(
@@ -144,6 +160,18 @@ func (s *Storage) CreateInstance(
 	rpcAddr string,
 	sqlAddr string,
 	locality roachpb.Locality,
+) (instance sqlinstance.InstanceInfo, _ error) {
+	return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, noNodeID)
+}
+
+func (s *Storage) createInstanceRow(
+	ctx context.Context,
+	sessionID sqlliveness.SessionID,
+	sessionExpiration hlc.Timestamp,
+	rpcAddr string,
+	sqlAddr string,
+	locality roachpb.Locality,
+	nodeID roachpb.NodeID,
 ) (instance sqlinstance.InstanceInfo, _ error) {
 	if len(sqlAddr) == 0 || len(rpcAddr) == 0 {
 		return sqlinstance.InstanceInfo{}, errors.AssertionFailedf("missing sql or rpc address information for instance")
@@ -177,11 +205,21 @@ func (s *Storage) CreateInstance(
 			return err
 		}
 
-		// Try to retrieve an available instance ID. This blocks until one
-		// is available.
-		availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
-		if err != nil {
-			return err
+		// TODO(dt): do we need this at all? this keeps nodeID == instanceID when
+		// running mixed KV and SQL nodes, but bakes in the assumption that any
+		// clusters where this happens will contain _only_ mixed KV and SQL nodes
+		// and thus do not need to worry about finding an _actually_ available ID
+		// and avoiding conflicts. This is true today but may not be in more
+		// complex deployments.
+		if nodeID != noNodeID {
+			availableID = base.SQLInstanceID(nodeID)
+		} else {
+			// Try to retrieve an available instance ID. This blocks until one
+			// is available.
+			availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
+			if err != nil {
+				return err
+			}
 		}
 
 		key := s.rowcodec.encodeKey(region, availableID)
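The two public entry points above now funnel into createInstanceRow, differing
only in whether an explicit node ID pins the instance ID. The helper below is
a sketch of how a caller chooses between them; it is not part of the patch,
but the method signatures are taken from the diff above.

package instancestorage_test

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
)

// registerInstance picks the appropriate registration path: a SQL server
// embedded in a KV node passes its node ID and gets an instance row whose ID
// matches it, while a standalone SQL pod lets the storage allocate the next
// available ID.
func registerInstance(
	ctx context.Context,
	storage *instancestorage.Storage,
	session sqlliveness.Session,
	rpcAddr, sqlAddr string,
	locality roachpb.Locality,
	nodeID roachpb.NodeID,
) (sqlinstance.InstanceInfo, error) {
	if nodeID != 0 {
		// Mixed KV/SQL node: keep instance ID == node ID (see the TODO above).
		return storage.CreateNodeInstance(
			ctx, session.ID(), session.Expiration(), rpcAddr, sqlAddr, locality, nodeID)
	}
	// Standalone SQL pod: allocate an ID from the region's free list.
	return storage.CreateInstance(
		ctx, session.ID(), session.Expiration(), rpcAddr, sqlAddr, locality)
}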
From d916cc6a6c843334610508985f3ee9e3ac419f83 Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Wed, 18 Jan 2023 00:59:08 +0000
Subject: [PATCH 2/2] server: refresh instance rows when liveness changes

When a SQL server embedded in a KV node observes that its sqlliveness session
has been deleted, it now acquires a new session and rewrites its
system.sql_instances row with the new session ID, instead of shutting down as
a standalone SQL pod would.

Release note: none.
Epic: none.
---
 pkg/server/server_sql.go                          | 29 ++++++++++++
 .../instancestorage/instancestorage_test.go       | 46 +++++++++++++++++++
 2 files changed, 75 insertions(+)
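For context on the wiring in the diff below: slinstance.SessionEventListener
is the hook the liveness instance invokes when it notices its session record
has been deleted, and the two implementations differ only in their answer.
The interface shape shown here is inferred from its use in server_sql.go; the
block is illustrative only and is not part of the patch.

package server

import "context"

// sessionEventListener mirrors the slinstance.SessionEventListener hook as it
// is used in this file (shape assumed from the implementations below):
//
//   - stopperSessionEventListener returns false: do not create another
//     session; trigger server shutdown instead (standalone SQL pods).
//   - refreshInstanceSessionListener returns true: create a replacement
//     session and, asynchronously, rewrite the sql_instances row with it
//     (SQL servers embedded in KV storage nodes).
type sessionEventListener interface {
	OnSessionDeleted(ctx context.Context) (createAnotherSession bool)
}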
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 595f10678cd0..a62c1fe45827 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -444,6 +444,33 @@ func (s *stopperSessionEventListener) OnSessionDeleted(
 	return false
 }
 
+type refreshInstanceSessionListener struct {
+	cfg *sqlServerArgs
+}
+
+var _ slinstance.SessionEventListener = &refreshInstanceSessionListener{}
+
+// OnSessionDeleted implements the slinstance.SessionEventListener interface.
+func (r *refreshInstanceSessionListener) OnSessionDeleted(
+	ctx context.Context,
+) (createAnotherSession bool) {
+	if err := r.cfg.stopper.RunAsyncTask(ctx, "refresh-instance-session", func(context.Context) {
+		nodeID, _ := r.cfg.nodeIDContainer.OptionalNodeID()
+		s, err := r.cfg.sqlLivenessProvider.Session(ctx)
+		if err != nil {
+			log.Errorf(ctx, "failed to get new liveness session ID: %v", err)
+			return
+		}
+		if _, err := r.cfg.sqlInstanceStorage.CreateNodeInstance(
+			ctx, s.ID(), s.Expiration(), r.cfg.AdvertiseAddr, r.cfg.SQLAdvertiseAddr, r.cfg.Locality, nodeID,
+		); err != nil {
+			log.Errorf(ctx, "failed to update instance with new session ID: %v", err)
+		}
+	}); err != nil {
+		log.Errorf(ctx, "failed to run update of instance with new session ID: %v", err)
+	}
+	return true
+}
+
 // newSQLServer constructs a new SQLServer. The caller is responsible for
 // listening to the server's ShutdownRequested() channel (which is the same as
 // cfg.stopTrigger.C()) and stopping cfg.stopper when signaled.
@@ -495,6 +522,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		// use the instance ID anymore, and there's no mechanism for allocating a
 		// new one after startup.
 		sessionEventsConsumer = &stopperSessionEventListener{trigger: cfg.stopTrigger}
+	} else {
+		sessionEventsConsumer = &refreshInstanceSessionListener{cfg: &cfg}
 	}
 	cfg.sqlLivenessProvider = slprovider.New(
 		cfg.AmbientCtx,
diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
index 6ff841f18a47..81337c8013ed 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
@@ -339,6 +339,52 @@ func TestSQLAccess(t *testing.T) {
 	require.NoError(t, rows.Err())
 }
 
+func TestRefreshSession(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "abc", Value: "xyz"}}}})
+	defer s.Stopper().Stop(ctx)
+
+	c1 := sqlutils.MakeSQLRunner(sqlDB)
+
+	// Everything but the session should stay the same, so observe the initial row.
+	rowBeforeNoSession := c1.QueryStr(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1")
+	require.Len(t, rowBeforeNoSession, 1)
+
+	// This initial session should go away once we expire it below, but let's
+	// verify it is there for starters and remember it.
+	sess := c1.QueryStr(t, "SELECT encode(session_id, 'hex') FROM system.sql_instances WHERE id = 1")
+	require.Len(t, sess, 1)
+	require.Len(t, sess[0][0], 38)
+
+	// First let's delete the instance AND expire the session; the instance should
+	// reappear when a new session is acquired, with the new session.
+	c1.ExecRowsAffected(t, 1, "DELETE FROM system.sql_instances WHERE session_id = decode($1, 'hex')", sess[0][0])
+	c1.ExecRowsAffected(t, 1, "DELETE FROM system.sqlliveness WHERE session_id = decode($1, 'hex')", sess[0][0])
+
+	// Wait until we see the right row appear.
+	query := fmt.Sprintf(`SELECT count(*) FROM system.sql_instances WHERE id = 1 AND session_id <> decode('%s', 'hex')`, sess[0][0])
+	c1.CheckQueryResultsRetry(t, query, [][]string{{"1"}})
+
+	// Verify that everything else is the same after recreate.
+	c1.CheckQueryResults(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1", rowBeforeNoSession)
+
+	sess = c1.QueryStr(t, "SELECT encode(session_id, 'hex') FROM system.sql_instances WHERE id = 1")
+	// Now let's just expire the session and leave the row; the instance row
+	// should still become correct once it is updated with the new session.
+	c1.ExecRowsAffected(t, 1, "DELETE FROM system.sqlliveness WHERE session_id = decode($1, 'hex')", sess[0][0])
+
+	// Wait until we see the right row appear.
+	query = fmt.Sprintf(`SELECT count(*) FROM system.sql_instances WHERE id = 1 AND session_id <> decode('%s', 'hex')`, sess[0][0])
+	c1.CheckQueryResultsRetry(t, query, [][]string{{"1"}})
+
+	// Verify everything else is still the same after update.
+	c1.CheckQueryResults(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1", rowBeforeNoSession)
+}
+
 // TestConcurrentCreateAndRelease verifies that concurrent access to instancestorage
 // to create and release SQL instance IDs works as expected.
 func TestConcurrentCreateAndRelease(t *testing.T) {