Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Change Tag ID generation to avoid possible collisions #1286

Merged
merged 16 commits into from
Jan 29, 2019
Merged
17 changes: 14 additions & 3 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,17 @@ func NewIngester(
return nil, err
}

tagOpts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
err = tagOpts.Validate()
if err != nil {
return nil, err
}

return &ingester{
downsamplerAndWriter: downsamplerAndWriter,
opts: opts,
logger: opts.InstrumentOptions.Logger(),
tagOpts: tagOpts,
metrics: newCarbonIngesterMetrics(
opts.InstrumentOptions.MetricsScope()),
}, nil
Expand All @@ -97,6 +104,7 @@ type ingester struct {
opts Options
logger log.Logger
metrics carbonIngesterMetrics
tagOpts models.TagOptions
}

func (i *ingester) Handle(conn net.Conn) {
Expand Down Expand Up @@ -140,7 +148,7 @@ func (i *ingester) Handle(conn net.Conn) {
func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}}
// TODO(rartoul): Pool.
tags, err := GenerateTagsFromName(name)
tags, err := GenerateTagsFromName(name, i.tagOpts)
if err != nil {
i.logger.Errorf("err generating tags from carbon name: %s, err: %s",
string(name), err)
Expand Down Expand Up @@ -196,7 +204,10 @@ type carbonIngesterMetrics struct {
// __g0__:foo
// __g1__:bar
// __g2__:baz
func GenerateTagsFromName(name []byte) (models.Tags, error) {
func GenerateTagsFromName(
name []byte,
opts models.TagOptions,
) (models.Tags, error) {
if len(name) == 0 {
return models.Tags{}, errCannotGenerateTagsFromEmptyName
}
Expand Down Expand Up @@ -239,5 +250,5 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) {
})
}

return models.Tags{Tags: tags}, nil
return models.NewTags(numTags, opts).AddTags(tags), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func BenchmarkGenerateTagsFromName(b *testing.B) {
err error
)

opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
for i := 0; i < b.N; i++ {
benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName)
benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName, opts)
if err != nil {
panic(err)
}
Expand Down
12 changes: 10 additions & 2 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
xtime "github.com/m3db/m3x/time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -105,17 +106,20 @@ func TestIngesterHandleConn(t *testing.T) {
func TestGenerateTagsFromName(t *testing.T) {
testCases := []struct {
name string
id string
expectedTags []models.Tag
expectedErr error
}{
{
name: "foo",
id: "foo",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
},
},
{
name: "foo.bar.baz",
id: "foo.bar.baz",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
{Name: graphite.TagName(1), Value: []byte("bar")},
Expand All @@ -124,6 +128,7 @@ func TestGenerateTagsFromName(t *testing.T) {
},
{
name: "foo.bar.baz.",
id: "foo.bar.baz",
expectedTags: []models.Tag{
{Name: graphite.TagName(0), Value: []byte("foo")},
{Name: graphite.TagName(1), Value: []byte("bar")},
Expand All @@ -140,12 +145,14 @@ func TestGenerateTagsFromName(t *testing.T) {
},
}

opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
for _, tc := range testCases {
tags, err := GenerateTagsFromName([]byte(tc.name))
tags, err := GenerateTagsFromName([]byte(tc.name), opts)
if tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, err)
} else {
require.NoError(t, err)
assert.Equal(t, []byte(tc.id), tags.ID())
}
require.Equal(t, tc.expectedTags, tags.Tags)
}
Expand Down Expand Up @@ -245,7 +252,8 @@ func init() {

metric = []byte(fmt.Sprintf("test.metric.%d", i))

tags, err := GenerateTagsFromName(metric)
opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite)
tags, err := GenerateTagsFromName(metric, opts)
if err != nil {
panic(err)
}
Expand Down
56 changes: 30 additions & 26 deletions src/cmd/services/m3coordinator/ingest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,38 @@ import (
)

var (
testTags1 = models.Tags{Tags: []models.Tag{
{
Name: []byte("test_1_key_1"),
Value: []byte("test_1_value_1"),
},
{
Name: []byte("test_1_key_2"),
Value: []byte("test_1_value_2"),
},
{
Name: []byte("test_1_key_3"),
Value: []byte("test_1_value_3"),
},
}}
testTags2 = models.Tags{Tags: []models.Tag{
{
Name: []byte("test_2_key_1"),
Value: []byte("test_2_value_1"),
},
{
Name: []byte("test_2_key_2"),
Value: []byte("test_2_value_2"),
testTags1 = models.NewTags(3, nil).AddTags(
[]models.Tag{
{
Name: []byte("test_1_key_1"),
Value: []byte("test_1_value_1"),
},
{
Name: []byte("test_1_key_2"),
Value: []byte("test_1_value_2"),
},
{
Name: []byte("test_1_key_3"),
Value: []byte("test_1_value_3"),
},
},
{
Name: []byte("test_2_key_3"),
Value: []byte("test_2_value_3"),
)
testTags2 = models.NewTags(3, nil).AddTags(
[]models.Tag{
{
Name: []byte("test_2_key_1"),
Value: []byte("test_2_value_1"),
},
{
Name: []byte("test_2_key_2"),
Value: []byte("test_2_value_2"),
},
{
Name: []byte("test_2_key_3"),
Value: []byte("test_2_value_3"),
},
},
}}
)

testDatapoints1 = []ts.Datapoint{
{
Expand Down
21 changes: 20 additions & 1 deletion src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,16 @@ type RPCConfiguration struct {
// Currently only name, but can expand to cover deduplication settings, or other
// relevant options.
type TagOptionsConfiguration struct {
// Version specifies the version number of tag options. Defaults to 0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much value does having Version in addition to Scheme add? It seems like just changing Scheme to TypeQuoted would be a more straightforward way of configuring us to use TypeQuoted than changing the version number and assuming the default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I'm assuming we'll make csv the actual default when we do a major version bump?

Copy link
Collaborator

@robskillington robskillington Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, we will have a point in time where we do based on a "YAML" config version change which type (i.e. we'll use quoted by default), but we don't have that mechanism right now.

So removing version makes sense.

Version int `yaml:"version"`

// MetricName specifies the tag name that corresponds to the metric's name tag
// If not provided, defaults to `__name__`
// If not provided, defaults to `__name__`.
MetricName string `yaml:"metricName"`

// Scheme deterimnes the default ID generation scheme.
// If version is 0, defaults to TypeLegacy; otherwise, defaults to TypeQuoted.
Scheme models.IDSchemeType `yaml:"idScheme"`
}

// TagOptionsFromConfig translates tag option configuration into tag options.
Expand All @@ -212,6 +219,18 @@ func TagOptionsFromConfig(cfg TagOptionsConfiguration) (models.TagOptions, error
opts = opts.SetMetricName([]byte(name))
}

version := cfg.Version
opts = opts.SetVersion(version)

if cfg.Scheme == models.TypeDefault {
if version == 0 {
cfg.Scheme = models.TypeLegacy
} else {
cfg.Scheme = models.TypeQuoted
}
}

opts = opts.SetIDSchemeType(cfg.Scheme)
if err := opts.Validate(); err != nil {
return nil, err
}
Expand Down
33 changes: 33 additions & 0 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ package config
import (
"testing"

"github.com/m3db/m3/src/query/models"
xconfig "github.com/m3db/m3x/config"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
)

func TestTagOptionsFromEmptyConfig(t *testing.T) {
Expand Down Expand Up @@ -94,3 +96,34 @@ func TestConfigValidation(t *testing.T) {
})
}
}

func TestDefaultTagOptionsConfig(t *testing.T) {
var cfg TagOptionsConfiguration
require.NoError(t, yaml.Unmarshal([]byte(""), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, 0, opts.Version())
assert.Equal(t, []byte("__name__"), opts.MetricName())
assert.Equal(t, models.TypeLegacy, opts.IDSchemeType())
}

func TestDefaultTagOptionsConfigVersion1(t *testing.T) {
var cfg TagOptionsConfiguration
require.NoError(t, yaml.Unmarshal([]byte("version: 1"), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, 1, opts.Version())
assert.Equal(t, []byte("__name__"), opts.MetricName())
assert.Equal(t, models.TypeQuoted, opts.IDSchemeType())
}

func TestTagOptionsConfig(t *testing.T) {
var cfg TagOptionsConfiguration
config := "version: 0\nmetricName: abcdefg\nidScheme: prepend_meta"
require.NoError(t, yaml.Unmarshal([]byte(config), &cfg))
opts, err := TagOptionsFromConfig(cfg)
require.NoError(t, err)
assert.Equal(t, 0, opts.Version())
assert.Equal(t, []byte("abcdefg"), opts.MetricName())
assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType())
}
6 changes: 3 additions & 3 deletions src/query/api/v1/handler/graphite/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestParseQueryResults(t *testing.T) {
tags = tags.AddTag(models.Tag{Name: graphite.TagName(0), Value: []byte("foo")})
tags = tags.AddTag(models.Tag{Name: graphite.TagName(1), Value: []byte("bar")})
seriesList := ts.SeriesList{
ts.NewSeries("irrelevant_name", vals, tags),
ts.NewSeries([]byte("irrelevant_name"), vals, tags),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) {
resolution := 10 * time.Second
vals := ts.NewFixedStepValues(resolution, 4, 4, start)
seriesList := ts.SeriesList{
ts.NewSeries("a", vals, models.NewTags(0, nil)),
ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) {
resolution := 10 * time.Second
vals := ts.NewFixedStepValues(resolution, 3, 3, start)
seriesList := ts.SeriesList{
ts.NewSeries("a", vals, models.NewTags(0, nil)),
ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)),
}
for _, series := range seriesList {
series.SetResolution(resolution)
Expand Down
2 changes: 2 additions & 0 deletions src/query/api/v1/handler/json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func NewWriteJSONHandler(store storage.Storage) http.Handler {

// WriteQuery represents the write request from the user
// NB(braskin): support only writing one datapoint for now
// TODO: build this out to be a legitimate batched endpoint, change
// Tags to take a list of tag structs
type WriteQuery struct {
Tags map[string]string `json:"tags" validate:"nonzero"`
Timestamp string `json:"timestamp" validate:"nonzero"`
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func renderM3QLResultsJSON(
for _, s := range series {
jw.BeginObject()
jw.BeginObjectField("target")
jw.WriteString(s.Name())
jw.WriteString(string(s.Name()))

jw.BeginObjectField("tags")
jw.BeginObject()
Expand Down
36 changes: 20 additions & 16 deletions src/query/api/v1/handler/prometheus/native/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,16 @@ func TestRenderResultsJSON(t *testing.T) {
buffer := bytes.NewBuffer(nil)
params := models.RequestParams{}
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
ts.NewSeries([]byte("foo"),
ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries([]byte("bar"),
ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
}

renderResultsJSON(buffer, series, params)
Expand Down Expand Up @@ -216,14 +218,16 @@ func TestRenderInstantaneousResultsJSON(t *testing.T) {
start := time.Unix(1535948880, 0)
buffer := bytes.NewBuffer(nil)
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
ts.NewSeries([]byte("foo"),
ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("bar"), Value: []byte("baz")},
models.Tag{Name: []byte("qux"), Value: []byte("qaz")},
})),
ts.NewSeries([]byte("bar"),
ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{
models.Tag{Name: []byte("baz"), Value: []byte("bar")},
models.Tag{Name: []byte("qaz"), Value: []byte("qux")},
})),
}

renderResultsInstantaneousJSON(buffer, series)
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/validator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func tsListToMap(tsList []*ts.Series) map[string]*ts.Series {
for _, series := range tsList {
series.Tags = series.Tags.Normalize()
id := series.Tags.ID()
tsMap[id] = series
tsMap[string(id)] = series
}

return tsMap
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestSearchResponse(t *testing.T) {
results, err := searchHandler.search(context.TODO(), generateSearchReq(), &opts)
require.NoError(t, err)

assert.Equal(t, testID, results.Metrics[0].ID)
assert.Equal(t, []byte(testID), results.Metrics[0].ID)
expected := test.TagSliceToTags([]models.Tag{{Name: []byte("foo"), Value: []byte("bar")}})
assert.Equal(t, expected.Tags, results.Metrics[0].Tags.Tags)
}
Expand Down
Loading