From bb185044df2bc98df34218816df58fe9f60a624c Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 4 Oct 2022 14:52:41 -0400 Subject: [PATCH 1/2] scheduledlogging: shorten TestCaptureIndexUsageStats run time Previously, TestCaptureIndexUsageStats ran through 4 iterations of 20 seconds for a run time of over a minute. This change reduces the run time of the test to under 10 seconds. Release note: None --- pkg/sql/scheduledlogging/BUILD.bazel | 2 - .../captured_index_usage_stats.go | 30 ++--- .../captured_index_usage_stats_test.go | 115 ++++++++++-------- 3 files changed, 80 insertions(+), 67 deletions(-) diff --git a/pkg/sql/scheduledlogging/BUILD.bazel b/pkg/sql/scheduledlogging/BUILD.bazel index 051a04d5cbb5..50484a27d8b5 100644 --- a/pkg/sql/scheduledlogging/BUILD.bazel +++ b/pkg/sql/scheduledlogging/BUILD.bazel @@ -38,9 +38,7 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", - "//pkg/testutils", "//pkg/testutils/serverutils", - "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", diff --git a/pkg/sql/scheduledlogging/captured_index_usage_stats.go b/pkg/sql/scheduledlogging/captured_index_usage_stats.go index 596729eca7c6..d8f0c9d17285 100644 --- a/pkg/sql/scheduledlogging/captured_index_usage_stats.go +++ b/pkg/sql/scheduledlogging/captured_index_usage_stats.go @@ -72,6 +72,9 @@ type CaptureIndexUsageStatsTestingKnobs struct { // scheduled interval in the case that the logging duration exceeds the // default scheduled interval duration. getOverlapDuration func() time.Duration + // onScheduleComplete allows tests to hook into when the current schedule + // is completed to check for the expected logs. + onScheduleComplete func() } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. 
@@ -104,17 +107,14 @@ func (s *CaptureIndexUsageStatsLoggingScheduler) durationOnOverlap() time.Durati } func (s *CaptureIndexUsageStatsLoggingScheduler) durationUntilNextInterval() time.Duration { - // If telemetry is disabled, return the disabled interval duration. - if !telemetryCaptureIndexUsageStatsEnabled.Get(&s.st.SV) { - return telemetryCaptureIndexUsageStatsStatusCheckEnabledInterval.Get(&s.st.SV) - } + loggingDur := s.getLoggingDuration() // If the previous logging operation took longer than or equal to the set // schedule interval, schedule the next interval immediately. - if s.getLoggingDuration() >= telemetryCaptureIndexUsageStatsInterval.Get(&s.st.SV) { + if loggingDur >= telemetryCaptureIndexUsageStatsInterval.Get(&s.st.SV) { return s.durationOnOverlap() } // Otherwise, schedule the next interval normally. - return telemetryCaptureIndexUsageStatsInterval.Get(&s.st.SV) + return telemetryCaptureIndexUsageStatsInterval.Get(&s.st.SV) - loggingDur } // Start starts the capture index usage statistics logging scheduler. 
@@ -146,7 +146,15 @@ func (s *CaptureIndexUsageStatsLoggingScheduler) start(ctx context.Context, stop case <-stopper.ShouldQuiesce(): return case <-timer.C: + if !telemetryCaptureIndexUsageStatsEnabled.Get(&s.st.SV) { + timer.Reset(telemetryCaptureIndexUsageStatsStatusCheckEnabledInterval.Get(&s.st.SV)) + continue + } s.currentCaptureStartTime = timeutil.Now() + err := captureIndexUsageStats(ctx, s.ie, stopper, telemetryCaptureIndexUsageStatsLoggingDelay.Get(&s.st.SV)) + if err != nil { + log.Warningf(ctx, "error capturing index usage stats: %+v", err) + } dur := s.durationUntilNextInterval() if dur < time.Second { // Avoid intervals that are too short, to prevent a hot @@ -154,14 +162,8 @@ func (s *CaptureIndexUsageStatsLoggingScheduler) start(ctx context.Context, stop dur = time.Second } timer.Reset(dur) - - if !telemetryCaptureIndexUsageStatsEnabled.Get(&s.st.SV) { - continue - } - - err := captureIndexUsageStats(ctx, s.ie, stopper, telemetryCaptureIndexUsageStatsLoggingDelay.Get(&s.st.SV)) - if err != nil { - log.Warningf(ctx, "error capturing index usage stats: %+v", err) + if s.knobs != nil && s.knobs.onScheduleComplete != nil { + s.knobs.onScheduleComplete() } } } diff --git a/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go b/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go index 654006586ee8..e23023119059 100644 --- a/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go +++ b/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go @@ -22,9 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -66,26 +64,22 @@ func (s *stubDurations) 
getOverlapDuration() time.Duration { func TestCaptureIndexUsageStats(t *testing.T) { defer leaktest.AfterTest(t)() - - skip.UnderShort(t, "#87772") - sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t) defer cleanup() + firstScheduleLoggingDuration := 1 * time.Second sd := stubDurations{} - sd.setLoggingDuration(1 * time.Second) - sd.setOverlapDuration(10 * time.Second) - stubScheduleInterval := 20 * time.Second - stubScheduleCheckEnabledInterval := 1 * time.Second + sd.setLoggingDuration(firstScheduleLoggingDuration) + sd.setOverlapDuration(time.Second) + stubScheduleInterval := 2 * time.Second + stubScheduleCheckEnabledInterval := time.Second stubLoggingDelay := 0 * time.Second - // timeBuffer is a short time buffer to account for delays in the schedule - // timings when running tests. The time buffer is smaller than the difference - // between each schedule interval to ensure that there is no overlap. - timeBuffer := 5 * time.Second + // timeBuffer is a short time buffer to account for non-determinism in the logging timings. + timeBuffer := time.Second settings := cluster.MakeTestingClusterSettings() // Configure capture index usage statistics to be disabled. This is to test @@ -103,12 +97,18 @@ func TestCaptureIndexUsageStats(t *testing.T) { // Configure the delay between each emission of index usage stats logs. 
telemetryCaptureIndexUsageStatsLoggingDelay.Override(context.Background(), &settings.SV, stubLoggingDelay) + scheduleCompleteChan := make(chan struct{}) + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Settings: settings, Knobs: base.TestingKnobs{ CapturedIndexUsageStatsKnobs: &CaptureIndexUsageStatsTestingKnobs{ getLoggingDuration: sd.getLoggingDuration, getOverlapDuration: sd.getOverlapDuration, + onScheduleComplete: func() { + scheduleCompleteChan <- struct{}{} + <-scheduleCompleteChan + }, }, }, }) @@ -157,18 +157,19 @@ func TestCaptureIndexUsageStats(t *testing.T) { expectedTotalNumEntriesInSingleInterval := 8 expectedNumberOfIndividualIndexEntriesInSingleInterval := 1 - // Expect index usage statistics logs once the schedule disabled interval has passed. - // Assert that we have the expected number of total logs and expected number - // of logs for each index. - testutils.SucceedsWithin(t, func() error { - return checkNumTotalEntriesAndNumIndexEntries(t, - expectedIndexNames, - expectedTotalNumEntriesInSingleInterval, - expectedNumberOfIndividualIndexEntriesInSingleInterval, - ) - }, stubScheduleCheckEnabledInterval+timeBuffer) - - // Verify that a second schedule has run after the enabled interval has passed. + // Wait for channel value from end of 1st schedule. + <-scheduleCompleteChan + + // Check the expected number of entries from the 1st schedule. + require.NoError(t, checkNumTotalEntriesAndNumIndexEntries(t, + expectedIndexNames, + expectedTotalNumEntriesInSingleInterval, + expectedNumberOfIndividualIndexEntriesInSingleInterval, + scheduleCompleteChan, + ), "error encountered checking the number of total entries and number of index entries") + + scheduleCompleteChan <- struct{}{} + // Expect number of total entries to hold 2 times the number of entries in a // single interval. 
expectedTotalNumEntriesAfterTwoIntervals := expectedTotalNumEntriesInSingleInterval * 2 @@ -180,18 +181,19 @@ func TestCaptureIndexUsageStats(t *testing.T) { stubLoggingDuration := stubScheduleInterval * 2 sd.setLoggingDuration(stubLoggingDuration) - // Expect index usage statistics logs once the schedule enabled interval has passed. - // Assert that we have the expected number of total logs and expected number - // of logs for each index. - testutils.SucceedsWithin(t, func() error { - return checkNumTotalEntriesAndNumIndexEntries(t, - expectedIndexNames, - expectedTotalNumEntriesAfterTwoIntervals, - expectedNumberOfIndividualIndexEntriesAfterTwoIntervals, - ) - }, stubScheduleInterval+timeBuffer) - - // Verify that a third schedule has run after the overlap duration has passed. + // Wait for channel value from end of 2nd schedule. + <-scheduleCompleteChan + + // Check the expected number of entries from the 2nd schedule. + require.NoError(t, checkNumTotalEntriesAndNumIndexEntries(t, + expectedIndexNames, + expectedTotalNumEntriesAfterTwoIntervals, + expectedNumberOfIndividualIndexEntriesAfterTwoIntervals, + scheduleCompleteChan, + ), "error encountered checking the number of total entries and number of index entries") + + scheduleCompleteChan <- struct{}{} + // Expect number of total entries to hold 3 times the number of entries in a // single interval. expectedTotalNumEntriesAfterThreeIntervals := expectedTotalNumEntriesInSingleInterval * 3 @@ -199,18 +201,22 @@ func TestCaptureIndexUsageStats(t *testing.T) { // entries in a single interval. expectedNumberOfIndividualIndexEntriesAfterThreeIntervals := expectedNumberOfIndividualIndexEntriesInSingleInterval * 3 - // Assert that we have the expected number of total logs and expected number - // of logs for each index. 
- testutils.SucceedsWithin(t, func() error { - return checkNumTotalEntriesAndNumIndexEntries(t, - expectedIndexNames, - expectedTotalNumEntriesAfterThreeIntervals, - expectedNumberOfIndividualIndexEntriesAfterThreeIntervals, - ) - }, sd.getOverlapDuration()+timeBuffer) + // Wait for channel value from end of 3rd schedule. + <-scheduleCompleteChan + + // Check the expected number of entries from the 3rd schedule. + require.NoError(t, checkNumTotalEntriesAndNumIndexEntries(t, + expectedIndexNames, + expectedTotalNumEntriesAfterThreeIntervals, + expectedNumberOfIndividualIndexEntriesAfterThreeIntervals, + scheduleCompleteChan, + ), "error encountered checking the number of total entries and number of index entries") + // Stop capturing index usage statistics. telemetryCaptureIndexUsageStatsEnabled.Override(context.Background(), &settings.SV, false) + scheduleCompleteChan <- struct{}{} + // Iterate through entries, ensure that the timestamp difference between each // schedule is as expected. startTimestamp := int64(0) @@ -234,7 +240,7 @@ func TestCaptureIndexUsageStats(t *testing.T) { testData := []time.Duration{ 0 * time.Second, // the difference in number of seconds between first and second schedule - stubScheduleInterval, + stubScheduleInterval - firstScheduleLoggingDuration, // the difference in number of seconds between second and third schedule sd.getOverlapDuration(), } @@ -257,9 +263,9 @@ func TestCaptureIndexUsageStats(t *testing.T) { } actualDuration := time.Duration(currentTimestamp-previousTimestamp) * time.Nanosecond - // Use a time window to afford some non-determinisim in the test. - require.Greater(t, expectedDuration, actualDuration-time.Second) - require.Greater(t, actualDuration+time.Second, expectedDuration) + // Use a time window to afford some non-determinism in the test. 
+ require.Greater(t, expectedDuration, actualDuration-timeBuffer) + require.Greater(t, actualDuration+timeBuffer, expectedDuration) previousTimestamp = currentTimestamp } } @@ -273,7 +279,9 @@ func checkNumTotalEntriesAndNumIndexEntries( expectedIndexNames []string, expectedTotalEntries int, expectedIndividualIndexEntries int, + scheduleCompleteChan chan struct{}, ) error { + log.Flush() // Fetch log entries. entries, err := log.FetchEntriesFromFiles( 0, @@ -283,11 +291,13 @@ func checkNumTotalEntriesAndNumIndexEntries( log.WithMarkedSensitiveData, ) if err != nil { + close(scheduleCompleteChan) return err } // Assert that we have the correct number of entries. if expectedTotalEntries != len(entries) { + close(scheduleCompleteChan) return errors.Newf("expected %d total entries, got %d", expectedTotalEntries, len(entries)) } @@ -305,6 +315,7 @@ func checkNumTotalEntriesAndNumIndexEntries( } err := json.Unmarshal([]byte(e.Message), &s) if err != nil { + close(scheduleCompleteChan) t.Fatal(err) } countByIndex[s.IndexName] = countByIndex[s.IndexName] + 1 @@ -313,11 +324,13 @@ func checkNumTotalEntriesAndNumIndexEntries( t.Logf("found index counts: %+v", countByIndex) if expected, actual := expectedTotalEntries/expectedIndividualIndexEntries, len(countByIndex); actual != expected { + close(scheduleCompleteChan) return errors.Newf("expected %d indexes, got %d", expected, actual) } for idxName, c := range countByIndex { if c != expectedIndividualIndexEntries { + close(scheduleCompleteChan) return errors.Newf("for index %s: expected entry count %d, got %d", idxName, expectedIndividualIndexEntries, c) } @@ -325,9 +338,9 @@ func checkNumTotalEntriesAndNumIndexEntries( for _, idxName := range expectedIndexNames { if _, ok := countByIndex[idxName]; !ok { + close(scheduleCompleteChan) return errors.Newf("no entry found for index %s", idxName) } } - return nil } From 6369dbe84cc09c1de8facbef8b864d464624b18e Mon Sep 17 00:00:00 2001 From: e-mbrown Date: Thu, 22 Sep 2022 15:14:33 
-0400 Subject: [PATCH 2/2] sql: Add server-side transaction timeout This commit adds the session variable `transaction_timeout` Release note (sql change): The `transaction_timeout` session variable was added. `transaction_timeout` aborts an explicit transaction when it runs longer than the configured duration. When the timer times out, the current statement is cancelled and the transaction enters an aborted state. This timeout does not have any effect when no statement is being executed, so it should be used with `idle_in_transaction_timeout` for the best results. --- pkg/sql/conn_executor.go | 2 + pkg/sql/conn_executor_exec.go | 64 ++++++++++++--- pkg/sql/exec_util.go | 4 + .../testdata/logic_test/information_schema | 1 + .../logictest/testdata/logic_test/pg_catalog | 3 + .../logictest/testdata/logic_test/show_source | 1 + pkg/sql/run_control_test.go | 81 +++++++++++++++++++ .../local_only_session_data.proto | 3 + pkg/sql/set_var.go | 14 ++++ pkg/sql/sqlerrors/errors.go | 4 + pkg/sql/vars.go | 12 +++ 11 files changed, 180 insertions(+), 9 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ec023733f6ec..9a67e90f9972 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1850,6 +1850,7 @@ func (ex *connExecutor) run( return err } } + } // errDrainingComplete is returned by execCmd when the connExecutor previously got @@ -1920,6 +1921,7 @@ func (ex *connExecutor) execCmd() error { ev, payload, err = ex.execStmt( ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes, canAutoCommit, ) + return err }() // Note: we write to ex.statsCollector.PhaseTimes, instead of ex.phaseTimes, diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 855c8815d5d7..74d91421e1f7 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -297,10 +297,15 @@ func (ex *connExecutor) execStmtInOpenState( st.mu.stmtCount++ }(&ex.state) - var timeoutTicker *time.Timer + var
queryTimeoutTicker *time.Timer + var txnTimeoutTicker *time.Timer queryTimedOut := false - // doneAfterFunc will be allocated only when timeoutTicker is non-nil. - var doneAfterFunc chan struct{} + txnTimedOut := false + + // queryDoneAfterFunc and txnDoneAfterFunc will be allocated only when + // queryTimeoutTicker or txnTimeoutTicker is non-nil. + var queryDoneAfterFunc chan struct{} + var txnDoneAfterFunc chan struct{} // Early-associate placeholder info with the eval context, // so that we can fill in placeholder values in our call to addActiveQuery, below. @@ -316,11 +321,18 @@ func (ex *connExecutor) execStmtInOpenState( // overwriting res.Error to a more user-friendly message in case of query // cancellation. defer func(ctx context.Context, res RestrictedCommandResult) { - if timeoutTicker != nil { - if !timeoutTicker.Stop() { + if queryTimeoutTicker != nil { + if !queryTimeoutTicker.Stop() { // Wait for the timer callback to complete to avoid a data race on // queryTimedOut. - <-doneAfterFunc + <-queryDoneAfterFunc + } + } + if txnTimeoutTicker != nil { + if !txnTimeoutTicker.Stop() { + // Wait for the timer callback to complete to avoid a data race on + // txnTimedOut. 
+ <-txnDoneAfterFunc } } @@ -365,6 +377,12 @@ func (ex *connExecutor) execStmtInOpenState( } res.SetError(sqlerrors.QueryTimeoutError) retPayload = eventNonRetriableErrPayload{err: sqlerrors.QueryTimeoutError} + } else if txnTimedOut { + retEv = eventNonRetriableErr{ + IsCommit: fsm.FromBool(isCommit(ast)), + } + res.SetError(sqlerrors.TxnTimeoutError) + retPayload = eventNonRetriableErrPayload{err: sqlerrors.TxnTimeoutError} } }(ctx, res) @@ -475,6 +493,34 @@ func (ex *connExecutor) execStmtInOpenState( }() } + if ex.sessionData().TransactionTimeout > 0 && !ex.implicitTxn() { + timerDuration := + ex.sessionData().TransactionTimeout - timeutil.Since(ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionTransactionStarted)) + + // If the timer already expired, but the transaction is not yet aborted, + // we should error immediately without executing. If the timer + // expired but the transaction already is aborted, then we should still + // proceed with executing the statement in order to get a + // TransactionAbortedError. + _, txnAborted := ex.machine.CurState().(stateAborted) + + if timerDuration < 0 && !txnAborted { + txnTimedOut = true + return makeErrEvent(sqlerrors.TxnTimeoutError) + } + + if timerDuration > 0 { + txnDoneAfterFunc = make(chan struct{}, 1) + txnTimeoutTicker = time.AfterFunc( + timerDuration, + func() { + cancelQuery() + txnTimedOut = true + txnDoneAfterFunc <- struct{}{} + }) + } + } + // We exempt `SET` statements from the statement timeout, particularly so as // not to block the `SET statement_timeout` command itself. 
if ex.sessionData().StmtTimeout > 0 && ast.StatementTag() != "SET" { @@ -485,13 +531,13 @@ func (ex *connExecutor) execStmtInOpenState( queryTimedOut = true return makeErrEvent(sqlerrors.QueryTimeoutError) } - doneAfterFunc = make(chan struct{}, 1) - timeoutTicker = time.AfterFunc( + queryDoneAfterFunc = make(chan struct{}, 1) + queryTimeoutTicker = time.AfterFunc( timerDuration, func() { cancelQuery() queryTimedOut = true - doneAfterFunc <- struct{}{} + queryDoneAfterFunc <- struct{}{} }) } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index e51f477fc506..cb46543c74ab 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3119,6 +3119,10 @@ func (m *sessionDataMutator) SetIdleInTransactionSessionTimeout(timeout time.Dur m.data.IdleInTransactionSessionTimeout = timeout } +func (m *sessionDataMutator) SetTransactionTimeout(timeout time.Duration) { + m.data.TransactionTimeout = timeout +} + func (m *sessionDataMutator) SetAllowPrepareAsOptPlan(val bool) { m.data.AllowPrepareAsOptPlan = val } diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 7cd83f2ef250..ebf8bc5a0cea 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -4800,6 +4800,7 @@ transaction_rows_read_log 0 transaction_rows_written_err 0 transaction_rows_written_log 0 transaction_status NoTxn +transaction_timeout 0 troubleshooting_mode off unconstrained_non_covering_index_scan_enabled off variable_inequality_lookup_join_enabled on diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index b6f7f707f953..b6b2abda124a 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -4270,6 +4270,7 @@ transaction_rows_read_log 0 NULL transaction_rows_written_err 0 NULL NULL NULL string transaction_rows_written_log 
0 NULL NULL NULL string transaction_status NoTxn NULL NULL NULL string +transaction_timeout 0 NULL NULL NULL string troubleshooting_mode off NULL NULL NULL string unconstrained_non_covering_index_scan_enabled off NULL NULL NULL string use_declarative_schema_changer on NULL NULL NULL string @@ -4404,6 +4405,7 @@ transaction_rows_read_log 0 NULL transaction_rows_written_err 0 NULL user NULL 0 0 transaction_rows_written_log 0 NULL user NULL 0 0 transaction_status NoTxn NULL user NULL NoTxn NoTxn +transaction_timeout 0 NULL user NULL 0 0 troubleshooting_mode off NULL user NULL off off unconstrained_non_covering_index_scan_enabled off NULL user NULL off off use_declarative_schema_changer on NULL user NULL on on @@ -4537,6 +4539,7 @@ transaction_rows_read_log NULL NULL NULL transaction_rows_written_err NULL NULL NULL NULL NULL transaction_rows_written_log NULL NULL NULL NULL NULL transaction_status NULL NULL NULL NULL NULL +transaction_timeout NULL NULL NULL NULL NULL troubleshooting_mode NULL NULL NULL NULL NULL unconstrained_non_covering_index_scan_enabled NULL NULL NULL NULL NULL use_declarative_schema_changer NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index d325421f725c..304a92e4330d 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -143,6 +143,7 @@ transaction_rows_read_log 0 transaction_rows_written_err 0 transaction_rows_written_log 0 transaction_status NoTxn +transaction_timeout 0 troubleshooting_mode off unconstrained_non_covering_index_scan_enabled off use_declarative_schema_changer on diff --git a/pkg/sql/run_control_test.go b/pkg/sql/run_control_test.go index 35edd90abaf1..564cc2c90784 100644 --- a/pkg/sql/run_control_test.go +++ b/pkg/sql/run_control_test.go @@ -545,6 +545,87 @@ func TestIdleInTransactionSessionTimeout(t *testing.T) { } } +func TestTransactionTimeout(t *testing.T) { + defer 
leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + numNodes := 1 + tc := serverutils.StartNewTestCluster(t, numNodes, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + + defer tc.ServerConn(0).Close() + + var TransactionStatus string + + conn, err := tc.ServerConn(0).Conn(ctx) + require.NoError(t, err) + + _, err = conn.ExecContext(ctx, `SET transaction_timeout = '1s'`) + require.NoError(t, err) + + _, err = conn.ExecContext(ctx, `BEGIN`) + require.NoError(t, err) + + _, err = conn.ExecContext(ctx, `select pg_sleep(0.1);`) + require.NoError(t, err) + + _, err = conn.ExecContext(ctx, `select pg_sleep(10);`) + require.Regexp(t, "pq: query execution canceled due to transaction timeout", err) + + err = conn.QueryRowContext(ctx, `SHOW TRANSACTION STATUS`).Scan(&TransactionStatus) + require.NoError(t, err) + + require.Equal(t, "Aborted", TransactionStatus) + + _, err = conn.ExecContext(ctx, `SELECT 1;`) + require.Regexp(t, "current transaction is aborted", err) + + _, err = conn.ExecContext(ctx, `ROLLBACK`) + require.NoError(t, err) + + //Ensure the transaction times out when transaction is open and no statement + //is executed. 
+ _, err = conn.ExecContext(ctx, `BEGIN`) + require.NoError(t, err) + + time.Sleep(1010 * time.Millisecond) + + _, err = conn.ExecContext(ctx, `SELECT 1;`) + require.Regexp(t, "pq: query execution canceled due to transaction timeout", err) + + err = conn.QueryRowContext(ctx, `SHOW TRANSACTION STATUS`).Scan(&TransactionStatus) + require.NoError(t, err) + + require.Equal(t, "Aborted", TransactionStatus) + + _, err = conn.ExecContext(ctx, `ROLLBACK`) + require.NoError(t, err) + + _, err = conn.ExecContext(ctx, `SET transaction_timeout = '10s'`) + require.NoError(t, err) + + _, err = conn.ExecContext(ctx, `BEGIN`) + require.NoError(t, err) + + _, err = conn.ExecContext(ctx, `select pg_sleep(0.1);`) + require.NoError(t, err) + + _, err = conn.ExecContext(ctx, `SELECT 1;`) + require.NoError(t, err) + + err = conn.QueryRowContext(ctx, `SHOW TRANSACTION STATUS`).Scan(&TransactionStatus) + require.NoError(t, err) + + require.Equal(t, "Open", TransactionStatus) + + _, err = conn.ExecContext(ctx, `COMMIT`) + require.NoError(t, err) +} + func TestIdleInTransactionSessionTimeoutAbortedState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index a320e5178788..dc5846cafa62 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -295,6 +295,9 @@ message LocalOnlySessionData { // be allowed to consider lookup joins with inequality conditions, in // addition to the other restrictions on when they are planned. bool variable_inequality_lookup_join_enabled = 80; + // TransactionSessionTimeout is the duration a transaction is permitted to + // run before the transaction is canceled. If set to 0, there is no timeout. 
+ int64 transaction_timeout = 81 [(gogoproto.casttype) = "time.Duration"]; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/set_var.go b/pkg/sql/set_var.go index e8324b254e25..c481f1c1bc94 100644 --- a/pkg/sql/set_var.go +++ b/pkg/sql/set_var.go @@ -438,6 +438,20 @@ func idleInSessionTimeoutVarSet(ctx context.Context, m sessionDataMutator, s str return nil } +func transactionTimeoutVarSet(ctx context.Context, m sessionDataMutator, s string) error { + timeout, err := validateTimeoutVar( + m.data.GetIntervalStyle(), + s, + "transaction_timeout", + ) + if err != nil { + return err + } + + m.SetTransactionTimeout(timeout) + return nil +} + func idleInTransactionSessionTimeoutVarSet( ctx context.Context, m sessionDataMutator, s string, ) error { diff --git a/pkg/sql/sqlerrors/errors.go b/pkg/sql/sqlerrors/errors.go index c38b03ba9238..63317a023fc5 100644 --- a/pkg/sql/sqlerrors/errors.go +++ b/pkg/sql/sqlerrors/errors.go @@ -304,6 +304,10 @@ func NewAggInAggError() error { var QueryTimeoutError = pgerror.New( pgcode.QueryCanceled, "query execution canceled due to statement timeout") +// TxnTimeoutError is an error representing a transaction timeout. +var TxnTimeoutError = pgerror.New( + pgcode.QueryCanceled, "query execution canceled due to transaction timeout") + // IsOutOfMemoryError checks whether this is an out of memory error. func IsOutOfMemoryError(err error) bool { return errHasCode(err, pgcode.OutOfMemory) diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index 6ccd0b15432d..f33450af8614 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -1218,6 +1218,18 @@ var varGen = map[string]sessionVar{ }, GlobalDefault: globalFalse, }, + // CockroachDB extension.
+ `transaction_timeout`: { + GetStringVal: makeTimeoutVarGetter(`transaction_timeout`), + Set: transactionTimeoutVarSet, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + ms := evalCtx.SessionData().TransactionTimeout.Nanoseconds() / int64(time.Millisecond) + return strconv.FormatInt(ms, 10), nil + }, + GlobalDefault: func(sv *settings.Values) string { + return strconv.FormatInt(0, 10) + }, + }, // This is read-only in Postgres also. // See https://www.postgresql.org/docs/14/sql-show.html and