diff --git a/server/server_test.go b/server/server_test.go index 9c672162c3555..3c916f55b7c7a 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -37,10 +37,12 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" tmysql "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/versioninfo" + "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -1496,3 +1498,77 @@ func (cli *testServerClient) waitUntilServerOnline() { log.Fatal("failed to connect HTTP status in every 10 ms", zap.Int("retryTime", retryTime)) } } + +func (cli *testServerClient) RunTestStmtCountLimit(t *C) { + originalStmtCountLimit := config.GetGlobalConfig().Performance.StmtCountLimit + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.StmtCountLimit = 3 + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.StmtCountLimit = originalStmtCountLimit + }) + }() + + cli.runTests(t, nil, func(dbt *DBTest) { + dbt.mustExec("drop table if exists t") + dbt.mustExec("create table t (id int key);") + dbt.mustExec("set @@tidb_disable_txn_auto_retry=0;") + dbt.mustExec("set autocommit=0;") + dbt.mustExec("begin optimistic;") + dbt.mustExec("insert into t values (1);") + dbt.mustExec("insert into t values (2);") + _, err := dbt.db.Query("select * from t for update;") + require.Error(t, err) + require.Equal(t, "Error 1105: statement count 4 exceeds the transaction limitation, transaction has been rollback, autocommit = false", err.Error()) + dbt.mustExec("insert into t values (3);") + dbt.mustExec("commit;") + rows := dbt.mustQuery("select * from t;") + var id int + count := 0 + for rows.Next() { + rows.Scan(&id) + count++ + } + require.NoError(t, rows.Close()) + require.Equal(t, 3, id) + require.Equal(t, 1, count) + + dbt.mustExec("delete from t;") + dbt.mustExec("commit;") + dbt.mustExec("set @@tidb_disable_txn_auto_retry=0;") + dbt.mustExec("set autocommit=0;") + dbt.mustExec("begin optimistic;") + dbt.mustExec("insert into t values (1);") + dbt.mustExec("insert into t values (2);") + _, err = dbt.db.Exec("insert into t values (3);") + require.Error(t, err) + require.Equal(t, "Error 1105: statement count 4 exceeds the transaction limitation, transaction has been rollback, autocommit = false", err.Error()) + dbt.mustExec("commit;") + rows = dbt.mustQuery("select count(*) from t;") + for rows.Next() { + rows.Scan(&count) + } + require.NoError(t, rows.Close()) + require.Equal(t, 0, count) + + dbt.mustExec("delete from t;") + dbt.mustExec("commit;") + dbt.mustExec("set @@tidb_batch_commit=1;") + dbt.mustExec("set @@tidb_disable_txn_auto_retry=0;") + dbt.mustExec("set autocommit=0;") + dbt.mustExec("begin optimistic;") + dbt.mustExec("insert into t values (1);") + dbt.mustExec("insert into t values (2);") + dbt.mustExec("insert into t values (3);") + dbt.mustExec("insert into t values (4);") + dbt.mustExec("insert into t values (5);") + dbt.mustExec("commit;") + rows = dbt.mustQuery("select count(*) from t;") + for rows.Next() { + rows.Scan(&count) + } + require.NoError(t, rows.Close()) + require.Equal(t, 5, count) + }) +} diff --git a/server/tidb_test.go b/server/tidb_test.go index acb00f06348a1..26ab8e7ec3a67 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -937,6 +937,10 @@ func (ts *tidbTestSuite) TestSumAvg(c *C) { ts.runTestSumAvg(c) } +func (ts *tidbTestSuite) TestStmtCountLimit(c *C) { + ts.RunTestStmtCountLimit(c) +} + func (ts *tidbTestSuite) TestNullFlag(c *C) { // issue #9689 qctx, err := ts.tidbdrv.OpenCtx(uint64(0), 0, uint8(tmysql.DefaultCollationID), "test", nil) diff --git a/session/session_test.go b/session/session_test.go index a32e9e05349ed..b6709fea6af2c 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2522,6 +2522,16 @@ func (s *testSessionSerialSuite) TestBatchCommit(c *C) { tk.MustExec("insert into t values (7)") tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("delete from t") + tk.MustExec("commit") + tk.MustExec("begin") + tk.MustExec("explain analyze insert into t values (5)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("explain analyze insert into t values (6)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("explain analyze insert into t values (7)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + // The session is still in transaction. tk.MustExec("insert into t values (8)") tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) diff --git a/session/tidb.go b/session/tidb.go index bfa883d84e58e..d597c9f490734 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -209,7 +209,7 @@ func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.St if err != nil { return err } - return checkStmtLimit(ctx, se) + return checkStmtLimit(ctx, se, true) } func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error { @@ -239,18 +239,29 @@ func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql s return nil } -func checkStmtLimit(ctx context.Context, se *session) error { +func checkStmtLimit(ctx context.Context, se *session, isFinish bool) error { // If the user insert, insert, insert ... but never commit, TiDB would OOM. // So we limit the statement count in a transaction here. var err error sessVars := se.GetSessionVars() history := GetHistory(se) - if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) { + stmtCount := history.Count() + if !isFinish { + // history stmt count + current stmt, since current stmt is not finish, it has not add to history. + stmtCount++ + } + if stmtCount > int(config.GetGlobalConfig().Performance.StmtCountLimit) { if !sessVars.BatchCommit { se.RollbackTxn(ctx) - return errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", - history.Count(), sessVars.IsAutocommit()) + return errors.Errorf("statement count %d exceeds the transaction limitation, transaction has been rollback, autocommit = %t", + stmtCount, sessVars.IsAutocommit()) + } + if !isFinish { + // if the stmt is not finish execute, then just return, since some work need to be done such as StmtCommit. + return nil } + // If the stmt is finish execute, and exceed the StmtCountLimit, and BatchCommit is true, + // then commit the current transaction and create a new transaction. err = se.NewTxn(ctx) // The transaction does not committed yet, we need to keep it in transaction. // The last history could not be "commit"/"rollback" statement. @@ -305,6 +316,14 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) if err != nil { return nil, err } + if sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { + // Only when the txn is could retry and the statement is not read only, need to do stmt-count-limit check, + // otherwise, the stmt won't be add into stmt history, and also don't need check. + // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit + if err := checkStmtLimit(ctx, se, false); err != nil { + return nil, err + } + } rs, err = s.Exec(ctx) sessVars.TxnCtx.StatementCount++ if !s.IsReadOnly(sessVars) { @@ -349,6 +368,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) } // GetHistory get all stmtHistory in current txn. Exported only for test. +// If stmtHistory is nil, will create a new one for current txn. func GetHistory(ctx sessionctx.Context) *StmtHistory { hist, ok := ctx.GetSessionVars().TxnCtx.History.(*StmtHistory) if ok {