sqlliveness: re-arrange APIs to clarify when the API was blocking
By "default" the implementation of Reader was blocking and there was a method
to get a handle to a non-blocking CachedReader(). This asymmetry does not aid
understandability. The non-blocking reader came later, but it is generally the
more desirable interface.

Release note: None
ajwerner committed Mar 14, 2023
1 parent 25c00d4 commit 4f8a911
Showing 6 changed files with 61 additions and 42 deletions.
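
For orientation, a minimal sketch of the call pattern this commit makes explicit. It assumes only what the diff below shows: Provider exposes BlockingReader() and CachedReader(), each returning a Reader whose IsAlive reports whether a session is alive. The helper sessionStillAlive and the package name are hypothetical, not part of the commit.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
)

// sessionStillAlive is a hypothetical helper. It shows the choice the new API
// names explicitly: CachedReader never blocks and may report a stale answer,
// while BlockingReader may synchronously consult storage for a session it
// does not have cached.
func sessionStillAlive(
	ctx context.Context, p sqlliveness.Provider, sid sqlliveness.SessionID, hotPath bool,
) (bool, error) {
	var r sqlliveness.Reader
	if hotPath {
		r = p.CachedReader() // cache only, no RPCs in IsAlive
	} else {
		r = p.BlockingReader() // may block to resolve an unknown session
	}
	return r.IsAlive(ctx, sid)
}
```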
12 changes: 7 additions & 5 deletions pkg/server/server_sql.go
@@ -558,11 +558,13 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
)

cfg.sqlInstanceStorage = instancestorage.NewStorage(
-cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings, cfg.clock, cfg.rangeFeedFactory)
+cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings,
+cfg.clock, cfg.rangeFeedFactory,
+)
cfg.sqlInstanceReader = instancestorage.NewReader(
-cfg.sqlInstanceStorage,
-cfg.sqlLivenessProvider,
-cfg.stopper)
+cfg.sqlInstanceStorage, cfg.sqlLivenessProvider.BlockingReader(),
+cfg.stopper,
+)

// We can't use the nodeDailer as the podNodeDailer unless we
// are serving the system tenant despite the fact that we've
@@ -787,7 +789,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
RowMetrics: &rowMetrics,
InternalRowMetrics: &internalRowMetrics,

-SQLLivenessReader: cfg.sqlLivenessProvider,
+SQLLivenessReader: cfg.sqlLivenessProvider.BlockingReader(),
JobRegistry: jobRegistry,
Gossip: cfg.gossip,
PodNodeDialer: cfg.podNodeDialer,
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
@@ -1244,7 +1244,7 @@ type ExecutorConfig struct {
MetricsRecorder nodeStatusGenerator
SessionRegistry *SessionRegistry
ClosedSessionCache *ClosedSessionCache
-SQLLiveness sqlliveness.Liveness
+SQLLiveness sqlliveness.Provider
JobRegistry *jobs.Registry
VirtualSchemas *VirtualSchemaHolder
DistSQLPlanner *DistSQLPlanner
2 changes: 1 addition & 1 deletion pkg/sql/planner.go
@@ -114,7 +114,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) {
evalCtx.Settings = execCfg.Settings
evalCtx.Codec = execCfg.Codec
evalCtx.Tracer = execCfg.AmbientCtx.Tracer
-evalCtx.SQLLivenessReader = execCfg.SQLLiveness
+evalCtx.SQLLivenessReader = execCfg.SQLLiveness.BlockingReader()
evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc
evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc
evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs
35 changes: 22 additions & 13 deletions pkg/sql/sqlliveness/slstorage/slstorage.go
@@ -79,7 +79,7 @@ var CacheSize = settings.RegisterIntSetting(
1024)

// Storage deals with reading and writing session records. It implements the
-// sqlliveness.Reader interface, and the slinstace.Writer interface.
+// sqlliveness.Storage interface, and the slinstace.Writer interface.
type Storage struct {
settings *cluster.Settings
settingsWatcher *settingswatcher.SettingsWatcher
@@ -110,7 +110,7 @@ type Storage struct {
}
}

-var _ sqlliveness.Reader = &Storage{}
+var _ sqlliveness.StorageReader = &Storage{}

// NewTestingStorage constructs a new storage with control for the database
// in which the `sqlliveness` table should exist.
@@ -186,13 +186,6 @@ func (s *Storage) Start(ctx context.Context) {
s.mu.started = true
}

-// IsAlive determines whether a given session is alive. If this method returns
-// true, the session may no longer be alive, but if it returns false, the
-// session definitely is not alive.
-func (s *Storage) IsAlive(ctx context.Context, sid sqlliveness.SessionID) (alive bool, err error) {
-return s.isAlive(ctx, sid, sync)
-}
-
type readType byte

const (
@@ -580,15 +573,31 @@ func (s *Storage) Update(
// currently known state of the session, but will trigger an asynchronous
// refresh of the state of the session if it is not known.
func (s *Storage) CachedReader() sqlliveness.Reader {
-return (*cachedStorage)(s)
+return (*cachedReader)(s)
}

+// BlockingReader reader returns an implementation of sqlliveness.Reader which
+// will cache results of previous reads but will synchronously block to
+// determine the status of a session which it does not know about or thinks
+// might be expired.
+func (s *Storage) BlockingReader() sqlliveness.Reader {
+return (*blockingReader)(s)
+}
+
+type blockingReader Storage
+
+func (s *blockingReader) IsAlive(
+ctx context.Context, sid sqlliveness.SessionID,
+) (alive bool, err error) {
+return (*Storage)(s).isAlive(ctx, sid, sync)
+}
+
-// cachedStorage implements the sqlliveness.Reader interface, and the
+// cachedReader implements the sqlliveness.Reader interface, and the
// slinstace.Writer interface, but does not read from the underlying store
// synchronously during IsAlive.
-type cachedStorage Storage
+type cachedReader Storage

-func (s *cachedStorage) IsAlive(
+func (s *cachedReader) IsAlive(
ctx context.Context, sid sqlliveness.SessionID,
) (alive bool, err error) {
return (*Storage)(s).isAlive(ctx, sid, async)
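A note on the pattern in slstorage.go above: a single Storage value backs both readers, which are just defined-type conversions of the same pointer, so they share one cache and one isAlive implementation that differs only in its sync/async flag. A stripped-down, self-contained sketch of that technique follows; every name in it is a simplified stand-in, not the real slstorage type.

```go
package example

import "context"

// readMode mirrors the sync/async flag passed to Storage.isAlive in the real
// file; all names here are illustrative stand-ins.
type readMode int

const (
	syncRead readMode = iota
	asyncRead
)

// store stands in for slstorage.Storage: one struct owns the cache and the
// single-flight machinery, and isAlive does all the work.
type store struct{}

func (s *store) isAlive(ctx context.Context, id string, mode readMode) (bool, error) {
	// The real implementation consults the cache and, for syncRead, falls
	// through to a synchronous read of the sessions table; stubbed out here.
	return true, nil
}

// A defined type gets its own method set, so converting the same *store
// selects which IsAlive behavior a caller sees without copying any state.
type blockingReader store
type cachedReader store

func (r *blockingReader) IsAlive(ctx context.Context, id string) (bool, error) {
	return (*store)(r).isAlive(ctx, id, syncRead)
}

func (r *cachedReader) IsAlive(ctx context.Context, id string) (bool, error) {
	return (*store)(r).isAlive(ctx, id, asyncRead)
}

func (s *store) BlockingReader() *blockingReader { return (*blockingReader)(s) }
func (s *store) CachedReader() *cachedReader     { return (*cachedReader)(s) }
```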
34 changes: 19 additions & 15 deletions pkg/sql/sqlliveness/slstorage/slstorage_test.go
@@ -89,6 +89,7 @@ func testStorage(t *testing.T) {
clock, _, _, stopper, storage := setup(t)
storage.Start(ctx)
defer stopper.Stop(ctx)
+reader := storage.BlockingReader()

exp := clock.Now().Add(time.Second.Nanoseconds(), 0)
id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4())
@@ -100,14 +101,14 @@ func testStorage(t *testing.T) {
require.Equal(t, int64(1), metrics.WriteSuccesses.Count())
}
{
-isAlive, err := storage.IsAlive(ctx, id)
+isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count())
require.Equal(t, int64(0), metrics.IsAliveCacheHits.Count())
}
{
-isAlive, err := storage.IsAlive(ctx, id)
+isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count())
@@ -119,6 +120,7 @@ func testStorage(t *testing.T) {
defer stopper.Stop(ctx)
slstorage.GCJitter.Override(ctx, &settings.SV, 0)
storage.Start(ctx)
+reader := storage.BlockingReader()
metrics := storage.Metrics()

// GC will run some time after startup.
@@ -148,10 +150,10 @@ func testStorage(t *testing.T) {

// Verify they are alive.
{
-isAlive1, err := storage.IsAlive(ctx, id1)
+isAlive1, err := reader.IsAlive(ctx, id1)
require.NoError(t, err)
require.True(t, isAlive1)
-isAlive2, err := storage.IsAlive(ctx, id2)
+isAlive2, err := reader.IsAlive(ctx, id2)
require.NoError(t, err)
require.True(t, isAlive2)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
@@ -168,7 +170,7 @@ func testStorage(t *testing.T) {

// Ensure that the cached value is still in use for id2.
{
-isAlive, err := storage.IsAlive(ctx, id2)
+isAlive, err := reader.IsAlive(ctx, id2)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
@@ -194,15 +196,15 @@ func testStorage(t *testing.T) {

// Ensure that we now see the id1 as dead. That fact will be cached.
{
-isAlive, err := storage.IsAlive(ctx, id1)
+isAlive, err := reader.IsAlive(ctx, id1)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
require.Equal(t, int64(2), metrics.IsAliveCacheHits.Count())
}
// Ensure that the fact that it's dead is cached.
{
-isAlive, err := storage.IsAlive(ctx, id1)
+isAlive, err := reader.IsAlive(ctx, id1)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
@@ -218,15 +220,15 @@ func testStorage(t *testing.T) {

// Ensure that we now see the id2 as alive.
{
-isAlive, err := storage.IsAlive(ctx, id2)
+isAlive, err := reader.IsAlive(ctx, id2)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(3), metrics.IsAliveCacheMisses.Count())
require.Equal(t, int64(3), metrics.IsAliveCacheHits.Count())
}
// Ensure that the fact that it's still alive is cached.
{
-isAlive, err := storage.IsAlive(ctx, id1)
+isAlive, err := reader.IsAlive(ctx, id1)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(3), metrics.IsAliveCacheMisses.Count())
@@ -237,6 +239,7 @@ func testStorage(t *testing.T) {
clock, timeSource, _, stopper, storage := setup(t)
defer stopper.Stop(ctx)
storage.Start(ctx)
+reader := storage.BlockingReader()

exp := clock.Now().Add(time.Second.Nanoseconds(), 0)
id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4())
@@ -248,7 +251,7 @@ func testStorage(t *testing.T) {
require.Equal(t, int64(1), metrics.WriteSuccesses.Count())
}
{
-isAlive, err := storage.IsAlive(ctx, id)
+isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count())
@@ -259,7 +262,7 @@ func testStorage(t *testing.T) {
timeSource.Advance(time.Second + time.Nanosecond)
// Ensure that we discover it is no longer alive.
{
-isAlive, err := storage.IsAlive(ctx, id)
+isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
@@ -268,7 +271,7 @@ func testStorage(t *testing.T) {
}
// Ensure that the fact that it is no longer alive is cached.
{
-isAlive, err := storage.IsAlive(ctx, id)
+isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
@@ -348,6 +351,7 @@ func testConcurrentAccessesAndEvictions(t *testing.T) {
storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings,
s.SettingsWatcher().(*settingswatcher.SettingsWatcher), table, timeSource.NewTimer)
storage.Start(ctx)
+reader := storage.BlockingReader()

const (
runsPerWorker = 100
@@ -446,7 +450,7 @@ func testConcurrentAccessesAndEvictions(t *testing.T) {
for i := 0; i < runsPerWorker; i++ {
time.Sleep(time.Microsecond)
i, id := pickSession()
-isAlive, err := storage.IsAlive(ctx, id)
+isAlive, err := reader.IsAlive(ctx, id)
assert.NoError(t, err)
checkIsAlive(t, i, isAlive)
}
@@ -603,7 +607,7 @@ func testConcurrentAccessSynchronization(t *testing.T) {
// Now launch another, synchronous reader, which will join
// the single-flight.
g.Go(func() (err error) {
-alive, err = storage.IsAlive(ctx, sid)
+alive, err = storage.BlockingReader().IsAlive(ctx, sid)
return err
})
// Sleep some tiny amount of time to hopefully allow the other
@@ -648,7 +652,7 @@ func testConcurrentAccessSynchronization(t *testing.T) {
// Now launch another, synchronous reader, which will join
// the single-flight.
g.Go(func() (err error) {
-alive, err = storage.IsAlive(toCancel, sid)
+alive, err = storage.BlockingReader().IsAlive(toCancel, sid)
return err
})

18 changes: 11 additions & 7 deletions pkg/sql/sqlliveness/sqlliveness.go
@@ -34,7 +34,8 @@ type SessionID string
// Provider is a wrapper around the sqlliveness subsystem for external
// consumption.
type Provider interface {
-Liveness
+Instance
+StorageReader

// Start starts the sqlliveness subsystem. regionPhysicalRep should
// represent the physical representation of the current process region
@@ -43,18 +44,21 @@ type Provider interface {

// Metrics returns a metric.Struct which holds metrics for the provider.
Metrics() metric.Struct
+}
+
+// StorageReader provides access to Readers which either block or do not
+// block.
+type StorageReader interface {
+// BlockingReader returns a Reader which only synchronously
+// checks whether a session is alive if it does not have any
+// cached information which implies that it currently is.
+BlockingReader() Reader

// CachedReader returns a reader which only consults its local cache and
// does not perform any RPCs in the IsAlive call.
CachedReader() Reader
}

-// Liveness exposes Reader and Instance interfaces.
-type Liveness interface {
-Reader
-Instance
-}
-
// String returns a hex-encoded version of the SessionID.
func (s SessionID) String() string {
return hex.EncodeToString(encoding.UnsafeConvertStringToBytes(string(s)))
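Because callers now depend on the narrow Reader interface rather than the full provider, tests of such code can stub it directly. A hypothetical in-memory fake, assuming (as the diff above implies) that Reader's only method is IsAlive:

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
)

// fakeReader is a hypothetical test double, not part of this commit. Sessions
// absent from the map are reported as not alive.
type fakeReader struct {
	alive map[sqlliveness.SessionID]bool
}

var _ sqlliveness.Reader = (*fakeReader)(nil)

func (f *fakeReader) IsAlive(
	_ context.Context, sid sqlliveness.SessionID,
) (bool, error) {
	return f.alive[sid], nil
}
```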
