Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Change Tag ID generation to avoid possible collisions #1286

Merged
merged 16 commits into from
Jan 29, 2019
Merged
21 changes: 20 additions & 1 deletion src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,16 @@ type RPCConfiguration struct {
// Currently only name, but can expand to cover deduplication settings, or other
// relevant options.
type TagOptionsConfiguration struct {
// Version specifies the version number of tag options. Defaults to 0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much value does having Version in addition to Scheme add? It seems like just changing Scheme to TypeQuoted would be a more straightforward way of configuring us to use TypeQuoted than changing the version number and assuming the default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I'm assuming we'll make csv the actual default when we do a major version bump?

Copy link
Collaborator

@robskillington robskillington Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, at some point in time we will change which type is the default based on a "YAML" config version (i.e. we'll use quoted by default), but we don't have that mechanism right now.

So removing version makes sense.

Version int `yaml:"version"`

// MetricName specifies the tag name that corresponds to the metric's name tag
// If not provided, defaults to `__name__`
// If not provided, defaults to `__name__`.
MetricName string `yaml:"metricName"`

// Scheme determines the default ID generation scheme.
// If version is 0, defaults to TypeLegacy; otherwise, defaults to TypeQuoted.
Scheme models.IDSchemeType `yaml:"idScheme"`
}

// TagOptionsFromConfig translates tag option configuration into tag options.
Expand All @@ -157,6 +164,18 @@ func TagOptionsFromConfig(cfg TagOptionsConfiguration) (models.TagOptions, error
opts = opts.SetMetricName([]byte(name))
}

version := cfg.Version
opts = opts.SetVersion(version)

if cfg.Scheme == models.TypeDefault {
if version == 0 {
cfg.Scheme = models.TypeLegacy
} else {
cfg.Scheme = models.TypeQuoted
}
}

opts = opts.SetIDSchemeType(cfg.Scheme)
if err := opts.Validate(); err != nil {
return nil, err
}
Expand Down
33 changes: 33 additions & 0 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ package config
import (
"testing"

"github.com/m3db/m3/src/query/models"
xconfig "github.com/m3db/m3x/config"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
)

func TestTagOptionsFromEmptyConfig(t *testing.T) {
Expand Down Expand Up @@ -94,3 +96,34 @@ func TestConfigValidation(t *testing.T) {
})
}
}

// TestDefaultTagOptionsConfig checks the tag options built from an empty YAML
// document: version 0, the `__name__` metric name, and the legacy ID scheme.
func TestDefaultTagOptionsConfig(t *testing.T) {
	var tagCfg TagOptionsConfiguration
	err := yaml.Unmarshal([]byte(""), &tagCfg)
	require.NoError(t, err)

	tagOpts, err := TagOptionsFromConfig(tagCfg)
	require.NoError(t, err)

	assert.Equal(t, 0, tagOpts.Version())
	assert.Equal(t, []byte("__name__"), tagOpts.MetricName())
	assert.Equal(t, models.TypeLegacy, tagOpts.IDSchemeType())
}

// TestDefaultTagOptionsConfigVersion1 checks the tag options built from a YAML
// document that only sets `version: 1`: the version is carried through, the
// metric name is `__name__`, and the ID scheme is the quoted type.
func TestDefaultTagOptionsConfigVersion1(t *testing.T) {
	var tagCfg TagOptionsConfiguration
	err := yaml.Unmarshal([]byte("version: 1"), &tagCfg)
	require.NoError(t, err)

	tagOpts, err := TagOptionsFromConfig(tagCfg)
	require.NoError(t, err)

	assert.Equal(t, 1, tagOpts.Version())
	assert.Equal(t, []byte("__name__"), tagOpts.MetricName())
	assert.Equal(t, models.TypeQuoted, tagOpts.IDSchemeType())
}

