Skip to content

Commit

Permalink
kv: eliminate write-too-old deferral mechanism
Browse files Browse the repository at this point in the history
Informs cockroachdb#102751.

This commit eliminates the write-too-old deferral mechanism, where blind-write
BatchRequests that hit a WriteTooOld error would successfully write intents and
then return a Transaction proto with the WriteTooOld flag to the client. The
client would then immediately refresh to remove this flag. However, in the
intermediate period, the written intents would act as locks to ensure that if
the refresh succeeded, the writer would have exclusive access to the previously
written keys and would not hit a WriteTooOld error on its next attempt.

The rationale for the removal of this mechanism is outlined in cockroachdb#102751. At a
high-level, the mechanism is complex, error-prone, and sufficiently unnecessary
today due to unreplicated locks and server-side refreshes. It also interacts
poorly with weak isolation levels, which further motivates its removal.

Cases where the write-too-old deferral mechanism is still hypothetically useful
are difficult to construct, especially from SQL's limited use of KV. They
require the following conditions to all hold:

1. a blind-writing BatchRequest (containing Put or Delete, but not ConditionalPut)
2. a BatchRequest without the CanForwardReadTimestamp flag (needs client-side refresh)
3. a write-write conflict that will not cause a refresh to fail

These requirement are almost always contradictory. A write-write conflict
implies a failed refresh if the refresher has already read the conflicting key.
So the cases where this mechanism help are limited to those where the writer has
not already read the conflicting key. However, SQL rarely issues blind-write KV
requests keys that it has not already read. The cases where this might come up
are fast-path DELETE statements that issue DeleteRequest (not
DeleteRangeRequest) and fast-path UPSERT statements that write all columns in a
table. If either of these are heavily contended and take place in
multi-statement transactions that previously read, this mechanism could help.
However, I suspect that these scenarios are very uncommon. If customers do see
them, they can avoid problems by using SELECT FOR UPDATE earlier in the
transaction or by using Read Committed (which effectively resets the
CanForwardReadTimestamp flag on each statement boundary).

The commit does not yet remove the Transaction.WriteTooOld field, as this must
remain until compatibility with v23.1 nodes is no longer a concern.

Release note: None
  • Loading branch information
nvanbenschoten committed May 16, 2023
1 parent 6f2512d commit b52c900
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 237 deletions.
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3211,13 +3211,13 @@ func TestTxnCoordSenderRetries(t *testing.T) {
return txn.CommitInBatch(ctx, b)
},
priorReads: true,
// The Put to "c" will fail, failing the parallel commit and forcing a
// parallel commit auto-retry and preemptive client-side refresh.
// The Put to "c" will fail, failing the parallel commit with an error and
// forcing a client-side refresh and auto-retry of the full batch.
allIsoLevels: &expect{
expServerRefresh: false,
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: false,
expParallelCommitAutoRetry: true,
expClientAutoRetryAfterRefresh: true,
expParallelCommitAutoRetry: false,
},
},
{
Expand Down
43 changes: 0 additions & 43 deletions pkg/kv/kvclient/kvcoord/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,49 +786,6 @@ func TestTxnCommitTimestampAdvancedByRefresh(t *testing.T) {
require.NoError(t, err)
}

// Test that in some write too old situations (i.e. when the server returns the
// WriteTooOld flag set and then the client fails to refresh), intents are
// properly left behind.
func TestTxnLeavesIntentBehindAfterWriteTooOldError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s := createTestDB(t)
defer s.Stop()

key := []byte("b")

txn := s.DB.NewTxn(ctx, "test txn")
// Perform a Get so that the transaction can't refresh.
_, err := txn.Get(ctx, key)
require.NoError(t, err)

// Another guy writes at a higher timestamp.
require.NoError(t, s.DB.Put(ctx, key, "newer value"))

// Now we write and expect a WriteTooOld.
intentVal := []byte("test")
err = txn.Put(ctx, key, intentVal)
require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err)
require.Regexp(t, "WriteTooOld", err)

// Check that the intent was left behind.
b := kv.Batch{}
b.Header.ReadConsistency = kvpb.READ_UNCOMMITTED
b.Get(key)
require.NoError(t, s.DB.Run(ctx, &b))
getResp := b.RawResponse().Responses[0].GetGet()
require.NotNil(t, getResp)
intent := getResp.IntentValue
require.NotNil(t, intent)
intentBytes, err := intent.GetBytes()
require.NoError(t, err)
require.Equal(t, intentVal, intentBytes)

