Skip to content

Commit

Permalink
Merge pull request #122377 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1-122202

release-24.1: sqlliveness: extend expiration on insertion retries
  • Loading branch information
rafiss authored Apr 19, 2024
2 parents 12af023 + 7bf9ccc commit 51fd317
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 5 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ go_test(
deps = [
":slinstance",
"//pkg/clusterversion",
"//pkg/kv/kvpb",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql/enum",
"//pkg/sql/sqlliveness",
Expand All @@ -41,6 +43,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
14 changes: 9 additions & 5 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,9 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) {
return nil, err
}

start := l.clock.Now()
exp := start.Add(l.ttl().Nanoseconds(), 0)
s := &session{
id: id,
start: start,
id: id,
}
s.mu.exp = exp

opts := retry.Options{
InitialBackoff: 10 * time.Millisecond,
Expand All @@ -219,6 +215,14 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) {
}
everySecond := log.Every(time.Second)
for i, r := 0, retry.StartWithCtx(ctx, opts); r.Next(); {
// If we fail to insert the session, then reset the start time
// and expiration, since otherwise there is a danger of inserting
// an expired session.
s.start = l.clock.Now()
// Note: Concurrent access is not possible at this point because
// the session has not been returned, so we have no need to acquire
// the lock.
s.mu.exp = s.start.Add(l.ttl().Nanoseconds(), 0)
i++
if err = l.storage.Insert(ctx, s.id, s.Expiration()); err != nil {
if ctx.Err() != nil {
Expand Down
37 changes: 37 additions & 0 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand All @@ -50,7 +53,41 @@ func TestSQLInstance(t *testing.T) {

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(ambientCtx, stopper, clock, fakeStorage, settings, nil, nil)
// Ensure that the first insertion attempt always fails with an injected error.
// This should lead to a retry, which confirms that the retry logic works correctly.
var failureMu struct {
syncutil.Mutex
numRetries int
initialTimestamp hlc.Timestamp
nextTimestamp hlc.Timestamp
}
fakeStorage.SetInjectedFailure(func(sid sqlliveness.SessionID, expiration hlc.Timestamp) error {
failureMu.Lock()
defer failureMu.Unlock()
failureMu.numRetries++
if failureMu.numRetries == 1 {
failureMu.initialTimestamp = expiration
return kvpb.NewReplicaUnavailableError(errors.Newf("fake injected error"), &roachpb.RangeDescriptor{}, roachpb.ReplicaDescriptor{})
}
failureMu.nextTimestamp = expiration
return nil
})
sqlInstance.Start(ctx, nil)
// We expect two attempts to insert, since we inject a replica unavailable
// error on the first attempt.
testutils.SucceedsSoon(t, func() error {
failureMu.Lock()
defer failureMu.Unlock()
if failureMu.numRetries < 2 {
return errors.AssertionFailedf("unexpected number of retries on session insertion, "+
"expected at least 2, got %d", failureMu.numRetries)
}
if !failureMu.nextTimestamp.After(failureMu.initialTimestamp) {
return errors.AssertionFailedf("timestamp should move forward on each retry, "+
"got %s. Previous timestamp was: %s", failureMu.nextTimestamp, failureMu.initialTimestamp)
}
return nil
})

// Add one more instance to introduce concurrent access to storage.
dummy := slinstance.NewSQLInstance(ambientCtx, stopper, clock, fakeStorage, settings, nil, nil)
Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/sqlliveness/slstorage/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type FakeStorage struct {
mu struct {
syncutil.Mutex
sessions map[sqlliveness.SessionID]hlc.Timestamp

// Used to inject errors into the storage layer.
insertError func(sid sqlliveness.SessionID, expiration hlc.Timestamp) error
}
}

Expand All @@ -35,6 +38,16 @@ func NewFakeStorage() *FakeStorage {
return fs
}

// SetInjectedFailure installs insertError as a hook that Insert consults
// before it records a session. If the hook returns a non-nil error, Insert
// returns that error instead of storing the session. Passing nil disables
// the hook.
//
// Note that Insert invokes the hook while holding the storage mutex, so the
// hook should not call back into this FakeStorage's Insert.
func (s *FakeStorage) SetInjectedFailure(
	insertError func(sid sqlliveness.SessionID, expiration hlc.Timestamp) error,
) {
	// The critical section is a single field assignment, so the mutex is
	// released explicitly rather than through a deferred call.
	s.mu.Lock()
	s.mu.insertError = insertError
	s.mu.Unlock()
}

// IsAlive implements the sqlliveness.Reader interface.
func (s *FakeStorage) IsAlive(
_ context.Context, sid sqlliveness.SessionID,
Expand All @@ -51,6 +64,12 @@ func (s *FakeStorage) Insert(
) error {
s.mu.Lock()
defer s.mu.Unlock()
// Support injecting errors during the initial creation of the session.
if s.mu.insertError != nil {
if err := s.mu.insertError(sid, expiration); err != nil {
return err
}
}
if _, ok := s.mu.sessions[sid]; ok {
return errors.Errorf("session %s already exists", sid)
}
Expand Down

0 comments on commit 51fd317

Please sign in to comment.