From de2b7ac04189d61c84cd19bff893ce69b76dbc46 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 31 Dec 2024 17:25:40 +0800 Subject: [PATCH] executor: tiny optimize index-lookup query performance by reuse lite-cop-worker. (#58586) ref pingcap/tidb#56649 --- pkg/distsql/context/BUILD.bazel | 2 ++ pkg/distsql/context/context.go | 5 +++-- pkg/distsql/context/context_test.go | 4 +++- pkg/kv/kv.go | 2 +- pkg/store/copr/BUILD.bazel | 1 + pkg/store/copr/coprocessor.go | 15 +++++++++++---- 6 files changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/distsql/context/BUILD.bazel b/pkg/distsql/context/BUILD.bazel index 635681bb46e71..9e22487cf15ed 100644 --- a/pkg/distsql/context/BUILD.bazel +++ b/pkg/distsql/context/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/util/tiflash", "//pkg/util/topsql/stmtstats", "@com_github_tikv_client_go_v2//kv", + "@org_uber_go_atomic//:atomic", ], ) @@ -40,5 +41,6 @@ go_test( "//pkg/util/tiflash", "//pkg/util/topsql/stmtstats", "@com_github_tikv_client_go_v2//kv", + "@org_uber_go_atomic//:atomic", ], ) diff --git a/pkg/distsql/context/context.go b/pkg/distsql/context/context.go index fdebf09b61484..346df2975d319 100644 --- a/pkg/distsql/context/context.go +++ b/pkg/distsql/context/context.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/util/tiflash" "github.com/pingcap/tidb/pkg/util/topsql/stmtstats" tikvstore "github.com/tikv/client-go/v2/kv" + "go.uber.org/atomic" ) // DistSQLContext provides all information needed by using functions in `distsql` @@ -86,8 +87,8 @@ type DistSQLContext struct { ExecDetails *execdetails.SyncExecDetails - // Only one cop-reader can use lite worker. Using lite-worker in multiple readers will affect the concurrent execution of readers. - TryCopLiteWorker uint32 + // Only one cop-reader can use lite worker at the same time. Using lite-worker in multiple readers will affect the concurrent execution of readers. + TryCopLiteWorker atomic.Uint32 } // AppendWarning appends the warning to the warning handler. diff --git a/pkg/distsql/context/context_test.go b/pkg/distsql/context/context_test.go index 92cccd5d3c1b1..afce497298608 100644 --- a/pkg/distsql/context/context_test.go +++ b/pkg/distsql/context/context_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/util/tiflash" "github.com/pingcap/tidb/pkg/util/topsql/stmtstats" tikvstore "github.com/tikv/client-go/v2/kv" + "go.uber.org/atomic" ) func TestContextDetach(t *testing.T) { @@ -89,8 +90,9 @@ func TestContextDetach(t *testing.T) { ReplicaClosestReadThreshold: 1, ConnectionID: 1, SessionAlias: "c", - TryCopLiteWorker: 1, + TryCopLiteWorker: atomic.Uint32{}, } + obj.TryCopLiteWorker.Store(1) obj.AppendWarning(errors.New("test warning")) deeptest.AssertRecursivelyNotEqual(t, obj, &DistSQLContext{}, diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index e01152f7dd643..92255dbabf822 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -327,7 +327,7 @@ type ClientSendOption struct { EnableCollectExecutionInfo bool TiFlashReplicaRead tiflash.ReplicaRead AppendWarning func(warn error) - TryCopLiteWorker *uint32 + TryCopLiteWorker *atomic.Uint32 } // ReqTypes. diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index 299da27a89b5d..b10166d0362c1 100644 --- a/pkg/store/copr/BUILD.bazel +++ b/pkg/store/copr/BUILD.bazel @@ -67,6 +67,7 @@ go_library( "@com_github_twmb_murmur3//:murmur3", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", + "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index 3ed1a1f0a1163..ebb4a9ca892b5 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -63,6 +63,7 @@ import ( "github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "github.com/tikv/client-go/v2/util" + atomic2 "go.uber.org/atomic" "go.uber.org/zap" ) @@ -733,6 +734,7 @@ type liteCopIteratorWorker struct { ctx context.Context worker *copIteratorWorker batchCopRespList []*copResponse + tryCopLiteWorker *atomic2.Uint32 } // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan. @@ -873,13 +875,14 @@ func (worker *copIteratorWorker) run(ctx context.Context) { } // open starts workers and sender goroutines. -func (it *copIterator) open(ctx context.Context, tryCopLiteWorker *uint32) { - if len(it.tasks) == 1 && tryCopLiteWorker != nil && atomic.CompareAndSwapUint32(tryCopLiteWorker, 0, 1) { +func (it *copIterator) open(ctx context.Context, tryCopLiteWorker *atomic2.Uint32) { + if len(it.tasks) == 1 && tryCopLiteWorker != nil && tryCopLiteWorker.CompareAndSwap(0, 1) { // For a query, only one `copIterator` can use `liteWorker`, otherwise it will affect the performance of multiple cop iterators executed concurrently, // see more detail in TestQueryWithConcurrentSmallCop. it.liteWorker = &liteCopIteratorWorker{ - ctx: ctx, - worker: newCopIteratorWorker(it, nil), + ctx: ctx, + worker: newCopIteratorWorker(it, nil), + tryCopLiteWorker: tryCopLiteWorker, } return } @@ -1195,6 +1198,10 @@ func (w *liteCopIteratorWorker) liteSendReq(ctx context.Context, it *copIterator } else { it.tasks = it.tasks[1:] } + if len(it.tasks) == 0 { + // if all tasks are finished, reset tryCopLiteWorker to 0 to make future request can reuse copLiteWorker. + w.tryCopLiteWorker.Store(0) + } if result != nil { if result.resp != nil { w.batchCopRespList = result.batchRespList