Skip to content

Commit

Permalink
Merge #89033 #89319
Browse files Browse the repository at this point in the history
89033: sql: Add server-side transaction timeout r=rafiss a=e-mbrown

Resolves #61102

This commit adds the session variable `transation_timeout`

Release note (sql change): `transaction_timeout` cancels an explicit transaction when it runs longer than the timer set. When the timer times out, the current statement is allowed to finish and from then the transaction enters an aborted state. Because the timer needs a statement to finish before it aborts a transaction `transaction_timeout` should be used with `idle_in_transaction` for the best results.

89319: scheduledlogging: shorten TestCaptureIndexUsageStats run time r=THardy98 a=THardy98

Previously, TestCaptureIndexUsageStats ran through 4 iterations of 20 seconds for a run time of over a minute. This change reduces the run time of the test to under 10 seconds.

Release note: None

Co-authored-by: e-mbrown <[email protected]>
Co-authored-by: Thomas Hardy <[email protected]>
  • Loading branch information
3 people committed Oct 6, 2022
3 parents e04705b + 6369dbe + bb18504 commit ea43f89
Show file tree
Hide file tree
Showing 14 changed files with 260 additions and 76 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,7 @@ func (ex *connExecutor) run(
return err
}
}

}

// errDrainingComplete is returned by execCmd when the connExecutor previously got
Expand Down Expand Up @@ -1920,6 +1921,7 @@ func (ex *connExecutor) execCmd() error {
ev, payload, err = ex.execStmt(
ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes, canAutoCommit,
)

return err
}()
// Note: we write to ex.statsCollector.PhaseTimes, instead of ex.phaseTimes,
Expand Down
64 changes: 55 additions & 9 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,15 @@ func (ex *connExecutor) execStmtInOpenState(
st.mu.stmtCount++
}(&ex.state)

var timeoutTicker *time.Timer
var queryTimeoutTicker *time.Timer
var txnTimeoutTicker *time.Timer
queryTimedOut := false
// doneAfterFunc will be allocated only when timeoutTicker is non-nil.
var doneAfterFunc chan struct{}
txnTimedOut := false

// queryDoneAfterFunc and txnDoneAfterFunc will be allocated only when
// queryTimeoutTicker or txnTimeoutTicker is non-nil.
var queryDoneAfterFunc chan struct{}
var txnDoneAfterFunc chan struct{}

// Early-associate placeholder info with the eval context,
// so that we can fill in placeholder values in our call to addActiveQuery, below.
Expand All @@ -316,11 +321,18 @@ func (ex *connExecutor) execStmtInOpenState(
// overwriting res.Error to a more user-friendly message in case of query
// cancellation.
defer func(ctx context.Context, res RestrictedCommandResult) {
if timeoutTicker != nil {
if !timeoutTicker.Stop() {
if queryTimeoutTicker != nil {
if !queryTimeoutTicker.Stop() {
// Wait for the timer callback to complete to avoid a data race on
// queryTimedOut.
<-doneAfterFunc
<-queryDoneAfterFunc
}
}
if txnTimeoutTicker != nil {
if !txnTimeoutTicker.Stop() {
// Wait for the timer callback to complete to avoid a data race on
// txnTimedOut.
<-txnDoneAfterFunc
}
}

Expand Down Expand Up @@ -365,6 +377,12 @@ func (ex *connExecutor) execStmtInOpenState(
}
res.SetError(sqlerrors.QueryTimeoutError)
retPayload = eventNonRetriableErrPayload{err: sqlerrors.QueryTimeoutError}
} else if txnTimedOut {
retEv = eventNonRetriableErr{
IsCommit: fsm.FromBool(isCommit(ast)),
}
res.SetError(sqlerrors.TxnTimeoutError)
retPayload = eventNonRetriableErrPayload{err: sqlerrors.TxnTimeoutError}
}
}(ctx, res)

Expand Down Expand Up @@ -475,6 +493,34 @@ func (ex *connExecutor) execStmtInOpenState(
}()
}

if ex.sessionData().TransactionTimeout > 0 && !ex.implicitTxn() {
timerDuration :=
ex.sessionData().TransactionTimeout - timeutil.Since(ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionTransactionStarted))

// If the timer already expired, but the transaction is not yet aborted,
// we should error immediately without executing. If the timer
// expired but the transaction already is aborted, then we should still
// proceed with executing the statement in order to get a
// TransactionAbortedError.
_, txnAborted := ex.machine.CurState().(stateAborted)

if timerDuration < 0 && !txnAborted {
txnTimedOut = true
return makeErrEvent(sqlerrors.TxnTimeoutError)
}

if timerDuration > 0 {
txnDoneAfterFunc = make(chan struct{}, 1)
txnTimeoutTicker = time.AfterFunc(
timerDuration,
func() {
cancelQuery()
txnTimedOut = true
txnDoneAfterFunc <- struct{}{}
})
}
}

