Skip to content

Commit

Permalink
add metrics for TTL (#41155)
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
Co-authored-by: 王超 <[email protected]>
  • Loading branch information
3 people authored Feb 9, 2023
1 parent a0e6ccb commit 61e1d81
Show file tree
Hide file tree
Showing 11 changed files with 579 additions and 114 deletions.
600 changes: 489 additions & 111 deletions metrics/grafana/tidb.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLJobStatus)
prometheus.MustRegister(TTLTaskStatus)
prometheus.MustRegister(TTLPhaseTime)
prometheus.MustRegister(TTLInsertRowsCount)

prometheus.MustRegister(EMACPUUsageGauge)
prometheus.MustRegister(PoolConcurrencyCounter)
Expand Down
8 changes: 8 additions & 0 deletions metrics/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,12 @@ var (
Name: "ttl_phase_time",
Help: "The time spent in each phase",
}, []string{LblType, LblPhase})

TTLInsertRowsCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "ttl_insert_rows",
Help: "The count of TTL rows inserted",
})
)
3 changes: 3 additions & 0 deletions session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ go_test(
"//expression",
"//kv",
"//meta",
"//metrics",
"//parser/ast",
"//parser/auth",
"//parser/model",
Expand All @@ -162,6 +163,8 @@ go_test(
"//util/sqlexec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
3 changes: 3 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,9 @@ func (s *session) CommitTxn(ctx context.Context) error {
s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail)
}

// record the TTLInsertRows in the metric
metrics.TTLInsertRowsCount.Add(float64(s.sessionVars.TxnCtx.InsertTTLRowsCount))

failpoint.Inject("keepHistory", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(err)
Expand Down
49 changes: 49 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"testing"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -89,3 +92,49 @@ func TestMetaTableRegion(t *testing.T) {

require.NotEqual(t, ddlBackfillTableRegionID, ddlBackfillHistoryTableRegionID)
}

func MustReadCounter(t *testing.T, m prometheus.Counter) float64 {
pb := &dto.Metric{}
require.NoError(t, m.Write(pb))
return pb.GetCounter().GetValue()
}

func TestRecordTTLRows(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(created_at datetime) TTL = created_at + INTERVAL 1 DAY")
// simple insert should be recorded
tk.MustExec("insert into t values (NOW())")
require.Equal(t, 1.0, MustReadCounter(t, metrics.TTLInsertRowsCount))

// insert in a explicit transaction should be recorded
tk.MustExec("begin")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("commit")
require.Equal(t, 2.0, MustReadCounter(t, metrics.TTLInsertRowsCount))

// insert multiple rows should be the same
tk.MustExec("begin")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("commit")
require.Equal(t, 4.0, MustReadCounter(t, metrics.TTLInsertRowsCount))

// rollback will remove all recorded TTL rows
tk.MustExec("begin")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("rollback")
require.Equal(t, 6.0, MustReadCounter(t, metrics.TTLInsertRowsCount))

// savepoint will save the recorded TTL rows
tk.MustExec("begin")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("savepoint insert1")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("rollback to insert1")
tk.MustExec("commit")
require.Equal(t, 7.0, MustReadCounter(t, metrics.TTLInsertRowsCount))
}
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ type TxnCtxNeedToRestore struct {

// CachedTables is not nil if the transaction write on cached table.
CachedTables map[int64]interface{}

// InsertTTLRowsCount counts how many rows are inserted in this statement
InsertTTLRowsCount int
}

// TxnCtxNoNeedToRestore stores transaction variables which do not need to restored when rolling back to a savepoint.
Expand Down Expand Up @@ -377,6 +380,7 @@ func (tc *TransactionContext) GetCurrentSavepoint() TxnCtxNeedToRestore {
TableDeltaMap: tableDeltaMap,
pessimisticLockCache: pessimisticLockCache,
CachedTables: cachedTables,
InsertTTLRowsCount: tc.InsertTTLRowsCount,
}
}

Expand All @@ -385,6 +389,7 @@ func (tc *TransactionContext) RestoreBySavepoint(savepoint TxnCtxNeedToRestore)
tc.TableDeltaMap = savepoint.TableDeltaMap
tc.pessimisticLockCache = savepoint.pessimisticLockCache
tc.CachedTables = savepoint.CachedTables
tc.InsertTTLRowsCount = savepoint.InsertTTLRowsCount
}

// AddSavepoint adds a new savepoint.
Expand Down
7 changes: 7 additions & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,9 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
return nil, err
}
}
if shouldIncreaseTTLMetricCount(t.meta) {
sctx.GetSessionVars().TxnCtx.InsertTTLRowsCount += 1
}
if sessVars.TxnCtx == nil {
return recordID, nil
}
Expand Down Expand Up @@ -1592,6 +1595,10 @@ func shouldWriteBinlog(ctx sessionctx.Context, tblInfo *model.TableInfo) bool {
return !ctx.GetSessionVars().InRestrictedSQL
}

func shouldIncreaseTTLMetricCount(tblInfo *model.TableInfo) bool {
return tblInfo.TTLInfo != nil
}

func (t *TableCommon) getMutation(ctx sessionctx.Context) *binlog.TableMutation {
return ctx.StmtGetMutation(t.tableID)
}
Expand Down
3 changes: 2 additions & 1 deletion ttl/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ var (
RunningJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "running"})
CancellingJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "cancelling"})

RunningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "running"})
ScanningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "scanning"})
DeletingTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "deleting"})
)

func initWorkerPhases(workerType string) map[string]prometheus.Counter {
Expand Down
12 changes: 11 additions & 1 deletion ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,17 @@ func (m *taskManager) checkInvalidTask(se session.Session) {
}

func (m *taskManager) reportMetrics() {
metrics.RunningTaskCnt.Set(float64(len(m.runningTasks)))
scanningTaskCnt := 0
deletingTaskCnt := 0
for _, task := range m.runningTasks {
if task.result != nil {
scanningTaskCnt += 1
} else {
deletingTaskCnt += 1
}
}
metrics.ScanningTaskCnt.Set(float64(scanningTaskCnt))
metrics.DeletingTaskCnt.Set(float64(deletingTaskCnt))
}

type runningScanTask struct {
Expand Down
2 changes: 1 addition & 1 deletion ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestTaskMetrics(t *testing.T) {

m.ReportMetrics()
out := &dto.Metric{}
require.NoError(t, metrics.RunningTaskCnt.Write(out))
require.NoError(t, metrics.DeletingTaskCnt.Write(out))
require.Equal(t, float64(1), out.GetGauge().GetValue())
}

Expand Down

0 comments on commit 61e1d81

Please sign in to comment.