Skip to content

Commit

Permalink
[dbnode] Track blocks recently queried within some duration in dbnode (
Browse files Browse the repository at this point in the history
  • Loading branch information
rallen090 authored Apr 15, 2020
1 parent f67aab5 commit c46e22d
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 1 deletion.
16 changes: 15 additions & 1 deletion src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/cluster"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/dbnode/storage/stats"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/ts"
xtchannel "github.com/m3db/m3/src/dbnode/x/tchannel"
Expand Down Expand Up @@ -135,6 +136,9 @@ type RunOptions struct {
// interrupt and shutdown the server.
InterruptCh <-chan error

// QueryStatsTracker exposes an interface for tracking query stats.
QueryStatsTracker stats.QueryStatsTracker

// CustomOptions are custom options to apply to the session.
CustomOptions []client.CustomAdminOption

Expand Down Expand Up @@ -397,6 +401,15 @@ func Run(runOpts RunOptions) {
}
defer stopReporting()

// Setup query stats tracking.
tracker := runOpts.QueryStatsTracker
if runOpts.QueryStatsTracker == nil {
tracker = stats.DefaultQueryStatsTrackerForMetrics(iopts)
}
queryStats := stats.NewQueryStats(tracker)
queryStats.Start()
defer queryStats.Stop()

// FOLLOWUP(prateek): remove this once we have the runtime options<->index wiring done
indexOpts := opts.IndexOptions()
insertMode := index.InsertSync
Expand All @@ -409,7 +422,8 @@ func Run(runOpts RunOptions) {
CacheRegexp: plCacheConfig.CacheRegexpOrDefault(),
CacheTerms: plCacheConfig.CacheTermsOrDefault(),
}).
SetMmapReporter(mmapReporter)
SetMmapReporter(mmapReporter).
SetQueryStats(queryStats)
opts = opts.SetIndexOptions(indexOpts)

if tick := cfg.Tick; tick != nil {
Expand Down
13 changes: 13 additions & 0 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
"github.com/m3db/m3/src/dbnode/storage/index/segments"
"github.com/m3db/m3/src/dbnode/storage/stats"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/m3ninx/doc"
m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
Expand Down Expand Up @@ -147,6 +148,7 @@ type block struct {
opts Options
iopts instrument.Options
nsMD namespace.Metadata
queryStats stats.QueryStats

compact blockCompact

Expand Down Expand Up @@ -231,6 +233,7 @@ func NewBlock(
nsMD: md,
metrics: newBlockMetrics(iopts.MetricsScope()),
logger: iopts.Logger(),
queryStats: indexOpts.QueryStats(),
}
b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
b.newExecutorFn = b.executorWithRLock
Expand Down Expand Up @@ -965,6 +968,11 @@ func (b *block) addQueryResults(
results BaseResults,
batch []doc.Document,
) ([]doc.Document, int, error) {
// update recently queried docs to monitor memory.
if err := b.queryStats.Update(len(batch)); err != nil {
return batch, 0, err
}

// checkout the lifetime of the query before adding results.
queryValid := cancellable.TryCheckout()
if !queryValid {
Expand Down Expand Up @@ -1198,6 +1206,11 @@ func (b *block) addAggregateResults(
results AggregateResults,
batch []AggregateResultsEntry,
) ([]AggregateResultsEntry, int, error) {
// update recently queried docs to monitor memory.
if err := b.queryStats.Update(len(batch)); err != nil {
return batch, 0, err
}

// checkout the lifetime of the query before adding results.
queryValid := cancellable.TryCheckout()
if !queryValid {
Expand Down
29 changes: 29 additions & 0 deletions src/dbnode/storage/index/index_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions src/dbnode/storage/index/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
"github.com/m3db/m3/src/dbnode/storage/stats"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index/segment/builder"
"github.com/m3db/m3/src/m3ninx/index/segment/fst"
Expand Down Expand Up @@ -67,6 +68,7 @@ var (
errOptionsAggResultsEntryPoolUnspecified = errors.New("aggregate results entry array pool is unset")
errIDGenerationDisabled = errors.New("id generation is disabled")
errPostingsListCacheUnspecified = errors.New("postings list cache is unset")
errOptionsQueryStatsUnspecified = errors.New("query stats is unset")

defaultForegroundCompactionOpts compaction.PlannerOptions
defaultBackgroundCompactionOpts compaction.PlannerOptions
Expand Down Expand Up @@ -122,6 +124,7 @@ type opts struct {
postingsListCache *PostingsListCache
readThroughSegmentOptions ReadThroughSegmentOptions
mmapReporter mmap.Reporter
queryStats stats.QueryStats
}

var undefinedUUIDFn = func() ([]byte, error) { return nil, errIDGenerationDisabled }
Expand Down Expand Up @@ -172,6 +175,7 @@ func NewOptions() Options {
aggResultsEntryArrayPool: aggResultsEntryArrayPool,
foregroundCompactionPlannerOpts: defaultForegroundCompactionOpts,
backgroundCompactionPlannerOpts: defaultBackgroundCompactionOpts,
queryStats: stats.NoOpQueryStats(),
}
resultsPool.Init(func() QueryResults {
return NewQueryResults(nil, QueryResultsOptions{}, opts)
Expand Down Expand Up @@ -208,6 +212,9 @@ func (o *opts) Validate() error {
if o.postingsListCache == nil {
return errPostingsListCacheUnspecified
}
if o.queryStats == nil {
return errOptionsQueryStatsUnspecified
}
return nil
}

Expand Down Expand Up @@ -414,3 +421,13 @@ func (o *opts) SetMmapReporter(mmapReporter mmap.Reporter) Options {
func (o *opts) MmapReporter() mmap.Reporter {
return o.mmapReporter
}

func (o *opts) SetQueryStats(value stats.QueryStats) Options {
opts := *o
opts.queryStats = value
return &opts
}

func (o *opts) QueryStats() stats.QueryStats {
return o.queryStats
}
7 changes: 7 additions & 0 deletions src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
"github.com/m3db/m3/src/dbnode/storage/stats"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/m3ninx/index/segment"
Expand Down Expand Up @@ -907,4 +908,10 @@ type Options interface {

// MmapReporter returns the mmap reporter.
MmapReporter() mmap.Reporter

// SetQueryStatsTracker sets current query stats.
SetQueryStats(value stats.QueryStats) Options

// QueryStats returns the current query stats.
QueryStats() stats.QueryStats
}
121 changes: 121 additions & 0 deletions src/dbnode/storage/stats/query_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package stats

import (
"time"

"go.uber.org/atomic"
)

// For tracking query stats in past X duration such as blocks queried.
type queryStats struct {
tracker QueryStatsTracker

recentDocs *atomic.Int64
stopCh chan struct{}
}

type noOpQueryStats struct {
}

// QueryStats provides an interface for updating query stats.
type QueryStats interface {
Update(newDocs int) error
Start()
Stop()
}

// QueryStatsTracker provides an interface for tracking current query stats.
type QueryStatsTracker interface {
Lookback() time.Duration
TrackDocs(recentDocs int) error
}

// NewQueryStats enables query stats to be tracked within a recency lookback duration.
func NewQueryStats(tracker QueryStatsTracker) QueryStats {
return &queryStats{
tracker: tracker,
recentDocs: atomic.NewInt64(0),
stopCh: make(chan struct{}),
}
}

// NoOpQueryStats returns inactive query stats.
func NoOpQueryStats() QueryStats {
return &noOpQueryStats{}
}

// UpdateQueryStats adds new query stats which are being tracked.
func (q *queryStats) Update(newDocs int) error {
if q == nil {
return nil
}
if newDocs <= 0 {
return nil
}

// Add the new stats to the global state.
recentDocs := q.recentDocs.Add(int64(newDocs))

// Invoke the custom tracker based on the new stats values.
return q.tracker.TrackDocs(int(recentDocs))
}

// Start initializes background processing for handling query stats.
func (q *queryStats) Start() {
if q == nil {
return
}
go func() {
ticker := time.NewTicker(q.tracker.Lookback())
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Clear recent docs every X duration.
q.recentDocs.Store(0)

// Also invoke the track func for having zero value.
q.tracker.TrackDocs(0)
case <-q.stopCh:
return
}
}
}()
}

func (q *queryStats) Stop() {
if q == nil {
return
}
close(q.stopCh)
}

func (q *noOpQueryStats) Update(newDocs int) error {
return nil
}

func (q *noOpQueryStats) Stop() {
}

func (q *noOpQueryStats) Start() {
}
59 changes: 59 additions & 0 deletions src/dbnode/storage/stats/query_stats_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package stats

import (
"time"

"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
)

const defaultLookback = time.Second * 5

// Tracker implementation that emits query stats as metrics.
type queryStatsMetricsTracker struct {
recentDocs tally.Gauge
totalDocs tally.Counter
}

// DefaultQueryStatsTrackerForMetrics provides a tracker
// implementation that emits query stats as metrics.
func DefaultQueryStatsTrackerForMetrics(opts instrument.Options) QueryStatsTracker {
scope := opts.
MetricsScope().
SubScope("query-stats")
return &queryStatsMetricsTracker{
recentDocs: scope.Gauge("recent-docs-per-block"),
totalDocs: scope.Counter("total-docs-per-block"),
}
}

func (t *queryStatsMetricsTracker) TrackDocs(recentDocs int) error {
t.recentDocs.Update(float64(recentDocs))
t.totalDocs.Inc(int64(recentDocs))
return nil
}

func (t *queryStatsMetricsTracker) Lookback() time.Duration {
return defaultLookback
}

0 comments on commit c46e22d

Please sign in to comment.