// Cleanup.
require.NoError(t, txn.Rollback(ctx))
}

// Test that a transaction can be used after a CPut returns a
// ConditionFailedError. This is not generally allowed for other errors, but
// ConditionFailedError is special.
Expand Down
17 changes: 0 additions & 17 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,6 @@ func IsReadOnly(args Request) bool {
return (flags&isRead) != 0 && (flags&isWrite) == 0
}

// IsBlindWrite returns true iff the request is a blind-write. A request is a
// blind-write if it is a write that does not observe any key-value state when
// modifying that state (such as puts and deletes). This is in contrast with
// read-write requests, which do observe key-value state when modifying the
// state and may base their modifications off of this existing state (such as
// conditional puts and increments).
//
// As a result of being "blind", blind-writes are allowed to be more freely
// re-ordered with other writes. In practice, this means that they can be
// evaluated with a read timestamp below another write and a write timestamp
// above that write without needing to re-evaluate. This allows the WriteTooOld
// error that is generated during such an occurrence to be deferred.
func IsBlindWrite(args Request) bool {
flags := args.flags()
return (flags&isRead) == 0 && (flags&isWrite) != 0
}

// IsTransactional returns true if the request may be part of a
// transaction.
func IsTransactional(args Request) bool {
Expand Down
188 changes: 63 additions & 125 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,25 +215,10 @@ func evaluateBatch(

var mergedResult result.Result

// WriteTooOldErrors have particular handling. When a request encounters the
// error, we'd like to lay down an intent in order to avoid writers being
// starved. So, for blind writes, we swallow the error and instead we set the
// WriteTooOld flag on the response. For non-blind writes (e.g. CPut), we
// can't do that and so we just return the WriteTooOldError - see note on
// IsRead() stanza below. Upon receiving either a WriteTooOldError or a
// response with the WriteTooOld flag set, the client will attempt to bump
// the txn's read timestamp through a refresh. If successful, the client
// will retry this batch (in both cases).
//
// In any case, evaluation of the current batch always continue after a
// WriteTooOldError in order to find out if there's more conflicts and chose
// a final write timestamp.
var writeTooOldState struct {
err *kvpb.WriteTooOldError
// cantDeferWTOE is set when a WriteTooOldError cannot be deferred past the
// end of the current batch.
cantDeferWTOE bool
}
// WriteTooOldErrors have particular handling. Evaluation of the current batch
// continues after a WriteTooOldError in order to find out if there's more
// conflicts and chose a final write timestamp to return.
var deferredWriteTooOldErr *kvpb.WriteTooOldError

// Only collect the scan stats if the tracing is enabled.
var ss *kvpb.ScanStats
Expand All @@ -254,6 +239,17 @@ func evaluateBatch(
// Execute the command.
args := union.GetInner()

if deferredWriteTooOldErr != nil && args.Method() == kvpb.EndTxn {
// ... unless we have been deferring a WriteTooOld error and have now
// reached an EndTxn request. In such cases, break and return the error.
// The transaction needs to handle the WriteTooOld error before it tries
// to commit. This short-circuiting is not necessary for correctness as
// the write batch will be discarded in favor of the deferred error, but
// we don't want to bother with potentially expensive EndTxn evaluation if
// we know the result will be thrown away.
break
}

if baHeader.Txn != nil {
// Set the Request's sequence number on the TxnMeta for this
// request. The MVCC layer (currently) uses TxnMeta to
Expand Down Expand Up @@ -325,79 +321,6 @@ func evaluateBatch(
reply.SetHeader(headerCopy)
}

if err != nil {
// If an EndTxn wants to restart because of a write too old, we
// might have a better error to return to the client.
if retErr := (*kvpb.TransactionRetryError)(nil); errors.As(err, &retErr) &&
retErr.Reason == kvpb.RETRY_WRITE_TOO_OLD &&
args.Method() == kvpb.EndTxn && writeTooOldState.err != nil {
err = writeTooOldState.err
// Don't defer this error. We could perhaps rely on the client observing
// the WriteTooOld flag and retry the batch, but we choose not too.
writeTooOldState.cantDeferWTOE = true
} else if wtoErr := (*kvpb.WriteTooOldError)(nil); errors.As(err, &wtoErr) {
// We got a WriteTooOldError. We continue on to run all
// commands in the batch in order to determine the highest
// timestamp for more efficient retries. If the batch is
// transactional, we continue to lay down intents so that
// other concurrent overlapping transactions are forced
// through intent resolution and the chances of this batch
// succeeding when it will be retried are increased.
if writeTooOldState.err != nil {
writeTooOldState.err.ActualTimestamp.Forward(
wtoErr.ActualTimestamp)
} else {
writeTooOldState.err = wtoErr
}

// For read-write requests that observe key-value state, we don't have
// the option of leaving an intent behind when they encounter a
// WriteTooOldError, so we have to return an error instead of a response
// with the WriteTooOld flag set (which would also leave intents
// behind). These requests need to be re-evaluated at the bumped
// timestamp in order for their results to be valid. The current
// evaluation resulted in an result that could well be different from
// what the request would return if it were evaluated at the bumped
// timestamp, which would cause the request to be rejected if it were
// sent again with the same sequence number after a refresh.
//
// Similarly, for read-only requests that encounter a WriteTooOldError,
// we don't have the option of returning a response with the WriteTooOld
// flag set because a response is not even generated in tandem with the
// WriteTooOldError. We could fix this and then allow WriteTooOldErrors
// to be deferred in these cases, but doing so would buy more into the
// extremely error-prone approach of retuning responses and errors
// together throughout the MVCC read path. Doing so is not desirable as
// it has repeatedly caused bugs in the past. Instead, we'd like to get
// rid of this pattern entirely and instead address the TODO below.
//
// TODO(andrei): What we really want to do here is either speculatively
// evaluate the request at the bumped timestamp and return that
// speculative result, or leave behind an unreplicated lock that won't
// prevent the request for evaluating again at the same sequence number
// but at a bumped timestamp.
if !kvpb.IsBlindWrite(args) {
writeTooOldState.cantDeferWTOE = true
}

if baHeader.Txn != nil {
log.VEventf(ctx, 2, "setting WriteTooOld because of key: %s. wts: %s -> %s",
args.Header().Key, baHeader.Txn.WriteTimestamp, wtoErr.ActualTimestamp)
baHeader.Txn.WriteTimestamp.Forward(wtoErr.ActualTimestamp)
baHeader.Txn.WriteTooOld = true
} else {
// For non-transactional requests, there's nowhere to defer the error
// to. And the request has to fail because non-transactional batches
// should read and write at the same timestamp.
writeTooOldState.cantDeferWTOE = true
}

// Clear error; we're done processing the WTOE for now and we'll return
// to considering it below after we've evaluated all requests.
err = nil
}
}

// Even on error, we need to propagate the result of evaluation.
//
// TODO(tbg): find out if that's true and why and improve the comment.
Expand All @@ -409,12 +332,38 @@ func evaluateBatch(
)
}

// Handle errors thrown by evaluation, either eagerly or through deferral.
if err != nil {
pErr := kvpb.NewErrorWithTxn(err, baHeader.Txn)
// Initialize the error index.
pErr.SetErrorIndex(int32(index))
var wtoErr *kvpb.WriteTooOldError
switch {
case errors.As(err, &wtoErr):
// We got a WriteTooOldError. We continue on to run all commands in the
// batch in order to determine the highest timestamp for more efficient
// retries.
if deferredWriteTooOldErr != nil {
deferredWriteTooOldErr.ActualTimestamp.Forward(wtoErr.ActualTimestamp)
} else {
deferredWriteTooOldErr = wtoErr
}

if baHeader.Txn != nil {
log.VEventf(ctx, 2, "advancing write timestamp due to "+
"WriteTooOld error on key: %s. wts: %s -> %s",
args.Header().Key, baHeader.Txn.WriteTimestamp, wtoErr.ActualTimestamp)
baHeader.Txn.WriteTimestamp.Forward(wtoErr.ActualTimestamp)
}

return nil, mergedResult, pErr
// Clear error and fall through to the success path; we're done
// processing the error for now. We'll return it below after we've
// evaluated all requests.
err = nil

default:
// For all other error types, immediately propagate the error.
pErr := kvpb.NewErrorWithTxn(err, baHeader.Txn)
pErr.SetErrorIndex(int32(index))
return nil, mergedResult, pErr
}
}

// If the last request was carried out with a limit, subtract the number
Expand Down Expand Up @@ -449,21 +398,17 @@ func evaluateBatch(
}

// If we made it here, there was no error during evaluation, with the exception of
// a deferred WTOE. If it can't be deferred - return it now; otherwise it is swallowed.
// Note that we don't attach an Index to the returned Error.
// a deferred WriteTooOld error. Return that now.
//
// TODO(tbg): we could attach the index of the first WriteTooOldError seen, but does
// that buy us anything?
if writeTooOldState.cantDeferWTOE {
if deferredWriteTooOldErr != nil {
// NB: we can't do any error wrapping here yet due to compatibility with 20.2 nodes;
// there needs to be an ErrorDetail here.
return nil, mergedResult, kvpb.NewErrorWithTxn(writeTooOldState.err, baHeader.Txn)
// TODO(nvanbenschoten): this comment is now stale. Address it.
return nil, mergedResult, kvpb.NewErrorWithTxn(deferredWriteTooOldErr, baHeader.Txn)
}

// The batch evaluation will not return an error (i.e. either everything went
// fine or we're deferring a WriteTooOldError by having bumped
// baHeader.Txn.WriteTimestamp).

// Update the batch response timestamp field to the timestamp at which the
// batch's reads were evaluated.
if baHeader.Txn != nil {
Expand Down Expand Up @@ -553,12 +498,13 @@ func evaluateCommand(
return pd, err
}

// canDoServersideRetry looks at the error produced by evaluating ba (or the
// WriteTooOldFlag in br.Txn if there's no error) and decides if it's possible
// to retry the batch evaluation at a higher timestamp. Retrying is sometimes
// possible in case of some retriable errors which ask for higher timestamps:
// for transactional requests, retrying is possible if the transaction had not
// performed any prior reads that need refreshing.
// canDoServersideRetry looks at the error produced by evaluating ba and decides
// if it's possible to retry the batch evaluation at a higher timestamp.
//
// Retrying is sometimes possible in case of some retriable errors which ask for
// higher timestamps. For transactional requests, retrying is possible if the
// transaction had not performed any prior reads that need refreshing. For
// non-transactional requests, retrying is always possible.
//
// This function is called both below and above latching, which is indicated by
// the concurrency guard argument. The concurrency guard, if not nil, indicates
Expand All @@ -578,10 +524,12 @@ func canDoServersideRetry(
ctx context.Context,
pErr *kvpb.Error,
ba *kvpb.BatchRequest,
br *kvpb.BatchResponse,
g *concurrency.Guard,
deadline hlc.Timestamp,
) bool {
if pErr == nil {
log.Fatalf(ctx, "canDoServersideRetry called without error")
}
if ba.Txn != nil {
if !ba.CanForwardReadTimestamp {
return false
Expand All @@ -597,22 +545,12 @@ func canDoServersideRetry(

var newTimestamp hlc.Timestamp
if ba.Txn != nil {
if pErr != nil {
var ok bool
ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
if !ok {
return false
}
} else {
if !br.Txn.WriteTooOld {
log.Fatalf(ctx, "expected the WriteTooOld flag to be set")
}
newTimestamp = br.Txn.WriteTimestamp
var ok bool
ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
if !ok {
return false
}
} else {
if pErr == nil {
log.Fatalf(ctx, "canDoServersideRetry called for non-txn request without error")
}
switch tErr := pErr.GetDetail().(type) {
case *kvpb.WriteTooOldError:
newTimestamp = tErr.RetryTimestamp()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
// retry at a higher timestamp because it is not isolated at higher
// timestamps.
latchesHeld := g != nil
if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{}) {
if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{}) {
// TODO(aayush,arul): These metrics are incorrect at the moment since
// hitting this branch does not mean that we won't serverside retry, it
// just means that we will have to reacquire latches.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
// Attempt a server-side retry of the request. Note that we pass nil for
// latchSpans, because we have already released our latches and plan to
// re-acquire them if the retry is allowed.
if !canDoServersideRetry(ctx, pErr, ba, nil /* br */, nil /* g */, hlc.Timestamp{} /* deadline */) {
if !canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */) {
r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1)
return nil, pErr
}
Expand Down
Loading

0 comments on commit b52c900

Please sign in to comment.