From 1d10a5ee46dbab7333f21ae6585ef08d72cd5145 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 8 Nov 2023 15:53:16 +0800 Subject: [PATCH 01/16] session: fix select for update statement can't got stmt-count-limit error Signed-off-by: crazycs520 --- .../testserverclient/server_client.go | 37 +++++++++++++++++++ pkg/server/tests/tidb_test.go | 5 +++ pkg/session/session.go | 5 +++ pkg/session/tidb.go | 14 ++++--- 4 files changed, 56 insertions(+), 5 deletions(-) diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go index f02a288baa514..3292922b78781 100644 --- a/pkg/server/internal/testserverclient/server_client.go +++ b/pkg/server/internal/testserverclient/server_client.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/kv" tmysql "github.com/pingcap/tidb/pkg/parser/mysql" @@ -2446,4 +2447,40 @@ func (cli *TestServerClient) RunTestInfoschemaClientErrors(t *testing.T) { }) } +func (cli *TestServerClient) RunTestStmtCountLimit(t *testing.T) { + originalStmtCountLimit := config.GetGlobalConfig().Performance.StmtCountLimit + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.StmtCountLimit = 3 + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.StmtCountLimit = originalStmtCountLimit + }) + }() + + cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) { + dbt.MustExec("create table t (id int key);") + dbt.MustExec("set @@tidb_disable_txn_auto_retry=0;") + dbt.MustExec("set autocommit=0;") + dbt.MustExec("begin optimistic;") + dbt.MustExec("insert into t values (1);") + dbt.MustExec("insert into t values (2);") + _, err := dbt.GetDB().Query("select * from t for update;") + require.Error(t, err) + require.Equal(t, "Error 1105 (HY000): statement count 4 exceeds the transaction limitation, transaction has been rollback, autocommit = false", err.Error()) + dbt.MustExec("insert into t values (3);") + dbt.MustExec("commit;") + rows := dbt.MustQuery("select * from t;") + var id int + count := 0 + for rows.Next() { + rows.Scan(&id) + count++ + } + require.NoError(t, rows.Close()) + require.Equal(t, 3, id) + require.Equal(t, 1, count) + }) +} + //revive:enable:exported diff --git a/pkg/server/tests/tidb_test.go b/pkg/server/tests/tidb_test.go index f4f15f5d7ab08..365fad6d60ecc 100644 --- a/pkg/server/tests/tidb_test.go +++ b/pkg/server/tests/tidb_test.go @@ -1126,6 +1126,11 @@ func TestSumAvg(t *testing.T) { ts.RunTestSumAvg(t) } +func TestStmtCountLimit(t *testing.T) { + ts := createTidbTestSuite(t) + ts.RunTestStmtCountLimit(t) +} + func TestNullFlag(t *testing.T) { ts := createTidbTestSuite(t) diff --git a/pkg/session/session.go b/pkg/session/session.go index 4c1a62e08e6ad..8ede220329933 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2430,6 +2430,11 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. } } } + if err == nil && sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { + if err := checkStmtLimit(ctx, se, true); err != nil { + return nil, err + } + } return &execStmtResult{ RecordSet: rs, sql: s, diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index d6c9c59a6a4d3..31e1ba49ac56d 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -271,7 +271,7 @@ func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.St if err != nil { return err } - return checkStmtLimit(ctx, se) + return checkStmtLimit(ctx, se, false) } func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error { @@ -305,17 +305,21 @@ func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql s return nil } -func checkStmtLimit(ctx context.Context, se *session) error { +func checkStmtLimit(ctx context.Context, se *session, beforeExec bool) error { // If the user insert, insert, insert ... but never commit, TiDB would OOM. // So we limit the statement count in a transaction here. var err error sessVars := se.GetSessionVars() history := GetHistory(se) - if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) { + stmtCount := history.Count() + if beforeExec { + stmtCount += 1 + } + if stmtCount > int(config.GetGlobalConfig().Performance.StmtCountLimit) { if !sessVars.BatchCommit { se.RollbackTxn(ctx) - return errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", - history.Count(), sessVars.IsAutocommit()) + return errors.Errorf("statement count %d exceeds the transaction limitation, transaction has been rollback, autocommit = %t", + stmtCount, sessVars.IsAutocommit()) } err = sessiontxn.NewTxn(ctx, se) // The transaction does not committed yet, we need to keep it in transaction. From 6598e03902c3c3e715ea7412b15d74ed9309ea95 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 8 Nov 2023 16:16:43 +0800 Subject: [PATCH 02/16] make bazel_prepare Signed-off-by: crazycs520 --- pkg/server/internal/testserverclient/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/server/internal/testserverclient/BUILD.bazel b/pkg/server/internal/testserverclient/BUILD.bazel index 1f751d9f38580..b29f420f7e1f5 100644 --- a/pkg/server/internal/testserverclient/BUILD.bazel +++ b/pkg/server/internal/testserverclient/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/server/internal/testserverclient", visibility = ["//pkg/server:__subpackages__"], deps = [ + "//pkg/config", "//pkg/errno", "//pkg/kv", "//pkg/parser/mysql", From ab65def540def7953f96337e0bcc82acf436dc43 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 8 Nov 2023 16:17:30 +0800 Subject: [PATCH 03/16] fix lint Signed-off-by: crazycs520 --- pkg/session/tidb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index 31e1ba49ac56d..dc251afd45124 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -313,7 +313,7 @@ func checkStmtLimit(ctx context.Context, se *session, beforeExec bool) error { history := GetHistory(se) stmtCount := history.Count() if beforeExec { - stmtCount += 1 + stmtCount++ } if stmtCount > int(config.GetGlobalConfig().Performance.StmtCountLimit) { if !sessVars.BatchCommit { From c1006325c103773e5d26822c0cd16c01377dbcd4 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 8 Nov 2023 19:56:17 +0800 Subject: [PATCH 04/16] refactor code and add test Signed-off-by: crazycs520 --- .../testserverclient/server_client.go | 37 +++++++++++++++++++ pkg/session/session.go | 11 +++--- pkg/session/tidb.go | 12 ++---- 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go index 3292922b78781..f7d614328ef1a 100644 --- a/pkg/server/internal/testserverclient/server_client.go +++ b/pkg/server/internal/testserverclient/server_client.go @@ -2480,6 +2480,43 @@ func (cli *TestServerClient) RunTestStmtCountLimit(t *testing.T) { require.NoError(t, rows.Close()) require.Equal(t, 3, id) require.Equal(t, 1, count) + + dbt.MustExec("delete from t;") + dbt.MustExec("commit;") + dbt.MustExec("set @@tidb_disable_txn_auto_retry=0;") + dbt.MustExec("set autocommit=0;") + dbt.MustExec("begin optimistic;") + dbt.MustExec("insert into t values (1);") + dbt.MustExec("insert into t values (2);") + _, err = dbt.GetDB().Exec("insert into t values (3);") + require.Error(t, err) + require.Equal(t, "Error 1105 (HY000): statement count 4 exceeds the transaction limitation, transaction has been rollback, autocommit = false", err.Error()) + dbt.MustExec("commit;") + rows = dbt.MustQuery("select count(*) from t;") + for rows.Next() { + rows.Scan(&count) + } + require.NoError(t, rows.Close()) + require.Equal(t, 0, count) + + dbt.MustExec("delete from t;") + dbt.MustExec("commit;") + dbt.MustExec("set @@tidb_batch_commit=1;") + dbt.MustExec("set @@tidb_disable_txn_auto_retry=0;") + dbt.MustExec("set autocommit=0;") + dbt.MustExec("begin optimistic;") + dbt.MustExec("insert into t values (1);") + dbt.MustExec("insert into t values (2);") + dbt.MustExec("insert into t values (3);") + dbt.MustExec("insert into t values (4);") + dbt.MustExec("insert into t values (5);") + dbt.MustExec("commit;") + rows = dbt.MustQuery("select count(*) from t;") + for rows.Next() { + rows.Scan(&count) + } + require.NoError(t, rows.Close()) + require.Equal(t, 5, count) }) } diff --git a/pkg/session/session.go b/pkg/session/session.go index 8ede220329933..2a220c001f12d 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2417,6 +2417,12 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. if err != nil { return nil, err } + if sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { + // Only when the txn is could retry and the statement is not read only, need to do stmt-count-limit check. + if err := checkStmtLimit(ctx, se); err != nil { + return nil, err + } + } rs, err = s.Exec(ctx) se.updateTelemetryMetric(s.(*executor.ExecStmt)) @@ -2430,11 +2436,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. } } } - if err == nil && sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { - if err := checkStmtLimit(ctx, se, true); err != nil { - return nil, err - } - } return &execStmtResult{ RecordSet: rs, sql: s, diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index dc251afd45124..ad3ffa9a489de 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -268,10 +268,7 @@ func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.St // Reset txn state to invalid to dispose the pending start ts. se.txn.changeToInvalid() } - if err != nil { - return err - } - return checkStmtLimit(ctx, se, false) + return err } func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error { @@ -305,16 +302,13 @@ func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql s return nil } -func checkStmtLimit(ctx context.Context, se *session, beforeExec bool) error { +func checkStmtLimit(ctx context.Context, se *session) error { // If the user insert, insert, insert ... but never commit, TiDB would OOM. // So we limit the statement count in a transaction here. var err error sessVars := se.GetSessionVars() history := GetHistory(se) - stmtCount := history.Count() - if beforeExec { - stmtCount++ - } + stmtCount := history.Count() + 1 // history stmt count + current stmt. if stmtCount > int(config.GetGlobalConfig().Performance.StmtCountLimit) { if !sessVars.BatchCommit { se.RollbackTxn(ctx) From 7ea6f2b61060075c07a68fe035558cf7a34b5813 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 8 Nov 2023 20:09:37 +0800 Subject: [PATCH 05/16] add comment Signed-off-by: crazycs520 --- pkg/session/session.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index 2a220c001f12d..f68e8a2eb3a5e 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2418,7 +2418,8 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. return nil, err } if sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { - // Only when the txn is could retry and the statement is not read only, need to do stmt-count-limit check. + // Only when the txn is could retry and the statement is not read only, need to do stmt-count-limit check, + // otherwise, the stmt won't be add into stmt history, and also don't need check. if err := checkStmtLimit(ctx, se); err != nil { return nil, err } From ec32f10c2245d28579f06141ffbc3dc3ed6a8e4f Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 8 Nov 2023 20:14:55 +0800 Subject: [PATCH 06/16] update comment Signed-off-by: crazycs520 --- pkg/session/session.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index f68e8a2eb3a5e..22b6e0fa25e99 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2418,8 +2418,9 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. return nil, err } if sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { - // Only when the txn is could retry and the statement is not read only, need to do stmt-count-limit check, + // Only when the txn auto retry is enabled and the statement is not read only, need to do the stmt-count-limit check, // otherwise, the stmt won't be add into stmt history, and also don't need check. + // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit if err := checkStmtLimit(ctx, se); err != nil { return nil, err } From 756727da7765ce49cf275b8967f698dad95401ec Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 10:50:11 +0800 Subject: [PATCH 07/16] remove canReuseTxnWhenExplicitBegin since it is always false Signed-off-by: crazycs520 --- pkg/sessiontxn/isolation/base.go | 33 +++++++++----------------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index 198cd707b18f7..62ceafa80e960 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -87,17 +87,15 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn return err } case sessiontxn.EnterNewTxnWithBeginStmt: - if !canReuseTxnWhenExplicitBegin(p.sctx) { - // As we will enter a new txn, we need to commit the old txn if it's still valid. - // There are two main steps here to enter a new txn: - // 1. prepareTxnWithOracleTS - // 2. ActivateTxn - if err := internal.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil { - return err - } - if err := p.prepareTxnWithOracleTS(); err != nil { - return err - } + // As we will enter a new txn, we need to commit the old txn if it's still valid. + // There are two main steps here to enter a new txn: + // 1. prepareTxnWithOracleTS + // 2. ActivateTxn + if err := internal.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil { + return err + } + if err := p.prepareTxnWithOracleTS(); err != nil { + return err } sessVars.SetInTxn(true) case sessiontxn.EnterNewTxnBeforeStmt: @@ -468,19 +466,6 @@ func (p *baseTxnContextProvider) getSnapshotByTS(snapshotTS uint64) (kv.Snapshot return snapshot, nil } -// canReuseTxnWhenExplicitBegin returns whether we should reuse the txn when starting a transaction explicitly -func canReuseTxnWhenExplicitBegin(sctx sessionctx.Context) bool { - sessVars := sctx.GetSessionVars() - txnCtx := sessVars.TxnCtx - // If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the - // need to call NewTxn, which commits the existing transaction and begins a new one. - // If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should - // always create a new transaction. - // If the variable `tidb_snapshot` is set, we should always create a new transaction because the current txn may be - // initialized with snapshot ts. - return txnCtx.History == nil && !txnCtx.IsStaleness && sessVars.SnapshotTS == 0 -} - // newOracleFuture creates new future according to the scope and the session context func newOracleFuture(ctx context.Context, sctx sessionctx.Context, scope string) oracle.Future { r, ctx := tracing.StartRegionEx(ctx, "isolation.newOracleFuture") From 1b31f28b2bff9eb442e620ea9c2bc49571958ded Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 10:52:49 +0800 Subject: [PATCH 08/16] Revert "remove canReuseTxnWhenExplicitBegin since it is always false" This reverts commit 756727da7765ce49cf275b8967f698dad95401ec. --- pkg/sessiontxn/isolation/base.go | 33 +++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index 62ceafa80e960..198cd707b18f7 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -87,15 +87,17 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn return err } case sessiontxn.EnterNewTxnWithBeginStmt: - // As we will enter a new txn, we need to commit the old txn if it's still valid. - // There are two main steps here to enter a new txn: - // 1. prepareTxnWithOracleTS - // 2. ActivateTxn - if err := internal.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil { - return err - } - if err := p.prepareTxnWithOracleTS(); err != nil { - return err + if !canReuseTxnWhenExplicitBegin(p.sctx) { + // As we will enter a new txn, we need to commit the old txn if it's still valid. + // There are two main steps here to enter a new txn: + // 1. prepareTxnWithOracleTS + // 2. ActivateTxn + if err := internal.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil { + return err + } + if err := p.prepareTxnWithOracleTS(); err != nil { + return err + } } sessVars.SetInTxn(true) case sessiontxn.EnterNewTxnBeforeStmt: @@ -466,6 +468,19 @@ func (p *baseTxnContextProvider) getSnapshotByTS(snapshotTS uint64) (kv.Snapshot return snapshot, nil } +// canReuseTxnWhenExplicitBegin returns whether we should reuse the txn when starting a transaction explicitly +func canReuseTxnWhenExplicitBegin(sctx sessionctx.Context) bool { + sessVars := sctx.GetSessionVars() + txnCtx := sessVars.TxnCtx + // If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the + // need to call NewTxn, which commits the existing transaction and begins a new one. + // If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should + // always create a new transaction. + // If the variable `tidb_snapshot` is set, we should always create a new transaction because the current txn may be + // initialized with snapshot ts. + return txnCtx.History == nil && !txnCtx.IsStaleness && sessVars.SnapshotTS == 0 +} + // newOracleFuture creates new future according to the scope and the session context func newOracleFuture(ctx context.Context, sctx sessionctx.Context, scope string) oracle.Future { r, ctx := tracing.StartRegionEx(ctx, "isolation.newOracleFuture") From 43347403777b980682353b4cd29de89bf199ad6a Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 11:03:44 +0800 Subject: [PATCH 09/16] always call GetHistory Signed-off-by: crazycs520 --- pkg/session/tidb.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index ad3ffa9a489de..fcb23e990a715 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -241,10 +241,12 @@ func recordAbortTxnDuration(sessVars *variable.SessionVars, isInternal bool) { func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error { sessVars := se.sessionVars + // Call GetHistory here to init stmtHistory if needed, since canReuseTxnWhenExplicitBegin will check whether stmtHistory is nil. + txnStmtHistory := GetHistory(se) if !sql.IsReadOnly(sessVars) { // All the history should be added here. if meetsErr == nil && sessVars.TxnCtx.CouldRetry { - GetHistory(se).Add(sql, sessVars.StmtCtx) + txnStmtHistory.Add(sql, sessVars.StmtCtx) } // Handle the stmt commit/rollback. @@ -326,6 +328,7 @@ func checkStmtLimit(ctx context.Context, se *session) error { } // GetHistory get all stmtHistory in current txn. Exported only for test. +// If stmtHistory is nil, will create a new one for current txn. func GetHistory(ctx sessionctx.Context) *StmtHistory { hist, ok := ctx.GetSessionVars().TxnCtx.History.(*StmtHistory) if ok { From e5d136b2e817a7bd266663b7e8390b90eae31002 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 11:28:06 +0800 Subject: [PATCH 10/16] always call checkStmtLimit Signed-off-by: crazycs520 --- pkg/session/session.go | 10 +++------- pkg/session/tidb.go | 3 +-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index 22b6e0fa25e99..c50114cfc2b1b 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2417,13 +2417,9 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. if err != nil { return nil, err } - if sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { - // Only when the txn auto retry is enabled and the statement is not read only, need to do the stmt-count-limit check, - // otherwise, the stmt won't be add into stmt history, and also don't need check. - // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit - if err := checkStmtLimit(ctx, se); err != nil { - return nil, err - } + // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit + if err := checkStmtLimit(ctx, se); err != nil { + return nil, err } rs, err = s.Exec(ctx) diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index fcb23e990a715..ed5a83e7cbf82 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -242,11 +242,10 @@ func recordAbortTxnDuration(sessVars *variable.SessionVars, isInternal bool) { func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error { sessVars := se.sessionVars // Call GetHistory here to init stmtHistory if needed, since canReuseTxnWhenExplicitBegin will check whether stmtHistory is nil. - txnStmtHistory := GetHistory(se) if !sql.IsReadOnly(sessVars) { // All the history should be added here. if meetsErr == nil && sessVars.TxnCtx.CouldRetry { - txnStmtHistory.Add(sql, sessVars.StmtCtx) + GetHistory(se).Add(sql, sessVars.StmtCtx) } // Handle the stmt commit/rollback. From b32cf5043cb1eb0960cb90a3efb9d45344d2ba85 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 11:37:42 +0800 Subject: [PATCH 11/16] remove comment Signed-off-by: crazycs520 --- pkg/session/tidb.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index ed5a83e7cbf82..523a72dc58080 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -241,7 +241,6 @@ func recordAbortTxnDuration(sessVars *variable.SessionVars, isInternal bool) { func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error { sessVars := se.sessionVars - // Call GetHistory here to init stmtHistory if needed, since canReuseTxnWhenExplicitBegin will check whether stmtHistory is nil. if !sql.IsReadOnly(sessVars) { // All the history should be added here. if meetsErr == nil && sessVars.TxnCtx.CouldRetry { From 6f1ce34271f7d42edf863a5bc05a8b586e315625 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 13:15:27 +0800 Subject: [PATCH 12/16] refactor Signed-off-by: crazycs520 --- pkg/session/session.go | 8 ++++---- pkg/session/test/txn/txn_test.go | 10 ++++++++++ pkg/session/tidb.go | 15 ++++++++++++--- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index c50114cfc2b1b..d834d9575cc2b 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2417,10 +2417,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. if err != nil { return nil, err } - // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit - if err := checkStmtLimit(ctx, se); err != nil { - return nil, err - } rs, err = s.Exec(ctx) se.updateTelemetryMetric(s.(*executor.ExecStmt)) @@ -2434,6 +2430,10 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. } } } + // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit + if err := checkStmtLimit(ctx, se, true); err != nil { + return nil, err + } return &execStmtResult{ RecordSet: rs, sql: s, diff --git a/pkg/session/test/txn/txn_test.go b/pkg/session/test/txn/txn_test.go index a0af220f1077a..3f5893157ea34 100644 --- a/pkg/session/test/txn/txn_test.go +++ b/pkg/session/test/txn/txn_test.go @@ -379,6 +379,16 @@ func TestBatchCommit(t *testing.T) { tk.MustExec("insert into t values (7)") tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("delete from t") + tk.MustExec("commit") + tk.MustExec("begin") + tk.MustExec("explain analyze insert into t values (5)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("explain analyze insert into t values (6)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("explain analyze insert into t values (7)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + // The session is still in transaction. tk.MustExec("insert into t values (8)") tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index 523a72dc58080..26b3774cfe5c1 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -268,7 +268,10 @@ func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.St // Reset txn state to invalid to dispose the pending start ts. se.txn.changeToInvalid() } - return err + if err != nil { + return err + } + return checkStmtLimit(ctx, se, false) } func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error { @@ -302,19 +305,25 @@ func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql s return nil } -func checkStmtLimit(ctx context.Context, se *session) error { +func checkStmtLimit(ctx context.Context, se *session, beforeFinish bool) error { // If the user insert, insert, insert ... but never commit, TiDB would OOM. // So we limit the statement count in a transaction here. var err error sessVars := se.GetSessionVars() history := GetHistory(se) - stmtCount := history.Count() + 1 // history stmt count + current stmt. + stmtCount := history.Count() + if beforeFinish { + stmtCount++ // history stmt count + current stmt. + } if stmtCount > int(config.GetGlobalConfig().Performance.StmtCountLimit) { if !sessVars.BatchCommit { se.RollbackTxn(ctx) return errors.Errorf("statement count %d exceeds the transaction limitation, transaction has been rollback, autocommit = %t", stmtCount, sessVars.IsAutocommit()) } + if beforeFinish { + return nil + } err = sessiontxn.NewTxn(ctx, se) // The transaction does not committed yet, we need to keep it in transaction. // The last history could not be "commit"/"rollback" statement. From 5cfeadda59b809e0f6be75a4c7ff222f428f46a2 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 14:12:37 +0800 Subject: [PATCH 13/16] refine comment Signed-off-by: crazycs520 --- pkg/session/session.go | 2 +- pkg/session/tidb.go | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index d834d9575cc2b..5d3fa21354f7e 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2431,7 +2431,7 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. } } // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit - if err := checkStmtLimit(ctx, se, true); err != nil { + if err := checkStmtLimit(ctx, se, false); err != nil { return nil, err } return &execStmtResult{ diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index 26b3774cfe5c1..a62e56c544f46 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -271,7 +271,7 @@ func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.St if err != nil { return err } - return checkStmtLimit(ctx, se, false) + return checkStmtLimit(ctx, se, true) } func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error { @@ -305,15 +305,16 @@ func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql s return nil } -func checkStmtLimit(ctx context.Context, se *session, beforeFinish bool) error { +func checkStmtLimit(ctx context.Context, se *session, isFinish bool) error { // If the user insert, insert, insert ... but never commit, TiDB would OOM. // So we limit the statement count in a transaction here. var err error sessVars := se.GetSessionVars() history := GetHistory(se) stmtCount := history.Count() - if beforeFinish { - stmtCount++ // history stmt count + current stmt. + if !isFinish { + // history stmt count + current stmt. + stmtCount++ } if stmtCount > int(config.GetGlobalConfig().Performance.StmtCountLimit) { if !sessVars.BatchCommit { @@ -321,9 +322,12 @@ func checkStmtLimit(ctx context.Context, se *session, beforeFinish bool) error { return errors.Errorf("statement count %d exceeds the transaction limitation, transaction has been rollback, autocommit = %t", stmtCount, sessVars.IsAutocommit()) } - if beforeFinish { + if !isFinish { + // if the stmt is not finish execute, then just return. return nil } + // If the stmt is finish execute, and exceed the StmtCountLimit, and BatchCommit is true, + // then commit the current transaction and create a new transaction. err = sessiontxn.NewTxn(ctx, se) // The transaction does not committed yet, we need to keep it in transaction. // The last history could not be "commit"/"rollback" statement. From 045813b8410131f047155cfd78f971c2636de98a Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 14:26:03 +0800 Subject: [PATCH 14/16] refine and add comment Signed-off-by: crazycs520 --- pkg/session/session.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index 5d3fa21354f7e..74e4877d8bae1 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2417,6 +2417,14 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. if err != nil { return nil, err } + if sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { + // Only when the txn is could retry and the statement is not read only, need to do stmt-count-limit check, + // otherwise, the stmt won't be add into stmt history, and also don't need check. + // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit + if err := checkStmtLimit(ctx, se, false); err != nil { + return nil, err + } + } rs, err = s.Exec(ctx) se.updateTelemetryMetric(s.(*executor.ExecStmt)) @@ -2430,10 +2438,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. } } } - // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit - if err := checkStmtLimit(ctx, se, false); err != nil { - return nil, err - } return &execStmtResult{ RecordSet: rs, sql: s, From c8cf1eca4706e74c9a45fb8986eeb1443b0be968 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 14:28:09 +0800 Subject: [PATCH 15/16] refine comment Signed-off-by: crazycs520 --- pkg/session/tidb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index a62e56c544f46..d0b95f8c1b76c 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -323,7 +323,7 @@ func checkStmtLimit(ctx context.Context, se *session, isFinish bool) error { stmtCount, sessVars.IsAutocommit()) } if !isFinish { - // if the stmt is not finish execute, then just return. + // if the stmt is not finish execute, then just return, since some work need to be done such as StmtCommit. return nil } // If the stmt is finish execute, and exceed the StmtCountLimit, and BatchCommit is true, From a906bd36aba491d2a1841b801fd4e018929544a4 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 9 Nov 2023 14:30:42 +0800 Subject: [PATCH 16/16] refine comment Signed-off-by: crazycs520 --- pkg/session/tidb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index d0b95f8c1b76c..16e3a2f423678 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -313,7 +313,7 @@ func checkStmtLimit(ctx context.Context, se *session, isFinish bool) error { history := GetHistory(se) stmtCount := history.Count() if !isFinish { - // history stmt count + current stmt. + // history stmt count + current stmt, since current stmt is not finish, it has not add to history. stmtCount++ } if stmtCount > int(config.GetGlobalConfig().Performance.StmtCountLimit) {