Skip to content

Commit

Permalink
kv: give non-transactional requests uncertainty intervals
Browse files Browse the repository at this point in the history
Fixes #58459.
Informs #36431.

This commit fixes a long-standing correctness issue where non-transactional
requests did not ensure single-key linearizability even if they deferred their
timestamp allocation to the leaseholder of their (single) range. They still
don't entirely, because of #36431, but this change brings us one step closer to
the fix we plan to land for #36431 also applying to non-transactional requests.

The change addresses this by giving non-transactional requests uncertainty
intervals. This ensures that they also guarantee single-key linearizability even
with only loose (but bounded) clock synchronization. Non-transactional requests
that use a client-provided timestamp do not have uncertainty intervals and do
not make real-time ordering guarantees.

Unlike transactions, which establish an uncertainty interval on their
coordinator node during initialization, non-transactional requests receive
uncertainty intervals from their target leaseholder, using a clock reading
from the leaseholder's local HLC as the local limit and this clock reading +
the cluster's maximum clock skew as the global limit.

It is somewhat non-intuitive that non-transactional requests need uncertainty
intervals — after all, they receive their timestamp to the leaseholder of the
only range that they talk to, so isn't every value with a commit timestamp
above their read timestamp certainly concurrent? The answer is surprisingly
"no" for the following reasons, so they cannot forgo the use of uncertainty
interval:

1. the request timestamp is allocated before consulting the replica's lease.
   This means that there are times when the replica is not the leaseholder at
   the point of timestamp allocation, and only becomes the leaseholder later.
   In such cases, the timestamp assigned to the request is not guaranteed to
   be greater than the written_timestamp of all writes served by the range at
   the time of allocation. This is true despite invariants 1 & 2 presented in
   `pkg/kv/kvserver/uncertainty/doc.go` because the replica allocating the
   timestamp is not yet the leaseholder.

   In cases where the replica that assigned the non-transactional request's
   timestamp takes over as the leaseholder after the timestamp allocation, we
   expect minimumLocalLimitForLeaseholder to forward the local uncertainty
   limit above TimestampFromServerClock, to the lease start time.

   For example, consider the following series of events:
   - client A writes k = v1
   - leaseholder writes v1 at ts = 100
   - client A receives ack for write
   - client B wants to read k using a non-txn request
   - follower replica with slower clock receives non-txn request
   - follower replica assigns request ts = 95
   - lease transferred to follower replica with lease start time = 101
   - non-txn request must use 101 as limit of uncertainty interval to ensure
     that it observes k = v1 in uncertainty interval, performs a server-side
     retry, bumps its read timestamp, and returns k = v1.

