Skip to content

Commit

Permalink
[query] Change Tag ID generation to avoid possible collisions (#1286)
Browse files Browse the repository at this point in the history
- Changes series ID to byteslice instead of string
- Changes tag ID generation to use collision-resistant scheme
- Adds carbon metric ID scheme
  • Loading branch information
arnikola authored Jan 29, 2019
1 parent 620c0fe commit 47c8a1b
Show file tree
Hide file tree
Showing 71 changed files with 1,869 additions and 480 deletions.
22 changes: 17 additions & 5 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,17 @@ func NewIngester(
return nil, err
}

tagOpts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
err = tagOpts.Validate()
if err != nil {
return nil, err
}

return &ingester{
downsamplerAndWriter: downsamplerAndWriter,
opts: opts,
logger: opts.InstrumentOptions.Logger(),
tagOpts: tagOpts,
metrics: newCarbonIngesterMetrics(
opts.InstrumentOptions.MetricsScope()),
}, nil
Expand All @@ -97,6 +104,7 @@ type ingester struct {
opts Options
logger log.Logger
metrics carbonIngesterMetrics
tagOpts models.TagOptions
}

func (i *ingester) Handle(conn net.Conn) {
Expand Down Expand Up @@ -140,7 +148,7 @@ func (i *ingester) Handle(conn net.Conn) {
func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}}
// TODO(rartoul): Pool.
tags, err := GenerateTagsFromName(name)
tags, err := GenerateTagsFromName(name, i.tagOpts)
if err != nil {
i.logger.Errorf("err generating tags from carbon name: %s, err: %s",
string(name), err)
Expand Down Expand Up @@ -196,9 +204,12 @@ type carbonIngesterMetrics struct {
// __g0__:foo
// __g1__:bar
// __g2__:baz
func GenerateTagsFromName(name []byte) (models.Tags, error) {
func GenerateTagsFromName(
name []byte,
opts models.TagOptions,
) (models.Tags, error) {
if len(name) == 0 {
return models.Tags{}, errCannotGenerateTagsFromEmptyName
return models.EmptyTags(), errCannotGenerateTagsFromEmptyName
}

var (
Expand All @@ -211,7 +222,8 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) {
for i, charByte := range name {
if charByte == carbonSeparatorByte {
if i+1 < len(name) && name[i+1] == carbonSeparatorByte {
return models.Tags{}, fmt.Errorf("carbon metric: %s has duplicate separator", string(name))
return models.EmptyTags(),
fmt.Errorf("carbon metric: %s has duplicate separator", string(name))
}

tags = append(tags, models.Tag{
Expand Down Expand Up @@ -239,5 +251,5 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) {
})
}

return models.Tags{Tags: tags}, nil
return models.NewTags(numTags, opts).AddTags(tags), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func BenchmarkGenerateTagsFromName(b *testing.B) {
err error
)

opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
for i := 0; i < b.N; i++ {
benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName)
benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName, opts)
if err != nil {
panic(err)
}
Expand Down
22 changes: 16 additions & 6 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
xtime "github.com/m3db/m3x/time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -105,17 +106,20 @@ func TestIngesterHandleConn(t *testing.T) {
func TestGenerateTagsFromName(t *testing.T) {
testCases := []struct {
name string
id string
expectedTags []models.Tag
expectedErr error
}{
{
name: "foo",
id: "foo",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
},
},
{
name: "foo.bar.baz",
id: "foo.bar.baz",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
{Name: graphite.TagName(1), Value: []byte("bar")},
Expand All @@ -124,28 +128,33 @@ func TestGenerateTagsFromName(t *testing.T) {
},
{
name: "foo.bar.baz.",
id: "foo.bar.baz",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
{Name: graphite.TagName(1), Value: []byte("bar")},
{Name: graphite.TagName(2), Value: []byte("baz")},
},
},
{
name: "foo..bar..baz..",
expectedErr: fmt.Errorf("carbon metric: foo..bar..baz.. has duplicate separator"),
name: "foo..bar..baz..",
expectedErr: fmt.Errorf("carbon metric: foo..bar..baz.. has duplicate separator"),
expectedTags: []models.Tag{},
},
{
name: "foo.bar.baz..",
expectedErr: fmt.Errorf("carbon metric: foo.bar.baz.. has duplicate separator"),
name: "foo.bar.baz..",
expectedErr: fmt.Errorf("carbon metric: foo.bar.baz.. has duplicate separator"),
expectedTags: []models.Tag{},
},
}

opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
for _, tc := range testCases {
tags, err := GenerateTagsFromName([]byte(tc.name))
tags, err := GenerateTagsFromName([]byte(tc.name), opts)
if tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, err)
} else {
require.NoError(t, err)
assert.Equal(t, []byte(tc.id), tags.ID())
}
require.Equal(t, tc.expectedTags, tags.Tags)
}
Expand Down Expand Up @@ -245,7 +254,8 @@ func init() {

metric = []byte(fmt.Sprintf("test.metric.%d", i))

tags, err := GenerateTagsFromName(metric)
opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
tags, err := GenerateTagsFromName(metric, opts)
if err != nil {
panic(err)
}
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/services/m3coordinator/ingest/m3msg/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func NewIngester(
m: m,
logger: opts.InstrumentOptions.Logger(),
sampler: opts.Sampler,
q: storage.WriteQuery{
Tags: models.NewTags(0, nil),
},
}
op.attemptFn = op.attempt
op.ingestFn = op.ingest
Expand Down Expand Up @@ -199,11 +202,12 @@ func (op *ingestOp) resetTags() error {
op.q.Tags.Tags = op.q.Tags.Tags[:0]
for op.it.Next() {
name, value := op.it.Current()
op.q.Tags = op.q.Tags.AddTag(models.Tag{
op.q.Tags = op.q.Tags.AddTagWithoutNormalizing(models.Tag{
Name: name,
Value: value,
}.Clone())
}
op.q.Tags.Normalize()
return op.it.Err()
}

Expand Down
8 changes: 4 additions & 4 deletions src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ func TestIngest(t *testing.T) {
Value: val,
},
},
Tags: models.Tags{
Tags: []models.Tag{
Tags: models.NewTags(2, nil).AddTags(
[]models.Tag{
models.Tag{
Name: []byte("__name__"),
Value: []byte("foo"),
},
models.Tag{
{
Name: []byte("app"),
Value: []byte("bar"),
},
},
},
),
Unit: xtime.Second,
},
*appender.received[0],
Expand Down
58 changes: 31 additions & 27 deletions src/cmd/services/m3coordinator/ingest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,38 @@ import (
)

var (
testTags1 = models.Tags{Tags: []models.Tag{
{
Name: []byte("test_1_key_1"),
Value: []byte("test_1_value_1"),
},
{
Name: []byte("test_1_key_2"),
Value: []byte("test_1_value_2"),
},
{
Name: []byte("test_1_key_3"),
Value: []byte("test_1_value_3"),
},
}}
testTags2 = models.Tags{Tags: []models.Tag{
{
Name: []byte("test_2_key_1"),
Value: []byte("test_2_value_1"),
},
{
Name: []byte("test_2_key_2"),
Value: []byte("test_2_value_2"),
testTags1 = models.NewTags(3, nil).AddTags(
[]models.Tag{
{
Name: []byte("test_1_key_1"),
Value: []byte("test_1_value_1"),
},
{
Name: []byte("test_1_key_2"),
Value: []byte("test_1_value_2"),
},
{
Name: []byte("test_1_key_3"),
Value: []byte("test_1_value_3"),
},
},
{
Name: []byte("test_2_key_3"),
Value: []byte("test_2_value_3"),
)
testTags2 = models.NewTags(3, nil).AddTags(
[]models.Tag{
{
Name: []byte("test_2_key_1"),
Value: []byte("test_2_value_1"),
},
{
Name: []byte("test_2_key_2"),
Value: []byte("test_2_value_2"),
},
{
Name: []byte("test_2_key_3"),
Value: []byte("test_2_value_3"),
},
},
}}
)

testDatapoints1 = []ts.Datapoint{
{
Expand Down Expand Up @@ -125,7 +129,7 @@ func (i *testIter) Next() bool {

func (i *testIter) Current() (models.Tags, ts.Datapoints, xtime.Unit) {
if len(i.entries) == 0 || i.idx < 0 || i.idx >= len(i.entries) {
return models.Tags{}, nil, 0
return models.EmptyTags(), nil, 0
}

curr := i.entries[i.idx]
Expand Down
10 changes: 9 additions & 1 deletion src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ type RPCConfiguration struct {
// relevant options.
type TagOptionsConfiguration struct {
// MetricName specifies the tag name that corresponds to the metric's name tag
// If not provided, defaults to `__name__`
// If not provided, defaults to `__name__`.
MetricName string `yaml:"metricName"`

// Scheme determines the default ID generation scheme. Defaults to TypeLegacy.
Scheme models.IDSchemeType `yaml:"idScheme"`
}

// TagOptionsFromConfig translates tag option configuration into tag options.
Expand All @@ -212,6 +215,11 @@ func TagOptionsFromConfig(cfg TagOptionsConfiguration) (models.TagOptions, error
opts = opts.SetMetricName([]byte(name))
}

if cfg.Scheme == models.TypeDefault {
cfg.Scheme = models.TypeLegacy
}

opts = opts.SetIDSchemeType(cfg.Scheme)
if err := opts.Validate(); err != nil {
return nil, err
}
Expand Down
21 changes: 21 additions & 0 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ package config
import (
"testing"

"github.com/m3db/m3/src/query/models"
xconfig "github.com/m3db/m3x/config"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
)

func TestTagOptionsFromEmptyConfig(t *testing.T) {
Expand Down Expand Up @@ -94,3 +96,22 @@ func TestConfigValidation(t *testing.T) {
})
}
}

func TestDefaultTagOptionsConfig(t *testing.T) {
var cfg TagOptionsConfiguration
require.NoError(t, yaml.Unmarshal([]byte(""), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, []byte("__name__"), opts.MetricName())
assert.Equal(t, models.TypeLegacy, opts.IDSchemeType())
}

func TestTagOptionsConfig(t *testing.T) {
var cfg TagOptionsConfiguration
config := "metricName: abcdefg\nidScheme: prepend_meta"
require.NoError(t, yaml.Unmarshal([]byte(config), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, []byte("abcdefg"), opts.MetricName())
assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType())
}
8 changes: 4 additions & 4 deletions src/query/api/v1/handler/graphite/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestParseQueryResults(t *testing.T) {
tags = tags.AddTag(models.Tag{Name: graphite.TagName(0), Value: []byte("foo")})
tags = tags.AddTag(models.Tag{Name: graphite.TagName(1), Value: []byte("bar")})
seriesList := ts.SeriesList{
ts.NewSeries("irrelevant_name", vals, tags),
ts.NewSeries([]byte("series_name"), vals, tags),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand All @@ -96,7 +96,7 @@ func TestParseQueryResults(t *testing.T) {
buf, err := ioutil.ReadAll(res.Body)
require.NoError(t, err)
expected := fmt.Sprintf(
`[{"target":"foo.bar","datapoints":[[3.000000,%d],`+
`[{"target":"series_name","datapoints":[[3.000000,%d],`+
`[3.000000,%d],[3.000000,%d]],"step_size_ms":%d}]`,
start.Unix(), start.Unix()+10, start.Unix()+20, resolution/time.Millisecond)

Expand All @@ -116,7 +116,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) {
resolution := 10 * time.Second
vals := ts.NewFixedStepValues(resolution, 4, 4, start)
seriesList := ts.SeriesList{
ts.NewSeries("a", vals, models.NewTags(0, nil)),
ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) {
resolution := 10 * time.Second
vals := ts.NewFixedStepValues(resolution, 3, 3, start)
seriesList := ts.SeriesList{
ts.NewSeries("a", vals, models.NewTags(0, nil)),
ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand Down
2 changes: 2 additions & 0 deletions src/query/api/v1/handler/json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func NewWriteJSONHandler(store storage.Storage) http.Handler {

// WriteQuery represents the write request from the user
// NB(braskin): support only writing one datapoint for now
// TODO: build this out to be a legitimate batched endpoint, change
// Tags to take a list of tag structs
type WriteQuery struct {
Tags map[string]string `json:"tags" validate:"nonzero"`
Timestamp string `json:"timestamp" validate:"nonzero"`
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func renderM3QLResultsJSON(
for _, s := range series {
jw.BeginObject()
jw.BeginObjectField("target")
jw.WriteString(s.Name())
jw.WriteString(string(s.Name()))

jw.BeginObjectField("tags")
jw.BeginObject()
Expand Down
Loading

0 comments on commit 47c8a1b

Please sign in to comment.