From 14d71e0a680c9ca11f42ae7f1db2cafb7179266e Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 15 Oct 2020 11:31:13 +0800 Subject: [PATCH] execute: add rpc runtime stats information for insert/update/replace statement (#19334) (#20430) --- executor/batch_point_get.go | 4 ++-- executor/executor_test.go | 1 + executor/insert.go | 7 +++++++ executor/insert_common.go | 25 +++++++++++++++++++++++++ executor/point_get.go | 8 ++++---- executor/replace.go | 7 +++++++ executor/update.go | 30 ++++++++++++++++++++++++++++++ 7 files changed, 76 insertions(+), 6 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index b2b8ff82a9594..98df02e091349 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -62,7 +62,7 @@ type BatchPointGetExec struct { virtualColumnRetFieldTypes []*types.FieldType snapshot kv.Snapshot - stats *pointGetRuntimeStats + stats *runtimeStatsWithSnapshot } // buildVirtualColumnInfo saves virtual column indices and sort them in definition order @@ -144,7 +144,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { } if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} - e.stats = &pointGetRuntimeStats{ + e.stats = &runtimeStatsWithSnapshot{ BasicRuntimeStats: e.runtimeStats, SnapshotRuntimeStats: snapshotStats, } diff --git a/executor/executor_test.go b/executor/executor_test.go index a04c80a8795c5..f233d63df9abf 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6040,6 +6040,7 @@ func (s *testSuite) TestCollectDMLRuntimeStats(c *C) { } for _, sql := range testSQLs { tk.MustExec(sql) + c.Assert(getRootStats(), Matches, "time.*loops.*Get.*num_rpc.*total_time.*") } // Test for lock keys stats. diff --git a/executor/insert.go b/executor/insert.go index 26655f803a542..22ff24bd15e78 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -188,6 +188,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D return err } + if e.collectRuntimeStatsEnabled() { + if snapshot := txn.GetSnapshot(); snapshot != nil { + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) + } + } + // Use BatchGet to fill cache. // It's an optimization and could be removed without affecting correctness. if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil { diff --git a/executor/insert_common.go b/executor/insert_common.go index abd0fcb8c4ebf..7fb3d51597602 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" @@ -72,6 +73,8 @@ type InsertValues struct { // https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html lazyFillAutoID bool memTracker *memory.Tracker + + stats *runtimeStatsWithSnapshot } type defaultVal struct { @@ -929,6 +932,21 @@ func (e *InsertValues) handleWarning(err error) { sc.AppendWarning(err) } +func (e *InsertValues) collectRuntimeStatsEnabled() bool { + if e.runtimeStats != nil { + if e.stats == nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &runtimeStatsWithSnapshot{ + BasicRuntimeStats: e.runtimeStats, + SnapshotRuntimeStats: snapshotStats, + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } + return true + } + return false +} + // batchCheckAndInsert checks rows with duplicate errors. // All duplicate rows will be ignored and appended as duplicate warnings. func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error { @@ -946,6 +964,13 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D return err } + if e.collectRuntimeStatsEnabled() { + if snapshot := txn.GetSnapshot(); snapshot != nil { + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) + } + } + // Fill cache using BatchGet, the following Get requests don't need to visit TiKV. if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil { return err diff --git a/executor/point_get.go b/executor/point_get.go index 20e2b24988d51..44c5eb64924e6 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -84,7 +84,7 @@ type PointGetExecutor struct { // virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns. virtualColumnRetFieldTypes []*types.FieldType - stats *pointGetRuntimeStats + stats *runtimeStatsWithSnapshot } // Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field @@ -157,7 +157,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { } if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} - e.stats = &pointGetRuntimeStats{ + e.stats = &runtimeStatsWithSnapshot{ BasicRuntimeStats: e.runtimeStats, SnapshotRuntimeStats: snapshotStats, } @@ -420,12 +420,12 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo { return nil } -type pointGetRuntimeStats struct { +type runtimeStatsWithSnapshot struct { *execdetails.BasicRuntimeStats *tikv.SnapshotRuntimeStats } -func (e *pointGetRuntimeStats) String() string { +func (e *runtimeStatsWithSnapshot) String() string { var basic, rpcStatsStr string if e.BasicRuntimeStats != nil { basic = e.BasicRuntimeStats.String() diff --git a/executor/replace.go b/executor/replace.go index 7baa330442096..a3f798a524aaa 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -195,6 +195,13 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { } txnSize := txn.Size() + if e.collectRuntimeStatsEnabled() { + if snapshot := txn.GetSnapshot(); snapshot != nil { + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) + } + } + // Use BatchGet to fill cache. // It's an optimization and could be removed without affecting correctness. if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil { diff --git a/executor/update.go b/executor/update.go index ce1d2c443ee6d..ca6eaf7cf6267 100644 --- a/executor/update.go +++ b/executor/update.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -48,6 +49,8 @@ type UpdateExec struct { allAssignmentsAreConstant bool drained bool memTracker *memory.Tracker + + stats *runtimeStatsWithSnapshot } func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, newData []types.Datum) error { @@ -166,6 +169,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { } memUsageOfChk = chk.MemoryUsage() e.memTracker.Consume(memUsageOfChk) + if e.collectRuntimeStatsEnabled() { + txn, err := e.ctx.Txn(false) + if err == nil && txn.GetSnapshot() != nil { + txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + } + } for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { chunkRow := chk.GetRow(rowIdx) datumRow := chunkRow.GetDatumRow(fields) @@ -258,6 +267,12 @@ func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum, cols []*tab // Close implements the Executor Close interface. func (e *UpdateExec) Close() error { e.setMessage() + if e.runtimeStats != nil && e.stats != nil { + txn, err := e.ctx.Txn(false) + if err == nil && txn.GetSnapshot() != nil { + txn.GetSnapshot().DelOption(kv.CollectRuntimeStats) + } + } return e.children[0].Close() } @@ -278,3 +293,18 @@ func (e *UpdateExec) setMessage() { msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUpdateInfo], numMatched, numChanged, numWarnings) stmtCtx.SetMessage(msg) } + +func (e *UpdateExec) collectRuntimeStatsEnabled() bool { + if e.runtimeStats != nil { + if e.stats == nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &runtimeStatsWithSnapshot{ + BasicRuntimeStats: e.runtimeStats, + SnapshotRuntimeStats: snapshotStats, + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } + return true + } + return false +}