diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 09afa6ed638b..652da7e7ab56 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -558,11 +558,13 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ) cfg.sqlInstanceStorage = instancestorage.NewStorage( - cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings, cfg.clock, cfg.rangeFeedFactory) + cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings, + cfg.clock, cfg.rangeFeedFactory, + ) cfg.sqlInstanceReader = instancestorage.NewReader( - cfg.sqlInstanceStorage, - cfg.sqlLivenessProvider, - cfg.stopper) + cfg.sqlInstanceStorage, cfg.sqlLivenessProvider.BlockingReader(), + cfg.stopper, + ) // We can't use the nodeDailer as the podNodeDailer unless we // are serving the system tenant despite the fact that we've @@ -787,7 +789,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { RowMetrics: &rowMetrics, InternalRowMetrics: &internalRowMetrics, - SQLLivenessReader: cfg.sqlLivenessProvider, + SQLLivenessReader: cfg.sqlLivenessProvider.BlockingReader(), JobRegistry: jobRegistry, Gossip: cfg.gossip, PodNodeDialer: cfg.podNodeDialer, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 6336c770fa66..27f01d43f9ff 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1244,7 +1244,7 @@ type ExecutorConfig struct { MetricsRecorder nodeStatusGenerator SessionRegistry *SessionRegistry ClosedSessionCache *ClosedSessionCache - SQLLiveness sqlliveness.Liveness + SQLLiveness sqlliveness.Provider JobRegistry *jobs.Registry VirtualSchemas *VirtualSchemaHolder DistSQLPlanner *DistSQLPlanner diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index cb6bd9544b3a..f061ce630dc2 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -114,7 +114,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) { evalCtx.Settings = execCfg.Settings evalCtx.Codec = execCfg.Codec evalCtx.Tracer = execCfg.AmbientCtx.Tracer - evalCtx.SQLLivenessReader = execCfg.SQLLiveness + evalCtx.SQLLivenessReader = execCfg.SQLLiveness.BlockingReader() evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs diff --git a/pkg/sql/sqlliveness/slstorage/slstorage.go b/pkg/sql/sqlliveness/slstorage/slstorage.go index 2d148c7cbfa1..a3fe04b38a6b 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage.go @@ -79,7 +79,7 @@ var CacheSize = settings.RegisterIntSetting( 1024) // Storage deals with reading and writing session records. It implements the -// sqlliveness.Reader interface, and the slinstace.Writer interface. +// sqlliveness.Storage interface, and the slinstace.Writer interface. type Storage struct { settings *cluster.Settings settingsWatcher *settingswatcher.SettingsWatcher @@ -110,7 +110,7 @@ type Storage struct { } } -var _ sqlliveness.Reader = &Storage{} +var _ sqlliveness.StorageReader = &Storage{} // NewTestingStorage constructs a new storage with control for the database // in which the `sqlliveness` table should exist. @@ -186,13 +186,6 @@ func (s *Storage) Start(ctx context.Context) { s.mu.started = true } -// IsAlive determines whether a given session is alive. If this method returns -// true, the session may no longer be alive, but if it returns false, the -// session definitely is not alive. -func (s *Storage) IsAlive(ctx context.Context, sid sqlliveness.SessionID) (alive bool, err error) { - return s.isAlive(ctx, sid, sync) -} - type readType byte const ( @@ -580,15 +573,31 @@ func (s *Storage) Update( // currently known state of the session, but will trigger an asynchronous // refresh of the state of the session if it is not known. func (s *Storage) CachedReader() sqlliveness.Reader { - return (*cachedStorage)(s) + return (*cachedReader)(s) +} + +// BlockingReader reader returns an implementation of sqlliveness.Reader which +// will cache results of previous reads but will synchronously block to +// determine the status of a session which it does not know about or thinks +// might be expired. +func (s *Storage) BlockingReader() sqlliveness.Reader { + return (*blockingReader)(s) +} + +type blockingReader Storage + +func (s *blockingReader) IsAlive( + ctx context.Context, sid sqlliveness.SessionID, +) (alive bool, err error) { + return (*Storage)(s).isAlive(ctx, sid, sync) } -// cachedStorage implements the sqlliveness.Reader interface, and the +// cachedReader implements the sqlliveness.Reader interface, and the // slinstace.Writer interface, but does not read from the underlying store // synchronously during IsAlive. -type cachedStorage Storage +type cachedReader Storage -func (s *cachedStorage) IsAlive( +func (s *cachedReader) IsAlive( ctx context.Context, sid sqlliveness.SessionID, ) (alive bool, err error) { return (*Storage)(s).isAlive(ctx, sid, async) diff --git a/pkg/sql/sqlliveness/slstorage/slstorage_test.go b/pkg/sql/sqlliveness/slstorage/slstorage_test.go index 2404316ce7f0..2c2b1b4a899e 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage_test.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage_test.go @@ -89,6 +89,7 @@ func testStorage(t *testing.T) { clock, _, _, stopper, storage := setup(t) storage.Start(ctx) defer stopper.Stop(ctx) + reader := storage.BlockingReader() exp := clock.Now().Add(time.Second.Nanoseconds(), 0) id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) @@ -100,14 +101,14 @@ func testStorage(t *testing.T) { require.Equal(t, int64(1), metrics.WriteSuccesses.Count()) } { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count()) require.Equal(t, int64(0), metrics.IsAliveCacheHits.Count()) } { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count()) @@ -119,6 +120,7 @@ func testStorage(t *testing.T) { defer stopper.Stop(ctx) slstorage.GCJitter.Override(ctx, &settings.SV, 0) storage.Start(ctx) + reader := storage.BlockingReader() metrics := storage.Metrics() // GC will run some time after startup. @@ -148,10 +150,10 @@ func testStorage(t *testing.T) { // Verify they are alive. { - isAlive1, err := storage.IsAlive(ctx, id1) + isAlive1, err := reader.IsAlive(ctx, id1) require.NoError(t, err) require.True(t, isAlive1) - isAlive2, err := storage.IsAlive(ctx, id2) + isAlive2, err := reader.IsAlive(ctx, id2) require.NoError(t, err) require.True(t, isAlive2) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -168,7 +170,7 @@ func testStorage(t *testing.T) { // Ensure that the cached value is still in use for id2. { - isAlive, err := storage.IsAlive(ctx, id2) + isAlive, err := reader.IsAlive(ctx, id2) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -194,7 +196,7 @@ func testStorage(t *testing.T) { // Ensure that we now see the id1 as dead. That fact will be cached. { - isAlive, err := storage.IsAlive(ctx, id1) + isAlive, err := reader.IsAlive(ctx, id1) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -202,7 +204,7 @@ func testStorage(t *testing.T) { } // Ensure that the fact that it's dead is cached. { - isAlive, err := storage.IsAlive(ctx, id1) + isAlive, err := reader.IsAlive(ctx, id1) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -218,7 +220,7 @@ func testStorage(t *testing.T) { // Ensure that we now see the id2 as alive. { - isAlive, err := storage.IsAlive(ctx, id2) + isAlive, err := reader.IsAlive(ctx, id2) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(3), metrics.IsAliveCacheMisses.Count()) @@ -226,7 +228,7 @@ func testStorage(t *testing.T) { } // Ensure that the fact that it's still alive is cached. { - isAlive, err := storage.IsAlive(ctx, id1) + isAlive, err := reader.IsAlive(ctx, id1) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(3), metrics.IsAliveCacheMisses.Count()) @@ -237,6 +239,7 @@ func testStorage(t *testing.T) { clock, timeSource, _, stopper, storage := setup(t) defer stopper.Stop(ctx) storage.Start(ctx) + reader := storage.BlockingReader() exp := clock.Now().Add(time.Second.Nanoseconds(), 0) id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) @@ -248,7 +251,7 @@ func testStorage(t *testing.T) { require.Equal(t, int64(1), metrics.WriteSuccesses.Count()) } { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count()) @@ -259,7 +262,7 @@ func testStorage(t *testing.T) { timeSource.Advance(time.Second + time.Nanosecond) // Ensure that we discover it is no longer alive. { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -268,7 +271,7 @@ func testStorage(t *testing.T) { } // Ensure that the fact that it is no longer alive is cached. { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -348,6 +351,7 @@ func testConcurrentAccessesAndEvictions(t *testing.T) { storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, s.SettingsWatcher().(*settingswatcher.SettingsWatcher), table, timeSource.NewTimer) storage.Start(ctx) + reader := storage.BlockingReader() const ( runsPerWorker = 100 @@ -446,7 +450,7 @@ func testConcurrentAccessesAndEvictions(t *testing.T) { for i := 0; i < runsPerWorker; i++ { time.Sleep(time.Microsecond) i, id := pickSession() - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) assert.NoError(t, err) checkIsAlive(t, i, isAlive) } @@ -603,7 +607,7 @@ func testConcurrentAccessSynchronization(t *testing.T) { // Now launch another, synchronous reader, which will join // the single-flight. g.Go(func() (err error) { - alive, err = storage.IsAlive(ctx, sid) + alive, err = storage.BlockingReader().IsAlive(ctx, sid) return err }) // Sleep some tiny amount of time to hopefully allow the other @@ -648,7 +652,7 @@ func testConcurrentAccessSynchronization(t *testing.T) { // Now launch another, synchronous reader, which will join // the single-flight. g.Go(func() (err error) { - alive, err = storage.IsAlive(toCancel, sid) + alive, err = storage.BlockingReader().IsAlive(toCancel, sid) return err }) diff --git a/pkg/sql/sqlliveness/sqlliveness.go b/pkg/sql/sqlliveness/sqlliveness.go index 6510277f5dc1..6c70e02aebf4 100644 --- a/pkg/sql/sqlliveness/sqlliveness.go +++ b/pkg/sql/sqlliveness/sqlliveness.go @@ -34,7 +34,8 @@ type SessionID string // Provider is a wrapper around the sqlliveness subsystem for external // consumption. type Provider interface { - Liveness + Instance + StorageReader // Start starts the sqlliveness subsystem. regionPhysicalRep should // represent the physical representation of the current process region @@ -43,18 +44,21 @@ type Provider interface { // Metrics returns a metric.Struct which holds metrics for the provider. Metrics() metric.Struct +} + +// StorageReader provides access to Readers which either block or do not +// block. +type StorageReader interface { + // BlockingReader returns a Reader which only synchronously + // checks whether a session is alive if it does not have any + // cached information which implies that it currently is. + BlockingReader() Reader // CachedReader returns a reader which only consults its local cache and // does not perform any RPCs in the IsAlive call. CachedReader() Reader } -// Liveness exposes Reader and Instance interfaces. -type Liveness interface { - Reader - Instance -} - // String returns a hex-encoded version of the SessionID. func (s SessionID) String() string { return hex.EncodeToString(encoding.UnsafeConvertStringToBytes(string(s)))