Skip to content

Commit

Permalink
executor: tiny optimize index-lookup query performance by reuse lite-…
Browse files Browse the repository at this point in the history
…cop-worker. (#58586)

ref #56649
  • Loading branch information
crazycs520 authored Dec 31, 2024
1 parent 432110b commit de2b7ac
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 8 deletions.
2 changes: 2 additions & 0 deletions pkg/distsql/context/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/util/tiflash",
"//pkg/util/topsql/stmtstats",
"@com_github_tikv_client_go_v2//kv",
"@org_uber_go_atomic//:atomic",
],
)

Expand All @@ -40,5 +41,6 @@ go_test(
"//pkg/util/tiflash",
"//pkg/util/topsql/stmtstats",
"@com_github_tikv_client_go_v2//kv",
"@org_uber_go_atomic//:atomic",
],
)
5 changes: 3 additions & 2 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
tikvstore "github.com/tikv/client-go/v2/kv"
"go.uber.org/atomic"
)

// DistSQLContext provides all information needed by using functions in `distsql`
Expand Down Expand Up @@ -86,8 +87,8 @@ type DistSQLContext struct {

ExecDetails *execdetails.SyncExecDetails

// Only one cop-reader can use lite worker. Using lite-worker in multiple readers will affect the concurrent execution of readers.
TryCopLiteWorker uint32
// Only one cop-reader can use lite worker at the same time. Using lite-worker in multiple readers will affect the concurrent execution of readers.
TryCopLiteWorker atomic.Uint32
}

// AppendWarning appends the warning to the warning handler.
Expand Down
4 changes: 3 additions & 1 deletion pkg/distsql/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
tikvstore "github.com/tikv/client-go/v2/kv"
"go.uber.org/atomic"
)

func TestContextDetach(t *testing.T) {
Expand Down Expand Up @@ -89,8 +90,9 @@ func TestContextDetach(t *testing.T) {
ReplicaClosestReadThreshold: 1,
ConnectionID: 1,
SessionAlias: "c",
TryCopLiteWorker: 1,
TryCopLiteWorker: atomic.Uint32{},
}
obj.TryCopLiteWorker.Store(1)

obj.AppendWarning(errors.New("test warning"))
deeptest.AssertRecursivelyNotEqual(t, obj, &DistSQLContext{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ type ClientSendOption struct {
EnableCollectExecutionInfo bool
TiFlashReplicaRead tiflash.ReplicaRead
AppendWarning func(warn error)
TryCopLiteWorker *uint32
TryCopLiteWorker *atomic.Uint32
}

// ReqTypes.
Expand Down
1 change: 1 addition & 0 deletions pkg/store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ go_library(
"@com_github_twmb_murmur3//:murmur3",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
Expand Down
15 changes: 11 additions & 4 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -733,6 +734,7 @@ type liteCopIteratorWorker struct {
ctx context.Context
worker *copIteratorWorker
batchCopRespList []*copResponse
tryCopLiteWorker *atomic2.Uint32
}

// copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
Expand Down Expand Up @@ -873,13 +875,14 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
}

// open starts workers and sender goroutines.
func (it *copIterator) open(ctx context.Context, tryCopLiteWorker *uint32) {
if len(it.tasks) == 1 && tryCopLiteWorker != nil && atomic.CompareAndSwapUint32(tryCopLiteWorker, 0, 1) {
func (it *copIterator) open(ctx context.Context, tryCopLiteWorker *atomic2.Uint32) {
if len(it.tasks) == 1 && tryCopLiteWorker != nil && tryCopLiteWorker.CompareAndSwap(0, 1) {
// For a query, only one `copIterator` can use `liteWorker`, otherwise it will affect the performance of multiple cop iterators executed concurrently,
// see more detail in TestQueryWithConcurrentSmallCop.
it.liteWorker = &liteCopIteratorWorker{
ctx: ctx,
worker: newCopIteratorWorker(it, nil),
ctx: ctx,
worker: newCopIteratorWorker(it, nil),
tryCopLiteWorker: tryCopLiteWorker,
}
return
}
Expand Down Expand Up @@ -1195,6 +1198,10 @@ func (w *liteCopIteratorWorker) liteSendReq(ctx context.Context, it *copIterator
} else {
it.tasks = it.tasks[1:]
}
if len(it.tasks) == 0 {
// if all tasks are finished, reset tryCopLiteWorker to 0 to make future request can reuse copLiteWorker.
w.tryCopLiteWorker.Store(0)
}
if result != nil {
if result.resp != nil {
w.batchCopRespList = result.batchRespList
Expand Down

0 comments on commit de2b7ac

Please sign in to comment.