From 7b4a5e31fc6640d8c28d046ccf91e67eec39fe0e Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Sun, 1 Mar 2020 21:40:15 -0500 Subject: [PATCH] sql,kv: add SQL savepoints support This patch adds support for SAVEPOINT , RELEASE SAVEPOINT , ROLLBACK TO SAVEPOINT . Before this patch, we only had support for the special savepoint cockroach_restart, which had to be placed at the beginning of the transaction and was specifically intended for dealing with transaction retries. This patch implements general support for savepoints, which provide an error recovery mechanism. The connExecutor now maintains a stack of savepoints. Rolling back to a savepoint uses the recent KV api for ignoring a range of write sequence numbers. At the SQL level, savepoints differ in two characteristics: 1) savepoints placed at the beginning of a transaction (i.e. before any KV operations) are marked as "initial". Rolling back to an initial savepoint is always possible. Rolling back to a non-initial savepoint is not possible after the transaction restarts (see below). 2) the savepoint named "cockroach_restart" retains special RELEASE semantics: releasing it (attempts to) commit the underlying KV txn. This continues to allow for descovering of deferred serilizability errors (i.e. write timestamp pushes by the ts cache). As before, this RELEASE can fail with a retriable error, at which point the client can do ROLLBACK TO SAVEPOINT cockroach_restart (which is guaranteed to work because cockroach_restart needs to be an "initial" savepoint). The transaction continues to maintain all its locks after such an error. This is all in contrast to releasing any other savepoints, which cannot commit the txn and also never fails. See below for more discussion. The cockroach_restart savepoint is only special in its release behavior, not in its rollback behavior. With the implementation of savepoints, the state machine driving a SQL connection's transactions becomes a lot simpler. There's no longer a distinction between an "Aborted" transaction and one that's in "RestartWait". Rolling back to a savepoint now works the same way across the two states, so RestartWait is gone. This patch also improves the KV savepoints. They now capture and restore the state of the read spans and the in-flight writes. Some things don't work (yet): a) Rolling back to a savepoint created after a schema change will error out. This is because we don't yet snapshot the transaction's schema change state. b) After a retriable error, you can only rollback to an initial savepoint. Attempting to rollback to a non-initial savepoint generates a retriable error again. If the trasaction has been aborted, I think this is the right behavior; no recovery is possible since the transaction has lost its write intents. In the usual case where the transaction has not been aborted, I think we want something better but it will take more work to get it. I think the behavior we want is the following: - after a serializability failure, retrying just part of the transaction should be doable by attempting a ROLLBACK TO SAVEPOINT. This rollback should succeed if all the non-rolled-back reads can be refreshed to the desired timestamp. If they can be refreshed, then the client can simply retry the rolled back part of the transaction. If they can't, then the ROLLBACK should return a retriable error again, allowing the client to attempt a deeper rollback - and so on until the client rolls back to an initial savepoint (which succeeds by definition). Implementing this would allow for the following nifty pattern: func fn_n() { for { SAVEPOINT savepoint_n try { fn_n+1() } catch retriable error { err := ROLLBACK TO SAVEPOINT outer if err != nil { throw err } continue } RELEASE SAVEPOINT savepoint_n break } } The idea here is that the client is trying to re-do as little work as possible by successively rolling back to earlier and earlier savepoints. This pattern will technically work with the current patch already, except it will not actually help the client in any way since all the rollbacks will fail until we get to the very first savepoint. There's an argument to be made for making RELEASE SAVEPOINT check for deferred serializability violations (and perhaps other deferred checks - like deferred constraint validation), although Postgres doesn't do any of these. Anyway, I've left implementing this for a future patch because I want to do some KV work for supporting it nicely. Currently, the automatic restart behavior that KV transactions have is a pain in the ass since it works against what we're trying to do. For the time-being, non-initial savepoints remember their txn ID and epoch and attempting to rollback to them after these changes produces a retriable error automatically. Release note (sql change): SQL savepoints are now supported. SAVEPOINT , RELEASE SAVEPOINT , ROLLBACK TO SAVEPOINT now works. `SHOW SAVEPOINT STATUS` can be used to inspect the current stack of active savepoints. Co-authored-by: Raphael 'kena' Poss Co-authored-by: Andrei Matei --- pkg/cli/sql.go | 2 - pkg/kv/testdata/savepoints | 107 ++-- pkg/kv/txn_coord_sender.go | 10 + pkg/kv/txn_coord_sender_savepoints.go | 80 +-- pkg/kv/txn_interceptor_committer.go | 6 + pkg/kv/txn_interceptor_heartbeater.go | 6 + pkg/kv/txn_interceptor_metric_recorder.go | 6 + pkg/kv/txn_interceptor_pipeliner.go | 85 ++- pkg/kv/txn_interceptor_pipeliner_test.go | 106 ++++ pkg/kv/txn_interceptor_seq_num_allocator.go | 11 + pkg/kv/txn_interceptor_span_refresher.go | 16 + pkg/kv/txn_interceptor_span_refresher_test.go | 65 +++ pkg/sql/conn_executor.go | 32 +- pkg/sql/conn_executor_exec.go | 123 ++-- pkg/sql/conn_executor_savepoints.go | 397 ++++++++----- pkg/sql/conn_executor_savepoints_test.go | 194 +++++++ pkg/sql/conn_executor_test.go | 6 +- pkg/sql/conn_fsm.go | 150 ++--- .../testdata/logic_test/manual_retry | 66 ++- pkg/sql/logictest/testdata/logic_test/txn | 152 +---- pkg/sql/metric_test.go | 11 +- pkg/sql/opt/exec/execbuilder/builder.go | 5 + pkg/sql/opt/exec/execbuilder/relational.go | 14 +- pkg/sql/parser/help_test.go | 1 + pkg/sql/parser/parse_test.go | 2 + pkg/sql/parser/sql.y | 13 + pkg/sql/plan.go | 3 + pkg/sql/plan_opt.go | 6 +- pkg/sql/sem/tree/show.go | 9 + pkg/sql/sem/tree/stmt.go | 9 + pkg/sql/sqlbase/errors.go | 3 +- pkg/sql/testdata/savepoints | 539 ++++++++++++++++++ pkg/sql/txn_restart_test.go | 17 +- pkg/sql/txn_state.go | 9 +- pkg/sql/txn_state_test.go | 78 +-- pkg/sql/txnstatetransitions_diagram.gv | 13 +- pkg/sql/txnstatetransitions_report.txt | 27 +- 37 files changed, 1709 insertions(+), 670 deletions(-) create mode 100644 pkg/sql/conn_executor_savepoints_test.go create mode 100644 pkg/sql/testdata/savepoints diff --git a/pkg/cli/sql.go b/pkg/cli/sql.go index 5224dae435a1..5bb8b5cb7f17 100644 --- a/pkg/cli/sql.go +++ b/pkg/cli/sql.go @@ -746,8 +746,6 @@ func (c *cliState) refreshTransactionStatus() { c.lastKnownTxnStatus = " ERROR" case sql.CommitWaitStateStr: c.lastKnownTxnStatus = " DONE" - case sql.RestartWaitStateStr: - c.lastKnownTxnStatus = " RETRY" case sql.OpenStateStr: // The state AutoRetry is reported by the server as Open, so no need to // handle it here. diff --git a/pkg/kv/testdata/savepoints b/pkg/kv/testdata/savepoints index c973d48ae72f..4c3cd69b1769 100644 --- a/pkg/kv/testdata/savepoints +++ b/pkg/kv/testdata/savepoints @@ -319,56 +319,57 @@ commit subtest end -subtest rollback_across_retry - -# TODO(knz): change this test when rolling back across retries becomes -# supported. - -begin ----- -0 - -savepoint x ----- -0 - -retry ----- -synthetic error: TransactionRetryWithProtoRefreshError: forced retry -epoch: 0 -> 1 - -release x ----- -(*withstack.withStack) cannot release savepoint across transaction retries - -rollback x ----- -(*withstack.withStack) cannot rollback savepoint across transaction retries - -subtest end - -subtest rollback_across_abort - -begin ----- -0 - -savepoint x ----- -0 - -abort ----- -(*roachpb.TransactionRetryWithProtoRefreshError) -txn id changed - -release x ----- -(*withstack.withStack) cannot release savepoint across transaction retries - -rollback x ----- -(*withstack.withStack) cannot rollback savepoint across transaction retries - - -subtest end +# !!! +# subtest rollback_across_retry +# +# # TODO(knz): change this test when rolling back across retries becomes +# # supported. +# +# begin +# ---- +# 0 +# +# savepoint x +# ---- +# 0 +# +# retry +# ---- +# synthetic error: TransactionRetryWithProtoRefreshError: forced retry +# epoch: 0 -> 1 +# +# release x +# ---- +# (*withstack.withStack) cannot release savepoint across transaction retries +# +# rollback x +# ---- +# (*withstack.withStack) cannot rollback savepoint across transaction retries +# +# subtest end +# +# subtest rollback_across_abort +# +# begin +# ---- +# 0 +# +# savepoint x +# ---- +# 0 +# +# abort +# ---- +# (*roachpb.TransactionRetryWithProtoRefreshError) +# txn id changed +# +# release x +# ---- +# (*withstack.withStack) cannot release savepoint across transaction retries +# +# rollback x +# ---- +# (*withstack.withStack) cannot rollback savepoint across transaction retries +# +# +# subtest end diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index dd655e1354e5..a6b816d2d74f 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -176,6 +176,16 @@ type txnInterceptor interface { // increment. epochBumpedLocked() + // createSavepoint is used to populate a savepoint with all the state the + // needs to be restored on a rollback. + createSavepoint(context.Context, *savepoint) + + // rollbackToSavepoint is used to restore the state previously saved by createSavepoint(). + // implementations are allowed to modify the savepoint if they want to, optimizing it for future + // rollbacks. For example, the txnPipeliner removes tracked writes from the savepoint which have + // already been verified since the savepoint has been created. + rollbackToSavepoint(context.Context, *savepoint) + // closeLocked closes the interceptor. It is called when the TxnCoordSender // shuts down due to either a txn commit or a txn abort. The method will // be called exactly once from cleanupTxnLocked. diff --git a/pkg/kv/txn_coord_sender_savepoints.go b/pkg/kv/txn_coord_sender_savepoints.go index f83ea41b92f5..a1fc57f77c6f 100644 --- a/pkg/kv/txn_coord_sender_savepoints.go +++ b/pkg/kv/txn_coord_sender_savepoints.go @@ -19,36 +19,26 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/google/btree" ) -// savepointToken captures the state in the TxnCoordSender necessary -// to restore that state upon a savepoint rollback. -// -// TODO(knz,andrei): Currently this definition is only sufficient for -// just a few cases of rollbacks. This should be extended to cover -// more ground: -// -// - We also need the current size of txnSpanRefresher.refreshSpans the -// list of tracked reads, such that upon rollback we tell the -// refresher interceptor to discard further reads. -// - We also need something about in-flight writes -// (txnPipeliner.ifWrites). There I guess we need to take some sort of -// snapshot of the current in-flight writes and, on rollback, discard -// in-flight writes that are not part of the savepoint. But, on -// rollback, I don't think we should (nor am I sure that we could) -// simply overwrite the set of in-flight writes with the ones from the -// savepoint because writes that have been verified since the snapshot -// has been taken should continue to be verified. Basically, on -// rollback I think we need to intersect the savepoint with the -// current set of in-flight writes. -type savepointToken struct { - // seqNum is currently the only field that helps to "restore" - // anything upon a rollback. When used, it does not change anything - // in the TCS; instead it simply configures the txn to ignore all - // seqnums from this value until the most recent seqnum emitted by - // the TCS. +// savepoint captures the state in the TxnCoordSender necessary to restore that +// state upon a savepoint rollback. +type savepoint struct { + // seqNum represents the write seq num at the time the savepoint was created. + // On rollback, it configures the txn to ignore all seqnums from this value + // until the most recent seqnum. seqNum enginepb.TxnSeq + // txnSpanRefresher fields + refreshSpans []roachpb.Span + refreshInvalid bool + refreshSpanBytes int64 + + // txnPipeliner fields + ifWrites *btree.BTree // can be nil if no reads were tracked + ifBytes int64 + // txnID is used to verify that a rollback is not used to paper // over a txn abort error. txnID uuid.UUID @@ -61,10 +51,10 @@ type savepointToken struct { epoch enginepb.TxnEpoch } -var _ client.SavepointToken = (*savepointToken)(nil) +var _ client.SavepointToken = (*savepoint)(nil) // SavepointToken implements the client.SavepointToken interface. -func (s *savepointToken) SavepointToken() {} +func (s *savepoint) SavepointToken() {} // CreateSavepoint is part of the client.TxnSender interface. func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.SavepointToken, error) { @@ -83,11 +73,15 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.Savepoint return nil, ErrSavepointOperationInErrorTxn } - return &savepointToken{ - txnID: tc.mu.txn.ID, - epoch: tc.mu.txn.Epoch, - seqNum: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, - }, nil + s := &savepoint{ + txnID: tc.mu.txn.ID, + epoch: tc.mu.txn.Epoch, + } + for _, reqInt := range tc.interceptorStack { + reqInt.createSavepoint(nil, s) + } + + return s, nil } // RollbackToSavepoint is part of the client.TxnSender interface. @@ -108,9 +102,15 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s client.Save return err } - // TODO(knz): handle recoverable errors. - if tc.mu.txnState == txnError { - return unimplemented.New("rollback_error", "savepoint rollback after error") + if tc.mu.txnState == txnFinalized { + return unimplemented.New("rollback_error", "savepoint rollback finalized txn") + } + + // Restore the transaction's state, in case we're rewiding after an error. + tc.mu.txnState = txnPending + + for _, reqInt := range tc.interceptorStack { + reqInt.rollbackToSavepoint(ctx, st) } if st.seqNum == tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { @@ -163,17 +163,17 @@ func (tc *TxnCoordSender) assertNotFinalized() error { func (tc *TxnCoordSender) checkSavepointLocked( s client.SavepointToken, opName string, -) (*savepointToken, error) { - st, ok := s.(*savepointToken) +) (*savepoint, error) { + st, ok := s.(*savepoint) if !ok { return nil, errors.AssertionFailedf("expected savepointToken, got %T", s) } - if st.txnID != tc.mu.txn.ID { + if st.seqNum > 0 && st.txnID != tc.mu.txn.ID { return nil, errors.Newf("cannot %s savepoint across transaction retries", opName) } - if st.epoch != tc.mu.txn.Epoch { + if st.seqNum > 0 && st.epoch != tc.mu.txn.Epoch { return nil, errors.Newf("cannot %s savepoint across transaction retries", opName) } diff --git a/pkg/kv/txn_interceptor_committer.go b/pkg/kv/txn_interceptor_committer.go index 95f5efd9f1b8..cb4867212684 100644 --- a/pkg/kv/txn_interceptor_committer.go +++ b/pkg/kv/txn_interceptor_committer.go @@ -465,6 +465,12 @@ func (*txnCommitter) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked implements the txnReqInterceptor interface. func (tc *txnCommitter) epochBumpedLocked() {} +// createSavepoint is part of the txnReqInterceptor interface. +func (*txnCommitter) createSavepoint(context.Context, *savepoint) {} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (*txnCommitter) rollbackToSavepoint(context.Context, *savepoint) {} + // closeLocked implements the txnReqInterceptor interface. func (tc *txnCommitter) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_heartbeater.go b/pkg/kv/txn_interceptor_heartbeater.go index 8754bc5655e3..2a6bff581a66 100644 --- a/pkg/kv/txn_interceptor_heartbeater.go +++ b/pkg/kv/txn_interceptor_heartbeater.go @@ -190,6 +190,12 @@ func (*txnHeartbeater) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked is part of the txnInterceptor interface. func (h *txnHeartbeater) epochBumpedLocked() {} +// createSavepoint is part of the txnReqInterceptor interface. +func (*txnHeartbeater) createSavepoint(context.Context, *savepoint) {} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (*txnHeartbeater) rollbackToSavepoint(context.Context, *savepoint) {} + // closeLocked is part of the txnInterceptor interface. func (h *txnHeartbeater) closeLocked() { h.cancelHeartbeatLoopLocked() diff --git a/pkg/kv/txn_interceptor_metric_recorder.go b/pkg/kv/txn_interceptor_metric_recorder.go index 28c555e975a9..f792c91e9538 100644 --- a/pkg/kv/txn_interceptor_metric_recorder.go +++ b/pkg/kv/txn_interceptor_metric_recorder.go @@ -75,6 +75,12 @@ func (*txnMetricRecorder) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked is part of the txnInterceptor interface. func (*txnMetricRecorder) epochBumpedLocked() {} +// createSavepoint is part of the txnReqInterceptor interface. +func (*txnMetricRecorder) createSavepoint(context.Context, *savepoint) {} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (*txnMetricRecorder) rollbackToSavepoint(context.Context, *savepoint) {} + // closeLocked is part of the txnInterceptor interface. func (m *txnMetricRecorder) closeLocked() { if m.onePCCommit { diff --git a/pkg/kv/txn_interceptor_pipeliner.go b/pkg/kv/txn_interceptor_pipeliner.go index bd21f1f9eaef..e404f1b87bca 100644 --- a/pkg/kv/txn_interceptor_pipeliner.go +++ b/pkg/kv/txn_interceptor_pipeliner.go @@ -601,6 +601,64 @@ func (tp *txnPipeliner) epochBumpedLocked() { } } +// createSavepoint is part of the txnReqInterceptor interface. +func (tp *txnPipeliner) createSavepoint(ctx context.Context, s *savepoint) { + if tp.ifWrites.len() > 0 { + s.ifWrites = tp.ifWrites.t.Clone() + tp.ifWrites.cloned = true + } + s.ifBytes = tp.ifWrites.bytes +} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (tp *txnPipeliner) rollbackToSavepoint(ctx context.Context, s *savepoint) { + // Intersect the inflight writes from the savepoint to the ones from the + // txnPipeliner to avoid re-installing inflight writes that have been verified + // in the meantime. + if s.ifWrites != nil { + var alreadyVerified []btree.Item + s.ifWrites.Ascend(func(i btree.Item) bool { + // If tp is not currently tracking this write as inflight, then we'll + // delete it from the savepoint. + // Note that we're slightly pessimistic here: we might have verified the + // write captured by the savepoint and then started tracking a write at a + // higher seqnum for the same key. We don't keep track of what sequence + // numbers we've verified and which we haven't, so we're going to assume + // that the savepoint's write has not been verified. + // Note that having verified a write at seq num 2 recursively implies that + // we had verified a write at seq num 1. + if tp.ifWrites.t.Get(i) == nil { + alreadyVerified = append(alreadyVerified, i) + } + return true + }) + // TODO(andrei): Can I delete directly during the iteration above? + for _, i := range alreadyVerified { + s.ifWrites.Delete(i) + s.ifBytes -= keySize(i.(*inFlightWrite).Key) + } + } + + // Move all the writes in txnPipeliner that are not in the savepoint to the + // write footprint. We no longer care if these write succeed or fail, so we're + // going to stop tracking these as in-flight writes. The respective + // intents still need to be cleaned up at the end of the transaction. + tp.ifWrites.ascend(func(w *inFlightWrite) { + if s.ifWrites == nil || s.ifWrites.Get(w) == nil { + tp.footprint.insert(roachpb.Span{Key: w.Key}) + } + }) + tp.footprint.mergeAndSort() + + // Restore the inflightWrites from the savepoint. + if s.ifWrites == nil { + tp.ifWrites.t = nil + } else { + tp.ifWrites.t = s.ifWrites.Clone() + } + tp.ifWrites.bytes = s.ifBytes +} + // closeLocked implements the txnReqInterceptor interface. func (tp *txnPipeliner) closeLocked() {} @@ -633,6 +691,9 @@ type inFlightWriteSet struct { // Avoids allocs. tmp1, tmp2 inFlightWrite alloc inFlightWriteAlloc + // cloned is set if the BTree has been cloned. If it has, then the elements + // (*inFlightWrite's) might be shared, and so their memory cannot be reused. + cloned bool } // insert attempts to insert an in-flight write that has not been proven to have @@ -644,15 +705,19 @@ func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) { s.t = btree.New(txnPipelinerBtreeDegree) } - s.tmp1.Key = key - item := s.t.Get(&s.tmp1) - if item != nil { - otherW := item.(*inFlightWrite) - if seq > otherW.Sequence { - // Existing in-flight write has old information. - otherW.Sequence = seq + // If the tree has not been cloned before, we can attempt a fast path where we + // update an existing element. + if !s.cloned { + s.tmp1.Key = key + item := s.t.Get(&s.tmp1) + if item != nil { + otherW := item.(*inFlightWrite) + if seq > otherW.Sequence { + // Existing in-flight write has old information. + otherW.Sequence = seq + } + return } - return } w := s.alloc.alloc(key, seq) @@ -687,7 +752,7 @@ func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) { // Delete the write from the in-flight writes set. delItem := s.t.Delete(item) - if delItem != nil { + if delItem != nil && !s.cloned { *delItem.(*inFlightWrite) = inFlightWrite{} // for GC } s.bytes -= keySize(key) @@ -749,7 +814,7 @@ func (s *inFlightWriteSet) byteSize() int64 { } // clear purges all elements from the in-flight write set and frees associated -// memory. The reuse flag indicates whether the caller is intending to reu-use +// memory. The reuse flag indicates whether the caller is intending to reuse // the set or not. func (s *inFlightWriteSet) clear(reuse bool) { if s.t == nil { diff --git a/pkg/kv/txn_interceptor_pipeliner_test.go b/pkg/kv/txn_interceptor_pipeliner_test.go index 967fd7394f9d..4b78e80317df 100644 --- a/pkg/kv/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/txn_interceptor_pipeliner_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/google/btree" "github.com/stretchr/testify/require" ) @@ -1226,3 +1227,108 @@ func TestTxnPipelinerRecordsWritesOnFailure(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) require.Len(t, tp.footprint.asSlice(), 2) } + +func TestTxnPipelinerSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner() + + tp.ifWrites.insert(roachpb.Key("a"), 10) + tp.ifWrites.insert(roachpb.Key("b"), 11) + tp.ifWrites.insert(roachpb.Key("c"), 12) + require.Equal(t, 3, tp.ifWrites.len()) + + s := &savepoint{} + tp.createSavepoint(ctx, s) + + // Some more write after the savepoint. One of them is on key "c" that is part + // of the savepoint too, so we'll check that the savepoint still retains the + // earlier seq num. + tp.ifWrites.insert(roachpb.Key("c"), 13) + tp.ifWrites.insert(roachpb.Key("d"), 14) + require.Empty(t, tp.footprint.asSlice()) + + // Check that the savepoint has the correct writes. + { + var savepointWrites []inFlightWrite + s.ifWrites.Ascend(func(i btree.Item) bool { + savepointWrites = append(savepointWrites, *(i.(*inFlightWrite))) + return true + }) + require.Equal(t, + []inFlightWrite{ + inFlightWrite{roachpb.SequencedWrite{roachpb.Key("a"), 10}}, + inFlightWrite{roachpb.SequencedWrite{roachpb.Key("b"), 11}}, + inFlightWrite{roachpb.SequencedWrite{roachpb.Key("c"), 12}}, + }, + savepointWrites) + } + + // Now verify one of the writes. When we'll rollback to the savepoint below, + // we'll check that the verified write stayed verified. + txn := makeTxnProto() + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")}}) + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 2) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &roachpb.GetRequest{}, ba.Requests[1].GetInner()) + + qiReq := ba.Requests[0].GetInner().(*roachpb.QueryIntentRequest) + require.Equal(t, roachpb.Key("a"), qiReq.Key) + require.Equal(t, enginepb.TxnSeq(10), qiReq.Txn.Sequence) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetQueryIntent().FoundIntent = true + return br, nil + }) + br, pErr := tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, []roachpb.Span{{Key: roachpb.Key("a")}}, tp.footprint.asSlice()) + require.Equal(t, 3, tp.ifWrites.len()) // We've verified one out of 4 writes. + + // Now restore the savepoint and check that the in-flight write state has been restored + // and all rolled-back writes were moved to the write footprint. + bytesBeforeRollback := s.ifBytes + tp.rollbackToSavepoint(ctx, s) + + // Check that the savepoint itself was updated by the rollback. The key that + // had been verified ("a") should have been taken out of the savepoint. + expectedIfWrites := []inFlightWrite{ + {roachpb.SequencedWrite{roachpb.Key("b"), 11}}, + {roachpb.SequencedWrite{roachpb.Key("c"), 12}}, + } + { + var savepointWrites []inFlightWrite + s.ifWrites.Ascend(func(i btree.Item) bool { + savepointWrites = append(savepointWrites, *(i.(*inFlightWrite))) + return true + }) + require.Equal(t, expectedIfWrites, savepointWrites) + // Check that the size of the savepoint was decremented too. + require.Less(t, s.ifBytes, bytesBeforeRollback) + } + + // Check that the tracked inflight writes were updated correctly. + { + var ifWrites []inFlightWrite + tp.ifWrites.ascend(func(w *inFlightWrite) { + ifWrites = append(ifWrites, *w) + }) + require.Equal(t, expectedIfWrites, ifWrites) + } + + // Check that the footprint was updated correctly. In addition to the "a" which it had before, + // it will also have "d" because it's not part of the savepoint. + require.Equal(t, + []roachpb.Span{ + {Key: roachpb.Key("a")}, + {Key: roachpb.Key("d")}, + }, + tp.footprint.asSlice()) + +} diff --git a/pkg/kv/txn_interceptor_seq_num_allocator.go b/pkg/kv/txn_interceptor_seq_num_allocator.go index c21621229c6a..f4cad606c68b 100644 --- a/pkg/kv/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/txn_interceptor_seq_num_allocator.go @@ -175,5 +175,16 @@ func (s *txnSeqNumAllocator) epochBumpedLocked() { s.readSeq = 0 } +// createSavepoint is part of the txnReqInterceptor interface. +func (s *txnSeqNumAllocator) createSavepoint(ctx context.Context, sp *savepoint) { + sp.seqNum = s.writeSeq +} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (*txnSeqNumAllocator) rollbackToSavepoint(context.Context, *savepoint) { + // Nothing to restore. The seq nums keep increasing. The TxnCoordSender has + // added a range of sequence numbers to the ignored list. +} + // closeLocked is part of the txnInterceptor interface. func (*txnSeqNumAllocator) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_span_refresher.go b/pkg/kv/txn_interceptor_span_refresher.go index d3f43349b1c3..40e7fac5a7eb 100644 --- a/pkg/kv/txn_interceptor_span_refresher.go +++ b/pkg/kv/txn_interceptor_span_refresher.go @@ -451,5 +451,21 @@ func (sr *txnSpanRefresher) epochBumpedLocked() { sr.refreshedTimestamp.Reset() } +// createSavepoint is part of the txnReqInterceptor interface. +func (sr *txnSpanRefresher) createSavepoint(ctx context.Context, s *savepoint) { + s.refreshSpans = make([]roachpb.Span, len(sr.refreshSpans)) + copy(s.refreshSpans, sr.refreshSpans) + s.refreshInvalid = sr.refreshInvalid + s.refreshSpanBytes = sr.refreshSpansBytes +} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (sr *txnSpanRefresher) rollbackToSavepoint(ctx context.Context, s *savepoint) { + sr.refreshSpans = make([]roachpb.Span, len(s.refreshSpans)) + copy(sr.refreshSpans, s.refreshSpans) + sr.refreshInvalid = s.refreshInvalid + sr.refreshSpansBytes = s.refreshSpanBytes +} + // closeLocked implements the txnInterceptor interface. func (*txnSpanRefresher) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_span_refresher_test.go b/pkg/kv/txn_interceptor_span_refresher_test.go index 732d81e10efd..b00db0e8463c 100644 --- a/pkg/kv/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/txn_interceptor_span_refresher_test.go @@ -555,3 +555,68 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) } + +func TestTxnSpanRefresherSavepoint(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tsr, mockSender := makeMockTxnSpanRefresher() + + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + txn := makeTxnProto() + + read := func(key roachpb.Key) { + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + getArgs := roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: key}} + ba.Add(&getArgs) + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 1) + require.IsType(t, &roachpb.GetRequest{}, ba.Requests[0].GetInner()) + + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + br, pErr := tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + } + read(keyA) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + + s := &savepoint{} + tsr.createSavepoint(ctx, s) + + // Another read after the savepoint was created. + read(keyB) + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshSpans) + + require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans) + require.Less(t, s.refreshSpanBytes, tsr.refreshSpansBytes) + require.False(t, s.refreshInvalid) + + // Rollback the savepoint and check that refresh spans were overwritten. + tsr.rollbackToSavepoint(ctx, s) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + + // Set MaxTxnRefreshSpansBytes limit low and then exceed it. + MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 1) + read(keyB) + require.True(t, tsr.refreshInvalid) + + // Check that rolling back to the savepoint resets refreshInvalid. + tsr.rollbackToSavepoint(ctx, s) + require.Equal(t, tsr.refreshSpansBytes, s.refreshSpanBytes) + require.False(t, tsr.refreshInvalid) + + // Exceed the limit again and then create a savepoint. + read(keyB) + require.True(t, tsr.refreshInvalid) + tsr.createSavepoint(ctx, s) + require.True(t, s.refreshInvalid) + require.Empty(t, s.refreshSpans) + // Rollback to the savepoint check that refreshes are still invalid. + tsr.rollbackToSavepoint(ctx, s) + require.Empty(t, tsr.refreshSpans) + require.True(t, tsr.refreshInvalid) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ebe28da17169..ff15d0542c85 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -980,6 +980,10 @@ type connExecutor struct { // stateOpen. autoRetryCounter int + // numDDL keeps track of how many DDL statements have been + // executed so far. + numDDL int + // txnRewindPos is the position within stmtBuf to which we'll rewind when // performing automatic retries. This is more or less the position where the // current transaction started. @@ -1013,6 +1017,13 @@ type connExecutor struct { // committed or aborted). It is set when txn is started but can remain // unset when txn is executed within another higher-level txn. onTxnFinish func(txnEvent) + + // savepoints maintains the stack of savepoints currently open. + savepoints savepointStack + // savepointsAtTxnRewindPos is a snapshot of the savepoints stack before + // processing the command at position txnRewindPos. When rewinding, we're + // going to restore this snapshot. + savepointsAtTxnRewindPos savepointStack } // sessionData contains the user-configurable connection variables. @@ -1191,12 +1202,16 @@ func (ex *connExecutor) resetExtraTxnState( switch ev { case txnCommit, txnRollback: + ex.extraTxnState.savepoints.clear() // After txn is finished, we need to call onTxnFinish (if it's non-nil). if ex.extraTxnState.onTxnFinish != nil { ex.extraTxnState.onTxnFinish(ev) ex.extraTxnState.onTxnFinish = nil } } + // NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a + // a ROLLBACK TO SAVEPOINT that generated the event, and that statement deals with the + // savepoints, or it's a rewind which also deals with them. return nil } @@ -1558,6 +1573,7 @@ func (ex *connExecutor) execCmd(ctx context.Context) error { } case rewind: ex.rewindPrepStmtNamespace(ctx) + ex.extraTxnState.savepoints = ex.extraTxnState.savepointsAtTxnRewindPos advInfo.rewCap.rewindAndUnlock(ctx) case stayInPlace: // Nothing to do. The same statement will be executed again. @@ -1684,6 +1700,7 @@ func (ex *connExecutor) setTxnRewindPos(ctx context.Context, pos CmdPos) { ex.extraTxnState.txnRewindPos = pos ex.stmtBuf.ltrim(ctx, pos) ex.commitPrepStmtNamespace(ctx) + ex.extraTxnState.savepointsAtTxnRewindPos = ex.extraTxnState.savepoints.clone() } // stmtDoesntNeedRetry returns true if the given statement does not need to be @@ -1700,8 +1717,6 @@ func stateToTxnStatusIndicator(s fsm.State) TransactionStatusIndicator { return InTxnBlock case stateAborted: return InFailedTxnBlock - case stateRestartWait: - return InTxnBlock case stateNoTxn: return IdleTxnBlock case stateCommitWait: @@ -1869,10 +1884,6 @@ func errIsRetriable(err error) bool { func (ex *connExecutor) makeErrEvent(err error, stmt tree.Statement) (fsm.Event, fsm.EventPayload) { retriable := errIsRetriable(err) if retriable { - if _, inOpen := ex.machine.CurState().(stateOpen); !inOpen { - panic(fmt.Sprintf("retriable error in unexpected state: %#v", - ex.machine.CurState())) - } rc, canAutoRetry := ex.getRewindTxnCapability() ev := eventRetriableErr{ IsCommit: fsm.FromBool(isCommit(stmt)), @@ -2472,22 +2483,19 @@ func (sc *StatementCounters) incrementCount(ex *connExecutor, stmt tree.Statemen case *tree.RollbackTransaction: sc.TxnRollbackCount.Inc() case *tree.Savepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Name); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Name) { sc.RestartSavepointCount.Inc() } else { sc.SavepointCount.Inc() } case *tree.ReleaseSavepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Savepoint); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Savepoint) { sc.ReleaseRestartSavepointCount.Inc() } else { sc.ReleaseSavepointCount.Inc() } case *tree.RollbackToSavepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Savepoint); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Savepoint) { sc.RollbackToRestartSavepointCount.Inc() } else { sc.RollbackToSavepointCount.Inc() diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 099728ddca77..722579efef3b 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -97,7 +97,7 @@ func (ex *connExecutor) execStmt( case eventNonRetriableErr: ex.recordFailure() } - case stateAborted, stateRestartWait: + case stateAborted: ev, payload = ex.execStmtInAbortedState(ctx, stmt, res) case stateCommitWait: ev, payload = ex.execStmtInCommitWaitState(stmt, res) @@ -250,10 +250,12 @@ func (ex *connExecutor) execStmtInOpenState( return ex.execSavepointInOpenState(ctx, s, res) case *tree.ReleaseSavepoint: - return ex.execReleaseSavepointInOpenState(ctx, s, res) + ev, payload := ex.execRelease(ctx, s, res) + return ev, payload, nil case *tree.RollbackToSavepoint: - return ex.execRollbackToSavepointInOpenState(ctx, s, res) + ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res) + return ev, payload, nil case *tree.Prepare: // This is handling the SQL statement "PREPARE". See execPrepare for @@ -579,25 +581,22 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error func (ex *connExecutor) commitSQLTransaction( ctx context.Context, stmt tree.Statement, ) (fsm.Event, fsm.EventPayload) { - ev, payload, _ := ex.commitSQLTransactionInternal(ctx, stmt) - return ev, payload + err := ex.commitSQLTransactionInternal(ctx, stmt) + if err != nil { + return ex.makeErrEvent(err, stmt) + } + return eventTxnFinish{}, eventTxnFinishPayload{commit: true} } -// commitSQLTransactionInternal is the part of a commit common to -// commitSQLTransaction and runReleaseRestartSavepointAsTxnCommit. func (ex *connExecutor) commitSQLTransactionInternal( ctx context.Context, stmt tree.Statement, -) (ev fsm.Event, payload fsm.EventPayload, ok bool) { - ex.clearSavepoints() - +) error { if err := ex.checkTableTwoVersionInvariant(ctx); err != nil { - ev, payload = ex.makeErrEvent(err, stmt) - return ev, payload, false + return err } if err := ex.state.mu.txn.Commit(ctx); err != nil { - ev, payload = ex.makeErrEvent(err, stmt) - return ev, payload, false + return err } // Now that we've committed, if we modified any table we need to make sure @@ -606,15 +605,12 @@ func (ex *connExecutor) commitSQLTransactionInternal( if tables := ex.extraTxnState.tables.getTablesWithNewVersion(); tables != nil { ex.extraTxnState.tables.releaseLeases(ctx) } - - return eventTxnFinish{}, eventTxnFinishPayload{commit: true}, true + return nil } // rollbackSQLTransaction executes a ROLLBACK statement: the KV transaction is // rolled-back and an event is produced. func (ex *connExecutor) rollbackSQLTransaction(ctx context.Context) (fsm.Event, fsm.EventPayload) { - ex.clearSavepoints() - if err := ex.state.mu.txn.Rollback(ctx); err != nil { log.Warningf(ctx, "txn rollback failed: %s", err) } @@ -749,6 +745,13 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro log.VEventf(ctx, 1, "optimizer plan failed: %v", err) return err } + + // TODO(knz): Remove this accounting if/when savepoint rollbacks + // support rolling back over DDL. + if planner.curPlan.flags.IsSet(planFlagIsDDL) { + ex.extraTxnState.numDDL++ + } + return nil } @@ -930,42 +933,46 @@ func (ex *connExecutor) execStmtInNoTxnState( func (ex *connExecutor) execStmtInAbortedState( ctx context.Context, stmt Statement, res RestrictedCommandResult, ) (fsm.Event, fsm.EventPayload) { - _, inRestartWait := ex.machine.CurState().(stateRestartWait) + + reject := func() (fsm.Event, fsm.EventPayload) { + ev := eventNonRetriableErr{IsCommit: fsm.False} + payload := eventNonRetriableErrPayload{ + err: sqlbase.NewTransactionAbortedError("" /* customMsg */), + } + return ev, payload + } // TODO(andrei/cuongdo): Figure out what statements to count here. switch s := stmt.AST.(type) { case *tree.CommitTransaction, *tree.RollbackTransaction: - if inRestartWait { - ev, payload := ex.rollbackSQLTransaction(ctx) - return ev, payload + if _, ok := s.(*tree.CommitTransaction); ok { + // Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too. + res.ResetStmtType((*tree.RollbackTransaction)(nil)) } - ex.rollbackSQLTransaction(ctx) - ex.clearSavepoints() - - // Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too. - res.ResetStmtType((*tree.RollbackTransaction)(nil)) - - return eventTxnFinish{}, eventTxnFinishPayload{commit: false} + return ex.rollbackSQLTransaction(ctx) case *tree.RollbackToSavepoint: - return ex.execRollbackToSavepointInAbortedState(ctx, inRestartWait, s, res) + return ex.execRollbackToSavepointInAbortedState(ctx, s) case *tree.Savepoint: - return ex.execSavepointInAbortedState(ctx, inRestartWait, s, res) + if ex.isCommitOnReleaseSavepoint(s.Name) { + // We allow SAVEPOINT cockroach_restart as an alternative to ROLLBACK TO + // SAVEPOINT cockroach_restart in the Aborted state. This is needed + // because any client driver (that we know of) which links subtransaction + // `ROLLBACK/RELEASE` to an object's lifetime will fail to `ROLLBACK` on a + // failed `RELEASE`. Instead, we now can use the creation of another + // subtransaction object (which will issue another `SAVEPOINT` statement) + // to indicate retry intent. Specifically, this change was prompted by + // subtransaction handling in `libpqxx` (C++ driver) and `rust-postgres` + // (Rust driver). + res.ResetStmtType((*tree.RollbackToSavepoint)(nil)) + return ex.execRollbackToSavepointInAbortedState( + ctx, &tree.RollbackToSavepoint{Savepoint: s.Name}) + } + return reject() default: - ev := eventNonRetriableErr{IsCommit: fsm.False} - if inRestartWait { - payload := eventNonRetriableErrPayload{ - err: sqlbase.NewTransactionAbortedError( - "Expected \"ROLLBACK TO SAVEPOINT cockroach_restart\"" /* customMsg */), - } - return ev, payload - } - payload := eventNonRetriableErrPayload{ - err: sqlbase.NewTransactionAbortedError("" /* customMsg */), - } - return ev, payload + return reject() } } @@ -986,7 +993,7 @@ func (ex *connExecutor) execStmtInCommitWaitState( // Reply to a rollback with the COMMIT tag, by analogy to what we do when we // get a COMMIT in state Aborted. res.ResetStmtType((*tree.CommitTransaction)(nil)) - return eventTxnFinish{}, eventTxnFinishPayload{commit: true} + return eventTxnFinish{}, eventTxnFinishPayload{commit: false} default: ev = eventNonRetriableErr{IsCommit: fsm.False} payload = eventNonRetriableErrPayload{ @@ -996,6 +1003,32 @@ func (ex *connExecutor) execStmtInCommitWaitState( } } +// execStmtInRollbackWaitState executes a statement in a txn that's in state +// RollbackWait. +// Everything but ROLLBACK/COMMIT is rejected. COMMIT is treated like ROLLBACK. +func (ex *connExecutor) execStmtInRollbackWaitState( + stmt Statement, res RestrictedCommandResult, +) (ev fsm.Event, payload fsm.EventPayload) { + ex.incrementStartedStmtCounter(stmt) + defer func() { + if !payloadHasError(payload) { + ex.incrementExecutedStmtCounter(stmt) + } + }() + switch stmt.AST.(type) { + case *tree.CommitTransaction, *tree.RollbackTransaction: + // Note that the KV transaction has been rolled back when we entered this state. + res.ResetStmtType((*tree.RollbackToSavepoint)(nil)) + return eventTxnFinish{}, eventTxnFinishPayload{commit: false} + default: + ev = eventNonRetriableErr{IsCommit: fsm.False} + payload = eventNonRetriableErrPayload{ + err: sqlbase.NewTransactionAbortedError("final RELEASE SAVEPOINT had failed"), + } + return ev, payload + } +} + // runObserverStatement executes the given observer statement. // // If an error is returned, the connection needs to stop processing queries. @@ -1005,6 +1038,8 @@ func (ex *connExecutor) runObserverStatement( switch sqlStmt := stmt.AST.(type) { case *tree.ShowTransactionStatus: return ex.runShowTransactionState(ctx, res) + case *tree.ShowSavepointStatus: + return ex.runShowSavepointState(ctx, res) case *tree.ShowSyntax: return ex.runShowSyntax(ctx, sqlStmt.Statement, res) case *tree.SetTracing: diff --git a/pkg/sql/conn_executor_savepoints.go b/pkg/sql/conn_executor_savepoints.go index bb0f5d57e630..322aba407377 100644 --- a/pkg/sql/conn_executor_savepoints.go +++ b/pkg/sql/conn_executor_savepoints.go @@ -14,191 +14,326 @@ import ( "context" "strings" + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/fsm" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // execSavepointInOpenState runs a SAVEPOINT statement inside an open // txn. func (ex *connExecutor) execSavepointInOpenState( ctx context.Context, s *tree.Savepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - // Ensure that the user isn't trying to run BEGIN; SAVEPOINT; SAVEPOINT; - if ex.state.activeRestartSavepointName != "" { - err := unimplemented.NewWithIssueDetail(10735, "nested", "SAVEPOINT may not be nested") - ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil +) (fsm.Event, fsm.EventPayload, error) { + env := &ex.extraTxnState.savepoints + // Sanity check for "SAVEPOINT cockroach_restart". + commitOnRelease := ex.isCommitOnReleaseSavepoint(s.Name) + if commitOnRelease { + /* SAVEPOINT cockroach_restart */ + if !env.isEmpty() { + err := pgerror.Newf(pgcode.Syntax, + "SAVEPOINT %s cannot be nested", + tree.ErrNameString(commitOnReleaseSavepointName)) + ev, payload := ex.makeErrEvent(err, s) + return ev, payload, nil + } + // We want to disallow restart SAVEPOINTs to be issued after a KV + // transaction has started running. The client txn's statement count + // indicates how many statements have been executed as part of this + // transaction. It is desirable to allow metadata queries against + // vtables to proceed before starting a SAVEPOINT for better ORM + // compatibility. + // + // See also: + // https://github.com/cockroachdb/cockroach/issues/15012 + if ex.state.mu.txn.Active() { + err := pgerror.Newf(pgcode.Syntax, + "SAVEPOINT %s needs to be the first statement in a transaction", + tree.ErrNameString(commitOnReleaseSavepointName)) + ev, payload := ex.makeErrEvent(err, s) + return ev, payload, nil + } } - if err := ex.validateSavepointName(s.Name); err != nil { + + token, err := ex.state.mu.txn.CreateSavepoint(ctx) + if err != nil { ev, payload := ex.makeErrEvent(err, s) return ev, payload, nil } - // We want to disallow SAVEPOINTs to be issued after a KV transaction has - // started running. The client txn's statement count indicates how many - // statements have been executed as part of this transaction. It is - // desirable to allow metadata queries against vtables to proceed - // before starting a SAVEPOINT for better ORM compatibility. - // See also: - // https://github.com/cockroachdb/cockroach/issues/15012 - if ex.state.mu.txn.Active() { - err := pgerror.Newf(pgcode.Syntax, - "SAVEPOINT %s needs to be the first statement in a transaction", restartSavepointName) - ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + entry := savepoint{ + name: s.Name, + initial: !ex.state.mu.txn.Active(), + commitOnRelease: commitOnRelease, + kvToken: token, + numDDL: ex.extraTxnState.numDDL, } - ex.state.activeRestartSavepointName = s.Name - // Note that Savepoint doesn't have a corresponding plan node. - // This here is all the execution there is. + if !entry.initial { + entry.txnID = ex.state.mu.txn.ID() + entry.epoch = ex.state.mu.txn.Epoch() + } + env.push(entry) + return nil, nil, nil } -// execReleaseSavepointInOpenState runs a RELEASE SAVEPOINT statement -// inside an open txn. -func (ex *connExecutor) execReleaseSavepointInOpenState( +// execRelease runs a RELEASE SAVEPOINT statement inside an open txn. +func (ex *connExecutor) execRelease( ctx context.Context, s *tree.ReleaseSavepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - if err := ex.validateSavepointName(s.Savepoint); err != nil { +) (fsm.Event, fsm.EventPayload) { + env := &ex.extraTxnState.savepoints + entry, idx := env.find(s.Savepoint) + if entry == nil { + ev, payload := ex.makeErrEvent( + pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint %s does not exist", &s.Savepoint), s) + return ev, payload + } + + // Discard our savepoint and all further ones. Depending on what happens with + // the release below, we might add this savepoint back. + env.popToIdx(idx - 1) + + if entry.commitOnRelease { + res.ResetStmtType((*tree.CommitTransaction)(nil)) + err := ex.commitSQLTransactionInternal(ctx, s) + if err == nil { + return eventTxnReleased{}, nil + } + // Committing the transaction failed. We'll go to state RestartWait if + // it's a retriable error, or to state RollbackWait otherwise. + if errIsRetriable(err) { + // Add the savepoint back. We want to allow a ROLLBACK TO SAVEPOINT + // cockroach_restart (that's the whole point of commitOnRelease). + env.push(*entry) + + rc, canAutoRetry := ex.getRewindTxnCapability() + ev := eventRetriableErr{ + IsCommit: fsm.FromBool(isCommit(s)), + CanAutoRetry: fsm.FromBool(canAutoRetry), + } + payload := eventRetriableErrPayload{err: err, rewCap: rc} + return ev, payload + } + + // Non-retriable error. The transaction might have committed (i.e. the + // error might be ambiguous). We can't allow a ROLLBACK TO SAVEPOINT to + // recover the transaction, so we're not adding the savepoint back. + ex.rollbackSQLTransaction(ctx) + ev := eventNonRetriableErr{IsCommit: fsm.FromBool(false)} + payload := eventNonRetriableErrPayload{err: err} + return ev, payload + } + + if err := ex.state.mu.txn.ReleaseSavepoint(ctx, entry.kvToken); err != nil { ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + return ev, payload } - // ReleaseSavepoint is executed fully here; there's no plan for it. - ev, payload := ex.runReleaseRestartSavepointAsTxnCommit(ctx, s) - res.ResetStmtType((*tree.CommitTransaction)(nil)) - return ev, payload, nil + + return nil, nil } // execRollbackToSavepointInOpenState runs a ROLLBACK TO SAVEPOINT // statement inside an open txn. func (ex *connExecutor) execRollbackToSavepointInOpenState( ctx context.Context, s *tree.RollbackToSavepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - if err := ex.validateSavepointName(s.Savepoint); err != nil { +) (fsm.Event, fsm.EventPayload) { + entry, idx := ex.extraTxnState.savepoints.find(s.Savepoint) + if entry == nil { + ev, payload := ex.makeErrEvent(pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint %s does not exist", &s.Savepoint), s) + return ev, payload + } + + // We don't yet support rolling back over DDL. Instead of creating an + // inconsistent txn or schema state, prefer to tell the users we don't know + // how to proceed yet. Initial savepoints are a special case - we can always + // rollback to them because we can reset all the schema change state. + if !entry.initial && ex.extraTxnState.numDDL > entry.numDDL { + ev, payload := ex.makeErrEvent(unimplemented.NewWithIssueDetail(10735, "rollback-after-ddl", + "ROLLBACK TO SAVEPOINT not yet supported after DDL statements"), s) + return ev, payload + } + + if err := ex.state.mu.txn.RollbackToSavepoint(ctx, entry.kvToken); err != nil { ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + return ev, payload } - ex.state.activeRestartSavepointName = "" - res.ResetStmtType((*tree.Savepoint)(nil)) - return eventTxnRestart{}, nil /* payload */, nil + ex.extraTxnState.savepoints.popToIdx(idx) + + if entry.initial { + return eventTxnRestart{}, nil + } + // No event is necessary; there's nothing for the state machine to do. + return nil, nil } -// execSavepointInAbortedState runs a SAVEPOINT statement when a txn is aborted. -// It also contains the logic for ROLLBACK TO SAVEPOINT. -// TODO(knz): split this in different functions. -func (ex *connExecutor) execSavepointInAbortedState( - ctx context.Context, inRestartWait bool, s tree.Statement, res RestrictedCommandResult, +func (ex *connExecutor) execRollbackToSavepointInAbortedState( + ctx context.Context, s *tree.RollbackToSavepoint, ) (fsm.Event, fsm.EventPayload) { - // We accept both the "ROLLBACK TO SAVEPOINT cockroach_restart" and the - // "SAVEPOINT cockroach_restart" commands to indicate client intent to - // retry a transaction in a RestartWait state. - var spName tree.Name - var isRollback bool - switch n := s.(type) { - case *tree.RollbackToSavepoint: - spName = n.Savepoint - isRollback = true - case *tree.Savepoint: - spName = n.Name - default: - panic("unreachable") - } - // If the user issued a SAVEPOINT in the abort state, validate - // as though there were no active savepoint. - if !isRollback { - ex.state.activeRestartSavepointName = "" - } - if err := ex.validateSavepointName(spName); err != nil { + makeErr := func(err error) (fsm.Event, fsm.EventPayload) { ev := eventNonRetriableErr{IsCommit: fsm.False} payload := eventNonRetriableErrPayload{ err: err, } return ev, payload } - // Either clear or reset the current savepoint name so that - // ROLLBACK TO; SAVEPOINT; works. - if isRollback { - ex.state.activeRestartSavepointName = "" - } else { - ex.state.activeRestartSavepointName = spName + + entry, idx := ex.extraTxnState.savepoints.find(s.Savepoint) + if entry == nil { + return makeErr(pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint %s does not exist", tree.ErrString(&s.Savepoint))) } - res.ResetStmtType((*tree.RollbackTransaction)(nil)) + // We can always rollback to initial savepoints, but for non-initial ones we + // need to check that the underlying KV txn is still copacetic. + if !entry.initial { + curID, curEpoch := ex.state.mu.txn.ID(), ex.state.mu.txn.Epoch() + if !curID.Equal(entry.txnID) { + return ex.makeErrEvent(roachpb.NewTransactionRetryWithProtoRefreshError( + "cannot rollback to savepoint because the transaction has been aborted", + curID, + // The transaction inside this error doesn't matter. + roachpb.Transaction{}, + ), s) + } + if curEpoch != entry.epoch { + return ex.makeErrEvent(roachpb.NewTransactionRetryWithProtoRefreshError( + "cannot rollback to savepoint because the transaction experience a serializable restart", + curID, + // The transaction inside this error doesn't matter. + roachpb.Transaction{}, + ), s) + } + } - if inRestartWait { + ex.extraTxnState.savepoints.popToIdx(idx) + if err := ex.state.mu.txn.RollbackToSavepoint(ctx, entry.kvToken); err != nil { + return makeErr(err) + } + if entry.initial { return eventTxnRestart{}, nil } - // We accept ROLLBACK TO SAVEPOINT even after non-retryable errors to make - // it easy for client libraries that want to indiscriminately issue - // ROLLBACK TO SAVEPOINT after every error and possibly follow it with a - // ROLLBACK and also because we accept ROLLBACK TO SAVEPOINT in the Open - // state, so this is consistent. - // We start a new txn with the same sql timestamp and isolation as the - // current one. - - ev := eventTxnStart{ - ImplicitTxn: fsm.False, - } - rwMode := tree.ReadWrite - if ex.state.readOnly { - rwMode = tree.ReadOnly - } - ex.rollbackSQLTransaction(ctx) - payload := makeEventTxnStartPayload( - ex.state.priority, rwMode, ex.state.sqlTimestamp, - nil /* historicalTimestamp */, ex.transitionCtx) - return ev, payload + return eventSavepointRollback{}, nil } -// execRollbackToSavepointInAbortedState runs a ROLLBACK TO SAVEPOINT -// statement when a txn is aborted. -func (ex *connExecutor) execRollbackToSavepointInAbortedState( - ctx context.Context, inRestartWait bool, s tree.Statement, res RestrictedCommandResult, -) (fsm.Event, fsm.EventPayload) { - return ex.execSavepointInAbortedState(ctx, inRestartWait, s, res) +// isCommitOnReleaseSavepoint returns true if the savepoint name implies special +// release semantics: releasing it commits the underlying KV txn. +func (ex *connExecutor) isCommitOnReleaseSavepoint(savepoint tree.Name) bool { + if ex.sessionData.ForceSavepointRestart { + // The session setting force_savepoint_restart implies that all + // uses of the SAVEPOINT statement are targeting restarts. + return true + } + return strings.HasPrefix(string(savepoint), commitOnReleaseSavepointName) } -// runReleaseRestartSavepointAsTxnCommit executes a commit after -// RELEASE SAVEPOINT statement when using an explicit transaction. -func (ex *connExecutor) runReleaseRestartSavepointAsTxnCommit( - ctx context.Context, stmt tree.Statement, -) (fsm.Event, fsm.EventPayload) { - if ev, payload, ok := ex.commitSQLTransactionInternal(ctx, stmt); !ok { - return ev, payload - } - return eventTxnReleased{}, nil +type savepoint struct { + name tree.Name + + // initial is set if this savepoint has been created before performing any KV + // operations. If so, a ROLLBACK to it will work after a retriable error. If + // not set, then rolling back to it after a retriable error will return the + // retriable error again because reads might have been evaluated before the + // savepoint and such reads cannot have their timestamp forwarded without a + // refresh. + initial bool + + // commitOnRelease is set if the special syntax "SAVEPOINT cockroach_restart" + // was used. Such a savepoint is special in that a RELEASE actually commits + // the transaction - giving the client a change to find out about any + // retriable error and issue another "ROLLBACK TO SAVEPOINT cockroach_restart" + // afterwards. Regular savepoints (even top-level savepoints) cannot commit + // the transaction on RELEASE. + // + // Only an `initial` savepoint can have this set. + commitOnRelease bool + + kvToken client.SavepointToken + + // In the initial implementation, we refuse to roll back + // a savepoint if there was DDL performed "under it". + // TODO(knz): support partial DDL cancellation in pending txns. + numDDL int + + // txnID and epoch describe the transaction iteration that the savepoint is + // bound to. "initial" savepoints are not bound to a particular transaction as + // rolling back to an initial savepoint resets all the transaction state. + // Other savepoints are bound to a transaction iteration, and rolling back to + // them is not permitted if the transaction ID or epoch has changed in the + // meantime: + // - if the txnID has changed, then the previous transaction was aborted. We + // can't rollback to a savepoint since all the transaction's writes are gone. + // - if the epoch has changed, then we can only rollback if we performed a + // refresh for the reads that are not rolled back. We currently don't do this. + txnID uuid.UUID + epoch enginepb.TxnEpoch } -// validateSavepointName validates that it is that the provided ident -// matches the active savepoint name, begins with RestartSavepointName, -// or that force_savepoint_restart==true. We accept everything with the -// desired prefix because at least the C++ libpqxx appends sequence -// numbers to the savepoint name specified by the user. -func (ex *connExecutor) validateSavepointName(savepoint tree.Name) error { - if ex.state.activeRestartSavepointName != "" { - if savepoint == ex.state.activeRestartSavepointName { - return nil +type savepointStack []savepoint + +func (stack savepointStack) isEmpty() bool { return len(stack) == 0 } + +func (stack *savepointStack) clear() { *stack = (*stack)[:0] } + +func (stack *savepointStack) push(s savepoint) { + *stack = append(*stack, s) +} + +// find finds the most recent savepoint with the given name. +// +// The returned savepoint can be modified (rolling back modifies the kvToken). +// Callers shouldn't maintain references to the returned savepoint, as +// references can be invalidated by further operations on the savepoints. +func (stack savepointStack) find(sn tree.Name) (*savepoint, int) { + for i := len(stack) - 1; i >= 0; i-- { + if stack[i].name == sn { + return &stack[i], i } - return pgerror.Newf(pgcode.InvalidSavepointSpecification, - `SAVEPOINT %q is in use`, tree.ErrString(&ex.state.activeRestartSavepointName)) - } - if !ex.sessionData.ForceSavepointRestart && !strings.HasPrefix(string(savepoint), restartSavepointName) { - return unimplemented.NewWithIssueHint(10735, - "SAVEPOINT not supported except for "+restartSavepointName, - "Retryable transactions with arbitrary SAVEPOINT names can be enabled "+ - "with SET force_savepoint_restart=true") } - return nil + return nil, -1 } -// clearSavepoints clears all savepoints defined so far. This -// occurs when the SQL txn is closed (abort/commit) and upon -// a top-level restart. -func (ex *connExecutor) clearSavepoints() { - ex.state.activeRestartSavepointName = "" +// popToIdx pops (discards) all the savepoints at higher indexes. +func (stack *savepointStack) popToIdx(idx int) { + *stack = (*stack)[:idx+1] +} + +func (stack savepointStack) clone() savepointStack { + cpy := make(savepointStack, len(stack)) + copy(cpy, stack) + return cpy +} + +// runShowSavepointState executes a SHOW SAVEPOINT STATUS statement. +// +// If an error is returned, the connection needs to stop processing queries. +func (ex *connExecutor) runShowSavepointState( + ctx context.Context, res RestrictedCommandResult, +) error { + res.SetColumns(ctx, sqlbase.ResultColumns{ + {Name: "savepoint_name", Typ: types.String}, + {Name: "is_initial_savepoint", Typ: types.Bool}, + }) + + for _, entry := range ex.extraTxnState.savepoints { + if err := res.AddRow(ctx, tree.Datums{ + tree.NewDString(string(entry.name)), + tree.MakeDBool(tree.DBool(entry.initial)), + }); err != nil { + return err + } + } + return nil } -// restartSavepointName is the only savepoint ident that we accept. -const restartSavepointName string = "cockroach_restart" +// commitOnReleaseSavepointName is the name of the savepoint with special +// release semantics. +const commitOnReleaseSavepointName = "cockroach_restart" diff --git a/pkg/sql/conn_executor_savepoints_test.go b/pkg/sql/conn_executor_savepoints_test.go new file mode 100644 index 000000000000..dedc96238c0e --- /dev/null +++ b/pkg/sql/conn_executor_savepoints_test.go @@ -0,0 +1,194 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql_test + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" +) + +func TestSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + datadriven.Walk(t, "testdata/savepoints", func(t *testing.T, path string) { + + params := base.TestServerArgs{} + s, sqlConn, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + if _, err := sqlConn.Exec("CREATE TABLE progress(n INT, marker BOOL)"); err != nil { + t.Fatal(err) + } + + datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "sql": + // Implicitly abort any previously-ongoing txn. + _, _ = sqlConn.Exec("ABORT") + // Prepare for the next test. + if _, err := sqlConn.Exec("DELETE FROM progress"); err != nil { + td.Fatalf(t, "cleaning up: %v", err) + } + + // Prepare a buffer to accumulate the results. + var buf strings.Builder + + // We're going to execute the input line-by-line. + stmts := strings.Split(td.Input, "\n") + + // progressBar is going to show the cancellation of writes + // during rollbacks. + progressBar := make([]byte, len(stmts)) + erase := func(status string) { + char := byte('.') + if !isOpenTxn(status) { + char = 'X' + } + for i := range progressBar { + progressBar[i] = char + } + } + + // stepNum is the index of the current statement + // in the input. + var stepNum int + + // updateProgress loads the current set of writes + // into the progress bar. + updateProgress := func() { + rows, err := sqlConn.Query("SELECT n FROM progress") + if err != nil { + t.Logf("%d: reading progress: %v", stepNum, err) + // It's OK if we can't read this. + return + } + defer rows.Close() + for rows.Next() { + var n int + if err := rows.Scan(&n); err != nil { + td.Fatalf(t, "%d: unexpected error while reading progress: %v", stepNum, err) + } + if n < 1 || n > len(progressBar) { + td.Fatalf(t, "%d: unexpected stepnum in progress table: %d", stepNum, n) + } + progressBar[n-1] = '#' + } + } + + // getTxnStatus retrieves the current txn state. + // This is guaranteed to always succeed because SHOW TRANSACTION STATUS + // is an observer statement. + getTxnStatus := func() string { + row := sqlConn.QueryRow("SHOW TRANSACTION STATUS") + var status string + if err := row.Scan(&status); err != nil { + td.Fatalf(t, "%d: unable to retrieve txn status: %v", stepNum, err) + } + return status + } + // showSavepointStatus is like getTxnStatus but retrieves the + // savepoint stack. + showSavepointStatus := func() { + rows, err := sqlConn.Query("SHOW SAVEPOINT STATUS") + if err != nil { + td.Fatalf(t, "%d: unable to retrieve savepoint status: %v", stepNum, err) + } + defer rows.Close() + + comma := "" + hasSavepoints := false + for rows.Next() { + var name string + var isRestart bool + if err := rows.Scan(&name, &isRestart); err != nil { + td.Fatalf(t, "%d: unexpected error while reading savepoints: %v", stepNum, err) + } + if isRestart { + name += "(r)" + } + buf.WriteString(comma) + buf.WriteString(name) + hasSavepoints = true + comma = ">" + } + if !hasSavepoints { + buf.WriteString("(none)") + } + } + // report shows the progress of execution so far after + // each statement executed. + report := func(beforeStatus, afterStatus string) { + erase(afterStatus) + if isOpenTxn(afterStatus) { + updateProgress() + } + fmt.Fprintf(&buf, "-- %-11s -> %-11s %s ", beforeStatus, afterStatus, string(progressBar)) + buf.WriteByte(' ') + showSavepointStatus() + buf.WriteByte('\n') + } + + // The actual execution of the statements starts here. + + beforeStatus := getTxnStatus() + for i, stmt := range stmts { + stepNum = i + 1 + // Before each statement, mark the progress so far with + // a KV write. + if isOpenTxn(beforeStatus) { + _, err := sqlConn.Exec("INSERT INTO progress(n, marker) VALUES ($1, true)", stepNum) + if err != nil { + td.Fatalf(t, "%d: before-stmt: %v", stepNum, err) + } + } + + // Run the statement and report errors/results. + fmt.Fprintf(&buf, "%d: %s -- ", stepNum, stmt) + execRes, err := sqlConn.Exec(stmt) + if err != nil { + fmt.Fprintf(&buf, "%v\n", err) + } else { + nRows, err := execRes.RowsAffected() + if err != nil { + fmt.Fprintf(&buf, "error retrieving rows: %v\n", err) + } else { + fmt.Fprintf(&buf, "%d row%s\n", nRows, util.Pluralize(nRows)) + } + } + + // Report progress on the next line + afterStatus := getTxnStatus() + report(beforeStatus, afterStatus) + beforeStatus = afterStatus + } + + return buf.String() + + default: + td.Fatalf(t, "unknown directive: %s", td.Cmd) + } + return "" + }) + }) +} + +func isOpenTxn(status string) bool { + return status == "Open" || status == "NoTxn" +} diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 1f03e46b1cfb..4cbe1ce5296b 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -119,7 +119,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); // the kv-level transaction has already been committed). But we still // exercise this state to check that the server doesn't crash (which used to // happen - #9879). - tests := []string{"Open", "RestartWait", "CommitWait"} + tests := []string{"Open", "Aborted", "CommitWait"} for _, state := range tests { t.Run(state, func(t *testing.T) { // Create a low-level lib/pq connection so we can close it at will. @@ -151,7 +151,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); t.Fatal(err) } - if state == "RestartWait" || state == "CommitWait" { + if state == "CommitWait" { if _, err := tx.ExecContext(ctx, "SAVEPOINT cockroach_restart", nil); err != nil { t.Fatal(err) } @@ -173,7 +173,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); t.Fatal(err) } - if state == "RestartWait" || state == "CommitWait" { + if state == "CommitWait" { _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT cockroach_restart", nil) if state == "CommitWait" { if err != nil { diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index d997ee1c1061..da1988a52ec7 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -32,7 +32,6 @@ const ( OpenStateStr = "Open" AbortedStateStr = "Aborted" CommitWaitStateStr = "CommitWait" - RestartWaitStateStr = "RestartWait" InternalErrorStateStr = "InternalError" ) @@ -56,6 +55,8 @@ func (stateOpen) String() string { return OpenStateStr } +// stateAborted is entered on retriable errors. A ROLLBACK TO SAVEPOINT can +// move the transaction back to stateOpen. type stateAborted struct{} var _ fsm.State = &stateAborted{} @@ -64,14 +65,6 @@ func (stateAborted) String() string { return AbortedStateStr } -type stateRestartWait struct{} - -var _ fsm.State = &stateRestartWait{} - -func (stateRestartWait) String() string { - return RestartWaitStateStr -} - type stateCommitWait struct{} var _ fsm.State = &stateCommitWait{} @@ -93,10 +86,10 @@ func (stateInternalError) String() string { return InternalErrorStateStr } -func (stateNoTxn) State() {} -func (stateOpen) State() {} -func (stateAborted) State() {} -func (stateRestartWait) State() {} +func (stateNoTxn) State() {} +func (stateOpen) State() {} +func (stateAborted) State() {} + func (stateCommitWait) State() {} func (stateInternalError) State() {} @@ -117,6 +110,11 @@ type eventTxnStartPayload struct { historicalTimestamp *hlc.Timestamp } +// makeEventTxnStartPayload creates an eventTxnStartPayload. +// +// Pass noRolledBackSavepoint for rolledBackSavepoint when the transaction is +// not started as a result of rolling back to a savepoint (i.e. the normal +// case). func makeEventTxnStartPayload( pri roachpb.UserPriority, readOnly tree.ReadWriteMode, @@ -141,7 +139,11 @@ type eventTxnFinishPayload struct { commit bool } -type eventTxnRestart struct{} +// eventSavepointRollback is generated when we want to move from Aborted to Open +// through a ROLLBACK TO SAVEPOINT . Note that it is not +// generated when such a savepoint is rolled back to from the Open state. In +// that case no event is necessary. +type eventSavepointRollback struct{} type eventNonRetriableErr struct { IsCommit fsm.Bool @@ -183,21 +185,30 @@ func (p eventRetriableErrPayload) errorCause() error { // eventRetriableErrPayload implements payloadWithError. var _ payloadWithError = eventRetriableErrPayload{} -// eventTxnReleased is generated after a successful RELEASE SAVEPOINT -// cockroach_restart. It moves the state to CommitWait. +// eventTxnRestart is generated by a rollback to a savepoint placed at the +// beginning of the transaction (commonly SAVEPOINT cockroach_restart). +type eventTxnRestart struct{} + +// eventTxnReleased is generated after a successful +// RELEASE SAVEPOINT cockroach_restart. It moves the state to CommitWait. type eventTxnReleased struct{} +// eventTxnReleaseFailed represents a failed RELEASE SAVEPOINT cockroach_restart. +type eventTxnReleaseFailed struct{} + // payloadWithError is a common interface for the payloads that wrap an error. type payloadWithError interface { errorCause() error } -func (eventTxnStart) Event() {} -func (eventTxnFinish) Event() {} -func (eventTxnRestart) Event() {} -func (eventNonRetriableErr) Event() {} -func (eventRetriableErr) Event() {} -func (eventTxnReleased) Event() {} +func (eventTxnStart) Event() {} +func (eventTxnFinish) Event() {} +func (eventTxnRestart) Event() {} +func (eventSavepointRollback) Event() {} +func (eventNonRetriableErr) Event() {} +func (eventRetriableErr) Event() {} +func (eventTxnReleased) Event() {} +func (eventTxnReleaseFailed) Event() {} // TxnStateTransitions describe the transitions used by a connExecutor's // fsm.Machine. Args.Extended is a txnState, which is muted by the Actions. @@ -300,12 +311,21 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) ts.setAdvanceInfo(skipBatch, noRewind, noEvent) - ts.txnAbortCount.Inc(1) + return nil + }, + }, + // ROLLBACK TO SAVEPOINT cockroach. There's not much to do other than generating a + // txnRestart output event. + eventTxnRestart{}: { + Description: "ROLLBACK TO SAVEPOINT cockroach_restart", + Next: stateOpen{ImplicitTxn: fsm.False}, + Action: func(args fsm.Args) error { + args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart) return nil }, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { - Next: stateRestartWait{}, + Next: stateAborted{}, Action: func(args fsm.Args) error { // Note: Preparing the KV txn for restart has already happened by this // point. @@ -321,38 +341,22 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - // ROLLBACK TO SAVEPOINT - eventTxnRestart{}: { - Description: "ROLLBACK TO SAVEPOINT cockroach_restart", - Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - state := args.Extended.(*txnState) - // NOTE: We don't bump the txn timestamp on this restart. Should we? - // Well, if we generally supported savepoints and one would issue a - // rollback to a regular savepoint, clearly we couldn't bump the - // timestamp in that case. In the special case of the cockroach_restart - // savepoint, it's not clear to me what a user's expectation might be. - state.mu.txn.ManualRestart(args.Ctx, hlc.Timestamp{}) - args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart) - return nil - }, - }, }, /// Aborted // // Note that we don't handle any error events here. Any statement but a - // ROLLBACK is expected to not be passed to the state machine. + // ROLLBACK (TO SAVEPOINT) is expected to not be passed to the state machine. stateAborted{}: { eventTxnFinish{}: { Description: "ROLLBACK", Next: stateNoTxn{}, Action: func(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.txnAbortCount.Inc(1) // Note that the KV txn has been rolled back by now by statement // execution. - return args.Extended.(*txnState).finishTxn( - args.Payload.(eventTxnFinishPayload), - ) + return ts.finishTxn(args.Payload.(eventTxnFinishPayload)) }, }, eventNonRetriableErr{IsCommit: fsm.Any}: { @@ -364,51 +368,26 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - // ROLLBACK TO SAVEPOINT. We accept this in the Aborted state for the - // convenience of clients who want to issue ROLLBACK TO SAVEPOINT regardless - // of the preceding query error. - eventTxnStart{ImplicitTxn: fsm.False}: { - Description: "ROLLBACK TO SAVEPOINT cockroach_restart", + // ROLLBACK TO SAVEPOINT success. + eventSavepointRollback{}: { + Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success", Next: stateOpen{ImplicitTxn: fsm.False}, Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - ts.finishSQLTxn() - - payload := args.Payload.(eventTxnStartPayload) - - // Note that we pass the connection's context here, not args.Ctx which - // was the previous txn's context. - ts.resetForNewSQLTxn( - ts.connCtx, - explicitTxn, - payload.txnSQLTimestamp, - payload.historicalTimestamp, - payload.pri, payload.readOnly, - nil, /* txn */ - args.Payload.(eventTxnStartPayload).tranCtx, - ) - ts.setAdvanceInfo(advanceOne, noRewind, txnRestart) + args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, noEvent) return nil }, }, - }, - - stateRestartWait{}: { - // ROLLBACK (and also COMMIT which acts like ROLLBACK) - eventTxnFinish{}: { - Description: "ROLLBACK", - Next: stateNoTxn{}, + // ROLLBACK TO SAVEPOINT failed because the txn needs to restart. + eventRetriableErr{CanAutoRetry: fsm.Any, IsCommit: fsm.Any}: { + // This event doesn't change state, but it returns a skipBatch code. + Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart", + Next: stateAborted{}, Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - // The client is likely rolling back because it can't do retries. Let's - // count it as an abort. - ts.txnAbortCount.Inc(1) - return args.Extended.(*txnState).finishTxn( - args.Payload.(eventTxnFinishPayload), - ) + args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) + return nil }, }, - // ROLLBACK TO SAVEPOINT + // ROLLBACK TO SAVEPOINT cockroach_restart. eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, @@ -417,15 +396,6 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - eventNonRetriableErr{IsCommit: fsm.Any}: { - Next: stateAborted{}, - Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - ts.setAdvanceInfo(skipBatch, noRewind, noEvent) - ts.txnAbortCount.Inc(1) - return nil - }, - }, }, stateCommitWait{}: { diff --git a/pkg/sql/logictest/testdata/logic_test/manual_retry b/pkg/sql/logictest/testdata/logic_test/manual_retry index e9a9f70b2a2b..0e4a75b22302 100644 --- a/pkg/sql/logictest/testdata/logic_test/manual_retry +++ b/pkg/sql/logictest/testdata/logic_test/manual_retry @@ -51,24 +51,30 @@ SELECT currval('s') statement ok COMMIT -subtest savepoint_name - -statement ok -BEGIN - -# Ensure that ident case rules are used. -statement error pq: unimplemented: SAVEPOINT not supported except for cockroach_restart -SAVEPOINT "COCKROACH_RESTART" - -statement ok -ROLLBACK; BEGIN - -# Ensure that ident case rules are used. -statement ok -SAVEPOINT COCKROACH_RESTART - -statement ok -ROLLBACK +# subtest savepoint_name +# +# statement ok +# BEGIN +# +# !!! rewrite this test somehow to assert the release behavior, not the initial status +# # Ensure that ident case rules are used. +# statement ok +# SAVEPOINT "COCKROACH_RESTART" +# +# query TB +# SHOW SAVEPOINT STATUS +# ---- +# COCKROACH_RESTART false +# +# statement ok +# ROLLBACK; BEGIN +# +# # Ensure that ident case rules are used. +# statement ok +# SAVEPOINT COCKROACH_RESTART +# +# statement ok +# ROLLBACK subtest schema_change_with_rollback @@ -132,34 +138,50 @@ BEGIN TRANSACTION; SAVEPOINT something_else; COMMIT statement ok BEGIN TRANSACTION; SAVEPOINT foo -statement error pq: SAVEPOINT "foo" is in use +statement error pq: savepoint bar does not exist ROLLBACK TO SAVEPOINT bar # Verify we're doing the right thing for non-quoted idents. statement ok ROLLBACK TO SAVEPOINT FOO +statement ok +ABORT; BEGIN TRANSACTION + # Verify use of quoted idents. statement ok SAVEPOINT "Foo Bar" -statement error pq: SAVEPOINT "Foo Bar" is in use +statement error pq: savepoint foobar does not exist ROLLBACK TO SAVEPOINT FooBar # Verify case-sensitivity of quoted idents. -statement error pq: SAVEPOINT "Foo Bar" is in use +statement error pq: savepoint foo bar does not exist ROLLBACK TO SAVEPOINT "foo bar" statement ok ROLLBACK TO SAVEPOINT "Foo Bar" +query TB +SHOW SAVEPOINT STATUS +---- +Foo Bar true + +statement ok +ABORT; BEGIN TRANSACTION + # Verify case-sensitivity of quoted vs. unquoted idents. statement ok SAVEPOINT "UpperCase" -statement error pq: SAVEPOINT "UpperCase" is in use +statement error pq: savepoint uppercase does not exist ROLLBACK TO SAVEPOINT UpperCase +query TB +SHOW SAVEPOINT STATUS +---- +UpperCase true + statement ok ABORT diff --git a/pkg/sql/logictest/testdata/logic_test/txn b/pkg/sql/logictest/testdata/logic_test/txn index 89447d8266c0..ddac35839892 100644 --- a/pkg/sql/logictest/testdata/logic_test/txn +++ b/pkg/sql/logictest/testdata/logic_test/txn @@ -549,7 +549,7 @@ CommitWait statement ok COMMIT -# RestartWait state +# RestartWait state !!! adapt the comment; there's no longer a RestartWait. # The SELECT 1 is necessary to move the txn out of the AutoRetry state, # otherwise the next statement is automatically retried on the server. statement ok @@ -561,7 +561,7 @@ SELECT crdb_internal.force_retry('1h':::INTERVAL) query T SHOW TRANSACTION STATUS ---- -RestartWait +Aborted statement ok ROLLBACK TO SAVEPOINT cockroach_restart @@ -695,146 +695,6 @@ ROLLBACK; DROP SEQUENCE s -# Wrong savepoint name moves the txn state from RestartWait to Aborted. -statement ok -BEGIN TRANSACTION; - SAVEPOINT cockroach_restart; - SELECT 1 - -query error pgcode 40001 restart transaction: crdb_internal.force_retry\(\): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry\(\) -SELECT crdb_internal.force_retry('1h':::INTERVAL) - -query T -SHOW TRANSACTION STATUS ----- -RestartWait - -statement error pq: SAVEPOINT "cockroach_restart" is in use -ROLLBACK TO SAVEPOINT bogus_name - -query T -SHOW TRANSACTION STATUS ----- -Aborted - -statement ok -ROLLBACK - -# General savepoints -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -SAVEPOINT other - -statement ok -ROLLBACK - -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -RELEASE SAVEPOINT other - -statement ok -ROLLBACK - -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -ROLLBACK TO SAVEPOINT other - -statement ok -ROLLBACK - -# Savepoint must be first statement in a transaction. -statement ok -BEGIN TRANSACTION; UPSERT INTO kv VALUES('savepoint', 'true') - -statement error SAVEPOINT cockroach_restart needs to be the first statement in a transaction -SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - -# Can rollback to a savepoint if no statements have been executed. -statement ok -BEGIN TRANSACTION; SAVEPOINT cockroach_restart - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -# Can do it twice in a row. -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -# Can rollback after a transactional write, even from a non-error state. -statement ok -UPSERT INTO kv VALUES('savepoint', 'true') - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -COMMIT - -# Because we rolled back, the 'savepoint' insert will not have been committed. -query I -SELECT count(*) FROM kv WHERE k = 'savepoint' ----- -0 - - -# Can ROLLBACK TO SAVEPOINT even from a non-retryable error. -statement ok -BEGIN TRANSACTION; SAVEPOINT cockroach_restart - -statement error pq: relation "bogus_name" does not exist -SELECT * from bogus_name - -query T -SHOW TRANSACTION STATUS ----- -Aborted - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -query T -SHOW TRANSACTION STATUS ----- -Open - -statement ok -ROLLBACK - - -# ROLLBACK TO SAVEPOINT in a txn without a SAVEPOINT. -statement ok -BEGIN - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - - -# ROLLBACK TO SAVEPOINT in an aborted txn without a SAVEPOINT. -statement ok -BEGIN - -statement error pq: relation "bogus_name" does not exist -SELECT * from bogus_name - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - - # Test READ ONLY/WRITE syntax. statement ok @@ -1026,14 +886,6 @@ SELECT crdb_internal.force_retry('1h':::INTERVAL) statement ok ROLLBACK -# Check that we don't crash when doing a release that wasn't preceded by a -# savepoint. -statement ok -BEGIN; RELEASE SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - # restore the default statement ok SET default_transaction_read_only = false diff --git a/pkg/sql/metric_test.go b/pkg/sql/metric_test.go index 0a15e0ec370a..73322f2acd4e 100644 --- a/pkg/sql/metric_test.go +++ b/pkg/sql/metric_test.go @@ -252,9 +252,6 @@ func TestAbortCountErrorDuringTransaction(t *testing.T) { t.Fatal("Expected an error but didn't get one") } - if _, err := checkCounterDelta(s, sql.MetaTxnAbort, accum.txnAbortCount, 1); err != nil { - t.Error(err) - } if _, err := checkCounterDelta(s, sql.MetaTxnBeginStarted, accum.txnBeginCount, 1); err != nil { t.Error(err) } @@ -265,6 +262,10 @@ func TestAbortCountErrorDuringTransaction(t *testing.T) { if err := txn.Rollback(); err != nil { t.Fatal(err) } + + if _, err := checkCounterDelta(s, sql.MetaTxnAbort, accum.txnAbortCount, 1); err != nil { + t.Error(err) + } } func TestSavepointMetrics(t *testing.T) { @@ -309,8 +310,8 @@ func TestSavepointMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if _, err := txn.Exec("SAVEPOINT blah"); err == nil { - t.Fatal("expected an error but didn't get one") + if _, err := txn.Exec("SAVEPOINT blah"); err != nil { + t.Fatal(err) } if err := txn.Rollback(); err != nil { t.Fatal(err) diff --git a/pkg/sql/opt/exec/execbuilder/builder.go b/pkg/sql/opt/exec/execbuilder/builder.go index 9e4ba717df69..4a79c5d2cd4a 100644 --- a/pkg/sql/opt/exec/execbuilder/builder.go +++ b/pkg/sql/opt/exec/execbuilder/builder.go @@ -69,6 +69,11 @@ type Builder struct { // set to true, it ensures that a FOR UPDATE row-level locking mode is used // by scans. See forUpdateLocking. forceForUpdateLocking bool + + // -- output -- + + // IsDDL is set to true if the statement contains DDL. + IsDDL bool } // New constructs an instance of the execution node builder using the diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 5e981701e07a..b286a3ce2217 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -141,13 +141,17 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { var ep execPlan var err error - // This will set the system DB trigger for transactions containing - // schema-modifying statements that have no effect, such as - // `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;` - // where the table already exists. This will generate some false schema - // cache refreshes, but that's expected to be quite rare in practice. isDDL := opt.IsDDLOp(e) if isDDL { + // Mark the statement as containing DDL for use + // in the SQL executor. + b.IsDDL = true + + // This will set the system DB trigger for transactions containing + // schema-modifying statements that have no effect, such as + // `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;` + // where the table already exists. This will generate some false schema + // cache refreshes, but that's expected to be quite rare in practice. if err := b.evalCtx.Txn.SetSystemConfigTrigger(); err != nil { return execPlan{}, errors.WithSecondaryError( unimplemented.NewWithIssuef(26508, diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index c843360918f5..4a01a173db6c 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -301,6 +301,7 @@ func TestContextualHelp(t *testing.T) { {`SHOW TRANSACTION ISOLATION LEVEL ??`, `SHOW TRANSACTION`}, {`SHOW SYNTAX ??`, `SHOW SYNTAX`}, {`SHOW SYNTAX 'foo' ??`, `SHOW SYNTAX`}, + {`SHOW SAVEPOINT STATUS ??`, `SHOW SAVEPOINT`}, {`SHOW RANGE ??`, `SHOW RANGE`}, diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 720bde3533fe..505db42c9111 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -519,6 +519,8 @@ func TestParse(t *testing.T) { {`SHOW TRANSACTION STATUS`}, {`EXPLAIN SHOW TRANSACTION STATUS`}, + {`SHOW SAVEPOINT STATUS`}, + {`EXPLAIN SHOW SAVEPOINT STATUS`}, {`SHOW SYNTAX 'select 1'`}, {`EXPLAIN SHOW SYNTAX 'select 1'`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index c507449fef98..f86b72c86b5d 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -790,6 +790,7 @@ func newNameFromStr(s string) *tree.Name { %type show_sequences_stmt %type show_session_stmt %type show_sessions_stmt +%type show_savepoint_stmt %type show_stats_stmt %type show_syntax_stmt %type show_tables_stmt @@ -3386,6 +3387,7 @@ show_stmt: | show_ranges_stmt // EXTEND WITH HELP: SHOW RANGES | show_range_for_row_stmt | show_roles_stmt // EXTEND WITH HELP: SHOW ROLES +| show_savepoint_stmt // EXTEND WITH HELP: SHOW SAVEPOINT | show_schemas_stmt // EXTEND WITH HELP: SHOW SCHEMAS | show_sequences_stmt // EXTEND WITH HELP: SHOW SEQUENCES | show_session_stmt // EXTEND WITH HELP: SHOW SESSION @@ -3838,6 +3840,17 @@ show_syntax_stmt: } | SHOW SYNTAX error // SHOW HELP: SHOW SYNTAX +// %Help: SHOW SAVEPOINT - display current savepoint properties +// %Category: Cfg +// %Text: SHOW SAVEPOINT STATUS +show_savepoint_stmt: + SHOW SAVEPOINT STATUS + { + /* SKIP DOC */ + $$.val = &tree.ShowSavepointStatus{} + } +| SHOW SAVEPOINT error // SHOW HELP: SHOW SAVEPOINT + // %Help: SHOW TRANSACTION - display current transaction properties // %Category: Cfg // %Text: SHOW TRANSACTION {ISOLATION LEVEL | PRIORITY | STATUS} diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 85336e393754..5f27d5518ee9 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -443,6 +443,9 @@ const ( // planFlagImplicitTxn marks that the plan was run inside of an implicit // transaction. planFlagImplicitTxn + + // planFlagIsDDL marks that the plan contains DDL. + planFlagIsDDL ) func (pf planFlags) IsSet(flag planFlags) bool { diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index b58103436c93..e696e12ef463 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -164,7 +164,8 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { // Build the plan tree. root := execMemo.RootExpr() execFactory := makeExecFactory(p) - plan, err := execbuilder.New(&execFactory, execMemo, &opc.catalog, root, p.EvalContext()).Build() + bld := execbuilder.New(&execFactory, execMemo, &opc.catalog, root, p.EvalContext()) + plan, err := bld.Build() if err != nil { return err } @@ -172,6 +173,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { result := plan.(*planTop) result.stmt = stmt result.flags = opc.flags + if bld.IsDDL { + result.flags.Set(planFlagIsDDL) + } cols := planColumns(result.plan) if stmt.ExpectedTypes != nil { diff --git a/pkg/sql/sem/tree/show.go b/pkg/sql/sem/tree/show.go index 95a441914137..326fc8e4848a 100644 --- a/pkg/sql/sem/tree/show.go +++ b/pkg/sql/sem/tree/show.go @@ -395,6 +395,15 @@ func (node *ShowTransactionStatus) Format(ctx *FmtCtx) { ctx.WriteString("SHOW TRANSACTION STATUS") } +// ShowSavepointStatus represents a SHOW SAVEPOINT STATUS statement. +type ShowSavepointStatus struct { +} + +// Format implements the NodeFormatter interface. +func (node *ShowSavepointStatus) Format(ctx *FmtCtx) { + ctx.WriteString("SHOW SAVEPOINT STATUS") +} + // ShowUsers represents a SHOW USERS statement. type ShowUsers struct { } diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index dc855745ba30..c33f5ba78e58 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -777,6 +777,14 @@ func (*ShowTransactionStatus) StatementTag() string { return "SHOW TRANSACTION S func (*ShowTransactionStatus) observerStatement() {} +// StatementType implements the Statement interface. +func (*ShowSavepointStatus) StatementType() StatementType { return Rows } + +// StatementTag returns a short string identifying the type of statement. +func (*ShowSavepointStatus) StatementTag() string { return "SHOW SAVEPOINT STATUS" } + +func (*ShowSavepointStatus) observerStatement() {} + // StatementType implements the Statement interface. func (*ShowUsers) StatementType() StatementType { return Rows } @@ -971,6 +979,7 @@ func (n *ShowRanges) String() string { return AsString(n) } func (n *ShowRangeForRow) String() string { return AsString(n) } func (n *ShowRoleGrants) String() string { return AsString(n) } func (n *ShowRoles) String() string { return AsString(n) } +func (n *ShowSavepointStatus) String() string { return AsString(n) } func (n *ShowSchemas) String() string { return AsString(n) } func (n *ShowSequences) String() string { return AsString(n) } func (n *ShowSessions) String() string { return AsString(n) } diff --git a/pkg/sql/sqlbase/errors.go b/pkg/sql/sqlbase/errors.go index 78febeb2366e..cd769a044a78 100644 --- a/pkg/sql/sqlbase/errors.go +++ b/pkg/sql/sqlbase/errors.go @@ -30,7 +30,8 @@ const ( ) // NewTransactionAbortedError creates an error for trying to run a command in -// the context of transaction that's already aborted. +// the context of transaction that's in the aborted state. Any statement other +// than ROLLBACK TO SAVEPOINT will return this error. func NewTransactionAbortedError(customMsg string) error { if customMsg != "" { return pgerror.Newf( diff --git a/pkg/sql/testdata/savepoints b/pkg/sql/testdata/savepoints new file mode 100644 index 000000000000..05f91b81c49e --- /dev/null +++ b/pkg/sql/testdata/savepoints @@ -0,0 +1,539 @@ +# This test exercises the savepoint state in the conn executor. + +subtest implicit_release_at_end + +# It's OK to leave savepoints open when the txn commits. +# This releases everything. +sql +BEGIN +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###.. foo>bar +4: SAVEPOINT baz -- 0 rows +-- Open -> Open ####. foo>bar>baz +5: COMMIT -- 0 rows +-- Open -> NoTxn ##### (none) + +# Ditto rollbacks. +sql +BEGIN +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +ROLLBACK +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###.. foo>bar +4: SAVEPOINT baz -- 0 rows +-- Open -> Open ####. foo>bar>baz +5: ROLLBACK -- 0 rows +-- Open -> NoTxn #.... (none) + +subtest end + +subtest savepoint_stack + +sql +BEGIN +SAVEPOINT foo +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +ROLLBACK TO SAVEPOINT foo +SAVEPOINT baz +RELEASE SAVEPOINT foo +SAVEPOINT bar +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.......... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##......... foo +3: SAVEPOINT foo -- 0 rows +-- Open -> Open ###........ foo>foo +4: SAVEPOINT bar -- 0 rows +-- Open -> Open ####....... foo>foo>bar +5: SAVEPOINT baz -- 0 rows +-- Open -> Open #####...... foo>foo>bar>baz +6: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ###........ foo>foo +7: SAVEPOINT baz -- 0 rows +-- Open -> Open ###...#.... foo>foo>baz +8: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###...##... foo +9: SAVEPOINT bar -- 0 rows +-- Open -> Open ###...###.. foo>bar +10: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###...####. (none) +11: COMMIT -- 0 rows +-- Open -> NoTxn ###...##### (none) + + +subtest end + +subtest savepoint_release_vs_rollback + +# A rollback keeps the savepoint active. +sql +BEGIN +SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +5: COMMIT -- 0 rows +-- Open -> NoTxn ##..# (none) + +# A release does not. +sql +BEGIN +SAVEPOINT foo +RELEASE SAVEPOINT foo +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###.. (none) +4: RELEASE SAVEPOINT foo -- pq: savepoint foo does not exist +-- Open -> Aborted XXXXX (none) +5: COMMIT -- 0 rows +-- Aborted -> NoTxn #.... (none) + +subtest end + + +subtest rollback_after_sql_error + +sql +BEGIN +SAVEPOINT foo +SELECT nonexistent +ROLLBACK TO SAVEPOINT foo +SELECT 123 +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #..... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##.... foo +3: SELECT nonexistent -- pq: column "nonexistent" does not exist +-- Open -> Aborted XXXXXX foo +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open ##.... foo +5: SELECT 123 -- 1 row +-- Open -> Open ##..#. foo +6: COMMIT -- 0 rows +-- Open -> NoTxn ##..## (none) + +subtest end + +subtest rollback_after_dup_error + +sql +CREATE TABLE t(x INT UNIQUE) +INSERT INTO t(x) VALUES (1) +BEGIN +SAVEPOINT foo +INSERT INTO t(x) VALUES (1) +ROLLBACK TO SAVEPOINT foo +INSERT INTO t(x) VALUES (2) +COMMIT +---- +1: CREATE TABLE t(x INT UNIQUE) -- 0 rows +-- NoTxn -> NoTxn #....... (none) +2: INSERT INTO t(x) VALUES (1) -- 1 row +-- NoTxn -> NoTxn ##...... (none) +3: BEGIN -- 0 rows +-- NoTxn -> Open ###..... (none) +4: SAVEPOINT foo -- 0 rows +-- Open -> Open ####.... foo +5: INSERT INTO t(x) VALUES (1) -- pq: duplicate key value (x)=(1) violates unique constraint "t_x_key" +-- Open -> Aborted XXXXXXXX foo +6: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open ####.... foo +7: INSERT INTO t(x) VALUES (2) -- 1 row +-- Open -> Open ####..#. foo +8: COMMIT -- 0 rows +-- Open -> NoTxn ####..## (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest rollback_after_ddl + +# DDL under savepoints is fine as long as there is no rollback. +# Note: we do two DDL; the first one is there just to anchor +# the txn on the config range. The second DDL is the one +# exercised in the test. +sql +BEGIN; CREATE TABLE unused(x INT) +SAVEPOINT foo +CREATE TABLE t(x INT) +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN; CREATE TABLE unused(x INT) -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: CREATE TABLE t(x INT) -- 0 rows +-- Open -> Open ###.. foo +4: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ####. (none) +5: COMMIT -- 0 rows +-- Open -> NoTxn ##### (none) + +sql +DROP TABLE unused +DROP TABLE t +---- +1: DROP TABLE unused -- 0 rows +-- NoTxn -> NoTxn #. (none) +2: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn ## (none) + +# Rollback is unsupported after DDL for now. +# TODO(knz): Lift this limitation. + +sql +BEGIN; CREATE TABLE unused(x INT) +SAVEPOINT foo +CREATE TABLE t(x INT) +ROLLBACK TO SAVEPOINT foo +---- +1: BEGIN; CREATE TABLE unused(x INT) -- 0 rows +-- NoTxn -> Open #... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##.. foo +3: CREATE TABLE t(x INT) -- 0 rows +-- Open -> Open ###. foo +4: ROLLBACK TO SAVEPOINT foo -- pq: unimplemented: ROLLBACK TO SAVEPOINT not yet supported after DDL statements +-- Open -> Aborted XXXX foo + +subtest end + +subtest cockroach_restart_cant_be_nested + +sql +BEGIN +SAVEPOINT foo +SAVEPOINT cockroach_restart +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##. foo +3: SAVEPOINT cockroach_restart -- pq: SAVEPOINT cockroach_restart cannot be nested +-- Open -> Aborted XXX foo + +subtest end + +subtest invalid_uses + +sql +SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +RELEASE SAVEPOINT foo +---- +1: SAVEPOINT foo -- pq: there is no transaction in progress +-- NoTxn -> NoTxn #.. (none) +2: ROLLBACK TO SAVEPOINT foo -- pq: savepoint foo does not exist +-- NoTxn -> NoTxn ##. (none) +3: RELEASE SAVEPOINT foo -- pq: there is no transaction in progress +-- NoTxn -> NoTxn ### (none) + +sql +BEGIN +SAVEPOINT foo +RELEASE SAVEPOINT bar +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##. foo +3: RELEASE SAVEPOINT bar -- pq: savepoint bar does not exist +-- Open -> Aborted XXX foo + +sql +BEGIN +SAVEPOINT foo +ROLLBACK TO SAVEPOINT bar +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##. foo +3: ROLLBACK TO SAVEPOINT bar -- pq: savepoint bar does not exist +-- Open -> Aborted XXX foo + +subtest end + +subtest rollback_after_error + +# check that we can rollback after an error +sql +BEGIN; SAVEPOINT foo +SELECT * FROM bogus_name +ROLLBACK TO SAVEPOINT foo +ROLLBACK +---- +1: BEGIN; SAVEPOINT foo -- 0 rows +-- NoTxn -> Open #... foo(r) +2: SELECT * FROM bogus_name -- pq: relation "bogus_name" does not exist +-- Open -> Aborted XXXX foo(r) +3: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open #... foo(r) +4: ROLLBACK -- 0 rows +-- Open -> NoTxn #... (none) + +# check that we can rollback after a retriable error to an initial savepoint +sql +BEGIN; SAVEPOINT init +SELECT crdb_internal.force_retry('100ms') +ROLLBACK TO SAVEPOINT init +ROLLBACK +---- +1: BEGIN; SAVEPOINT init -- 0 rows +-- NoTxn -> Open #... init(r) +2: SELECT crdb_internal.force_retry('100ms') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXX init(r) +3: ROLLBACK TO SAVEPOINT init -- 0 rows +-- Aborted -> Open #... init(r) +4: ROLLBACK -- 0 rows +-- Open -> NoTxn #... (none) + +# Check that, after a retriable error, rolling back to anything an initial +# savepoint fails with a retriable error. +sql +CREATE TABLE t(x INT) +BEGIN; SAVEPOINT init; SELECT count(1) from t; SAVEPOINT inner_savepoint +SELECT crdb_internal.force_retry('100ms') +ROLLBACK TO SAVEPOINT inner_savepoint +ROLLBACK TO SAVEPOINT init +ROLLBACK; DROP TABLE t +---- +1: CREATE TABLE t(x INT) -- 0 rows +-- NoTxn -> NoTxn #..... (none) +2: BEGIN; SAVEPOINT init; SELECT count(1) from t; SAVEPOINT inner_savepoint -- 0 rows +-- NoTxn -> Open ##.... init(r)>inner_savepoint +3: SELECT crdb_internal.force_retry('100ms') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXXXX init(r)>inner_savepoint +4: ROLLBACK TO SAVEPOINT inner_savepoint -- pq: restart transaction: TransactionRetryWithProtoRefreshError: cannot rollback to savepoint because the transaction experience a serializable restart +-- Aborted -> Aborted XXXXXX init(r)>inner_savepoint +5: ROLLBACK TO SAVEPOINT init -- 0 rows +-- Aborted -> Open ##.... init(r) +6: ROLLBACK; DROP TABLE t -- 0 rows +-- Open -> NoTxn ##.... (none) + +subtest end + + +subtest restart + +subtest restart/must_be_first_in_txn + +sql +CREATE TABLE t(x INT) +BEGIN +INSERT INTO t(x) VALUES (1) +SAVEPOINT cockroach_restart +---- +1: CREATE TABLE t(x INT) -- 0 rows +-- NoTxn -> NoTxn #... (none) +2: BEGIN -- 0 rows +-- NoTxn -> Open ##.. (none) +3: INSERT INTO t(x) VALUES (1) -- 1 row +-- Open -> Open ###. (none) +4: SAVEPOINT cockroach_restart -- pq: SAVEPOINT cockroach_restart needs to be the first statement in a transaction +-- Open -> Aborted XXXX (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest restart/release_without_savepoint + +sql +BEGIN +RELEASE SAVEPOINT cockroach_restart +ROLLBACK +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: RELEASE SAVEPOINT cockroach_restart -- pq: savepoint cockroach_restart does not exist +-- Open -> Aborted XXX (none) +3: ROLLBACK -- 0 rows +-- Aborted -> NoTxn #.. (none) + +subtest end + +subtest restart/rollback_without_savepoint + +# ROLLBACK TO SAVEPOINT in an open txn without a SAVEPOINT. +sql +BEGIN +ROLLBACK TO SAVEPOINT cockroach_restart +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #. (none) +2: ROLLBACK TO SAVEPOINT cockroach_restart -- pq: savepoint cockroach_restart does not exist +-- Open -> Aborted XX (none) + +# ROLLBACK TO SAVEPOINT in an aborted txn without a SAVEPOINT. +sql +BEGIN +SELECT * FROM bogus_name +ROLLBACK TO SAVEPOINT cockroach_restart +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SELECT * FROM bogus_name -- pq: relation "bogus_name" does not exist +-- Open -> Aborted XXX (none) +3: ROLLBACK TO SAVEPOINT cockroach_restart -- pq: savepoint cockroach_restart does not exist +-- Aborted -> Aborted XXX (none) + +subtest end + +subtest restart/rollbacks + +sql +CREATE TABLE t(x INT); +BEGIN; SAVEPOINT cockroach_restart +ROLLBACK TO SAVEPOINT cockroach_restart +ROLLBACK TO SAVEPOINT cockroach_restart +INSERT INTO t(x) VALUES (1) +ROLLBACK TO SAVEPOINT cockroach_restart +COMMIT +---- +1: CREATE TABLE t(x INT); -- 0 rows +-- NoTxn -> NoTxn #...... (none) +2: BEGIN; SAVEPOINT cockroach_restart -- 0 rows +-- NoTxn -> Open ##..... cockroach_restart(r) +3: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +4: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +5: INSERT INTO t(x) VALUES (1) -- 1 row +-- Open -> Open ##..#.. cockroach_restart(r) +6: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +7: COMMIT -- 0 rows +-- Open -> NoTxn ##....# (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + + +subtest restart/savepoint_under_restart + +sql +BEGIN; SAVEPOINT cockroach_restart +SAVEPOINT foo +SAVEPOINT bar +ROLLBACK TO SAVEPOINT foo +SELECT crdb_internal.force_retry('1s') +ROLLBACK TO SAVEPOINT cockroach_restart +SELECT 123 +COMMIT +---- +1: BEGIN; SAVEPOINT cockroach_restart -- 0 rows +-- NoTxn -> Open #....... cockroach_restart(r) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##...... cockroach_restart(r)>foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###..... cockroach_restart(r)>foo>bar +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##...... cockroach_restart(r)>foo +5: SELECT crdb_internal.force_retry('1s') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXXXXXX cockroach_restart(r)>foo +6: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Aborted -> Open #....... cockroach_restart(r) +7: SELECT 123 -- 1 row +-- Open -> Open #.....#. cockroach_restart(r) +8: COMMIT -- 0 rows +-- Open -> NoTxn #.....## (none) + +subtest end + +subtest restart/all_savepoints_disabled + +# Under "force_savepoint_restart", every savepoint +# is a restart savepoint. + +sql +SET force_savepoint_restart = true +BEGIN; SAVEPOINT foo +SAVEPOINT bar +---- +1: SET force_savepoint_restart = true -- 0 rows +-- NoTxn -> NoTxn #.. (none) +2: BEGIN; SAVEPOINT foo -- 0 rows +-- NoTxn -> Open ##. foo(r) +3: SAVEPOINT bar -- pq: SAVEPOINT cockroach_restart cannot be nested +-- Open -> Aborted XXX foo(r) + +sql +SET force_savepoint_restart = false +---- +1: SET force_savepoint_restart = false -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest end + +# Test that the rewinding we do when performing an automatic retry restores the +# savepoint stack properly. +subtest rewing_on_automatic_restarts + +# We're going to generate a retriable error that will rewind us back to the +# SELECT statement (not to the original SAVEPOINT statement since that one is +# special and we advance the rewind position past it). The test checks that, +# after every restart, the RELEASE works because the savepoint has be +# re-instituted before we rewind. +sql +BEGIN; SAVEPOINT a; SELECT 42; RELEASE a; SELECT crdb_internal.force_retry('10ms'); COMMIT; +---- +1: BEGIN; SAVEPOINT a; SELECT 42; RELEASE a; SELECT crdb_internal.force_retry('10ms'); COMMIT; -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index a0745cdc7457..9252bb8a0a99 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1065,13 +1065,12 @@ func TestUnexpectedStatementInRestartWait(t *testing.T) { if err := tx.QueryRow("SHOW TRANSACTION STATUS").Scan(&state); err != nil { t.Fatal(err) } - if state != "RestartWait" { - t.Fatalf("expected state %s, got: %s", "RestartWait", state) + if state != "Aborted" { + t.Fatalf("expected state %s, got: %s", "Aborted", state) } if _, err := tx.Exec("SELECT 1"); !testutils.IsError(err, - `pq: Expected "ROLLBACK TO SAVEPOINT cockroach_restart": `+ - "current transaction is aborted, commands ignored until end of transaction block") { + `pq: current transaction is aborted, commands ignored until end of transaction block`) { t.Fatal(err) } if err := tx.QueryRow("SHOW TRANSACTION STATUS").Scan(&state); err != nil { @@ -1449,12 +1448,6 @@ func TestRollbackToSavepointFromUnusualStates(t *testing.T) { } } - // ROLLBACK TO SAVEPOINT with a wrong name - _, err := sqlDB.Exec("ROLLBACK TO SAVEPOINT foo") - if !testutils.IsError(err, "SAVEPOINT not supported except for cockroach_restart") { - t.Fatalf("unexpected error: %v", err) - } - tx, err := sqlDB.Begin() if err != nil { t.Fatal(err) @@ -1513,12 +1506,12 @@ func TestTxnAutoRetriesDisabledAfterResultsHaveBeenSentToClient(t *testing.T) { { name: "client_directed_retries", clientDirectedRetry: true, - expectedTxnStateAfterRetriableErr: "RestartWait", + expectedTxnStateAfterRetriableErr: "Aborted", }, { name: "no_client_directed_retries", clientDirectedRetry: false, - expectedTxnStateAfterRetriableErr: "RestartWait", + expectedTxnStateAfterRetriableErr: "Aborted", }, { name: "autocommit", diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 1b0dda449af1..4c4f45d94178 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -104,11 +104,6 @@ type txnState struct { // txnAbortCount is incremented whenever the state transitions to // stateAborted. txnAbortCount *metric.Counter - - // activeRestartSavepointName stores the name of the active - // top-level restart savepoint, or is empty if no top-level restart - // savepoint is active. - activeRestartSavepointName tree.Name } // txnType represents the type of a SQL transaction. @@ -376,7 +371,9 @@ const ( txnRollback // txnRestart means that the transaction is restarting. The iteration of the // txn just finished will not commit. It is generated when we're about to - // auto-retry a txn and after a "ROLLBACK TO SAVEPOINT cockroach_restart". + // auto-retry a txn and after a rollback to a savepoint placed at the start of + // the transaction. This allows such savepoints to reset more state than other + // savepoints. txnRestart ) diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index 350a39446cde..9b3fba53c190 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -117,12 +117,6 @@ func (tc *testContext) createAbortedState() (fsm.State, *txnState) { return stateAborted{}, ts } -func (tc *testContext) createRestartWaitState() (fsm.State, *txnState) { - _, ts := tc.createOpenState(explicitTxn) - s := stateRestartWait{} - return s, ts -} - func (tc *testContext) createCommitWaitState() (fsm.State, *txnState, error) { _, ts := tc.createOpenState(explicitTxn) // Commit the KV txn, simulating what the execution layer is doing. @@ -418,7 +412,7 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b }, - expState: stateRestartWait{}, + expState: stateAborted{}, expAdv: expAdvance{ expCode: skipBatch, expEv: noEvent, @@ -563,68 +557,47 @@ func TestTransitions(t *testing.T) { expTxn: nil, }, { - // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). - name: "Aborted->Starting", + // The txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted). + name: "Aborted->Open", init: func() (fsm.State, *txnState, error) { s, ts := testCon.createAbortedState() return s, ts, nil }, - ev: eventTxnStart{ImplicitTxn: fsm.False}, - evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), - nil /* historicalTimestamp */, tranCtx), + ev: eventSavepointRollback{}, expState: stateOpen{ImplicitTxn: fsm.False}, expAdv: expAdvance{ expCode: advanceOne, - expEv: txnRestart, + expEv: noEvent, }, expTxn: &expKVTxn{}, }, { - // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). - // Verify that the historical timestamp from the evPayload is propagated - // to the expTxn. - name: "Aborted->Starting (historical)", + // The txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted). + name: "Aborted->Restart", init: func() (fsm.State, *txnState, error) { s, ts := testCon.createAbortedState() return s, ts, nil }, - ev: eventTxnStart{ImplicitTxn: fsm.False}, - evPayload: makeEventTxnStartPayload(pri, tree.ReadOnly, now.GoTime(), - &now, tranCtx), + ev: eventTxnRestart{}, expState: stateOpen{ImplicitTxn: fsm.False}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, }, expTxn: &expKVTxn{ - tsNanos: proto.Int64(now.WallTime), - }, - }, - // - // Tests starting from the RestartWait state. - // - { - // The txn got finished, such as after a ROLLBACK. - name: "RestartWait->NoTxn", - init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() - err := ts.mu.txn.Rollback(ts.Ctx) - return s, ts, err - }, - ev: eventTxnFinish{}, - evPayload: eventTxnFinishPayload{commit: false}, - expState: stateNoTxn{}, - expAdv: expAdvance{ - expCode: advanceOne, - expEv: txnRollback, + userPriority: &pri, + tsNanos: &now.WallTime, + origTSNanos: &now.WallTime, + maxTSNanos: &maxTS.WallTime, }, - expTxn: nil, }, { - // The txn got restarted, through a ROLLBACK TO SAVEPOINT. - name: "RestartWait->Open", + // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). + // Verify that the historical timestamp from the evPayload is propagated + // to the expTxn. + name: "Aborted->Starting (historical)", init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() + s, ts := testCon.createAbortedState() return s, ts, nil }, ev: eventTxnRestart{}, @@ -633,22 +606,9 @@ func TestTransitions(t *testing.T) { expCode: advanceOne, expEv: txnRestart, }, - expTxn: &expKVTxn{}, - }, - { - name: "RestartWait->Aborted", - init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() - return s, ts, nil - }, - ev: eventNonRetriableErr{IsCommit: fsm.False}, - evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, - expState: stateAborted{}, - expAdv: expAdvance{ - expCode: skipBatch, - expEv: noEvent, + expTxn: &expKVTxn{ + tsNanos: proto.Int64(now.WallTime), }, - expTxn: &expKVTxn{}, }, // // Tests starting from the CommitWait state. diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv index 74c6a072896f..296b13bd89ee 100644 --- a/pkg/sql/txnstatetransitions_diagram.gv +++ b/pkg/sql/txnstatetransitions_diagram.gv @@ -15,8 +15,13 @@ digraph finite_state_machine { node [shape = circle]; "Aborted{}" -> "Aborted{}" [label = any other statement>] "Aborted{}" -> "Aborted{}" [label = any other statement>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] "Aborted{}" -> "NoTxn{}" [label = ROLLBACK>] - "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "NoTxn{}" [label = COMMIT>] @@ -26,7 +31,7 @@ digraph finite_state_machine { "NoTxn{}" -> "Open{ImplicitTxn:true}" [label = BEGIN, or before a statement running as an implicit txn>] "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:false}"] "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] - "Open{ImplicitTxn:false}" -> "RestartWait{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] @@ -40,8 +45,4 @@ digraph finite_state_machine { "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = COMMIT/ROLLBACK, or after a statement running as an implicit txn>] - "RestartWait{}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:false}"] - "RestartWait{}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:true}"] - "RestartWait{}" -> "NoTxn{}" [label = ROLLBACK>] - "RestartWait{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] } diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt index a46864c1dedb..82f7822ef152 100644 --- a/pkg/sql/txnstatetransitions_report.txt +++ b/pkg/sql/txnstatetransitions_report.txt @@ -4,15 +4,16 @@ Aborted{} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} - TxnFinish{} - TxnStart{ImplicitTxn:false} - missing events: RetriableErr{CanAutoRetry:false, IsCommit:false} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} - TxnReleased{} + SavepointRollback{} + TxnFinish{} TxnRestart{} + missing events: + TxnReleased{} + TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} CommitWait{} handled events: @@ -24,6 +25,7 @@ CommitWait{} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} TxnReleased{} TxnRestart{} TxnStart{ImplicitTxn:false} @@ -39,6 +41,7 @@ NoTxn{} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} TxnFinish{} TxnReleased{} TxnRestart{} @@ -54,6 +57,7 @@ Open{ImplicitTxn:false} TxnReleased{} TxnRestart{} missing events: + SavepointRollback{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} Open{ImplicitTxn:true} @@ -66,21 +70,8 @@ Open{ImplicitTxn:true} RetriableErr{CanAutoRetry:true, IsCommit:true} TxnFinish{} missing events: + SavepointRollback{} TxnReleased{} TxnRestart{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} -RestartWait{} - handled events: - NonRetriableErr{IsCommit:false} - NonRetriableErr{IsCommit:true} - TxnFinish{} - TxnRestart{} - missing events: - RetriableErr{CanAutoRetry:false, IsCommit:false} - RetriableErr{CanAutoRetry:false, IsCommit:true} - RetriableErr{CanAutoRetry:true, IsCommit:false} - RetriableErr{CanAutoRetry:true, IsCommit:true} - TxnReleased{} - TxnStart{ImplicitTxn:false} - TxnStart{ImplicitTxn:true}