sql,kv: add SQL savepoints support
This patch adds support for SAVEPOINT <foo>, RELEASE SAVEPOINT <foo>,
ROLLBACK TO SAVEPOINT <foo>.
Before this patch, we only had support for the special savepoint
cockroach_restart, which had to be placed at the beginning of the
transaction and was specifically intended for dealing with transaction
retries. This patch implements general support for savepoints, which
provide an error recovery mechanism.

The connExecutor now maintains a stack of savepoints. Rolling back to a
savepoint uses the recent KV API for ignoring a range of write sequence
numbers.
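
As a rough sketch of the shape of this (the package, type, and field names
below are illustrative only, not the actual connExecutor code):

package savepointstack

// kvSavepointToken stands in for the opaque token handed back by the KV layer
// (client.SavepointToken in this patch). Rolling back to it makes the KV txn
// ignore every write sequence number issued after the token was created.
type kvSavepointToken interface{}

// savepoint is one entry on the connection's savepoint stack.
type savepoint struct {
    name    string
    initial bool             // created before any KV operations
    kvToken kvSavepointToken // KV state needed to rewind to this point
}

// savepointStack is the stack maintained by the connExecutor. ROLLBACK TO
// SAVEPOINT rewinds the KV txn to the named entry's token and pops everything
// above it; RELEASE SAVEPOINT pops the entry together with everything above it.
type savepointStack []savepoint

// find returns the index of the topmost savepoint with the given name, or -1.
func (s savepointStack) find(name string) int {
    for i := len(s) - 1; i >= 0; i-- {
        if s[i].name == name {
            return i
        }
    }
    return -1
}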

At the SQL level, savepoints differ from one another in two respects:
1) savepoints placed at the beginning of a transaction (i.e. before any
KV operations) are marked as "initial". Rolling back to an initial
savepoint is always possible. Rolling back to a non-initial savepoint is
not possible after the transaction restarts (see below).
2) the savepoint named "cockroach_restart" retains special RELEASE
semantics: releasing it (attempts to) commit the underlying KV txn.
This continues to allow for discovering deferred serializability
errors (i.e. write timestamp pushes by the timestamp cache). As before, this
RELEASE can fail with a retriable error, at which point the client can
do ROLLBACK TO SAVEPOINT cockroach_restart (which is guaranteed to work
because cockroach_restart needs to be an "initial" savepoint). The
transaction continues to maintain all its locks after such an error.
This is all in contrast to releasing any other savepoint, which does not
commit the txn and never fails. See below for more discussion.
The cockroach_restart savepoint is only special in its release behavior,
not in its rollback behavior (see the client-side sketch below).
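
To make the cockroach_restart semantics concrete, here is a minimal
client-side sketch using database/sql and lib/pq; the connection string, the
accounts table, and the SQLSTATE 40001 check are assumptions about the client
environment, not part of this patch:

package main

import (
    "database/sql"
    "log"

    "github.com/lib/pq" // also registers the "postgres" driver
)

// retriable reports whether err is a serialization failure (SQLSTATE 40001),
// the class of errors referred to as retriable above.
func retriable(err error) bool {
    pqErr, ok := err.(*pq.Error)
    return ok && pqErr.Code == "40001"
}

func main() {
    db, err := sql.Open("postgres",
        "postgresql://root@localhost:26257/bank?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    tx, err := db.Begin()
    if err != nil {
        log.Fatal(err)
    }
    // Created before any KV operations, so this is an "initial" savepoint and
    // rolling back to it is always possible.
    if _, err := tx.Exec(`SAVEPOINT cockroach_restart`); err != nil {
        log.Fatal(err)
    }
    for {
        _, err := tx.Exec(`UPDATE accounts SET balance = balance - 10 WHERE id = 1`)
        if err == nil {
            // Releasing cockroach_restart (attempts to) commit the KV txn, so
            // deferred serializability errors surface here as retriable errors.
            if _, err = tx.Exec(`RELEASE SAVEPOINT cockroach_restart`); err == nil {
                break
            }
        }
        if !retriable(err) {
            log.Fatal(err)
        }
        // The txn keeps its locks across this rollback; rewind and retry.
        if _, err := tx.Exec(`ROLLBACK TO SAVEPOINT cockroach_restart`); err != nil {
            log.Fatal(err)
        }
    }
    if err := tx.Commit(); err != nil {
        log.Fatal(err)
    }
}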

With the implementation of savepoints, the state machine driving a SQL
connection's transactions becomes a lot simpler. There's no longer a
distinction between an "Aborted" transaction and one that's in
"RestartWait". Rolling back to a savepoint now works the same way across
the two states, so RestartWait is gone.

This patch also improves the KV savepoints. They now capture and restore
the state of the read spans and the in-flight writes.
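
As a toy sketch of the capture/restore idea for the read spans (the real
txnSpanRefresher and savepoint plumbing in this patch differ; the names below
are made up for illustration):

package savepointcapture

// span is a simplified key range; the real code uses roachpb.Span.
type span struct{ key, endKey string }

// spanTracker mimics a component that tracks the spans read by the txn so
// they can later be refreshed.
type spanTracker struct {
    refreshSpans   []span
    refreshInvalid bool
}

// toySavepoint holds the state captured when the savepoint is created.
type toySavepoint struct {
    refreshSpans   []span
    refreshInvalid bool
}

// createSavepoint copies the current state into the savepoint. It copies
// rather than aliases, because the live slice keeps growing afterwards.
func (t *spanTracker) createSavepoint(s *toySavepoint) {
    s.refreshSpans = append([]span(nil), t.refreshSpans...)
    s.refreshInvalid = t.refreshInvalid
}

// rollbackToSavepoint restores the captured state; reads performed after the
// savepoint no longer need to be refreshed.
func (t *spanTracker) rollbackToSavepoint(s *toySavepoint) {
    t.refreshSpans = append(t.refreshSpans[:0], s.refreshSpans...)
    t.refreshInvalid = s.refreshInvalid
}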

Some things don't work (yet):
a) Rolling back to a savepoint created after a schema change will error
out. This is because we don't yet snapshot the transaction's schema
change state.
b) After a retriable error, you can only roll back to an initial
savepoint. Attempting to roll back to a non-initial savepoint generates a
retriable error again. If the transaction has been aborted, I think this
is the right behavior; no recovery is possible since the transaction has
lost its write intents. In the usual case where the transaction has not
been aborted, I think we want something better but it will take more
work to get it. I think the behavior we want is the following:
- after a serializability failure, retrying just part of the transaction
should be doable by attempting a ROLLBACK TO SAVEPOINT. This rollback
should succeed if all the non-rolled-back reads can be refreshed to the
desired timestamp. If they can be refreshed, then the client can simply
retry the rolled back part of the transaction. If they can't, then the
ROLLBACK should return a retriable error again, allowing the client to
attempt a deeper rollback - and so on until the client rolls back to an
initial savepoint (which succeeds by definition).
Implementing this would allow for the following nifty pattern:

func fn_n() {
  for {
    SAVEPOINT savepoint_n
    try {
      fn_n+1()
    } catch retriable error {
      err := ROLLBACK TO SAVEPOINT savepoint_n
      if err != nil {
        throw err
      }
      continue
    }
    RELEASE SAVEPOINT savepoint_n
    break
  }
}

The idea here is that the client is trying to re-do as little work as
possible by successively rolling back to earlier and earlier savepoints.
This pattern will technically work with the current patch already,
except it will not actually help the client in any way since all the
rollbacks will fail until we get to the very first savepoint.
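
For reference, a sketch of the same pattern in Go with database/sql (the
savepoint naming scheme, the step callback, and the SQLSTATE 40001 helper are
illustrative; under the current patch such rollbacks only succeed for an
initial savepoint):

package nestedretry

import (
    "database/sql"
    "fmt"

    "github.com/lib/pq"
)

// retriable reports whether err is a serialization failure (SQLSTATE 40001).
func retriable(err error) bool {
    pqErr, ok := err.(*pq.Error)
    return ok && pqErr.Code == "40001"
}

// runStep wraps one unit of work in its own savepoint. On a retriable error it
// tries to rewind just this step; if the rollback itself fails with a
// retriable error, that error propagates so the caller can roll back to an
// earlier savepoint (ultimately an initial one, where rollback always works).
func runStep(tx *sql.Tx, depth int, step func(*sql.Tx) error) error {
    name := fmt.Sprintf("savepoint_%d", depth)
    for {
        if _, err := tx.Exec("SAVEPOINT " + name); err != nil {
            return err
        }
        err := step(tx)
        if err == nil {
            _, err = tx.Exec("RELEASE SAVEPOINT " + name)
            return err
        }
        if !retriable(err) {
            return err
        }
        if _, rbErr := tx.Exec("ROLLBACK TO SAVEPOINT " + name); rbErr != nil {
            return rbErr
        }
        // The rollback succeeded: re-run only this step.
    }
}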
There's an argument to be made for making RELEASE SAVEPOINT check for
deferred serializability violations (and perhaps other deferred checks -
like deferred constraint validation), although Postgres doesn't do any
of these.
Anyway, I've left implementing this for a future patch because I want to
do some KV work for supporting it nicely. Currently, the automatic
restart behavior that KV transactions have is a pain in the ass since it
works against what we're trying to do.

For the time being, non-initial savepoints remember their txn ID and
epoch, and attempting to roll back to them after either has changed produces
a retriable error automatically.

Release note (sql change): SQL savepoints are now supported. SAVEPOINT
<foo>, RELEASE SAVEPOINT <foo>, and ROLLBACK TO SAVEPOINT <foo> now work.
`SHOW SAVEPOINT STATUS` can be used to inspect the current stack of active
savepoints.

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
andreimatei and knz committed Mar 3, 2020
1 parent edbe475 commit 7b4a5e3
Showing 37 changed files with 1,709 additions and 670 deletions.
2 changes: 0 additions & 2 deletions pkg/cli/sql.go
@@ -746,8 +746,6 @@ func (c *cliState) refreshTransactionStatus() {
c.lastKnownTxnStatus = " ERROR"
case sql.CommitWaitStateStr:
c.lastKnownTxnStatus = " DONE"
case sql.RestartWaitStateStr:
c.lastKnownTxnStatus = " RETRY"
case sql.OpenStateStr:
// The state AutoRetry is reported by the server as Open, so no need to
// handle it here.
107 changes: 54 additions & 53 deletions pkg/kv/testdata/savepoints
@@ -319,56 +319,57 @@ commit

subtest end

subtest rollback_across_retry

# TODO(knz): change this test when rolling back across retries becomes
# supported.

begin
----
0 <noignore>

savepoint x
----
0 <noignore>

retry
----
synthetic error: TransactionRetryWithProtoRefreshError: forced retry
epoch: 0 -> 1

release x
----
(*withstack.withStack) cannot release savepoint across transaction retries

rollback x
----
(*withstack.withStack) cannot rollback savepoint across transaction retries

subtest end

subtest rollback_across_abort

begin
----
0 <noignore>

savepoint x
----
0 <noignore>

abort
----
(*roachpb.TransactionRetryWithProtoRefreshError)
txn id changed

release x
----
(*withstack.withStack) cannot release savepoint across transaction retries

rollback x
----
(*withstack.withStack) cannot rollback savepoint across transaction retries


subtest end
# !!!
# subtest rollback_across_retry
#
# # TODO(knz): change this test when rolling back across retries becomes
# # supported.
#
# begin
# ----
# 0 <noignore>
#
# savepoint x
# ----
# 0 <noignore>
#
# retry
# ----
# synthetic error: TransactionRetryWithProtoRefreshError: forced retry
# epoch: 0 -> 1
#
# release x
# ----
# (*withstack.withStack) cannot release savepoint across transaction retries
#
# rollback x
# ----
# (*withstack.withStack) cannot rollback savepoint across transaction retries
#
# subtest end
#
# subtest rollback_across_abort
#
# begin
# ----
# 0 <noignore>
#
# savepoint x
# ----
# 0 <noignore>
#
# abort
# ----
# (*roachpb.TransactionRetryWithProtoRefreshError)
# txn id changed
#
# release x
# ----
# (*withstack.withStack) cannot release savepoint across transaction retries
#
# rollback x
# ----
# (*withstack.withStack) cannot rollback savepoint across transaction retries
#
#
# subtest end
10 changes: 10 additions & 0 deletions pkg/kv/txn_coord_sender.go
@@ -176,6 +176,16 @@ type txnInterceptor interface {
// increment.
epochBumpedLocked()

// createSavepoint is used to populate a savepoint with all the state that
// needs to be restored on a rollback.
createSavepoint(context.Context, *savepoint)

// rollbackToSavepoint is used to restore the state previously saved by createSavepoint().
// Implementations are allowed to modify the savepoint if they want to, optimizing it for future
// rollbacks. For example, the txnPipeliner removes tracked writes from the savepoint that have
// already been verified since the savepoint was created.
rollbackToSavepoint(context.Context, *savepoint)

// closeLocked closes the interceptor. It is called when the TxnCoordSender
// shuts down due to either a txn commit or a txn abort. The method will
// be called exactly once from cleanupTxnLocked.
80 changes: 40 additions & 40 deletions pkg/kv/txn_coord_sender_savepoints.go
@@ -19,36 +19,26 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/google/btree"
)

// savepointToken captures the state in the TxnCoordSender necessary
// to restore that state upon a savepoint rollback.
//
// TODO(knz,andrei): Currently this definition is only sufficient for
// just a few cases of rollbacks. This should be extended to cover
// more ground:
//
// - We also need the current size of txnSpanRefresher.refreshSpans the
// list of tracked reads, such that upon rollback we tell the
// refresher interceptor to discard further reads.
// - We also need something about in-flight writes
// (txnPipeliner.ifWrites). There I guess we need to take some sort of
// snapshot of the current in-flight writes and, on rollback, discard
// in-flight writes that are not part of the savepoint. But, on
// rollback, I don't think we should (nor am I sure that we could)
// simply overwrite the set of in-flight writes with the ones from the
// savepoint because writes that have been verified since the snapshot
// has been taken should continue to be verified. Basically, on
// rollback I think we need to intersect the savepoint with the
// current set of in-flight writes.
type savepointToken struct {
// seqNum is currently the only field that helps to "restore"
// anything upon a rollback. When used, it does not change anything
// in the TCS; instead it simply configures the txn to ignore all
// seqnums from this value until the most recent seqnum emitted by
// the TCS.
// savepoint captures the state in the TxnCoordSender necessary to restore that
// state upon a savepoint rollback.
type savepoint struct {
// seqNum represents the write seq num at the time the savepoint was created.
// On rollback, it configures the txn to ignore all seqnums from this value
// until the most recent seqnum.
seqNum enginepb.TxnSeq

// txnSpanRefresher fields
refreshSpans []roachpb.Span
refreshInvalid bool
refreshSpanBytes int64

// txnPipeliner fields
ifWrites *btree.BTree // can be nil if no writes were tracked
ifBytes int64

// txnID is used to verify that a rollback is not used to paper
// over a txn abort error.
txnID uuid.UUID
@@ -61,10 +51,10 @@ type savepointToken struct {
epoch enginepb.TxnEpoch
}

var _ client.SavepointToken = (*savepointToken)(nil)
var _ client.SavepointToken = (*savepoint)(nil)

// SavepointToken implements the client.SavepointToken interface.
func (s *savepointToken) SavepointToken() {}
func (s *savepoint) SavepointToken() {}

// CreateSavepoint is part of the client.TxnSender interface.
func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.SavepointToken, error) {
@@ -83,11 +73,15 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.Savepoint
return nil, ErrSavepointOperationInErrorTxn
}

return &savepointToken{
txnID: tc.mu.txn.ID,
epoch: tc.mu.txn.Epoch,
seqNum: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
}, nil
s := &savepoint{
txnID: tc.mu.txn.ID,
epoch: tc.mu.txn.Epoch,
}
for _, reqInt := range tc.interceptorStack {
reqInt.createSavepoint(ctx, s)
}

return s, nil
}

// RollbackToSavepoint is part of the client.TxnSender interface.
@@ -108,9 +102,15 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s client.Save
return err
}

// TODO(knz): handle recoverable errors.
if tc.mu.txnState == txnError {
return unimplemented.New("rollback_error", "savepoint rollback after error")
if tc.mu.txnState == txnFinalized {
return unimplemented.New("rollback_error", "savepoint rollback finalized txn")
}

// Restore the transaction's state, in case we're rewinding after an error.
tc.mu.txnState = txnPending

for _, reqInt := range tc.interceptorStack {
reqInt.rollbackToSavepoint(ctx, st)
}

if st.seqNum == tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
@@ -163,17 +163,17 @@

func (tc *TxnCoordSender) checkSavepointLocked(
s client.SavepointToken, opName string,
) (*savepointToken, error) {
st, ok := s.(*savepointToken)
) (*savepoint, error) {
st, ok := s.(*savepoint)
if !ok {
return nil, errors.AssertionFailedf("expected savepointToken, got %T", s)
}

if st.txnID != tc.mu.txn.ID {
if st.seqNum > 0 && st.txnID != tc.mu.txn.ID {
return nil, errors.Newf("cannot %s savepoint across transaction retries", opName)
}

if st.epoch != tc.mu.txn.Epoch {
if st.seqNum > 0 && st.epoch != tc.mu.txn.Epoch {
return nil, errors.Newf("cannot %s savepoint across transaction retries", opName)
}

6 changes: 6 additions & 0 deletions pkg/kv/txn_interceptor_committer.go
@@ -465,6 +465,12 @@ func (*txnCommitter) importLeafFinalState(*roachpb.LeafTxnFinalState) {}
// epochBumpedLocked implements the txnReqInterceptor interface.
func (tc *txnCommitter) epochBumpedLocked() {}

// createSavepoint is part of the txnReqInterceptor interface.
func (*txnCommitter) createSavepoint(context.Context, *savepoint) {}

// rollbackToSavepoint is part of the txnReqInterceptor interface.
func (*txnCommitter) rollbackToSavepoint(context.Context, *savepoint) {}

// closeLocked implements the txnReqInterceptor interface.
func (tc *txnCommitter) closeLocked() {}

6 changes: 6 additions & 0 deletions pkg/kv/txn_interceptor_heartbeater.go
@@ -190,6 +190,12 @@ func (*txnHeartbeater) importLeafFinalState(*roachpb.LeafTxnFinalState) {}
// epochBumpedLocked is part of the txnInterceptor interface.
func (h *txnHeartbeater) epochBumpedLocked() {}

// createSavepoint is part of the txnReqInterceptor interface.
func (*txnHeartbeater) createSavepoint(context.Context, *savepoint) {}

// rollbackToSavepoint is part of the txnReqInterceptor interface.
func (*txnHeartbeater) rollbackToSavepoint(context.Context, *savepoint) {}

// closeLocked is part of the txnInterceptor interface.
func (h *txnHeartbeater) closeLocked() {
h.cancelHeartbeatLoopLocked()
6 changes: 6 additions & 0 deletions pkg/kv/txn_interceptor_metric_recorder.go
@@ -75,6 +75,12 @@ func (*txnMetricRecorder) importLeafFinalState(*roachpb.LeafTxnFinalState) {}
// epochBumpedLocked is part of the txnInterceptor interface.
func (*txnMetricRecorder) epochBumpedLocked() {}

// createSavepoint is part of the txnReqInterceptor interface.
func (*txnMetricRecorder) createSavepoint(context.Context, *savepoint) {}

// rollbackToSavepoint is part of the txnReqInterceptor interface.
func (*txnMetricRecorder) rollbackToSavepoint(context.Context, *savepoint) {}

// closeLocked is part of the txnInterceptor interface.
func (m *txnMetricRecorder) closeLocked() {
if m.onePCCommit {