Skip to content

Commit

Permalink
[query] Ignore name tags for matching binary functions (#1604)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored May 6, 2019
1 parent fce9cae commit b1b848e
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 11 deletions.
2 changes: 0 additions & 2 deletions src/query/api/v1/handler/graphite/find_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,10 @@ func (m *completeTagQueryMatcher) Matches(x interface{}) bool {
}

if !q.Start.Equal(from.t) {
fmt.Println("Not equal start", q.Start, from.t)
return false
}

if !q.End.Equal(until.t) {
fmt.Println("Not equal end", q.End, until.t)
return false
}

Expand Down
15 changes: 10 additions & 5 deletions src/query/functions/binary/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ func processSingleBlock(
return nil, err
}

builder, err := controller.BlockBuilder(queryCtx, it.Meta(), it.SeriesMeta())
meta, metas := it.Meta(), it.SeriesMeta()
meta, metas = removeNameTags(meta, metas)
builder, err := controller.BlockBuilder(queryCtx, meta, metas)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -171,13 +173,16 @@ func processBothSeries(
return nil, errMismatchedStepCounts
}

lMeta, rMeta := lIter.Meta(), rIter.Meta()
lMeta, lSeriesMeta := lIter.Meta(), lIter.SeriesMeta()
lMeta, lSeriesMeta = removeNameTags(lMeta, lSeriesMeta)

lSeriesMeta := utils.FlattenMetadata(lMeta, lIter.SeriesMeta())
rSeriesMeta := utils.FlattenMetadata(rMeta, rIter.SeriesMeta())
rMeta, rSeriesMeta := rIter.Meta(), rIter.SeriesMeta()
rMeta, rSeriesMeta = removeNameTags(rMeta, rSeriesMeta)

takeLeft, correspondingRight, lSeriesMeta := intersect(matching, lSeriesMeta, rSeriesMeta)
lSeriesMeta = utils.FlattenMetadata(lMeta, lSeriesMeta)
rSeriesMeta = utils.FlattenMetadata(rMeta, rSeriesMeta)

takeLeft, correspondingRight, lSeriesMeta := intersect(matching, lSeriesMeta, rSeriesMeta)
lMeta.Tags, lSeriesMeta = utils.DedupeMetadata(lSeriesMeta)

// Use metas from only taken left series
Expand Down
80 changes: 78 additions & 2 deletions src/query/functions/binary/binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,85 @@ func TestBothSeries(t *testing.T) {

// Extract duped expected metas
expectedMeta := block.Metadata{Bounds: bounds}
expectedMeta.Tags, tt.expectedMetas = utils.DedupeMetadata(tt.expectedMetas)
var expectedMetas []block.SeriesMeta
expectedMeta.Tags, expectedMetas = utils.DedupeMetadata(tt.expectedMetas)
expectedMeta, expectedMetas = removeNameTags(expectedMeta, expectedMetas)
assert.Equal(t, expectedMeta, sink.Meta)
assert.Equal(t, tt.expectedMetas, sink.Metas)
assert.Equal(t, expectedMetas, sink.Metas)
})
}
}

func TestBinaryFunctionWithDifferentNames(t *testing.T) {
now := time.Now()

meta := func(bounds models.Bounds, name string) block.Metadata {
return block.Metadata{
Bounds: bounds,
Tags: models.NewTags(1, models.NewTagOptions()).SetName([]byte(name)),
}
}

var (
bounds = models.Bounds{
Start: now,
Duration: time.Minute * 3,
StepSize: time.Minute,
}

lhsMeta = meta(bounds, "left")
lhsMetas = test.NewSeriesMeta("a", 2)
lhs = [][]float64{{1, 2, 3}, {4, 5, 6}}
left = test.NewBlockFromValuesWithMetaAndSeriesMeta(
lhsMeta, lhsMetas, lhs,
)

rhsMeta = meta(bounds, "right")
rhsMetas = test.NewSeriesMeta("a", 3)[1:]
rhs = [][]float64{{10, 20, 30}, {40, 50, 60}}
right = test.NewBlockFromValuesWithMetaAndSeriesMeta(
rhsMeta, rhsMetas, rhs,
)

expected = [][]float64{{14, 25, 36}}
)

op, err := NewOp(
PlusType,
NodeParams{
LNode: parser.NodeID(0),
RNode: parser.NodeID(1),
LIsScalar: false,
RIsScalar: false,
VectorMatching: &VectorMatching{},
},
)
require.NoError(t, err)

c, sink := executor.NewControllerWithSink(parser.NodeID(2))
node := op.(baseOp).Node(c, transform.Options{})

err = node.Process(models.NoopQueryContext(), parser.NodeID(0), left)
require.NoError(t, err)

err = node.Process(models.NoopQueryContext(), parser.NodeID(1), right)
require.NoError(t, err)

test.EqualsWithNans(t, expected, sink.Values)

// Extract duped expected metas
expectedMeta := block.Metadata{
Bounds: bounds,
Tags: models.NewTags(1, models.NewTagOptions()).AddTag(toTag("a1", "a1")),
}

expectedMetas := []block.SeriesMeta{
block.SeriesMeta{
Name: []byte("a1"),
Tags: models.EmptyTags(),
},
}

assert.Equal(t, expectedMeta, sink.Meta)
assert.Equal(t, expectedMetas, sink.Metas)
}
13 changes: 13 additions & 0 deletions src/query/functions/binary/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,16 @@ func appendValuesAtIndices(idxArray []int, iter block.StepIter, builder block.Bu

return iter.Err()
}

