diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel index de96531e8339..9820e372612e 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl", visibility = ["//visibility:public"], deps = [ - "//pkg/base", "//pkg/ccl/utilccl", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", @@ -19,7 +18,6 @@ go_library( "//pkg/sql/physicalplan/replicaoracle", "//pkg/sql/sem/builtins", "//pkg/util/hlc", - "//pkg/util/timeutil", "//pkg/util/uuid", ], ) @@ -39,6 +37,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/rpc", "//pkg/security", @@ -47,21 +46,21 @@ go_test( "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/physicalplan/replicaoracle", - "//pkg/storage/enginepb", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/syncutil", - "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go index 726a337c6487..55d58025d7f5 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go @@ -11,10 +11,10 @@ package kvfollowerreadsccl import ( + "context" "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -49,11 +48,11 @@ var followerReadMultiple = settings.RegisterFloatSetting( }, ) -// getFollowerReadOffset returns the offset duration which should be used to as -// the offset from now to request a follower read. The same value less the clock -// uncertainty, then is used to determine at the kv layer if a query can use a -// follower read. -func getFollowerReadDuration(st *cluster.Settings) time.Duration { +// getFollowerReadLag returns the (negative) offset duration from hlc.Now() +// which should be used to request a follower read. The same value is used to +// determine at the kv layer if a query can use a follower read for ranges with +// the default LAG_BY_CLUSTER_SETTING closed timestamp policy. +func getFollowerReadLag(st *cluster.Settings) time.Duration { targetMultiple := followerReadMultiple.Get(&st.SV) targetDuration := closedts.TargetDuration.Get(&st.SV) closeFraction := closedts.CloseFraction.Get(&st.SV) @@ -61,91 +60,119 @@ func getFollowerReadDuration(st *cluster.Settings) time.Duration { (1+closeFraction*targetMultiple)) } +// getGlobalReadsLead returns the (positive) offset duration from hlc.Now() +// which clients can expect followers of a range with the LEAD_FOR_GLOBAL_READS +// closed timestamp policy to have closed off. 
This offset is equal to the +// maximum clock offset, allowing present-time (i.e. those not pushed into the +// future) transactions to serve reads from followers. +func getGlobalReadsLead(clock *hlc.Clock) time.Duration { + return clock.MaxOffset() +} + func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error { org := sql.ClusterOrganization.Get(&st.SV) return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads") } +func checkFollowerReadsEnabled(clusterID uuid.UUID, st *cluster.Settings) bool { + if !kvserver.FollowerReadsEnabled.Get(&st.SV) { + return false + } + return checkEnterpriseEnabled(clusterID, st) == nil +} + func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) { if err := checkEnterpriseEnabled(clusterID, st); err != nil { return 0, err } - return getFollowerReadDuration(st), nil + return getFollowerReadLag(st), nil } // batchCanBeEvaluatedOnFollower determines if a batch consists exclusively of // requests that can be evaluated on a follower replica. func batchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { - return !ba.IsLocking() && ba.IsAllTransactional() -} - -// txnCanPerformFollowerRead determines if the provided transaction can perform -// follower reads. -func txnCanPerformFollowerRead(txn *roachpb.Transaction) bool { - // If the request is transactional and that transaction has acquired any - // locks then that request should not perform follower reads. Doing so could - // allow the request to miss its own writes or observe state that conflicts - // with its locks. - return txn != nil && !txn.IsLocking() + return ba.Txn != nil && !ba.IsLocking() && ba.IsAllTransactional() } -// canUseFollowerRead determines if a query can be sent to a follower. -func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timestamp) bool { - if !kvserver.FollowerReadsEnabled.Get(&st.SV) { - return false - } - threshold := (-1 * getFollowerReadDuration(st)) - 1*base.DefaultMaxClockOffset - if timeutil.Since(ts.GoTime()) < threshold { - return false +// closedTimestampLikelySufficient determines if a request at a given timestamp +// is likely to be below a follower's closed timestamp and servicable as a +// follower read were the request to be sent to a follower replica. +func closedTimestampLikelySufficient( + st *cluster.Settings, + clock *hlc.Clock, + ctPolicy roachpb.RangeClosedTimestampPolicy, + maxObservableTS hlc.Timestamp, +) bool { + var offset time.Duration + switch ctPolicy { + case roachpb.LAG_BY_CLUSTER_SETTING: + offset = getFollowerReadLag(st) + case roachpb.LEAD_FOR_GLOBAL_READS: + offset = getGlobalReadsLead(clock) + default: + panic("unknown RangeClosedTimestampPolicy") } - return checkEnterpriseEnabled(clusterID, st) == nil + expectedClosedTS := clock.Now().Add(offset.Nanoseconds(), 0) + return maxObservableTS.LessEq(expectedClosedTS) } // canSendToFollower implements the logic for checking whether a batch request // may be sent to a follower. -// TODO(aayush): We should try to bind clusterID to the function here, rather -// than having callers plumb it in every time. 
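To make the closed-timestamp arithmetic above concrete: under LAG_BY_CLUSTER_SETTING the expected closed timestamp trails the current clock reading by targetDuration * (1 + closeFraction * followerReadMultiple), while under LEAD_FOR_GLOBAL_READS it leads it by the maximum clock offset. The standalone sketch below is not part of the patch; it plugs in assumed illustrative values (a 3s target duration, 0.2 close fraction, 3x multiple, and 500ms max clock offset) and uses plain time.Time in place of hlc.Timestamp.

package main

import (
	"fmt"
	"time"
)

// followerReadLag mirrors getFollowerReadLag: a negative offset from "now"
// behind which a LAG_BY_CLUSTER_SETTING range is expected to have closed.
func followerReadLag(targetDuration time.Duration, closeFraction, multiple float64) time.Duration {
	return -1 * time.Duration(float64(targetDuration)*(1+closeFraction*multiple))
}

// globalReadsLead mirrors getGlobalReadsLead: followers of a
// LEAD_FOR_GLOBAL_READS range are expected to have closed timestamps that
// lead present time by the maximum clock offset.
func globalReadsLead(maxClockOffset time.Duration) time.Duration {
	return maxClockOffset
}

// likelySufficient mirrors closedTimestampLikelySufficient: the highest
// timestamp the request can observe must not exceed the expected closed
// timestamp for the range's policy.
func likelySufficient(now, maxObservable time.Time, offset time.Duration) bool {
	expectedClosed := now.Add(offset)
	return !maxObservable.After(expectedClosed)
}

func main() {
	now := time.Now()
	lag := followerReadLag(3*time.Second, 0.2, 3)   // -4.8s with the assumed settings
	lead := globalReadsLead(500 * time.Millisecond) // +500ms with the assumed offset

	fmt.Println(likelySufficient(now, now.Add(-10*time.Second), lag)) // true: stale read
	fmt.Println(likelySufficient(now, now, lag))                      // false: current-time read
	fmt.Println(likelySufficient(now, now, lead))                     // true: current-time read on a global-reads range
	fmt.Println(likelySufficient(now, now.Add(time.Second), lead))    // false: read with a future uncertainty limit
}

The last two lines correspond to the new LEAD_FOR_GLOBAL_READS test expectations further down: present-time transactions clear the check, while timestamps pushed beyond the clock-offset lead do not.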
-func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool { - return batchCanBeEvaluatedOnFollower(ba) && - txnCanPerformFollowerRead(ba.Txn) && - canUseFollowerRead(clusterID, st, forward(ba.Txn.ReadTimestamp, ba.Txn.GlobalUncertaintyLimit)) -} - -func forward(ts hlc.Timestamp, to hlc.Timestamp) hlc.Timestamp { - ts.Forward(to) - return ts +func canSendToFollower( + clusterID uuid.UUID, + st *cluster.Settings, + clock *hlc.Clock, + ctPolicy roachpb.RangeClosedTimestampPolicy, + ba roachpb.BatchRequest, +) bool { + return checkFollowerReadsEnabled(clusterID, st) && + batchCanBeEvaluatedOnFollower(ba) && + closedTimestampLikelySufficient(st, clock, ctPolicy, ba.Txn.MaxObservableTimestamp()) } -type oracleFactory struct { - clusterID *base.ClusterIDContainer - st *cluster.Settings +type followerReadOracle struct { + st *cluster.Settings + clock *hlc.Clock - binPacking replicaoracle.OracleFactory - closest replicaoracle.OracleFactory + closest replicaoracle.Oracle + binPacking replicaoracle.Oracle } -func newOracleFactory(cfg replicaoracle.Config) replicaoracle.OracleFactory { - return &oracleFactory{ - clusterID: &cfg.RPCContext.ClusterID, +func newFollowerReadOracle(cfg replicaoracle.Config) replicaoracle.Oracle { + if !checkFollowerReadsEnabled(cfg.RPCContext.ClusterID.Get(), cfg.Settings) { + return replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg) + } + return &followerReadOracle{ st: cfg.Settings, - binPacking: replicaoracle.NewOracleFactory(replicaoracle.BinPackingChoice, cfg), - closest: replicaoracle.NewOracleFactory(replicaoracle.ClosestChoice, cfg), + clock: cfg.RPCContext.Clock, + closest: replicaoracle.NewOracle(replicaoracle.ClosestChoice, cfg), + binPacking: replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg), } } -func (f oracleFactory) Oracle(txn *kv.Txn) replicaoracle.Oracle { - if txn != nil && canUseFollowerRead(f.clusterID.Get(), f.st, txn.ReadTimestamp()) { - return f.closest.Oracle(txn) +func (o *followerReadOracle) ChoosePreferredReplica( + ctx context.Context, + txn *kv.Txn, + desc *roachpb.RangeDescriptor, + leaseholder *roachpb.ReplicaDescriptor, + ctPolicy roachpb.RangeClosedTimestampPolicy, + queryState replicaoracle.QueryState, +) (roachpb.ReplicaDescriptor, error) { + var oracle replicaoracle.Oracle + if txn != nil && closedTimestampLikelySufficient(o.st, o.clock, ctPolicy, txn.MaxObservableTimestamp()) { + oracle = o.closest + } else { + oracle = o.binPacking } - return f.binPacking.Oracle(txn) + return oracle.ChoosePreferredReplica(ctx, txn, desc, leaseholder, ctPolicy, queryState) } -// followerReadAwareChoice is a leaseholder choosing policy that detects +// followerReadOraclePolicy is a leaseholder choosing policy that detects // whether a query can be used with a follower read. 
-var followerReadAwareChoice = replicaoracle.RegisterPolicy(newOracleFactory) +var followerReadOraclePolicy = replicaoracle.RegisterPolicy(newFollowerReadOracle) func init() { - sql.ReplicaOraclePolicy = followerReadAwareChoice + sql.ReplicaOraclePolicy = followerReadOraclePolicy builtins.EvalFollowerReadOffset = evalFollowerReadOffset kvcoord.CanSendToFollower = canSendToFollower } diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 4d1665ea1393..c8a383e082c9 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -10,7 +10,6 @@ package kvfollowerreadsccl import ( "context" - "reflect" "testing" "time" @@ -20,24 +19,25 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -69,72 +69,171 @@ func TestEvalFollowerReadOffset(t *testing.T) { func TestCanSendToFollower(t *testing.T) { defer leaktest.AfterTest(t)() - disableEnterprise := utilccl.TestingEnableEnterprise() - defer disableEnterprise() - st := cluster.MakeTestingClusterSettings() - kvserver.FollowerReadsEnabled.Override(&st.SV, true) + clock := hlc.NewClock(hlc.UnixNano, base.DefaultMaxClockOffset) + stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0) + current := clock.Now() + future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0) - old := hlc.Timestamp{ - WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(), - } - oldHeader := roachpb.Header{Txn: &roachpb.Transaction{ - ReadTimestamp: old, - }} - rw := roachpb.BatchRequest{Header: oldHeader} - rw.Add(&roachpb.PutRequest{}) - if canSendToFollower(uuid.MakeV4(), st, rw) { - t.Fatalf("should not be able to send a rw request to a follower") - } - roNonTxn := roachpb.BatchRequest{Header: oldHeader} - roNonTxn.Add(&roachpb.QueryTxnRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNonTxn) { - t.Fatalf("should not be able to send a non-transactional ro request to a follower") + txn := func(ts hlc.Timestamp) *roachpb.Transaction { + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) + return &txn } - roNoTxn := roachpb.BatchRequest{} - roNoTxn.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNoTxn) { - t.Fatalf("should not be able to 
send a batch with no txn to a follower") + withWriteTimestamp := func(txn *roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction { + txn.WriteTimestamp = ts + return txn } - roOld := roachpb.BatchRequest{Header: oldHeader} - roOld.Add(&roachpb.GetRequest{}) - if !canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should be able to send an old ro batch to a follower") + withUncertaintyLimit := func(txn *roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction { + txn.MaxTimestamp = ts + return txn } - roRWTxnOld := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{Key: []byte("key")}, - ReadTimestamp: old, - }, - }} - roRWTxnOld.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roRWTxnOld) { - t.Fatalf("should not be able to send a ro request from a rw txn to a follower") + batch := func(txn *roachpb.Transaction, req roachpb.Request) roachpb.BatchRequest { + var ba roachpb.BatchRequest + ba.Txn = txn + ba.Add(req) + return ba } - kvserver.FollowerReadsEnabled.Override(&st.SV, false) - if canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled") - } - kvserver.FollowerReadsEnabled.Override(&st.SV, true) - roNew := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - ReadTimestamp: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, - }, - }} - if canSendToFollower(uuid.MakeV4(), st, roNew) { - t.Fatalf("should not be able to send a new ro batch to a follower") - } - roOldWithNewMax := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - GlobalUncertaintyLimit: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, - }, - }} - roOldWithNewMax.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNew) { - t.Fatalf("should not be able to send a ro batch with new GlobalUncertaintyLimit to a follower") + + testCases := []struct { + name string + ba roachpb.BatchRequest + ctPolicy roachpb.RangeClosedTimestampPolicy + disabledEnterprise bool + disabledFollowerReads bool + exp bool + }{ + { + name: "non-txn batch", + ba: batch(nil, &roachpb.GetRequest{}), + exp: false, + }, + { + name: "stale read", + ba: batch(txn(stale), &roachpb.GetRequest{}), + exp: true, + }, + { + name: "stale locking read", + ba: batch(txn(stale), &roachpb.ScanRequest{KeyLocking: lock.Exclusive}), + exp: false, + }, + { + name: "stale write", + ba: batch(txn(stale), &roachpb.PutRequest{}), + exp: false, + }, + { + name: "stale non-txn request", + ba: batch(txn(stale), &roachpb.QueryTxnRequest{}), + exp: false, + }, + { + name: "stale read with current-time writes", + ba: batch(withWriteTimestamp(txn(stale), current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "stale read with current-time uncertainty limit", + ba: batch(withUncertaintyLimit(txn(stale), current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "current-time read", + ba: batch(txn(current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "future read", + ba: batch(txn(future), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "non-txn batch, global reads policy", + ba: batch(nil, &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale read, global reads policy", + ba: batch(txn(stale), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "stale locking read, global reads policy", + ba: batch(txn(stale), 
&roachpb.ScanRequest{KeyLocking: lock.Exclusive}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale write, global reads policy", + ba: batch(txn(stale), &roachpb.PutRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale non-txn request, global reads policy", + ba: batch(txn(stale), &roachpb.QueryTxnRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale read with current-time writes, global reads policy", + ba: batch(withWriteTimestamp(txn(stale), current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "stale read with current-time uncertainty limit, global reads policy", + ba: batch(withUncertaintyLimit(txn(stale), current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "current-time read, global reads policy", + ba: batch(txn(current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "current-time read with future writes, global reads policy", + ba: batch(withWriteTimestamp(txn(current), future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "current-time read with future uncertainty limit, global reads policy", + ba: batch(withUncertaintyLimit(txn(current), future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "future read, global reads policy", + ba: batch(txn(future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "non-enterprise", + disabledEnterprise: true, + exp: false, + }, + { + name: "follower reads disabled", + disabledFollowerReads: true, + exp: false, + }, } - disableEnterprise() - if canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should not be able to send an old ro batch to a follower without enterprise enabled") + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + if !c.disabledEnterprise { + defer utilccl.TestingEnableEnterprise()() + } + st := cluster.MakeTestingClusterSettings() + kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads) + + can := canSendToFollower(uuid.MakeV4(), st, clock, c.ctPolicy, c.ba) + require.Equal(t, c.exp, can) + }) } } @@ -149,43 +248,193 @@ func TestFollowerReadMultipleValidation(t *testing.T) { followerReadMultiple.Override(&st.SV, .1) } +// mockNodeStore implements the kvcoord.NodeDescStore interface. +type mockNodeStore []roachpb.NodeDescriptor + +func (s mockNodeStore) GetNodeDescriptor(id roachpb.NodeID) (*roachpb.NodeDescriptor, error) { + for i := range s { + desc := &s[i] + if desc.NodeID == id { + return desc, nil + } + } + return nil, errors.Errorf("unable to look up descriptor for n%d", id) +} + // TestOracle tests the OracleFactory exposed by this package. -// This test ends up being rather indirect but works by checking if the type -// of the oracle returned from the factory differs between requests we'd -// expect to support follower reads and that which we'd expect not to. 
func TestOracleFactory(t *testing.T) { defer leaktest.AfterTest(t)() - disableEnterprise := utilccl.TestingEnableEnterprise() - defer disableEnterprise() - st := cluster.MakeTestingClusterSettings() - kvserver.FollowerReadsEnabled.Override(&st.SV, true) - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + + ctx := context.Background() stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) + defer stopper.Stop(ctx) + clock := hlc.NewClock(hlc.UnixNano, base.DefaultMaxClockOffset) + stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0) + current := clock.Now() + future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0) + + c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper) + staleTxn := kv.NewTxn(ctx, c, 0) + staleTxn.SetFixedTimestamp(ctx, stale) + currentTxn := kv.NewTxn(ctx, c, 0) + currentTxn.SetFixedTimestamp(ctx, current) + futureTxn := kv.NewTxn(ctx, c, 0) + futureTxn.SetFixedTimestamp(ctx, future) + + nodes := mockNodeStore{ + {NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")}, + {NodeID: 2, Address: util.MakeUnresolvedAddr("tcp", "2")}, + {NodeID: 3, Address: util.MakeUnresolvedAddr("tcp", "3")}, + } + replicas := []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + } + desc := &roachpb.RangeDescriptor{ + InternalReplicas: replicas, + } + closestFollower := replicas[1] + leaseholder := replicas[2] + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - c := kv.NewDB(log.AmbientContext{ - Tracer: tracing.NewTracer(), - }, kv.MockTxnSenderFactory{}, hlc.NewClock(hlc.UnixNano, time.Nanosecond), stopper) - txn := kv.NewTxn(context.Background(), c, 0) - of := replicaoracle.NewOracleFactory(followerReadAwareChoice, replicaoracle.Config{ - Settings: st, - RPCContext: rpcContext, - }) - noFollowerReadOracle := of.Oracle(txn) - old := hlc.Timestamp{ - WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(), + setLatency := func(addr string, latency time.Duration) { + // All test cases have to have at least 11 measurement values in order for + // the exponentially-weighted moving average to work properly. See the + // comment on the WARMUP_SAMPLES const in the ewma package for details. 
+ for i := 0; i < 11; i++ { + rpcContext.RemoteClocks.UpdateOffset(ctx, addr, rpc.RemoteOffset{}, latency) + } } - txn.SetFixedTimestamp(context.Background(), old) - followerReadOracle := of.Oracle(txn) - if reflect.TypeOf(followerReadOracle) == reflect.TypeOf(noFollowerReadOracle) { - t.Fatalf("expected types of %T and %T to differ", followerReadOracle, - noFollowerReadOracle) + setLatency("1", 100*time.Millisecond) + setLatency("2", 2*time.Millisecond) + setLatency("3", 80*time.Millisecond) + + testCases := []struct { + name string + txn *kv.Txn + lh *roachpb.ReplicaDescriptor + ctPolicy roachpb.RangeClosedTimestampPolicy + disabledEnterprise bool + disabledFollowerReads bool + exp roachpb.ReplicaDescriptor + }{ + { + name: "non-txn, known leaseholder", + txn: nil, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "non-txn, unknown leaseholder", + txn: nil, + exp: closestFollower, + }, + { + name: "stale txn, known leaseholder", + txn: staleTxn, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "stale txn, unknown leaseholder", + txn: staleTxn, + exp: closestFollower, + }, + { + name: "current txn, known leaseholder", + txn: currentTxn, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "current txn, unknown leaseholder", + txn: currentTxn, + exp: closestFollower, + }, + { + name: "future txn, known leaseholder", + txn: futureTxn, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "future txn, unknown leaseholder", + txn: futureTxn, + exp: closestFollower, + }, + { + name: "stale txn, known leaseholder, global reads policy", + txn: staleTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "stale txn, unknown leaseholder, global reads policy", + txn: staleTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "current txn, known leaseholder, global reads policy", + txn: currentTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "current txn, unknown leaseholder, global reads policy", + txn: currentTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "future txn, known leaseholder, global reads policy", + txn: futureTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "future txn, unknown leaseholder, global reads policy", + txn: futureTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "stale txn, non-enterprise", + txn: staleTxn, + lh: &leaseholder, + disabledEnterprise: true, + exp: leaseholder, + }, + { + name: "stale txn, follower reads disabled", + txn: staleTxn, + lh: &leaseholder, + disabledFollowerReads: true, + exp: leaseholder, + }, } - disableEnterprise() - disabledFollowerReadOracle := of.Oracle(txn) - if reflect.TypeOf(disabledFollowerReadOracle) != reflect.TypeOf(noFollowerReadOracle) { - t.Fatalf("expected types of %T and %T not to differ", disabledFollowerReadOracle, - noFollowerReadOracle) + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + if !c.disabledEnterprise { + defer utilccl.TestingEnableEnterprise()() + } + st := cluster.MakeTestingClusterSettings() + kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads) + + o := replicaoracle.NewOracle(followerReadOraclePolicy, replicaoracle.Config{ + NodeDescs: nodes, + Settings: st, + RPCContext: rpcContext, + }) + + res, err := o.ChoosePreferredReplica(ctx, c.txn, desc, c.lh, c.ctPolicy, 
replicaoracle.QueryState{}) + require.NoError(t, err) + require.Equal(t, c.exp, res) + }) } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 5cfb8643af81..369af9ee5b58 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -128,7 +128,11 @@ var ( // followerreadsccl code to inject logic to check if follower reads are enabled. // By default, without CCL code, this function returns false. var CanSendToFollower = func( - clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest, + _ uuid.UUID, + _ *cluster.Settings, + _ *hlc.Clock, + _ roachpb.RangeClosedTimestampPolicy, + _ roachpb.BatchRequest, ) bool { return false } @@ -1771,7 +1775,7 @@ func (ds *DistSender) sendToReplicas( desc := routing.Desc() ba.RangeID = desc.RangeID leaseholder := routing.Leaseholder() - canFollowerRead := (ds.clusterID != nil) && CanSendToFollower(ds.clusterID.Get(), ds.st, ba) + canFollowerRead := CanSendToFollower(ds.clusterID.Get(), ds.st, ds.clock, routing.ClosedTimestampPolicy(), ba) var replicas ReplicaSlice var err error if canFollowerRead { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index fe63f3b2c5ec..052d163b7ef4 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -3432,8 +3432,6 @@ func TestErrorIndexAlignment(t *testing.T) { // TestCanSendToFollower tests that the DistSender abides by the result it // get from CanSendToFollower. -// TODO(nvanbenschoten): update this test once ClosedTimestampPolicy begins -// dictating the decision of CanSendToFollower. func TestCanSendToFollower(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -3444,7 +3442,13 @@ func TestCanSendToFollower(t *testing.T) { old := CanSendToFollower defer func() { CanSendToFollower = old }() canSend := true - CanSendToFollower = func(_ uuid.UUID, _ *cluster.Settings, ba roachpb.BatchRequest) bool { + CanSendToFollower = func( + _ uuid.UUID, + _ *cluster.Settings, + _ *hlc.Clock, + _ roachpb.RangeClosedTimestampPolicy, + ba roachpb.BatchRequest, + ) bool { return !ba.IsLocking() && canSend } diff --git a/pkg/kv/kvclient/kvcoord/range_iter.go b/pkg/kv/kvclient/kvcoord/range_iter.go index b1269974b895..c34a28c2b354 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter.go +++ b/pkg/kv/kvclient/kvcoord/range_iter.go @@ -86,6 +86,19 @@ func (ri *RangeIterator) Leaseholder() *roachpb.ReplicaDescriptor { return ri.token.Leaseholder() } +// ClosedTimestampPolicy returns the closed timestamp policy of the range at +// which the iterator is currently positioned. The iterator must be valid. +// +// The policy information comes from a cache, and so it can be stale. Returns +// the default policy of LAG_BY_CLUSTER_SETTING if no policy information is +// known. +func (ri *RangeIterator) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy { + if !ri.Valid() { + panic(ri.Error()) + } + return ri.token.ClosedTimestampPolicy() +} + // Token returns the eviction token corresponding to the range // descriptor for the current iteration. The iterator must be valid. 
func (ri *RangeIterator) Token() rangecache.EvictionToken { diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 5a028e0617d2..977cd82a1e89 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -931,6 +931,13 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam tc.mu.txn.MinTimestamp.Backward(ts) } +// MaxObservableTimestamp is part of the client.TxnSender interface. +func (tc *TxnCoordSender) MaxObservableTimestamp() hlc.Timestamp { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.mu.txn.MaxObservableTimestamp() +} + // ManualRestart is part of the client.TxnSender interface. func (tc *TxnCoordSender) ManualRestart( ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index a312b88fe99f..c2b76a0ab040 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -90,7 +90,7 @@ func TestClosedTimestampCanServe(t *testing.T) { repls := replsForRange(ctx, t, tc, desc, numNodes) ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -161,7 +161,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) { reqTS := tc.Server(0).Clock().Now() // Sleep for a sufficiently long time so that reqTS can be closed. time.Sleep(3 * testingTargetDuration) - baRead := makeReadBatchRequestForDesc(desc, reqTS) + baRead := makeTxnReadBatchForDesc(desc, reqTS) repls := replsForRange(ctx, t, tc, desc, numNodes) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) @@ -190,7 +190,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { t.Fatal(err) } ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -222,7 +222,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { // Attempt to send read requests to a replica in a tight loop until deadline // is reached. If an error is seen on any replica then it is returned to the // errgroup. - baRead = makeReadBatchRequestForDesc(desc, ts) + baRead = makeTxnReadBatchForDesc(desc, ts) ensureCanReadFromReplicaUntilDeadline := func(r *kvserver.Replica) { g.Go(func() error { for timeutil.Now().Before(deadline) { @@ -285,14 +285,7 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) { respCh := make(chan struct{}, len(keys)) for i, key := range keys { go func(repl *kvserver.Replica, key roachpb.Key) { - var baRead roachpb.BatchRequest - r := &roachpb.ScanRequest{} - r.Key = key - r.EndKey = key.Next() - baRead.Add(r) - baRead.Timestamp = ts - baRead.RangeID = desc.RangeID - + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { // Expect 0 rows, because the intents will be aborted. _, err := expectRows(0)(repl.Send(ctx, baRead)) @@ -356,7 +349,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { } // Start by ensuring that the values can be read from all replicas at ts. 
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(2)) }) @@ -382,10 +375,10 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { // Now immediately query both the ranges and there's 1 value per range. // We need to tolerate RangeNotFound as the split range may not have been // created yet. - baReadL := makeReadBatchRequestForDesc(lr, ts) + baReadL := makeTxnReadBatchForDesc(lr, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadL, lRepls, respFuncs(retryOnRangeNotFound, expectRows(1)))) - baReadR := makeReadBatchRequestForDesc(rr, ts) + baReadR := makeTxnReadBatchForDesc(rr, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadR, rRepls, respFuncs(retryOnRangeNotFound, expectRows(1)))) @@ -397,7 +390,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { // The hazard here is that a follower is not yet aware of the merge and will // return an error. We'll accept that because a client wouldn't see that error // from distsender. - baReadMerged := makeReadBatchRequestForDesc(merged, ts) + baReadMerged := makeTxnReadBatchForDesc(merged, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadMerged, mergedRepls, respFuncs(retryOnRangeKeyMismatch, expectRows(2)))) } @@ -429,12 +422,12 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { } // Grab a timestamp before initiating a lease transfer, transfer the lease, - // then ensure that reads at that timestamp can occur from all the replicas. + // then ensure that reads at that timestamp can occur from all the replicas. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} lh := getCurrentLeaseholder(t, tc, desc) target := pickRandomTarget(tc, lh, desc) require.Nil(t, tc.TransferRangeLease(desc, target)) - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -448,7 +441,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { verifyNotLeaseHolderErrors(t, baRead, repls, 2) } -func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { +func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -469,17 +462,30 @@ func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { // Verify that we can serve a follower read at a timestamp. Wait if necessary. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) - // Create a read-only batch and attach a read-write transaction. - rwTxn := roachpb.MakeTransaction("test", []byte("key"), roachpb.NormalUserPriority, ts, 0) - baRead.Txn = &rwTxn + // Update the batch to simulate a transaction that has written an intent. + baRead.Txn.Key = []byte("key") + baRead.Txn.Sequence++ - // Send the request to all three replicas. One should succeed and - // the other two should return NotLeaseHolderErrors. + // The write timestamp of the transaction is still closed, so a read-only + // request by the transaction should be servicable by followers. 
This is + // because the writing transaction can still read its writes on the + // followers. + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) + }) + + // Update the batch to simulate a transaction that has written past its read + // timestamp and past the expected closed timestamp. This should prevent its + // reads from being served by followers. + baRead.Txn.WriteTimestamp = baRead.Txn.WriteTimestamp.Add(time.Hour.Nanoseconds(), 0) + + // Send the request to all three replicas. One should succeed and the other + // two should return NotLeaseHolderErrors. verifyNotLeaseHolderErrors(t, baRead, repls, 2) } @@ -502,9 +508,10 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { t.Fatal(err) } - // Verify that we can serve a follower read at a timestamp. Wait if necessary + // Verify that we can serve a follower read at a timestamp. Wait if + // necessary. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -519,11 +526,43 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { baQueryTxn.Add(r) baQueryTxn.Timestamp = ts - // Send the request to all three replicas. One should succeed and - // the other two should return NotLeaseHolderErrors. + // Send the request to all three replicas. One should succeed and the other + // two should return NotLeaseHolderErrors. verifyNotLeaseHolderErrors(t, baQueryTxn, repls, 2) } +func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Limiting how long transactions can run does not work + // well with race unless we're extremely lenient, which + // drives up the test duration. + skip.UnderRace(t) + + ctx := context.Background() + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + defer tc.Stopper().Stop(ctx) + repls := replsForRange(ctx, t, tc, desc, numNodes) + + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { + t.Fatal(err) + } + + // Verify that we can serve a follower read at a timestamp with a + // transactional batch. Wait if necessary. + ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + baRead := makeTxnReadBatchForDesc(desc, ts) + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) + }) + + // Remove the transaction and send the request to all three replicas. One + // should succeed and the other two should return NotLeaseHolderErrors. + baRead.Txn = nil + verifyNotLeaseHolderErrors(t, baRead, repls, 2) +} + // TestClosedTimestampInactiveAfterSubsumption verifies that, during a merge, // replicas of the subsumed range (RHS) cannot serve follower reads for // timestamps after the subsumption time. 
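The renamed TestClosedTimestampCanServeForWritingTransaction above hinges, in essence, on one comparison: a follower can keep serving the read as long as the transaction's maximum observable timestamp (the maximum of its read timestamp, write timestamp, and global uncertainty limit, per Transaction.MaxObservableTimestamp introduced later in this patch) stays at or below the follower's closed timestamp. A minimal standalone sketch of that comparison, using plain time.Time values in place of hlc timestamps:

package main

import (
	"fmt"
	"time"
)

// maxObservable mirrors Transaction.MaxObservableTimestamp on plain times:
// the maximum of the read timestamp, write timestamp, and uncertainty limit.
func maxObservable(read, write, uncertaintyLimit time.Time) time.Time {
	ts := read
	if write.After(ts) {
		ts = write
	}
	if uncertaintyLimit.After(ts) {
		ts = uncertaintyLimit
	}
	return ts
}

func main() {
	closed := time.Now().Add(-3 * time.Second) // assumed follower closed timestamp
	read := closed.Add(-time.Second)           // txn read timestamp below the closed timestamp

	// A transaction that has written intents but whose write timestamp is still
	// below the closed timestamp remains serviceable by followers.
	write := read
	fmt.Println(!maxObservable(read, write, read).After(closed)) // true

	// Pushing the write timestamp well past the closed timestamp (the test uses
	// an hour) lifts the max observable timestamp above it, so followers must
	// redirect to the leaseholder.
	write = read.Add(time.Hour)
	fmt.Println(!maxObservable(read, write, read).After(closed)) // false
}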
@@ -694,7 +733,7 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { case <-time.After(30 * time.Second): t.Fatal("failed to receive next closed timestamp update") } - baReadAfterLeaseTransfer := makeReadBatchRequestForDesc(rightDesc, inactiveClosedTSBoundary.Next()) + baReadAfterLeaseTransfer := makeTxnReadBatchForDesc(rightDesc, inactiveClosedTSBoundary.Next()) rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary") verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) @@ -755,7 +794,7 @@ func forceLeaseTransferOnSubsumedRange( // that the current rightLeaseholder has stopped heartbeating. This will prompt // it to acquire the range lease for itself. g.Go(func() error { - leaseAcquisitionRequest := makeReadBatchRequestForDesc(rightDesc, freezeStartTimestamp) + leaseAcquisitionRequest := makeTxnReadBatchForDesc(rightDesc, freezeStartTimestamp) log.Infof(ctx, "sending a read request from a follower of RHS (store %d) in order to trigger lease acquisition", newRightLeaseholder.StoreID()) @@ -1355,15 +1394,16 @@ func verifyCanReadFromAllRepls( return g.Wait() } -func makeReadBatchRequestForDesc( - desc roachpb.RangeDescriptor, ts hlc.Timestamp, -) roachpb.BatchRequest { +func makeTxnReadBatchForDesc(desc roachpb.RangeDescriptor, ts hlc.Timestamp) roachpb.BatchRequest { + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) + var baRead roachpb.BatchRequest baRead.Header.RangeID = desc.RangeID + baRead.Header.Timestamp = ts + baRead.Header.Txn = &txn r := &roachpb.ScanRequest{} r.Key = desc.StartKey.AsRawKey() r.EndKey = desc.EndKey.AsRawKey() baRead.Add(r) - baRead.Timestamp = ts return baRead } diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 72c4ebdee4fc..b15d94b84382 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -82,7 +82,7 @@ func (r *Replica) canServeFollowerReadRLocked( // We can't actually serve the read based on the closed timestamp. // Signal the clients that we want an update so that future requests can succeed. 
r.store.cfg.ClosedTimestamp.Clients.Request(lErr.LeaseHolder.NodeID, r.RangeID) - log.Eventf(ctx, "can't serve follower read; closed timestamp too low by: %s; maxClosed: %s ts: %s uncertaintyLimit: %s", + log.Eventf(ctx, "can't serve follower read; closed timestamp too low by: %s; MaxClosedTimestamp: %s ts: %s uncertaintyLimit: %s", tsDiff, maxClosed, ba.Timestamp, uncertaintyLimitStr) if false { diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 958c31c6170e..960968934d58 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -801,9 +801,12 @@ func TestLearnerAndVoterOutgoingFollowerRead(t *testing.T) { }) check := func() { + ts := tc.Server(0).Clock().Now() + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) req := roachpb.BatchRequest{Header: roachpb.Header{ RangeID: scratchDesc.RangeID, - Timestamp: tc.Server(0).Clock().Now(), + Timestamp: ts, + Txn: &txn, }} req.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{ Key: scratchDesc.StartKey.AsRawKey(), EndKey: scratchDesc.EndKey.AsRawKey(), diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 1a27c205172a..aa3ce20c7a2d 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -124,6 +124,11 @@ func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Ti m.txn.MinTimestamp.Backward(ts) } +// MaxObservableTimestamp is part of the TxnSender interface. +func (m *MockTransactionalSender) MaxObservableTimestamp() hlc.Timestamp { + return m.txn.MaxObservableTimestamp() +} + // ManualRestart is part of the TxnSender interface. func (m *MockTransactionalSender) ManualRestart( ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 856fbc42e3b9..75006d52dc9a 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -230,6 +230,10 @@ type TxnSender interface { // field on TxnMeta. ProvisionalCommitTimestamp() hlc.Timestamp + // MaxObservableTimestamp returns the largest timestamp at which the + // transaction may observe when performing a read-only operation. + MaxObservableTimestamp() hlc.Timestamp + // IsSerializablePushAndRefreshNotPossible returns true if the // transaction is serializable, its timestamp has been pushed and // there's no chance that refreshing the read spans will succeed diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index e09f39ea6117..ebe5d948055f 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -309,6 +309,14 @@ func (txn *Txn) ProvisionalCommitTimestamp() hlc.Timestamp { return txn.mu.sender.ProvisionalCommitTimestamp() } +// MaxObservableTimestamp returns the largest timestamp at which the transaction +// may observe when performing a read-only operation. +func (txn *Txn) MaxObservableTimestamp() hlc.Timestamp { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.MaxObservableTimestamp() +} + // SetSystemConfigTrigger sets the system db trigger to true on this transaction. // This will impact the EndTxnRequest. Note that this method takes a boolean // argument indicating whether this transaction is intended for the system diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 35c91d29ae0d..86de81748c33 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -910,13 +910,43 @@ func MakeTransaction( func (t Transaction) LastActive() hlc.Timestamp { ts := t.LastHeartbeat // Only forward by the ReadTimestamp if it is a clock timestamp. 
-	// TODO(nvanbenschoten): replace this with look at the Synthetic bool.
-	if readTS, ok := t.ReadTimestamp.TryToClockTimestamp(); ok {
-		ts.Forward(readTS.ToTimestamp())
+	if !t.ReadTimestamp.Synthetic {
+		ts.Forward(t.ReadTimestamp)
 	}
 	return ts
 }
 
+// MaxObservableTimestamp returns the largest timestamp at which the transaction
+// may observe when performing a read-only operation. This is the maximum of the
+// transaction's read timestamp, its write timestamp, and its global uncertainty
+// limit.
+func (t Transaction) MaxObservableTimestamp() hlc.Timestamp {
+	// A transaction can observe committed values up to its read timestamp.
+	ts := t.ReadTimestamp
+	// Forward to the transaction's write timestamp. The transaction will read
+	// committed values at its read timestamp but may perform reads up to its
+	// intent timestamps if the transaction is reading its own intent writes,
+	// which we know to all be at timestamps <= its current write timestamp.
+	//
+	// There is a case where an intent written by a transaction ends up above the
+	// transaction's write timestamp: after a successful intent push. Such cases
+	// do allow a transaction to operate above its max observable timestamp.
+	// However, this is fine, because a follower with a sufficiently high closed
+	// timestamp will contain the intent in either its pre-push or post-push
+	// state, and either state is visible to the reading transaction. It is an
+	// open question whether the contract of MaxObservableTimestamp needs to
+	// change to accommodate this case.
+	ts.Forward(t.WriteTimestamp)
+	// Forward to the transaction's global uncertainty limit, because the
+	// transaction may observe committed writes from other transactions up to
+	// this time and consider them to be "uncertain". When a transaction begins,
+	// this will be above its read timestamp, but the read timestamp can surpass
+	// the global uncertainty limit due to refreshes or retries.
+	ts.Forward(t.GlobalUncertaintyLimit)
+	return ts
+}
+
 // Clone creates a copy of the given transaction. The copy is shallow because
 // none of the references held by a transaction allow interior mutability.
 func (t Transaction) Clone() *Transaction {
diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go
index 8a01bde6ed5b..38eddb98d07e 100644
--- a/pkg/sql/physicalplan/replicaoracle/oracle.go
+++ b/pkg/sql/physicalplan/replicaoracle/oracle.go
@@ -31,11 +31,11 @@ type Policy byte
 var (
 	// RandomChoice chooses lease replicas randomly.
-	RandomChoice = RegisterPolicy(newRandomOracleFactory)
+	RandomChoice = RegisterPolicy(newRandomOracle)
 	// BinPackingChoice bin-packs the choices.
-	BinPackingChoice = RegisterPolicy(newBinPackingOracleFactory)
+	BinPackingChoice = RegisterPolicy(newBinPackingOracle)
 	// ClosestChoice chooses the node closest to the current node.
-	ClosestChoice = RegisterPolicy(newClosestOracleFactory)
+	ClosestChoice = RegisterPolicy(newClosestOracle)
 )
 
 // Config is used to construct an OracleFactory.
@@ -63,43 +63,45 @@ type Oracle interface {
 	// don't care about the leaseholder (e.g. when we're planning for follower
 	// reads).
 	//
+	// When the range's closed timestamp policy is known, it is passed in.
+	// Otherwise, the default closed timestamp policy is provided.
+	//
 	// A RangeUnavailableError can be returned if there's no information in gossip
 	// about any of the nodes that might be tried.
ChoosePreferredReplica( - ctx context.Context, rng *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor, qState QueryState, + ctx context.Context, + txn *kv.Txn, + rng *roachpb.RangeDescriptor, + leaseholder *roachpb.ReplicaDescriptor, + ctPolicy roachpb.RangeClosedTimestampPolicy, + qState QueryState, ) (roachpb.ReplicaDescriptor, error) } -// OracleFactory creates an oracle for a Txn. -type OracleFactory interface { - Oracle(*kv.Txn) Oracle -} - -// OracleFactoryFunc creates an OracleFactory from a Config. -type OracleFactoryFunc func(Config) OracleFactory +// OracleFactory creates an oracle from a Config. +type OracleFactory func(Config) Oracle -// NewOracleFactory creates an oracle with the given policy. -func NewOracleFactory(policy Policy, cfg Config) OracleFactory { - ff, ok := oracleFactoryFuncs[policy] +// NewOracle creates an oracle with the given policy. +func NewOracle(policy Policy, cfg Config) Oracle { + ff, ok := oracleFactories[policy] if !ok { panic(errors.Errorf("unknown Policy %v", policy)) } return ff(cfg) } -// RegisterPolicy creates a new policy given a function which constructs an -// OracleFactory. RegisterPolicy is intended to be called only during init and -// is not safe for concurrent use. -func RegisterPolicy(f OracleFactoryFunc) Policy { - if len(oracleFactoryFuncs) == 255 { +// RegisterPolicy creates a new policy given an OracleFactory. RegisterPolicy is +// intended to be called only during init and is not safe for concurrent use. +func RegisterPolicy(f OracleFactory) Policy { + if len(oracleFactories) == 255 { panic("Can only register 255 Policy instances") } - r := Policy(len(oracleFactoryFuncs)) - oracleFactoryFuncs[r] = f + r := Policy(len(oracleFactories)) + oracleFactories[r] = f return r } -var oracleFactoryFuncs = map[Policy]OracleFactoryFunc{} +var oracleFactories = map[Policy]OracleFactory{} // QueryState encapsulates the history of assignments of ranges to nodes // done by an oracle on behalf of one particular query. 
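With the OracleFactory indirection removed, a policy is now simply a function from Config to Oracle, registered once with RegisterPolicy, and the per-range inputs (the transaction and the range's closed timestamp policy) are passed to ChoosePreferredReplica on every call. The sketch below shows how a hypothetical policy would plug into the reworked interface; the package name, the leaseholder-or-first-replica choice, and the exported policy variable are all invented for illustration and would only compile inside the repository.

// Package exampleoracle is a hypothetical consumer of the reworked
// replicaoracle API shown above.
package exampleoracle

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
)

type leaseholderFirstOracle struct{}

// newLeaseholderFirstOracle has the OracleFactory shape: func(Config) Oracle.
func newLeaseholderFirstOracle(cfg replicaoracle.Config) replicaoracle.Oracle {
	return &leaseholderFirstOracle{}
}

// ChoosePreferredReplica implements the extended Oracle interface. The
// decision here is intentionally trivial: prefer the leaseholder when it is
// known, otherwise fall back to the first replica in the descriptor.
func (o *leaseholderFirstOracle) ChoosePreferredReplica(
	ctx context.Context,
	txn *kv.Txn,
	desc *roachpb.RangeDescriptor,
	leaseholder *roachpb.ReplicaDescriptor,
	ctPolicy roachpb.RangeClosedTimestampPolicy,
	queryState replicaoracle.QueryState,
) (roachpb.ReplicaDescriptor, error) {
	if leaseholder != nil {
		return *leaseholder, nil
	}
	return desc.InternalReplicas[0], nil
}

// Registration happens once, at init time, just like the built-in policies.
var LeaseholderFirstChoice = replicaoracle.RegisterPolicy(newLeaseholderFirstOracle)

A planner would then select such a policy the same way followerReadOraclePolicy is selected earlier in this patch: by passing it to replicaoracle.NewOracle together with a Config.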
@@ -122,18 +124,17 @@ type randomOracle struct { nodeDescs kvcoord.NodeDescStore } -var _ OracleFactory = &randomOracle{} - -func newRandomOracleFactory(cfg Config) OracleFactory { +func newRandomOracle(cfg Config) Oracle { return &randomOracle{nodeDescs: cfg.NodeDescs} } -func (o *randomOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *randomOracle) ChoosePreferredReplica( - ctx context.Context, desc *roachpb.RangeDescriptor, _ *roachpb.ReplicaDescriptor, _ QueryState, + ctx context.Context, + _ *kv.Txn, + desc *roachpb.RangeDescriptor, + _ *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, + _ QueryState, ) (roachpb.ReplicaDescriptor, error) { replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc, kvcoord.OnlyPotentialLeaseholders) if err != nil { @@ -150,7 +151,7 @@ type closestOracle struct { latencyFunc kvcoord.LatencyFunc } -func newClosestOracleFactory(cfg Config) OracleFactory { +func newClosestOracle(cfg Config) Oracle { return &closestOracle{ nodeDescs: cfg.NodeDescs, nodeDesc: cfg.NodeDesc, @@ -158,12 +159,13 @@ func newClosestOracleFactory(cfg Config) OracleFactory { } } -func (o *closestOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *closestOracle) ChoosePreferredReplica( - ctx context.Context, desc *roachpb.RangeDescriptor, _ *roachpb.ReplicaDescriptor, _ QueryState, + ctx context.Context, + _ *kv.Txn, + desc *roachpb.RangeDescriptor, + _ *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, + _ QueryState, ) (roachpb.ReplicaDescriptor, error) { // We know we're serving a follower read request, so consider all non-outgoing // replicas. @@ -199,7 +201,7 @@ type binPackingOracle struct { latencyFunc kvcoord.LatencyFunc } -func newBinPackingOracleFactory(cfg Config) OracleFactory { +func newBinPackingOracle(cfg Config) Oracle { return &binPackingOracle{ maxPreferredRangesPerLeaseHolder: maxPreferredRangesPerLeaseHolder, nodeDescs: cfg.NodeDescs, @@ -208,16 +210,12 @@ func newBinPackingOracleFactory(cfg Config) OracleFactory { } } -var _ OracleFactory = &binPackingOracle{} - -func (o *binPackingOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *binPackingOracle) ChoosePreferredReplica( ctx context.Context, + _ *kv.Txn, desc *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, queryState QueryState, ) (roachpb.ReplicaDescriptor, error) { // If we know the leaseholder, we choose it. diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go index c714094b46ed..7298f587362b 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go @@ -30,7 +30,7 @@ import ( // TestRandomOracle defeats TestUnused for RandomChoice. 
func TestRandomOracle(t *testing.T) { - _ = NewOracleFactory(RandomChoice, Config{}) + _ = NewOracle(RandomChoice, Config{}) } func TestClosest(t *testing.T) { @@ -40,24 +40,30 @@ func TestClosest(t *testing.T) { defer stopper.Stop(ctx) g, _ := makeGossip(t, stopper) nd, _ := g.GetNodeDescriptor(1) - of := NewOracleFactory(ClosestChoice, Config{ + o := NewOracle(ClosestChoice, Config{ NodeDescs: g, NodeDesc: *nd, }) - of.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { + o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { if strings.HasSuffix(s, "2") { return time.Nanosecond, true } return time.Millisecond, true } - o := of.Oracle(nil) - info, err := o.ChoosePreferredReplica(ctx, &roachpb.RangeDescriptor{ - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1}, - {NodeID: 2, StoreID: 2}, - {NodeID: 3, StoreID: 3}, + info, err := o.ChoosePreferredReplica( + ctx, + nil, /* txn */ + &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + }, }, - }, nil /* leaseHolder */, QueryState{}) + nil, /* leaseHolder */ + roachpb.LAG_BY_CLUSTER_SETTING, + QueryState{}, + ) if err != nil { t.Fatalf("Failed to choose closest replica: %v", err) } diff --git a/pkg/sql/physicalplan/span_resolver.go b/pkg/sql/physicalplan/span_resolver.go index 9fac64c525bf..ce47cda2f6e1 100644 --- a/pkg/sql/physicalplan/span_resolver.go +++ b/pkg/sql/physicalplan/span_resolver.go @@ -115,10 +115,10 @@ type SpanResolverIterator interface { // spanResolver implements SpanResolver. type spanResolver struct { - st *cluster.Settings - distSender *kvcoord.DistSender - nodeDesc roachpb.NodeDescriptor - oracleFactory replicaoracle.OracleFactory + st *cluster.Settings + distSender *kvcoord.DistSender + nodeDesc roachpb.NodeDescriptor + oracle replicaoracle.Oracle } var _ SpanResolver = &spanResolver{} @@ -135,7 +135,7 @@ func NewSpanResolver( return &spanResolver{ st: st, nodeDesc: nodeDesc, - oracleFactory: replicaoracle.NewOracleFactory(policy, replicaoracle.Config{ + oracle: replicaoracle.NewOracle(policy, replicaoracle.Config{ NodeDescs: nodeDescs, NodeDesc: nodeDesc, Settings: st, @@ -147,6 +147,8 @@ func NewSpanResolver( // spanResolverIterator implements the SpanResolverIterator interface. type spanResolverIterator struct { + // txn is the transaction using the iterator. + txn *kv.Txn // it is a wrapped RangeIterator. it *kvcoord.RangeIterator // oracle is used to choose a lease holders for ranges when one isn't present @@ -167,8 +169,9 @@ var _ SpanResolverIterator = &spanResolverIterator{} // NewSpanResolverIterator creates a new SpanResolverIterator. func (sr *spanResolver) NewSpanResolverIterator(txn *kv.Txn) SpanResolverIterator { return &spanResolverIterator{ + txn: txn, it: kvcoord.NewRangeIterator(sr.distSender), - oracle: sr.oracleFactory.Oracle(txn), + oracle: sr.oracle, queryState: replicaoracle.MakeQueryState(), } } @@ -260,7 +263,7 @@ func (it *spanResolverIterator) ReplicaInfo( } repl, err := it.oracle.ChoosePreferredReplica( - ctx, it.it.Desc(), it.it.Leaseholder(), it.queryState) + ctx, it.txn, it.it.Desc(), it.it.Leaseholder(), it.it.ClosedTimestampPolicy(), it.queryState) if err != nil { return roachpb.ReplicaDescriptor{}, err }
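Everything in this patch is activated through package-level hooks rather than direct dependencies: the CCL init() assigns kvcoord.CanSendToFollower, sql.ReplicaOraclePolicy, and builtins.EvalFollowerReadOffset, while the non-CCL defaults stay conservative (CanSendToFollower always returns false). The toy program below is not CockroachDB code and all names in it are invented; it only illustrates that injection pattern in isolation.

package main

import "fmt"

// canUseFeature is the overridable hook. The default is the safe answer used
// when the enterprise code is not linked into the binary.
var canUseFeature = func() bool { return false }

// enableEnterpriseHooks stands in for the CCL package's init function, which
// swaps the conservative default for the real implementation.
func enableEnterpriseHooks() {
	canUseFeature = func() bool { return true }
}

func main() {
	fmt.Println(canUseFeature()) // false: OSS default
	enableEnterpriseHooks()      // what the CCL init() does for the real hooks
	fmt.Println(canUseFeature()) // true: enterprise behavior injected
}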