Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Change Tag ID generation to avoid possible collisions #1286

Merged
merged 16 commits into from
Jan 29, 2019
Merged
21 changes: 20 additions & 1 deletion src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,16 @@ type RPCConfiguration struct {
// Currently only name, but can expand to cover deduplication settings, or other
// relevant options.
type TagOptionsConfiguration struct {
// Version specifies the version number of tag options. Defaults to 0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much value does having Version in addition to Scheme add? It seems like just changing Scheme to TypeQuoted would be a more straightforward way of configuring us to use TypeQuoted than changing the version number and assuming the default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I'm assuming we'll make csv the actual default when we do a major version bump?

Copy link
Collaborator

@robskillington robskillington Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, at some point we will change the default type based on a "YAML" config version (i.e. we'll use quoted by default), but we don't have that mechanism right now.

So removing version makes sense.

Version int `yaml:"version"`

// MetricName specifies the tag name that corresponds to the metric's name tag
// If not provided, defaults to `__name__`
// If not provided, defaults to `__name__`.
MetricName string `yaml:"metricName"`

// Scheme determines the default ID generation scheme.
// If version is 0, defaults to TypeLegacy; otherwise, defaults to TypeQuoted.
Scheme models.IDSchemeType `yaml:"idScheme"`
}

// TagOptionsFromConfig translates tag option configuration into tag options.
Expand All @@ -157,6 +164,18 @@ func TagOptionsFromConfig(cfg TagOptionsConfiguration) (models.TagOptions, error
opts = opts.SetMetricName([]byte(name))
}

version := cfg.Version
opts = opts.SetVersion(version)

if cfg.Scheme == models.TypeDefault {
if version == 0 {
cfg.Scheme = models.TypeLegacy
} else {
cfg.Scheme = models.TypeQuoted
}
}

opts = opts.SetIDSchemeType(cfg.Scheme)
if err := opts.Validate(); err != nil {
return nil, err
}
Expand Down
33 changes: 33 additions & 0 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ package config
import (
"testing"

"github.com/m3db/m3/src/query/models"
xconfig "github.com/m3db/m3x/config"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
)

func TestTagOptionsFromEmptyConfig(t *testing.T) {
Expand Down Expand Up @@ -94,3 +96,34 @@ func TestConfigValidation(t *testing.T) {
})
}
}

