Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lru cache for query conversion #1398

Merged
merged 18 commits into from
Feb 26, 2019
45 changes: 45 additions & 0 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
errNoIDGenerationScheme = "error: a recent breaking change means that an ID " +
"generation scheme is required in coordinator configuration settings. " +
"More information is available here: %s"

defaultQueryConversionCacheSize = 4096
)

var (
Expand Down Expand Up @@ -123,6 +125,9 @@ type Configuration struct {

// LookbackDuration determines the lookback duration for queries
LookbackDuration *time.Duration `yaml:"lookbackDuration"`

// Cache configurations.
Cache CacheConfiguration `yaml:"cache"`
}

// Filter is a query filter type.
Expand All @@ -146,6 +151,46 @@ type FilterConfiguration struct {
CompleteTags Filter `yaml:"completeTags"`
}

// CacheConfiguration is the cache configurations.
type CacheConfiguration struct {
// QueryConversion cache policy.
QueryConversion *QueryConversionCacheConfiguration `yaml:"queryConversion"`
}

// QueryConversionCacheConfiguration is the query conversion cache configuration.
type QueryConversionCacheConfiguration struct {
Size *int `yaml:"size"`
}

// QueryConversionCacheConfiguration returns the query conversion cache configuration
// or default if none is specified.
func (c CacheConfiguration) QueryConversionCacheConfiguration() QueryConversionCacheConfiguration {
if c.QueryConversion == nil {
return QueryConversionCacheConfiguration{}
}

return *c.QueryConversion
}

// SizeOrDefault returns the provided size or the default value is none is
// provided.
func (q *QueryConversionCacheConfiguration) SizeOrDefault() int {
if q.Size == nil {
return defaultQueryConversionCacheSize
}

return *q.Size
}

// Validate validates the QueryConversionCacheConfiguration settings.
func (q *QueryConversionCacheConfiguration) Validate() error {
if q.Size != nil && *q.Size <= 0 {
return fmt.Errorf("must provide a positive size for query conversion config, instead got: %d", *q.Size)
}

return nil
}

benraskin92 marked this conversation as resolved.
Show resolved Hide resolved
// LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit.
type LimitsConfiguration struct {
MaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`
Expand Down
17 changes: 17 additions & 0 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,20 @@ func TestTagOptionsConfig(t *testing.T) {
assert.Equal(t, []byte("foo"), opts.BucketName())
assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType())
}

func TestNegativeQueryConversionSize(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test to validate nil case maybe? Up to you

size := -2
q := QueryConversionCacheConfiguration{
Size: &size,
}

err := q.Validate()
require.Error(t, err)
}

func TestNilQueryConversionSize(t *testing.T) {
q := &QueryConversionCacheConfiguration{}

err := q.Validate()
require.NoError(t, err)
}
21 changes: 19 additions & 2 deletions src/query/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import (
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/uber-go/tally"
"go.uber.org/zap"
Expand Down Expand Up @@ -608,13 +608,30 @@ func newStorages(
) (storage.Storage, cleanupFn, error) {
cleanup := func() error { return nil }

localStorage := m3.NewStorage(
// Setup query conversion cache.
conversionCacheConfig := cfg.Cache.QueryConversionCacheConfiguration()
if err := conversionCacheConfig.Validate(); err != nil {
benraskin92 marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil, err
}

conversionCacheSize := conversionCacheConfig.SizeOrDefault()
conversionLRU, err := storage.NewQueryConversionLRU(conversionCacheSize)
if err != nil {
return nil, nil, err
}

localStorage, err := m3.NewStorage(
clusters,
readWorkerPool,
writeWorkerPool,
tagOptions,
*cfg.LookbackDuration,
storage.NewQueryConversionCache(conversionLRU),
)
if err != nil {
return nil, nil, err
}

stores := []storage.Storage{localStorage}
remoteEnabled := false
if cfg.RPC != nil && cfg.RPC.Enabled {
Expand Down
59 changes: 58 additions & 1 deletion src/query/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,36 @@ package storage

import (
"fmt"
"sync"

"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3x/ident"
)

// QueryConversionCache represents the query conversion LRU cache
type QueryConversionCache struct {
mu sync.Mutex

lru *QueryConversionLRU
}

// NewQueryConversionCache creates a new QueryConversionCache with a provided LRU cache
func NewQueryConversionCache(lru *QueryConversionLRU) *QueryConversionCache {
return &QueryConversionCache{
lru: lru,
}
}

func (q *QueryConversionCache) set(k []byte, v idx.Query) bool {
return q.lru.Set(k, v)
}

func (q *QueryConversionCache) get(k []byte) (idx.Query, bool) {
return q.lru.Get(k)
}

// FromM3IdentToMetric converts an M3 ident metric to a coordinator metric
func FromM3IdentToMetric(
identID ident.ID,
Expand Down Expand Up @@ -90,16 +113,49 @@ func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery)
}
}

var (
// byte representation for [1,2,3,4]
lookup = [4]byte{49, 50, 51, 52}
)

func queryKey(m models.Matchers) []byte {
benraskin92 marked this conversation as resolved.
Show resolved Hide resolved
l := len(m)
for _, t := range m {
l += len(t.Name) + len(t.Value)
}

key := make([]byte, l)
idx := 0
for _, t := range m {
idx += copy(key[idx:], t.Name)
key[idx] = lookup[t.Type]
idx += copy(key[idx+1:], t.Value)
idx++
}

return key
}

// FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query
func FetchQueryToM3Query(fetchQuery *FetchQuery) (index.Query, error) {
func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (index.Query, error) {
matchers := fetchQuery.TagMatchers
k := queryKey(matchers)

cache.mu.Lock()
defer cache.mu.Unlock()

if val, ok := cache.get(k); ok {
return index.Query{Query: val}, nil
}

// Optimization for single matcher case.
if len(matchers) == 1 {
q, err := matcherToQuery(matchers[0])
if err != nil {
return index.Query{}, err
}

cache.set(k, q)
return index.Query{Query: q}, nil
}

Expand All @@ -113,6 +169,7 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery) (index.Query, error) {
}

q := idx.NewConjunctionQuery(idxQueries...)
cache.set(k, q)
return index.Query{Query: q}, nil
}

Expand Down
78 changes: 77 additions & 1 deletion src/query/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ func TestFetchQueryToM3Query(t *testing.T) {
},
}

lru, err := NewQueryConversionLRU(10)
require.NoError(t, err)

cache := &QueryConversionCache{
lru: lru,
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fetchQuery := &FetchQuery{
Expand All @@ -142,10 +149,79 @@ func TestFetchQueryToM3Query(t *testing.T) {
Interval: 15 * time.Second,
}

m3Query, err := FetchQueryToM3Query(fetchQuery)
m3Query, err := FetchQueryToM3Query(fetchQuery, cache)
benraskin92 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
assert.Equal(t, test.expected, m3Query.String())

k := queryKey(test.matchers)
q, ok := cache.get(k)
require.True(t, ok)
assert.Equal(t, test.expected, q.String())
})
}
}

func TestQueryKey(t *testing.T) {
tests := []struct {
name string
expected string
matchers models.Matchers
}{
{
name: "exact match",
expected: "t11v1t22v2",
matchers: models.Matchers{
{
Type: models.MatchEqual,
Name: []byte("t1"),
Value: []byte("v1"),
},
{
Type: models.MatchNotEqual,
Name: []byte("t2"),
Value: []byte("v2"),
},
},
},
{
name: "exact match negated",
expected: "t12v1",
matchers: models.Matchers{
{
Type: models.MatchNotEqual,
Name: []byte("t1"),
Value: []byte("v1"),
},
},
},
{
name: "regexp match",
expected: "t13v1",
matchers: models.Matchers{
{
Type: models.MatchRegexp,
Name: []byte("t1"),
Value: []byte("v1"),
},
},
},
{
name: "regexp match negated",
expected: "t14v1",
matchers: models.Matchers{
{
Type: models.MatchNotRegexp,
Name: []byte("t1"),
Value: []byte("v1"),
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
keyByte := queryKey(test.matchers)
assert.Equal(t, []byte(test.expected), keyByte)
benraskin92 marked this conversation as resolved.
Show resolved Hide resolved
})
}
}
11 changes: 7 additions & 4 deletions src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type m3storage struct {
writeWorkerPool xsync.PooledWorkerPool
opts m3db.Options
nowFn func() time.Time
conversionCache *storage.QueryConversionCache
}

// NewStorage creates a new local m3storage instance.
Expand All @@ -72,7 +73,8 @@ func NewStorage(
writeWorkerPool xsync.PooledWorkerPool,
tagOptions models.TagOptions,
lookbackDuration time.Duration,
) Storage {
queryConversionCache *storage.QueryConversionCache,
) (Storage, error) {
opts := m3db.NewOptions().
SetTagOptions(tagOptions).
SetLookbackDuration(lookbackDuration).
Expand All @@ -84,7 +86,8 @@ func NewStorage(
writeWorkerPool: writeWorkerPool,
opts: opts,
nowFn: time.Now,
}
conversionCache: queryConversionCache,
}, nil
}

func (s *m3storage) Fetch(
Expand Down Expand Up @@ -200,7 +203,7 @@ func (s *m3storage) fetchCompressed(
default:
}

m3query, err := storage.FetchQueryToM3Query(query)
m3query, err := storage.FetchQueryToM3Query(query, s.conversionCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -370,7 +373,7 @@ func (s *m3storage) SearchCompressed(
default:
}

m3query, err := storage.FetchQueryToM3Query(query)
m3query, err := storage.FetchQueryToM3Query(query, s.conversionCache)
if err != nil {
return nil, noop, err
}
Expand Down
5 changes: 4 additions & 1 deletion src/query/storage/m3/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ func newTestStorage(t *testing.T, clusters Clusters) storage.Storage {
require.NoError(t, err)
writePool.Init()
opts := models.NewTagOptions().SetMetricName([]byte("name"))
storage := NewStorage(clusters, nil, writePool, opts, time.Minute)
queryCache, err := storage.NewQueryConversionLRU(100)
require.NoError(t, err)
storage, err := NewStorage(clusters, nil, writePool, opts, time.Minute, storage.NewQueryConversionCache(queryCache))
require.NoError(t, err)
return storage
}

Expand Down
Loading