server: always enable sql_instances maintenance #95355

Merged (2 commits) on Jan 21, 2023.
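
This change makes every server process maintain its own row in system.sql_instances rather than leaving that maintenance to SQL-only pods: combined KV and SQL nodes now claim an instance ID pinned to their node ID at startup, re-create their instance row if its sqlliveness session expires, and start the sql_instances reader unconditionally.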
113 changes: 74 additions & 39 deletions pkg/server/server_sql.go
@@ -89,6 +89,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
@@ -443,6 +444,33 @@ func (s *stopperSessionEventListener) OnSessionDeleted(
return false
}

type refreshInstanceSessionListener struct {
cfg *sqlServerArgs
}

var _ slinstance.SessionEventListener = &refreshInstanceSessionListener{}

// OnSessionDeleted implements the slinstance.SessionEventListener interface.
func (r *refreshInstanceSessionListener) OnSessionDeleted(
ctx context.Context,
) (createAnotherSession bool) {
if err := r.cfg.stopper.RunAsyncTask(ctx, "refresh-instance-session", func(context.Context) {
nodeID, _ := r.cfg.nodeIDContainer.OptionalNodeID()
s, err := r.cfg.sqlLivenessProvider.Session(ctx)
if err != nil {
log.Errorf(ctx, "failed to get new liveness session ID: %v", err)
return
}
if _, err := r.cfg.sqlInstanceStorage.CreateNodeInstance(
ctx, s.ID(), s.Expiration(), r.cfg.AdvertiseAddr, r.cfg.SQLAdvertiseAddr, r.cfg.Locality, nodeID,
); err != nil {
log.Errorf(ctx, "failed to update instance with new session ID: %v", err)
}
}); err != nil {
log.Errorf(ctx, "failed to run update of instance with new session ID: %v", err)
}
return true
}
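
For reference, the slinstance.SessionEventListener contract that both stopperSessionEventListener and refreshInstanceSessionListener implement looks roughly like the following sketch. It is inferred from the methods in this diff, not copied from the real definition in pkg/sql/sqlliveness/slinstance:

// Sketch only: the sqlliveness instance calls OnSessionDeleted when it
// observes that its session record has been deleted; the return value
// tells it whether to create a replacement session.
type SessionEventListener interface {
	OnSessionDeleted(ctx context.Context) (createAnotherSession bool)
}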

// newSQLServer constructs a new SQLServer. The caller is responsible for
// listening to the server's ShutdownRequested() channel (which is the same as
// cfg.stopTrigger.C()) and stopping cfg.stopper when signaled.
@@ -483,37 +511,36 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// instance provider without initializing the instance, since this is not a
// SQL pod server.
_, isMixedSQLAndKVNode := cfg.nodeIDContainer.OptionalNodeID()
isSQLPod := !isMixedSQLAndKVNode

sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs)
var sessionEventsConsumer slinstance.SessionEventListener
if isSQLPod {
if !isMixedSQLAndKVNode {
// For SQL pods, we want the process to shutdown when the session liveness
// record is found to be deleted. This is because, if the session is
// deleted, the instance ID used by this server may have been stolen by
// another server, or it may be stolen in the future. This server shouldn't
// use the instance ID anymore, and there's no mechanism for allocating a
// new one after startup.
sessionEventsConsumer = &stopperSessionEventListener{trigger: cfg.stopTrigger}
} else {
sessionEventsConsumer = &refreshInstanceSessionListener{cfg: &cfg}
}
cfg.sqlLivenessProvider = slprovider.New(
cfg.AmbientCtx,
cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, sessionEventsConsumer,
)

if isSQLPod {
if codec.ForSystemTenant() {
return nil, errors.AssertionFailedf("non-system codec used for SQL pod")
}

cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider,
cfg.rangeFeedFactory,
codec, cfg.clock, cfg.stopper)
cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider,
cfg.rangeFeedFactory,
codec, cfg.clock, cfg.stopper)

if isMixedSQLAndKVNode {
cfg.podNodeDialer = cfg.nodeDialer
} else {
// In a multi-tenant environment, use the sqlInstanceReader to resolve
// SQL pod addresses.
addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) {
@@ -524,11 +551,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
return &util.UnresolvedAddr{AddressField: info.InstanceRPCAddr}, nil
}
cfg.podNodeDialer = nodedialer.New(cfg.rpcContext, addressResolver)
} else {
if !codec.ForSystemTenant() {
return nil, errors.AssertionFailedf("system codec used for SQL-only node")
}
cfg.podNodeDialer = cfg.nodeDialer
}
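
The body of addressResolver above is collapsed at the hunk boundary; the hidden lines presumably perform the instance lookup that produces info. A hypothetical sketch, assuming the instance reader's GetInstance method (the elided code may differ):

// Hypothetical reconstruction of the collapsed resolver body: map a SQL
// instance ID to its advertised RPC address via the instance reader.
addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) {
	info, err := cfg.sqlInstanceReader.GetInstance(ctx, base.SQLInstanceID(nodeID))
	if err != nil {
		return nil, errors.Wrapf(err, "unable to look up descriptor for n%d", nodeID)
	}
	return &util.UnresolvedAddr{AddressField: info.InstanceRPCAddr}, nil
}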

jobRegistry := cfg.circularJobRegistry
@@ -818,8 +840,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// cluster.
var getNodes func(ctx context.Context) ([]roachpb.NodeID, error)
var nodeDialer *nodedialer.Dialer
if !isSQLPod {
if isMixedSQLAndKVNode {
nodeDialer = cfg.nodeDialer
// TODO(dt): any reason not to just always use the instance reader? And just
// pass it directly instead of making a new closure here?
getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) {
var ns []roachpb.NodeID
ls, err := nodeLiveness.GetLivenessesFromKV(ctx)
@@ -1339,26 +1363,37 @@ func (s *SQLServer) preStart(
// Start the sql liveness subsystem. We'll need it to get a session.
s.sqlLivenessProvider.Start(ctx, regionPhysicalRep)

_, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
isTenant := !isMixedSQLAndKVNode
session, err := s.sqlLivenessProvider.Session(ctx)
if err != nil {
return err
}
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration,
); err != nil {
return err
}
nodeID, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()

if isTenant {
session, err := s.sqlLivenessProvider.Session(ctx)
var instance sqlinstance.InstanceInfo
if isMixedSQLAndKVNode {
// Write/acquire our instance row.
instance, err = s.sqlInstanceStorage.CreateNodeInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality, nodeID,
)
if err != nil {
return err
}
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration,
); err != nil {
return err
}
// Acquire our instance row.
instance, err := s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality)
} else {
instance, err = s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality,
)
if err != nil {
return err
}
}

if !isMixedSQLAndKVNode {
// TODO(andrei): Release the instance ID on server shutdown. It is not trivial
// to determine where/when exactly to do that, though. Doing it after stopper
// quiescing doesn't work. Doing it too soon, for example as part of draining,
@@ -1372,13 +1407,13 @@ func (s *SQLServer) preStart(
if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
return err
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, especially when we have not
// received data from the rangefeed yet, and if the reader doesn't see
// it, we'd be unable to plan any queries.
s.sqlInstanceReader.Start(ctx, instance)
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, especially when we have not
// received data from the rangefeed yet, and if the reader doesn't see
// it, we'd be unable to plan any queries.
s.sqlInstanceReader.Start(ctx, instance)

s.execCfg.GCJobNotifier.Start(ctx)
s.temporaryObjectCleaner.Start(ctx, stopper)
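Taken together, the preStart changes give mixed KV and SQL nodes and SQL-only pods the same sql_instances maintenance path. A condensed, illustrative view of the new sequence (pseudocode distilled from the hunks above; error handling and exact arguments elided):

// Illustrative summary, not literal code from the diff.
session, _ := s.sqlLivenessProvider.Session(ctx)
_ = s.sqlInstanceStorage.RunInstanceIDReclaimLoop(ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration)
nodeID, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
var instance sqlinstance.InstanceInfo
if isMixedSQLAndKVNode {
	// Instance ID is pinned to the node ID (CreateNodeInstance).
	instance, _ = s.sqlInstanceStorage.CreateNodeInstance(ctx, session.ID(), session.Expiration(), rpcAddr, sqlAddr, locality, nodeID)
} else {
	// SQL pod: allocate a free preallocated ID (CreateInstance).
	instance, _ = s.sqlInstanceStorage.CreateInstance(ctx, session.ID(), session.Expiration(), rpcAddr, sqlAddr, locality)
}
_ = s.setInstanceID(ctx, instance.InstanceID, session.ID())
s.sqlInstanceReader.Start(ctx, instance) // now started on every node type
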
48 changes: 43 additions & 5 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
@@ -135,6 +135,22 @@ func NewStorage(
return NewTestingStorage(db, codec, keys.SQLInstancesTableID, slReader, settings)
}

// CreateNodeInstance claims an instance identifier for a combined KV and SQL
// node and associates it with the node's SQL address and session information.
// The instance ID is pinned to the given node ID.
func (s *Storage) CreateNodeInstance(
ctx context.Context,
sessionID sqlliveness.SessionID,
sessionExpiration hlc.Timestamp,
rpcAddr string,
sqlAddr string,
locality roachpb.Locality,
nodeID roachpb.NodeID,
) (instance sqlinstance.InstanceInfo, _ error) {
return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, nodeID)
}

const noNodeID = 0

// CreateInstance claims a unique instance identifier for the SQL pod, and
// associates it with its SQL address and session information.
func (s *Storage) CreateInstance(
@@ -144,6 +160,18 @@
rpcAddr string,
sqlAddr string,
locality roachpb.Locality,
) (instance sqlinstance.InstanceInfo, _ error) {
return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, noNodeID)
}

func (s *Storage) createInstanceRow(
ctx context.Context,
sessionID sqlliveness.SessionID,
sessionExpiration hlc.Timestamp,
rpcAddr string,
sqlAddr string,
locality roachpb.Locality,
nodeID roachpb.NodeID,
) (instance sqlinstance.InstanceInfo, _ error) {
if len(sqlAddr) == 0 || len(rpcAddr) == 0 {
return sqlinstance.InstanceInfo{}, errors.AssertionFailedf("missing sql or rpc address information for instance")
@@ -177,11 +205,21 @@
return err
}

// Try to retrieve an available instance ID. This blocks until one
// is available.
availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
if err != nil {
return err
// TODO(dt): do we need this at all? this keeps nodeID == instanceID when
// running mixed KV and SQL nodes, but bakes in the assumption that any
// clusters where this happens will contain _only_ mixed KV and SQL nodes
// and thus do not need to worry about finding an _actually_ available ID
// and avoiding conflicts. This is true today but may not be in more
// complex deployments.
if nodeID != noNodeID {
availableID = base.SQLInstanceID(nodeID)
} else {
// Try to retrieve an available instance ID. This blocks until one
// is available.
availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn)
if err != nil {
return err
}
}

key := s.rowcodec.encodeKey(region, availableID)
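A hedged usage sketch of the storage's two entry points (signatures as declared in this file; the session, address, and locality values are placeholders):

// SQL-only pod: allocate whichever preallocated instance ID is free.
instance, err := storage.CreateInstance(
	ctx, session.ID(), session.Expiration(), rpcAddr, sqlAddr, locality,
)

// Combined KV and SQL node: pin the instance ID to the node's ID.
instance, err = storage.CreateNodeInstance(
	ctx, session.ID(), session.Expiration(), rpcAddr, sqlAddr, locality, nodeID,
)
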
46 changes: 46 additions & 0 deletions pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
@@ -339,6 +339,52 @@ func TestSQLAccess(t *testing.T) {
require.NoError(t, rows.Err())
}

func TestRefreshSession(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "abc", Value: "xyz"}}}})
defer s.Stopper().Stop(ctx)

c1 := sqlutils.MakeSQLRunner(sqlDB)

// Everything but the session should stay the same, so observe the initial row.
rowBeforeNoSession := c1.QueryStr(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1")
require.Len(t, rowBeforeNoSession, 1)

// This initial session should go away once we expire it below, but let's
// verify it is there for starters and remember it.
sess := c1.QueryStr(t, "SELECT encode(session_id, 'hex') FROM system.sql_instances WHERE id = 1")
require.Len(t, sess, 1)
require.Len(t, sess[0][0], 38)

// First let's delete the instance AND expire the session; the instance should
// reappear when a new session is acquired, with the new session.
c1.ExecRowsAffected(t, 1, "DELETE FROM system.sql_instances WHERE session_id = decode($1, 'hex')", sess[0][0])
c1.ExecRowsAffected(t, 1, "DELETE FROM system.sqlliveness WHERE session_id = decode($1, 'hex')", sess[0][0])

// Wait until we see the right row appear.
query := fmt.Sprintf(`SELECT count(*) FROM system.sql_instances WHERE id = 1 AND session_id <> decode('%s', 'hex')`, sess[0][0])
c1.CheckQueryResultsRetry(t, query, [][]string{{"1"}})

// Verify that everything else is the same after recreate.
c1.CheckQueryResults(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1", rowBeforeNoSession)

sess = c1.QueryStr(t, "SELECT encode(session_id, 'hex') FROM system.sql_instances WHERE id = 1")
// Now let's just expire the session and leave the row; the instance row
// should still become correct once it is updated with the new session.
c1.ExecRowsAffected(t, 1, "DELETE FROM system.sqlliveness WHERE session_id = decode($1, 'hex')", sess[0][0])

// Wait until we see the right row appear.
query = fmt.Sprintf(`SELECT count(*) FROM system.sql_instances WHERE id = 1 AND session_id <> decode('%s', 'hex')`, sess[0][0])
c1.CheckQueryResultsRetry(t, query, [][]string{{"1"}})

// Verify everything else is still the same after update.
c1.CheckQueryResults(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1", rowBeforeNoSession)
}

// TestConcurrentCreateAndRelease verifies that concurrent access to instancestorage
// to create and release SQL instance IDs works as expected.
func TestConcurrentCreateAndRelease(t *testing.T) {