// TestDefaultTagOptionsConfig checks the tag options built from an empty YAML
// document: version 0, the `__name__` metric name, and the legacy ID scheme.
func TestDefaultTagOptionsConfig(t *testing.T) {
var cfg TagOptionsConfiguration
// An empty document sets no fields on the configuration.
require.NoError(t, yaml.Unmarshal([]byte(""), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, 0, opts.Version())
assert.Equal(t, []byte("__name__"), opts.MetricName())
// With no scheme given and version 0, the legacy scheme is expected.
assert.Equal(t, models.TypeLegacy, opts.IDSchemeType())
}

// TestDefaultTagOptionsConfigVersion1 checks the tag options built from a
// config that sets only `version: 1`: version 1, the `__name__` metric name,
// and the quoted ID scheme.
func TestDefaultTagOptionsConfigVersion1(t *testing.T) {
var cfg TagOptionsConfiguration
require.NoError(t, yaml.Unmarshal([]byte("version: 1"), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, 1, opts.Version())
assert.Equal(t, []byte("__name__"), opts.MetricName())
// With no scheme given and a non-zero version, the quoted scheme is expected.
assert.Equal(t, models.TypeQuoted, opts.IDSchemeType())
}

// TestTagOptionsConfig checks that explicitly configured version, metric name,
// and ID scheme values are carried through to the resulting tag options.
func TestTagOptionsConfig(t *testing.T) {
var cfg TagOptionsConfiguration
// All three fields are set explicitly, so no defaulting should apply.
config := "version: 0\nmetricName: abcdefg\nidScheme: prependMeta"
require.NoError(t, yaml.Unmarshal([]byte(config), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, 0, opts.Version())
assert.Equal(t, []byte("abcdefg"), opts.MetricName())
// The explicit scheme is kept even though the version is 0.
assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType())
}
2 changes: 2 additions & 0 deletions src/query/api/v1/handler/json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func NewWriteJSONHandler(store storage.Storage) http.Handler {

// WriteQuery represents the write request from the user
// NB(braskin): support only writing one datapoint for now
// TODO: build this out to be a legitimate batched endpoint, change
// Tags to take a list of tag structs
type WriteQuery struct {
Tags map[string]string `json:"tags" validate:"nonzero"`
Timestamp string `json:"timestamp" validate:"nonzero"`
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func renderM3QLResultsJSON(
for _, s := range series {
jw.BeginObject()
jw.BeginObjectField("target")
jw.WriteString(s.Name())
jw.WriteString(string(s.Name()))

jw.BeginObjectField("tags")
jw.BeginObject()
Expand Down
36 changes: 20 additions & 16 deletions src/query/api/v1/handler/prometheus/native/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ func TestRenderResultsJSON(t *testing.T) {
buffer := bytes.NewBuffer(nil)
params := models.RequestParams{}
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
ts.NewSeries([]byte("foo"),
ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries([]byte("bar"),
ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
}

renderResultsJSON(buffer, series, params)
Expand Down Expand Up @@ -187,14 +189,16 @@ func TestRenderInstantaneousResultsJSON(t *testing.T) {
start := time.Unix(1535948880, 0)
buffer := bytes.NewBuffer(nil)
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
ts.NewSeries([]byte("foo"),
ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries([]byte("bar"),
ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
}

renderResultsInstantaneousJSON(buffer, series)
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/validator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func tsListToMap(tsList []*ts.Series) map[string]*ts.Series {
for _, series := range tsList {
series.Tags = series.Tags.Normalize()
id := series.Tags.ID()
tsMap[id] = series
tsMap[string(id)] = series
}

return tsMap
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestSearchResponse(t *testing.T) {
results, err := searchHandler.search(context.TODO(), generateSearchReq(), &opts)
require.NoError(t, err)

assert.Equal(t, testID, results.Metrics[0].ID)
assert.Equal(t, []byte(testID), results.Metrics[0].ID)
expected := test.TagSliceToTags([]models.Tag{{Name: []byte("foo"), Value: []byte("bar")}})
assert.Equal(t, expected.Tags, results.Metrics[0].Tags.Tags)
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/benchmark/benchmarker/main/convert_to_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func calculateCardinality(fromFile string, logger *zap.Logger) (int, error) {
ts, _ := marshalTSDBToProm(tsdb)
tags := storage.PromLabelsToM3Tags(ts.GetLabels(), models.NewTagOptions())
id := tags.ID()
tagsSeen[id]++
tagsSeen[string(id)]++

read++
if read%marker == 0 {
Expand Down
4 changes: 2 additions & 2 deletions src/query/block/scalar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestScalarBlock(t *testing.T) {
}

assert.Equal(t, 0, series.Meta.Tags.Len())
assert.Equal(t, "", series.Meta.Name)
assert.Equal(t, []byte(nil), series.Meta.Name)

require.False(t, seriesIter.Next())
series, err = seriesIter.Current()
Expand All @@ -117,5 +117,5 @@ func verifyMetas(t *testing.T, meta Metadata, seriesMeta []SeriesMeta) {
assert.Len(t, seriesMeta, 1)
sMeta := seriesMeta[0]
assert.Equal(t, 0, sMeta.Tags.Len())
assert.Equal(t, "", sMeta.Name)
assert.Equal(t, []byte(nil), sMeta.Name)
}
2 changes: 1 addition & 1 deletion src/query/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type UnconsolidatedBlock interface {
// SeriesMeta is metadata for the series
type SeriesMeta struct {
Tags models.Tags
Name string
Name []byte
}

// Iterator is the base iterator
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/aggregation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error {
buckets, metas := utils.GroupSeries(
params.MatchingTags,
params.Without,
n.op.opType,
[]byte(n.op.opType),
seriesMetas,
)
meta.Tags, metas = utils.DedupeMetadata(metas)
Expand Down
26 changes: 14 additions & 12 deletions src/query/functions/aggregation/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (
Duration: time.Minute * 5,
StepSize: time.Minute,
}

typeBytes = []byte(StandardDeviationType)
)

func processAggregationOp(t *testing.T, op parser.Params) *executor.SinkNode {
Expand Down Expand Up @@ -86,9 +88,9 @@ func TestFunctionFilteringWithA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("1")}})},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("2")}})},
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("1")}})},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("2")}})},
{Name: typeBytes, Tags: models.EmptyTags()},
}
expectedMetaTags := models.EmptyTags()

Expand All @@ -113,9 +115,9 @@ func TestFunctionFilteringWithoutA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
{Name: typeBytes, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
}

expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})
Expand All @@ -136,7 +138,7 @@ func TestFunctionFilteringWithD(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: models.EmptyTags()},
}

expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})
Expand Down Expand Up @@ -166,11 +168,11 @@ func TestFunctionFilteringWithoutD(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})},
}
expectedMetaTags := models.EmptyTags()

Expand Down
4 changes: 2 additions & 2 deletions src/query/functions/aggregation/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {
buckets, metas := utils.GroupSeries(
params.MatchingTags,
params.Without,
n.op.opType,
[]byte(n.op.opType),
seriesMetas,
)

Expand Down Expand Up @@ -192,7 +192,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {
for k, v := range bucketBlock.indexMapping {
// Add the metas of this bucketBlock right after the previous block
blockMetas[v+previousBucketBlockIndex] = block.SeriesMeta{
Name: n.op.OpType(),
Name: []byte(n.op.opType),
Tags: metas[bucketIndex].Tags.Clone().AddTag(models.Tag{
Name: []byte(n.op.params.StringParameter),
Value: utils.FormatFloatToBytes(k),
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/aggregation/count_values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func tagsToSeriesMeta(tags []models.Tags) []block.SeriesMeta {
expectedMetas := make([]block.SeriesMeta, len(tags))
for i, m := range tags {
expectedMetas[i] = block.SeriesMeta{
Name: CountValuesType,
Name: []byte(CountValuesType),
Tags: m,
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/query/functions/aggregation/quantile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
"github.com/stretchr/testify/require"
)

var (
typeBytesQuantile = []byte(QuantileType)
)

func TestQuantileFn(t *testing.T) {
values := []float64{3.1, 100, 200, 300, 2.1, 800, 1.1, 4.1, 5.1}
// NB Taken values by bucket: [3.1, 2.1, 1.1, 4.1]
Expand Down Expand Up @@ -147,9 +151,9 @@ func TestQuantileFunctionFilteringWithoutA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: QuantileType, Tags: models.EmptyTags()},
{Name: QuantileType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: QuantileType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
{Name: typeBytesQuantile, Tags: models.EmptyTags()},
{Name: typeBytesQuantile, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: typeBytesQuantile, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
}
expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})

Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/aggregation/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error {
buckets, _ := utils.GroupSeries(
params.MatchingTags,
params.Without,
n.op.opType,
[]byte(n.op.opType),
seriesMetas,
)

Expand Down
4 changes: 2 additions & 2 deletions src/query/functions/binary/binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestScalars(t *testing.T) {
assert.Equal(t, 0, sink.Meta.Tags.Len())

assert.Len(t, sink.Metas, 1)
assert.Equal(t, "", sink.Metas[0].Name)
assert.Equal(t, []byte(nil), sink.Metas[0].Name)
assert.Equal(t, 0, sink.Metas[0].Tags.Len())
})
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestScalarsReturnBoolFalse(t *testing.T) {
assert.Equal(t, 0, sink.Meta.Tags.Len())

assert.Len(t, sink.Metas, 1)
assert.Equal(t, "", sink.Metas[0].Name)
assert.Equal(t, []byte(nil), sink.Metas[0].Name)
assert.Equal(t, 0, sink.Metas[0].Tags.Len())
})
}
Expand Down
8 changes: 6 additions & 2 deletions src/query/functions/binary/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,14 @@ type VectorMatching struct {
// ignoring the provided labels. If on, then only the given labels are used instead.
func HashFunc(on bool, names ...[]byte) func(models.Tags) uint64 {
if on {
return func(tags models.Tags) uint64 { return tags.IDWithKeys(names...) }
return func(tags models.Tags) uint64 {
return tags.TagsWithKeys(names).HashedID()
}
}

return func(tags models.Tags) uint64 { return tags.IDWithExcludes(names...) }
return func(tags models.Tags) uint64 {
return tags.TagsWithoutKeys(names).HashedID()
}
}

const initIndexSliceLength = 10
Expand Down
6 changes: 3 additions & 3 deletions src/query/functions/binary/or_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ func generateMetaDataWithTagsInRange(fromRange, toRange int) []block.SeriesMeta
length := toRange - fromRange
meta := make([]block.SeriesMeta, length)
for i := 0; i < length; i++ {
strIdx := fmt.Sprint(fromRange + i)
tags := test.TagSliceToTags([]models.Tag{{Name: []byte(strIdx), Value: []byte(strIdx)}})
idx := []byte(fmt.Sprint(fromRange + i))
tags := test.TagSliceToTags([]models.Tag{{Name: idx, Value: idx}})
meta[i] = block.SeriesMeta{
Tags: tags,
Name: strIdx,
Name: idx,
}
}
return meta
Expand Down
Loading