storage: write at provisional commit ts, not orig ts
The fact that transactions write at their original timestamp, and not
their provisional commit timestamp, allows leaving an intent under a
read. The timestamp cache will ensure that the transaction can't
actually commit unless it can bump its intents above the read, but it
will still leave an intent under the read in the meantime.

This can lead to starvation. Intents are meant to function as a sort of
lock on a key. Once a writer lays down an intent, no readers should be
allowed to read above that intent until that intent is resolved.
Otherwise a continual stream of readers could prevent the writer from
ever managing to commit by continually bumping the timestamp cache.
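
To make the locking role of an intent concrete, here is a minimal Go
sketch of the conflict check a read performs when it encounters an
intent. The types and names are invented for illustration; they are not
the actual MVCC API.

    // Illustrative only: an intent at or below the read timestamp forces
    // the reader to wait for (or push) the writer rather than read above
    // the unresolved intent.
    package intentsketch

    import "fmt"

    type timestamp int64

    type intent struct {
        key string
        ts  timestamp
    }

    // checkRead returns an error when a read at readTS encounters an
    // intent at or below readTS.
    func checkRead(in *intent, readTS timestamp) error {
        if in != nil && in.ts <= readTS {
            return fmt.Errorf("read at %d blocked by intent on %q at %d",
                readTS, in.key, in.ts)
        }
        return nil
    }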

Now consider how CDC's poller works: it reads (tsmin, ts1], then (ts1,
ts2], then (ts2, ts3], and so on, in a tight loop. Since it uses a
time-bound iterator under the hood, reading (ts2, ts3], for example,
cannot return an intent written at ts2. But the idea was that we
inductively guaranteed that we never read above an intent. If an intent
was written at ts2, even though the read from (ts2, ts3] would fail to
observe it, the previous read from (ts1, ts2] would have.
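
For illustration, a hedged sketch of that polling pattern (the helper
and types are invented, not the actual CDC poller): a version written
exactly at ts2 falls inside (ts1, ts2] but is excluded from every later
interval, because each interval's lower bound is exclusive.

    package main

    import "fmt"

    type ts int64

    // inHalfOpen reports whether t lies in the half-open interval
    // (start, end], mirroring the bounds the time-bound iterator enforces.
    func inHalfOpen(t, start, end ts) bool { return t > start && t <= end }

    func main() {
        intentTS := ts(2) // an intent written exactly at ts2
        // The poller scans (0, 1], (1, 2], (2, 3], ... in a tight loop.
        for start := ts(0); start < 4; start++ {
            end := start + 1
            fmt.Printf("scan (%d, %d] sees ts%d: %v\n",
                start, end, intentTS, inHalfOpen(intentTS, start, end))
        }
        // Only the (1, 2] scan reports true; no later scan can observe
        // anything written at ts2.
    }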

Of course, since transactions write at their original timestamp, a
transaction with an original timestamp of ts2 can write an intent at ts2
*after* the CDC poller has read (ts1, ts2]. (The transaction will be
forced to commit at ts3 or later to be sequenced after the CDC poller's
read, but it will leave the intent at ts2.) The CDC poller's next read,
from (ts2, ts3], thus won't see the intent, nor will any future reads at
higher timestamps. And so the CDC poller will continually bump the
timestamp cache, completely starving the writer.

Fix the problem by writing at the transaction's provisional commit
timestamp (i.e., the timestamp that has been forwarded by the timestamp
cache, if necessary) instead of the transaction's original timestamp.
Writing at the original timestamp was only necessary to prevent a lost
update in snapshot isolation mode, which is no longer supported. In
serializable mode, the anomaly is protected against by the read refresh
mechanism.
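
A minimal sketch of the new write-timestamp selection, with invented
types (the real change lives in the MVCC layer, where callers supply
the read timestamp and the txn carries the provisional commit
timestamp):

    package writesketch

    // txn is a pared-down stand-in for roachpb.Transaction:
    // OrigTimestamp is the timestamp at which the txn reads; Timestamp
    // is the provisional commit timestamp, forwarded by the timestamp
    // cache if necessary.
    type txn struct {
        OrigTimestamp int64
        Timestamp     int64
    }

    // writeTimestamp picks the timestamp at which an intent is laid
    // down. Previously a transactional write went in at readTS (the
    // original timestamp); now it goes in at the provisional commit
    // timestamp, so the intent can no longer land underneath a read
    // that has already bumped the timestamp cache.
    func writeTimestamp(readTS int64, t *txn) int64 {
        if t != nil {
            return t.Timestamp // provisional commit timestamp
        }
        return readTS // non-transactional writes are unchanged
    }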

Besides fixing the starvation problem, the new behavior is more
intuitive than the old behavior. It also might have some performance
benefits, as it is less likely that intents will need to be bumped at
commit time, which saves on RocksDB writes.

Note that one conceptual oddity surrounds transactions reading their own
writes: a transaction that is reading at ts1 and writing at ts2 will be
able to read its own writes, even though simple timestamp comparison
would make those writes invisible. This oddity is not introduced by
this commit, though this commit does make this situation more common.
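
A hedged sketch of that visibility rule (names invented for
illustration): a version is visible to a read either because it is at
or below the read timestamp, or because it is the reader's own
provisional write, even when that write sits above the read timestamp.

    package ownwrites

    type version struct {
        ts    int64
        txnID string // empty for committed values
    }

    // visible reports whether a read at readTS by readerTxnID can see v.
    func visible(v version, readTS int64, readerTxnID string) bool {
        if v.txnID != "" && v.txnID == readerTxnID {
            // A txn sees its own intent, e.g. written at ts2, read at ts1.
            return true
        }
        return v.ts <= readTS
    }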

Touches #32433.

Release note: None
benesch committed Dec 19, 2018
1 parent 3483037 commit 57d0201
Showing 14 changed files with 559 additions and 304 deletions.
76 changes: 45 additions & 31 deletions pkg/ccl/storageccl/engineccl/mvcc_test.go
@@ -161,25 +161,31 @@ func TestMVCCIterateIncremental(t *testing.T) {

// Exercise intent handling.
txn1ID := uuid.MakeV4()
txn1 := roachpb.Transaction{TxnMeta: enginepb.TxnMeta{
Key: testKey1,
ID: txn1ID,
Epoch: 1,
Timestamp: ts4,
}}
txn1 := roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Key: testKey1,
ID: txn1ID,
Epoch: 1,
Timestamp: ts4,
},
OrigTimestamp: ts4,
}
txn1Val := roachpb.Value{RawBytes: testValue4}
if err := engine.MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.TxnMeta.Timestamp, txn1Val, &txn1); err != nil {
if err := engine.MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.OrigTimestamp, txn1Val, &txn1); err != nil {
t.Fatal(err)
}
txn2ID := uuid.MakeV4()
txn2 := roachpb.Transaction{TxnMeta: enginepb.TxnMeta{
Key: testKey2,
ID: txn2ID,
Epoch: 1,
Timestamp: ts4,
}}
txn2 := roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Key: testKey2,
ID: txn2ID,
Epoch: 1,
Timestamp: ts4,
},
OrigTimestamp: ts4,
}
txn2Val := roachpb.Value{RawBytes: testValue4}
if err := engine.MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.TxnMeta.Timestamp, txn2Val, &txn2); err != nil {
if err := engine.MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.OrigTimestamp, txn2Val, &txn2); err != nil {
t.Fatal(err)
}
t.Run("intents1",
@@ -233,25 +239,31 @@ func TestMVCCIterateIncremental(t *testing.T) {

// Exercise intent handling.
txn1ID := uuid.MakeV4()
txn1 := roachpb.Transaction{TxnMeta: enginepb.TxnMeta{
Key: testKey1,
ID: txn1ID,
Epoch: 1,
Timestamp: ts4,
}}
txn1 := roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Key: testKey1,
ID: txn1ID,
Epoch: 1,
Timestamp: ts4,
},
OrigTimestamp: ts4,
}
txn1Val := roachpb.Value{RawBytes: testValue4}
if err := engine.MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.TxnMeta.Timestamp, txn1Val, &txn1); err != nil {
if err := engine.MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.OrigTimestamp, txn1Val, &txn1); err != nil {
t.Fatal(err)
}
txn2ID := uuid.MakeV4()
txn2 := roachpb.Transaction{TxnMeta: enginepb.TxnMeta{
Key: testKey2,
ID: txn2ID,
Epoch: 1,
Timestamp: ts4,
}}
txn2 := roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Key: testKey2,
ID: txn2ID,
Epoch: 1,
Timestamp: ts4,
},
OrigTimestamp: ts4,
}
txn2Val := roachpb.Value{RawBytes: testValue4}
if err := engine.MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.TxnMeta.Timestamp, txn2Val, &txn2); err != nil {
if err := engine.MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.OrigTimestamp, txn2Val, &txn2); err != nil {
t.Fatal(err)
}
t.Run("intents1",
@@ -370,6 +382,7 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
Epoch: 1,
Timestamp: hlc.Timestamp{WallTime: 2},
},
OrigTimestamp: hlc.Timestamp{WallTime: 2},
})

// Create a second DB in which we'll create a specific SSTable structure: the
@@ -473,6 +486,7 @@ func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {
Epoch: 1,
Timestamp: ts,
},
OrigTimestamp: ts,
}
}
intent := func(txn *roachpb.Transaction) roachpb.Intent {
@@ -542,9 +556,9 @@ func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {
// kA:3 -> vA3
// kA:2 -> vA2
// kB -> (intent deletion)
require.NoError(t, engine.MVCCPut(ctx, db, nil, kA, txnA1.Timestamp, vA1, txnA1))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kB, txnB1.Timestamp, vB1, txnB1))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kC, txnC1.Timestamp, vC1, txnC1))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kA, txnA1.OrigTimestamp, vA1, txnA1))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kB, txnB1.OrigTimestamp, vB1, txnB1))
require.NoError(t, engine.MVCCPut(ctx, db, nil, kC, txnC1.OrigTimestamp, vC1, txnC1))
require.NoError(t, db.Flush())
require.NoError(t, db.Compact())
require.NoError(t, engine.MVCCResolveWriteIntent(ctx, db, nil, intent(txnA1)))
19 changes: 16 additions & 3 deletions pkg/roachpb/batch.go
@@ -38,9 +38,22 @@ func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
return errors.New("transactional request must not set batch timestamp")
}

// Always use the original timestamp for reads and writes, even
// though some intents may be written at higher timestamps in the
// event of a WriteTooOldError.
// The batch timestamp is the timestamp at which reads are performed. We set
// this to the txn's original timestamp, even if the txn's provisional
// commit timestamp has been forwarded, so that all reads within a txn
// observe the same snapshot of the database.
//
// In other words, we want to preserve the invariant that reading the same
// key multiple times in the same transaction will always return the same
// value. If we were to read at the latest provisional commit timestamp,
// txn.Timestamp, instead, reading the same key twice in the same txn might
// yield different results, e.g., if an intervening write caused the
// provisional commit timestamp to be advanced. Such a txn would fail to
// commit, as its reads would not successfully be refreshed, but only after
// confusing the client with spurious data.
//
// Note that writes will be performed at the provisional commit timestamp,
// txn.Timestamp, regardless of the batch timestamp.
ba.Timestamp = txn.OrigTimestamp
// If a refreshed timestamp is set for the transaction, forward
// the batch timestamp to it. The refreshed timestamp indicates a
46 changes: 8 additions & 38 deletions pkg/roachpb/data.pb.go

(Generated file; diff not rendered.)

46 changes: 8 additions & 38 deletions pkg/roachpb/data.proto
@@ -262,42 +262,11 @@ message Transaction {
// transaction will retry unless we manage to "refresh the reads" - see
// refreshed_timestamp.
//
// This timestamp is the one at which all transactions will read, unless
// refreshed_timestamp is set. It is also, surprisingly, the timestamp at
// which transactions will provisionally _write_ (i.e. intents are written at
// this orig_timestamp and, after commit, when the intents are resolved,
// their timestamps are bumped to the to the commit timestamp), if
// refreshed_timestamp isn't set.
// This is ultimately because of correctness concerns around SNAPSHOT
// transactions.
//
// Intuitively, one could think that the timestamp at which intents should be
// written should be the provisional commit timestamp, and while this is
// morally true, consider the following scenario, where txn1 is a SNAPSHOT
// txn:
//
// - txn1 at orig_timestamp=5 reads key1: (value) 1.
// - txn1 writes elsewhere, has its commit timestamp increased to 20.
// - txn2 at orig_timestamp=10 reads key1: 1
// - txn2 increases the value by 5: key1: 6 and commits
// - txn1 increases the value by 1: key1: 2, attempts commit
// This timestamp is the one at which all reads occur, unless
// refreshed_timestamp is set.
//
// If txn1 uses its orig_timestamp for updating key1 (as it does), it
// conflicts with txn2's committed value (which is at timestamp 10, in the
// future of 5), and restarts.
// Using instead its candidate commit timestamp, it wouldn't see a conflict
// and commit, but this is not the expected outcome (the expected outcome is
// {key1: 6} (since txn1 is not expected to commit)) and we would be
// experiencing the Lost Update Anomaly.
//
// Note that in practice, before restarting, txn1 would still lay down an
// intent (just above the committed value) not with the intent to commit it,
// but to avoid being starved by short-lived transactions on that key which
// would otherwise not have to go through conflict resolution with txn1.
//
// Again, keep in mind that, when the transaction commits, all the intents are
// bumped to the commit timestamp (otherwise, pushing a transaction wouldn't
// achieve anything).
// Note that writes do not occur at this timestamp; they instead occur at the
// provisional commit timestamp, meta.Timestamp.
util.hlc.Timestamp orig_timestamp = 6 [(gogoproto.nullable) = false];
// Initial Timestamp + clock skew. Reads which encounter values with
// timestamps between timestamp and max_timestamp trigger a txn
@@ -310,9 +279,10 @@
// can commit without necessitating a serializable restart. This
// value is forwarded to the transaction's current timestamp (meta.timestamp)
// if the transaction coordinator is able to refresh all refreshable spans
// encountered during the course of the txn. If set, this take precedence
// over orig_timestamp and is the timestamp at which the transaction both
// reads and writes going forward.
// encountered during the course of the txn. If set, this takes precedence
// over orig_timestamp and is the timestamp at which the transaction reads
// going forward.
//
// We need to keep track of both refresh_timestamp and orig_timestamp (instead
// of simply overwriting the orig_timestamp after refreshes) because the
// orig_timestamp needs to be used as a lower bound timestamp for the
5 changes: 1 addition & 4 deletions pkg/sql/upsert_test.go
@@ -129,7 +129,7 @@ func TestUpsertFastPath(t *testing.T) {
}
}

func TestConcurrentUpsertWithSnapshotIsolation(t *testing.T) {
func TestConcurrentUpsert(t *testing.T) {
defer leaktest.AfterTest(t)()

s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{})
@@ -138,9 +138,6 @@ func TestConcurrentUpsertWithSnapshotIsolation(t *testing.T) {

sqlDB.Exec(t, `CREATE DATABASE d`)
sqlDB.Exec(t, `CREATE TABLE d.t (a INT PRIMARY KEY, b INT, INDEX b_idx (b))`)
// TODO(andrei): This test is probably broken: it's setting a default
// isolation on a random connection, not on all connections.
sqlDB.Exec(t, `SET DEFAULT_TRANSACTION_ISOLATION TO SNAPSHOT`)

testCases := []struct {
name string
3 changes: 2 additions & 1 deletion pkg/storage/batcheval/cmd_refresh_range_test.go
@@ -69,8 +69,9 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
Epoch: 1,
Timestamp: ts1,
},
OrigTimestamp: ts1,
}
if err := engine.MVCCPut(ctx, db, nil, k, txn.Timestamp, v, txn); err != nil {
if err := engine.MVCCPut(ctx, db, nil, k, txn.OrigTimestamp, v, txn); err != nil {
t.Fatal(err)
}
if err := engine.MVCCPut(ctx, db, nil, roachpb.Key("unused1"), ts4, v, nil); err != nil {
7 changes: 6 additions & 1 deletion pkg/storage/engine/bench_test.go
@@ -118,7 +118,8 @@ func setupMVCCData(

var txn *roachpb.Transaction
if opts.transactional {
txn = txn1Commit
txnCopy := *txn1Commit
txn = &txnCopy
}

writeKey := func(batch Batch, idx int) {
Expand All @@ -127,6 +128,10 @@ func setupMVCCData(
value.InitChecksum(key)
counts[idx]++
ts := hlc.Timestamp{WallTime: int64(counts[idx] * 5)}
if txn != nil {
txn.OrigTimestamp = ts
txn.Timestamp = ts
}
if err := MVCCPut(ctx, batch, nil /* ms */, key, ts, value, txn); err != nil {
b.Fatal(err)
}
64 changes: 55 additions & 9 deletions pkg/storage/engine/enginepb/mvcc3.pb.go

(Generated file; diff not rendered.)