Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timer,ttl: add some metrics for timer framework and TTL timer syncing #45611

Merged
merged 2 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//parser/terror",
"//timer/metrics",
"//util/logutil",
"//util/mathutil",
"@com_github_pingcap_errors//:errors",
Expand Down
5 changes: 5 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package metrics
import (
"sync"

timermetrics "github.com/pingcap/tidb/timer/metrics"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
Expand Down Expand Up @@ -91,6 +92,7 @@ func InitMetrics() {
InitTelemetryMetrics()
InitTopSQLMetrics()
InitTTLMetrics()
timermetrics.InitTimerMetrics()

PanicCounter = NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -255,6 +257,9 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLPhaseTime)
prometheus.MustRegister(TTLInsertRowsCount)
prometheus.MustRegister(TTLWatermarkDelay)
prometheus.MustRegister(TTLEventCounter)

prometheus.MustRegister(timermetrics.TimerEventCounter)

prometheus.MustRegister(EMACPUUsageGauge)
prometheus.MustRegister(PoolConcurrencyCounter)
Expand Down
17 changes: 17 additions & 0 deletions metrics/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ var (
TTLInsertRowsCount prometheus.Counter

TTLWatermarkDelay *prometheus.GaugeVec

TTLEventCounter *prometheus.CounterVec

TTLSyncTimerCounter prometheus.Counter

TTLFullRefreshTimersCounter prometheus.Counter
)

// InitTTLMetrics initializes ttl metrics.
Expand Down Expand Up @@ -91,4 +97,15 @@ func InitTTLMetrics() {
Name: "ttl_watermark_delay",
Help: "Bucketed delay time in seconds for TTL tables.",
}, []string{LblType, LblName})

TTLEventCounter = NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "ttl_event_count",
Help: "Counter of ttl event.",
}, []string{LblType})

TTLSyncTimerCounter = TTLEventCounter.WithLabelValues("sync_one_timer")
TTLFullRefreshTimersCounter = TTLEventCounter.WithLabelValues("full_refresh_timers")
}
9 changes: 9 additions & 0 deletions timer/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "metrics",
srcs = ["metrics.go"],
importpath = "github.com/pingcap/tidb/timer/metrics",
visibility = ["//visibility:public"],
deps = ["@com_github_prometheus_client_golang//prometheus"],
)
49 changes: 49 additions & 0 deletions timer/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"
)

// Timer metrics
var (
TimerEventCounter *prometheus.CounterVec

TimerFullRefreshCounter prometheus.Counter
TimerPartialRefreshCounter prometheus.Counter
)

// InitTimerMetrics initializes timers metrics.
func InitTimerMetrics() {
TimerEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "timer_event_count",
Help: "Counter of timer event.",
}, []string{"scope", "type"})

rtScope := "runtime"
TimerFullRefreshCounter = TimerEventCounter.WithLabelValues(rtScope, "full_refresh_timers")
TimerPartialRefreshCounter = TimerEventCounter.WithLabelValues(rtScope, "partial_refresh_timers")
}

