From b06dcc0d2da21e85a84345e7e59697a5ea51f0fc Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Tue, 12 Jan 2021 16:35:01 -0500 Subject: [PATCH] [matcher/coordinator] Add latency metrics to rule matching --- src/metrics/matcher/match.go | 34 ++++++++++++++++++++++--- src/metrics/matcher/match_test.go | 41 +++++++++++++++++++++++-------- 2 files changed, 61 insertions(+), 14 deletions(-) diff --git a/src/metrics/matcher/match.go b/src/metrics/matcher/match.go index 776428feb2..ddadaa82a8 100644 --- a/src/metrics/matcher/match.go +++ b/src/metrics/matcher/match.go @@ -21,6 +21,10 @@ package matcher import ( + "time" + + "github.com/uber-go/tally" + "github.com/m3db/m3/src/metrics/matcher/cache" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/rules" @@ -40,6 +44,7 @@ type matcher struct { namespaceResolver namespaceResolver namespaces Namespaces cache cache.Cache + metrics matcherMetrics } type namespaceResolver struct { @@ -89,6 +94,7 @@ func NewMatcher(cache cache.Cache, opts Options) (Matcher, error) { return &noCacheMatcher{ namespaceResolver: nsResolver, namespaces: namespaces, + metrics: newMatcherMetrics(scope.SubScope("matcher")), }, nil } @@ -96,6 +102,7 @@ func NewMatcher(cache cache.Cache, opts Options) (Matcher, error) { namespaceResolver: nsResolver, namespaces: namespaces, cache: cache, + metrics: newMatcherMetrics(scope.SubScope("cached-matcher")), }, nil } @@ -103,8 +110,9 @@ func (m *matcher) ForwardMatch( id id.ID, fromNanos, toNanos int64, ) rules.MatchResult { - return m.cache.ForwardMatch(m.namespaceResolver.Resolve(id), - id.Bytes(), fromNanos, toNanos) + sw := m.metrics.matchLatency.Start() + defer sw.Stop() + return m.cache.ForwardMatch(m.namespaceResolver.Resolve(id), id.Bytes(), fromNanos, toNanos) } func (m *matcher) Close() error { @@ -115,14 +123,32 @@ func (m *matcher) Close() error { type noCacheMatcher struct { namespaces Namespaces namespaceResolver namespaceResolver + metrics matcherMetrics +} + +type matcherMetrics struct { + matchLatency tally.Histogram +} + +func newMatcherMetrics(scope tally.Scope) matcherMetrics { + return matcherMetrics{ + matchLatency: scope.Histogram( + "match-latency", + append( + tally.DurationBuckets{0}, + tally.MustMakeExponentialDurationBuckets(time.Millisecond, 1.5, 15)..., + ), + ), + } } func (m *noCacheMatcher) ForwardMatch( id id.ID, fromNanos, toNanos int64, ) rules.MatchResult { - return m.namespaces.ForwardMatch(m.namespaceResolver.Resolve(id), - id.Bytes(), fromNanos, toNanos) + sw := m.metrics.matchLatency.Start() + defer sw.Stop() + return m.namespaces.ForwardMatch(m.namespaceResolver.Resolve(id), id.Bytes(), fromNanos, toNanos) } func (m *noCacheMatcher) Close() error { diff --git a/src/metrics/matcher/match_test.go b/src/metrics/matcher/match_test.go index cce42bab56..71ad1035f1 100644 --- a/src/metrics/matcher/match_test.go +++ b/src/metrics/matcher/match_test.go @@ -25,6 +25,10 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cluster/kv/mem" "github.com/m3db/m3/src/metrics/aggregation" @@ -42,9 +46,6 @@ import ( "github.com/m3db/m3/src/x/instrument" xtest "github.com/m3db/m3/src/x/test" "github.com/m3db/m3/src/x/watch" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/require" ) func TestMatcherCreateWatchError(t *testing.T) { @@ -82,10 +83,12 @@ func TestMatcherMatchDoesNotExist(t *testing.T) { tagValueFn: func(tagName []byte) ([]byte, bool) { return nil, false }, } now := time.Now() - matcher := testMatcher(t, testMatcherOptions{ + matcher, testScope := testMatcher(t, testMatcherOptions{ cache: newMemCache(), }) require.Equal(t, rules.EmptyMatchResult, matcher.ForwardMatch(id, now.UnixNano(), now.UnixNano())) + + requireLatencyMetrics(t, "cached-matcher", testScope) } func TestMatcherMatchExists(t *testing.T) { @@ -100,7 +103,7 @@ func TestMatcherMatchExists(t *testing.T) { memRes = memResults{results: map[string]rules.MatchResult{"foo": res}} ) cache := newMemCache() - matcher := testMatcher(t, testMatcherOptions{ + matcher, _ := testMatcher(t, testMatcherOptions{ cache: cache, }) c := cache.(*memCache) @@ -125,7 +128,7 @@ func TestMatcherMatchExistsNoCache(t *testing.T) { } now = time.Now() ) - matcher := testMatcher(t, testMatcherOptions{ + matcher, testScope := testMatcher(t, testMatcherOptions{ tagFilterOptions: filters.TagsFilterOptions{ NameAndTagsFn: func(id []byte) (name []byte, tags []byte, err error) { name = metric.id @@ -210,10 +213,13 @@ func TestMatcherMatchExistsNoCache(t *testing.T) { result := matcher.ForwardMatch(metric, now.UnixNano(), now.UnixNano()) require.Equal(t, expected, result) + + // Check that latency was measured + requireLatencyMetrics(t, "matcher", testScope) } func TestMatcherClose(t *testing.T) { - matcher := testMatcher(t, testMatcherOptions{ + matcher, _ := testMatcher(t, testMatcherOptions{ cache: newMemCache(), }) require.NoError(t, matcher.Close()) @@ -225,12 +231,13 @@ type testMatcherOptions struct { tagFilterOptions filters.TagsFilterOptions } -func testMatcher(t *testing.T, opts testMatcherOptions) Matcher { +func testMatcher(t *testing.T, opts testMatcherOptions) (Matcher, tally.TestScope) { + scope := tally.NewTestScope("", nil) var ( store = mem.NewStore() matcherOpts = NewOptions(). SetClockOptions(clock.NewOptions()). - SetInstrumentOptions(instrument.NewOptions()). + SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)). SetInitWatchTimeout(100 * time.Millisecond). SetKVStore(store). SetNamespacesKey(testNamespacesKey). @@ -264,7 +271,21 @@ func testMatcher(t *testing.T, opts testMatcherOptions) Matcher { m, err := NewMatcher(opts.cache, matcherOpts) require.NoError(t, err) - return m + return m, scope +} + +func requireLatencyMetrics(t *testing.T, metricScope string, testScope tally.TestScope) { + // Check that latency was measured + values, found := testScope.Snapshot().Histograms()[metricScope+".match-latency+"] + require.True(t, found) + latencyMeasured := false + for _, valuesInBucket := range values.Durations() { + if valuesInBucket > 0 { + latencyMeasured = true + break + } + } + require.True(t, latencyMeasured) } type tagValueFn func(tagName []byte) ([]byte, bool)