diff --git a/pkg/cli/sql.go b/pkg/cli/sql.go index 5224dae435a1..5bb8b5cb7f17 100644 --- a/pkg/cli/sql.go +++ b/pkg/cli/sql.go @@ -746,8 +746,6 @@ func (c *cliState) refreshTransactionStatus() { c.lastKnownTxnStatus = " ERROR" case sql.CommitWaitStateStr: c.lastKnownTxnStatus = " DONE" - case sql.RestartWaitStateStr: - c.lastKnownTxnStatus = " RETRY" case sql.OpenStateStr: // The state AutoRetry is reported by the server as Open, so no need to // handle it here. diff --git a/pkg/kv/testdata/savepoints b/pkg/kv/testdata/savepoints index c973d48ae72f..4c3cd69b1769 100644 --- a/pkg/kv/testdata/savepoints +++ b/pkg/kv/testdata/savepoints @@ -319,56 +319,57 @@ commit subtest end -subtest rollback_across_retry - -# TODO(knz): change this test when rolling back across retries becomes -# supported. - -begin ----- -0 - -savepoint x ----- -0 - -retry ----- -synthetic error: TransactionRetryWithProtoRefreshError: forced retry -epoch: 0 -> 1 - -release x ----- -(*withstack.withStack) cannot release savepoint across transaction retries - -rollback x ----- -(*withstack.withStack) cannot rollback savepoint across transaction retries - -subtest end - -subtest rollback_across_abort - -begin ----- -0 - -savepoint x ----- -0 - -abort ----- -(*roachpb.TransactionRetryWithProtoRefreshError) -txn id changed - -release x ----- -(*withstack.withStack) cannot release savepoint across transaction retries - -rollback x ----- -(*withstack.withStack) cannot rollback savepoint across transaction retries - - -subtest end +# !!! +# subtest rollback_across_retry +# +# # TODO(knz): change this test when rolling back across retries becomes +# # supported. +# +# begin +# ---- +# 0 +# +# savepoint x +# ---- +# 0 +# +# retry +# ---- +# synthetic error: TransactionRetryWithProtoRefreshError: forced retry +# epoch: 0 -> 1 +# +# release x +# ---- +# (*withstack.withStack) cannot release savepoint across transaction retries +# +# rollback x +# ---- +# (*withstack.withStack) cannot rollback savepoint across transaction retries +# +# subtest end +# +# subtest rollback_across_abort +# +# begin +# ---- +# 0 +# +# savepoint x +# ---- +# 0 +# +# abort +# ---- +# (*roachpb.TransactionRetryWithProtoRefreshError) +# txn id changed +# +# release x +# ---- +# (*withstack.withStack) cannot release savepoint across transaction retries +# +# rollback x +# ---- +# (*withstack.withStack) cannot rollback savepoint across transaction retries +# +# +# subtest end diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index dd655e1354e5..a6b816d2d74f 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -176,6 +176,16 @@ type txnInterceptor interface { // increment. epochBumpedLocked() + // createSavepoint is used to populate a savepoint with all the state the + // needs to be restored on a rollback. + createSavepoint(context.Context, *savepoint) + + // rollbackToSavepoint is used to restore the state previously saved by createSavepoint(). + // implementations are allowed to modify the savepoint if they want to, optimizing it for future + // rollbacks. For example, the txnPipeliner removes tracked writes from the savepoint which have + // already been verified since the savepoint has been created. + rollbackToSavepoint(context.Context, *savepoint) + // closeLocked closes the interceptor. It is called when the TxnCoordSender // shuts down due to either a txn commit or a txn abort. The method will // be called exactly once from cleanupTxnLocked. diff --git a/pkg/kv/txn_coord_sender_savepoints.go b/pkg/kv/txn_coord_sender_savepoints.go index f83ea41b92f5..a1fc57f77c6f 100644 --- a/pkg/kv/txn_coord_sender_savepoints.go +++ b/pkg/kv/txn_coord_sender_savepoints.go @@ -19,36 +19,26 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/google/btree" ) -// savepointToken captures the state in the TxnCoordSender necessary -// to restore that state upon a savepoint rollback. -// -// TODO(knz,andrei): Currently this definition is only sufficient for -// just a few cases of rollbacks. This should be extended to cover -// more ground: -// -// - We also need the current size of txnSpanRefresher.refreshSpans the -// list of tracked reads, such that upon rollback we tell the -// refresher interceptor to discard further reads. -// - We also need something about in-flight writes -// (txnPipeliner.ifWrites). There I guess we need to take some sort of -// snapshot of the current in-flight writes and, on rollback, discard -// in-flight writes that are not part of the savepoint. But, on -// rollback, I don't think we should (nor am I sure that we could) -// simply overwrite the set of in-flight writes with the ones from the -// savepoint because writes that have been verified since the snapshot -// has been taken should continue to be verified. Basically, on -// rollback I think we need to intersect the savepoint with the -// current set of in-flight writes. -type savepointToken struct { - // seqNum is currently the only field that helps to "restore" - // anything upon a rollback. When used, it does not change anything - // in the TCS; instead it simply configures the txn to ignore all - // seqnums from this value until the most recent seqnum emitted by - // the TCS. +// savepoint captures the state in the TxnCoordSender necessary to restore that +// state upon a savepoint rollback. +type savepoint struct { + // seqNum represents the write seq num at the time the savepoint was created. + // On rollback, it configures the txn to ignore all seqnums from this value + // until the most recent seqnum. seqNum enginepb.TxnSeq + // txnSpanRefresher fields + refreshSpans []roachpb.Span + refreshInvalid bool + refreshSpanBytes int64 + + // txnPipeliner fields + ifWrites *btree.BTree // can be nil if no reads were tracked + ifBytes int64 + // txnID is used to verify that a rollback is not used to paper // over a txn abort error. txnID uuid.UUID @@ -61,10 +51,10 @@ type savepointToken struct { epoch enginepb.TxnEpoch } -var _ client.SavepointToken = (*savepointToken)(nil) +var _ client.SavepointToken = (*savepoint)(nil) // SavepointToken implements the client.SavepointToken interface. -func (s *savepointToken) SavepointToken() {} +func (s *savepoint) SavepointToken() {} // CreateSavepoint is part of the client.TxnSender interface. func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.SavepointToken, error) { @@ -83,11 +73,15 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.Savepoint return nil, ErrSavepointOperationInErrorTxn } - return &savepointToken{ - txnID: tc.mu.txn.ID, - epoch: tc.mu.txn.Epoch, - seqNum: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, - }, nil + s := &savepoint{ + txnID: tc.mu.txn.ID, + epoch: tc.mu.txn.Epoch, + } + for _, reqInt := range tc.interceptorStack { + reqInt.createSavepoint(nil, s) + } + + return s, nil } // RollbackToSavepoint is part of the client.TxnSender interface. @@ -108,9 +102,15 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s client.Save return err } - // TODO(knz): handle recoverable errors. - if tc.mu.txnState == txnError { - return unimplemented.New("rollback_error", "savepoint rollback after error") + if tc.mu.txnState == txnFinalized { + return unimplemented.New("rollback_error", "savepoint rollback finalized txn") + } + + // Restore the transaction's state, in case we're rewiding after an error. + tc.mu.txnState = txnPending + + for _, reqInt := range tc.interceptorStack { + reqInt.rollbackToSavepoint(ctx, st) } if st.seqNum == tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { @@ -163,17 +163,17 @@ func (tc *TxnCoordSender) assertNotFinalized() error { func (tc *TxnCoordSender) checkSavepointLocked( s client.SavepointToken, opName string, -) (*savepointToken, error) { - st, ok := s.(*savepointToken) +) (*savepoint, error) { + st, ok := s.(*savepoint) if !ok { return nil, errors.AssertionFailedf("expected savepointToken, got %T", s) } - if st.txnID != tc.mu.txn.ID { + if st.seqNum > 0 && st.txnID != tc.mu.txn.ID { return nil, errors.Newf("cannot %s savepoint across transaction retries", opName) } - if st.epoch != tc.mu.txn.Epoch { + if st.seqNum > 0 && st.epoch != tc.mu.txn.Epoch { return nil, errors.Newf("cannot %s savepoint across transaction retries", opName) } diff --git a/pkg/kv/txn_interceptor_committer.go b/pkg/kv/txn_interceptor_committer.go index 95f5efd9f1b8..cb4867212684 100644 --- a/pkg/kv/txn_interceptor_committer.go +++ b/pkg/kv/txn_interceptor_committer.go @@ -465,6 +465,12 @@ func (*txnCommitter) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked implements the txnReqInterceptor interface. func (tc *txnCommitter) epochBumpedLocked() {} +// createSavepoint is part of the txnReqInterceptor interface. +func (*txnCommitter) createSavepoint(context.Context, *savepoint) {} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (*txnCommitter) rollbackToSavepoint(context.Context, *savepoint) {} + // closeLocked implements the txnReqInterceptor interface. func (tc *txnCommitter) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_heartbeater.go b/pkg/kv/txn_interceptor_heartbeater.go index 8754bc5655e3..2a6bff581a66 100644 --- a/pkg/kv/txn_interceptor_heartbeater.go +++ b/pkg/kv/txn_interceptor_heartbeater.go @@ -190,6 +190,12 @@ func (*txnHeartbeater) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked is part of the txnInterceptor interface. func (h *txnHeartbeater) epochBumpedLocked() {} +// createSavepoint is part of the txnReqInterceptor interface. +func (*txnHeartbeater) createSavepoint(context.Context, *savepoint) {} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (*txnHeartbeater) rollbackToSavepoint(context.Context, *savepoint) {} + // closeLocked is part of the txnInterceptor interface. func (h *txnHeartbeater) closeLocked() { h.cancelHeartbeatLoopLocked() diff --git a/pkg/kv/txn_interceptor_metric_recorder.go b/pkg/kv/txn_interceptor_metric_recorder.go index 28c555e975a9..f792c91e9538 100644 --- a/pkg/kv/txn_interceptor_metric_recorder.go +++ b/pkg/kv/txn_interceptor_metric_recorder.go @@ -75,6 +75,12 @@ func (*txnMetricRecorder) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked is part of the txnInterceptor interface. func (*txnMetricRecorder) epochBumpedLocked() {} +// createSavepoint is part of the txnReqInterceptor interface. +func (*txnMetricRecorder) createSavepoint(context.Context, *savepoint) {} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (*txnMetricRecorder) rollbackToSavepoint(context.Context, *savepoint) {} + // closeLocked is part of the txnInterceptor interface. func (m *txnMetricRecorder) closeLocked() { if m.onePCCommit { diff --git a/pkg/kv/txn_interceptor_pipeliner.go b/pkg/kv/txn_interceptor_pipeliner.go index bd21f1f9eaef..e404f1b87bca 100644 --- a/pkg/kv/txn_interceptor_pipeliner.go +++ b/pkg/kv/txn_interceptor_pipeliner.go @@ -601,6 +601,64 @@ func (tp *txnPipeliner) epochBumpedLocked() { } } +// createSavepoint is part of the txnReqInterceptor interface. +func (tp *txnPipeliner) createSavepoint(ctx context.Context, s *savepoint) { + if tp.ifWrites.len() > 0 { + s.ifWrites = tp.ifWrites.t.Clone() + tp.ifWrites.cloned = true + } + s.ifBytes = tp.ifWrites.bytes +} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (tp *txnPipeliner) rollbackToSavepoint(ctx context.Context, s *savepoint) { + // Intersect the inflight writes from the savepoint to the ones from the + // txnPipeliner to avoid re-installing inflight writes that have been verified + // in the meantime. + if s.ifWrites != nil { + var alreadyVerified []btree.Item + s.ifWrites.Ascend(func(i btree.Item) bool { + // If tp is not currently tracking this write as inflight, then we'll + // delete it from the savepoint. + // Note that we're slightly pessimistic here: we might have verified the + // write captured by the savepoint and then started tracking a write at a + // higher seqnum for the same key. We don't keep track of what sequence + // numbers we've verified and which we haven't, so we're going to assume + // that the savepoint's write has not been verified. + // Note that having verified a write at seq num 2 recursively implies that + // we had verified a write at seq num 1. + if tp.ifWrites.t.Get(i) == nil { + alreadyVerified = append(alreadyVerified, i) + } + return true + }) + // TODO(andrei): Can I delete directly during the iteration above? + for _, i := range alreadyVerified { + s.ifWrites.Delete(i) + s.ifBytes -= keySize(i.(*inFlightWrite).Key) + } + } + + // Move all the writes in txnPipeliner that are not in the savepoint to the + // write footprint. We no longer care if these write succeed or fail, so we're + // going to stop tracking these as in-flight writes. The respective + // intents still need to be cleaned up at the end of the transaction. + tp.ifWrites.ascend(func(w *inFlightWrite) { + if s.ifWrites == nil || s.ifWrites.Get(w) == nil { + tp.footprint.insert(roachpb.Span{Key: w.Key}) + } + }) + tp.footprint.mergeAndSort() + + // Restore the inflightWrites from the savepoint. + if s.ifWrites == nil { + tp.ifWrites.t = nil + } else { + tp.ifWrites.t = s.ifWrites.Clone() + } + tp.ifWrites.bytes = s.ifBytes +} + // closeLocked implements the txnReqInterceptor interface. func (tp *txnPipeliner) closeLocked() {} @@ -633,6 +691,9 @@ type inFlightWriteSet struct { // Avoids allocs. tmp1, tmp2 inFlightWrite alloc inFlightWriteAlloc + // cloned is set if the BTree has been cloned. If it has, then the elements + // (*inFlightWrite's) might be shared, and so their memory cannot be reused. + cloned bool } // insert attempts to insert an in-flight write that has not been proven to have @@ -644,15 +705,19 @@ func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) { s.t = btree.New(txnPipelinerBtreeDegree) } - s.tmp1.Key = key - item := s.t.Get(&s.tmp1) - if item != nil { - otherW := item.(*inFlightWrite) - if seq > otherW.Sequence { - // Existing in-flight write has old information. - otherW.Sequence = seq + // If the tree has not been cloned before, we can attempt a fast path where we + // update an existing element. + if !s.cloned { + s.tmp1.Key = key + item := s.t.Get(&s.tmp1) + if item != nil { + otherW := item.(*inFlightWrite) + if seq > otherW.Sequence { + // Existing in-flight write has old information. + otherW.Sequence = seq + } + return } - return } w := s.alloc.alloc(key, seq) @@ -687,7 +752,7 @@ func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) { // Delete the write from the in-flight writes set. delItem := s.t.Delete(item) - if delItem != nil { + if delItem != nil && !s.cloned { *delItem.(*inFlightWrite) = inFlightWrite{} // for GC } s.bytes -= keySize(key) @@ -749,7 +814,7 @@ func (s *inFlightWriteSet) byteSize() int64 { } // clear purges all elements from the in-flight write set and frees associated -// memory. The reuse flag indicates whether the caller is intending to reu-use +// memory. The reuse flag indicates whether the caller is intending to reuse // the set or not. func (s *inFlightWriteSet) clear(reuse bool) { if s.t == nil { diff --git a/pkg/kv/txn_interceptor_pipeliner_test.go b/pkg/kv/txn_interceptor_pipeliner_test.go index 967fd7394f9d..4b78e80317df 100644 --- a/pkg/kv/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/txn_interceptor_pipeliner_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/google/btree" "github.com/stretchr/testify/require" ) @@ -1226,3 +1227,108 @@ func TestTxnPipelinerRecordsWritesOnFailure(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) require.Len(t, tp.footprint.asSlice(), 2) } + +func TestTxnPipelinerSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner() + + tp.ifWrites.insert(roachpb.Key("a"), 10) + tp.ifWrites.insert(roachpb.Key("b"), 11) + tp.ifWrites.insert(roachpb.Key("c"), 12) + require.Equal(t, 3, tp.ifWrites.len()) + + s := &savepoint{} + tp.createSavepoint(ctx, s) + + // Some more write after the savepoint. One of them is on key "c" that is part + // of the savepoint too, so we'll check that the savepoint still retains the + // earlier seq num. + tp.ifWrites.insert(roachpb.Key("c"), 13) + tp.ifWrites.insert(roachpb.Key("d"), 14) + require.Empty(t, tp.footprint.asSlice()) + + // Check that the savepoint has the correct writes. + { + var savepointWrites []inFlightWrite + s.ifWrites.Ascend(func(i btree.Item) bool { + savepointWrites = append(savepointWrites, *(i.(*inFlightWrite))) + return true + }) + require.Equal(t, + []inFlightWrite{ + inFlightWrite{roachpb.SequencedWrite{roachpb.Key("a"), 10}}, + inFlightWrite{roachpb.SequencedWrite{roachpb.Key("b"), 11}}, + inFlightWrite{roachpb.SequencedWrite{roachpb.Key("c"), 12}}, + }, + savepointWrites) + } + + // Now verify one of the writes. When we'll rollback to the savepoint below, + // we'll check that the verified write stayed verified. + txn := makeTxnProto() + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")}}) + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 2) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &roachpb.GetRequest{}, ba.Requests[1].GetInner()) + + qiReq := ba.Requests[0].GetInner().(*roachpb.QueryIntentRequest) + require.Equal(t, roachpb.Key("a"), qiReq.Key) + require.Equal(t, enginepb.TxnSeq(10), qiReq.Txn.Sequence) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetQueryIntent().FoundIntent = true + return br, nil + }) + br, pErr := tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, []roachpb.Span{{Key: roachpb.Key("a")}}, tp.footprint.asSlice()) + require.Equal(t, 3, tp.ifWrites.len()) // We've verified one out of 4 writes. + + // Now restore the savepoint and check that the in-flight write state has been restored + // and all rolled-back writes were moved to the write footprint. + bytesBeforeRollback := s.ifBytes + tp.rollbackToSavepoint(ctx, s) + + // Check that the savepoint itself was updated by the rollback. The key that + // had been verified ("a") should have been taken out of the savepoint. + expectedIfWrites := []inFlightWrite{ + {roachpb.SequencedWrite{roachpb.Key("b"), 11}}, + {roachpb.SequencedWrite{roachpb.Key("c"), 12}}, + } + { + var savepointWrites []inFlightWrite + s.ifWrites.Ascend(func(i btree.Item) bool { + savepointWrites = append(savepointWrites, *(i.(*inFlightWrite))) + return true + }) + require.Equal(t, expectedIfWrites, savepointWrites) + // Check that the size of the savepoint was decremented too. + require.Less(t, s.ifBytes, bytesBeforeRollback) + } + + // Check that the tracked inflight writes were updated correctly. + { + var ifWrites []inFlightWrite + tp.ifWrites.ascend(func(w *inFlightWrite) { + ifWrites = append(ifWrites, *w) + }) + require.Equal(t, expectedIfWrites, ifWrites) + } + + // Check that the footprint was updated correctly. In addition to the "a" which it had before, + // it will also have "d" because it's not part of the savepoint. + require.Equal(t, + []roachpb.Span{ + {Key: roachpb.Key("a")}, + {Key: roachpb.Key("d")}, + }, + tp.footprint.asSlice()) + +} diff --git a/pkg/kv/txn_interceptor_seq_num_allocator.go b/pkg/kv/txn_interceptor_seq_num_allocator.go index c21621229c6a..f4cad606c68b 100644 --- a/pkg/kv/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/txn_interceptor_seq_num_allocator.go @@ -175,5 +175,16 @@ func (s *txnSeqNumAllocator) epochBumpedLocked() { s.readSeq = 0 } +// createSavepoint is part of the txnReqInterceptor interface. +func (s *txnSeqNumAllocator) createSavepoint(ctx context.Context, sp *savepoint) { + sp.seqNum = s.writeSeq +} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (*txnSeqNumAllocator) rollbackToSavepoint(context.Context, *savepoint) { + // Nothing to restore. The seq nums keep increasing. The TxnCoordSender has + // added a range of sequence numbers to the ignored list. +} + // closeLocked is part of the txnInterceptor interface. func (*txnSeqNumAllocator) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_span_refresher.go b/pkg/kv/txn_interceptor_span_refresher.go index d3f43349b1c3..40e7fac5a7eb 100644 --- a/pkg/kv/txn_interceptor_span_refresher.go +++ b/pkg/kv/txn_interceptor_span_refresher.go @@ -451,5 +451,21 @@ func (sr *txnSpanRefresher) epochBumpedLocked() { sr.refreshedTimestamp.Reset() } +// createSavepoint is part of the txnReqInterceptor interface. +func (sr *txnSpanRefresher) createSavepoint(ctx context.Context, s *savepoint) { + s.refreshSpans = make([]roachpb.Span, len(sr.refreshSpans)) + copy(s.refreshSpans, sr.refreshSpans) + s.refreshInvalid = sr.refreshInvalid + s.refreshSpanBytes = sr.refreshSpansBytes +} + +// rollbackToSavepoint is part of the txnReqInterceptor interface. +func (sr *txnSpanRefresher) rollbackToSavepoint(ctx context.Context, s *savepoint) { + sr.refreshSpans = make([]roachpb.Span, len(s.refreshSpans)) + copy(sr.refreshSpans, s.refreshSpans) + sr.refreshInvalid = s.refreshInvalid + sr.refreshSpansBytes = s.refreshSpanBytes +} + // closeLocked implements the txnInterceptor interface. func (*txnSpanRefresher) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_span_refresher_test.go b/pkg/kv/txn_interceptor_span_refresher_test.go index 732d81e10efd..b00db0e8463c 100644 --- a/pkg/kv/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/txn_interceptor_span_refresher_test.go @@ -555,3 +555,68 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) } + +func TestTxnSpanRefresherSavepoint(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tsr, mockSender := makeMockTxnSpanRefresher() + + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + txn := makeTxnProto() + + read := func(key roachpb.Key) { + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + getArgs := roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: key}} + ba.Add(&getArgs) + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 1) + require.IsType(t, &roachpb.GetRequest{}, ba.Requests[0].GetInner()) + + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + br, pErr := tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + } + read(keyA) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + + s := &savepoint{} + tsr.createSavepoint(ctx, s) + + // Another read after the savepoint was created. + read(keyB) + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshSpans) + + require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans) + require.Less(t, s.refreshSpanBytes, tsr.refreshSpansBytes) + require.False(t, s.refreshInvalid) + + // Rollback the savepoint and check that refresh spans were overwritten. + tsr.rollbackToSavepoint(ctx, s) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + + // Set MaxTxnRefreshSpansBytes limit low and then exceed it. + MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 1) + read(keyB) + require.True(t, tsr.refreshInvalid) + + // Check that rolling back to the savepoint resets refreshInvalid. + tsr.rollbackToSavepoint(ctx, s) + require.Equal(t, tsr.refreshSpansBytes, s.refreshSpanBytes) + require.False(t, tsr.refreshInvalid) + + // Exceed the limit again and then create a savepoint. + read(keyB) + require.True(t, tsr.refreshInvalid) + tsr.createSavepoint(ctx, s) + require.True(t, s.refreshInvalid) + require.Empty(t, s.refreshSpans) + // Rollback to the savepoint check that refreshes are still invalid. + tsr.rollbackToSavepoint(ctx, s) + require.Empty(t, tsr.refreshSpans) + require.True(t, tsr.refreshInvalid) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ebe28da17169..ff15d0542c85 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -980,6 +980,10 @@ type connExecutor struct { // stateOpen. autoRetryCounter int + // numDDL keeps track of how many DDL statements have been + // executed so far. + numDDL int + // txnRewindPos is the position within stmtBuf to which we'll rewind when // performing automatic retries. This is more or less the position where the // current transaction started. @@ -1013,6 +1017,13 @@ type connExecutor struct { // committed or aborted). It is set when txn is started but can remain // unset when txn is executed within another higher-level txn. onTxnFinish func(txnEvent) + + // savepoints maintains the stack of savepoints currently open. + savepoints savepointStack + // savepointsAtTxnRewindPos is a snapshot of the savepoints stack before + // processing the command at position txnRewindPos. When rewinding, we're + // going to restore this snapshot. + savepointsAtTxnRewindPos savepointStack } // sessionData contains the user-configurable connection variables. @@ -1191,12 +1202,16 @@ func (ex *connExecutor) resetExtraTxnState( switch ev { case txnCommit, txnRollback: + ex.extraTxnState.savepoints.clear() // After txn is finished, we need to call onTxnFinish (if it's non-nil). if ex.extraTxnState.onTxnFinish != nil { ex.extraTxnState.onTxnFinish(ev) ex.extraTxnState.onTxnFinish = nil } } + // NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a + // a ROLLBACK TO SAVEPOINT that generated the event, and that statement deals with the + // savepoints, or it's a rewind which also deals with them. return nil } @@ -1558,6 +1573,7 @@ func (ex *connExecutor) execCmd(ctx context.Context) error { } case rewind: ex.rewindPrepStmtNamespace(ctx) + ex.extraTxnState.savepoints = ex.extraTxnState.savepointsAtTxnRewindPos advInfo.rewCap.rewindAndUnlock(ctx) case stayInPlace: // Nothing to do. The same statement will be executed again. @@ -1684,6 +1700,7 @@ func (ex *connExecutor) setTxnRewindPos(ctx context.Context, pos CmdPos) { ex.extraTxnState.txnRewindPos = pos ex.stmtBuf.ltrim(ctx, pos) ex.commitPrepStmtNamespace(ctx) + ex.extraTxnState.savepointsAtTxnRewindPos = ex.extraTxnState.savepoints.clone() } // stmtDoesntNeedRetry returns true if the given statement does not need to be @@ -1700,8 +1717,6 @@ func stateToTxnStatusIndicator(s fsm.State) TransactionStatusIndicator { return InTxnBlock case stateAborted: return InFailedTxnBlock - case stateRestartWait: - return InTxnBlock case stateNoTxn: return IdleTxnBlock case stateCommitWait: @@ -1869,10 +1884,6 @@ func errIsRetriable(err error) bool { func (ex *connExecutor) makeErrEvent(err error, stmt tree.Statement) (fsm.Event, fsm.EventPayload) { retriable := errIsRetriable(err) if retriable { - if _, inOpen := ex.machine.CurState().(stateOpen); !inOpen { - panic(fmt.Sprintf("retriable error in unexpected state: %#v", - ex.machine.CurState())) - } rc, canAutoRetry := ex.getRewindTxnCapability() ev := eventRetriableErr{ IsCommit: fsm.FromBool(isCommit(stmt)), @@ -2472,22 +2483,19 @@ func (sc *StatementCounters) incrementCount(ex *connExecutor, stmt tree.Statemen case *tree.RollbackTransaction: sc.TxnRollbackCount.Inc() case *tree.Savepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Name); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Name) { sc.RestartSavepointCount.Inc() } else { sc.SavepointCount.Inc() } case *tree.ReleaseSavepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Savepoint); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Savepoint) { sc.ReleaseRestartSavepointCount.Inc() } else { sc.ReleaseSavepointCount.Inc() } case *tree.RollbackToSavepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Savepoint); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Savepoint) { sc.RollbackToRestartSavepointCount.Inc() } else { sc.RollbackToSavepointCount.Inc() diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 099728ddca77..722579efef3b 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -97,7 +97,7 @@ func (ex *connExecutor) execStmt( case eventNonRetriableErr: ex.recordFailure() } - case stateAborted, stateRestartWait: + case stateAborted: ev, payload = ex.execStmtInAbortedState(ctx, stmt, res) case stateCommitWait: ev, payload = ex.execStmtInCommitWaitState(stmt, res) @@ -250,10 +250,12 @@ func (ex *connExecutor) execStmtInOpenState( return ex.execSavepointInOpenState(ctx, s, res) case *tree.ReleaseSavepoint: - return ex.execReleaseSavepointInOpenState(ctx, s, res) + ev, payload := ex.execRelease(ctx, s, res) + return ev, payload, nil case *tree.RollbackToSavepoint: - return ex.execRollbackToSavepointInOpenState(ctx, s, res) + ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res) + return ev, payload, nil case *tree.Prepare: // This is handling the SQL statement "PREPARE". See execPrepare for @@ -579,25 +581,22 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error func (ex *connExecutor) commitSQLTransaction( ctx context.Context, stmt tree.Statement, ) (fsm.Event, fsm.EventPayload) { - ev, payload, _ := ex.commitSQLTransactionInternal(ctx, stmt) - return ev, payload + err := ex.commitSQLTransactionInternal(ctx, stmt) + if err != nil { + return ex.makeErrEvent(err, stmt) + } + return eventTxnFinish{}, eventTxnFinishPayload{commit: true} } -// commitSQLTransactionInternal is the part of a commit common to -// commitSQLTransaction and runReleaseRestartSavepointAsTxnCommit. func (ex *connExecutor) commitSQLTransactionInternal( ctx context.Context, stmt tree.Statement, -) (ev fsm.Event, payload fsm.EventPayload, ok bool) { - ex.clearSavepoints() - +) error { if err := ex.checkTableTwoVersionInvariant(ctx); err != nil { - ev, payload = ex.makeErrEvent(err, stmt) - return ev, payload, false + return err } if err := ex.state.mu.txn.Commit(ctx); err != nil { - ev, payload = ex.makeErrEvent(err, stmt) - return ev, payload, false + return err } // Now that we've committed, if we modified any table we need to make sure @@ -606,15 +605,12 @@ func (ex *connExecutor) commitSQLTransactionInternal( if tables := ex.extraTxnState.tables.getTablesWithNewVersion(); tables != nil { ex.extraTxnState.tables.releaseLeases(ctx) } - - return eventTxnFinish{}, eventTxnFinishPayload{commit: true}, true + return nil } // rollbackSQLTransaction executes a ROLLBACK statement: the KV transaction is // rolled-back and an event is produced. func (ex *connExecutor) rollbackSQLTransaction(ctx context.Context) (fsm.Event, fsm.EventPayload) { - ex.clearSavepoints() - if err := ex.state.mu.txn.Rollback(ctx); err != nil { log.Warningf(ctx, "txn rollback failed: %s", err) } @@ -749,6 +745,13 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro log.VEventf(ctx, 1, "optimizer plan failed: %v", err) return err } + + // TODO(knz): Remove this accounting if/when savepoint rollbacks + // support rolling back over DDL. + if planner.curPlan.flags.IsSet(planFlagIsDDL) { + ex.extraTxnState.numDDL++ + } + return nil } @@ -930,42 +933,46 @@ func (ex *connExecutor) execStmtInNoTxnState( func (ex *connExecutor) execStmtInAbortedState( ctx context.Context, stmt Statement, res RestrictedCommandResult, ) (fsm.Event, fsm.EventPayload) { - _, inRestartWait := ex.machine.CurState().(stateRestartWait) + + reject := func() (fsm.Event, fsm.EventPayload) { + ev := eventNonRetriableErr{IsCommit: fsm.False} + payload := eventNonRetriableErrPayload{ + err: sqlbase.NewTransactionAbortedError("" /* customMsg */), + } + return ev, payload + } // TODO(andrei/cuongdo): Figure out what statements to count here. switch s := stmt.AST.(type) { case *tree.CommitTransaction, *tree.RollbackTransaction: - if inRestartWait { - ev, payload := ex.rollbackSQLTransaction(ctx) - return ev, payload + if _, ok := s.(*tree.CommitTransaction); ok { + // Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too. + res.ResetStmtType((*tree.RollbackTransaction)(nil)) } - ex.rollbackSQLTransaction(ctx) - ex.clearSavepoints() - - // Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too. - res.ResetStmtType((*tree.RollbackTransaction)(nil)) - - return eventTxnFinish{}, eventTxnFinishPayload{commit: false} + return ex.rollbackSQLTransaction(ctx) case *tree.RollbackToSavepoint: - return ex.execRollbackToSavepointInAbortedState(ctx, inRestartWait, s, res) + return ex.execRollbackToSavepointInAbortedState(ctx, s) case *tree.Savepoint: - return ex.execSavepointInAbortedState(ctx, inRestartWait, s, res) + if ex.isCommitOnReleaseSavepoint(s.Name) { + // We allow SAVEPOINT cockroach_restart as an alternative to ROLLBACK TO + // SAVEPOINT cockroach_restart in the Aborted state. This is needed + // because any client driver (that we know of) which links subtransaction + // `ROLLBACK/RELEASE` to an object's lifetime will fail to `ROLLBACK` on a + // failed `RELEASE`. Instead, we now can use the creation of another + // subtransaction object (which will issue another `SAVEPOINT` statement) + // to indicate retry intent. Specifically, this change was prompted by + // subtransaction handling in `libpqxx` (C++ driver) and `rust-postgres` + // (Rust driver). + res.ResetStmtType((*tree.RollbackToSavepoint)(nil)) + return ex.execRollbackToSavepointInAbortedState( + ctx, &tree.RollbackToSavepoint{Savepoint: s.Name}) + } + return reject() default: - ev := eventNonRetriableErr{IsCommit: fsm.False} - if inRestartWait { - payload := eventNonRetriableErrPayload{ - err: sqlbase.NewTransactionAbortedError( - "Expected \"ROLLBACK TO SAVEPOINT cockroach_restart\"" /* customMsg */), - } - return ev, payload - } - payload := eventNonRetriableErrPayload{ - err: sqlbase.NewTransactionAbortedError("" /* customMsg */), - } - return ev, payload + return reject() } } @@ -986,7 +993,7 @@ func (ex *connExecutor) execStmtInCommitWaitState( // Reply to a rollback with the COMMIT tag, by analogy to what we do when we // get a COMMIT in state Aborted. res.ResetStmtType((*tree.CommitTransaction)(nil)) - return eventTxnFinish{}, eventTxnFinishPayload{commit: true} + return eventTxnFinish{}, eventTxnFinishPayload{commit: false} default: ev = eventNonRetriableErr{IsCommit: fsm.False} payload = eventNonRetriableErrPayload{ @@ -996,6 +1003,32 @@ func (ex *connExecutor) execStmtInCommitWaitState( } } +// execStmtInRollbackWaitState executes a statement in a txn that's in state +// RollbackWait. +// Everything but ROLLBACK/COMMIT is rejected. COMMIT is treated like ROLLBACK. +func (ex *connExecutor) execStmtInRollbackWaitState( + stmt Statement, res RestrictedCommandResult, +) (ev fsm.Event, payload fsm.EventPayload) { + ex.incrementStartedStmtCounter(stmt) + defer func() { + if !payloadHasError(payload) { + ex.incrementExecutedStmtCounter(stmt) + } + }() + switch stmt.AST.(type) { + case *tree.CommitTransaction, *tree.RollbackTransaction: + // Note that the KV transaction has been rolled back when we entered this state. + res.ResetStmtType((*tree.RollbackToSavepoint)(nil)) + return eventTxnFinish{}, eventTxnFinishPayload{commit: false} + default: + ev = eventNonRetriableErr{IsCommit: fsm.False} + payload = eventNonRetriableErrPayload{ + err: sqlbase.NewTransactionAbortedError("final RELEASE SAVEPOINT had failed"), + } + return ev, payload + } +} + // runObserverStatement executes the given observer statement. // // If an error is returned, the connection needs to stop processing queries. @@ -1005,6 +1038,8 @@ func (ex *connExecutor) runObserverStatement( switch sqlStmt := stmt.AST.(type) { case *tree.ShowTransactionStatus: return ex.runShowTransactionState(ctx, res) + case *tree.ShowSavepointStatus: + return ex.runShowSavepointState(ctx, res) case *tree.ShowSyntax: return ex.runShowSyntax(ctx, sqlStmt.Statement, res) case *tree.SetTracing: diff --git a/pkg/sql/conn_executor_savepoints.go b/pkg/sql/conn_executor_savepoints.go index bb0f5d57e630..322aba407377 100644 --- a/pkg/sql/conn_executor_savepoints.go +++ b/pkg/sql/conn_executor_savepoints.go @@ -14,191 +14,326 @@ import ( "context" "strings" + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/fsm" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // execSavepointInOpenState runs a SAVEPOINT statement inside an open // txn. func (ex *connExecutor) execSavepointInOpenState( ctx context.Context, s *tree.Savepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - // Ensure that the user isn't trying to run BEGIN; SAVEPOINT; SAVEPOINT; - if ex.state.activeRestartSavepointName != "" { - err := unimplemented.NewWithIssueDetail(10735, "nested", "SAVEPOINT may not be nested") - ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil +) (fsm.Event, fsm.EventPayload, error) { + env := &ex.extraTxnState.savepoints + // Sanity check for "SAVEPOINT cockroach_restart". + commitOnRelease := ex.isCommitOnReleaseSavepoint(s.Name) + if commitOnRelease { + /* SAVEPOINT cockroach_restart */ + if !env.isEmpty() { + err := pgerror.Newf(pgcode.Syntax, + "SAVEPOINT %s cannot be nested", + tree.ErrNameString(commitOnReleaseSavepointName)) + ev, payload := ex.makeErrEvent(err, s) + return ev, payload, nil + } + // We want to disallow restart SAVEPOINTs to be issued after a KV + // transaction has started running. The client txn's statement count + // indicates how many statements have been executed as part of this + // transaction. It is desirable to allow metadata queries against + // vtables to proceed before starting a SAVEPOINT for better ORM + // compatibility. + // + // See also: + // https://github.com/cockroachdb/cockroach/issues/15012 + if ex.state.mu.txn.Active() { + err := pgerror.Newf(pgcode.Syntax, + "SAVEPOINT %s needs to be the first statement in a transaction", + tree.ErrNameString(commitOnReleaseSavepointName)) + ev, payload := ex.makeErrEvent(err, s) + return ev, payload, nil + } } - if err := ex.validateSavepointName(s.Name); err != nil { + + token, err := ex.state.mu.txn.CreateSavepoint(ctx) + if err != nil { ev, payload := ex.makeErrEvent(err, s) return ev, payload, nil } - // We want to disallow SAVEPOINTs to be issued after a KV transaction has - // started running. The client txn's statement count indicates how many - // statements have been executed as part of this transaction. It is - // desirable to allow metadata queries against vtables to proceed - // before starting a SAVEPOINT for better ORM compatibility. - // See also: - // https://github.com/cockroachdb/cockroach/issues/15012 - if ex.state.mu.txn.Active() { - err := pgerror.Newf(pgcode.Syntax, - "SAVEPOINT %s needs to be the first statement in a transaction", restartSavepointName) - ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + entry := savepoint{ + name: s.Name, + initial: !ex.state.mu.txn.Active(), + commitOnRelease: commitOnRelease, + kvToken: token, + numDDL: ex.extraTxnState.numDDL, } - ex.state.activeRestartSavepointName = s.Name - // Note that Savepoint doesn't have a corresponding plan node. - // This here is all the execution there is. + if !entry.initial { + entry.txnID = ex.state.mu.txn.ID() + entry.epoch = ex.state.mu.txn.Epoch() + } + env.push(entry) + return nil, nil, nil } -// execReleaseSavepointInOpenState runs a RELEASE SAVEPOINT statement -// inside an open txn. -func (ex *connExecutor) execReleaseSavepointInOpenState( +// execRelease runs a RELEASE SAVEPOINT statement inside an open txn. +func (ex *connExecutor) execRelease( ctx context.Context, s *tree.ReleaseSavepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - if err := ex.validateSavepointName(s.Savepoint); err != nil { +) (fsm.Event, fsm.EventPayload) { + env := &ex.extraTxnState.savepoints + entry, idx := env.find(s.Savepoint) + if entry == nil { + ev, payload := ex.makeErrEvent( + pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint %s does not exist", &s.Savepoint), s) + return ev, payload + } + + // Discard our savepoint and all further ones. Depending on what happens with + // the release below, we might add this savepoint back. + env.popToIdx(idx - 1) + + if entry.commitOnRelease { + res.ResetStmtType((*tree.CommitTransaction)(nil)) + err := ex.commitSQLTransactionInternal(ctx, s) + if err == nil { + return eventTxnReleased{}, nil + } + // Committing the transaction failed. We'll go to state RestartWait if + // it's a retriable error, or to state RollbackWait otherwise. + if errIsRetriable(err) { + // Add the savepoint back. We want to allow a ROLLBACK TO SAVEPOINT + // cockroach_restart (that's the whole point of commitOnRelease). + env.push(*entry) + + rc, canAutoRetry := ex.getRewindTxnCapability() + ev := eventRetriableErr{ + IsCommit: fsm.FromBool(isCommit(s)), + CanAutoRetry: fsm.FromBool(canAutoRetry), + } + payload := eventRetriableErrPayload{err: err, rewCap: rc} + return ev, payload + } + + // Non-retriable error. The transaction might have committed (i.e. the + // error might be ambiguous). We can't allow a ROLLBACK TO SAVEPOINT to + // recover the transaction, so we're not adding the savepoint back. + ex.rollbackSQLTransaction(ctx) + ev := eventNonRetriableErr{IsCommit: fsm.FromBool(false)} + payload := eventNonRetriableErrPayload{err: err} + return ev, payload + } + + if err := ex.state.mu.txn.ReleaseSavepoint(ctx, entry.kvToken); err != nil { ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + return ev, payload } - // ReleaseSavepoint is executed fully here; there's no plan for it. - ev, payload := ex.runReleaseRestartSavepointAsTxnCommit(ctx, s) - res.ResetStmtType((*tree.CommitTransaction)(nil)) - return ev, payload, nil + + return nil, nil } // execRollbackToSavepointInOpenState runs a ROLLBACK TO SAVEPOINT // statement inside an open txn. func (ex *connExecutor) execRollbackToSavepointInOpenState( ctx context.Context, s *tree.RollbackToSavepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - if err := ex.validateSavepointName(s.Savepoint); err != nil { +) (fsm.Event, fsm.EventPayload) { + entry, idx := ex.extraTxnState.savepoints.find(s.Savepoint) + if entry == nil { + ev, payload := ex.makeErrEvent(pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint %s does not exist", &s.Savepoint), s) + return ev, payload + } + + // We don't yet support rolling back over DDL. Instead of creating an + // inconsistent txn or schema state, prefer to tell the users we don't know + // how to proceed yet. Initial savepoints are a special case - we can always + // rollback to them because we can reset all the schema change state. + if !entry.initial && ex.extraTxnState.numDDL > entry.numDDL { + ev, payload := ex.makeErrEvent(unimplemented.NewWithIssueDetail(10735, "rollback-after-ddl", + "ROLLBACK TO SAVEPOINT not yet supported after DDL statements"), s) + return ev, payload + } + + if err := ex.state.mu.txn.RollbackToSavepoint(ctx, entry.kvToken); err != nil { ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + return ev, payload } - ex.state.activeRestartSavepointName = "" - res.ResetStmtType((*tree.Savepoint)(nil)) - return eventTxnRestart{}, nil /* payload */, nil + ex.extraTxnState.savepoints.popToIdx(idx) + + if entry.initial { + return eventTxnRestart{}, nil + } + // No event is necessary; there's nothing for the state machine to do. + return nil, nil } -// execSavepointInAbortedState runs a SAVEPOINT statement when a txn is aborted. -// It also contains the logic for ROLLBACK TO SAVEPOINT. -// TODO(knz): split this in different functions. -func (ex *connExecutor) execSavepointInAbortedState( - ctx context.Context, inRestartWait bool, s tree.Statement, res RestrictedCommandResult, +func (ex *connExecutor) execRollbackToSavepointInAbortedState( + ctx context.Context, s *tree.RollbackToSavepoint, ) (fsm.Event, fsm.EventPayload) { - // We accept both the "ROLLBACK TO SAVEPOINT cockroach_restart" and the - // "SAVEPOINT cockroach_restart" commands to indicate client intent to - // retry a transaction in a RestartWait state. - var spName tree.Name - var isRollback bool - switch n := s.(type) { - case *tree.RollbackToSavepoint: - spName = n.Savepoint - isRollback = true - case *tree.Savepoint: - spName = n.Name - default: - panic("unreachable") - } - // If the user issued a SAVEPOINT in the abort state, validate - // as though there were no active savepoint. - if !isRollback { - ex.state.activeRestartSavepointName = "" - } - if err := ex.validateSavepointName(spName); err != nil { + makeErr := func(err error) (fsm.Event, fsm.EventPayload) { ev := eventNonRetriableErr{IsCommit: fsm.False} payload := eventNonRetriableErrPayload{ err: err, } return ev, payload } - // Either clear or reset the current savepoint name so that - // ROLLBACK TO; SAVEPOINT; works. - if isRollback { - ex.state.activeRestartSavepointName = "" - } else { - ex.state.activeRestartSavepointName = spName + + entry, idx := ex.extraTxnState.savepoints.find(s.Savepoint) + if entry == nil { + return makeErr(pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint %s does not exist", tree.ErrString(&s.Savepoint))) } - res.ResetStmtType((*tree.RollbackTransaction)(nil)) + // We can always rollback to initial savepoints, but for non-initial ones we + // need to check that the underlying KV txn is still copacetic. + if !entry.initial { + curID, curEpoch := ex.state.mu.txn.ID(), ex.state.mu.txn.Epoch() + if !curID.Equal(entry.txnID) { + return ex.makeErrEvent(roachpb.NewTransactionRetryWithProtoRefreshError( + "cannot rollback to savepoint because the transaction has been aborted", + curID, + // The transaction inside this error doesn't matter. + roachpb.Transaction{}, + ), s) + } + if curEpoch != entry.epoch { + return ex.makeErrEvent(roachpb.NewTransactionRetryWithProtoRefreshError( + "cannot rollback to savepoint because the transaction experience a serializable restart", + curID, + // The transaction inside this error doesn't matter. + roachpb.Transaction{}, + ), s) + } + } - if inRestartWait { + ex.extraTxnState.savepoints.popToIdx(idx) + if err := ex.state.mu.txn.RollbackToSavepoint(ctx, entry.kvToken); err != nil { + return makeErr(err) + } + if entry.initial { return eventTxnRestart{}, nil } - // We accept ROLLBACK TO SAVEPOINT even after non-retryable errors to make - // it easy for client libraries that want to indiscriminately issue - // ROLLBACK TO SAVEPOINT after every error and possibly follow it with a - // ROLLBACK and also because we accept ROLLBACK TO SAVEPOINT in the Open - // state, so this is consistent. - // We start a new txn with the same sql timestamp and isolation as the - // current one. - - ev := eventTxnStart{ - ImplicitTxn: fsm.False, - } - rwMode := tree.ReadWrite - if ex.state.readOnly { - rwMode = tree.ReadOnly - } - ex.rollbackSQLTransaction(ctx) - payload := makeEventTxnStartPayload( - ex.state.priority, rwMode, ex.state.sqlTimestamp, - nil /* historicalTimestamp */, ex.transitionCtx) - return ev, payload + return eventSavepointRollback{}, nil } -// execRollbackToSavepointInAbortedState runs a ROLLBACK TO SAVEPOINT -// statement when a txn is aborted. -func (ex *connExecutor) execRollbackToSavepointInAbortedState( - ctx context.Context, inRestartWait bool, s tree.Statement, res RestrictedCommandResult, -) (fsm.Event, fsm.EventPayload) { - return ex.execSavepointInAbortedState(ctx, inRestartWait, s, res) +// isCommitOnReleaseSavepoint returns true if the savepoint name implies special +// release semantics: releasing it commits the underlying KV txn. +func (ex *connExecutor) isCommitOnReleaseSavepoint(savepoint tree.Name) bool { + if ex.sessionData.ForceSavepointRestart { + // The session setting force_savepoint_restart implies that all + // uses of the SAVEPOINT statement are targeting restarts. + return true + } + return strings.HasPrefix(string(savepoint), commitOnReleaseSavepointName) } -// runReleaseRestartSavepointAsTxnCommit executes a commit after -// RELEASE SAVEPOINT statement when using an explicit transaction. -func (ex *connExecutor) runReleaseRestartSavepointAsTxnCommit( - ctx context.Context, stmt tree.Statement, -) (fsm.Event, fsm.EventPayload) { - if ev, payload, ok := ex.commitSQLTransactionInternal(ctx, stmt); !ok { - return ev, payload - } - return eventTxnReleased{}, nil +type savepoint struct { + name tree.Name + + // initial is set if this savepoint has been created before performing any KV + // operations. If so, a ROLLBACK to it will work after a retriable error. If + // not set, then rolling back to it after a retriable error will return the + // retriable error again because reads might have been evaluated before the + // savepoint and such reads cannot have their timestamp forwarded without a + // refresh. + initial bool + + // commitOnRelease is set if the special syntax "SAVEPOINT cockroach_restart" + // was used. Such a savepoint is special in that a RELEASE actually commits + // the transaction - giving the client a change to find out about any + // retriable error and issue another "ROLLBACK TO SAVEPOINT cockroach_restart" + // afterwards. Regular savepoints (even top-level savepoints) cannot commit + // the transaction on RELEASE. + // + // Only an `initial` savepoint can have this set. + commitOnRelease bool + + kvToken client.SavepointToken + + // In the initial implementation, we refuse to roll back + // a savepoint if there was DDL performed "under it". + // TODO(knz): support partial DDL cancellation in pending txns. + numDDL int + + // txnID and epoch describe the transaction iteration that the savepoint is + // bound to. "initial" savepoints are not bound to a particular transaction as + // rolling back to an initial savepoint resets all the transaction state. + // Other savepoints are bound to a transaction iteration, and rolling back to + // them is not permitted if the transaction ID or epoch has changed in the + // meantime: + // - if the txnID has changed, then the previous transaction was aborted. We + // can't rollback to a savepoint since all the transaction's writes are gone. + // - if the epoch has changed, then we can only rollback if we performed a + // refresh for the reads that are not rolled back. We currently don't do this. + txnID uuid.UUID + epoch enginepb.TxnEpoch } -// validateSavepointName validates that it is that the provided ident -// matches the active savepoint name, begins with RestartSavepointName, -// or that force_savepoint_restart==true. We accept everything with the -// desired prefix because at least the C++ libpqxx appends sequence -// numbers to the savepoint name specified by the user. -func (ex *connExecutor) validateSavepointName(savepoint tree.Name) error { - if ex.state.activeRestartSavepointName != "" { - if savepoint == ex.state.activeRestartSavepointName { - return nil +type savepointStack []savepoint + +func (stack savepointStack) isEmpty() bool { return len(stack) == 0 } + +func (stack *savepointStack) clear() { *stack = (*stack)[:0] } + +func (stack *savepointStack) push(s savepoint) { + *stack = append(*stack, s) +} + +// find finds the most recent savepoint with the given name. +// +// The returned savepoint can be modified (rolling back modifies the kvToken). +// Callers shouldn't maintain references to the returned savepoint, as +// references can be invalidated by further operations on the savepoints. +func (stack savepointStack) find(sn tree.Name) (*savepoint, int) { + for i := len(stack) - 1; i >= 0; i-- { + if stack[i].name == sn { + return &stack[i], i } - return pgerror.Newf(pgcode.InvalidSavepointSpecification, - `SAVEPOINT %q is in use`, tree.ErrString(&ex.state.activeRestartSavepointName)) - } - if !ex.sessionData.ForceSavepointRestart && !strings.HasPrefix(string(savepoint), restartSavepointName) { - return unimplemented.NewWithIssueHint(10735, - "SAVEPOINT not supported except for "+restartSavepointName, - "Retryable transactions with arbitrary SAVEPOINT names can be enabled "+ - "with SET force_savepoint_restart=true") } - return nil + return nil, -1 } -// clearSavepoints clears all savepoints defined so far. This -// occurs when the SQL txn is closed (abort/commit) and upon -// a top-level restart. -func (ex *connExecutor) clearSavepoints() { - ex.state.activeRestartSavepointName = "" +// popToIdx pops (discards) all the savepoints at higher indexes. +func (stack *savepointStack) popToIdx(idx int) { + *stack = (*stack)[:idx+1] +} + +func (stack savepointStack) clone() savepointStack { + cpy := make(savepointStack, len(stack)) + copy(cpy, stack) + return cpy +} + +// runShowSavepointState executes a SHOW SAVEPOINT STATUS statement. +// +// If an error is returned, the connection needs to stop processing queries. +func (ex *connExecutor) runShowSavepointState( + ctx context.Context, res RestrictedCommandResult, +) error { + res.SetColumns(ctx, sqlbase.ResultColumns{ + {Name: "savepoint_name", Typ: types.String}, + {Name: "is_initial_savepoint", Typ: types.Bool}, + }) + + for _, entry := range ex.extraTxnState.savepoints { + if err := res.AddRow(ctx, tree.Datums{ + tree.NewDString(string(entry.name)), + tree.MakeDBool(tree.DBool(entry.initial)), + }); err != nil { + return err + } + } + return nil } -// restartSavepointName is the only savepoint ident that we accept. -const restartSavepointName string = "cockroach_restart" +// commitOnReleaseSavepointName is the name of the savepoint with special +// release semantics. +const commitOnReleaseSavepointName = "cockroach_restart" diff --git a/pkg/sql/conn_executor_savepoints_test.go b/pkg/sql/conn_executor_savepoints_test.go new file mode 100644 index 000000000000..dedc96238c0e --- /dev/null +++ b/pkg/sql/conn_executor_savepoints_test.go @@ -0,0 +1,194 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql_test + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" +) + +func TestSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + datadriven.Walk(t, "testdata/savepoints", func(t *testing.T, path string) { + + params := base.TestServerArgs{} + s, sqlConn, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + if _, err := sqlConn.Exec("CREATE TABLE progress(n INT, marker BOOL)"); err != nil { + t.Fatal(err) + } + + datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "sql": + // Implicitly abort any previously-ongoing txn. + _, _ = sqlConn.Exec("ABORT") + // Prepare for the next test. + if _, err := sqlConn.Exec("DELETE FROM progress"); err != nil { + td.Fatalf(t, "cleaning up: %v", err) + } + + // Prepare a buffer to accumulate the results. + var buf strings.Builder + + // We're going to execute the input line-by-line. + stmts := strings.Split(td.Input, "\n") + + // progressBar is going to show the cancellation of writes + // during rollbacks. + progressBar := make([]byte, len(stmts)) + erase := func(status string) { + char := byte('.') + if !isOpenTxn(status) { + char = 'X' + } + for i := range progressBar { + progressBar[i] = char + } + } + + // stepNum is the index of the current statement + // in the input. + var stepNum int + + // updateProgress loads the current set of writes + // into the progress bar. + updateProgress := func() { + rows, err := sqlConn.Query("SELECT n FROM progress") + if err != nil { + t.Logf("%d: reading progress: %v", stepNum, err) + // It's OK if we can't read this. + return + } + defer rows.Close() + for rows.Next() { + var n int + if err := rows.Scan(&n); err != nil { + td.Fatalf(t, "%d: unexpected error while reading progress: %v", stepNum, err) + } + if n < 1 || n > len(progressBar) { + td.Fatalf(t, "%d: unexpected stepnum in progress table: %d", stepNum, n) + } + progressBar[n-1] = '#' + } + } + + // getTxnStatus retrieves the current txn state. + // This is guaranteed to always succeed because SHOW TRANSACTION STATUS + // is an observer statement. + getTxnStatus := func() string { + row := sqlConn.QueryRow("SHOW TRANSACTION STATUS") + var status string + if err := row.Scan(&status); err != nil { + td.Fatalf(t, "%d: unable to retrieve txn status: %v", stepNum, err) + } + return status + } + // showSavepointStatus is like getTxnStatus but retrieves the + // savepoint stack. + showSavepointStatus := func() { + rows, err := sqlConn.Query("SHOW SAVEPOINT STATUS") + if err != nil { + td.Fatalf(t, "%d: unable to retrieve savepoint status: %v", stepNum, err) + } + defer rows.Close() + + comma := "" + hasSavepoints := false + for rows.Next() { + var name string + var isRestart bool + if err := rows.Scan(&name, &isRestart); err != nil { + td.Fatalf(t, "%d: unexpected error while reading savepoints: %v", stepNum, err) + } + if isRestart { + name += "(r)" + } + buf.WriteString(comma) + buf.WriteString(name) + hasSavepoints = true + comma = ">" + } + if !hasSavepoints { + buf.WriteString("(none)") + } + } + // report shows the progress of execution so far after + // each statement executed. + report := func(beforeStatus, afterStatus string) { + erase(afterStatus) + if isOpenTxn(afterStatus) { + updateProgress() + } + fmt.Fprintf(&buf, "-- %-11s -> %-11s %s ", beforeStatus, afterStatus, string(progressBar)) + buf.WriteByte(' ') + showSavepointStatus() + buf.WriteByte('\n') + } + + // The actual execution of the statements starts here. + + beforeStatus := getTxnStatus() + for i, stmt := range stmts { + stepNum = i + 1 + // Before each statement, mark the progress so far with + // a KV write. + if isOpenTxn(beforeStatus) { + _, err := sqlConn.Exec("INSERT INTO progress(n, marker) VALUES ($1, true)", stepNum) + if err != nil { + td.Fatalf(t, "%d: before-stmt: %v", stepNum, err) + } + } + + // Run the statement and report errors/results. + fmt.Fprintf(&buf, "%d: %s -- ", stepNum, stmt) + execRes, err := sqlConn.Exec(stmt) + if err != nil { + fmt.Fprintf(&buf, "%v\n", err) + } else { + nRows, err := execRes.RowsAffected() + if err != nil { + fmt.Fprintf(&buf, "error retrieving rows: %v\n", err) + } else { + fmt.Fprintf(&buf, "%d row%s\n", nRows, util.Pluralize(nRows)) + } + } + + // Report progress on the next line + afterStatus := getTxnStatus() + report(beforeStatus, afterStatus) + beforeStatus = afterStatus + } + + return buf.String() + + default: + td.Fatalf(t, "unknown directive: %s", td.Cmd) + } + return "" + }) + }) +} + +func isOpenTxn(status string) bool { + return status == "Open" || status == "NoTxn" +} diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 1f03e46b1cfb..4cbe1ce5296b 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -119,7 +119,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); // the kv-level transaction has already been committed). But we still // exercise this state to check that the server doesn't crash (which used to // happen - #9879). - tests := []string{"Open", "RestartWait", "CommitWait"} + tests := []string{"Open", "Aborted", "CommitWait"} for _, state := range tests { t.Run(state, func(t *testing.T) { // Create a low-level lib/pq connection so we can close it at will. @@ -151,7 +151,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); t.Fatal(err) } - if state == "RestartWait" || state == "CommitWait" { + if state == "CommitWait" { if _, err := tx.ExecContext(ctx, "SAVEPOINT cockroach_restart", nil); err != nil { t.Fatal(err) } @@ -173,7 +173,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); t.Fatal(err) } - if state == "RestartWait" || state == "CommitWait" { + if state == "CommitWait" { _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT cockroach_restart", nil) if state == "CommitWait" { if err != nil { diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index d997ee1c1061..da1988a52ec7 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -32,7 +32,6 @@ const ( OpenStateStr = "Open" AbortedStateStr = "Aborted" CommitWaitStateStr = "CommitWait" - RestartWaitStateStr = "RestartWait" InternalErrorStateStr = "InternalError" ) @@ -56,6 +55,8 @@ func (stateOpen) String() string { return OpenStateStr } +// stateAborted is entered on retriable errors. A ROLLBACK TO SAVEPOINT can +// move the transaction back to stateOpen. type stateAborted struct{} var _ fsm.State = &stateAborted{} @@ -64,14 +65,6 @@ func (stateAborted) String() string { return AbortedStateStr } -type stateRestartWait struct{} - -var _ fsm.State = &stateRestartWait{} - -func (stateRestartWait) String() string { - return RestartWaitStateStr -} - type stateCommitWait struct{} var _ fsm.State = &stateCommitWait{} @@ -93,10 +86,10 @@ func (stateInternalError) String() string { return InternalErrorStateStr } -func (stateNoTxn) State() {} -func (stateOpen) State() {} -func (stateAborted) State() {} -func (stateRestartWait) State() {} +func (stateNoTxn) State() {} +func (stateOpen) State() {} +func (stateAborted) State() {} + func (stateCommitWait) State() {} func (stateInternalError) State() {} @@ -117,6 +110,11 @@ type eventTxnStartPayload struct { historicalTimestamp *hlc.Timestamp } +// makeEventTxnStartPayload creates an eventTxnStartPayload. +// +// Pass noRolledBackSavepoint for rolledBackSavepoint when the transaction is +// not started as a result of rolling back to a savepoint (i.e. the normal +// case). func makeEventTxnStartPayload( pri roachpb.UserPriority, readOnly tree.ReadWriteMode, @@ -141,7 +139,11 @@ type eventTxnFinishPayload struct { commit bool } -type eventTxnRestart struct{} +// eventSavepointRollback is generated when we want to move from Aborted to Open +// through a ROLLBACK TO SAVEPOINT . Note that it is not +// generated when such a savepoint is rolled back to from the Open state. In +// that case no event is necessary. +type eventSavepointRollback struct{} type eventNonRetriableErr struct { IsCommit fsm.Bool @@ -183,21 +185,30 @@ func (p eventRetriableErrPayload) errorCause() error { // eventRetriableErrPayload implements payloadWithError. var _ payloadWithError = eventRetriableErrPayload{} -// eventTxnReleased is generated after a successful RELEASE SAVEPOINT -// cockroach_restart. It moves the state to CommitWait. +// eventTxnRestart is generated by a rollback to a savepoint placed at the +// beginning of the transaction (commonly SAVEPOINT cockroach_restart). +type eventTxnRestart struct{} + +// eventTxnReleased is generated after a successful +// RELEASE SAVEPOINT cockroach_restart. It moves the state to CommitWait. type eventTxnReleased struct{} +// eventTxnReleaseFailed represents a failed RELEASE SAVEPOINT cockroach_restart. +type eventTxnReleaseFailed struct{} + // payloadWithError is a common interface for the payloads that wrap an error. type payloadWithError interface { errorCause() error } -func (eventTxnStart) Event() {} -func (eventTxnFinish) Event() {} -func (eventTxnRestart) Event() {} -func (eventNonRetriableErr) Event() {} -func (eventRetriableErr) Event() {} -func (eventTxnReleased) Event() {} +func (eventTxnStart) Event() {} +func (eventTxnFinish) Event() {} +func (eventTxnRestart) Event() {} +func (eventSavepointRollback) Event() {} +func (eventNonRetriableErr) Event() {} +func (eventRetriableErr) Event() {} +func (eventTxnReleased) Event() {} +func (eventTxnReleaseFailed) Event() {} // TxnStateTransitions describe the transitions used by a connExecutor's // fsm.Machine. Args.Extended is a txnState, which is muted by the Actions. @@ -300,12 +311,21 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) ts.setAdvanceInfo(skipBatch, noRewind, noEvent) - ts.txnAbortCount.Inc(1) + return nil + }, + }, + // ROLLBACK TO SAVEPOINT cockroach. There's not much to do other than generating a + // txnRestart output event. + eventTxnRestart{}: { + Description: "ROLLBACK TO SAVEPOINT cockroach_restart", + Next: stateOpen{ImplicitTxn: fsm.False}, + Action: func(args fsm.Args) error { + args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart) return nil }, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { - Next: stateRestartWait{}, + Next: stateAborted{}, Action: func(args fsm.Args) error { // Note: Preparing the KV txn for restart has already happened by this // point. @@ -321,38 +341,22 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - // ROLLBACK TO SAVEPOINT - eventTxnRestart{}: { - Description: "ROLLBACK TO SAVEPOINT cockroach_restart", - Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - state := args.Extended.(*txnState) - // NOTE: We don't bump the txn timestamp on this restart. Should we? - // Well, if we generally supported savepoints and one would issue a - // rollback to a regular savepoint, clearly we couldn't bump the - // timestamp in that case. In the special case of the cockroach_restart - // savepoint, it's not clear to me what a user's expectation might be. - state.mu.txn.ManualRestart(args.Ctx, hlc.Timestamp{}) - args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart) - return nil - }, - }, }, /// Aborted // // Note that we don't handle any error events here. Any statement but a - // ROLLBACK is expected to not be passed to the state machine. + // ROLLBACK (TO SAVEPOINT) is expected to not be passed to the state machine. stateAborted{}: { eventTxnFinish{}: { Description: "ROLLBACK", Next: stateNoTxn{}, Action: func(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.txnAbortCount.Inc(1) // Note that the KV txn has been rolled back by now by statement // execution. - return args.Extended.(*txnState).finishTxn( - args.Payload.(eventTxnFinishPayload), - ) + return ts.finishTxn(args.Payload.(eventTxnFinishPayload)) }, }, eventNonRetriableErr{IsCommit: fsm.Any}: { @@ -364,51 +368,26 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - // ROLLBACK TO SAVEPOINT. We accept this in the Aborted state for the - // convenience of clients who want to issue ROLLBACK TO SAVEPOINT regardless - // of the preceding query error. - eventTxnStart{ImplicitTxn: fsm.False}: { - Description: "ROLLBACK TO SAVEPOINT cockroach_restart", + // ROLLBACK TO SAVEPOINT success. + eventSavepointRollback{}: { + Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success", Next: stateOpen{ImplicitTxn: fsm.False}, Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - ts.finishSQLTxn() - - payload := args.Payload.(eventTxnStartPayload) - - // Note that we pass the connection's context here, not args.Ctx which - // was the previous txn's context. - ts.resetForNewSQLTxn( - ts.connCtx, - explicitTxn, - payload.txnSQLTimestamp, - payload.historicalTimestamp, - payload.pri, payload.readOnly, - nil, /* txn */ - args.Payload.(eventTxnStartPayload).tranCtx, - ) - ts.setAdvanceInfo(advanceOne, noRewind, txnRestart) + args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, noEvent) return nil }, }, - }, - - stateRestartWait{}: { - // ROLLBACK (and also COMMIT which acts like ROLLBACK) - eventTxnFinish{}: { - Description: "ROLLBACK", - Next: stateNoTxn{}, + // ROLLBACK TO SAVEPOINT failed because the txn needs to restart. + eventRetriableErr{CanAutoRetry: fsm.Any, IsCommit: fsm.Any}: { + // This event doesn't change state, but it returns a skipBatch code. + Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart", + Next: stateAborted{}, Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - // The client is likely rolling back because it can't do retries. Let's - // count it as an abort. - ts.txnAbortCount.Inc(1) - return args.Extended.(*txnState).finishTxn( - args.Payload.(eventTxnFinishPayload), - ) + args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) + return nil }, }, - // ROLLBACK TO SAVEPOINT + // ROLLBACK TO SAVEPOINT cockroach_restart. eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, @@ -417,15 +396,6 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - eventNonRetriableErr{IsCommit: fsm.Any}: { - Next: stateAborted{}, - Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - ts.setAdvanceInfo(skipBatch, noRewind, noEvent) - ts.txnAbortCount.Inc(1) - return nil - }, - }, }, stateCommitWait{}: { diff --git a/pkg/sql/logictest/testdata/logic_test/manual_retry b/pkg/sql/logictest/testdata/logic_test/manual_retry index e9a9f70b2a2b..0e4a75b22302 100644 --- a/pkg/sql/logictest/testdata/logic_test/manual_retry +++ b/pkg/sql/logictest/testdata/logic_test/manual_retry @@ -51,24 +51,30 @@ SELECT currval('s') statement ok COMMIT -subtest savepoint_name - -statement ok -BEGIN - -# Ensure that ident case rules are used. -statement error pq: unimplemented: SAVEPOINT not supported except for cockroach_restart -SAVEPOINT "COCKROACH_RESTART" - -statement ok -ROLLBACK; BEGIN - -# Ensure that ident case rules are used. -statement ok -SAVEPOINT COCKROACH_RESTART - -statement ok -ROLLBACK +# subtest savepoint_name +# +# statement ok +# BEGIN +# +# !!! rewrite this test somehow to assert the release behavior, not the initial status +# # Ensure that ident case rules are used. +# statement ok +# SAVEPOINT "COCKROACH_RESTART" +# +# query TB +# SHOW SAVEPOINT STATUS +# ---- +# COCKROACH_RESTART false +# +# statement ok +# ROLLBACK; BEGIN +# +# # Ensure that ident case rules are used. +# statement ok +# SAVEPOINT COCKROACH_RESTART +# +# statement ok +# ROLLBACK subtest schema_change_with_rollback @@ -132,34 +138,50 @@ BEGIN TRANSACTION; SAVEPOINT something_else; COMMIT statement ok BEGIN TRANSACTION; SAVEPOINT foo -statement error pq: SAVEPOINT "foo" is in use +statement error pq: savepoint bar does not exist ROLLBACK TO SAVEPOINT bar # Verify we're doing the right thing for non-quoted idents. statement ok ROLLBACK TO SAVEPOINT FOO +statement ok +ABORT; BEGIN TRANSACTION + # Verify use of quoted idents. statement ok SAVEPOINT "Foo Bar" -statement error pq: SAVEPOINT "Foo Bar" is in use +statement error pq: savepoint foobar does not exist ROLLBACK TO SAVEPOINT FooBar # Verify case-sensitivity of quoted idents. -statement error pq: SAVEPOINT "Foo Bar" is in use +statement error pq: savepoint foo bar does not exist ROLLBACK TO SAVEPOINT "foo bar" statement ok ROLLBACK TO SAVEPOINT "Foo Bar" +query TB +SHOW SAVEPOINT STATUS +---- +Foo Bar true + +statement ok +ABORT; BEGIN TRANSACTION + # Verify case-sensitivity of quoted vs. unquoted idents. statement ok SAVEPOINT "UpperCase" -statement error pq: SAVEPOINT "UpperCase" is in use +statement error pq: savepoint uppercase does not exist ROLLBACK TO SAVEPOINT UpperCase +query TB +SHOW SAVEPOINT STATUS +---- +UpperCase true + statement ok ABORT diff --git a/pkg/sql/logictest/testdata/logic_test/txn b/pkg/sql/logictest/testdata/logic_test/txn index 89447d8266c0..ddac35839892 100644 --- a/pkg/sql/logictest/testdata/logic_test/txn +++ b/pkg/sql/logictest/testdata/logic_test/txn @@ -549,7 +549,7 @@ CommitWait statement ok COMMIT -# RestartWait state +# RestartWait state !!! adapt the comment; there's no longer a RestartWait. # The SELECT 1 is necessary to move the txn out of the AutoRetry state, # otherwise the next statement is automatically retried on the server. statement ok @@ -561,7 +561,7 @@ SELECT crdb_internal.force_retry('1h':::INTERVAL) query T SHOW TRANSACTION STATUS ---- -RestartWait +Aborted statement ok ROLLBACK TO SAVEPOINT cockroach_restart @@ -695,146 +695,6 @@ ROLLBACK; DROP SEQUENCE s -# Wrong savepoint name moves the txn state from RestartWait to Aborted. -statement ok -BEGIN TRANSACTION; - SAVEPOINT cockroach_restart; - SELECT 1 - -query error pgcode 40001 restart transaction: crdb_internal.force_retry\(\): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry\(\) -SELECT crdb_internal.force_retry('1h':::INTERVAL) - -query T -SHOW TRANSACTION STATUS ----- -RestartWait - -statement error pq: SAVEPOINT "cockroach_restart" is in use -ROLLBACK TO SAVEPOINT bogus_name - -query T -SHOW TRANSACTION STATUS ----- -Aborted - -statement ok -ROLLBACK - -# General savepoints -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -SAVEPOINT other - -statement ok -ROLLBACK - -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -RELEASE SAVEPOINT other - -statement ok -ROLLBACK - -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -ROLLBACK TO SAVEPOINT other - -statement ok -ROLLBACK - -# Savepoint must be first statement in a transaction. -statement ok -BEGIN TRANSACTION; UPSERT INTO kv VALUES('savepoint', 'true') - -statement error SAVEPOINT cockroach_restart needs to be the first statement in a transaction -SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - -# Can rollback to a savepoint if no statements have been executed. -statement ok -BEGIN TRANSACTION; SAVEPOINT cockroach_restart - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -# Can do it twice in a row. -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -# Can rollback after a transactional write, even from a non-error state. -statement ok -UPSERT INTO kv VALUES('savepoint', 'true') - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -COMMIT - -# Because we rolled back, the 'savepoint' insert will not have been committed. -query I -SELECT count(*) FROM kv WHERE k = 'savepoint' ----- -0 - - -# Can ROLLBACK TO SAVEPOINT even from a non-retryable error. -statement ok -BEGIN TRANSACTION; SAVEPOINT cockroach_restart - -statement error pq: relation "bogus_name" does not exist -SELECT * from bogus_name - -query T -SHOW TRANSACTION STATUS ----- -Aborted - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -query T -SHOW TRANSACTION STATUS ----- -Open - -statement ok -ROLLBACK - - -# ROLLBACK TO SAVEPOINT in a txn without a SAVEPOINT. -statement ok -BEGIN - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - - -# ROLLBACK TO SAVEPOINT in an aborted txn without a SAVEPOINT. -statement ok -BEGIN - -statement error pq: relation "bogus_name" does not exist -SELECT * from bogus_name - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - - # Test READ ONLY/WRITE syntax. statement ok @@ -1026,14 +886,6 @@ SELECT crdb_internal.force_retry('1h':::INTERVAL) statement ok ROLLBACK -# Check that we don't crash when doing a release that wasn't preceded by a -# savepoint. -statement ok -BEGIN; RELEASE SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - # restore the default statement ok SET default_transaction_read_only = false diff --git a/pkg/sql/metric_test.go b/pkg/sql/metric_test.go index 0a15e0ec370a..73322f2acd4e 100644 --- a/pkg/sql/metric_test.go +++ b/pkg/sql/metric_test.go @@ -252,9 +252,6 @@ func TestAbortCountErrorDuringTransaction(t *testing.T) { t.Fatal("Expected an error but didn't get one") } - if _, err := checkCounterDelta(s, sql.MetaTxnAbort, accum.txnAbortCount, 1); err != nil { - t.Error(err) - } if _, err := checkCounterDelta(s, sql.MetaTxnBeginStarted, accum.txnBeginCount, 1); err != nil { t.Error(err) } @@ -265,6 +262,10 @@ func TestAbortCountErrorDuringTransaction(t *testing.T) { if err := txn.Rollback(); err != nil { t.Fatal(err) } + + if _, err := checkCounterDelta(s, sql.MetaTxnAbort, accum.txnAbortCount, 1); err != nil { + t.Error(err) + } } func TestSavepointMetrics(t *testing.T) { @@ -309,8 +310,8 @@ func TestSavepointMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if _, err := txn.Exec("SAVEPOINT blah"); err == nil { - t.Fatal("expected an error but didn't get one") + if _, err := txn.Exec("SAVEPOINT blah"); err != nil { + t.Fatal(err) } if err := txn.Rollback(); err != nil { t.Fatal(err) diff --git a/pkg/sql/opt/exec/execbuilder/builder.go b/pkg/sql/opt/exec/execbuilder/builder.go index 9e4ba717df69..4a79c5d2cd4a 100644 --- a/pkg/sql/opt/exec/execbuilder/builder.go +++ b/pkg/sql/opt/exec/execbuilder/builder.go @@ -69,6 +69,11 @@ type Builder struct { // set to true, it ensures that a FOR UPDATE row-level locking mode is used // by scans. See forUpdateLocking. forceForUpdateLocking bool + + // -- output -- + + // IsDDL is set to true if the statement contains DDL. + IsDDL bool } // New constructs an instance of the execution node builder using the diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 5e981701e07a..b286a3ce2217 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -141,13 +141,17 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { var ep execPlan var err error - // This will set the system DB trigger for transactions containing - // schema-modifying statements that have no effect, such as - // `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;` - // where the table already exists. This will generate some false schema - // cache refreshes, but that's expected to be quite rare in practice. isDDL := opt.IsDDLOp(e) if isDDL { + // Mark the statement as containing DDL for use + // in the SQL executor. + b.IsDDL = true + + // This will set the system DB trigger for transactions containing + // schema-modifying statements that have no effect, such as + // `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;` + // where the table already exists. This will generate some false schema + // cache refreshes, but that's expected to be quite rare in practice. if err := b.evalCtx.Txn.SetSystemConfigTrigger(); err != nil { return execPlan{}, errors.WithSecondaryError( unimplemented.NewWithIssuef(26508, diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index c843360918f5..4a01a173db6c 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -301,6 +301,7 @@ func TestContextualHelp(t *testing.T) { {`SHOW TRANSACTION ISOLATION LEVEL ??`, `SHOW TRANSACTION`}, {`SHOW SYNTAX ??`, `SHOW SYNTAX`}, {`SHOW SYNTAX 'foo' ??`, `SHOW SYNTAX`}, + {`SHOW SAVEPOINT STATUS ??`, `SHOW SAVEPOINT`}, {`SHOW RANGE ??`, `SHOW RANGE`}, diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 720bde3533fe..505db42c9111 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -519,6 +519,8 @@ func TestParse(t *testing.T) { {`SHOW TRANSACTION STATUS`}, {`EXPLAIN SHOW TRANSACTION STATUS`}, + {`SHOW SAVEPOINT STATUS`}, + {`EXPLAIN SHOW SAVEPOINT STATUS`}, {`SHOW SYNTAX 'select 1'`}, {`EXPLAIN SHOW SYNTAX 'select 1'`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index c507449fef98..f86b72c86b5d 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -790,6 +790,7 @@ func newNameFromStr(s string) *tree.Name { %type show_sequences_stmt %type show_session_stmt %type show_sessions_stmt +%type show_savepoint_stmt %type show_stats_stmt %type show_syntax_stmt %type show_tables_stmt @@ -3386,6 +3387,7 @@ show_stmt: | show_ranges_stmt // EXTEND WITH HELP: SHOW RANGES | show_range_for_row_stmt | show_roles_stmt // EXTEND WITH HELP: SHOW ROLES +| show_savepoint_stmt // EXTEND WITH HELP: SHOW SAVEPOINT | show_schemas_stmt // EXTEND WITH HELP: SHOW SCHEMAS | show_sequences_stmt // EXTEND WITH HELP: SHOW SEQUENCES | show_session_stmt // EXTEND WITH HELP: SHOW SESSION @@ -3838,6 +3840,17 @@ show_syntax_stmt: } | SHOW SYNTAX error // SHOW HELP: SHOW SYNTAX +// %Help: SHOW SAVEPOINT - display current savepoint properties +// %Category: Cfg +// %Text: SHOW SAVEPOINT STATUS +show_savepoint_stmt: + SHOW SAVEPOINT STATUS + { + /* SKIP DOC */ + $$.val = &tree.ShowSavepointStatus{} + } +| SHOW SAVEPOINT error // SHOW HELP: SHOW SAVEPOINT + // %Help: SHOW TRANSACTION - display current transaction properties // %Category: Cfg // %Text: SHOW TRANSACTION {ISOLATION LEVEL | PRIORITY | STATUS} diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 85336e393754..5f27d5518ee9 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -443,6 +443,9 @@ const ( // planFlagImplicitTxn marks that the plan was run inside of an implicit // transaction. planFlagImplicitTxn + + // planFlagIsDDL marks that the plan contains DDL. + planFlagIsDDL ) func (pf planFlags) IsSet(flag planFlags) bool { diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index b58103436c93..e696e12ef463 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -164,7 +164,8 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { // Build the plan tree. root := execMemo.RootExpr() execFactory := makeExecFactory(p) - plan, err := execbuilder.New(&execFactory, execMemo, &opc.catalog, root, p.EvalContext()).Build() + bld := execbuilder.New(&execFactory, execMemo, &opc.catalog, root, p.EvalContext()) + plan, err := bld.Build() if err != nil { return err } @@ -172,6 +173,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { result := plan.(*planTop) result.stmt = stmt result.flags = opc.flags + if bld.IsDDL { + result.flags.Set(planFlagIsDDL) + } cols := planColumns(result.plan) if stmt.ExpectedTypes != nil { diff --git a/pkg/sql/sem/tree/show.go b/pkg/sql/sem/tree/show.go index 95a441914137..326fc8e4848a 100644 --- a/pkg/sql/sem/tree/show.go +++ b/pkg/sql/sem/tree/show.go @@ -395,6 +395,15 @@ func (node *ShowTransactionStatus) Format(ctx *FmtCtx) { ctx.WriteString("SHOW TRANSACTION STATUS") } +// ShowSavepointStatus represents a SHOW SAVEPOINT STATUS statement. +type ShowSavepointStatus struct { +} + +// Format implements the NodeFormatter interface. +func (node *ShowSavepointStatus) Format(ctx *FmtCtx) { + ctx.WriteString("SHOW SAVEPOINT STATUS") +} + // ShowUsers represents a SHOW USERS statement. type ShowUsers struct { } diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index dc855745ba30..c33f5ba78e58 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -777,6 +777,14 @@ func (*ShowTransactionStatus) StatementTag() string { return "SHOW TRANSACTION S func (*ShowTransactionStatus) observerStatement() {} +// StatementType implements the Statement interface. +func (*ShowSavepointStatus) StatementType() StatementType { return Rows } + +// StatementTag returns a short string identifying the type of statement. +func (*ShowSavepointStatus) StatementTag() string { return "SHOW SAVEPOINT STATUS" } + +func (*ShowSavepointStatus) observerStatement() {} + // StatementType implements the Statement interface. func (*ShowUsers) StatementType() StatementType { return Rows } @@ -971,6 +979,7 @@ func (n *ShowRanges) String() string { return AsString(n) } func (n *ShowRangeForRow) String() string { return AsString(n) } func (n *ShowRoleGrants) String() string { return AsString(n) } func (n *ShowRoles) String() string { return AsString(n) } +func (n *ShowSavepointStatus) String() string { return AsString(n) } func (n *ShowSchemas) String() string { return AsString(n) } func (n *ShowSequences) String() string { return AsString(n) } func (n *ShowSessions) String() string { return AsString(n) } diff --git a/pkg/sql/sqlbase/errors.go b/pkg/sql/sqlbase/errors.go index 78febeb2366e..cd769a044a78 100644 --- a/pkg/sql/sqlbase/errors.go +++ b/pkg/sql/sqlbase/errors.go @@ -30,7 +30,8 @@ const ( ) // NewTransactionAbortedError creates an error for trying to run a command in -// the context of transaction that's already aborted. +// the context of transaction that's in the aborted state. Any statement other +// than ROLLBACK TO SAVEPOINT will return this error. func NewTransactionAbortedError(customMsg string) error { if customMsg != "" { return pgerror.Newf( diff --git a/pkg/sql/testdata/savepoints b/pkg/sql/testdata/savepoints new file mode 100644 index 000000000000..05f91b81c49e --- /dev/null +++ b/pkg/sql/testdata/savepoints @@ -0,0 +1,539 @@ +# This test exercises the savepoint state in the conn executor. + +subtest implicit_release_at_end + +# It's OK to leave savepoints open when the txn commits. +# This releases everything. +sql +BEGIN +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###.. foo>bar +4: SAVEPOINT baz -- 0 rows +-- Open -> Open ####. foo>bar>baz +5: COMMIT -- 0 rows +-- Open -> NoTxn ##### (none) + +# Ditto rollbacks. +sql +BEGIN +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +ROLLBACK +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###.. foo>bar +4: SAVEPOINT baz -- 0 rows +-- Open -> Open ####. foo>bar>baz +5: ROLLBACK -- 0 rows +-- Open -> NoTxn #.... (none) + +subtest end + +subtest savepoint_stack + +sql +BEGIN +SAVEPOINT foo +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +ROLLBACK TO SAVEPOINT foo +SAVEPOINT baz +RELEASE SAVEPOINT foo +SAVEPOINT bar +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.......... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##......... foo +3: SAVEPOINT foo -- 0 rows +-- Open -> Open ###........ foo>foo +4: SAVEPOINT bar -- 0 rows +-- Open -> Open ####....... foo>foo>bar +5: SAVEPOINT baz -- 0 rows +-- Open -> Open #####...... foo>foo>bar>baz +6: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ###........ foo>foo +7: SAVEPOINT baz -- 0 rows +-- Open -> Open ###...#.... foo>foo>baz +8: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###...##... foo +9: SAVEPOINT bar -- 0 rows +-- Open -> Open ###...###.. foo>bar +10: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###...####. (none) +11: COMMIT -- 0 rows +-- Open -> NoTxn ###...##### (none) + + +subtest end + +subtest savepoint_release_vs_rollback + +# A rollback keeps the savepoint active. +sql +BEGIN +SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +5: COMMIT -- 0 rows +-- Open -> NoTxn ##..# (none) + +# A release does not. +sql +BEGIN +SAVEPOINT foo +RELEASE SAVEPOINT foo +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###.. (none) +4: RELEASE SAVEPOINT foo -- pq: savepoint foo does not exist +-- Open -> Aborted XXXXX (none) +5: COMMIT -- 0 rows +-- Aborted -> NoTxn #.... (none) + +subtest end + + +subtest rollback_after_sql_error + +sql +BEGIN +SAVEPOINT foo +SELECT nonexistent +ROLLBACK TO SAVEPOINT foo +SELECT 123 +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #..... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##.... foo +3: SELECT nonexistent -- pq: column "nonexistent" does not exist +-- Open -> Aborted XXXXXX foo +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open ##.... foo +5: SELECT 123 -- 1 row +-- Open -> Open ##..#. foo +6: COMMIT -- 0 rows +-- Open -> NoTxn ##..## (none) + +subtest end + +subtest rollback_after_dup_error + +sql +CREATE TABLE t(x INT UNIQUE) +INSERT INTO t(x) VALUES (1) +BEGIN +SAVEPOINT foo +INSERT INTO t(x) VALUES (1) +ROLLBACK TO SAVEPOINT foo +INSERT INTO t(x) VALUES (2) +COMMIT +---- +1: CREATE TABLE t(x INT UNIQUE) -- 0 rows +-- NoTxn -> NoTxn #....... (none) +2: INSERT INTO t(x) VALUES (1) -- 1 row +-- NoTxn -> NoTxn ##...... (none) +3: BEGIN -- 0 rows +-- NoTxn -> Open ###..... (none) +4: SAVEPOINT foo -- 0 rows +-- Open -> Open ####.... foo +5: INSERT INTO t(x) VALUES (1) -- pq: duplicate key value (x)=(1) violates unique constraint "t_x_key" +-- Open -> Aborted XXXXXXXX foo +6: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open ####.... foo +7: INSERT INTO t(x) VALUES (2) -- 1 row +-- Open -> Open ####..#. foo +8: COMMIT -- 0 rows +-- Open -> NoTxn ####..## (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest rollback_after_ddl + +# DDL under savepoints is fine as long as there is no rollback. +# Note: we do two DDL; the first one is there just to anchor +# the txn on the config range. The second DDL is the one +# exercised in the test. +sql +BEGIN; CREATE TABLE unused(x INT) +SAVEPOINT foo +CREATE TABLE t(x INT) +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN; CREATE TABLE unused(x INT) -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: CREATE TABLE t(x INT) -- 0 rows +-- Open -> Open ###.. foo +4: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ####. (none) +5: COMMIT -- 0 rows +-- Open -> NoTxn ##### (none) + +sql +DROP TABLE unused +DROP TABLE t +---- +1: DROP TABLE unused -- 0 rows +-- NoTxn -> NoTxn #. (none) +2: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn ## (none) + +# Rollback is unsupported after DDL for now. +# TODO(knz): Lift this limitation. + +sql +BEGIN; CREATE TABLE unused(x INT) +SAVEPOINT foo +CREATE TABLE t(x INT) +ROLLBACK TO SAVEPOINT foo +---- +1: BEGIN; CREATE TABLE unused(x INT) -- 0 rows +-- NoTxn -> Open #... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##.. foo +3: CREATE TABLE t(x INT) -- 0 rows +-- Open -> Open ###. foo +4: ROLLBACK TO SAVEPOINT foo -- pq: unimplemented: ROLLBACK TO SAVEPOINT not yet supported after DDL statements +-- Open -> Aborted XXXX foo + +subtest end + +subtest cockroach_restart_cant_be_nested + +sql +BEGIN +SAVEPOINT foo +SAVEPOINT cockroach_restart +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##. foo +3: SAVEPOINT cockroach_restart -- pq: SAVEPOINT cockroach_restart cannot be nested +-- Open -> Aborted XXX foo + +subtest end + +subtest invalid_uses + +sql +SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +RELEASE SAVEPOINT foo +---- +1: SAVEPOINT foo -- pq: there is no transaction in progress +-- NoTxn -> NoTxn #.. (none) +2: ROLLBACK TO SAVEPOINT foo -- pq: savepoint foo does not exist +-- NoTxn -> NoTxn ##. (none) +3: RELEASE SAVEPOINT foo -- pq: there is no transaction in progress +-- NoTxn -> NoTxn ### (none) + +sql +BEGIN +SAVEPOINT foo +RELEASE SAVEPOINT bar +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##. foo +3: RELEASE SAVEPOINT bar -- pq: savepoint bar does not exist +-- Open -> Aborted XXX foo + +sql +BEGIN +SAVEPOINT foo +ROLLBACK TO SAVEPOINT bar +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##. foo +3: ROLLBACK TO SAVEPOINT bar -- pq: savepoint bar does not exist +-- Open -> Aborted XXX foo + +subtest end + +subtest rollback_after_error + +# check that we can rollback after an error +sql +BEGIN; SAVEPOINT foo +SELECT * FROM bogus_name +ROLLBACK TO SAVEPOINT foo +ROLLBACK +---- +1: BEGIN; SAVEPOINT foo -- 0 rows +-- NoTxn -> Open #... foo(r) +2: SELECT * FROM bogus_name -- pq: relation "bogus_name" does not exist +-- Open -> Aborted XXXX foo(r) +3: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open #... foo(r) +4: ROLLBACK -- 0 rows +-- Open -> NoTxn #... (none) + +# check that we can rollback after a retriable error to an initial savepoint +sql +BEGIN; SAVEPOINT init +SELECT crdb_internal.force_retry('100ms') +ROLLBACK TO SAVEPOINT init +ROLLBACK +---- +1: BEGIN; SAVEPOINT init -- 0 rows +-- NoTxn -> Open #... init(r) +2: SELECT crdb_internal.force_retry('100ms') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXX init(r) +3: ROLLBACK TO SAVEPOINT init -- 0 rows +-- Aborted -> Open #... init(r) +4: ROLLBACK -- 0 rows +-- Open -> NoTxn #... (none) + +# Check that, after a retriable error, rolling back to anything an initial +# savepoint fails with a retriable error. +sql +CREATE TABLE t(x INT) +BEGIN; SAVEPOINT init; SELECT count(1) from t; SAVEPOINT inner_savepoint +SELECT crdb_internal.force_retry('100ms') +ROLLBACK TO SAVEPOINT inner_savepoint +ROLLBACK TO SAVEPOINT init +ROLLBACK; DROP TABLE t +---- +1: CREATE TABLE t(x INT) -- 0 rows +-- NoTxn -> NoTxn #..... (none) +2: BEGIN; SAVEPOINT init; SELECT count(1) from t; SAVEPOINT inner_savepoint -- 0 rows +-- NoTxn -> Open ##.... init(r)>inner_savepoint +3: SELECT crdb_internal.force_retry('100ms') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXXXX init(r)>inner_savepoint +4: ROLLBACK TO SAVEPOINT inner_savepoint -- pq: restart transaction: TransactionRetryWithProtoRefreshError: cannot rollback to savepoint because the transaction experience a serializable restart +-- Aborted -> Aborted XXXXXX init(r)>inner_savepoint +5: ROLLBACK TO SAVEPOINT init -- 0 rows +-- Aborted -> Open ##.... init(r) +6: ROLLBACK; DROP TABLE t -- 0 rows +-- Open -> NoTxn ##.... (none) + +subtest end + + +subtest restart + +subtest restart/must_be_first_in_txn + +sql +CREATE TABLE t(x INT) +BEGIN +INSERT INTO t(x) VALUES (1) +SAVEPOINT cockroach_restart +---- +1: CREATE TABLE t(x INT) -- 0 rows +-- NoTxn -> NoTxn #... (none) +2: BEGIN -- 0 rows +-- NoTxn -> Open ##.. (none) +3: INSERT INTO t(x) VALUES (1) -- 1 row +-- Open -> Open ###. (none) +4: SAVEPOINT cockroach_restart -- pq: SAVEPOINT cockroach_restart needs to be the first statement in a transaction +-- Open -> Aborted XXXX (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest restart/release_without_savepoint + +sql +BEGIN +RELEASE SAVEPOINT cockroach_restart +ROLLBACK +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: RELEASE SAVEPOINT cockroach_restart -- pq: savepoint cockroach_restart does not exist +-- Open -> Aborted XXX (none) +3: ROLLBACK -- 0 rows +-- Aborted -> NoTxn #.. (none) + +subtest end + +subtest restart/rollback_without_savepoint + +# ROLLBACK TO SAVEPOINT in an open txn without a SAVEPOINT. +sql +BEGIN +ROLLBACK TO SAVEPOINT cockroach_restart +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #. (none) +2: ROLLBACK TO SAVEPOINT cockroach_restart -- pq: savepoint cockroach_restart does not exist +-- Open -> Aborted XX (none) + +# ROLLBACK TO SAVEPOINT in an aborted txn without a SAVEPOINT. +sql +BEGIN +SELECT * FROM bogus_name +ROLLBACK TO SAVEPOINT cockroach_restart +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SELECT * FROM bogus_name -- pq: relation "bogus_name" does not exist +-- Open -> Aborted XXX (none) +3: ROLLBACK TO SAVEPOINT cockroach_restart -- pq: savepoint cockroach_restart does not exist +-- Aborted -> Aborted XXX (none) + +subtest end + +subtest restart/rollbacks + +sql +CREATE TABLE t(x INT); +BEGIN; SAVEPOINT cockroach_restart +ROLLBACK TO SAVEPOINT cockroach_restart +ROLLBACK TO SAVEPOINT cockroach_restart +INSERT INTO t(x) VALUES (1) +ROLLBACK TO SAVEPOINT cockroach_restart +COMMIT +---- +1: CREATE TABLE t(x INT); -- 0 rows +-- NoTxn -> NoTxn #...... (none) +2: BEGIN; SAVEPOINT cockroach_restart -- 0 rows +-- NoTxn -> Open ##..... cockroach_restart(r) +3: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +4: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +5: INSERT INTO t(x) VALUES (1) -- 1 row +-- Open -> Open ##..#.. cockroach_restart(r) +6: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +7: COMMIT -- 0 rows +-- Open -> NoTxn ##....# (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + + +subtest restart/savepoint_under_restart + +sql +BEGIN; SAVEPOINT cockroach_restart +SAVEPOINT foo +SAVEPOINT bar +ROLLBACK TO SAVEPOINT foo +SELECT crdb_internal.force_retry('1s') +ROLLBACK TO SAVEPOINT cockroach_restart +SELECT 123 +COMMIT +---- +1: BEGIN; SAVEPOINT cockroach_restart -- 0 rows +-- NoTxn -> Open #....... cockroach_restart(r) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##...... cockroach_restart(r)>foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###..... cockroach_restart(r)>foo>bar +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##...... cockroach_restart(r)>foo +5: SELECT crdb_internal.force_retry('1s') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXXXXXX cockroach_restart(r)>foo +6: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Aborted -> Open #....... cockroach_restart(r) +7: SELECT 123 -- 1 row +-- Open -> Open #.....#. cockroach_restart(r) +8: COMMIT -- 0 rows +-- Open -> NoTxn #.....## (none) + +subtest end + +subtest restart/all_savepoints_disabled + +# Under "force_savepoint_restart", every savepoint +# is a restart savepoint. + +sql +SET force_savepoint_restart = true +BEGIN; SAVEPOINT foo +SAVEPOINT bar +---- +1: SET force_savepoint_restart = true -- 0 rows +-- NoTxn -> NoTxn #.. (none) +2: BEGIN; SAVEPOINT foo -- 0 rows +-- NoTxn -> Open ##. foo(r) +3: SAVEPOINT bar -- pq: SAVEPOINT cockroach_restart cannot be nested +-- Open -> Aborted XXX foo(r) + +sql +SET force_savepoint_restart = false +---- +1: SET force_savepoint_restart = false -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest end + +# Test that the rewinding we do when performing an automatic retry restores the +# savepoint stack properly. +subtest rewing_on_automatic_restarts + +# We're going to generate a retriable error that will rewind us back to the +# SELECT statement (not to the original SAVEPOINT statement since that one is +# special and we advance the rewind position past it). The test checks that, +# after every restart, the RELEASE works because the savepoint has be +# re-instituted before we rewind. +sql +BEGIN; SAVEPOINT a; SELECT 42; RELEASE a; SELECT crdb_internal.force_retry('10ms'); COMMIT; +---- +1: BEGIN; SAVEPOINT a; SELECT 42; RELEASE a; SELECT crdb_internal.force_retry('10ms'); COMMIT; -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index a0745cdc7457..9252bb8a0a99 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1065,13 +1065,12 @@ func TestUnexpectedStatementInRestartWait(t *testing.T) { if err := tx.QueryRow("SHOW TRANSACTION STATUS").Scan(&state); err != nil { t.Fatal(err) } - if state != "RestartWait" { - t.Fatalf("expected state %s, got: %s", "RestartWait", state) + if state != "Aborted" { + t.Fatalf("expected state %s, got: %s", "Aborted", state) } if _, err := tx.Exec("SELECT 1"); !testutils.IsError(err, - `pq: Expected "ROLLBACK TO SAVEPOINT cockroach_restart": `+ - "current transaction is aborted, commands ignored until end of transaction block") { + `pq: current transaction is aborted, commands ignored until end of transaction block`) { t.Fatal(err) } if err := tx.QueryRow("SHOW TRANSACTION STATUS").Scan(&state); err != nil { @@ -1449,12 +1448,6 @@ func TestRollbackToSavepointFromUnusualStates(t *testing.T) { } } - // ROLLBACK TO SAVEPOINT with a wrong name - _, err := sqlDB.Exec("ROLLBACK TO SAVEPOINT foo") - if !testutils.IsError(err, "SAVEPOINT not supported except for cockroach_restart") { - t.Fatalf("unexpected error: %v", err) - } - tx, err := sqlDB.Begin() if err != nil { t.Fatal(err) @@ -1513,12 +1506,12 @@ func TestTxnAutoRetriesDisabledAfterResultsHaveBeenSentToClient(t *testing.T) { { name: "client_directed_retries", clientDirectedRetry: true, - expectedTxnStateAfterRetriableErr: "RestartWait", + expectedTxnStateAfterRetriableErr: "Aborted", }, { name: "no_client_directed_retries", clientDirectedRetry: false, - expectedTxnStateAfterRetriableErr: "RestartWait", + expectedTxnStateAfterRetriableErr: "Aborted", }, { name: "autocommit", diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 1b0dda449af1..4c4f45d94178 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -104,11 +104,6 @@ type txnState struct { // txnAbortCount is incremented whenever the state transitions to // stateAborted. txnAbortCount *metric.Counter - - // activeRestartSavepointName stores the name of the active - // top-level restart savepoint, or is empty if no top-level restart - // savepoint is active. - activeRestartSavepointName tree.Name } // txnType represents the type of a SQL transaction. @@ -376,7 +371,9 @@ const ( txnRollback // txnRestart means that the transaction is restarting. The iteration of the // txn just finished will not commit. It is generated when we're about to - // auto-retry a txn and after a "ROLLBACK TO SAVEPOINT cockroach_restart". + // auto-retry a txn and after a rollback to a savepoint placed at the start of + // the transaction. This allows such savepoints to reset more state than other + // savepoints. txnRestart ) diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index 350a39446cde..9b3fba53c190 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -117,12 +117,6 @@ func (tc *testContext) createAbortedState() (fsm.State, *txnState) { return stateAborted{}, ts } -func (tc *testContext) createRestartWaitState() (fsm.State, *txnState) { - _, ts := tc.createOpenState(explicitTxn) - s := stateRestartWait{} - return s, ts -} - func (tc *testContext) createCommitWaitState() (fsm.State, *txnState, error) { _, ts := tc.createOpenState(explicitTxn) // Commit the KV txn, simulating what the execution layer is doing. @@ -418,7 +412,7 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b }, - expState: stateRestartWait{}, + expState: stateAborted{}, expAdv: expAdvance{ expCode: skipBatch, expEv: noEvent, @@ -563,68 +557,47 @@ func TestTransitions(t *testing.T) { expTxn: nil, }, { - // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). - name: "Aborted->Starting", + // The txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted). + name: "Aborted->Open", init: func() (fsm.State, *txnState, error) { s, ts := testCon.createAbortedState() return s, ts, nil }, - ev: eventTxnStart{ImplicitTxn: fsm.False}, - evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), - nil /* historicalTimestamp */, tranCtx), + ev: eventSavepointRollback{}, expState: stateOpen{ImplicitTxn: fsm.False}, expAdv: expAdvance{ expCode: advanceOne, - expEv: txnRestart, + expEv: noEvent, }, expTxn: &expKVTxn{}, }, { - // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). - // Verify that the historical timestamp from the evPayload is propagated - // to the expTxn. - name: "Aborted->Starting (historical)", + // The txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted). + name: "Aborted->Restart", init: func() (fsm.State, *txnState, error) { s, ts := testCon.createAbortedState() return s, ts, nil }, - ev: eventTxnStart{ImplicitTxn: fsm.False}, - evPayload: makeEventTxnStartPayload(pri, tree.ReadOnly, now.GoTime(), - &now, tranCtx), + ev: eventTxnRestart{}, expState: stateOpen{ImplicitTxn: fsm.False}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, }, expTxn: &expKVTxn{ - tsNanos: proto.Int64(now.WallTime), - }, - }, - // - // Tests starting from the RestartWait state. - // - { - // The txn got finished, such as after a ROLLBACK. - name: "RestartWait->NoTxn", - init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() - err := ts.mu.txn.Rollback(ts.Ctx) - return s, ts, err - }, - ev: eventTxnFinish{}, - evPayload: eventTxnFinishPayload{commit: false}, - expState: stateNoTxn{}, - expAdv: expAdvance{ - expCode: advanceOne, - expEv: txnRollback, + userPriority: &pri, + tsNanos: &now.WallTime, + origTSNanos: &now.WallTime, + maxTSNanos: &maxTS.WallTime, }, - expTxn: nil, }, { - // The txn got restarted, through a ROLLBACK TO SAVEPOINT. - name: "RestartWait->Open", + // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). + // Verify that the historical timestamp from the evPayload is propagated + // to the expTxn. + name: "Aborted->Starting (historical)", init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() + s, ts := testCon.createAbortedState() return s, ts, nil }, ev: eventTxnRestart{}, @@ -633,22 +606,9 @@ func TestTransitions(t *testing.T) { expCode: advanceOne, expEv: txnRestart, }, - expTxn: &expKVTxn{}, - }, - { - name: "RestartWait->Aborted", - init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() - return s, ts, nil - }, - ev: eventNonRetriableErr{IsCommit: fsm.False}, - evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, - expState: stateAborted{}, - expAdv: expAdvance{ - expCode: skipBatch, - expEv: noEvent, + expTxn: &expKVTxn{ + tsNanos: proto.Int64(now.WallTime), }, - expTxn: &expKVTxn{}, }, // // Tests starting from the CommitWait state. diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv index 74c6a072896f..296b13bd89ee 100644 --- a/pkg/sql/txnstatetransitions_diagram.gv +++ b/pkg/sql/txnstatetransitions_diagram.gv @@ -15,8 +15,13 @@ digraph finite_state_machine { node [shape = circle]; "Aborted{}" -> "Aborted{}" [label = any other statement>] "Aborted{}" -> "Aborted{}" [label = any other statement>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] "Aborted{}" -> "NoTxn{}" [label = ROLLBACK>] - "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "NoTxn{}" [label = COMMIT>] @@ -26,7 +31,7 @@ digraph finite_state_machine { "NoTxn{}" -> "Open{ImplicitTxn:true}" [label = BEGIN, or before a statement running as an implicit txn>] "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:false}"] "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] - "Open{ImplicitTxn:false}" -> "RestartWait{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] @@ -40,8 +45,4 @@ digraph finite_state_machine { "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = COMMIT/ROLLBACK, or after a statement running as an implicit txn>] - "RestartWait{}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:false}"] - "RestartWait{}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:true}"] - "RestartWait{}" -> "NoTxn{}" [label = ROLLBACK>] - "RestartWait{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] } diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt index a46864c1dedb..82f7822ef152 100644 --- a/pkg/sql/txnstatetransitions_report.txt +++ b/pkg/sql/txnstatetransitions_report.txt @@ -4,15 +4,16 @@ Aborted{} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} - TxnFinish{} - TxnStart{ImplicitTxn:false} - missing events: RetriableErr{CanAutoRetry:false, IsCommit:false} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} - TxnReleased{} + SavepointRollback{} + TxnFinish{} TxnRestart{} + missing events: + TxnReleased{} + TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} CommitWait{} handled events: @@ -24,6 +25,7 @@ CommitWait{} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} TxnReleased{} TxnRestart{} TxnStart{ImplicitTxn:false} @@ -39,6 +41,7 @@ NoTxn{} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} TxnFinish{} TxnReleased{} TxnRestart{} @@ -54,6 +57,7 @@ Open{ImplicitTxn:false} TxnReleased{} TxnRestart{} missing events: + SavepointRollback{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} Open{ImplicitTxn:true} @@ -66,21 +70,8 @@ Open{ImplicitTxn:true} RetriableErr{CanAutoRetry:true, IsCommit:true} TxnFinish{} missing events: + SavepointRollback{} TxnReleased{} TxnRestart{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} -RestartWait{} - handled events: - NonRetriableErr{IsCommit:false} - NonRetriableErr{IsCommit:true} - TxnFinish{} - TxnRestart{} - missing events: - RetriableErr{CanAutoRetry:false, IsCommit:false} - RetriableErr{CanAutoRetry:false, IsCommit:true} - RetriableErr{CanAutoRetry:true, IsCommit:false} - RetriableErr{CanAutoRetry:true, IsCommit:true} - TxnReleased{} - TxnStart{ImplicitTxn:false} - TxnStart{ImplicitTxn:true}