// We exempt `SET` statements from the statement timeout, particularly so as
// not to block the `SET statement_timeout` command itself.
if ex.sessionData().StmtTimeout > 0 && ast.StatementTag() != "SET" {
Expand All @@ -485,13 +531,13 @@ func (ex *connExecutor) execStmtInOpenState(
queryTimedOut = true
return makeErrEvent(sqlerrors.QueryTimeoutError)
}
doneAfterFunc = make(chan struct{}, 1)
timeoutTicker = time.AfterFunc(
queryDoneAfterFunc = make(chan struct{}, 1)
queryTimeoutTicker = time.AfterFunc(
timerDuration,
func() {
cancelQuery()
queryTimedOut = true
doneAfterFunc <- struct{}{}
queryDoneAfterFunc <- struct{}{}
})
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3119,6 +3119,10 @@ func (m *sessionDataMutator) SetIdleInTransactionSessionTimeout(timeout time.Dur
m.data.IdleInTransactionSessionTimeout = timeout
}

func (m *sessionDataMutator) SetTransactionTimeout(timeout time.Duration) {
m.data.TransactionTimeout = timeout
}

func (m *sessionDataMutator) SetAllowPrepareAsOptPlan(val bool) {
m.data.AllowPrepareAsOptPlan = val
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -4800,6 +4800,7 @@ transaction_rows_read_log 0
transaction_rows_written_err 0
transaction_rows_written_log 0
transaction_status NoTxn
transaction_timeout 0
troubleshooting_mode off
unconstrained_non_covering_index_scan_enabled off
variable_inequality_lookup_join_enabled on
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -4270,6 +4270,7 @@ transaction_rows_read_log 0 NULL
transaction_rows_written_err 0 NULL NULL NULL string
transaction_rows_written_log 0 NULL NULL NULL string
transaction_status NoTxn NULL NULL NULL string
transaction_timeout 0 NULL NULL NULL string
troubleshooting_mode off NULL NULL NULL string
unconstrained_non_covering_index_scan_enabled off NULL NULL NULL string
use_declarative_schema_changer on NULL NULL NULL string
Expand Down Expand Up @@ -4404,6 +4405,7 @@ transaction_rows_read_log 0 NULL
transaction_rows_written_err 0 NULL user NULL 0 0
transaction_rows_written_log 0 NULL user NULL 0 0
transaction_status NoTxn NULL user NULL NoTxn NoTxn
transaction_timeout 0 NULL user NULL 0 0
troubleshooting_mode off NULL user NULL off off
unconstrained_non_covering_index_scan_enabled off NULL user NULL off off
use_declarative_schema_changer on NULL user NULL on on
Expand Down Expand Up @@ -4537,6 +4539,7 @@ transaction_rows_read_log NULL NULL NULL
transaction_rows_written_err NULL NULL NULL NULL NULL
transaction_rows_written_log NULL NULL NULL NULL NULL
transaction_status NULL NULL NULL NULL NULL
transaction_timeout NULL NULL NULL NULL NULL
troubleshooting_mode NULL NULL NULL NULL NULL
unconstrained_non_covering_index_scan_enabled NULL NULL NULL NULL NULL
use_declarative_schema_changer NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ transaction_rows_read_log 0
transaction_rows_written_err 0
transaction_rows_written_log 0
transaction_status NoTxn
transaction_timeout 0
troubleshooting_mode off
unconstrained_non_covering_index_scan_enabled off
use_declarative_schema_changer on
Expand Down
81 changes: 81 additions & 0 deletions pkg/sql/run_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,87 @@ func TestIdleInTransactionSessionTimeout(t *testing.T) {
}
}

func TestTransactionTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

numNodes := 1
tc := serverutils.StartNewTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

defer tc.ServerConn(0).Close()

var TransactionStatus string

conn, err := tc.ServerConn(0).Conn(ctx)
require.NoError(t, err)

_, err = conn.ExecContext(ctx, `SET transaction_timeout = '1s'`)
require.NoError(t, err)

_, err = conn.ExecContext(ctx, `BEGIN`)
require.NoError(t, err)

_, err = conn.ExecContext(ctx, `select pg_sleep(0.1);`)
require.NoError(t, err)

_, err = conn.ExecContext(ctx, `select pg_sleep(10);`)
require.Regexp(t, "pq: query execution canceled due to transaction timeout", err)

err = conn.QueryRowContext(ctx, `SHOW TRANSACTION STATUS`).Scan(&TransactionStatus)
require.NoError(t, err)

require.Equal(t, "Aborted", TransactionStatus)

_, err = conn.ExecContext(ctx, `SELECT 1;`)
require.Regexp(t, "current transaction is aborted", err)

_, err = conn.ExecContext(ctx, `ROLLBACK`)
require.NoError(t, err)

//Ensure the transaction times out when transaction is open and no statement
//is executed.
_, err = conn.ExecContext(ctx, `BEGIN`)
require.NoError(t, err)

time.Sleep(1010 * time.Millisecond)

_, err = conn.ExecContext(ctx, `SELECT 1;`)
require.Regexp(t, "pq: query execution canceled due to transaction timeout", err)

err = conn.QueryRowContext(ctx, `SHOW TRANSACTION STATUS`).Scan(&TransactionStatus)
require.NoError(t, err)

require.Equal(t, "Aborted", TransactionStatus)

_, err = conn.ExecContext(ctx, `ROLLBACK`)
require.NoError(t, err)

_, err = conn.ExecContext(ctx, `SET transaction_timeout = '10s'`)
require.NoError(t, err)

_, err = conn.ExecContext(ctx, `BEGIN`)
require.NoError(t, err)

_, err = conn.ExecContext(ctx, `select pg_sleep(0.1);`)
require.NoError(t, err)

_, err = conn.ExecContext(ctx, `SELECT 1;`)
require.NoError(t, err)

err = conn.QueryRowContext(ctx, `SHOW TRANSACTION STATUS`).Scan(&TransactionStatus)
require.NoError(t, err)

require.Equal(t, "Open", TransactionStatus)

_, err = conn.ExecContext(ctx, `COMMIT`)
require.NoError(t, err)
}

func TestIdleInTransactionSessionTimeoutAbortedState(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/scheduledlogging/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
Expand Down
30 changes: 16 additions & 14 deletions pkg/sql/scheduledlogging/captured_index_usage_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type CaptureIndexUsageStatsTestingKnobs struct {
// scheduled interval in the case that the logging duration exceeds the
// default scheduled interval duration.
getOverlapDuration func() time.Duration
// onScheduleComplete allows tests to hook into when the current schedule
// is completed to check for the expected logs.
onScheduleComplete func()
}

// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
Expand Down Expand Up @@ -104,17 +107,14 @@ func (s *CaptureIndexUsageStatsLoggingScheduler) durationOnOverlap() time.Durati
}

func (s *CaptureIndexUsageStatsLoggingScheduler) durationUntilNextInterval() time.Duration {
// If telemetry is disabled, return the disabled interval duration.
if !telemetryCaptureIndexUsageStatsEnabled.Get(&s.st.SV) {
return telemetryCaptureIndexUsageStatsStatusCheckEnabledInterval.Get(&s.st.SV)
}
loggingDur := s.getLoggingDuration()
// If the previous logging operation took longer than or equal to the set
// schedule interval, schedule the next interval immediately.
if s.getLoggingDuration() >= telemetryCaptureIndexUsageStatsInterval.Get(&s.st.SV) {
if loggingDur >= telemetryCaptureIndexUsageStatsInterval.Get(&s.st.SV) {
return s.durationOnOverlap()
}
// Otherwise, schedule the next interval normally.
return telemetryCaptureIndexUsageStatsInterval.Get(&s.st.SV)
return telemetryCaptureIndexUsageStatsInterval.Get(&s.st.SV) - loggingDur
}

// Start starts the capture index usage statistics logging scheduler.
Expand Down Expand Up @@ -146,22 +146,24 @@ func (s *CaptureIndexUsageStatsLoggingScheduler) start(ctx context.Context, stop
case <-stopper.ShouldQuiesce():
return
case <-timer.C:
if !telemetryCaptureIndexUsageStatsEnabled.Get(&s.st.SV) {
timer.Reset(telemetryCaptureIndexUsageStatsStatusCheckEnabledInterval.Get(&s.st.SV))
continue
}
s.currentCaptureStartTime = timeutil.Now()
err := captureIndexUsageStats(ctx, s.ie, stopper, telemetryCaptureIndexUsageStatsLoggingDelay.Get(&s.st.SV))
if err != nil {
log.Warningf(ctx, "error capturing index usage stats: %+v", err)
}
dur := s.durationUntilNextInterval()
if dur < time.Second {
// Avoid intervals that are too short, to prevent a hot
// spot on this task.
dur = time.Second
}
timer.Reset(dur)

if !telemetryCaptureIndexUsageStatsEnabled.Get(&s.st.SV) {
continue
}

err := captureIndexUsageStats(ctx, s.ie, stopper, telemetryCaptureIndexUsageStatsLoggingDelay.Get(&s.st.SV))
if err != nil {
log.Warningf(ctx, "error capturing index usage stats: %+v", err)
if s.knobs != nil && s.knobs.onScheduleComplete != nil {
s.knobs.onScheduleComplete()
}
}
}
Expand Down
Loading

0 comments on commit ea43f89

Please sign in to comment.