Skip to content

Commit

Permalink
kvserver: serve follower reads when lease is invalid
Browse files Browse the repository at this point in the history
Before this patch, requests were not eligible for follower reads when
the lease was invalid (or when the follower replica in question believed
there was no valid lease because it was behind). The reasons for this
bizarre state of affairs stem historically from the intertwining of
leases and closed timestamps - the current leaseholder had to be known
in order to ask it for a closed timestamp. This entanglement has now
gone away, so we can serve follower reads regardless of the lease
status as nature intended.

Closes #57992

Release note: None
Release justification: needed for bounded staleness read availability
during partial failures.
  • Loading branch information
andreimatei committed Aug 30, 2021
1 parent 6d35693 commit dcf2273
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ go_test(
"replica_command_test.go",
"replica_consistency_test.go",
"replica_evaluate_test.go",
"replica_follower_read_test.go",
"replica_gc_queue_test.go",
"replica_init_test.go",
"replica_learner_test.go",
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,9 +1299,7 @@ func (r *Replica) checkExecutionCanProceed(
st, shouldExtend, err = r.leaseGoodToGoRLocked(ctx, now, ba.WriteTimestamp())
if err != nil {
// If not, can we serve this request on a follower?
// TODO(nvanbenschoten): once we make this check cheaper
// than leaseGoodToGoRLocked, invert these checks.
if !r.canServeFollowerReadRLocked(ctx, ba, err) {
if !r.canServeFollowerReadRLocked(ctx, ba) {
return st, err
}
err = nil // ignore error
Expand Down
11 changes: 2 additions & 9 deletions pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// FollowerReadsEnabled controls whether replicas attempt to serve follower
Expand Down Expand Up @@ -61,14 +60,8 @@ func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool {
// non-locking, read-only requests can be served as follower reads. The batch
// must be transactional and composed exclusively of this kind of request to be
// accepted as a follower read.
func (r *Replica) canServeFollowerReadRLocked(
ctx context.Context, ba *roachpb.BatchRequest, err error,
) bool {
var lErr *roachpb.NotLeaseHolderError
eligible := errors.As(err, &lErr) &&
BatchCanBeEvaluatedOnFollower(*ba) &&
FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV)

func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.BatchRequest) bool {
eligible := BatchCanBeEvaluatedOnFollower(*ba) && FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV)
if !eligible {
// We couldn't do anything with the error, propagate it.
return false
Expand Down
177 changes: 177 additions & 0 deletions pkg/kv/kvserver/replica_follower_read_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"context"
"sync"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

// TestCanServeFollowerRead checks canServeFollowerReadRLocked against a
// transactional Get issued at two read timestamps: one below the closed
// timestamp (expected to be servable as a follower read) and one at the
// current clock reading (expected not to be).
func TestCanServeFollowerRead(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	// The clock needs to be higher than the closed-timestamp lag. Otherwise,
	// trying to close timestamps below zero results in not closing anything.
	manual := hlc.NewManualClock(5 * time.Second.Nanoseconds())
	clock := hlc.NewClock(manual.UnixNano, 1)
	tsc := TestStoreConfig(clock)
	const closedTimestampLag = time.Second
	closedts.TargetDuration.Override(ctx, &tsc.Settings.SV, closedTimestampLag)

	nowNs := clock.Now().WallTime
	tsBelowClosedTimestamp := hlc.Timestamp{
		WallTime: nowNs - closedTimestampLag.Nanoseconds() - clock.MaxOffset().Nanoseconds(),
		Logical:  0,
	}

	testCases := []struct {
		// Subtest name, so that a failure identifies the scenario instead of
		// an auto-generated "#NN" suffix.
		name string
		// The timestamp at which we'll read. Reading below the closed timestamp
		// should result in canServeFollowerRead returning true; reading above the
		// closed timestamp should result in a false.
		readTimestamp           hlc.Timestamp
		expCanServeFollowerRead bool
	}{
		{
			name:                    "read below closed timestamp",
			readTimestamp:           tsBelowClosedTimestamp,
			expCanServeFollowerRead: true,
		},
		{
			name:                    "read at current time",
			readTimestamp:           clock.Now(),
			expCanServeFollowerRead: false,
		},
	}
	for _, c := range testCases {
		t.Run(c.name, func(t *testing.T) {
			tc := testContext{manualClock: manual}
			stopper := stop.NewStopper()
			defer stopper.Stop(ctx)
			tc.StartWithStoreConfig(t, stopper, tsc)

			key := roachpb.Key("a")

			// Perform a write in order to close a timestamp. The range starts with no
			// closed timestamp.
			{
				write := putArgs(key, []byte("foo"))
				_, pErr := tc.SendWrapped(&write)
				require.NoError(t, pErr.GoError())
			}

			// Build a transactional batch containing a single Get at the
			// timestamp under test.
			gArgs := getArgs(key)
			txn := roachpb.MakeTransaction(
				"test", key, roachpb.NormalUserPriority,
				c.readTimestamp,
				clock.MaxOffset().Nanoseconds(),
			)

			ba := &roachpb.BatchRequest{}
			ba.Header = roachpb.Header{Txn: &txn}
			ba.Add(&gArgs)

			// The method under test requires the replica mutex to be held for
			// reading, per its RLocked suffix.
			r := tc.repl
			r.mu.RLock()
			defer r.mu.RUnlock()
			require.Equal(t, c.expCanServeFollowerRead, r.canServeFollowerReadRLocked(ctx, ba))
		})
	}
}

// TestCheckExecutionCanProceedAllowsFollowerReadWithInvalidLease verifies that
// checkExecutionCanProceed returns no error for a transactional read below the
// closed timestamp even after the replica's lease has stopped being valid.
func TestCheckExecutionCanProceedAllowsFollowerReadWithInvalidLease(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	// Start the clock above the closed-timestamp lag; closing timestamps below
	// zero would end up closing nothing.
	manualClock := hlc.NewManualClock(5 * time.Second.Nanoseconds())
	clock := hlc.NewClock(manualClock.UnixNano, 1)
	storeCfg := TestStoreConfig(clock)

	// Let exactly one lease request through and fail every later one. Allowing
	// background processes to renew the lease makes the test flaky.
	var leaseOnce sync.Once
	storeCfg.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *roachpb.Error {
		isFirst := false
		leaseOnce.Do(func() { isFirst = true })
		if !isFirst {
			return roachpb.NewErrorf("boom")
		}
		return nil
	}

	const closedTimestampLag = time.Second
	closedts.TargetDuration.Override(ctx, &storeCfg.Settings.SV, closedTimestampLag)

	// Pick a read timestamp that trails "now" by the lag plus the max offset.
	readTS := hlc.Timestamp{
		WallTime: clock.Now().WallTime - closedTimestampLag.Nanoseconds() - clock.MaxOffset().Nanoseconds(),
	}

	tc := testContext{manualClock: manualClock}
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)
	tc.StartWithStoreConfig(t, stopper, storeCfg)

	key := roachpb.Key("a")

	// The range starts with no closed timestamp; issue a write so that one
	// gets closed.
	put := putArgs(key, []byte("foo"))
	_, pErr := tc.SendWrapped(&put)
	require.NoError(t, pErr.GoError())

	repl := tc.repl

	// Sanity check: the lease is valid before the clock jumps...
	require.True(t, repl.CurrentLeaseStatus(ctx).IsValid())

	// ...and no longer valid once the clock has been pushed 100s ahead.
	manualClock.Increment(100 * time.Second.Nanoseconds())
	require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())

	// Assemble a transactional Get at the chosen read timestamp.
	get := getArgs(key)
	readTxn := roachpb.MakeTransaction(
		"test", key, roachpb.NormalUserPriority,
		readTS,
		clock.MaxOffset().Nanoseconds(),
	)
	batch := &roachpb.BatchRequest{}
	batch.Header = roachpb.Header{
		Txn:       &readTxn,
		Timestamp: readTxn.ReadTimestamp,
	}
	batch.Add(&get)

	// The request must be allowed to proceed, and with an empty lease status.
	status, err := repl.checkExecutionCanProceed(ctx, batch, nil /* g */)
	require.NoError(t, err)
	require.Empty(t, status)

	// Sanity check: the lease was not renewed along the way, so it is still
	// invalid.
	require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())
}
22 changes: 4 additions & 18 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
// Then attempt to acquire the lease if not currently held by any
// replica or redirect to the current leaseholder if currently held
// by a different replica.
if pErr = r.handleInvalidLeaseError(ctx, ba, pErr, t); pErr != nil {
if pErr = r.handleInvalidLeaseError(ctx, ba); pErr != nil {
return nil, pErr
}
case *roachpb.MergeInProgressError:
Expand Down Expand Up @@ -576,28 +576,14 @@ func (r *Replica) handleIndeterminateCommitError(
}

func (r *Replica) handleInvalidLeaseError(
ctx context.Context, ba *roachpb.BatchRequest, _ *roachpb.Error, t *roachpb.InvalidLeaseError,
ctx context.Context, ba *roachpb.BatchRequest,
) *roachpb.Error {
// On an invalid lease error, attempt to acquire a new lease. If in the
// process of doing so, we determine that the lease now lives elsewhere,
// redirect.
_, pErr := r.redirectOnOrAcquireLeaseForRequest(ctx, ba.Timestamp)
if pErr == nil {
// Lease valid. Retry command.
return nil
}
// If we failed to acquire the lease, check to see whether the request can
// still be served as a follower read on this replica. Doing so here will
// not be necessary once we break the dependency between closed timestamps
// and leases and address the TODO in checkExecutionCanProceed to check the
// closed timestamp before consulting the lease.

r.mu.RLock()
defer r.mu.RUnlock()
if r.canServeFollowerReadRLocked(ctx, ba, pErr.GoError()) {
// Follower read possible. Retry command.
return nil
}
// If we managed to get a lease (i.e. pErr == nil), the request evaluation
// will be retried.
return pErr
}

Expand Down

0 comments on commit dcf2273

Please sign in to comment.