2. even if the replica's lease is stable and the timestamp is assigned to the
   non-transactional request by the leaseholder, the assigned clock reading
   only reflects the written_timestamp of all of the writes served by the
   leaseholder (and previous leaseholders) thus far. This clock reading is
   not not guaranteed to lead the commit timestamp of all of these writes,
   especially if they are committed remotely and resolved after the request
   has received its clock reading but before the request begins evaluating.

   As a result, the non-transactional request needs an uncertainty interval
   with a global uncertainty limit far enough in advance of the leaseholder's
   local HLC clock to ensure that it considers any value that was part of a
   transaction which could have committed before the request was received by
   the leaseholder to be uncertain. Concretely, the non-transactional request
   needs to consider values of the following form to be uncertain:

     written_timestamp < local_limit && commit_timestamp < global_limit

   The value that the non-transactional request is observing may have been
   written on the local leaseholder at time 10, its transaction may have been
   committed remotely at time 20, acknowledged, then the non-transactional
   request may have begun and received a timestamp of 15 from the local
   leaseholder, then finally the value may have been resolved asynchronously
   and moved to timestamp 20 (written_timestamp: 10, commit_timestamp: 20).
   The failure of the non-transactional request to observe this value would
   be a stale read.

   For example, consider the following series of events:
   - client A begins a txn and is assigned provisional commit timestamp = 95
   - client A's txn performs a Put(k, v1)
   - leaseholder serves Put(k, v1), lays down intent at written_timestamp = 95
   - client A's txn performs a write elsewhere and hits a WriteTooOldError
     that bumps its provisional commit timestamp to 100
   - client A's txn refreshes to ts = 100. This notably happens without
     involvement of the leaseholder that served the Put (this is at the heart
     of #36431), so that leaseholder's clock is not updated
   - client A's txn commits remotely and client A receives the acknowledgment
   - client B initiates non-txn read of k
   - leaseholder assigns read timestamp ts = 97
   - asynchronous intent resolution resolves the txn's intent at k, moving v1
     to ts = 100 in the process
   - non-txn request must use an uncertainty interval that extends past 100
     to ensure that it observes k = v1 in uncertainty interval, performs a
     server-side retry, bumps its read timestamp, and returns k = v1. Failure
     to do so would be a stale read

----

This change is related to #36431 in two ways. First, it allows non-transactional
requests to benefit from our incoming fix for that issue. Second, it unblocks
some of the clock refactors proposed in #72121 (comment),
and by extension #72663. Even though there are clear bugs today, I still don't
feel comfortable removing the `hlc.Clock.Update` in `Store.Send` until we make
this change. We know from #36431 that invariant 1 from
[`uncertainty.D6`](https://github.com/cockroachdb/cockroach/blob/22df0a6658a927e7b65dafbdfc9d790500791993/pkg/kv/kvserver/uncertainty/doc.go#L131)
doesn't hold, and yet I still do think the `hlc.Clock.Update` in `Store.Send`
masks the bugs in this area in many cases. Removing that clock update (I don't
actually plan to remove it, but I plan to disconnect it entirely from operation
timestamps) without first giving non-transactional requests uncertainty intervals
seems like it may reveal these bugs in ways we haven't seen in the past. So I'd
like to land this fix before making that change.

----

Release note: None
  • Loading branch information
nvanbenschoten committed Feb 3, 2022
1 parent 39857d3 commit 181d586
Show file tree
Hide file tree
Showing 15 changed files with 1,478 additions and 102 deletions.
42 changes: 24 additions & 18 deletions pkg/kv/kvserver/batcheval/declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -52,24 +53,29 @@ func DefaultDeclareIsolatedKeys(
timestamp := header.Timestamp
if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
access = spanset.SpanReadOnly
if header.Txn != nil {
// For transactional reads, acquire read latches all the way up to
// the transaction's uncertainty limit, because reads may observe
// writes all the way up to this timestamp.
//
// It is critical that reads declare latches up through their
// uncertainty interval so that they are properly synchronized with
// earlier writes that may have a happened-before relationship with
// the read. These writes could not have completed and returned to
// the client until they were durable in the Range's Raft log.
// However, they may not have been applied to the replica's state
// machine by the time the write was acknowledged, because Raft
// entry application occurs asynchronously with respect to the
// writer (see AckCommittedEntriesBeforeApplication). Latching is
// the only mechanism that ensures that any observers of the write
// wait for the write apply before reading.
timestamp.Forward(header.Txn.GlobalUncertaintyLimit)
}

// For non-locking reads, acquire read latches all the way up to the
// request's worst-case (i.e. global) uncertainty limit, because reads may
// observe writes all the way up to this timestamp.
//
// It is critical that reads declare latches up through their uncertainty
// interval so that they are properly synchronized with earlier writes that
// may have a happened-before relationship with the read. These writes could
// not have completed and returned to the client until they were durable in
// the Range's Raft log. However, they may not have been applied to the
// replica's state machine by the time the write was acknowledged, because
// Raft entry application occurs asynchronously with respect to the writer
// (see AckCommittedEntriesBeforeApplication). Latching is the only
// mechanism that ensures that any observers of the write wait for the write
// apply before reading.
//
// NOTE: we pass an empty lease status here, which means that observed
// timestamps collected by transactions will not be used. The actual
// uncertainty interval used by the request may be smaller (i.e. contain a
// local limit), but we can't determine that until after we have declared
// keys, acquired latches, and consulted the replica's lease.
in := uncertainty.ComputeInterval(header, kvserverpb.LeaseStatus{}, maxOffset)
timestamp.Forward(in.GlobalLimit)
}
latchSpans.AddMVCC(access, req.Header().Span(), timestamp)
lockSpans.AddNonMVCC(access, req.Header().Span())
Expand Down
171 changes: 171 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,177 @@ func TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
require.IsType(t, &roachpb.ReadWithinUncertaintyIntervalError{}, pErr.GetDetail())
}

// TestNonTxnReadWithinUncertaintyIntervalAfterLeaseTransfer tests a case where
// a non-transactional request defers its timestamp allocation to a replica that
// does not hold the lease at the time of receiving the request, but does by the
// time that the request consults the lease. In the test, a value is written on
// the previous leaseholder at a higher timestamp than that assigned to the non-
// transactional request. After the lease transfer, the non-txn request is
// required by uncertainty.ComputeInterval to forward its local uncertainty
// limit to the new lease start time. This prevents the read from ignoring the
// previous write, which avoids a stale read. Instead, the non-txn read hits an
// uncertainty error, performs a server-side retry, and re-evaluates with a
// timestamp above the write.
//
// This test exercises the hazard described in "reason #1" of uncertainty.D7.
func TestNonTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

// Inject a request filter, which intercepts the server-assigned timestamp
// of a non-transactional request and then blocks that request until after
// the lease has been transferred to the server.
type nonTxnGetKey struct{}
nonTxnOrigTsC := make(chan hlc.Timestamp, 1)
nonTxnBlockerC := make(chan struct{})
requestFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if ctx.Value(nonTxnGetKey{}) != nil {
// Give the test the server-assigned timestamp.
require.NotNil(t, ba.TimestampFromServerClock)
nonTxnOrigTsC <- ba.Timestamp
// Wait for the test to give the go-ahead.
select {
case <-nonTxnBlockerC:
case <-ctx.Done():
}
}
return nil
}
var uncertaintyErrs int32
concurrencyRetryFilter := func(ctx context.Context, _ roachpb.BatchRequest, pErr *roachpb.Error) {
if ctx.Value(nonTxnGetKey{}) != nil {
if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok {
atomic.AddInt32(&uncertaintyErrs, 1)
}
}
}

