sql,kv: add SQL savepoints support
This patch adds support for SAVEPOINT <foo>, RELEASE SAVEPOINT <foo>,
ROLLBACK TO SAVEPOINT <foo>.
Before this patch, we only had support for the special savepoint
cockroach_restart, which had to be placed at the beginning of the
transaction and was specifically intended for dealing with transaction
retries. This patch implements general support for savepoints, which
provide an error recovery mechanism.
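To make the shape of the feature concrete, here is a minimal client-side
sketch (illustrative only, not part of this patch; it assumes a running
node, an existing table t, and the lib/pq driver):

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	ctx := context.Background()
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		log.Fatal(err)
	}
	// Work done before the savepoint survives a rollback to the savepoint.
	if _, err := tx.ExecContext(ctx, "INSERT INTO t VALUES (1)"); err != nil {
		log.Fatal(err)
	}
	if _, err := tx.ExecContext(ctx, "SAVEPOINT foo"); err != nil {
		log.Fatal(err)
	}
	if _, err := tx.ExecContext(ctx, "INSERT INTO t VALUES (2)"); err != nil {
		// Error recovery: undo everything after foo, then continue using
		// the same transaction.
		if _, err := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT foo"); err != nil {
			log.Fatal(err)
		}
	} else if _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT foo"); err != nil {
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}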

The connExecutor now maintains a stack of savepoints. Rolling back to a
savepoint uses the recently introduced KV API for ignoring a range of
write sequence numbers.
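In simplified form, rolling back to a savepoint amounts to recording an
ignored seqnum range (the names below are invented for illustration; the
real code uses enginepb.TxnSeq and enginepb.IgnoredSeqNumRange, visible in
the pkg/kv diff further down):

package main

import "fmt"

// seqNumRange marks writes with sequence numbers in [start, end] as rolled
// back: the transaction's own reads skip them and intent resolution
// discards them.
type seqNumRange struct{ start, end int32 }

type txnState struct {
	writeSeq int32 // seqnum of the most recent write
	ignored  []seqNumRange
}

// savepointSketch records the write seqnum at creation time.
type savepointSketch struct{ seqNum int32 }

func (t *txnState) rollbackTo(sp savepointSketch) {
	if sp.seqNum < t.writeSeq {
		t.ignored = append(t.ignored, seqNumRange{start: sp.seqNum + 1, end: t.writeSeq})
	}
}

func main() {
	t := &txnState{}
	sp := savepointSketch{seqNum: t.writeSeq} // savepoint before any writes
	t.writeSeq = 3                            // three writes follow
	t.rollbackTo(sp)
	fmt.Println(t.ignored) // [{1 3}]
}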

At the SQL level, savepoints are distinguished by two characteristics:
1) savepoints placed at the beginning of a transaction (i.e. before any
KV operations) are marked as "initial". Rolling back to an initial
savepoint is always possible. Rolling back to a non-initial savepoint is
not possible after the transaction restarts (see below).
2) the savepoint named "cockroach_restart" retains special RELEASE
semantics: releasing it (attempts to) commit the underlying KV txn.
This continues to allow for the discovery of deferred serializability
errors (i.e. write timestamp pushes by the ts cache). As before, this
RELEASE can fail with a retriable error, at which point the client can
do ROLLBACK TO SAVEPOINT cockroach_restart (which is guaranteed to work
because cockroach_restart needs to be an "initial" savepoint). The
transaction continues to maintain all its locks after such an error.
This is all in contrast to releasing any other savepoint, which cannot
commit the txn and also never fails. See below for more discussion.
The cockroach_restart savepoint is only special in its release behavior,
not in its rollback behavior.
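For reference, the classic retry loop built on these RELEASE semantics
looks roughly like this (a sketch assuming the lib/pq driver and SQLSTATE
40001 for retriable errors; not code from this patch):

package example

import (
	"context"
	"database/sql"

	"github.com/lib/pq"
)

func transferWithRetry(ctx context.Context, db *sql.DB) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Issued before any other statement so that the savepoint is "initial".
	if _, err := tx.ExecContext(ctx, "SAVEPOINT cockroach_restart"); err != nil {
		_ = tx.Rollback()
		return err
	}
	for {
		_, err := tx.ExecContext(ctx,
			"UPDATE accounts SET balance = balance - 10 WHERE id = 1")
		if err == nil {
			// RELEASE (attempts to) commit the underlying KV txn; deferred
			// serializability errors surface here as retriable errors.
			if _, err = tx.ExecContext(ctx, "RELEASE SAVEPOINT cockroach_restart"); err == nil {
				break
			}
		}
		if pqErr, ok := err.(*pq.Error); !ok || pqErr.Code != "40001" {
			_ = tx.Rollback()
			return err
		}
		// Guaranteed to work because cockroach_restart is an initial
		// savepoint; the transaction keeps its locks across the retry.
		if _, err := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT cockroach_restart"); err != nil {
			_ = tx.Rollback()
			return err
		}
	}
	return tx.Commit()
}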

With the implementation of savepoints, the state machine driving a SQL
connection's transactions becomes a lot simpler. There's no longer a
distinction between an "Aborted" transaction and one that's in
"RestartWait". Rolling back to a savepoint now works the same way across
the two states, so RestartWait is gone.

This patch also improves the KV savepoints. They now capture and restore
the state of the read spans and the in-flight writes.

Some things don't work (yet):
a) Rolling back to a savepoint created after a schema change will error
out. This is because we don't yet snapshot the transaction's schema
change state.
b) After a retriable error, you can only roll back to an initial
savepoint. Attempting to roll back to a non-initial savepoint generates
a retriable error again. If the transaction has been aborted, I think
this is the right behavior; no recovery is possible since the
transaction has lost its write intents. In the usual case where the
transaction has not been aborted, I think we want something better, but
it will take more work to get it. I think the behavior we want is the
following:
- after a serializability failure, retrying just part of the transaction
should be doable by attempting a ROLLBACK TO SAVEPOINT. This rollback
should succeed if all the non-rolled-back reads can be refreshed to the
desired timestamp. If they can be refreshed, then the client can simply
retry the rolled back part of the transaction. If they can't, then the
ROLLBACK should return a retriable error again, allowing the client to
attempt a deeper rollback - and so on until the client rolls back to an
initial savepoint (which succeeds by definition).
Implementing this would allow for the following nifty pattern:

func fn_n() {
  for {
    SAVEPOINT savepoint_n
    try {
      fn_n+1()
    } catch retriable error {
      err := ROLLBACK TO SAVEPOINT savepoint_n
      if err != nil {
        throw err
      }
      continue
    }
    RELEASE SAVEPOINT savepoint_n
    break
  }
}

The idea here is that the client is trying to re-do as little work as
possible by successively rolling back to earlier and earlier savepoints.
This pattern will technically work with the current patch already,
except it will not actually help the client in any way since all the
rollbacks will fail until we get to the very first savepoint.
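For concreteness, a runnable rendering of that pattern under the desired
future semantics (withSavepoint and isRetriable are hypothetical helpers,
not part of this patch; each nesting level must use a distinct savepoint
name):

package example

import (
	"context"
	"database/sql"

	"github.com/lib/pq"
)

// withSavepoint runs body under its own savepoint, retrying it on
// retriable errors. If rolling back to this savepoint itself fails with a
// retriable error, the error propagates so that a caller holding an
// earlier savepoint can attempt a deeper rollback.
func withSavepoint(ctx context.Context, tx *sql.Tx, name string, body func() error) error {
	if _, err := tx.ExecContext(ctx, "SAVEPOINT "+name); err != nil {
		return err
	}
	for {
		err := body()
		if err == nil {
			_, err = tx.ExecContext(ctx, "RELEASE SAVEPOINT "+name)
			return err
		}
		if !isRetriable(err) {
			return err
		}
		if _, err := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT "+name); err != nil {
			return err // let an outer level roll back further
		}
		// Retry just this level's work.
	}
}

func isRetriable(err error) bool {
	pqErr, ok := err.(*pq.Error)
	return ok && pqErr.Code == "40001"
}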
There's an argument to be made for making RELEASE SAVEPOINT check for
deferred serializability violations (and perhaps other deferred checks -
like deferred constraint validation), although Postgres doesn't do any
of these.
Anyway, I've left implementing this for a future patch because I want to
do some KV work for supporting it nicely. Currently, the automatic
restart behavior that KV transactions have is a pain in the ass since it
works against what we're trying to do.

For the time being, non-initial savepoints remember their txn ID and
epoch; attempting to roll back to them after either has changed produces
a retriable error automatically.
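Condensed, the validity rule is the following (a restatement of
checkSavepointLocked from the diff below with simplified types, not
additional patch code):

package example

// savepointStillUsable condenses the rule: an initial savepoint always
// survives; a non-initial one must match the transaction's current ID and
// epoch, both of which change across aborts/restarts.
// (UUIDs and epochs are simplified to basic types here.)
func savepointStillUsable(initial bool, spTxnID, curTxnID [16]byte, spEpoch, curEpoch int32) bool {
	return initial || (spTxnID == curTxnID && spEpoch == curEpoch)
}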

Fixes #45477
Touches #10735

Release note (sql change): SQL savepoints are now supported. SAVEPOINT
<foo>, RELEASE SAVEPOINT <foo>, and ROLLBACK TO SAVEPOINT <foo> now work.
`SHOW SAVEPOINT STATUS` can be used to inspect the current stack of active
savepoints.

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
andreimatei and knz committed Mar 6, 2020
1 parent 19b91d7 commit 06fc1d2
Showing 41 changed files with 1,762 additions and 706 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/interactive_tests/test_txn_prompt.tcl
@@ -123,7 +123,7 @@ eexpect root@
send "SELECT crdb_internal.force_retry('1s':::INTERVAL);\r"
eexpect "ERROR: restart transaction"
eexpect root@
eexpect "RETRY>"
eexpect "ERROR>"
end_test

start_test "Test that prompt reverts to OPEN at beginning of new attempt."
4 changes: 1 addition & 3 deletions pkg/cli/sql.go
@@ -740,14 +740,12 @@ func (c *cliState) refreshTransactionStatus() {

// Change the prompt based on the response from the server.
switch txnString {
case sql.NoTxnStr:
case sql.NoTxnStateStr:
c.lastKnownTxnStatus = ""
case sql.AbortedStateStr:
c.lastKnownTxnStatus = " ERROR"
case sql.CommitWaitStateStr:
c.lastKnownTxnStatus = " DONE"
case sql.RestartWaitStateStr:
c.lastKnownTxnStatus = " RETRY"
case sql.OpenStateStr:
// The state AutoRetry is reported by the server as Open, so no need to
// handle it here.
56 changes: 0 additions & 56 deletions pkg/cmd/roachtest/pgjdbc_blacklist.go

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions pkg/internal/client/sender.go
@@ -304,8 +304,13 @@ const (

// SavepointToken represents a savepoint.
type SavepointToken interface {
// SavepointToken is a marker interface.
SavepointToken()
// Initial returns true if this savepoint has been created before performing
// any KV operations. If so, it is possible to rollback to it after a
// retriable error. If not, then rolling back to it after a retriable error
// will return the retriable error again because reads might have been
// evaluated before the savepoint and such reads cannot have their timestamp
// forwarded without a refresh.
Initial() bool
}

// TxnStatusOpt represents options for TxnSender.GetMeta().
36 changes: 28 additions & 8 deletions pkg/kv/testdata/savepoints
@@ -320,10 +320,6 @@ commit
subtest end

subtest rollback_across_retry

# TODO(knz): change this test when rolling back across retries becomes
# supported.

begin
----
0 <noignore>
@@ -339,11 +335,11 @@ epoch: 0 -> 1

release x
----
(*withstack.withStack) cannot release savepoint across transaction retries
0 <noignore>

rollback x
----
(*withstack.withStack) cannot rollback savepoint across transaction retries
0 <noignore>

subtest end

@@ -364,11 +360,35 @@ txn id changed

release x
----
(*withstack.withStack) cannot release savepoint across transaction retries
0 <noignore>

rollback x
----
(*withstack.withStack) cannot rollback savepoint across transaction retries
0 <noignore>

subtest end

subtest rollback_across_retry_fails_for_non-initial_savepoint
# The difference from the previous test is that here we do a write before
# creating the savepoint.
begin
----
0 <noignore>

put k a
----

savepoint x
----
1 <noignore>

retry
----
synthetic error: TransactionRetryWithProtoRefreshError: forced retry
epoch: 0 -> 1

rollback x
----
(*roachpb.TransactionRetryWithProtoRefreshError) TransactionRetryWithProtoRefreshError: cannot rollback to savepoint after a transaction restart

subtest end
15 changes: 13 additions & 2 deletions pkg/kv/txn_coord_sender.go
@@ -103,7 +103,9 @@ type TxnCoordSender struct {
// clients on Send().
storedErr *roachpb.Error

// active is set whenever the transaction has sent any requests.
// active is set whenever the transaction has sent any requests. Rolling
// back to a savepoint taken before the TxnCoordSender became active resets
// the field to false.
active bool

// closed is set once this transaction has either committed or rolled back
@@ -176,6 +178,14 @@ type txnInterceptor interface {
// increment.
epochBumpedLocked()

// createSavepointLocked is used to populate a savepoint with all the state
// that needs to be restored on a rollback.
createSavepointLocked(context.Context, *savepoint)

// rollbackToSavepointLocked is used to restore the state previously saved by
// createSavepointLocked().
rollbackToSavepointLocked(context.Context, savepoint)

// closeLocked closes the interceptor. It is called when the TxnCoordSender
// shuts down due to either a txn commit or a txn abort. The method will
// be called exactly once from cleanupTxnLocked.
@@ -943,7 +953,8 @@ func (tc *TxnCoordSender) IsTracking() bool {
return tc.interceptorAlloc.txnHeartbeater.heartbeatLoopRunningLocked()
}

// Active returns true iff there were commands executed already.
// Active returns true if requests were sent already. Rolling back to a
// savepoint taken before any requests were sent resets this to false.
func (tc *TxnCoordSender) Active() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
171 changes: 101 additions & 70 deletions pkg/kv/txn_coord_sender_savepoints.go
@@ -14,56 +14,48 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

// savepointToken captures the state in the TxnCoordSender necessary
// to restore that state upon a savepoint rollback.
//
// TODO(knz,andrei): Currently this definition is only sufficient for
// just a few cases of rollbacks. This should be extended to cover
// more ground:
//
// - We also need the current size of txnSpanRefresher.refreshSpans the
// list of tracked reads, such that upon rollback we tell the
// refresher interceptor to discard further reads.
// - We also need something about in-flight writes
// (txnPipeliner.ifWrites). There I guess we need to take some sort of
// snapshot of the current in-flight writes and, on rollback, discard
// in-flight writes that are not part of the savepoint. But, on
// rollback, I don't think we should (nor am I sure that we could)
// simply overwrite the set of in-flight writes with the ones from the
// savepoint because writes that have been verified since the snapshot
// has been taken should continue to be verified. Basically, on
// rollback I think we need to intersect the savepoint with the
// current set of in-flight writes.
type savepointToken struct {
// seqNum is currently the only field that helps to "restore"
// anything upon a rollback. When used, it does not change anything
// in the TCS; instead it simply configures the txn to ignore all
// seqnums from this value until the most recent seqnum emitted by
// the TCS.
seqNum enginepb.TxnSeq

// txnID is used to verify that a rollback is not used to paper
// over a txn abort error.
// savepoint captures the state in the TxnCoordSender necessary to restore that
// state upon a savepoint rollback.
type savepoint struct {
// active is a snapshot of TxnCoordSender.active.
active bool

// txnID and epoch are set for savepoints with the active field set.
// txnID and epoch are used to disallow rollbacks past transaction restarts.
// Savepoints without the active field set are allowed to be used to rollback
// past transaction restarts too, because it's trivial to rollback to the
// beginning of the transaction.
txnID uuid.UUID
// epoch is used to verify that a savepoint rollback is not
// used to paper over a retry error.
// TODO(knz,andrei): expand savepoint rollbacks to recover
// from retry errors.
// TODO(knz,andrei): remove the epoch mechanism entirely in
// favor of seqnums and savepoint rollbacks.
epoch enginepb.TxnEpoch

// seqNum represents the write seq num at the time the savepoint was created.
// On rollback, it configures the txn to ignore all seqnums from this value
// until the most recent seqnum.
seqNum enginepb.TxnSeq

// txnSpanRefresher fields.
refreshSpans []roachpb.Span
refreshInvalid bool
refreshSpanBytes int64
}

var _ client.SavepointToken = (*savepointToken)(nil)
var _ client.SavepointToken = (*savepoint)(nil)

// SavepointToken implements the client.SavepointToken interface.
func (s *savepointToken) SavepointToken() {}
// statically allocated savepoint marking the beginning of a transaction. Used
// to avoid allocations for such savepoints.
var initialSavepoint = savepoint{}

// Initial implements the client.SavepointToken interface.
func (s *savepoint) Initial() bool {
return !s.active
}

// CreateSavepoint is part of the client.TxnSender interface.
func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.SavepointToken, error) {
@@ -82,11 +74,22 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.Savepoint
return nil, ErrSavepointOperationInErrorTxn
}

return &savepointToken{
if !tc.mu.active {
// Return a preallocated savepoint for the common case of savepoints placed
// at the beginning of transactions.
return &initialSavepoint, nil
}

s := &savepoint{
active: true, // we've handled the not-active case above
txnID: tc.mu.txn.ID,
epoch: tc.mu.txn.Epoch,
seqNum: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
}, nil
}
for _, reqInt := range tc.interceptorStack {
reqInt.createSavepointLocked(ctx, s)
}

return s, nil
}

// RollbackToSavepoint is part of the client.TxnSender interface.
@@ -102,25 +105,41 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s client.Save
return err
}

st, err := tc.checkSavepointLocked(s, "rollback")
sp := s.(*savepoint)
err := tc.checkSavepointLocked(sp)
if err != nil {
if err == errSavepointInvalidAfterTxnRestart {
err = roachpb.NewTransactionRetryWithProtoRefreshError(
"cannot rollback to savepoint after a transaction restart",
tc.mu.txn.ID,
// The transaction inside this error doesn't matter.
roachpb.Transaction{},
)
}
return err
}

// TODO(knz): handle recoverable errors.
if tc.mu.txnState == txnError {
return unimplemented.New("rollback_error", "savepoint rollback after error")
if tc.mu.txnState == txnFinalized {
return unimplemented.New("rollback_error", "savepoint rollback finalized txn")
}

if st.seqNum == tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
// No operations since savepoint was taken. No-op.
return nil
// Restore the transaction's state, in case we're rewinding after an error.
tc.mu.txnState = txnPending

tc.mu.active = sp.active

for _, reqInt := range tc.interceptorStack {
reqInt.rollbackToSavepointLocked(ctx, *sp)
}

tc.mu.txn.AddIgnoredSeqNumRange(
enginepb.IgnoredSeqNumRange{
Start: st.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
// If there have been any further writes since the savepoint was created, they'll
// need to be ignored.
if sp.seqNum < tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
tc.mu.txn.AddIgnoredSeqNumRange(
enginepb.IgnoredSeqNumRange{
Start: sp.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
}

return nil
}
@@ -138,7 +157,16 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s client.Savepoi
return ErrSavepointOperationInErrorTxn
}

_, err := tc.checkSavepointLocked(s, "release")
sp := s.(*savepoint)
err := tc.checkSavepointLocked(sp)
if err == errSavepointInvalidAfterTxnRestart {
err = roachpb.NewTransactionRetryWithProtoRefreshError(
"cannot release savepoint after a transaction restart",
tc.mu.txn.ID,
// The transaction inside this error doesn't matter.
roachpb.Transaction{},
)
}
return err
}

@@ -159,26 +187,29 @@ func (tc *TxnCoordSender) assertNotFinalized() error {
return nil
}

func (tc *TxnCoordSender) checkSavepointLocked(
s client.SavepointToken, opName string,
) (*savepointToken, error) {
st, ok := s.(*savepointToken)
if !ok {
return nil, errors.AssertionFailedf("expected savepointToken, got %T", s)
}
var errSavepointInvalidAfterTxnRestart = errors.New("savepoint invalid after transaction restart")

if st.txnID != tc.mu.txn.ID {
return nil, errors.Newf("cannot %s savepoint across transaction retries", opName)
// checkSavepointLocked checks whether the provided savepoint is still valid.
// Returns errSavepointInvalidAfterTxnRestart if the savepoint is not an
// "initial" one and the transaction has restarted since the savepoint was
// created.
func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint) error {
// Only savepoints taken before any activity are allowed to be used after a
// transaction restart.
if s.Initial() {
return nil
}

if st.epoch != tc.mu.txn.Epoch {
return nil, errors.Newf("cannot %s savepoint across transaction retries", opName)
if s.txnID != tc.mu.txn.ID {
return errSavepointInvalidAfterTxnRestart
}
if s.epoch != tc.mu.txn.Epoch {
return errSavepointInvalidAfterTxnRestart
}

if st.seqNum < 0 || st.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
return nil, errors.AssertionFailedf("invalid savepoint: got %d, expected 0-%d",
st.seqNum, tc.interceptorAlloc.txnSeqNumAllocator.writeSeq)
if s.seqNum < 0 || s.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
return errors.AssertionFailedf("invalid savepoint: got %d, expected 0-%d",
s.seqNum, tc.interceptorAlloc.txnSeqNumAllocator.writeSeq)
}

return st, nil
return nil
}
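The interceptors that actually carry savepoint state (e.g. the span
refresher) are not shown in this excerpt; below is a plausible sketch
consistent with the refresh-span fields captured by the savepoint struct
above (all type and field names here are invented for illustration, not
the patch's actual code):

package example

import "context"

// Sketch types; the real ones live in pkg/kv and pkg/roachpb.
type span struct{ key, endKey []byte }

type savepointSketch struct {
	refreshSpans     []span
	refreshInvalid   bool
	refreshSpanBytes int64
}

type spanRefresherSketch struct {
	refreshSpans     []span
	refreshInvalid   bool
	refreshSpanBytes int64
}

// createSavepointLocked snapshots the reads tracked so far.
func (sr *spanRefresherSketch) createSavepointLocked(_ context.Context, s *savepointSketch) {
	s.refreshSpans = append([]span(nil), sr.refreshSpans...)
	s.refreshInvalid = sr.refreshInvalid
	s.refreshSpanBytes = sr.refreshSpanBytes
}

// rollbackToSavepointLocked restores the snapshot, dropping reads tracked
// after the savepoint was created so they no longer need refreshing.
func (sr *spanRefresherSketch) rollbackToSavepointLocked(_ context.Context, s savepointSketch) {
	sr.refreshSpans = append([]span(nil), s.refreshSpans...)
	sr.refreshInvalid = s.refreshInvalid
	sr.refreshSpanBytes = s.refreshSpanBytes
}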
6 changes: 6 additions & 0 deletions pkg/kv/txn_interceptor_committer.go
@@ -465,6 +465,12 @@ func (*txnCommitter) importLeafFinalState(*roachpb.LeafTxnFinalState) {}
// epochBumpedLocked implements the txnReqInterceptor interface.
func (tc *txnCommitter) epochBumpedLocked() {}

// createSavepointLocked is part of the txnReqInterceptor interface.
func (*txnCommitter) createSavepointLocked(context.Context, *savepoint) {}

// rollbackToSavepointLocked is part of the txnReqInterceptor interface.
func (*txnCommitter) rollbackToSavepointLocked(context.Context, savepoint) {}

// closeLocked implements the txnReqInterceptor interface.
func (tc *txnCommitter) closeLocked() {}

6 changes: 6 additions & 0 deletions pkg/kv/txn_interceptor_heartbeater.go
@@ -189,6 +189,12 @@ func (*txnHeartbeater) importLeafFinalState(*roachpb.LeafTxnFinalState) {}
// epochBumpedLocked is part of the txnInterceptor interface.
func (h *txnHeartbeater) epochBumpedLocked() {}

// createSavepointLocked is part of the txnReqInterceptor interface.
func (*txnHeartbeater) createSavepointLocked(context.Context, *savepoint) {}

// rollbackToSavepointLocked is part of the txnReqInterceptor interface.
func (*txnHeartbeater) rollbackToSavepointLocked(context.Context, savepoint) {}

// closeLocked is part of the txnInterceptor interface.
func (h *txnHeartbeater) closeLocked() {
h.cancelHeartbeatLoopLocked()