Skip to content

Commit

Permalink
[comparator] Support generating random histogram metrics (#2379)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored May 30, 2020
1 parent eec6a8e commit 63f5a1a
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/cmd/services/m3comparator/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func main() {
seriesLoader,
time.Hour*12,
time.Second*15,
5,
)
if err != nil {
logger.Error("could not create querier", zap.Error(err))
Expand Down
123 changes: 98 additions & 25 deletions src/cmd/services/m3comparator/main/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ import (
var _ m3.Querier = (*querier)(nil)

type querier struct {
iteratorOpts iteratorOptions
handler seriesLoadHandler
blockSize time.Duration
defaultResolution time.Duration
iteratorOpts iteratorOptions
handler seriesLoadHandler
blockSize time.Duration
defaultResolution time.Duration
histogramBucketCount uint
sync.Mutex
}

Expand All @@ -58,6 +59,7 @@ func newQuerier(
handler seriesLoadHandler,
blockSize time.Duration,
defaultResolution time.Duration,
histogramBucketCount uint,
) (*querier, error) {
if blockSize <= 0 {
return nil, fmt.Errorf("blockSize must be positive, got %d", blockSize)
Expand All @@ -66,10 +68,11 @@ func newQuerier(
return nil, fmt.Errorf("defaultResolution must be positive, got %d", defaultResolution)
}
return &querier{
iteratorOpts: iteratorOpts,
handler: handler,
blockSize: blockSize,
defaultResolution: defaultResolution,
iteratorOpts: iteratorOpts,
handler: handler,
blockSize: blockSize,
defaultResolution: defaultResolution,
histogramBucketCount: histogramBucketCount,
}, nil
}

Expand All @@ -85,15 +88,22 @@ type series struct {
func (q *querier) generateSeriesBlock(
start time.Time,
resolution time.Duration,
integerValues bool,
) seriesBlock {
numPoints := int(q.blockSize / resolution)
dps := make(seriesBlock, 0, numPoints)
for i := 0; i < numPoints; i++ {
stamp := start.Add(resolution * time.Duration(i))
var value float64
if integerValues {
value = float64(rand.Intn(1000))
} else {
value = rand.Float64()
}
dp := ts.Datapoint{
Timestamp: stamp,
TimestampNanos: xtime.ToUnixNano(stamp),
Value: rand.Float64(),
Value: value,
}

dps = append(dps, dp)
Expand All @@ -107,6 +117,7 @@ func (q *querier) generateSeries(
end time.Time,
resolution time.Duration,
tags parser.Tags,
integerValues bool,
) (series, error) {
numBlocks := int(math.Ceil(float64(end.Sub(start)) / float64(q.blockSize)))
if numBlocks == 0 {
Expand All @@ -115,7 +126,7 @@ func (q *querier) generateSeries(

blocks := make([]seriesBlock, 0, numBlocks)
for i := 0; i < numBlocks; i++ {
blocks = append(blocks, q.generateSeriesBlock(start, resolution))
blocks = append(blocks, q.generateSeriesBlock(start, resolution, integerValues))
start = start.Add(q.blockSize)
}

Expand Down Expand Up @@ -216,6 +227,10 @@ func (q *querier) generateRandomSeries(
series, err = q.generateMultiSeriesMetrics(string(matcher.Value), start, end)
return
}
if matched, _ := regexp.Match(`^histogram_\d+_bucket$`, matcher.Value); matched {
series, err = q.generateHistogramMetrics(string(matcher.Value), start, end)
return
}
}
}

Expand All @@ -239,9 +254,8 @@ func (q *querier) generateSingleSeriesMetrics(
actualGens []seriesGen
)

q.Lock()
defer q.Unlock()
rand.Seed(start.Unix())
unlock := q.lockAndSeed(start)
defer unlock()

metricNameTag := q.iteratorOpts.tagOptions.MetricName()
for _, matcher := range query.TagMatchers {
Expand Down Expand Up @@ -287,7 +301,7 @@ func (q *querier) generateSingleSeriesMetrics(
parser.NewTag("name", gen.name),
}

series, err := q.generateSeries(start, end, gen.res, tags)
series, err := q.generateSeries(start, end, gen.res, tags, false)
if err != nil {
return nil, err
}
Expand All @@ -309,20 +323,14 @@ func (q *querier) generateMultiSeriesMetrics(
return nil, err
}

q.Lock()
defer q.Unlock()
rand.Seed(start.Unix())
unlock := q.lockAndSeed(start)
defer unlock()

seriesList := make([]series, 0, seriesCount)
for i := 0; i < seriesCount; i++ {
tags := parser.Tags{
parser.NewTag(model.MetricNameLabel, metricsName),
parser.NewTag("id", strconv.Itoa(i)),
parser.NewTag("parity", strconv.Itoa(i%2)),
parser.NewTag("const", "x"),
}
for id := 0; id < seriesCount; id++ {
tags := multiSeriesTags(metricsName, id)

series, err := q.generateSeries(start, end, q.defaultResolution, tags)
series, err := q.generateSeries(start, end, q.defaultResolution, tags, false)
if err != nil {
return nil, err
}
Expand All @@ -333,6 +341,71 @@ func (q *querier) generateMultiSeriesMetrics(
return seriesList, nil
}

// generateHistogramMetrics generates random series that mimic histogram
// buckets for a metric named "histogram_<N>_bucket".
//
// N is parsed from metricsName and is the number of distinct series ids. For
// every id, q.histogramBucketCount series are produced, each carrying the tags
// from multiSeriesTags plus an "le" tag. The "le" values start at 1 and grow
// tenfold per bucket; the last bucket is always labelled "+Inf". Each bucket's
// datapoints are accumulated on top of the previous bucket's datapoints for
// the same id.
//
// An error is returned when N cannot be parsed as an int or when
// generateSeries fails.
func (q *querier) generateHistogramMetrics(
	metricsName string,
	start time.Time,
	end time.Time,
) ([]series, error) {
	suffix := strings.TrimPrefix(metricsName, "histogram_")
	countStr := strings.TrimSuffix(suffix, "_bucket")
	seriesCount, err := strconv.Atoi(countStr)
	if err != nil {
		return nil, err
	}

	// Hold the lock for the whole generation so the seeded random sequence is
	// not interleaved with other generators.
	unlock := q.lockAndSeed(start)
	defer unlock()

	// One series is emitted per (id, bucket) pair, so size the result for all
	// of them up front instead of only for the number of ids.
	seriesList := make([]series, 0, seriesCount*int(q.histogramBucketCount))
	for id := 0; id < seriesCount; id++ {
		le := 1.0
		var previousSeriesBlocks []seriesBlock
		for bucket := uint(0); bucket < q.histogramBucketCount; bucket++ {
			tags := multiSeriesTags(metricsName, id)
			// The final bucket is the catch-all "+Inf" bucket.
			leStr := "+Inf"
			if bucket < q.histogramBucketCount-1 {
				leStr = strconv.FormatFloat(le, 'f', -1, 64)
			}
			leTag := parser.NewTag("le", leStr)
			tags = append(tags, leTag)
			le *= 10

			series, err := q.generateSeries(start, end, q.defaultResolution, tags, true)
			if err != nil {
				return nil, err
			}

			// Make the bucket cumulative: add the previous (already
			// cumulative) bucket's value at the same block/point position.
			for i, prevBlock := range previousSeriesBlocks {
				for j, prevValue := range prevBlock {
					series.blocks[i][j].Value += prevValue.Value
				}
			}

			seriesList = append(seriesList, series)

			previousSeriesBlocks = series.blocks
		}
	}

	return seriesList, nil
}

// multiSeriesTags builds the base tag set for one generated series: the
// metric name, the series id, the id's parity (id%2), and a fixed
// "const"="x" tag.
func multiSeriesTags(metricsName string, id int) parser.Tags {
	var (
		nameTag   = parser.NewTag(model.MetricNameLabel, metricsName)
		idTag     = parser.NewTag("id", strconv.Itoa(id))
		parityTag = parser.NewTag("parity", strconv.Itoa(id%2))
		constTag  = parser.NewTag("const", "x")
	)

	return parser.Tags{nameTag, idTag, parityTag, constTag}
}

// lockAndSeed acquires the querier's mutex and seeds the package-level rand
// source with start's Unix timestamp, so that generation for a given start
// time draws from the same seed. It returns the function that releases the
// mutex; callers are expected to defer it.
func (q *querier) lockAndSeed(start time.Time) func() {
	q.Lock()

	seed := start.Unix()
	rand.Seed(seed)

	release := q.Unlock
	return release
}

// SearchCompressed fetches matching tags based on a query.
func (q *querier) SearchCompressed(
ctx context.Context,
Expand Down
118 changes: 109 additions & 9 deletions src/cmd/services/m3comparator/main/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
defaultResolution = time.Second * 30
metricsName = "preloaded"
predefinedSeriesCount = 10
histogramBucketCount = 4
)

func TestFetchCompressed(t *testing.T) {
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestFetchCompressed(t *testing.T) {
}
}

func TestFetchCompressedGeneratesRandomData(t *testing.T) {
func TestGenerateRandomSeries(t *testing.T) {
tests := []struct {
name string
givenQuery *storage.FetchQuery
Expand Down Expand Up @@ -187,6 +188,69 @@ func TestFetchCompressedGeneratesRandomData(t *testing.T) {
},
},
},
{
name: "histogram metrics",
givenQuery: matcherQuery(t, metricNameTag, "histogram_2_bucket"),
wantSeries: []tagMap{
{
metricNameTag: "histogram_2_bucket",
"const": "x",
"id": "0",
"parity": "0",
"le": "1",
},
{
metricNameTag: "histogram_2_bucket",
"const": "x",
"id": "0",
"parity": "0",
"le": "10",
},
{
metricNameTag: "histogram_2_bucket",
"const": "x",
"id": "0",
"parity": "0",
"le": "100",
},
{
metricNameTag: "histogram_2_bucket",
"const": "x",
"id": "0",
"parity": "0",
"le": "+Inf",
},

{
metricNameTag: "histogram_2_bucket",
"const": "x",
"id": "1",
"parity": "1",
"le": "1",
},
{
metricNameTag: "histogram_2_bucket",
"const": "x",
"id": "1",
"parity": "1",
"le": "10",
},
{
metricNameTag: "histogram_2_bucket",
"const": "x",
"id": "1",
"parity": "1",
"le": "100",
},
{
metricNameTag: "histogram_2_bucket",
"const": "x",
"id": "1",
"parity": "1",
"le": "+Inf",
},
},
},
{
name: "apply tag filter",
givenQuery: and(
Expand Down Expand Up @@ -214,7 +278,8 @@ func TestFetchCompressedGeneratesRandomData(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

querier, _ := newQuerier(iteratorOpts, emptySeriesLoader(ctrl), blockSize, defaultResolution)
querier, err := setupRandomGenQuerier(ctrl)
assert.NoError(t, err)

result, cleanup, err := querier.FetchCompressed(nil, tt.givenQuery, nil)
assert.NoError(t, err)
Expand All @@ -230,6 +295,39 @@ func TestFetchCompressedGeneratesRandomData(t *testing.T) {
}
}

// TestHistogramBucketsAddUp fetches a single-id histogram metric and checks
// that one iterator is returned per bucket, that all bucket iterators have the
// same number of datapoints with matching timestamps, and that every later
// bucket's value is at least the first bucket's value at the same position.
func TestHistogramBucketsAddUp(t *testing.T) {
	ctrl := xtest.NewController(t)
	defer ctrl.Finish()

	q, err := setupRandomGenQuerier(ctrl)
	assert.NoError(t, err)

	query := matcherQuery(t, metricNameTag, "histogram_1_bucket")
	result, cleanup, err := q.FetchCompressed(nil, query, nil)
	assert.NoError(t, err)
	defer cleanup()

	require.Equal(t, histogramBucketCount, result.SeriesIterators.Len(), "number of histogram buckets")

	iters := result.SeriesIterators.Iters()
	first := iters[0]
	for first.Next() {
		firstPoint, firstExtra, _ := first.Current()
		// Advance every other bucket in lock-step with the first one.
		for _, other := range iters[1:histogramBucketCount] {
			require.True(t, other.Next(), "all buckets must have the same length")
			point, extra, _ := other.Current()
			assert.True(t, point.Value >= firstPoint.Value, "bucket values must be non decreasing")
			assert.Equal(t, firstPoint.Timestamp, point.Timestamp, "bucket values timestamps must match")
			assert.Equal(t, firstExtra, extra)
		}
	}

	// Once the first bucket is exhausted, no bucket may have points left.
	for idx := range iters {
		require.False(t, iters[idx].Next(), "all buckets must have the same length")
	}
}

func matcherQuery(t *testing.T, matcherName, matcherValue string) *storage.FetchQuery {
matcher, err := models.NewMatcher(models.MatchEqual, []byte(matcherName), []byte(matcherValue))
assert.NoError(t, err)
Expand All @@ -251,13 +349,6 @@ func and(query1, query2 *storage.FetchQuery) *storage.FetchQuery {
}
}

func emptySeriesLoader(ctrl *gomock.Controller) seriesLoadHandler {
iters := encoding.NewMockSeriesIterators(ctrl)
iters.EXPECT().Len().Return(0).AnyTimes()

return &testSeriesLoadHandler{iters}
}

func extractTags(seriesIter encoding.SeriesIterator) tagMap {
tagsIter := seriesIter.Tags().Duplicate()
defer tagsIter.Close()
Expand Down Expand Up @@ -315,3 +406,12 @@ func setupQuerier(ctrl *gomock.Controller, query *storage.FetchQuery) *querier {

return &querier{iteratorOpts: iteratorOpts, handler: seriesLoader}
}

// setupRandomGenQuerier builds a querier via newQuerier using the test
// constants (blockSize, defaultResolution, histogramBucketCount) and a series
// load handler wrapping mock series iterators whose Len always reports 0.
func setupRandomGenQuerier(ctrl *gomock.Controller) (*querier, error) {
	mockIters := encoding.NewMockSeriesIterators(ctrl)
	mockIters.EXPECT().Len().Return(0).AnyTimes()

	return newQuerier(
		iteratorOpts,
		&testSeriesLoadHandler{mockIters},
		blockSize,
		defaultResolution,
		histogramBucketCount,
	)
}

0 comments on commit 63f5a1a

Please sign in to comment.