Skip to content

Commit

Permalink
server: refresh instance rows when liveness changes
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt committed Jan 20, 2023
1 parent a788bfb commit d916cc6
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,33 @@ func (s *stopperSessionEventListener) OnSessionDeleted(
return false
}

type refreshInstanceSessionListener struct {
cfg *sqlServerArgs
}

var _ slinstance.SessionEventListener = &stopperSessionEventListener{}

// OnSessionDeleted implements the slinstance.SessionEventListener interface.
func (r *refreshInstanceSessionListener) OnSessionDeleted(
ctx context.Context,
) (createAnotherSession bool) {
if err := r.cfg.stopper.RunAsyncTask(ctx, "refresh-instance-session", func(context.Context) {
nodeID, _ := r.cfg.nodeIDContainer.OptionalNodeID()
s, err := r.cfg.sqlLivenessProvider.Session(ctx)
if err != nil {
log.Errorf(ctx, "faild to get new liveness session ID: %v", err)
}
if _, err := r.cfg.sqlInstanceStorage.CreateNodeInstance(
ctx, s.ID(), s.Expiration(), r.cfg.AdvertiseAddr, r.cfg.SQLAdvertiseAddr, r.cfg.Locality, nodeID,
); err != nil {
log.Errorf(ctx, "failed to update instance with new session ID: %v", err)
}
}); err != nil {
log.Errorf(ctx, "failed to run update of instance with new session ID: %v", err)
}
return true
}

// newSQLServer constructs a new SQLServer. The caller is responsible for
// listening to the server's ShutdownRequested() channel (which is the same as
// cfg.stopTrigger.C()) and stopping cfg.stopper when signaled.
Expand Down Expand Up @@ -495,6 +522,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// use the instance ID anymore, and there's no mechanism for allocating a
// new one after startup.
sessionEventsConsumer = &stopperSessionEventListener{trigger: cfg.stopTrigger}
} else {
sessionEventsConsumer = &refreshInstanceSessionListener{cfg: &cfg}
}
cfg.sqlLivenessProvider = slprovider.New(
cfg.AmbientCtx,
Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,52 @@ func TestSQLAccess(t *testing.T) {
require.NoError(t, rows.Err())
}

func TestRefreshSession(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "abc", Value: "xyz"}}}})
defer s.Stopper().Stop(ctx)

c1 := sqlutils.MakeSQLRunner(sqlDB)

// Everything but the session should stay the same so observe the initial row.
rowBeforeNoSession := c1.QueryStr(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1")
require.Len(t, rowBeforeNoSession, 1)

// This initial session should go away once we expire it below, but let's
// verify it is there for starters and remember it.
sess := c1.QueryStr(t, "SELECT encode(session_id, 'hex') FROM system.sql_instances WHERE id = 1")
require.Len(t, sess, 1)
require.Len(t, sess[0][0], 38)

// First let's delete the instance AND expire the session; the instance should
// reappear when a new session is acquired, with the new session.
c1.ExecRowsAffected(t, 1, "DELETE FROM system.sql_instances WHERE session_id = decode($1, 'hex')", sess[0][0])
c1.ExecRowsAffected(t, 1, "DELETE FROM system.sqlliveness WHERE session_id = decode($1, 'hex')", sess[0][0])

// Wait until we see the right row appear.
query := fmt.Sprintf(`SELECT count(*) FROM system.sql_instances WHERE id = 1 AND session_id <> decode('%s', 'hex')`, sess[0][0])
c1.CheckQueryResultsRetry(t, query, [][]string{{"1"}})

// Verify that everything else is the same after recreate.
c1.CheckQueryResults(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1", rowBeforeNoSession)

sess = c1.QueryStr(t, "SELECT encode(session_id, 'hex') FROM system.sql_instances WHERE id = 1")
// Now let's just expire the session and leave the row; the instance row
// should still become correct once it is updated with the new session.
c1.ExecRowsAffected(t, 1, "DELETE FROM system.sqlliveness WHERE session_id = decode($1, 'hex')", sess[0][0])

// Wait until we see the right row appear.
query = fmt.Sprintf(`SELECT count(*) FROM system.sql_instances WHERE id = 1 AND session_id <> decode('%s', 'hex')`, sess[0][0])
c1.CheckQueryResultsRetry(t, query, [][]string{{"1"}})

// Verify everything else is still the same after update.
c1.CheckQueryResults(t, "SELECT id, addr, sql_addr, locality FROM system.sql_instances WHERE id = 1", rowBeforeNoSession)

}

// TestConcurrentCreateAndRelease verifies that concurrent access to instancestorage
// to create and release SQL instance IDs works as expected.
func TestConcurrentCreateAndRelease(t *testing.T) {
Expand Down

0 comments on commit d916cc6

Please sign in to comment.