From 59fe97a160c7365181decb215549f1a77f419fda Mon Sep 17 00:00:00 2001
From: Ryan Allen
Date: Thu, 26 Aug 2021 21:40:15 -0500
Subject: [PATCH] [dbnode] Repopulate cached searches in index active block
 (#3671)

---
 src/cmd/services/m3dbnode/config/cache.go     |  16 +-
 .../services/m3dbnode/config/config_test.go   |   2 +
 src/dbnode/server/server.go                   |  24 +-
 src/dbnode/storage/entry.go                   |  38 +-
 src/dbnode/storage/index/block.go             |  28 +-
 .../storage/index/fields_terms_iterator.go    |  15 +-
 src/dbnode/storage/index/index_mock.go        |  28 ++
 src/dbnode/storage/index/mutable_segments.go  | 422 ++++++++++++++++--
 .../storage/index/mutable_segments_test.go    | 275 ++++++++++++
 src/dbnode/storage/index/options.go           | 105 +++--
 .../storage/index/postings_list_cache.go      | 228 ++++++++--
 .../storage/index/postings_list_cache_lru.go  | 195 ++++++--
 .../index/postings_list_cache_lru_test.go     |  14 +-
 .../storage/index/postings_list_cache_test.go |  72 ++-
 .../storage/index/read_through_segment.go     | 163 +++++--
 .../index/read_through_segment_test.go        |  87 ++--
 src/dbnode/storage/index/segments.go          |   9 +-
 src/dbnode/storage/index/types.go             |   6 +
 src/dbnode/storage/shard.go                   |   1 +
 src/m3ninx/index/index_mock.go                |   4 +-
 src/m3ninx/index/segment/fst/segment.go       |   2 +-
 src/m3ninx/index/segment/mem/reader.go        |   2 +-
 src/m3ninx/index/segment/segment_mock.go      |   4 +-
 src/m3ninx/index/types.go                     |   2 +-
 src/m3ninx/search/executor/executor.go        |  10 +-
 src/m3ninx/search/executor/executor_test.go   |  10 +-
 src/m3ninx/search/executor/iterator.go        |  25 +-
 src/m3ninx/search/executor/iterator_test.go   |  40 +-
 src/m3ninx/search/query/all.go                |   6 +-
 src/m3ninx/search/query/codec.go              |  11 +-
 src/m3ninx/search/query/conjunction.go        |  34 +-
 src/m3ninx/search/query/conjunction_test.go   |   2 +-
 src/m3ninx/search/query/disjunction.go        |  25 +-
 src/m3ninx/search/query/disjunction_test.go   |   2 +-
 src/m3ninx/search/query/field.go              |  17 +-
 src/m3ninx/search/query/negation.go           |  21 +-
 src/m3ninx/search/query/regexp.go             |  24 +-
 src/m3ninx/search/query/term.go               |  22 +-
 src/m3ninx/search/query/util.go               |  24 +-
 src/m3ninx/search/query/util_test.go          |   8 +-
 src/m3ninx/search/search_mock.go              |  38 ++
 src/m3ninx/search/searcher/negation.go        |  18 +-
 src/m3ninx/search/types.go                    |   6 +
 src/m3ninx/x/safe_closer.go                   |  13 +-
 src/query/storage/index_test.go               |  16 +-
 src/x/sync/fast_worker_pool.go                |   4 +
 src/x/sync/types.go                           |   3 +
 src/x/sync/worker_pool.go                     |   4 +
 48 files changed, 1736 insertions(+), 389 deletions(-)
 create mode 100644 src/dbnode/storage/index/mutable_segments_test.go

diff --git a/src/cmd/services/m3dbnode/config/cache.go b/src/cmd/services/m3dbnode/config/cache.go
index 65f72dcf14..49b84bd2e8 100644
--- a/src/cmd/services/m3dbnode/config/cache.go
+++ b/src/cmd/services/m3dbnode/config/cache.go
@@ -23,10 +23,11 @@ package config
 import "github.com/m3db/m3/src/dbnode/storage/series"
 
 var (
-	defaultPostingsListCacheSize   = 2 << 11 // 4096
+	defaultPostingsListCacheSize   = 2 << 15 // ~65k
 	defaultPostingsListCacheRegexp = true
 	defaultPostingsListCacheTerms  = true
-	defaultRegexpCacheSize         = 256
+	defaultPostingsListCacheSearch = true
+	defaultRegexpCacheSize         = 1024
 )
 
 // CacheConfigurations is the cache configurations.
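Editorial note: the net effect of the new default below is that search-result
caching is on unless explicitly disabled. A minimal sketch of how the accessor
resolves (hand-written driver code, not part of the patch; it only uses the
config package and fields shown in this diff):

package main

import (
	"fmt"

	"github.com/m3db/m3/src/cmd/services/m3dbnode/config"
)

func main() {
	// CacheSearch left nil: CacheSearchOrDefault falls back to
	// defaultPostingsListCacheSearch, i.e. true.
	var cfg config.PostingsListCacheConfiguration
	fmt.Println(cfg.CacheSearchOrDefault()) // true

	// An explicit value always wins over the default.
	disabled := false
	cfg.CacheSearch = &disabled
	fmt.Println(cfg.CacheSearchOrDefault()) // false
}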
@@ -87,6 +88,7 @@ type PostingsListCacheConfiguration struct {
 	Size        *int  `yaml:"size"`
 	CacheRegexp *bool `yaml:"cacheRegexp"`
 	CacheTerms  *bool `yaml:"cacheTerms"`
+	CacheSearch *bool `yaml:"cacheSearch"`
 }
 
 // SizeOrDefault returns the provided size or the default value if none is
@@ -119,6 +121,16 @@ func (p PostingsListCacheConfiguration) CacheTermsOrDefault() bool {
 	return *p.CacheTerms
 }
 
+// CacheSearchOrDefault returns the provided cache search configuration value
+// or the default value if none is provided.
+func (p PostingsListCacheConfiguration) CacheSearchOrDefault() bool {
+	if p.CacheSearch == nil {
+		return defaultPostingsListCacheSearch
+	}
+
+	return *p.CacheSearch
+}
+
 // RegexpCacheConfiguration is a compiled regexp cache for query regexps.
 type RegexpCacheConfiguration struct {
 	Size *int `yaml:"size"`
diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go
index dfd7b4055a..286bae0aaa 100644
--- a/src/cmd/services/m3dbnode/config/config_test.go
+++ b/src/cmd/services/m3dbnode/config/config_test.go
@@ -53,6 +53,7 @@ db:
       size: 100
       cacheRegexp: false
       cacheTerms: false
+      cacheSearch: null
 
   metrics:
     prometheus:
@@ -429,6 +430,7 @@ func TestConfiguration(t *testing.T) {
     size: 100
     cacheRegexp: false
     cacheTerms: false
+    cacheSearch: null
   regexp: null
 filesystem:
   filePathPrefix: /var/lib/m3db
diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go
index de2dec45c5..77b8aa44e2 100644
--- a/src/dbnode/server/server.go
+++ b/src/dbnode/server/server.go
@@ -497,11 +497,22 @@ func Run(runOpts RunOptions) {
 			SetMetricsScope(scope.SubScope("postings-list-cache")),
 		}
 	)
-	postingsListCache, err := index.NewPostingsListCache(plCacheSize, plCacheOptions)
+	segmentPostingsListCache, err := index.NewPostingsListCache(plCacheSize, plCacheOptions)
 	if err != nil {
-		logger.Fatal("could not construct postings list cache", zap.Error(err))
+		logger.Fatal("could not construct segment postings list cache", zap.Error(err))
 	}
 
+	segmentStopReporting := segmentPostingsListCache.Start()
+	defer segmentStopReporting()
+
+	searchPostingsListCache, err := index.NewPostingsListCache(plCacheSize, plCacheOptions)
+	if err != nil {
+		logger.Fatal("could not construct search postings list cache", zap.Error(err))
+	}
+
+	searchStopReporting := searchPostingsListCache.Start()
+	defer searchStopReporting()
+
 	// Setup index regexp compilation cache.
 	m3ninxindex.SetRegexpCacheOptions(m3ninxindex.RegexpCacheOptions{
 		Size: cfg.Cache.RegexpConfiguration().SizeOrDefault(),
@@ -521,7 +532,6 @@ func Run(runOpts RunOptions) {
 	defer queryLimits.Stop()
 	seriesReadPermits.Start()
 	defer seriesReadPermits.Stop()
-	defer postingsListCache.Start()()
 
 	// FOLLOWUP(prateek): remove this once we have the runtime options<->index wiring done
 	indexOpts := opts.IndexOptions()
@@ -531,10 +541,12 @@ func Run(runOpts RunOptions) {
 		insertMode = index.InsertAsync
 	}
 	indexOpts = indexOpts.SetInsertMode(insertMode).
-		SetPostingsListCache(postingsListCache).
+		SetPostingsListCache(segmentPostingsListCache).
+		SetSearchPostingsListCache(searchPostingsListCache).
 		SetReadThroughSegmentOptions(index.ReadThroughSegmentOptions{
-			CacheRegexp: plCacheConfig.CacheRegexpOrDefault(),
-			CacheTerms:  plCacheConfig.CacheTermsOrDefault(),
+			CacheRegexp:   plCacheConfig.CacheRegexpOrDefault(),
+			CacheTerms:    plCacheConfig.CacheTermsOrDefault(),
+			CacheSearches: plCacheConfig.CacheSearchOrDefault(),
 		}).
 		SetMmapReporter(mmapReporter).
 		SetQueryLimits(queryLimits)
diff --git a/src/dbnode/storage/entry.go b/src/dbnode/storage/entry.go
index 65c300078f..d49039b673 100644
--- a/src/dbnode/storage/entry.go
+++ b/src/dbnode/storage/entry.go
@@ -65,6 +65,7 @@ type Entry struct {
 	Series                   series.DatabaseSeries
 	Index                    uint64
 	IndexGarbageCollected    *xatomic.Bool
+	insertTime               *xatomic.Int64
 	indexWriter              IndexWriter
 	curReadWriters           int32
 	reverseIndex             entryIndexState
@@ -101,6 +102,7 @@ func NewEntry(opts NewEntryOptions) *Entry {
 		Series:                   opts.Series,
 		Index:                    opts.Index,
 		IndexGarbageCollected:    xatomic.NewBool(false),
+		insertTime:               xatomic.NewInt64(0),
 		indexWriter:              opts.IndexWriter,
 		nowFn:                    nowFn,
 		pendingIndexBatchSizeOne: make([]writes.PendingIndexInsert, 1),
@@ -234,16 +236,6 @@ func (entry *Entry) IfAlreadyIndexedMarkIndexSuccessAndFinalize(
 // TryMarkIndexGarbageCollected checks if the entry is eligible to be garbage collected
 // from the index. If so, it marks the entry as GCed and returns true. Otherwise returns false.
 func (entry *Entry) TryMarkIndexGarbageCollected() bool {
-	return entry.checkNeedsIndexGarbageCollected(true)
-}
-
-// NeedsIndexGarbageCollected checks if the entry is eligible to be garbage collected
-// from the index. If so, it marks the entry as GCed and returns true. Otherwise returns false.
-func (entry *Entry) NeedsIndexGarbageCollected() bool {
-	return entry.checkNeedsIndexGarbageCollected(false)
-}
-
-func (entry *Entry) checkNeedsIndexGarbageCollected(mark bool) bool {
 	// Since series insertions + index insertions are done separately async, it is possible for
 	// a series to be in the index but not have data written yet, and so any series not in the
 	// lookup yet we cannot yet consider empty.
@@ -265,14 +257,32 @@ func (entry *Entry) checkNeedsIndexGarbageCollected(mark bool) bool {
 		return false
 	}
 
-	if mark {
-		// Mark as GCed from index so the entry can be safely cleaned up elsewhere.
-		entry.IndexGarbageCollected.Store(true)
-	}
+	// Mark as GCed from index so the entry can be safely cleaned up elsewhere.
+	entry.IndexGarbageCollected.Store(true)
 
 	return true
 }
 
+// NeedsIndexGarbageCollected returns whether the entry is eligible to be
+// garbage collected from the index.
+func (entry *Entry) NeedsIndexGarbageCollected() bool {
+	// This is a cheaper check than loading the entry from the shard again,
+	// which makes it cheap enough to run frequently.
+	// It may not be as accurate, but it's fine for an approximation since
+	// only a single series in a segment needs to return true to trigger an
+	// index segment to be garbage collected.
+	if entry.insertTime.Load() == 0 {
+		return false // Not inserted, does not need garbage collection.
+	}
+	// Check that a write is not potentially pending and the series is empty.
+	return entry.ReaderWriterCount() == 0 && entry.Series.IsEmpty()
+}
+
+// SetInsertTime marks the entry as having been inserted into the shard at a given timestamp.
+func (entry *Entry) SetInsertTime(t time.Time) {
+	entry.insertTime.Store(t.UnixNano())
+}
+
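Editorial note: NeedsIndexGarbageCollected above is the cheap read-only
companion to TryMarkIndexGarbageCollected. A minimal lifecycle sketch
(hand-written driver code, not part of the patch; the entry is assumed to
come from storage.NewEntry and be registered with a shard):

package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/storage"
)

// gcEligibilitySketch walks the lifecycle the comments above describe.
func gcEligibilitySketch(entry *storage.Entry) {
	// Before an insert time is stamped the entry is never eligible:
	// insertTime is still zero.
	_ = entry.NeedsIndexGarbageCollected() // false

	// The entry is stamped once it lands in the shard lookup.
	entry.SetInsertTime(time.Now())

	// Eligibility now reduces to two cheap checks: no outstanding
	// readers/writers and an empty series.
	_ = entry.NeedsIndexGarbageCollected()
}

 // Write writes a new value.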
func (entry *Entry) Write( ctx context.Context, diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 2e0acc557c..164ee4b7e6 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -25,9 +25,15 @@ import ( "errors" "fmt" "io" + "math" + "runtime" "sync" "time" + opentracinglog "github.com/opentracing/opentracing-go/log" + "github.com/uber-go/tally" + "go.uber.org/zap" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/limits" @@ -44,11 +50,8 @@ import ( "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xresource "github.com/m3db/m3/src/x/resource" + xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" - - opentracinglog "github.com/opentracing/opentracing-go/log" - "github.com/uber-go/tally" - "go.uber.org/zap" ) var ( @@ -133,6 +136,8 @@ type block struct { state blockState + cachedSearchesWorkers xsync.WorkerPool + mutableSegments *mutableSegments coldMutableSegments []*mutableSegments shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType @@ -235,11 +240,16 @@ func NewBlock( scope := iopts.MetricsScope().SubScope("index").SubScope("block") iopts = iopts.SetMetricsScope(scope) + cpus := int(math.Max(1, math.Ceil(0.25*float64(runtime.NumCPU())))) + cachedSearchesWorkers := xsync.NewWorkerPool(cpus) + cachedSearchesWorkers.Init() + segs := newMutableSegments( md, blockStart, opts, blockOpts, + cachedSearchesWorkers, namespaceRuntimeOptsMgr, iopts, ) @@ -249,6 +259,7 @@ func NewBlock( blockStart, opts, blockOpts, + cachedSearchesWorkers, namespaceRuntimeOptsMgr, iopts, ) @@ -261,6 +272,7 @@ func NewBlock( blockEnd: blockStart.Add(blockSize), blockSize: blockSize, blockOpts: blockOpts, + cachedSearchesWorkers: cachedSearchesWorkers, mutableSegments: segs, coldMutableSegments: coldMutableSegments, shardRangesSegmentsByVolumeType: make(shardRangesSegmentsByVolumeType), @@ -982,7 +994,10 @@ func (b *block) addResults( } var ( - plCache = b.opts.PostingsListCache() + plCaches = ReadThroughSegmentCaches{ + SegmentPostingsListCache: b.opts.PostingsListCache(), + SearchPostingsListCache: b.opts.SearchPostingsListCache(), + } readThroughOpts = b.opts.ReadThroughSegmentOptions() segments = results.Segments() ) @@ -991,7 +1006,7 @@ func (b *block) addResults( elem := seg.Segment() if immSeg, ok := elem.(segment.ImmutableSegment); ok { // only wrap the immutable segments with a read through cache. 
- elem = NewReadThroughSegment(immSeg, plCache, readThroughOpts) + elem = NewReadThroughSegment(immSeg, plCaches, readThroughOpts) } readThroughSegments = append(readThroughSegments, elem) } @@ -1225,6 +1240,7 @@ func (b *block) RotateColdMutableSegments() error { b.blockStart, b.opts, b.blockOpts, + b.cachedSearchesWorkers, b.namespaceRuntimeOptsMgr, b.iopts, ) diff --git a/src/dbnode/storage/index/fields_terms_iterator.go b/src/dbnode/storage/index/fields_terms_iterator.go index 45f98a84e6..d7b93b25f8 100644 --- a/src/dbnode/storage/index/fields_terms_iterator.go +++ b/src/dbnode/storage/index/fields_terms_iterator.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/postings/roaring" + "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" ) @@ -109,13 +110,21 @@ func newFieldsAndTermsIterator( } // If need to restrict by query, run the query on the segment first. - searcher, err := opts.restrictByQuery.SearchQuery().Searcher() + searchQuery := opts.restrictByQuery.SearchQuery() + searcher, err := searchQuery.Searcher() if err != nil { return nil, err } - _, sp := ctx.StartTraceSpan(tracepoint.FieldTermsIteratorIndexSearch) - pl, err := searcher.Search(iter.reader) + var ( + _, sp = ctx.StartTraceSpan(tracepoint.FieldTermsIteratorIndexSearch) + pl postings.List + ) + if readThrough, ok := reader.(search.ReadThroughSegmentSearcher); ok { + pl, err = readThrough.Search(searchQuery, searcher) + } else { + pl, err = searcher.Search(reader) + } sp.Finish() if err != nil { return nil, err diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index cc3e682f0f..8bda7a3570 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -1951,6 +1951,20 @@ func (mr *MockOptionsMockRecorder) ReadThroughSegmentOptions() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadThroughSegmentOptions", reflect.TypeOf((*MockOptions)(nil).ReadThroughSegmentOptions)) } +// SearchPostingsListCache mocks base method. +func (m *MockOptions) SearchPostingsListCache() *PostingsListCache { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchPostingsListCache") + ret0, _ := ret[0].(*PostingsListCache) + return ret0 +} + +// SearchPostingsListCache indicates an expected call of SearchPostingsListCache. +func (mr *MockOptionsMockRecorder) SearchPostingsListCache() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchPostingsListCache", reflect.TypeOf((*MockOptions)(nil).SearchPostingsListCache)) +} + // SegmentBuilderOptions mocks base method. func (m *MockOptions) SegmentBuilderOptions() builder.Options { m.ctrl.T.Helper() @@ -2259,6 +2273,20 @@ func (mr *MockOptionsMockRecorder) SetReadThroughSegmentOptions(value interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetReadThroughSegmentOptions", reflect.TypeOf((*MockOptions)(nil).SetReadThroughSegmentOptions), value) } +// SetSearchPostingsListCache mocks base method. +func (m *MockOptions) SetSearchPostingsListCache(value *PostingsListCache) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetSearchPostingsListCache", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetSearchPostingsListCache indicates an expected call of SetSearchPostingsListCache. 
+func (mr *MockOptionsMockRecorder) SetSearchPostingsListCache(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSearchPostingsListCache", reflect.TypeOf((*MockOptions)(nil).SetSearchPostingsListCache), value) +} + // SetSegmentBuilderOptions mocks base method. func (m *MockOptions) SetSegmentBuilderOptions(value builder.Options) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/index/mutable_segments.go b/src/dbnode/storage/index/mutable_segments.go index fe6707681a..ee4d066dc0 100644 --- a/src/dbnode/storage/index/mutable_segments.go +++ b/src/dbnode/storage/index/mutable_segments.go @@ -23,27 +23,38 @@ package index import ( "errors" "fmt" + "io" "math" "runtime" "sync" "time" + bitmap "github.com/m3dbx/pilosa/roaring" "github.com/uber-go/tally" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/index/segments" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/generated/proto/querypb" m3ninxindex "github.com/m3db/m3/src/m3ninx/index" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/builder" "github.com/m3db/m3/src/m3ninx/index/segment/fst" + "github.com/m3db/m3/src/m3ninx/postings" + "github.com/m3db/m3/src/m3ninx/postings/pilosa" + "github.com/m3db/m3/src/m3ninx/postings/roaring" + "github.com/m3db/m3/src/m3ninx/search" + "github.com/m3db/m3/src/m3ninx/search/query" "github.com/m3db/m3/src/m3ninx/x" "github.com/m3db/m3/src/x/context" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" xresource "github.com/m3db/m3/src/x/resource" + xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" ) @@ -63,6 +74,8 @@ type mutableSegmentsState uint const ( mutableSegmentsStateOpen mutableSegmentsState = iota mutableSegmentsStateClosed mutableSegmentsState = iota + + segmentCheckInactiveSeriesMinInterval = 5 * time.Minute ) // nolint: maligned @@ -82,22 +95,37 @@ type mutableSegments struct { iopts instrument.Options optsListener xresource.SimpleCloser writeIndexingConcurrency int + cachedSearchesWorkers xsync.WorkerPool seriesActiveFn segment.DocumentsFilter metrics mutableSegmentsMetrics logger *zap.Logger + + // For testing purposes. 
+ backgroundCompactDisable bool } type mutableSegmentsMetrics struct { - foregroundCompactionPlanRunLatency tally.Timer - foregroundCompactionTaskRunLatency tally.Timer - backgroundCompactionPlanRunLatency tally.Timer - backgroundCompactionTaskRunLatency tally.Timer - activeBlockIndexNew tally.Counter - activeBlockGarbageCollectSegment tally.Counter - activeBlockGarbageCollectSeries tally.Counter - activeBlockGarbageCollectEmptySegment tally.Counter + foregroundCompactionPlanRunLatency tally.Timer + foregroundCompactionTaskRunLatency tally.Timer + backgroundCompactionPlanRunLatency tally.Timer + backgroundCompactionTaskRunLatency tally.Timer + activeBlockIndexNew tally.Counter + activeBlockGarbageCollectSegment tally.Counter + activeBlockGarbageCollectSeries tally.Counter + activeBlockGarbageCollectEmptySegment tally.Counter + activeBlockGarbageCollectCachedSearchesDisabled tally.Counter + activeBlockGarbageCollectCachedSearchesInRegistry tally.Counter + activeBlockGarbageCollectCachedSearchesNotInRegistry tally.Counter + activeBlockGarbageCollectCachedSearchesTotal tally.Histogram + activeBlockGarbageCollectCachedSearchesMatched tally.Histogram + activeBlockGarbageCollectReconstructCachedSearchEvalSkip tally.Counter + activeBlockGarbageCollectReconstructCachedSearchEvalAttempt tally.Counter + activeBlockGarbageCollectReconstructCachedSearchCacheHit tally.Counter + activeBlockGarbageCollectReconstructCachedSearchCacheMiss tally.Counter + activeBlockGarbageCollectReconstructCachedSearchExecSuccess tally.Counter + activeBlockGarbageCollectReconstructCachedSearchExecError tally.Counter } func newMutableSegmentsMetrics(s tally.Scope) mutableSegmentsMetrics { @@ -112,9 +140,38 @@ func newMutableSegmentsMetrics(s tally.Scope) mutableSegmentsMetrics { activeBlockIndexNew: activeBlockScope.Tagged(map[string]string{ "result_type": "new", }).Counter("index-result"), - activeBlockGarbageCollectSegment: activeBlockScope.Counter("gc-segment"), - activeBlockGarbageCollectSeries: activeBlockScope.Counter("gc-series"), - activeBlockGarbageCollectEmptySegment: activeBlockScope.Counter("gc-empty-segment"), + activeBlockGarbageCollectSegment: activeBlockScope.Counter("gc-segment"), + activeBlockGarbageCollectSeries: activeBlockScope.Counter("gc-series"), + activeBlockGarbageCollectEmptySegment: backgroundScope.Counter("gc-empty-segment"), + activeBlockGarbageCollectCachedSearchesDisabled: backgroundScope.Counter("gc-cached-searches-disabled"), + activeBlockGarbageCollectCachedSearchesInRegistry: backgroundScope.Tagged(map[string]string{ + "found": "true", + }).Counter("gc-cached-searches-in-registry"), + activeBlockGarbageCollectCachedSearchesNotInRegistry: backgroundScope.Tagged(map[string]string{ + "found": "false", + }).Counter("gc-cached-searches-in-registry"), + activeBlockGarbageCollectCachedSearchesTotal: backgroundScope.Histogram("gc-cached-searches-total", + append(tally.ValueBuckets{0, 1}, tally.MustMakeExponentialValueBuckets(2, 2, 12)...)), + activeBlockGarbageCollectCachedSearchesMatched: backgroundScope.Histogram("gc-cached-searches-matched", + append(tally.ValueBuckets{0, 1}, tally.MustMakeExponentialValueBuckets(2, 2, 12)...)), + activeBlockGarbageCollectReconstructCachedSearchEvalSkip: backgroundScope.Tagged(map[string]string{ + "eval_type": "skip", + }).Counter("gc-reconstruct-cached-search-eval"), + activeBlockGarbageCollectReconstructCachedSearchEvalAttempt: backgroundScope.Tagged(map[string]string{ + "eval_type": "attempt", + }).Counter("gc-reconstruct-cached-search-eval"), + 
activeBlockGarbageCollectReconstructCachedSearchCacheHit: backgroundScope.Tagged(map[string]string{ + "result_type": "cache_hit", + }).Counter("gc-reconstruct-cached-search-cache-result"), + activeBlockGarbageCollectReconstructCachedSearchCacheMiss: backgroundScope.Tagged(map[string]string{ + "result_type": "cache_miss", + }).Counter("gc-reconstruct-cached-search-cache-result"), + activeBlockGarbageCollectReconstructCachedSearchExecSuccess: backgroundScope.Tagged(map[string]string{ + "result_type": "success", + }).Counter("gc-reconstruct-cached-search-exec-result"), + activeBlockGarbageCollectReconstructCachedSearchExecError: backgroundScope.Tagged(map[string]string{ + "result_type": "error", + }).Counter("gc-reconstruct-cached-search-exec-result"), } } @@ -125,18 +182,20 @@ func newMutableSegments( blockStart xtime.UnixNano, opts Options, blockOpts BlockOptions, + cachedSearchesWorkers xsync.WorkerPool, namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager, iopts instrument.Options, ) *mutableSegments { m := &mutableSegments{ - blockStart: blockStart, - blockSize: md.Options().IndexOptions().BlockSize(), - opts: opts, - blockOpts: blockOpts, - compact: mutableSegmentsCompact{opts: opts, blockOpts: blockOpts}, - iopts: iopts, - metrics: newMutableSegmentsMetrics(iopts.MetricsScope()), - logger: iopts.Logger(), + blockStart: blockStart, + blockSize: md.Options().IndexOptions().BlockSize(), + opts: opts, + blockOpts: blockOpts, + compact: mutableSegmentsCompact{opts: opts, blockOpts: blockOpts}, + cachedSearchesWorkers: cachedSearchesWorkers, + iopts: iopts, + metrics: newMutableSegmentsMetrics(iopts.MetricsScope()), + logger: iopts.Logger(), } m.seriesActiveFn = segment.DocumentsFilterFn(m.seriesActive) m.optsListener = namespaceRuntimeOptsMgr.RegisterListener(m) @@ -377,11 +436,11 @@ func (m *mutableSegments) Close() { } func (m *mutableSegments) maybeBackgroundCompactWithLock() { - if m.compact.compactingBackgroundStandard { + if m.compact.compactingBackgroundStandard || m.backgroundCompactDisable { return } - m.backgroundCompactWithLock() + m.backgroundCompactWithLock(false) } // BackgroundCompact background compacts eligible segments. @@ -389,10 +448,10 @@ func (m *mutableSegments) BackgroundCompact() { m.Lock() defer m.Unlock() - m.backgroundCompactWithLock() + m.backgroundCompactWithLock(true) } -func (m *mutableSegments) backgroundCompactWithLock() { +func (m *mutableSegments) backgroundCompactWithLock(force bool) { // Create a logical plan. segs := make([]compaction.Segment, 0, len(m.backgroundSegments)) for _, seg := range m.backgroundSegments { @@ -421,11 +480,20 @@ func (m *mutableSegments) backgroundCompactWithLock() { gcRequired = false gcPlan = &compaction.Plan{} gcAlreadyRunning = m.compact.compactingBackgroundGarbageCollect + nowFn = m.opts.ClockOptions().NowFn() + now = nowFn() ) if !gcAlreadyRunning { gcRequired = true for _, seg := range m.backgroundSegments { + sinceLastInactiveSeriesCheck := now.Sub(seg.garbageCollectLastCheck) + seg.garbageCollectLastCheck = now + if !force && sinceLastInactiveSeriesCheck < segmentCheckInactiveSeriesMinInterval { + // Only consider for compaction every so often. 
+				continue
+			}
+
 			alreadyHasTask := false
 			for _, task := range plan.Tasks {
 				for _, taskSegment := range task.Segments {
@@ -664,8 +732,14 @@
 }
 
 func (m *mutableSegments) newReadThroughSegment(seg fst.Segment) *ReadThroughSegment {
-	readThroughOpts := m.opts.ReadThroughSegmentOptions()
-	return NewReadThroughSegment(seg, m.opts.PostingsListCache(), readThroughOpts)
+	var (
+		plCaches = ReadThroughSegmentCaches{
+			SegmentPostingsListCache: m.opts.PostingsListCache(),
+			SearchPostingsListCache:  m.opts.SearchPostingsListCache(),
+		}
+		readThroughOpts = m.opts.ReadThroughSegmentOptions()
+	)
+	return NewReadThroughSegment(seg, plCaches, readThroughOpts)
 }
 
 func (m *mutableSegments) backgroundCompactWithTask(
@@ -718,8 +792,8 @@ func (m *mutableSegments) backgroundCompactWithTask(
 	}
 
 	var (
-		compacted = compactResult.Compacted
-		// segMetas  = compactResult.SegmentMetadatas
+		compacted  = compactResult.Compacted
+		segMetas   = compactResult.SegmentMetadatas
 		replaceSeg segment.Segment
 	)
 	if empty {
@@ -739,13 +813,11 @@
 		// Note: There were very obvious peaks of latency (p99 of <500ms spiking
 		// to 8 times that at first replace of large segments after a block
 		// rotation) without this optimization.
-
-		// TODO: port populating cached searches
-		// if err := m.populateCachedSearches(readThroughSeg, segMetas); err != nil {
-		// 	instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
-		// 		l.Error("failed to populate cached searches", zap.Error(err))
-		// 	})
-		// }
+		if err := m.populateCachedSearches(readThroughSeg, segMetas); err != nil {
+			instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
+				l.Error("failed to populate cached searches", zap.Error(err))
+			})
+		}
 	}
 
 	// Rotate out the replaced frozen segments and add the compacted one.
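Editorial note: before the repopulation logic in the next hunk, this is the
shape of the wrapping newReadThroughSegment performs. A hand-written sketch,
not part of the patch; it assumes the post-patch NewReadThroughSegment
signature used above:

package example

import (
	"github.com/m3db/m3/src/dbnode/storage/index"
	"github.com/m3db/m3/src/m3ninx/index/segment"
)

// wrapWithReadThrough mirrors mutableSegments.newReadThroughSegment: the
// segment cache serves term/regexp/field postings, while the separate
// search cache serves whole-query results.
func wrapWithReadThrough(
	seg segment.ImmutableSegment,
	segmentCache, searchCache *index.PostingsListCache,
) *index.ReadThroughSegment {
	caches := index.ReadThroughSegmentCaches{
		SegmentPostingsListCache: segmentCache,
		SearchPostingsListCache:  searchCache,
	}
	return index.NewReadThroughSegment(seg, caches, index.ReadThroughSegmentOptions{
		CacheRegexp:   true,
		CacheTerms:    true,
		CacheSearches: true,
	})
}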
@@ -759,6 +831,288 @@
 	return nil
 }
 
+type cachedPatternForCompactedSegment struct {
+	field       string
+	searchQuery *querypb.Query
+	patterns    []cachedPatternFromSegment
+}
+
+type cachedPatternFromSegment struct {
+	prevSeg          prevSegment
+	hasCachedPattern bool
+	cachedPattern    CachedPattern
+}
+
+type prevSegment struct {
+	segment *ReadThroughSegment
+	meta    segment.SegmentsBuilderSegmentMetadata
+}
+
+const (
+	defaultBitmapContainerPooling = 128
+)
+
+type populateCachedSearchesWorker struct {
+	postings      postings.MutableList
+	encoder       *pilosa.Encoder
+	closers       []x.SafeCloser
+	cachedClosers []x.SafeCloser
+}
+
+func newPopulateCachedSearchesWorker() *populateCachedSearchesWorker {
+	b := bitmap.NewBitmapWithDefaultPooling(defaultBitmapContainerPooling)
+	return &populateCachedSearchesWorker{
+		postings: roaring.NewPostingsListFromBitmap(b),
+		encoder:  pilosa.NewEncoder(),
+	}
+}
+
+func (w *populateCachedSearchesWorker) addCloser(c io.Closer) {
+	if n := len(w.cachedClosers); n > 0 {
+		last := w.cachedClosers[n-1]
+		last.Reset(c)
+		w.cachedClosers[n-1] = nil
+		w.cachedClosers = w.cachedClosers[:n-1]
+		w.closers = append(w.closers, last)
+		return
+	}
+	w.closers = append(w.closers, x.NewSafeCloser(c))
+}
+
+func (w *populateCachedSearchesWorker) close() error {
+	multiErr := xerrors.NewMultiError()
+	for i, c := range w.closers {
+		multiErr = multiErr.Add(c.Close())
+		w.closers[i] = nil
+		c.Reset(nil)
+		w.cachedClosers = append(w.cachedClosers, c)
+	}
+	w.closers = w.closers[:0]
+	return multiErr.FinalError()
+}
+
+func (m *mutableSegments) populateCachedSearches(
+	compactedSeg *ReadThroughSegment,
+	prevSegsMetas []segment.SegmentsBuilderSegmentMetadata,
+) error {
+	prevSegs := make([]prevSegment, 0, len(prevSegsMetas))
+	for _, segMeta := range prevSegsMetas {
+		prevReadThroughSeg, ok := segMeta.Segment.(*ReadThroughSegment)
+		if !ok {
+			return fmt.Errorf("failed to cast compacted segment to read through segment")
+		}
+		prevSegs = append(prevSegs, prevSegment{
+			segment: prevReadThroughSeg,
+			meta:    segMeta,
+		})
+	}
+
+	searches := make(map[PostingsListCacheKey]cachedPatternForCompactedSegment)
+	for i, seg := range prevSegs {
+		i := i
+		result := seg.segment.CachedSearchPatterns(func(p CachedPattern) {
+			pattern, ok := searches[p.CacheKey]
+			if !ok {
+				pattern = cachedPatternForCompactedSegment{
+					field:       p.CacheKey.Field,
+					searchQuery: p.SearchQuery,
+					patterns:    make([]cachedPatternFromSegment, len(prevSegs)),
+				}
+				for j, prevSeg := range prevSegs {
+					pattern.patterns[j] = cachedPatternFromSegment{
+						prevSeg: prevSeg,
+					}
+				}
+				searches[p.CacheKey] = pattern
+			}
+			// Mark this segment with the cached pattern.
+			pattern.patterns[i].hasCachedPattern = true
+			pattern.patterns[i].cachedPattern = p
+		})
+		if result.CacheSearchesDisabled {
+			m.metrics.activeBlockGarbageCollectCachedSearchesDisabled.Inc(1)
+		}
+		if result.CachedPatternsResult.InRegistry {
+			m.metrics.activeBlockGarbageCollectCachedSearchesInRegistry.Inc(1)
+		} else {
+			m.metrics.activeBlockGarbageCollectCachedSearchesNotInRegistry.Inc(1)
+		}
+		total := float64(result.CachedPatternsResult.TotalPatterns)
+		m.metrics.activeBlockGarbageCollectCachedSearchesTotal.RecordValue(total)
+		matched := float64(result.CachedPatternsResult.MatchedPatterns)
+		m.metrics.activeBlockGarbageCollectCachedSearchesMatched.RecordValue(matched)
+	}
+
+	var totalSegmentsSize int64
+	for _, seg := range prevSegs {
+		totalSegmentsSize += seg.segment.Size()
+	}
+
+	var (
+		numWorkers       = m.cachedSearchesWorkers.Size()
+		workers          = make(chan *populateCachedSearchesWorker, numWorkers)
+		instrumentedExec = func(fn func() error) func() error {
+			return func() error {
+				e := fn()
+				if e != nil {
+					m.metrics.activeBlockGarbageCollectReconstructCachedSearchExecError.Inc(1)
+					return e
+				}
+				m.metrics.activeBlockGarbageCollectReconstructCachedSearchExecSuccess.Inc(1)
+				return nil
+			}
+		}
+		group errgroup.Group
+	)
+	for i := 0; i < numWorkers; i++ {
+		workers <- newPopulateCachedSearchesWorker()
+	}
+	for _, s := range searches {
+		s := s // Capture for loop.
+
+		var totalSegmentsHasPatternSize int64
+		for i := range s.patterns {
+			if !s.patterns[i].hasCachedPattern {
+				continue
+			}
+			totalSegmentsHasPatternSize += s.patterns[i].prevSeg.segment.Size()
+		}
+
+		if totalSegmentsHasPatternSize < totalSegmentsSize/2 {
+			// If the segments that do have this pattern cached make up less
+			// than half the total size of the segments being compacted, skip
+			// reconstructing it: the computation could be expensive and the
+			// largest segments we just compacted would see no benefit, since
+			// the search was never cached for them in the first place.
+			m.metrics.activeBlockGarbageCollectReconstructCachedSearchEvalSkip.Inc(1)
+			continue
+		}
+
+		m.metrics.activeBlockGarbageCollectReconstructCachedSearchEvalAttempt.Inc(1)
+
+		// Control concurrency by taking and returning a token from the worker pool.
+		w := <-workers
+		group.Go(instrumentedExec(func() error {
+			defer func() {
+				// Close anything that needs to be closed.
+				_ = w.close()
+				// Return the worker.
+				workers <- w
+			}()
+
+			if s.searchQuery == nil {
+				return fmt.Errorf("no search query for cached search pattern")
+			}
+
+			var searcher search.Searcher
+			search, err := query.UnmarshalProto(s.searchQuery)
+			if err != nil {
+				return fmt.Errorf("failed to unmarshal search for cached search pattern: %w", err)
+			}
+
+			// Reset reused postings.
+			w.postings.Reset()
+
+			for i := range s.patterns {
+				var iter postings.Iterator
+				if s.patterns[i].hasCachedPattern {
+					// This segment has the pattern cached: no need to execute a
+					// search, reuse the postings iterator from the cache.
+					iter = s.patterns[i].cachedPattern.Postings.Iterator()
+					w.addCloser(iter)
+
+					// Track hit.
+					m.metrics.activeBlockGarbageCollectReconstructCachedSearchCacheHit.Inc(1)
+				} else {
+					// This segment does not have the pattern cached: the search
+					// needs to be executed against it.
+					if searcher == nil {
+						searcher, err = search.Searcher()
+						if err != nil {
+							return fmt.Errorf("failed to create searcher for cached search pattern: %w", err)
+						}
+					}
+
+					reader, err := s.patterns[i].prevSeg.segment.Reader()
+					if err != nil {
+						return fmt.Errorf("failed to create prev seg reader: %w", err)
+					}
+
+					w.addCloser(reader)
+
+					pl, err := searcher.Search(reader)
+					if err != nil {
+						return fmt.Errorf("failed to search prev seg reader: %w", err)
+					}
+
+					iter = pl.Iterator()
+					w.addCloser(iter)
+
+					// Track miss.
+					m.metrics.activeBlockGarbageCollectReconstructCachedSearchCacheMiss.Inc(1)
+				}
+
+				if s.patterns[i].prevSeg.meta.Offset == 0 && s.patterns[i].prevSeg.meta.Skips == 0 {
+					// No offset and no postings to skip, can copy into the reused postings.
+					if err := w.postings.AddIterator(iter); err != nil {
+						return fmt.Errorf("could not copy cached postings: %w", err)
+					}
+					continue
+				}
+
+				// We have to take into account offset and duplicates/skips.
+				negativeOffsets := s.patterns[i].prevSeg.meta.NegativeOffsets
+				for iter.Next() {
+					curr := iter.Current()
+					negativeOffset := negativeOffsets[curr]
+					// Skip this posting if it was marked as a duplicate.
+					if negativeOffset == -1 {
+						continue
+					}
+					value := curr + s.patterns[i].prevSeg.meta.Offset - postings.ID(negativeOffset)
+					if err := w.postings.Insert(value); err != nil {
+						if closeErr := iter.Close(); closeErr != nil {
+							err = xerrors.NewMultiError().Add(err).Add(closeErr)
+						}
+						return fmt.Errorf("could not insert from cached postings: %w", err)
+					}
+				}
+
+				err := iter.Err()
+				if closeErr := iter.Close(); closeErr != nil {
+					err = xerrors.NewMultiError().Add(err).Add(closeErr)
+				}
+				if err != nil {
+					return fmt.Errorf("could not close cached postings: %w", err)
+				}
+			}
+
+			// Encode the result and create a read only copy as we reuse the postings
+			// list in the worker to build subsequent postings lists.
+			bytes, err := w.encoder.Encode(w.postings)
+			if err != nil {
+				return fmt.Errorf("could not encode result cached search postings: %w", err)
+			}
+
+			// Need to copy bytes since the encoder owns the bytes.
+			copied := append(make([]byte, 0, len(bytes)), bytes...)
+			bm := bitmap.NewBitmap()
+			if err := bm.UnmarshalBinary(copied); err != nil {
+				return fmt.Errorf("could not create result cached search postings: %w", err)
+			}
+
+			pl := roaring.NewPostingsListFromBitmap(bm)
+			compactedSeg.PutCachedSearchPattern(s.field, search, pl)
+			return nil
+		}))
+	}
+
+	return group.Wait()
+}
+
 func (m *mutableSegments) addCompactedSegmentFromSegmentsWithLock(
 	current []*readableSeg,
 	segmentsJustCompacted []segment.Segment,
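Editorial note: the loop above bounds concurrency by circulating worker
tokens through a channel while an errgroup collects the first error. That
pattern in isolation, as a self-contained sketch (hypothetical helper, not
part of the patch):

package example

import "golang.org/x/sync/errgroup"

// boundedParallel runs fn over jobs with at most `workers` concurrent
// executions, mirroring the token-channel plus errgroup pattern that
// populateCachedSearches uses.
func boundedParallel(jobs []int, workers int, fn func(int) error) error {
	tokens := make(chan struct{}, workers)
	for i := 0; i < workers; i++ {
		tokens <- struct{}{}
	}
	var group errgroup.Group
	for _, job := range jobs {
		job := job // capture range variable
		<-tokens   // acquire a token before spawning; bounds concurrency
		group.Go(func() error {
			defer func() { tokens <- struct{}{} }() // release the token
			return fn(job)
		})
	}
	return group.Wait()
}

diff --git a/src/dbnode/storage/index/mutable_segments_test.go b/src/dbnode/storage/index/mutable_segments_test.go
new file mode 100644
index 0000000000..711c5f9561
--- /dev/null
+++ b/src/dbnode/storage/index/mutable_segments_test.go
@@ -0,0 +1,275 @@
+// Copyright (c) 2021 Uber Technologies, Inc.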
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package index + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/search" + "github.com/m3db/m3/src/m3ninx/search/query" + "github.com/m3db/m3/src/x/instrument" + xsync "github.com/m3db/m3/src/x/sync" + xtest "github.com/m3db/m3/src/x/test" + xtime "github.com/m3db/m3/src/x/time" +) + +type testMutableSegmentsResult struct { + logger *zap.Logger + searchCache *PostingsListCache +} + +func newTestMutableSegments( + t *testing.T, + md namespace.Metadata, + blockStart xtime.UnixNano, +) (*mutableSegments, testMutableSegmentsResult) { + cachedSearchesWorkers := xsync.NewWorkerPool(2) + cachedSearchesWorkers.Init() + + iOpts := instrument.NewTestOptions(t) + + cache, err := NewPostingsListCache(10, PostingsListCacheOptions{ + InstrumentOptions: iOpts, + }) + require.NoError(t, err) + + searchCache, err := NewPostingsListCache(10, PostingsListCacheOptions{ + InstrumentOptions: iOpts, + }) + require.NoError(t, err) + + opts := testOpts. + SetPostingsListCache(cache). + SetSearchPostingsListCache(searchCache). + SetReadThroughSegmentOptions(ReadThroughSegmentOptions{ + CacheRegexp: true, + CacheTerms: true, + CacheSearches: true, + }) + + segs := newMutableSegments(md, blockStart, opts, BlockOptions{}, + cachedSearchesWorkers, namespace.NewRuntimeOptionsManager("foo"), iOpts) + require.NoError(t, err) + + return segs, testMutableSegmentsResult{ + logger: iOpts.Logger(), + searchCache: searchCache, + } +} + +func TestMutableSegmentsBackgroundCompactGCReconstructCachedSearches(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + blockSize := time.Hour + testMD := newTestNSMetadata(t) + blockStart := xtime.Now().Truncate(blockSize) + + nowNotBlockStartAligned := blockStart.Add(time.Minute) + + segs, result := newTestMutableSegments(t, testMD, blockStart) + segs.backgroundCompactDisable = true // Disable to explicitly test. + + inserted := 0 + segs.Lock() + segsBackground := len(segs.backgroundSegments) + segs.Unlock() + + for runs := 0; runs < 10; runs++ { + runs := runs + t.Run(fmt.Sprintf("run-%d", runs), func(t *testing.T) { + logger := result.logger.With(zap.Int("run", runs)) + + // Insert until we have a new background segment. 
+ for { + segs.Lock() + curr := len(segs.backgroundSegments) + segs.Unlock() + if curr > segsBackground { + segsBackground = curr + break + } + + batch := NewWriteBatch(WriteBatchOptions{ + IndexBlockSize: blockSize, + }) + for i := 0; i < 128; i++ { + onIndexSeries := doc.NewMockOnIndexSeries(ctrl) + onIndexSeries.EXPECT(). + TryMarkIndexGarbageCollected(). + // Every other is "empty". + Return(inserted%2 == 0). + AnyTimes() + onIndexSeries.EXPECT(). + NeedsIndexGarbageCollected(). + // Every other is "empty". + Return(inserted%2 == 0). + AnyTimes() + + batch.Append(WriteBatchEntry{ + Timestamp: nowNotBlockStartAligned, + OnIndexSeries: onIndexSeries, + }, testDocN(inserted)) + inserted++ + } + + _, err := segs.WriteBatch(batch) + require.NoError(t, err) + } + + // Perform some searches. + testDocSearches(t, segs) + + // Make sure search postings cache was populated. + require.True(t, result.searchCache.lru.Len() > 0) + logger.Info("search cache populated", zap.Int("n", result.searchCache.lru.Len())) + + // Start some async searches so we have searches going on while + // executing background compact GC. + doneCh := make(chan struct{}, 2) + defer close(doneCh) + for i := 0; i < 2; i++ { + go func() { + for { + select { + case <-doneCh: + return + default: + } + // Search continuously. + testDocSearches(t, segs) + } + }() + } + + // Explicitly background compact and make sure that background segment + // is GC'd of series no longer present. + segs.Lock() + segs.backgroundCompactWithLock(false) + compactingBackgroundStandard := segs.compact.compactingBackgroundStandard + compactingBackgroundGarbageCollect := segs.compact.compactingBackgroundGarbageCollect + segs.Unlock() + + // Should have kicked off a background compact GC. + require.True(t, compactingBackgroundStandard || compactingBackgroundGarbageCollect) + + // Wait for background compact GC to run. 
+ for { + segs.Lock() + compactingBackgroundStandard := segs.compact.compactingBackgroundStandard + compactingBackgroundGarbageCollect := segs.compact.compactingBackgroundGarbageCollect + segs.Unlock() + if !compactingBackgroundStandard && !compactingBackgroundGarbageCollect { + break + } + time.Sleep(100 * time.Millisecond) + } + + logger.Info("compaction done, search cache", zap.Int("n", result.searchCache.lru.Len())) + }) + } +} + +func testDocSearches( + t *testing.T, + segs *mutableSegments, +) { + for i := 0; i < len(testDocBucket0Values); i++ { + for j := 0; j < len(testDocBucket1Values); j++ { + readers, err := segs.AddReaders(nil) + assert.NoError(t, err) + + regexp0 := fmt.Sprintf("(%s|%s)", moduloByteStr(testDocBucket0Values, i), + moduloByteStr(testDocBucket0Values, i+1)) + b0, err := query.NewRegexpQuery([]byte(testDocBucket0Name), []byte(regexp0)) + assert.NoError(t, err) + + regexp1 := fmt.Sprintf("(%s|%s|%s)", moduloByteStr(testDocBucket1Values, j), + moduloByteStr(testDocBucket1Values, j+1), + moduloByteStr(testDocBucket1Values, j+2)) + b1, err := query.NewRegexpQuery([]byte(testDocBucket1Name), []byte(regexp1)) + assert.NoError(t, err) + + q := query.NewConjunctionQuery([]search.Query{b0, b1}) + searcher, err := q.Searcher() + assert.NoError(t, err) + + for _, reader := range readers { + readThrough, ok := reader.(search.ReadThroughSegmentSearcher) + assert.True(t, ok) + + pl, err := readThrough.Search(q, searcher) + assert.NoError(t, err) + + assert.True(t, pl.Len() > 0) + } + } + } +} + +var ( + testDocBucket0Name = "bucket_0" + testDocBucket0Values = []string{ + "one", + "two", + "three", + } + testDocBucket1Name = "bucket_1" + testDocBucket1Values = []string{ + "one", + "two", + "three", + "four", + "five", + } +) + +func testDocN(n int) doc.Metadata { + return doc.Metadata{ + ID: []byte(fmt.Sprintf("doc-%d", n)), + Fields: []doc.Field{ + { + Name: []byte("foo"), + Value: []byte("bar"), + }, + { + Name: []byte(testDocBucket0Name), + Value: moduloByteStr(testDocBucket0Values, n), + }, + { + Name: []byte(testDocBucket1Name), + Value: moduloByteStr(testDocBucket1Values, n), + }, + }, + } +} + +func moduloByteStr(strs []string, n int) []byte { + return []byte(strs[n%len(strs)]) +} diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 055ab95cac..380e8b8550 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -112,7 +112,7 @@ func init() { } // nolint: maligned -type opts struct { +type options struct { forwardIndexThreshold float64 forwardIndexProbability float64 insertMode InsertMode @@ -132,6 +132,7 @@ type opts struct { foregroundCompactionPlannerOpts compaction.PlannerOptions backgroundCompactionPlannerOpts compaction.PlannerOptions postingsListCache *PostingsListCache + searchPostingsListCache *PostingsListCache readThroughSegmentOptions ReadThroughSegmentOptions mmapReporter mmap.Reporter queryLimits limits.QueryLimits @@ -177,7 +178,7 @@ func NewOptions() Options { aggResultsEntryArrayPool.Init() instrumentOpts := instrument.NewOptions() - opts := &opts{ + opts := &options{ insertMode: defaultIndexInsertMode, clockOpts: clock.NewOptions(), instrumentOpts: instrumentOpts, @@ -206,7 +207,7 @@ func NewOptions() Options { return opts } -func (o *opts) Validate() error { +func (o *options) Validate() error { if o.idPool == nil { return errOptionsIdentifierPoolUnspecified } @@ -237,27 +238,27 @@ func (o *opts) Validate() error { return nil } -func (o *opts) SetInsertMode(value InsertMode) 
Options { +func (o *options) SetInsertMode(value InsertMode) Options { opts := *o opts.insertMode = value return &opts } -func (o *opts) InsertMode() InsertMode { +func (o *options) InsertMode() InsertMode { return o.insertMode } -func (o *opts) SetClockOptions(value clock.Options) Options { +func (o *options) SetClockOptions(value clock.Options) Options { opts := *o opts.clockOpts = value return &opts } -func (o *opts) ClockOptions() clock.Options { +func (o *options) ClockOptions() clock.Options { return o.clockOpts } -func (o *opts) SetInstrumentOptions(value instrument.Options) Options { +func (o *options) SetInstrumentOptions(value instrument.Options) Options { opts := *o memOpts := opts.MemSegmentOptions().SetInstrumentOptions(value) fstOpts := opts.FSTSegmentOptions().SetInstrumentOptions(value) @@ -267,196 +268,206 @@ func (o *opts) SetInstrumentOptions(value instrument.Options) Options { return &opts } -func (o *opts) InstrumentOptions() instrument.Options { +func (o *options) InstrumentOptions() instrument.Options { return o.instrumentOpts } -func (o *opts) SetSegmentBuilderOptions(value builder.Options) Options { +func (o *options) SetSegmentBuilderOptions(value builder.Options) Options { opts := *o opts.builderOpts = value return &opts } -func (o *opts) SegmentBuilderOptions() builder.Options { +func (o *options) SegmentBuilderOptions() builder.Options { return o.builderOpts } -func (o *opts) SetMemSegmentOptions(value mem.Options) Options { +func (o *options) SetMemSegmentOptions(value mem.Options) Options { opts := *o opts.memOpts = value return &opts } -func (o *opts) MemSegmentOptions() mem.Options { +func (o *options) MemSegmentOptions() mem.Options { return o.memOpts } -func (o *opts) SetFSTSegmentOptions(value fst.Options) Options { +func (o *options) SetFSTSegmentOptions(value fst.Options) Options { opts := *o opts.fstOpts = value return &opts } -func (o *opts) FSTSegmentOptions() fst.Options { +func (o *options) FSTSegmentOptions() fst.Options { return o.fstOpts } -func (o *opts) SetIdentifierPool(value ident.Pool) Options { +func (o *options) SetIdentifierPool(value ident.Pool) Options { opts := *o opts.idPool = value return &opts } -func (o *opts) IdentifierPool() ident.Pool { +func (o *options) IdentifierPool() ident.Pool { return o.idPool } -func (o *opts) SetCheckedBytesPool(value pool.CheckedBytesPool) Options { +func (o *options) SetCheckedBytesPool(value pool.CheckedBytesPool) Options { opts := *o opts.bytesPool = value return &opts } -func (o *opts) CheckedBytesPool() pool.CheckedBytesPool { +func (o *options) CheckedBytesPool() pool.CheckedBytesPool { return o.bytesPool } -func (o *opts) SetQueryResultsPool(value QueryResultsPool) Options { +func (o *options) SetQueryResultsPool(value QueryResultsPool) Options { opts := *o opts.resultsPool = value return &opts } -func (o *opts) QueryResultsPool() QueryResultsPool { +func (o *options) QueryResultsPool() QueryResultsPool { return o.resultsPool } -func (o *opts) SetAggregateResultsPool(value AggregateResultsPool) Options { +func (o *options) SetAggregateResultsPool(value AggregateResultsPool) Options { opts := *o opts.aggResultsPool = value return &opts } -func (o *opts) AggregateResultsPool() AggregateResultsPool { +func (o *options) AggregateResultsPool() AggregateResultsPool { return o.aggResultsPool } -func (o *opts) SetAggregateValuesPool(value AggregateValuesPool) Options { +func (o *options) SetAggregateValuesPool(value AggregateValuesPool) Options { opts := *o opts.aggValuesPool = value return &opts } 
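Editorial note: the rename from *opts to *options is mechanical, but it is
worth seeing the contract it preserves: every setter copies the receiver and
returns the copy as Options, so callers chain setters and the original value
is never mutated. A small consumer-side sketch (hypothetical helper, using
only methods present in this interface):

package example

import "github.com/m3db/m3/src/dbnode/storage/index"

// withCaches composes options copy-on-write style, the same way the server
// wiring earlier in this patch does.
func withCaches(
	opts index.Options,
	segmentCache, searchCache *index.PostingsListCache,
) index.Options {
	return opts.
		SetPostingsListCache(segmentCache).
		SetSearchPostingsListCache(searchCache)
}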
-func (o *opts) AggregateValuesPool() AggregateValuesPool { +func (o *options) AggregateValuesPool() AggregateValuesPool { return o.aggValuesPool } -func (o *opts) SetDocumentArrayPool(value doc.DocumentArrayPool) Options { +func (o *options) SetDocumentArrayPool(value doc.DocumentArrayPool) Options { opts := *o opts.docArrayPool = value return &opts } -func (o *opts) DocumentArrayPool() doc.DocumentArrayPool { +func (o *options) DocumentArrayPool() doc.DocumentArrayPool { return o.docArrayPool } -func (o *opts) SetMetadataArrayPool(value doc.MetadataArrayPool) Options { +func (o *options) SetMetadataArrayPool(value doc.MetadataArrayPool) Options { opts := *o // nolint:govet opts.metadataArrayPool = value return &opts } -func (o *opts) MetadataArrayPool() doc.MetadataArrayPool { +func (o *options) MetadataArrayPool() doc.MetadataArrayPool { return o.metadataArrayPool } -func (o *opts) SetAggregateResultsEntryArrayPool(value AggregateResultsEntryArrayPool) Options { +func (o *options) SetAggregateResultsEntryArrayPool(value AggregateResultsEntryArrayPool) Options { opts := *o opts.aggResultsEntryArrayPool = value return &opts } -func (o *opts) AggregateResultsEntryArrayPool() AggregateResultsEntryArrayPool { +func (o *options) AggregateResultsEntryArrayPool() AggregateResultsEntryArrayPool { return o.aggResultsEntryArrayPool } -func (o *opts) SetForegroundCompactionPlannerOptions(value compaction.PlannerOptions) Options { +func (o *options) SetForegroundCompactionPlannerOptions(value compaction.PlannerOptions) Options { opts := *o opts.foregroundCompactionPlannerOpts = value return &opts } -func (o *opts) ForegroundCompactionPlannerOptions() compaction.PlannerOptions { +func (o *options) ForegroundCompactionPlannerOptions() compaction.PlannerOptions { return o.foregroundCompactionPlannerOpts } -func (o *opts) SetBackgroundCompactionPlannerOptions(value compaction.PlannerOptions) Options { +func (o *options) SetBackgroundCompactionPlannerOptions(value compaction.PlannerOptions) Options { opts := *o opts.backgroundCompactionPlannerOpts = value return &opts } -func (o *opts) BackgroundCompactionPlannerOptions() compaction.PlannerOptions { +func (o *options) BackgroundCompactionPlannerOptions() compaction.PlannerOptions { return o.backgroundCompactionPlannerOpts } -func (o *opts) SetPostingsListCache(value *PostingsListCache) Options { +func (o *options) SetPostingsListCache(value *PostingsListCache) Options { opts := *o opts.postingsListCache = value return &opts } -func (o *opts) PostingsListCache() *PostingsListCache { +func (o *options) PostingsListCache() *PostingsListCache { return o.postingsListCache } -func (o *opts) SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Options { +func (o *options) SetSearchPostingsListCache(value *PostingsListCache) Options { + opts := *o + opts.searchPostingsListCache = value + return &opts +} + +func (o *options) SearchPostingsListCache() *PostingsListCache { + return o.searchPostingsListCache +} + +func (o *options) SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Options { opts := *o opts.readThroughSegmentOptions = value return &opts } -func (o *opts) ReadThroughSegmentOptions() ReadThroughSegmentOptions { +func (o *options) ReadThroughSegmentOptions() ReadThroughSegmentOptions { return o.readThroughSegmentOptions } -func (o *opts) SetForwardIndexProbability(value float64) Options { +func (o *options) SetForwardIndexProbability(value float64) Options { opts := *o opts.forwardIndexProbability = value return &opts } -func (o 
*opts) ForwardIndexProbability() float64 { +func (o *options) ForwardIndexProbability() float64 { return o.forwardIndexProbability } -func (o *opts) SetForwardIndexThreshold(value float64) Options { +func (o *options) SetForwardIndexThreshold(value float64) Options { opts := *o opts.forwardIndexThreshold = value return &opts } -func (o *opts) ForwardIndexThreshold() float64 { +func (o *options) ForwardIndexThreshold() float64 { return o.forwardIndexThreshold } -func (o *opts) SetMmapReporter(mmapReporter mmap.Reporter) Options { +func (o *options) SetMmapReporter(mmapReporter mmap.Reporter) Options { opts := *o opts.mmapReporter = mmapReporter return &opts } -func (o *opts) MmapReporter() mmap.Reporter { +func (o *options) MmapReporter() mmap.Reporter { return o.mmapReporter } -func (o *opts) SetQueryLimits(value limits.QueryLimits) Options { +func (o *options) SetQueryLimits(value limits.QueryLimits) Options { opts := *o opts.queryLimits = value return &opts } -func (o *opts) QueryLimits() limits.QueryLimits { +func (o *options) QueryLimits() limits.QueryLimits { return o.queryLimits } diff --git a/src/dbnode/storage/index/postings_list_cache.go b/src/dbnode/storage/index/postings_list_cache.go index 6f4ca23c1a..cce2d65a25 100644 --- a/src/dbnode/storage/index/postings_list_cache.go +++ b/src/dbnode/storage/index/postings_list_cache.go @@ -21,30 +21,38 @@ package index import ( - "sync" + "errors" + "math" "time" + "github.com/m3db/m3/src/m3ninx/generated/proto/querypb" "github.com/m3db/m3/src/m3ninx/postings" + "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/x/instrument" "github.com/pborman/uuid" "github.com/uber-go/tally" + "go.uber.org/zap" ) +var errInstrumentOptions = errors.New("no instrument options set") + // PatternType is an enum for the various pattern types. It allows us // separate them logically within the cache. -type PatternType int +type PatternType string // Closer represents a function that will close managed resources. type Closer func() const ( // PatternTypeRegexp indicates that the pattern is of type regexp. - PatternTypeRegexp PatternType = iota + PatternTypeRegexp PatternType = "regexp" // PatternTypeTerm indicates that the pattern is of type term. - PatternTypeTerm + PatternTypeTerm PatternType = "term" // PatternTypeField indicates that the pattern is of type field. - PatternTypeField + PatternTypeField PatternType = "field" + // PatternTypeSearch indicates that the pattern is of type search. + PatternTypeSearch PatternType = "search" reportLoopInterval = 10 * time.Second emptyPattern = "" @@ -55,20 +63,40 @@ type PostingsListCacheOptions struct { InstrumentOptions instrument.Options } +// Validate will return an error if the options are not valid. +func (o PostingsListCacheOptions) Validate() error { + if o.InstrumentOptions == nil { + return errInstrumentOptions + } + return nil +} + // PostingsListCache implements an LRU for caching queries and their results. type PostingsListCache struct { - sync.Mutex - lru *postingsListLRU size int opts PostingsListCacheOptions metrics *postingsListCacheMetrics + + logger *zap.Logger } // NewPostingsListCache creates a new query cache. 
-func NewPostingsListCache(size int, opts PostingsListCacheOptions) (*PostingsListCache, error) { - lru, err := newPostingsListLRU(size) +func NewPostingsListCache( + size int, + opts PostingsListCacheOptions, +) (*PostingsListCache, error) { + err := opts.Validate() + if err != nil { + return nil, err + } + + lru, err := newPostingsListLRU(postingsListLRUOptions{ + size: size, + // Use ~1000 items per shard. + shards: int(math.Ceil(float64(size) / 1000)), + }) if err != nil { return nil, err } @@ -78,6 +106,7 @@ func NewPostingsListCache(size int, opts PostingsListCacheOptions) (*PostingsLis size: size, opts: opts, metrics: newPostingsListCacheMetrics(opts.InstrumentOptions.MetricsScope()), + logger: opts.InstrumentOptions.Logger(), } return plc, nil @@ -114,24 +143,40 @@ func (q *PostingsListCache) GetField( return q.get(segmentUUID, field, emptyPattern, PatternTypeField) } +// GetSearch returns the cached results for the provided search query, if any. +func (q *PostingsListCache) GetSearch( + segmentUUID uuid.UUID, + query string, +) (postings.List, bool) { + return q.get(segmentUUID, query, emptyPattern, PatternTypeSearch) +} + func (q *PostingsListCache) get( segmentUUID uuid.UUID, field string, pattern string, patternType PatternType, ) (postings.List, bool) { - // No RLock because a Get() operation mutates the LRU. - q.Lock() - p, ok := q.lru.Get(segmentUUID, field, pattern, patternType) - q.Unlock() - + entry, ok := q.lru.Get(segmentUUID, field, pattern, patternType) q.emitCacheGetMetrics(patternType, ok) - if !ok { return nil, false } - return p, ok + return entry.postings, ok +} + +type cachedPostings struct { + // key + segmentUUID uuid.UUID + field string + pattern string + patternType PatternType + + // value + postings postings.List + // searchQuery is only set for search queries. + searchQuery *querypb.Query } // PutRegexp updates the LRU with the result of the regexp query. @@ -141,7 +186,7 @@ func (q *PostingsListCache) PutRegexp( pattern string, pl postings.List, ) { - q.put(segmentUUID, field, pattern, PatternTypeRegexp, pl) + q.put(segmentUUID, field, pattern, PatternTypeRegexp, nil, pl) } // PutTerm updates the LRU with the result of the term query. @@ -151,7 +196,7 @@ func (q *PostingsListCache) PutTerm( pattern string, pl postings.List, ) { - q.put(segmentUUID, field, pattern, PatternTypeTerm, pl) + q.put(segmentUUID, field, pattern, PatternTypeTerm, nil, pl) } // PutField updates the LRU with the result of the field query. @@ -160,7 +205,17 @@ func (q *PostingsListCache) PutField( field string, pl postings.List, ) { - q.put(segmentUUID, field, emptyPattern, PatternTypeField, pl) + q.put(segmentUUID, field, emptyPattern, PatternTypeField, nil, pl) +} + +// PutSearch updates the LRU with the result of a search query. 
+func (q *PostingsListCache) PutSearch( + segmentUUID uuid.UUID, + queryStr string, + query search.Query, + pl postings.List, +) { + q.put(segmentUUID, queryStr, emptyPattern, PatternTypeSearch, query, pl) } func (q *PostingsListCache) put( @@ -168,26 +223,31 @@ func (q *PostingsListCache) put( field string, pattern string, patternType PatternType, + searchQuery search.Query, pl postings.List, ) { - q.Lock() - q.lru.Add( - segmentUUID, - field, - pattern, - patternType, - pl, - ) - q.Unlock() + var searchQueryProto *querypb.Query + if searchQuery != nil { + searchQueryProto = searchQuery.ToProto() + } + + value := &cachedPostings{ + segmentUUID: segmentUUID, + field: field, + pattern: pattern, + patternType: patternType, + searchQuery: searchQueryProto, + postings: pl, + } + q.lru.Add(segmentUUID, field, pattern, patternType, value) + q.emitCachePutMetrics(patternType) } // PurgeSegment removes all postings lists associated with the specified // segment from the cache. func (q *PostingsListCache) PurgeSegment(segmentUUID uuid.UUID) { - q.Lock() q.lru.PurgeSegment(segmentUUID) - q.Unlock() } // startReportLoop starts a background process that will call Report() @@ -212,20 +272,78 @@ func (q *PostingsListCache) startReportLoop() Closer { return func() { close(doneCh) } } +// CachedPattern defines a cached pattern. +type CachedPattern struct { + CacheKey PostingsListCacheKey + SearchQuery *querypb.Query + Postings postings.List +} + +// CachedPatternsResult defines the result of a cached pattern. +type CachedPatternsResult struct { + InRegistry bool + TotalPatterns int + MatchedPatterns int +} + +// CachedPatternForEachFn defines a function for iterating a cached pattern. +type CachedPatternForEachFn func(CachedPattern) + +// CachedPatternsQuery defines a cached pattern query. +type CachedPatternsQuery struct { + PatternType *PatternType +} + +// CachedPatterns returns cached patterns for given query. +func (q *PostingsListCache) CachedPatterns( + uuid uuid.UUID, + query CachedPatternsQuery, + fn CachedPatternForEachFn, +) CachedPatternsResult { + var result CachedPatternsResult + + for _, shard := range q.lru.shards { + shard.RLock() + result = shardCachedPatternsWithRLock(uuid, query, fn, shard, result) + shard.RUnlock() + } + + return result +} + +func shardCachedPatternsWithRLock( + uuid uuid.UUID, + query CachedPatternsQuery, + fn CachedPatternForEachFn, + shard *postingsListLRUShard, + result CachedPatternsResult, +) CachedPatternsResult { + segmentPostings, ok := shard.items[uuid.Array()] + if !ok { + return result + } + + result.InRegistry = true + result.TotalPatterns += len(segmentPostings) + for key, value := range segmentPostings { + if v := query.PatternType; v != nil && *v != key.PatternType { + continue + } + + fn(CachedPattern{ + CacheKey: key, + SearchQuery: value.Value.(*entry).cachedPostings.searchQuery, + Postings: value.Value.(*entry).cachedPostings.postings, + }) + result.MatchedPatterns++ + } + + return result +} + // Report will emit metrics about the status of the cache. 
func (q *PostingsListCache) Report() { - var ( - size float64 - capacity float64 - ) - - q.Lock() - size = float64(q.lru.Len()) - capacity = float64(q.size) - q.Unlock() - - q.metrics.size.Update(size) - q.metrics.capacity.Update(capacity) + q.metrics.capacity.Update(float64(q.size)) } func (q *PostingsListCache) emitCacheGetMetrics(patternType PatternType, hit bool) { @@ -237,6 +355,8 @@ func (q *PostingsListCache) emitCacheGetMetrics(patternType PatternType, hit boo method = q.metrics.term case PatternTypeField: method = q.metrics.field + case PatternTypeSearch: + method = q.metrics.search default: method = q.metrics.unknown // should never happen } @@ -255,6 +375,8 @@ func (q *PostingsListCache) emitCachePutMetrics(patternType PatternType) { q.metrics.term.puts.Inc(1) case PatternTypeField: q.metrics.field.puts.Inc(1) + case PatternTypeSearch: + q.metrics.search.puts.Inc(1) default: q.metrics.unknown.puts.Inc(1) // should never happen } @@ -264,10 +386,16 @@ type postingsListCacheMetrics struct { regexp *postingsListCacheMethodMetrics term *postingsListCacheMethodMetrics field *postingsListCacheMethodMetrics + search *postingsListCacheMethodMetrics unknown *postingsListCacheMethodMetrics size tally.Gauge capacity tally.Gauge + + pooledGet tally.Counter + pooledGetErrAddIter tally.Counter + pooledPut tally.Counter + pooledPutErrNotMutable tally.Counter } func newPostingsListCacheMetrics(scope tally.Scope) *postingsListCacheMetrics { @@ -281,12 +409,22 @@ func newPostingsListCacheMetrics(scope tally.Scope) *postingsListCacheMetrics { field: newPostingsListCacheMethodMetrics(scope.Tagged(map[string]string{ "query_type": "field", })), + search: newPostingsListCacheMethodMetrics(scope.Tagged(map[string]string{ + "query_type": "search", + })), unknown: newPostingsListCacheMethodMetrics(scope.Tagged(map[string]string{ "query_type": "unknown", })), - - size: scope.Gauge("size"), - capacity: scope.Gauge("capacity"), + size: scope.Gauge("size"), + capacity: scope.Gauge("capacity"), + pooledGet: scope.Counter("pooled_get"), + pooledGetErrAddIter: scope.Tagged(map[string]string{ + "error_type": "add_iter", + }).Counter("pooled_get_error"), + pooledPut: scope.Counter("pooled_put"), + pooledPutErrNotMutable: scope.Tagged(map[string]string{ + "error_type": "not_mutable", + }).Counter("pooled_put_error"), } } diff --git a/src/dbnode/storage/index/postings_list_cache_lru.go b/src/dbnode/storage/index/postings_list_cache_lru.go index b7c9d9be4f..488ec45879 100644 --- a/src/dbnode/storage/index/postings_list_cache_lru.go +++ b/src/dbnode/storage/index/postings_list_cache_lru.go @@ -23,9 +23,10 @@ package index import ( "container/list" "errors" + "math" + "sync" - "github.com/m3db/m3/src/m3ninx/postings" - + "github.com/cespare/xxhash/v2" "github.com/pborman/uuid" ) @@ -59,45 +60,132 @@ import ( // LRU. The specialization has the additional nice property that we don't need to allocate everytime // we add an item to the LRU due to the interface{} conversion. type postingsListLRU struct { + shards []*postingsListLRUShard + numShards uint64 +} + +type postingsListLRUShard struct { + sync.RWMutex size int evictList *list.List - items map[uuid.Array]map[key]*list.Element + items map[uuid.Array]map[PostingsListCacheKey]*list.Element } // entry is used to hold a value in the evictList. type entry struct { - uuid uuid.UUID - key key - postingsList postings.List + uuid uuid.UUID + key PostingsListCacheKey + cachedPostings *cachedPostings +} + +// PostingsListCacheKey is a postings list cache key. 
+type PostingsListCacheKey struct {
+	Field       string
+	Pattern     string
+	PatternType PatternType
 }
 
-type key struct {
-	field       string
-	pattern     string
-	patternType PatternType
+type postingsListLRUOptions struct {
+	size   int
+	shards int
 }
 
 // newPostingsListLRU constructs an LRU of the given size.
-func newPostingsListLRU(size int) (*postingsListLRU, error) {
+func newPostingsListLRU(opts postingsListLRUOptions) (*postingsListLRU, error) {
+	size, shards := opts.size, opts.shards
 	if size <= 0 {
-		return nil, errors.New("Must provide a positive size")
+		return nil, errors.New("must provide a positive size")
+	}
+	if shards <= 0 {
+		return nil, errors.New("must provide a positive shards")
+	}
+
+	lruShards := make([]*postingsListLRUShard, 0, shards)
+	for i := 0; i < shards; i++ {
+		lruShard := newPostingsListLRUShard(int(math.Ceil(float64(size) / float64(shards))))
+		lruShards = append(lruShards, lruShard)
 	}
 
 	return &postingsListLRU{
+		shards:    lruShards,
+		numShards: uint64(len(lruShards)),
+	}, nil
+}
+
+// newPostingsListLRUShard constructs a single shard of the LRU, sized
+// to its share of the total capacity.
+func newPostingsListLRUShard(size int) *postingsListLRUShard {
+	return &postingsListLRUShard{
 		size:      size,
 		evictList: list.New(),
-		items:     make(map[uuid.Array]map[key]*list.Element),
-	}, nil
+		items:     make(map[uuid.Array]map[PostingsListCacheKey]*list.Element),
+	}
+}
+
+func (c *postingsListLRU) shard(
+	segmentUUID uuid.UUID,
+	field, pattern string,
+	patternType PatternType,
+) *postingsListLRUShard {
+	idx := hashKey(segmentUUID, field, pattern, patternType) % c.numShards
+	return c.shards[idx]
 }
 
-// Add adds a value to the cache. Returns true if an eviction occurred.
 func (c *postingsListLRU) Add(
 	segmentUUID uuid.UUID,
 	field string,
 	pattern string,
 	patternType PatternType,
-	pl postings.List,
+	cachedPostings *cachedPostings,
+) bool {
+	shard := c.shard(segmentUUID, field, pattern, patternType)
+	return shard.Add(segmentUUID, field, pattern, patternType, cachedPostings)
+}
+
+func (c *postingsListLRU) Get(
+	segmentUUID uuid.UUID,
+	field string,
+	pattern string,
+	patternType PatternType,
+) (*cachedPostings, bool) {
+	shard := c.shard(segmentUUID, field, pattern, patternType)
+	return shard.Get(segmentUUID, field, pattern, patternType)
+}
+
+func (c *postingsListLRU) Remove(
+	segmentUUID uuid.UUID,
+	field string,
+	pattern string,
+	patternType PatternType,
+) bool {
+	shard := c.shard(segmentUUID, field, pattern, patternType)
+	return shard.Remove(segmentUUID, field, pattern, patternType)
+}
+
+func (c *postingsListLRU) PurgeSegment(segmentUUID uuid.UUID) {
+	for _, shard := range c.shards {
+		shard.PurgeSegment(segmentUUID)
+	}
+}
+
+func (c *postingsListLRU) Len() int {
+	n := 0
+	for _, shard := range c.shards {
+		n += shard.Len()
+	}
+	return n
+}
+
+// Add adds a value to the cache. Returns true if an eviction occurred.
+func (c *postingsListLRUShard) Add(
+	segmentUUID uuid.UUID,
+	field string,
+	pattern string,
+	patternType PatternType,
+	cachedPostings *cachedPostings,
 ) (evicted bool) {
+	c.Lock()
+	defer c.Unlock()
+
 	newKey := newKey(field, pattern, patternType)
 	// Check for existing item.
 	uuidArray := segmentUUID.Array()
@@ -108,7 +196,7 @@ func (c *postingsListLRU) Add(
 		// can only point to one entry at a time and we use them for purges. Also,
 		// it saves space by avoiding storing duplicate values.
 		c.evictList.MoveToFront(ent)
-		ent.Value.(*entry).postingsList = pl
+		ent.Value.(*entry).cachedPostings = cachedPostings
 		return false
 	}
 }
 
@@ -116,16 +204,16 @@
 	// Add new item.
var ( ent = &entry{ - uuid: segmentUUID, - key: newKey, - postingsList: pl, + uuid: segmentUUID, + key: newKey, + cachedPostings: cachedPostings, } entry = c.evictList.PushFront(ent) ) if queries, ok := c.items[uuidArray]; ok { queries[newKey] = entry } else { - c.items[uuidArray] = map[key]*list.Element{ + c.items[uuidArray] = map[PostingsListCacheKey]*list.Element{ newKey: entry, } } @@ -139,32 +227,43 @@ func (c *postingsListLRU) Add( } // Get looks up a key's value from the cache. -func (c *postingsListLRU) Get( +func (c *postingsListLRUShard) Get( segmentUUID uuid.UUID, field string, pattern string, patternType PatternType, -) (postings.List, bool) { +) (*cachedPostings, bool) { + c.Lock() + defer c.Unlock() + newKey := newKey(field, pattern, patternType) uuidArray := segmentUUID.Array() - if uuidEntries, ok := c.items[uuidArray]; ok { - if ent, ok := uuidEntries[newKey]; ok { - c.evictList.MoveToFront(ent) - return ent.Value.(*entry).postingsList, true - } + + uuidEntries, ok := c.items[uuidArray] + if !ok { + return nil, false } - return nil, false + ent, ok := uuidEntries[newKey] + if !ok { + return nil, false + } + + c.evictList.MoveToFront(ent) + return ent.Value.(*entry).cachedPostings, true } // Remove removes the provided key from the cache, returning if the // key was contained. -func (c *postingsListLRU) Remove( +func (c *postingsListLRUShard) Remove( segmentUUID uuid.UUID, field string, pattern string, patternType PatternType, ) bool { + c.Lock() + defer c.Unlock() + newKey := newKey(field, pattern, patternType) uuidArray := segmentUUID.Array() if uuidEntries, ok := c.items[uuidArray]; ok { @@ -177,7 +276,10 @@ func (c *postingsListLRU) Remove( return false } -func (c *postingsListLRU) PurgeSegment(segmentUUID uuid.UUID) { +func (c *postingsListLRUShard) PurgeSegment(segmentUUID uuid.UUID) { + c.Lock() + defer c.Unlock() + if uuidEntries, ok := c.items[segmentUUID.Array()]; ok { for _, ent := range uuidEntries { c.removeElement(ent) @@ -186,12 +288,14 @@ func (c *postingsListLRU) PurgeSegment(segmentUUID uuid.UUID) { } // Len returns the number of items in the cache. -func (c *postingsListLRU) Len() int { +func (c *postingsListLRUShard) Len() int { + c.RLock() + defer c.RUnlock() return c.evictList.Len() } // removeOldest removes the oldest item from the cache. 
-func (c *postingsListLRU) removeOldest() { +func (c *postingsListLRUShard) removeOldest() { ent := c.evictList.Back() if ent != nil { c.removeElement(ent) @@ -199,7 +303,7 @@ func (c *postingsListLRU) removeOldest() { } // removeElement is used to remove a given list element from the cache -func (c *postingsListLRU) removeElement(e *list.Element) { +func (c *postingsListLRUShard) removeElement(e *list.Element) { c.evictList.Remove(e) entry := e.Value.(*entry) @@ -211,6 +315,25 @@ func (c *postingsListLRU) removeElement(e *list.Element) { } } -func newKey(field, pattern string, patternType PatternType) key { - return key{field: field, pattern: pattern, patternType: patternType} +func newKey(field, pattern string, patternType PatternType) PostingsListCacheKey { + return PostingsListCacheKey{ + Field: field, + Pattern: pattern, + PatternType: patternType, + } +} + +func hashKey( + segmentUUID uuid.UUID, + field string, + pattern string, + patternType PatternType, +) uint64 { + var h xxhash.Digest + h.Reset() + _, _ = h.Write(segmentUUID) + _, _ = h.WriteString(field) + _, _ = h.WriteString(pattern) + _, _ = h.WriteString(string(patternType)) + return h.Sum64() } diff --git a/src/dbnode/storage/index/postings_list_cache_lru_test.go b/src/dbnode/storage/index/postings_list_cache_lru_test.go index 2fb384e767..29d92f50ac 100644 --- a/src/dbnode/storage/index/postings_list_cache_lru_test.go +++ b/src/dbnode/storage/index/postings_list_cache_lru_test.go @@ -22,8 +22,18 @@ package index // Keys returns a slice of the keys in the cache, from oldest to newest. Used for // testing only. -func (c *postingsListLRU) keys() []key { - keys := make([]key, 0, len(c.items)) +func (c *postingsListLRU) keys() []PostingsListCacheKey { + var keys []PostingsListCacheKey + for _, shard := range c.shards { + keys = append(keys, shard.keys()...) + } + return keys +} + +// Keys returns a slice of the keys in the cache, from oldest to newest. Used for +// testing only. +func (c *postingsListLRUShard) keys() []PostingsListCacheKey { + keys := make([]PostingsListCacheKey, 0, len(c.items)) for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() { entry := ent.Value.(*entry) keys = append(keys, entry.key) diff --git a/src/dbnode/storage/index/postings_list_cache_test.go b/src/dbnode/storage/index/postings_list_cache_test.go index dff698f428..6ed3d4ac39 100644 --- a/src/dbnode/storage/index/postings_list_cache_test.go +++ b/src/dbnode/storage/index/postings_list_cache_test.go @@ -79,7 +79,7 @@ func init() { type testEntry struct { segmentUUID uuid.UUID - key key + key PostingsListCacheKey postingsList postings.List } @@ -87,7 +87,6 @@ func TestSimpleLRUBehavior(t *testing.T) { size := 3 plCache, err := NewPostingsListCache(size, testPostingListCacheOptions) require.NoError(t, err) - defer plCache.Start()() var ( e0 = testPlEntries[0] @@ -133,22 +132,21 @@ func TestPurgeSegment(t *testing.T) { size := len(testPlEntries) plCache, err := NewPostingsListCache(size, testPostingListCacheOptions) require.NoError(t, err) - defer plCache.Start()() // Write many entries with the same segment UUID. 
for i := 0; i < 100; i++ { - if testPlEntries[i].key.patternType == PatternTypeRegexp { + if testPlEntries[i].key.PatternType == PatternTypeRegexp { plCache.PutRegexp( testPlEntries[0].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, testPlEntries[i].postingsList, ) } else { plCache.PutTerm( testPlEntries[0].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, testPlEntries[i].postingsList, ) } @@ -165,18 +163,18 @@ func TestPurgeSegment(t *testing.T) { // All entries related to the purged segment should be gone. require.Equal(t, size-100, plCache.lru.Len()) for i := 0; i < 100; i++ { - if testPlEntries[i].key.patternType == PatternTypeRegexp { + if testPlEntries[i].key.PatternType == PatternTypeRegexp { _, ok := plCache.GetRegexp( testPlEntries[0].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, ) require.False(t, ok) } else { _, ok := plCache.GetTerm( testPlEntries[0].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, ) require.False(t, ok) } @@ -191,7 +189,6 @@ func TestPurgeSegment(t *testing.T) { func TestEverthingInsertedCanBeRetrieved(t *testing.T) { plCache, err := NewPostingsListCache(len(testPlEntries), testPostingListCacheOptions) require.NoError(t, err) - defer plCache.Start()() for i := range testPlEntries { putEntry(t, plCache, i) @@ -215,7 +212,6 @@ func TestConcurrencyVerifyResultsNoEviction(t *testing.T) { func testConcurrency(t *testing.T, size int, purge bool, verify bool) { plCache, err := NewPostingsListCache(size, testPostingListCacheOptions) require.NoError(t, err) - defer plCache.Start()() wg := sync.WaitGroup{} // Spin up writers. @@ -276,70 +272,70 @@ func testConcurrency(t *testing.T, size int, purge bool, verify bool) { func putEntry(t *testing.T, cache *PostingsListCache, i int) { // Do each put twice to test the logic that avoids storing // multiple entries for the same value. 
- switch testPlEntries[i].key.patternType { + switch testPlEntries[i].key.PatternType { case PatternTypeRegexp: cache.PutRegexp( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, testPlEntries[i].postingsList, ) cache.PutRegexp( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, testPlEntries[i].postingsList, ) case PatternTypeTerm: cache.PutTerm( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, testPlEntries[i].postingsList, ) cache.PutTerm( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, testPlEntries[i].postingsList, ) case PatternTypeField: cache.PutField( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, + testPlEntries[i].key.Field, testPlEntries[i].postingsList, ) cache.PutField( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, + testPlEntries[i].key.Field, testPlEntries[i].postingsList, ) default: - require.FailNow(t, "unknown pattern type", testPlEntries[i].key.patternType) + require.FailNow(t, "unknown pattern type", testPlEntries[i].key.PatternType) } } func getEntry(t *testing.T, cache *PostingsListCache, i int) (postings.List, bool) { - switch testPlEntries[i].key.patternType { + switch testPlEntries[i].key.PatternType { case PatternTypeRegexp: return cache.GetRegexp( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, ) case PatternTypeTerm: return cache.GetTerm( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, - testPlEntries[i].key.pattern, + testPlEntries[i].key.Field, + testPlEntries[i].key.Pattern, ) case PatternTypeField: return cache.GetField( testPlEntries[i].segmentUUID, - testPlEntries[i].key.field, + testPlEntries[i].key.Field, ) default: - require.FailNow(t, "unknown pattern type", testPlEntries[i].key.patternType) + require.FailNow(t, "unknown pattern type", testPlEntries[i].key.PatternType) } return nil, false } @@ -353,14 +349,14 @@ func requireExpectedOrder(t *testing.T, plCache *PostingsListCache, expectedOrde func printSortedKeys(t *testing.T, cache *PostingsListCache) { keys := cache.lru.keys() sort.Slice(keys, func(i, j int) bool { - iIdx, err := strconv.ParseInt(keys[i].field, 10, 64) + iIdx, err := strconv.ParseInt(keys[i].Field, 10, 64) if err != nil { - t.Fatalf("unable to parse: %s into int", keys[i].field) + t.Fatalf("unable to parse: %s into int", keys[i].Field) } - jIdx, err := strconv.ParseInt(keys[j].field, 10, 64) + jIdx, err := strconv.ParseInt(keys[j].Field, 10, 64) if err != nil { - t.Fatalf("unable to parse: %s into int", keys[i].field) + t.Fatalf("unable to parse: %s into int", keys[i].Field) } return iIdx < jIdx diff --git a/src/dbnode/storage/index/read_through_segment.go b/src/dbnode/storage/index/read_through_segment.go index 5d39f6b35e..b4c273e7eb 100644 --- a/src/dbnode/storage/index/read_through_segment.go +++ b/src/dbnode/storage/index/read_through_segment.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/m3ninx/index" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" + "github.com/m3db/m3/src/m3ninx/search" "github.com/pborman/uuid" ) @@ -53,34 
+54,46 @@ type ReadThroughSegment struct { segment segment.ImmutableSegment - uuid uuid.UUID - postingsListCache *PostingsListCache + uuid uuid.UUID + caches ReadThroughSegmentCaches opts ReadThroughSegmentOptions closed bool } +// ReadThroughSegmentCaches is the set of caches +// to use for the read through segment. +type ReadThroughSegmentCaches struct { + SegmentPostingsListCache *PostingsListCache + SearchPostingsListCache *PostingsListCache +} + // ReadThroughSegmentOptions is the options struct for the // ReadThroughSegment. type ReadThroughSegmentOptions struct { - // Whether the postings list for regexp queries should be cached. + // CacheRegexp sets whether the postings list for regexp queries + // should be cached. CacheRegexp bool - // Whether the postings list for term queries should be cached. + // CacheTerms sets whether the postings list for term queries + // should be cached. CacheTerms bool + // CacheSearches sets whether the postings list for search queries + // should be cached. + CacheSearches bool } // NewReadThroughSegment creates a new read through segment. func NewReadThroughSegment( seg segment.ImmutableSegment, - cache *PostingsListCache, + caches ReadThroughSegmentCaches, opts ReadThroughSegmentOptions, ) *ReadThroughSegment { return &ReadThroughSegment{ - segment: seg, - opts: opts, - uuid: uuid.NewUUID(), - postingsListCache: cache, + segment: seg, + opts: opts, + uuid: uuid.NewUUID(), + caches: caches, } } @@ -96,8 +109,7 @@ func (r *ReadThroughSegment) Reader() (segment.Reader, error) { if err != nil { return nil, err } - return newReadThroughSegmentReader( - reader, r.uuid, r.postingsListCache, r.opts), nil + return newReadThroughSegmentReader(r, reader, r.uuid, r.caches, r.opts), nil } // Close purges all entries in the cache associated with this segment, @@ -111,12 +123,19 @@ func (r *ReadThroughSegment) Close() error { r.closed = true - if r.postingsListCache != nil { + if cache := r.caches.SegmentPostingsListCache; cache != nil { + // Purge segments from the cache before closing the segment to avoid + // temporarily having postings lists in the cache whose underlying + // bytes are no longer mmap'd. + cache.PurgeSegment(r.uuid) + } + if cache := r.caches.SearchPostingsListCache; cache != nil { // Purge segments from the cache before closing the segment to avoid // temporarily having postings lists in the cache whose underlying // bytes are no longer mmap'd. - r.postingsListCache.PurgeSegment(r.uuid) + cache.PurgeSegment(r.uuid) } + return r.segment.Close() } @@ -155,27 +174,78 @@ func (r *ReadThroughSegment) Size() int64 { return r.segment.Size() } +// PutCachedSearchPattern caches a search pattern. +func (r *ReadThroughSegment) PutCachedSearchPattern( + queryStr string, + query search.Query, + pl postings.List, +) { + r.RLock() + defer r.RUnlock() + if r.closed { + return + } + + cache := r.caches.SearchPostingsListCache + if cache == nil || !r.opts.CacheSearches { + return + } + + cache.PutSearch(r.uuid, queryStr, query, pl) +} + +// CachedSearchPatternsResult defines cached search patterns. +type CachedSearchPatternsResult struct { + CacheSearchesDisabled bool + CachedPatternsResult CachedPatternsResult +} + +// CachedSearchPatterns returns cached search patterns. 
+func (r *ReadThroughSegment) CachedSearchPatterns( + fn CachedPatternForEachFn, +) CachedSearchPatternsResult { + cache := r.caches.SearchPostingsListCache + if cache == nil || !r.opts.CacheSearches { + return CachedSearchPatternsResult{ + CacheSearchesDisabled: true, + } + } + + patternType := PatternTypeSearch + result := cache.CachedPatterns(r.uuid, CachedPatternsQuery{ + PatternType: &patternType, + }, fn) + return CachedSearchPatternsResult{ + CachedPatternsResult: result, + } +} + +var _ search.ReadThroughSegmentSearcher = (*readThroughSegmentReader)(nil) + type readThroughSegmentReader struct { + seg *ReadThroughSegment // reader is explicitly not embedded at the top level // of the struct to force new methods added to index.Reader // to be explicitly supported by the read through cache. - reader segment.Reader - opts ReadThroughSegmentOptions - uuid uuid.UUID - postingsListCache *PostingsListCache + reader segment.Reader + opts ReadThroughSegmentOptions + uuid uuid.UUID + caches ReadThroughSegmentCaches } func newReadThroughSegmentReader( + seg *ReadThroughSegment, reader segment.Reader, uuid uuid.UUID, - cache *PostingsListCache, + caches ReadThroughSegmentCaches, opts ReadThroughSegmentOptions, ) segment.Reader { return &readThroughSegmentReader{ - reader: reader, - opts: opts, - uuid: uuid, - postingsListCache: cache, + seg: seg, + reader: reader, + opts: opts, + uuid: uuid, + caches: caches, } } @@ -185,21 +255,22 @@ func (s *readThroughSegmentReader) MatchRegexp( field []byte, c index.CompiledRegex, ) (postings.List, error) { - if s.postingsListCache == nil || !s.opts.CacheRegexp { + cache := s.caches.SegmentPostingsListCache + if cache == nil || !s.opts.CacheRegexp { return s.reader.MatchRegexp(field, c) } // TODO(rartoul): Would be nice to not allocate strings here. fieldStr := string(field) patternStr := c.FSTSyntax.String() - pl, ok := s.postingsListCache.GetRegexp(s.uuid, fieldStr, patternStr) + pl, ok := cache.GetRegexp(s.uuid, fieldStr, patternStr) if ok { return pl, nil } pl, err := s.reader.MatchRegexp(field, c) if err == nil { - s.postingsListCache.PutRegexp(s.uuid, fieldStr, patternStr, pl) + cache.PutRegexp(s.uuid, fieldStr, patternStr, pl) } return pl, err } @@ -209,21 +280,22 @@ func (s *readThroughSegmentReader) MatchRegexp( func (s *readThroughSegmentReader) MatchTerm( field []byte, term []byte, ) (postings.List, error) { - if s.postingsListCache == nil || !s.opts.CacheTerms { + cache := s.caches.SegmentPostingsListCache + if cache == nil || !s.opts.CacheTerms { return s.reader.MatchTerm(field, term) } // TODO(rartoul): Would be nice to not allocate strings here. fieldStr := string(field) patternStr := string(term) - pl, ok := s.postingsListCache.GetTerm(s.uuid, fieldStr, patternStr) + pl, ok := cache.GetTerm(s.uuid, fieldStr, patternStr) if ok { return pl, nil } pl, err := s.reader.MatchTerm(field, term) if err == nil { - s.postingsListCache.PutTerm(s.uuid, fieldStr, patternStr, pl) + cache.PutTerm(s.uuid, fieldStr, patternStr, pl) } return pl, err } @@ -231,20 +303,21 @@ func (s *readThroughSegmentReader) MatchTerm( // MatchField returns a cached posting list or queries the underlying // segment if their is a cache miss. func (s *readThroughSegmentReader) MatchField(field []byte) (postings.List, error) { - if s.postingsListCache == nil || !s.opts.CacheTerms { + cache := s.caches.SegmentPostingsListCache + if cache == nil || !s.opts.CacheTerms { return s.reader.MatchField(field) } // TODO(rartoul): Would be nice to not allocate strings here. 
fieldStr := string(field) - pl, ok := s.postingsListCache.GetField(s.uuid, fieldStr) + pl, ok := cache.GetField(s.uuid, fieldStr) if ok { return pl, nil } pl, err := s.reader.MatchField(field) if err == nil { - s.postingsListCache.PutField(s.uuid, fieldStr, pl) + cache.PutField(s.uuid, fieldStr, pl) } return pl, err } @@ -253,7 +326,7 @@ func (s *readThroughSegmentReader) MatchField(field []byte) (postings.List, erro // NB(r): The postings list returned by match all is just an iterator // from zero to the maximum document number indexed by the segment and as such // causes no allocations to compute and construct. -func (s *readThroughSegmentReader) MatchAll() (postings.MutableList, error) { +func (s *readThroughSegmentReader) MatchAll() (postings.List, error) { return s.reader.MatchAll() } @@ -306,3 +379,29 @@ func (s *readThroughSegmentReader) Terms(field []byte) (segment.TermsIterator, e func (s *readThroughSegmentReader) Close() error { return s.reader.Close() } + +func (s *readThroughSegmentReader) Search( + query search.Query, + searcher search.Searcher, +) (postings.List, error) { + cache := s.caches.SearchPostingsListCache + if cache == nil || !s.opts.CacheSearches { + return searcher.Search(s) + } + + // TODO(r): Would be nice to not allocate strings here. + queryStr := query.String() + pl, ok := cache.GetSearch(s.uuid, queryStr) + if ok { + return pl, nil + } + + pl, err := searcher.Search(s) + if err != nil { + return nil, err + } + + cache.PutSearch(s.uuid, queryStr, query, pl) + + return pl, nil +} diff --git a/src/dbnode/storage/index/read_through_segment_test.go b/src/dbnode/storage/index/read_through_segment_test.go index fe604e9628..2b2f0828b6 100644 --- a/src/dbnode/storage/index/read_through_segment_test.go +++ b/src/dbnode/storage/index/read_through_segment_test.go @@ -41,6 +41,14 @@ var ( } ) +func testReadThroughSegmentCaches( + segmentPostingsListCache *PostingsListCache, +) ReadThroughSegmentCaches { + return ReadThroughSegmentCaches{ + SegmentPostingsListCache: segmentPostingsListCache, + } +} + func TestReadThroughSegmentMatchRegexp(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() @@ -51,7 +59,6 @@ func TestReadThroughSegmentMatchRegexp(t *testing.T) { cache, err := NewPostingsListCache(1, testPostingListCacheOptions) require.NoError(t, err) - defer cache.Start()() field := []byte("some-field") parsedRegex, err := syntax.Parse(".*this-will-be-slow.*", syntax.Simple) @@ -60,8 +67,9 @@ func TestReadThroughSegmentMatchRegexp(t *testing.T) { FSTSyntax: parsedRegex, } - readThrough, err := NewReadThroughSegment( - seg, cache, defaultReadThroughSegmentOptions).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(cache), + defaultReadThroughSegmentOptions).Reader() require.NoError(t, err) originalPL := roaring.NewPostingsList() @@ -90,7 +98,6 @@ func TestReadThroughSegmentMatchRegexpCacheDisabled(t *testing.T) { cache, err := NewPostingsListCache(1, testPostingListCacheOptions) require.NoError(t, err) - defer cache.Start()() field := []byte("some-field") parsedRegex, err := syntax.Parse(".*this-will-be-slow.*", syntax.Simple) @@ -99,9 +106,12 @@ func TestReadThroughSegmentMatchRegexpCacheDisabled(t *testing.T) { FSTSyntax: parsedRegex, } - readThrough, err := NewReadThroughSegment(seg, cache, ReadThroughSegmentOptions{ - CacheRegexp: false, - }).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(cache), + ReadThroughSegmentOptions{ + CacheRegexp: false, + }). 
+ Reader() require.NoError(t, err) originalPL := roaring.NewPostingsList() @@ -140,8 +150,10 @@ func TestReadThroughSegmentMatchRegexpNoCache(t *testing.T) { FSTSyntax: parsedRegex, } - readThrough, err := NewReadThroughSegment( - seg, nil, defaultReadThroughSegmentOptions).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(nil), + defaultReadThroughSegmentOptions). + Reader() require.NoError(t, err) originalPL := roaring.NewPostingsList() @@ -164,7 +176,6 @@ func TestReadThroughSegmentMatchTerm(t *testing.T) { cache, err := NewPostingsListCache(1, testPostingListCacheOptions) require.NoError(t, err) - defer cache.Start()() var ( field = []byte("some-field") @@ -174,8 +185,10 @@ func TestReadThroughSegmentMatchTerm(t *testing.T) { ) require.NoError(t, originalPL.Insert(1)) - readThrough, err := NewReadThroughSegment( - seg, cache, defaultReadThroughSegmentOptions).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(cache), + defaultReadThroughSegmentOptions). + Reader() require.NoError(t, err) reader.EXPECT().MatchTerm(field, term).Return(originalPL, nil) @@ -202,7 +215,6 @@ func TestReadThroughSegmentMatchTermCacheDisabled(t *testing.T) { cache, err := NewPostingsListCache(1, testPostingListCacheOptions) require.NoError(t, err) - defer cache.Start()() var ( field = []byte("some-field") @@ -212,9 +224,12 @@ func TestReadThroughSegmentMatchTermCacheDisabled(t *testing.T) { ) require.NoError(t, originalPL.Insert(1)) - readThrough, err := NewReadThroughSegment(seg, cache, ReadThroughSegmentOptions{ - CacheTerms: false, - }).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(cache), + ReadThroughSegmentOptions{ + CacheTerms: false, + }). + Reader() require.NoError(t, err) reader.EXPECT(). @@ -251,8 +266,10 @@ func TestReadThroughSegmentMatchTermNoCache(t *testing.T) { seg.EXPECT().Reader().Return(reader, nil) - readThrough, err := NewReadThroughSegment( - seg, nil, defaultReadThroughSegmentOptions).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(nil), + defaultReadThroughSegmentOptions). + Reader() require.NoError(t, err) reader.EXPECT().MatchTerm(field, term).Return(originalPL, nil) @@ -270,10 +287,10 @@ func TestClose(t *testing.T) { segment := fst.NewMockSegment(ctrl) cache, err := NewPostingsListCache(1, testPostingListCacheOptions) require.NoError(t, err) - defer cache.Start()() - readThroughSeg := NewReadThroughSegment( - segment, cache, defaultReadThroughSegmentOptions) + readThroughSeg := NewReadThroughSegment(segment, + testReadThroughSegmentCaches(nil), + defaultReadThroughSegmentOptions) segmentUUID := readThroughSeg.uuid @@ -305,7 +322,6 @@ func TestReadThroughSegmentMatchField(t *testing.T) { cache, err := NewPostingsListCache(1, testPostingListCacheOptions) require.NoError(t, err) - defer cache.Start()() var ( field = []byte("some-field") @@ -314,8 +330,10 @@ func TestReadThroughSegmentMatchField(t *testing.T) { ) require.NoError(t, originalPL.Insert(1)) - readThrough, err := NewReadThroughSegment( - seg, cache, defaultReadThroughSegmentOptions).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(cache), + defaultReadThroughSegmentOptions). 
+ Reader() require.NoError(t, err) reader.EXPECT().MatchField(field).Return(originalPL, nil) @@ -342,7 +360,6 @@ func TestReadThroughSegmentMatchFieldCacheDisabled(t *testing.T) { cache, err := NewPostingsListCache(1, testPostingListCacheOptions) require.NoError(t, err) - defer cache.Start()() var ( field = []byte("some-field") @@ -351,9 +368,12 @@ func TestReadThroughSegmentMatchFieldCacheDisabled(t *testing.T) { ) require.NoError(t, originalPL.Insert(1)) - readThrough, err := NewReadThroughSegment(seg, cache, ReadThroughSegmentOptions{ - CacheTerms: false, - }).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(cache), + ReadThroughSegmentOptions{ + CacheTerms: false, + }). + Reader() require.NoError(t, err) reader.EXPECT(). @@ -389,8 +409,10 @@ func TestReadThroughSegmentMatchFieldNoCache(t *testing.T) { seg.EXPECT().Reader().Return(reader, nil) - readThrough, err := NewReadThroughSegment( - seg, nil, defaultReadThroughSegmentOptions).Reader() + readThrough, err := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(nil), + defaultReadThroughSegmentOptions). + Reader() require.NoError(t, err) reader.EXPECT().MatchField(field).Return(originalPL, nil) @@ -407,8 +429,9 @@ func TestCloseNoCache(t *testing.T) { seg := fst.NewMockSegment(ctrl) - readThrough := NewReadThroughSegment( - seg, nil, defaultReadThroughSegmentOptions) + readThrough := NewReadThroughSegment(seg, + testReadThroughSegmentCaches(nil), + defaultReadThroughSegmentOptions) seg.EXPECT().Close().Return(nil) err := readThrough.Close() diff --git a/src/dbnode/storage/index/segments.go b/src/dbnode/storage/index/segments.go index 6f3134d1c9..2ef9c92e3c 100644 --- a/src/dbnode/storage/index/segments.go +++ b/src/dbnode/storage/index/segments.go @@ -28,10 +28,11 @@ import ( ) type readableSeg struct { - nowFn clock.NowFn - createdAt time.Time - segment segment.Segment - garbageCollecting bool + nowFn clock.NowFn + createdAt time.Time + segment segment.Segment + garbageCollectLastCheck time.Time + garbageCollecting bool } func newReadableSeg( diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 213483f5db..9a0a49fc2c 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -1088,6 +1088,12 @@ type Options interface { // PostingsListCache returns the postings list cache. PostingsListCache() *PostingsListCache + // SetSearchPostingsListCache sets the postings list cache. + SetSearchPostingsListCache(value *PostingsListCache) Options + + // SearchPostingsListCache returns the postings list cache. + SearchPostingsListCache() *PostingsListCache + // SetReadThroughSegmentOptions sets the read through segment cache options. SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Options diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index a41822d126..7a4d033a3f 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1430,6 +1430,7 @@ func (s *dbShard) insertNewShardEntryWithLock(entry *Entry) { NoCopyKey: true, NoFinalizeKey: true, }) + entry.SetInsertTime(s.nowFn()) } func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { diff --git a/src/m3ninx/index/index_mock.go b/src/m3ninx/index/index_mock.go index 9ad8c74f2c..49d567f0f8 100644 --- a/src/m3ninx/index/index_mock.go +++ b/src/m3ninx/index/index_mock.go @@ -116,10 +116,10 @@ func (mr *MockReaderMockRecorder) Docs(arg0 interface{}) *gomock.Call { } // MatchAll mocks base method. 
-func (m *MockReader) MatchAll() (postings.MutableList, error) { +func (m *MockReader) MatchAll() (postings.List, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "MatchAll") - ret0, _ := ret[0].(postings.MutableList) + ret0, _ := ret[0].(postings.List) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index ea104f8385..b8608f0912 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -986,7 +986,7 @@ func (sr *fsSegmentReader) MatchRegexp( return pl, err } -func (sr *fsSegmentReader) MatchAll() (postings.MutableList, error) { +func (sr *fsSegmentReader) MatchAll() (postings.List, error) { if sr.closed { return nil, errReaderClosed } diff --git a/src/m3ninx/index/segment/mem/reader.go b/src/m3ninx/index/segment/mem/reader.go index 93a42824f9..981648ebe3 100644 --- a/src/m3ninx/index/segment/mem/reader.go +++ b/src/m3ninx/index/segment/mem/reader.go @@ -113,7 +113,7 @@ func (r *reader) MatchRegexp(field []byte, compiled index.CompiledRegex) (postin return r.segment.matchRegexp(field, compileRE) } -func (r *reader) MatchAll() (postings.MutableList, error) { +func (r *reader) MatchAll() (postings.List, error) { r.RLock() defer r.RUnlock() if r.closed { diff --git a/src/m3ninx/index/segment/segment_mock.go b/src/m3ninx/index/segment/segment_mock.go index 37be204faa..cd70f773f0 100644 --- a/src/m3ninx/index/segment/segment_mock.go +++ b/src/m3ninx/index/segment/segment_mock.go @@ -287,10 +287,10 @@ func (mr *MockReaderMockRecorder) FieldsPostingsList() *gomock.Call { } // MatchAll mocks base method. -func (m *MockReader) MatchAll() (postings.MutableList, error) { +func (m *MockReader) MatchAll() (postings.List, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "MatchAll") - ret0, _ := ret[0].(postings.MutableList) + ret0, _ := ret[0].(postings.List) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/src/m3ninx/index/types.go b/src/m3ninx/index/types.go index 1a005691aa..bed2ad5d81 100644 --- a/src/m3ninx/index/types.go +++ b/src/m3ninx/index/types.go @@ -75,7 +75,7 @@ type Readable interface { MatchRegexp(field []byte, c CompiledRegex) (postings.List, error) // MatchAll returns a postings list for all documents known to the Reader. - MatchAll() (postings.MutableList, error) + MatchAll() (postings.List, error) // MetadataIterator returns an iterator over the metadata whose IDs are in the provided // postings list. 
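Note: the hunks above thread a second, search-level postings list cache through the read through segment. The core mechanic is plain read-through caching keyed by the query's canonical string, as implemented in readThroughSegmentReader.Search earlier in this patch. A minimal runnable sketch of that flow, using simplified stand-in types (postingsList and cache here are hypothetical; the real code uses postings.List and *PostingsListCache):

package main

import "fmt"

// postingsList stands in for the real postings.List.
type postingsList []uint64

// cache stands in for *PostingsListCache; keys are canonical query strings.
type cache map[string]postingsList

// searchReadThrough mirrors readThroughSegmentReader.Search: serve from the
// cache when possible, otherwise run the searcher and cache its result.
func searchReadThrough(c cache, queryStr string, search func() (postingsList, error)) (postingsList, error) {
	if pl, ok := c[queryStr]; ok {
		return pl, nil // hit: the searcher never runs
	}
	pl, err := search()
	if err != nil {
		return nil, err
	}
	c[queryStr] = pl // miss: populate for subsequent executions
	return pl, nil
}

func main() {
	c := cache{}
	slow := func() (postingsList, error) { return postingsList{1, 5, 9}, nil }
	pl, _ := searchReadThrough(c, "term(city,nyc)", slow) // miss, populates
	pl, _ = searchReadThrough(c, "term(city,nyc)", slow)  // hit
	fmt.Println(pl)
}
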
diff --git a/src/m3ninx/search/executor/executor.go b/src/m3ninx/search/executor/executor.go index 7a376b8c44..86f18c4b92 100644 --- a/src/m3ninx/search/executor/executor.go +++ b/src/m3ninx/search/executor/executor.go @@ -34,7 +34,11 @@ var ( errExecutorClosed = errors.New("executor is closed") ) -type newIteratorFn func(ctx context.Context, s search.Searcher, rs index.Readers) doc.QueryDocIterator +type newIteratorFn func( + ctx context.Context, + q search.Query, + rs index.Readers, +) (doc.QueryDocIterator, error) type executor struct { sync.RWMutex @@ -60,13 +64,11 @@ func (e *executor) Execute(ctx context.Context, q search.Query) (doc.QueryDocIte return nil, errExecutorClosed } - s, err := q.Searcher() + iter, err := e.newIteratorFn(ctx, q, e.readers) if err != nil { return nil, err } - iter := e.newIteratorFn(ctx, s, e.readers) - return iter, nil } diff --git a/src/m3ninx/search/executor/executor_test.go b/src/m3ninx/search/executor/executor_test.go index dbab9a848f..bfe455c186 100644 --- a/src/m3ninx/search/executor/executor_test.go +++ b/src/m3ninx/search/executor/executor_test.go @@ -51,17 +51,13 @@ func TestExecutor(t *testing.T) { r = index.NewMockReader(mockCtrl) rs = index.Readers{r} ) - gomock.InOrder( - q.EXPECT().Searcher().Return(nil, nil), - - r.EXPECT().Close().Return(nil), - ) + r.EXPECT().Close().Return(nil) e := NewExecutor(rs).(*executor) // Override newIteratorFn to return test iterator. - e.newIteratorFn = func(_ context.Context, _ search.Searcher, _ index.Readers) doc.QueryDocIterator { - return newTestIterator() + e.newIteratorFn = func(_ context.Context, _ search.Query, _ index.Readers) (doc.QueryDocIterator, error) { + return newTestIterator(), nil } it, err := e.Execute(context.NewBackground(), q) diff --git a/src/m3ninx/search/executor/iterator.go b/src/m3ninx/search/executor/iterator.go index e209b01fc5..f9943c3454 100644 --- a/src/m3ninx/search/executor/iterator.go +++ b/src/m3ninx/search/executor/iterator.go @@ -24,6 +24,7 @@ import ( "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index" + "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/errors" @@ -40,6 +41,7 @@ import ( // doc iterators. for each posting in the list, the encoded document is retrieved. 
type iterator struct { // immutable state + query search.Query searcher search.Searcher readers index.Readers ctx context.Context @@ -55,12 +57,18 @@ type iterator struct { err error } -func newIterator(ctx context.Context, s search.Searcher, rs index.Readers) doc.QueryDocIterator { +func newIterator(ctx context.Context, q search.Query, rs index.Readers) (doc.QueryDocIterator, error) { + s, err := q.Searcher() + if err != nil { + return nil, err + } + return &iterator{ ctx: ctx, + query: q, searcher: s, readers: rs, - } + }, nil } func (it *iterator) Done() bool { @@ -148,15 +156,26 @@ func (it *iterator) initIters() error { it.iters = make([]doc.Iterator, len(it.readers)) for i, reader := range it.readers { _, sp := it.ctx.StartTraceSpan(tracepoint.SearchExecutorIndexSearch) - pl, err := it.searcher.Search(reader) + + var ( + pl postings.List + err error + ) + if readThrough, ok := reader.(search.ReadThroughSegmentSearcher); ok { + pl, err = readThrough.Search(it.query, it.searcher) + } else { + pl, err = it.searcher.Search(reader) + } sp.Finish() if err != nil { return err } + iter, err := reader.Docs(pl) if err != nil { return err } + it.iters[i] = iter } return nil diff --git a/src/m3ninx/search/executor/iterator_test.go b/src/m3ninx/search/executor/iterator_test.go index be876cafca..8eaeccd898 100644 --- a/src/m3ninx/search/executor/iterator_test.go +++ b/src/m3ninx/search/executor/iterator_test.go @@ -83,10 +83,14 @@ func TestIterator(t *testing.T) { searcher.EXPECT().Search(secondReader).Return(secondPL, nil), ) + query := search.NewMockQuery(mockCtrl) + query.EXPECT().Searcher().Return(searcher, nil) + readers := index.Readers{firstReader, secondReader} // Construct iterator and run tests. - iter := newIterator(context.NewBackground(), searcher, readers) + iter, err := newIterator(context.NewBackground(), query, readers) + require.NoError(t, err) require.False(t, iter.Done()) require.True(t, iter.Next()) @@ -146,10 +150,14 @@ func TestCloseEarly(t *testing.T) { searcher.EXPECT().Search(secondReader).Return(secondPL, nil), ) + query := search.NewMockQuery(mockCtrl) + query.EXPECT().Searcher().Return(searcher, nil) + readers := index.Readers{firstReader, secondReader} // Construct iterator and run tests. - iter := newIterator(context.NewBackground(), searcher, readers) + iter, err := newIterator(context.NewBackground(), query, readers) + require.NoError(t, err) require.True(t, iter.Next()) require.Equal(t, docs[0], iter.Current()) @@ -195,10 +203,14 @@ func TestErrIterating(t *testing.T) { searcher.EXPECT().Search(secondReader).Return(secondPL, nil), ) + query := search.NewMockQuery(mockCtrl) + query.EXPECT().Searcher().Return(searcher, nil) + readers := index.Readers{firstReader, secondReader} // Construct iterator and run tests. 
- iter := newIterator(context.NewBackground(), searcher, readers) + iter, err := newIterator(context.NewBackground(), query, readers) + require.NoError(t, err) require.False(t, iter.Done()) require.True(t, iter.Next()) @@ -211,12 +223,24 @@ func TestErrIterating(t *testing.T) { } func TestCloseBeforeNext(t *testing.T) { - iter := newIterator(context.NewBackground(), nil, nil) + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + query := search.NewMockQuery(mockCtrl) + query.EXPECT().Searcher().Return(nil, nil) + + iter, err := newIterator(context.NewBackground(), query, nil) + require.NoError(t, err) require.NoError(t, iter.Close()) } func TestNoReaders(t *testing.T) { - iter := newIterator(context.NewBackground(), nil, index.Readers{}) + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + query := search.NewMockQuery(mockCtrl) + query.EXPECT().Searcher().Return(nil, nil) + + iter, err := newIterator(context.NewBackground(), query, index.Readers{}) + require.NoError(t, err) require.False(t, iter.Done()) require.False(t, iter.Next()) require.True(t, iter.Done()) @@ -257,10 +281,14 @@ func TestEmptyReaders(t *testing.T) { searcher.EXPECT().Search(secondReader).Return(secondPL, nil), ) + query := search.NewMockQuery(mockCtrl) + query.EXPECT().Searcher().Return(searcher, nil) + readers := index.Readers{firstReader, secondReader} // Construct iterator and run tests. - iter := newIterator(context.NewBackground(), searcher, readers) + iter, err := newIterator(context.NewBackground(), query, readers) + require.NoError(t, err) require.False(t, iter.Done()) require.False(t, iter.Next()) require.True(t, iter.Done()) diff --git a/src/m3ninx/search/query/all.go b/src/m3ninx/search/query/all.go index 693662f870..dceb914c52 100644 --- a/src/m3ninx/search/query/all.go +++ b/src/m3ninx/search/query/all.go @@ -21,13 +21,13 @@ package query import ( - "fmt" - "github.com/m3db/m3/src/m3ninx/generated/proto/querypb" "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/m3ninx/search/searcher" ) +const allQueryStr = "all()" + // AllQuery returns a query which matches all known documents. type AllQuery struct{} @@ -62,5 +62,5 @@ func (q *AllQuery) ToProto() *querypb.Query { } func (q *AllQuery) String() string { - return fmt.Sprintf("all()") + return allQueryStr } diff --git a/src/m3ninx/search/query/codec.go b/src/m3ninx/search/query/codec.go index d7fc0a9a3f..806f39f91c 100644 --- a/src/m3ninx/search/query/codec.go +++ b/src/m3ninx/search/query/codec.go @@ -45,10 +45,11 @@ func Unmarshal(data []byte) (search.Query, error) { return nil, err } - return unmarshal(&pb) + return UnmarshalProto(&pb) } -func unmarshal(q *querypb.Query) (search.Query, error) { +// UnmarshalProto will unmarshal a proto query. 
+func UnmarshalProto(q *querypb.Query) (search.Query, error) {
 	switch q := q.Query.(type) {
 
 	case *querypb.Query_All:
@@ -64,7 +65,7 @@ func unmarshal(q *querypb.Query) (search.Query, error) {
 		return NewRegexpQuery(q.Regexp.Field, q.Regexp.Regexp)
 
 	case *querypb.Query_Negation:
-		inner, err := unmarshal(q.Negation.Query)
+		inner, err := UnmarshalProto(q.Negation.Query)
 		if err != nil {
 			return nil, err
 		}
@@ -73,7 +74,7 @@ func unmarshal(q *querypb.Query) (search.Query, error) {
 	case *querypb.Query_Conjunction:
 		qs := make([]search.Query, 0, len(q.Conjunction.Queries))
 		for _, qry := range q.Conjunction.Queries {
-			sqry, err := unmarshal(qry)
+			sqry, err := UnmarshalProto(qry)
 			if err != nil {
 				return nil, err
 			}
@@ -84,7 +85,7 @@ func unmarshal(q *querypb.Query) (search.Query, error) {
 	case *querypb.Query_Disjunction:
 		qs := make([]search.Query, 0, len(q.Disjunction.Queries))
 		for _, qry := range q.Disjunction.Queries {
-			sqry, err := unmarshal(qry)
+			sqry, err := UnmarshalProto(qry)
 			if err != nil {
 				return nil, err
 			}
diff --git a/src/m3ninx/search/query/conjunction.go b/src/m3ninx/search/query/conjunction.go
index 0b906e4ad6..12b429973a 100644
--- a/src/m3ninx/search/query/conjunction.go
+++ b/src/m3ninx/search/query/conjunction.go
@@ -21,7 +21,8 @@
 package query
 
 import (
-	"fmt"
+	"sort"
+	"strings"
 
 	"github.com/m3db/m3/src/m3ninx/generated/proto/querypb"
 	"github.com/m3db/m3/src/m3ninx/search"
@@ -30,6 +31,7 @@ import (
 
 // ConjuctionQuery finds documents which match all of the given queries.
 type ConjuctionQuery struct {
+	str       string
 	queries   []search.Query
 	negations []search.Query
 }
@@ -59,10 +61,23 @@ func NewConjunctionQuery(queries []search.Query) search.Query {
 		ns = ns[1:]
 	}
 
-	return &ConjuctionQuery{
+	// Sort the queries and negations for a deterministic cache key.
+	sort.Slice(qs, func(i, j int) bool {
+		return qs[i].String() < qs[j].String()
+	})
+	sort.Slice(ns, func(i, j int) bool {
+		return ns[i].String() < ns[j].String()
+	})
+
+	q := &ConjuctionQuery{
 		queries:   qs,
 		negations: ns,
 	}
+	// NB(r): Calculate string value up front so
+	// not allocated every time String() is called to determine
+	// the cache key.
+	q.str = q.string()
+	return q
 }
 
 // Searcher returns a searcher over the provided readers.
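Note: sorting in the constructor is what makes the String() cache key deterministic; the conjunction_test change just below (expected: true for reordered operands) depends on it. A small usage sketch, assuming the m3ninx query package as patched here:

package main

import (
	"fmt"

	"github.com/m3db/m3/src/m3ninx/search"
	"github.com/m3db/m3/src/m3ninx/search/query"
)

func main() {
	apple := query.NewTermQuery([]byte("fruit"), []byte("apple"))
	banana := query.NewTermQuery([]byte("fruit"), []byte("banana"))

	// Operand order differs, but NewConjunctionQuery sorts by String(),
	// so both stringify as conjunction(term(fruit,apple),term(fruit,banana))
	// and therefore share a single entry in the search postings list cache.
	q1 := query.NewConjunctionQuery([]search.Query{apple, banana})
	q2 := query.NewConjunctionQuery([]search.Query{banana, apple})

	fmt.Println(q1.String() == q2.String()) // true
}
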
@@ -151,10 +166,17 @@ func (q *ConjuctionQuery) ToProto() *querypb.Query {
 }
 
 func (q *ConjuctionQuery) String() string {
+	return q.str
+}
+
+func (q *ConjuctionQuery) string() string {
+	var str strings.Builder
+	str.WriteString("conjunction(")
+	join(&str, q.queries)
 	if len(q.negations) > 0 {
-		return fmt.Sprintf("conjunction(%s,%s)",
-			join(q.queries), joinNegation(q.negations))
+		str.WriteRune(',')
+		joinNegation(&str, q.negations)
 	}
-
-	return fmt.Sprintf("conjunction(%s)", join(q.queries))
+	str.WriteRune(')')
+	return str.String()
 }
diff --git a/src/m3ninx/search/query/conjunction_test.go b/src/m3ninx/search/query/conjunction_test.go
index 04a2b6a08c..77443a6758 100644
--- a/src/m3ninx/search/query/conjunction_test.go
+++ b/src/m3ninx/search/query/conjunction_test.go
@@ -116,7 +116,7 @@ func TestConjunctionQueryEqual(t *testing.T) {
 				NewTermQuery([]byte("fruit"), []byte("banana")),
 				NewTermQuery([]byte("fruit"), []byte("apple")),
 			}),
-			expected: false,
+			expected: true,
 		},
 	}
 
diff --git a/src/m3ninx/search/query/disjunction.go b/src/m3ninx/search/query/disjunction.go
index 6904c3b994..51dc83c6dc 100644
--- a/src/m3ninx/search/query/disjunction.go
+++ b/src/m3ninx/search/query/disjunction.go
@@ -21,7 +21,8 @@
 package query
 
 import (
-	"fmt"
+	"sort"
+	"strings"
 
 	"github.com/m3db/m3/src/m3ninx/generated/proto/querypb"
 	"github.com/m3db/m3/src/m3ninx/search"
@@ -30,6 +31,7 @@ import (
 
 // DisjuctionQuery finds documents which match at least one of the given queries.
 type DisjuctionQuery struct {
+	str     string
 	queries []search.Query
 }
 
@@ -47,9 +49,18 @@ func NewDisjunctionQuery(queries []search.Query) search.Query {
 		qs = append(qs, query)
 	}
 
-	return &DisjuctionQuery{
+	// Sort the queries for a deterministic cache key.
+	sort.Slice(qs, func(i, j int) bool {
+		return qs[i].String() < qs[j].String()
+	})
+	q := &DisjuctionQuery{
 		queries: qs,
 	}
+	// NB(r): Calculate string value up front so
+	// not allocated every time String() is called to determine
+	// the cache key.
+	q.str = q.string()
+	return q
 }
 
 // Searcher returns a searcher over the provided readers.
@@ -112,5 +123,13 @@ func (q *DisjuctionQuery) ToProto() *querypb.Query {
 }
 
 func (q *DisjuctionQuery) String() string {
-	return fmt.Sprintf("disjunction(%s)", join(q.queries))
+	return q.str
+}
+
+func (q *DisjuctionQuery) string() string {
+	var str strings.Builder
+	str.WriteString("disjunction(")
+	join(&str, q.queries)
+	str.WriteRune(')')
+	return str.String()
 }
diff --git a/src/m3ninx/search/query/disjunction_test.go b/src/m3ninx/search/query/disjunction_test.go
index 2610f75c08..e2b5f675cc 100644
--- a/src/m3ninx/search/query/disjunction_test.go
+++ b/src/m3ninx/search/query/disjunction_test.go
@@ -102,7 +102,7 @@ func TestDisjunctionQueryEqual(t *testing.T) {
 				NewTermQuery([]byte("fruit"), []byte("banana")),
 				NewTermQuery([]byte("fruit"), []byte("apple")),
 			}),
-			expected: false,
+			expected: true,
 		},
 	}
 
diff --git a/src/m3ninx/search/query/field.go b/src/m3ninx/search/query/field.go
index 882f6bae91..958805cc53 100644
--- a/src/m3ninx/search/query/field.go
+++ b/src/m3ninx/search/query/field.go
@@ -22,7 +22,7 @@ package query
 
 import (
 	"bytes"
-	"fmt"
+	"strings"
 
 	"github.com/m3db/m3/src/m3ninx/generated/proto/querypb"
 	"github.com/m3db/m3/src/m3ninx/search"
@@ -31,14 +31,17 @@ import (
 
 // FieldQuery finds documents which have the given field exactly.
 type FieldQuery struct {
+	str   string
 	field []byte
 }
 
 // NewFieldQuery constructs a new FieldQuery for the given field.
func NewFieldQuery(field []byte) search.Query { - return &FieldQuery{ + q := &FieldQuery{ field: field, } + q.str = q.string() + return q } // Field returns the field []byte. @@ -78,5 +81,13 @@ func (q *FieldQuery) ToProto() *querypb.Query { } func (q *FieldQuery) String() string { - return fmt.Sprintf("field(%s)", q.field) + return q.str +} + +func (q *FieldQuery) string() string { + var str strings.Builder + str.WriteString("field(") + str.Write(q.field) + str.WriteRune(')') + return str.String() } diff --git a/src/m3ninx/search/query/negation.go b/src/m3ninx/search/query/negation.go index c4863dac6d..3e2b7389b5 100644 --- a/src/m3ninx/search/query/negation.go +++ b/src/m3ninx/search/query/negation.go @@ -21,7 +21,7 @@ package query import ( - "fmt" + "strings" "github.com/m3db/m3/src/m3ninx/generated/proto/querypb" "github.com/m3db/m3/src/m3ninx/search" @@ -30,14 +30,17 @@ import ( // NegationQuery finds document which do not match a given query. type NegationQuery struct { + str string query search.Query } // NewNegationQuery constructs a new NegationQuery for the given query. -func NewNegationQuery(q search.Query) search.Query { - return &NegationQuery{ - query: q, +func NewNegationQuery(query search.Query) search.Query { + q := &NegationQuery{ + query: query, } + q.str = q.string() + return q } // Searcher returns a searcher over the provided readers. @@ -75,5 +78,13 @@ func (q *NegationQuery) ToProto() *querypb.Query { } func (q *NegationQuery) String() string { - return fmt.Sprintf("negation(%s)", q.query) + return q.str +} + +func (q *NegationQuery) string() string { + var str strings.Builder + str.WriteString("negation(") + str.WriteString(q.query.String()) + str.WriteRune(')') + return str.String() } diff --git a/src/m3ninx/search/query/regexp.go b/src/m3ninx/search/query/regexp.go index 475c4590c8..6cf814de62 100644 --- a/src/m3ninx/search/query/regexp.go +++ b/src/m3ninx/search/query/regexp.go @@ -22,7 +22,7 @@ package query import ( "bytes" - "fmt" + "strings" "github.com/m3db/m3/src/m3ninx/generated/proto/querypb" "github.com/m3db/m3/src/m3ninx/index" @@ -32,6 +32,7 @@ import ( // RegexpQuery finds documents which match the given regular expression. type RegexpQuery struct { + str string field []byte regexp []byte compiled index.CompiledRegex @@ -44,11 +45,16 @@ func NewRegexpQuery(field, regexp []byte) (search.Query, error) { return nil, err } - return &RegexpQuery{ + q := &RegexpQuery{ field: field, regexp: regexp, compiled: compiled, - }, nil + } + // NB(r): Calculate string value up front so + // not allocated every time String() is called to determine + // the cache key. + q.str = q.string() + return q, nil } // MustCreateRegexpQuery is like NewRegexpQuery but panics if the query cannot be created. 
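Note: every query type in this patch gets the same treatment: the canonical string is built once in the constructor, because String() is now called on the hot path as the search cache key. The pattern, reduced to a hypothetical stand-in type (exampleQuery is not a real m3ninx type):

package main

import (
	"fmt"
	"strings"
)

// exampleQuery is a hypothetical stand-in for TermQuery, RegexpQuery, etc.
type exampleQuery struct {
	str   string
	field []byte
	term  []byte
}

func newExampleQuery(field, term []byte) *exampleQuery {
	q := &exampleQuery{field: field, term: term}
	// Build the canonical form once with strings.Builder so later
	// String() calls (one per cache lookup) allocate nothing.
	var b strings.Builder
	b.WriteString("term(")
	b.Write(field)
	b.WriteRune(',')
	b.Write(term)
	b.WriteRune(')')
	q.str = b.String()
	return q
}

func (q *exampleQuery) String() string { return q.str }

func main() {
	fmt.Println(newExampleQuery([]byte("city"), []byte("nyc"))) // term(city,nyc)
}
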
@@ -93,5 +99,15 @@ func (q *RegexpQuery) ToProto() *querypb.Query {
 }
 
 func (q *RegexpQuery) String() string {
-	return fmt.Sprintf("regexp(%s, %s)", q.field, q.regexp)
+	return q.str
+}
+
+func (q *RegexpQuery) string() string {
+	var str strings.Builder
+	str.WriteString("regexp(")
+	str.Write(q.field)
+	str.WriteRune(',')
+	str.Write(q.regexp)
+	str.WriteRune(')')
+	return str.String()
 }
diff --git a/src/m3ninx/search/query/term.go b/src/m3ninx/search/query/term.go
index 6783fe968d..b5d10cb63c 100644
--- a/src/m3ninx/search/query/term.go
+++ b/src/m3ninx/search/query/term.go
@@ -22,7 +22,7 @@ package query
 
 import (
 	"bytes"
-	"fmt"
+	"strings"
 
 	"github.com/m3db/m3/src/m3ninx/generated/proto/querypb"
 	"github.com/m3db/m3/src/m3ninx/search"
@@ -31,16 +31,22 @@ import (
 
 // TermQuery finds document which match the given term exactly.
 type TermQuery struct {
+	str   string
 	field []byte
 	term  []byte
 }
 
 // NewTermQuery constructs a new TermQuery for the given field and term.
 func NewTermQuery(field, term []byte) search.Query {
-	return &TermQuery{
+	q := &TermQuery{
 		field: field,
 		term:  term,
 	}
+	// NB(r): Calculate the string value up front so it is not
+	// allocated every time String() is called to determine the
+	// cache key.
+	q.str = q.string()
+	return q
 }
 
 // Searcher returns a searcher over the provided readers.
@@ -76,5 +82,15 @@ func (q *TermQuery) ToProto() *querypb.Query {
 }
 
 func (q *TermQuery) String() string {
-	return fmt.Sprintf("term(%s, %s)", q.field, q.term)
+	return q.str
+}
+
+func (q *TermQuery) string() string {
+	var str strings.Builder
+	str.WriteString("term(")
+	str.Write(q.field)
+	str.WriteRune(',')
+	str.Write(q.term)
+	str.WriteRune(')')
+	return str.String()
 }
diff --git a/src/m3ninx/search/query/util.go b/src/m3ninx/search/query/util.go
index f162e12cc7..34d157ff5c 100644
--- a/src/m3ninx/search/query/util.go
+++ b/src/m3ninx/search/query/util.go
@@ -21,8 +21,7 @@
 package query
 
 import (
-	"bytes"
-	"fmt"
+	"strings"
 
 	"github.com/m3db/m3/src/m3ninx/search"
 )
@@ -53,34 +52,34 @@ func singular(q search.Query) (search.Query, bool) {
 }
 
 // join concatenates a slice of queries.
-func join(qs []search.Query) string {
+func join(b *strings.Builder, qs []search.Query) {
 	switch len(qs) {
 	case 0:
-		return ""
+		return
 	case 1:
-		return qs[0].String()
+		b.WriteString(qs[0].String())
+		return
 	}
 
-	var b bytes.Buffer
 	b.WriteString(qs[0].String())
 	for _, q := range qs[1:] {
 		b.WriteString(separator)
 		b.WriteString(q.String())
 	}
-
-	return b.String()
 }
 
 // joinNegation concatenates a slice of negated queries.
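// Sketch of how composite queries thread a single strings.Builder through
// the join helpers above (roughly what ConjuctionQuery.string() does; the
// exact separator strings live in this file's constants):
//
//	var b strings.Builder
//	b.WriteString("conjunction(")
//	join(&b, queries)           // writes "q1,q2,..."
//	b.WriteRune(',')
//	joinNegation(&b, negations) // writes the negation-wrapped queries
//	b.WriteRune(')')
//	key := b.String() // one buffer, no nested fmt.Sprintf allocations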
-func joinNegation(qs []search.Query) string { +func joinNegation(b *strings.Builder, qs []search.Query) { switch len(qs) { case 0: - return "" + return case 1: - return fmt.Sprintf("%s%s%s", negationPrefix, qs[0].String(), negationPostfix) + b.WriteString(negationPrefix) + b.WriteString(qs[0].String()) + b.WriteString(negationPostfix) + return } - var b bytes.Buffer b.WriteString(negationPrefix) b.WriteString(qs[0].String()) for _, q := range qs[1:] { @@ -89,5 +88,4 @@ func joinNegation(qs []search.Query) string { } b.WriteString(negationPostfix) - return b.String() } diff --git a/src/m3ninx/search/query/util_test.go b/src/m3ninx/search/query/util_test.go index 6159b96bc8..56915c410f 100644 --- a/src/m3ninx/search/query/util_test.go +++ b/src/m3ninx/search/query/util_test.go @@ -21,6 +21,7 @@ package query import ( + "strings" "testing" "github.com/m3db/m3/src/m3ninx/search" @@ -64,8 +65,11 @@ func TestJoin(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - require.Equal(t, test.expected, join(test.input)) - require.Equal(t, test.expectedNegation, joinNegation(test.input)) + var j, jn strings.Builder + join(&j, test.input) + joinNegation(&jn, test.input) + require.Equal(t, test.expected, j.String()) + require.Equal(t, test.expectedNegation, jn.String()) }) } } diff --git a/src/m3ninx/search/search_mock.go b/src/m3ninx/search/search_mock.go index 68e90a5bd7..0bf1d8805a 100644 --- a/src/m3ninx/search/search_mock.go +++ b/src/m3ninx/search/search_mock.go @@ -205,3 +205,41 @@ func (mr *MockSearcherMockRecorder) Search(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Search", reflect.TypeOf((*MockSearcher)(nil).Search), arg0) } + +// MockReadThroughSegmentSearcher is a mock of ReadThroughSegmentSearcher interface. +type MockReadThroughSegmentSearcher struct { + ctrl *gomock.Controller + recorder *MockReadThroughSegmentSearcherMockRecorder +} + +// MockReadThroughSegmentSearcherMockRecorder is the mock recorder for MockReadThroughSegmentSearcher. +type MockReadThroughSegmentSearcherMockRecorder struct { + mock *MockReadThroughSegmentSearcher +} + +// NewMockReadThroughSegmentSearcher creates a new mock instance. +func NewMockReadThroughSegmentSearcher(ctrl *gomock.Controller) *MockReadThroughSegmentSearcher { + mock := &MockReadThroughSegmentSearcher{ctrl: ctrl} + mock.recorder = &MockReadThroughSegmentSearcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockReadThroughSegmentSearcher) EXPECT() *MockReadThroughSegmentSearcherMockRecorder { + return m.recorder +} + +// Search mocks base method. +func (m *MockReadThroughSegmentSearcher) Search(query Query, searcher Searcher) (postings.List, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Search", query, searcher) + ret0, _ := ret[0].(postings.List) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Search indicates an expected call of Search. 
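// Hypothetical test usage of the generated mock above (sketch, not part of
// this patch), following the usual gomock pattern:
//
//	ctrl := gomock.NewController(t)
//	defer ctrl.Finish()
//
//	seg := search.NewMockReadThroughSegmentSearcher(ctrl)
//	seg.EXPECT().Search(q, s).Return(pl, nil)
//
//	got, err := seg.Search(q, s) // returns the stubbed postings list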
+func (mr *MockReadThroughSegmentSearcherMockRecorder) Search(query, searcher interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Search", reflect.TypeOf((*MockReadThroughSegmentSearcher)(nil).Search), query, searcher)
+}
diff --git a/src/m3ninx/search/searcher/negation.go b/src/m3ninx/search/searcher/negation.go
index 5df70c4e8b..58ea8323c6 100644
--- a/src/m3ninx/search/searcher/negation.go
+++ b/src/m3ninx/search/searcher/negation.go
@@ -21,11 +21,15 @@
 package searcher
 
 import (
+	"errors"
+
 	"github.com/m3db/m3/src/m3ninx/index"
 	"github.com/m3db/m3/src/m3ninx/postings"
 	"github.com/m3db/m3/src/m3ninx/search"
 )
 
+var errNotMutable = errors.New("match all postings list for negation not mutable")
+
 type negationSearcher struct {
 	searcher search.Searcher
 }
@@ -44,11 +48,19 @@ func (s *negationSearcher) Search(r index.Reader) (postings.List, error) {
 		return nil, err
 	}
 
-	sPl, err := s.searcher.Search(r)
+	negatePl, err := s.searcher.Search(r)
 	if err != nil {
 		return nil, err
 	}
 
-	pl.Difference(sPl)
-	return pl, nil
+	mutable, ok := pl.(postings.MutableList)
+	if !ok {
+		return nil, errNotMutable
+	}
+
+	result := mutable.Clone()
+	if err := result.Difference(negatePl); err != nil {
+		return nil, err
+	}
+	return result, nil
 }
diff --git a/src/m3ninx/search/types.go b/src/m3ninx/search/types.go
index 13152dccd0..40c9619e4e 100644
--- a/src/m3ninx/search/types.go
+++ b/src/m3ninx/search/types.go
@@ -62,3 +62,9 @@ type Searcher interface {
 
 // Searchers is a slice of Searcher.
 type Searchers []Searcher
+
+// ReadThroughSegmentSearcher searches a read through segment
+// and potentially caches the result.
+type ReadThroughSegmentSearcher interface {
+	Search(query Query, searcher Searcher) (postings.List, error)
+}
diff --git a/src/m3ninx/x/safe_closer.go b/src/m3ninx/x/safe_closer.go
index 69de7cd96b..2af7cc9db8 100644
--- a/src/m3ninx/x/safe_closer.go
+++ b/src/m3ninx/x/safe_closer.go
@@ -26,10 +26,16 @@ import (
 	xerrors "github.com/m3db/m3/src/x/errors"
 )
 
+// SafeCloser is a reusable safe closer.
+type SafeCloser interface {
+	io.Closer
+	Reset(closer io.Closer)
+}
+
 // NewSafeCloser returns a io.Closer which ensures the
 // underlying Close() is only called once. It's
 // useful for cleanup of resources in functions.
-func NewSafeCloser(x io.Closer) io.Closer {
+func NewSafeCloser(x io.Closer) SafeCloser {
 	return &safeCloser{Closer: x}
 }
 
@@ -40,6 +46,11 @@ type safeCloser struct {
 	closed bool
 }
 
+func (c *safeCloser) Reset(closer io.Closer) {
+	c.Closer = closer
+	c.closed = false
+}
+
 // Close guarantees the underlying Closable's Close() is
 // only executed the first time it's called.
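// Sketch of the reuse that Reset enables (hypothetical caller, not in this
// patch): one SafeCloser can be rearmed per resource instead of allocating
// a fresh closer each time:
//
//	closer := x.NewSafeCloser(nil)
//	for _, r := range readers {
//		closer.Reset(r) // rearm: clears the closed flag, swaps the target
//		if err := use(r); err != nil {
//			_ = closer.Close() // first Close calls r.Close()
//			return err
//		}
//		_ = closer.Close() // further Closes are no-ops until the next Reset
//	}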
func (c *safeCloser) Close() error { diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index 59d6356dd9..25a4f8745b 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -81,7 +81,7 @@ func TestFetchQueryToM3Query(t *testing.T) { }{ { name: "exact match", - expected: "term(t1, v1)", + expected: "term(t1,v1)", matchers: models.Matchers{ { Type: models.MatchEqual, @@ -92,7 +92,7 @@ func TestFetchQueryToM3Query(t *testing.T) { }, { name: "exact match negated", - expected: "negation(term(t1, v1))", + expected: "negation(term(t1,v1))", matchers: models.Matchers{ { Type: models.MatchNotEqual, @@ -103,7 +103,7 @@ func TestFetchQueryToM3Query(t *testing.T) { }, { name: "regexp match", - expected: "regexp(t1, v1)", + expected: "regexp(t1,v1)", matchers: models.Matchers{ { Type: models.MatchRegexp, @@ -136,7 +136,7 @@ func TestFetchQueryToM3Query(t *testing.T) { }, { name: "regexp match negated", - expected: "negation(regexp(t1, v1))", + expected: "negation(regexp(t1,v1))", matchers: models.Matchers{ { Type: models.MatchNotRegexp, @@ -194,7 +194,7 @@ func TestFetchQueryToM3Query(t *testing.T) { }, { name: "regexp match dot star with trailing characters -> regex", - expected: "regexp(t1, .*foo)", + expected: "regexp(t1,.*foo)", matchers: models.Matchers{ { Type: models.MatchRegexp, @@ -205,7 +205,7 @@ func TestFetchQueryToM3Query(t *testing.T) { }, { name: "regexp match dot plus with trailing characters -> regex", - expected: "regexp(t1, .+foo)", + expected: "regexp(t1,.+foo)", matchers: models.Matchers{ { Type: models.MatchRegexp, @@ -216,7 +216,7 @@ func TestFetchQueryToM3Query(t *testing.T) { }, { name: "not regexp match dot star with trailing characters -> regex", - expected: "negation(regexp(t1, .*foo))", + expected: "negation(regexp(t1,.*foo))", matchers: models.Matchers{ { Type: models.MatchNotRegexp, @@ -227,7 +227,7 @@ func TestFetchQueryToM3Query(t *testing.T) { }, { name: "not regexp match dot plus with trailing characters -> regex", - expected: "negation(regexp(t1, .+foo))", + expected: "negation(regexp(t1,.+foo))", matchers: models.Matchers{ { Type: models.MatchNotRegexp, diff --git a/src/x/sync/fast_worker_pool.go b/src/x/sync/fast_worker_pool.go index 78b610599a..7544ca8ba8 100644 --- a/src/x/sync/fast_worker_pool.go +++ b/src/x/sync/fast_worker_pool.go @@ -72,3 +72,7 @@ func (p *fastWorkerPool) GoWithContext(ctx context.Context, work Work) ScheduleR func (p *fastWorkerPool) FastContextCheck(batchSize int) WorkerPool { return &fastWorkerPool{workerPool: p.workerPool, batchSize: batchSize} } + +func (p *fastWorkerPool) Size() int { + return p.workerPool.Size() +} diff --git a/src/x/sync/types.go b/src/x/sync/types.go index 6fa797a6cd..f964b790b9 100644 --- a/src/x/sync/types.go +++ b/src/x/sync/types.go @@ -119,6 +119,9 @@ type WorkerPool interface { // This should only be used for code that can guarantee the wait time for a worker is low since if the ctx is not // checked the calling goroutine blocks waiting for a worker. FastContextCheck(batchSize int) WorkerPool + + // Size returns the size of the worker pool. + Size() int } // ScheduleResult is the result of scheduling a goroutine in the worker pool. 
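// Hypothetical use of the new Size accessor (sketch, not from this patch):
// callers can shard work according to the pool's parallelism:
//
//	pool := xsync.NewWorkerPool(1024)
//	pool.Init()
//	perWorker := (len(items) + pool.Size() - 1) / pool.Size() // ceil division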
diff --git a/src/x/sync/worker_pool.go b/src/x/sync/worker_pool.go index 93fad1ec3a..885b1599b6 100644 --- a/src/x/sync/worker_pool.go +++ b/src/x/sync/worker_pool.go @@ -138,3 +138,7 @@ func (p *workerPool) GoWithContext(ctx context.Context, work Work) ScheduleResul func (p *workerPool) FastContextCheck(batchSize int) WorkerPool { return &fastWorkerPool{workerPool: p, batchSize: batchSize} } + +func (p *workerPool) Size() int { + return cap(p.workCh) +}
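// Design note on the accessor above: the pool's size is already encoded as
// the capacity of its buffered worker channel, so Size is O(1) and needs no
// extra bookkeeping. A minimal standalone equivalent (sketch):
//
//	type pool struct{ workCh chan struct{} }
//
//	func newPool(n int) *pool { return &pool{workCh: make(chan struct{}, n)} }
//	func (p *pool) Size() int { return cap(p.workCh) } // capacity == worker count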