Skip to content

Commit

Permalink
storage: allow transactions to run at a lower sequence
Browse files Browse the repository at this point in the history
Adds support to have transactions run at a lower sequence
at a given key. It asserts the value it computes with the
value written for the sequence in the intent history instead
or returning a retry-able error.

Release note: None
  • Loading branch information
Ridwan Sharif committed Dec 12, 2018
1 parent b1424c3 commit a147f24
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 59 deletions.
5 changes: 0 additions & 5 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ func (ba *BatchRequest) IsTransactionWrite() bool {
return ba.hasFlag(isTxnWrite)
}

// IsRange returns true iff the BatchRequest contains range-based requests.
func (ba *BatchRequest) IsRange() bool {
return ba.hasFlag(isRange)
}

// IsUnsplittable returns true iff the BatchRequest an un-splittable request.
func (ba *BatchRequest) IsUnsplittable() bool {
return ba.hasFlag(isUnsplittable)
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,8 +674,8 @@ func TestStoreRangeSplitIdempotency(t *testing.T) {
_, pErr := client.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{
Txn: &lTxn,
}, lIncArgs)
if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok {
t.Fatalf("unexpected idempotency failure: %v", pErr)
if pErr != nil {
t.Fatal(pErr)
}

// Send out the same increment copied from above (same txn/sequence), but
Expand All @@ -684,8 +684,8 @@ func TestStoreRangeSplitIdempotency(t *testing.T) {
RangeID: newRng.RangeID,
Txn: &rTxn,
}, rIncArgs)
if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok {
t.Fatalf("unexpected idempotency failure: %v", pErr)
if pErr != nil {
t.Fatal(pErr)
}

// Compare stats of split ranges to ensure they are non zero and
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/engine/enginepb/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package enginepb

import "sort"

// Short returns a prefix of the transaction's ID.
func (t TxnMeta) Short() string {
return t.ID.Short()
Expand Down Expand Up @@ -135,3 +137,27 @@ func (meta *MVCCMetadata) AddToIntentHistory(seq int32, val []byte) {
meta.IntentHistory = append(meta.IntentHistory,
MVCCMetadata_SequencedIntent{Sequence: seq, Value: val})
}

// GetPrevIntentSeq goes through the intent history and finds the previous
// intent's sequence number given the current sequence.
func (meta *MVCCMetadata) GetPrevIntentSeq(seq int32) (int32, bool) {
index := sort.Search(len(meta.IntentHistory), func(i int) bool {
return meta.IntentHistory[i].Sequence >= seq
})
if index > 0 && index < len(meta.IntentHistory) {
return meta.IntentHistory[index-1].Sequence, true
}
return 0, false
}

// GetIntentValue goes through the intent history and finds the value
// written at the sequence number.
func (meta *MVCCMetadata) GetIntentValue(seq int32) ([]byte, bool) {
index := sort.Search(len(meta.IntentHistory), func(i int) bool {
return meta.IntentHistory[i].Sequence >= seq
})
if index < len(meta.IntentHistory) && meta.IntentHistory[index].Sequence == seq {
return meta.IntentHistory[index].Value, true
}
return nil, false
}
113 changes: 103 additions & 10 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,105 @@ func maybeGetValue(
return valueFn(exVal)
}

// replayTransactionalWrite performs a transactional write under the assumption
// that the transactional write was already executed before. Essentially a replay.
// Since transactions should be idempotent, we must be particularly careful
// about writing an intent if it was already written. If the sequence of the
// transaction is at or below one found in `meta.Txn` then we should
// simply assert the value we're trying to add against the value that was
// previously written at that sequence.
//
// 1) Firstly, we find the value previously written as part of the same sequence.
// 2) We then figure out the value the transaction is trying to write (either the
// value itself or the valueFn applied to the right previous value).
// 3) We assert that the transactional write is idempotent.
//
// Ensure all intents are found and that the values are always accurate for
// transactional idempotency.
func replayTransactionalWrite(
ctx context.Context,
engine Writer,
iter Iterator,
meta *enginepb.MVCCMetadata,
ms *enginepb.MVCCStats,
key roachpb.Key,
timestamp hlc.Timestamp,
value []byte,
txn *roachpb.Transaction,
buf *putBuffer,
valueFn func(*roachpb.Value) ([]byte, error),
) error {
var found bool
var writtenValue []byte
var err error
metaKey := MakeMVCCMetadataKey(key)
if txn.Sequence == meta.Txn.Sequence {
// This is a special case. This is when the intent hasn't made it
// to the intent history yet. We must now assert the value written
// in the intent to the value we're trying to write.
getBuf := newGetBuffer()
defer getBuf.release()
getBuf.meta = buf.meta
var exVal *roachpb.Value
if exVal, _, _, err = mvccGetInternal(
ctx, iter, metaKey, timestamp, true /* consistent */, unsafeValue, txn, getBuf); err != nil {
return err
}
writtenValue = exVal.RawBytes
found = true
} else {
// Get the value from the intent history.
writtenValue, found = meta.GetIntentValue(txn.Sequence)
}
if !found {
return errors.Errorf("transaction %s with sequence %d missing an intent with lower sequence %d",
txn.ID, meta.Txn.Sequence, txn.Sequence)
}

// If the valueFn is specified, we must apply it to the would-be value at the key.
if valueFn != nil {
prevSeq, prevValueWritten := meta.GetPrevIntentSeq(txn.Sequence)
if prevValueWritten {
// If the previous value was found in the IntentHistory,
// simply apply the value function to the historic value
// to get the would-be value.
prevVal, _ := meta.GetIntentValue(prevSeq)
value, err = valueFn(&roachpb.Value{RawBytes: prevVal})
if err != nil {
return err
}
} else {
// If the previous value at the key wasn't written by this transaction,
// we must apply the value function to the last committed value on the key.
getBuf := newGetBuffer()
defer getBuf.release()
getBuf.meta = buf.meta
var exVal *roachpb.Value
var err error

// Since we want the last committed value on the key, we must make
// an inconsistent read so we ignore our previous intents here.
if exVal, _, _, err = mvccGetInternal(
ctx, iter, metaKey, timestamp, false /* consistent */, unsafeValue, nil /* txn */, getBuf); err != nil {
return err
}
value, err = valueFn(exVal)
if err != nil {
return err
}
}
}

// To ensure the transaction is idempotent, we must assert that the
// calculated value on this replay is the same as the one we've previously
// written.
if !bytes.Equal(value, writtenValue) {
return errors.Errorf("transaction %s with sequence %d has a different value %+v after recomputing from what was written: %+v",
txn.ID, txn.Sequence, value, writtenValue)
}
return nil
}

// mvccPutInternal adds a new timestamped value to the specified key.
// If value is nil, creates a deletion tombstone value. valueFn is
// an optional alternative to supplying value directly. It is passed
Expand Down Expand Up @@ -1232,18 +1331,12 @@ func mvccPutInternal(
} else if txn.Epoch < meta.Txn.Epoch {
return errors.Errorf("put with epoch %d came after put with epoch %d in txn %s",
txn.Epoch, meta.Txn.Epoch, txn.ID)
} else if txn.Epoch == meta.Txn.Epoch &&
(txn.Sequence < meta.Txn.Sequence ||
(txn.Sequence == meta.Txn.Sequence &&
txn.DeprecatedBatchIndex <= meta.Txn.DeprecatedBatchIndex)) {
// Replay error if we encounter an older sequence number or
// the same (or earlier) batch index for the same sequence.
// TODO(ridwanmsharif, nvanbenschoten): Use the IntentHistory here
// to figure out what should happen here.
return roachpb.NewTransactionRetryError(roachpb.RETRY_POSSIBLE_REPLAY)
} else if txn.Epoch == meta.Txn.Epoch && txn.Sequence <= meta.Txn.Sequence {
return replayTransactionalWrite(ctx, engine, iter, meta, ms, key, timestamp, value, txn, buf, valueFn)
}

// We need the previous value written here for the intent history.
// We're overwriting the intent that was present at this key, before we do
// that though - we must record the older intent in the IntentHistory.
var prevIntentValBytes []byte
getBuf := newGetBuffer()
// Release the buffer after using the existing value.
Expand Down
132 changes: 127 additions & 5 deletions pkg/storage/engine/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3302,11 +3302,11 @@ func TestMVCCWriteWithSequenceAndBatchIndex(t *testing.T) {
batchIndex int32
expRetry bool
}{
{1, 0, true}, // old sequence old batch index
{1, 1, true}, // old sequence, same batch index
{1, 2, true}, // old sequence, new batch index
{2, 0, true}, // same sequence, old batch index
{2, 1, true}, // same sequence, same batch index
{1, 0, false}, // old sequence old batch index
{1, 1, false}, // old sequence, same batch index
{1, 2, false}, // old sequence, new batch index
{2, 0, false}, // same sequence, old batch index
{2, 1, false}, // same sequence, same batch index
{2, 2, false}, // same sequence, new batch index
{3, 0, false}, // new sequence, old batch index
{3, 1, false}, // new sequence, same batch index
Expand Down Expand Up @@ -4403,6 +4403,128 @@ func TestResolveIntentWithLowerEpoch(t *testing.T) {
}
}

// TestMVCCIdempotentTransactions verifies that trying to execute a transaction is
// idempotent.
func TestMVCCIdempotentTransactions(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
engine := createTestEngine()
defer engine.Close()

ts1 := hlc.Timestamp{WallTime: 1E9}

key := roachpb.Key("a")
value := roachpb.MakeValueFromString("first value")
newValue := roachpb.MakeValueFromString("second value")
txn := &roachpb.Transaction{TxnMeta: enginepb.TxnMeta{ID: uuid.MakeV4(), Timestamp: ts1}}
txn.Status = roachpb.PENDING

// Lay down an intent.
if err := MVCCPut(ctx, engine, nil, key, ts1, value, txn); err != nil {
t.Fatal(err)
}

// Lay down an intent again with no problem because we're idempotent.
if err := MVCCPut(ctx, engine, nil, key, ts1, value, txn); err != nil {
t.Fatal(err)
}

// Lay down an intent without increasing the sequence but with a different value.
if err := MVCCPut(ctx, engine, nil, key, ts1, newValue, txn); err != nil {
if !testutils.IsError(err, "has a different value") {
t.Fatal(err)
}
} else {
t.Fatalf("put should've failed as replay of a transaction yields a different value")
}

// Lay down a second intent.
txn.Sequence++
if err := MVCCPut(ctx, engine, nil, key, ts1, newValue, txn); err != nil {
t.Fatal(err)
}

// Replay first intent without writing anything down.
txn.Sequence--
if err := MVCCPut(ctx, engine, nil, key, ts1, value, txn); err != nil {
t.Fatal(err)
}

// Check that the intent meta was as expected.
txn.Sequence++
aggMeta := &enginepb.MVCCMetadata{
Txn: &txn.TxnMeta,
Timestamp: hlc.LegacyTimestamp(ts1),
KeyBytes: mvccVersionTimestampSize,
ValBytes: int64(len(newValue.RawBytes)),
IntentHistory: []enginepb.MVCCMetadata_SequencedIntent{
{Sequence: 0, Value: value.RawBytes},
},
}
metaKey := mvccKey(key)
meta := &enginepb.MVCCMetadata{}
ok, _, _, err := engine.GetProto(metaKey, meta)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("intent should not be cleared")
}
if !meta.Equal(aggMeta) {
t.Errorf("expected metadata:\n%+v;\n got: \n%+v", aggMeta, meta)
}
txn.Sequence--
// Lay down an intent without increasing the sequence but with a different value.
if err := MVCCPut(ctx, engine, nil, key, ts1, newValue, txn); err != nil {
if !testutils.IsError(err, "has a different value") {
t.Fatal(err)
}
} else {
t.Fatalf("put should've failed as replay of a transaction yields a different value")
}

txn.Sequence--
// Lay down an intent with a lower sequence number to see if it detects missing intents.
if err := MVCCPut(ctx, engine, nil, key, ts1, newValue, txn); err != nil {
if !testutils.IsError(err, "missing an intent") {
t.Fatal(err)
}
} else {
t.Fatalf("put should've failed as replay of a transaction yields a different value")
}
txn.Sequence += 3

// on a separate key, start an increment.
val, err := MVCCIncrement(ctx, engine, nil, testKey1, ts1, txn, 1)
if val != 1 || err != nil {
t.Fatalf("expected val=1 (got %d): %s", val, err)
}
// As long as the sequence in unchanged, replaying the increment doesn't
// increase the value.
for i := 0; i < 10; i++ {
val, err = MVCCIncrement(ctx, engine, nil, testKey1, ts1, txn, 1)
if val != 1 || err != nil {
t.Fatalf("expected val=1 (got %d): %s", val, err)
}
}

// Increment again.
txn.Sequence++
val, err = MVCCIncrement(ctx, engine, nil, testKey1, ts1, txn, 1)
if val != 2 || err != nil {
t.Fatalf("expected val=2 (got %d): %s", val, err)
}
txn.Sequence--
// Replaying an older increment doesn't increase the value.
for i := 0; i < 10; i++ {
val, err = MVCCIncrement(ctx, engine, nil, testKey1, ts1, txn, 1)
if val != 1 || err != nil {
t.Fatalf("expected val=1 (got %d): %s", val, err)
}
}
}

// TestMVCCIntentHistory verifies that trying to write to a key that already was written
// to, results in the history being recorded in the MVCCMetadata.
func TestMVCCIntentHistory(t *testing.T) {
Expand Down
27 changes: 0 additions & 27 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3426,17 +3426,6 @@ func (r *Replica) evaluateProposal(
// replication is not necessary.
res.Local.Reply = br

// Assert that successful write requests result in intents. This is
// important for transactional pipelining and the parallel commit proposal
// because both use the presence of an intent to indicate that a write
// succeeded. This check may have false negatives but will never have false
// positives.
if br.Txn != nil && br.Txn.Status != roachpb.ABORTED {
if ba.IsTransactionWrite() && !ba.IsRange() && batch.Empty() {
log.Fatalf(ctx, "successful transaction point write batch %v resulted in no-op", ba)
}
}

// needConsensus determines if the result needs to be replicated and
// proposed through Raft. This is necessary if at least one of the
// following conditions is true:
Expand Down Expand Up @@ -6388,22 +6377,6 @@ func evaluateBatch(
// request. Each request will set their own sequence number on
// the TxnMeta, which is stored as part of an intent.
ba.Txn.Sequence = seqNum
} else {
// If the DisallowUnsequencedTransactionalWrites testing knob
// is set, we assert that all transaction writes has assigned
// Request-scoped sequence numbers.
if rec.EvalKnobs().DisallowUnsequencedTransactionalWrites {
if roachpb.IsTransactionWrite(args) {
log.Fatalf(ctx, "found unsequenced transactional request %v in %v", args, ba)
}
}

// The Txn coordinator must not be setting sequence numbers on
// individual requests. Use the now-deprecated BatchIndex field.
//
// TODO(nvanbenschoten): remove this case and the explanation
// above in version 2.2.
ba.Txn.DeprecatedBatchIndex = int32(index)
}
}
// Note that responses are populated even when an error is returned.
Expand Down
Loading

0 comments on commit a147f24

Please sign in to comment.