Skip to content

Commit

Permalink
Support user-directed retries through the RETRY INTENT, RELEASE, RESTART
Browse files Browse the repository at this point in the history
syntax.

fixes #4877, fixes #4370
referencing #2625
  • Loading branch information
andreimatei committed Mar 18, 2016
1 parent f7aca8c commit 03dd53d
Show file tree
Hide file tree
Showing 21 changed files with 5,703 additions and 4,148 deletions.
6 changes: 5 additions & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,14 @@ func (db *DB) RunWithResponse(b *Batch) (*roachpb.BatchResponse, *roachpb.Error)
func (db *DB) Txn(retryable func(txn *Txn) *roachpb.Error) *roachpb.Error {
txn := NewTxn(*db)
txn.SetDebugName("", 1)
return txn.Exec(TxnExecOptions{AutoRetry: true, AutoCommit: true},
pErr := txn.Exec(TxnExecOptions{AutoRetry: true, AutoCommit: true},
func(txn *Txn, _ *TxnExecOptions) *roachpb.Error {
return retryable(txn)
})
if pErr != nil {
txn.CleanupOnError(pErr)
}
return pErr
}

// send runs the specified calls synchronously in a single batch and returns
Expand Down
19 changes: 9 additions & 10 deletions client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var DefaultTxnRetryOptions = retry.Options{
// method out of the Txn method set.
type txnSender Txn

// Send updates the transaction on error. Depending on the error type, the
// transaction might be replaced by a new one.
func (ts *txnSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
// Send call through wrapped sender.
ba.Txn = &ts.Proto
Expand Down Expand Up @@ -88,7 +90,7 @@ func (ts *txnSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachp
if pErr.GetTxn() != nil {
ts.Proto.Priority = pErr.GetTxn().Priority
}
} else if pErr.TransactionRestart != roachpb.TransactionRestart_ABORT {
} else if pErr.TransactionRestart != roachpb.TransactionRestart_NONE {
ts.Proto.Update(pErr.GetTxn())
}
return nil, pErr
Expand Down Expand Up @@ -341,7 +343,7 @@ func (txn *Txn) commit(deadline *roachpb.Timestamp) *roachpb.Error {
// CleanupOnError cleans up the transaction as a result of an error.
func (txn *Txn) CleanupOnError(pErr *roachpb.Error) {
if pErr == nil {
panic("CleanupOnError called without an error")
panic("no error")
}
if replyErr := txn.Rollback(); replyErr != nil {
log.Errorf("failure aborting transaction: %s; abort caused by: %s", replyErr, pErr)
Expand Down Expand Up @@ -449,7 +451,10 @@ type TxnExecOptions struct {
// that a ROLLBACK will reset the state. Neither opt.AutoRetry not opt.AutoCommit
// can be set in this case.
//
// If an error is returned, the txn has been aborted.
// When this method returns, txn might be in any state; Exec does not attempt
// to clean up the transaction before returning an error. In case of
// TransactionAbortedError, txn is reset to a fresh transaction, ready to be
// used.
func (txn *Txn) Exec(
opt TxnExecOptions,
fn func(txn *Txn, opt *TxnExecOptions) *roachpb.Error) *roachpb.Error {
Expand Down Expand Up @@ -478,7 +483,7 @@ RetryLoop:
pErr = fn(txn, &opt)
if (pErr == nil) && opt.AutoCommit && (txn.Proto.Status == roachpb.PENDING) {
// fn succeeded, but didn't commit.
pErr = txn.commit(nil)
pErr = txn.CommitNoCleanup()
}

if pErr == nil {
Expand Down Expand Up @@ -510,12 +515,6 @@ RetryLoop:
}
}

if txn != nil && pErr != nil {
// TODO(andrei): don't do Cleanup() on retriable errors here.
// Let the sql executor do it.
txn.CleanupOnError(pErr)
}

if pErr != nil {
pErr.StripErrorTransaction()
}
Expand Down
2 changes: 1 addition & 1 deletion client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func TestRunTransactionRetryOnErrors(t *testing.T) {
}
}

// TestAbortTransactionOnCommitErrors verifies that non-exec transactions are
// TestAbortTransactionOnCommitErrors verifies that transactions are
// aborted on the correct errors.
func TestAbortTransactionOnCommitErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
1 change: 1 addition & 0 deletions roachpb/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,30 @@ func (*DidntUpdateDescriptorError) message(_ *Error) string {

var _ ErrorDetailInterface = &DidntUpdateDescriptorError{}

// Error formats error.
func (e *SqlTransactionCommittedError) Error() string {
return e.message(nil)
}

// message returns an error message.
func (*SqlTransactionCommittedError) message(_ *Error) string {
return "current transaction is committed, commands ignored until end of transaction block"
}

var _ ErrorDetailInterface = &SqlTransactionCommittedError{}

// Error formats error.
func (e *SqlTransactionAbortedError) Error() string {
return e.message(nil)
}

// message returns an error message.
func (*SqlTransactionAbortedError) message(_ *Error) string {
return "current transaction is aborted, commands ignored until end of transaction block"
func (e *SqlTransactionAbortedError) message(_ *Error) string {
msg := "current transaction is aborted, commands ignored until end of transaction block"
if e.CustomMsg != "" {
msg += "; " + e.CustomMsg
}
return msg
}

var _ ErrorDetailInterface = &SqlTransactionAbortedError{}
Expand Down
Loading

0 comments on commit 03dd53d

Please sign in to comment.