From 40e61c57c9749f602b5ee34d8383784baf8d4531 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 23 Aug 2024 17:52:43 +0900 Subject: [PATCH] This is an automated cherry-pick of #55547 Signed-off-by: ti-chi-bot --- DEPS.bzl | 11 + go.mod | 8 + go.sum | 13 + pkg/session/test/txn/BUILD.bazel | 28 ++ pkg/session/test/txn/txn_test.go | 555 +++++++++++++++++++++++++++++++ 5 files changed, 615 insertions(+) create mode 100644 pkg/session/test/txn/BUILD.bazel create mode 100644 pkg/session/test/txn/txn_test.go diff --git a/DEPS.bzl b/DEPS.bzl index d705b4a8eb7320..d9d52be0444eb7 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3603,8 +3603,19 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", +<<<<<<< HEAD sum = "h1:ZVeek5EpxLzG/soloyJxmYa/jJ8KsrOAP+ZGn7+JPyw=", version = "v2.0.4-0.20240611032030-02a6a912e7a8", +======= + sha256 = "2c26a7a94e44e2aae520f2013f8d738c5c5f1fb9f70b76894843f6827ce945f7", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240821073530-75e3705e58f1", + urls = [ + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240821073530-75e3705e58f1.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240821073530-75e3705e58f1.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240821073530-75e3705e58f1.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240821073530-75e3705e58f1.zip", + ], +>>>>>>> 599977b1d6a (*: bump client-go to fix correctness issue of membuffer snapshot read (#55547)) ) go_repository( name = "com_github_tikv_pd_client", diff --git a/go.mod b/go.mod index 2361426229f1e6..5f7e2dbadc4edc 100644 --- a/go.mod +++ b/go.mod @@ -90,10 +90,18 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 +<<<<<<< HEAD github.com/tikv/client-go/v2 v2.0.4-0.20240611032030-02a6a912e7a8 github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 +======= + github.com/tidwall/btree v1.7.0 + github.com/tikv/client-go/v2 v2.0.8-0.20240821073530-75e3705e58f1 + github.com/tikv/pd/client v0.0.0-20240805092608-838ee7983b78 + github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a + github.com/twmb/murmur3 v1.1.6 +>>>>>>> 599977b1d6a (*: bump client-go to fix correctness issue of membuffer snapshot read (#55547)) github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/vbauerster/mpb/v7 v7.5.3 github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f diff --git a/go.sum b/go.sum index 8b2168d154b324..3d7610c9ec1587 100644 --- a/go.sum +++ b/go.sum @@ -948,12 +948,25 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= +<<<<<<< HEAD github.com/tikv/client-go/v2 v2.0.4-0.20240611032030-02a6a912e7a8 h1:ZVeek5EpxLzG/soloyJxmYa/jJ8KsrOAP+ZGn7+JPyw= github.com/tikv/client-go/v2 v2.0.4-0.20240611032030-02a6a912e7a8/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ= github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 h1:e4hLUKfgfPeJPZwOfU+/I/03G0sn6IZqVcbX/5o+hvM= github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05/go.mod h1:MLIl+d2WbOF4A3U88WKtyXrQQW417wZDDvBcq2IW9bQ= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= +======= +github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= +github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= +github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= +github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= +github.com/tikv/client-go/v2 v2.0.8-0.20240821073530-75e3705e58f1 h1:QivCyAQxBOrXWC9X1/yc9U3Hw45usVnicjOg7T/rpgE= +github.com/tikv/client-go/v2 v2.0.8-0.20240821073530-75e3705e58f1/go.mod h1:4HDOAx8OXAJPtqhCZ03IhChXgaFs4B3+vSrPWmiPxjg= +github.com/tikv/pd/client v0.0.0-20240805092608-838ee7983b78 h1:PtW+yTvs9eGTMblulaCHmJ5OtifuE4SJXCACCtkd6ko= +github.com/tikv/pd/client v0.0.0-20240805092608-838ee7983b78/go.mod h1:TxrJRY949Vl14Lmarx6hTNP/HEDYzn4dP0KmjdzQ59w= +github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo= +github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a/go.mod h1:mkjARE7Yr8qU23YcGMSALbIxTQ9r9QBVahQOBRfU460= +>>>>>>> 599977b1d6a (*: bump client-go to fix correctness issue of membuffer snapshot read (#55547)) github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= diff --git a/pkg/session/test/txn/BUILD.bazel b/pkg/session/test/txn/BUILD.bazel new file mode 100644 index 00000000000000..351b06e8e07fa2 --- /dev/null +++ b/pkg/session/test/txn/BUILD.bazel @@ -0,0 +1,28 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "txn_test", + timeout = "short", + srcs = [ + "main_test.go", + "txn_test.go", + ], + flaky = True, + race = "on", + shard_count = 9, + deps = [ + "//pkg/config", + "//pkg/kv", + "//pkg/parser/auth", + "//pkg/parser/mysql", + "//pkg/parser/terror", + "//pkg/testkit", + "//pkg/testkit/testmain", + "//pkg/testkit/testsetup", + "//pkg/util/dbterror/plannererrors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//tikv", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/pkg/session/test/txn/txn_test.go b/pkg/session/test/txn/txn_test.go new file mode 100644 index 00000000000000..8d960fcffaf0ec --- /dev/null +++ b/pkg/session/test/txn/txn_test.go @@ -0,0 +1,555 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txn + +import ( + "context" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/auth" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" + "github.com/stretchr/testify/require" +) + +// TestAutocommit . See https://dev.mysql.com/doc/internals/en/status-flags.html +func TestAutocommit(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t;") + require.Greater(t, int(tk.Session().Status()&mysql.ServerStatusAutocommit), 0) + tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") + require.Greater(t, int(tk.Session().Status()&mysql.ServerStatusAutocommit), 0) + tk.MustExec("insert t values ()") + require.Greater(t, int(tk.Session().Status()&mysql.ServerStatusAutocommit), 0) + tk.MustExec("begin") + require.Greater(t, int(tk.Session().Status()&mysql.ServerStatusAutocommit), 0) + tk.MustExec("insert t values ()") + require.Greater(t, int(tk.Session().Status()&mysql.ServerStatusAutocommit), 0) + tk.MustExec("drop table if exists t") + require.Greater(t, int(tk.Session().Status()&mysql.ServerStatusAutocommit), 0) + + tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") + require.Greater(t, int(tk.Session().Status()&mysql.ServerStatusAutocommit), 0) + tk.MustExec("set autocommit=0") + require.Equal(t, 0, int(tk.Session().Status()&mysql.ServerStatusAutocommit)) + tk.MustExec("insert t values ()") + require.Equal(t, 0, int(tk.Session().Status()&mysql.ServerStatusAutocommit)) + tk.MustExec("commit") + require.Equal(t, 0, int(tk.Session().Status()&mysql.ServerStatusAutocommit)) + tk.MustExec("drop table if exists t") + require.Equal(t, 0, int(tk.Session().Status()&mysql.ServerStatusAutocommit)) + tk.MustExec("set autocommit='On'") + require.Greater(t, int(tk.Session().Status()&mysql.ServerStatusAutocommit), 0) + + // When autocommit is 0, transaction start ts should be the first *valid* + // statement, rather than *any* statement. + tk.MustExec("create table t (id int key)") + tk.MustExec("set @@autocommit = 0") + tk.MustExec("rollback") + tk.MustExec("set @@autocommit = 0") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("insert into t select 1") + //nolint:all_revive,revive + tk.MustQuery("select * from t").Check(testkit.Rows("1")) + tk.MustExec("delete from t") + + // When the transaction is rolled back, the global set statement would succeed. + tk.MustExec("set @@global.autocommit = 0") + tk.MustExec("begin") + tk.MustExec("insert into t values (1)") + tk.MustExec("set @@global.autocommit = 1") + tk.MustExec("rollback") + tk.MustQuery("select count(*) from t where id = 1").Check(testkit.Rows("0")) + tk.MustQuery("select @@global.autocommit").Check(testkit.Rows("1")) + + // When the transaction is committed because of switching mode, the session set statement should succeed. + tk.MustExec("set autocommit = 0") + tk.MustExec("begin") + tk.MustExec("insert into t values (1)") + tk.MustExec("set autocommit = 1") + tk.MustExec("rollback") + tk.MustQuery("select count(*) from t where id = 1").Check(testkit.Rows("1")) + tk.MustQuery("select @@autocommit").Check(testkit.Rows("1")) + + tk.MustExec("set autocommit = 0") + tk.MustExec("insert into t values (2)") + tk.MustExec("set autocommit = 1") + tk.MustExec("rollback") + tk.MustQuery("select count(*) from t where id = 2").Check(testkit.Rows("1")) + tk.MustQuery("select @@autocommit").Check(testkit.Rows("1")) + + // Set should not take effect if the mode is not changed. + tk.MustExec("set autocommit = 0") + tk.MustExec("begin") + tk.MustExec("insert into t values (3)") + tk.MustExec("set autocommit = 0") + tk.MustExec("rollback") + tk.MustQuery("select count(*) from t where id = 3").Check(testkit.Rows("0")) + tk.MustQuery("select @@autocommit").Check(testkit.Rows("0")) + + tk.MustExec("set autocommit = 1") + tk.MustExec("begin") + tk.MustExec("insert into t values (4)") + tk.MustExec("set autocommit = 1") + tk.MustExec("rollback") + tk.MustQuery("select count(*) from t where id = 4").Check(testkit.Rows("0")) + tk.MustQuery("select @@autocommit").Check(testkit.Rows("1")) +} + +// TestTxnLazyInitialize tests that when autocommit = 0, not all statement starts +// a new transaction. +func TestTxnLazyInitialize(t *testing.T) { + testTxnLazyInitialize(t, false) + testTxnLazyInitialize(t, true) +} + +func testTxnLazyInitialize(t *testing.T, isPessimistic bool) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int)") + if isPessimistic { + tk.MustExec("set tidb_txn_mode = 'pessimistic'") + } + + tk.MustExec("set @@autocommit = 0") + _, err := tk.Session().Txn(true) + require.True(t, kv.ErrInvalidTxn.Equal(err)) + txn, err := tk.Session().Txn(false) + require.NoError(t, err) + require.False(t, txn.Valid()) + tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0")) + tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0")) + + // Those statements should not start a new transaction automatically. + tk.MustQuery("select 1") + tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0")) + + tk.MustExec("set @@tidb_general_log = 0") + tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0")) + + // Explain now also build the query and starts a transaction + tk.MustQuery("explain select * from t") + res := tk.MustQuery("select @@tidb_current_ts") + require.NotEqual(t, "0", res.Rows()[0][0]) + + // Begin statement should start a new transaction. + tk.MustExec("begin") + txn, err = tk.Session().Txn(false) + require.NoError(t, err) + require.True(t, txn.Valid()) + tk.MustExec("rollback") + + tk.MustExec("select * from t") + txn, err = tk.Session().Txn(false) + require.NoError(t, err) + require.True(t, txn.Valid()) + tk.MustExec("rollback") + + tk.MustExec("insert into t values (1)") + txn, err = tk.Session().Txn(false) + require.NoError(t, err) + require.True(t, txn.Valid()) + tk.MustExec("rollback") +} + +func TestDisableTxnAutoRetry(t *testing.T) { + store := testkit.CreateMockStoreWithSchemaLease(t, 1*time.Second) + + setTxnTk := testkit.NewTestKit(t, store) + setTxnTk.MustExec("set global tidb_txn_mode=''") + tk1 := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + + tk1.MustExec("use test") + tk2.MustExec("use test") + + tk1.MustExec("create table no_retry (id int)") + tk1.MustExec("insert into no_retry values (1)") + tk1.MustExec("set @@tidb_disable_txn_auto_retry = 1") + + tk1.MustExec("begin") + tk1.MustExec("update no_retry set id = 2") + + tk2.MustExec("begin") + tk2.MustExec("update no_retry set id = 3") + tk2.MustExec("commit") + + // No auto retry because tidb_disable_txn_auto_retry is set to 1. + _, err := tk1.Session().Execute(context.Background(), "commit") + require.Error(t, err) + + // session 1 starts a transaction early. + // execute a select statement to clear retry history. + tk1.MustExec("select 1") + err = tk1.Session().PrepareTxnCtx(context.Background()) + require.NoError(t, err) + // session 2 update the value. + tk2.MustExec("update no_retry set id = 4") + // AutoCommit update will retry, so it would not fail. + tk1.MustExec("update no_retry set id = 5") + + // RestrictedSQL should retry. + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) + tk1.Session().ExecuteInternal(ctx, "begin") + + tk2.MustExec("update no_retry set id = 6") + + tk1.Session().ExecuteInternal(ctx, "update no_retry set id = 7") + tk1.Session().ExecuteInternal(ctx, "commit") + + // test for disable transaction local latch + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.TxnLocalLatches.Enabled = false + }) + tk1.MustExec("begin") + tk1.MustExec("update no_retry set id = 9") + + tk2.MustExec("update no_retry set id = 8") + + _, err = tk1.Session().Execute(context.Background(), "commit") + require.Error(t, err) + require.True(t, kv.ErrWriteConflict.Equal(err), fmt.Sprintf("err %v", err)) + require.Contains(t, err.Error(), kv.TxnRetryableMark) + tk1.MustExec("rollback") + + config.UpdateGlobal(func(conf *config.Config) { + conf.TxnLocalLatches.Enabled = true + }) + tk1.MustExec("begin") + tk2.MustExec("alter table no_retry add index idx(id)") + tk2.MustQuery("select * from no_retry").Check(testkit.Rows("8")) + tk1.MustExec("update no_retry set id = 10") + _, err = tk1.Session().Execute(context.Background(), "commit") + require.Error(t, err) + + // set autocommit to begin and commit + tk1.MustExec("set autocommit = 0") + tk1.MustQuery("select * from no_retry").Check(testkit.Rows("8")) + tk2.MustExec("update no_retry set id = 11") + tk1.MustExec("update no_retry set id = 12") + _, err = tk1.Session().Execute(context.Background(), "set autocommit = 1") + require.Error(t, err) + require.True(t, kv.ErrWriteConflict.Equal(err), fmt.Sprintf("err %v", err)) + require.Contains(t, err.Error(), kv.TxnRetryableMark) + tk1.MustExec("rollback") + tk2.MustQuery("select * from no_retry").Check(testkit.Rows("11")) + + tk1.MustExec("set autocommit = 0") + tk1.MustQuery("select * from no_retry").Check(testkit.Rows("11")) + tk2.MustExec("update no_retry set id = 13") + tk1.MustExec("update no_retry set id = 14") + _, err = tk1.Session().Execute(context.Background(), "commit") + require.Error(t, err) + require.True(t, kv.ErrWriteConflict.Equal(err), fmt.Sprintf("err %v", err)) + require.Contains(t, err.Error(), kv.TxnRetryableMark) + tk1.MustExec("rollback") + tk2.MustQuery("select * from no_retry").Check(testkit.Rows("13")) +} + +// The Read-only flags are checked in the planning stage of queries, +// but this test checks we check them again at commit time. +// The main use case for this is a long-running auto-commit statement. +func TestAutoCommitRespectsReadOnly(t *testing.T) { + store := testkit.CreateMockStore(t) + var wg sync.WaitGroup + tk1 := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + require.NoError(t, tk1.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil, nil)) + require.NoError(t, tk2.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil, nil)) + + tk1.MustExec("create table test.auto_commit_test (a int)") + wg.Add(1) + go func() { + err := tk1.ExecToErr("INSERT INTO test.auto_commit_test VALUES (SLEEP(1))") + require.True(t, terror.ErrorEqual(err, plannererrors.ErrSQLInReadOnlyMode), fmt.Sprintf("err %v", err)) + wg.Done() + }() + tk2.MustExec("SET GLOBAL tidb_restricted_read_only = 1") + err := tk2.ExecToErr("INSERT INTO test.auto_commit_test VALUES (0)") // should also be an error + require.True(t, terror.ErrorEqual(err, plannererrors.ErrSQLInReadOnlyMode), fmt.Sprintf("err %v", err)) + // Reset and check with the privilege to ignore the readonly flag and continue to insert. + wg.Wait() + tk1.MustExec("SET GLOBAL tidb_restricted_read_only = 0") + tk1.MustExec("SET GLOBAL tidb_super_read_only = 0") + tk1.MustExec("GRANT RESTRICTED_REPLICA_WRITER_ADMIN on *.* to 'root'") + + wg.Add(1) + go func() { + tk1.MustExec("INSERT INTO test.auto_commit_test VALUES (SLEEP(1))") + wg.Done() + }() + tk2.MustExec("SET GLOBAL tidb_restricted_read_only = 1") + tk2.MustExec("INSERT INTO test.auto_commit_test VALUES (0)") + + // wait for go routines + wg.Wait() + tk1.MustExec("SET GLOBAL tidb_restricted_read_only = 0") + tk1.MustExec("SET GLOBAL tidb_super_read_only = 0") +} + +func TestTxnRetryErrMsg(t *testing.T) { + store := testkit.CreateMockStore(t) + setTxnTk := testkit.NewTestKit(t, store) + setTxnTk.MustExec("set global tidb_txn_mode=''") + tk1 := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("create table no_retry (id int)") + tk1.MustExec("insert into no_retry values (1)") + tk1.MustExec("begin") + tk2.MustExec("use test") + tk2.MustExec("update no_retry set id = id + 1") + tk1.MustExec("update no_retry set id = id + 1") + require.NoError(t, failpoint.Enable("tikvclient/mockRetryableErrorResp", `return(true)`)) + _, err := tk1.Session().Execute(context.Background(), "commit") + require.NoError(t, failpoint.Disable("tikvclient/mockRetryableErrorResp")) + require.Error(t, err) + require.True(t, kv.ErrTxnRetryable.Equal(err), "error: %s", err) + require.True(t, strings.Contains(err.Error(), "mock retryable error"), "error: %s", err) + require.True(t, strings.Contains(err.Error(), kv.TxnRetryableMark), "error: %s", err) +} + +func TestSetTxnScope(t *testing.T) { + // Check the default value of @@tidb_enable_local_txn and @@txn_scope without configuring the zone label. + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustQuery("select @@global.tidb_enable_local_txn;").Check(testkit.Rows("0")) + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Check the default value of @@tidb_enable_local_txn and @@txn_scope with configuring the zone label. + require.NoError(t, failpoint.Enable("tikvclient/injectTxnScope", `return("bj")`)) + tk = testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustQuery("select @@global.tidb_enable_local_txn;").Check(testkit.Rows("0")) + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + require.NoError(t, failpoint.Disable("tikvclient/injectTxnScope")) + + // @@tidb_enable_local_txn is off without configuring the zone label. + tk = testkit.NewTestKit(t, store) + tk.MustQuery("select @@global.tidb_enable_local_txn;").Check(testkit.Rows("0")) + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Set @@txn_scope to local. + err := tk.ExecToErr("set @@txn_scope = 'local';") + require.Error(t, err) + require.Regexp(t, `.*txn_scope can not be set to local when tidb_enable_local_txn is off.*`, err) + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Set @@txn_scope to global. + tk.MustExec("set @@txn_scope = 'global';") + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + + // @@tidb_enable_local_txn is off with configuring the zone label. + require.NoError(t, failpoint.Enable("tikvclient/injectTxnScope", `return("bj")`)) + tk = testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustQuery("select @@global.tidb_enable_local_txn;").Check(testkit.Rows("0")) + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Set @@txn_scope to local. + err = tk.ExecToErr("set @@txn_scope = 'local';") + require.Error(t, err) + require.Regexp(t, `.*txn_scope can not be set to local when tidb_enable_local_txn is off.*`, err) + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Set @@txn_scope to global. + tk.MustExec("set @@txn_scope = 'global';") + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + require.NoError(t, failpoint.Disable("tikvclient/injectTxnScope")) + + // @@tidb_enable_local_txn is on without configuring the zone label. + tk = testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set global tidb_enable_local_txn = on;") + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Set @@txn_scope to local. + err = tk.ExecToErr("set @@txn_scope = 'local';") + require.Error(t, err) + require.Regexp(t, `.*txn_scope can not be set to local when zone label is empty or "global".*`, err) + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Set @@txn_scope to global. + tk.MustExec("set @@txn_scope = 'global';") + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + + // @@tidb_enable_local_txn is on with configuring the zone label. + require.NoError(t, failpoint.Enable("tikvclient/injectTxnScope", `return("bj")`)) + tk = testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set global tidb_enable_local_txn = on;") + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.LocalTxnScope)) + require.Equal(t, "bj", tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Set @@txn_scope to global. + tk.MustExec("set @@txn_scope = 'global';") + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.GlobalTxnScope)) + require.Equal(t, kv.GlobalTxnScope, tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Set @@txn_scope to local. + tk.MustExec("set @@txn_scope = 'local';") + tk.MustQuery("select @@txn_scope;").Check(testkit.Rows(kv.LocalTxnScope)) + require.Equal(t, "bj", tk.Session().GetSessionVars().CheckAndGetTxnScope()) + // Try to set @@txn_scope to an invalid value. + err = tk.ExecToErr("set @@txn_scope='foo'") + require.Error(t, err) + require.Regexp(t, `.*txn_scope value should be global or local.*`, err) + require.NoError(t, failpoint.Disable("tikvclient/injectTxnScope")) +} + +func TestErrorRollback(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t_rollback") + tk.MustExec("create table t_rollback (c1 int, c2 int, primary key(c1))") + tk.MustExec("insert into t_rollback values (0, 0)") + + var wg sync.WaitGroup + cnt := 4 + wg.Add(cnt) + num := 20 + + for i := 0; i < cnt; i++ { + go func() { + defer wg.Done() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@session.tidb_retry_limit = 100") + for j := 0; j < num; j++ { + _, _ = tk.Exec("insert into t_rollback values (1, 1)") + tk.MustExec("update t_rollback set c2 = c2 + 1 where c1 = 0") + } + }() + } + + wg.Wait() + tk.MustQuery("select c2 from t_rollback where c1 = 0").Check(testkit.Rows(fmt.Sprint(cnt * num))) +} + +// TestInTrans . See https://dev.mysql.com/doc/internals/en/status-flags.html +func TestInTrans(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") + tk.MustExec("insert t values ()") + tk.MustExec("begin") + txn, err := tk.Session().Txn(true) + require.NoError(t, err) + require.True(t, txn.Valid()) + tk.MustExec("insert t values ()") + require.True(t, txn.Valid()) + tk.MustExec("drop table if exists t;") + require.False(t, txn.Valid()) + tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") + require.False(t, txn.Valid()) + tk.MustExec("insert t values ()") + require.False(t, txn.Valid()) + tk.MustExec("commit") + tk.MustExec("insert t values ()") + + tk.MustExec("set autocommit=0") + tk.MustExec("begin") + require.True(t, txn.Valid()) + tk.MustExec("insert t values ()") + require.True(t, txn.Valid()) + tk.MustExec("commit") + require.False(t, txn.Valid()) + tk.MustExec("insert t values ()") + require.True(t, txn.Valid()) + tk.MustExec("commit") + require.False(t, txn.Valid()) + + tk.MustExec("set autocommit=1") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") + tk.MustExec("begin") + require.True(t, txn.Valid()) + tk.MustExec("insert t values ()") + require.True(t, txn.Valid()) + tk.MustExec("rollback") + require.False(t, txn.Valid()) +} + +func TestMemBufferSnapshotRead(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int primary key, b int, index i(b));") + + tk.MustExec("set session tidb_distsql_scan_concurrency = 1;") + tk.MustExec("set session tidb_index_lookup_join_concurrency = 1;") + tk.MustExec("set session tidb_projection_concurrency=1;") + tk.MustExec("set session tidb_init_chunk_size=1;") + tk.MustExec("set session tidb_max_chunk_size=40;") + tk.MustExec("set session tidb_index_join_batch_size = 10") + + tk.MustExec("begin;") + // write (0, 0), (1, 1), ... ,(100, 100) into membuffer + var sb strings.Builder + sb.WriteString("insert into t values ") + for i := 0; i <= 100; i++ { + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString(fmt.Sprintf("(%d, %d)", i, i)) + } + tk.MustExec(sb.String()) + + // insert on duplicate key statement should update the table to (0, 100), (1, 99), ... (100, 0) + // This statement will create UnionScan dynamically during execution, and some UnionScan will see staging data(should be bypassed), + // so it relies on correct snapshot read to get the expected result. + tk.MustExec("insert into t (select /*+ INL_JOIN(t1) */ 100 - t1.a as a, t1.b from t t1, (select a, b from t) t2 where t1.b = t2.b) on duplicate key update b = values(b)") + + require.Empty(t, tk.MustQuery("select a, b from t where a + b != 100;").Rows()) + tk.MustExec("commit;") + require.Empty(t, tk.MustQuery("select a, b from t where a + b != 100;").Rows()) + + tk.MustExec("set session tidb_distsql_scan_concurrency = default;") + tk.MustExec("set session tidb_index_lookup_join_concurrency = default;") + tk.MustExec("set session tidb_projection_concurrency=default;") + tk.MustExec("set session tidb_init_chunk_size=default;") + tk.MustExec("set session tidb_max_chunk_size=default;") + tk.MustExec("set session tidb_index_join_batch_size = default") +}