// TimerHookWorkerCounter creates a counter for a hook's event
func TimerHookWorkerCounter(hookClass string, event string) prometheus.Counter {
return TimerEventCounter.WithLabelValues(fmt.Sprintf("hook.%s", hookClass), event)
}
4 changes: 4 additions & 0 deletions timer/runtime/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//timer/api",
"//timer/metrics",
"//util/logutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
],
Expand All @@ -35,6 +37,8 @@ go_test(
deps = [
"//testkit/testsetup",
"//timer/api",
"//timer/metrics",
"//util/mock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//mock",
Expand Down
3 changes: 3 additions & 0 deletions timer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/google/uuid"
"github.com/pingcap/tidb/timer/api"
"github.com/pingcap/tidb/timer/metrics"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -210,6 +211,7 @@ func (rt *TimerGroupRuntime) loop() {
}

func (rt *TimerGroupRuntime) fullRefreshTimers() {
metrics.TimerFullRefreshCounter.Inc()
timers, err := rt.store.List(rt.ctx, rt.cond)
if err != nil {
rt.logger.Error("error occurs when fullRefreshTimers", zap.Error(err))
Expand Down Expand Up @@ -354,6 +356,7 @@ func (rt *TimerGroupRuntime) partialRefreshTimers(timerIDs map[string]struct{})
return false
}

metrics.TimerPartialRefreshCounter.Inc()
cond := rt.buildTimerIDsCond(timerIDs)
timers, err := rt.store.List(rt.ctx, cond)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions timer/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/timer/api"
"github.com/pingcap/tidb/timer/metrics"
mockutil "github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -388,6 +390,14 @@ func TestNextTryTriggerDuration(t *testing.T) {
}

func TestFullRefreshTimers(t *testing.T) {
origFullRefreshCounter := metrics.TimerFullRefreshCounter
defer func() {
metrics.TimerFullRefreshCounter = origFullRefreshCounter
}()

fullRefreshCounter := &mockutil.MetricsCounter{}
metrics.TimerFullRefreshCounter = fullRefreshCounter

mockCore, mockStore := newMockStore()
runtime := NewTimerRuntimeBuilder("g1", mockStore).Build()
runtime.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")}
Expand Down Expand Up @@ -428,11 +438,14 @@ func TestFullRefreshTimers(t *testing.T) {
t6New.Version++

mockCore.On("List", mock.Anything, runtime.cond).Return(timers[0:], errors.New("mockErr")).Once()
require.Equal(t, float64(0), fullRefreshCounter.Val())
runtime.fullRefreshTimers()
require.Equal(t, float64(1), fullRefreshCounter.Val())
require.Equal(t, 7, len(runtime.cache.items))

mockCore.On("List", mock.Anything, runtime.cond).Return([]*api.TimerRecord{t0New, timers[1], t2New, t4New, t6New}, nil).Once()
runtime.fullRefreshTimers()
require.Equal(t, float64(2), fullRefreshCounter.Val())
mockCore.AssertExpectations(t)
require.Equal(t, 5, len(runtime.cache.items))
require.Equal(t, t0New, runtime.cache.items["t0"].timer)
Expand All @@ -446,6 +459,14 @@ func TestFullRefreshTimers(t *testing.T) {
}

func TestBatchHandlerWatchResponses(t *testing.T) {
origPartialRefreshCounter := metrics.TimerPartialRefreshCounter
defer func() {
metrics.TimerPartialRefreshCounter = origPartialRefreshCounter
}()

partialRefreshCounter := &mockutil.MetricsCounter{}
metrics.TimerPartialRefreshCounter = partialRefreshCounter

mockCore, mockStore := newMockStore()
runtime := NewTimerRuntimeBuilder("g1", mockStore).Build()
runtime.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")}
Expand Down Expand Up @@ -509,6 +530,7 @@ func TestBatchHandlerWatchResponses(t *testing.T) {
require.Contains(t, condIDs, "t2")
})

require.Equal(t, float64(0), partialRefreshCounter.Val())
runtime.batchHandleWatchResponses([]api.WatchTimerResponse{
{
Events: []*api.WatchTimerEvent{
Expand All @@ -535,6 +557,7 @@ func TestBatchHandlerWatchResponses(t *testing.T) {
},
},
})
require.Equal(t, float64(1), partialRefreshCounter.Val())

mockCore.AssertExpectations(t)
require.Equal(t, 6, len(runtime.cache.items))
Expand Down
24 changes: 22 additions & 2 deletions timer/runtime/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/timer/api"
"github.com/pingcap/tidb/timer/metrics"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -68,6 +70,13 @@ type hookWorker struct {
ch chan *triggerEventRequest
logger *zap.Logger
nowFunc func() time.Time
// metrics for worker
triggerRequestCounter prometheus.Counter
onPreSchedEventCounter prometheus.Counter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can create metrics package to support metrics for timer. such as executor/metrics.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

onPreSchedEventErrCounter prometheus.Counter
onPreSchedEventDelayCounter prometheus.Counter
onSchedEventCounter prometheus.Counter
onSchedEventErrCounter prometheus.Counter
}

func newHookWorker(ctx context.Context, wg *sync.WaitGroup, groupID string, hookClass string, hook api.Hook, nowFunc func() time.Time) *hookWorker {
Expand All @@ -85,7 +94,13 @@ func newHookWorker(ctx context.Context, wg *sync.WaitGroup, groupID string, hook
zap.String("groupID", groupID),
zap.String("hookClass", hookClass),
),
nowFunc: nowFunc,
nowFunc: nowFunc,
triggerRequestCounter: metrics.TimerHookWorkerCounter(hookClass, "trigger"),
onPreSchedEventCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent"),
onPreSchedEventErrCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent_error"),
onPreSchedEventDelayCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent_delay"),
onSchedEventCounter: metrics.TimerHookWorkerCounter(hookClass, "OnSchedEvent"),
onSchedEventErrCounter: metrics.TimerHookWorkerCounter(hookClass, "OnSchedEvent_error"),
}

wg.Add(1)
Expand Down Expand Up @@ -127,6 +142,7 @@ func (w *hookWorker) loop() {
}

func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger) *triggerEventResponse {
w.triggerRequestCounter.Inc()
timer := req.timer
resp := &triggerEventResponse{
timerID: timer.ID,
Expand Down Expand Up @@ -177,6 +193,7 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger)
var preResult api.PreSchedEventResult
if w.hook != nil {
logger.Debug("call OnPreSchedEvent")
w.onPreSchedEventCounter.Inc()
result, err := w.hook.OnPreSchedEvent(w.ctx, &timerEvent{
eventID: req.eventID,
record: timer,
Expand All @@ -188,15 +205,16 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger)
zap.Error(err),
zap.Duration("retryAfter", workerEventDefaultRetryInterval),
)
w.onPreSchedEventErrCounter.Inc()
resp.retryAfter.Set(workerEventDefaultRetryInterval)
return resp
}

if result.Delay > 0 {
w.onPreSchedEventDelayCounter.Inc()
resp.retryAfter.Set(result.Delay)
return resp
}

preResult = result
}

Expand Down Expand Up @@ -257,12 +275,14 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger)

if w.hook != nil {
logger.Debug("call OnSchedEvent")
w.onSchedEventCounter.Inc()
err = w.hook.OnSchedEvent(w.ctx, &timerEvent{
eventID: req.eventID,
record: timer,
})

if err != nil {
w.onSchedEventErrCounter.Inc()
logger.Error(
"error occurs when invoking hook OnTimerEvent",
zap.Error(err),
Expand Down
Loading