// TestTagOptionsConfig checks that explicitly configured version, metric name
// and ID scheme values are all reflected in the resulting tag options.
func TestTagOptionsConfig(t *testing.T) {
	const rawYAML = "version: 0\nmetricName: abcdefg\nidScheme: prependMeta"

	var tagCfg TagOptionsConfiguration
	err := yaml.Unmarshal([]byte(rawYAML), &tagCfg)
	require.NoError(t, err)

	tagOpts, err := TagOptionsFromConfig(tagCfg)
	require.NoError(t, err)

	assert.Equal(t, 0, tagOpts.Version())
	assert.Equal(t, []byte("abcdefg"), tagOpts.MetricName())
	assert.Equal(t, models.TypePrependMeta, tagOpts.IDSchemeType())
}
2 changes: 2 additions & 0 deletions src/query/api/v1/handler/json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func NewWriteJSONHandler(store storage.Storage) http.Handler {

// WriteQuery represents the write request from the user
// NB(braskin): support only writing one datapoint for now
// TODO: build this out to be a legitimate batched endpoint, change
// Tags to take a list of tag structs
type WriteQuery struct {
Tags map[string]string `json:"tags" validate:"nonzero"`
Timestamp string `json:"timestamp" validate:"nonzero"`
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func renderM3QLResultsJSON(
for _, s := range series {
jw.BeginObject()
jw.BeginObjectField("target")
jw.WriteString(s.Name())
jw.WriteString(string(s.Name()))

jw.BeginObjectField("tags")
jw.BeginObject()
Expand Down
36 changes: 20 additions & 16 deletions src/query/api/v1/handler/prometheus/native/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ func TestRenderResultsJSON(t *testing.T) {
buffer := bytes.NewBuffer(nil)
params := models.RequestParams{}
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
ts.NewSeries([]byte("foo"),
ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries([]byte("bar"),
ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
}

renderResultsJSON(buffer, series, params)
Expand Down Expand Up @@ -187,14 +189,16 @@ func TestRenderInstantaneousResultsJSON(t *testing.T) {
start := time.Unix(1535948880, 0)
buffer := bytes.NewBuffer(nil)
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
ts.NewSeries([]byte("foo"),
ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries([]byte("bar"),
ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
}

renderResultsInstantaneousJSON(buffer, series)
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/validator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func tsListToMap(tsList []*ts.Series) map[string]*ts.Series {
for _, series := range tsList {
series.Tags = series.Tags.Normalize()
id := series.Tags.ID()
tsMap[id] = series
tsMap[string(id)] = series
}

return tsMap
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestSearchResponse(t *testing.T) {
results, err := searchHandler.search(context.TODO(), generateSearchReq(), &opts)
require.NoError(t, err)

assert.Equal(t, testID, results.Metrics[0].ID)
assert.Equal(t, []byte(testID), results.Metrics[0].ID)
expected := test.TagSliceToTags([]models.Tag{{Name: []byte("foo"), Value: []byte("bar")}})
assert.Equal(t, expected.Tags, results.Metrics[0].Tags.Tags)
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/benchmark/benchmarker/main/convert_to_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func calculateCardinality(fromFile string, logger *zap.Logger) (int, error) {
ts, _ := marshalTSDBToProm(tsdb)
tags := storage.PromLabelsToM3Tags(ts.GetLabels(), models.NewTagOptions())
id := tags.ID()
tagsSeen[id]++
tagsSeen[string(id)]++

read++
if read%marker == 0 {
Expand Down
4 changes: 2 additions & 2 deletions src/query/block/scalar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestScalarBlock(t *testing.T) {
}

assert.Equal(t, 0, series.Meta.Tags.Len())
assert.Equal(t, "", series.Meta.Name)
assert.Equal(t, []byte(nil), series.Meta.Name)

require.False(t, seriesIter.Next())
series, err = seriesIter.Current()
Expand All @@ -117,5 +117,5 @@ func verifyMetas(t *testing.T, meta Metadata, seriesMeta []SeriesMeta) {
assert.Len(t, seriesMeta, 1)
sMeta := seriesMeta[0]
assert.Equal(t, 0, sMeta.Tags.Len())
assert.Equal(t, "", sMeta.Name)
assert.Equal(t, []byte(nil), sMeta.Name)
}
2 changes: 1 addition & 1 deletion src/query/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type UnconsolidatedBlock interface {
// SeriesMeta is metadata for the series
type SeriesMeta struct {
Tags models.Tags
Name string
Name []byte
}

// Iterator is the base iterator
Expand Down
16 changes: 9 additions & 7 deletions src/query/functions/aggregation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func NewAggregationOp(

// baseOp stores required properties for the baseOp
type baseOp struct {
params NodeParams
opType string
aggFn aggregationFn
params NodeParams
opType string
opTypeBytes []byte
aggFn aggregationFn
}

// OpType for the operator
Expand All @@ -97,9 +98,10 @@ func (o baseOp) Node(controller *transform.Controller, _ transform.Options) tran

func newBaseOp(params NodeParams, opType string, aggFn aggregationFn) baseOp {
return baseOp{
params: params,
opType: opType,
aggFn: aggFn,
params: params,
opType: opType,
opTypeBytes: []byte(opType),
aggFn: aggFn,
}
}

Expand All @@ -121,7 +123,7 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error {
buckets, metas := utils.GroupSeries(
params.MatchingTags,
params.Without,
n.op.opType,
n.op.opTypeBytes,
seriesMetas,
)
meta.Tags, metas = utils.DedupeMetadata(metas)
Expand Down
26 changes: 14 additions & 12 deletions src/query/functions/aggregation/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (
Duration: time.Minute * 5,
StepSize: time.Minute,
}

typeBytes = []byte(StandardDeviationType)
)

func processAggregationOp(t *testing.T, op parser.Params) *executor.SinkNode {
Expand Down Expand Up @@ -86,9 +88,9 @@ func TestFunctionFilteringWithA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("1")}})},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("2")}})},
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("1")}})},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("2")}})},
{Name: typeBytes, Tags: models.EmptyTags()},
}
expectedMetaTags := models.EmptyTags()

