server: always enable sql_instances maintenance
Previously the system.sql_instances table was only maintained by SQL
servers operating in "pod" mode, i.e. not in mixed KV and SQL process
nodes, where KV-level liveness and gossip provide an alternative means
of node discovery that the SQL layer can use when searching for other
SQL instances. However, this inconsistency makes it difficult to write
correct SQL-level code for remote-node discovery and interaction: in
some cases such code needs to consult the instances list, and in others
the KV liveness store, which, combined with the complexities of doing
so around initialization, dependency injection, etc., becomes hard to
maintain.

Additionally, such a design precludes a cluster in which some SQL
instances run in mixed KV nodes and some do not, as the non-KV nodes
would have no way to discover the KV ones. Such deployments are not
currently possible but could be in the future.

Instead, this change enables maintenance of the sql_instances table by
all SQL servers, whether running in their own processes or embedded in
a KV storage node process. This paves the way for making the discovery
of SQL servers uniform across all SQL server types: they will all be
able to simply consult the instances list to find any other SQL
servers, regardless of where those SQL servers are running.
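
To make the uniform discovery path concrete, here is a minimal sketch,
assuming a cached reader over system.sql_instances; the type and method
names are illustrative placeholders, not the actual CockroachDB APIs:

package discovery

import "context"

// instanceRow mirrors a row of system.sql_instances; with this change every
// live SQL server maintains one, whether it runs as its own pod or inside a
// KV storage node.
type instanceRow struct {
	InstanceID int32
	RPCAddr    string
	SQLAddr    string
}

// instanceReader stands in for the cached reader of the instances table.
type instanceReader interface {
	GetAllInstances(ctx context.Context) ([]instanceRow, error)
}

// peerRPCAddrs finds every other SQL server by consulting the instances list
// alone; no branching on KV liveness or gossip is required.
func peerRPCAddrs(ctx context.Context, r instanceReader, selfID int32) ([]string, error) {
	rows, err := r.GetAllInstances(ctx)
	if err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(rows))
	for _, row := range rows {
		if row.InstanceID != selfID {
			addrs = append(addrs, row.RPCAddr)
		}
	}
	return addrs, nil
}

The same single lookup would also serve the DistSQLPhysicalPlanner
simplification mentioned below.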

A follow-up change could simplify DistSQLPhysicalPlanner, specifically
its SetupAllNodesPlanning method, which currently has two different
implementations due to the previous inconsistency in the available
APIs.

Release note: none.
Epic: none.
dt committed Jan 17, 2023
1 parent fa43210 commit a2b455b
Showing 2 changed files with 86 additions and 43 deletions.
80 changes: 42 additions & 38 deletions pkg/server/server_sql.go
@@ -90,6 +90,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
@@ -476,11 +477,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// instance provider without initializing the instance, since this is not a
// SQL pod server.
_, isMixedSQLAndKVNode := cfg.nodeIDContainer.OptionalNodeID()
isSQLPod := !isMixedSQLAndKVNode

sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs)
var sessionEventsConsumer slinstance.SessionEventListener
if isSQLPod {
if !isMixedSQLAndKVNode {
// For SQL pods, we want the process to shutdown when the session liveness
// record is found to be deleted. This is because, if the session is
// deleted, the instance ID used by this server may have been stolen by
@@ -494,19 +494,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, sessionEventsConsumer,
)

if isSQLPod {
if codec.ForSystemTenant() {
return nil, errors.AssertionFailedf("non-system codec used for SQL pod")
}

cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider,
cfg.rangeFeedFactory,
codec, cfg.clock, cfg.stopper)
cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider,
cfg.rangeFeedFactory,
codec, cfg.clock, cfg.stopper)

if isMixedSQLAndKVNode {
cfg.podNodeDialer = cfg.nodeDialer
} else {
// In a multi-tenant environment, use the sqlInstanceReader to resolve
// SQL pod addresses.
addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) {
@@ -517,11 +515,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
return &util.UnresolvedAddr{AddressField: info.InstanceRPCAddr}, nil
}
cfg.podNodeDialer = nodedialer.New(cfg.rpcContext, addressResolver)
} else {
if !codec.ForSystemTenant() {
return nil, errors.AssertionFailedf("system codec used for SQL-only node")
}
cfg.podNodeDialer = cfg.nodeDialer
}

jobRegistry := cfg.circularJobRegistry
@@ -815,8 +808,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// cluster.
var getNodes func(ctx context.Context) ([]roachpb.NodeID, error)
var nodeDialer *nodedialer.Dialer
if !isSQLPod {
if isMixedSQLAndKVNode {
nodeDialer = cfg.nodeDialer
// TODO(dt): any reason not to just always use the instance reader? And just
// pass it directly instead of making a new closure here?
getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) {
var ns []roachpb.NodeID
ls, err := nodeLiveness.GetLivenessesFromKV(ctx)
@@ -1346,26 +1341,35 @@ func (s *SQLServer) preStart(
// Start the sql liveness subsystem. We'll need it to get a session.
s.sqlLivenessProvider.Start(ctx, regionPhysicalRep)

_, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
isTenant := !isMixedSQLAndKVNode
session, err := s.sqlLivenessProvider.Session(ctx)
if err != nil {
return err
}
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration,
); err != nil {
return err
}
nodeId, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()

if isTenant {
session, err := s.sqlLivenessProvider.Session(ctx)
var instance sqlinstance.InstanceInfo
if isMixedSQLAndKVNode {
// Write/acquire our instance row.
instance, err = s.sqlInstanceStorage.CreateNodeInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality, nodeId)
if err != nil {
return err
}
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration,
); err != nil {
return err
}
// Acquire our instance row.
instance, err := s.sqlInstanceStorage.CreateInstance(
} else {
instance, err = s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality)
if err != nil {
return err
}
}

if !isMixedSQLAndKVNode {
// TODO(andrei): Release the instance ID on server shutdown. It is not trivial
// to determine where/when exactly to do that, though. Doing it after stopper
// quiescing doesn't work. Doing it too soon, for example as part of draining,
Expand All @@ -1379,13 +1383,13 @@ func (s *SQLServer) preStart(
if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
return err
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, especially when we have not
// received data from the rangefeed yet, and if the reader doesn't see
// it, we'd be unable to plan any queries.
s.sqlInstanceReader.Start(ctx, instance)
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, especially when we have not
// received data from the rangefeed yet, and if the reader doesn't see
// it, we'd be unable to plan any queries.
s.sqlInstanceReader.Start(ctx, instance)

s.execCfg.GCJobNotifier.Start(ctx)
s.temporaryObjectCleaner.Start(ctx, stopper)
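
Condensing the server_sql.go hunks above: instance registration now takes a
single path for both server types, differing only in how the instance ID is
chosen. A rough, illustrative sketch under placeholder types (not the real
server code):

package sketch

import "context"

// instanceInfo and instanceWriter are placeholders for the sqlinstance and
// instancestorage types that appear in the diff above.
type instanceInfo struct{ InstanceID int32 }

type instanceWriter interface {
	// CreateNodeInstance registers a mixed KV+SQL node, reusing its KV node ID
	// as the SQL instance ID.
	CreateNodeInstance(ctx context.Context, nodeID int32) (instanceInfo, error)
	// CreateInstance allocates a fresh instance ID for a standalone SQL pod.
	CreateInstance(ctx context.Context) (instanceInfo, error)
}

// registerInstance mirrors the ID-selection branch in the new preStart flow:
// every SQL server writes an instance row; only how its ID is chosen differs.
func registerInstance(
	ctx context.Context, w instanceWriter, nodeID int32, isMixedSQLAndKVNode bool,
) (instanceInfo, error) {
	if isMixedSQLAndKVNode {
		return w.CreateNodeInstance(ctx, nodeID)
	}
	return w.CreateInstance(ctx)
}

In both cases the server then calls setInstanceID and starts the instances
reader, as the preStart hunk above shows.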
49 changes: 44 additions & 5 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
@@ -135,6 +135,22 @@ func NewStorage(
return NewTestingStorage(db, codec, keys.SQLInstancesTableID, slReader, settings)
}

// CreateNodeInstance claims a unique instance identifier for the SQL pod, and
// associates it with its SQL address and session information.
func (s *Storage) CreateNodeInstance(
ctx context.Context,
sessionID sqlliveness.SessionID,
sessionExpiration hlc.Timestamp,
rpcAddr string,
sqlAddr string,
locality roachpb.Locality,
nodeID roachpb.NodeID,
) (instance sqlinstance.InstanceInfo, _ error) {
return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, nodeID)
}

const noNodeID = 0

// CreateInstance claims a unique instance identifier for the SQL pod, and
// associates it with its SQL address and session information.
func (s *Storage) CreateInstance(
@@ -145,6 +161,19 @@ func (s *Storage) CreateInstance(
sqlAddr string,
locality roachpb.Locality,
) (instance sqlinstance.InstanceInfo, _ error) {
return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, noNodeID)
}

func (s *Storage) createInstanceRow(
ctx context.Context,
sessionID sqlliveness.SessionID,
sessionExpiration hlc.Timestamp,
rpcAddr string,
sqlAddr string,
locality roachpb.Locality,
nodeID roachpb.NodeID,
) (instance sqlinstance.InstanceInfo, _ error) {

if len(sqlAddr) == 0 || len(rpcAddr) == 0 {
return sqlinstance.InstanceInfo{}, errors.AssertionFailedf("missing sql or rpc address information for instance")
}
@@ -177,11 +206,21 @@ func (s *Storage) CreateInstance(
return err
}

// Try to retrieve an available instance ID. This blocks until one
// is available.
availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
if err != nil {
return err
// TODO(dt): do we need this at all? this keeps nodeID == instanceID when
// running mixed KV and SQL nodes, but bakes in the assumption that any
// clusters where this happens will contain _only_ mixed KV and SQL nodes
// and thus do not need to worry about finding an _actually_ available ID
// and avoiding conflicts. This is true today but may not be in more
// complex deployments.
if nodeID != noNodeID {
availableID = base.SQLInstanceID(nodeID)
} else {
// Try to retrieve an available instance ID. This blocks until one
// is available.
availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
if err != nil {
return err
}
}

key := s.rowcodec.encodeKey(region, availableID)
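
The net effect of the instancestorage.go changes is that CreateInstance and
CreateNodeInstance both funnel into one row-creation path, with the nodeID
argument deciding how the instance ID is picked. A self-contained sketch of
just that selection rule, with illustrative names (the real code uses
base.SQLInstanceID and a region-scoped allocator):

package sketch

import "context"

// noNodeID mirrors the sentinel in the diff above: zero means no KV node ID
// was supplied, i.e. the caller is a standalone SQL pod.
const noNodeID = 0

// pickInstanceID returns the KV node ID when one is supplied (mixed KV+SQL
// node), keeping instanceID == nodeID; otherwise it defers to the allocator
// that finds a free, pre-allocated ID for the region (the SQL pod path).
func pickInstanceID(
	ctx context.Context,
	nodeID int32,
	allocate func(ctx context.Context) (int32, error),
) (int32, error) {
	if nodeID != noNodeID {
		// As the TODO in the diff notes, reusing the node ID assumes clusters
		// with mixed nodes contain only mixed nodes, so the ID cannot collide
		// with a pod-allocated one.
		return nodeID, nil
	}
	return allocate(ctx)
}

In the mixed case the ID is written directly; in the pod case
getAvailableInstanceIDForRegion (shown above) blocks until a pre-allocated
ID is available.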
