Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Change Tag ID generation to avoid possible collisions #1286

Merged
merged 16 commits into from
Jan 29, 2019
Merged
2 changes: 2 additions & 0 deletions src/query/api/v1/handler/json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func NewWriteJSONHandler(store storage.Storage) http.Handler {

// WriteQuery represents the write request from the user
// NB(braskin): support only writing one datapoint for now
// TODO: build this out to be a legitimate batched endpoint, change
// Tags to take a list of tag structs
type WriteQuery struct {
Tags map[string]string `json:"tags" validate:"nonzero"`
Timestamp string `json:"timestamp" validate:"nonzero"`
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func renderM3QLResultsJSON(
for _, s := range series {
jw.BeginObject()
jw.BeginObjectField("target")
jw.WriteString(s.Name())
jw.WriteString(string(s.Name()))

jw.BeginObjectField("tags")
jw.BeginObject()
Expand Down
36 changes: 20 additions & 16 deletions src/query/api/v1/handler/prometheus/native/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ func TestRenderResultsJSON(t *testing.T) {
buffer := bytes.NewBuffer(nil)
params := models.RequestParams{}
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
ts.NewSeries([]byte("foo"),
ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries([]byte("bar"),
ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
}

renderResultsJSON(buffer, series, params)
Expand Down Expand Up @@ -187,14 +189,16 @@ func TestRenderInstantaneousResultsJSON(t *testing.T) {
start := time.Unix(1535948880, 0)
buffer := bytes.NewBuffer(nil)
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
ts.NewSeries([]byte("foo"),
ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries([]byte("bar"),
ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
}

renderResultsInstantaneousJSON(buffer, series)
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/validator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func tsListToMap(tsList []*ts.Series) map[string]*ts.Series {
for _, series := range tsList {
series.Tags = series.Tags.Normalize()
id := series.Tags.ID()
tsMap[id] = series
tsMap[string(id)] = series
}

return tsMap
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestSearchResponse(t *testing.T) {
results, err := searchHandler.search(context.TODO(), generateSearchReq(), &opts)
require.NoError(t, err)

assert.Equal(t, testID, results.Metrics[0].ID)
assert.Equal(t, []byte(testID), results.Metrics[0].ID)
expected := test.TagSliceToTags([]models.Tag{{Name: []byte("foo"), Value: []byte("bar")}})
assert.Equal(t, expected.Tags, results.Metrics[0].Tags.Tags)
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/benchmark/benchmarker/main/convert_to_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func calculateCardinality(fromFile string, logger *zap.Logger) (int, error) {
ts, _ := marshalTSDBToProm(tsdb)
tags := storage.PromLabelsToM3Tags(ts.GetLabels(), models.NewTagOptions())
id := tags.ID()
tagsSeen[id]++
tagsSeen[string(id)]++

read++
if read%marker == 0 {
Expand Down
4 changes: 2 additions & 2 deletions src/query/block/scalar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestScalarBlock(t *testing.T) {
}

assert.Equal(t, 0, series.Meta.Tags.Len())
assert.Equal(t, "", series.Meta.Name)
assert.Equal(t, []byte(nil), series.Meta.Name)

require.False(t, seriesIter.Next())
series, err = seriesIter.Current()
Expand All @@ -117,5 +117,5 @@ func verifyMetas(t *testing.T, meta Metadata, seriesMeta []SeriesMeta) {
assert.Len(t, seriesMeta, 1)
sMeta := seriesMeta[0]
assert.Equal(t, 0, sMeta.Tags.Len())
assert.Equal(t, "", sMeta.Name)
assert.Equal(t, []byte(nil), sMeta.Name)
}
2 changes: 1 addition & 1 deletion src/query/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type UnconsolidatedBlock interface {
// SeriesMeta is metadata data for the series
type SeriesMeta struct {
Tags models.Tags
Name string
Name []byte
}

// Iterator is the base iterator
Expand Down
16 changes: 9 additions & 7 deletions src/query/functions/aggregation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func NewAggregationOp(

// baseOp stores required properties for the baseOp
type baseOp struct {
params NodeParams
opType string
aggFn aggregationFn
params NodeParams
opType string
opTypeBytes []byte
aggFn aggregationFn
}

// OpType for the operator
Expand All @@ -97,9 +98,10 @@ func (o baseOp) Node(controller *transform.Controller, _ transform.Options) tran

func newBaseOp(params NodeParams, opType string, aggFn aggregationFn) baseOp {
return baseOp{
params: params,
opType: opType,
aggFn: aggFn,
params: params,
opType: opType,
opTypeBytes: []byte(opType),
aggFn: aggFn,
}
}

Expand All @@ -121,7 +123,7 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error {
buckets, metas := utils.GroupSeries(
params.MatchingTags,
params.Without,
n.op.opType,
n.op.opTypeBytes,
seriesMetas,
)
meta.Tags, metas = utils.DedupeMetadata(metas)
Expand Down
26 changes: 14 additions & 12 deletions src/query/functions/aggregation/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (
Duration: time.Minute * 5,
StepSize: time.Minute,
}

typeBytes = []byte(StandardDeviationType)
)

func processAggregationOp(t *testing.T, op parser.Params) *executor.SinkNode {
Expand Down Expand Up @@ -86,9 +88,9 @@ func TestFunctionFilteringWithA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("1")}})},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("2")}})},
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("1")}})},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("2")}})},
{Name: typeBytes, Tags: models.EmptyTags()},
}
expectedMetaTags := models.EmptyTags()

