diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 70bec476289e..522c3e91a3ab 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -10,7 +10,6 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/sqlliveness", - "//pkg/util/contextutil", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/log", @@ -19,7 +18,6 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/uuid", - "@com_github_cockroachdb_errors//:errors", ], ) @@ -37,7 +35,6 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/slstorage", - "//pkg/testutils", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 73547c91a5ad..fbf7feddd9c5 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" - "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -30,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/errors" ) var ( @@ -153,14 +151,6 @@ func (l *Instance) setSession(s *session) { } func (l *Instance) clearSession(ctx context.Context) { - l.checkExpiry(ctx) - l.mu.Lock() - defer l.mu.Unlock() - l.mu.s = nil - l.mu.blockCh = make(chan struct{}) -} - -func (l *Instance) checkExpiry(ctx context.Context) { l.mu.Lock() defer l.mu.Unlock() if expiration := l.mu.s.Expiration(); expiration.Less(l.clock.Now()) { @@ -168,6 +158,8 @@ func (l *Instance) checkExpiry(ctx context.Context) { // associated with the session. l.mu.s.invokeSessionExpiryCallbacks(ctx) } + l.mu.s = nil + l.mu.blockCh = make(chan struct{}) } // createSession tries until it can create a new session and returns an error @@ -262,17 +254,9 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { case <-t.C: t.Read = true s, _ := l.getSessionOrBlockCh() - // TODO(aaditya): consider combining `DefaultTTL` and `DefaultHeartBeat` into a single knob to make these - // timeouts less fragile - timeout := l.ttl()/2 + l.hb() if s == nil { - var newSession *session - if err := contextutil.RunWithTimeout(ctx, "sqlliveness create session", timeout, func(ctx context.Context) error { - var err error - newSession, err = l.createSession(ctx) - return err - }); err != nil { - log.Errorf(ctx, "sqlliveness failed to create new session: %v", err) + newSession, err := l.createSession(ctx) + if err != nil { func() { l.mu.Lock() defer l.mu.Unlock() @@ -288,37 +272,21 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { t.Reset(l.hb()) continue } - var found bool - err := contextutil.RunWithTimeout(ctx, "sqlliveness extend session", timeout, func(ctx context.Context) error { - var err error - found, err = l.extendSession(ctx, s) - return err - }) - switch { - case errors.HasType(err, (*contextutil.TimeoutError)(nil)): - // Retry without clearing the session because we don't know the current status. - l.checkExpiry(ctx) - t.Reset(0) - continue - case err != nil && ctx.Err() == nil: - log.Errorf(ctx, "sqlliveness failed to extend session: %v", err) - fallthrough - case err != nil: - // TODO(ajwerner): Decide whether we actually should exit the heartbeat loop here if the context is not - // canceled. Consider the case of an ambiguous result error: shouldn't we try again? + found, err := l.extendSession(ctx, s) + if err != nil { l.clearSession(ctx) return - case !found: - // No existing session found, immediately create one. + } + if !found { l.clearSession(ctx) // Start next loop iteration immediately to insert a new session. t.Reset(0) - default: - if log.V(2) { - log.Infof(ctx, "extended SQL liveness session %s", s.ID()) - } - t.Reset(l.hb()) + continue + } + if log.V(2) { + log.Infof(ctx, "extended SQL liveness session %s", s.ID()) } + t.Reset(l.hb()) } } } diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index c0cf276b00e3..75b069bab914 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -12,7 +12,6 @@ package slinstance_test import ( "context" - "sync/atomic" "testing" "time" @@ -21,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -42,8 +40,8 @@ func TestSQLInstance(t *testing.T) { clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion, true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond) fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) @@ -93,113 +91,3 @@ func TestSQLInstance(t *testing.T) { _, err = sqlInstance.Session(ctx) require.Error(t, err) } - -// TestSQLInstanceDeadlines tests that we have proper deadlines set on the -// create and extend session operations. This is done by blocking the fake -// storage layer and ensuring that no sessions get created because the -// timeouts are constantly triggered. -func TestSQLInstanceDeadlines(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx, stopper := context.Background(), stop.NewStopper() - defer stopper.Stop(ctx) - - clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */) - settings := cluster.MakeTestingClusterSettingsWithVersions( - clusterversion.TestingBinaryVersion, - clusterversion.TestingBinaryMinSupportedVersion, - true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) - - fakeStorage := slstorage.NewFakeStorage() - // block the fake storage - fakeStorage.SetBlockCh() - cleanUpFunc := func() { - fakeStorage.CloseBlockCh() - } - defer cleanUpFunc() - - sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) - sqlInstance.Start(ctx) - - // verify that we do not create a session - require.Never( - t, - func() bool { - _, err := sqlInstance.Session(ctx) - return err == nil - }, - 100*time.Millisecond, 10*time.Millisecond, - ) -} - -// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the -// create and extend session operations. This tests the case where the session is -// successfully created first and then blocks indefinitely. -func TestSQLInstanceDeadlinesExtend(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx, stopper := context.Background(), stop.NewStopper() - defer stopper.Stop(ctx) - - mt := timeutil.NewManualTime(timeutil.Unix(0, 42)) - clock := hlc.NewClock(mt, time.Nanosecond /* maxOffset */) - settings := cluster.MakeTestingClusterSettingsWithVersions( - clusterversion.TestingBinaryVersion, - clusterversion.TestingBinaryMinSupportedVersion, - true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) - // Must be shorter than the storage sleep amount below - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) - - fakeStorage := slstorage.NewFakeStorage() - sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) - sqlInstance.Start(ctx) - - // verify that eventually session is created successfully - testutils.SucceedsSoon( - t, - func() error { - _, err := sqlInstance.Session(ctx) - return err - }, - ) - - // verify that session is also extended successfully a few times - require.Never( - t, - func() bool { - _, err := sqlInstance.Session(ctx) - return err != nil - }, - 100*time.Millisecond, 10*time.Millisecond, - ) - - // register a callback for verification that this session expired - var sessionExpired atomic.Bool - s, _ := sqlInstance.Session(ctx) - s.RegisterCallbackForSessionExpiry(func(ctx context.Context) { - sessionExpired.Store(true) - }) - - // block the fake storage - fakeStorage.SetBlockCh() - cleanUpFunc := func() { - fakeStorage.CloseBlockCh() - } - defer cleanUpFunc() - // advance manual clock so that session expires - mt.Advance(slinstance.DefaultTTL.Get(&settings.SV)) - - // expect session to expire - require.Eventually( - t, - func() bool { - return sessionExpired.Load() - }, - testutils.DefaultSucceedsSoonDuration, 10*time.Millisecond, - ) -} diff --git a/pkg/sql/sqlliveness/slstorage/test_helpers.go b/pkg/sql/sqlliveness/slstorage/test_helpers.go index 91952ff538d1..23eb1115f217 100644 --- a/pkg/sql/sqlliveness/slstorage/test_helpers.go +++ b/pkg/sql/sqlliveness/slstorage/test_helpers.go @@ -24,7 +24,6 @@ type FakeStorage struct { mu struct { syncutil.Mutex sessions map[sqlliveness.SessionID]hlc.Timestamp - blockCh chan struct{} } } @@ -47,16 +46,8 @@ func (s *FakeStorage) IsAlive( // Insert implements the sqlliveness.Storage interface. func (s *FakeStorage) Insert( - ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, + _ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, ) error { - if ch := s.getBlockCh(); ch != nil { - select { - case <-ch: - break - case <-ctx.Done(): - return ctx.Err() - } - } s.mu.Lock() defer s.mu.Unlock() if _, ok := s.mu.sessions[sid]; ok { @@ -68,16 +59,8 @@ func (s *FakeStorage) Insert( // Update implements the sqlliveness.Storage interface. func (s *FakeStorage) Update( - ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, + _ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, ) (bool, error) { - if ch := s.getBlockCh(); ch != nil { - select { - case <-ch: - break - case <-ctx.Done(): - return false, ctx.Err() - } - } s.mu.Lock() defer s.mu.Unlock() if _, ok := s.mu.sessions[sid]; !ok { @@ -94,23 +77,3 @@ func (s *FakeStorage) Delete(_ context.Context, sid sqlliveness.SessionID) error delete(s.mu.sessions, sid) return nil } - -// SetBlockCh is used to block the storage for testing purposes -func (s *FakeStorage) SetBlockCh() { - s.mu.Lock() - defer s.mu.Unlock() - s.mu.blockCh = make(chan struct{}) -} - -// CloseBlockCh is used to unblock the storage for testing purposes -func (s *FakeStorage) CloseBlockCh() { - s.mu.Lock() - defer s.mu.Unlock() - close(s.mu.blockCh) -} - -func (s *FakeStorage) getBlockCh() chan struct{} { - s.mu.Lock() - defer s.mu.Unlock() - return s.mu.blockCh -}