Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Change Tag ID generation to avoid possible collisions #1286

Merged
merged 16 commits into from
Jan 29, 2019
Merged
22 changes: 17 additions & 5 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,17 @@ func NewIngester(
return nil, err
}

tagOpts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
err = tagOpts.Validate()
if err != nil {
return nil, err
}

return &ingester{
downsamplerAndWriter: downsamplerAndWriter,
opts: opts,
logger: opts.InstrumentOptions.Logger(),
tagOpts: tagOpts,
metrics: newCarbonIngesterMetrics(
opts.InstrumentOptions.MetricsScope()),
}, nil
Expand All @@ -97,6 +104,7 @@ type ingester struct {
opts Options
logger log.Logger
metrics carbonIngesterMetrics
tagOpts models.TagOptions
}

func (i *ingester) Handle(conn net.Conn) {
Expand Down Expand Up @@ -140,7 +148,7 @@ func (i *ingester) Handle(conn net.Conn) {
func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}}
// TODO(rartoul): Pool.
tags, err := GenerateTagsFromName(name)
tags, err := GenerateTagsFromName(name, i.tagOpts)
if err != nil {
i.logger.Errorf("err generating tags from carbon name: %s, err: %s",
string(name), err)
Expand Down Expand Up @@ -196,9 +204,12 @@ type carbonIngesterMetrics struct {
// __g0__:foo
// __g1__:bar
// __g2__:baz
func GenerateTagsFromName(name []byte) (models.Tags, error) {
func GenerateTagsFromName(
name []byte,
opts models.TagOptions,
) (models.Tags, error) {
if len(name) == 0 {
return models.Tags{}, errCannotGenerateTagsFromEmptyName
return models.EmptyTags(), errCannotGenerateTagsFromEmptyName
}

var (
Expand All @@ -211,7 +222,8 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) {
for i, charByte := range name {
if charByte == carbonSeparatorByte {
if i+1 < len(name) && name[i+1] == carbonSeparatorByte {
return models.Tags{}, fmt.Errorf("carbon metric: %s has duplicate separator", string(name))
return models.EmptyTags(),
fmt.Errorf("carbon metric: %s has duplicate separator", string(name))
}

tags = append(tags, models.Tag{
Expand Down Expand Up @@ -239,5 +251,5 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) {
})
}

return models.Tags{Tags: tags}, nil
return models.NewTags(numTags, opts).AddTags(tags), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func BenchmarkGenerateTagsFromName(b *testing.B) {
err error
)

opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
for i := 0; i < b.N; i++ {
benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName)
benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName, opts)
if err != nil {
panic(err)
}
Expand Down
22 changes: 16 additions & 6 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
xtime "github.com/m3db/m3x/time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -105,17 +106,20 @@ func TestIngesterHandleConn(t *testing.T) {
func TestGenerateTagsFromName(t *testing.T) {
testCases := []struct {
name string
id string
expectedTags []models.Tag
expectedErr error
}{
{
name: "foo",
id: "foo",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
},
},
{
name: "foo.bar.baz",
id: "foo.bar.baz",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
{Name: graphite.TagName(1), Value: []byte("bar")},
Expand All @@ -124,28 +128,33 @@ func TestGenerateTagsFromName(t *testing.T) {
},
{
name: "foo.bar.baz.",
id: "foo.bar.baz",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
{Name: graphite.TagName(1), Value: []byte("bar")},
{Name: graphite.TagName(2), Value: []byte("baz")},
},
},
{
name: "foo..bar..baz..",
expectedErr: fmt.Errorf("carbon metric: foo..bar..baz.. has duplicate separator"),
name: "foo..bar..baz..",
expectedErr: fmt.Errorf("carbon metric: foo..bar..baz.. has duplicate separator"),
expectedTags: []models.Tag{},
},
{
name: "foo.bar.baz..",
expectedErr: fmt.Errorf("carbon metric: foo.bar.baz.. has duplicate separator"),
name: "foo.bar.baz..",
expectedErr: fmt.Errorf("carbon metric: foo.bar.baz.. has duplicate separator"),
expectedTags: []models.Tag{},
},
}

opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
for _, tc := range testCases {
tags, err := GenerateTagsFromName([]byte(tc.name))
tags, err := GenerateTagsFromName([]byte(tc.name), opts)
if tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, err)
} else {
require.NoError(t, err)
assert.Equal(t, []byte(tc.id), tags.ID())
}
require.Equal(t, tc.expectedTags, tags.Tags)
}
Expand Down Expand Up @@ -245,7 +254,8 @@ func init() {

metric = []byte(fmt.Sprintf("test.metric.%d", i))

tags, err := GenerateTagsFromName(metric)
opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
tags, err := GenerateTagsFromName(metric, opts)
if err != nil {
panic(err)
}
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/services/m3coordinator/ingest/m3msg/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func NewIngester(
m: m,
logger: opts.InstrumentOptions.Logger(),
sampler: opts.Sampler,
q: storage.WriteQuery{
Tags: models.NewTags(0, nil),
},
}
op.attemptFn = op.attempt
op.ingestFn = op.ingest
Expand Down Expand Up @@ -199,11 +202,12 @@ func (op *ingestOp) resetTags() error {
op.q.Tags.Tags = op.q.Tags.Tags[:0]
for op.it.Next() {
name, value := op.it.Current()
op.q.Tags = op.q.Tags.AddTag(models.Tag{
op.q.Tags = op.q.Tags.AddTagWithoutNormalizing(models.Tag{
Name: name,
Value: value,
}.Clone())
}
op.q.Tags.Normalize()
return op.it.Err()
}

Expand Down
8 changes: 4 additions & 4 deletions src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@ func TestIngest(t *testing.T) {
Value: val,
},
},
Tags: models.Tags{
Tags: []models.Tag{
Tags: models.NewTags(2, nil).AddTags(
[]models.Tag{
models.Tag{
Name: []byte("__name__"),
Value: []byte("foo"),
},
models.Tag{
{
Name: []byte("app"),
Value: []byte("bar"),
},
},
},
),
Unit: xtime.Second,
},
*appender.received[0],
Expand Down
58 changes: 31 additions & 27 deletions src/cmd/services/m3coordinator/ingest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,38 @@ import (
)

var (
testTags1 = models.Tags{Tags: []models.Tag{
{
Name: []byte("test_1_key_1"),
Value: []byte("test_1_value_1"),
},
{
Name: []byte("test_1_key_2"),
Value: []byte("test_1_value_2"),
},
{
Name: []byte("test_1_key_3"),
Value: []byte("test_1_value_3"),
},
}}
testTags2 = models.Tags{Tags: []models.Tag{
{
Name: []byte("test_2_key_1"),
Value: []byte("test_2_value_1"),
},
{
Name: []byte("test_2_key_2"),
Value: []byte("test_2_value_2"),
testTags1 = models.NewTags(3, nil).AddTags(
[]models.Tag{
{
Name: []byte("test_1_key_1"),
Value: []byte("test_1_value_1"),
},
{
Name: []byte("test_1_key_2"),
Value: []byte("test_1_value_2"),
},
{
Name: []byte("test_1_key_3"),
Value: []byte("test_1_value_3"),
},
},
{
Name: []byte("test_2_key_3"),
Value: []byte("test_2_value_3"),
)
testTags2 = models.NewTags(3, nil).AddTags(
[]models.Tag{
{
Name: []byte("test_2_key_1"),
Value: []byte("test_2_value_1"),
},
{
Name: []byte("test_2_key_2"),
Value: []byte("test_2_value_2"),
},
{
Name: []byte("test_2_key_3"),
Value: []byte("test_2_value_3"),
},
},
}}
)

testDatapoints1 = []ts.Datapoint{
{
Expand Down Expand Up @@ -125,7 +129,7 @@ func (i *testIter) Next() bool {

func (i *testIter) Current() (models.Tags, ts.Datapoints, xtime.Unit) {
if len(i.entries) == 0 || i.idx < 0 || i.idx >= len(i.entries) {
return models.Tags{}, nil, 0
return models.EmptyTags(), nil, 0
}

curr := i.entries[i.idx]
Expand Down
10 changes: 9 additions & 1 deletion src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ type RPCConfiguration struct {
// relevant options.
type TagOptionsConfiguration struct {
// MetricName specifies the tag name that corresponds to the metric's name tag
// If not provided, defaults to `__name__`
// If not provided, defaults to `__name__`.
MetricName string `yaml:"metricName"`

// Scheme determines the default ID generation scheme. Defaults to TypeLegacy.
Scheme models.IDSchemeType `yaml:"idScheme"`
}

// TagOptionsFromConfig translates tag option configuration into tag options.
Expand All @@ -212,6 +215,11 @@ func TagOptionsFromConfig(cfg TagOptionsConfiguration) (models.TagOptions, error
opts = opts.SetMetricName([]byte(name))
}

if cfg.Scheme == models.TypeDefault {
cfg.Scheme = models.TypeLegacy
}

opts = opts.SetIDSchemeType(cfg.Scheme)
if err := opts.Validate(); err != nil {
return nil, err
}
Expand Down
21 changes: 21 additions & 0 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ package config
import (
"testing"

"github.com/m3db/m3/src/query/models"
xconfig "github.com/m3db/m3x/config"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
)

func TestTagOptionsFromEmptyConfig(t *testing.T) {
Expand Down Expand Up @@ -94,3 +96,22 @@ func TestConfigValidation(t *testing.T) {
})
}
}

func TestDefaultTagOptionsConfig(t *testing.T) {
var cfg TagOptionsConfiguration
require.NoError(t, yaml.Unmarshal([]byte(""), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, []byte("__name__"), opts.MetricName())
assert.Equal(t, models.TypeLegacy, opts.IDSchemeType())
}

func TestTagOptionsConfig(t *testing.T) {
var cfg TagOptionsConfiguration
config := "metricName: abcdefg\nidScheme: prepend_meta"
require.NoError(t, yaml.Unmarshal([]byte(config), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, []byte("abcdefg"), opts.MetricName())
assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType())
}
8 changes: 4 additions & 4 deletions src/query/api/v1/handler/graphite/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestParseQueryResults(t *testing.T) {
tags = tags.AddTag(models.Tag{Name: graphite.TagName(0), Value: []byte("foo")})
tags = tags.AddTag(models.Tag{Name: graphite.TagName(1), Value: []byte("bar")})
seriesList := ts.SeriesList{
ts.NewSeries("irrelevant_name", vals, tags),
ts.NewSeries([]byte("series_name"), vals, tags),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand All @@ -96,7 +96,7 @@ func TestParseQueryResults(t *testing.T) {
buf, err := ioutil.ReadAll(res.Body)
require.NoError(t, err)
expected := fmt.Sprintf(
`[{"target":"foo.bar","datapoints":[[3.000000,%d],`+
`[{"target":"series_name","datapoints":[[3.000000,%d],`+
`[3.000000,%d],[3.000000,%d]],"step_size_ms":%d}]`,
start.Unix(), start.Unix()+10, start.Unix()+20, resolution/time.Millisecond)

Expand All @@ -116,7 +116,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) {
resolution := 10 * time.Second
vals := ts.NewFixedStepValues(resolution, 4, 4, start)
seriesList := ts.SeriesList{
ts.NewSeries("a", vals, models.NewTags(0, nil)),
ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) {
resolution := 10 * time.Second
vals := ts.NewFixedStepValues(resolution, 3, 3, start)
seriesList := ts.SeriesList{
ts.NewSeries("a", vals, models.NewTags(0, nil)),
ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand Down
2 changes: 2 additions & 0 deletions src/query/api/v1/handler/json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func NewWriteJSONHandler(store storage.Storage) http.Handler {

// WriteQuery represents the write request from the user
// NB(braskin): support only writing one datapoint for now
// TODO: build this out to be a legitimate batched endpoint, change
// Tags to take a list of tag structs
type WriteQuery struct {
Tags map[string]string `json:"tags" validate:"nonzero"`
Timestamp string `json:"timestamp" validate:"nonzero"`
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func renderM3QLResultsJSON(
for _, s := range series {
jw.BeginObject()
jw.BeginObjectField("target")
jw.WriteString(s.Name())
jw.WriteString(string(s.Name()))

jw.BeginObjectField("tags")
jw.BeginObject()
Expand Down
Loading