From c46e22d2a371a1f532119d2416fa26db793abd20 Mon Sep 17 00:00:00 2001 From: Ryan Allen Date: Wed, 15 Apr 2020 15:32:09 -0400 Subject: [PATCH] [dbnode] Track blocks recently queried within some duration in dbnode (#2240) --- src/dbnode/server/server.go | 16 ++- src/dbnode/storage/index/block.go | 13 ++ src/dbnode/storage/index/index_mock.go | 29 +++++ src/dbnode/storage/index/options.go | 17 +++ src/dbnode/storage/index/types.go | 7 + src/dbnode/storage/stats/query_stats.go | 121 ++++++++++++++++++ .../storage/stats/query_stats_metrics.go | 59 +++++++++ 7 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 src/dbnode/storage/stats/query_stats.go create mode 100644 src/dbnode/storage/stats/query_stats_metrics.go diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 5ca2aa1c3a..239a24526f 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -65,6 +65,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/cluster" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/storage/stats" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" xtchannel "github.com/m3db/m3/src/dbnode/x/tchannel" @@ -135,6 +136,9 @@ type RunOptions struct { // interrupt and shutdown the server. InterruptCh <-chan error + // QueryStatsTracker exposes an interface for tracking query stats. + QueryStatsTracker stats.QueryStatsTracker + // CustomOptions are custom options to apply to the session. CustomOptions []client.CustomAdminOption @@ -397,6 +401,15 @@ func Run(runOpts RunOptions) { } defer stopReporting() + // Setup query stats tracking. + tracker := runOpts.QueryStatsTracker + if runOpts.QueryStatsTracker == nil { + tracker = stats.DefaultQueryStatsTrackerForMetrics(iopts) + } + queryStats := stats.NewQueryStats(tracker) + queryStats.Start() + defer queryStats.Stop() + // FOLLOWUP(prateek): remove this once we have the runtime options<->index wiring done indexOpts := opts.IndexOptions() insertMode := index.InsertSync @@ -409,7 +422,8 @@ func Run(runOpts RunOptions) { CacheRegexp: plCacheConfig.CacheRegexpOrDefault(), CacheTerms: plCacheConfig.CacheTermsOrDefault(), }). - SetMmapReporter(mmapReporter) + SetMmapReporter(mmapReporter). + SetQueryStats(queryStats) opts = opts.SetIndexOptions(indexOpts) if tick := cfg.Tick; tick != nil { diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 1783b0e170..2060396ef6 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/index/segments" + "github.com/m3db/m3/src/dbnode/storage/stats" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" m3ninxindex "github.com/m3db/m3/src/m3ninx/index" @@ -147,6 +148,7 @@ type block struct { opts Options iopts instrument.Options nsMD namespace.Metadata + queryStats stats.QueryStats compact blockCompact @@ -231,6 +233,7 @@ func NewBlock( nsMD: md, metrics: newBlockMetrics(iopts.MetricsScope()), logger: iopts.Logger(), + queryStats: indexOpts.QueryStats(), } b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator b.newExecutorFn = b.executorWithRLock @@ -965,6 +968,11 @@ func (b *block) addQueryResults( results BaseResults, batch []doc.Document, ) ([]doc.Document, int, error) { + // update recently queried docs to monitor memory. + if err := b.queryStats.Update(len(batch)); err != nil { + return batch, 0, err + } + // checkout the lifetime of the query before adding results. queryValid := cancellable.TryCheckout() if !queryValid { @@ -1198,6 +1206,11 @@ func (b *block) addAggregateResults( results AggregateResults, batch []AggregateResultsEntry, ) ([]AggregateResultsEntry, int, error) { + // update recently queried docs to monitor memory. + if err := b.queryStats.Update(len(batch)); err != nil { + return batch, 0, err + } + // checkout the lifetime of the query before adding results. queryValid := cancellable.TryCheckout() if !queryValid { diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index a8077c807c..6e1ebd899b 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/compaction" + "github.com/m3db/m3/src/dbnode/storage/stats" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/builder" @@ -1552,3 +1553,31 @@ func (mr *MockOptionsMockRecorder) MmapReporter() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MmapReporter", reflect.TypeOf((*MockOptions)(nil).MmapReporter)) } + +// SetQueryStats mocks base method +func (m *MockOptions) SetQueryStats(value stats.QueryStats) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetQueryStats", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetQueryStats indicates an expected call of SetQueryStats +func (mr *MockOptionsMockRecorder) SetQueryStats(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetQueryStats", reflect.TypeOf((*MockOptions)(nil).SetQueryStats), value) +} + +// QueryStats mocks base method +func (m *MockOptions) QueryStats() stats.QueryStats { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueryStats") + ret0, _ := ret[0].(stats.QueryStats) + return ret0 +} + +// QueryStats indicates an expected call of QueryStats +func (mr *MockOptionsMockRecorder) QueryStats() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryStats", reflect.TypeOf((*MockOptions)(nil).QueryStats)) +} diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index d0881133a1..15f7738d16 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/storage/index/compaction" + "github.com/m3db/m3/src/dbnode/storage/stats" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/builder" "github.com/m3db/m3/src/m3ninx/index/segment/fst" @@ -67,6 +68,7 @@ var ( errOptionsAggResultsEntryPoolUnspecified = errors.New("aggregate results entry array pool is unset") errIDGenerationDisabled = errors.New("id generation is disabled") errPostingsListCacheUnspecified = errors.New("postings list cache is unset") + errOptionsQueryStatsUnspecified = errors.New("query stats is unset") defaultForegroundCompactionOpts compaction.PlannerOptions defaultBackgroundCompactionOpts compaction.PlannerOptions @@ -122,6 +124,7 @@ type opts struct { postingsListCache *PostingsListCache readThroughSegmentOptions ReadThroughSegmentOptions mmapReporter mmap.Reporter + queryStats stats.QueryStats } var undefinedUUIDFn = func() ([]byte, error) { return nil, errIDGenerationDisabled } @@ -172,6 +175,7 @@ func NewOptions() Options { aggResultsEntryArrayPool: aggResultsEntryArrayPool, foregroundCompactionPlannerOpts: defaultForegroundCompactionOpts, backgroundCompactionPlannerOpts: defaultBackgroundCompactionOpts, + queryStats: stats.NoOpQueryStats(), } resultsPool.Init(func() QueryResults { return NewQueryResults(nil, QueryResultsOptions{}, opts) @@ -208,6 +212,9 @@ func (o *opts) Validate() error { if o.postingsListCache == nil { return errPostingsListCacheUnspecified } + if o.queryStats == nil { + return errOptionsQueryStatsUnspecified + } return nil } @@ -414,3 +421,13 @@ func (o *opts) SetMmapReporter(mmapReporter mmap.Reporter) Options { func (o *opts) MmapReporter() mmap.Reporter { return o.mmapReporter } + +func (o *opts) SetQueryStats(value stats.QueryStats) Options { + opts := *o + opts.queryStats = value + return &opts +} + +func (o *opts) QueryStats() stats.QueryStats { + return o.queryStats +} diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 67333142d0..1c5cad0d58 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/compaction" + "github.com/m3db/m3/src/dbnode/storage/stats" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/m3ninx/index/segment" @@ -907,4 +908,10 @@ type Options interface { // MmapReporter returns the mmap reporter. MmapReporter() mmap.Reporter + + // SetQueryStatsTracker sets current query stats. + SetQueryStats(value stats.QueryStats) Options + + // QueryStats returns the current query stats. + QueryStats() stats.QueryStats } diff --git a/src/dbnode/storage/stats/query_stats.go b/src/dbnode/storage/stats/query_stats.go new file mode 100644 index 0000000000..3b7671b12f --- /dev/null +++ b/src/dbnode/storage/stats/query_stats.go @@ -0,0 +1,121 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package stats + +import ( + "time" + + "go.uber.org/atomic" +) + +// For tracking query stats in past X duration such as blocks queried. +type queryStats struct { + tracker QueryStatsTracker + + recentDocs *atomic.Int64 + stopCh chan struct{} +} + +type noOpQueryStats struct { +} + +// QueryStats provides an interface for updating query stats. +type QueryStats interface { + Update(newDocs int) error + Start() + Stop() +} + +// QueryStatsTracker provides an interface for tracking current query stats. +type QueryStatsTracker interface { + Lookback() time.Duration + TrackDocs(recentDocs int) error +} + +// NewQueryStats enables query stats to be tracked within a recency lookback duration. +func NewQueryStats(tracker QueryStatsTracker) QueryStats { + return &queryStats{ + tracker: tracker, + recentDocs: atomic.NewInt64(0), + stopCh: make(chan struct{}), + } +} + +// NoOpQueryStats returns inactive query stats. +func NoOpQueryStats() QueryStats { + return &noOpQueryStats{} +} + +// UpdateQueryStats adds new query stats which are being tracked. +func (q *queryStats) Update(newDocs int) error { + if q == nil { + return nil + } + if newDocs <= 0 { + return nil + } + + // Add the new stats to the global state. + recentDocs := q.recentDocs.Add(int64(newDocs)) + + // Invoke the custom tracker based on the new stats values. + return q.tracker.TrackDocs(int(recentDocs)) +} + +// Start initializes background processing for handling query stats. +func (q *queryStats) Start() { + if q == nil { + return + } + go func() { + ticker := time.NewTicker(q.tracker.Lookback()) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // Clear recent docs every X duration. + q.recentDocs.Store(0) + + // Also invoke the track func for having zero value. + q.tracker.TrackDocs(0) + case <-q.stopCh: + return + } + } + }() +} + +func (q *queryStats) Stop() { + if q == nil { + return + } + close(q.stopCh) +} + +func (q *noOpQueryStats) Update(newDocs int) error { + return nil +} + +func (q *noOpQueryStats) Stop() { +} + +func (q *noOpQueryStats) Start() { +} diff --git a/src/dbnode/storage/stats/query_stats_metrics.go b/src/dbnode/storage/stats/query_stats_metrics.go new file mode 100644 index 0000000000..8cd4baf599 --- /dev/null +++ b/src/dbnode/storage/stats/query_stats_metrics.go @@ -0,0 +1,59 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package stats + +import ( + "time" + + "github.com/m3db/m3/src/x/instrument" + + "github.com/uber-go/tally" +) + +const defaultLookback = time.Second * 5 + +// Tracker implementation that emits query stats as metrics. +type queryStatsMetricsTracker struct { + recentDocs tally.Gauge + totalDocs tally.Counter +} + +// DefaultQueryStatsTrackerForMetrics provides a tracker +// implementation that emits query stats as metrics. +func DefaultQueryStatsTrackerForMetrics(opts instrument.Options) QueryStatsTracker { + scope := opts. + MetricsScope(). + SubScope("query-stats") + return &queryStatsMetricsTracker{ + recentDocs: scope.Gauge("recent-docs-per-block"), + totalDocs: scope.Counter("total-docs-per-block"), + } +} + +func (t *queryStatsMetricsTracker) TrackDocs(recentDocs int) error { + t.recentDocs.Update(float64(recentDocs)) + t.totalDocs.Inc(int64(recentDocs)) + return nil +} + +func (t *queryStatsMetricsTracker) Lookback() time.Duration { + return defaultLookback +}