// NB: binary functions should remove the name tag from relevant series.
func removeNameTags(
meta block.Metadata,
metas []block.SeriesMeta,
) (block.Metadata, []block.SeriesMeta) {
meta.Tags = meta.Tags.WithoutName()
for i, sm := range metas {
metas[i].Tags = sm.Tags.WithoutName()
}

return meta, metas
}
50 changes: 50 additions & 0 deletions src/query/functions/binary/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,53 @@ func TestCombineMetaAndSeriesMetaError(t *testing.T) {
_, _, _, err := combineMetaAndSeriesMeta(meta, otherMeta, metas, otherMetas)
assert.Error(t, err, errMismatchedBounds.Error())
}

func toTag(n, v string) models.Tag {
return models.Tag{Name: []byte(n), Value: []byte(v)}
}

func TestRemoveNameTags(t *testing.T) {
mTags := models.NewTags(2, models.NewTagOptions()).AddTags([]models.Tag{
toTag("foo", "bar"),
}).SetName([]byte("baz"))

meta := block.Metadata{
Tags: mTags,
}

firstTags := models.NewTags(2, models.NewTagOptions()).AddTags([]models.Tag{
toTag("qux", "qaz"),
}).SetName([]byte("quail"))

secondTags := models.NewTags(1, models.NewTagOptions()).AddTags([]models.Tag{
toTag("foobar", "baz"),
})

metas := []block.SeriesMeta{
{Tags: firstTags},
{Tags: secondTags},
}

// NB: ensure the name tags exist initially
ex := []models.Tag{toTag("__name__", "baz"), toTag("foo", "bar")}
assert.Equal(t, ex, meta.Tags.Tags)

ex = []models.Tag{toTag("__name__", "quail"), toTag("qux", "qaz")}
assert.Equal(t, ex, metas[0].Tags.Tags)

ex = []models.Tag{toTag("foobar", "baz")}
assert.Equal(t, ex, metas[1].Tags.Tags)

meta, metas = removeNameTags(meta, metas)

// NB: ensure name tags are removed
ex = []models.Tag{toTag("foo", "bar")}
assert.Equal(t, ex, meta.Tags.Tags)

ex = []models.Tag{toTag("qux", "qaz")}
assert.Equal(t, ex, metas[0].Tags.Tags)

ex = []models.Tag{toTag("foobar", "baz")}
assert.Equal(t, ex, metas[1].Tags.Tags)

}
1 change: 0 additions & 1 deletion src/query/models/tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,6 @@ func BenchmarkIDs(b *testing.B) {
}
})
}
fmt.Println()
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/test/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewBlockFromValuesWithSeriesMeta(
seriesMeta []block.SeriesMeta,
seriesValues [][]float64,
) block.Block {
blockMeta := block.Metadata{Bounds: bounds}
blockMeta := block.Metadata{Bounds: bounds, Tags: models.NewTags(0, models.NewTagOptions())}

return NewBlockFromValuesWithMetaAndSeriesMeta(blockMeta, seriesMeta, seriesValues)
}
Expand Down

0 comments on commit b1b848e

Please sign in to comment.