Skip to content

Commit

Permalink
[dbnode] Repopulate cached searches in index active block (#3671)
Browse files Browse the repository at this point in the history
  • Loading branch information
rallen090 authored Aug 27, 2021
1 parent 9641b9f commit 59fe97a
Show file tree
Hide file tree
Showing 48 changed files with 1,736 additions and 389 deletions.
16 changes: 14 additions & 2 deletions src/cmd/services/m3dbnode/config/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ package config
import "github.com/m3db/m3/src/dbnode/storage/series"

var (
defaultPostingsListCacheSize = 2 << 11 // 4096
defaultPostingsListCacheSize = 2 << 15 // ~65k
defaultPostingsListCacheRegexp = true
defaultPostingsListCacheTerms = true
defaultRegexpCacheSize = 256
defaultPostingsListCacheSearch = true
defaultRegexpCacheSize = 1024
)

// CacheConfigurations is the cache configurations.
Expand Down Expand Up @@ -87,6 +88,7 @@ type PostingsListCacheConfiguration struct {
Size *int `yaml:"size"`
CacheRegexp *bool `yaml:"cacheRegexp"`
CacheTerms *bool `yaml:"cacheTerms"`
CacheSearch *bool `yaml:"cacheSearch"`
}

// SizeOrDefault returns the provided size or the default value if none is
Expand Down Expand Up @@ -119,6 +121,16 @@ func (p PostingsListCacheConfiguration) CacheTermsOrDefault() bool {
return *p.CacheTerms
}

// CacheSearchOrDefault returns the configured cache-search setting, falling
// back to the default value if none is provided.
func (p PostingsListCacheConfiguration) CacheSearchOrDefault() bool {
	if p.CacheSearch != nil {
		return *p.CacheSearch
	}
	return defaultPostingsListCacheSearch
}

// RegexpCacheConfiguration is a compiled regexp cache for query regexps.
type RegexpCacheConfiguration struct {
Size *int `yaml:"size"`
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ db:
size: 100
cacheRegexp: false
cacheTerms: false
cacheSearch: null
metrics:
prometheus:
Expand Down Expand Up @@ -429,6 +430,7 @@ func TestConfiguration(t *testing.T) {
size: 100
cacheRegexp: false
cacheTerms: false
cacheSearch: null
regexp: null
filesystem:
filePathPrefix: /var/lib/m3db
Expand Down
24 changes: 18 additions & 6 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,22 @@ func Run(runOpts RunOptions) {
SetMetricsScope(scope.SubScope("postings-list-cache")),
}
)
postingsListCache, err := index.NewPostingsListCache(plCacheSize, plCacheOptions)
segmentPostingsListCache, err := index.NewPostingsListCache(plCacheSize, plCacheOptions)
if err != nil {
logger.Fatal("could not construct postings list cache", zap.Error(err))
logger.Fatal("could not construct segment postings list cache", zap.Error(err))
}

segmentStopReporting := segmentPostingsListCache.Start()
defer segmentStopReporting()

searchPostingsListCache, err := index.NewPostingsListCache(plCacheSize, plCacheOptions)
if err != nil {
logger.Fatal("could not construct searches postings list cache", zap.Error(err))
}

searchStopReporting := searchPostingsListCache.Start()
defer searchStopReporting()

// Setup index regexp compilation cache.
m3ninxindex.SetRegexpCacheOptions(m3ninxindex.RegexpCacheOptions{
Size: cfg.Cache.RegexpConfiguration().SizeOrDefault(),
Expand All @@ -521,7 +532,6 @@ func Run(runOpts RunOptions) {
defer queryLimits.Stop()
seriesReadPermits.Start()
defer seriesReadPermits.Stop()
defer postingsListCache.Start()()

// FOLLOWUP(prateek): remove this once we have the runtime options<->index wiring done
indexOpts := opts.IndexOptions()
Expand All @@ -531,10 +541,12 @@ func Run(runOpts RunOptions) {
insertMode = index.InsertAsync
}
indexOpts = indexOpts.SetInsertMode(insertMode).
SetPostingsListCache(postingsListCache).
SetPostingsListCache(segmentPostingsListCache).
SetSearchPostingsListCache(searchPostingsListCache).
SetReadThroughSegmentOptions(index.ReadThroughSegmentOptions{
CacheRegexp: plCacheConfig.CacheRegexpOrDefault(),
CacheTerms: plCacheConfig.CacheTermsOrDefault(),
CacheRegexp: plCacheConfig.CacheRegexpOrDefault(),
CacheTerms: plCacheConfig.CacheTermsOrDefault(),
CacheSearches: plCacheConfig.CacheSearchOrDefault(),
}).
SetMmapReporter(mmapReporter).
SetQueryLimits(queryLimits)
Expand Down
38 changes: 24 additions & 14 deletions src/dbnode/storage/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Entry struct {
Series series.DatabaseSeries
Index uint64
IndexGarbageCollected *xatomic.Bool
insertTime *xatomic.Int64
indexWriter IndexWriter
curReadWriters int32
reverseIndex entryIndexState
Expand Down Expand Up @@ -101,6 +102,7 @@ func NewEntry(opts NewEntryOptions) *Entry {
Series: opts.Series,
Index: opts.Index,
IndexGarbageCollected: xatomic.NewBool(false),
insertTime: xatomic.NewInt64(0),
indexWriter: opts.IndexWriter,
nowFn: nowFn,
pendingIndexBatchSizeOne: make([]writes.PendingIndexInsert, 1),
Expand Down Expand Up @@ -234,16 +236,6 @@ func (entry *Entry) IfAlreadyIndexedMarkIndexSuccessAndFinalize(
// TryMarkIndexGarbageCollected checks if the entry is eligible to be garbage collected
// from the index. If so, it marks the entry as GCed and returns true. Otherwise returns false.
func (entry *Entry) TryMarkIndexGarbageCollected() bool {
return entry.checkNeedsIndexGarbageCollected(true)
}

// NeedsIndexGarbageCollected checks if the entry is eligible to be garbage collected
// from the index. If so, it marks the entry as GCed and returns true. Otherwise returns false.
func (entry *Entry) NeedsIndexGarbageCollected() bool {
return entry.checkNeedsIndexGarbageCollected(false)
}

func (entry *Entry) checkNeedsIndexGarbageCollected(mark bool) bool {
// Since series insertions + index insertions are done separately async, it is possible for
// a series to be in the index but not have data written yet, and so any series not in the
// lookup yet we cannot yet consider empty.
Expand All @@ -265,14 +257,32 @@ func (entry *Entry) checkNeedsIndexGarbageCollected(mark bool) bool {
return false
}

if mark {
// Mark as GCed from index so the entry can be safely cleaned up elsewhere.
entry.IndexGarbageCollected.Store(true)
}
// Mark as GCed from index so the entry can be safely cleaned up elsewhere.
entry.IndexGarbageCollected.Store(true)

return true
}

// NeedsIndexGarbageCollected reports whether the entry is eligible to be
// garbage collected from the index.
//
// This is a cheaper check than loading the entry from the shard again, which
// makes it cheap enough to run frequently. It may not be as accurate, but an
// approximation is fine here since only a single series in a segment needs to
// return true to trigger an index segment to be garbage collected.
func (entry *Entry) NeedsIndexGarbageCollected() bool {
	// An entry that was never inserted (zero insert time) cannot need garbage
	// collection; otherwise require that no write is potentially pending and
	// the series holds no data.
	return entry.insertTime.Load() != 0 &&
		entry.ReaderWriterCount() == 0 &&
		entry.Series.IsEmpty()
}

// SetInsertTime marks the entry as having been inserted into the shard at a given timestamp.
// The timestamp is stored as Unix nanoseconds; a zero value is treated as
// "not yet inserted" by NeedsIndexGarbageCollected.
func (entry *Entry) SetInsertTime(t time.Time) {
	entry.insertTime.Store(t.UnixNano())
}

// Write writes a new value.
func (entry *Entry) Write(
ctx context.Context,
Expand Down
28 changes: 22 additions & 6 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ import (
"errors"
"fmt"
"io"
"math"
"runtime"
"sync"
"time"

opentracinglog "github.com/opentracing/opentracing-go/log"
"github.com/uber-go/tally"
"go.uber.org/zap"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/limits"
Expand All @@ -44,11 +50,8 @@ import (
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xresource "github.com/m3db/m3/src/x/resource"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"

opentracinglog "github.com/opentracing/opentracing-go/log"
"github.com/uber-go/tally"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -133,6 +136,8 @@ type block struct {

state blockState

cachedSearchesWorkers xsync.WorkerPool

mutableSegments *mutableSegments
coldMutableSegments []*mutableSegments
shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType
Expand Down Expand Up @@ -235,11 +240,16 @@ func NewBlock(
scope := iopts.MetricsScope().SubScope("index").SubScope("block")
iopts = iopts.SetMetricsScope(scope)

cpus := int(math.Max(1, math.Ceil(0.25*float64(runtime.NumCPU()))))
cachedSearchesWorkers := xsync.NewWorkerPool(cpus)
cachedSearchesWorkers.Init()

segs := newMutableSegments(
md,
blockStart,
opts,
blockOpts,
cachedSearchesWorkers,
namespaceRuntimeOptsMgr,
iopts,
)
Expand All @@ -249,6 +259,7 @@ func NewBlock(
blockStart,
opts,
blockOpts,
cachedSearchesWorkers,
namespaceRuntimeOptsMgr,
iopts,
)
Expand All @@ -261,6 +272,7 @@ func NewBlock(
blockEnd: blockStart.Add(blockSize),
blockSize: blockSize,
blockOpts: blockOpts,
cachedSearchesWorkers: cachedSearchesWorkers,
mutableSegments: segs,
coldMutableSegments: coldMutableSegments,
shardRangesSegmentsByVolumeType: make(shardRangesSegmentsByVolumeType),
Expand Down Expand Up @@ -982,7 +994,10 @@ func (b *block) addResults(
}

var (
plCache = b.opts.PostingsListCache()
plCaches = ReadThroughSegmentCaches{
SegmentPostingsListCache: b.opts.PostingsListCache(),
SearchPostingsListCache: b.opts.SearchPostingsListCache(),
}
readThroughOpts = b.opts.ReadThroughSegmentOptions()
segments = results.Segments()
)
Expand All @@ -991,7 +1006,7 @@ func (b *block) addResults(
elem := seg.Segment()
if immSeg, ok := elem.(segment.ImmutableSegment); ok {
// only wrap the immutable segments with a read through cache.
elem = NewReadThroughSegment(immSeg, plCache, readThroughOpts)
elem = NewReadThroughSegment(immSeg, plCaches, readThroughOpts)
}
readThroughSegments = append(readThroughSegments, elem)
}
Expand Down Expand Up @@ -1225,6 +1240,7 @@ func (b *block) RotateColdMutableSegments() error {
b.blockStart,
b.opts,
b.blockOpts,
b.cachedSearchesWorkers,
b.namespaceRuntimeOptsMgr,
b.iopts,
)
Expand Down
15 changes: 12 additions & 3 deletions src/dbnode/storage/index/fields_terms_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/postings/roaring"
"github.com/m3db/m3/src/m3ninx/search"
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
)
Expand Down Expand Up @@ -109,13 +110,21 @@ func newFieldsAndTermsIterator(
}

// If need to restrict by query, run the query on the segment first.
searcher, err := opts.restrictByQuery.SearchQuery().Searcher()
searchQuery := opts.restrictByQuery.SearchQuery()
searcher, err := searchQuery.Searcher()
if err != nil {
return nil, err
}

_, sp := ctx.StartTraceSpan(tracepoint.FieldTermsIteratorIndexSearch)
pl, err := searcher.Search(iter.reader)
var (
_, sp = ctx.StartTraceSpan(tracepoint.FieldTermsIteratorIndexSearch)
pl postings.List
)
if readThrough, ok := reader.(search.ReadThroughSegmentSearcher); ok {
pl, err = readThrough.Search(searchQuery, searcher)
} else {
pl, err = searcher.Search(reader)
}
sp.Finish()
if err != nil {
return nil, err
Expand Down
28 changes: 28 additions & 0 deletions src/dbnode/storage/index/index_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 59fe97a

Please sign in to comment.