Skip to content

Commit

Permalink
Merge pull request #90271 from aadityasondhi/aadityas/revert-timeouts…
Browse files Browse the repository at this point in the history
…-sqlliveness-22.2

release-22.2: sqlliveness: revert `add timeouts to heartbeats`
  • Loading branch information
aadityasondhi authored Oct 21, 2022
2 parents dab196a + a7d1504 commit 6cdd1f1
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 201 deletions.
3 changes: 0 additions & 3 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
Expand All @@ -19,7 +18,6 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand All @@ -37,7 +35,6 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/testutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
58 changes: 13 additions & 45 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

var (
Expand Down Expand Up @@ -153,21 +151,15 @@ func (l *Instance) setSession(s *session) {
}

func (l *Instance) clearSession(ctx context.Context) {
l.checkExpiry(ctx)
l.mu.Lock()
defer l.mu.Unlock()
l.mu.s = nil
l.mu.blockCh = make(chan struct{})
}

func (l *Instance) checkExpiry(ctx context.Context) {
l.mu.Lock()
defer l.mu.Unlock()
if expiration := l.mu.s.Expiration(); expiration.Less(l.clock.Now()) {
// If the session has expired, invoke the session expiry callbacks
// associated with the session.
l.mu.s.invokeSessionExpiryCallbacks(ctx)
}
l.mu.s = nil
l.mu.blockCh = make(chan struct{})
}

// createSession tries until it can create a new session and returns an error
Expand Down Expand Up @@ -262,17 +254,9 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
case <-t.C:
t.Read = true
s, _ := l.getSessionOrBlockCh()
// TODO(aaditya): consider combining `DefaultTTL` and `DefaultHeartBeat` into a single knob to make these
// timeouts less fragile
timeout := l.ttl()/2 + l.hb()
if s == nil {
var newSession *session
if err := contextutil.RunWithTimeout(ctx, "sqlliveness create session", timeout, func(ctx context.Context) error {
var err error
newSession, err = l.createSession(ctx)
return err
}); err != nil {
log.Errorf(ctx, "sqlliveness failed to create new session: %v", err)
newSession, err := l.createSession(ctx)
if err != nil {
func() {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -288,37 +272,21 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Reset(l.hb())
continue
}
var found bool
err := contextutil.RunWithTimeout(ctx, "sqlliveness extend session", timeout, func(ctx context.Context) error {
var err error
found, err = l.extendSession(ctx, s)
return err
})
switch {
case errors.HasType(err, (*contextutil.TimeoutError)(nil)):
// Retry without clearing the session because we don't know the current status.
l.checkExpiry(ctx)
t.Reset(0)
continue
case err != nil && ctx.Err() == nil:
log.Errorf(ctx, "sqlliveness failed to extend session: %v", err)
fallthrough
case err != nil:
// TODO(ajwerner): Decide whether we actually should exit the heartbeat loop here if the context is not
// canceled. Consider the case of an ambiguous result error: shouldn't we try again?
found, err := l.extendSession(ctx, s)
if err != nil {
l.clearSession(ctx)
return
case !found:
// No existing session found, immediately create one.
}
if !found {
l.clearSession(ctx)
// Start next loop iteration immediately to insert a new session.
t.Reset(0)
default:
if log.V(2) {
log.Infof(ctx, "extended SQL liveness session %s", s.ID())
}
t.Reset(l.hb())
continue
}
if log.V(2) {
log.Infof(ctx, "extended SQL liveness session %s", s.ID())
}
t.Reset(l.hb())
}
}
}
Expand Down
116 changes: 2 additions & 114 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package slinstance_test

import (
"context"
"sync/atomic"
"testing"
"time"

Expand All @@ -21,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -42,8 +40,8 @@ func TestSQLInstance(t *testing.T) {
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
Expand Down Expand Up @@ -93,113 +91,3 @@ func TestSQLInstance(t *testing.T) {
_, err = sqlInstance.Session(ctx)
require.Error(t, err)
}

// TestSQLInstanceDeadlines checks that session creation does not succeed
// while the storage layer is blocked. The fake storage is blocked before the
// instance is started and only released when the test returns, and the test
// asserts that Session never returns without an error during the observation
// window.
func TestSQLInstanceDeadlines(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	manualTime := timeutil.NewManualTime(timeutil.Unix(0, 42))
	clock := hlc.NewClock(manualTime, time.Nanosecond /* maxOffset */)
	st := cluster.MakeTestingClusterSettingsWithVersions(
		clusterversion.TestingBinaryVersion,
		clusterversion.TestingBinaryMinSupportedVersion,
		true /* initializeVersion */)
	slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond)
	slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond)

	// Block the fake storage before the instance starts; release it on exit
	// so that anything still waiting on the storage can finish.
	storage := slstorage.NewFakeStorage()
	storage.SetBlockCh()
	defer storage.CloseBlockCh()

	instance := slinstance.NewSQLInstance(stopper, clock, storage, st, nil)
	instance.Start(ctx)

	// A session must never become available while the storage is blocked.
	sessionCreated := func() bool {
		_, err := instance.Session(ctx)
		return err == nil
	}
	require.Never(t, sessionCreated, 100*time.Millisecond, 10*time.Millisecond)
}

// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the
// create and extend session operations. This tests the case where the session
// is successfully created first and the storage then blocks indefinitely: the
// session must expire and its expiry callback must fire.
func TestSQLInstanceDeadlinesExtend(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx, stopper := context.Background(), stop.NewStopper()
	defer stopper.Stop(ctx)

	mt := timeutil.NewManualTime(timeutil.Unix(0, 42))
	clock := hlc.NewClock(mt, time.Nanosecond /* maxOffset */)
	settings := cluster.MakeTestingClusterSettingsWithVersions(
		clusterversion.TestingBinaryVersion,
		clusterversion.TestingBinaryMinSupportedVersion,
		true /* initializeVersion */)
	slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
	// The storage is blocked on a channel further down (not a timed sleep),
	// so once blocked, no extension can complete until the test exits.
	slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

	fakeStorage := slstorage.NewFakeStorage()
	sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
	sqlInstance.Start(ctx)

	// Verify that eventually a session is created successfully.
	testutils.SucceedsSoon(
		t,
		func() error {
			_, err := sqlInstance.Session(ctx)
			return err
		},
	)

	// Verify that the session stays available (i.e. keeps being extended)
	// over several heartbeat intervals.
	require.Never(
		t,
		func() bool {
			_, err := sqlInstance.Session(ctx)
			return err != nil
		},
		100*time.Millisecond, 10*time.Millisecond,
	)

	// Register a callback for verification that this session expired. Fail
	// the test cleanly if the session cannot be fetched rather than
	// dereferencing a nil session below.
	var sessionExpired atomic.Bool
	s, err := sqlInstance.Session(ctx)
	require.NoError(t, err)
	s.RegisterCallbackForSessionExpiry(func(ctx context.Context) {
		sessionExpired.Store(true)
	})

	// Block the fake storage; release it when the test returns.
	fakeStorage.SetBlockCh()
	cleanUpFunc := func() {
		fakeStorage.CloseBlockCh()
	}
	defer cleanUpFunc()
	// Advance the manual clock so that the session expires.
	mt.Advance(slinstance.DefaultTTL.Get(&settings.SV))

	// Expect the session expiry callback to run.
	require.Eventually(
		t,
		func() bool {
			return sessionExpired.Load()
		},
		testutils.DefaultSucceedsSoonDuration, 10*time.Millisecond,
	)
}
41 changes: 2 additions & 39 deletions pkg/sql/sqlliveness/slstorage/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type FakeStorage struct {
mu struct {
syncutil.Mutex
sessions map[sqlliveness.SessionID]hlc.Timestamp
blockCh chan struct{}
}
}

Expand All @@ -47,16 +46,8 @@ func (s *FakeStorage) IsAlive(

// Insert implements the sqlliveness.Storage interface.
func (s *FakeStorage) Insert(
ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
_ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
) error {
if ch := s.getBlockCh(); ch != nil {
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
}
}
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.mu.sessions[sid]; ok {
Expand All @@ -68,16 +59,8 @@ func (s *FakeStorage) Insert(

// Update implements the sqlliveness.Storage interface.
func (s *FakeStorage) Update(
ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
_ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
) (bool, error) {
if ch := s.getBlockCh(); ch != nil {
select {
case <-ch:
break
case <-ctx.Done():
return false, ctx.Err()
}
}
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.mu.sessions[sid]; !ok {
Expand All @@ -94,23 +77,3 @@ func (s *FakeStorage) Delete(_ context.Context, sid sqlliveness.SessionID) error
delete(s.mu.sessions, sid)
return nil
}

// SetBlockCh puts the fake storage into blocking mode for tests by installing
// a fresh, open channel. While a channel is installed, Insert and Update wait
// on it (or on context cancellation) before touching the session map.
func (s *FakeStorage) SetBlockCh() {
	blockCh := make(chan struct{})
	s.mu.Lock()
	s.mu.blockCh = blockCh
	s.mu.Unlock()
}

// CloseBlockCh unblocks the storage for testing purposes by closing the
// channel installed by SetBlockCh, releasing any Insert or Update call that
// is waiting on it.
//
// It is a no-op when the storage is not currently blocked, so it is safe to
// call without a preceding SetBlockCh and safe to call more than once;
// closing a nil or already-closed channel would otherwise panic.
func (s *FakeStorage) CloseBlockCh() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.blockCh == nil {
		return
	}
	close(s.mu.blockCh)
	// Drop the closed channel so a repeated call does not close it again.
	// Callers already waiting hold their own reference and are still released.
	s.mu.blockCh = nil
}

// getBlockCh returns the currently installed block channel, read under the
// storage mutex. The result is nil when no channel has been installed.
func (s *FakeStorage) getBlockCh() chan struct{} {
	s.mu.Lock()
	ch := s.mu.blockCh
	s.mu.Unlock()
	return ch
}

0 comments on commit 6cdd1f1

Please sign in to comment.