Skip to content

Commit

Permalink
storage: leave intents behind after blind-writes experiencing write-t…
Browse files Browse the repository at this point in the history
…oo-old

Before this patch, any write running into a write-too-old condition
resulted in a WriteTooOldError being returned by the server. Returning
an error implies that no intents are left behind. This is unfortunate;
we'd like to leave intents (or, in the future, other types of locks)
behind so keep away other transactions. We've observed this resulting in
the starvation of a class of transactions in a user's workload.

This patch makes it so that blind writes (i.e. Puts - used by UPDATE,
not CPuts) don't return WriteTooOldErrors any more. Instead, they return
the a txn proto with the WriteTooOld flag set. This is the behavior they
had before #38668. This patch retains the goal of #38668, however: the
client now eagerly refreshes the transactions when it sees a WriteTooOld
flag, and if the refresh succeeds, it returns a WriteTooOldError to the
higher layers (SQL), allowing for automatic retries where applicable.

Unfortunately, CPuts (used by INSERT) continue to return
WriteTooOldErrors without leaving locks behind. Dealing with them
requires more tenderness because they imply a read, and the timestamp of
a read cannot be bumped as easily as that of a write.

Touches #44653

Release note (sql change): UPDATEs returning a serialization failure error (code
40001) now leave behind a lock, helping the transaction succeed if it
retries. This prevents starvation of transactions whose UPDATEs are
prone to conflicts.
  • Loading branch information
andreimatei committed Feb 13, 2020
1 parent 56cced1 commit da5dc5b
Show file tree
Hide file tree
Showing 15 changed files with 814 additions and 893 deletions.
43 changes: 7 additions & 36 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 5 additions & 26 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 0 additions & 66 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1773,19 +1773,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
txnCoordRetry: true,
},
{
name: "deferred write too old with put",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Header.DeferWriteTooOldError = true
b.Put("a", "put")
return txn.Run(ctx, b)
},
// This trivially succeeds as there are no refresh spans.
},
{
name: "write too old with put timestamp leaked",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down Expand Up @@ -2150,23 +2137,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
txnCoordRetry: true,
},
{
name: "multi-range batch with deferred write too old",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "c", "value")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Header.DeferWriteTooOldError = true
b.Put("a", "put")
b.Put("c", "put")
// Both sub-batches will succeed, but the Put(a) will return a pushed
// timestamp, which is turned into a retriable error by the txnCommitter
// interceptor (because it's concurrent with writing the STAGING record).
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: true,
},
{
name: "multi-range batch with write too old and failed cput",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down Expand Up @@ -2201,42 +2171,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
name: "multi-range batch with deferred write too old and failed cput",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "value")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Header.DeferWriteTooOldError = true
b.CPut("a", "cput", strToValue("orig"))
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: false, // non-matching value means we fail txn coord retry
expFailure: "unexpected value", // the failure we get is a condition failed error
},
{
name: "multi-range batch with deferred write too old and successful cput",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Header.DeferWriteTooOldError = true
b.CPut("a", "cput", strToValue("orig"))
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
// This test checks the behavior of batches that were split by the
// DistSender. We'll check that the whole batch is retried after a
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ func newLeafTxnCoordSender(
tcf *TxnCoordSenderFactory, tis *roachpb.LeafTxnInputState,
) client.TxnSender {
txn := &tis.Txn
// 19.2 roots might have this flag set. In 20.1, the flag is only set by the
// server and terminated by the client in the span refresher interceptor. If
// the root is a 19.2 node, we reset the flag because it only confuses
// that interceptor and provides no benefit.
txn.WriteTooOld = false
txn.AssertInitialized(context.TODO())

// Deal with requests from 19.2 nodes which did not set ReadTimestamp.
Expand Down
43 changes: 43 additions & 0 deletions pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,45 @@ func (sr *txnSpanRefresher) SendLocked(
func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
ctx context.Context, ba roachpb.BatchRequest, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.Txn.WriteTooOld && sr.canAutoRetry {
// The WriteTooOld flag is not supposed to be set on requests. It's only set
// by the server and it's terminated by this interceptor on the client.
log.Fatalf(ctx, "unexpected WriteTooOld request. ba: %s (txn: %s)",
ba.String(), ba.Txn.String())
}
br, pErr := sr.sendHelper(ctx, ba)
if pErr == nil && br.Txn.WriteTooOld {
// If we got a response with the WriteTooOld flag set, then we pretend that
// we got a WriteTooOldError, which will cause us to attempt to refresh and
// propagate the error if we failed. When it can, the server prefers to
// return the WriteTooOld flag, rather than a WriteTooOldError because, in
// the former case, it can leave intents behind. We like refreshing eagerly
// when the WriteTooOld flag is set because it's likely that the refresh
// will fail (if we previously read the key that's now causing a WTO, then
// the refresh will surely fail).
// TODO(andrei): Implement a more discerning policy based on whether we've
// read that key before.
//
// If the refresh fails, we could continue running the transaction even
// though it will not be able to commit, in order for it to lay down more
// intents. Not doing so, though, gives the SQL a chance to auto-retry.
// TODO(andrei): Implement a more discerning policy based on whether
// auto-retries are still possible.
//
// For the refresh, we have two options: either refresh everything read
// *before* this batch, and then retry this batch, or refresh the current
// batch's reads too and then, if successful, there'd be nothing to refresh.
// We take the former option by setting br = nil below to minimized the
// chances that the refresh fails.
bumpedTxn := br.Txn.Clone()
bumpedTxn.WriteTooOld = false
bumpedTxn.ReadTimestamp = bumpedTxn.WriteTimestamp
pErr = roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_WRITE_TOO_OLD,
"WriteTooOld flag converted to WriteTooOldError"),
bumpedTxn)
br = nil
}
if pErr != nil && maxRefreshAttempts > 0 {
br, pErr = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts)
}
Expand All @@ -245,6 +283,11 @@ func (sr *txnSpanRefresher) maybeRetrySend(
return nil, pErr
}

