From a147f24d94b4ecb99162c0808e4a037f29166658 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Mon, 10 Dec 2018 17:55:34 -0500 Subject: [PATCH] storage: allow transactions to run at a lower sequence Adds support to have transactions run at a lower sequence at a given key. It asserts the value it computes against the value written for the sequence in the intent history instead of returning a retry-able error. Release note: None --- pkg/roachpb/batch.go | 5 -- pkg/storage/client_split_test.go | 8 +- pkg/storage/engine/enginepb/mvcc.go | 26 ++++++ pkg/storage/engine/mvcc.go | 113 +++++++++++++++++++++--- pkg/storage/engine/mvcc_test.go | 132 ++++++++++++++++++++++++++-- pkg/storage/replica.go | 27 ------ pkg/storage/replica_test.go | 14 ++- 7 files changed, 266 insertions(+), 59 deletions(-) diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index faf0e88582a0..2e6ea0b7000d 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -123,11 +123,6 @@ func (ba *BatchRequest) IsTransactionWrite() bool { return ba.hasFlag(isTxnWrite) } -// IsRange returns true iff the BatchRequest contains range-based requests. -func (ba *BatchRequest) IsRange() bool { - return ba.hasFlag(isRange) -} - // IsUnsplittable returns true iff the BatchRequest an un-splittable request. 
func (ba *BatchRequest) IsUnsplittable() bool { return ba.hasFlag(isUnsplittable) diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 93031634a02d..89e04a5fecbc 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -674,8 +674,8 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { _, pErr := client.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ Txn: &lTxn, }, lIncArgs) - if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { - t.Fatalf("unexpected idempotency failure: %v", pErr) + if pErr != nil { + t.Fatal(pErr) } // Send out the same increment copied from above (same txn/sequence), but @@ -684,8 +684,8 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { RangeID: newRng.RangeID, Txn: &rTxn, }, rIncArgs) - if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { - t.Fatalf("unexpected idempotency failure: %v", pErr) + if pErr != nil { + t.Fatal(pErr) } // Compare stats of split ranges to ensure they are non zero and diff --git a/pkg/storage/engine/enginepb/mvcc.go b/pkg/storage/engine/enginepb/mvcc.go index 3ac33fccc510..dc8fa217215c 100644 --- a/pkg/storage/engine/enginepb/mvcc.go +++ b/pkg/storage/engine/enginepb/mvcc.go @@ -14,6 +14,8 @@ package enginepb +import "sort" + // Short returns a prefix of the transaction's ID. func (t TxnMeta) Short() string { return t.ID.Short() @@ -135,3 +137,27 @@ func (meta *MVCCMetadata) AddToIntentHistory(seq int32, val []byte) { meta.IntentHistory = append(meta.IntentHistory, MVCCMetadata_SequencedIntent{Sequence: seq, Value: val}) } + +// GetPrevIntentSeq goes through the intent history and finds the previous +// intent's sequence number given the current sequence. 
+func (meta *MVCCMetadata) GetPrevIntentSeq(seq int32) (int32, bool) { + index := sort.Search(len(meta.IntentHistory), func(i int) bool { + return meta.IntentHistory[i].Sequence >= seq + }) + if index > 0 { // index == len(IntentHistory) means seq is newer than every entry; the last entry is still its predecessor. + return meta.IntentHistory[index-1].Sequence, true + } + return 0, false +} + +// GetIntentValue goes through the intent history and finds the value +// written at the sequence number. +func (meta *MVCCMetadata) GetIntentValue(seq int32) ([]byte, bool) { + index := sort.Search(len(meta.IntentHistory), func(i int) bool { + return meta.IntentHistory[i].Sequence >= seq + }) + if index < len(meta.IntentHistory) && meta.IntentHistory[index].Sequence == seq { + return meta.IntentHistory[index].Value, true + } + return nil, false +} diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index 9d6a03a6c80d..80d6d237e773 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -1140,6 +1140,105 @@ func maybeGetValue( return valueFn(exVal) } +// replayTransactionalWrite performs a transactional write under the assumption +// that the transactional write was already executed before. Essentially a replay. +// Since transactions should be idempotent, we must be particularly careful +// about writing an intent if it was already written. If the sequence of the +// transaction is at or below one found in `meta.Txn` then we should +// simply assert the value we're trying to add against the value that was +// previously written at that sequence. +// +// 1) Firstly, we find the value previously written as part of the same sequence. +// 2) We then figure out the value the transaction is trying to write (either the +// value itself or the valueFn applied to the right previous value). +// 3) We assert that the transactional write is idempotent. +// +// Ensure all intents are found and that the values are always accurate for +// transactional idempotency. 
+func replayTransactionalWrite( + ctx context.Context, + engine Writer, + iter Iterator, + meta *enginepb.MVCCMetadata, + ms *enginepb.MVCCStats, + key roachpb.Key, + timestamp hlc.Timestamp, + value []byte, + txn *roachpb.Transaction, + buf *putBuffer, + valueFn func(*roachpb.Value) ([]byte, error), +) error { + var found bool + var writtenValue []byte + var err error + metaKey := MakeMVCCMetadataKey(key) + if txn.Sequence == meta.Txn.Sequence { + // This is a special case. This is when the intent hasn't made it + // to the intent history yet. We must now assert the value written + // in the intent to the value we're trying to write. + getBuf := newGetBuffer() + defer getBuf.release() + getBuf.meta = buf.meta + var exVal *roachpb.Value + if exVal, _, _, err = mvccGetInternal( + ctx, iter, metaKey, timestamp, true /* consistent */, unsafeValue, txn, getBuf); err != nil { + return err + } + writtenValue = exVal.RawBytes + found = true + } else { + // Get the value from the intent history. + writtenValue, found = meta.GetIntentValue(txn.Sequence) + } + if !found { + return errors.Errorf("transaction %s with sequence %d missing an intent with lower sequence %d", + txn.ID, meta.Txn.Sequence, txn.Sequence) + } + + // If the valueFn is specified, we must apply it to the would-be value at the key. + if valueFn != nil { + prevSeq, prevValueWritten := meta.GetPrevIntentSeq(txn.Sequence) + if prevValueWritten { + // If the previous value was found in the IntentHistory, + // simply apply the value function to the historic value + // to get the would-be value. + prevVal, _ := meta.GetIntentValue(prevSeq) + value, err = valueFn(&roachpb.Value{RawBytes: prevVal}) + if err != nil { + return err + } + } else { + // If the previous value at the key wasn't written by this transaction, + // we must apply the value function to the last committed value on the key. 
+ getBuf := newGetBuffer() + defer getBuf.release() + getBuf.meta = buf.meta + var exVal *roachpb.Value + var err error + + // Since we want the last committed value on the key, we must make + // an inconsistent read so we ignore our previous intents here. + if exVal, _, _, err = mvccGetInternal( + ctx, iter, metaKey, timestamp, false /* consistent */, unsafeValue, nil /* txn */, getBuf); err != nil { + return err + } + value, err = valueFn(exVal) + if err != nil { + return err + } + } + } + + // To ensure the transaction is idempotent, we must assert that the + // calculated value on this replay is the same as the one we've previously + // written. + if !bytes.Equal(value, writtenValue) { + return errors.Errorf("transaction %s with sequence %d has a different value %+v after recomputing from what was written: %+v", + txn.ID, txn.Sequence, value, writtenValue) + } + return nil +} + // mvccPutInternal adds a new timestamped value to the specified key. // If value is nil, creates a deletion tombstone value. valueFn is // an optional alternative to supplying value directly. It is passed @@ -1232,18 +1331,12 @@ func mvccPutInternal( } else if txn.Epoch < meta.Txn.Epoch { return errors.Errorf("put with epoch %d came after put with epoch %d in txn %s", txn.Epoch, meta.Txn.Epoch, txn.ID) - } else if txn.Epoch == meta.Txn.Epoch && - (txn.Sequence < meta.Txn.Sequence || - (txn.Sequence == meta.Txn.Sequence && - txn.DeprecatedBatchIndex <= meta.Txn.DeprecatedBatchIndex)) { - // Replay error if we encounter an older sequence number or - // the same (or earlier) batch index for the same sequence. - // TODO(ridwanmsharif, nvanbenschoten): Use the IntentHistory here - // to figure out what should happen here. 
- return roachpb.NewTransactionRetryError(roachpb.RETRY_POSSIBLE_REPLAY) + } else if txn.Epoch == meta.Txn.Epoch && txn.Sequence <= meta.Txn.Sequence { + return replayTransactionalWrite(ctx, engine, iter, meta, ms, key, timestamp, value, txn, buf, valueFn) } - // We need the previous value written here for the intent history. + // We're overwriting the intent that was present at this key, before we do + // that though - we must record the older intent in the IntentHistory. var prevIntentValBytes []byte getBuf := newGetBuffer() // Release the buffer after using the existing value. diff --git a/pkg/storage/engine/mvcc_test.go b/pkg/storage/engine/mvcc_test.go index 1a8b7b91c4e4..31bf7b5a9eb0 100644 --- a/pkg/storage/engine/mvcc_test.go +++ b/pkg/storage/engine/mvcc_test.go @@ -3302,11 +3302,11 @@ func TestMVCCWriteWithSequenceAndBatchIndex(t *testing.T) { batchIndex int32 expRetry bool }{ - {1, 0, true}, // old sequence old batch index - {1, 1, true}, // old sequence, same batch index - {1, 2, true}, // old sequence, new batch index - {2, 0, true}, // same sequence, old batch index - {2, 1, true}, // same sequence, same batch index + {1, 0, false}, // old sequence old batch index + {1, 1, false}, // old sequence, same batch index + {1, 2, false}, // old sequence, new batch index + {2, 0, false}, // same sequence, old batch index + {2, 1, false}, // same sequence, same batch index {2, 2, false}, // same sequence, new batch index {3, 0, false}, // new sequence, old batch index {3, 1, false}, // new sequence, same batch index @@ -4403,6 +4403,128 @@ func TestResolveIntentWithLowerEpoch(t *testing.T) { } } +// TestMVCCIdempotentTransactions verifies that trying to execute a transaction is +// idempotent. 
+func TestMVCCIdempotentTransactions(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + engine := createTestEngine() + defer engine.Close() + + ts1 := hlc.Timestamp{WallTime: 1E9} + + key := roachpb.Key("a") + value := roachpb.MakeValueFromString("first value") + newValue := roachpb.MakeValueFromString("second value") + txn := &roachpb.Transaction{TxnMeta: enginepb.TxnMeta{ID: uuid.MakeV4(), Timestamp: ts1}} + txn.Status = roachpb.PENDING + + // Lay down an intent. + if err := MVCCPut(ctx, engine, nil, key, ts1, value, txn); err != nil { + t.Fatal(err) + } + + // Lay down an intent again with no problem because we're idempotent. + if err := MVCCPut(ctx, engine, nil, key, ts1, value, txn); err != nil { + t.Fatal(err) + } + + // Lay down an intent without increasing the sequence but with a different value. + if err := MVCCPut(ctx, engine, nil, key, ts1, newValue, txn); err != nil { + if !testutils.IsError(err, "has a different value") { + t.Fatal(err) + } + } else { + t.Fatalf("put should've failed as replay of a transaction yields a different value") + } + + // Lay down a second intent. + txn.Sequence++ + if err := MVCCPut(ctx, engine, nil, key, ts1, newValue, txn); err != nil { + t.Fatal(err) + } + + // Replay first intent without writing anything down. + txn.Sequence-- + if err := MVCCPut(ctx, engine, nil, key, ts1, value, txn); err != nil { + t.Fatal(err) + } + + // Check that the intent meta was as expected. 
+ txn.Sequence++ + aggMeta := &enginepb.MVCCMetadata{ + Txn: &txn.TxnMeta, + Timestamp: hlc.LegacyTimestamp(ts1), + KeyBytes: mvccVersionTimestampSize, + ValBytes: int64(len(newValue.RawBytes)), + IntentHistory: []enginepb.MVCCMetadata_SequencedIntent{ + {Sequence: 0, Value: value.RawBytes}, + }, + } + metaKey := mvccKey(key) + meta := &enginepb.MVCCMetadata{} + ok, _, _, err := engine.GetProto(metaKey, meta) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("intent should not be cleared") + } + if !meta.Equal(aggMeta) { + t.Errorf("expected metadata:\n%+v;\n got: \n%+v", aggMeta, meta) + } + txn.Sequence-- + // Lay down an intent without increasing the sequence but with a different value. + if err := MVCCPut(ctx, engine, nil, key, ts1, newValue, txn); err != nil { + if !testutils.IsError(err, "has a different value") { + t.Fatal(err) + } + } else { + t.Fatalf("put should've failed as replay of a transaction yields a different value") + } + + txn.Sequence-- + // Lay down an intent with a lower sequence number to see if it detects missing intents. + if err := MVCCPut(ctx, engine, nil, key, ts1, newValue, txn); err != nil { + if !testutils.IsError(err, "missing an intent") { + t.Fatal(err) + } + } else { + t.Fatalf("put should've failed as replay of a transaction yields a different value") + } + txn.Sequence += 3 + + // On a separate key, start an increment. + val, err := MVCCIncrement(ctx, engine, nil, testKey1, ts1, txn, 1) + if val != 1 || err != nil { + t.Fatalf("expected val=1 (got %d): %s", val, err) + } + // As long as the sequence is unchanged, replaying the increment doesn't + // increase the value. + for i := 0; i < 10; i++ { + val, err = MVCCIncrement(ctx, engine, nil, testKey1, ts1, txn, 1) + if val != 1 || err != nil { + t.Fatalf("expected val=1 (got %d): %s", val, err) + } + } + + // Increment again. 
+ txn.Sequence++ + val, err = MVCCIncrement(ctx, engine, nil, testKey1, ts1, txn, 1) + if val != 2 || err != nil { + t.Fatalf("expected val=2 (got %d): %s", val, err) + } + txn.Sequence-- + // Replaying an older increment doesn't increase the value. + for i := 0; i < 10; i++ { + val, err = MVCCIncrement(ctx, engine, nil, testKey1, ts1, txn, 1) + if val != 1 || err != nil { + t.Fatalf("expected val=1 (got %d): %s", val, err) + } + } +} + // TestMVCCIntentHistory verifies that trying to write to a key that already was written // to, results in the history being recorded in the MVCCMetadata. func TestMVCCIntentHistory(t *testing.T) { diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 8b17df5c9ab4..1a36f34305d7 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -3426,17 +3426,6 @@ func (r *Replica) evaluateProposal( // replication is not necessary. res.Local.Reply = br - // Assert that successful write requests result in intents. This is - // important for transactional pipelining and the parallel commit proposal - // because both use the presence of an intent to indicate that a write - // succeeded. This check may have false negatives but will never have false - // positives. - if br.Txn != nil && br.Txn.Status != roachpb.ABORTED { - if ba.IsTransactionWrite() && !ba.IsRange() && batch.Empty() { - log.Fatalf(ctx, "successful transaction point write batch %v resulted in no-op", ba) - } - } - // needConsensus determines if the result needs to be replicated and // proposed through Raft. This is necessary if at least one of the // following conditions is true: @@ -6388,22 +6377,6 @@ func evaluateBatch( // request. Each request will set their own sequence number on // the TxnMeta, which is stored as part of an intent. ba.Txn.Sequence = seqNum - } else { - // If the DisallowUnsequencedTransactionalWrites testing knob - // is set, we assert that all transaction writes has assigned - // Request-scoped sequence numbers. 
- if rec.EvalKnobs().DisallowUnsequencedTransactionalWrites { - if roachpb.IsTransactionWrite(args) { - log.Fatalf(ctx, "found unsequenced transactional request %v in %v", args, ba) - } - } - - // The Txn coordinator must not be setting sequence numbers on - // individual requests. Use the now-deprecated BatchIndex field. - // - // TODO(nvanbenschoten): remove this case and the explanation - // above in version 2.2. - ba.Txn.DeprecatedBatchIndex = int32(index) } } // Note that responses are populated even when an error is returned. diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 364d0df52b6a..add21c21bca0 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -2891,9 +2891,10 @@ func TestReplicaAbortSpanReadError(t *testing.T) { } } -// TestReplicaAbortSpanStoredTxnRetryError verifies that if a cached -// entry is present, a transaction restart error is returned. -func TestReplicaAbortSpanStoredTxnRetryError(t *testing.T) { +// TestReplicaAbortSpanTxnIdempotency verifies that a TransactionAbortedError is +// found when a put is tried on an aborted txn. It further verifies transactions +// run successfully and in an idempotent manner when replaying the same requests. +func TestReplicaAbortSpanTxnIdempotency(t *testing.T) { defer leaktest.AfterTest(t)() tc := testContext{} stopper := stop.NewStopper() @@ -2940,11 +2941,8 @@ func TestReplicaAbortSpanStoredTxnRetryError(t *testing.T) { t.Fatal(pErr) } txn.Timestamp.Forward(txn.Timestamp.Add(10, 10)) // can't hurt - { - pErr := try() - if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { - t.Fatal(pErr) - } + if pErr := try(); pErr != nil { + t.Fatal(pErr) } // Pretend we restarted by increasing the epoch. That's all that's needed.