Skip to content

Commit

Permalink
Merge pull request #9100 from bdarnell/timestamp-cache
Browse files Browse the repository at this point in the history
storage: Handle timestamp collisions in timestampCache
  • Loading branch information
bdarnell authored Sep 7, 2016
2 parents c67d4e0 + 581caf0 commit 7517390
Show file tree
Hide file tree
Showing 4 changed files with 505 additions and 365 deletions.
20 changes: 12 additions & 8 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ func (r *Replica) applyTimestampCache(ba *roachpb.BatchRequest) *roachpb.Error {
// has already been finalized, in which case this is a replay.
if _, ok := args.(*roachpb.BeginTransactionRequest); ok {
key := keys.TransactionKey(header.Key, ba.GetTxnID())
wTS, wOK := r.mu.tsCache.GetMaxWrite(key, nil, nil)
wTS, _, wOK := r.mu.tsCache.GetMaxWrite(key, nil)
if wOK {
return roachpb.NewError(roachpb.NewTransactionReplayError())
} else if !wTS.Less(ba.Txn.Timestamp) {
Expand All @@ -1257,10 +1257,12 @@ func (r *Replica) applyTimestampCache(ba *roachpb.BatchRequest) *roachpb.Error {
continue
}

// Forward the timestamp if there's been a more recent read.
rTS, _ := r.mu.tsCache.GetMaxRead(header.Key, header.EndKey, ba.GetTxnID())
// Forward the timestamp if there's been a more recent read (by someone else).
rTS, rTxnID, _ := r.mu.tsCache.GetMaxRead(header.Key, header.EndKey)
if ba.Txn != nil {
ba.Txn.Timestamp.Forward(rTS.Next())
if rTxnID == nil || *ba.Txn.ID != *rTxnID {
ba.Txn.Timestamp.Forward(rTS.Next())
}
} else {
ba.Timestamp.Forward(rTS.Next())
}
Expand All @@ -1269,11 +1271,13 @@ func (r *Replica) applyTimestampCache(ba *roachpb.BatchRequest) *roachpb.Error {
// write too old boolean for transactions. Note that currently
// only EndTransaction and DeleteRange requests update the
// write timestamp cache.
wTS, _ := r.mu.tsCache.GetMaxWrite(header.Key, header.EndKey, ba.GetTxnID())
wTS, wTxnID, _ := r.mu.tsCache.GetMaxWrite(header.Key, header.EndKey)
if ba.Txn != nil {
if !wTS.Less(ba.Txn.Timestamp) {
ba.Txn.Timestamp.Forward(wTS.Next())
ba.Txn.WriteTooOld = true
if wTxnID == nil || *ba.Txn.ID != *wTxnID {
if !wTS.Less(ba.Txn.Timestamp) {
ba.Txn.Timestamp.Forward(wTS.Next())
ba.Txn.WriteTooOld = true
}
}
} else {
ba.Timestamp.Forward(wTS.Next())
Expand Down
22 changes: 11 additions & 11 deletions storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ func TestReplicaTSCacheLowWaterOnLease(t *testing.T) {
now := hlc.Timestamp{WallTime: tc.manualClock.UnixNano()}

tc.rng.mu.Lock()
baseRTS, _ := tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("a"), nil /* end */, nil /* txn */)
baseRTS, _, _ := tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("a"), nil /* end */)
tc.rng.mu.Unlock()
// TODO(tschottdorf): this value is zero, which seems ripe for producing
// test cases that do not test anything.
Expand Down Expand Up @@ -932,8 +932,8 @@ func TestReplicaTSCacheLowWaterOnLease(t *testing.T) {
}
// Verify expected low water mark.
tc.rng.mu.Lock()
rTS, _ := tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("a"), nil, nil)
wTS, _ := tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("a"), nil, nil)
rTS, _, _ := tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("a"), nil)
wTS, _, _ := tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("a"), nil)
tc.rng.mu.Unlock()
if rTS.WallTime != test.expLowWater || wTS.WallTime != test.expLowWater {
t.Errorf("%d: expected low water %d; got maxRead=%d, maxWrite=%d", i, test.expLowWater, rTS.WallTime, wTS.WallTime)
Expand Down Expand Up @@ -1715,26 +1715,26 @@ func TestReplicaUpdateTSCache(t *testing.T) {
// Verify the timestamp cache has rTS=1s and wTS=0s for "a".
tc.rng.mu.Lock()
defer tc.rng.mu.Unlock()
_, rOK := tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("a"), nil, nil)
_, wOK := tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("a"), nil, nil)
_, _, rOK := tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("a"), nil)
_, _, wOK := tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("a"), nil)
if rOK || wOK {
t.Errorf("expected rOK=false and wOK=false; rOK=%t, wOK=%t", rOK, wOK)
}
tc.rng.mu.tsCache.ExpandRequests(hlc.ZeroTimestamp)
rTS, rOK := tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("a"), nil, nil)
wTS, wOK := tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("a"), nil, nil)
rTS, _, rOK := tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("a"), nil)
wTS, _, wOK := tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("a"), nil)
if rTS.WallTime != t0.Nanoseconds() || wTS.WallTime != 0 || !rOK || wOK {
t.Errorf("expected rTS=1s and wTS=0s, but got %s, %s; rOK=%t, wOK=%t", rTS, wTS, rOK, wOK)
}
// Verify the timestamp cache has rTS=0s and wTS=2s for "b".
rTS, rOK = tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("b"), nil, nil)
wTS, wOK = tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("b"), nil, nil)
rTS, _, rOK = tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("b"), nil)
wTS, _, wOK = tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("b"), nil)
if rTS.WallTime != 0 || wTS.WallTime != t1.Nanoseconds() || rOK || !wOK {
t.Errorf("expected rTS=0s and wTS=2s, but got %s, %s; rOK=%t, wOK=%t", rTS, wTS, rOK, wOK)
}
// Verify another key ("c") has 0sec in timestamp cache.
rTS, rOK = tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("c"), nil, nil)
wTS, wOK = tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("c"), nil, nil)
rTS, _, rOK = tc.rng.mu.tsCache.GetMaxRead(roachpb.Key("c"), nil)
wTS, _, wOK = tc.rng.mu.tsCache.GetMaxWrite(roachpb.Key("c"), nil)
if rTS.WallTime != 0 || wTS.WallTime != 0 || rOK || wOK {
t.Errorf("expected rTS=0s and wTS=0s, but got %s %s; rOK=%t, wOK=%t", rTS, wTS, rOK, wOK)
}
Expand Down
Loading

0 comments on commit 7517390

Please sign in to comment.