// If a prefix of the batch was executed, collect refresh spans for
// that executed portion, and retry the remainder. The canonical
// case is a batch split between everything up to but not including
// the EndTxn. Requests up to the EndTxn succeed, but the EndTxn
// fails with a retryable error. We want to retry only the EndTxn.
ba.UpdateTxn(retryTxn)
log.VEventf(ctx, 2, "retrying %s at refreshed timestamp %s because of %s",
ba, retryTxn.ReadTimestamp, pErr)
Expand Down
29 changes: 28 additions & 1 deletion pkg/kv/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")

cases := []struct {
// If name is not set, the test will use pErr.String().
name string
// OnFirstSend, if set, is invoked to evaluate the batch. If not set, pErr()
// will be used to provide an error.
onFirstSend func(request roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
pErr func() *roachpb.Error
expRefresh bool
expRefreshTS hlc.Timestamp
Expand Down Expand Up @@ -155,9 +160,28 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
},
expRefresh: false,
},
{
name: "write_too_old flag",
onFirstSend: func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: true,
expRefreshTS: txn.WriteTimestamp.Add(20, 1), // Same as br.Txn.WriteTimestamp.
},
}
for _, tc := range cases {
t.Run(tc.pErr().String(), func(t *testing.T) {
name := tc.name
if name == "" {
name = tc.pErr().String()
}
if (tc.onFirstSend != nil) == (tc.pErr != nil) {
panic("exactly one tc.onFirstSend and tc.pErr must be set")
}
t.Run(name, func(t *testing.T) {
ctx := context.Background()
tsr, mockSender := makeMockTxnSpanRefresher()

Expand All @@ -183,6 +207,9 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())

// Return a transaction retry error.
if tc.onFirstSend != nil {
return tc.onFirstSend(ba)
}
pErr = tc.pErr()
pErr.SetTxn(ba.Txn)
return nil, pErr
Expand Down
Loading

0 comments on commit da5dc5b

Please sign in to comment.