Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
44407: storage: improve the migration away from txn.DeprecatedOrigTimestamp r=andreimatei a=andreimatei

19.2 doesn't generally set txn.ReadTimestamp. Instead, it sets
txn.DeprecatedOrigTimestamp. Before this patch, all code dealing with
txn.ReadTimestamp had to deal with the possibility of it not being set.
This is fragile; I recently forgot to deal with it in a patch.
This patch sets txn.ReadTimestamp to txn.DeprecatedOrigTimestamp when it
wasn't set, thereby releaving most other code of they worry.

This comes at the cost of an extra txn clone for requests coming from
19.2 nodes.

Release note: None

44428: storage: fix handling of refreshed timestamp r=andreimatei a=andreimatei

Before this patch, the refresher interceptor was erroneously asserting
its tracking of the refreshed timestamp is in sync with the
TxnCoordSender. It may, in fact, not be in sync in edge cases where a
refresh succeeded but the TxnCoordSender doesn't hear about that
success.

Touches #38156
Touches #41941
Touches #43707

Release note: None

44503: roachpb: fix txn.Update() commutativity r=andreimatei a=andreimatei

Updates to the WriteTooOld field were not commutative. This patch fixes
that, by clarifying that the transaction with the higher ReadTimestamp
gets to dictate the WriteTooOld value.
I'm not sure what consequences this used to have, besides allowing for
the confusing case where the server would receive a request with the
WriteTooOld flag set, but with the ReadTimestamp==WriteTimestamp.  A
future commit introduces a sanity assertion that all the requests with
the WTO flag have a bumped WriteTimestamp.

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Jan 30, 2020
4 parents b4c708b + 86f94a0 + b9fb236 + bf7cb96 commit c4fb5cb
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 24 deletions.
14 changes: 10 additions & 4 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,15 @@ func (tc *TxnCoordSender) connectInterceptors() {
func newLeafTxnCoordSender(
tcf *TxnCoordSenderFactory, tis *roachpb.LeafTxnInputState,
) client.TxnSender {
tis.Txn.AssertInitialized(context.TODO())
txn := &tis.Txn
txn.AssertInitialized(context.TODO())

// Deal with requests from 19.2 nodes which did not set ReadTimestamp.
if txn.ReadTimestamp.Less(txn.DeprecatedOrigTimestamp) {
txn.ReadTimestamp = txn.DeprecatedOrigTimestamp
}

if tis.Txn.Status != roachpb.PENDING {
if txn.Status != roachpb.PENDING {
log.Fatalf(context.TODO(), "unexpected non-pending txn in LeafTransactionalSender: %s", tis)
}

Expand All @@ -330,7 +336,7 @@ func newLeafTxnCoordSender(
if ds, ok := tcf.wrapped.(*DistSender); ok {
riGen = ds.rangeIteratorGen
}
tcs.initCommonInterceptors(tcf, &tis.Txn, client.LeafTxn, riGen)
tcs.initCommonInterceptors(tcf, txn, client.LeafTxn, riGen)

// Per-interceptor leaf initialization. If/when more interceptors
// need leaf initialization, this should be turned into an interface
Expand Down Expand Up @@ -369,7 +375,7 @@ func newLeafTxnCoordSender(

tcs.connectInterceptors()

tcs.mu.txn.Update(&tis.Txn)
tcs.mu.txn.Update(txn)
return tcs
}

Expand Down
18 changes: 13 additions & 5 deletions pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,21 @@ func (sr *txnSpanRefresher) SendLocked(
// refreshes shouldn't check values below batchReadTimestamp, so initialize
// sr.refreshedTimestamp.
sr.refreshedTimestamp = batchReadTimestamp
} else if batchReadTimestamp.Less(sr.refreshedTimestamp) {
// sr.refreshedTimestamp might be ahead of batchReadTimestamp. We want to
// read at the latest refreshed timestamp, so bump the batch.
// batchReadTimestamp can be behind after a successful refresh, if the
// TxnCoordSender hasn't actually heard about the updated read timestamp.
// This can happen if a refresh succeeds, but then the retry of the batch
// that produced the timestamp fails without returning the update txn (for
// example, through a canceled ctx). The client should only be sending
// rollbacks in such cases.
ba.Txn.ReadTimestamp.Forward(sr.refreshedTimestamp)
ba.Txn.WriteTimestamp.Forward(sr.refreshedTimestamp)
} else if sr.refreshedTimestamp != batchReadTimestamp {
// Sanity check: we're supposed to control the read timestamp. What we're
// tracking in sr.refreshedTimestamp is not supposed to get out of sync
// with what batches use (which comes from tc.mu.txn).
return nil, roachpb.NewError(errors.AssertionFailedf(
"unexpected batch read timestamp: %s. Expected refreshed timestamp: %s. ba: %s",
batchReadTimestamp, sr.refreshedTimestamp, ba))
"unexpected batch read timestamp: %s. Expected refreshed timestamp: %s. ba: %s. txn: %s",
batchReadTimestamp, sr.refreshedTimestamp, ba, ba.Txn))
}

if rArgs, hasET := ba.GetArg(roachpb.EndTxn); hasET {
Expand Down
3 changes: 0 additions & 3 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
// Note that writes will be performed at the provisional commit timestamp,
// txn.Timestamp, regardless of the batch timestamp.
ba.Timestamp = txn.ReadTimestamp
// For compatibility with 19.2 nodes which might not have set ReadTimestamp,
// fallback to DeprecatedOrigTimestamp.
ba.Timestamp.Forward(txn.DeprecatedOrigTimestamp)
} else {
// When not transactional, allow empty timestamp and use nowFn instead
if ba.Timestamp == (hlc.Timestamp{}) {
Expand Down
19 changes: 13 additions & 6 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,15 +1003,22 @@ func (t *Transaction) Update(o *Transaction) {
// Nothing to do.
}

// If the refreshed timestamp move forward, overwrite
// WriteTooOld, otherwise the flags are cumulative.
if t.ReadTimestamp.Less(o.ReadTimestamp) {
t.WriteTooOld = o.WriteTooOld
t.CommitTimestampFixed = o.CommitTimestampFixed
} else {
if t.ReadTimestamp.Equal(o.ReadTimestamp) {
// If neither of the transactions has a bumped ReadTimestamp, then the
// WriteTooOld flag is cumulative.
t.WriteTooOld = t.WriteTooOld || o.WriteTooOld
t.CommitTimestampFixed = t.CommitTimestampFixed || o.CommitTimestampFixed
} else if t.ReadTimestamp.Less(o.ReadTimestamp) {
// If `o` has a higher ReadTimestamp (i.e. it's the result of a refresh,
// which refresh generally clears the WriteTooOld field), then it dictates
// the WriteTooOld field. This relies on refreshes not being performed
// concurrently with any requests whose response's WriteTooOld field
// matters.
t.WriteTooOld = o.WriteTooOld
t.CommitTimestampFixed = o.CommitTimestampFixed
}
// If t has a higher ReadTimestamp, than it gets to dictate the
// WriteTooOld field - so there's nothing to update.

if t.Sequence < o.Sequence {
t.Sequence = o.Sequence
Expand Down
24 changes: 24 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,30 @@ func TestTransactionUpdate(t *testing.T) {
expTxn4.Sequence = txn.Sequence + 10
require.Equal(t, expTxn4, txn4)

// Test the updates to the WriteTooOld field. The WriteTooOld field is
// supposed to be dictated by the transaction with the higher ReadTimestamp,
// or it's cumulative when the ReadTimestamps are equal.
{
txn2 := txn
txn2.ReadTimestamp = txn2.ReadTimestamp.Add(-1, 0)
txn2.WriteTooOld = false
txn2.Update(&txn)
require.True(t, txn2.WriteTooOld)
}
{
txn2 := txn
txn2.WriteTooOld = false
txn2.Update(&txn)
require.True(t, txn2.WriteTooOld)
}
{
txn2 := txn
txn2.ReadTimestamp = txn2.ReadTimestamp.Add(1, 0)
txn2.WriteTooOld = false
txn2.Update(&txn)
require.False(t, txn2.WriteTooOld)
}

// Updating a Transaction at a future epoch ignores all epoch-scoped fields.
var txn5 Transaction
txn5.ID = txn.ID
Expand Down
3 changes: 0 additions & 3 deletions pkg/storage/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,6 @@ func IsEndTxnTriggeringRetryError(
retry, reason = true, roachpb.RETRY_WRITE_TOO_OLD
} else {
readTimestamp := txn.ReadTimestamp
// For compatibility with 19.2 nodes which might not have set
// ReadTimestamp, fallback to DeprecatedOrigTimestamp.
readTimestamp.Forward(txn.DeprecatedOrigTimestamp)
isTxnPushed := txn.WriteTimestamp != readTimestamp

// Return a transaction retry error if the commit timestamp isn't equal to
Expand Down
3 changes: 0 additions & 3 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1459,9 +1459,6 @@ func mvccPutInternal(
writeTimestamp := timestamp
if txn != nil {
readTimestamp = txn.ReadTimestamp
// For compatibility with 19.2 nodes which might not have set
// ReadTimestamp, fallback to DeprecatedOrigTimestamp.
readTimestamp.Forward(txn.DeprecatedOrigTimestamp)
if readTimestamp != timestamp {
return errors.AssertionFailedf(
"mvccPutInternal: txn's read timestamp %s does not match timestamp %s",
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ func (s *Store) Send(
}
}

if ba.Txn != nil && ba.Txn.ReadTimestamp.Less(ba.Txn.DeprecatedOrigTimestamp) {
// For compatibility with 19.2 nodes which might not have set ReadTimestamp,
// fallback to DeprecatedOrigTimestamp. Note that even if ReadTimestamp is
// set, it might still be less than DeprecatedOrigTimestamp if the txn was
// restarted.
ba.Txn = ba.Txn.Clone()
ba.Txn.ReadTimestamp = ba.Txn.DeprecatedOrigTimestamp
}
if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
return nil, roachpb.NewError(err)
}
Expand Down

0 comments on commit c4fb5cb

Please sign in to comment.