Skip to content

Commit

Permalink
WIP on query side tag completion endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola committed Mar 20, 2019
1 parent 69fffb9 commit 5a2ac6a
Show file tree
Hide file tree
Showing 20 changed files with 262 additions and 90 deletions.
2 changes: 1 addition & 1 deletion scripts/development/m3_stack/start_m3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ if [[ "$FORCE_BUILD" = true ]] ; then
fi

echo "Bringing up nodes in the background with docker compose, remember to run ./stop.sh when done"
docker-compose -f docker-compose.yml up $DOCKER_ARGS m3coordinator01
docker-compose -f docker-compose.yml up --build $DOCKER_ARGS m3coordinator01
docker-compose -f docker-compose.yml up $DOCKER_ARGS m3db_seed
docker-compose -f docker-compose.yml up $DOCKER_ARGS prometheus01
docker-compose -f docker-compose.yml up $DOCKER_ARGS grafana
Expand Down
50 changes: 49 additions & 1 deletion src/dbnode/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,12 @@ func (s *session) FetchIDs(
return result, err
}

func (s *session) FetchTags(
ns ident.ID, q index.Query, opts index.AggregateQueryOptions,
) (FetchedTags, bool, error) {
return FetchedTags{}, true, fmt.Errorf("not implemented")
}

