Skip to content

Commit

Permalink
txn: implement staging for Pipelined DML (#51832)
Browse files Browse the repository at this point in the history
ref #50215
  • Loading branch information
ekexium authored Mar 18, 2024
1 parent c5359ca commit af89779
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 33 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7145,13 +7145,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "9e86def6bedefa6095b20a2d90afa099ca301d49f6792eccb54883201df3063b",
strip_prefix = "github.com/tikv/client-go/[email protected].20240316105842-98a7df8f413d",
sha256 = "1838f5b1e46ccef68f651efd1a01a2b70037062ba6a08d58b2bd6aa8d3580e0c",
strip_prefix = "github.com/tikv/client-go/[email protected].20240318065517-a9128e8200ab",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tidwall/btree v1.7.0
github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d
github.com/tikv/client-go/v2 v2.0.8-0.20240318065517-a9128e8200ab
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d h1:QDsglttywjxGkxf7pjInpKTudLwy3qr/sR9BCxBiLow=
github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d/go.mod h1:9s6+YbGt0kW+9qTFDXuc5TkIpwpEf038S1UCa3utsSQ=
github.com/tikv/client-go/v2 v2.0.8-0.20240318065517-a9128e8200ab h1:uzd6N2FtE/9d9g0x6QS2O48+Waxy4qjlyMbwk/vSZRQ=
github.com/tikv/client-go/v2 v2.0.8-0.20240318065517-a9128e8200ab/go.mod h1:9s6+YbGt0kW+9qTFDXuc5TkIpwpEf038S1UCa3utsSQ=
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e h1:kHXMmskVCNyH53u43I73Y5cmZ6yqqder/jGOiI7ylxs=
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo=
Expand Down
8 changes: 0 additions & 8 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
// the to-be-insert rows will be check on duplicate keys and update to the new rows.
if len(e.OnDuplicate) > 0 {
if ignoreErr && txn.IsPipelined() {
// P-DML doesn't support staging now. In a INSERT IGNORE ... ON DUPLICATE ... stmt, it's
// possible that 1 row succeeded and 1 row returns error but is ignored, thus the whole
// statement can successfully commit. See TestIssue50043
// The second row is supposed to be cleanup by the staging mechanism, so forbid this
// case for P-DML temporarily.
return errors.New("cannot use pipelined mode with insert ignore and on duplicate update")
}
err := e.batchUpdateDupRows(ctx, rows)
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2797,8 +2797,17 @@ func TestIssue38756(t *testing.T) {
}

func TestIssue50043(t *testing.T) {
testIssue50043WithInitSQL(t, "")
}

func TestIssue50043WithPipelinedDML(t *testing.T) {
testIssue50043WithInitSQL(t, "set @@tidb_dml_type=bulk")
}

func testIssue50043WithInitSQL(t *testing.T, initSQL string) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(initSQL)
// Test simplified case by update.
tk.MustExec("use test")
tk.MustExec("create table t (c1 boolean ,c2 decimal ( 37 , 17 ), unique key idx1 (c1 ,c2),unique key idx2 ( c1 ));")
Expand Down
14 changes: 10 additions & 4 deletions pkg/session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (txn *LazyTxn) initStmtBuf() {
}
buf := txn.Transaction.GetMemBuffer()
txn.initCnt = buf.Len()
txn.stagingHandle = buf.Staging()
if !txn.IsPipelined() {
txn.stagingHandle = buf.Staging()
}
}

// countHint is estimated count of mutations.
Expand All @@ -139,7 +141,9 @@ func (txn *LazyTxn) flushStmtBuf() {
}
}

buf.Release(txn.stagingHandle)
if !txn.IsPipelined() {
buf.Release(txn.stagingHandle)
}
txn.initCnt = buf.Len()
}

Expand All @@ -148,7 +152,9 @@ func (txn *LazyTxn) cleanupStmtBuf() {
return
}
buf := txn.Transaction.GetMemBuffer()
buf.Cleanup(txn.stagingHandle)
if !txn.IsPipelined() {
buf.Cleanup(txn.stagingHandle)
}
txn.initCnt = buf.Len()

txn.mu.Lock()
Expand Down Expand Up @@ -320,7 +326,7 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context, sctx sessionctx.Co
}

func (txn *LazyTxn) changeToInvalid() {
if txn.stagingHandle != kv.InvalidStagingHandle {
if txn.stagingHandle != kv.InvalidStagingHandle && !txn.IsPipelined() {
txn.Transaction.GetMemBuffer().Cleanup(txn.stagingHandle)
}
txn.stagingHandle = kv.InvalidStagingHandle
Expand Down
10 changes: 0 additions & 10 deletions pkg/store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,14 @@ func (m *memBuffer) GetFlags(key kv.Key) (kv.KeyFlags, error) {
}

func (m *memBuffer) Staging() kv.StagingHandle {
if m.isPipelinedDML {
// 0 stands for staging not supported.
return 0
}
return kv.StagingHandle(m.MemBuffer.Staging())
}

func (m *memBuffer) Cleanup(h kv.StagingHandle) {
if m.isPipelinedDML {
return
}
m.MemBuffer.Cleanup(int(h))
}

func (m *memBuffer) Release(h kv.StagingHandle) {
if m.isPipelinedDML {
return
}
m.MemBuffer.Release(int(h))
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/table/tables/mutation_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func CheckDataConsistency(
if t.Meta().GetPartitionInfo() != nil {
return nil
}
if txn.IsPipelined() {
return nil
}
if sh == 0 {
// some implementations of MemBuffer doesn't support staging, e.g. that in br/pkg/lightning/backend/kv
return nil
Expand Down
4 changes: 2 additions & 2 deletions tests/realtikvtest/pipelineddmltest/pipelineddml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,9 @@ func TestInsertIgnoreOnDuplicateKeyUpdate(t *testing.T) {
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int, unique index u1(a, b), unique index u2(a))")
tk.MustExec("insert into t1 values(0, 0), (1, 1)")
tk.MustExecToErr("insert ignore into t1 values (0, 2) ,(1, 3) on duplicate key update b = 5, a = 0")
tk.MustExec("insert ignore into t1 values (0, 2) ,(1, 3) on duplicate key update b = 5, a = 0")
// if the statement execute successful, the following check should pass.
// tk.MustQuery("select * from t1").Sort().Check(testkit.Rows("0 5", "1 1"))
tk.MustQuery("select * from t1").Sort().Check(testkit.Rows("0 5", "1 1"))
}

func TestConflictError(t *testing.T) {
Expand Down

0 comments on commit af89779

Please sign in to comment.