const numNodes = 2
var manuals []*hlc.HybridManualClock
for i := 0; i < numNodes; i++ {
manuals = append(manuals, hlc.NewHybridManualClock())
}
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manuals[i].UnixNano,
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: requestFilter,
TestingConcurrencyRetryFilter: concurrencyRetryFilter,
},
},
}
}
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

// Split off a scratch range and upreplicate to node 2.
key := tc.ScratchRange(t)
desc := tc.LookupRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))

// Pause the servers' clocks going forward.
var maxNanos int64
for _, m := range manuals {
m.Pause()
if cur := m.UnixNano(); cur > maxNanos {
maxNanos = cur
}
}
// After doing so, perfectly synchronize them.
for _, m := range manuals {
m.Increment(maxNanos - m.UnixNano())
}

// Initiate a non-txn read on node 2. The request will be intercepted after
// the request has received a server-assigned timestamp, but before it has
// consulted the lease. We'll transfer the lease to node 2 before the request
// checks, so that it ends up evaluating on node 2.
type resp struct {
*roachpb.BatchResponse
*roachpb.Error
}
nonTxnRespC := make(chan resp, 1)
_ = tc.Stopper().RunAsyncTask(ctx, "non-txn get", func(ctx context.Context) {
ctx = context.WithValue(ctx, nonTxnGetKey{}, "foo")
ba := roachpb.BatchRequest{}
ba.RangeID = desc.RangeID
ba.Add(getArgs(key))
br, pErr := tc.GetFirstStoreFromServer(t, 1).Send(ctx, ba)
nonTxnRespC <- resp{br, pErr}
})

// Wait for the non-txn read to get stuck.
var nonTxnOrigTs hlc.Timestamp
select {
case nonTxnOrigTs = <-nonTxnOrigTsC:
case nonTxnResp := <-nonTxnRespC:
t.Fatalf("unexpected response %+v", nonTxnResp)
}

// Advance the clock on node 1.
manuals[0].Increment(100)

// Perform a non-txn write on node 1. This will grab a timestamp from node 1's
// clock, which leads the clock on node 2 and the timestamp assigned to the
// non-txn read.
//
// NOTE: we perform the clock increment and write _after_ sending the non-txn
// read. Ideally, we would write this test such that we did this before
// beginning the read on node 2, so that the absence of an uncertainty error
// would be a true "stale read". However, doing so causes the test to be flaky
// because background operations can leak the clock signal from node 1 to node
// 2 between the time that we write and the time that the non-txn read request
// is sent. If we had a way to disable all best-effort HLC clock stabilization
// channels and only propagate clock signals when strictly necessary then it's
// possible that we could avoid flakiness. For now, we just re-order the
// operations and assert that we observe an uncertainty error even though its
// absence would not be a true stale read.
ba := roachpb.BatchRequest{}
ba.Add(putArgs(key, []byte("val")))
br, pErr := tc.Servers[0].DistSender().Send(ctx, ba)
require.Nil(t, pErr)
writeTs := br.Timestamp
require.True(t, nonTxnOrigTs.Less(writeTs))

// Then transfer the lease to node 2. The new lease should end up with a start
// time above the timestamp assigned to the non-txn read.
var lease roachpb.Lease
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
testutils.SucceedsSoon(t, func() error {
repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(desc.StartKey)
lease, _ = repl.GetLease()
if lease.Replica.NodeID != repl.NodeID() {
return errors.Errorf("expected lease transfer to node 2: %s", lease)
}
return nil
})
require.True(t, nonTxnOrigTs.Less(lease.Start.ToTimestamp()))

