Skip to content

Commit

Permalink
client: explicitly reject requests sent after an error
Browse files Browse the repository at this point in the history
Requests other than a rollback cannot be sent on a Txn after a previous
request received a non-retriable error because the state of the txn
record is unclear (does it exist? Should another BeginTxn be sent?).
This patch adds explicit checks for rejecting such batches.
  • Loading branch information
andreimatei committed May 16, 2018
1 parent 990ca52 commit 033c6e8
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 17 deletions.
32 changes: 28 additions & 4 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ const (
// that. Note that the server accepts a BeginTxn with a higher epoch if a
// transaction record already exists.
txnWriteInOldEpoch
// txnError means that the txn had performed some writes and then a batch got
// a non-retriable error. Further batches except EndTransaction(commit=false)
// will be rejected.
txnError
)

// NewTxn returns a new txn. The typ parameter specifies whether this
Expand Down Expand Up @@ -684,6 +688,8 @@ func (txn *Txn) maybeFinishReadonly(
return writing, nil
case txnWriteInOldEpoch:
return oldEpochWrite, nil
case txnError:
return writing, nil
case txnReadOnly:
default:
log.Fatalf(ctx, "unknown state: %d", txn.mu.state)
Expand Down Expand Up @@ -953,6 +959,14 @@ func (txn *Txn) Send(
txn.mu.Lock()
defer txn.mu.Unlock()

if txn.mu.state == txnError {
singleAbort := ba.IsSingleEndTransactionRequest() &&
!ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit
if !singleAbort {
return roachpb.NewErrorf("txn already encountered an error; cannot be used anymore")
}
}

sender = txn.mu.sender
if txn.mu.Proto.Status != roachpb.PENDING || txn.mu.finalized {
return roachpb.NewErrorf(
Expand Down Expand Up @@ -1072,8 +1086,10 @@ func (txn *Txn) Send(
if log.V(1) {
log.Infof(ctx, "failed batch: %s", pErr)
}
var retriable bool
switch t := pErr.GetDetail().(type) {
case *roachpb.HandledRetryableTxnError:
retriable = true
retryErr := t
if requestTxnID != retryErr.TxnID {
// KV should not return errors for transactions other than the one that sent
Expand All @@ -1091,11 +1107,19 @@ func (txn *Txn) Send(
// Note that unhandled retryable txn errors are allowed from leaf
// transactions. We pass them up through distributed SQL flows to
// the root transactions, at the receiver.
if txn.typ == RootTxn && pErr.TransactionRestart != roachpb.TransactionRestart_NONE {
log.Fatalf(ctx,
"unexpected retryable error at the client.Txn level: (%T) %s",
pErr.GetDetail(), pErr)
if pErr.TransactionRestart != roachpb.TransactionRestart_NONE {
retriable = true
if txn.typ == RootTxn {
log.Fatalf(ctx,
"unexpected retryable error at the client.Txn level: (%T) %s",
pErr.GetDetail(), pErr)
}
}

if !retriable && txn.mu.state == txnWriting {
txn.mu.state = txnError
}

return nil, pErr
}

Expand Down
11 changes: 0 additions & 11 deletions pkg/sql/logictest/testdata/logic_test/parallel_stmts
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,6 @@ SHOW TIME ZONE
----
UTC

query TT
SHOW CREATE TABLE kv
----
kv CREATE TABLE kv (
k INT NOT NULL,
v INT NULL,
CONSTRAINT "primary" PRIMARY KEY (k ASC),
FAMILY "primary" (k, v),
CONSTRAINT check_v CHECK (v < 100)
)

query T
SHOW TRANSACTION STATUS
----
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/sem/tree/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type HiddenFromShowQueries interface {
// IndependentFromParallelizedPriors is a pseudo-interface to be implemented
// by statements which do not force parallel statement execution synchronization
// when they run.
// NB: Only statements that don't send any requests using the current
// transaction can implement this. Otherwise, the statement will fail if any of
// the parallel statements has encoutered a KV error (which toasts the txn).
// TODO(andrei): audit all the implementers.
type IndependentFromParallelizedPriors interface {
independentFromParallelizedPriors()
}
Expand Down Expand Up @@ -648,8 +652,7 @@ func (*ShowCreateTable) StatementType() StatementType { return Rows }
// StatementTag returns a short string identifying the type of statement.
func (*ShowCreateTable) StatementTag() string { return "SHOW CREATE TABLE" }

func (*ShowCreateTable) hiddenFromStats() {}
func (*ShowCreateTable) independentFromParallelizedPriors() {}
func (*ShowCreateTable) hiddenFromStats() {}

// StatementType implements the Statement interface.
func (*ShowCreateView) StatementType() StatementType { return Rows }
Expand Down

0 comments on commit 033c6e8

Please sign in to comment.