func (s *session) FetchTagged(
ns ident.ID, q index.Query, opts index.QueryOptions,
) (encoding.SeriesIterators, bool, error) {
Expand Down
23 changes: 23 additions & 0 deletions src/dbnode/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type Session interface {
// FetchTaggedIDs resolves the provided query to known IDs.
FetchTaggedIDs(namespace ident.ID, q index.Query, opts index.QueryOptions) (iter TaggedIDsIterator, exhaustive bool, err error)

// FetchTags resolves the optionally provided query to any known tag matchers.
// NB: this is a provisional function until the `session` contract is fully defined
FetchTags(namespace ident.ID, q index.Query, opts index.AggregateQueryOptions) (tags FetchedTags, exhaustive bool, err error)

// ShardID returns the given shard for an ID for callers
// to easily discern what shard is failing when operations
// for given IDs begin failing
Expand All @@ -90,6 +94,25 @@ type Session interface {
Close() error
}

// FetchedTag represents a tag name and an iterator containing any existing tag
// values for that iterator.
// NB: this is a provisional type until the `session` contract is fully defined
type FetchedTag struct {
TagName ident.ID
TagValues ident.Iterator
}

// FetchedTags is a pooled list of FetchedTags.
// NB: this is a provisional type until the `session` contract is fully defined
type FetchedTags struct {
Tags []FetchedTag
pool ident.Pool
}

// Close returns all resources held by FetchedTags.
// NB: this is a provisional type until the `session` contract is fully defined
func (f *FetchedTags) Close() error { return nil }

// TaggedIDsIterator iterates over a collection of IDs with associated tags and namespace.
type TaggedIDsIterator interface {
// Next returns whether there are more items in the collection.
Expand Down
8 changes: 8 additions & 0 deletions src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
"github.com/m3db/m3/src/m3ninx/doc"
Expand Down Expand Up @@ -60,6 +61,13 @@ type Query struct {
idx.Query
}

type AggregateQueryOptions struct {
QueryOptions

TagNameFilter [][]byte
AggregateQueryType rpc.AggregateQueryType
}

// QueryOptions enables users to specify constraints on query execution.
type QueryOptions struct {
StartInclusive time.Time
Expand Down
4 changes: 3 additions & 1 deletion src/query/api/v1/handler/graphite/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func (h *grahiteFindHandler) ServeHTTP(
}

opts := storage.NewFetchOptions()
result, err := h.storage.FetchTags(ctx, query, opts)
// FIXME: arnikola, use the tag completion point instead of this one here
// if someone finds this in the PR I owe you a beer
result, err := h.storage.SearchSeries(ctx, query, opts)
if err != nil {
logger.Error("unable to complete tags", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
Expand Down
1 change: 1 addition & 0 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func newPromReadMetrics(scope tally.Scope) promReadMetrics {

func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)

logger := logging.WithContext(ctx)

req, rErr := h.parseRequest(r)
Expand Down
8 changes: 6 additions & 2 deletions src/query/api/v1/handler/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ func (h *SearchHandler) parseURLParams(r *http.Request) *storage.FetchOptions {
return &fetchOptions
}

func (h *SearchHandler) search(ctx context.Context, query *storage.FetchQuery, opts *storage.FetchOptions) (*storage.SearchResults, error) {
return h.store.FetchTags(ctx, query, opts)
func (h *SearchHandler) search(
ctx context.Context,
query *storage.FetchQuery,
opts *storage.FetchOptions,
) (*storage.SearchResults, error) {
return h.store.SearchSeries(ctx, query, opts)
}

func newFetchOptions(limit int) storage.FetchOptions {
Expand Down
4 changes: 2 additions & 2 deletions src/query/storage/fanout/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func handleFetchResponses(requests []execution.Request) (*storage.FetchResult, e
return result, nil
}

func (s *fanoutStorage) FetchTags(
func (s *fanoutStorage) SearchSeries(
ctx context.Context,
query *storage.FetchQuery,
options *storage.FetchOptions,
Expand All @@ -127,7 +127,7 @@ func (s *fanoutStorage) FetchTags(

stores := filterStores(s.stores, s.fetchFilter, query)
for _, store := range stores {
results, err := store.FetchTags(ctx, query, options)
results, err := store.SearchSeries(ctx, query, options)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions src/query/storage/fanout/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestFanoutReadSuccess(t *testing.T) {

func TestFanoutSearchEmpty(t *testing.T) {
store := setupFanoutRead(t, false)
res, err := store.FetchTags(context.TODO(), nil, nil)
res, err := store.SearchSeries(context.TODO(), nil, nil)
assert.NoError(t, err, "No error")
require.NotNil(t, res, "Non empty result")
assert.Len(t, res.Metrics, 0, "No series")
Expand All @@ -170,7 +170,7 @@ func TestFanoutSearchEmpty(t *testing.T) {
func TestFanoutSearchError(t *testing.T) {
store := setupFanoutRead(t, true)
opts := storage.NewFetchOptions()
_, err := store.FetchTags(context.TODO(), &storage.FetchQuery{}, opts)
_, err := store.SearchSeries(context.TODO(), &storage.FetchQuery{}, opts)
assert.Error(t, err)
}

Expand Down Expand Up @@ -203,6 +203,7 @@ func TestFanoutWriteSuccess(t *testing.T) {
}

func TestCompleteTagsFailure(t *testing.T) {
t.Skip("TODO: test skipped until proper format defined")
store := setupFanoutWrite(t, true, fmt.Errorf("err"))
datapoints := make(ts.Datapoints, 1)
datapoints[0] = ts.Datapoint{Timestamp: time.Now(), Value: 1}
Expand Down
69 changes: 56 additions & 13 deletions src/query/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,23 @@ package storage
import (
"fmt"
"sync"
"time"

"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3x/ident"
)

// QueryConversionCache represents the query conversion LRU cache
// QueryConversionCache represents the query conversion LRU cache.
type QueryConversionCache struct {
mu sync.Mutex
sync.RWMutex

lru *QueryConversionLRU
}

// NewQueryConversionCache creates a new QueryConversionCache with a provided LRU cache
// NewQueryConversionCache creates a new QueryConversionCache with a provided LRU cache.
func NewQueryConversionCache(lru *QueryConversionLRU) *QueryConversionCache {
return &QueryConversionCache{
lru: lru,
Expand All @@ -52,7 +54,7 @@ func (q *QueryConversionCache) get(k []byte) (idx.Query, bool) {
return q.lru.Get(k)
}

// FromM3IdentToMetric converts an M3 ident metric to a coordinator metric
// FromM3IdentToMetric converts an M3 ident metric to a coordinator metric.
func FromM3IdentToMetric(
identID ident.ID,
iterTags ident.TagIterator,
Expand All @@ -69,7 +71,7 @@ func FromM3IdentToMetric(
}, nil
}

// FromIdentTagIteratorToTags converts ident tags to coordinator tags
// FromIdentTagIteratorToTags converts ident tags to coordinator tags.
func FromIdentTagIteratorToTags(
identTags ident.TagIterator,
tagOptions models.TagOptions,
Expand All @@ -90,9 +92,9 @@ func FromIdentTagIteratorToTags(
return tags, nil
}

// TagsToIdentTagIterator converts coordinator tags to ident tags
// TagsToIdentTagIterator converts coordinator tags to ident tags.
func TagsToIdentTagIterator(tags models.Tags) ident.TagIterator {
//TODO get a tags and tag iterator from an ident.Pool here rather than allocing them here
// TODO: get a tags and tag iterator from an ident.Pool here rather than allocing them here
identTags := make([]ident.Tag, 0, tags.Len())
for _, t := range tags.Tags {
identTags = append(identTags, ident.Tag{
Expand All @@ -104,7 +106,7 @@ func TagsToIdentTagIterator(tags models.Tags) ident.TagIterator {
return ident.NewTagsIterator(ident.NewTags(identTags...))
}

// FetchOptionsToM3Options converts a set of coordinator options to M3 options
// FetchOptionsToM3Options converts a set of coordinator options to M3 options.
func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery) index.QueryOptions {
return index.QueryOptions{
Limit: fetchOptions.Limit,
Expand All @@ -113,6 +115,31 @@ func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery)
}
}

func convertAggregateQueryType(completeNameOnly bool) rpc.AggregateQueryType {
if completeNameOnly {
return rpc.AggregateQueryType_AGGREGATE_BY_TAG_NAME
}

return rpc.AggregateQueryType_AGGREGATE_BY_TAG_NAME_VALUE
}

// FetchOptionsToAggregateOptions converts a set of coordinator options as well
// as complete tags query to an M3 aggregate query option.
func FetchOptionsToAggregateOptions(
fetchOptions *FetchOptions,
fetchQuery *CompleteTagsQuery,
) index.AggregateQueryOptions {
return index.AggregateQueryOptions{
QueryOptions: index.QueryOptions{
Limit: fetchOptions.Limit,
StartInclusive: time.Time{},
EndExclusive: time.Now(),
},
TagNameFilter: fetchQuery.FilterNameTags,
AggregateQueryType: convertAggregateQueryType(fetchQuery.CompleteNameOnly),
}
}

var (
// byte representation for [1,2,3,4]
lookup = [4]byte{49, 50, 51, 52}
Expand All @@ -136,26 +163,39 @@ func queryKey(m models.Matchers) []byte {
return key
}

// FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query
func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (index.Query, error) {
// FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query.
func FetchQueryToM3Query(
fetchQuery *FetchQuery,
cache *QueryConversionCache,
) (index.Query, error) {
matchers := fetchQuery.TagMatchers
k := queryKey(matchers)
// If no matchers provided, explicitly set this to an AllQuery
if len(matchers) == 0 {
return index.Query{
// TODO: change this to an idx.AllQuery: https://github.com/m3db/m3/pull/1478
Query: idx.Query{},
}, nil
}

cache.mu.Lock()
defer cache.mu.Unlock()
k := queryKey(matchers)
cache.RLock()

if val, ok := cache.get(k); ok {
cache.RUnlock()
return index.Query{Query: val}, nil
}

cache.RUnlock()
// Optimization for single matcher case.
if len(matchers) == 1 {
q, err := matcherToQuery(matchers[0])
if err != nil {
return index.Query{}, err
}

cache.Lock()
cache.set(k, q)
cache.Unlock()
return index.Query{Query: q}, nil
}

Expand All @@ -169,7 +209,10 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (i
}

q := idx.NewConjunctionQuery(idxQueries...)
cache.Lock()
cache.set(k, q)
cache.Unlock()

return index.Query{Query: q}, nil
}

Expand Down
Loading

0 comments on commit 5a2ac6a

Please sign in to comment.