Skip to content

Commit

Permalink
kv: route present-time reads to global_read follower replicas
Browse files Browse the repository at this point in the history
First commit from cockroachdb#59505.

This commit updates the kv client routing logic to account for the new
`LEAD_FOR_GLOBAL_READS` `RangeClosedTimestampPolicy` added in cockroachdb#59505. In
enterprise builds, non-stale read-only requests to ranges with this
closed timestamp policy can now be routed to follower replicas, just
like stale read-only requests to normal ranges currently are.

In addition to this main change, there are a few smaller changes in this
PR that were hard to split out, so are included in this commit.

First, non-transactional requests are no longer served by followers even
if the follower replica detects that the request can be. Previously,
non-transactional requests would never be routed intentionally to a
follower replica, but `Replica.canServeFollowerRead` would allow them
through if their timestamp was low enough. This change was made in order
to keep the client and server logic in-sync and because this does not
seem safe for non-transactional requests that get their timestamp
assigned on the server. We're planning to remove non-transactional
requests soon anyway (cockroachdb#58459), so this isn't a big deal. It mostly
just caused some testing fallout.

Second, transactional requests that had previously written intents are
now allowed to have their read-only requests served by followers, as
long as those followers have a closed timestamp above the transaction's
read *and* write timestamp. Previously, we had avoided this because it
seemed to cause issues with read-your-writes. However, it appears safe
as long as the write timestamp is below the closed timestamp, because we
know all of the transaction's intents are at or below its write
timestamp. This is very important for multi-region work, where we want a
transaction to be able to write to a REGIONAL table and then later
perform local reads (maybe foreign key checks) on GLOBAL tables.

Third, a max clock offset shift in `canUseFollowerRead` was removed. It
wasn't exactly clear what this was doing. It was introduced in the
original 70be833 and seemed to allow a follower read in cases where they
otherwise shouldn't be expected to succeed. I thought at first that it
was accounting for the fact that the kv client's clock might be leading
the kv server's clock and so it was being pessimistic about the expected
closed timestamp, but it actually seems to be shifting the other way, so
I don't know. I might be missing something.

Finally, the concept of a `replicaoracle.OracleFactoryFunc` was removed
and the existing `replicaoracle.OracleFactory` takes its place. We no
longer need the double-factory approach because the transaction object
is now passed directly to `Oracle.ChoosePreferredReplica`. This was a
necessary change, because the process of determining whether a follower
read can be served now requires transaction information and range
information (i.e. the closed timestamp policy), so we need to make it in
the Oracle itself instead of in the OracleFactory. This all seems like a
simplification anyway.

This is still waiting on changes to the closed timestamp system to be
able to write end-to-end tests using this new functionality.
  • Loading branch information
nvanbenschoten committed Feb 6, 2021
1 parent 81a2c26 commit a501736
Show file tree
Hide file tree
Showing 17 changed files with 682 additions and 265 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ go_library(
"//pkg/sql/physicalplan/replicaoracle",
"//pkg/sql/sem/builtins",
"//pkg/util/hlc",
"//pkg/util/timeutil",
"//pkg/util/uuid",
],
)
Expand All @@ -38,6 +37,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security",
Expand All @@ -46,21 +46,21 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/physicalplan/replicaoracle",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
152 changes: 100 additions & 52 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package kvfollowerreadsccl

import (
"context"
"fmt"
"time"

Expand All @@ -27,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand All @@ -49,103 +49,151 @@ var followerReadMultiple = settings.RegisterFloatSetting(
},
)

