Use LazySeriesIterator with fuzzy metric name queries (#442)
* Create v8Schema with series index

* Update comment

* Use sha256 for seriesID

* Move iterators inside chunk store

* Create TestChunksToIterators

* Add merge tests

* Add iterator tests

* Add GetReadQueries for v8Schema

* Add merge iterator tests

* Remove absTimeDifference

* Rename MergeSamples to MergeSampleSets

* Remove parallelism for MergeNSampleSets

* Use lazy series iterator when fuzzy metric names

* Tidy up merge iterator

* Refactor getLazySeriesIterators

* Rename function

* Update chunk store tests

* Add chunk store get tests

* Add LazySeriesIterator tests

* Move LazySeriesIterator to chunk package

* Lookup chunks when accessing samples

* Create and use sampleSeriesIterator in lazySeriesIterator

* Add dynamodb.v8-schema-from to local k8s

* Move filter splitter inside get iterator methods

* Rename getFuzzyMetricLazySeriesIterators to getSeriesIterators

* Remove unnecessary splitting of matchers

* Use _ = iota to start at 1

* Update tests

* Fix TestChunkStore_Get_lazy tests

* Add comment about mergeIterator from prometheus fanin

* Improve schema comment

* Move chunksToMatrix to ingester tests

* Move metric name check into constructor

* Create matchers based on metric inside LazySeriesIterator

* Use recursive merger

* Create context inside lazy iterator

* Cache metricName on iterator

* Fix ingester import
aaron7 authored Jul 12, 2017
1 parent 30b83fa commit aea9137
Showing 25 changed files with 1,395 additions and 268 deletions.
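The headline change: for queries where the metric name matcher is fuzzy (anything other than a plain equality), the store no longer fans out chunk fetches across every metric name up front. Instead it returns one lazy iterator per series found in the new v8 series index, and the chunk lookup only happens if the query actually reads that series' samples. Below is a minimal, self-contained Go sketch of that pattern; the types and the lookup callback are illustrative stand-ins, not Cortex's actual API.

package main

import "fmt"

// Metric and Sample stand in for the Prometheus model types in this sketch.
type Metric map[string]string

type Sample struct {
	TimestampMs int64
	Value       float64
}

// lazySeriesIterator knows its series' labels up front (from the index)
// but defers the expensive chunk lookup until samples are first read.
type lazySeriesIterator struct {
	metric  Metric
	lookup  func(Metric) ([]Sample, error) // e.g. a chunk-store query
	samples []Sample
	fetched bool
}

func (it *lazySeriesIterator) Samples() ([]Sample, error) {
	if !it.fetched {
		s, err := it.lookup(it.metric)
		if err != nil {
			return nil, err
		}
		it.samples, it.fetched = s, true
	}
	return it.samples, nil
}

func main() {
	it := &lazySeriesIterator{
		metric: Metric{"__name__": "http_requests_total"},
		lookup: func(m Metric) ([]Sample, error) {
			fmt.Println("chunk lookup happens only now, for", m["__name__"])
			return []Sample{{TimestampMs: 1, Value: 42}}, nil
		},
	}
	// Constructing the iterator did no I/O; reading samples triggers it.
	fmt.Println(it.Samples())
}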
1 change: 1 addition & 0 deletions k8s/querier-dep.yaml
@@ -27,6 +27,7 @@ spec:
 - -dynamodb.v4-schema-from=2017-02-05
 - -dynamodb.v5-schema-from=2017-02-22
 - -dynamodb.v7-schema-from=2017-03-19
+- -dynamodb.v8-schema-from=2017-06-05
 - -dynamodb.chunk-table.from=2017-04-17
 - -memcached.hostname=memcached.default.svc.cluster.local
 - -memcached.timeout=100ms
1 change: 1 addition & 0 deletions k8s/ruler-dep.yaml
@@ -30,6 +30,7 @@ spec:
 - -dynamodb.v4-schema-from=2017-02-05
 - -dynamodb.v5-schema-from=2017-02-22
 - -dynamodb.v7-schema-from=2017-03-19
+- -dynamodb.v8-schema-from=2017-06-05
 - -dynamodb.chunk-table.from=2017-04-17
 - -memcached.hostname=memcached.default.svc.cluster.local
 - -memcached.timeout=100ms
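Both the querier and the ruler get the same new flag. By analogy with the v4/v5/v7 flags above, -dynamodb.v8-schema-from presumably marks the date from which index entries are written and read using the v8 schema (the one that adds the series index the lazy iterators query), while older data keeps using the earlier schemas. That reading follows the flag-naming pattern; the diff itself does not spell it out.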
17 changes: 9 additions & 8 deletions pkg/chunk/chunk.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/golang/snappy"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/storage/local"
 	prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"
 
 	"github.com/weaveworks/common/errors"
@@ -278,8 +279,7 @@ func (c *Chunk) decode(input []byte) error {
 	})
 }
 
-// ChunksToMatrix converts a slice of chunks into a model.Matrix.
-func ChunksToMatrix(chunks []Chunk) (model.Matrix, error) {
+func chunksToIterators(chunks []Chunk) ([]local.SeriesIterator, error) {
 	// Group chunks by series, sort and dedupe samples.
 	sampleStreams := map[model.Fingerprint]*model.SampleStream{}
 	for _, c := range chunks {
@@ -292,23 +292,24 @@ func ChunksToMatrix(chunks []Chunk) (model.Matrix, error) {
 			sampleStreams[fp] = ss
 		}
 
-		samples, err := c.samples()
+		samples, err := c.Samples()
 		if err != nil {
 			return nil, err
 		}
 
-		ss.Values = util.MergeSamples(ss.Values, samples)
+		ss.Values = util.MergeSampleSets(ss.Values, samples)
 	}
 
-	matrix := make(model.Matrix, 0, len(sampleStreams))
+	iterators := make([]local.SeriesIterator, 0, len(sampleStreams))
 	for _, ss := range sampleStreams {
-		matrix = append(matrix, ss)
+		iterators = append(iterators, util.NewSampleStreamIterator(ss))
 	}
 
-	return matrix, nil
+	return iterators, nil
 }
 
-func (c *Chunk) samples() ([]model.SamplePair, error) {
+// Samples returns all SamplePairs for the chunk.
+func (c *Chunk) Samples() ([]model.SamplePair, error) {
 	it := c.Data.NewIterator()
 	// TODO(juliusv): Pre-allocate this with the right length again once we
 	// add a method upstream to get the number of samples in a chunk.
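chunksToIterators relies on util.MergeSampleSets (renamed from MergeSamples in this PR) to combine samples from overlapping chunks of the same series into one time-ordered, deduplicated list. The helper itself is not shown in this excerpt; a plausible reconstruction is a classic two-pointer merge over the already-sorted inputs:

package util

import "github.com/prometheus/common/model"

// mergeSampleSets is a sketch of what a helper like util.MergeSampleSets
// plausibly does: merge two timestamp-sorted sample slices, keeping a
// single copy when both sides have a sample at the same timestamp. Not
// the verbatim Cortex implementation.
func mergeSampleSets(a, b []model.SamplePair) []model.SamplePair {
	result := make([]model.SamplePair, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i].Timestamp < b[j].Timestamp:
			result = append(result, a[i])
			i++
		case a[i].Timestamp > b[j].Timestamp:
			result = append(result, b[j])
			j++
		default: // equal timestamps: dedupe, keep one copy
			result = append(result, a[i])
			i++
			j++
		}
	}
	// At most one of these appends anything.
	result = append(result, a[i:]...)
	return append(result, b[j:]...)
}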
98 changes: 50 additions & 48 deletions pkg/chunk/chunk_store.go
@@ -9,6 +9,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/storage/local"
 	"github.com/prometheus/prometheus/storage/metric"
 	"golang.org/x/net/context"
 
@@ -149,18 +150,35 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch
 }
 
 // Get implements ChunkStore
-func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) {
-	logger := util.WithContext(ctx)
+func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
 	if through < from {
 		return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from)
 	}
 
-	filters, matchers := util.SplitFiltersAndMatchers(allMatchers)
+	// Fetch metric name chunks if the matcher is of type equal,
+	metricNameMatcher, matchers, ok := util.ExtractMetricNameMatcherFromMatchers(allMatchers)
+	if ok && metricNameMatcher.Type == metric.Equal {
+		return c.getMetricNameIterators(ctx, from, through, matchers, metricNameMatcher.Value)
+	}
 
-	// Fetch chunk descriptors (just ID really) from storage
-	chunks, err := c.lookupChunksByMatchers(ctx, from, through, matchers)
+	// Otherwise we will create lazy iterators for all series in our index
+	return c.getSeriesIterators(ctx, from, through, matchers, metricNameMatcher)
+}
+
+func (c *Store) getMetricNameIterators(ctx context.Context, from, through model.Time, allMatchers []*metric.LabelMatcher, metricName model.LabelValue) ([]local.SeriesIterator, error) {
+	chunks, err := c.getMetricNameChunks(ctx, from, through, allMatchers, metricName)
 	if err != nil {
-		return nil, promql.ErrStorage(err)
+		return nil, err
+	}
+	return chunksToIterators(chunks)
+}
+
+func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*metric.LabelMatcher, metricName model.LabelValue) ([]Chunk, error) {
+	logger := util.WithContext(ctx)
+	filters, matchers := util.SplitFiltersAndMatchers(allMatchers)
+	chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName)
+	if err != nil {
+		return nil, err
 	}
 
 	// Filter out chunks that are not in the selected time range.
@@ -208,69 +226,53 @@
 	return filteredChunks, nil
 }
 
-func (c *Store) lookupChunksByMatchers(ctx context.Context, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) {
-	metricNameMatcher, matchers, ok := util.ExtractMetricNameMatcherFromMatchers(matchers)
-
-	// Only lookup by metric name if the matcher is of type equal, otherwise we
-	// have to fetch chunks for all metric names as other metric names could match.
-	if ok && metricNameMatcher.Type == metric.Equal {
-		return c.lookupChunksByMetricName(ctx, from, through, matchers, metricNameMatcher.Value)
-	}
-
+func (c *Store) getSeriesIterators(ctx context.Context, from, through model.Time, allMatchers []*metric.LabelMatcher, metricNameMatcher *metric.LabelMatcher) ([]local.SeriesIterator, error) {
+	// Get all series from the index
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, err
 	}
 
-	// If there is no metric name, we want return chunks for all metric names
-	metricNameQueries, err := c.schema.GetReadQueries(from, through, userID)
+	seriesQueries, err := c.schema.GetReadQueries(from, through, userID)
 	if err != nil {
 		return nil, err
 	}
-	metricNameEntries, err := c.lookupEntriesByQueries(ctx, metricNameQueries)
+	seriesEntries, err := c.lookupEntriesByQueries(ctx, seriesQueries)
 	if err != nil {
 		return nil, err
 	}
 
-	incomingChunkSets := make(chan ByKey)
-	incomingErrors := make(chan error)
-	skippedMetricNames := 0
-
-	for _, metricNameEntry := range metricNameEntries {
-		metricName, err := parseMetricNameRangeValue(metricNameEntry.RangeValue, metricNameEntry.Value)
+	lazyIterators := make([]local.SeriesIterator, 0, len(seriesEntries))
+outer:
+	for _, seriesEntry := range seriesEntries {
+		metric, err := parseSeriesRangeValue(seriesEntry.RangeValue, seriesEntry.Value)
 		if err != nil {
 			return nil, err
 		}
 
-		// We are fetching all metric name chunks, however if there is a metricNameMatcher,
-		// we only want metric names that match
-		if ok && !metricNameMatcher.Match(metricName) {
-			skippedMetricNames++
-			continue
+		// Apply metric name matcher
+		if metricNameMatcher != nil && !metricNameMatcher.Match(metric[metricNameMatcher.Name]) {
+			continue outer
 		}
 
-		go func(metricName model.LabelValue) {
-			chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName)
-			if err != nil {
-				incomingErrors <- err
-			} else {
-				incomingChunkSets <- chunks
+		// Apply matchers
+		for _, matcher := range allMatchers {
+			if !matcher.Match(metric[matcher.Name]) {
+				continue outer
 			}
-		}(metricName)
-	}
-
-	var chunkSets []ByKey
-	var lastErr error
-	for i := 0; i < (len(metricNameEntries) - skippedMetricNames); i++ {
-		select {
-		case incoming := <-incomingChunkSets:
-			chunkSets = append(chunkSets, incoming)
-		case err := <-incomingErrors:
-			lastErr = err
 		}
-	}
 
-	return nWayUnion(chunkSets), lastErr
+		orgID, err := user.ExtractOrgID(ctx)
+		if err != nil {
+			return nil, err
+		}
+		newIterator, err := NewLazySeriesIterator(c, metric, from, through, orgID)
+		if err != nil {
+			return nil, err
+		}
+		lazyIterators = append(lazyIterators, newIterator)
+	}
+	return lazyIterators, nil
 }
 
 func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*metric.LabelMatcher, metricName model.LabelValue) ([]Chunk, error) {
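getSeriesIterators works off seriesEntries read from the new v8 index, whose range values parse back into full metrics via parseSeriesRangeValue. The commit bullet "Use sha256 for seriesID" suggests each series is keyed by a hash of its label set. The exact encoding is not in this excerpt, but the idea is roughly the following sketch, which sorts label names so that equal label sets always hash to the same ID:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// seriesID sketches deriving a stable, fixed-size series key from a label
// set, per the commit's "Use sha256 for seriesID" note. The concrete
// encoding Cortex uses may differ; this only illustrates the idea.
func seriesID(metric map[string]string) string {
	// Sort label names so the same label set always hashes identically.
	names := make([]string, 0, len(metric))
	for name := range metric {
		names = append(names, name)
	}
	sort.Strings(names)

	var b strings.Builder
	for _, name := range names {
		fmt.Fprintf(&b, "%s=%q;", name, metric[name])
	}
	sum := sha256.Sum256([]byte(b.String()))
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(seriesID(map[string]string{"__name__": "up", "job": "api"}))
}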
(Diffs for the remaining 21 changed files are not shown in this excerpt.)