Expand All @@ -113,9 +115,9 @@ func TestFunctionFilteringWithoutA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
{Name: typeBytes, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
}

expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})
Expand All @@ -136,7 +138,7 @@ func TestFunctionFilteringWithD(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: models.EmptyTags()},
}

expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})
Expand Down Expand Up @@ -166,11 +168,11 @@ func TestFunctionFilteringWithoutD(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})},
}
expectedMetaTags := models.EmptyTags()

Expand Down
14 changes: 8 additions & 6 deletions src/query/functions/aggregation/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func NewCountValuesOp(

// countValuesOp stores required properties for count values ops
type countValuesOp struct {
params NodeParams
opType string
params NodeParams
opType string
opTypeBytes []byte
}

// OpType for the operator
Expand All @@ -78,8 +79,9 @@ func (o countValuesOp) Node(

func newCountValuesOp(params NodeParams, opType string) countValuesOp {
return countValuesOp{
params: params,
opType: opType,
params: params,
opType: opType,
opTypeBytes: []byte(opType),
}
}

Expand Down Expand Up @@ -152,7 +154,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {
buckets, metas := utils.GroupSeries(
params.MatchingTags,
params.Without,
n.op.opType,
n.op.opTypeBytes,
seriesMetas,
)

Expand Down Expand Up @@ -192,7 +194,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {
for k, v := range bucketBlock.indexMapping {
// Add the metas of this bucketBlock right after the previous block
blockMetas[v+previousBucketBlockIndex] = block.SeriesMeta{
Name: n.op.OpType(),
Name: n.op.opTypeBytes,
Tags: metas[bucketIndex].Tags.Clone().AddTag(models.Tag{
Name: []byte(n.op.params.StringParameter),
Value: utils.FormatFloatToBytes(k),
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/aggregation/count_values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func tagsToSeriesMeta(tags []models.Tags) []block.SeriesMeta {
expectedMetas := make([]block.SeriesMeta, len(tags))
for i, m := range tags {
expectedMetas[i] = block.SeriesMeta{
Name: CountValuesType,
Name: []byte(CountValuesType),
Tags: m,
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/query/functions/aggregation/quantile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
"github.com/stretchr/testify/require"
)

var (
typeBytesQuantile = []byte(QuantileType)
)

func TestQuantileFn(t *testing.T) {
values := []float64{3.1, 100, 200, 300, 2.1, 800, 1.1, 4.1, 5.1}
// NB Taken values by bucket: [3.1, 2.1, 1.1, 4.1]
Expand Down Expand Up @@ -147,9 +151,9 @@ func TestQuantileFunctionFilteringWithoutA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: QuantileType, Tags: models.EmptyTags()},
{Name: QuantileType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: QuantileType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
{Name: typeBytesQuantile, Tags: models.EmptyTags()},
{Name: typeBytesQuantile, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: typeBytesQuantile, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
}
expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})

Expand Down
16 changes: 9 additions & 7 deletions src/query/functions/aggregation/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ func NewTakeOp(

// takeOp stores required properties for take ops
type takeOp struct {
params NodeParams
opType string
takeFunc takeFunc
params NodeParams
opType string
opTypeBytes []byte
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: do we really need both opTypeBytes and opType, or could just store one and convert as needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah damn, thought I'd reverted these, good catch.

takeFunc takeFunc
}

// OpType for the operator
Expand All @@ -96,9 +97,10 @@ func (o takeOp) Node(

func newTakeOp(params NodeParams, opType string, takeFunc takeFunc) takeOp {
return takeOp{
params: params,
opType: opType,
takeFunc: takeFunc,
params: params,
opType: opType,
opTypeBytes: []byte(opType),
takeFunc: takeFunc,
}
}

Expand All @@ -123,7 +125,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error {
buckets, _ := utils.GroupSeries(
params.MatchingTags,
params.Without,
n.op.opType,
n.op.opTypeBytes,
seriesMetas,
)

Expand Down
4 changes: 2 additions & 2 deletions src/query/functions/binary/binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestScalars(t *testing.T) {
assert.Equal(t, 0, sink.Meta.Tags.Len())

assert.Len(t, sink.Metas, 1)
assert.Equal(t, "", sink.Metas[0].Name)
assert.Equal(t, []byte(nil), sink.Metas[0].Name)
assert.Equal(t, 0, sink.Metas[0].Tags.Len())
})
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestScalarsReturnBoolFalse(t *testing.T) {
assert.Equal(t, 0, sink.Meta.Tags.Len())

assert.Len(t, sink.Metas, 1)
assert.Equal(t, "", sink.Metas[0].Name)
assert.Equal(t, []byte(nil), sink.Metas[0].Name)
assert.Equal(t, 0, sink.Metas[0].Tags.Len())
})
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/functions/binary/or_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ func generateMetaDataWithTagsInRange(fromRange, toRange int) []block.SeriesMeta
length := toRange - fromRange
meta := make([]block.SeriesMeta, length)
for i := 0; i < length; i++ {
strIdx := fmt.Sprint(fromRange + i)
tags := test.TagSliceToTags([]models.Tag{{Name: []byte(strIdx), Value: []byte(strIdx)}})
idx := []byte(fmt.Sprint(fromRange + i))
tags := test.TagSliceToTags([]models.Tag{{Name: idx, Value: idx}})
meta[i] = block.SeriesMeta{
Tags: tags,
Name: strIdx,
Name: idx,
}
}
return meta
Expand Down
4 changes: 2 additions & 2 deletions src/query/functions/binary/unless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var unlessTests = []struct {
test.NewSeriesMeta("a", 3)[1:],
[][]float64{{3, 4}, {30, 40}},
test.NewSeriesMeta("a", 1)[0].Tags,
[]block.SeriesMeta{{Tags: models.EmptyTags(), Name: "a0"}},
[]block.SeriesMeta{{Tags: models.EmptyTags(), Name: []byte("a0")}},
[][]float64{{1, 2}},
nil,
},
Expand All @@ -149,7 +149,7 @@ var unlessTests = []struct {
test.NewSeriesMeta("a", 4)[1:],
[][]float64{{3, 4}, {30, 40}, {300, 400}},
test.NewSeriesMeta("a", 1)[0].Tags,
[]block.SeriesMeta{{Tags: models.EmptyTags(), Name: "a0"}},
[]block.SeriesMeta{{Tags: models.EmptyTags(), Name: []byte("a0")}},
[][]float64{{1, 2}},
nil,
},
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/utils/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func excludeKeysTags(tags models.Tags, matchingTags [][]byte) models.Tags {
func GroupSeries(
matchingTags [][]byte,
without bool,
opName string,
opName []byte,
metas []block.SeriesMeta,
) ([][]int, []block.SeriesMeta) {
var idFunc withKeysID
Expand Down
Loading