// getFollowerReadOffset returns the offset duration which should be used to as
// the offset from now to request a follower read. The same value less the clock
// uncertainty, then is used to determine at the kv layer if a query can use a
// follower read.
func getFollowerReadDuration(st *cluster.Settings) time.Duration {
// getFollowerReadLag returns the (negative) offset duration from hlc.Now()
// which should be used to request a follower read. The same value is used to
// determine at the kv layer if a query can use a follower read for ranges with
// the default LAG_BY_CLUSTER_SETTING closed timestamp policy.
func getFollowerReadLag(st *cluster.Settings) time.Duration {
targetMultiple := followerReadMultiple.Get(&st.SV)
targetDuration := closedts.TargetDuration.Get(&st.SV)
closeFraction := closedts.CloseFraction.Get(&st.SV)
return -1 * time.Duration(float64(targetDuration)*
(1+closeFraction*targetMultiple))
}

// getGlobalReadsLead returns the (positive) offset duration from hlc.Now()
// which clients can expect followers of a range with the LEAD_FOR_GLOBAL_READS
// closed timestamp policy to have closed off. This offset is equal to the
// maximum clock offset, allowing present-time (i.e. those not pushed into the
// future) transactions to serve reads from followers.
func getGlobalReadsLead(clock *hlc.Clock) time.Duration {
return clock.MaxOffset()
}

func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error {
org := sql.ClusterOrganization.Get(&st.SV)
return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads")
}

func checkFollowerReadsEnabled(clusterID uuid.UUID, st *cluster.Settings) bool {
if !kvserver.FollowerReadsEnabled.Get(&st.SV) {
return false
}
return checkEnterpriseEnabled(clusterID, st) == nil
}

func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) {
if err := checkEnterpriseEnabled(clusterID, st); err != nil {
return 0, err
}
return getFollowerReadDuration(st), nil
return getFollowerReadLag(st), nil
}

// batchCanBeEvaluatedOnFollower determines if a batch consists exclusively of
// requests that can be evaluated on a follower replica.
func batchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool {
return !ba.IsLocking() && ba.IsAllTransactional()
}

// txnCanPerformFollowerRead determines if the provided transaction can perform
// follower reads.
func txnCanPerformFollowerRead(txn *roachpb.Transaction) bool {
// If the request is transactional and that transaction has acquired any
// locks then that request should not perform follower reads. Doing so could
// allow the request to miss its own writes or observe state that conflicts
// with its locks.
return txn != nil && !txn.IsLocking()
return ba.Txn != nil && !ba.IsLocking() && ba.IsAllTransactional()
}