Expand All @@ -113,9 +115,9 @@ func TestFunctionFilteringWithoutA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
{Name: typeBytes, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
}

expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})
Expand All @@ -136,7 +138,7 @@ func TestFunctionFilteringWithD(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: models.EmptyTags()},
{Name: typeBytes, Tags: models.EmptyTags()},
}

expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})
Expand Down Expand Up @@ -166,11 +168,11 @@ func TestFunctionFilteringWithoutD(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})},
{Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})},
{Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})},
}
expectedMetaTags := models.EmptyTags()

Expand Down
14 changes: 8 additions & 6 deletions src/query/functions/aggregation/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func NewCountValuesOp(

// countValuesOp stores required properties for count values ops
type countValuesOp struct {
params NodeParams
opType string
params NodeParams
opType string
opTypeBytes []byte
}

// OpType for the operator
Expand All @@ -78,8 +79,9 @@ func (o countValuesOp) Node(

func newCountValuesOp(params NodeParams, opType string) countValuesOp {
return countValuesOp{
params: params,
opType: opType,
params: params,
opType: opType,
opTypeBytes: []byte(opType),
}
}

Expand Down Expand Up @@ -152,7 +154,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {
buckets, metas := utils.GroupSeries(
params.MatchingTags,
params.Without,
n.op.opType,
n.op.opTypeBytes,
seriesMetas,
)

Expand Down Expand Up @@ -192,7 +194,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {
for k, v := range bucketBlock.indexMapping {
// Add the metas of this bucketBlock right after the previous block
blockMetas[v+previousBucketBlockIndex] = block.SeriesMeta{
Name: n.op.OpType(),
Name: n.op.opTypeBytes,
Tags: metas[bucketIndex].Tags.Clone().AddTag(models.Tag{
Name: []byte(n.op.params.StringParameter),
Value: utils.FormatFloatToBytes(k),
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/aggregation/count_values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func tagsToSeriesMeta(tags []models.Tags) []block.SeriesMeta {
expectedMetas := make([]block.SeriesMeta, len(tags))
for i, m := range tags {
expectedMetas[i] = block.SeriesMeta{
Name: CountValuesType,
Name: []byte(CountValuesType),
Tags: m,
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/query/functions/aggregation/quantile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
"github.com/stretchr/testify/require"
)

var (
typeBytesQuantile = []byte(QuantileType)
)

func TestQuantileFn(t *testing.T) {
values := []float64{3.1, 100, 200, 300, 2.1, 800, 1.1, 4.1, 5.1}
// NB Taken values by bucket: [3.1, 2.1, 1.1, 4.1]
Expand Down Expand Up @@ -147,9 +151,9 @@ func TestQuantileFunctionFilteringWithoutA(t *testing.T) {
}

expectedMetas := []block.SeriesMeta{
{Name: QuantileType, Tags: models.EmptyTags()},
{Name: QuantileType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: QuantileType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
{Name: typeBytesQuantile, Tags: models.EmptyTags()},
{Name: typeBytesQuantile, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})},
{Name: typeBytesQuantile, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})},
}
expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}})

Expand Down
Loading