diff --git a/metrics/BUILD.bazel b/metrics/BUILD.bazel index f6b729f68e9ef..e1be43fb5e105 100644 --- a/metrics/BUILD.bazel +++ b/metrics/BUILD.bazel @@ -27,6 +27,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//parser/terror", + "//timer/metrics", "//util/logutil", "//util/mathutil", "@com_github_pingcap_errors//:errors", diff --git a/metrics/metrics.go b/metrics/metrics.go index f316cacfa747f..6deb4a1e93808 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -17,6 +17,7 @@ package metrics import ( "sync" + timermetrics "github.com/pingcap/tidb/timer/metrics" "github.com/pingcap/tidb/util/logutil" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -91,6 +92,7 @@ func InitMetrics() { InitTelemetryMetrics() InitTopSQLMetrics() InitTTLMetrics() + timermetrics.InitTimerMetrics() PanicCounter = NewCounterVec( prometheus.CounterOpts{ @@ -255,6 +257,9 @@ func RegisterMetrics() { prometheus.MustRegister(TTLPhaseTime) prometheus.MustRegister(TTLInsertRowsCount) prometheus.MustRegister(TTLWatermarkDelay) + prometheus.MustRegister(TTLEventCounter) + + prometheus.MustRegister(timermetrics.TimerEventCounter) prometheus.MustRegister(EMACPUUsageGauge) prometheus.MustRegister(PoolConcurrencyCounter) diff --git a/metrics/ttl.go b/metrics/ttl.go index da91b88d94e3d..4c9001ac704b3 100644 --- a/metrics/ttl.go +++ b/metrics/ttl.go @@ -31,6 +31,12 @@ var ( TTLInsertRowsCount prometheus.Counter TTLWatermarkDelay *prometheus.GaugeVec + + TTLEventCounter *prometheus.CounterVec + + TTLSyncTimerCounter prometheus.Counter + + TTLFullRefreshTimersCounter prometheus.Counter ) // InitTTLMetrics initializes ttl metrics. 
@@ -91,4 +97,15 @@ func InitTTLMetrics() { Name: "ttl_watermark_delay", Help: "Bucketed delay time in seconds for TTL tables.", }, []string{LblType, LblName}) + + TTLEventCounter = NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "ttl_event_count", + Help: "Counter of ttl event.", + }, []string{LblType}) + + TTLSyncTimerCounter = TTLEventCounter.WithLabelValues("sync_one_timer") + TTLFullRefreshTimersCounter = TTLEventCounter.WithLabelValues("full_refresh_timers") } diff --git a/timer/metrics/BUILD.bazel b/timer/metrics/BUILD.bazel new file mode 100644 index 0000000000000..9c5e47875624b --- /dev/null +++ b/timer/metrics/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "metrics", + srcs = ["metrics.go"], + importpath = "github.com/pingcap/tidb/timer/metrics", + visibility = ["//visibility:public"], + deps = ["@com_github_prometheus_client_golang//prometheus"], +) diff --git a/timer/metrics/metrics.go b/timer/metrics/metrics.go new file mode 100644 index 0000000000000..d61ce75ea03b8 --- /dev/null +++ b/timer/metrics/metrics.go @@ -0,0 +1,49 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metrics + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +// Timer metrics. All of them are nil until InitTimerMetrics is called. +var ( + TimerEventCounter *prometheus.CounterVec + + TimerFullRefreshCounter prometheus.Counter + TimerPartialRefreshCounter prometheus.Counter +) + +// InitTimerMetrics creates TimerEventCounter (namespace "tidb", subsystem "server", name "timer_event_count", labels "scope" and "type") and binds TimerFullRefreshCounter and TimerPartialRefreshCounter to its "runtime"-scope children. It does not register the vector with any prometheus registry. +func InitTimerMetrics() { + TimerEventCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "timer_event_count", + Help: "Counter of timer event.", + }, []string{"scope", "type"}) + + rtScope := "runtime" + TimerFullRefreshCounter = TimerEventCounter.WithLabelValues(rtScope, "full_refresh_timers") + TimerPartialRefreshCounter = TimerEventCounter.WithLabelValues(rtScope, "partial_refresh_timers") +} + +// TimerHookWorkerCounter returns the TimerEventCounter child whose "scope" label is "hook." followed by hookClass and whose "type" label is event. NOTE(review): TimerEventCounter is only assigned in InitTimerMetrics, so this presumably panics if called earlier; confirm every caller runs after initialization. +func TimerHookWorkerCounter(hookClass string, event string) prometheus.Counter { + return TimerEventCounter.WithLabelValues(fmt.Sprintf("hook.%s", hookClass), event) +} diff --git a/timer/runtime/BUILD.bazel b/timer/runtime/BUILD.bazel index 7ceb68b66c98c..d64f1cdd421cd 100644 --- a/timer/runtime/BUILD.bazel +++ b/timer/runtime/BUILD.bazel @@ -11,9 +11,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//timer/api", + "//timer/metrics", "//util/logutil", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", + "@com_github_prometheus_client_golang//prometheus", "@org_golang_x_exp//maps", "@org_uber_go_zap//:zap", ], @@ -35,6 +37,8 @@ go_test( deps = [ "//testkit/testsetup", "//timer/api", + "//timer/metrics", + "//util/mock", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//mock", diff --git a/timer/runtime/runtime.go b/timer/runtime/runtime.go index 7b55752156ec4..7bd3e3a6a6fa6 100644 --- a/timer/runtime/runtime.go +++ b/timer/runtime/runtime.go @@ -23,6 +23,7 @@ import ( "github.com/google/uuid" "github.com/pingcap/tidb/timer/api" +
"github.com/pingcap/tidb/timer/metrics" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" "golang.org/x/exp/maps" @@ -210,6 +211,7 @@ func (rt *TimerGroupRuntime) loop() { } func (rt *TimerGroupRuntime) fullRefreshTimers() { + metrics.TimerFullRefreshCounter.Inc() timers, err := rt.store.List(rt.ctx, rt.cond) if err != nil { rt.logger.Error("error occurs when fullRefreshTimers", zap.Error(err)) @@ -354,6 +356,7 @@ func (rt *TimerGroupRuntime) partialRefreshTimers(timerIDs map[string]struct{}) return false } + metrics.TimerPartialRefreshCounter.Inc() cond := rt.buildTimerIDsCond(timerIDs) timers, err := rt.store.List(rt.ctx, cond) if err != nil { diff --git a/timer/runtime/runtime_test.go b/timer/runtime/runtime_test.go index 8c9a0c9f2781a..a5146f08b792b 100644 --- a/timer/runtime/runtime_test.go +++ b/timer/runtime/runtime_test.go @@ -24,6 +24,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/timer/api" + "github.com/pingcap/tidb/timer/metrics" + mockutil "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -388,6 +390,14 @@ func TestNextTryTriggerDuration(t *testing.T) { } func TestFullRefreshTimers(t *testing.T) { + origFullRefreshCounter := metrics.TimerFullRefreshCounter + defer func() { + metrics.TimerFullRefreshCounter = origFullRefreshCounter + }() + + fullRefreshCounter := &mockutil.MetricsCounter{} + metrics.TimerFullRefreshCounter = fullRefreshCounter + mockCore, mockStore := newMockStore() runtime := NewTimerRuntimeBuilder("g1", mockStore).Build() runtime.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")} @@ -428,11 +438,14 @@ func TestFullRefreshTimers(t *testing.T) { t6New.Version++ mockCore.On("List", mock.Anything, runtime.cond).Return(timers[0:], errors.New("mockErr")).Once() + require.Equal(t, float64(0), fullRefreshCounter.Val()) runtime.fullRefreshTimers() + require.Equal(t, float64(1), fullRefreshCounter.Val()) require.Equal(t, 7, 
len(runtime.cache.items)) mockCore.On("List", mock.Anything, runtime.cond).Return([]*api.TimerRecord{t0New, timers[1], t2New, t4New, t6New}, nil).Once() runtime.fullRefreshTimers() + require.Equal(t, float64(2), fullRefreshCounter.Val()) mockCore.AssertExpectations(t) require.Equal(t, 5, len(runtime.cache.items)) require.Equal(t, t0New, runtime.cache.items["t0"].timer) @@ -446,6 +459,14 @@ func TestFullRefreshTimers(t *testing.T) { } func TestBatchHandlerWatchResponses(t *testing.T) { + origPartialRefreshCounter := metrics.TimerPartialRefreshCounter + defer func() { + metrics.TimerPartialRefreshCounter = origPartialRefreshCounter + }() + + partialRefreshCounter := &mockutil.MetricsCounter{} + metrics.TimerPartialRefreshCounter = partialRefreshCounter + mockCore, mockStore := newMockStore() runtime := NewTimerRuntimeBuilder("g1", mockStore).Build() runtime.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")} @@ -509,6 +530,7 @@ func TestBatchHandlerWatchResponses(t *testing.T) { require.Contains(t, condIDs, "t2") }) + require.Equal(t, float64(0), partialRefreshCounter.Val()) runtime.batchHandleWatchResponses([]api.WatchTimerResponse{ { Events: []*api.WatchTimerEvent{ @@ -535,6 +557,7 @@ func TestBatchHandlerWatchResponses(t *testing.T) { }, }, }) + require.Equal(t, float64(1), partialRefreshCounter.Val()) mockCore.AssertExpectations(t) require.Equal(t, 6, len(runtime.cache.items)) diff --git a/timer/runtime/worker.go b/timer/runtime/worker.go index 48e3ade347a59..b28ac3b707741 100644 --- a/timer/runtime/worker.go +++ b/timer/runtime/worker.go @@ -21,7 +21,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/timer/api" + "github.com/pingcap/tidb/timer/metrics" "github.com/pingcap/tidb/util/logutil" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -68,6 +70,13 @@ type hookWorker struct { ch chan *triggerEventRequest logger *zap.Logger nowFunc func() time.Time + // metrics for worker + triggerRequestCounter 
prometheus.Counter + onPreSchedEventCounter prometheus.Counter + onPreSchedEventErrCounter prometheus.Counter + onPreSchedEventDelayCounter prometheus.Counter + onSchedEventCounter prometheus.Counter + onSchedEventErrCounter prometheus.Counter } func newHookWorker(ctx context.Context, wg *sync.WaitGroup, groupID string, hookClass string, hook api.Hook, nowFunc func() time.Time) *hookWorker { @@ -85,7 +94,13 @@ func newHookWorker(ctx context.Context, wg *sync.WaitGroup, groupID string, hook zap.String("groupID", groupID), zap.String("hookClass", hookClass), ), - nowFunc: nowFunc, + nowFunc: nowFunc, + triggerRequestCounter: metrics.TimerHookWorkerCounter(hookClass, "trigger"), + onPreSchedEventCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent"), + onPreSchedEventErrCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent_error"), + onPreSchedEventDelayCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent_delay"), + onSchedEventCounter: metrics.TimerHookWorkerCounter(hookClass, "OnSchedEvent"), + onSchedEventErrCounter: metrics.TimerHookWorkerCounter(hookClass, "OnSchedEvent_error"), } wg.Add(1) @@ -127,6 +142,7 @@ func (w *hookWorker) loop() { } func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger) *triggerEventResponse { + w.triggerRequestCounter.Inc() timer := req.timer resp := &triggerEventResponse{ timerID: timer.ID, @@ -177,6 +193,7 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger) var preResult api.PreSchedEventResult if w.hook != nil { logger.Debug("call OnPreSchedEvent") + w.onPreSchedEventCounter.Inc() result, err := w.hook.OnPreSchedEvent(w.ctx, &timerEvent{ eventID: req.eventID, record: timer, @@ -188,15 +205,16 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger) zap.Error(err), zap.Duration("retryAfter", workerEventDefaultRetryInterval), ) + w.onPreSchedEventErrCounter.Inc() 
resp.retryAfter.Set(workerEventDefaultRetryInterval) return resp } if result.Delay > 0 { + w.onPreSchedEventDelayCounter.Inc() resp.retryAfter.Set(result.Delay) return resp } - preResult = result } @@ -257,12 +275,14 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger) if w.hook != nil { logger.Debug("call OnSchedEvent") + w.onSchedEventCounter.Inc() err = w.hook.OnSchedEvent(w.ctx, &timerEvent{ eventID: req.eventID, record: timer, }) if err != nil { + w.onSchedEventErrCounter.Inc() logger.Error( "error occurs when invoking hook OnTimerEvent", zap.Error(err), diff --git a/timer/runtime/worker_test.go b/timer/runtime/worker_test.go index 86d97e7fabeea..4ad0e6a5d8a99 100644 --- a/timer/runtime/worker_test.go +++ b/timer/runtime/worker_test.go @@ -23,11 +23,41 @@ import ( "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/tidb/timer/api" + mockutil "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/atomic" ) +func checkAndMockWorkerCounters(t *testing.T, w *hookWorker) { + require.NotNil(t, w.triggerRequestCounter) + w.triggerRequestCounter = &mockutil.MetricsCounter{} + + require.NotNil(t, w.onPreSchedEventCounter) + w.onPreSchedEventCounter = &mockutil.MetricsCounter{} + + require.NotNil(t, w.onPreSchedEventErrCounter) + w.onPreSchedEventErrCounter = &mockutil.MetricsCounter{} + + require.NotNil(t, w.onPreSchedEventDelayCounter) + w.onPreSchedEventDelayCounter = &mockutil.MetricsCounter{} + + require.NotNil(t, w.onSchedEventCounter) + w.onSchedEventCounter = &mockutil.MetricsCounter{} + + require.NotNil(t, w.onSchedEventErrCounter) + w.onSchedEventErrCounter = &mockutil.MetricsCounter{} +} + +func checkWorkerCounterValues(t *testing.T, trigger, onPreSched, onPreSchedErr, onPreSchedDelay, onSchedEvent, onSchedEventErr int64, w *hookWorker) { + require.Equal(t, float64(trigger), w.triggerRequestCounter.(*mockutil.MetricsCounter).Val()) + 
require.Equal(t, float64(onPreSched), w.onPreSchedEventCounter.(*mockutil.MetricsCounter).Val()) + require.Equal(t, float64(onPreSchedErr), w.onPreSchedEventErrCounter.(*mockutil.MetricsCounter).Val()) + require.Equal(t, float64(onPreSchedDelay), w.onPreSchedEventDelayCounter.(*mockutil.MetricsCounter).Val()) + require.Equal(t, float64(onSchedEvent), w.onSchedEventCounter.(*mockutil.MetricsCounter).Val()) + require.Equal(t, float64(onSchedEventErr), w.onSchedEventErrCounter.(*mockutil.MetricsCounter).Val()) +} + func TestWorkerStartStop(t *testing.T) { var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) @@ -154,6 +184,7 @@ func TestWorkerProcessIdleTimerSuccess(t *testing.T) { hook.On("Stop").Return().Once() w := newHookWorker(ctx, &wg, "g1", "h1", hook, nil) + checkAndMockWorkerCounters(t, w) eventID := uuid.NewString() var eventStartRef atomic.Pointer[time.Time] var finalTimerRef atomic.Pointer[api.TimerRecord] @@ -203,6 +234,7 @@ func TestWorkerProcessIdleTimerSuccess(t *testing.T) { require.Equal(t, finalTimerRef.Load(), newTimer) }) + checkWorkerCounterValues(t, 1, 1, 0, 0, 1, 0, w) cancel() waitDone(hook.stopped, time.Second) hook.AssertExpectations(t) @@ -237,6 +269,7 @@ func TestWorkerProcessTriggeredTimerSuccess(t *testing.T) { hook.On("Stop").Return().Once() w := newHookWorker(ctx, &wg, "g1", "h1", hook, nil) + checkAndMockWorkerCounters(t, w) hook.On("OnSchedEvent", mock.Anything, mock.Anything). Return(nil).Once(). 
Run(func(args mock.Arguments) { @@ -267,6 +300,7 @@ func TestWorkerProcessTriggeredTimerSuccess(t *testing.T) { require.Equal(t, timer, newTimer) }) + checkWorkerCounterValues(t, 1, 0, 0, 0, 1, 0, w) cancel() waitDone(hook.stopped, time.Second) hook.AssertExpectations(t) @@ -288,6 +322,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { hook.On("Stop").Return().Once() w := newHookWorker(ctx, &wg, "g1", "h1", hook, nil) + checkAndMockWorkerCounters(t, w) eventID := uuid.NewString() request := &triggerEventRequest{ eventID: eventID, @@ -309,6 +344,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { _, ok = resp.newTimerRecord.Get() require.False(t, ok) }) + checkWorkerCounterValues(t, 1, 1, 0, 1, 0, 0, w) // OnPreSchedEvent error hook.On("OnPreSchedEvent", mock.Anything, mock.Anything). @@ -323,6 +359,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { _, ok = resp.newTimerRecord.Get() require.False(t, ok) }) + checkWorkerCounterValues(t, 2, 2, 1, 1, 0, 0, w) tm, err := cli.GetTimerByID(ctx, timer.ID) require.NoError(t, err) @@ -345,6 +382,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { _, ok = resp.newTimerRecord.Get() require.False(t, ok) }) + checkWorkerCounterValues(t, 3, 3, 1, 1, 0, 0, w) // timer meta changed then get record error hook.On("OnPreSchedEvent", mock.Anything, mock.Anything). @@ -363,6 +401,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { _, ok = resp.newTimerRecord.Get() require.False(t, ok) }) + checkWorkerCounterValues(t, 4, 4, 1, 1, 0, 0, w) // timer event updated then get record error hook.On("OnPreSchedEvent", mock.Anything, mock.Anything). @@ -381,6 +420,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { _, ok = resp.newTimerRecord.Get() require.False(t, ok) }) + checkWorkerCounterValues(t, 5, 5, 1, 1, 0, 0, w) // timer event updated then get record return nil hook.On("OnPreSchedEvent", mock.Anything, mock.Anything). 
@@ -399,6 +439,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { require.True(t, ok) require.Nil(t, newRecord) }) + checkWorkerCounterValues(t, 6, 6, 1, 1, 0, 0, w) // timer event updated then get record return different eventID anotherEventIDTimer := timer.Clone() @@ -423,6 +464,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { require.Equal(t, anotherEventIDTimer, newRecord) }) request.store = store + checkWorkerCounterValues(t, 7, 7, 1, 1, 0, 0, w) // timer meta changed err = cli.UpdateTimer(ctx, timer.ID, api.WithSetSchedExpr(api.SchedEventInterval, "2m")) @@ -446,6 +488,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { require.True(t, ok) require.Equal(t, timer, newTimer) }) + checkWorkerCounterValues(t, 8, 8, 1, 1, 0, 0, w) // OnSchedEvent error now := time.Now() @@ -469,6 +512,7 @@ func TestWorkerProcessDelayOrErr(t *testing.T) { timer = getAndCheckTriggeredTimer(t, cli, timer, eventID, []byte("eventdata"), now, time.Now()) require.Equal(t, timer, finalTimerRef.Load()) request.timer = timer + checkWorkerCounterValues(t, 9, 9, 1, 1, 1, 1, w) // Event closed before trigger err = cli.CloseTimerEvent(ctx, timer.ID, eventID) diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index f830e7568fa63..5073b9ca522b9 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -19,6 +19,7 @@ go_library( deps = [ "//infoschema", "//kv", + "//metrics", "//parser/model", "//parser/terror", "//sessionctx", @@ -74,6 +75,7 @@ go_test( "//domain", "//infoschema", "//kv", + "//metrics", "//parser/ast", "//parser/model", "//parser/mysql", @@ -92,6 +94,7 @@ go_test( "//types", "//util/chunk", "//util/logutil", + "//util/mock", "@com_github_google_uuid//:uuid", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", diff --git a/ttl/ttlworker/timer_sync.go b/ttl/ttlworker/timer_sync.go index 274a47754e675..41bb410a294f0 100644 --- a/ttl/ttlworker/timer_sync.go +++ b/ttl/ttlworker/timer_sync.go @@ -22,6 +22,7 @@ import ( 
"github.com/pingcap/errors" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" timerapi "github.com/pingcap/tidb/timer/api" "github.com/pingcap/tidb/ttl/cache" @@ -160,6 +161,7 @@ func (g *TTLTimersSyncer) SyncTimers(ctx context.Context, is infoschema.InfoSche g.lastSyncTime = g.nowFunc() g.lastSyncVer = is.SchemaMetaVersion() if time.Since(g.lastPullTimers) > fullRefreshTimersCacheInterval { + metrics.TTLFullRefreshTimersCounter.Inc() newKey2Timers := make(map[string]*timerapi.TimerRecord, len(g.key2Timers)) timers, err := g.cli.GetTimers(ctx, timerapi.WithKeyPrefix(timerKeyPrefix)) if err != nil { @@ -200,12 +202,14 @@ func (g *TTLTimersSyncer) SyncTimers(ctx context.Context, is infoschema.InfoSche } if time.Since(timer.CreateTime) > g.delayDelete { + metrics.TTLSyncTimerCounter.Inc() if _, err = g.cli.DeleteTimer(ctx, timer.ID); err != nil { logutil.BgLogger().Error("failed to delete timer", zap.Error(err), zap.String("timerID", timer.ID)) } else { delete(g.key2Timers, key) } } else if timer.Enable { + metrics.TTLSyncTimerCounter.Inc() if err = g.cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetEnable(false)); err != nil { logutil.BgLogger().Error("failed to disable timer", zap.Error(err), zap.String("timerID", timer.ID)) } @@ -266,6 +270,7 @@ func (g *TTLTimersSyncer) syncOneTimer(ctx context.Context, se session.Session, } } + metrics.TTLSyncTimerCounter.Inc() timer, err := g.cli.GetTimerByKey(ctx, key) if err != nil && !errors.ErrorEqual(err, timerapi.ErrTimerNotExist) { return nil, err diff --git a/ttl/ttlworker/timer_sync_test.go b/ttl/ttlworker/timer_sync_test.go index a475b20be61ec..c203a55d260c2 100644 --- a/ttl/ttlworker/timer_sync_test.go +++ b/ttl/ttlworker/timer_sync_test.go @@ -24,12 +24,14 @@ import ( "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" 
"github.com/pingcap/tidb/testkit" timerapi "github.com/pingcap/tidb/timer/api" "github.com/pingcap/tidb/timer/tablestore" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/ttlworker" + mockutil "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/require" ) @@ -195,7 +197,20 @@ func TestTTLManualTriggerOneTimer(t *testing.T) { } func TestTTLTimerSync(t *testing.T) { + origFullRefreshTimerCounter := metrics.TTLFullRefreshTimersCounter + origSyncTimerCounter := metrics.TTLSyncTimerCounter + defer func() { + metrics.TTLFullRefreshTimersCounter = origFullRefreshTimerCounter + metrics.TTLSyncTimerCounter = origSyncTimerCounter + }() + store, do := testkit.CreateMockStoreAndDomain(t) + waitAndStopTTLManager(t, do) + + fullRefreshTimersCounter := &mockutil.MetricsCounter{} + metrics.TTLFullRefreshTimersCounter = fullRefreshTimersCounter + syncTimerCounter := &mockutil.MetricsCounter{} + metrics.TTLSyncTimerCounter = syncTimerCounter tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -230,7 +245,12 @@ func TestTTLTimerSync(t *testing.T) { // first sync now := time.Now() + require.Equal(t, float64(0), fullRefreshTimersCounter.Val()) + require.Equal(t, float64(0), syncTimerCounter.Val()) sync.SyncTimers(context.TODO(), do.InfoSchema()) + require.Equal(t, float64(1), fullRefreshTimersCounter.Val()) + require.Equal(t, float64(6), syncTimerCounter.Val()) + syncCnt := syncTimerCounter.Val() lastSyncTime, lastSyncVer = sync.GetLastSyncInfo() require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer) require.GreaterOrEqual(t, lastSyncTime.Unix(), now.Unix()) @@ -252,6 +272,8 @@ func TestTTLTimerSync(t *testing.T) { ")") now = time.Now() sync.SyncTimers(context.TODO(), do.InfoSchema()) + require.Equal(t, syncCnt+4, syncTimerCounter.Val()) + syncCnt = syncTimerCounter.Val() lastSyncTime, lastSyncVer = sync.GetLastSyncInfo() require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer) require.GreaterOrEqual(t, lastSyncTime.Unix(), 
now.Unix()) @@ -269,6 +291,8 @@ func TestTTLTimerSync(t *testing.T) { tk.MustExec("alter table t5 TTL=`t`+interval 10 HOUR ttl_enable='OFF'") tk.MustExec("alter table tp1 ttl_job_interval='3m'") sync.SyncTimers(context.TODO(), do.InfoSchema()) + require.Equal(t, syncCnt+7, syncTimerCounter.Val()) + syncCnt = syncTimerCounter.Val() checkTimerCnt(t, cli, 11) checkTimerWithTableMeta(t, do, cli, "test", "t1", "", zeroTime) timer2 = checkTimerWithTableMeta(t, do, cli, "test", "t2", "", wm1) @@ -282,6 +306,8 @@ func TestTTLTimerSync(t *testing.T) { // rename table tk.MustExec("rename table t1 to t1a") sync.SyncTimers(context.TODO(), do.InfoSchema()) + require.Equal(t, syncCnt+1, syncTimerCounter.Val()) + syncCnt = syncTimerCounter.Val() checkTimerCnt(t, cli, 11) timer1 = checkTimerWithTableMeta(t, do, cli, "test", "t1a", "", zeroTime) checkTimersNotChange(t, cli, timer1, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12, timerP13, timerP20, timerP21) @@ -295,6 +321,8 @@ func TestTTLTimerSync(t *testing.T) { tk.MustExec("alter table tp1 truncate partition p1") tk.MustExec("truncate table tp2") sync.SyncTimers(context.TODO(), do.InfoSchema()) + require.Equal(t, syncCnt+7, syncTimerCounter.Val()) + syncCnt = syncTimerCounter.Val() checkTimerCnt(t, cli, 15) timer2 = checkTimerWithTableMeta(t, do, cli, "test", "t2", "", zeroTime) require.NotEqual(t, oldTimer2.ID, timer2.ID) @@ -315,6 +343,8 @@ func TestTTLTimerSync(t *testing.T) { tk.MustExec("alter table tp1 drop partition p3") tk.MustExec("drop table tp2") sync.SyncTimers(context.TODO(), do.InfoSchema()) + require.Equal(t, syncCnt+3, syncTimerCounter.Val()) + syncCnt = syncTimerCounter.Val() checkTimerCnt(t, cli, 15) checkTimerOnlyDisabled(t, cli, timer1) checkTimerOnlyDisabled(t, cli, timerP13) @@ -327,6 +357,8 @@ func TestTTLTimerSync(t *testing.T) { sync.SetDelayDeleteInterval(time.Millisecond) time.Sleep(time.Second) sync.SyncTimers(context.TODO(), do.InfoSchema()) + require.Equal(t, syncCnt+8, 
syncTimerCounter.Val()) + syncCnt = syncTimerCounter.Val() checkTimerCnt(t, cli, 7) checkTimersNotChange(t, cli, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12) @@ -338,7 +370,10 @@ func TestTTLTimerSync(t *testing.T) { // sync after reset now = time.Now() + require.Equal(t, float64(1), fullRefreshTimersCounter.Val()) sync.SyncTimers(context.TODO(), do.InfoSchema()) + require.Equal(t, float64(2), fullRefreshTimersCounter.Val()) + require.Equal(t, syncCnt, syncTimerCounter.Val()) lastSyncTime, lastSyncVer = sync.GetLastSyncInfo() require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer) require.GreaterOrEqual(t, lastSyncTime.Unix(), now.Unix()) diff --git a/util/mock/BUILD.bazel b/util/mock/BUILD.bazel index 78ba5291cbe8b..556a2ac195477 100644 --- a/util/mock/BUILD.bazel +++ b/util/mock/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "client.go", "context.go", "iter.go", + "metrics.go", "store.go", ], importpath = "github.com/pingcap/tidb/util/mock", @@ -29,9 +30,11 @@ go_library( "@com_github_pingcap_kvproto//pkg/deadlock", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_tipb//go-binlog", + "@com_github_prometheus_client_golang//prometheus", "@com_github_stretchr_testify//assert", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", + "@org_uber_go_atomic//:atomic", ], ) diff --git a/util/mock/metrics.go b/util/mock/metrics.go new file mode 100644 index 0000000000000..e0dcd89468682 --- /dev/null +++ b/util/mock/metrics.go @@ -0,0 +1,41 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import ( + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" +) + +// MetricsCounter is a test double for prometheus.Counter that keeps the accumulated value in an atomic float64. Only Add and Inc are overridden; every other Counter method is promoted from the embedded interface, which this type never sets, so those methods panic on a zero-value MetricsCounter. +type MetricsCounter struct { + prometheus.Counter + val atomic.Float64 +} + +// Add adds v to the recorded value. NOTE(review): unlike a real prometheus.Counter, a negative v is not rejected here; confirm that is intended. +func (c *MetricsCounter) Add(v float64) { + c.val.Add(v) +} + +// Inc increases the recorded value by 1. +func (c *MetricsCounter) Inc() { + c.val.Add(1) +} + +// Val returns the value accumulated so far by Add and Inc. +func (c *MetricsCounter) Val() float64 { + return c.val.Load() +}