diff --git a/DEPS.bzl b/DEPS.bzl index ea702b7fdfd55..d23b87c2e67f9 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7145,13 +7145,13 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "9e86def6bedefa6095b20a2d90afa099ca301d49f6792eccb54883201df3063b", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240316105842-98a7df8f413d", + sha256 = "1838f5b1e46ccef68f651efd1a01a2b70037062ba6a08d58b2bd6aa8d3580e0c", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240318065517-a9128e8200ab", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 8fc203642b8a5..43a9a088d523a 100644 --- a/go.mod +++ b/go.mod @@ -107,7 +107,7 @@ require ( github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tidwall/btree v1.7.0 - github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d + github.com/tikv/client-go/v2 v2.0.8-0.20240318065517-a9128e8200ab github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a github.com/twmb/murmur3 v1.1.6 diff --git a/go.sum b/go.sum index 412d863583a2e..add08b72d29f0 100644 --- a/go.sum +++ b/go.sum @@ -869,8 +869,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= -github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d h1:QDsglttywjxGkxf7pjInpKTudLwy3qr/sR9BCxBiLow= -github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d/go.mod h1:9s6+YbGt0kW+9qTFDXuc5TkIpwpEf038S1UCa3utsSQ= +github.com/tikv/client-go/v2 v2.0.8-0.20240318065517-a9128e8200ab h1:uzd6N2FtE/9d9g0x6QS2O48+Waxy4qjlyMbwk/vSZRQ= +github.com/tikv/client-go/v2 v2.0.8-0.20240318065517-a9128e8200ab/go.mod h1:9s6+YbGt0kW+9qTFDXuc5TkIpwpEf038S1UCa3utsSQ= github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e h1:kHXMmskVCNyH53u43I73Y5cmZ6yqqder/jGOiI7ylxs= github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg= github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo= diff --git a/pkg/executor/insert.go b/pkg/executor/insert.go index 28625264e65ee..8876180daf64d 100644 --- a/pkg/executor/insert.go +++ b/pkg/executor/insert.go @@ -88,14 +88,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { // If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword, // the to-be-insert rows will be check on duplicate keys and update to the new rows. if len(e.OnDuplicate) > 0 { - if ignoreErr && txn.IsPipelined() { - // P-DML doesn't support staging now. In a INSERT IGNORE ... ON DUPLICATE ... stmt, it's - // possible that 1 row succeeded and 1 row returns error but is ignored, thus the whole - // statement can successfully commit. See TestIssue50043 - // The second row is supposed to be cleanup by the staging mechanism, so forbid this - // case for P-DML temporarily. - return errors.New("cannot use pipelined mode with insert ignore and on duplicate update") - } err := e.batchUpdateDupRows(ctx, rows) if err != nil { return err diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go index be76b5b487a89..5d499f251819b 100644 --- a/pkg/executor/test/executor/executor_test.go +++ b/pkg/executor/test/executor/executor_test.go @@ -2797,8 +2797,17 @@ func TestIssue38756(t *testing.T) { } func TestIssue50043(t *testing.T) { + testIssue50043WithInitSQL(t, "") +} + +func TestIssue50043WithPipelinedDML(t *testing.T) { + testIssue50043WithInitSQL(t, "set @@tidb_dml_type=bulk") +} + +func testIssue50043WithInitSQL(t *testing.T, initSQL string) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec(initSQL) // Test simplified case by update. tk.MustExec("use test") tk.MustExec("create table t (c1 boolean ,c2 decimal ( 37 , 17 ), unique key idx1 (c1 ,c2),unique key idx2 ( c1 ));") diff --git a/pkg/session/txn.go b/pkg/session/txn.go index fbf7920a941bd..7f44220780fc6 100644 --- a/pkg/session/txn.go +++ b/pkg/session/txn.go @@ -113,7 +113,9 @@ func (txn *LazyTxn) initStmtBuf() { } buf := txn.Transaction.GetMemBuffer() txn.initCnt = buf.Len() - txn.stagingHandle = buf.Staging() + if !txn.IsPipelined() { + txn.stagingHandle = buf.Staging() + } } // countHint is estimated count of mutations. @@ -139,7 +141,9 @@ func (txn *LazyTxn) flushStmtBuf() { } } - buf.Release(txn.stagingHandle) + if !txn.IsPipelined() { + buf.Release(txn.stagingHandle) + } txn.initCnt = buf.Len() } @@ -148,7 +152,9 @@ func (txn *LazyTxn) cleanupStmtBuf() { return } buf := txn.Transaction.GetMemBuffer() - buf.Cleanup(txn.stagingHandle) + if !txn.IsPipelined() { + buf.Cleanup(txn.stagingHandle) + } txn.initCnt = buf.Len() txn.mu.Lock() @@ -320,7 +326,7 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context, sctx sessionctx.Co } func (txn *LazyTxn) changeToInvalid() { - if txn.stagingHandle != kv.InvalidStagingHandle { + if txn.stagingHandle != kv.InvalidStagingHandle && !txn.IsPipelined() { txn.Transaction.GetMemBuffer().Cleanup(txn.stagingHandle) } txn.stagingHandle = kv.InvalidStagingHandle diff --git a/pkg/store/driver/txn/unionstore_driver.go b/pkg/store/driver/txn/unionstore_driver.go index 400bca8d921af..c6043be69a60a 100644 --- a/pkg/store/driver/txn/unionstore_driver.go +++ b/pkg/store/driver/txn/unionstore_driver.go @@ -68,24 +68,14 @@ func (m *memBuffer) GetFlags(key kv.Key) (kv.KeyFlags, error) { } func (m *memBuffer) Staging() kv.StagingHandle { - if m.isPipelinedDML { - // 0 stands for staging not supported. - return 0 - } return kv.StagingHandle(m.MemBuffer.Staging()) } func (m *memBuffer) Cleanup(h kv.StagingHandle) { - if m.isPipelinedDML { - return - } m.MemBuffer.Cleanup(int(h)) } func (m *memBuffer) Release(h kv.StagingHandle) { - if m.isPipelinedDML { - return - } m.MemBuffer.Release(int(h)) } diff --git a/pkg/table/tables/mutation_checker.go b/pkg/table/tables/mutation_checker.go index 3ef2c42730b1e..ad47a1dfc565d 100644 --- a/pkg/table/tables/mutation_checker.go +++ b/pkg/table/tables/mutation_checker.go @@ -83,6 +83,9 @@ func CheckDataConsistency( if t.Meta().GetPartitionInfo() != nil { return nil } + if txn.IsPipelined() { + return nil + } if sh == 0 { // some implementations of MemBuffer doesn't support staging, e.g. that in br/pkg/lightning/backend/kv return nil diff --git a/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go b/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go index b5c7a60936dc1..bd3b51789d6d2 100644 --- a/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go +++ b/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go @@ -702,9 +702,9 @@ func TestInsertIgnoreOnDuplicateKeyUpdate(t *testing.T) { tk.MustExec("drop table if exists t1") tk.MustExec("create table t1(a int, b int, unique index u1(a, b), unique index u2(a))") tk.MustExec("insert into t1 values(0, 0), (1, 1)") - tk.MustExecToErr("insert ignore into t1 values (0, 2) ,(1, 3) on duplicate key update b = 5, a = 0") + tk.MustExec("insert ignore into t1 values (0, 2) ,(1, 3) on duplicate key update b = 5, a = 0") // if the statement execute successful, the following check should pass. - // tk.MustQuery("select * from t1").Sort().Check(testkit.Rows("0 5", "1 1")) + tk.MustQuery("select * from t1").Sort().Check(testkit.Rows("0 5", "1 1")) } func TestConflictError(t *testing.T) {