// Let the non-txn read proceed. It should complete, but only after hitting a
// ReadWithinUncertaintyInterval, performing a server-side retry, reading
// again at a higher timestamp, and returning the written value.
close(nonTxnBlockerC)
nonTxnResp := <-nonTxnRespC
require.Nil(t, nonTxnResp.Error)
br = nonTxnResp.BatchResponse
require.NotNil(t, br)
require.True(t, nonTxnOrigTs.Less(br.Timestamp))
require.True(t, writeTs.LessEq(br.Timestamp))
require.Len(t, br.Responses, 1)
require.NotNil(t, br.Responses[0].GetGet())
require.NotNil(t, br.Responses[0].GetGet().Value)
require.Equal(t, writeTs, br.Responses[0].GetGet().Value.Timestamp)
require.Equal(t, int32(1), atomic.LoadInt32(&uncertaintyErrs))
}

// TestRangeLookupUseReverse tests whether the results and the results count
// are correct when scanning in reverse order.
func TestRangeLookupUseReverse(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,8 @@ func canDoServersideRetry(
case *roachpb.WriteTooOldError:
newTimestamp = tErr.RetryTimestamp()

// TODO(nvanbenschoten): give non-txn requests uncertainty intervals. #73732.
//case *roachpb.ReadWithinUncertaintyIntervalError:
// newTimestamp = tErr.RetryTimestamp()
case *roachpb.ReadWithinUncertaintyIntervalError:
newTimestamp = tErr.RetryTimestamp()

default:
return false
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *Replica) executeReadOnlyBatch(

// Compute the transaction's local uncertainty limit using observed
// timestamps, which can help avoid uncertainty restarts.
ui := uncertainty.ComputeInterval(ba.Txn, st)
ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset())

// Evaluate read-only batch command.
rec := NewReplicaEvalContext(r, g.LatchSpans())
Expand Down
32 changes: 17 additions & 15 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,21 +808,23 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
if !canDoServersideRetry(ctx, pErr, ba, nil, nil, nil) {
return nil, pErr
}
// TODO(nvanbenschoten): give non-txn requests uncertainty intervals. #73732.
//if ba.Txn == nil && ba.Timestamp.Synthetic {
// // If the request is non-transactional and it was refreshed into the future
// // after observing a value with a timestamp in the future, immediately sleep
// // until its new read timestamp becomes present. We don't need to do this
// // for transactional requests because they will do this during their
// // commit-wait sleep after committing.
// //
// // See TxnCoordSender.maybeCommitWait for a discussion about why doing this
// // is necessary to preserve real-time ordering for transactions that write
// // into the future.
// if err := r.Clock().SleepUntil(ctx, ba.Timestamp); err != nil {
// return nil, roachpb.NewError(err)
// }
//}
if ba.Txn == nil && ba.Timestamp.Synthetic {
// If the request is non-transactional and it was refreshed into the future
// after observing a value with a timestamp in the future, immediately sleep
// until its new read timestamp becomes present. We don't need to do this
// for transactional requests because they will do this during their
// commit-wait sleep after committing.
//
// See TxnCoordSender.maybeCommitWait for a discussion about why doing this
// is necessary to preserve real-time ordering for transactions that write
// into the future.
var cancel func()
ctx, cancel = r.store.Stopper().WithCancelOnQuiesce(ctx)
defer cancel()
if err := r.Clock().SleepUntil(ctx, ba.Timestamp); err != nil {
return nil, roachpb.NewError(err)
}
}
return ba, nil
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10696,6 +10696,27 @@ func TestReplicaServersideRefreshes(t *testing.T) {
// EndTransaction. This is hard to do at the moment, though, because we
// never defer the handling of the write too old conditions to the end of
// the transaction (but we might in the future).
{
name: "serverside-refresh of read within uncertainty interval error on get in non-txn",
setupFn: func() (hlc.Timestamp, error) {
return put("a", "put")
},
batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) {
ba.Timestamp = ts.Prev()
// NOTE: set the TimestampFromServerClock field manually. This is
// usually set on the server for non-transactional requests without
// client-assigned timestamps. It is also usually set to the same
// value as the server-assigned timestamp. But to have more control
// over the uncertainty interval that this request receives, we set
// it here to a value above the request timestamp.
serverTS := ts.Next()
ba.TimestampFromServerClock = (*hlc.ClockTimestamp)(&serverTS)
expTS = ts.Next()
get := getArgs(roachpb.Key("a"))
ba.Add(&get)
return
},
},
{
name: "serverside-refresh of read within uncertainty interval error on get in non-1PC txn",
setupFn: func() (hlc.Timestamp, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (r *Replica) executeWriteBatch(

// Compute the transaction's local uncertainty limit using observed
// timestamps, which can help avoid uncertainty restarts.
ui := uncertainty.ComputeInterval(ba.Txn, st)
ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset())

// Start tracking this request if it is an MVCC write (i.e. if it's the kind
// of request that needs to obey the closed timestamp). The act of tracking
Expand Down
Loading

0 comments on commit 181d586

Please sign in to comment.