From 838288f9e201d7c9d9397cbb3af5e8bd6f134744 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Thu, 21 Feb 2019 13:00:14 -0500 Subject: [PATCH 01/18] Add lru cache for query conversion --- .../development/m3_stack/m3coordinator.yml | 4 + src/cmd/services/m3dbnode/config/cache.go | 2 +- src/cmd/services/m3query/config/config.go | 36 ++++++ src/query/server/server.go | 13 ++- src/query/storage/index.go | 37 +++++- src/query/storage/m3/storage.go | 16 ++- src/query/storage/query_conversion_lru.go | 110 ++++++++++++++++++ 7 files changed, 209 insertions(+), 9 deletions(-) create mode 100644 src/query/storage/query_conversion_lru.go diff --git a/scripts/development/m3_stack/m3coordinator.yml b/scripts/development/m3_stack/m3coordinator.yml index aa963b2982..f6236764d5 100644 --- a/scripts/development/m3_stack/m3coordinator.yml +++ b/scripts/development/m3_stack/m3coordinator.yml @@ -55,3 +55,7 @@ carbon: tagOptions: idScheme: quoted + +cache: + queryConversion: + size: 200 diff --git a/src/cmd/services/m3dbnode/config/cache.go b/src/cmd/services/m3dbnode/config/cache.go index d088a4c026..4731344f85 100644 --- a/src/cmd/services/m3dbnode/config/cache.go +++ b/src/cmd/services/m3dbnode/config/cache.go @@ -1,4 +1,4 @@ -// Copyright (c) 2017 Uber Technologies, Inc. +// Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index fa2d5f5c8b..4100f92fef 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -53,6 +53,8 @@ const ( errNoIDGenerationScheme = "error: a recent breaking change means that an ID " + "generation scheme is required in coordinator configuration settings. " + "More information is available here: %s" + + defaultQueryConversionCacheSize = 100 ) var ( @@ -123,6 +125,9 @@ type Configuration struct { // LookbackDuration determines the lookback duration for queries LookbackDuration *time.Duration `yaml:"lookbackDuration"` + + // Cache configurations. + Cache CacheConfigurations `yaml:"cache"` } // Filter is a query filter type. @@ -146,6 +151,37 @@ type FilterConfiguration struct { CompleteTags Filter `yaml:"completeTags"` } +// CacheConfigurations is the cache configurations. +type CacheConfigurations struct { + // QueryConversion cache policy. + QueryConversion *QueryConversionCacheConfiguration `yaml:"queryConversion"` +} + +// QueryConversionCacheConfiguration is the query conversion cache configuration. +type QueryConversionCacheConfiguration struct { + Size *int `yaml:"size"` +} + +// QueryConversionCacheConfiguration returns the query conversion cache configuration +// or default if none is specified. +func (c CacheConfigurations) QueryConversionCacheConfiguration() QueryConversionCacheConfiguration { + if c.QueryConversion == nil { + return QueryConversionCacheConfiguration{} + } + + return *c.QueryConversion +} + +// SizeOrDefault returns the provided size or the default value is none is +// provided. +func (p *QueryConversionCacheConfiguration) SizeOrDefault() int { + if p.Size == nil { + return defaultQueryConversionCacheSize + } + + return *p.Size +} + // LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit. type LimitsConfiguration struct { MaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` diff --git a/src/query/server/server.go b/src/query/server/server.go index 81eecf661b..c4e1cfb2f5 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -608,13 +608,24 @@ func newStorages( ) (storage.Storage, cleanupFn, error) { cleanup := func() error { return nil } - localStorage := m3.NewStorage( + // Setup query conversion cache. + var ( + conversionCacheConfig = cfg.Cache.QueryConversionCacheConfiguration() + conversionCacheSize = conversionCacheConfig.SizeOrDefault() + ) + + localStorage, err := m3.NewStorage( clusters, readWorkerPool, writeWorkerPool, tagOptions, *cfg.LookbackDuration, + conversionCacheSize, ) + if err != nil { + return nil, nil, err + } + stores := []storage.Storage{localStorage} remoteEnabled := false if cfg.RPC != nil && cfg.RPC.Enabled { diff --git a/src/query/storage/index.go b/src/query/storage/index.go index b89b9bc5a9..ffd61814cf 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -90,9 +90,37 @@ func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery) } } +var ( + // byte representation for [1,2,3,4] + lookup = [4]byte{49, 50, 51, 52} +) + +func queryKey(m models.Matchers) []byte { + l := len(m) + for _, t := range m { + l += len(t.Name) + len(t.Value) + } + + key := make([]byte, l) + idx := 0 + for _, t := range m { + idx += copy(key[idx:], t.Name) + key[idx] = lookup[t.Type] + idx += copy(key[idx+1:], t.Value) + } + + return key +} + // FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query -func FetchQueryToM3Query(fetchQuery *FetchQuery) (index.Query, error) { +func FetchQueryToM3Query(fetchQuery *FetchQuery, lru *QueryConversionLRU) (index.Query, error) { matchers := fetchQuery.TagMatchers + k := queryKey(matchers) + + if val, ok := lru.Get(string(k)); ok { + return index.Query{Query: val}, nil + } + // Optimization for single matcher case. if len(matchers) == 1 { q, err := matcherToQuery(matchers[0]) @@ -100,19 +128,22 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery) (index.Query, error) { return index.Query{}, err } + lru.Add(string(k), q) return index.Query{Query: q}, nil } idxQueries := make([]idx.Query, len(matchers)) - var err error for i, matcher := range matchers { - idxQueries[i], err = matcherToQuery(matcher) + q, err := matcherToQuery(matcher) if err != nil { return index.Query{}, err } + + idxQueries[i] = q } q := idx.NewConjunctionQuery(idxQueries...) + lru.Add(string(k), q) return index.Query{Query: q}, nil } diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index d5a92992be..394b157e8b 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -62,6 +62,7 @@ type m3storage struct { writeWorkerPool xsync.PooledWorkerPool opts m3db.Options nowFn func() time.Time + conversionCache *storage.QueryConversionLRU } // NewStorage creates a new local m3storage instance. @@ -72,19 +73,26 @@ func NewStorage( writeWorkerPool xsync.PooledWorkerPool, tagOptions models.TagOptions, lookbackDuration time.Duration, -) Storage { + conversionCacheSize int, +) (Storage, error) { opts := m3db.NewOptions(). SetTagOptions(tagOptions). SetLookbackDuration(lookbackDuration). SetConsolidationFunc(consolidators.TakeLast) + conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize) + if err != nil { + return nil, err + } + return &m3storage{ clusters: clusters, readWorkerPool: readWorkerPool, writeWorkerPool: writeWorkerPool, opts: opts, nowFn: time.Now, - } + conversionCache: conversionLRU, + }, nil } func (s *m3storage) Fetch( @@ -200,7 +208,7 @@ func (s *m3storage) fetchCompressed( default: } - m3query, err := storage.FetchQueryToM3Query(query) + m3query, err := storage.FetchQueryToM3Query(query, s.conversionCache) if err != nil { return nil, err } @@ -370,7 +378,7 @@ func (s *m3storage) SearchCompressed( default: } - m3query, err := storage.FetchQueryToM3Query(query) + m3query, err := storage.FetchQueryToM3Query(query, s.conversionCache) if err != nil { return nil, noop, err } diff --git a/src/query/storage/query_conversion_lru.go b/src/query/storage/query_conversion_lru.go new file mode 100644 index 0000000000..5a1def1385 --- /dev/null +++ b/src/query/storage/query_conversion_lru.go @@ -0,0 +1,110 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "container/list" + "fmt" + "sync" + + "github.com/m3db/m3/src/m3ninx/idx" +) + +// QueryConversionLRU implements a non-thread safe fixed size LRU cache +type QueryConversionLRU struct { + sync.Mutex + + size int + evictList *list.List + items map[interface{}]*list.Element +} + +// entry is used to hold a value in the evictList +type entry struct { + key string + value idx.Query +} + +// NewQueryConversionLRU constructs an LRU of the given size +func NewQueryConversionLRU(size int) (*QueryConversionLRU, error) { + if size <= 0 { + return nil, fmt.Errorf("must provide a positive size, instead got: %d", size) + } + + c := &QueryConversionLRU{ + size: size, + evictList: list.New(), + items: make(map[interface{}]*list.Element), + } + + return c, nil +} + +// Add adds a value to the cache. Returns true if an eviction occurred. +func (c *QueryConversionLRU) Add(key string, value idx.Query) (evicted bool) { + // Check for existing item + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + ent.Value.(*entry).value = value + return false + } + + // Add new item + ent := &entry{key, value} + entry := c.evictList.PushFront(ent) + c.items[key] = entry + + evict := c.evictList.Len() > c.size + // Verify size not exceeded + if evict { + c.removeOldest() + } + + return evict +} + +// Get looks up a key's value from the cache. +func (c *QueryConversionLRU) Get(key string) (value idx.Query, ok bool) { + c.Lock() + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + c.Unlock() + return ent.Value.(*entry).value, true + } + + c.Unlock() + return +} + +// removeOldest removes the oldest item from the cache. +func (c *QueryConversionLRU) removeOldest() { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + } +} + +// removeElement is used to remove a given list element from the cache +func (c *QueryConversionLRU) removeElement(e *list.Element) { + c.evictList.Remove(e) + kv := e.Value.(*entry) + delete(c.items, kv.key) +} From af5fc9cc9969ebdaf74751253c7d7b973d3a43c5 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Thu, 21 Feb 2019 13:11:26 -0500 Subject: [PATCH 02/18] Spelling --- src/cmd/services/m3query/config/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 4100f92fef..c04d6743f7 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -174,12 +174,12 @@ func (c CacheConfigurations) QueryConversionCacheConfiguration() QueryConversion // SizeOrDefault returns the provided size or the default value is none is // provided. -func (p *QueryConversionCacheConfiguration) SizeOrDefault() int { - if p.Size == nil { +func (q *QueryConversionCacheConfiguration) SizeOrDefault() int { + if q.Size == nil { return defaultQueryConversionCacheSize } - return *p.Size + return *q.Size } // LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit. From cecd94ee4cf37e1aa397710beac360f06c7dda84 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Thu, 21 Feb 2019 13:17:25 -0500 Subject: [PATCH 03/18] Add locks to add --- src/query/storage/index.go | 6 +++--- src/query/storage/query_conversion_lru.go | 13 ++++++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/query/storage/index.go b/src/query/storage/index.go index ffd61814cf..d864a239ad 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -117,7 +117,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, lru *QueryConversionLRU) (index matchers := fetchQuery.TagMatchers k := queryKey(matchers) - if val, ok := lru.Get(string(k)); ok { + if val, ok := lru.GetWithLock(string(k)); ok { return index.Query{Query: val}, nil } @@ -128,7 +128,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, lru *QueryConversionLRU) (index return index.Query{}, err } - lru.Add(string(k), q) + lru.AddWithLock(string(k), q) return index.Query{Query: q}, nil } @@ -143,7 +143,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, lru *QueryConversionLRU) (index } q := idx.NewConjunctionQuery(idxQueries...) - lru.Add(string(k), q) + lru.AddWithLock(string(k), q) return index.Query{Query: q}, nil } diff --git a/src/query/storage/query_conversion_lru.go b/src/query/storage/query_conversion_lru.go index 5a1def1385..843a86fa20 100644 --- a/src/query/storage/query_conversion_lru.go +++ b/src/query/storage/query_conversion_lru.go @@ -28,7 +28,7 @@ import ( "github.com/m3db/m3/src/m3ninx/idx" ) -// QueryConversionLRU implements a non-thread safe fixed size LRU cache +// QueryConversionLRU implements a fixed size LRU cache type QueryConversionLRU struct { sync.Mutex @@ -58,12 +58,14 @@ func NewQueryConversionLRU(size int) (*QueryConversionLRU, error) { return c, nil } -// Add adds a value to the cache. Returns true if an eviction occurred. -func (c *QueryConversionLRU) Add(key string, value idx.Query) (evicted bool) { +// AddWithLock adds a value to the cache. Returns true if an eviction occurred. +func (c *QueryConversionLRU) AddWithLock(key string, value idx.Query) (evicted bool) { // Check for existing item + c.Lock() if ent, ok := c.items[key]; ok { c.evictList.MoveToFront(ent) ent.Value.(*entry).value = value + c.Unlock() return false } @@ -78,11 +80,12 @@ func (c *QueryConversionLRU) Add(key string, value idx.Query) (evicted bool) { c.removeOldest() } + c.Unlock() return evict } -// Get looks up a key's value from the cache. -func (c *QueryConversionLRU) Get(key string) (value idx.Query, ok bool) { +// GetWithLock looks up a key's value from the cache. +func (c *QueryConversionLRU) GetWithLock(key string) (value idx.Query, ok bool) { c.Lock() if ent, ok := c.items[key]; ok { c.evictList.MoveToFront(ent) From 69981c776cd0ce536c9e80c4c18163b0f7b9e5a0 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Thu, 21 Feb 2019 13:20:31 -0500 Subject: [PATCH 04/18] Small fix --- src/query/storage/index.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/query/storage/index.go b/src/query/storage/index.go index d864a239ad..c2514c945e 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -133,13 +133,12 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, lru *QueryConversionLRU) (index } idxQueries := make([]idx.Query, len(matchers)) + var err error for i, matcher := range matchers { - q, err := matcherToQuery(matcher) + idxQueries[i], err = matcherToQuery(matcher) if err != nil { return index.Query{}, err } - - idxQueries[i] = q } q := idx.NewConjunctionQuery(idxQueries...) From cde10405f74ee454a476f4c94e97f82abef728d1 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Fri, 22 Feb 2019 11:59:11 -0500 Subject: [PATCH 05/18] Address comments --- src/query/storage/index.go | 22 ++++++++-- src/query/storage/index_test.go | 9 +++- src/query/storage/m3/storage.go | 4 +- src/query/storage/query_conversion_lru.go | 21 +++------ .../storage/query_conversion_lru_test.go | 44 +++++++++++++++++++ 5 files changed, 78 insertions(+), 22 deletions(-) create mode 100644 src/query/storage/query_conversion_lru_test.go diff --git a/src/query/storage/index.go b/src/query/storage/index.go index c2514c945e..b142df309d 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -22,6 +22,7 @@ package storage import ( "fmt" + "sync" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/m3ninx/idx" @@ -29,6 +30,13 @@ import ( "github.com/m3db/m3x/ident" ) +// QueryConvserionCache represents the query conversion LRU cache +type QueryConvserionCache struct { + mu sync.RWMutex + + LRU *QueryConversionLRU +} + // FromM3IdentToMetric converts an M3 ident metric to a coordinator metric func FromM3IdentToMetric( identID ident.ID, @@ -113,11 +121,15 @@ func queryKey(m models.Matchers) []byte { } // FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query -func FetchQueryToM3Query(fetchQuery *FetchQuery, lru *QueryConversionLRU) (index.Query, error) { +func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConvserionCache) (index.Query, error) { matchers := fetchQuery.TagMatchers k := queryKey(matchers) - if val, ok := lru.GetWithLock(string(k)); ok { + cache.mu.Lock() + defer cache.mu.Unlock() + + if val, ok := cache.LRU.Get(string(k)); ok { + fmt.Println("get") return index.Query{Query: val}, nil } @@ -128,7 +140,8 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, lru *QueryConversionLRU) (index return index.Query{}, err } - lru.AddWithLock(string(k), q) + fmt.Println("add") + cache.LRU.Add(string(k), q) return index.Query{Query: q}, nil } @@ -141,8 +154,9 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, lru *QueryConversionLRU) (index } } + fmt.Println("adding") q := idx.NewConjunctionQuery(idxQueries...) - lru.AddWithLock(string(k), q) + cache.LRU.Add(string(k), q) return index.Query{Query: q}, nil } diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index f5815aec69..8f447fe7d2 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -132,6 +132,13 @@ func TestFetchQueryToM3Query(t *testing.T) { }, } + lru, err := NewQueryConversionLRU(10) + require.NoError(t, err) + + cache := &QueryConvserionCache{ + LRU: lru, + } + for _, test := range tests { t.Run(test.name, func(t *testing.T) { fetchQuery := &FetchQuery{ @@ -142,7 +149,7 @@ func TestFetchQueryToM3Query(t *testing.T) { Interval: 15 * time.Second, } - m3Query, err := FetchQueryToM3Query(fetchQuery) + m3Query, err := FetchQueryToM3Query(fetchQuery, cache) require.NoError(t, err) assert.Equal(t, test.expected, m3Query.String()) }) diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index 394b157e8b..89016b3f1d 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -62,7 +62,7 @@ type m3storage struct { writeWorkerPool xsync.PooledWorkerPool opts m3db.Options nowFn func() time.Time - conversionCache *storage.QueryConversionLRU + conversionCache *storage.QueryConvserionCache } // NewStorage creates a new local m3storage instance. @@ -91,7 +91,7 @@ func NewStorage( writeWorkerPool: writeWorkerPool, opts: opts, nowFn: time.Now, - conversionCache: conversionLRU, + conversionCache: &storage.QueryConvserionCache{LRU: conversionLRU}, }, nil } diff --git a/src/query/storage/query_conversion_lru.go b/src/query/storage/query_conversion_lru.go index 843a86fa20..d6870317f6 100644 --- a/src/query/storage/query_conversion_lru.go +++ b/src/query/storage/query_conversion_lru.go @@ -23,18 +23,15 @@ package storage import ( "container/list" "fmt" - "sync" "github.com/m3db/m3/src/m3ninx/idx" ) // QueryConversionLRU implements a fixed size LRU cache type QueryConversionLRU struct { - sync.Mutex - size int evictList *list.List - items map[interface{}]*list.Element + items map[string]*list.Element } // entry is used to hold a value in the evictList @@ -52,20 +49,18 @@ func NewQueryConversionLRU(size int) (*QueryConversionLRU, error) { c := &QueryConversionLRU{ size: size, evictList: list.New(), - items: make(map[interface{}]*list.Element), + items: make(map[string]*list.Element, size), } return c, nil } -// AddWithLock adds a value to the cache. Returns true if an eviction occurred. -func (c *QueryConversionLRU) AddWithLock(key string, value idx.Query) (evicted bool) { +// Add adds a value to the cache. Returns true if an eviction occurred. +func (c *QueryConversionLRU) Add(key string, value idx.Query) (evicted bool) { // Check for existing item - c.Lock() if ent, ok := c.items[key]; ok { c.evictList.MoveToFront(ent) ent.Value.(*entry).value = value - c.Unlock() return false } @@ -80,20 +75,16 @@ func (c *QueryConversionLRU) AddWithLock(key string, value idx.Query) (evicted b c.removeOldest() } - c.Unlock() return evict } -// GetWithLock looks up a key's value from the cache. -func (c *QueryConversionLRU) GetWithLock(key string) (value idx.Query, ok bool) { - c.Lock() +// Get looks up a key's value from the cache. +func (c *QueryConversionLRU) Get(key string) (value idx.Query, ok bool) { if ent, ok := c.items[key]; ok { c.evictList.MoveToFront(ent) - c.Unlock() return ent.Value.(*entry).value, true } - c.Unlock() return } diff --git a/src/query/storage/query_conversion_lru_test.go b/src/query/storage/query_conversion_lru_test.go new file mode 100644 index 0000000000..664ad23ce4 --- /dev/null +++ b/src/query/storage/query_conversion_lru_test.go @@ -0,0 +1,44 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "testing" + + "github.com/m3db/m3ninx/idx" + + "github.com/stretchr/testify/require" +) + +func generateTestData() map[string]idx.Query { + return map[string]idx.Query{ + "a": idx.NewTermQuery([]byte("foo"), []byte("bar")), + } +} + +func TestLRU(t *testing.T) { + lru, err := NewQueryConversionLRU(5) + require.NoError(t, err) + + lru.Add("a", idx.NewTermQuery([]byte("foo"), []byte("bar"))) + _, ok := lru.Get("a") + require.True(t, ok) +} From 74541b6fb2793556983b3a290a3b331a86ba47ef Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Fri, 22 Feb 2019 12:33:49 -0500 Subject: [PATCH 06/18] Fix test --- src/query/storage/m3/storage_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index bce35f0842..67a7596cb7 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -122,7 +122,8 @@ func newTestStorage(t *testing.T, clusters Clusters) storage.Storage { require.NoError(t, err) writePool.Init() opts := models.NewTagOptions().SetMetricName([]byte("name")) - storage := NewStorage(clusters, nil, writePool, opts, time.Minute) + storage, err := NewStorage(clusters, nil, writePool, opts, time.Minute, 100) + require.NoError(t, err) return storage } From 74b1a9d6ea0830a4fabacb2a46cc970f33fe58f8 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Fri, 22 Feb 2019 13:22:25 -0500 Subject: [PATCH 07/18] Write tests --- .../storage/query_conversion_lru_test.go | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/src/query/storage/query_conversion_lru_test.go b/src/query/storage/query_conversion_lru_test.go index 664ad23ce4..4e64a3d7d6 100644 --- a/src/query/storage/query_conversion_lru_test.go +++ b/src/query/storage/query_conversion_lru_test.go @@ -23,22 +23,39 @@ package storage import ( "testing" - "github.com/m3db/m3ninx/idx" + "github.com/m3db/m3/src/m3ninx/idx" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func generateTestData() map[string]idx.Query { - return map[string]idx.Query{ - "a": idx.NewTermQuery([]byte("foo"), []byte("bar")), - } -} - func TestLRU(t *testing.T) { lru, err := NewQueryConversionLRU(5) require.NoError(t, err) + // test add and get lru.Add("a", idx.NewTermQuery([]byte("foo"), []byte("bar"))) - _, ok := lru.Get("a") + lru.Add("b", idx.NewTermQuery([]byte("biz"), []byte("baz"))) + + var ( + ok bool + q idx.Query + ) + + q, ok = lru.Get("a") require.True(t, ok) + assert.Equal(t, "term(foo, bar)", q.String()) + q, ok = lru.Get("b") + require.True(t, ok) + assert.Equal(t, "term(biz, baz)", q.String()) + + // fill up the cache + lru.Add("c", idx.NewTermQuery([]byte("bar"), []byte("foo"))) + lru.Add("d", idx.NewTermQuery([]byte("baz"), []byte("biz"))) + lru.Add("e", idx.NewTermQuery([]byte("qux"), []byte("quz"))) + lru.Add("f", idx.NewTermQuery([]byte("quz"), []byte("qux"))) + + // make sure "a" is no longer in the cache + _, ok = lru.Get("a") + require.False(t, ok) } From ece218f08508bd873bbed7032ff300d8a0964b5e Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Fri, 22 Feb 2019 13:23:03 -0500 Subject: [PATCH 08/18] Remove prints --- src/query/storage/index.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/query/storage/index.go b/src/query/storage/index.go index b142df309d..a8a378d856 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -129,7 +129,6 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConvserionCache) (i defer cache.mu.Unlock() if val, ok := cache.LRU.Get(string(k)); ok { - fmt.Println("get") return index.Query{Query: val}, nil } @@ -140,7 +139,6 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConvserionCache) (i return index.Query{}, err } - fmt.Println("add") cache.LRU.Add(string(k), q) return index.Query{Query: q}, nil } @@ -154,7 +152,6 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConvserionCache) (i } } - fmt.Println("adding") q := idx.NewConjunctionQuery(idxQueries...) cache.LRU.Add(string(k), q) return index.Query{Query: q}, nil From ac0f410cd65c5aa61d8df5a5d0e1f6c79dc8e99f Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Fri, 22 Feb 2019 19:18:58 -0500 Subject: [PATCH 09/18] Address comments --- src/query/server/server.go | 9 ++++- src/query/storage/index.go | 21 +++++++---- src/query/storage/index_test.go | 14 ++++++- src/query/storage/m3/storage.go | 15 ++++---- src/query/storage/m3/storage_test.go | 4 +- src/query/storage/query_conversion_lru.go | 16 ++++---- .../storage/query_conversion_lru_test.go | 37 ++++++++++++------- 7 files changed, 77 insertions(+), 39 deletions(-) diff --git a/src/query/server/server.go b/src/query/server/server.go index c4e1cfb2f5..2cfdf9736b 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -64,7 +64,7 @@ import ( xsync "github.com/m3db/m3x/sync" xtime "github.com/m3db/m3x/time" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/uber-go/tally" "go.uber.org/zap" @@ -614,13 +614,18 @@ func newStorages( conversionCacheSize = conversionCacheConfig.SizeOrDefault() ) + conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize) + if err != nil { + return nil, nil, err + } + localStorage, err := m3.NewStorage( clusters, readWorkerPool, writeWorkerPool, tagOptions, *cfg.LookbackDuration, - conversionCacheSize, + storage.NewQueryConversionCache(conversionLRU), ) if err != nil { return nil, nil, err diff --git a/src/query/storage/index.go b/src/query/storage/index.go index a8a378d856..a00968e962 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -30,13 +30,20 @@ import ( "github.com/m3db/m3x/ident" ) -// QueryConvserionCache represents the query conversion LRU cache -type QueryConvserionCache struct { - mu sync.RWMutex +// QueryConversionCache represents the query conversion LRU cache +type QueryConversionCache struct { + mu sync.Mutex LRU *QueryConversionLRU } +// NewQueryConversionCache creates a new QueryConversionCache with a provided LRU cache +func NewQueryConversionCache(lru *QueryConversionLRU) *QueryConversionCache { + return &QueryConversionCache{ + LRU: lru, + } +} + // FromM3IdentToMetric converts an M3 ident metric to a coordinator metric func FromM3IdentToMetric( identID ident.ID, @@ -121,14 +128,14 @@ func queryKey(m models.Matchers) []byte { } // FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query -func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConvserionCache) (index.Query, error) { +func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (index.Query, error) { matchers := fetchQuery.TagMatchers k := queryKey(matchers) cache.mu.Lock() defer cache.mu.Unlock() - if val, ok := cache.LRU.Get(string(k)); ok { + if val, ok := cache.LRU.Get(k); ok { return index.Query{Query: val}, nil } @@ -139,7 +146,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConvserionCache) (i return index.Query{}, err } - cache.LRU.Add(string(k), q) + cache.LRU.Set(k, q) return index.Query{Query: q}, nil } @@ -153,7 +160,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConvserionCache) (i } q := idx.NewConjunctionQuery(idxQueries...) - cache.LRU.Add(string(k), q) + cache.LRU.Set(k, q) return index.Query{Query: q}, nil } diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index 8f447fe7d2..75ebcbfcb6 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -135,7 +135,7 @@ func TestFetchQueryToM3Query(t *testing.T) { lru, err := NewQueryConversionLRU(10) require.NoError(t, err) - cache := &QueryConvserionCache{ + cache := &QueryConversionCache{ LRU: lru, } @@ -154,5 +154,17 @@ func TestFetchQueryToM3Query(t *testing.T) { assert.Equal(t, test.expected, m3Query.String()) }) } +} + +func TestQueryKey(t *testing.T) { + matchers := models.Matchers{ + { + Type: models.MatchEqual, + Name: []byte("t1"), + Value: []byte("v1"), + }, + } + keyByte := queryKey(matchers) + assert.Equal(t, []byte("t11v1"), keyByte) } diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index 89016b3f1d..d39d99a9b6 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -62,7 +62,7 @@ type m3storage struct { writeWorkerPool xsync.PooledWorkerPool opts m3db.Options nowFn func() time.Time - conversionCache *storage.QueryConvserionCache + conversionCache *storage.QueryConversionCache } // NewStorage creates a new local m3storage instance. @@ -73,17 +73,17 @@ func NewStorage( writeWorkerPool xsync.PooledWorkerPool, tagOptions models.TagOptions, lookbackDuration time.Duration, - conversionCacheSize int, + queryConversionCache *storage.QueryConversionCache, ) (Storage, error) { opts := m3db.NewOptions(). SetTagOptions(tagOptions). SetLookbackDuration(lookbackDuration). SetConsolidationFunc(consolidators.TakeLast) - conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize) - if err != nil { - return nil, err - } + // conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize) + // if err != nil { + // return nil, err + // } return &m3storage{ clusters: clusters, @@ -91,7 +91,8 @@ func NewStorage( writeWorkerPool: writeWorkerPool, opts: opts, nowFn: time.Now, - conversionCache: &storage.QueryConvserionCache{LRU: conversionLRU}, + // conversionCache: &storage.QueryConversionCache{LRU: conversionLRU}, + conversionCache: queryConversionCache, }, nil } diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index 67a7596cb7..e38f8c7075 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -122,7 +122,9 @@ func newTestStorage(t *testing.T, clusters Clusters) storage.Storage { require.NoError(t, err) writePool.Init() opts := models.NewTagOptions().SetMetricName([]byte("name")) - storage, err := NewStorage(clusters, nil, writePool, opts, time.Minute, 100) + queryCache, err := storage.NewQueryConversionLRU(100) + require.NoError(t, err) + storage, err := NewStorage(clusters, nil, writePool, opts, time.Minute, storage.NewQueryConversionCache(queryCache)) require.NoError(t, err) return storage } diff --git a/src/query/storage/query_conversion_lru.go b/src/query/storage/query_conversion_lru.go index d6870317f6..f1358ba16d 100644 --- a/src/query/storage/query_conversion_lru.go +++ b/src/query/storage/query_conversion_lru.go @@ -36,7 +36,7 @@ type QueryConversionLRU struct { // entry is used to hold a value in the evictList type entry struct { - key string + key []byte value idx.Query } @@ -55,10 +55,10 @@ func NewQueryConversionLRU(size int) (*QueryConversionLRU, error) { return c, nil } -// Add adds a value to the cache. Returns true if an eviction occurred. -func (c *QueryConversionLRU) Add(key string, value idx.Query) (evicted bool) { +// Set adds a value to the cache. Returns true if an eviction occurred. +func (c *QueryConversionLRU) Set(key []byte, value idx.Query) (evicted bool) { // Check for existing item - if ent, ok := c.items[key]; ok { + if ent, ok := c.items[string(key)]; ok { c.evictList.MoveToFront(ent) ent.Value.(*entry).value = value return false @@ -67,7 +67,7 @@ func (c *QueryConversionLRU) Add(key string, value idx.Query) (evicted bool) { // Add new item ent := &entry{key, value} entry := c.evictList.PushFront(ent) - c.items[key] = entry + c.items[string(key)] = entry evict := c.evictList.Len() > c.size // Verify size not exceeded @@ -79,8 +79,8 @@ func (c *QueryConversionLRU) Add(key string, value idx.Query) (evicted bool) { } // Get looks up a key's value from the cache. -func (c *QueryConversionLRU) Get(key string) (value idx.Query, ok bool) { - if ent, ok := c.items[key]; ok { +func (c *QueryConversionLRU) Get(key []byte) (value idx.Query, ok bool) { + if ent, ok := c.items[string(key)]; ok { c.evictList.MoveToFront(ent) return ent.Value.(*entry).value, true } @@ -100,5 +100,5 @@ func (c *QueryConversionLRU) removeOldest() { func (c *QueryConversionLRU) removeElement(e *list.Element) { c.evictList.Remove(e) kv := e.Value.(*entry) - delete(c.items, kv.key) + delete(c.items, string(kv.key)) } diff --git a/src/query/storage/query_conversion_lru_test.go b/src/query/storage/query_conversion_lru_test.go index 4e64a3d7d6..0bb7131385 100644 --- a/src/query/storage/query_conversion_lru_test.go +++ b/src/query/storage/query_conversion_lru_test.go @@ -33,29 +33,40 @@ func TestLRU(t *testing.T) { lru, err := NewQueryConversionLRU(5) require.NoError(t, err) - // test add and get - lru.Add("a", idx.NewTermQuery([]byte("foo"), []byte("bar"))) - lru.Add("b", idx.NewTermQuery([]byte("biz"), []byte("baz"))) - var ( - ok bool - q idx.Query + ok bool + evicted bool + q idx.Query ) - q, ok = lru.Get("a") + // test add and get + evicted = lru.Set([]byte("a"), idx.NewTermQuery([]byte("foo"), []byte("bar"))) + require.False(t, evicted) + evicted = lru.Set([]byte("b"), idx.NewTermQuery([]byte("biz"), []byte("baz"))) + require.False(t, evicted) + + q, ok = lru.Get([]byte("a")) require.True(t, ok) assert.Equal(t, "term(foo, bar)", q.String()) - q, ok = lru.Get("b") + q, ok = lru.Get([]byte("b")) require.True(t, ok) assert.Equal(t, "term(biz, baz)", q.String()) // fill up the cache - lru.Add("c", idx.NewTermQuery([]byte("bar"), []byte("foo"))) - lru.Add("d", idx.NewTermQuery([]byte("baz"), []byte("biz"))) - lru.Add("e", idx.NewTermQuery([]byte("qux"), []byte("quz"))) - lru.Add("f", idx.NewTermQuery([]byte("quz"), []byte("qux"))) + evicted = lru.Set([]byte("c"), idx.NewTermQuery([]byte("bar"), []byte("foo"))) + require.False(t, evicted) + evicted = lru.Set([]byte("d"), idx.NewTermQuery([]byte("baz"), []byte("biz"))) + require.False(t, evicted) + evicted = lru.Set([]byte("e"), idx.NewTermQuery([]byte("qux"), []byte("quz"))) + require.False(t, evicted) + evicted = lru.Set([]byte("f"), idx.NewTermQuery([]byte("quz"), []byte("qux"))) + require.True(t, evicted) // make sure "a" is no longer in the cache - _, ok = lru.Get("a") + _, ok = lru.Get([]byte("a")) require.False(t, ok) + + // make sure "b" is still in the cache + _, ok = lru.Get([]byte("b")) + require.True(t, ok) } From d8ebfb9f04ad440bc33b846e578a785833f348fb Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Mon, 25 Feb 2019 11:53:28 -0500 Subject: [PATCH 10/18] Address more comments --- scripts/development/m3_stack/m3coordinator.yml | 4 ---- src/cmd/services/m3query/config/config.go | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/scripts/development/m3_stack/m3coordinator.yml b/scripts/development/m3_stack/m3coordinator.yml index f6236764d5..aa963b2982 100644 --- a/scripts/development/m3_stack/m3coordinator.yml +++ b/scripts/development/m3_stack/m3coordinator.yml @@ -55,7 +55,3 @@ carbon: tagOptions: idScheme: quoted - -cache: - queryConversion: - size: 200 diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index c04d6743f7..9a285f6854 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -54,7 +54,7 @@ const ( "generation scheme is required in coordinator configuration settings. " + "More information is available here: %s" - defaultQueryConversionCacheSize = 100 + defaultQueryConversionCacheSize = 4096 ) var ( From 8631ef77825349209d3d2ed7d56d689fb27397c2 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Mon, 25 Feb 2019 11:58:33 -0500 Subject: [PATCH 11/18] Fix test --- src/query/test/m3/test_storage.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/query/test/m3/test_storage.go b/src/query/test/m3/test_storage.go index 22af2fe2bc..e2277f3e5d 100644 --- a/src/query/test/m3/test_storage.go +++ b/src/query/test/m3/test_storage.go @@ -64,7 +64,10 @@ func NewStorageAndSession( require.NoError(t, err) writePool.Init() tagOptions := models.NewTagOptions().SetMetricName([]byte("name")) - storage := m3.NewStorage(clusters, nil, writePool, tagOptions, defaultLookbackDuration) + queryCache, err := storage.NewQueryConversionLRU(100) + require.NoError(t, err) + storage, err := m3.NewStorage(clusters, nil, writePool, tagOptions, defaultLookbackDuration, storage.NewQueryConversionCache(queryCache)) + require.NoError(t, err) return storage, session } @@ -91,6 +94,9 @@ func NewStorageAndSessionWithAggregatedNamespaces( require.NoError(t, err) writePool.Init() tagOptions := models.NewTagOptions().SetMetricName([]byte("name")) - storage := m3.NewStorage(clusters, nil, writePool, tagOptions, defaultLookbackDuration) + queryCache, err := storage.NewQueryConversionLRU(100) + require.NoError(t, err) + storage, err := m3.NewStorage(clusters, nil, writePool, tagOptions, defaultLookbackDuration, storage.NewQueryConversionCache(queryCache)) + require.NoError(t, err) return storage, session } From 0b5ab747b69347e996478f2463ff74bb4698db58 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Mon, 25 Feb 2019 13:01:28 -0500 Subject: [PATCH 12/18] Address more comments --- src/cmd/services/m3dbnode/config/cache.go | 2 +- src/cmd/services/m3query/config/config.go | 19 +++++++++++-------- src/query/server/server.go | 6 +++++- src/query/storage/index.go | 14 +++++++++++--- src/query/storage/m3/storage.go | 6 ------ 5 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/cmd/services/m3dbnode/config/cache.go b/src/cmd/services/m3dbnode/config/cache.go index 4731344f85..d088a4c026 100644 --- a/src/cmd/services/m3dbnode/config/cache.go +++ b/src/cmd/services/m3dbnode/config/cache.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 9a285f6854..0276b2e9d9 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -151,8 +151,8 @@ type FilterConfiguration struct { CompleteTags Filter `yaml:"completeTags"` } -// CacheConfigurations is the cache configurations. -type CacheConfigurations struct { +// CacheConfiguration is the cache configurations. +type CacheConfiguration struct { // QueryConversion cache policy. QueryConversion *QueryConversionCacheConfiguration `yaml:"queryConversion"` } @@ -164,7 +164,7 @@ type QueryConversionCacheConfiguration struct { // QueryConversionCacheConfiguration returns the query conversion cache configuration // or default if none is specified. -func (c CacheConfigurations) QueryConversionCacheConfiguration() QueryConversionCacheConfiguration { +func (c CacheConfiguration) QueryConversionCacheConfiguration() QueryConversionCacheConfiguration { if c.QueryConversion == nil { return QueryConversionCacheConfiguration{} } @@ -174,12 +174,15 @@ func (c CacheConfigurations) QueryConversionCacheConfiguration() QueryConversion // SizeOrDefault returns the provided size or the default value is none is // provided. -func (q *QueryConversionCacheConfiguration) SizeOrDefault() int { - if q.Size == nil { - return defaultQueryConversionCacheSize +func (q *QueryConversionCacheConfiguration) SizeOrDefault() (int, error) { + switch { + case q.Size == nil: + return defaultQueryConversionCacheSize, nil + case *q.Size <= 0: + return -1, fmt.Errorf("must provide a positive size for query conversion config, instead got: %d", *q.Size) + default: + return *q.Size, nil } - - return *q.Size } // LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit. diff --git a/src/query/server/server.go b/src/query/server/server.go index 2cfdf9736b..2439e3aa64 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -611,9 +611,13 @@ func newStorages( // Setup query conversion cache. var ( conversionCacheConfig = cfg.Cache.QueryConversionCacheConfiguration() - conversionCacheSize = conversionCacheConfig.SizeOrDefault() ) + conversionCacheSize, err := conversionCacheConfig.SizeOrDefault() + if err != nil { + return nil, nil, err + } + conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize) if err != nil { return nil, nil, err diff --git a/src/query/storage/index.go b/src/query/storage/index.go index a00968e962..367774e607 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -44,6 +44,14 @@ func NewQueryConversionCache(lru *QueryConversionLRU) *QueryConversionCache { } } +func (q *QueryConversionCache) set(k []byte, v idx.Query) bool { + return q.LRU.Set(k, v) +} + +func (q *QueryConversionCache) get(k []byte) (idx.Query, bool) { + return q.LRU.Get(k) +} + // FromM3IdentToMetric converts an M3 ident metric to a coordinator metric func FromM3IdentToMetric( identID ident.ID, @@ -135,7 +143,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (i cache.mu.Lock() defer cache.mu.Unlock() - if val, ok := cache.LRU.Get(k); ok { + if val, ok := cache.get(k); ok { return index.Query{Query: val}, nil } @@ -146,7 +154,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (i return index.Query{}, err } - cache.LRU.Set(k, q) + cache.set(k, q) return index.Query{Query: q}, nil } @@ -160,7 +168,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (i } q := idx.NewConjunctionQuery(idxQueries...) - cache.LRU.Set(k, q) + cache.set(k, q) return index.Query{Query: q}, nil } diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index d39d99a9b6..5627447e30 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -80,18 +80,12 @@ func NewStorage( SetLookbackDuration(lookbackDuration). SetConsolidationFunc(consolidators.TakeLast) - // conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize) - // if err != nil { - // return nil, err - // } - return &m3storage{ clusters: clusters, readWorkerPool: readWorkerPool, writeWorkerPool: writeWorkerPool, opts: opts, nowFn: time.Now, - // conversionCache: &storage.QueryConversionCache{LRU: conversionLRU}, conversionCache: queryConversionCache, }, nil } From d1d8aa7619f04b2a91fbf4a2ab8efcc36431877d Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Mon, 25 Feb 2019 13:09:37 -0500 Subject: [PATCH 13/18] Fix test --- src/cmd/services/m3query/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 0276b2e9d9..ef1cab4443 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -127,7 +127,7 @@ type Configuration struct { LookbackDuration *time.Duration `yaml:"lookbackDuration"` // Cache configurations. - Cache CacheConfigurations `yaml:"cache"` + Cache CacheConfiguration `yaml:"cache"` } // Filter is a query filter type. From ed654fc3780936a96e8aaae4306471c8a4d78243 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Mon, 25 Feb 2019 17:08:16 -0500 Subject: [PATCH 14/18] Address comments --- src/cmd/services/m3query/config/config.go | 22 ++++--- .../services/m3query/config/config_test.go | 10 +++ src/query/server/server.go | 4 +- src/query/storage/index.go | 8 +-- src/query/storage/index_test.go | 61 ++++++++++++++++--- .../storage/query_conversion_lru_test.go | 3 +- 6 files changed, 86 insertions(+), 22 deletions(-) diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index ef1cab4443..579fbf4778 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -174,15 +174,21 @@ func (c CacheConfiguration) QueryConversionCacheConfiguration() QueryConversionC // SizeOrDefault returns the provided size or the default value is none is // provided. -func (q *QueryConversionCacheConfiguration) SizeOrDefault() (int, error) { - switch { - case q.Size == nil: - return defaultQueryConversionCacheSize, nil - case *q.Size <= 0: - return -1, fmt.Errorf("must provide a positive size for query conversion config, instead got: %d", *q.Size) - default: - return *q.Size, nil +func (q *QueryConversionCacheConfiguration) SizeOrDefault() int { + if q.Size == nil { + return defaultQueryConversionCacheSize } + + return *q.Size +} + +// Validate validates the QueryConversionCacheConfiguration settings. +func (q *QueryConversionCacheConfiguration) Validate() error { + if *q.Size <= 0 { + return fmt.Errorf("must provide a positive size for query conversion config, instead got: %d", *q.Size) + } + + return nil } // LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit. diff --git a/src/cmd/services/m3query/config/config_test.go b/src/cmd/services/m3query/config/config_test.go index 8d7c2f8d94..8d3e50bbed 100644 --- a/src/cmd/services/m3query/config/config_test.go +++ b/src/cmd/services/m3query/config/config_test.go @@ -162,3 +162,13 @@ func TestTagOptionsConfig(t *testing.T) { assert.Equal(t, []byte("foo"), opts.BucketName()) assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType()) } + +func TestNegativeQueryConversionSize(t *testing.T) { + size := -2 + q := QueryConversionCacheConfiguration{ + Size: &size, + } + + err := q.Validate() + require.Error(t, err) +} diff --git a/src/query/server/server.go b/src/query/server/server.go index 2439e3aa64..d95b2ae995 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -611,10 +611,10 @@ func newStorages( // Setup query conversion cache. var ( conversionCacheConfig = cfg.Cache.QueryConversionCacheConfiguration() + conversionCacheSize = conversionCacheConfig.SizeOrDefault() ) - conversionCacheSize, err := conversionCacheConfig.SizeOrDefault() - if err != nil { + if err := conversionCacheConfig.Validate(); err != nil { return nil, nil, err } diff --git a/src/query/storage/index.go b/src/query/storage/index.go index 367774e607..91f882978f 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -34,22 +34,22 @@ import ( type QueryConversionCache struct { mu sync.Mutex - LRU *QueryConversionLRU + lru *QueryConversionLRU } // NewQueryConversionCache creates a new QueryConversionCache with a provided LRU cache func NewQueryConversionCache(lru *QueryConversionLRU) *QueryConversionCache { return &QueryConversionCache{ - LRU: lru, + lru: lru, } } func (q *QueryConversionCache) set(k []byte, v idx.Query) bool { - return q.LRU.Set(k, v) + return q.lru.Set(k, v) } func (q *QueryConversionCache) get(k []byte) (idx.Query, bool) { - return q.LRU.Get(k) + return q.lru.Get(k) } // FromM3IdentToMetric converts an M3 ident metric to a coordinator metric diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index 75ebcbfcb6..e3ae5ef3ea 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -136,7 +136,7 @@ func TestFetchQueryToM3Query(t *testing.T) { require.NoError(t, err) cache := &QueryConversionCache{ - LRU: lru, + lru: lru, } for _, test := range tests { @@ -157,14 +157,61 @@ func TestFetchQueryToM3Query(t *testing.T) { } func TestQueryKey(t *testing.T) { - matchers := models.Matchers{ + tests := []struct { + name string + expected string + matchers models.Matchers + }{ + { + name: "exact match", + expected: "t11v1", + matchers: models.Matchers{ + { + Type: models.MatchEqual, + Name: []byte("t1"), + Value: []byte("v1"), + }, + }, + }, { - Type: models.MatchEqual, - Name: []byte("t1"), - Value: []byte("v1"), + name: "exact match negated", + expected: "t12v1", + matchers: models.Matchers{ + { + Type: models.MatchNotEqual, + Name: []byte("t1"), + Value: []byte("v1"), + }, + }, + }, + { + name: "regexp match", + expected: "t13v1", + matchers: models.Matchers{ + { + Type: models.MatchRegexp, + Name: []byte("t1"), + Value: []byte("v1"), + }, + }, + }, + { + name: "regexp match negated", + expected: "t14v1", + matchers: models.Matchers{ + { + Type: models.MatchNotRegexp, + Name: []byte("t1"), + Value: []byte("v1"), + }, + }, }, } - keyByte := queryKey(matchers) - assert.Equal(t, []byte("t11v1"), keyByte) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + keyByte := queryKey(test.matchers) + assert.Equal(t, []byte(test.expected), keyByte) + }) + } } diff --git a/src/query/storage/query_conversion_lru_test.go b/src/query/storage/query_conversion_lru_test.go index 0bb7131385..24f9f05a3e 100644 --- a/src/query/storage/query_conversion_lru_test.go +++ b/src/query/storage/query_conversion_lru_test.go @@ -67,6 +67,7 @@ func TestLRU(t *testing.T) { require.False(t, ok) // make sure "b" is still in the cache - _, ok = lru.Get([]byte("b")) + q, ok = lru.Get([]byte("b")) require.True(t, ok) + assert.Equal(t, "term(biz, baz)", q.String()) } From b88193f213b56582152c6ed0a734b973674d9834 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Mon, 25 Feb 2019 17:18:06 -0500 Subject: [PATCH 15/18] Fix test --- src/cmd/services/m3query/config/config.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 579fbf4778..77c4ac2103 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -184,11 +184,14 @@ func (q *QueryConversionCacheConfiguration) SizeOrDefault() int { // Validate validates the QueryConversionCacheConfiguration settings. func (q *QueryConversionCacheConfiguration) Validate() error { - if *q.Size <= 0 { + switch { + case q.Size == nil: + return nil + case *q.Size <= 0: return fmt.Errorf("must provide a positive size for query conversion config, instead got: %d", *q.Size) + default: + return nil } - - return nil } // LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit. From 9d932b9519f59bff84a25c633caca785df628cf4 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Mon, 25 Feb 2019 17:56:22 -0500 Subject: [PATCH 16/18] More tests --- src/query/server/server.go | 7 ++----- src/query/storage/index.go | 1 + src/query/storage/index_test.go | 12 +++++++++++- src/query/storage/query_conversion_lru_test.go | 5 +++++ 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/query/server/server.go b/src/query/server/server.go index d95b2ae995..38f940feb5 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -609,15 +609,12 @@ func newStorages( cleanup := func() error { return nil } // Setup query conversion cache. - var ( - conversionCacheConfig = cfg.Cache.QueryConversionCacheConfiguration() - conversionCacheSize = conversionCacheConfig.SizeOrDefault() - ) - + conversionCacheConfig := cfg.Cache.QueryConversionCacheConfiguration() if err := conversionCacheConfig.Validate(); err != nil { return nil, nil, err } + conversionCacheSize := conversionCacheConfig.SizeOrDefault() conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize) if err != nil { return nil, nil, err diff --git a/src/query/storage/index.go b/src/query/storage/index.go index 91f882978f..22dcae15ba 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -130,6 +130,7 @@ func queryKey(m models.Matchers) []byte { idx += copy(key[idx:], t.Name) key[idx] = lookup[t.Type] idx += copy(key[idx+1:], t.Value) + idx++ } return key diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index e3ae5ef3ea..71541e05d5 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -152,6 +152,11 @@ func TestFetchQueryToM3Query(t *testing.T) { m3Query, err := FetchQueryToM3Query(fetchQuery, cache) require.NoError(t, err) assert.Equal(t, test.expected, m3Query.String()) + + k := queryKey(test.matchers) + q, ok := cache.get(k) + require.True(t, ok) + assert.Equal(t, test.expected, q.String()) }) } } @@ -164,13 +169,18 @@ func TestQueryKey(t *testing.T) { }{ { name: "exact match", - expected: "t11v1", + expected: "t11v1t22v2", matchers: models.Matchers{ { Type: models.MatchEqual, Name: []byte("t1"), Value: []byte("v1"), }, + { + Type: models.MatchNotEqual, + Name: []byte("t2"), + Value: []byte("v2"), + }, }, }, { diff --git a/src/query/storage/query_conversion_lru_test.go b/src/query/storage/query_conversion_lru_test.go index 24f9f05a3e..86f2cf7a98 100644 --- a/src/query/storage/query_conversion_lru_test.go +++ b/src/query/storage/query_conversion_lru_test.go @@ -70,4 +70,9 @@ func TestLRU(t *testing.T) { q, ok = lru.Get([]byte("b")) require.True(t, ok) assert.Equal(t, "term(biz, baz)", q.String()) + + // rewrite "e" and make sure nothing gets evicted + // since "e" is already in the cache. + evicted = lru.Set([]byte("e"), idx.NewTermQuery([]byte("qux"), []byte("quz"))) + require.False(t, evicted) } From 5d0e46c19ed925d66eaf51f3fc35f60945383c6b Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Tue, 26 Feb 2019 10:21:34 -0500 Subject: [PATCH 17/18] Last couple nits --- src/cmd/services/m3query/config/config.go | 9 +++------ src/cmd/services/m3query/config/config_test.go | 7 +++++++ src/query/storage/query_conversion_lru_test.go | 5 +++++ 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 77c4ac2103..e02c950a1a 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -184,14 +184,11 @@ func (q *QueryConversionCacheConfiguration) SizeOrDefault() int { // Validate validates the QueryConversionCacheConfiguration settings. func (q *QueryConversionCacheConfiguration) Validate() error { - switch { - case q.Size == nil: - return nil - case *q.Size <= 0: + if q.Size != nil && *q.Size <= 0 { return fmt.Errorf("must provide a positive size for query conversion config, instead got: %d", *q.Size) - default: - return nil } + + return nil } // LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit. diff --git a/src/cmd/services/m3query/config/config_test.go b/src/cmd/services/m3query/config/config_test.go index 8d3e50bbed..45fe549343 100644 --- a/src/cmd/services/m3query/config/config_test.go +++ b/src/cmd/services/m3query/config/config_test.go @@ -172,3 +172,10 @@ func TestNegativeQueryConversionSize(t *testing.T) { err := q.Validate() require.Error(t, err) } + +func TestNilQueryConversionSize(t *testing.T) { + q := &QueryConversionCacheConfiguration{} + + err := q.Validate() + require.NoError(t, err) +} diff --git a/src/query/storage/query_conversion_lru_test.go b/src/query/storage/query_conversion_lru_test.go index 86f2cf7a98..901335349e 100644 --- a/src/query/storage/query_conversion_lru_test.go +++ b/src/query/storage/query_conversion_lru_test.go @@ -75,4 +75,9 @@ func TestLRU(t *testing.T) { // since "e" is already in the cache. evicted = lru.Set([]byte("e"), idx.NewTermQuery([]byte("qux"), []byte("quz"))) require.False(t, evicted) + + // make sure "e" is still in the cache + q, ok = lru.Get([]byte("e")) + require.True(t, ok) + assert.Equal(t, "term(qux, quz)", q.String()) } From 94e7c12ceaa2528ba3f7057525df9c7b7851ded1 Mon Sep 17 00:00:00 2001 From: Benjamin Raskin Date: Tue, 26 Feb 2019 10:36:36 -0500 Subject: [PATCH 18/18] Add one test --- src/query/storage/query_conversion_lru_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/storage/query_conversion_lru_test.go b/src/query/storage/query_conversion_lru_test.go index 901335349e..992c296575 100644 --- a/src/query/storage/query_conversion_lru_test.go +++ b/src/query/storage/query_conversion_lru_test.go @@ -73,11 +73,11 @@ func TestLRU(t *testing.T) { // rewrite "e" and make sure nothing gets evicted // since "e" is already in the cache. - evicted = lru.Set([]byte("e"), idx.NewTermQuery([]byte("qux"), []byte("quz"))) + evicted = lru.Set([]byte("e"), idx.NewTermQuery([]byte("bar"), []byte("quz"))) require.False(t, evicted) // make sure "e" is still in the cache q, ok = lru.Get([]byte("e")) require.True(t, ok) - assert.Equal(t, "term(qux, quz)", q.String()) + assert.Equal(t, "term(bar, quz)", q.String()) }