diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index fa2d5f5c8b..e02c950a1a 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -53,6 +53,8 @@ const ( errNoIDGenerationScheme = "error: a recent breaking change means that an ID " + "generation scheme is required in coordinator configuration settings. " + "More information is available here: %s" + + defaultQueryConversionCacheSize = 4096 ) var ( @@ -123,6 +125,9 @@ type Configuration struct { // LookbackDuration determines the lookback duration for queries LookbackDuration *time.Duration `yaml:"lookbackDuration"` + + // Cache configurations. + Cache CacheConfiguration `yaml:"cache"` } // Filter is a query filter type. @@ -146,6 +151,46 @@ type FilterConfiguration struct { CompleteTags Filter `yaml:"completeTags"` } +// CacheConfiguration is the cache configurations. +type CacheConfiguration struct { + // QueryConversion cache policy. + QueryConversion *QueryConversionCacheConfiguration `yaml:"queryConversion"` +} + +// QueryConversionCacheConfiguration is the query conversion cache configuration. +type QueryConversionCacheConfiguration struct { + Size *int `yaml:"size"` +} + +// QueryConversionCacheConfiguration returns the query conversion cache configuration +// or default if none is specified. +func (c CacheConfiguration) QueryConversionCacheConfiguration() QueryConversionCacheConfiguration { + if c.QueryConversion == nil { + return QueryConversionCacheConfiguration{} + } + + return *c.QueryConversion +} + +// SizeOrDefault returns the provided size or the default value is none is +// provided. +func (q *QueryConversionCacheConfiguration) SizeOrDefault() int { + if q.Size == nil { + return defaultQueryConversionCacheSize + } + + return *q.Size +} + +// Validate validates the QueryConversionCacheConfiguration settings. +func (q *QueryConversionCacheConfiguration) Validate() error { + if q.Size != nil && *q.Size <= 0 { + return fmt.Errorf("must provide a positive size for query conversion config, instead got: %d", *q.Size) + } + + return nil +} + // LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit. type LimitsConfiguration struct { MaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` diff --git a/src/cmd/services/m3query/config/config_test.go b/src/cmd/services/m3query/config/config_test.go index 8d7c2f8d94..45fe549343 100644 --- a/src/cmd/services/m3query/config/config_test.go +++ b/src/cmd/services/m3query/config/config_test.go @@ -162,3 +162,20 @@ func TestTagOptionsConfig(t *testing.T) { assert.Equal(t, []byte("foo"), opts.BucketName()) assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType()) } + +func TestNegativeQueryConversionSize(t *testing.T) { + size := -2 + q := QueryConversionCacheConfiguration{ + Size: &size, + } + + err := q.Validate() + require.Error(t, err) +} + +func TestNilQueryConversionSize(t *testing.T) { + q := &QueryConversionCacheConfiguration{} + + err := q.Validate() + require.NoError(t, err) +} diff --git a/src/query/server/server.go b/src/query/server/server.go index 81eecf661b..38f940feb5 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -64,7 +64,7 @@ import ( xsync "github.com/m3db/m3x/sync" xtime "github.com/m3db/m3x/time" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/uber-go/tally" "go.uber.org/zap" @@ -608,13 +608,30 @@ func newStorages( ) (storage.Storage, cleanupFn, error) { cleanup := func() error { return nil } - localStorage := m3.NewStorage( + // Setup query conversion cache. + conversionCacheConfig := cfg.Cache.QueryConversionCacheConfiguration() + if err := conversionCacheConfig.Validate(); err != nil { + return nil, nil, err + } + + conversionCacheSize := conversionCacheConfig.SizeOrDefault() + conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize) + if err != nil { + return nil, nil, err + } + + localStorage, err := m3.NewStorage( clusters, readWorkerPool, writeWorkerPool, tagOptions, *cfg.LookbackDuration, + storage.NewQueryConversionCache(conversionLRU), ) + if err != nil { + return nil, nil, err + } + stores := []storage.Storage{localStorage} remoteEnabled := false if cfg.RPC != nil && cfg.RPC.Enabled { diff --git a/src/query/storage/index.go b/src/query/storage/index.go index b89b9bc5a9..22dcae15ba 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -22,6 +22,7 @@ package storage import ( "fmt" + "sync" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/m3ninx/idx" @@ -29,6 +30,28 @@ import ( "github.com/m3db/m3x/ident" ) +// QueryConversionCache represents the query conversion LRU cache +type QueryConversionCache struct { + mu sync.Mutex + + lru *QueryConversionLRU +} + +// NewQueryConversionCache creates a new QueryConversionCache with a provided LRU cache +func NewQueryConversionCache(lru *QueryConversionLRU) *QueryConversionCache { + return &QueryConversionCache{ + lru: lru, + } +} + +func (q *QueryConversionCache) set(k []byte, v idx.Query) bool { + return q.lru.Set(k, v) +} + +func (q *QueryConversionCache) get(k []byte) (idx.Query, bool) { + return q.lru.Get(k) +} + // FromM3IdentToMetric converts an M3 ident metric to a coordinator metric func FromM3IdentToMetric( identID ident.ID, @@ -90,9 +113,41 @@ func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery) } } +var ( + // byte representation for [1,2,3,4] + lookup = [4]byte{49, 50, 51, 52} +) + +func queryKey(m models.Matchers) []byte { + l := len(m) + for _, t := range m { + l += len(t.Name) + len(t.Value) + } + + key := make([]byte, l) + idx := 0 + for _, t := range m { + idx += copy(key[idx:], t.Name) + key[idx] = lookup[t.Type] + idx += copy(key[idx+1:], t.Value) + idx++ + } + + return key +} + // FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query -func FetchQueryToM3Query(fetchQuery *FetchQuery) (index.Query, error) { +func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (index.Query, error) { matchers := fetchQuery.TagMatchers + k := queryKey(matchers) + + cache.mu.Lock() + defer cache.mu.Unlock() + + if val, ok := cache.get(k); ok { + return index.Query{Query: val}, nil + } + // Optimization for single matcher case. if len(matchers) == 1 { q, err := matcherToQuery(matchers[0]) @@ -100,6 +155,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery) (index.Query, error) { return index.Query{}, err } + cache.set(k, q) return index.Query{Query: q}, nil } @@ -113,6 +169,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery) (index.Query, error) { } q := idx.NewConjunctionQuery(idxQueries...) + cache.set(k, q) return index.Query{Query: q}, nil } diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index f5815aec69..71541e05d5 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -132,6 +132,13 @@ func TestFetchQueryToM3Query(t *testing.T) { }, } + lru, err := NewQueryConversionLRU(10) + require.NoError(t, err) + + cache := &QueryConversionCache{ + lru: lru, + } + for _, test := range tests { t.Run(test.name, func(t *testing.T) { fetchQuery := &FetchQuery{ @@ -142,10 +149,79 @@ func TestFetchQueryToM3Query(t *testing.T) { Interval: 15 * time.Second, } - m3Query, err := FetchQueryToM3Query(fetchQuery) + m3Query, err := FetchQueryToM3Query(fetchQuery, cache) require.NoError(t, err) assert.Equal(t, test.expected, m3Query.String()) + + k := queryKey(test.matchers) + q, ok := cache.get(k) + require.True(t, ok) + assert.Equal(t, test.expected, q.String()) }) } +} + +func TestQueryKey(t *testing.T) { + tests := []struct { + name string + expected string + matchers models.Matchers + }{ + { + name: "exact match", + expected: "t11v1t22v2", + matchers: models.Matchers{ + { + Type: models.MatchEqual, + Name: []byte("t1"), + Value: []byte("v1"), + }, + { + Type: models.MatchNotEqual, + Name: []byte("t2"), + Value: []byte("v2"), + }, + }, + }, + { + name: "exact match negated", + expected: "t12v1", + matchers: models.Matchers{ + { + Type: models.MatchNotEqual, + Name: []byte("t1"), + Value: []byte("v1"), + }, + }, + }, + { + name: "regexp match", + expected: "t13v1", + matchers: models.Matchers{ + { + Type: models.MatchRegexp, + Name: []byte("t1"), + Value: []byte("v1"), + }, + }, + }, + { + name: "regexp match negated", + expected: "t14v1", + matchers: models.Matchers{ + { + Type: models.MatchNotRegexp, + Name: []byte("t1"), + Value: []byte("v1"), + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + keyByte := queryKey(test.matchers) + assert.Equal(t, []byte(test.expected), keyByte) + }) + } } diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index d5a92992be..5627447e30 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -62,6 +62,7 @@ type m3storage struct { writeWorkerPool xsync.PooledWorkerPool opts m3db.Options nowFn func() time.Time + conversionCache *storage.QueryConversionCache } // NewStorage creates a new local m3storage instance. @@ -72,7 +73,8 @@ func NewStorage( writeWorkerPool xsync.PooledWorkerPool, tagOptions models.TagOptions, lookbackDuration time.Duration, -) Storage { + queryConversionCache *storage.QueryConversionCache, +) (Storage, error) { opts := m3db.NewOptions(). SetTagOptions(tagOptions). SetLookbackDuration(lookbackDuration). @@ -84,7 +86,8 @@ func NewStorage( writeWorkerPool: writeWorkerPool, opts: opts, nowFn: time.Now, - } + conversionCache: queryConversionCache, + }, nil } func (s *m3storage) Fetch( @@ -200,7 +203,7 @@ func (s *m3storage) fetchCompressed( default: } - m3query, err := storage.FetchQueryToM3Query(query) + m3query, err := storage.FetchQueryToM3Query(query, s.conversionCache) if err != nil { return nil, err } @@ -370,7 +373,7 @@ func (s *m3storage) SearchCompressed( default: } - m3query, err := storage.FetchQueryToM3Query(query) + m3query, err := storage.FetchQueryToM3Query(query, s.conversionCache) if err != nil { return nil, noop, err } diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index bce35f0842..e38f8c7075 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -122,7 +122,10 @@ func newTestStorage(t *testing.T, clusters Clusters) storage.Storage { require.NoError(t, err) writePool.Init() opts := models.NewTagOptions().SetMetricName([]byte("name")) - storage := NewStorage(clusters, nil, writePool, opts, time.Minute) + queryCache, err := storage.NewQueryConversionLRU(100) + require.NoError(t, err) + storage, err := NewStorage(clusters, nil, writePool, opts, time.Minute, storage.NewQueryConversionCache(queryCache)) + require.NoError(t, err) return storage } diff --git a/src/query/storage/query_conversion_lru.go b/src/query/storage/query_conversion_lru.go new file mode 100644 index 0000000000..f1358ba16d --- /dev/null +++ b/src/query/storage/query_conversion_lru.go @@ -0,0 +1,104 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "container/list" + "fmt" + + "github.com/m3db/m3/src/m3ninx/idx" +) + +// QueryConversionLRU implements a fixed size LRU cache +type QueryConversionLRU struct { + size int + evictList *list.List + items map[string]*list.Element +} + +// entry is used to hold a value in the evictList +type entry struct { + key []byte + value idx.Query +} + +// NewQueryConversionLRU constructs an LRU of the given size +func NewQueryConversionLRU(size int) (*QueryConversionLRU, error) { + if size <= 0 { + return nil, fmt.Errorf("must provide a positive size, instead got: %d", size) + } + + c := &QueryConversionLRU{ + size: size, + evictList: list.New(), + items: make(map[string]*list.Element, size), + } + + return c, nil +} + +// Set adds a value to the cache. Returns true if an eviction occurred. +func (c *QueryConversionLRU) Set(key []byte, value idx.Query) (evicted bool) { + // Check for existing item + if ent, ok := c.items[string(key)]; ok { + c.evictList.MoveToFront(ent) + ent.Value.(*entry).value = value + return false + } + + // Add new item + ent := &entry{key, value} + entry := c.evictList.PushFront(ent) + c.items[string(key)] = entry + + evict := c.evictList.Len() > c.size + // Verify size not exceeded + if evict { + c.removeOldest() + } + + return evict +} + +// Get looks up a key's value from the cache. +func (c *QueryConversionLRU) Get(key []byte) (value idx.Query, ok bool) { + if ent, ok := c.items[string(key)]; ok { + c.evictList.MoveToFront(ent) + return ent.Value.(*entry).value, true + } + + return +} + +// removeOldest removes the oldest item from the cache. +func (c *QueryConversionLRU) removeOldest() { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + } +} + +// removeElement is used to remove a given list element from the cache +func (c *QueryConversionLRU) removeElement(e *list.Element) { + c.evictList.Remove(e) + kv := e.Value.(*entry) + delete(c.items, string(kv.key)) +} diff --git a/src/query/storage/query_conversion_lru_test.go b/src/query/storage/query_conversion_lru_test.go new file mode 100644 index 0000000000..992c296575 --- /dev/null +++ b/src/query/storage/query_conversion_lru_test.go @@ -0,0 +1,83 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "testing" + + "github.com/m3db/m3/src/m3ninx/idx" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLRU(t *testing.T) { + lru, err := NewQueryConversionLRU(5) + require.NoError(t, err) + + var ( + ok bool + evicted bool + q idx.Query + ) + + // test add and get + evicted = lru.Set([]byte("a"), idx.NewTermQuery([]byte("foo"), []byte("bar"))) + require.False(t, evicted) + evicted = lru.Set([]byte("b"), idx.NewTermQuery([]byte("biz"), []byte("baz"))) + require.False(t, evicted) + + q, ok = lru.Get([]byte("a")) + require.True(t, ok) + assert.Equal(t, "term(foo, bar)", q.String()) + q, ok = lru.Get([]byte("b")) + require.True(t, ok) + assert.Equal(t, "term(biz, baz)", q.String()) + + // fill up the cache + evicted = lru.Set([]byte("c"), idx.NewTermQuery([]byte("bar"), []byte("foo"))) + require.False(t, evicted) + evicted = lru.Set([]byte("d"), idx.NewTermQuery([]byte("baz"), []byte("biz"))) + require.False(t, evicted) + evicted = lru.Set([]byte("e"), idx.NewTermQuery([]byte("qux"), []byte("quz"))) + require.False(t, evicted) + evicted = lru.Set([]byte("f"), idx.NewTermQuery([]byte("quz"), []byte("qux"))) + require.True(t, evicted) + + // make sure "a" is no longer in the cache + _, ok = lru.Get([]byte("a")) + require.False(t, ok) + + // make sure "b" is still in the cache + q, ok = lru.Get([]byte("b")) + require.True(t, ok) + assert.Equal(t, "term(biz, baz)", q.String()) + + // rewrite "e" and make sure nothing gets evicted + // since "e" is already in the cache. + evicted = lru.Set([]byte("e"), idx.NewTermQuery([]byte("bar"), []byte("quz"))) + require.False(t, evicted) + + // make sure "e" is still in the cache + q, ok = lru.Get([]byte("e")) + require.True(t, ok) + assert.Equal(t, "term(bar, quz)", q.String()) +} diff --git a/src/query/test/m3/test_storage.go b/src/query/test/m3/test_storage.go index 22af2fe2bc..e2277f3e5d 100644 --- a/src/query/test/m3/test_storage.go +++ b/src/query/test/m3/test_storage.go @@ -64,7 +64,10 @@ func NewStorageAndSession( require.NoError(t, err) writePool.Init() tagOptions := models.NewTagOptions().SetMetricName([]byte("name")) - storage := m3.NewStorage(clusters, nil, writePool, tagOptions, defaultLookbackDuration) + queryCache, err := storage.NewQueryConversionLRU(100) + require.NoError(t, err) + storage, err := m3.NewStorage(clusters, nil, writePool, tagOptions, defaultLookbackDuration, storage.NewQueryConversionCache(queryCache)) + require.NoError(t, err) return storage, session } @@ -91,6 +94,9 @@ func NewStorageAndSessionWithAggregatedNamespaces( require.NoError(t, err) writePool.Init() tagOptions := models.NewTagOptions().SetMetricName([]byte("name")) - storage := m3.NewStorage(clusters, nil, writePool, tagOptions, defaultLookbackDuration) + queryCache, err := storage.NewQueryConversionLRU(100) + require.NoError(t, err) + storage, err := m3.NewStorage(clusters, nil, writePool, tagOptions, defaultLookbackDuration, storage.NewQueryConversionCache(queryCache)) + require.NoError(t, err) return storage, session }