Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: use a more accurate type of the context of distsql #51618

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64,
SetStartTS(startTS).
SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}).
SetKeepOrder(true).
SetFromSessionVars(c.SessionContext.GetSessionVars()).
SetFromSessionVars(c.SessionContext.GetDistSQLCtx()).
SetFromInfoSchema(c.SessionContext.GetDomainInfoSchema()).
SetConcurrency(1).
Build()
Expand All @@ -284,7 +284,7 @@ func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64,
if err != nil {
return nil, err
}
return distsql.Select(ctx, c.SessionContext, kvReq, c.FieldTypes)
return distsql.Select(ctx, c.SessionContext.GetDistSQLCtx(), kvReq, c.FieldTypes)
}

func fetchTableScanResult(
Expand Down Expand Up @@ -353,7 +353,7 @@ func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*m
return nil, err
}
dagReq.Executors = append(dagReq.Executors, execPB)
distsql.SetEncodeType(sCtx, dagReq)
distsql.SetEncodeType(sCtx.GetDistSQLCtx(), dagReq)
return dagReq, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func buildDescTableScanDAG(ctx sessionctx.Context, tbl table.PhysicalTable, hand
tblScanExec := constructDescTableScanPB(tbl.GetPhysicalID(), tbl.Meta(), handleCols)
dagReq.Executors = append(dagReq.Executors, tblScanExec)
dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit))
distsql.SetEncodeType(ctx, dagReq)
distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq)
return dagReq, nil
}

Expand Down Expand Up @@ -528,7 +528,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.
} else {
ranges = ranger.FullIntRange(false)
}
builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges)
builder = b.SetHandleRanges(sctx.GetDistSQLCtx(), tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges)
builder.SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeepOrder(true).
Expand All @@ -547,7 +547,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.
return nil, errors.Trace(err)
}

result, err := distsql.Select(ctx.ddlJobCtx, sctx, kvReq, getColumnsTypes(handleCols))
result, err := distsql.Select(ctx.ddlJobCtx, sctx.GetDistSQLCtx(), kvReq, getColumnsTypes(handleCols))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/util",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/store/copr",
"//pkg/tablecodec",
Expand All @@ -36,6 +35,7 @@ go_library(
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/ranger",
"//pkg/util/topsql/stmtstats",
"//pkg/util/tracing",
"//pkg/util/trxevents",
"@com_github_pingcap_errors//:errors",
Expand All @@ -58,6 +58,7 @@ go_test(
timeout = "short",
srcs = [
"bench_test.go",
"context_test.go",
"distsql_test.go",
"main_test.go",
"request_builder_test.go",
Expand All @@ -68,7 +69,9 @@ go_test(
race = "on",
shard_count = 26,
deps = [
"//pkg/distsql/context",
"//pkg/domain/resourcegroup",
"//pkg/errctx",
"//pkg/kv",
"//pkg/parser/charset",
"//pkg/parser/model",
Expand All @@ -84,6 +87,7 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/collate",
"//pkg/util/context",
"//pkg/util/disk",
"//pkg/util/execdetails",
"//pkg/util/memory",
Expand Down
12 changes: 11 additions & 1 deletion pkg/distsql/context/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/distsql/context",
visibility = ["//visibility:public"],
deps = [
"//pkg/domain/resourcegroup",
"//pkg/errctx",
"//pkg/kv",
"//pkg/sessionctx/variable",
"//pkg/parser/mysql",
"//pkg/util/execdetails",
"//pkg/util/memory",
"//pkg/util/nocopy",
"//pkg/util/sqlkiller",
"//pkg/util/tiflash",
"//pkg/util/topsql/stmtstats",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//tikvrpc",
],
)
76 changes: 69 additions & 7 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,76 @@
package context

import (
"time"

"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/nocopy"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikvrpc"
)

// DistSQLContext gives the interface
type DistSQLContext interface {
// GetSessionVars gets the session variables.
GetSessionVars() *variable.SessionVars
// GetClient gets a kv.Client.
GetClient() kv.Client
// DistSQLContext provides all information needed by using functions in `distsql`
type DistSQLContext struct {
// TODO: provide a `Clone` to copy this struct.
// The life cycle of some fields in this struct cannot be extended. For example, some fields will be recycled before
// the next execution. They'll need to be handled specially.
_ nocopy.NoCopy

AppendWarning func(error)
InRestrictedSQL bool
Client kv.Client

EnabledRateLimitAction bool
EnableChunkRPC bool
OriginalSQL string
KVVars *tikvstore.Variables
KvExecCounter *stmtstats.KvExecCounter
SessionMemTracker *memory.Tracker

Location *time.Location
RuntimeStatsColl *execdetails.RuntimeStatsColl
SQLKiller *sqlkiller.SQLKiller
ErrCtx errctx.Context

// TiFlash related configurations
TiFlashReplicaRead tiflash.ReplicaRead
TiFlashMaxThreads int64
TiFlashMaxBytesBeforeExternalJoin int64
TiFlashMaxBytesBeforeExternalGroupBy int64
TiFlashMaxBytesBeforeExternalSort int64
TiFlashMaxQueryMemoryPerNode int64
TiFlashQuerySpillRatio float64

DistSQLConcurrency int
ReplicaReadType kv.ReplicaReadType
WeakConsistency bool
RCCheckTS bool
NotFillCache bool
TaskID uint64
Priority mysql.PriorityEnum
ResourceGroupTagger tikvrpc.ResourceGroupTagger
EnablePaging bool
MinPagingSize int
MaxPagingSize int
RequestSourceType string
ExplicitRequestSourceType string
StoreBatchSize int
ResourceGroupName string
LoadBasedReplicaReadThreshold time.Duration
RunawayChecker *resourcegroup.RunawayChecker
TiKVClientReadTimeout uint64

ReplicaClosestReadThreshold int64
ConnectionID uint64
SessionAlias string

ExecDetails *execdetails.SyncExecDetails
}
46 changes: 46 additions & 0 deletions pkg/distsql/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package distsql

import (
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
contextutil "github.com/pingcap/tidb/pkg/util/context"
)

// NewDistSQLContextForTest creates a new dist sql context for test
func NewDistSQLContextForTest() *distsqlctx.DistSQLContext {
return &distsqlctx.DistSQLContext{
AppendWarning: func(error) {},
TiFlashMaxThreads: variable.DefTiFlashMaxThreads,
TiFlashMaxBytesBeforeExternalJoin: variable.DefTiFlashMaxBytesBeforeExternalJoin,
TiFlashMaxBytesBeforeExternalGroupBy: variable.DefTiFlashMaxBytesBeforeExternalGroupBy,
TiFlashMaxBytesBeforeExternalSort: variable.DefTiFlashMaxBytesBeforeExternalSort,
TiFlashMaxQueryMemoryPerNode: variable.DefTiFlashMemQuotaQueryPerNode,
TiFlashQuerySpillRatio: variable.DefTiFlashQuerySpillRatio,

DistSQLConcurrency: variable.DefDistSQLScanConcurrency,
MinPagingSize: variable.DefMinPagingSize,
MaxPagingSize: variable.DefMaxPagingSize,
ResourceGroupName: "default",

ErrCtx: errctx.NewContext(contextutil.IgnoreWarn),
}
}

// DefaultDistSQLContext is an empty distsql context used for testing, which doesn't have a client and cannot be used to
// send requests.
var DefaultDistSQLContext = NewDistSQLContextForTest()
Loading