From 6ef8c1680d497cb6ac5a24ecf89ebb1ac967348d Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Fri, 29 Jan 2021 00:03:18 -0500
Subject: [PATCH] kv: route present-time reads to global_read follower replicas

First commit from #59505.

This commit updates the kv client routing logic to account for the new
`LEAD_FOR_GLOBAL_READS` `RangeClosedTimestampPolicy` added in #59505. In
enterprise builds, non-stale read-only requests to ranges with this closed
timestamp policy can now be routed to follower replicas, just like stale
read-only requests to normal ranges currently are.

In addition to this main change, there are a few smaller changes in this PR
that were hard to split out, so they are included in this commit.

First, non-transactional requests are no longer served by followers, even if
the follower replica detects that the request could be served. Previously,
non-transactional requests would never be routed intentionally to a follower
replica, but `Replica.canServeFollowerRead` would allow them through if their
timestamp was low enough. This change was made in order to keep the client and
server logic in sync and because serving follower reads does not seem safe for
non-transactional requests that get their timestamp assigned on the server.
We're planning to remove non-transactional requests soon anyway (#58459), so
this isn't a big deal. It mostly just caused some testing fallout.

Second, transactions that have previously written intents are now allowed to
have their read-only requests served by followers, as long as those followers
have a closed timestamp above the transaction's read *and* write timestamp.
Previously, we had avoided this because it seemed to cause issues with
read-your-writes. However, it appears safe as long as the write timestamp is
below the closed timestamp, because we know all of the transaction's intents
are at or below its write timestamp. This is very important for multi-region
work, where we want a transaction to be able to write to a REGIONAL table and
then later perform local reads (maybe foreign key checks) on GLOBAL tables.

Third, a max clock offset shift in `canUseFollowerRead` was removed. It wasn't
exactly clear what this was doing. It was introduced in the original 70be833
and seemed to allow a follower read in cases where it otherwise shouldn't be
expected to succeed. I thought at first that it was accounting for the fact
that the kv client's clock might be leading the kv server's clock and so it
was being pessimistic about the expected closed timestamp, but it actually
seems to be shifting the other way, so I don't know. I might be missing
something.

Finally, the concept of a `replicaoracle.OracleFactoryFunc` was removed and
the existing `replicaoracle.OracleFactory` takes its place. We no longer need
the double-factory approach because the transaction object is now passed
directly to `Oracle.ChoosePreferredReplica`. This was a necessary change,
because the process of determining whether a follower read can be served now
requires both transaction information and range information (i.e. the closed
timestamp policy), so we need to make the decision in the Oracle itself
instead of in the OracleFactory. This all seems like a simplification anyway.

This is still waiting on changes to the closed timestamp system before we can
write end-to-end tests using this new functionality.
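To make the routing rule concrete, below is a minimal, self-contained Go
sketch of the client-side check (the real implementation is
`canSendToFollower` / `closedTimestampLikelySufficient` in
kvfollowerreadsccl/followerreads.go and `Transaction.MaxObservableTimestamp`
in roachpb/data.go). The types, helper names, and numeric values here are
illustrative stand-ins rather than the real `roachpb`/`hlc`/`settings` APIs,
and the sketch omits the other conditions the real check applies (the batch
must be transactional, read-only, and non-locking, and follower reads must be
enabled under an enterprise license):

    package main

    import (
        "fmt"
        "time"
    )

    // closedTimestampPolicy is a stand-in for roachpb.RangeClosedTimestampPolicy.
    type closedTimestampPolicy int

    const (
        lagByClusterSetting closedTimestampPolicy = iota // default policy
        leadForGlobalReads                               // new policy from #59505
    )

    // maxObservableTimestamp mirrors the idea behind
    // Transaction.MaxObservableTimestamp added in this patch: the maximum of
    // the transaction's read timestamp, write timestamp, and global
    // uncertainty limit.
    func maxObservableTimestamp(readTS, writeTS, uncertaintyLimit time.Time) time.Time {
        ts := readTS
        if writeTS.After(ts) {
            ts = writeTS
        }
        if uncertaintyLimit.After(ts) {
            ts = uncertaintyLimit
        }
        return ts
    }

    // canUseFollowerRead reports whether a read at maxObservableTS is expected
    // to be below the closed timestamp on followers of a range with the given
    // policy. now stands in for clock.Now(), maxClockOffset for
    // clock.MaxOffset(), and followerReadLag for the (negative) offset derived
    // from the closed timestamp cluster settings.
    func canUseFollowerRead(
        policy closedTimestampPolicy,
        now time.Time,
        maxClockOffset, followerReadLag time.Duration,
        maxObservableTS time.Time,
    ) bool {
        var expectedClosedTS time.Time
        switch policy {
        case lagByClusterSetting:
            // Followers are expected to have closed timestamps lagging present
            // time by the cluster-configured target duration.
            expectedClosedTS = now.Add(followerReadLag)
        case leadForGlobalReads:
            // Followers are expected to have closed timestamps leading present
            // time by the max clock offset, so present-time reads qualify.
            expectedClosedTS = now.Add(maxClockOffset)
        }
        return !maxObservableTS.After(expectedClosedTS)
    }

    func main() {
        now := time.Now()
        followerReadLag := -4800 * time.Millisecond // e.g. ~3.2x a 1.5s target duration
        maxClockOffset := 500 * time.Millisecond

        // A present-time transaction that has not written or been pushed.
        ts := maxObservableTimestamp(now, now, now.Add(maxClockOffset))

        // It does not qualify for follower reads under the default policy...
        fmt.Println(canUseFollowerRead(lagByClusterSetting, now, maxClockOffset, followerReadLag, ts))
        // ...but it does on a LEAD_FOR_GLOBAL_READS range.
        fmt.Println(canUseFollowerRead(leadForGlobalReads, now, maxClockOffset, followerReadLag, ts))
    }

The key point of the sketch is that the decision now depends on the range's
closed timestamp policy and on the transaction's write timestamp and
uncertainty limit, not just its read timestamp, which is also why the decision
moved into the Oracle itself.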
--- pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel | 7 +- .../kvccl/kvfollowerreadsccl/followerreads.go | 139 +++--- .../kvfollowerreadsccl/followerreads_test.go | 435 ++++++++++++++---- pkg/kv/kvclient/kvcoord/dist_sender.go | 8 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 10 +- pkg/kv/kvclient/kvcoord/range_iter.go | 13 + pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 7 + pkg/kv/kvserver/closed_timestamp_test.go | 110 +++-- pkg/kv/kvserver/replica_follower_read.go | 2 +- pkg/kv/kvserver/replica_learner_test.go | 5 +- pkg/kv/mock_transactional_sender.go | 5 + pkg/kv/sender.go | 4 + pkg/kv/txn.go | 8 + pkg/roachpb/data.go | 36 +- pkg/sql/physicalplan/replicaoracle/oracle.go | 84 ++-- .../physicalplan/replicaoracle/oracle_test.go | 26 +- pkg/sql/physicalplan/span_resolver.go | 17 +- 17 files changed, 658 insertions(+), 258 deletions(-) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel index de96531e8339..9820e372612e 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl", visibility = ["//visibility:public"], deps = [ - "//pkg/base", "//pkg/ccl/utilccl", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", @@ -19,7 +18,6 @@ go_library( "//pkg/sql/physicalplan/replicaoracle", "//pkg/sql/sem/builtins", "//pkg/util/hlc", - "//pkg/util/timeutil", "//pkg/util/uuid", ], ) @@ -39,6 +37,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/rpc", "//pkg/security", @@ -47,21 +46,21 @@ go_test( "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/physicalplan/replicaoracle", - "//pkg/storage/enginepb", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/syncutil", - "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go index 726a337c6487..55d58025d7f5 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go @@ -11,10 +11,10 @@ package kvfollowerreadsccl import ( + "context" "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -49,11 +48,11 @@ var followerReadMultiple = settings.RegisterFloatSetting( }, ) -// getFollowerReadOffset returns the offset duration which should be used to as -// the offset from now to request a follower read. The same value less the clock -// uncertainty, then is used to determine at the kv layer if a query can use a -// follower read. 
-func getFollowerReadDuration(st *cluster.Settings) time.Duration { +// getFollowerReadLag returns the (negative) offset duration from hlc.Now() +// which should be used to request a follower read. The same value is used to +// determine at the kv layer if a query can use a follower read for ranges with +// the default LAG_BY_CLUSTER_SETTING closed timestamp policy. +func getFollowerReadLag(st *cluster.Settings) time.Duration { targetMultiple := followerReadMultiple.Get(&st.SV) targetDuration := closedts.TargetDuration.Get(&st.SV) closeFraction := closedts.CloseFraction.Get(&st.SV) @@ -61,91 +60,119 @@ func getFollowerReadDuration(st *cluster.Settings) time.Duration { (1+closeFraction*targetMultiple)) } +// getGlobalReadsLead returns the (positive) offset duration from hlc.Now() +// which clients can expect followers of a range with the LEAD_FOR_GLOBAL_READS +// closed timestamp policy to have closed off. This offset is equal to the +// maximum clock offset, allowing present-time (i.e. those not pushed into the +// future) transactions to serve reads from followers. +func getGlobalReadsLead(clock *hlc.Clock) time.Duration { + return clock.MaxOffset() +} + func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error { org := sql.ClusterOrganization.Get(&st.SV) return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads") } +func checkFollowerReadsEnabled(clusterID uuid.UUID, st *cluster.Settings) bool { + if !kvserver.FollowerReadsEnabled.Get(&st.SV) { + return false + } + return checkEnterpriseEnabled(clusterID, st) == nil +} + func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) { if err := checkEnterpriseEnabled(clusterID, st); err != nil { return 0, err } - return getFollowerReadDuration(st), nil + return getFollowerReadLag(st), nil } // batchCanBeEvaluatedOnFollower determines if a batch consists exclusively of // requests that can be evaluated on a follower replica. func batchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { - return !ba.IsLocking() && ba.IsAllTransactional() -} - -// txnCanPerformFollowerRead determines if the provided transaction can perform -// follower reads. -func txnCanPerformFollowerRead(txn *roachpb.Transaction) bool { - // If the request is transactional and that transaction has acquired any - // locks then that request should not perform follower reads. Doing so could - // allow the request to miss its own writes or observe state that conflicts - // with its locks. - return txn != nil && !txn.IsLocking() + return ba.Txn != nil && !ba.IsLocking() && ba.IsAllTransactional() } -// canUseFollowerRead determines if a query can be sent to a follower. -func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timestamp) bool { - if !kvserver.FollowerReadsEnabled.Get(&st.SV) { - return false - } - threshold := (-1 * getFollowerReadDuration(st)) - 1*base.DefaultMaxClockOffset - if timeutil.Since(ts.GoTime()) < threshold { - return false +// closedTimestampLikelySufficient determines if a request at a given timestamp +// is likely to be below a follower's closed timestamp and servicable as a +// follower read were the request to be sent to a follower replica. 
+func closedTimestampLikelySufficient( + st *cluster.Settings, + clock *hlc.Clock, + ctPolicy roachpb.RangeClosedTimestampPolicy, + maxObservableTS hlc.Timestamp, +) bool { + var offset time.Duration + switch ctPolicy { + case roachpb.LAG_BY_CLUSTER_SETTING: + offset = getFollowerReadLag(st) + case roachpb.LEAD_FOR_GLOBAL_READS: + offset = getGlobalReadsLead(clock) + default: + panic("unknown RangeClosedTimestampPolicy") } - return checkEnterpriseEnabled(clusterID, st) == nil + expectedClosedTS := clock.Now().Add(offset.Nanoseconds(), 0) + return maxObservableTS.LessEq(expectedClosedTS) } // canSendToFollower implements the logic for checking whether a batch request // may be sent to a follower. -// TODO(aayush): We should try to bind clusterID to the function here, rather -// than having callers plumb it in every time. -func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool { - return batchCanBeEvaluatedOnFollower(ba) && - txnCanPerformFollowerRead(ba.Txn) && - canUseFollowerRead(clusterID, st, forward(ba.Txn.ReadTimestamp, ba.Txn.GlobalUncertaintyLimit)) -} - -func forward(ts hlc.Timestamp, to hlc.Timestamp) hlc.Timestamp { - ts.Forward(to) - return ts +func canSendToFollower( + clusterID uuid.UUID, + st *cluster.Settings, + clock *hlc.Clock, + ctPolicy roachpb.RangeClosedTimestampPolicy, + ba roachpb.BatchRequest, +) bool { + return checkFollowerReadsEnabled(clusterID, st) && + batchCanBeEvaluatedOnFollower(ba) && + closedTimestampLikelySufficient(st, clock, ctPolicy, ba.Txn.MaxObservableTimestamp()) } -type oracleFactory struct { - clusterID *base.ClusterIDContainer - st *cluster.Settings +type followerReadOracle struct { + st *cluster.Settings + clock *hlc.Clock - binPacking replicaoracle.OracleFactory - closest replicaoracle.OracleFactory + closest replicaoracle.Oracle + binPacking replicaoracle.Oracle } -func newOracleFactory(cfg replicaoracle.Config) replicaoracle.OracleFactory { - return &oracleFactory{ - clusterID: &cfg.RPCContext.ClusterID, +func newFollowerReadOracle(cfg replicaoracle.Config) replicaoracle.Oracle { + if !checkFollowerReadsEnabled(cfg.RPCContext.ClusterID.Get(), cfg.Settings) { + return replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg) + } + return &followerReadOracle{ st: cfg.Settings, - binPacking: replicaoracle.NewOracleFactory(replicaoracle.BinPackingChoice, cfg), - closest: replicaoracle.NewOracleFactory(replicaoracle.ClosestChoice, cfg), + clock: cfg.RPCContext.Clock, + closest: replicaoracle.NewOracle(replicaoracle.ClosestChoice, cfg), + binPacking: replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg), } } -func (f oracleFactory) Oracle(txn *kv.Txn) replicaoracle.Oracle { - if txn != nil && canUseFollowerRead(f.clusterID.Get(), f.st, txn.ReadTimestamp()) { - return f.closest.Oracle(txn) +func (o *followerReadOracle) ChoosePreferredReplica( + ctx context.Context, + txn *kv.Txn, + desc *roachpb.RangeDescriptor, + leaseholder *roachpb.ReplicaDescriptor, + ctPolicy roachpb.RangeClosedTimestampPolicy, + queryState replicaoracle.QueryState, +) (roachpb.ReplicaDescriptor, error) { + var oracle replicaoracle.Oracle + if txn != nil && closedTimestampLikelySufficient(o.st, o.clock, ctPolicy, txn.MaxObservableTimestamp()) { + oracle = o.closest + } else { + oracle = o.binPacking } - return f.binPacking.Oracle(txn) + return oracle.ChoosePreferredReplica(ctx, txn, desc, leaseholder, ctPolicy, queryState) } -// followerReadAwareChoice is a leaseholder choosing policy that detects +// followerReadOraclePolicy 
is a leaseholder choosing policy that detects // whether a query can be used with a follower read. -var followerReadAwareChoice = replicaoracle.RegisterPolicy(newOracleFactory) +var followerReadOraclePolicy = replicaoracle.RegisterPolicy(newFollowerReadOracle) func init() { - sql.ReplicaOraclePolicy = followerReadAwareChoice + sql.ReplicaOraclePolicy = followerReadOraclePolicy builtins.EvalFollowerReadOffset = evalFollowerReadOffset kvcoord.CanSendToFollower = canSendToFollower } diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 4d1665ea1393..c8a383e082c9 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -10,7 +10,6 @@ package kvfollowerreadsccl import ( "context" - "reflect" "testing" "time" @@ -20,24 +19,25 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -69,72 +69,171 @@ func TestEvalFollowerReadOffset(t *testing.T) { func TestCanSendToFollower(t *testing.T) { defer leaktest.AfterTest(t)() - disableEnterprise := utilccl.TestingEnableEnterprise() - defer disableEnterprise() - st := cluster.MakeTestingClusterSettings() - kvserver.FollowerReadsEnabled.Override(&st.SV, true) + clock := hlc.NewClock(hlc.UnixNano, base.DefaultMaxClockOffset) + stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0) + current := clock.Now() + future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0) - old := hlc.Timestamp{ - WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(), - } - oldHeader := roachpb.Header{Txn: &roachpb.Transaction{ - ReadTimestamp: old, - }} - rw := roachpb.BatchRequest{Header: oldHeader} - rw.Add(&roachpb.PutRequest{}) - if canSendToFollower(uuid.MakeV4(), st, rw) { - t.Fatalf("should not be able to send a rw request to a follower") - } - roNonTxn := roachpb.BatchRequest{Header: oldHeader} - roNonTxn.Add(&roachpb.QueryTxnRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNonTxn) { - t.Fatalf("should not be able to send a non-transactional ro request to a follower") + txn := func(ts hlc.Timestamp) *roachpb.Transaction { + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) + return &txn } - roNoTxn := roachpb.BatchRequest{} - 
roNoTxn.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNoTxn) { - t.Fatalf("should not be able to send a batch with no txn to a follower") + withWriteTimestamp := func(txn *roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction { + txn.WriteTimestamp = ts + return txn } - roOld := roachpb.BatchRequest{Header: oldHeader} - roOld.Add(&roachpb.GetRequest{}) - if !canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should be able to send an old ro batch to a follower") + withUncertaintyLimit := func(txn *roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction { + txn.MaxTimestamp = ts + return txn } - roRWTxnOld := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{Key: []byte("key")}, - ReadTimestamp: old, - }, - }} - roRWTxnOld.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roRWTxnOld) { - t.Fatalf("should not be able to send a ro request from a rw txn to a follower") + batch := func(txn *roachpb.Transaction, req roachpb.Request) roachpb.BatchRequest { + var ba roachpb.BatchRequest + ba.Txn = txn + ba.Add(req) + return ba } - kvserver.FollowerReadsEnabled.Override(&st.SV, false) - if canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled") - } - kvserver.FollowerReadsEnabled.Override(&st.SV, true) - roNew := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - ReadTimestamp: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, - }, - }} - if canSendToFollower(uuid.MakeV4(), st, roNew) { - t.Fatalf("should not be able to send a new ro batch to a follower") - } - roOldWithNewMax := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - GlobalUncertaintyLimit: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, - }, - }} - roOldWithNewMax.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNew) { - t.Fatalf("should not be able to send a ro batch with new GlobalUncertaintyLimit to a follower") + + testCases := []struct { + name string + ba roachpb.BatchRequest + ctPolicy roachpb.RangeClosedTimestampPolicy + disabledEnterprise bool + disabledFollowerReads bool + exp bool + }{ + { + name: "non-txn batch", + ba: batch(nil, &roachpb.GetRequest{}), + exp: false, + }, + { + name: "stale read", + ba: batch(txn(stale), &roachpb.GetRequest{}), + exp: true, + }, + { + name: "stale locking read", + ba: batch(txn(stale), &roachpb.ScanRequest{KeyLocking: lock.Exclusive}), + exp: false, + }, + { + name: "stale write", + ba: batch(txn(stale), &roachpb.PutRequest{}), + exp: false, + }, + { + name: "stale non-txn request", + ba: batch(txn(stale), &roachpb.QueryTxnRequest{}), + exp: false, + }, + { + name: "stale read with current-time writes", + ba: batch(withWriteTimestamp(txn(stale), current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "stale read with current-time uncertainty limit", + ba: batch(withUncertaintyLimit(txn(stale), current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "current-time read", + ba: batch(txn(current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "future read", + ba: batch(txn(future), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "non-txn batch, global reads policy", + ba: batch(nil, &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale read, global reads policy", + ba: batch(txn(stale), &roachpb.GetRequest{}), + ctPolicy: 
roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "stale locking read, global reads policy", + ba: batch(txn(stale), &roachpb.ScanRequest{KeyLocking: lock.Exclusive}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale write, global reads policy", + ba: batch(txn(stale), &roachpb.PutRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale non-txn request, global reads policy", + ba: batch(txn(stale), &roachpb.QueryTxnRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale read with current-time writes, global reads policy", + ba: batch(withWriteTimestamp(txn(stale), current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "stale read with current-time uncertainty limit, global reads policy", + ba: batch(withUncertaintyLimit(txn(stale), current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "current-time read, global reads policy", + ba: batch(txn(current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "current-time read with future writes, global reads policy", + ba: batch(withWriteTimestamp(txn(current), future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "current-time read with future uncertainty limit, global reads policy", + ba: batch(withUncertaintyLimit(txn(current), future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "future read, global reads policy", + ba: batch(txn(future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "non-enterprise", + disabledEnterprise: true, + exp: false, + }, + { + name: "follower reads disabled", + disabledFollowerReads: true, + exp: false, + }, } - disableEnterprise() - if canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should not be able to send an old ro batch to a follower without enterprise enabled") + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + if !c.disabledEnterprise { + defer utilccl.TestingEnableEnterprise()() + } + st := cluster.MakeTestingClusterSettings() + kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads) + + can := canSendToFollower(uuid.MakeV4(), st, clock, c.ctPolicy, c.ba) + require.Equal(t, c.exp, can) + }) } } @@ -149,43 +248,193 @@ func TestFollowerReadMultipleValidation(t *testing.T) { followerReadMultiple.Override(&st.SV, .1) } +// mockNodeStore implements the kvcoord.NodeDescStore interface. +type mockNodeStore []roachpb.NodeDescriptor + +func (s mockNodeStore) GetNodeDescriptor(id roachpb.NodeID) (*roachpb.NodeDescriptor, error) { + for i := range s { + desc := &s[i] + if desc.NodeID == id { + return desc, nil + } + } + return nil, errors.Errorf("unable to look up descriptor for n%d", id) +} + // TestOracle tests the OracleFactory exposed by this package. -// This test ends up being rather indirect but works by checking if the type -// of the oracle returned from the factory differs between requests we'd -// expect to support follower reads and that which we'd expect not to. 
func TestOracleFactory(t *testing.T) { defer leaktest.AfterTest(t)() - disableEnterprise := utilccl.TestingEnableEnterprise() - defer disableEnterprise() - st := cluster.MakeTestingClusterSettings() - kvserver.FollowerReadsEnabled.Override(&st.SV, true) - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + + ctx := context.Background() stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) + defer stopper.Stop(ctx) + clock := hlc.NewClock(hlc.UnixNano, base.DefaultMaxClockOffset) + stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0) + current := clock.Now() + future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0) + + c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper) + staleTxn := kv.NewTxn(ctx, c, 0) + staleTxn.SetFixedTimestamp(ctx, stale) + currentTxn := kv.NewTxn(ctx, c, 0) + currentTxn.SetFixedTimestamp(ctx, current) + futureTxn := kv.NewTxn(ctx, c, 0) + futureTxn.SetFixedTimestamp(ctx, future) + + nodes := mockNodeStore{ + {NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")}, + {NodeID: 2, Address: util.MakeUnresolvedAddr("tcp", "2")}, + {NodeID: 3, Address: util.MakeUnresolvedAddr("tcp", "3")}, + } + replicas := []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + } + desc := &roachpb.RangeDescriptor{ + InternalReplicas: replicas, + } + closestFollower := replicas[1] + leaseholder := replicas[2] + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - c := kv.NewDB(log.AmbientContext{ - Tracer: tracing.NewTracer(), - }, kv.MockTxnSenderFactory{}, hlc.NewClock(hlc.UnixNano, time.Nanosecond), stopper) - txn := kv.NewTxn(context.Background(), c, 0) - of := replicaoracle.NewOracleFactory(followerReadAwareChoice, replicaoracle.Config{ - Settings: st, - RPCContext: rpcContext, - }) - noFollowerReadOracle := of.Oracle(txn) - old := hlc.Timestamp{ - WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(), + setLatency := func(addr string, latency time.Duration) { + // All test cases have to have at least 11 measurement values in order for + // the exponentially-weighted moving average to work properly. See the + // comment on the WARMUP_SAMPLES const in the ewma package for details. 
+ for i := 0; i < 11; i++ { + rpcContext.RemoteClocks.UpdateOffset(ctx, addr, rpc.RemoteOffset{}, latency) + } } - txn.SetFixedTimestamp(context.Background(), old) - followerReadOracle := of.Oracle(txn) - if reflect.TypeOf(followerReadOracle) == reflect.TypeOf(noFollowerReadOracle) { - t.Fatalf("expected types of %T and %T to differ", followerReadOracle, - noFollowerReadOracle) + setLatency("1", 100*time.Millisecond) + setLatency("2", 2*time.Millisecond) + setLatency("3", 80*time.Millisecond) + + testCases := []struct { + name string + txn *kv.Txn + lh *roachpb.ReplicaDescriptor + ctPolicy roachpb.RangeClosedTimestampPolicy + disabledEnterprise bool + disabledFollowerReads bool + exp roachpb.ReplicaDescriptor + }{ + { + name: "non-txn, known leaseholder", + txn: nil, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "non-txn, unknown leaseholder", + txn: nil, + exp: closestFollower, + }, + { + name: "stale txn, known leaseholder", + txn: staleTxn, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "stale txn, unknown leaseholder", + txn: staleTxn, + exp: closestFollower, + }, + { + name: "current txn, known leaseholder", + txn: currentTxn, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "current txn, unknown leaseholder", + txn: currentTxn, + exp: closestFollower, + }, + { + name: "future txn, known leaseholder", + txn: futureTxn, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "future txn, unknown leaseholder", + txn: futureTxn, + exp: closestFollower, + }, + { + name: "stale txn, known leaseholder, global reads policy", + txn: staleTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "stale txn, unknown leaseholder, global reads policy", + txn: staleTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "current txn, known leaseholder, global reads policy", + txn: currentTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "current txn, unknown leaseholder, global reads policy", + txn: currentTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "future txn, known leaseholder, global reads policy", + txn: futureTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "future txn, unknown leaseholder, global reads policy", + txn: futureTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "stale txn, non-enterprise", + txn: staleTxn, + lh: &leaseholder, + disabledEnterprise: true, + exp: leaseholder, + }, + { + name: "stale txn, follower reads disabled", + txn: staleTxn, + lh: &leaseholder, + disabledFollowerReads: true, + exp: leaseholder, + }, } - disableEnterprise() - disabledFollowerReadOracle := of.Oracle(txn) - if reflect.TypeOf(disabledFollowerReadOracle) != reflect.TypeOf(noFollowerReadOracle) { - t.Fatalf("expected types of %T and %T not to differ", disabledFollowerReadOracle, - noFollowerReadOracle) + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + if !c.disabledEnterprise { + defer utilccl.TestingEnableEnterprise()() + } + st := cluster.MakeTestingClusterSettings() + kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads) + + o := replicaoracle.NewOracle(followerReadOraclePolicy, replicaoracle.Config{ + NodeDescs: nodes, + Settings: st, + RPCContext: rpcContext, + }) + + res, err := o.ChoosePreferredReplica(ctx, c.txn, desc, c.lh, c.ctPolicy, 
replicaoracle.QueryState{}) + require.NoError(t, err) + require.Equal(t, c.exp, res) + }) } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 5cfb8643af81..369af9ee5b58 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -128,7 +128,11 @@ var ( // followerreadsccl code to inject logic to check if follower reads are enabled. // By default, without CCL code, this function returns false. var CanSendToFollower = func( - clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest, + _ uuid.UUID, + _ *cluster.Settings, + _ *hlc.Clock, + _ roachpb.RangeClosedTimestampPolicy, + _ roachpb.BatchRequest, ) bool { return false } @@ -1771,7 +1775,7 @@ func (ds *DistSender) sendToReplicas( desc := routing.Desc() ba.RangeID = desc.RangeID leaseholder := routing.Leaseholder() - canFollowerRead := (ds.clusterID != nil) && CanSendToFollower(ds.clusterID.Get(), ds.st, ba) + canFollowerRead := CanSendToFollower(ds.clusterID.Get(), ds.st, ds.clock, routing.ClosedTimestampPolicy(), ba) var replicas ReplicaSlice var err error if canFollowerRead { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index fe63f3b2c5ec..052d163b7ef4 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -3432,8 +3432,6 @@ func TestErrorIndexAlignment(t *testing.T) { // TestCanSendToFollower tests that the DistSender abides by the result it // get from CanSendToFollower. -// TODO(nvanbenschoten): update this test once ClosedTimestampPolicy begins -// dictating the decision of CanSendToFollower. func TestCanSendToFollower(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -3444,7 +3442,13 @@ func TestCanSendToFollower(t *testing.T) { old := CanSendToFollower defer func() { CanSendToFollower = old }() canSend := true - CanSendToFollower = func(_ uuid.UUID, _ *cluster.Settings, ba roachpb.BatchRequest) bool { + CanSendToFollower = func( + _ uuid.UUID, + _ *cluster.Settings, + _ *hlc.Clock, + _ roachpb.RangeClosedTimestampPolicy, + ba roachpb.BatchRequest, + ) bool { return !ba.IsLocking() && canSend } diff --git a/pkg/kv/kvclient/kvcoord/range_iter.go b/pkg/kv/kvclient/kvcoord/range_iter.go index b1269974b895..c34a28c2b354 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter.go +++ b/pkg/kv/kvclient/kvcoord/range_iter.go @@ -86,6 +86,19 @@ func (ri *RangeIterator) Leaseholder() *roachpb.ReplicaDescriptor { return ri.token.Leaseholder() } +// ClosedTimestampPolicy returns the closed timestamp policy of the range at +// which the iterator is currently positioned. The iterator must be valid. +// +// The policy information comes from a cache, and so it can be stale. Returns +// the default policy of LAG_BY_CLUSTER_SETTING if no policy information is +// known. +func (ri *RangeIterator) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy { + if !ri.Valid() { + panic(ri.Error()) + } + return ri.token.ClosedTimestampPolicy() +} + // Token returns the eviction token corresponding to the range // descriptor for the current iteration. The iterator must be valid. 
func (ri *RangeIterator) Token() rangecache.EvictionToken { diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 5a028e0617d2..977cd82a1e89 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -931,6 +931,13 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam tc.mu.txn.MinTimestamp.Backward(ts) } +// MaxObservableTimestamp is part of the client.TxnSender interface. +func (tc *TxnCoordSender) MaxObservableTimestamp() hlc.Timestamp { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.mu.txn.MaxObservableTimestamp() +} + // ManualRestart is part of the client.TxnSender interface. func (tc *TxnCoordSender) ManualRestart( ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index a312b88fe99f..c2b76a0ab040 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -90,7 +90,7 @@ func TestClosedTimestampCanServe(t *testing.T) { repls := replsForRange(ctx, t, tc, desc, numNodes) ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -161,7 +161,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) { reqTS := tc.Server(0).Clock().Now() // Sleep for a sufficiently long time so that reqTS can be closed. time.Sleep(3 * testingTargetDuration) - baRead := makeReadBatchRequestForDesc(desc, reqTS) + baRead := makeTxnReadBatchForDesc(desc, reqTS) repls := replsForRange(ctx, t, tc, desc, numNodes) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) @@ -190,7 +190,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { t.Fatal(err) } ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -222,7 +222,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { // Attempt to send read requests to a replica in a tight loop until deadline // is reached. If an error is seen on any replica then it is returned to the // errgroup. - baRead = makeReadBatchRequestForDesc(desc, ts) + baRead = makeTxnReadBatchForDesc(desc, ts) ensureCanReadFromReplicaUntilDeadline := func(r *kvserver.Replica) { g.Go(func() error { for timeutil.Now().Before(deadline) { @@ -285,14 +285,7 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) { respCh := make(chan struct{}, len(keys)) for i, key := range keys { go func(repl *kvserver.Replica, key roachpb.Key) { - var baRead roachpb.BatchRequest - r := &roachpb.ScanRequest{} - r.Key = key - r.EndKey = key.Next() - baRead.Add(r) - baRead.Timestamp = ts - baRead.RangeID = desc.RangeID - + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { // Expect 0 rows, because the intents will be aborted. _, err := expectRows(0)(repl.Send(ctx, baRead)) @@ -356,7 +349,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { } // Start by ensuring that the values can be read from all replicas at ts. 
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(2)) }) @@ -382,10 +375,10 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { // Now immediately query both the ranges and there's 1 value per range. // We need to tolerate RangeNotFound as the split range may not have been // created yet. - baReadL := makeReadBatchRequestForDesc(lr, ts) + baReadL := makeTxnReadBatchForDesc(lr, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadL, lRepls, respFuncs(retryOnRangeNotFound, expectRows(1)))) - baReadR := makeReadBatchRequestForDesc(rr, ts) + baReadR := makeTxnReadBatchForDesc(rr, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadR, rRepls, respFuncs(retryOnRangeNotFound, expectRows(1)))) @@ -397,7 +390,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { // The hazard here is that a follower is not yet aware of the merge and will // return an error. We'll accept that because a client wouldn't see that error // from distsender. - baReadMerged := makeReadBatchRequestForDesc(merged, ts) + baReadMerged := makeTxnReadBatchForDesc(merged, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadMerged, mergedRepls, respFuncs(retryOnRangeKeyMismatch, expectRows(2)))) } @@ -429,12 +422,12 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { } // Grab a timestamp before initiating a lease transfer, transfer the lease, - // then ensure that reads at that timestamp can occur from all the replicas. + // then ensure that reads at that timestamp can occur from all the replicas. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} lh := getCurrentLeaseholder(t, tc, desc) target := pickRandomTarget(tc, lh, desc) require.Nil(t, tc.TransferRangeLease(desc, target)) - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -448,7 +441,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { verifyNotLeaseHolderErrors(t, baRead, repls, 2) } -func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { +func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -469,17 +462,30 @@ func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { // Verify that we can serve a follower read at a timestamp. Wait if necessary. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) - // Create a read-only batch and attach a read-write transaction. - rwTxn := roachpb.MakeTransaction("test", []byte("key"), roachpb.NormalUserPriority, ts, 0) - baRead.Txn = &rwTxn + // Update the batch to simulate a transaction that has written an intent. + baRead.Txn.Key = []byte("key") + baRead.Txn.Sequence++ - // Send the request to all three replicas. One should succeed and - // the other two should return NotLeaseHolderErrors. + // The write timestamp of the transaction is still closed, so a read-only + // request by the transaction should be servicable by followers. 
This is + // because the writing transaction can still read its writes on the + // followers. + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) + }) + + // Update the batch to simulate a transaction that has written past its read + // timestamp and past the expected closed timestamp. This should prevent its + // reads from being served by followers. + baRead.Txn.WriteTimestamp = baRead.Txn.WriteTimestamp.Add(time.Hour.Nanoseconds(), 0) + + // Send the request to all three replicas. One should succeed and the other + // two should return NotLeaseHolderErrors. verifyNotLeaseHolderErrors(t, baRead, repls, 2) } @@ -502,9 +508,10 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { t.Fatal(err) } - // Verify that we can serve a follower read at a timestamp. Wait if necessary + // Verify that we can serve a follower read at a timestamp. Wait if + // necessary. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -519,11 +526,43 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { baQueryTxn.Add(r) baQueryTxn.Timestamp = ts - // Send the request to all three replicas. One should succeed and - // the other two should return NotLeaseHolderErrors. + // Send the request to all three replicas. One should succeed and the other + // two should return NotLeaseHolderErrors. verifyNotLeaseHolderErrors(t, baQueryTxn, repls, 2) } +func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Limiting how long transactions can run does not work + // well with race unless we're extremely lenient, which + // drives up the test duration. + skip.UnderRace(t) + + ctx := context.Background() + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + defer tc.Stopper().Stop(ctx) + repls := replsForRange(ctx, t, tc, desc, numNodes) + + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { + t.Fatal(err) + } + + // Verify that we can serve a follower read at a timestamp with a + // transactional batch. Wait if necessary. + ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + baRead := makeTxnReadBatchForDesc(desc, ts) + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) + }) + + // Remove the transaction and send the request to all three replicas. One + // should succeed and the other two should return NotLeaseHolderErrors. + baRead.Txn = nil + verifyNotLeaseHolderErrors(t, baRead, repls, 2) +} + // TestClosedTimestampInactiveAfterSubsumption verifies that, during a merge, // replicas of the subsumed range (RHS) cannot serve follower reads for // timestamps after the subsumption time. 
@@ -694,7 +733,7 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { case <-time.After(30 * time.Second): t.Fatal("failed to receive next closed timestamp update") } - baReadAfterLeaseTransfer := makeReadBatchRequestForDesc(rightDesc, inactiveClosedTSBoundary.Next()) + baReadAfterLeaseTransfer := makeTxnReadBatchForDesc(rightDesc, inactiveClosedTSBoundary.Next()) rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary") verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) @@ -755,7 +794,7 @@ func forceLeaseTransferOnSubsumedRange( // that the current rightLeaseholder has stopped heartbeating. This will prompt // it to acquire the range lease for itself. g.Go(func() error { - leaseAcquisitionRequest := makeReadBatchRequestForDesc(rightDesc, freezeStartTimestamp) + leaseAcquisitionRequest := makeTxnReadBatchForDesc(rightDesc, freezeStartTimestamp) log.Infof(ctx, "sending a read request from a follower of RHS (store %d) in order to trigger lease acquisition", newRightLeaseholder.StoreID()) @@ -1355,15 +1394,16 @@ func verifyCanReadFromAllRepls( return g.Wait() } -func makeReadBatchRequestForDesc( - desc roachpb.RangeDescriptor, ts hlc.Timestamp, -) roachpb.BatchRequest { +func makeTxnReadBatchForDesc(desc roachpb.RangeDescriptor, ts hlc.Timestamp) roachpb.BatchRequest { + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) + var baRead roachpb.BatchRequest baRead.Header.RangeID = desc.RangeID + baRead.Header.Timestamp = ts + baRead.Header.Txn = &txn r := &roachpb.ScanRequest{} r.Key = desc.StartKey.AsRawKey() r.EndKey = desc.EndKey.AsRawKey() baRead.Add(r) - baRead.Timestamp = ts return baRead } diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 72c4ebdee4fc..b15d94b84382 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -82,7 +82,7 @@ func (r *Replica) canServeFollowerReadRLocked( // We can't actually serve the read based on the closed timestamp. // Signal the clients that we want an update so that future requests can succeed. 
r.store.cfg.ClosedTimestamp.Clients.Request(lErr.LeaseHolder.NodeID, r.RangeID) - log.Eventf(ctx, "can't serve follower read; closed timestamp too low by: %s; maxClosed: %s ts: %s uncertaintyLimit: %s", + log.Eventf(ctx, "can't serve follower read; closed timestamp too low by: %s; MaxClosedTimestamp: %s ts: %s uncertaintyLimit: %s", tsDiff, maxClosed, ba.Timestamp, uncertaintyLimitStr) if false { diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 958c31c6170e..960968934d58 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -801,9 +801,12 @@ func TestLearnerAndVoterOutgoingFollowerRead(t *testing.T) { }) check := func() { + ts := tc.Server(0).Clock().Now() + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) req := roachpb.BatchRequest{Header: roachpb.Header{ RangeID: scratchDesc.RangeID, - Timestamp: tc.Server(0).Clock().Now(), + Timestamp: ts, + Txn: &txn, }} req.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{ Key: scratchDesc.StartKey.AsRawKey(), EndKey: scratchDesc.EndKey.AsRawKey(), diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 1a27c205172a..aa3ce20c7a2d 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -124,6 +124,11 @@ func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Ti m.txn.MinTimestamp.Backward(ts) } +// MaxObservableTimestamp is part of the TxnSender interface. +func (m *MockTransactionalSender) MaxObservableTimestamp() hlc.Timestamp { + return m.txn.MaxObservableTimestamp() +} + // ManualRestart is part of the TxnSender interface. func (m *MockTransactionalSender) ManualRestart( ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 856fbc42e3b9..75006d52dc9a 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -230,6 +230,10 @@ type TxnSender interface { // field on TxnMeta. ProvisionalCommitTimestamp() hlc.Timestamp + // MaxObservableTimestamp returns the largest timestamp at which the + // transaction may observe when performing a read-only operation. + MaxObservableTimestamp() hlc.Timestamp + // IsSerializablePushAndRefreshNotPossible returns true if the // transaction is serializable, its timestamp has been pushed and // there's no chance that refreshing the read spans will succeed diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index e09f39ea6117..ebe5d948055f 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -309,6 +309,14 @@ func (txn *Txn) ProvisionalCommitTimestamp() hlc.Timestamp { return txn.mu.sender.ProvisionalCommitTimestamp() } +// MaxObservableTimestamp returns the largest timestamp at which the transaction +// may observe when performing a read-only operation. +func (txn *Txn) MaxObservableTimestamp() hlc.Timestamp { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.MaxObservableTimestamp() +} + // SetSystemConfigTrigger sets the system db trigger to true on this transaction. // This will impact the EndTxnRequest. Note that this method takes a boolean // argument indicating whether this transaction is intended for the system diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 35c91d29ae0d..86de81748c33 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -910,13 +910,43 @@ func MakeTransaction( func (t Transaction) LastActive() hlc.Timestamp { ts := t.LastHeartbeat // Only forward by the ReadTimestamp if it is a clock timestamp. 
- // TODO(nvanbenschoten): replace this with look at the Synthetic bool. - if readTS, ok := t.ReadTimestamp.TryToClockTimestamp(); ok { - ts.Forward(readTS.ToTimestamp()) + if !t.ReadTimestamp.Synthetic { + ts.Forward(t.ReadTimestamp) } return ts } +// MaxObservableTimestamp returns the largest timestamp at which the transaction +// may observe when performing a read-only operation. This is the maximum of the +// transaction's read timestamp, its write timestamp, and its global uncertainty +// limit. +func (t Transaction) MaxObservableTimestamp() hlc.Timestamp { + // A transaction can observe committed values up to its read timestamp. + ts := t.ReadTimestamp + // Forward to the transaction's write timestamp. The transaction will read + // committed values at its read timestamp but may perform reads up to its + // intent timestamps if the transaction is reading its own intent writes, + // which we know to all be at timestamps <= its current write timestamp. + // + // There is a case where an intent written by a transaction is above the + // transactions write timestamp — after a successful intent push. Such cases + // do allow a transaction to operate above its max observable timestamp. + // However, this is fine... + // + // WIP: something about how the intent will be present on followers with a + // sufficiently high closed timestamp, either in its pre or post push state. + // Either works. Do we need to change the contract of MaxObservableTimestamp + // to accommodate for this? + ts.Forward(t.WriteTimestamp) + // Forward to the transaction's global uncertainty limit, because the + // transaction may observe committed writes from other transactions up to + // this time and consider them to be "uncertain". When a transaction begins, + // this will be above its read timestamp, but the read timestamp can surpass + // the global uncertainty limit due to refreshes or retries. + ts.Forward(t.GlobalUncertaintyLimit) + return ts +} + // Clone creates a copy of the given transaction. The copy is shallow because // none of the references held by a transaction allow interior mutability. func (t Transaction) Clone() *Transaction { diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go index 8a01bde6ed5b..38eddb98d07e 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle.go @@ -31,11 +31,11 @@ type Policy byte var ( // RandomChoice chooses lease replicas randomly. - RandomChoice = RegisterPolicy(newRandomOracleFactory) + RandomChoice = RegisterPolicy(newRandomOracle) // BinPackingChoice bin-packs the choices. - BinPackingChoice = RegisterPolicy(newBinPackingOracleFactory) + BinPackingChoice = RegisterPolicy(newBinPackingOracle) // ClosestChoice chooses the node closest to the current node. - ClosestChoice = RegisterPolicy(newClosestOracleFactory) + ClosestChoice = RegisterPolicy(newClosestOracle) ) // Config is used to construct an OracleFactory. @@ -63,43 +63,45 @@ type Oracle interface { // don't care about the leaseholder (e.g. when we're planning for follower // reads). // + // When the range's closed timestamp policy is known, it is passed in. + // Otherwise, the default closed timestamp policy is provided. + // // A RangeUnavailableError can be returned if there's no information in gossip // about any of the nodes that might be tried. 
ChoosePreferredReplica( - ctx context.Context, rng *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor, qState QueryState, + ctx context.Context, + txn *kv.Txn, + rng *roachpb.RangeDescriptor, + leaseholder *roachpb.ReplicaDescriptor, + ctPolicy roachpb.RangeClosedTimestampPolicy, + qState QueryState, ) (roachpb.ReplicaDescriptor, error) } -// OracleFactory creates an oracle for a Txn. -type OracleFactory interface { - Oracle(*kv.Txn) Oracle -} - -// OracleFactoryFunc creates an OracleFactory from a Config. -type OracleFactoryFunc func(Config) OracleFactory +// OracleFactory creates an oracle from a Config. +type OracleFactory func(Config) Oracle -// NewOracleFactory creates an oracle with the given policy. -func NewOracleFactory(policy Policy, cfg Config) OracleFactory { - ff, ok := oracleFactoryFuncs[policy] +// NewOracle creates an oracle with the given policy. +func NewOracle(policy Policy, cfg Config) Oracle { + ff, ok := oracleFactories[policy] if !ok { panic(errors.Errorf("unknown Policy %v", policy)) } return ff(cfg) } -// RegisterPolicy creates a new policy given a function which constructs an -// OracleFactory. RegisterPolicy is intended to be called only during init and -// is not safe for concurrent use. -func RegisterPolicy(f OracleFactoryFunc) Policy { - if len(oracleFactoryFuncs) == 255 { +// RegisterPolicy creates a new policy given an OracleFactory. RegisterPolicy is +// intended to be called only during init and is not safe for concurrent use. +func RegisterPolicy(f OracleFactory) Policy { + if len(oracleFactories) == 255 { panic("Can only register 255 Policy instances") } - r := Policy(len(oracleFactoryFuncs)) - oracleFactoryFuncs[r] = f + r := Policy(len(oracleFactories)) + oracleFactories[r] = f return r } -var oracleFactoryFuncs = map[Policy]OracleFactoryFunc{} +var oracleFactories = map[Policy]OracleFactory{} // QueryState encapsulates the history of assignments of ranges to nodes // done by an oracle on behalf of one particular query. 
@@ -122,18 +124,17 @@ type randomOracle struct { nodeDescs kvcoord.NodeDescStore } -var _ OracleFactory = &randomOracle{} - -func newRandomOracleFactory(cfg Config) OracleFactory { +func newRandomOracle(cfg Config) Oracle { return &randomOracle{nodeDescs: cfg.NodeDescs} } -func (o *randomOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *randomOracle) ChoosePreferredReplica( - ctx context.Context, desc *roachpb.RangeDescriptor, _ *roachpb.ReplicaDescriptor, _ QueryState, + ctx context.Context, + _ *kv.Txn, + desc *roachpb.RangeDescriptor, + _ *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, + _ QueryState, ) (roachpb.ReplicaDescriptor, error) { replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc, kvcoord.OnlyPotentialLeaseholders) if err != nil { @@ -150,7 +151,7 @@ type closestOracle struct { latencyFunc kvcoord.LatencyFunc } -func newClosestOracleFactory(cfg Config) OracleFactory { +func newClosestOracle(cfg Config) Oracle { return &closestOracle{ nodeDescs: cfg.NodeDescs, nodeDesc: cfg.NodeDesc, @@ -158,12 +159,13 @@ func newClosestOracleFactory(cfg Config) OracleFactory { } } -func (o *closestOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *closestOracle) ChoosePreferredReplica( - ctx context.Context, desc *roachpb.RangeDescriptor, _ *roachpb.ReplicaDescriptor, _ QueryState, + ctx context.Context, + _ *kv.Txn, + desc *roachpb.RangeDescriptor, + _ *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, + _ QueryState, ) (roachpb.ReplicaDescriptor, error) { // We know we're serving a follower read request, so consider all non-outgoing // replicas. @@ -199,7 +201,7 @@ type binPackingOracle struct { latencyFunc kvcoord.LatencyFunc } -func newBinPackingOracleFactory(cfg Config) OracleFactory { +func newBinPackingOracle(cfg Config) Oracle { return &binPackingOracle{ maxPreferredRangesPerLeaseHolder: maxPreferredRangesPerLeaseHolder, nodeDescs: cfg.NodeDescs, @@ -208,16 +210,12 @@ func newBinPackingOracleFactory(cfg Config) OracleFactory { } } -var _ OracleFactory = &binPackingOracle{} - -func (o *binPackingOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *binPackingOracle) ChoosePreferredReplica( ctx context.Context, + _ *kv.Txn, desc *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, queryState QueryState, ) (roachpb.ReplicaDescriptor, error) { // If we know the leaseholder, we choose it. diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go index c714094b46ed..7298f587362b 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go @@ -30,7 +30,7 @@ import ( // TestRandomOracle defeats TestUnused for RandomChoice. 
func TestRandomOracle(t *testing.T) { - _ = NewOracleFactory(RandomChoice, Config{}) + _ = NewOracle(RandomChoice, Config{}) } func TestClosest(t *testing.T) { @@ -40,24 +40,30 @@ func TestClosest(t *testing.T) { defer stopper.Stop(ctx) g, _ := makeGossip(t, stopper) nd, _ := g.GetNodeDescriptor(1) - of := NewOracleFactory(ClosestChoice, Config{ + o := NewOracle(ClosestChoice, Config{ NodeDescs: g, NodeDesc: *nd, }) - of.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { + o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { if strings.HasSuffix(s, "2") { return time.Nanosecond, true } return time.Millisecond, true } - o := of.Oracle(nil) - info, err := o.ChoosePreferredReplica(ctx, &roachpb.RangeDescriptor{ - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1}, - {NodeID: 2, StoreID: 2}, - {NodeID: 3, StoreID: 3}, + info, err := o.ChoosePreferredReplica( + ctx, + nil, /* txn */ + &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + }, }, - }, nil /* leaseHolder */, QueryState{}) + nil, /* leaseHolder */ + roachpb.LAG_BY_CLUSTER_SETTING, + QueryState{}, + ) if err != nil { t.Fatalf("Failed to choose closest replica: %v", err) } diff --git a/pkg/sql/physicalplan/span_resolver.go b/pkg/sql/physicalplan/span_resolver.go index 9fac64c525bf..ce47cda2f6e1 100644 --- a/pkg/sql/physicalplan/span_resolver.go +++ b/pkg/sql/physicalplan/span_resolver.go @@ -115,10 +115,10 @@ type SpanResolverIterator interface { // spanResolver implements SpanResolver. type spanResolver struct { - st *cluster.Settings - distSender *kvcoord.DistSender - nodeDesc roachpb.NodeDescriptor - oracleFactory replicaoracle.OracleFactory + st *cluster.Settings + distSender *kvcoord.DistSender + nodeDesc roachpb.NodeDescriptor + oracle replicaoracle.Oracle } var _ SpanResolver = &spanResolver{} @@ -135,7 +135,7 @@ func NewSpanResolver( return &spanResolver{ st: st, nodeDesc: nodeDesc, - oracleFactory: replicaoracle.NewOracleFactory(policy, replicaoracle.Config{ + oracle: replicaoracle.NewOracle(policy, replicaoracle.Config{ NodeDescs: nodeDescs, NodeDesc: nodeDesc, Settings: st, @@ -147,6 +147,8 @@ func NewSpanResolver( // spanResolverIterator implements the SpanResolverIterator interface. type spanResolverIterator struct { + // txn is the transaction using the iterator. + txn *kv.Txn // it is a wrapped RangeIterator. it *kvcoord.RangeIterator // oracle is used to choose a lease holders for ranges when one isn't present @@ -167,8 +169,9 @@ var _ SpanResolverIterator = &spanResolverIterator{} // NewSpanResolverIterator creates a new SpanResolverIterator. func (sr *spanResolver) NewSpanResolverIterator(txn *kv.Txn) SpanResolverIterator { return &spanResolverIterator{ + txn: txn, it: kvcoord.NewRangeIterator(sr.distSender), - oracle: sr.oracleFactory.Oracle(txn), + oracle: sr.oracle, queryState: replicaoracle.MakeQueryState(), } } @@ -260,7 +263,7 @@ func (it *spanResolverIterator) ReplicaInfo( } repl, err := it.oracle.ChoosePreferredReplica( - ctx, it.it.Desc(), it.it.Leaseholder(), it.queryState) + ctx, it.txn, it.it.Desc(), it.it.Leaseholder(), it.it.ClosedTimestampPolicy(), it.queryState) if err != nil { return roachpb.ReplicaDescriptor{}, err }