diff --git a/pkg/cli/interactive_tests/test_txn_prompt.tcl b/pkg/cli/interactive_tests/test_txn_prompt.tcl index 37615f38a544..af0382c94bac 100644 --- a/pkg/cli/interactive_tests/test_txn_prompt.tcl +++ b/pkg/cli/interactive_tests/test_txn_prompt.tcl @@ -123,7 +123,7 @@ eexpect root@ send "SELECT crdb_internal.force_retry('1s':::INTERVAL);\r" eexpect "ERROR: restart transaction" eexpect root@ -eexpect "RETRY>" +eexpect "ERROR>" end_test start_test "Test that prompt reverts to OPEN at beginning of new attempt." diff --git a/pkg/cli/sql.go b/pkg/cli/sql.go index 5224dae435a1..5a1ae12bf60e 100644 --- a/pkg/cli/sql.go +++ b/pkg/cli/sql.go @@ -740,14 +740,12 @@ func (c *cliState) refreshTransactionStatus() { // Change the prompt based on the response from the server. switch txnString { - case sql.NoTxnStr: + case sql.NoTxnStateStr: c.lastKnownTxnStatus = "" case sql.AbortedStateStr: c.lastKnownTxnStatus = " ERROR" case sql.CommitWaitStateStr: c.lastKnownTxnStatus = " DONE" - case sql.RestartWaitStateStr: - c.lastKnownTxnStatus = " RETRY" case sql.OpenStateStr: // The state AutoRetry is reported by the server as Open, so no need to // handle it here. diff --git a/pkg/cmd/roachtest/pgjdbc_blacklist.go b/pkg/cmd/roachtest/pgjdbc_blacklist.go index ffe0aeee29f5..03e44942e679 100644 --- a/pkg/cmd/roachtest/pgjdbc_blacklist.go +++ b/pkg/cmd/roachtest/pgjdbc_blacklist.go @@ -388,15 +388,7 @@ var pgjdbcBlackList20_1 = blacklist{ "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[730: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=YES, failMode=INSERT_BATCH, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "41448", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[731: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=YES, failMode=INSERT_BATCH, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "41448", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[73: autorollback(autoSave=NEVER, cleanSavePoint=TRUE, autoCommit=YES, failMode=INSERT_BATCH, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "41448", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[744: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[745: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[746: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[747: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[748: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[749: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "10735", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[74: autorollback(autoSave=NEVER, cleanSavePoint=TRUE, autoCommit=YES, failMode=INSERT_BATCH, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "41448", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[750: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[751: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[752: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=EXACT)]": "26508", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[753: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=STAR)]": "26508", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[754: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=EXACT)]": "26508", @@ -406,22 +398,6 @@ var pgjdbcBlackList20_1 = blacklist{ "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[758: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "26508", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[759: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "26508", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[75: autorollback(autoSave=NEVER, cleanSavePoint=TRUE, autoCommit=YES, failMode=INSERT_BATCH, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "41448", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[784: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[785: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[786: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[787: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[788: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[789: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[790: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[791: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[792: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=true, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[793: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=true, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[794: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=true, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[795: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=true, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[796: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=false, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[797: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=false, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[798: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[799: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[800: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=INSERT_BATCH, continueMode=COMMIT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=EXACT)]": "41448", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[801: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=INSERT_BATCH, continueMode=COMMIT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "41448", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[802: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=TRUE, autoCommit=NO, failMode=INSERT_BATCH, continueMode=COMMIT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "41448", @@ -456,14 +432,6 @@ var pgjdbcBlackList20_1 = blacklist{ "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[893: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=YES, failMode=INSERT_BATCH, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "41448", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[894: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=YES, failMode=INSERT_BATCH, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "41448", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[895: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=YES, failMode=INSERT_BATCH, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "41448", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[908: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[909: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[910: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[911: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[912: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[913: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[914: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[915: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=SELECT, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[916: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=EXACT)]": "26508", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[917: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=STAR)]": "26508", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[918: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=EXACT)]": "26508", @@ -472,22 +440,6 @@ var pgjdbcBlackList20_1 = blacklist{ "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[921: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "26508", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[922: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "26508", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[923: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=ALTER, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "26508", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[948: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[949: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[950: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[951: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=true, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[952: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[953: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[954: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[955: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[956: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=true, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[957: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=true, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[958: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=true, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[959: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=true, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[960: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=false, sql=SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[961: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=false, sql=SELECT, columns=STAR)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[962: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "10735", - "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[963: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=DEALLOCATE, continueMode=SELECT, flushOnDeallocate=false, hastransaction=false, sql=WITH_INSERT_SELECT, columns=STAR)]": "10735", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[964: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=INSERT_BATCH, continueMode=COMMIT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=EXACT)]": "41448", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[965: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=INSERT_BATCH, continueMode=COMMIT, flushOnDeallocate=true, hastransaction=false, sql=SELECT, columns=STAR)]": "41448", "org.postgresql.test.jdbc2.AutoRollbackTestSuite.run[966: autorollback(autoSave=CONSERVATIVE, cleanSavePoint=FALSE, autoCommit=NO, failMode=INSERT_BATCH, continueMode=COMMIT, flushOnDeallocate=true, hastransaction=false, sql=WITH_INSERT_SELECT, columns=EXACT)]": "41448", @@ -986,14 +938,6 @@ var pgjdbcBlackList20_1 = blacklist{ "org.postgresql.test.jdbc3.Jdbc3CallableStatementTest.testUpdateDecimal": "17511", "org.postgresql.test.jdbc3.Jdbc3CallableStatementTest.testUpdateReal": "17511", "org.postgresql.test.jdbc3.Jdbc3CallableStatementTest.testVarcharBool": "17511", - "org.postgresql.test.jdbc3.Jdbc3SavepointTest.testCantMixSavepointTypes": "10735", - "org.postgresql.test.jdbc3.Jdbc3SavepointTest.testComplicatedSavepointName": "10735", - "org.postgresql.test.jdbc3.Jdbc3SavepointTest.testContinueAfterError": "10735", - "org.postgresql.test.jdbc3.Jdbc3SavepointTest.testGlobalRollbackWorks": "10735", - "org.postgresql.test.jdbc3.Jdbc3SavepointTest.testReleaseSavepoint": "10735", - "org.postgresql.test.jdbc3.Jdbc3SavepointTest.testRollbackMultipleTimes": "10735", - "org.postgresql.test.jdbc3.Jdbc3SavepointTest.testRollingBackToInvalidSavepointFails": "10735", - "org.postgresql.test.jdbc3.Jdbc3SavepointTest.testRollingBackToSavepoints": "10735", "org.postgresql.test.jdbc3.ParameterMetaDataTest.testFailsOnBadIndex": "21286", "org.postgresql.test.jdbc3.ParameterMetaDataTest.testMultiStatement": "21286", "org.postgresql.test.jdbc3.ParameterMetaDataTest.testParameterMD": "21286", diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index e185df3886c6..ad356078824a 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -304,8 +304,13 @@ const ( // SavepointToken represents a savepoint. type SavepointToken interface { - // SavepointToken is a marker interface. - SavepointToken() + // Initial returns true if this savepoint has been created before performing + // any KV operations. If so, it is possible to rollback to it after a + // retriable error. If not, then rolling back to it after a retriable error + // will return the retriable error again because reads might have been + // evaluated before the savepoint and such reads cannot have their timestamp + // forwarded without a refresh. + Initial() bool } // TxnStatusOpt represents options for TxnSender.GetMeta(). diff --git a/pkg/kv/testdata/savepoints b/pkg/kv/testdata/savepoints index c973d48ae72f..0ab3d2ba5834 100644 --- a/pkg/kv/testdata/savepoints +++ b/pkg/kv/testdata/savepoints @@ -320,10 +320,6 @@ commit subtest end subtest rollback_across_retry - -# TODO(knz): change this test when rolling back across retries becomes -# supported. - begin ---- 0 @@ -339,11 +335,11 @@ epoch: 0 -> 1 release x ---- -(*withstack.withStack) cannot release savepoint across transaction retries +0 rollback x ---- -(*withstack.withStack) cannot rollback savepoint across transaction retries +0 subtest end @@ -364,11 +360,35 @@ txn id changed release x ---- -(*withstack.withStack) cannot release savepoint across transaction retries +0 rollback x ---- -(*withstack.withStack) cannot rollback savepoint across transaction retries +0 + +subtest end + +subtest rollback_across_retry_fails_for_non-initial_savepoint +# The difference from the previous test is that here we do a write before +# creating the savepoint. +begin +---- +0 + +put k a +---- +savepoint x +---- +1 + +retry +---- +synthetic error: TransactionRetryWithProtoRefreshError: forced retry +epoch: 0 -> 1 + +rollback x +---- +(*roachpb.TransactionRetryWithProtoRefreshError) TransactionRetryWithProtoRefreshError: cannot rollback to savepoint after a transaction restart subtest end diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index f8a846c55b86..0a64736263a6 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -103,7 +103,9 @@ type TxnCoordSender struct { // clients on Send(). storedErr *roachpb.Error - // active is set whenever the transaction has sent any requests. + // active is set whenever the transaction has sent any requests. Rolling + // back to a savepoint taken before the TxnCoordSender became active resets + // the field to false. active bool // closed is set once this transaction has either committed or rolled back @@ -176,6 +178,14 @@ type txnInterceptor interface { // increment. epochBumpedLocked() + // createSavepointLocked is used to populate a savepoint with all the state + // that needs to be restored on a rollback. + createSavepointLocked(context.Context, *savepoint) + + // rollbackToSavepointLocked is used to restore the state previously saved by + // createSavepointLocked(). + rollbackToSavepointLocked(context.Context, savepoint) + // closeLocked closes the interceptor. It is called when the TxnCoordSender // shuts down due to either a txn commit or a txn abort. The method will // be called exactly once from cleanupTxnLocked. @@ -943,7 +953,8 @@ func (tc *TxnCoordSender) IsTracking() bool { return tc.interceptorAlloc.txnHeartbeater.heartbeatLoopRunningLocked() } -// Active returns true iff there were commands executed already. +// Active returns true if requests were sent already. Rolling back to a +// savepoint taken before any requests were sent resets this to false. func (tc *TxnCoordSender) Active() bool { tc.mu.Lock() defer tc.mu.Unlock() diff --git a/pkg/kv/txn_coord_sender_savepoints.go b/pkg/kv/txn_coord_sender_savepoints.go index fad4cea6bb6b..9d6302dd70c4 100644 --- a/pkg/kv/txn_coord_sender_savepoints.go +++ b/pkg/kv/txn_coord_sender_savepoints.go @@ -14,56 +14,48 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) -// savepointToken captures the state in the TxnCoordSender necessary -// to restore that state upon a savepoint rollback. -// -// TODO(knz,andrei): Currently this definition is only sufficient for -// just a few cases of rollbacks. This should be extended to cover -// more ground: -// -// - We also need the current size of txnSpanRefresher.refreshSpans the -// list of tracked reads, such that upon rollback we tell the -// refresher interceptor to discard further reads. -// - We also need something about in-flight writes -// (txnPipeliner.ifWrites). There I guess we need to take some sort of -// snapshot of the current in-flight writes and, on rollback, discard -// in-flight writes that are not part of the savepoint. But, on -// rollback, I don't think we should (nor am I sure that we could) -// simply overwrite the set of in-flight writes with the ones from the -// savepoint because writes that have been verified since the snapshot -// has been taken should continue to be verified. Basically, on -// rollback I think we need to intersect the savepoint with the -// current set of in-flight writes. -type savepointToken struct { - // seqNum is currently the only field that helps to "restore" - // anything upon a rollback. When used, it does not change anything - // in the TCS; instead it simply configures the txn to ignore all - // seqnums from this value until the most recent seqnum emitted by - // the TCS. - seqNum enginepb.TxnSeq - - // txnID is used to verify that a rollback is not used to paper - // over a txn abort error. +// savepoint captures the state in the TxnCoordSender necessary to restore that +// state upon a savepoint rollback. +type savepoint struct { + // active is a snapshot of TxnCoordSender.active. + active bool + + // txnID and epoch are set for savepoints with the active field set. + // txnID and epoch are used to disallow rollbacks past transaction restarts. + // Savepoints without the active field set are allowed to be used to rollback + // past transaction restarts too, because it's trivial to rollback to the + // beginning of the transaction. txnID uuid.UUID - // epoch is used to verify that a savepoint rollback is not - // used to paper over a retry error. - // TODO(knz,andrei): expand savepoint rollbacks to recover - // from retry errors. - // TODO(knz,andrei): remove the epoch mechanism entirely in - // favor of seqnums and savepoint rollbacks. epoch enginepb.TxnEpoch + + // seqNum represents the write seq num at the time the savepoint was created. + // On rollback, it configures the txn to ignore all seqnums from this value + // until the most recent seqnum. + seqNum enginepb.TxnSeq + + // txnSpanRefresher fields. + refreshSpans []roachpb.Span + refreshInvalid bool + refreshSpanBytes int64 } -var _ client.SavepointToken = (*savepointToken)(nil) +var _ client.SavepointToken = (*savepoint)(nil) -// SavepointToken implements the client.SavepointToken interface. -func (s *savepointToken) SavepointToken() {} +// statically allocated savepoint marking the beginning of a transaction. Used +// to avoid allocations for such savepoints. +var initialSavepoint = savepoint{} + +// Initial implements the client.SavepointToken interface. +func (s *savepoint) Initial() bool { + return !s.active +} // CreateSavepoint is part of the client.TxnSender interface. func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.SavepointToken, error) { @@ -82,11 +74,22 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.Savepoint return nil, ErrSavepointOperationInErrorTxn } - return &savepointToken{ + if !tc.mu.active { + // Return a preallocated savepoint for the common case of savepoints placed + // at the beginning of transactions. + return &initialSavepoint, nil + } + + s := &savepoint{ + active: true, // we've handled the not-active case above txnID: tc.mu.txn.ID, epoch: tc.mu.txn.Epoch, - seqNum: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, - }, nil + } + for _, reqInt := range tc.interceptorStack { + reqInt.createSavepointLocked(ctx, s) + } + + return s, nil } // RollbackToSavepoint is part of the client.TxnSender interface. @@ -102,25 +105,41 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s client.Save return err } - st, err := tc.checkSavepointLocked(s, "rollback") + sp := s.(*savepoint) + err := tc.checkSavepointLocked(sp) if err != nil { + if err == errSavepointInvalidAfterTxnRestart { + err = roachpb.NewTransactionRetryWithProtoRefreshError( + "cannot rollback to savepoint after a transaction restart", + tc.mu.txn.ID, + // The transaction inside this error doesn't matter. + roachpb.Transaction{}, + ) + } return err } - // TODO(knz): handle recoverable errors. - if tc.mu.txnState == txnError { - return unimplemented.New("rollback_error", "savepoint rollback after error") + if tc.mu.txnState == txnFinalized { + return unimplemented.New("rollback_error", "savepoint rollback finalized txn") } - if st.seqNum == tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { - // No operations since savepoint was taken. No-op. - return nil + // Restore the transaction's state, in case we're rewiding after an error. + tc.mu.txnState = txnPending + + tc.mu.active = sp.active + + for _, reqInt := range tc.interceptorStack { + reqInt.rollbackToSavepointLocked(ctx, *sp) } - tc.mu.txn.AddIgnoredSeqNumRange( - enginepb.IgnoredSeqNumRange{ - Start: st.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, - }) + // If there's been any more writes since the savepoint was created, they'll + // need to be ignored. + if sp.seqNum < tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { + tc.mu.txn.AddIgnoredSeqNumRange( + enginepb.IgnoredSeqNumRange{ + Start: sp.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, + }) + } return nil } @@ -138,7 +157,16 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s client.Savepoi return ErrSavepointOperationInErrorTxn } - _, err := tc.checkSavepointLocked(s, "release") + sp := s.(*savepoint) + err := tc.checkSavepointLocked(sp) + if err == errSavepointInvalidAfterTxnRestart { + err = roachpb.NewTransactionRetryWithProtoRefreshError( + "cannot release savepoint after a transaction restart", + tc.mu.txn.ID, + // The transaction inside this error doesn't matter. + roachpb.Transaction{}, + ) + } return err } @@ -159,26 +187,29 @@ func (tc *TxnCoordSender) assertNotFinalized() error { return nil } -func (tc *TxnCoordSender) checkSavepointLocked( - s client.SavepointToken, opName string, -) (*savepointToken, error) { - st, ok := s.(*savepointToken) - if !ok { - return nil, errors.AssertionFailedf("expected savepointToken, got %T", s) - } +var errSavepointInvalidAfterTxnRestart = errors.New("savepoint invalid after transaction restart") - if st.txnID != tc.mu.txn.ID { - return nil, errors.Newf("cannot %s savepoint across transaction retries", opName) +// checkSavepointLocked checks whether the provided savepoint is still valid. +// Returns errSavepointInvalidAfterTxnRestart if the savepoint is not an +// "initial" one and the transaction has restarted since the savepoint was +// created. +func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint) error { + // Only savepoints taken before any activity are allowed to be used after a + // transaction restart. + if s.Initial() { + return nil } - - if st.epoch != tc.mu.txn.Epoch { - return nil, errors.Newf("cannot %s savepoint across transaction retries", opName) + if s.txnID != tc.mu.txn.ID { + return errSavepointInvalidAfterTxnRestart + } + if s.epoch != tc.mu.txn.Epoch { + return errSavepointInvalidAfterTxnRestart } - if st.seqNum < 0 || st.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { - return nil, errors.AssertionFailedf("invalid savepoint: got %d, expected 0-%d", - st.seqNum, tc.interceptorAlloc.txnSeqNumAllocator.writeSeq) + if s.seqNum < 0 || s.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { + return errors.AssertionFailedf("invalid savepoint: got %d, expected 0-%d", + s.seqNum, tc.interceptorAlloc.txnSeqNumAllocator.writeSeq) } - return st, nil + return nil } diff --git a/pkg/kv/txn_interceptor_committer.go b/pkg/kv/txn_interceptor_committer.go index 429893aa9889..8f4515d1479a 100644 --- a/pkg/kv/txn_interceptor_committer.go +++ b/pkg/kv/txn_interceptor_committer.go @@ -465,6 +465,12 @@ func (*txnCommitter) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked implements the txnReqInterceptor interface. func (tc *txnCommitter) epochBumpedLocked() {} +// createSavepointLocked is part of the txnReqInterceptor interface. +func (*txnCommitter) createSavepointLocked(context.Context, *savepoint) {} + +// rollbackToSavepointLocked is part of the txnReqInterceptor interface. +func (*txnCommitter) rollbackToSavepointLocked(context.Context, savepoint) {} + // closeLocked implements the txnReqInterceptor interface. func (tc *txnCommitter) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_heartbeater.go b/pkg/kv/txn_interceptor_heartbeater.go index 15ce3909125e..18ea00f4008d 100644 --- a/pkg/kv/txn_interceptor_heartbeater.go +++ b/pkg/kv/txn_interceptor_heartbeater.go @@ -189,6 +189,12 @@ func (*txnHeartbeater) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked is part of the txnInterceptor interface. func (h *txnHeartbeater) epochBumpedLocked() {} +// createSavepointLocked is part of the txnReqInterceptor interface. +func (*txnHeartbeater) createSavepointLocked(context.Context, *savepoint) {} + +// rollbackToSavepointLocked is part of the txnReqInterceptor interface. +func (*txnHeartbeater) rollbackToSavepointLocked(context.Context, savepoint) {} + // closeLocked is part of the txnInterceptor interface. func (h *txnHeartbeater) closeLocked() { h.cancelHeartbeatLoopLocked() diff --git a/pkg/kv/txn_interceptor_metric_recorder.go b/pkg/kv/txn_interceptor_metric_recorder.go index 28c555e975a9..2a1b8047fd00 100644 --- a/pkg/kv/txn_interceptor_metric_recorder.go +++ b/pkg/kv/txn_interceptor_metric_recorder.go @@ -75,6 +75,12 @@ func (*txnMetricRecorder) importLeafFinalState(*roachpb.LeafTxnFinalState) {} // epochBumpedLocked is part of the txnInterceptor interface. func (*txnMetricRecorder) epochBumpedLocked() {} +// createSavepointLocked is part of the txnReqInterceptor interface. +func (*txnMetricRecorder) createSavepointLocked(context.Context, *savepoint) {} + +// rollbackToSavepointLocked is part of the txnReqInterceptor interface. +func (*txnMetricRecorder) rollbackToSavepointLocked(context.Context, savepoint) {} + // closeLocked is part of the txnInterceptor interface. func (m *txnMetricRecorder) closeLocked() { if m.onePCCommit { diff --git a/pkg/kv/txn_interceptor_pipeliner.go b/pkg/kv/txn_interceptor_pipeliner.go index c82f3c01abf8..77e8f3995c4b 100644 --- a/pkg/kv/txn_interceptor_pipeliner.go +++ b/pkg/kv/txn_interceptor_pipeliner.go @@ -574,12 +574,7 @@ func (tp *txnPipeliner) setWrapped(wrapped lockedSender) { // populateLeafInputState is part of the txnInterceptor interface. func (tp *txnPipeliner) populateLeafInputState(tis *roachpb.LeafTxnInputState) { - if l := tp.ifWrites.len(); l > 0 { - tis.InFlightWrites = make([]roachpb.SequencedWrite, 0, l) - tp.ifWrites.ascend(func(w *inFlightWrite) { - tis.InFlightWrites = append(tis.InFlightWrites, w.SequencedWrite) - }) - } + tis.InFlightWrites = tp.ifWrites.AsSlice() } // initializeLeaf loads the in-flight writes for a leaf transaction. @@ -610,6 +605,38 @@ func (tp *txnPipeliner) epochBumpedLocked() { } } +// createSavepointLocked is part of the txnReqInterceptor interface. +func (tp *txnPipeliner) createSavepointLocked(context.Context, *savepoint) {} + +// rollbackToSavepointLocked is part of the txnReqInterceptor interface. +func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoint) { + // Move all the writes in txnPipeliner that are not in the savepoint to the + // write footprint. We no longer care if these write succeed or fail, so we're + // going to stop tracking these as in-flight writes. The respective intents + // still need to be cleaned up at the end of the transaction. + var writesToDelete []*inFlightWrite + needCollecting := !s.Initial() + tp.ifWrites.ascend(func(w *inFlightWrite) { + if w.Sequence > s.seqNum { + tp.footprint.insert(roachpb.Span{Key: w.Key}) + if needCollecting { + writesToDelete = append(writesToDelete, w) + } + } + }) + tp.footprint.mergeAndSort() + + // Restore the inflight writes from the savepoint (minus the ones that have + // been verified in the meantime) by removing all the extra ones. + if needCollecting { + for _, ifw := range writesToDelete { + tp.ifWrites.remove(ifw.Key, ifw.Sequence) + } + } else { + tp.ifWrites.clear(true /* reuse */) + } +} + // closeLocked implements the txnReqInterceptor interface. func (tp *txnPipeliner) closeLocked() {} @@ -653,6 +680,8 @@ func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) { s.t = btree.New(txnPipelinerBtreeDegree) } + // If the tree has not been cloned before, we can attempt a fast path where we + // update an existing element. s.tmp1.Key = key item := s.t.Get(&s.tmp1) if item != nil { @@ -758,7 +787,7 @@ func (s *inFlightWriteSet) byteSize() int64 { } // clear purges all elements from the in-flight write set and frees associated -// memory. The reuse flag indicates whether the caller is intending to reu-use +// memory. The reuse flag indicates whether the caller is intending to reuse // the set or not. func (s *inFlightWriteSet) clear(reuse bool) { if s.t == nil { @@ -769,6 +798,19 @@ func (s *inFlightWriteSet) clear(reuse bool) { s.alloc.clear() } +// AsSlice returns the in-flight writes, ordered by key. +func (s *inFlightWriteSet) AsSlice() []roachpb.SequencedWrite { + l := s.len() + if l == 0 { + return nil + } + writes := make([]roachpb.SequencedWrite, 0, l) + s.ascend(func(w *inFlightWrite) { + writes = append(writes, w.SequencedWrite) + }) + return writes +} + // inFlightWriteAlloc provides chunk allocation of inFlightWrites, // amortizing the overhead of each allocation. type inFlightWriteAlloc []inFlightWrite diff --git a/pkg/kv/txn_interceptor_pipeliner_test.go b/pkg/kv/txn_interceptor_pipeliner_test.go index eb03cad0d786..194c74fa805a 100644 --- a/pkg/kv/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/txn_interceptor_pipeliner_test.go @@ -1226,3 +1226,90 @@ func TestTxnPipelinerRecordsWritesOnFailure(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) require.Len(t, tp.footprint.asSlice(), 2) } + +// Test that the pipeliners knows how to save and restore its state. +func TestTxnPipelinerSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner() + + initialSavepoint := savepoint{} + tp.createSavepointLocked(ctx, &initialSavepoint) + + tp.ifWrites.insert(roachpb.Key("a"), 10) + tp.ifWrites.insert(roachpb.Key("b"), 11) + tp.ifWrites.insert(roachpb.Key("c"), 12) + require.Equal(t, 3, tp.ifWrites.len()) + + s := savepoint{seqNum: enginepb.TxnSeq(12), active: true} + tp.createSavepointLocked(ctx, &s) + + // Some more writes after the savepoint. One of them is on key "c" that is + // part of the savepoint too, so we'll check that, upon rollback, the savepoint is + // updated to remove the lower-seq-num write to "c" that it was tracking as in-flight. + tp.ifWrites.insert(roachpb.Key("c"), 13) + tp.ifWrites.insert(roachpb.Key("d"), 14) + require.Empty(t, tp.footprint.asSlice()) + + // Now verify one of the writes. When we'll rollback to the savepoint below, + // we'll check that the verified write stayed verified. + txn := makeTxnProto() + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")}}) + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 2) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &roachpb.GetRequest{}, ba.Requests[1].GetInner()) + + qiReq := ba.Requests[0].GetInner().(*roachpb.QueryIntentRequest) + require.Equal(t, roachpb.Key("a"), qiReq.Key) + require.Equal(t, enginepb.TxnSeq(10), qiReq.Txn.Sequence) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetQueryIntent().FoundIntent = true + return br, nil + }) + br, pErr := tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, []roachpb.Span{{Key: roachpb.Key("a")}}, tp.footprint.asSlice()) + require.Equal(t, 3, tp.ifWrites.len()) // We've verified one out of 4 writes. + + // Now restore the savepoint and check that the in-flight write state has been restored + // and all rolled-back writes were moved to the write footprint. + tp.rollbackToSavepointLocked(ctx, s) + + // Check that the tracked inflight writes were updated correctly. The key that + // had been verified ("a") should have been taken out of the savepoint. Same + // for the "c", for which the pipeliner is now tracking a + // higher-sequence-number (which implies that it must have verified the lower + // sequence number write). + var ifWrites []inFlightWrite + tp.ifWrites.ascend(func(w *inFlightWrite) { + ifWrites = append(ifWrites, *w) + }) + require.Equal(t, + []inFlightWrite{ + {roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}}, + }, + ifWrites) + + // Check that the footprint was updated correctly. In addition to the "a" + // which it had before, it will also have "d" because it's not part of the + // savepoint. It will also have "c" since that's not an in-flight write any + // more (see above). + require.Equal(t, + []roachpb.Span{ + {Key: roachpb.Key("a")}, + {Key: roachpb.Key("c")}, + {Key: roachpb.Key("d")}, + }, + tp.footprint.asSlice()) + + // Now rollback to the initial savepoint and check that all in-flight writes are gone. + tp.rollbackToSavepointLocked(ctx, initialSavepoint) + require.Empty(t, tp.ifWrites.len()) +} diff --git a/pkg/kv/txn_interceptor_seq_num_allocator.go b/pkg/kv/txn_interceptor_seq_num_allocator.go index f5324ad1c215..52e900931dac 100644 --- a/pkg/kv/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/txn_interceptor_seq_num_allocator.go @@ -175,5 +175,16 @@ func (s *txnSeqNumAllocator) epochBumpedLocked() { s.readSeq = 0 } +// createSavepointLocked is part of the txnReqInterceptor interface. +func (s *txnSeqNumAllocator) createSavepointLocked(ctx context.Context, sp *savepoint) { + sp.seqNum = s.writeSeq +} + +// rollbackToSavepointLocked is part of the txnReqInterceptor interface. +func (*txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) { + // Nothing to restore. The seq nums keep increasing. The TxnCoordSender has + // added a range of sequence numbers to the ignored list. +} + // closeLocked is part of the txnInterceptor interface. func (*txnSeqNumAllocator) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_seq_num_allocator_test.go b/pkg/kv/txn_interceptor_seq_num_allocator_test.go index 07e84a73b4ce..44a8dd9c9857 100644 --- a/pkg/kv/txn_interceptor_seq_num_allocator_test.go +++ b/pkg/kv/txn_interceptor_seq_num_allocator_test.go @@ -353,3 +353,33 @@ func TestSequenceNumberAllocationAfterLeafInitialization(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) } + +// TestSequenceNumberAllocationSavepoint tests that the allocator populates a +// savepoint with the cur seq num. +func TestSequenceNumberAllocationSavepoint(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s, mockSender := makeMockTxnSeqNumAllocator() + txn := makeTxnProto() + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + + // Perform a few writes to increase the sequence number counter. + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + br, pErr := s.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, enginepb.TxnSeq(2), s.writeSeq) + + sp := &savepoint{} + s.createSavepointLocked(ctx, sp) + require.Equal(t, enginepb.TxnSeq(2), sp.seqNum) +} diff --git a/pkg/kv/txn_interceptor_span_refresher.go b/pkg/kv/txn_interceptor_span_refresher.go index d3f43349b1c3..83a08f7d9cf3 100644 --- a/pkg/kv/txn_interceptor_span_refresher.go +++ b/pkg/kv/txn_interceptor_span_refresher.go @@ -451,5 +451,21 @@ func (sr *txnSpanRefresher) epochBumpedLocked() { sr.refreshedTimestamp.Reset() } +// createSavepointLocked is part of the txnReqInterceptor interface. +func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepoint) { + s.refreshSpans = make([]roachpb.Span, len(sr.refreshSpans)) + copy(s.refreshSpans, sr.refreshSpans) + s.refreshInvalid = sr.refreshInvalid + s.refreshSpanBytes = sr.refreshSpansBytes +} + +// rollbackToSavepointLocked is part of the txnReqInterceptor interface. +func (sr *txnSpanRefresher) rollbackToSavepointLocked(ctx context.Context, s savepoint) { + sr.refreshSpans = make([]roachpb.Span, len(s.refreshSpans)) + copy(sr.refreshSpans, s.refreshSpans) + sr.refreshInvalid = s.refreshInvalid + sr.refreshSpansBytes = s.refreshSpanBytes +} + // closeLocked implements the txnInterceptor interface. func (*txnSpanRefresher) closeLocked() {} diff --git a/pkg/kv/txn_interceptor_span_refresher_test.go b/pkg/kv/txn_interceptor_span_refresher_test.go index 732d81e10efd..1386a40526f4 100644 --- a/pkg/kv/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/txn_interceptor_span_refresher_test.go @@ -555,3 +555,71 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) } + +// TestTxnSpanRefresherSavepoint checks that the span refresher can savepoint +// its state and restore it. +func TestTxnSpanRefresherSavepoint(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tsr, mockSender := makeMockTxnSpanRefresher() + + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + txn := makeTxnProto() + + read := func(key roachpb.Key) { + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + getArgs := roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: key}} + ba.Add(&getArgs) + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 1) + require.IsType(t, &roachpb.GetRequest{}, ba.Requests[0].GetInner()) + + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + br, pErr := tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + } + read(keyA) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + + s := savepoint{} + tsr.createSavepointLocked(ctx, &s) + + // Another read after the savepoint was created. + read(keyB) + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshSpans) + + require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans) + require.Less(t, s.refreshSpanBytes, tsr.refreshSpansBytes) + require.False(t, s.refreshInvalid) + + // Rollback the savepoint and check that refresh spans were overwritten. + tsr.rollbackToSavepointLocked(ctx, s) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + + // Set MaxTxnRefreshSpansBytes limit low and then exceed it. + MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 1) + read(keyB) + require.True(t, tsr.refreshInvalid) + + // Check that rolling back to the savepoint resets refreshInvalid. + tsr.rollbackToSavepointLocked(ctx, s) + require.Equal(t, tsr.refreshSpansBytes, s.refreshSpanBytes) + require.False(t, tsr.refreshInvalid) + + // Exceed the limit again and then create a savepoint. + read(keyB) + require.True(t, tsr.refreshInvalid) + s = savepoint{} + tsr.createSavepointLocked(ctx, &s) + require.True(t, s.refreshInvalid) + require.Empty(t, s.refreshSpans) + // Rollback to the savepoint check that refreshes are still invalid. + tsr.rollbackToSavepointLocked(ctx, s) + require.Empty(t, tsr.refreshSpans) + require.True(t, tsr.refreshInvalid) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ef39fbe83887..eb533c3308ba 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -980,6 +980,10 @@ type connExecutor struct { // stateOpen. autoRetryCounter int + // numDDL keeps track of how many DDL statements have been + // executed so far. + numDDL int + // txnRewindPos is the position within stmtBuf to which we'll rewind when // performing automatic retries. This is more or less the position where the // current transaction started. @@ -1013,6 +1017,13 @@ type connExecutor struct { // committed or aborted). It is set when txn is started but can remain // unset when txn is executed within another higher-level txn. onTxnFinish func(txnEvent) + + // savepoints maintains the stack of savepoints currently open. + savepoints savepointStack + // savepointsAtTxnRewindPos is a snapshot of the savepoints stack before + // processing the command at position txnRewindPos. When rewinding, we're + // going to restore this snapshot. + savepointsAtTxnRewindPos savepointStack } // sessionData contains the user-configurable connection variables. @@ -1191,12 +1202,16 @@ func (ex *connExecutor) resetExtraTxnState( switch ev { case txnCommit, txnRollback: + ex.extraTxnState.savepoints.clear() // After txn is finished, we need to call onTxnFinish (if it's non-nil). if ex.extraTxnState.onTxnFinish != nil { ex.extraTxnState.onTxnFinish(ev) ex.extraTxnState.onTxnFinish = nil } } + // NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a + // a ROLLBACK TO SAVEPOINT that generated the event, and that statement deals with the + // savepoints, or it's a rewind which also deals with them. return nil } @@ -1558,6 +1573,7 @@ func (ex *connExecutor) execCmd(ctx context.Context) error { } case rewind: ex.rewindPrepStmtNamespace(ctx) + ex.extraTxnState.savepoints = ex.extraTxnState.savepointsAtTxnRewindPos advInfo.rewCap.rewindAndUnlock(ctx) case stayInPlace: // Nothing to do. The same statement will be executed again. @@ -1684,6 +1700,7 @@ func (ex *connExecutor) setTxnRewindPos(ctx context.Context, pos CmdPos) { ex.extraTxnState.txnRewindPos = pos ex.stmtBuf.ltrim(ctx, pos) ex.commitPrepStmtNamespace(ctx) + ex.extraTxnState.savepointsAtTxnRewindPos = ex.extraTxnState.savepoints.clone() } // stmtDoesntNeedRetry returns true if the given statement does not need to be @@ -1700,8 +1717,6 @@ func stateToTxnStatusIndicator(s fsm.State) TransactionStatusIndicator { return InTxnBlock case stateAborted: return InFailedTxnBlock - case stateRestartWait: - return InTxnBlock case stateNoTxn: return IdleTxnBlock case stateCommitWait: @@ -1869,10 +1884,6 @@ func errIsRetriable(err error) bool { func (ex *connExecutor) makeErrEvent(err error, stmt tree.Statement) (fsm.Event, fsm.EventPayload) { retriable := errIsRetriable(err) if retriable { - if _, inOpen := ex.machine.CurState().(stateOpen); !inOpen { - panic(fmt.Sprintf("retriable error in unexpected state: %#v", - ex.machine.CurState())) - } rc, canAutoRetry := ex.getRewindTxnCapability() ev := eventRetriableErr{ IsCommit: fsm.FromBool(isCommit(stmt)), @@ -2473,22 +2484,19 @@ func (sc *StatementCounters) incrementCount(ex *connExecutor, stmt tree.Statemen case *tree.RollbackTransaction: sc.TxnRollbackCount.Inc() case *tree.Savepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Name); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Name) { sc.RestartSavepointCount.Inc() } else { sc.SavepointCount.Inc() } case *tree.ReleaseSavepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Savepoint); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Savepoint) { sc.ReleaseRestartSavepointCount.Inc() } else { sc.ReleaseSavepointCount.Inc() } case *tree.RollbackToSavepoint: - // TODO(knz): Sanitize this. - if err := ex.validateSavepointName(t.Savepoint); err == nil { + if ex.isCommitOnReleaseSavepoint(t.Savepoint) { sc.RollbackToRestartSavepointCount.Inc() } else { sc.RollbackToSavepointCount.Inc() diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index bcf2a99471c4..16459246cc32 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -120,7 +120,7 @@ func (ex *connExecutor) execStmt( case eventNonRetriableErr: ex.recordFailure() } - case stateAborted, stateRestartWait: + case stateAborted: ev, payload = ex.execStmtInAbortedState(ctx, stmt, res) case stateCommitWait: ev, payload = ex.execStmtInCommitWaitState(stmt, res) @@ -273,10 +273,12 @@ func (ex *connExecutor) execStmtInOpenState( return ex.execSavepointInOpenState(ctx, s, res) case *tree.ReleaseSavepoint: - return ex.execReleaseSavepointInOpenState(ctx, s, res) + ev, payload := ex.execRelease(ctx, s, res) + return ev, payload, nil case *tree.RollbackToSavepoint: - return ex.execRollbackToSavepointInOpenState(ctx, s, res) + ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res) + return ev, payload, nil case *tree.Prepare: // This is handling the SQL statement "PREPARE". See execPrepare for @@ -603,32 +605,28 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error func (ex *connExecutor) commitSQLTransaction( ctx context.Context, stmt tree.Statement, ) (fsm.Event, fsm.EventPayload) { - ev, payload, _ := ex.commitSQLTransactionInternal(ctx, stmt) - return ev, payload + err := ex.commitSQLTransactionInternal(ctx, stmt) + if err != nil { + return ex.makeErrEvent(err, stmt) + } + return eventTxnFinish{}, eventTxnFinishPayload{commit: true} } -// commitSQLTransactionInternal is the part of a commit common to -// commitSQLTransaction and runReleaseRestartSavepointAsTxnCommit. func (ex *connExecutor) commitSQLTransactionInternal( ctx context.Context, stmt tree.Statement, -) (ev fsm.Event, payload fsm.EventPayload, ok bool) { - ex.clearSavepoints() - +) error { for _, sc := range ex.extraTxnState.schemaChangers.schemaChangers { if err := sc.validateTablePrimaryKeys(ctx, ex.state.mu.txn); err != nil { - ev, payload = ex.makeErrEvent(err, stmt) - return ev, payload, false + return err } } if err := ex.checkTableTwoVersionInvariant(ctx); err != nil { - ev, payload = ex.makeErrEvent(err, stmt) - return ev, payload, false + return err } if err := ex.state.mu.txn.Commit(ctx); err != nil { - ev, payload = ex.makeErrEvent(err, stmt) - return ev, payload, false + return err } // Now that we've committed, if we modified any table we need to make sure @@ -637,15 +635,12 @@ func (ex *connExecutor) commitSQLTransactionInternal( if tables := ex.extraTxnState.tables.getTablesWithNewVersion(); tables != nil { ex.extraTxnState.tables.releaseLeases(ctx) } - - return eventTxnFinish{}, eventTxnFinishPayload{commit: true}, true + return nil } // rollbackSQLTransaction executes a ROLLBACK statement: the KV transaction is // rolled-back and an event is produced. func (ex *connExecutor) rollbackSQLTransaction(ctx context.Context) (fsm.Event, fsm.EventPayload) { - ex.clearSavepoints() - if err := ex.state.mu.txn.Rollback(ctx); err != nil { log.Warningf(ctx, "txn rollback failed: %s", err) } @@ -780,6 +775,13 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro log.VEventf(ctx, 1, "optimizer plan failed: %v", err) return err } + + // TODO(knz): Remove this accounting if/when savepoint rollbacks + // support rolling back over DDL. + if planner.curPlan.flags.IsSet(planFlagIsDDL) { + ex.extraTxnState.numDDL++ + } + return nil } @@ -961,42 +963,45 @@ func (ex *connExecutor) execStmtInNoTxnState( func (ex *connExecutor) execStmtInAbortedState( ctx context.Context, stmt Statement, res RestrictedCommandResult, ) (fsm.Event, fsm.EventPayload) { - _, inRestartWait := ex.machine.CurState().(stateRestartWait) - // TODO(andrei/cuongdo): Figure out what statements to count here. + reject := func() (fsm.Event, fsm.EventPayload) { + ev := eventNonRetriableErr{IsCommit: fsm.False} + payload := eventNonRetriableErrPayload{ + err: sqlbase.NewTransactionAbortedError("" /* customMsg */), + } + return ev, payload + } + switch s := stmt.AST.(type) { case *tree.CommitTransaction, *tree.RollbackTransaction: - if inRestartWait { - ev, payload := ex.rollbackSQLTransaction(ctx) - return ev, payload + if _, ok := s.(*tree.CommitTransaction); ok { + // Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too. + res.ResetStmtType((*tree.RollbackTransaction)(nil)) } - ex.rollbackSQLTransaction(ctx) - ex.clearSavepoints() - - // Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too. - res.ResetStmtType((*tree.RollbackTransaction)(nil)) - - return eventTxnFinish{}, eventTxnFinishPayload{commit: false} + return ex.rollbackSQLTransaction(ctx) case *tree.RollbackToSavepoint: - return ex.execRollbackToSavepointInAbortedState(ctx, inRestartWait, s, res) + return ex.execRollbackToSavepointInAbortedState(ctx, s) case *tree.Savepoint: - return ex.execSavepointInAbortedState(ctx, inRestartWait, s, res) + if ex.isCommitOnReleaseSavepoint(s.Name) { + // We allow SAVEPOINT cockroach_restart as an alternative to ROLLBACK TO + // SAVEPOINT cockroach_restart in the Aborted state. This is needed + // because any client driver (that we know of) which links subtransaction + // `ROLLBACK/RELEASE` to an object's lifetime will fail to `ROLLBACK` on a + // failed `RELEASE`. Instead, we now can use the creation of another + // subtransaction object (which will issue another `SAVEPOINT` statement) + // to indicate retry intent. Specifically, this change was prompted by + // subtransaction handling in `libpqxx` (C++ driver) and `rust-postgres` + // (Rust driver). + res.ResetStmtType((*tree.RollbackToSavepoint)(nil)) + return ex.execRollbackToSavepointInAbortedState( + ctx, &tree.RollbackToSavepoint{Savepoint: s.Name}) + } + return reject() default: - ev := eventNonRetriableErr{IsCommit: fsm.False} - if inRestartWait { - payload := eventNonRetriableErrPayload{ - err: sqlbase.NewTransactionAbortedError( - "Expected \"ROLLBACK TO SAVEPOINT cockroach_restart\"" /* customMsg */), - } - return ev, payload - } - payload := eventNonRetriableErrPayload{ - err: sqlbase.NewTransactionAbortedError("" /* customMsg */), - } - return ev, payload + return reject() } } @@ -1017,7 +1022,7 @@ func (ex *connExecutor) execStmtInCommitWaitState( // Reply to a rollback with the COMMIT tag, by analogy to what we do when we // get a COMMIT in state Aborted. res.ResetStmtType((*tree.CommitTransaction)(nil)) - return eventTxnFinish{}, eventTxnFinishPayload{commit: true} + return eventTxnFinish{}, eventTxnFinishPayload{commit: false} default: ev = eventNonRetriableErr{IsCommit: fsm.False} payload = eventNonRetriableErrPayload{ @@ -1036,6 +1041,8 @@ func (ex *connExecutor) runObserverStatement( switch sqlStmt := stmt.AST.(type) { case *tree.ShowTransactionStatus: return ex.runShowTransactionState(ctx, res) + case *tree.ShowSavepointStatus: + return ex.runShowSavepointState(ctx, res) case *tree.ShowSyntax: return ex.runShowSyntax(ctx, sqlStmt.Statement, res) case *tree.SetTracing: diff --git a/pkg/sql/conn_executor_savepoints.go b/pkg/sql/conn_executor_savepoints.go index bb0f5d57e630..3a9d23ca8db7 100644 --- a/pkg/sql/conn_executor_savepoints.go +++ b/pkg/sql/conn_executor_savepoints.go @@ -14,191 +14,318 @@ import ( "context" "strings" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/fsm" ) +// commitOnReleaseSavepointName is the name of the savepoint with special +// release semantics: releasing this savepoint commits the underlying KV txn. +// This special savepoint is used to catch deferred serializability violations +// and is part of the client-directed transaction retries protocol. +const commitOnReleaseSavepointName = "cockroach_restart" + // execSavepointInOpenState runs a SAVEPOINT statement inside an open // txn. func (ex *connExecutor) execSavepointInOpenState( ctx context.Context, s *tree.Savepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - // Ensure that the user isn't trying to run BEGIN; SAVEPOINT; SAVEPOINT; - if ex.state.activeRestartSavepointName != "" { - err := unimplemented.NewWithIssueDetail(10735, "nested", "SAVEPOINT may not be nested") - ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil +) (fsm.Event, fsm.EventPayload, error) { + savepoints := &ex.extraTxnState.savepoints + // Sanity check for "SAVEPOINT cockroach_restart". + commitOnRelease := ex.isCommitOnReleaseSavepoint(s.Name) + if commitOnRelease { + // Validate the special savepoint cockroach_restart. It cannot be nested + // because it has special release semantics. + active := ex.state.mu.txn.Active() + l := len(*savepoints) + // If we've already declared this savepoint, but we haven't done anything + // with the KV txn yet (or, more importantly, we haven't done an anything + // with the KV txn since we've rolled back to it), treat the recreation of + // the savepoint as a no-op instead of erroring out because this savepoint + // cannot be nested (even within itself). + // This serves to support the following pattern: + // SAVEPOINT cockroach_restart + // -> serializability failure + // ROLLBACK TO SAVEPOINT cockroach_restart + // SAVEPOINT cockroach_restart + // + // Some of our examples use this pattern, issuing the SAVEPOINT cockroach_restart + // inside the retry loop. + // + // Of course, this means that the following doesn't work: + // SAVEPOINT cockroach_restart + // SAVEPOINT cockroach_restart + // RELEASE SAVEPOINT cockroach_restart + // ROLLBACK TO SAVEPOINT cockroach_restart -> the savepoint no longer exists here + // + // Although it would work for any other savepoint but cockroach_restart. But + // that's natural given the release semantics. + if l == 1 && (*savepoints)[0].commitOnRelease && !active { + return nil, nil, nil + } + + err := func() error { + if !savepoints.empty() { + return pgerror.Newf(pgcode.Syntax, + "SAVEPOINT \"%s\" cannot be nested", + tree.ErrNameString(commitOnReleaseSavepointName)) + } + // We want to disallow restart SAVEPOINTs to be issued after a KV + // transaction has started running. It is desirable to allow metadata + // queries against vtables to proceed before starting a SAVEPOINT for better + // ORM compatibility. + // See also https://github.com/cockroachdb/cockroach/issues/15012. + if ex.state.mu.txn.Active() { + return pgerror.Newf(pgcode.Syntax, + "SAVEPOINT \"%s\" needs to be the first statement in a transaction", + tree.ErrNameString(commitOnReleaseSavepointName)) + } + return nil + }() + if err != nil { + ev, payload := ex.makeErrEvent(err, s) + return ev, payload, nil + } } - if err := ex.validateSavepointName(s.Name); err != nil { + + token, err := ex.state.mu.txn.CreateSavepoint(ctx) + if err != nil { ev, payload := ex.makeErrEvent(err, s) return ev, payload, nil } - // We want to disallow SAVEPOINTs to be issued after a KV transaction has - // started running. The client txn's statement count indicates how many - // statements have been executed as part of this transaction. It is - // desirable to allow metadata queries against vtables to proceed - // before starting a SAVEPOINT for better ORM compatibility. - // See also: - // https://github.com/cockroachdb/cockroach/issues/15012 - if ex.state.mu.txn.Active() { - err := pgerror.Newf(pgcode.Syntax, - "SAVEPOINT %s needs to be the first statement in a transaction", restartSavepointName) - ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + + sp := savepoint{ + name: s.Name, + commitOnRelease: commitOnRelease, + kvToken: token, + numDDL: ex.extraTxnState.numDDL, } - ex.state.activeRestartSavepointName = s.Name - // Note that Savepoint doesn't have a corresponding plan node. - // This here is all the execution there is. + savepoints.push(sp) + return nil, nil, nil } -// execReleaseSavepointInOpenState runs a RELEASE SAVEPOINT statement -// inside an open txn. -func (ex *connExecutor) execReleaseSavepointInOpenState( +// execRelease runs a RELEASE SAVEPOINT statement inside an open txn. +func (ex *connExecutor) execRelease( ctx context.Context, s *tree.ReleaseSavepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - if err := ex.validateSavepointName(s.Savepoint); err != nil { +) (fsm.Event, fsm.EventPayload) { + env := &ex.extraTxnState.savepoints + entry, idx := env.find(s.Savepoint) + if entry == nil { + ev, payload := ex.makeErrEvent( + pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint \"%s\" does not exist", &s.Savepoint), s) + return ev, payload + } + + // Discard our savepoint and all further ones. Depending on what happens with + // the release below, we might add this savepoint back. + env.popToIdx(idx - 1) + + if entry.commitOnRelease { + res.ResetStmtType((*tree.CommitTransaction)(nil)) + err := ex.commitSQLTransactionInternal(ctx, s) + if err == nil { + return eventTxnReleased{}, nil + } + // Committing the transaction failed. We'll go to state RestartWait if + // it's a retriable error, or to state RollbackWait otherwise. + if errIsRetriable(err) { + // Add the savepoint back. We want to allow a ROLLBACK TO SAVEPOINT + // cockroach_restart (that's the whole point of commitOnRelease). + env.push(*entry) + + rc, canAutoRetry := ex.getRewindTxnCapability() + ev := eventRetriableErr{ + IsCommit: fsm.FromBool(isCommit(s)), + CanAutoRetry: fsm.FromBool(canAutoRetry), + } + payload := eventRetriableErrPayload{err: err, rewCap: rc} + return ev, payload + } + + // Non-retriable error. The transaction might have committed (i.e. the + // error might be ambiguous). We can't allow a ROLLBACK TO SAVEPOINT to + // recover the transaction, so we're not adding the savepoint back. + ex.rollbackSQLTransaction(ctx) + ev := eventNonRetriableErr{IsCommit: fsm.FromBool(false)} + payload := eventNonRetriableErrPayload{err: err} + return ev, payload + } + + if err := ex.state.mu.txn.ReleaseSavepoint(ctx, entry.kvToken); err != nil { ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + return ev, payload } - // ReleaseSavepoint is executed fully here; there's no plan for it. - ev, payload := ex.runReleaseRestartSavepointAsTxnCommit(ctx, s) - res.ResetStmtType((*tree.CommitTransaction)(nil)) - return ev, payload, nil + + return nil, nil } // execRollbackToSavepointInOpenState runs a ROLLBACK TO SAVEPOINT // statement inside an open txn. func (ex *connExecutor) execRollbackToSavepointInOpenState( ctx context.Context, s *tree.RollbackToSavepoint, res RestrictedCommandResult, -) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) { - if err := ex.validateSavepointName(s.Savepoint); err != nil { +) (fsm.Event, fsm.EventPayload) { + entry, idx := ex.extraTxnState.savepoints.find(s.Savepoint) + if entry == nil { + ev, payload := ex.makeErrEvent(pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint \"%s\" does not exist", &s.Savepoint), s) + return ev, payload + } + + // We don't yet support rolling back over DDL. Instead of creating an + // inconsistent txn or schema state, prefer to tell the users we don't know + // how to proceed yet. Initial savepoints are a special case - we can always + // rollback to them because we can reset all the schema change state. + if !entry.kvToken.Initial() && ex.extraTxnState.numDDL > entry.numDDL { + ev, payload := ex.makeErrEvent(unimplemented.NewWithIssueDetail(10735, "rollback-after-ddl", + "ROLLBACK TO SAVEPOINT not yet supported after DDL statements"), s) + return ev, payload + } + + if err := ex.state.mu.txn.RollbackToSavepoint(ctx, entry.kvToken); err != nil { ev, payload := ex.makeErrEvent(err, s) - return ev, payload, nil + return ev, payload } - ex.state.activeRestartSavepointName = "" - res.ResetStmtType((*tree.Savepoint)(nil)) - return eventTxnRestart{}, nil /* payload */, nil + ex.extraTxnState.savepoints.popToIdx(idx) + + if entry.kvToken.Initial() { + return eventTxnRestart{}, nil + } + // No event is necessary; there's nothing for the state machine to do. + return nil, nil } -// execSavepointInAbortedState runs a SAVEPOINT statement when a txn is aborted. -// It also contains the logic for ROLLBACK TO SAVEPOINT. -// TODO(knz): split this in different functions. -func (ex *connExecutor) execSavepointInAbortedState( - ctx context.Context, inRestartWait bool, s tree.Statement, res RestrictedCommandResult, +func (ex *connExecutor) execRollbackToSavepointInAbortedState( + ctx context.Context, s *tree.RollbackToSavepoint, ) (fsm.Event, fsm.EventPayload) { - // We accept both the "ROLLBACK TO SAVEPOINT cockroach_restart" and the - // "SAVEPOINT cockroach_restart" commands to indicate client intent to - // retry a transaction in a RestartWait state. - var spName tree.Name - var isRollback bool - switch n := s.(type) { - case *tree.RollbackToSavepoint: - spName = n.Savepoint - isRollback = true - case *tree.Savepoint: - spName = n.Name - default: - panic("unreachable") - } - // If the user issued a SAVEPOINT in the abort state, validate - // as though there were no active savepoint. - if !isRollback { - ex.state.activeRestartSavepointName = "" - } - if err := ex.validateSavepointName(spName); err != nil { + makeErr := func(err error) (fsm.Event, fsm.EventPayload) { ev := eventNonRetriableErr{IsCommit: fsm.False} payload := eventNonRetriableErrPayload{ err: err, } return ev, payload } - // Either clear or reset the current savepoint name so that - // ROLLBACK TO; SAVEPOINT; works. - if isRollback { - ex.state.activeRestartSavepointName = "" - } else { - ex.state.activeRestartSavepointName = spName - } - res.ResetStmtType((*tree.RollbackTransaction)(nil)) + entry, idx := ex.extraTxnState.savepoints.find(s.Savepoint) + if entry == nil { + return makeErr(pgerror.Newf(pgcode.InvalidSavepointSpecification, + "savepoint \"%s\" does not exist", tree.ErrString(&s.Savepoint))) + } - if inRestartWait { + ex.extraTxnState.savepoints.popToIdx(idx) + if err := ex.state.mu.txn.RollbackToSavepoint(ctx, entry.kvToken); err != nil { + return ex.makeErrEvent(err, s) + } + if entry.kvToken.Initial() { return eventTxnRestart{}, nil } - // We accept ROLLBACK TO SAVEPOINT even after non-retryable errors to make - // it easy for client libraries that want to indiscriminately issue - // ROLLBACK TO SAVEPOINT after every error and possibly follow it with a - // ROLLBACK and also because we accept ROLLBACK TO SAVEPOINT in the Open - // state, so this is consistent. - // We start a new txn with the same sql timestamp and isolation as the - // current one. - - ev := eventTxnStart{ - ImplicitTxn: fsm.False, - } - rwMode := tree.ReadWrite - if ex.state.readOnly { - rwMode = tree.ReadOnly - } - ex.rollbackSQLTransaction(ctx) - payload := makeEventTxnStartPayload( - ex.state.priority, rwMode, ex.state.sqlTimestamp, - nil /* historicalTimestamp */, ex.transitionCtx) - return ev, payload + return eventSavepointRollback{}, nil } -// execRollbackToSavepointInAbortedState runs a ROLLBACK TO SAVEPOINT -// statement when a txn is aborted. -func (ex *connExecutor) execRollbackToSavepointInAbortedState( - ctx context.Context, inRestartWait bool, s tree.Statement, res RestrictedCommandResult, -) (fsm.Event, fsm.EventPayload) { - return ex.execSavepointInAbortedState(ctx, inRestartWait, s, res) +// isCommitOnReleaseSavepoint returns true if the savepoint name implies special +// release semantics: releasing it commits the underlying KV txn. +func (ex *connExecutor) isCommitOnReleaseSavepoint(savepoint tree.Name) bool { + if ex.sessionData.ForceSavepointRestart { + // The session setting force_savepoint_restart implies that all + // uses of the SAVEPOINT statement are targeting restarts. + return true + } + return strings.HasPrefix(string(savepoint), commitOnReleaseSavepointName) } -// runReleaseRestartSavepointAsTxnCommit executes a commit after -// RELEASE SAVEPOINT statement when using an explicit transaction. -func (ex *connExecutor) runReleaseRestartSavepointAsTxnCommit( - ctx context.Context, stmt tree.Statement, -) (fsm.Event, fsm.EventPayload) { - if ev, payload, ok := ex.commitSQLTransactionInternal(ctx, stmt); !ok { - return ev, payload - } - return eventTxnReleased{}, nil +// savepoint represents a SQL savepoint - a snapshot of the current +// transaction's state at a previous point in time. +// +// Savepoints' behavior on RELEASE differs based on commitOnRelease, and their +// behavior on ROLLBACK after retriable errors differs based on +// kvToken.Initial(). +type savepoint struct { + name tree.Name + + // commitOnRelease is set if the special syntax "SAVEPOINT cockroach_restart" + // was used. Such a savepoint is special in that a RELEASE actually commits + // the transaction - giving the client a change to find out about any + // retriable error and issue another "ROLLBACK TO SAVEPOINT cockroach_restart" + // afterwards. Regular savepoints (even top-level savepoints) cannot commit + // the transaction on RELEASE. + // + // Only an `initial` savepoint can have this set (see + // client.SavepointToken.Initial()). + commitOnRelease bool + + kvToken client.SavepointToken + + // The number of DDL statements that had been executed in the transaction (at + // the time the savepoint was created). We refuse to roll back a savepoint if + // more DDL statements were executed since the savepoint's creation. + // TODO(knz): support partial DDL cancellation in pending txns. + numDDL int } -// validateSavepointName validates that it is that the provided ident -// matches the active savepoint name, begins with RestartSavepointName, -// or that force_savepoint_restart==true. We accept everything with the -// desired prefix because at least the C++ libpqxx appends sequence -// numbers to the savepoint name specified by the user. -func (ex *connExecutor) validateSavepointName(savepoint tree.Name) error { - if ex.state.activeRestartSavepointName != "" { - if savepoint == ex.state.activeRestartSavepointName { - return nil +type savepointStack []savepoint + +func (stack savepointStack) empty() bool { return len(stack) == 0 } + +func (stack *savepointStack) clear() { *stack = (*stack)[:0] } + +func (stack *savepointStack) push(s savepoint) { + *stack = append(*stack, s) +} + +// find finds the most recent savepoint with the given name. +// +// The returned savepoint can be modified (rolling back modifies the kvToken). +// Callers shouldn't maintain references to the returned savepoint, as +// references can be invalidated by further operations on the savepoints. +func (stack savepointStack) find(sn tree.Name) (*savepoint, int) { + for i := len(stack) - 1; i >= 0; i-- { + if stack[i].name == sn { + return &stack[i], i } - return pgerror.Newf(pgcode.InvalidSavepointSpecification, - `SAVEPOINT %q is in use`, tree.ErrString(&ex.state.activeRestartSavepointName)) - } - if !ex.sessionData.ForceSavepointRestart && !strings.HasPrefix(string(savepoint), restartSavepointName) { - return unimplemented.NewWithIssueHint(10735, - "SAVEPOINT not supported except for "+restartSavepointName, - "Retryable transactions with arbitrary SAVEPOINT names can be enabled "+ - "with SET force_savepoint_restart=true") } - return nil + return nil, -1 } -// clearSavepoints clears all savepoints defined so far. This -// occurs when the SQL txn is closed (abort/commit) and upon -// a top-level restart. -func (ex *connExecutor) clearSavepoints() { - ex.state.activeRestartSavepointName = "" +// popToIdx pops (discards) all the savepoints at higher indexes. +func (stack *savepointStack) popToIdx(idx int) { + *stack = (*stack)[:idx+1] } -// restartSavepointName is the only savepoint ident that we accept. -const restartSavepointName string = "cockroach_restart" +func (stack savepointStack) clone() savepointStack { + if len(stack) == 0 { + // Avoid allocating a slice. + return nil + } + cpy := make(savepointStack, len(stack)) + copy(cpy, stack) + return cpy +} + +// runShowSavepointState executes a SHOW SAVEPOINT STATUS statement. +// +// If an error is returned, the connection needs to stop processing queries. +func (ex *connExecutor) runShowSavepointState( + ctx context.Context, res RestrictedCommandResult, +) error { + res.SetColumns(ctx, sqlbase.ResultColumns{ + {Name: "savepoint_name", Typ: types.String}, + {Name: "is_initial_savepoint", Typ: types.Bool}, + }) + + for _, entry := range ex.extraTxnState.savepoints { + if err := res.AddRow(ctx, tree.Datums{ + tree.NewDString(string(entry.name)), + tree.MakeDBool(tree.DBool(entry.kvToken.Initial())), + }); err != nil { + return err + } + } + return nil +} diff --git a/pkg/sql/conn_executor_savepoints_test.go b/pkg/sql/conn_executor_savepoints_test.go new file mode 100644 index 000000000000..168379b46634 --- /dev/null +++ b/pkg/sql/conn_executor_savepoints_test.go @@ -0,0 +1,195 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql_test + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" +) + +func TestSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + datadriven.Walk(t, "testdata/savepoints", func(t *testing.T, path string) { + + params := base.TestServerArgs{} + s, sqlConn, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + if _, err := sqlConn.Exec("CREATE TABLE progress(n INT, marker BOOL)"); err != nil { + t.Fatal(err) + } + + datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "sql": + // Implicitly abort any previously-ongoing txn. + _, _ = sqlConn.Exec("ABORT") + // Prepare for the next test. + if _, err := sqlConn.Exec("DELETE FROM progress"); err != nil { + td.Fatalf(t, "cleaning up: %v", err) + } + + // Prepare a buffer to accumulate the results. + var buf strings.Builder + + // We're going to execute the input line-by-line. + stmts := strings.Split(td.Input, "\n") + + // progressBar is going to show the cancellation of writes + // during rollbacks. + progressBar := make([]byte, len(stmts)) + erase := func(status string) { + char := byte('.') + if !isOpenTxn(status) { + char = 'X' + } + for i := range progressBar { + progressBar[i] = char + } + } + + // stepNum is the index of the current statement + // in the input. + var stepNum int + + // updateProgress loads the current set of writes + // into the progress bar. + updateProgress := func() { + rows, err := sqlConn.Query("SELECT n FROM progress") + if err != nil { + t.Logf("%d: reading progress: %v", stepNum, err) + // It's OK if we can't read this. + return + } + defer rows.Close() + for rows.Next() { + var n int + if err := rows.Scan(&n); err != nil { + td.Fatalf(t, "%d: unexpected error while reading progress: %v", stepNum, err) + } + if n < 1 || n > len(progressBar) { + td.Fatalf(t, "%d: unexpected stepnum in progress table: %d", stepNum, n) + } + progressBar[n-1] = '#' + } + } + + // getTxnStatus retrieves the current txn state. + // This is guaranteed to always succeed because SHOW TRANSACTION STATUS + // is an observer statement. + getTxnStatus := func() string { + row := sqlConn.QueryRow("SHOW TRANSACTION STATUS") + var status string + if err := row.Scan(&status); err != nil { + td.Fatalf(t, "%d: unable to retrieve txn status: %v", stepNum, err) + } + return status + } + // showSavepointStatus is like getTxnStatus but retrieves the + // savepoint stack. + showSavepointStatus := func() { + rows, err := sqlConn.Query("SHOW SAVEPOINT STATUS") + if err != nil { + td.Fatalf(t, "%d: unable to retrieve savepoint status: %v", stepNum, err) + } + defer rows.Close() + + comma := "" + hasSavepoints := false + for rows.Next() { + var name string + var isRestart bool + if err := rows.Scan(&name, &isRestart); err != nil { + td.Fatalf(t, "%d: unexpected error while reading savepoints: %v", stepNum, err) + } + if isRestart { + name += "(r)" + } + buf.WriteString(comma) + buf.WriteString(name) + hasSavepoints = true + comma = ">" + } + if !hasSavepoints { + buf.WriteString("(none)") + } + } + // report shows the progress of execution so far after + // each statement executed. + report := func(beforeStatus, afterStatus string) { + erase(afterStatus) + if isOpenTxn(afterStatus) { + updateProgress() + } + fmt.Fprintf(&buf, "-- %-11s -> %-11s %s ", beforeStatus, afterStatus, string(progressBar)) + buf.WriteByte(' ') + showSavepointStatus() + buf.WriteByte('\n') + } + + // The actual execution of the statements starts here. + + beforeStatus := getTxnStatus() + for i, stmt := range stmts { + stepNum = i + 1 + // Before each statement, mark the progress so far with + // a KV write. + if isOpenTxn(beforeStatus) { + _, err := sqlConn.Exec("INSERT INTO progress(n, marker) VALUES ($1, true)", stepNum) + if err != nil { + td.Fatalf(t, "%d: before-stmt: %v", stepNum, err) + } + } + + // Run the statement and report errors/results. + fmt.Fprintf(&buf, "%d: %s -- ", stepNum, stmt) + execRes, err := sqlConn.Exec(stmt) + if err != nil { + fmt.Fprintf(&buf, "%v\n", err) + } else { + nRows, err := execRes.RowsAffected() + if err != nil { + fmt.Fprintf(&buf, "error retrieving rows: %v\n", err) + } else { + fmt.Fprintf(&buf, "%d row%s\n", nRows, util.Pluralize(nRows)) + } + } + + // Report progress on the next line + afterStatus := getTxnStatus() + report(beforeStatus, afterStatus) + beforeStatus = afterStatus + } + + return buf.String() + + default: + td.Fatalf(t, "unknown directive: %s", td.Cmd) + } + return "" + }) + }) +} + +func isOpenTxn(status string) bool { + return status == sql.OpenStateStr || status == sql.NoTxnStateStr +} diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index a826c1a983d3..82654b27abce 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -119,7 +119,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); // the kv-level transaction has already been committed). But we still // exercise this state to check that the server doesn't crash (which used to // happen - #9879). - tests := []string{"Open", "RestartWait", "CommitWait"} + tests := []string{"Open", "Aborted", "CommitWait"} for _, state := range tests { t.Run(state, func(t *testing.T) { // Create a low-level lib/pq connection so we can close it at will. @@ -151,7 +151,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); t.Fatal(err) } - if state == "RestartWait" || state == "CommitWait" { + if state == "CommitWait" { if _, err := tx.ExecContext(ctx, "SAVEPOINT cockroach_restart", nil); err != nil { t.Fatal(err) } @@ -173,7 +173,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); t.Fatal(err) } - if state == "RestartWait" || state == "CommitWait" { + if state == "CommitWait" { _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT cockroach_restart", nil) if state == "CommitWait" { if err != nil { diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index d997ee1c1061..e3e59c03c200 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -28,11 +28,10 @@ import ( // Constants for the String() representation of the session states. Shared with // the CLI code which needs to recognize them. const ( - NoTxnStr = "NoTxn" + NoTxnStateStr = "NoTxn" OpenStateStr = "Open" AbortedStateStr = "Aborted" CommitWaitStateStr = "CommitWait" - RestartWaitStateStr = "RestartWait" InternalErrorStateStr = "InternalError" ) @@ -43,7 +42,7 @@ type stateNoTxn struct{} var _ fsm.State = &stateNoTxn{} func (stateNoTxn) String() string { - return NoTxnStr + return NoTxnStateStr } type stateOpen struct { @@ -56,6 +55,8 @@ func (stateOpen) String() string { return OpenStateStr } +// stateAborted is entered on errors (retriable and non-retriable). A ROLLBACK +// TO SAVEPOINT can move the transaction back to stateOpen. type stateAborted struct{} var _ fsm.State = &stateAborted{} @@ -64,14 +65,6 @@ func (stateAborted) String() string { return AbortedStateStr } -type stateRestartWait struct{} - -var _ fsm.State = &stateRestartWait{} - -func (stateRestartWait) String() string { - return RestartWaitStateStr -} - type stateCommitWait struct{} var _ fsm.State = &stateCommitWait{} @@ -96,7 +89,6 @@ func (stateInternalError) String() string { func (stateNoTxn) State() {} func (stateOpen) State() {} func (stateAborted) State() {} -func (stateRestartWait) State() {} func (stateCommitWait) State() {} func (stateInternalError) State() {} @@ -117,6 +109,7 @@ type eventTxnStartPayload struct { historicalTimestamp *hlc.Timestamp } +// makeEventTxnStartPayload creates an eventTxnStartPayload. func makeEventTxnStartPayload( pri roachpb.UserPriority, readOnly tree.ReadWriteMode, @@ -141,7 +134,11 @@ type eventTxnFinishPayload struct { commit bool } -type eventTxnRestart struct{} +// eventSavepointRollback is generated when we want to move from Aborted to Open +// through a ROLLBACK TO SAVEPOINT . Note that it is not +// generated when such a savepoint is rolled back to from the Open state. In +// that case no event is necessary. +type eventSavepointRollback struct{} type eventNonRetriableErr struct { IsCommit fsm.Bool @@ -183,8 +180,13 @@ func (p eventRetriableErrPayload) errorCause() error { // eventRetriableErrPayload implements payloadWithError. var _ payloadWithError = eventRetriableErrPayload{} +// eventTxnRestart is generated by a rollback to a savepoint placed at the +// beginning of the transaction (commonly SAVEPOINT cockroach_restart). +type eventTxnRestart struct{} + // eventTxnReleased is generated after a successful RELEASE SAVEPOINT -// cockroach_restart. It moves the state to CommitWait. +// cockroach_restart. It moves the state to CommitWait. The event is not +// generated by releasing regular savepoints. type eventTxnReleased struct{} // payloadWithError is a common interface for the payloads that wrap an error. @@ -192,12 +194,13 @@ type payloadWithError interface { errorCause() error } -func (eventTxnStart) Event() {} -func (eventTxnFinish) Event() {} -func (eventTxnRestart) Event() {} -func (eventNonRetriableErr) Event() {} -func (eventRetriableErr) Event() {} -func (eventTxnReleased) Event() {} +func (eventTxnStart) Event() {} +func (eventTxnFinish) Event() {} +func (eventSavepointRollback) Event() {} +func (eventNonRetriableErr) Event() {} +func (eventRetriableErr) Event() {} +func (eventTxnRestart) Event() {} +func (eventTxnReleased) Event() {} // TxnStateTransitions describe the transitions used by a connExecutor's // fsm.Machine. Args.Extended is a txnState, which is muted by the Actions. @@ -300,12 +303,21 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) ts.setAdvanceInfo(skipBatch, noRewind, noEvent) - ts.txnAbortCount.Inc(1) + return nil + }, + }, + // ROLLBACK TO SAVEPOINT cockroach. There's not much to do other than generating a + // txnRestart output event. + eventTxnRestart{}: { + Description: "ROLLBACK TO SAVEPOINT cockroach_restart", + Next: stateOpen{ImplicitTxn: fsm.False}, + Action: func(args fsm.Args) error { + args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart) return nil }, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { - Next: stateRestartWait{}, + Next: stateAborted{}, Action: func(args fsm.Args) error { // Note: Preparing the KV txn for restart has already happened by this // point. @@ -321,38 +333,22 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - // ROLLBACK TO SAVEPOINT - eventTxnRestart{}: { - Description: "ROLLBACK TO SAVEPOINT cockroach_restart", - Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - state := args.Extended.(*txnState) - // NOTE: We don't bump the txn timestamp on this restart. Should we? - // Well, if we generally supported savepoints and one would issue a - // rollback to a regular savepoint, clearly we couldn't bump the - // timestamp in that case. In the special case of the cockroach_restart - // savepoint, it's not clear to me what a user's expectation might be. - state.mu.txn.ManualRestart(args.Ctx, hlc.Timestamp{}) - args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart) - return nil - }, - }, }, /// Aborted // // Note that we don't handle any error events here. Any statement but a - // ROLLBACK is expected to not be passed to the state machine. + // ROLLBACK (TO SAVEPOINT) is expected to not be passed to the state machine. stateAborted{}: { eventTxnFinish{}: { Description: "ROLLBACK", Next: stateNoTxn{}, Action: func(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.txnAbortCount.Inc(1) // Note that the KV txn has been rolled back by now by statement // execution. - return args.Extended.(*txnState).finishTxn( - args.Payload.(eventTxnFinishPayload), - ) + return ts.finishTxn(args.Payload.(eventTxnFinishPayload)) }, }, eventNonRetriableErr{IsCommit: fsm.Any}: { @@ -364,51 +360,26 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - // ROLLBACK TO SAVEPOINT. We accept this in the Aborted state for the - // convenience of clients who want to issue ROLLBACK TO SAVEPOINT regardless - // of the preceding query error. - eventTxnStart{ImplicitTxn: fsm.False}: { - Description: "ROLLBACK TO SAVEPOINT cockroach_restart", + // ROLLBACK TO SAVEPOINT success. + eventSavepointRollback{}: { + Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success", Next: stateOpen{ImplicitTxn: fsm.False}, Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - ts.finishSQLTxn() - - payload := args.Payload.(eventTxnStartPayload) - - // Note that we pass the connection's context here, not args.Ctx which - // was the previous txn's context. - ts.resetForNewSQLTxn( - ts.connCtx, - explicitTxn, - payload.txnSQLTimestamp, - payload.historicalTimestamp, - payload.pri, payload.readOnly, - nil, /* txn */ - args.Payload.(eventTxnStartPayload).tranCtx, - ) - ts.setAdvanceInfo(advanceOne, noRewind, txnRestart) + args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, noEvent) return nil }, }, - }, - - stateRestartWait{}: { - // ROLLBACK (and also COMMIT which acts like ROLLBACK) - eventTxnFinish{}: { - Description: "ROLLBACK", - Next: stateNoTxn{}, + // ROLLBACK TO SAVEPOINT failed because the txn needs to restart. + eventRetriableErr{CanAutoRetry: fsm.Any, IsCommit: fsm.Any}: { + // This event doesn't change state, but it returns a skipBatch code. + Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart", + Next: stateAborted{}, Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - // The client is likely rolling back because it can't do retries. Let's - // count it as an abort. - ts.txnAbortCount.Inc(1) - return args.Extended.(*txnState).finishTxn( - args.Payload.(eventTxnFinishPayload), - ) + args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) + return nil }, }, - // ROLLBACK TO SAVEPOINT + // ROLLBACK TO SAVEPOINT cockroach_restart. eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, @@ -417,15 +388,6 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, - eventNonRetriableErr{IsCommit: fsm.Any}: { - Next: stateAborted{}, - Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - ts.setAdvanceInfo(skipBatch, noRewind, noEvent) - ts.txnAbortCount.Inc(1) - return nil - }, - }, }, stateCommitWait{}: { diff --git a/pkg/sql/logictest/testdata/logic_test/manual_retry b/pkg/sql/logictest/testdata/logic_test/manual_retry index e9a9f70b2a2b..20c2c94ac1e2 100644 --- a/pkg/sql/logictest/testdata/logic_test/manual_retry +++ b/pkg/sql/logictest/testdata/logic_test/manual_retry @@ -51,21 +51,51 @@ SELECT currval('s') statement ok COMMIT -subtest savepoint_name +# subtest savepoint_name + +# Check that releasing the special cockroach_restart savepoint moves us to CommitWait. statement ok BEGIN -# Ensure that ident case rules are used. -statement error pq: unimplemented: SAVEPOINT not supported except for cockroach_restart -SAVEPOINT "COCKROACH_RESTART" +statement ok +SAVEPOINT cockroach_restart + +query T +SHOW TRANSACTION STATUS +---- +Open statement ok -ROLLBACK; BEGIN +RELEASE SAVEPOINT cockroach_restart + +query T +SHOW TRANSACTION STATUS +---- +CommitWait + +statement ok +ROLLBACK # Ensure that ident case rules are used. statement ok -SAVEPOINT COCKROACH_RESTART +BEGIN + +statement ok +SAVEPOINT "COCKROACH_RESTART" + +query T +SHOW TRANSACTION STATUS +---- +Open + +statement ok +RELEASE SAVEPOINT "COCKROACH_RESTART" + +query T +SHOW TRANSACTION STATUS +---- +Open statement ok ROLLBACK @@ -132,34 +162,52 @@ BEGIN TRANSACTION; SAVEPOINT something_else; COMMIT statement ok BEGIN TRANSACTION; SAVEPOINT foo -statement error pq: SAVEPOINT "foo" is in use +statement error pq: savepoint "bar" does not exist ROLLBACK TO SAVEPOINT bar # Verify we're doing the right thing for non-quoted idents. statement ok ROLLBACK TO SAVEPOINT FOO +statement ok +ABORT; BEGIN TRANSACTION + # Verify use of quoted idents. statement ok SAVEPOINT "Foo Bar" -statement error pq: SAVEPOINT "Foo Bar" is in use +statement error pq: savepoint "foobar" does not exist ROLLBACK TO SAVEPOINT FooBar # Verify case-sensitivity of quoted idents. -statement error pq: SAVEPOINT "Foo Bar" is in use +statement error pq: savepoint "foo bar" does not exist ROLLBACK TO SAVEPOINT "foo bar" statement ok ROLLBACK TO SAVEPOINT "Foo Bar" +query TB colnames +SHOW SAVEPOINT STATUS +---- +savepoint_name is_initial_savepoint +Foo Bar true + +statement ok +ABORT; BEGIN TRANSACTION + # Verify case-sensitivity of quoted vs. unquoted idents. statement ok SAVEPOINT "UpperCase" -statement error pq: SAVEPOINT "UpperCase" is in use +statement error pq: savepoint "uppercase" does not exist ROLLBACK TO SAVEPOINT UpperCase +query TB colnames +SHOW SAVEPOINT STATUS +---- +savepoint_name is_initial_savepoint +UpperCase true + statement ok ABORT diff --git a/pkg/sql/logictest/testdata/logic_test/txn b/pkg/sql/logictest/testdata/logic_test/txn index 89447d8266c0..96c717af71ca 100644 --- a/pkg/sql/logictest/testdata/logic_test/txn +++ b/pkg/sql/logictest/testdata/logic_test/txn @@ -549,7 +549,7 @@ CommitWait statement ok COMMIT -# RestartWait state +# Aborted state # The SELECT 1 is necessary to move the txn out of the AutoRetry state, # otherwise the next statement is automatically retried on the server. statement ok @@ -561,7 +561,7 @@ SELECT crdb_internal.force_retry('1h':::INTERVAL) query T SHOW TRANSACTION STATUS ---- -RestartWait +Aborted statement ok ROLLBACK TO SAVEPOINT cockroach_restart @@ -695,146 +695,6 @@ ROLLBACK; DROP SEQUENCE s -# Wrong savepoint name moves the txn state from RestartWait to Aborted. -statement ok -BEGIN TRANSACTION; - SAVEPOINT cockroach_restart; - SELECT 1 - -query error pgcode 40001 restart transaction: crdb_internal.force_retry\(\): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry\(\) -SELECT crdb_internal.force_retry('1h':::INTERVAL) - -query T -SHOW TRANSACTION STATUS ----- -RestartWait - -statement error pq: SAVEPOINT "cockroach_restart" is in use -ROLLBACK TO SAVEPOINT bogus_name - -query T -SHOW TRANSACTION STATUS ----- -Aborted - -statement ok -ROLLBACK - -# General savepoints -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -SAVEPOINT other - -statement ok -ROLLBACK - -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -RELEASE SAVEPOINT other - -statement ok -ROLLBACK - -statement ok -BEGIN TRANSACTION - -statement error SAVEPOINT not supported except for cockroach_restart -ROLLBACK TO SAVEPOINT other - -statement ok -ROLLBACK - -# Savepoint must be first statement in a transaction. -statement ok -BEGIN TRANSACTION; UPSERT INTO kv VALUES('savepoint', 'true') - -statement error SAVEPOINT cockroach_restart needs to be the first statement in a transaction -SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - -# Can rollback to a savepoint if no statements have been executed. -statement ok -BEGIN TRANSACTION; SAVEPOINT cockroach_restart - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -# Can do it twice in a row. -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -# Can rollback after a transactional write, even from a non-error state. -statement ok -UPSERT INTO kv VALUES('savepoint', 'true') - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -COMMIT - -# Because we rolled back, the 'savepoint' insert will not have been committed. -query I -SELECT count(*) FROM kv WHERE k = 'savepoint' ----- -0 - - -# Can ROLLBACK TO SAVEPOINT even from a non-retryable error. -statement ok -BEGIN TRANSACTION; SAVEPOINT cockroach_restart - -statement error pq: relation "bogus_name" does not exist -SELECT * from bogus_name - -query T -SHOW TRANSACTION STATUS ----- -Aborted - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -query T -SHOW TRANSACTION STATUS ----- -Open - -statement ok -ROLLBACK - - -# ROLLBACK TO SAVEPOINT in a txn without a SAVEPOINT. -statement ok -BEGIN - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - - -# ROLLBACK TO SAVEPOINT in an aborted txn without a SAVEPOINT. -statement ok -BEGIN - -statement error pq: relation "bogus_name" does not exist -SELECT * from bogus_name - -statement ok -ROLLBACK TO SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - - # Test READ ONLY/WRITE syntax. statement ok @@ -1026,14 +886,6 @@ SELECT crdb_internal.force_retry('1h':::INTERVAL) statement ok ROLLBACK -# Check that we don't crash when doing a release that wasn't preceded by a -# savepoint. -statement ok -BEGIN; RELEASE SAVEPOINT cockroach_restart - -statement ok -ROLLBACK - # restore the default statement ok SET default_transaction_read_only = false diff --git a/pkg/sql/metric_test.go b/pkg/sql/metric_test.go index f100a7c6f7bf..b17c2eda1635 100644 --- a/pkg/sql/metric_test.go +++ b/pkg/sql/metric_test.go @@ -252,9 +252,6 @@ func TestAbortCountErrorDuringTransaction(t *testing.T) { t.Fatal("Expected an error but didn't get one") } - if _, err := checkCounterDelta(s, sql.MetaTxnAbort, accum.txnAbortCount, 1); err != nil { - t.Error(err) - } if _, err := checkCounterDelta(s, sql.MetaTxnBeginStarted, accum.txnBeginCount, 1); err != nil { t.Error(err) } @@ -265,6 +262,10 @@ func TestAbortCountErrorDuringTransaction(t *testing.T) { if err := txn.Rollback(); err != nil { t.Fatal(err) } + + if _, err := checkCounterDelta(s, sql.MetaTxnAbort, accum.txnAbortCount, 1); err != nil { + t.Error(err) + } } func TestSavepointMetrics(t *testing.T) { @@ -309,8 +310,8 @@ func TestSavepointMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if _, err := txn.Exec("SAVEPOINT blah"); err == nil { - t.Fatal("expected an error but didn't get one") + if _, err := txn.Exec("SAVEPOINT blah"); err != nil { + t.Fatal(err) } if err := txn.Rollback(); err != nil { t.Fatal(err) diff --git a/pkg/sql/opt/exec/execbuilder/builder.go b/pkg/sql/opt/exec/execbuilder/builder.go index 9e4ba717df69..4a79c5d2cd4a 100644 --- a/pkg/sql/opt/exec/execbuilder/builder.go +++ b/pkg/sql/opt/exec/execbuilder/builder.go @@ -69,6 +69,11 @@ type Builder struct { // set to true, it ensures that a FOR UPDATE row-level locking mode is used // by scans. See forUpdateLocking. forceForUpdateLocking bool + + // -- output -- + + // IsDDL is set to true if the statement contains DDL. + IsDDL bool } // New constructs an instance of the execution node builder using the diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index b8cd95b79e17..0efd92844951 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -141,13 +141,16 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { var ep execPlan var err error - // This will set the system DB trigger for transactions containing - // schema-modifying statements that have no effect, such as - // `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;` - // where the table already exists. This will generate some false schema - // cache refreshes, but that's expected to be quite rare in practice. - isDDL := opt.IsDDLOp(e) - if isDDL { + if opt.IsDDLOp(e) { + // Mark the statement as containing DDL for use + // in the SQL executor. + b.IsDDL = true + + // This will set the system DB trigger for transactions containing + // schema-modifying statements that have no effect, such as + // `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;` + // where the table already exists. This will generate some false schema + // cache refreshes, but that's expected to be quite rare in practice. if err := b.evalCtx.Txn.SetSystemConfigTrigger(); err != nil { return execPlan{}, errors.WithSecondaryError( unimplemented.NewWithIssuef(26508, diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index 862bfb1dbfba..fbf893e9591b 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -301,6 +301,7 @@ func TestContextualHelp(t *testing.T) { {`SHOW TRANSACTION ISOLATION LEVEL ??`, `SHOW TRANSACTION`}, {`SHOW SYNTAX ??`, `SHOW SYNTAX`}, {`SHOW SYNTAX 'foo' ??`, `SHOW SYNTAX`}, + {`SHOW SAVEPOINT STATUS ??`, `SHOW SAVEPOINT`}, {`SHOW RANGE ??`, `SHOW RANGE`}, diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 9294de4df3b7..703475ba91e8 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -519,6 +519,8 @@ func TestParse(t *testing.T) { {`SHOW TRANSACTION STATUS`}, {`EXPLAIN SHOW TRANSACTION STATUS`}, + {`SHOW SAVEPOINT STATUS`}, + {`EXPLAIN SHOW SAVEPOINT STATUS`}, {`SHOW SYNTAX 'select 1'`}, {`EXPLAIN SHOW SYNTAX 'select 1'`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 57612e1769f3..7a17307c220f 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -781,6 +781,7 @@ func newNameFromStr(s string) *tree.Name { %type show_sequences_stmt %type show_session_stmt %type show_sessions_stmt +%type show_savepoint_stmt %type show_stats_stmt %type show_syntax_stmt %type show_tables_stmt @@ -3368,6 +3369,7 @@ show_stmt: | show_ranges_stmt // EXTEND WITH HELP: SHOW RANGES | show_range_for_row_stmt | show_roles_stmt // EXTEND WITH HELP: SHOW ROLES +| show_savepoint_stmt // EXTEND WITH HELP: SHOW SAVEPOINT | show_schemas_stmt // EXTEND WITH HELP: SHOW SCHEMAS | show_sequences_stmt // EXTEND WITH HELP: SHOW SEQUENCES | show_session_stmt // EXTEND WITH HELP: SHOW SESSION @@ -3820,6 +3822,17 @@ show_syntax_stmt: } | SHOW SYNTAX error // SHOW HELP: SHOW SYNTAX +// %Help: SHOW SAVEPOINT - display current savepoint properties +// %Category: Cfg +// %Text: SHOW SAVEPOINT STATUS +show_savepoint_stmt: + SHOW SAVEPOINT STATUS + { + /* SKIP DOC */ + $$.val = &tree.ShowSavepointStatus{} + } +| SHOW SAVEPOINT error // SHOW HELP: SHOW SAVEPOINT + // %Help: SHOW TRANSACTION - display current transaction properties // %Category: Cfg // %Text: SHOW TRANSACTION {ISOLATION LEVEL | PRIORITY | STATUS} diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 17b2225dce93..0879e7588cf9 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -442,6 +442,9 @@ const ( // planFlagImplicitTxn marks that the plan was run inside of an implicit // transaction. planFlagImplicitTxn + + // planFlagIsDDL marks that the plan contains DDL. + planFlagIsDDL ) func (pf planFlags) IsSet(flag planFlags) bool { diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index a6df4432ef3d..17988b385e62 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -164,7 +164,8 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { // Build the plan tree. root := execMemo.RootExpr() execFactory := makeExecFactory(p) - plan, err := execbuilder.New(&execFactory, execMemo, &opc.catalog, root, p.EvalContext()).Build() + bld := execbuilder.New(&execFactory, execMemo, &opc.catalog, root, p.EvalContext()) + plan, err := bld.Build() if err != nil { return err } @@ -172,6 +173,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { result := plan.(*planTop) result.stmt = stmt result.flags = opc.flags + if bld.IsDDL { + result.flags.Set(planFlagIsDDL) + } cols := planColumns(result.plan) if stmt.ExpectedTypes != nil { diff --git a/pkg/sql/sem/tree/show.go b/pkg/sql/sem/tree/show.go index 95a441914137..326fc8e4848a 100644 --- a/pkg/sql/sem/tree/show.go +++ b/pkg/sql/sem/tree/show.go @@ -395,6 +395,15 @@ func (node *ShowTransactionStatus) Format(ctx *FmtCtx) { ctx.WriteString("SHOW TRANSACTION STATUS") } +// ShowSavepointStatus represents a SHOW SAVEPOINT STATUS statement. +type ShowSavepointStatus struct { +} + +// Format implements the NodeFormatter interface. +func (node *ShowSavepointStatus) Format(ctx *FmtCtx) { + ctx.WriteString("SHOW SAVEPOINT STATUS") +} + // ShowUsers represents a SHOW USERS statement. type ShowUsers struct { } diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 260222378501..93593c13158b 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -782,6 +782,14 @@ func (*ShowTransactionStatus) StatementTag() string { return "SHOW TRANSACTION S func (*ShowTransactionStatus) observerStatement() {} +// StatementType implements the Statement interface. +func (*ShowSavepointStatus) StatementType() StatementType { return Rows } + +// StatementTag returns a short string identifying the type of statement. +func (*ShowSavepointStatus) StatementTag() string { return "SHOW SAVEPOINT STATUS" } + +func (*ShowSavepointStatus) observerStatement() {} + // StatementType implements the Statement interface. func (*ShowUsers) StatementType() StatementType { return Rows } @@ -976,6 +984,7 @@ func (n *ShowRanges) String() string { return AsString(n) } func (n *ShowRangeForRow) String() string { return AsString(n) } func (n *ShowRoleGrants) String() string { return AsString(n) } func (n *ShowRoles) String() string { return AsString(n) } +func (n *ShowSavepointStatus) String() string { return AsString(n) } func (n *ShowSchemas) String() string { return AsString(n) } func (n *ShowSequences) String() string { return AsString(n) } func (n *ShowSessions) String() string { return AsString(n) } diff --git a/pkg/sql/sqlbase/errors.go b/pkg/sql/sqlbase/errors.go index 78febeb2366e..cd769a044a78 100644 --- a/pkg/sql/sqlbase/errors.go +++ b/pkg/sql/sqlbase/errors.go @@ -30,7 +30,8 @@ const ( ) // NewTransactionAbortedError creates an error for trying to run a command in -// the context of transaction that's already aborted. +// the context of transaction that's in the aborted state. Any statement other +// than ROLLBACK TO SAVEPOINT will return this error. func NewTransactionAbortedError(customMsg string) error { if customMsg != "" { return pgerror.Newf( diff --git a/pkg/sql/testdata/savepoints b/pkg/sql/testdata/savepoints new file mode 100644 index 000000000000..4b36c9b980d0 --- /dev/null +++ b/pkg/sql/testdata/savepoints @@ -0,0 +1,583 @@ +# This test exercises the savepoint state in the conn executor. + +subtest implicit_release_at_end + +# It's OK to leave savepoints open when the txn commits. +# This releases everything. +sql +BEGIN +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###.. foo>bar +4: SAVEPOINT baz -- 0 rows +-- Open -> Open ####. foo>bar>baz +5: COMMIT -- 0 rows +-- Open -> NoTxn ##### (none) + +# Ditto rollbacks. +sql +BEGIN +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +ROLLBACK +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###.. foo>bar +4: SAVEPOINT baz -- 0 rows +-- Open -> Open ####. foo>bar>baz +5: ROLLBACK -- 0 rows +-- Open -> NoTxn #.... (none) + +subtest end + +subtest savepoint_stack + +sql +BEGIN +SAVEPOINT foo +SAVEPOINT foo +SAVEPOINT bar +SAVEPOINT baz +ROLLBACK TO SAVEPOINT foo +SAVEPOINT baz +RELEASE SAVEPOINT foo +SAVEPOINT bar +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.......... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##......... foo +3: SAVEPOINT foo -- 0 rows +-- Open -> Open ###........ foo>foo +4: SAVEPOINT bar -- 0 rows +-- Open -> Open ####....... foo>foo>bar +5: SAVEPOINT baz -- 0 rows +-- Open -> Open #####...... foo>foo>bar>baz +6: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ###........ foo>foo +7: SAVEPOINT baz -- 0 rows +-- Open -> Open ###...#.... foo>foo>baz +8: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###...##... foo +9: SAVEPOINT bar -- 0 rows +-- Open -> Open ###...###.. foo>bar +10: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###...####. (none) +11: COMMIT -- 0 rows +-- Open -> NoTxn ###...##### (none) + + +subtest end + +subtest savepoint_release_vs_rollback + +# A rollback keeps the savepoint active. +sql +BEGIN +SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +5: COMMIT -- 0 rows +-- Open -> NoTxn ##..# (none) + +# A release does not. +sql +BEGIN +SAVEPOINT foo +RELEASE SAVEPOINT foo +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ###.. (none) +4: RELEASE SAVEPOINT foo -- pq: savepoint "foo" does not exist +-- Open -> Aborted XXXXX (none) +5: COMMIT -- 0 rows +-- Aborted -> NoTxn #.... (none) + +subtest end + + +subtest rollback_after_sql_error + +sql +BEGIN +SAVEPOINT foo +SELECT nonexistent +ROLLBACK TO SAVEPOINT foo +SELECT 123 +COMMIT +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #..... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##.... foo +3: SELECT nonexistent -- pq: column "nonexistent" does not exist +-- Open -> Aborted XXXXXX foo +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open ##.... foo +5: SELECT 123 -- 1 row +-- Open -> Open ##..#. foo +6: COMMIT -- 0 rows +-- Open -> NoTxn ##..## (none) + +subtest end + +subtest rollback_after_dup_error + +sql +CREATE TABLE t(x INT UNIQUE) +INSERT INTO t(x) VALUES (1) +BEGIN +SAVEPOINT foo +INSERT INTO t(x) VALUES (1) +ROLLBACK TO SAVEPOINT foo +INSERT INTO t(x) VALUES (2) +COMMIT +---- +1: CREATE TABLE t(x INT UNIQUE) -- 0 rows +-- NoTxn -> NoTxn #....... (none) +2: INSERT INTO t(x) VALUES (1) -- 1 row +-- NoTxn -> NoTxn ##...... (none) +3: BEGIN -- 0 rows +-- NoTxn -> Open ###..... (none) +4: SAVEPOINT foo -- 0 rows +-- Open -> Open ####.... foo +5: INSERT INTO t(x) VALUES (1) -- pq: duplicate key value (x)=(1) violates unique constraint "t_x_key" +-- Open -> Aborted XXXXXXXX foo +6: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open ####.... foo +7: INSERT INTO t(x) VALUES (2) -- 1 row +-- Open -> Open ####..#. foo +8: COMMIT -- 0 rows +-- Open -> NoTxn ####..## (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest rollback_after_ddl + +# DDL under savepoints is fine as long as there is no rollback. +# Note: we do two DDL; the first one is there just to anchor +# the txn on the config range. The second DDL is the one +# exercised in the test. +sql +BEGIN; CREATE TABLE unused(x INT) +SAVEPOINT foo +CREATE TABLE t(x INT) +RELEASE SAVEPOINT foo +COMMIT +---- +1: BEGIN; CREATE TABLE unused(x INT) -- 0 rows +-- NoTxn -> Open #.... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##... foo +3: CREATE TABLE t(x INT) -- 0 rows +-- Open -> Open ###.. foo +4: RELEASE SAVEPOINT foo -- 0 rows +-- Open -> Open ####. (none) +5: COMMIT -- 0 rows +-- Open -> NoTxn ##### (none) + +sql +DROP TABLE unused +DROP TABLE t +---- +1: DROP TABLE unused -- 0 rows +-- NoTxn -> NoTxn #. (none) +2: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn ## (none) + +# Rollback is unsupported after DDL for now. +# TODO(knz): Lift this limitation. + +sql +BEGIN; CREATE TABLE unused(x INT) +SAVEPOINT foo +CREATE TABLE t(x INT) +ROLLBACK TO SAVEPOINT foo +---- +1: BEGIN; CREATE TABLE unused(x INT) -- 0 rows +-- NoTxn -> Open #... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##.. foo +3: CREATE TABLE t(x INT) -- 0 rows +-- Open -> Open ###. foo +4: ROLLBACK TO SAVEPOINT foo -- pq: unimplemented: ROLLBACK TO SAVEPOINT not yet supported after DDL statements +-- Open -> Aborted XXXX foo + +subtest end + +subtest cockroach_restart_cant_be_nested + +sql +BEGIN +SAVEPOINT foo +SAVEPOINT cockroach_restart +ROLLBACK +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #... (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##.. foo +3: SAVEPOINT cockroach_restart -- pq: SAVEPOINT "cockroach_restart" cannot be nested +-- Open -> Aborted XXXX foo +4: ROLLBACK -- 0 rows +-- Aborted -> NoTxn #... (none) + +# Check the behavior of issuing "SAVEPOINT cockroach_restart". Multiple times. +# That is allowed (to facilitate SAVEPOINT cr; ROLLBACK TO cr; SAVEPOINT cr), +# but we're not actually creating multiple savepoints with the same name because +# the special release semantics don't allow us. +sql +BEGIN; SAVEPOINT cockroach_restart; SAVEPOINT cockroach_restart +RELEASE SAVEPOINT cockroach_restart +ROLLBACK TO SAVEPOINT cockroach_restart +---- +1: BEGIN; SAVEPOINT cockroach_restart; SAVEPOINT cockroach_restart -- 0 rows +-- NoTxn -> Open #.. cockroach_restart(r) +2: RELEASE SAVEPOINT cockroach_restart -- 0 rows +-- Open -> CommitWait XXX (none) +3: ROLLBACK TO SAVEPOINT cockroach_restart -- pq: current transaction is committed, commands ignored until end of transaction block +-- CommitWait -> CommitWait XXX (none) + +subtest end + +subtest invalid_uses + +sql +SAVEPOINT foo +ROLLBACK TO SAVEPOINT foo +RELEASE SAVEPOINT foo +---- +1: SAVEPOINT foo -- pq: there is no transaction in progress +-- NoTxn -> NoTxn #.. (none) +2: ROLLBACK TO SAVEPOINT foo -- pq: savepoint "foo" does not exist +-- NoTxn -> NoTxn ##. (none) +3: RELEASE SAVEPOINT foo -- pq: there is no transaction in progress +-- NoTxn -> NoTxn ### (none) + +sql +BEGIN +SAVEPOINT foo +RELEASE SAVEPOINT bar +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##. foo +3: RELEASE SAVEPOINT bar -- pq: savepoint "bar" does not exist +-- Open -> Aborted XXX foo + +sql +BEGIN +SAVEPOINT foo +ROLLBACK TO SAVEPOINT bar +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##. foo +3: ROLLBACK TO SAVEPOINT bar -- pq: savepoint "bar" does not exist +-- Open -> Aborted XXX foo + +subtest end + +subtest rollback_after_error + +# check that we can rollback after an error +sql +BEGIN; SAVEPOINT foo +SELECT * FROM bogus_name +ROLLBACK TO SAVEPOINT foo +ROLLBACK +---- +1: BEGIN; SAVEPOINT foo -- 0 rows +-- NoTxn -> Open #... foo(r) +2: SELECT * FROM bogus_name -- pq: relation "bogus_name" does not exist +-- Open -> Aborted XXXX foo(r) +3: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Aborted -> Open #... foo(r) +4: ROLLBACK -- 0 rows +-- Open -> NoTxn #... (none) + +# check that we can rollback after a retriable error to an initial savepoint +sql +CREATE TABLE t(x INT) +BEGIN; SAVEPOINT init; INSERT INTO t(x) VALUES (1) +SELECT crdb_internal.force_retry('1h') +ROLLBACK TO SAVEPOINT init +SELECT x from t +ROLLBACK; DROP TABLE t +---- +1: CREATE TABLE t(x INT) -- 0 rows +-- NoTxn -> NoTxn #..... (none) +2: BEGIN; SAVEPOINT init; INSERT INTO t(x) VALUES (1) -- 1 row +-- NoTxn -> Open ##.... init(r) +3: SELECT crdb_internal.force_retry('1h') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXXXX init(r) +4: ROLLBACK TO SAVEPOINT init -- 0 rows +-- Aborted -> Open ##.... init(r) +5: SELECT x from t -- 0 rows +-- Open -> Open ##..#. init(r) +6: ROLLBACK; DROP TABLE t -- 0 rows +-- Open -> NoTxn ##.... (none) + +# Check that, after a retriable error, rolling back to anything other than an +# initial savepoint fails with a retriable error. +sql +CREATE TABLE t(x INT) +BEGIN; SAVEPOINT init; SELECT count(1) from t; SAVEPOINT inner_savepoint +SELECT crdb_internal.force_retry('1h') +ROLLBACK TO SAVEPOINT inner_savepoint +ROLLBACK TO SAVEPOINT init +ROLLBACK; DROP TABLE t +---- +1: CREATE TABLE t(x INT) -- 0 rows +-- NoTxn -> NoTxn #..... (none) +2: BEGIN; SAVEPOINT init; SELECT count(1) from t; SAVEPOINT inner_savepoint -- 0 rows +-- NoTxn -> Open ##.... init(r)>inner_savepoint +3: SELECT crdb_internal.force_retry('1h') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXXXX init(r)>inner_savepoint +4: ROLLBACK TO SAVEPOINT inner_savepoint -- pq: restart transaction: TransactionRetryWithProtoRefreshError: cannot rollback to savepoint after a transaction restart +-- Aborted -> Aborted XXXXXX init(r)>inner_savepoint +5: ROLLBACK TO SAVEPOINT init -- 0 rows +-- Aborted -> Open ##.... init(r) +6: ROLLBACK; DROP TABLE t -- 0 rows +-- Open -> NoTxn ##.... (none) + +subtest end + + +subtest restart + +subtest restart/must_be_first_in_txn + +sql +CREATE TABLE t(x INT) +BEGIN +INSERT INTO t(x) VALUES (1) +SAVEPOINT cockroach_restart +---- +1: CREATE TABLE t(x INT) -- 0 rows +-- NoTxn -> NoTxn #... (none) +2: BEGIN -- 0 rows +-- NoTxn -> Open ##.. (none) +3: INSERT INTO t(x) VALUES (1) -- 1 row +-- Open -> Open ###. (none) +4: SAVEPOINT cockroach_restart -- pq: SAVEPOINT "cockroach_restart" needs to be the first statement in a transaction +-- Open -> Aborted XXXX (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest restart/release_without_savepoint + +sql +BEGIN +RELEASE SAVEPOINT cockroach_restart +ROLLBACK +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: RELEASE SAVEPOINT cockroach_restart -- pq: savepoint "cockroach_restart" does not exist +-- Open -> Aborted XXX (none) +3: ROLLBACK -- 0 rows +-- Aborted -> NoTxn #.. (none) + +subtest end + +subtest restart/rollback_without_savepoint + +# ROLLBACK TO SAVEPOINT in an open txn without a SAVEPOINT. +sql +BEGIN +ROLLBACK TO SAVEPOINT cockroach_restart +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #. (none) +2: ROLLBACK TO SAVEPOINT cockroach_restart -- pq: savepoint "cockroach_restart" does not exist +-- Open -> Aborted XX (none) + +# ROLLBACK TO SAVEPOINT in an aborted txn without a SAVEPOINT. +sql +BEGIN +SELECT * FROM bogus_name +ROLLBACK TO SAVEPOINT cockroach_restart +---- +1: BEGIN -- 0 rows +-- NoTxn -> Open #.. (none) +2: SELECT * FROM bogus_name -- pq: relation "bogus_name" does not exist +-- Open -> Aborted XXX (none) +3: ROLLBACK TO SAVEPOINT cockroach_restart -- pq: savepoint "cockroach_restart" does not exist +-- Aborted -> Aborted XXX (none) + +subtest end + +subtest restart/rollbacks + +sql +CREATE TABLE t(x INT); +BEGIN; SAVEPOINT cockroach_restart +ROLLBACK TO SAVEPOINT cockroach_restart +ROLLBACK TO SAVEPOINT cockroach_restart +INSERT INTO t(x) VALUES (1) +ROLLBACK TO SAVEPOINT cockroach_restart +COMMIT +---- +1: CREATE TABLE t(x INT); -- 0 rows +-- NoTxn -> NoTxn #...... (none) +2: BEGIN; SAVEPOINT cockroach_restart -- 0 rows +-- NoTxn -> Open ##..... cockroach_restart(r) +3: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +4: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +5: INSERT INTO t(x) VALUES (1) -- 1 row +-- Open -> Open ##..#.. cockroach_restart(r) +6: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Open -> Open ##..... cockroach_restart(r) +7: COMMIT -- 0 rows +-- Open -> NoTxn ##....# (none) + +sql +DROP TABLE t +---- +1: DROP TABLE t -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + + +subtest restart/savepoint_under_restart + +sql +BEGIN; SAVEPOINT cockroach_restart +SAVEPOINT foo +SAVEPOINT bar +ROLLBACK TO SAVEPOINT foo +SELECT crdb_internal.force_retry('1h') +ROLLBACK TO SAVEPOINT cockroach_restart +SELECT 123 +COMMIT +---- +1: BEGIN; SAVEPOINT cockroach_restart -- 0 rows +-- NoTxn -> Open #....... cockroach_restart(r) +2: SAVEPOINT foo -- 0 rows +-- Open -> Open ##...... cockroach_restart(r)>foo +3: SAVEPOINT bar -- 0 rows +-- Open -> Open ###..... cockroach_restart(r)>foo>bar +4: ROLLBACK TO SAVEPOINT foo -- 0 rows +-- Open -> Open ##...... cockroach_restart(r)>foo +5: SELECT crdb_internal.force_retry('1h') -- pq: restart transaction: crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry() +-- Open -> Aborted XXXXXXXX cockroach_restart(r)>foo +6: ROLLBACK TO SAVEPOINT cockroach_restart -- 0 rows +-- Aborted -> Open #....... cockroach_restart(r) +7: SELECT 123 -- 1 row +-- Open -> Open #.....#. cockroach_restart(r) +8: COMMIT -- 0 rows +-- Open -> NoTxn #.....## (none) + +subtest end + +subtest restart/all_savepoints_disabled + +# Under "force_savepoint_restart", every savepoint +# is a restart savepoint. + +sql +SET force_savepoint_restart = true +BEGIN; SAVEPOINT foo +SAVEPOINT bar +---- +1: SET force_savepoint_restart = true -- 0 rows +-- NoTxn -> NoTxn #.. (none) +2: BEGIN; SAVEPOINT foo -- 0 rows +-- NoTxn -> Open ##. foo(r) +3: SAVEPOINT bar -- pq: SAVEPOINT "cockroach_restart" cannot be nested +-- Open -> Aborted XXX foo(r) + +sql +SET force_savepoint_restart = false +---- +1: SET force_savepoint_restart = false -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +subtest end + +# Test that the rewinding we do when performing an automatic retry restores the +# savepoint stack properly. +subtest rewind_on_automatic_restarts + +# We're going to generate a retriable error that will rewind us back to the +# SELECT statement (not to the original SAVEPOINT statement since that one is +# special and we advance the rewind position past it). The test checks that, +# after every restart, the RELEASE works because the savepoint has be +# re-instituted before we rewind. +sql +BEGIN; SAVEPOINT a; SELECT 42; RELEASE a; SELECT crdb_internal.force_retry('10ms'); COMMIT; +---- +1: BEGIN; SAVEPOINT a; SELECT 42; RELEASE a; SELECT crdb_internal.force_retry('10ms'); COMMIT; -- 0 rows +-- NoTxn -> NoTxn # (none) + +subtest end + +# Test that cockroach_restart doesn't nest in the same way that regular +# savepoints do. We allow the savepoint cockroach_restart to be redeclared after +# a rollback to cockroach_restart (or even immediately after declaring it the +# first time), and this redeclaration doesn't introduce a new savepoint. +subtest cockroach_restart_nesting +sql +BEGIN; SAVEPOINT cockroach_restart; SAVEPOINT cockroach_restart; +ROLLBACK TO cockroach_restart; SAVEPOINT cockroach_restart; +COMMIT; +---- +1: BEGIN; SAVEPOINT cockroach_restart; SAVEPOINT cockroach_restart; -- 0 rows +-- NoTxn -> Open #.. cockroach_restart(r) +2: ROLLBACK TO cockroach_restart; SAVEPOINT cockroach_restart; -- 0 rows +-- Open -> Open #.. cockroach_restart(r) +3: COMMIT; -- 0 rows +-- Open -> NoTxn #.# (none) + +subtest end diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index 33ea74832845..773ccd6e8d2d 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1065,13 +1065,12 @@ func TestUnexpectedStatementInRestartWait(t *testing.T) { if err := tx.QueryRow("SHOW TRANSACTION STATUS").Scan(&state); err != nil { t.Fatal(err) } - if state != "RestartWait" { - t.Fatalf("expected state %s, got: %s", "RestartWait", state) + if state != "Aborted" { + t.Fatalf("expected state %s, got: %s", "Aborted", state) } if _, err := tx.Exec("SELECT 1"); !testutils.IsError(err, - `pq: Expected "ROLLBACK TO SAVEPOINT cockroach_restart": `+ - "current transaction is aborted, commands ignored until end of transaction block") { + `pq: current transaction is aborted, commands ignored until end of transaction block`) { t.Fatal(err) } if err := tx.QueryRow("SHOW TRANSACTION STATUS").Scan(&state); err != nil { @@ -1449,12 +1448,6 @@ func TestRollbackToSavepointFromUnusualStates(t *testing.T) { } } - // ROLLBACK TO SAVEPOINT with a wrong name - _, err := sqlDB.Exec("ROLLBACK TO SAVEPOINT foo") - if !testutils.IsError(err, "SAVEPOINT not supported except for cockroach_restart") { - t.Fatalf("unexpected error: %v", err) - } - tx, err := sqlDB.Begin() if err != nil { t.Fatal(err) @@ -1513,12 +1506,12 @@ func TestTxnAutoRetriesDisabledAfterResultsHaveBeenSentToClient(t *testing.T) { { name: "client_directed_retries", clientDirectedRetry: true, - expectedTxnStateAfterRetriableErr: "RestartWait", + expectedTxnStateAfterRetriableErr: "Aborted", }, { name: "no_client_directed_retries", clientDirectedRetry: false, - expectedTxnStateAfterRetriableErr: "RestartWait", + expectedTxnStateAfterRetriableErr: "Aborted", }, { name: "autocommit", diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 1b0dda449af1..4c4f45d94178 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -104,11 +104,6 @@ type txnState struct { // txnAbortCount is incremented whenever the state transitions to // stateAborted. txnAbortCount *metric.Counter - - // activeRestartSavepointName stores the name of the active - // top-level restart savepoint, or is empty if no top-level restart - // savepoint is active. - activeRestartSavepointName tree.Name } // txnType represents the type of a SQL transaction. @@ -376,7 +371,9 @@ const ( txnRollback // txnRestart means that the transaction is restarting. The iteration of the // txn just finished will not commit. It is generated when we're about to - // auto-retry a txn and after a "ROLLBACK TO SAVEPOINT cockroach_restart". + // auto-retry a txn and after a rollback to a savepoint placed at the start of + // the transaction. This allows such savepoints to reset more state than other + // savepoints. txnRestart ) diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index 350a39446cde..9b3fba53c190 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -117,12 +117,6 @@ func (tc *testContext) createAbortedState() (fsm.State, *txnState) { return stateAborted{}, ts } -func (tc *testContext) createRestartWaitState() (fsm.State, *txnState) { - _, ts := tc.createOpenState(explicitTxn) - s := stateRestartWait{} - return s, ts -} - func (tc *testContext) createCommitWaitState() (fsm.State, *txnState, error) { _, ts := tc.createOpenState(explicitTxn) // Commit the KV txn, simulating what the execution layer is doing. @@ -418,7 +412,7 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b }, - expState: stateRestartWait{}, + expState: stateAborted{}, expAdv: expAdvance{ expCode: skipBatch, expEv: noEvent, @@ -563,68 +557,47 @@ func TestTransitions(t *testing.T) { expTxn: nil, }, { - // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). - name: "Aborted->Starting", + // The txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted). + name: "Aborted->Open", init: func() (fsm.State, *txnState, error) { s, ts := testCon.createAbortedState() return s, ts, nil }, - ev: eventTxnStart{ImplicitTxn: fsm.False}, - evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), - nil /* historicalTimestamp */, tranCtx), + ev: eventSavepointRollback{}, expState: stateOpen{ImplicitTxn: fsm.False}, expAdv: expAdvance{ expCode: advanceOne, - expEv: txnRestart, + expEv: noEvent, }, expTxn: &expKVTxn{}, }, { - // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). - // Verify that the historical timestamp from the evPayload is propagated - // to the expTxn. - name: "Aborted->Starting (historical)", + // The txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted). + name: "Aborted->Restart", init: func() (fsm.State, *txnState, error) { s, ts := testCon.createAbortedState() return s, ts, nil }, - ev: eventTxnStart{ImplicitTxn: fsm.False}, - evPayload: makeEventTxnStartPayload(pri, tree.ReadOnly, now.GoTime(), - &now, tranCtx), + ev: eventTxnRestart{}, expState: stateOpen{ImplicitTxn: fsm.False}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, }, expTxn: &expKVTxn{ - tsNanos: proto.Int64(now.WallTime), - }, - }, - // - // Tests starting from the RestartWait state. - // - { - // The txn got finished, such as after a ROLLBACK. - name: "RestartWait->NoTxn", - init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() - err := ts.mu.txn.Rollback(ts.Ctx) - return s, ts, err - }, - ev: eventTxnFinish{}, - evPayload: eventTxnFinishPayload{commit: false}, - expState: stateNoTxn{}, - expAdv: expAdvance{ - expCode: advanceOne, - expEv: txnRollback, + userPriority: &pri, + tsNanos: &now.WallTime, + origTSNanos: &now.WallTime, + maxTSNanos: &maxTS.WallTime, }, - expTxn: nil, }, { - // The txn got restarted, through a ROLLBACK TO SAVEPOINT. - name: "RestartWait->Open", + // The txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). + // Verify that the historical timestamp from the evPayload is propagated + // to the expTxn. + name: "Aborted->Starting (historical)", init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() + s, ts := testCon.createAbortedState() return s, ts, nil }, ev: eventTxnRestart{}, @@ -633,22 +606,9 @@ func TestTransitions(t *testing.T) { expCode: advanceOne, expEv: txnRestart, }, - expTxn: &expKVTxn{}, - }, - { - name: "RestartWait->Aborted", - init: func() (fsm.State, *txnState, error) { - s, ts := testCon.createRestartWaitState() - return s, ts, nil - }, - ev: eventNonRetriableErr{IsCommit: fsm.False}, - evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, - expState: stateAborted{}, - expAdv: expAdvance{ - expCode: skipBatch, - expEv: noEvent, + expTxn: &expKVTxn{ + tsNanos: proto.Int64(now.WallTime), }, - expTxn: &expKVTxn{}, }, // // Tests starting from the CommitWait state. diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv index 74c6a072896f..296b13bd89ee 100644 --- a/pkg/sql/txnstatetransitions_diagram.gv +++ b/pkg/sql/txnstatetransitions_diagram.gv @@ -15,8 +15,13 @@ digraph finite_state_machine { node [shape = circle]; "Aborted{}" -> "Aborted{}" [label = any other statement>] "Aborted{}" -> "Aborted{}" [label = any other statement>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] "Aborted{}" -> "NoTxn{}" [label = ROLLBACK>] - "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "NoTxn{}" [label = COMMIT>] @@ -26,7 +31,7 @@ digraph finite_state_machine { "NoTxn{}" -> "Open{ImplicitTxn:true}" [label = BEGIN, or before a statement running as an implicit txn>] "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:false}"] "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] - "Open{ImplicitTxn:false}" -> "RestartWait{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] @@ -40,8 +45,4 @@ digraph finite_state_machine { "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = COMMIT/ROLLBACK, or after a statement running as an implicit txn>] - "RestartWait{}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:false}"] - "RestartWait{}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:true}"] - "RestartWait{}" -> "NoTxn{}" [label = ROLLBACK>] - "RestartWait{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] } diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt index a46864c1dedb..82f7822ef152 100644 --- a/pkg/sql/txnstatetransitions_report.txt +++ b/pkg/sql/txnstatetransitions_report.txt @@ -4,15 +4,16 @@ Aborted{} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} - TxnFinish{} - TxnStart{ImplicitTxn:false} - missing events: RetriableErr{CanAutoRetry:false, IsCommit:false} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} - TxnReleased{} + SavepointRollback{} + TxnFinish{} TxnRestart{} + missing events: + TxnReleased{} + TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} CommitWait{} handled events: @@ -24,6 +25,7 @@ CommitWait{} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} TxnReleased{} TxnRestart{} TxnStart{ImplicitTxn:false} @@ -39,6 +41,7 @@ NoTxn{} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} TxnFinish{} TxnReleased{} TxnRestart{} @@ -54,6 +57,7 @@ Open{ImplicitTxn:false} TxnReleased{} TxnRestart{} missing events: + SavepointRollback{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} Open{ImplicitTxn:true} @@ -66,21 +70,8 @@ Open{ImplicitTxn:true} RetriableErr{CanAutoRetry:true, IsCommit:true} TxnFinish{} missing events: + SavepointRollback{} TxnReleased{} TxnRestart{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} -RestartWait{} - handled events: - NonRetriableErr{IsCommit:false} - NonRetriableErr{IsCommit:true} - TxnFinish{} - TxnRestart{} - missing events: - RetriableErr{CanAutoRetry:false, IsCommit:false} - RetriableErr{CanAutoRetry:false, IsCommit:true} - RetriableErr{CanAutoRetry:true, IsCommit:false} - RetriableErr{CanAutoRetry:true, IsCommit:true} - TxnReleased{} - TxnStart{ImplicitTxn:false} - TxnStart{ImplicitTxn:true}