// canUseFollowerRead determines if a query can be sent to a follower.
func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timestamp) bool {
if !kvserver.FollowerReadsEnabled.Get(&st.SV) {
return false
// closedTimestampLikelySufficient determines if a request at a given timestamp
// is likely to be below a follower's closed timestamp and servicable as a
// follower read were the request to be sent to a follower replica.
func closedTimestampLikelySufficient(
st *cluster.Settings,
clock *hlc.Clock,
ctPolicy roachpb.RangeClosedTimestampPolicy,
maxObservableTS hlc.Timestamp,
) bool {
var offset time.Duration
switch ctPolicy {
case roachpb.LAG_BY_CLUSTER_SETTING:
offset = getFollowerReadLag(st)
case roachpb.LEAD_FOR_GLOBAL_READS:
offset = getGlobalReadsLead(clock)
default:
panic("unknown RangeClosedTimestampPolicy")
}
threshold := (-1 * getFollowerReadDuration(st)) - 1*base.DefaultMaxClockOffset
if timeutil.Since(ts.GoTime()) < threshold {
return false
}
return checkEnterpriseEnabled(clusterID, st) == nil
expectedClosedTS := clock.Now().Add(offset.Nanoseconds(), 0)
return maxObservableTS.LessEq(expectedClosedTS)
}

// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
// TODO(aayush): We should try to bind clusterID to the function here, rather
// than having callers plumb it in every time.
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return batchCanBeEvaluatedOnFollower(ba) &&
txnCanPerformFollowerRead(ba.Txn) &&
canUseFollowerRead(clusterID, st, forward(ba.Txn.ReadTimestamp, ba.Txn.MaxTimestamp))
func canSendToFollower(
clusterID uuid.UUID,
st *cluster.Settings,
clock *hlc.Clock,
ctPolicy roachpb.RangeClosedTimestampPolicy,
ba roachpb.BatchRequest,
) bool {
return checkFollowerReadsEnabled(clusterID, st) &&
batchCanBeEvaluatedOnFollower(ba) &&
closedTimestampLikelySufficient(st, clock, ctPolicy, ba.Txn.MaxObservableTimestamp())
}

func forward(ts hlc.Timestamp, to hlc.Timestamp) hlc.Timestamp {
ts.Forward(to)
return ts
}

type oracleFactory struct {
type followerReadOracle struct {
clusterID *base.ClusterIDContainer
st *cluster.Settings
clock *hlc.Clock

binPacking replicaoracle.OracleFactory
closest replicaoracle.OracleFactory
closest replicaoracle.Oracle
binPacking replicaoracle.Oracle
}

func newOracleFactory(cfg replicaoracle.Config) replicaoracle.OracleFactory {
return &oracleFactory{
func newFollowerReadOracle(cfg replicaoracle.Config) replicaoracle.Oracle {
return &followerReadOracle{
clusterID: &cfg.RPCContext.ClusterID,
st: cfg.Settings,
binPacking: replicaoracle.NewOracleFactory(replicaoracle.BinPackingChoice, cfg),
closest: replicaoracle.NewOracleFactory(replicaoracle.ClosestChoice, cfg),
clock: cfg.RPCContext.Clock,
closest: replicaoracle.NewOracle(replicaoracle.ClosestChoice, cfg),
binPacking: replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg),
}
}

func (f oracleFactory) Oracle(txn *kv.Txn) replicaoracle.Oracle {
if txn != nil && canUseFollowerRead(f.clusterID.Get(), f.st, txn.ReadTimestamp()) {
return f.closest.Oracle(txn)
func (o *followerReadOracle) ChoosePreferredReplica(
ctx context.Context,
txn *kv.Txn,
desc *roachpb.RangeDescriptor,
leaseholder *roachpb.ReplicaDescriptor,
ctPolicy roachpb.RangeClosedTimestampPolicy,
queryState replicaoracle.QueryState,
) (roachpb.ReplicaDescriptor, error) {
var oracle replicaoracle.Oracle
if o.useClosestOracle(txn, ctPolicy) {
oracle = o.closest
} else {
oracle = o.binPacking
}
return f.binPacking.Oracle(txn)
return oracle.ChoosePreferredReplica(ctx, txn, desc, leaseholder, ctPolicy, queryState)
}

func (o *followerReadOracle) useClosestOracle(
txn *kv.Txn, ctPolicy roachpb.RangeClosedTimestampPolicy,
) bool {
// NOTE: this logic is almost identical to canSendToFollower, except that it
// operates on a *kv.Txn instead of a roachpb.BatchRequest. As a result, the
// function does not check batchCanBeEvaluatedOnFollower. This is because we
// assume that if a request is going to be executed in a distributed DistSQL
// flow (which is why it is consulting a replicaoracle.Oracle), then all of
// the individual BatchRequests that it send will be eligible to be served
// on follower replicas as follower reads.
//
// If we were to get this assumption wrong, the flow might be planned on a
// node with a follower replica, but individual BatchRequests would still be
// sent to the correct replicas once canSendToFollower is checked for each
// BatchRequests in the DistSender. This would hurt performance, but would
// not violate correctness.
return checkFollowerReadsEnabled(o.clusterID.Get(), o.st) &&
txn != nil &&
closedTimestampLikelySufficient(o.st, o.clock, ctPolicy, txn.MaxObservableTimestamp())
}

// followerReadAwareChoice is a leaseholder choosing policy that detects
// followerReadOraclePolicy is a leaseholder choosing policy that detects
// whether a query can be used with a follower read.
var followerReadAwareChoice = replicaoracle.RegisterPolicy(newOracleFactory)
var followerReadOraclePolicy = replicaoracle.RegisterPolicy(newFollowerReadOracle)

func init() {
sql.ReplicaOraclePolicy = followerReadAwareChoice
sql.ReplicaOraclePolicy = followerReadOraclePolicy
builtins.EvalFollowerReadOffset = evalFollowerReadOffset
kvcoord.CanSendToFollower = canSendToFollower
}
Loading

0 comments on commit a501736

Please sign in to comment.