From 77b4e4028cff7c25c1db0582acf0e5685251ff67 Mon Sep 17 00:00:00 2001 From: Xiang Zhang Date: Tue, 21 Dec 2021 15:23:47 +0800 Subject: [PATCH 01/11] github: add issue requirement to pull request template (#30817) close pingcap/tidb#30814 --- .github/pull_request_template.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 3eef4ea0a6587..b8f7a3b6eeca5 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -9,6 +9,16 @@ PR Title Format: --> ### What problem does this PR solve? + Issue Number: close #xxx From fe1aaf2fd730870d962e8b188e57c19ad6a305e3 Mon Sep 17 00:00:00 2001 From: Yexiang Zhang Date: Tue, 21 Dec 2021 15:43:47 +0800 Subject: [PATCH 02/11] topsql: introduce stmtstats and sql execution count (#30277) --- distsql/distsql.go | 24 +- distsql/distsql_test.go | 2 +- executor/adapter.go | 31 +++ executor/analyze.go | 6 +- executor/batch_point_get.go | 1 + executor/checksum.go | 2 +- executor/executor.go | 8 + executor/insert.go | 1 + executor/point_get.go | 1 + executor/replace.go | 1 + executor/update.go | 4 + go.mod | 2 +- go.sum | 4 +- kv/option.go | 4 +- session/session.go | 7 + sessionctx/stmtctx/stmtctx.go | 7 + sessionctx/variable/session.go | 12 +- store/driver/txn/snapshot.go | 3 + store/driver/txn/txn_driver.go | 3 + util/topsql/stmtstats/aggregator.go | 156 +++++++++++++ util/topsql/stmtstats/aggregator_test.go | 93 ++++++++ util/topsql/stmtstats/kv_exec_count.go | 73 ++++++ util/topsql/stmtstats/kv_exec_count_test.go | 43 ++++ util/topsql/stmtstats/main_test.go | 27 +++ util/topsql/stmtstats/stmtstats.go | 219 ++++++++++++++++++ util/topsql/stmtstats/stmtstats_test.go | 187 +++++++++++++++ .../stmtstats/stmtstatstest/main_test.go | 31 +++ .../stmtstats/stmtstatstest/stmtstats_test.go | 150 ++++++++++++ util/topsql/topsql.go | 3 + 29 files changed, 1092 insertions(+), 13 deletions(-) create mode 100644 util/topsql/stmtstats/aggregator.go create mode 100644 util/topsql/stmtstats/aggregator_test.go create mode 100644 util/topsql/stmtstats/kv_exec_count.go create mode 100644 util/topsql/stmtstats/kv_exec_count_test.go create mode 100644 util/topsql/stmtstats/main_test.go create mode 100644 util/topsql/stmtstats/stmtstats.go create mode 100644 util/topsql/stmtstats/stmtstats_test.go create mode 100644 util/topsql/stmtstats/stmtstatstest/main_test.go create mode 100644 util/topsql/stmtstats/stmtstatstest/stmtstats_test.go diff --git a/distsql/distsql.go b/distsql/distsql.go index 2f952da2a7d3c..77b75efc480fd 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -23,17 +23,20 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/trxevents" "github.com/pingcap/tipb/go-tipb" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "go.uber.org/zap" ) // DispatchMPPTasks dispatches all tasks and returns an iterator. 
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) { + ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx) _, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash] resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback) if resp == nil { @@ -88,6 +91,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie zap.String("stmt", originalSQL)) } } + + ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx) resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb) if resp == nil { return nil, errors.New("client returns nil response") @@ -149,8 +154,9 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq // Analyze do a analyze request. func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars interface{}, - isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) { - resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil) + isRestrict bool, stmtCtx *stmtctx.StatementContext) (SelectResult, error) { + ctx = WithSQLKvExecCounterInterceptor(ctx, stmtCtx) + resp := client.Send(ctx, kvReq, vars, stmtCtx.MemTracker, false, nil) if resp == nil { return nil, errors.New("client returns nil response") } @@ -244,3 +250,15 @@ func init() { systemEndian = tipb.Endian_LittleEndian } } + +// WithSQLKvExecCounterInterceptor binds an interceptor for client-go to count the +// number of SQL executions of each TiKV (if any). +func WithSQLKvExecCounterInterceptor(ctx context.Context, stmtCtx *stmtctx.StatementContext) context.Context { + if variable.TopSQLEnabled() && stmtCtx.KvExecCounter != nil { + // Unlike calling Transaction or Snapshot interface, in distsql package we directly + // face tikv Request. So we need to manually bind RPCInterceptor to ctx. Instead of + // calling SetRPCInterceptor on Transaction or Snapshot. + return interceptor.WithRPCInterceptor(ctx, stmtCtx.KvExecCounter.RPCInterceptor()) + } + return ctx +} diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 6ff4a5cb284e5..9be2738da2251 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -191,7 +191,7 @@ func TestAnalyze(t *testing.T) { Build() require.NoError(t, err) - response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx.MemTracker) + response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx) require.NoError(t, err) result, ok := response.(*selectResult) diff --git a/executor/adapter.go b/executor/adapter.go index 87f87a9712516..39e660099ed3f 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -233,6 +233,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec ctx = opentracing.ContextWithSpan(ctx, span1) } ctx = a.setPlanLabelForTopSQL(ctx) + a.observeStmtBeginForTopSQL() startTs := uint64(math.MaxUint64) err := a.Ctx.InitTxnWithStartTS(startTs) if err != nil { @@ -383,6 +384,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { } // ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`. 
ctx = a.setPlanLabelForTopSQL(ctx) + a.observeStmtBeginForTopSQL() if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -896,6 +898,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo // `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`. a.LogSlowQuery(txnTS, succ, hasMoreResults) a.SummaryStmt(succ) + a.observeStmtFinishedForTopSQL() if sessVars.StmtCtx.IsTiFlash.Load() { if succ { totalTiFlashQuerySuccCounter.Inc() @@ -1247,3 +1250,31 @@ func (a *ExecStmt) GetTextToLog() string { } return sql } + +func (a *ExecStmt) observeStmtBeginForTopSQL() { + if vars := a.Ctx.GetSessionVars(); variable.TopSQLEnabled() && vars.StmtStats != nil { + sqlDigest, planDigest := a.getSQLPlanDigest() + vars.StmtStats.OnExecutionBegin(sqlDigest, planDigest) + // This is a special logic prepared for TiKV's SQLExecCount. + vars.StmtCtx.KvExecCounter = vars.StmtStats.CreateKvExecCounter(sqlDigest, planDigest) + } +} + +func (a *ExecStmt) observeStmtFinishedForTopSQL() { + if vars := a.Ctx.GetSessionVars(); variable.TopSQLEnabled() && vars.StmtStats != nil { + sqlDigest, planDigest := a.getSQLPlanDigest() + vars.StmtStats.OnExecutionFinished(sqlDigest, planDigest) + } +} + +func (a *ExecStmt) getSQLPlanDigest() ([]byte, []byte) { + var sqlDigest, planDigest []byte + vars := a.Ctx.GetSessionVars() + if _, d := vars.StmtCtx.SQLDigest(); d != nil { + sqlDigest = d.Bytes() + } + if _, d := vars.StmtCtx.GetPlanDigest(); d != nil { + planDigest = d.Bytes() + } + return sqlDigest, planDigest +} diff --git a/executor/analyze.go b/executor/analyze.go index 5397c1ee0608c..c8f91ccc13f63 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -400,7 +400,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang return err } ctx := context.TODO() - result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker) + result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx) if err != nil { return err } @@ -763,7 +763,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe return nil, err } ctx := context.TODO() - result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker) + result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx) if err != nil { return nil, err } @@ -1854,6 +1854,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) + setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), snapshot) for _, t := range e.scanTasks { iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey)) if err != nil { @@ -1875,6 +1876,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { snapshot.SetOption(kv.IsolationLevel, kv.SI) snapshot.SetOption(kv.Priority, kv.PriorityLow) setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) + setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), snapshot) readReplicaType := e.ctx.GetSessionVars().GetReplicaRead() 
if readReplicaType.IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, readReplicaType) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index c30bf507d6d9d..f642cf92d56ff 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -150,6 +150,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { }) } setResourceGroupTaggerForTxn(stmtCtx, snapshot) + setRPCInterceptorOfExecCounterForTxn(sessVars, snapshot) var batchGetter kv.BatchGetter = snapshot if txn.Valid() { lock := e.tblInfo.Lock diff --git a/executor/checksum.go b/executor/checksum.go index 69fd6ed319e75..013fd3be2226f 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -128,7 +128,7 @@ func (e *ChecksumTableExec) checksumWorker(taskCh <-chan *checksumTask, resultCh } func (e *ChecksumTableExec) handleChecksumRequest(req *kv.Request) (resp *tipb.ChecksumResponse, err error) { - ctx := context.TODO() + ctx := distsql.WithSQLKvExecCounterInterceptor(context.TODO(), e.ctx.GetSessionVars().StmtCtx) res, err := distsql.Checksum(ctx, e.ctx.GetClient(), req, e.ctx.GetSessionVars().KVVars) if err != nil { return nil, err diff --git a/executor/executor.go b/executor/executor.go index eee07f8774ed0..4338529cea8ac 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1914,3 +1914,11 @@ func setResourceGroupTaggerForTxn(sc *stmtctx.StatementContext, snapshot kv.Snap snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger()) } } + +// setRPCInterceptorOfExecCounterForTxn binds an interceptor for client-go to count +// the number of SQL executions of each TiKV. +func setRPCInterceptorOfExecCounterForTxn(vars *variable.SessionVars, snapshot kv.Snapshot) { + if snapshot != nil && variable.TopSQLEnabled() && vars.StmtCtx.KvExecCounter != nil { + snapshot.SetOption(kv.RPCInterceptor, vars.StmtCtx.KvExecCounter.RPCInterceptor()) + } +} diff --git a/executor/insert.go b/executor/insert.go index 2862416ddf04a..7b0758c7ff076 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -67,6 +67,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { return err } setResourceGroupTaggerForTxn(sessVars.StmtCtx, txn) + setRPCInterceptorOfExecCounterForTxn(sessVars, txn) txnSize := txn.Size() sessVars.StmtCtx.AddRecordRows(uint64(len(rows))) // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. diff --git a/executor/point_get.go b/executor/point_get.go index 45f3fa76e263f..698626b4e1403 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -191,6 +191,7 @@ func (e *PointGetExecutor) Open(context.Context) error { } }) setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot) + setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), e.snapshot) return nil } diff --git a/executor/replace.go b/executor/replace.go index 78e0085aa520e..fe1930639f446 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -223,6 +223,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { } } setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, txn) + setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), txn) prefetchStart := time.Now() // Use BatchGet to fill cache. // It's an optimization and could be removed without affecting correctness. 
diff --git a/executor/update.go b/executor/update.go index 7df144b28196c..16024bc403fa1 100644 --- a/executor/update.go +++ b/executor/update.go @@ -275,6 +275,10 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { txn, err := e.ctx.Txn(true) if err == nil { txn.SetOption(kv.ResourceGroupTagger, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) + if e.ctx.GetSessionVars().StmtCtx.KvExecCounter != nil { + // Bind an interceptor for client-go to count the number of SQL executions of each TiKV. + txn.SetOption(kv.RPCInterceptor, e.ctx.GetSessionVars().StmtCtx.KvExecCounter.RPCInterceptor()) + } } } for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { diff --git a/go.mod b/go.mod index 3c2868df8b118..6f2a22ed5dd5d 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 + github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0 github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee github.com/twmb/murmur3 v1.1.3 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index a6b2d60e60a01..36c8604668b6b 100644 --- a/go.sum +++ b/go.sum @@ -712,8 +712,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 h1:B+cAIm2P1/SNsVV1vL9/mRaGUVl/vdgV8MU03O0vY28= -github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0 h1:38Jst/O36MKXAt7aD1Ipnx4nKwclG66ifkcmi4f0NZ4= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI= diff --git a/kv/option.go b/kv/option.go index 2a7a17fedcb6c..f3247b5b31b2c 100644 --- a/kv/option.go +++ b/kv/option.go @@ -68,12 +68,14 @@ const ( ResourceGroupTagger // KVFilter indicates the filter to ignore key-values in the transaction's memory buffer. KVFilter - // SnapInterceptor is used for setting the interceptor for snapshot SnapInterceptor // CommitTSUpperBoundChec is used by cached table // The commitTS must be greater than all the write lock lease of the visited cached table. CommitTSUpperBoundCheck + // RPCInterceptor is interceptor.RPCInterceptor on Transaction or Snapshot, used to decorate + // additional logic before and after the underlying client-go RPC request. 
+ RPCInterceptor ) // ReplicaReadType is the type of replica to read data from diff --git a/session/session.go b/session/session.go index a7976a5001bee..b7171a79c9525 100644 --- a/session/session.go +++ b/session/session.go @@ -548,6 +548,10 @@ func (s *session) doCommit(ctx context.Context) error { s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit) s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC) s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger()) + if sessVars.StmtCtx.KvExecCounter != nil { + // Bind an interceptor for client-go to count the number of SQL executions of each TiKV. + s.txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor()) + } // priority of the sysvar is lower than `start transaction with causal consistency only` if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) { // We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions @@ -2311,6 +2315,9 @@ func (s *session) Close() { s.RollbackTxn(ctx) if s.sessionVars != nil { s.sessionVars.WithdrawAllPreparedStmt() + if s.sessionVars.StmtStats != nil { + s.sessionVars.StmtStats.SetFinished() + } } s.ClearDiskFullOpt() } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index e41eb4766b47b..a064f891dd854 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/resourcegrouptag" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" @@ -207,6 +208,12 @@ type StatementContext struct { // WaitLockLeaseTime is the duration of cached table read lease expiration time. WaitLockLeaseTime time.Duration + + // KvExecCounter is created from SessionVars.StmtStats to count the number of SQL + // executions of the kv layer during the current execution of the statement. + // Its life cycle is limited to this execution, and a new KvExecCounter is + // always created during each statement execution. + KvExecCounter *stmtstats.KvExecCounter } // StmtHints are SessionVars related sql hints. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 8169eaa5c2d66..d03cbdce86fde 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -29,8 +29,6 @@ import ( "sync/atomic" "time" - utilMath "github.com/pingcap/tidb/util/math" - "github.com/pingcap/errors" pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" "github.com/pingcap/tidb/config" @@ -48,10 +46,12 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" + utilMath "github.com/pingcap/tidb/util/math" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" + "github.com/pingcap/tidb/util/topsql/stmtstats" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/twmb/murmur3" @@ -969,6 +969,13 @@ type SessionVars struct { // EnablePaging indicates whether enable paging in coprocessor requests. EnablePaging bool + + // StmtStats is used to count various indicators of each SQL in this session + // at each point in time. These data will be periodically taken away by the + // background goroutine. 
The background goroutine will continue to aggregate + // all the local data in each session, and finally report them to the remote + // regularly. + StmtStats *stmtstats.StatementStats } // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. @@ -1203,6 +1210,7 @@ func NewSessionVars() *SessionVars { MPPStoreFailTTL: DefTiDBMPPStoreFailTTL, EnablePlacementChecks: DefEnablePlacementCheck, Rng: utilMath.NewWithTime(), + StmtStats: stmtstats.CreateStatementStats(), } vars.KVVars = tikvstore.NewVariables(&vars.Killed) vars.Concurrency = Concurrency{ diff --git a/store/driver/txn/snapshot.go b/store/driver/txn/snapshot.go index 3c372bae83725..28b73e15e3228 100644 --- a/store/driver/txn/snapshot.go +++ b/store/driver/txn/snapshot.go @@ -23,6 +23,7 @@ import ( derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/driver/options" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "github.com/tikv/client-go/v2/txnkv/txnutil" ) @@ -120,6 +121,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) { s.KVSnapshot.SetReadReplicaScope(val.(string)) case kv.SnapInterceptor: s.interceptor = val.(kv.SnapshotInterceptor) + case kv.RPCInterceptor: + s.KVSnapshot.SetRPCInterceptor(val.(interceptor.RPCInterceptor)) } } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 717bf3b154761..bb9e38a4f3c03 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -32,6 +32,7 @@ import ( tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" ) @@ -232,6 +233,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.snapshotInterceptor = val.(kv.SnapshotInterceptor) case kv.CommitTSUpperBoundCheck: txn.KVTxn.SetCommitTSUpperBoundCheck(val.(func(commitTS uint64) bool)) + case kv.RPCInterceptor: + txn.KVTxn.SetRPCInterceptor(val.(interceptor.RPCInterceptor)) } } diff --git a/util/topsql/stmtstats/aggregator.go b/util/topsql/stmtstats/aggregator.go new file mode 100644 index 0000000000000..d78ed7b62dafb --- /dev/null +++ b/util/topsql/stmtstats/aggregator.go @@ -0,0 +1,156 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "context" + "sync" + "time" + + "go.uber.org/atomic" +) + +// globalAggregator is global *aggregator. +var globalAggregator = newAggregator() + +// StatementStatsRecord is the merged StatementStatsMap with timestamp. +type StatementStatsRecord struct { + Timestamp int64 + Data StatementStatsMap +} + +// aggregator is used to collect and aggregate data from all StatementStats. 
+// It is responsible for collecting data from all StatementStats, aggregating +// them together, uploading them and regularly cleaning up the closed StatementStats. +type aggregator struct { + ctx context.Context + cancel context.CancelFunc + statsSet sync.Map // map[*StatementStats]struct{} + collectors sync.Map // map[Collector]struct{} + running *atomic.Bool +} + +// newAggregator creates an empty aggregator. +func newAggregator() *aggregator { + return &aggregator{running: atomic.NewBool(false)} +} + +// run will block the current goroutine and execute the main loop of aggregator. +func (m *aggregator) run() { + m.ctx, m.cancel = context.WithCancel(context.Background()) + m.running.Store(true) + defer func() { + m.running.Store(false) + }() + tick := time.NewTicker(time.Second) + defer tick.Stop() + for { + select { + case <-m.ctx.Done(): + return + case <-tick.C: + m.aggregate() + } + } +} + +// aggregate data from all associated StatementStats. +// If StatementStats has been closed, collect will remove it from the map. +func (m *aggregator) aggregate() { + r := StatementStatsRecord{ + Timestamp: time.Now().Unix(), + Data: StatementStatsMap{}, + } + m.statsSet.Range(func(statsR, _ interface{}) bool { + stats := statsR.(*StatementStats) + if stats.Finished() { + m.unregister(stats) + } + r.Data.Merge(stats.Take()) + return true + }) + m.collectors.Range(func(c, _ interface{}) bool { + c.(Collector).CollectStmtStatsRecords([]StatementStatsRecord{r}) + return true + }) +} + +// register binds StatementStats to aggregator. +// register is thread-safe. +func (m *aggregator) register(stats *StatementStats) { + m.statsSet.Store(stats, struct{}{}) +} + +// unregister removes StatementStats from aggregator. +// unregister is thread-safe. +func (m *aggregator) unregister(stats *StatementStats) { + m.statsSet.Delete(stats) +} + +// registerCollector binds a Collector to aggregator. +// registerCollector is thread-safe. +func (m *aggregator) registerCollector(collector Collector) { + m.collectors.Store(collector, struct{}{}) +} + +// unregisterCollector removes Collector from aggregator. +// unregisterCollector is thread-safe. +func (m *aggregator) unregisterCollector(collector Collector) { + m.collectors.Delete(collector) +} + +// close ends the execution of the current aggregator. +func (m *aggregator) close() { + m.cancel() +} + +// closed returns whether the aggregator has been closed. +func (m *aggregator) closed() bool { + return !m.running.Load() +} + +// SetupAggregator is used to initialize the background aggregator goroutine of the stmtstats module. +// SetupAggregator is **not** thread-safe. +func SetupAggregator() { + if globalAggregator.closed() { + go globalAggregator.run() + } +} + +// CloseAggregator is used to stop the background aggregator goroutine of the stmtstats module. +// SetupAggregator is **not** thread-safe. +func CloseAggregator() { + if !globalAggregator.closed() { + globalAggregator.close() + } +} + +// RegisterCollector binds a Collector to globalAggregator. +// RegisterCollector is thread-safe. +func RegisterCollector(collector Collector) { + globalAggregator.registerCollector(collector) +} + +// UnregisterCollector removes Collector from globalAggregator. +// UnregisterCollector is thread-safe. +func UnregisterCollector(collector Collector) { + globalAggregator.unregisterCollector(collector) +} + +// Collector is used to collect StatementStatsRecord. +type Collector interface { + // CollectStmtStatsRecords is used to collect list of StatementStatsRecord. 
+ CollectStmtStatsRecords([]StatementStatsRecord) +} diff --git a/util/topsql/stmtstats/aggregator_test.go b/util/topsql/stmtstats/aggregator_test.go new file mode 100644 index 0000000000000..24a72bb89131d --- /dev/null +++ b/util/topsql/stmtstats/aggregator_test.go @@ -0,0 +1,93 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func Test_SetupCloseAggregator(t *testing.T) { + for n := 0; n < 3; n++ { + SetupAggregator() + time.Sleep(100 * time.Millisecond) + assert.False(t, globalAggregator.closed()) + CloseAggregator() + time.Sleep(100 * time.Millisecond) + assert.True(t, globalAggregator.closed()) + } +} + +func Test_RegisterUnregisterCollector(t *testing.T) { + SetupAggregator() + defer CloseAggregator() + time.Sleep(100 * time.Millisecond) + collector := newMockCollector(func(records []StatementStatsRecord) {}) + RegisterCollector(collector) + _, ok := globalAggregator.collectors.Load(collector) + assert.True(t, ok) + UnregisterCollector(collector) + _, ok = globalAggregator.collectors.Load(collector) + assert.False(t, ok) +} + +func Test_aggregator_register_collect(t *testing.T) { + a := newAggregator() + stats := &StatementStats{ + data: StatementStatsMap{}, + finished: atomic.NewBool(false), + } + a.register(stats) + stats.OnExecutionBegin([]byte("SQL-1"), []byte("")) + var records []StatementStatsRecord + a.registerCollector(newMockCollector(func(rs []StatementStatsRecord) { + records = append(records, rs...) + })) + a.aggregate() + assert.NotEmpty(t, records) + assert.Equal(t, uint64(1), records[0].Data[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) +} + +func Test_aggregator_run_close(t *testing.T) { + wg := sync.WaitGroup{} + a := newAggregator() + assert.True(t, a.closed()) + wg.Add(1) + go func() { + a.run() + wg.Done() + }() + time.Sleep(100 * time.Millisecond) + assert.False(t, a.closed()) + a.close() + wg.Wait() + assert.True(t, a.closed()) +} + +type mockCollector struct { + f func(records []StatementStatsRecord) +} + +func newMockCollector(f func(records []StatementStatsRecord)) Collector { + return &mockCollector{f: f} +} + +func (c *mockCollector) CollectStmtStatsRecords(records []StatementStatsRecord) { + c.f(records) +} diff --git a/util/topsql/stmtstats/kv_exec_count.go b/util/topsql/stmtstats/kv_exec_count.go new file mode 100644 index 0000000000000..7da4dc8eebdcd --- /dev/null +++ b/util/topsql/stmtstats/kv_exec_count.go @@ -0,0 +1,73 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "sync" + + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" +) + +// CreateKvExecCounter creates an associated KvExecCounter from StatementStats. +// The created KvExecCounter can only be used during a single statement execution +// and cannot be reused. +func (s *StatementStats) CreateKvExecCounter(sqlDigest, planDigest []byte) *KvExecCounter { + return &KvExecCounter{ + stats: s, + digest: SQLPlanDigest{SQLDigest: BinaryDigest(sqlDigest), PlanDigest: BinaryDigest(planDigest)}, + marked: map[string]struct{}{}, + } +} + +// KvExecCounter is used to count the number of SQL executions of the kv layer. +// It internally calls addKvExecCount of StatementStats at the right time, to +// ensure the semantic of "SQL execution count of TiKV". +type KvExecCounter struct { + stats *StatementStats + digest SQLPlanDigest + mu sync.Mutex + marked map[string]struct{} // HashSet +} + +// RPCInterceptor returns an interceptor.RPCInterceptor for client-go. +// The returned interceptor is generally expected to be bind to transaction or +// snapshot. In this way, the logic preset by KvExecCounter will be executed before +// each RPC request is initiated, in order to count the number of SQL executions of +// the TiKV dimension. +func (c *KvExecCounter) RPCInterceptor() interceptor.RPCInterceptor { + return func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { + return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + c.mark(target) + return next(target, req) + } + } +} + +// mark this target during the current execution of statement. +// If this target is marked for the first time, then increase the number of execution. +// mark is thread-safe. +func (c *KvExecCounter) mark(target string) { + firstMark := false + c.mu.Lock() + if _, ok := c.marked[target]; !ok { + c.marked[target] = struct{}{} + firstMark = true + } + c.mu.Unlock() + if firstMark { + c.stats.addKvExecCount([]byte(c.digest.SQLDigest), []byte(c.digest.PlanDigest), target, 1) + } +} diff --git a/util/topsql/stmtstats/kv_exec_count_test.go b/util/topsql/stmtstats/kv_exec_count_test.go new file mode 100644 index 0000000000000..c55a5300c0891 --- /dev/null +++ b/util/topsql/stmtstats/kv_exec_count_test.go @@ -0,0 +1,43 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package stmtstats + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikvrpc" +) + +func TestKvExecCounter(t *testing.T) { + stats := CreateStatementStats() + counter := stats.CreateKvExecCounter([]byte("SQL-1"), []byte("")) + interceptor := counter.RPCInterceptor() + for n := 0; n < 10; n++ { + _, _ = interceptor(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return nil, nil + })("TIKV-1", nil) + } + for n := 0; n < 10; n++ { + _, _ = interceptor(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return nil, nil + })("TIKV-2", nil) + } + assert.Len(t, counter.marked, 2) + assert.Contains(t, counter.marked, "TIKV-1") + assert.Contains(t, counter.marked, "TIKV-2") + assert.NotNil(t, stats.data[SQLPlanDigest{SQLDigest: "SQL-1"}]) + assert.Equal(t, uint64(1), stats.data[SQLPlanDigest{SQLDigest: "SQL-1"}].KvStatsItem.KvExecCount["TIKV-1"]) +} diff --git a/util/topsql/stmtstats/main_test.go b/util/topsql/stmtstats/main_test.go new file mode 100644 index 0000000000000..24f6c2574c522 --- /dev/null +++ b/util/topsql/stmtstats/main_test.go @@ -0,0 +1,27 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "testing" + + "github.com/pingcap/tidb/util/testbridge" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + goleak.VerifyTestMain(m) +} diff --git a/util/topsql/stmtstats/stmtstats.go b/util/topsql/stmtstats/stmtstats.go new file mode 100644 index 0000000000000..24faa93899cce --- /dev/null +++ b/util/topsql/stmtstats/stmtstats.go @@ -0,0 +1,219 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "sync" + + "go.uber.org/atomic" +) + +var _ StatementObserver = &StatementStats{} + +// StatementObserver is an abstract interface as a callback to the corresponding +// position of TiDB's SQL statement execution process. StatementStats implements +// StatementObserver and performs counting such as SQLExecCount/SQLDuration internally. +// The caller only needs to be responsible for calling different methods at the +// corresponding locations, without paying attention to implementation details. +type StatementObserver interface { + // OnExecutionBegin should be called before statement execution. + OnExecutionBegin(sqlDigest, planDigest []byte) + + // OnExecutionFinished should be called after the statement is executed. 
+ OnExecutionFinished(sqlDigest, planDigest []byte) +} + +// StatementStats is a counter used locally in each session. +// We can use StatementStats to count data such as "the number of SQL executions", +// and it is expected that these statistics will eventually be collected and merged +// in the background. +type StatementStats struct { + mu sync.Mutex + data StatementStatsMap + finished *atomic.Bool +} + +// CreateStatementStats try to create and register an StatementStats. +func CreateStatementStats() *StatementStats { + stats := &StatementStats{ + data: StatementStatsMap{}, + finished: atomic.NewBool(false), + } + globalAggregator.register(stats) + return stats +} + +// OnExecutionBegin implements StatementObserver.OnExecutionBegin. +func (s *StatementStats) OnExecutionBegin(sqlDigest, planDigest []byte) { + s.mu.Lock() + defer s.mu.Unlock() + item := s.GetOrCreateStatementStatsItem(sqlDigest, planDigest) + + item.ExecCount++ + // Count more data here. +} + +// OnExecutionFinished implements StatementObserver.OnExecutionFinished. +func (s *StatementStats) OnExecutionFinished(sqlDigest, planDigest []byte) { + // Count more data here. +} + +// GetOrCreateStatementStatsItem creates the corresponding StatementStatsItem +// for the specified SQLPlanDigest and timestamp if it does not exist before. +// GetOrCreateStatementStatsItem is just a helper function, not responsible for +// concurrency control, so GetOrCreateStatementStatsItem is **not** thread-safe. +func (s *StatementStats) GetOrCreateStatementStatsItem(sqlDigest, planDigest []byte) *StatementStatsItem { + key := SQLPlanDigest{SQLDigest: BinaryDigest(sqlDigest), PlanDigest: BinaryDigest(planDigest)} + item, ok := s.data[key] + if !ok { + s.data[key] = NewStatementStatsItem() + item = s.data[key] + } + return item +} + +// addKvExecCount is used to count the number of executions of a certain SQLPlanDigest for a certain target. +// addKvExecCount is thread-safe. +func (s *StatementStats) addKvExecCount(sqlDigest, planDigest []byte, target string, n uint64) { + s.mu.Lock() + defer s.mu.Unlock() + item := s.GetOrCreateStatementStatsItem(sqlDigest, planDigest) + item.KvStatsItem.KvExecCount[target] += n +} + +// Take takes out all existing StatementStatsMap data from StatementStats. +// Take is thread-safe. +func (s *StatementStats) Take() StatementStatsMap { + s.mu.Lock() + defer s.mu.Unlock() + data := s.data + s.data = StatementStatsMap{} + return data +} + +// SetFinished marks this StatementStats as "finished" and no more counting or +// aggregation should happen. Associated resources will be cleaned up, like background +// aggregators. +// Generally, as the StatementStats is created when a session starts, SetFinished +// should be called when the session ends. +func (s *StatementStats) SetFinished() { + s.finished.Store(true) +} + +// Finished returns whether the StatementStats has been finished. +func (s *StatementStats) Finished() bool { + return s.finished.Load() +} + +// BinaryDigest is converted from parser.Digest.Bytes(), and the purpose +// is to be used as the key of the map. +type BinaryDigest string + +// SQLPlanDigest is used as the key of StatementStatsMap to +// distinguish different sql. +type SQLPlanDigest struct { + SQLDigest BinaryDigest + PlanDigest BinaryDigest +} + +// StatementStatsMap is the local data type of StatementStats. +type StatementStatsMap map[SQLPlanDigest]*StatementStatsItem + +// Merge merges other into StatementStatsMap. +// Values with the same SQLPlanDigest will be merged. 
+// +// After executing Merge, some pointers in other may be referenced +// by m. So after calling Merge, it is best not to continue to use +// other unless you understand what you are doing. +func (m StatementStatsMap) Merge(other StatementStatsMap) { + if m == nil || other == nil { + return + } + for newDigest, newItem := range other { + item, ok := m[newDigest] + if !ok { + m[newDigest] = newItem + continue + } + item.Merge(newItem) + } +} + +// StatementStatsItem represents a set of mergeable statistics. +// StatementStatsItem is used in a larger data structure to represent +// the stats of a certain SQLPlanDigest under a certain timestamp. +// If there are more indicators that need to be added in the future, +// please add it in StatementStatsItem and implement its aggregation +// in the Merge method. +type StatementStatsItem struct { + // ExecCount represents the number of SQL executions of TiDB. + ExecCount uint64 + + // KvStatsItem contains all indicators of kv layer. + KvStatsItem KvStatementStatsItem +} + +// NewStatementStatsItem creates an empty StatementStatsItem. +func NewStatementStatsItem() *StatementStatsItem { + return &StatementStatsItem{ + KvStatsItem: NewKvStatementStatsItem(), + } +} + +// Merge merges other into StatementStatsItem. +// +// After executing Merge, some pointers in other may be referenced +// by i. So after calling Merge, it is best not to continue to use +// other unless you understand what you are doing. +// +// If you add additional indicators, you need to add their merge code here. +func (i *StatementStatsItem) Merge(other *StatementStatsItem) { + if i == nil || other == nil { + return + } + i.ExecCount += other.ExecCount + i.KvStatsItem.Merge(other.KvStatsItem) +} + +// KvStatementStatsItem is part of StatementStatsItem, it only contains +// indicators of kv layer. +type KvStatementStatsItem struct { + // KvExecCount represents the number of SQL executions of TiKV. + KvExecCount map[string]uint64 +} + +// NewKvStatementStatsItem creates an empty KvStatementStatsItem. +func NewKvStatementStatsItem() KvStatementStatsItem { + return KvStatementStatsItem{ + KvExecCount: map[string]uint64{}, + } +} + +// Merge merges other into KvStatementStatsItem. +// +// After executing Merge, some pointers in other may be referenced +// by i. So after calling Merge, it is best not to continue to use +// other unless you understand what you are doing. +// +// If you add additional indicators, you need to add their merge code here. +func (i *KvStatementStatsItem) Merge(other KvStatementStatsItem) { + if i.KvExecCount == nil { + i.KvExecCount = other.KvExecCount + } else { + for target, count := range other.KvExecCount { + i.KvExecCount[target] += count + } + } +} diff --git a/util/topsql/stmtstats/stmtstats_test.go b/util/topsql/stmtstats/stmtstats_test.go new file mode 100644 index 0000000000000..b78208d918d76 --- /dev/null +++ b/util/topsql/stmtstats/stmtstats_test.go @@ -0,0 +1,187 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "bytes" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +// String is only used for debugging. +func (d SQLPlanDigest) String() string { + bs := bytes.NewBufferString("") + if len(d.SQLDigest) >= 5 { + bs.Write([]byte(d.SQLDigest)[:5]) + } + if len(d.PlanDigest) >= 5 { + bs.WriteRune('-') + bs.Write([]byte(d.PlanDigest)[:5]) + } + return bs.String() +} + +// String is only used for debugging. +func (m StatementStatsMap) String() string { + if len(m) == 0 { + return "StatementStatsMap {}" + } + bs := bytes.NewBufferString("") + bs.WriteString("StatementStatsMap {\n") + for k, v := range m { + bs.WriteString(fmt.Sprintf(" %s => %s\n", k, v)) + } + bs.WriteString("}") + return bs.String() +} + +// String is only used for debugging. +func (i *StatementStatsItem) String() string { + if i == nil { + return "" + } + b, _ := json.Marshal(i) + return string(b) +} + +func TestKvStatementStatsItem_Merge(t *testing.T) { + item1 := KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "127.0.0.1:10001": 1, + "127.0.0.1:10002": 2, + }, + } + item2 := KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "127.0.0.1:10002": 2, + "127.0.0.1:10003": 3, + }, + } + assert.Len(t, item1.KvExecCount, 2) + assert.Len(t, item2.KvExecCount, 2) + item1.Merge(item2) + assert.Len(t, item1.KvExecCount, 3) + assert.Len(t, item2.KvExecCount, 2) + assert.Equal(t, uint64(1), item1.KvExecCount["127.0.0.1:10001"]) + assert.Equal(t, uint64(3), item1.KvExecCount["127.0.0.1:10003"]) + assert.Equal(t, uint64(3), item1.KvExecCount["127.0.0.1:10003"]) +} + +func TestStatementsStatsItem_Merge(t *testing.T) { + item1 := &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: NewKvStatementStatsItem(), + } + item2 := &StatementStatsItem{ + ExecCount: 2, + KvStatsItem: NewKvStatementStatsItem(), + } + item1.Merge(item2) + assert.Equal(t, uint64(3), item1.ExecCount) +} + +func TestStatementStatsMap_Merge(t *testing.T) { + m1 := StatementStatsMap{ + SQLPlanDigest{SQLDigest: "SQL-1"}: &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "KV-1": 1, + "KV-2": 2, + }, + }, + }, + SQLPlanDigest{SQLDigest: "SQL-2"}: &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "KV-1": 1, + "KV-2": 2, + }, + }, + }, + } + m2 := StatementStatsMap{ + SQLPlanDigest{SQLDigest: "SQL-2"}: &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "KV-1": 1, + "KV-2": 2, + }, + }, + }, + SQLPlanDigest{SQLDigest: "SQL-3"}: &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "KV-1": 1, + "KV-2": 2, + }, + }, + }, + } + assert.Len(t, m1, 2) + assert.Len(t, m2, 2) + m1.Merge(m2) + assert.Len(t, m1, 3) + assert.Len(t, m2, 2) + assert.Equal(t, uint64(1), m1[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) + assert.Equal(t, uint64(2), m1[SQLPlanDigest{SQLDigest: "SQL-2"}].ExecCount) + assert.Equal(t, uint64(1), m1[SQLPlanDigest{SQLDigest: "SQL-3"}].ExecCount) + assert.Equal(t, uint64(1), m1[SQLPlanDigest{SQLDigest: "SQL-1"}].KvStatsItem.KvExecCount["KV-1"]) + assert.Equal(t, uint64(2), m1[SQLPlanDigest{SQLDigest: "SQL-1"}].KvStatsItem.KvExecCount["KV-2"]) + assert.Equal(t, uint64(2), m1[SQLPlanDigest{SQLDigest: "SQL-2"}].KvStatsItem.KvExecCount["KV-1"]) + assert.Equal(t, 
uint64(4), m1[SQLPlanDigest{SQLDigest: "SQL-2"}].KvStatsItem.KvExecCount["KV-2"]) + assert.Equal(t, uint64(1), m1[SQLPlanDigest{SQLDigest: "SQL-3"}].KvStatsItem.KvExecCount["KV-1"]) + assert.Equal(t, uint64(2), m1[SQLPlanDigest{SQLDigest: "SQL-3"}].KvStatsItem.KvExecCount["KV-2"]) + m1.Merge(nil) + assert.Len(t, m1, 3) +} + +func TestCreateStatementStats(t *testing.T) { + stats := CreateStatementStats() + assert.NotNil(t, stats) + _, ok := globalAggregator.statsSet.Load(stats) + assert.True(t, ok) + assert.False(t, stats.Finished()) + stats.SetFinished() + assert.True(t, stats.Finished()) +} + +func TestExecCounter_AddExecCount_Take(t *testing.T) { + stats := CreateStatementStats() + m := stats.Take() + assert.Len(t, m, 0) + for n := 0; n < 1; n++ { + stats.OnExecutionBegin([]byte("SQL-1"), []byte("")) + } + for n := 0; n < 2; n++ { + stats.OnExecutionBegin([]byte("SQL-2"), []byte("")) + } + for n := 0; n < 3; n++ { + stats.OnExecutionBegin([]byte("SQL-3"), []byte("")) + } + m = stats.Take() + assert.Len(t, m, 3) + assert.Equal(t, uint64(1), m[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) + assert.Equal(t, uint64(2), m[SQLPlanDigest{SQLDigest: "SQL-2"}].ExecCount) + assert.Equal(t, uint64(3), m[SQLPlanDigest{SQLDigest: "SQL-3"}].ExecCount) + m = stats.Take() + assert.Len(t, m, 0) +} diff --git a/util/topsql/stmtstats/stmtstatstest/main_test.go b/util/topsql/stmtstats/stmtstatstest/main_test.go new file mode 100644 index 0000000000000..ecf1220642ecf --- /dev/null +++ b/util/topsql/stmtstats/stmtstatstest/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstatstest + +import ( + "testing" + + "github.com/pingcap/tidb/util/testbridge" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + opts := []goleak.Option{ + goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + goleak.VerifyTestMain(m, opts...) +} diff --git a/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go b/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go new file mode 100644 index 0000000000000..d37de52178e0e --- /dev/null +++ b/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go @@ -0,0 +1,150 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package stmtstatstest + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/topsql/stmtstats" + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/testutils" +) + +func TestExecCount(t *testing.T) { + // Prepare stmt stats. + stmtstats.SetupAggregator() + defer stmtstats.CloseAggregator() + + // Register stmt stats collector. + var mu sync.Mutex + total := stmtstats.StatementStatsMap{} + stmtstats.RegisterCollector(newMockCollector(func(rs []stmtstats.StatementStatsRecord) { + mu.Lock() + defer mu.Unlock() + for _, r := range rs { + total.Merge(r.Data) + } + })) + + // Create mock store. + store, err := mockstore.NewMockStore(mockstore.WithClusterInspector(func(c testutils.Cluster) { + mockstore.BootstrapWithSingleStore(c) + })) + assert.NoError(t, err) + defer func() { + assert.NoError(t, store.Close()) + }() + + // Prepare mock store. + session.SetSchemaLease(0) + session.DisableStats4Test() + d, err := session.BootstrapSession(store) + assert.NoError(t, err) + defer d.Close() + d.SetStatsUpdating(true) + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + + // Create table for testing. + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int);") + + // Enable TopSQL + variable.TopSQLVariable.Enable.Store(true) + config.UpdateGlobal(func(conf *config.Config) { + conf.TopSQL.ReceiverAddress = "mock-agent" + }) + + // Execute CRUD. + const ExecCountPerSQL = 100 + _, insertSQLDigest := parser.NormalizeDigest("insert into t values (0);") + for n := 0; n < ExecCountPerSQL; n++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d);", n)) + } + _, updateSQLDigest := parser.NormalizeDigest("update t set a = 0 where a = 0;") + for n := 0; n < ExecCountPerSQL; n++ { + tk.MustExec(fmt.Sprintf("update t set a = %d where a = %d;", n, n)) + } + _, selectSQLDigest := parser.NormalizeDigest("select a from t where a = 0;") + for n := 0; n < ExecCountPerSQL; n++ { + tk.MustQuery(fmt.Sprintf("select a from t where a = %d;", n)) + } + _, deleteSQLDigest := parser.NormalizeDigest("delete from t where a = 0;") + for n := 1; n <= ExecCountPerSQL; n++ { + tk.MustExec(fmt.Sprintf("delete from t where a = %d;", n)) + } + + // Wait for collect. + time.Sleep(2 * time.Second) + + // Assertion. + func() { + mu.Lock() + defer mu.Unlock() + + assert.NotEmpty(t, total) + sqlDigests := map[stmtstats.BinaryDigest]struct{}{ + stmtstats.BinaryDigest(insertSQLDigest.Bytes()): {}, + stmtstats.BinaryDigest(updateSQLDigest.Bytes()): {}, + stmtstats.BinaryDigest(selectSQLDigest.Bytes()): {}, + stmtstats.BinaryDigest(deleteSQLDigest.Bytes()): {}, + } + found := 0 + for digest, item := range total { + if _, ok := sqlDigests[digest.SQLDigest]; ok { + found++ + assert.Equal(t, uint64(ExecCountPerSQL), item.ExecCount) + var kvSum uint64 + for _, kvCount := range item.KvStatsItem.KvExecCount { + kvSum += kvCount + } + assert.Equal(t, uint64(ExecCountPerSQL), kvSum) + } + } + assert.Equal(t, 4, found) // insert, update, select, delete + }() + + // Drop table. 
+ tk.MustExec("use test") + r := tk.MustQuery("show tables") + for _, tb := range r.Rows() { + tableName := tb[0] + tk.MustExec(fmt.Sprintf("drop table %v", tableName)) + } +} + +type mockCollector struct { + f func(records []stmtstats.StatementStatsRecord) +} + +func newMockCollector(f func(records []stmtstats.StatementStatsRecord)) stmtstats.Collector { + return &mockCollector{f: f} +} + +func (c *mockCollector) CollectStmtStatsRecords(records []stmtstats.StatementStatsRecord) { + c.f(records) +} diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index 8f2aac1566642..12a9da430e7c0 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/topsql/reporter" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tidb/util/topsql/tracecpu" "go.uber.org/zap" ) @@ -50,6 +51,7 @@ func SetupTopSQL() { tracecpu.GlobalSQLCPUProfiler.SetCollector(remoteReporter) tracecpu.GlobalSQLCPUProfiler.Run() + stmtstats.SetupAggregator() } // Close uses to close and release the top sql resource. @@ -60,6 +62,7 @@ func Close() { if globalTopSQLReport != nil { globalTopSQLReport.Close() } + stmtstats.CloseAggregator() } // AttachSQLInfo attach the sql information info top sql. From a8a858bac27c618fa3c4f28daf79393a7b67b8ee Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 21 Dec 2021 16:17:47 +0800 Subject: [PATCH 03/11] topsql: add pubsub datasink (#30860) --- server/rpc_server.go | 2 + sessionctx/variable/tidb_vars.go | 2 +- util/topsql/reporter/mock/pubsub.go | 67 +++++++ util/topsql/reporter/pubsub.go | 267 ++++++++++++++++++++++++++ util/topsql/reporter/single_target.go | 49 +++-- util/topsql/topsql.go | 12 +- util/topsql/topsql_test.go | 160 +++++++++++++++ 7 files changed, 537 insertions(+), 22 deletions(-) create mode 100644 util/topsql/reporter/mock/pubsub.go create mode 100644 util/topsql/reporter/pubsub.go diff --git a/server/rpc_server.go b/server/rpc_server.go index 674047781a6bd..3b23539c0bac1 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/topsql" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -64,6 +65,7 @@ func NewRPCServer(config *config.Config, dom *domain.Domain, sm util.SessionMana } diagnosticspb.RegisterDiagnosticsServer(s, rpcSrv) tikvpb.RegisterTikvServer(s, rpcSrv) + topsql.RegisterPubSubServer(s) return s } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index ee01348a76441..ccec9e1b5a8fc 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -832,5 +832,5 @@ type TopSQL struct { // TopSQLEnabled uses to check whether enabled the top SQL feature. func TopSQLEnabled() bool { - return TopSQLVariable.Enable.Load() && config.GetGlobalConfig().TopSQL.ReceiverAddress != "" + return TopSQLVariable.Enable.Load() } diff --git a/util/topsql/reporter/mock/pubsub.go b/util/topsql/reporter/mock/pubsub.go new file mode 100644 index 0000000000000..493d95c17f827 --- /dev/null +++ b/util/topsql/reporter/mock/pubsub.go @@ -0,0 +1,67 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import ( + "fmt" + "net" + + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type mockPubSubServer struct { + addr string + listen net.Listener + grpcServer *grpc.Server +} + +// NewMockPubSubServer creates a mock publisher server. +func NewMockPubSubServer() (*mockPubSubServer, error) { + addr := "127.0.0.1:0" + lis, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + server := grpc.NewServer() + + return &mockPubSubServer{ + addr: fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port), + listen: lis, + grpcServer: server, + }, nil +} + +func (svr *mockPubSubServer) Serve() { + err := svr.grpcServer.Serve(svr.listen) + if err != nil { + logutil.BgLogger().Warn("[top-sql] mock pubsub server serve failed", zap.Error(err)) + } +} + +func (svr *mockPubSubServer) Server() *grpc.Server { + return svr.grpcServer +} + +func (svr *mockPubSubServer) Address() string { + return svr.addr +} + +func (svr *mockPubSubServer) Stop() { + if svr.grpcServer != nil { + svr.grpcServer.Stop() + } +} diff --git a/util/topsql/reporter/pubsub.go b/util/topsql/reporter/pubsub.go new file mode 100644 index 0000000000000..7d01c077e058f --- /dev/null +++ b/util/topsql/reporter/pubsub.go @@ -0,0 +1,267 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import ( + "context" + "errors" + "time" + + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" +) + +// TopSQLPubSubService implements tipb.TopSQLPubSubServer. +// +// If a client subscribes to TopSQL records, the TopSQLPubSubService is responsible +// for registering an associated DataSink to the reporter. Then the DataSink sends +// data to the client periodically. +type TopSQLPubSubService struct { + dataSinkRegisterer DataSinkRegisterer +} + +// NewTopSQLPubSubService creates a new TopSQLPubSubService. +func NewTopSQLPubSubService(dataSinkRegisterer DataSinkRegisterer) *TopSQLPubSubService { + return &TopSQLPubSubService{dataSinkRegisterer: dataSinkRegisterer} +} + +var _ tipb.TopSQLPubSubServer = &TopSQLPubSubService{} + +// Subscribe registers dataSinks to the reporter and redirects data received from reporter +// to subscribers associated with those dataSinks. 
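+// The registered sink is deregistered automatically once the subscription ends, whether the
+// stream is closed, a send fails, or the reporter is shutting down. A subscriber typically
+// dials TiDB's gRPC endpoint and drains the stream, e.g. (a minimal sketch, error handling
+// omitted):
+//
+//	client := tipb.NewTopSQLPubSubClient(conn)
+//	stream, _ := client.Subscribe(ctx, &tipb.TopSQLSubRequest{})
+//	for {
+//		resp, err := stream.Recv()
+//		if err != nil {
+//			break
+//		}
+//		// resp carries one of: a CPU time record, a SQL meta, or a plan meta.
+//	}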
+func (ps *TopSQLPubSubService) Subscribe(_ *tipb.TopSQLSubRequest, stream tipb.TopSQLPubSub_SubscribeServer) error { + ds := newPubSubDataSink(stream, ps.dataSinkRegisterer) + if err := ps.dataSinkRegisterer.Register(ds); err != nil { + return err + } + return ds.run() +} + +type pubSubDataSink struct { + ctx context.Context + cancel context.CancelFunc + + stream tipb.TopSQLPubSub_SubscribeServer + sendTaskCh chan sendTask + + // for deregister + registerer DataSinkRegisterer +} + +func newPubSubDataSink(stream tipb.TopSQLPubSub_SubscribeServer, registerer DataSinkRegisterer) *pubSubDataSink { + ctx, cancel := context.WithCancel(stream.Context()) + + return &pubSubDataSink{ + ctx: ctx, + cancel: cancel, + + stream: stream, + sendTaskCh: make(chan sendTask, 1), + + registerer: registerer, + } +} + +var _ DataSink = &pubSubDataSink{} + +func (ds *pubSubDataSink) TrySend(data *ReportData, deadline time.Time) error { + select { + case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}: + return nil + case <-ds.ctx.Done(): + return ds.ctx.Err() + default: + ignoreReportChannelFullCounter.Inc() + return errors.New("the channel of pubsub dataSink is full") + } +} + +func (ds *pubSubDataSink) OnReporterClosing() { + ds.cancel() +} + +func (ds *pubSubDataSink) run() error { + defer func() { + ds.registerer.Deregister(ds) + ds.cancel() + }() + + for { + select { + case task := <-ds.sendTaskCh: + ctx, cancel := context.WithDeadline(ds.ctx, task.deadline) + var err error + + start := time.Now() + go util.WithRecovery(func() { + defer cancel() + err = ds.doSend(ctx, task.data) + + if err != nil { + reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }, nil) + + // When the deadline is exceeded, the closure inside `go util.WithRecovery` above may not notice that + // immediately because it can be blocked by `stream.Send`. + // In order to clean up resources as quickly as possible, we let that closure run in an individual goroutine, + // and wait for timeout here. 
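+			// ctx.Done() is closed either when doSend returns (the closure defers cancel) or when
+			// the deadline expires, so this wait cannot block forever.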
+ <-ctx.Done() + + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + logutil.BgLogger().Warn( + "[top-sql] pubsub datasink failed to send data to subscriber due to deadline exceeded", + zap.Time("deadline", task.deadline), + ) + return ctx.Err() + } + + if err != nil { + logutil.BgLogger().Warn( + "[top-sql] pubsub datasink failed to send data to subscriber", + zap.Error(err), + ) + return err + } + case <-ds.ctx.Done(): + return ds.ctx.Err() + } + } +} + +func (ds *pubSubDataSink) doSend(ctx context.Context, data *ReportData) error { + if err := ds.sendCPUTime(ctx, data.CPUTimeRecords); err != nil { + return err + } + if err := ds.sendSQLMeta(ctx, data.SQLMetas); err != nil { + return err + } + return ds.sendPlanMeta(ctx, data.PlanMetas) +} + +func (ds *pubSubDataSink) sendCPUTime(ctx context.Context, records []tipb.CPUTimeRecord) (err error) { + if len(records) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportRecordCounterHistogram.Observe(float64(sentCount)) + if err != nil { + reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + cpuRecord := &tipb.TopSQLSubResponse_Record{} + r := &tipb.TopSQLSubResponse{RespOneof: cpuRecord} + + for i := range records { + cpuRecord.Record = &records[i] + if err = ds.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} + +func (ds *pubSubDataSink) sendSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) { + if len(sqlMetas) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportSQLCountHistogram.Observe(float64(sentCount)) + if err != nil { + reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + sqlMeta := &tipb.TopSQLSubResponse_SqlMeta{} + r := &tipb.TopSQLSubResponse{RespOneof: sqlMeta} + + for i := range sqlMetas { + sqlMeta.SqlMeta = &sqlMetas[i] + if err = ds.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} + +func (ds *pubSubDataSink) sendPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) { + if len(planMetas) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportPlanCountHistogram.Observe(float64(sentCount)) + if err != nil { + reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + planMeta := &tipb.TopSQLSubResponse_PlanMeta{} + r := &tipb.TopSQLSubResponse{RespOneof: planMeta} + + for i := range planMetas { + planMeta.PlanMeta = &planMetas[i] + if err = ds.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go index 3ea61d75f633a..3744702ba26d6 100644 --- a/util/topsql/reporter/single_target.go +++ b/util/topsql/reporter/single_target.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ 
-38,7 +39,7 @@ type SingleTargetDataSink struct { conn *grpc.ClientConn sendTaskCh chan sendTask - registered bool + registered *atomic.Bool registerer DataSinkRegisterer } @@ -53,7 +54,7 @@ func NewSingleTargetDataSink(registerer DataSinkRegisterer) *SingleTargetDataSin conn: nil, sendTaskCh: make(chan sendTask, 1), - registered: false, + registered: atomic.NewBool(false), registerer: registerer, } @@ -64,6 +65,7 @@ func NewSingleTargetDataSink(registerer DataSinkRegisterer) *SingleTargetDataSin logutil.BgLogger().Warn("failed to register single target datasink", zap.Error(err)) return nil } + dataSink.registered.Store(true) } go dataSink.recoverRun() @@ -111,25 +113,27 @@ func (ds *SingleTargetDataSink) run() (rerun bool) { targetRPCAddr = config.GetGlobalConfig().TopSQL.ReceiverAddress } - if err := ds.tryRegister(targetRPCAddr); err != nil { - logutil.BgLogger().Warn("failed to register the single target datasink", zap.Error(err)) + if err := ds.trySwitchRegistration(targetRPCAddr); err != nil { return false } } } -func (ds *SingleTargetDataSink) tryRegister(addr string) error { - if addr == "" && ds.registered { +func (ds *SingleTargetDataSink) trySwitchRegistration(addr string) error { + // deregister if `addr` is empty and registered before + if addr == "" && ds.registered.Load() { ds.registerer.Deregister(ds) - ds.registered = false + ds.registered.Store(false) return nil } - if addr != "" && !ds.registered { + // register if `add` is not empty and not registered before + if addr != "" && !ds.registered.Load() { if err := ds.registerer.Register(ds); err != nil { + logutil.BgLogger().Warn("failed to register the single target datasink", zap.Error(err)) return err } - ds.registered = true + ds.registered.Store(true) } return nil } @@ -160,8 +164,9 @@ func (ds *SingleTargetDataSink) OnReporterClosing() { func (ds *SingleTargetDataSink) Close() { ds.cancel() - if ds.registered { + if ds.registered.Load() { ds.registerer.Deregister(ds) + ds.registered.Store(false) } } @@ -172,12 +177,14 @@ func (ds *SingleTargetDataSink) doSend(addr string, task sendTask) { var err error start := time.Now() - if err != nil { - logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err)) - reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) - } else { - reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) - } + defer func() { + if err != nil { + logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err)) + reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() ctx, cancel := context.WithDeadline(context.Background(), task.deadline) defer cancel() @@ -223,8 +230,9 @@ func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, reco topSQLReportRecordCounterHistogram.Observe(float64(sentCount)) if err != nil { reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) } - reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) }() client := tipb.NewTopSQLAgentClient(ds.conn) @@ -254,11 +262,11 @@ func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas [ sentCount := 0 defer func() { topSQLReportSQLCountHistogram.Observe(float64(sentCount)) - if err != nil { reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) 
+ } else { + reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) } - reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) }() client := tipb.NewTopSQLAgentClient(ds.conn) @@ -291,8 +299,9 @@ func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas topSQLReportPlanCountHistogram.Observe(float64(sentCount)) if err != nil { reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) } - reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) }() client := tipb.NewTopSQLAgentClient(ds.conn) diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index 12a9da430e7c0..ccc1f61eef0a5 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -27,7 +27,9 @@ import ( "github.com/pingcap/tidb/util/topsql/reporter" "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" + "google.golang.org/grpc" ) const ( @@ -38,7 +40,7 @@ const ( ) var ( - globalTopSQLReport reporter.TopSQLReporter + globalTopSQLReport *reporter.RemoteTopSQLReporter singleTargetDataSink *reporter.SingleTargetDataSink ) @@ -54,6 +56,14 @@ func SetupTopSQL() { stmtstats.SetupAggregator() } +// RegisterPubSubServer registers TopSQLPubSubService to the given gRPC server. +func RegisterPubSubServer(s *grpc.Server) { + if globalTopSQLReport != nil { + service := reporter.NewTopSQLPubSubService(globalTopSQLReport) + tipb.RegisterTopSQLPubSubServer(s, service) + } +} + // Close uses to close and release the top sql resource. func Close() { if singleTargetDataSink != nil { diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index d4aabc746ce0e..462c6ffb70aa2 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -29,7 +29,10 @@ import ( mockServer "github.com/pingcap/tidb/util/topsql/reporter/mock" "github.com/pingcap/tidb/util/topsql/tracecpu" "github.com/pingcap/tidb/util/topsql/tracecpu/mock" + "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) type collectorWrapper struct { @@ -213,6 +216,163 @@ func TestMaxSQLAndPlanTest(t *testing.T) { require.Empty(t, cPlan) } +func TestTopSQLPubSub(t *testing.T) { + variable.TopSQLVariable.MaxStatementCount.Store(200) + variable.TopSQLVariable.ReportIntervalSeconds.Store(1) + + report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + defer report.Close() + tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{report}) + + server, err := mockServer.NewMockPubSubServer() + require.NoError(t, err) + pubsubService := reporter.NewTopSQLPubSubService(report) + tipb.RegisterTopSQLPubSubServer(server.Server(), pubsubService) + go server.Serve() + defer server.Stop() + + conn, err := grpc.Dial( + server.Address(), + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + }), + ) + require.NoError(t, err) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + client := tipb.NewTopSQLPubSubClient(conn) + stream, err := client.Subscribe(ctx, &tipb.TopSQLSubRequest{}) + require.NoError(t, err) + + reqs := []struct { + sql string + plan string + }{ + {"select * from t where a=?", "point-get"}, + {"select * from t where a>?", "table-scan"}, + {"insert into t 
values (?)", ""}, + } + + digest2sql := make(map[string]string) + sql2plan := make(map[string]string) + for _, req := range reqs { + sql2plan[req.sql] = req.plan + sqlDigest := mock.GenSQLDigest(req.sql) + digest2sql[string(sqlDigest.Bytes())] = req.sql + + go func(sql, plan string) { + for { + select { + case <-ctx.Done(): + return + default: + mockExecuteSQL(sql, plan) + } + } + }(req.sql, req.plan) + } + + sqlMetas := make(map[string]*tipb.SQLMeta) + planMetas := make(map[string]string) + records := make(map[string]*tipb.CPUTimeRecord) + + for { + r, err := stream.Recv() + if err != nil { + break + } + + if r.GetRecord() != nil { + rec := r.GetRecord() + if _, ok := records[string(rec.SqlDigest)]; !ok { + records[string(rec.SqlDigest)] = rec + } else { + cpu := records[string(rec.SqlDigest)] + if rec.PlanDigest != nil { + cpu.PlanDigest = rec.PlanDigest + } + cpu.RecordListTimestampSec = append(cpu.RecordListTimestampSec, rec.RecordListTimestampSec...) + cpu.RecordListCpuTimeMs = append(cpu.RecordListCpuTimeMs, rec.RecordListCpuTimeMs...) + } + } else if r.GetSqlMeta() != nil { + sql := r.GetSqlMeta() + if _, ok := sqlMetas[string(sql.SqlDigest)]; !ok { + sqlMetas[string(sql.SqlDigest)] = sql + } + } else if r.GetPlanMeta() != nil { + plan := r.GetPlanMeta() + if _, ok := planMetas[string(plan.PlanDigest)]; !ok { + planMetas[string(plan.PlanDigest)] = plan.NormalizedPlan + } + } + } + + checkSQLPlanMap := map[string]struct{}{} + for i := range records { + record := records[i] + require.Greater(t, len(record.RecordListCpuTimeMs), 0) + require.Greater(t, record.RecordListCpuTimeMs[0], uint32(0)) + sqlMeta, exist := sqlMetas[string(record.SqlDigest)] + require.True(t, exist) + expectedNormalizedSQL, exist := digest2sql[string(record.SqlDigest)] + require.True(t, exist) + require.Equal(t, expectedNormalizedSQL, sqlMeta.NormalizedSql) + + expectedNormalizedPlan := sql2plan[expectedNormalizedSQL] + if expectedNormalizedPlan == "" || len(record.PlanDigest) == 0 { + require.Equal(t, len(record.PlanDigest), 0) + continue + } + normalizedPlan, exist := planMetas[string(record.PlanDigest)] + require.True(t, exist) + require.Equal(t, expectedNormalizedPlan, normalizedPlan) + checkSQLPlanMap[expectedNormalizedSQL] = struct{}{} + } + require.Equal(t, len(checkSQLPlanMap), 2) +} + +func TestPubSubWhenReporterIsStopped(t *testing.T) { + report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + + server, err := mockServer.NewMockPubSubServer() + require.NoError(t, err) + + pubsubService := reporter.NewTopSQLPubSubService(report) + tipb.RegisterTopSQLPubSubServer(server.Server(), pubsubService) + go server.Serve() + defer server.Stop() + + // stop reporter first + report.Close() + + // try to subscribe + conn, err := grpc.Dial( + server.Address(), + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + }), + ) + require.NoError(t, err) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + client := tipb.NewTopSQLPubSubClient(conn) + stream, err := client.Subscribe(ctx, &tipb.TopSQLSubRequest{}) + require.NoError(t, err) + + _, err = stream.Recv() + require.Error(t, err, "reporter is closed") +} + func setTopSQLEnable(enabled bool) { variable.TopSQLVariable.Enable.Store(enabled) } From 2b7ce8e61ffb565c7a1fde86f7fd767210793526 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Tue, 21 Dec 2021 16:57:46 +0800 Subject: [PATCH 04/11] executor: 
fix the incorrect untouch used in optimistic transactions (#30447) close pingcap/tidb#30410 --- go.mod | 2 +- go.sum | 4 ++-- session/session.go | 8 +++++--- store/driver/txn/txn_driver.go | 15 +++++++++++++-- table/tables/index.go | 18 ++++++++++++++++-- table/tables/tables_test.go | 26 ++++++++++++++++++++++++++ 6 files changed, 63 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 6f2a22ed5dd5d..f11eab1cf0456 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0 + github.com/tikv/client-go/v2 v2.0.0-rc.0.20211221041211-e9de5625c45c github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee github.com/twmb/murmur3 v1.1.3 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index 36c8604668b6b..2a4c23d9d70bc 100644 --- a/go.sum +++ b/go.sum @@ -712,8 +712,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0 h1:38Jst/O36MKXAt7aD1Ipnx4nKwclG66ifkcmi4f0NZ4= -github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211221041211-e9de5625c45c h1:1P6iN1csRSZNHXuaylArmG3/bA5MpYVzc9ZkdHK/L2Y= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211221041211-e9de5625c45c/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI= diff --git a/session/session.go b/session/session.go index b7171a79c9525..561120ff4b9cc 100644 --- a/session/session.go +++ b/session/session.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/util/topsql" @@ -753,14 +754,15 @@ func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transac type temporaryTableKVFilter map[int64]tableutil.TempTable -func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool { +func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) { tid := tablecodec.DecodeTableID(key) if _, ok := m[tid]; ok { - return true + return true, nil } // This is the default filter for all tables. - return tablecodec.IsUntouchedIndexKValue(key, value) + defaultFilter := txn.TiDBKVFilter{} + return defaultFilter.IsUnnecessaryKeyValue(key, value, flags) } // errIsNoisy is used to filter DUPLCATE KEY errors. 
diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index bb9e38a4f3c03..730dba0c3c7fb 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -28,12 +28,14 @@ import ( derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/driver/options" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/logutil" tikverr "github.com/tikv/client-go/v2/error" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" + "go.uber.org/zap" ) type tikvTxn struct { @@ -292,6 +294,15 @@ func (txn *tikvTxn) extractKeyExistsErr(key kv.Key) error { type TiDBKVFilter struct{} // IsUnnecessaryKeyValue defines which kinds of KV pairs from TiDB needn't be committed. -func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool { - return tablecodec.IsUntouchedIndexKValue(key, value) +func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) { + isUntouchedValue := tablecodec.IsUntouchedIndexKValue(key, value) + if isUntouchedValue && flags.HasPresumeKeyNotExists() { + logutil.BgLogger().Error("unexpected path the untouched key value with PresumeKeyNotExists flag", + zap.Stringer("key", kv.Key(key)), zap.Stringer("value", kv.Key(value)), + zap.Uint16("flags", uint16(flags)), zap.Stack("stack")) + return false, errors.Errorf( + "unexpected path the untouched key=%s value=%s contains PresumeKeyNotExists flag keyFlags=%v", + kv.Key(key).String(), kv.Key(value).String(), flags) + } + return isUntouchedValue, nil } diff --git a/table/tables/index.go b/table/tables/index.go index 08d3ecef1f820..8350925fe15b4 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -169,8 +169,22 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue // should not overwrite the key with un-commit flag. // So if the key exists, just do nothing and return. v, err := txn.GetMemBuffer().Get(ctx, key) - if err == nil && len(v) != 0 { - return nil, nil + if err == nil { + if len(v) != 0 { + return nil, nil + } + // The key is marked as deleted in the memory buffer, as the existence check is done lazily + // for optimistic transactions by default. The "untouched" key could still exist in the store, + // it's needed to commit this key to do the existence check so unset the untouched flag. 
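+			// Only optimistic transactions defer the existence check to commit time, so the flag
+			// is left as is for pessimistic ones.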
+ if !txn.IsPessimistic() { + keyFlags, err := txn.GetMemBuffer().GetFlags(key) + if err != nil { + return nil, err + } + if keyFlags.HasPresumeKeyNotExists() { + opt.Untouched = false + } + } } } diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index b093e96c6be38..dcf3507d92c49 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -736,3 +736,29 @@ func TestViewColumns(t *testing.T) { "Warning|1356|View 'test.va' references invalid table(s) or column(s) or function(s) or definer/invoker of view lack rights to use them")) } } + +func TestConstraintCheckForOptimisticUntouched(t *testing.T) { + t.Parallel() + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists test_optimistic_untouched_flag;") + tk.MustExec(`create table test_optimistic_untouched_flag(c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`) + tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, null, 'green');`) + + // Insert a row with duplicated entry on the unique key, the commit should fail. + tk.MustExec("begin optimistic;") + tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, 'red', 'white');`) + tk.MustExec(`delete from test_optimistic_untouched_flag where c1 is null;`) + tk.MustExec("update test_optimistic_untouched_flag set c2 = 'green' where c2 between 'purple' and 'white';") + err := tk.ExecToErr("commit") + require.Error(t, err) + + tk.MustExec("begin optimistic;") + tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, 'red', 'white');`) + tk.MustExec("update test_optimistic_untouched_flag set c2 = 'green' where c2 between 'purple' and 'white';") + err = tk.ExecToErr("commit") + require.Error(t, err) +} From 55a38cb1627b2f5476e5b6fe0459d7b9a5706fea Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 21 Dec 2021 17:27:47 +0800 Subject: [PATCH 05/11] expression, cmd: let crc32() support gbk (#30900) close pingcap/tidb#30898 --- .../r/new_character_set_builtin.result | 11 +++++ .../t/new_character_set_builtin.test | 9 ++++ expression/builtin_convert_charset.go | 2 + expression/builtin_math_test.go | 43 +++++++++++-------- 4 files changed, 48 insertions(+), 17 deletions(-) diff --git a/cmd/explaintest/r/new_character_set_builtin.result b/cmd/explaintest/r/new_character_set_builtin.result index f587a5ac1370e..74673a02b0832 100644 --- a/cmd/explaintest/r/new_character_set_builtin.result +++ b/cmd/explaintest/r/new_character_set_builtin.result @@ -523,3 +523,14 @@ select hex(aes_encrypt(a, '123')), hex(aes_encrypt(b, '123')), hex(aes_encrypt(c hex(aes_encrypt(a, '123')) hex(aes_encrypt(b, '123')) hex(aes_encrypt(c, '123')) C54279F381B0710E145E94106F03C94C 7A747EC6F1906276D036B1F3CE27BAAB A0E5E01289017B8A3691CCFBDE81A59ED4A9D5BF50A298D41287E395CDDCAD56 set @@tidb_enable_vectorized_expression = false; +drop table if exists t; +create table t (a char(20) charset utf8mb4, b char(20) charset gbk, c binary(20)); +insert into t values ('一二三', '一二三', '一二三'); +select crc32(a), crc32(b), crc32(c) from t; +crc32(a) crc32(b) crc32(c) +1785250883 3461331449 4092198678 +set @@tidb_enable_vectorized_expression = true; +select crc32(a), crc32(b), crc32(c) from t; +crc32(a) crc32(b) crc32(c) +1785250883 3461331449 4092198678 +set @@tidb_enable_vectorized_expression = false; diff --git a/cmd/explaintest/t/new_character_set_builtin.test b/cmd/explaintest/t/new_character_set_builtin.test index 
bb0a6321e8a53..ae9ab76f9093e 100644 --- a/cmd/explaintest/t/new_character_set_builtin.test +++ b/cmd/explaintest/t/new_character_set_builtin.test @@ -239,3 +239,12 @@ select hex(aes_encrypt(a, '123', '1234567890123456')), hex(aes_encrypt(b, '123', set @@block_encryption_mode='aes-128-ecb'; select hex(aes_encrypt(a, '123')), hex(aes_encrypt(b, '123')), hex(aes_encrypt(c, '123')) from t; set @@tidb_enable_vectorized_expression = false; + +-- test for builtin crc32() +drop table if exists t; +create table t (a char(20) charset utf8mb4, b char(20) charset gbk, c binary(20)); +insert into t values ('一二三', '一二三', '一二三'); +select crc32(a), crc32(b), crc32(c) from t; +set @@tidb_enable_vectorized_expression = true; +select crc32(a), crc32(b), crc32(c) from t; +set @@tidb_enable_vectorized_expression = false; diff --git a/expression/builtin_convert_charset.go b/expression/builtin_convert_charset.go index 5c0ea98f752a5..bc7ad4dfafbbe 100644 --- a/expression/builtin_convert_charset.go +++ b/expression/builtin_convert_charset.go @@ -287,6 +287,8 @@ var convertActionMap = map[funcProp][]string{ ast.Like, ast.Strcmp, /* regex */ ast.Regexp, + /* math */ + ast.CRC32, }, } diff --git a/expression/builtin_math_test.go b/expression/builtin_math_test.go index 898693d14b394..d1ae3ca6ed30c 100644 --- a/expression/builtin_math_test.go +++ b/expression/builtin_math_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/charset" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit/trequire" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -531,27 +532,35 @@ func TestTruncate(t *testing.T) { func TestCRC32(t *testing.T) { ctx := createContext(t) tbl := []struct { - Arg []interface{} - Ret interface{} + input []interface{} + chs string + result int64 + isNull bool }{ - {[]interface{}{nil}, nil}, - {[]interface{}{""}, 0}, - {[]interface{}{-1}, 808273962}, - {[]interface{}{"-1"}, 808273962}, - {[]interface{}{"mysql"}, 2501908538}, - {[]interface{}{"MySQL"}, 3259397556}, - {[]interface{}{"hello"}, 907060870}, + {[]interface{}{nil}, "utf8", 0, true}, + {[]interface{}{""}, "utf8", 0, false}, + {[]interface{}{-1}, "utf8", 808273962, false}, + {[]interface{}{"-1"}, "utf8", 808273962, false}, + {[]interface{}{"mysql"}, "utf8", 2501908538, false}, + {[]interface{}{"MySQL"}, "utf8", 3259397556, false}, + {[]interface{}{"hello"}, "utf8", 907060870, false}, + {[]interface{}{"一二三"}, "utf8", 1785250883, false}, + {[]interface{}{"一"}, "utf8", 2416838398, false}, + {[]interface{}{"一二三"}, "gbk", 3461331449, false}, + {[]interface{}{"一"}, "gbk", 2925846374, false}, } - - Dtbl := tblToDtbl(tbl) - - for _, tt := range Dtbl { - fc := funcs[ast.CRC32] - f, err := fc.getFunction(ctx, datumsToConstants(tt["Arg"])) + for _, c := range tbl { + err := ctx.GetSessionVars().SetSystemVar(variable.CharacterSetConnection, c.chs) require.NoError(t, err) - v, err := evalBuiltinFunc(f, chunk.Row{}) + f, err := newFunctionForTest(ctx, ast.CRC32, primitiveValsToConstants(ctx, c.input)...) 
require.NoError(t, err) - trequire.DatumEqual(t, tt["Ret"][0], v) + d, err := f.Eval(chunk.Row{}) + require.NoError(t, err) + if c.isNull { + require.True(t, d.IsNull()) + } else { + require.Equal(t, c.result, d.GetInt64()) + } } } From feee7c258ac7799499f561d281434b243c4fc92e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Tue, 21 Dec 2021 10:53:48 +0100 Subject: [PATCH 06/11] server: Add uptime status var and statistics (#29790) close pingcap/tidb#8842 --- domain/infosync/info.go | 2 +- domain/infosync/info_test.go | 4 +-- server/conn.go | 15 +++++++-- server/stat.go | 14 +++++++++ server/stat_test.go | 61 ++++++++++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 5 deletions(-) create mode 100644 server/stat_test.go diff --git a/domain/infosync/info.go b/domain/infosync/info.go index fc58783ff3108..8aec3c35275e8 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -856,7 +856,7 @@ func getServerInfo(id string, serverIDGetter func() uint64) *ServerInfo { failpoint.Inject("mockServerInfo", func(val failpoint.Value) { if val.(bool) { - info.StartTimestamp = 1282967700000 + info.StartTimestamp = 1282967700 info.Labels = map[string]string{ "foo": "bar", } diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index 001a106632230..bb0ded60e7e25 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -72,7 +72,7 @@ func TestTopology(t *testing.T) { topology, err := info.getTopologyFromEtcd(ctx) require.NoError(t, err) - require.Equal(t, int64(1282967700000), topology.StartTimestamp) + require.Equal(t, int64(1282967700), topology.StartTimestamp) v, ok := topology.Labels["foo"] require.True(t, ok) @@ -97,7 +97,7 @@ func TestTopology(t *testing.T) { dir := path.Dir(s) require.Equal(t, dir, topology.DeployPath) - require.Equal(t, int64(1282967700000), topology.StartTimestamp) + require.Equal(t, int64(1282967700), topology.StartTimestamp) require.Equal(t, info.getTopologyInfo(), *topology) // check ttl key diff --git a/server/conn.go b/server/conn.go index e9325a6dd97d6..ede5a12bf276d 100644 --- a/server/conn.go +++ b/server/conn.go @@ -59,6 +59,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" @@ -1374,11 +1375,21 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { } func (cc *clientConn) writeStats(ctx context.Context) error { - msg := []byte("Uptime: 0 Threads: 0 Questions: 0 Slow queries: 0 Opens: 0 Flush tables: 0 Open tables: 0 Queries per second avg: 0.000") + var err error + var uptime int64 = 0 + info := serverInfo{} + info.ServerInfo, err = infosync.GetServerInfo() + if err != nil { + logutil.BgLogger().Error("Failed to get ServerInfo for uptime status", zap.Error(err)) + } else { + uptime = int64(time.Since(time.Unix(info.ServerInfo.StartTimestamp, 0)).Seconds()) + } + msg := []byte(fmt.Sprintf("Uptime: %d Threads: 0 Questions: 0 Slow queries: 0 Opens: 0 Flush tables: 0 Open tables: 0 Queries per second avg: 0.000", + uptime)) data := cc.alloc.AllocWithLen(4, len(msg)) data = append(data, msg...) 
- err := cc.writePacket(data) + err = cc.writePacket(data) if err != nil { return err } diff --git a/server/stat.go b/server/stat.go index 9725a7ec5e480..382a68e701ce0 100644 --- a/server/stat.go +++ b/server/stat.go @@ -16,7 +16,9 @@ package server import ( "crypto/x509" + "time" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -25,11 +27,13 @@ import ( var ( serverNotAfter = "Ssl_server_not_after" serverNotBefore = "Ssl_server_not_before" + upTime = "Uptime" ) var defaultStatus = map[string]*variable.StatusVal{ serverNotAfter: {Scope: variable.ScopeGlobal | variable.ScopeSession, Value: ""}, serverNotBefore: {Scope: variable.ScopeGlobal | variable.ScopeSession, Value: ""}, + upTime: {Scope: variable.ScopeGlobal, Value: 0}, } // GetScope gets the status variables scope. @@ -57,5 +61,15 @@ func (s *Server) Stats(vars *variable.SessionVars) (map[string]interface{}, erro } } } + + var err error + info := serverInfo{} + info.ServerInfo, err = infosync.GetServerInfo() + if err != nil { + logutil.BgLogger().Error("Failed to get ServerInfo for uptime status", zap.Error(err)) + } else { + m[upTime] = int64(time.Since(time.Unix(info.ServerInfo.StartTimestamp, 0)).Seconds()) + } + return m, nil } diff --git a/server/stat_test.go b/server/stat_test.go new file mode 100644 index 0000000000000..88fc2e0081a43 --- /dev/null +++ b/server/stat_test.go @@ -0,0 +1,61 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package server + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore" + "github.com/stretchr/testify/require" +) + +func TestUptime(t *testing.T) { + var err error + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockServerInfo", "return(true)")) + defer func() { + err := failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockServerInfo") + require.NoError(t, err) + }() + + store, err := mockstore.NewMockStore() + require.NoError(t, err) + + dom, err := session.BootstrapSession(store) + defer func() { + dom.Close() + err := store.Close() + require.NoError(t, err) + }() + require.NoError(t, err) + + _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + require.NoError(t, err) + + tidbdrv := NewTiDBDriver(store) + cfg := newTestConfig() + cfg.Socket = "" + server, err := NewServer(cfg, tidbdrv) + require.NoError(t, err) + + stats, err := server.Stats(nil) + require.NoError(t, err) + require.GreaterOrEqual(t, stats[upTime].(int64), int64(time.Since(time.Unix(1282967700, 0)).Seconds())) +} From e12342b49446913ce392ef9fb3172f0d008e45a1 Mon Sep 17 00:00:00 2001 From: fengou1 <85682690+fengou1@users.noreply.github.com> Date: Tue, 21 Dec 2021 18:09:46 +0800 Subject: [PATCH 07/11] br: error log optimization (#29640) close pingcap/tidb#27015 --- br/pkg/backup/client.go | 8 +++++++- br/pkg/summary/collector.go | 11 ++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 12a4344a432fe..7a4b1e0e8eb66 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -470,7 +470,13 @@ func (bc *Client) BackupRanges( elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id)) err := bc.BackupRange(elctx, sk, ek, req, metaWriter, progressCallBack) if err != nil { - return errors.Trace(err) + // The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear) + if errors.Cause(err) == context.Canceled { + return errors.SuspendStack(err) + } else { + return errors.Trace(err) + } + } return nil }) diff --git a/br/pkg/summary/collector.go b/br/pkg/summary/collector.go index 5493f82f77967..6c82bf54fba25 100644 --- a/br/pkg/summary/collector.go +++ b/br/pkg/summary/collector.go @@ -3,11 +3,13 @@ package summary import ( + "context" "strings" "sync" "time" "github.com/docker/go-units" + berror "github.com/pingcap/errors" "github.com/pingcap/log" "go.uber.org/zap" ) @@ -188,9 +190,16 @@ func (tc *logCollector) Summary(name string) { } if len(tc.failureReasons) != 0 || !tc.successStatus { + var canceledUnits int for unitName, reason := range tc.failureReasons { - logFields = append(logFields, zap.String("unit-name", unitName), zap.Error(reason)) + if berror.Cause(reason) != context.Canceled { + logFields = append(logFields, zap.String("unit-name", unitName), zap.Error(reason)) + } else { + canceledUnits++ + } } + // only print total number of cancel unit + log.Info("units canceled", zap.Int("cancel-unit", canceledUnits)) tc.log(name+" failed summary", logFields...) 
return } From 416617eb9cf633bfc964370ea19cc4fc949fa71b Mon Sep 17 00:00:00 2001 From: wjHuang Date: Tue, 21 Dec 2021 20:47:46 +0800 Subject: [PATCH 08/11] planner: fix wrong collation when rewrite in condition (#30492) close pingcap/tidb#30486 --- expression/integration_serial_test.go | 10 +++++ planner/core/expression_rewriter.go | 60 +++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/expression/integration_serial_test.go b/expression/integration_serial_test.go index 0665f2b2082ba..3077e2f1b33a7 100644 --- a/expression/integration_serial_test.go +++ b/expression/integration_serial_test.go @@ -175,6 +175,16 @@ func TestCollationBasic(t *testing.T) { tk.MustQuery("select * from t1 where col1 >= 0xc484 and col1 <= 0xc3b3;").Check(testkit.Rows("Ȇ")) tk.MustQuery("select collation(IF('a' < 'B' collate utf8mb4_general_ci, 'smaller', 'greater' collate utf8mb4_unicode_ci));").Check(testkit.Rows("utf8mb4_unicode_ci")) + + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a char(10))") + tk.MustExec("insert into t values ('a')") + tk.MustQuery("select * from t where a in ('b' collate utf8mb4_general_ci, 'A', 3)").Check(testkit.Rows("a")) + // These test cases may not the same as MySQL, but it's more reasonable. + tk.MustQuery("select ('a', 'a') in (('A' collate utf8mb4_general_ci, 'A' collate utf8mb4_general_ci));").Check(testkit.Rows("1")) + tk.MustQuery("select ('a', 'a') in (('A' collate utf8mb4_general_ci, 'A' collate utf8mb4_bin));").Check(testkit.Rows("0")) + tk.MustQuery("select ('a', 'a') in (('A' collate utf8mb4_general_ci, 'A' collate utf8mb4_general_ci), ('b', 'b'));").Check(testkit.Rows("1")) + tk.MustQuery("select ('a', 'a') in (('A' collate utf8mb4_general_ci, 'A' collate utf8mb4_bin), ('b', 'b'));").Check(testkit.Rows("0")) } func TestWeightString(t *testing.T) { diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index 7dcadc13f4005..08e0262613cb9 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -1492,6 +1492,12 @@ func (er *expressionRewriter) inToExpression(lLen int, not bool, tp *types.Field if allSameType && l == 1 && lLen > 1 { function = er.notToExpression(not, ast.In, tp, er.ctxStack[stkLen-lLen-1:]...) } else { + // If we rewrite IN to EQ, we need to decide what's the collation EQ uses. + coll := er.deriveCollationForIn(l, lLen, stkLen, args) + if er.err != nil { + return + } + er.castCollationForIn(l, lLen, stkLen, coll) eqFunctions := make([]expression.Expression, 0, lLen) for i := stkLen - lLen; i < stkLen; i++ { expr, err := er.constructBinaryOpFunction(args[0], er.ctxStack[i], ast.EQ) @@ -1515,6 +1521,60 @@ func (er *expressionRewriter) inToExpression(lLen int, not bool, tp *types.Field er.ctxStackAppend(function, types.EmptyName) } +// deriveCollationForIn derives collation for in expression. +func (er *expressionRewriter) deriveCollationForIn(colLen int, elemCnt int, stkLen int, args []expression.Expression) []*expression.ExprCollation { + coll := make([]*expression.ExprCollation, 0, colLen) + if colLen == 1 { + // a in (x, y, z) => coll[0] + coll2, err := expression.CheckAndDeriveCollationFromExprs(er.sctx, "IN", types.ETInt, args...) 
+ er.err = err + if er.err != nil { + return nil + } + coll = append(coll, coll2) + } else { + // (a, b, c) in ((x1, x2, x3), (y1, y2, y3), (z1, z2, z3)) => coll[0], coll[1], coll[2] + for i := 0; i < colLen; i++ { + args := make([]expression.Expression, 0, elemCnt) + for j := stkLen - elemCnt - 1; j < stkLen; j++ { + rowFunc, _ := er.ctxStack[j].(*expression.ScalarFunction) + args = append(args, rowFunc.GetArgs()[i]) + } + coll2, err := expression.CheckAndDeriveCollationFromExprs(er.sctx, "IN", types.ETInt, args...) + er.err = err + if er.err != nil { + return nil + } + coll = append(coll, coll2) + } + } + return coll +} + +// castCollationForIn casts collation info for arguments in the `in clause` to make sure the used collation is correct after we +// rewrite it to equal expression. +func (er *expressionRewriter) castCollationForIn(colLen int, elemCnt int, stkLen int, coll []*expression.ExprCollation) { + for i := stkLen - elemCnt; i < stkLen; i++ { + if colLen == 1 && er.ctxStack[i].GetType().EvalType() == types.ETString { + tp := er.ctxStack[i].GetType().Clone() + tp.Charset, tp.Collate = coll[0].Charset, coll[0].Collation + er.ctxStack[i] = expression.BuildCastFunction(er.sctx, er.ctxStack[i], tp) + er.ctxStack[i].SetCoercibility(expression.CoercibilityExplicit) + } else { + rowFunc, _ := er.ctxStack[i].(*expression.ScalarFunction) + for j := 0; j < colLen; j++ { + if er.ctxStack[i].GetType().EvalType() != types.ETString { + continue + } + tp := rowFunc.GetArgs()[j].GetType().Clone() + tp.Charset, tp.Collate = coll[j].Charset, coll[j].Collation + rowFunc.GetArgs()[j] = expression.BuildCastFunction(er.sctx, rowFunc.GetArgs()[j], tp) + rowFunc.GetArgs()[j].SetCoercibility(expression.CoercibilityExplicit) + } + } + } +} + func (er *expressionRewriter) caseToExpression(v *ast.CaseExpr) { stkLen := len(er.ctxStack) argsLen := 2 * len(v.WhenClauses) From 63d23f8aaa3fb5f748325a6debf023d2470b225a Mon Sep 17 00:00:00 2001 From: db <39407623+IcePigZDB@users.noreply.github.com> Date: Tue, 21 Dec 2021 22:49:46 +0800 Subject: [PATCH 09/11] planner: add extractor for tikv_region_peers (#30656) --- planner/core/logical_plan_builder.go | 2 + planner/core/memtable_predicate_extractor.go | 53 +++++++ .../core/memtable_predicate_extractor_test.go | 132 ++++++++++++++++++ 3 files changed, 187 insertions(+) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 5df63d975950a..bd7ae44d36cdf 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4348,6 +4348,8 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table p.Extractor = &TiFlashSystemTableExtractor{} case infoschema.TableStatementsSummary, infoschema.TableStatementsSummaryHistory: p.Extractor = &StatementsSummaryExtractor{} + case infoschema.TableTiKVRegionPeers: + p.Extractor = &TikvRegionPeersExtractor{} } } return p, nil diff --git a/planner/core/memtable_predicate_extractor.go b/planner/core/memtable_predicate_extractor.go index 923d025ac7d13..57b53a75cc080 100644 --- a/planner/core/memtable_predicate_extractor.go +++ b/planner/core/memtable_predicate_extractor.go @@ -1415,3 +1415,56 @@ func (e *StatementsSummaryExtractor) explainInfo(p *PhysicalMemTable) string { } return fmt.Sprintf("digests: [%s]", extractStringFromStringSet(e.Digests)) } + +// TikvRegionPeersExtractor is used to extract some predicates of cluster table. 
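+// Only the region_id and store_id columns are extracted for now; both equality and IN predicates are recognized.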
+type TikvRegionPeersExtractor struct { + extractHelper + + // SkipRequest means the where clause always false, we don't need to request any component + SkipRequest bool + + // RegionIDs/StoreIDs represents all region/store ids we should filter in PD to reduce network IO. + // e.g: + // 1. SELECT * FROM tikv_region_peers WHERE region_id=1 + // 2. SELECT * FROM tikv_region_peers WHERE table_id in (11, 22) + RegionIDs []uint64 + StoreIDs []uint64 +} + +// Extract implements the MemTablePredicateExtractor Extract interface +func (e *TikvRegionPeersExtractor) Extract(_ sessionctx.Context, + schema *expression.Schema, + names []*types.FieldName, + predicates []expression.Expression, +) []expression.Expression { + // Extract the `region_id/store_id` columns. + remained, regionIDSkipRequest, regionIDs := e.extractCol(schema, names, predicates, "region_id", false) + remained, storeIDSkipRequest, storeIDs := e.extractCol(schema, names, remained, "store_id", false) + e.RegionIDs, e.StoreIDs = e.parseUint64(regionIDs), e.parseUint64(storeIDs) + + e.SkipRequest = regionIDSkipRequest || storeIDSkipRequest + if e.SkipRequest { + return nil + } + + return remained +} + +func (e *TikvRegionPeersExtractor) explainInfo(p *PhysicalMemTable) string { + if e.SkipRequest { + return "skip_request:true" + } + r := new(bytes.Buffer) + if len(e.RegionIDs) > 0 { + r.WriteString(fmt.Sprintf("region_ids:[%s], ", extractStringFromUint64Slice(e.RegionIDs))) + } + if len(e.StoreIDs) > 0 { + r.WriteString(fmt.Sprintf("store_ids:[%s], ", extractStringFromUint64Slice(e.StoreIDs))) + } + // remove the last ", " in the message info + s := r.String() + if len(s) > 2 { + return s[:len(s)-2] + } + return s +} diff --git a/planner/core/memtable_predicate_extractor_test.go b/planner/core/memtable_predicate_extractor_test.go index 311b9ae4a8838..d932176b9f2eb 100644 --- a/planner/core/memtable_predicate_extractor_test.go +++ b/planner/core/memtable_predicate_extractor_test.go @@ -1420,3 +1420,135 @@ func (s *extractorSuite) TestTiDBHotRegionsHistoryTableExtractor(c *C) { } } } + +func (s *extractorSuite) TestTikvRegionPeersExtractor(c *C) { + se, err := session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + + var cases = []struct { + sql string + regionIDs, storeIDs []uint64 + skipRequest bool + }{ + // Test `region_id`, `store_id` columns. 
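+		// Each case lists the region/store IDs expected to be extracted from the WHERE clause,
+		// or marks the request as skippable when the predicates are contradictory.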
+ { + sql: "select * from information_schema.tikv_region_peers where region_id=100", + regionIDs: []uint64{100}, + }, + { + sql: "select * from information_schema.tikv_region_peers where 100=region_id", + regionIDs: []uint64{100}, + }, + { + sql: "select * from information_schema.tikv_region_peers where 100=region_id or region_id=101", + regionIDs: []uint64{100, 101}, + }, + { + sql: "select * from information_schema.tikv_region_peers where 100=region_id or region_id=101 or region_id=102 or 103 = region_id", + regionIDs: []uint64{100, 101, 102, 103}, + }, + { + sql: "select * from information_schema.tikv_region_peers where (region_id=100 or region_id=101) and (store_id=200 or store_id=201)", + regionIDs: []uint64{100, 101}, + storeIDs: []uint64{200, 201}, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id in (100, 101)", + regionIDs: []uint64{100, 101}, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id in (100, 101) and store_id=200", + regionIDs: []uint64{100, 101}, + storeIDs: []uint64{200}, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id in (100, 101) and store_id in (200, 201)", + regionIDs: []uint64{100, 101}, + storeIDs: []uint64{200, 201}, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id=100 and store_id in (200, 201)", + regionIDs: []uint64{100}, + storeIDs: []uint64{200, 201}, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id=100 and store_id=200", + regionIDs: []uint64{100}, + storeIDs: []uint64{200}, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id=100 and region_id=101", + skipRequest: true, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id=100 and region_id in (100,101)", + regionIDs: []uint64{100}, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id=100 and region_id in (100,101) and store_id=200 and store_id in (200,201)", + regionIDs: []uint64{100}, + storeIDs: []uint64{200}, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id=100 and region_id in (101,102)", + skipRequest: true, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id=100 and region_id in (101,102) and store_id=200 and store_id in (200,201)", + skipRequest: true, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id=100 and region_id in (100,101) and store_id=200 and store_id in (201,202)", + skipRequest: true, + }, + { + sql: `select * from information_schema.tikv_region_peers + where region_id=100 and region_id in (100,101) + and store_id=200 and store_id in (201,202)`, + skipRequest: true, + }, + { + sql: "select * from information_schema.tikv_region_peers where region_id in (100,101) and region_id in (101,102)", + regionIDs: []uint64{101}, + }, + { + sql: `select * from information_schema.tikv_region_peers + where region_id in (100,101) + and region_id in (101,102) + and store_id in (200,201) + and store_id in (201,202)`, + regionIDs: []uint64{101}, + storeIDs: []uint64{201}, + }, + { + sql: `select * from information_schema.tikv_region_peers + where region_id in (100,101) + and region_id in (100,102) + and region_id in (102,103) + and region_id in (103,104)`, + skipRequest: true, + }, + // Test columns that is not extracted by TikvRegionPeersExtractor + { + sql: `select * from information_schema.tikv_region_peers + where peer_id=100 + and 
is_learner=0 + and is_leader=1 + and status='NORMAL' + and down_seconds=1000`, + }, + } + parser := parser.New() + for _, ca := range cases { + logicalMemTable := s.getLogicalMemTable(c, se, parser, ca.sql) + c.Assert(logicalMemTable.Extractor, NotNil) + + tikvRegionPeersExtractor := logicalMemTable.Extractor.(*plannercore.TikvRegionPeersExtractor) + if len(ca.regionIDs) > 0 { + c.Assert(tikvRegionPeersExtractor.RegionIDs, DeepEquals, ca.regionIDs, Commentf("SQL: %v", ca.sql)) + } + if len(ca.storeIDs) > 0 { + c.Assert(tikvRegionPeersExtractor.StoreIDs, DeepEquals, ca.storeIDs, Commentf("SQL: %v", ca.sql)) + } + } +} From 393415782452903b417a539d7fc8cdcecc6ab1bf Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 22 Dec 2021 10:17:47 +0800 Subject: [PATCH 10/11] lightning: add back table empty check and add a switch config (#30887) close pingcap/tidb#27919 --- br/pkg/lightning/config/config.go | 1 + br/pkg/lightning/restore/check_info.go | 91 +++++++++++- br/pkg/lightning/restore/check_info_test.go | 139 ++++++++++++++++++ br/pkg/lightning/restore/meta_manager.go | 58 +++++++- br/pkg/lightning/restore/restore.go | 20 ++- .../lightning_distributed_import/config.toml | 1 + .../config1.toml | 1 + .../config2.toml | 1 + br/tests/lightning_incremental/config.toml | 2 + br/tests/lightning_local_backend/run.sh | 17 ++- .../data/rowid.pre_rebase-schema.sql | 2 +- br/tests/lightning_tidb_rowid/run.sh | 9 +- 12 files changed, 325 insertions(+), 17 deletions(-) diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index d080d1bad16cf..cac695801a64a 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -527,6 +527,7 @@ type TikvImporter struct { DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"` RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"` DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"` + IncrementalImport bool `toml:"incremental-import" json:"incremental-import"` EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index f97afc33b7cd0..4da674e1cd40a 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -17,6 +17,7 @@ package restore import ( "bytes" "context" + "database/sql" "fmt" "io" "path/filepath" @@ -24,10 +25,15 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "modernc.org/mathutil" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" @@ -38,6 +44,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" @@ -45,9 +52,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/tikv/pd/server/api" pdconfig "github.com/tikv/pd/server/config" - - "go.uber.org/zap" - "modernc.org/mathutil" ) const ( @@ -1053,3 +1057,84 @@ outloop: log.L().Info("Sample source data", zap.String("table", 
tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered)) return nil } + +func (rc *Controller) checkTableEmpty(ctx context.Context) error { + if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport { + return nil + } + db, _ := rc.tidbGlue.GetDB() + + tableCount := 0 + for _, db := range rc.dbMetas { + tableCount += len(db.Tables) + } + + var lock sync.Mutex + tableNames := make([]string, 0) + concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency) + ch := make(chan string, concurrency) + eg, gCtx := errgroup.WithContext(ctx) + for i := 0; i < concurrency; i++ { + eg.Go(func() error { + for tblName := range ch { + // skip tables that have checkpoint + if rc.cfg.Checkpoint.Enable { + _, err := rc.checkpointsDB.Get(gCtx, tblName) + switch { + case err == nil: + continue + case errors.IsNotFound(err): + default: + return errors.Trace(err) + } + } + + hasData, err1 := tableContainsData(gCtx, db, tblName) + if err1 != nil { + return err1 + } + if hasData { + lock.Lock() + tableNames = append(tableNames, tblName) + lock.Unlock() + } + } + return nil + }) + } + for _, db := range rc.dbMetas { + for _, tbl := range db.Tables { + ch <- common.UniqueTable(tbl.DB, tbl.Name) + } + } + close(ch) + if err := eg.Wait(); err != nil { + if common.IsContextCanceledError(err) { + return nil + } + return errors.Trace(err) + } + + if len(tableNames) > 0 { + // sort the failed names + sort.Strings(tableNames) + msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", ")) + rc.checkTemplate.Collect(Critical, false, msg) + } + return nil +} + +func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) { + query := "select 1 from " + tableName + " limit 1" + var dump int + err := db.QueryRowContext(ctx, query).Scan(&dump) + + switch { + case err == sql.ErrNoRows: + return false, nil + case err != nil: + return false, errors.Trace(err) + default: + return true, nil + } +} diff --git a/br/pkg/lightning/restore/check_info_test.go b/br/pkg/lightning/restore/check_info_test.go index ccc4aa74c0c28..c679298f6a612 100644 --- a/br/pkg/lightning/restore/check_info_test.go +++ b/br/pkg/lightning/restore/check_info_test.go @@ -16,15 +16,18 @@ package restore import ( "context" + "database/sql" "fmt" "os" "path/filepath" + "github.com/DATA-DOG/go-sqlmock" . 
"github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/glue" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/worker" "github.com/pingcap/tidb/br/pkg/storage" @@ -404,6 +407,142 @@ func (s *checkInfoSuite) TestCheckCSVHeader(c *C) { } } +func (s *checkInfoSuite) TestCheckTableEmpty(c *C) { + dir := c.MkDir() + cfg := config.NewConfig() + cfg.Checkpoint.Enable = false + dbMetas := []*mydump.MDDatabaseMeta{ + { + Name: "test1", + Tables: []*mydump.MDTableMeta{ + { + DB: "test1", + Name: "tbl1", + }, + { + DB: "test1", + Name: "tbl2", + }, + }, + }, + { + Name: "test2", + Tables: []*mydump.MDTableMeta{ + { + DB: "test2", + Name: "tbl1", + }, + }, + }, + } + + rc := &Controller{ + cfg: cfg, + dbMetas: dbMetas, + checkpointsDB: checkpoints.NewNullCheckpointsDB(), + } + + ctx := context.Background() + + // test tidb will do nothing + rc.cfg.TikvImporter.Backend = config.BackendTiDB + err := rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + + // test incremental mode + rc.cfg.TikvImporter.Backend = config.BackendLocal + rc.cfg.TikvImporter.IncrementalImport = true + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + + rc.cfg.TikvImporter.IncrementalImport = false + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + mock.MatchExpectationsInOrder(false) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + // not error, need not to init check template + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + + // single table contains data + db, mock, err = sqlmock.New() + c.Assert(err, IsNil) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.MatchExpectationsInOrder(false) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1)) + rc.checkTemplate = NewSimpleTemplate() + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + tmpl := rc.checkTemplate.(*SimpleTemplate) + c.Assert(len(tmpl.criticalMsgs), Equals, 1) + c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test2`.`tbl1`\\] are not empty") + + // multi tables contains data + db, mock, err = sqlmock.New() + c.Assert(err, IsNil) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.MatchExpectationsInOrder(false) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). 
+ WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1)) + rc.checkTemplate = NewSimpleTemplate() + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + tmpl = rc.checkTemplate.(*SimpleTemplate) + c.Assert(len(tmpl.criticalMsgs), Equals, 1) + c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test1`.`tbl1`, `test2`.`tbl1`\\] are not empty") + + // init checkpoint with only two of the three tables + dbInfos := map[string]*checkpoints.TidbDBInfo{ + "test1": { + Name: "test1", + Tables: map[string]*checkpoints.TidbTableInfo{ + "tbl1": { + Name: "tbl1", + }, + }, + }, + "test2": { + Name: "test2", + Tables: map[string]*checkpoints.TidbTableInfo{ + "tbl1": { + Name: "tbl1", + }, + }, + }, + } + rc.cfg.Checkpoint.Enable = true + rc.checkpointsDB = checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb")) + err = rc.checkpointsDB.Initialize(ctx, cfg, dbInfos) + c.Check(err, IsNil) + db, mock, err = sqlmock.New() + c.Assert(err, IsNil) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + // only need to check the one that is not in checkpoint + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + func (s *checkInfoSuite) TestLocalResource(c *C) { dir := c.MkDir() mockStore, err := storage.NewLocalStorage(dir) diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 544b91c0b5f90..49358a9aee102 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -1027,9 +1027,65 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum } func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) { - return false, false, nil, nil + return true, true, &verify.KVChecksum{}, nil } func (m noopTableMetaMgr) FinishTable(ctx context.Context) error { return nil } + +type singleMgrBuilder struct{} + +func (b singleMgrBuilder) Init(context.Context) error { + return nil +} + +func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr { + return &singleTaskMetaMgr{ + pd: pd, + } +} + +func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { + return noopTableMetaMgr{} +} + +type singleTaskMetaMgr struct { + pd *pdutil.PdController +} + +func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error { + return nil +} + +func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error { + _, err := action(nil) + return err +} + +func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) { + return m.pd.RemoveSchedulers(ctx) +} + +func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { + return true, nil +} + +func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) { + return true, true, nil +} + +func (m *singleTaskMetaMgr) Cleanup(ctx context.Context) error { + return nil +} + +func (m *singleTaskMetaMgr) CleanupTask(ctx context.Context) error { + return nil +} + +func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { + return nil +} + +func (m *singleTaskMetaMgr) Close() { +} diff --git 
a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 82a8465eb8181..79f132b1cf5f6 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -379,14 +379,17 @@ func NewRestoreControllerWithPauser( } var metaBuilder metaMgrBuilder - switch cfg.TikvImporter.Backend { - case config.BackendLocal, config.BackendImporter: + isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal || cfg.TikvImporter.Backend == config.BackendImporter + switch { + case isSSTImport && cfg.TikvImporter.IncrementalImport: metaBuilder = &dbMetaMgrBuilder{ db: db, taskID: cfg.TaskID, schema: cfg.App.MetaSchemaName, needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff, } + case isSSTImport: + metaBuilder = singleMgrBuilder{} default: metaBuilder = noopMetaMgrBuilder{} } @@ -1967,11 +1970,6 @@ func (rc *Controller) DataCheck(ctx context.Context) error { } } } - err = rc.checkCSVHeader(ctx, rc.dbMetas) - if err != nil { - return err - } - if len(checkPointCriticalMsgs) != 0 { rc.checkTemplate.Collect(Critical, false, strings.Join(checkPointCriticalMsgs, "\n")) } else { @@ -1982,6 +1980,14 @@ func (rc *Controller) DataCheck(ctx context.Context) error { } else { rc.checkTemplate.Collect(Critical, true, "table schemas are valid") } + + if err := rc.checkTableEmpty(ctx); err != nil { + return errors.Trace(err) + } + if err = rc.checkCSVHeader(ctx, rc.dbMetas); err != nil { + return err + } + return nil } diff --git a/br/tests/lightning_distributed_import/config.toml b/br/tests/lightning_distributed_import/config.toml index 200af8e45dfdc..947b16037dd5d 100644 --- a/br/tests/lightning_distributed_import/config.toml +++ b/br/tests/lightning_distributed_import/config.toml @@ -1,6 +1,7 @@ [tikv-importer] backend = 'local' duplicate-resolution = 'none' +incremental-import = true [post-restore] checksum = "required" diff --git a/br/tests/lightning_duplicate_detection/config1.toml b/br/tests/lightning_duplicate_detection/config1.toml index 0b2b6df2a70e8..6497e9e30949b 100644 --- a/br/tests/lightning_duplicate_detection/config1.toml +++ b/br/tests/lightning_duplicate_detection/config1.toml @@ -6,6 +6,7 @@ table-concurrency = 10 [tikv-importer] backend = "local" duplicate-resolution = 'record' +incremental-import = true [checkpoint] enable = true diff --git a/br/tests/lightning_duplicate_detection/config2.toml b/br/tests/lightning_duplicate_detection/config2.toml index e978ffb9cd8b5..760f50168508a 100644 --- a/br/tests/lightning_duplicate_detection/config2.toml +++ b/br/tests/lightning_duplicate_detection/config2.toml @@ -6,6 +6,7 @@ table-concurrency = 10 [tikv-importer] backend = "local" duplicate-resolution = 'record' +incremental-import = true [checkpoint] enable = true diff --git a/br/tests/lightning_incremental/config.toml b/br/tests/lightning_incremental/config.toml index e69de29bb2d1d..761e60b91b804 100644 --- a/br/tests/lightning_incremental/config.toml +++ b/br/tests/lightning_incremental/config.toml @@ -0,0 +1,2 @@ +[tikv-importer] +incremental-import = true diff --git a/br/tests/lightning_local_backend/run.sh b/br/tests/lightning_local_backend/run.sh index 6d0e7e9864145..5843210fea738 100755 --- a/br/tests/lightning_local_backend/run.sh +++ b/br/tests/lightning_local_backend/run.sh @@ -20,12 +20,23 @@ check_cluster_version 4 0 0 'local backend' || exit 0 ENGINE_COUNT=6 -# First, verify that inject with not leader error is fine. 
-rm -f "$TEST_DIR/lightning-local.log" +# Test check table contains data rm -f "/tmp/tidb_lightning_checkpoint_local_backend_test.pb" +rm -rf $TEST_DIR/lightning.log run_sql 'DROP DATABASE IF EXISTS cpeng;' -export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")' +run_sql 'CREATE DATABASE cpeng;' +run_sql 'CREATE TABLE cpeng.a (c int);' +run_sql 'CREATE TABLE cpeng.b (c int);' +run_sql "INSERT INTO cpeng.a values (1), (2);" +run_sql "INSERT INTO cpeng.b values (3);" +! run_lightning --backend local --enable-checkpoint=0 +grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning.log + +# First, verify that inject with not leader error is fine. +export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")' +rm -f "$TEST_DIR/lightning-local.log" +run_sql 'DROP DATABASE IF EXISTS cpeng;' run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml" # Check that everything is correctly imported diff --git a/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql index 887540be58110..1738b64457de6 100644 --- a/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql +++ b/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql @@ -1 +1 @@ -create table pre_rebase (pk varchar(6) primary key) auto_increment=70000; +create table pre_rebase (pk varchar(6) primary key /*T![clustered_index] NONCLUSTERED */) auto_increment=70000; diff --git a/br/tests/lightning_tidb_rowid/run.sh b/br/tests/lightning_tidb_rowid/run.sh index e877f420cf43f..ae762c514d93c 100755 --- a/br/tests/lightning_tidb_rowid/run.sh +++ b/br/tests/lightning_tidb_rowid/run.sh @@ -58,8 +58,13 @@ for BACKEND in local importer tidb; do run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase' check_contains 'count(*): 1' - check_contains 'min(_tidb_rowid): 70000' - check_contains 'max(_tidb_rowid): 70000' + if [ "$BACKEND" == 'tidb' ]; then + check_contains 'min(_tidb_rowid): 70000' + check_contains 'max(_tidb_rowid): 70000' + else + check_contains 'min(_tidb_rowid): 1' + check_contains 'max(_tidb_rowid): 1' + fi run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")' run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"' check_contains '_tidb_rowid > 70000: 1' From 3bd732f9c4731c7617bf84216a2f3fcfddf845ae Mon Sep 17 00:00:00 2001 From: fengou1 <85682690+fengou1@users.noreply.github.com> Date: Wed, 22 Dec 2021 11:19:47 +0800 Subject: [PATCH 11/11] br: improve backoff unit test (#30892) --- br/pkg/utils/backoff_test.go | 40 ++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 9ee312f24feab..b0c0f640ab677 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -123,3 +123,43 @@ func TestPdBackoffWithRetryableError(t *testing.T) { gRPCError, }, multierr.Errors(err)) } + +func TestNewImportSSTBackofferWithSucess(t *testing.T) { + t.Parallel() + + var counter int + backoffer := utils.NewImportSSTBackoffer() + err := utils.WithRetry(context.Background(), func() error { + defer func() { counter++ }() + if counter == 15 { + return nil + } else { + return berrors.ErrKVDownloadFailed + } + }, backoffer) + require.Equal(t, 16, counter) + require.Nil(t, err) +} + +func 
TestNewDownloadSSTBackofferWithCancel(t *testing.T) { + t.Parallel() + + var counter int + backoffer := utils.NewDownloadSSTBackoffer() + err := utils.WithRetry(context.Background(), func() error { + defer func() { counter++ }() + if counter == 3 { + return context.Canceled + } else { + return berrors.ErrKVIngestFailed + } + + }, backoffer) + require.Equal(t, 4, counter) + require.Equal(t, []error{ + berrors.ErrKVIngestFailed, + berrors.ErrKVIngestFailed, + berrors.ErrKVIngestFailed, + context.Canceled, + }, multierr.Errors(err)) +}
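
The two backoff tests above pin down the retry contract they expect from utils.WithRetry: the callback keeps being retried, with a wait supplied by the backoffer in between, until it either succeeds or the backoffer runs out of attempts, and a non-retryable error such as context.Canceled ends the loop early while every error observed so far is still reported (which is why counter lands on 16 and 4 respectively). Below is a minimal, self-contained sketch of that loop shape; the Backoffer interface, constBackoffer, and withRetry helper are illustrative stand-ins written for this note, not the actual br/pkg/utils implementation.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Backoffer is a local stand-in for the contract the tests above exercise:
// NextBackoff reports how long to wait after a given error, and Attempt
// reports how many attempts remain (<= 0 stops the retry loop).
type Backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

type constBackoffer struct {
	delay    time.Duration
	attempts int
}

func (b *constBackoffer) NextBackoff(err error) time.Duration {
	if errors.Is(err, context.Canceled) {
		// Non-retryable: exhaust the remaining attempts so the loop exits.
		b.attempts = 0
	} else {
		b.attempts--
	}
	return b.delay
}

func (b *constBackoffer) Attempt() int { return b.attempts }

// withRetry mirrors the loop shape the tests rely on: run fn, and keep
// retrying with backoff until it succeeds or the backoffer gives up.
// Every error seen along the way is joined into the returned error.
func withRetry(ctx context.Context, fn func() error, bo Backoffer) error {
	var allErrors error
	for bo.Attempt() > 0 {
		err := fn()
		if err == nil {
			return nil
		}
		allErrors = errors.Join(allErrors, err)
		select {
		case <-ctx.Done():
			return errors.Join(allErrors, ctx.Err())
		case <-time.After(bo.NextBackoff(err)):
		}
	}
	return allErrors
}

func main() {
	var counter int
	err := withRetry(context.Background(), func() error {
		defer func() { counter++ }()
		if counter == 3 {
			return nil
		}
		return fmt.Errorf("transient failure %d", counter)
	}, &constBackoffer{delay: time.Millisecond, attempts: 10})
	fmt.Println(counter, err) // 4 <nil>
}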