diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 9a1595862aa2..d61fb34e3e2c 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -250,6 +250,7 @@ go_test( "replica_command_test.go", "replica_consistency_test.go", "replica_evaluate_test.go", + "replica_follower_read_test.go", "replica_gc_queue_test.go", "replica_init_test.go", "replica_learner_test.go", diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index a172103d3341..72e9183ef5f7 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1299,9 +1299,7 @@ func (r *Replica) checkExecutionCanProceed( st, shouldExtend, err = r.leaseGoodToGoRLocked(ctx, now, ba.WriteTimestamp()) if err != nil { // If not, can we serve this request on a follower? - // TODO(nvanbenschoten): once we make this check cheaper - // than leaseGoodToGoRLocked, invert these checks. - if !r.canServeFollowerReadRLocked(ctx, ba, err) { + if !r.canServeFollowerReadRLocked(ctx, ba) { return st, err } err = nil // ignore error diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index c776b8a7259d..191de425fc88 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" ) // FollowerReadsEnabled controls whether replicas attempt to serve follower @@ -61,14 +60,8 @@ func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { // non-locking, read-only requests can be served as follower reads. The batch // must be transactional and composed exclusively of this kind of request to be // accepted as a follower read. -func (r *Replica) canServeFollowerReadRLocked( - ctx context.Context, ba *roachpb.BatchRequest, err error, -) bool { - var lErr *roachpb.NotLeaseHolderError - eligible := errors.As(err, &lErr) && - BatchCanBeEvaluatedOnFollower(*ba) && - FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) - +func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.BatchRequest) bool { + eligible := BatchCanBeEvaluatedOnFollower(*ba) && FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) if !eligible { // We couldn't do anything with the error, propagate it. return false diff --git a/pkg/kv/kvserver/replica_follower_read_test.go b/pkg/kv/kvserver/replica_follower_read_test.go new file mode 100644 index 000000000000..83ff292abc11 --- /dev/null +++ b/pkg/kv/kvserver/replica_follower_read_test.go @@ -0,0 +1,177 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" +) + +func TestCanServeFollowerRead(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + // The clock needs to be higher than the closed-timestamp lag. Otherwise, + // trying to close timestamps below zero results in not closing anything. + manual := hlc.NewManualClock(5 * time.Second.Nanoseconds()) + clock := hlc.NewClock(manual.UnixNano, 1) + tsc := TestStoreConfig(clock) + const closedTimestampLag = time.Second + closedts.TargetDuration.Override(ctx, &tsc.Settings.SV, closedTimestampLag) + + nowNs := clock.Now().WallTime + tsBelowClosedTimestamp := hlc.Timestamp{ + WallTime: nowNs - closedTimestampLag.Nanoseconds() - clock.MaxOffset().Nanoseconds(), + Logical: 0, + } + + type test struct { + // The timestamp at which we'll read. Reading below the closed timestamp + // should result in canServeFollowerRead returning true; reading above the + // closed timestamp should result in a false. + readTimestamp hlc.Timestamp + expCanServeFollowerRead bool + } + for _, test := range []test{ + { + readTimestamp: tsBelowClosedTimestamp, + expCanServeFollowerRead: true, + }, + { + readTimestamp: clock.Now(), + expCanServeFollowerRead: false, + }, + } { + t.Run("", func(t *testing.T) { + tc := testContext{manualClock: manual} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(t, stopper, tsc) + + key := roachpb.Key("a") + + // Perform a write in order to close a timestamp. The range starts with no + // closed timestamp. + { + write := putArgs(key, []byte("foo")) + _, pErr := tc.SendWrapped(&write) + require.NoError(t, pErr.GoError()) + } + + gArgs := getArgs(key) + txn := roachpb.MakeTransaction( + "test", key, roachpb.NormalUserPriority, + test.readTimestamp, + clock.MaxOffset().Nanoseconds(), + ) + + ba := &roachpb.BatchRequest{} + ba.Header = roachpb.Header{Txn: &txn} + ba.Add(&gArgs) + r := tc.repl + r.mu.RLock() + defer r.mu.RUnlock() + require.Equal(t, test.expCanServeFollowerRead, r.canServeFollowerReadRLocked(ctx, ba)) + }) + } +} + +// Test that follower reads are permitted when the replica's lease is invalid. +func TestCheckExecutionCanProceedAllowsFollowerReadWithInvalidLease(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + // The clock needs to be higher than the closed-timestamp lag. Otherwise, + // trying to close timestamps below zero results in not closing anything. + manual := hlc.NewManualClock(5 * time.Second.Nanoseconds()) + clock := hlc.NewClock(manual.UnixNano, 1) + tsc := TestStoreConfig(clock) + // Permit only one lease attempt. The test is flaky if we allow the lease to + // be renewed by background processes. + var leaseOnce sync.Once + tsc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *roachpb.Error { + admitted := false + leaseOnce.Do(func() { + admitted = true + }) + if admitted { + return nil + } + return roachpb.NewErrorf("boom") + } + const closedTimestampLag = time.Second + closedts.TargetDuration.Override(ctx, &tsc.Settings.SV, closedTimestampLag) + + nowNs := clock.Now().WallTime + tsBelowClosedTimestamp := hlc.Timestamp{ + WallTime: nowNs - closedTimestampLag.Nanoseconds() - clock.MaxOffset().Nanoseconds(), + Logical: 0, + } + + tc := testContext{manualClock: manual} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(t, stopper, tsc) + + key := roachpb.Key("a") + + // Perform a write in order to close a timestamp. The range starts with no + // closed timestamp. + { + write := putArgs(key, []byte("foo")) + _, pErr := tc.SendWrapped(&write) + require.NoError(t, pErr.GoError()) + } + + r := tc.repl + // Sanity check - lease should be valid. + ls := r.CurrentLeaseStatus(ctx) + require.True(t, ls.IsValid()) + + manual.Increment(100 * time.Second.Nanoseconds()) + // Sanity check - lease should now no longer be valid. + ls = r.CurrentLeaseStatus(ctx) + require.False(t, ls.IsValid()) + + gArgs := getArgs(key) + txn := roachpb.MakeTransaction( + "test", key, roachpb.NormalUserPriority, + tsBelowClosedTimestamp, + clock.MaxOffset().Nanoseconds(), + ) + + ba := &roachpb.BatchRequest{} + ba.Header = roachpb.Header{ + Txn: &txn, + Timestamp: txn.ReadTimestamp, + } + ba.Add(&gArgs) + + ls, err := r.checkExecutionCanProceed(ctx, ba, nil /* g */) + require.NoError(t, err) + require.Empty(t, ls) + + // Sanity check - lease should not have been renewed, so it's still invalid. + ls = r.CurrentLeaseStatus(ctx) + require.False(t, ls.IsValid()) +} diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 2401ac3bb6b9..068b4618b68f 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -426,7 +426,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( // Then attempt to acquire the lease if not currently held by any // replica or redirect to the current leaseholder if currently held // by a different replica. - if pErr = r.handleInvalidLeaseError(ctx, ba, pErr, t); pErr != nil { + if pErr = r.handleInvalidLeaseError(ctx, ba); pErr != nil { return nil, pErr } case *roachpb.MergeInProgressError: @@ -576,28 +576,14 @@ func (r *Replica) handleIndeterminateCommitError( } func (r *Replica) handleInvalidLeaseError( - ctx context.Context, ba *roachpb.BatchRequest, _ *roachpb.Error, t *roachpb.InvalidLeaseError, + ctx context.Context, ba *roachpb.BatchRequest, ) *roachpb.Error { // On an invalid lease error, attempt to acquire a new lease. If in the // process of doing so, we determine that the lease now lives elsewhere, // redirect. _, pErr := r.redirectOnOrAcquireLeaseForRequest(ctx, ba.Timestamp) - if pErr == nil { - // Lease valid. Retry command. - return nil - } - // If we failed to acquire the lease, check to see whether the request can - // still be served as a follower read on this replica. Doing so here will - // not be necessary once we break the dependency between closed timestamps - // and leases and address the TODO in checkExecutionCanProceed to check the - // closed timestamp before consulting the lease. - - r.mu.RLock() - defer r.mu.RUnlock() - if r.canServeFollowerReadRLocked(ctx, ba, pErr.GoError()) { - // Follower read possible. Retry command. - return nil - } + // If we managed to get a lease (i.e. pErr == nil), the request evaluation + // will be retried. return pErr }