Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stability: prevent GC from preventing all new transactions #9524

Merged
merged 1 commit into from
Sep 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,25 +434,25 @@ func (r *Replica) BeginTransaction(
reply.Txn = &clonedTxn

// Verify transaction does not already exist.
txn := roachpb.Transaction{}
ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.ZeroTimestamp, true, nil, &txn)
tmpTxn := roachpb.Transaction{}
ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.ZeroTimestamp, true, nil, &tmpTxn)
if err != nil {
return reply, err
}
if ok {
switch txn.Status {
switch tmpTxn.Status {
case roachpb.ABORTED:
// Check whether someone has come in ahead and already aborted the
// txn.
return reply, roachpb.NewTransactionAbortedError()

case roachpb.PENDING:
if h.Txn.Epoch > txn.Epoch {
if h.Txn.Epoch > tmpTxn.Epoch {
// On a transaction retry there will be an extant txn record
// but this run should have an upgraded epoch. The extant txn
// record may have been pushed or otherwise updated, so update
// this command's txn and rewrite the record.
reply.Txn.Update(&txn)
reply.Txn.Update(&tmpTxn)
} else {
// Our txn record already exists. This is either a client error, sending
// a duplicate BeginTransaction, or it's an artefact of DistSender
Expand All @@ -462,12 +462,12 @@ func (r *Replica) BeginTransaction(

case roachpb.COMMITTED:
return reply, roachpb.NewTransactionStatusError(
fmt.Sprintf("BeginTransaction can't overwrite %s", txn),
fmt.Sprintf("BeginTransaction can't overwrite %s", tmpTxn),
)

default:
return reply, roachpb.NewTransactionStatusError(
fmt.Sprintf("bad txn state: %s", txn),
fmt.Sprintf("bad txn state: %s", tmpTxn),
)
}
}
Expand All @@ -482,7 +482,7 @@ func (r *Replica) BeginTransaction(
// (which may have been written before this entry).
//
// See #9265.
if txn.LastActive().Less(threshold) {
if reply.Txn.LastActive().Less(threshold) {
return reply, roachpb.NewTransactionAbortedError()
}

Expand Down
11 changes: 11 additions & 0 deletions storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2511,6 +2511,17 @@ func TestEndTransactionTxnSpanGCThreshold(t *testing.T) {
t.Fatalf("expected txn aborted error, got %v and response %+v", pErr, resp)
}
}

// A transaction which starts later (i.e. at a higher timestamp) should not
// be prevented from writing its record.
// See #9522.
{
txn := newTransaction("foo", key, 1, enginepb.SERIALIZABLE, tc.clock)
beginArgs, header := beginTxnArgs(key, txn)
if _, pErr := tc.SendWrappedWith(header, &beginArgs); pErr != nil {
t.Fatal(pErr)
}
}
}

// TestEndTransactionDeadline_1PC verifies that a transaction that
Expand Down