From 47c8a1b6bc0970d8f7f320d0add094c374257e6f Mon Sep 17 00:00:00 2001 From: arnikola Date: Tue, 29 Jan 2019 16:56:40 -0500 Subject: [PATCH] [query] Change Tag ID generation to avoid possible collisions (#1286) - Changes series ID to byteslice instead of string - Changes tag ID generation to use collision-resistant scheme - Adds carbon metric ID scheme --- .../m3coordinator/ingest/carbon/ingest.go | 22 +- .../ingest/carbon/ingest_benchmark_test.go | 3 +- .../ingest/carbon/ingest_test.go | 22 +- .../m3coordinator/ingest/m3msg/ingest.go | 6 +- .../m3coordinator/ingest/m3msg/ingest_test.go | 8 +- .../m3coordinator/ingest/write_test.go | 58 ++-- src/cmd/services/m3query/config/config.go | 10 +- .../services/m3query/config/config_test.go | 21 ++ .../api/v1/handler/graphite/render_test.go | 8 +- src/query/api/v1/handler/json/write.go | 2 + .../v1/handler/prometheus/native/common.go | 2 +- .../handler/prometheus/native/common_test.go | 36 +- .../api/v1/handler/prometheus/remote/write.go | 2 +- .../handler/prometheus/validator/handler.go | 2 +- src/query/api/v1/handler/search_test.go | 2 +- .../benchmarker/main/convert_to_prom.go | 2 +- src/query/benchmark/common/parse_json.go | 7 +- src/query/block/scalar_test.go | 4 +- src/query/block/types.go | 2 +- src/query/functions/aggregation/base.go | 2 +- src/query/functions/aggregation/base_test.go | 26 +- .../functions/aggregation/count_values.go | 4 +- .../aggregation/count_values_test.go | 2 +- .../functions/aggregation/quantile_test.go | 10 +- src/query/functions/aggregation/take.go | 2 +- src/query/functions/binary/binary_test.go | 4 +- src/query/functions/binary/common.go | 8 +- src/query/functions/binary/or_test.go | 6 +- src/query/functions/binary/unless_test.go | 4 +- src/query/functions/temporal/base_test.go | 14 +- src/query/functions/utils/group.go | 6 +- src/query/functions/utils/group_test.go | 5 +- src/query/functions/utils/metadata_test.go | 10 +- src/query/graphite/storage/converter.go | 31 -- src/query/graphite/storage/m3_wrapper.go | 12 +- src/query/graphite/storage/m3_wrapper_test.go | 8 +- src/query/mocks/resolver.go | 72 ---- src/query/models/config.go | 83 +++++ src/query/models/config_test.go | 67 ++++ src/query/models/matcher.go | 4 +- src/query/models/options.go | 18 +- src/query/models/options_test.go | 63 ++++ src/query/models/strconv/checker.go | 66 ++++ .../strconv/checker_test.go} | 65 ++-- src/query/models/strconv/quote.go | 304 +++++++++++++++++ src/query/models/strconv/quote_test.go | 221 +++++++++++++ src/query/models/tags.go | 312 +++++++++++++----- src/query/models/tags_test.go | 302 ++++++++++++++--- src/query/models/types.go | 58 +++- src/query/policy/resolver/interface.go | 43 --- src/query/storage/consolidated_test.go | 2 +- src/query/storage/converter_test.go | 2 +- src/query/storage/fanout/storage_test.go | 2 + src/query/storage/index.go | 3 +- src/query/storage/index_test.go | 2 +- src/query/storage/m3/storage.go | 3 +- src/query/storage/m3/storage_test.go | 4 +- src/query/storage/types.go | 2 +- src/query/storage/validator/storage.go | 2 +- src/query/storage/validator/storage_test.go | 2 +- src/query/test/block.go | 7 +- src/query/test/comparison.go | 8 +- src/query/ts/m3db/convert_test.go | 2 +- src/query/ts/m3db/encoded_block_iterator.go | 2 +- src/query/ts/series.go | 6 +- src/query/ts/series_test.go | 5 +- src/query/tsdb/remote/codecs.go | 6 +- src/query/tsdb/remote/codecs_test.go | 2 +- src/query/util/execution/parallel.go | 9 +- src/query/util/writer/int_writer.go | 88 +++++ 
src/query/util/writer/int_writer_test.go | 139 ++++++++ 71 files changed, 1869 insertions(+), 480 deletions(-) delete mode 100644 src/query/mocks/resolver.go create mode 100644 src/query/models/config.go create mode 100644 src/query/models/config_test.go create mode 100644 src/query/models/options_test.go create mode 100644 src/query/models/strconv/checker.go rename src/query/{policy/resolver/static.go => models/strconv/checker_test.go} (50%) create mode 100644 src/query/models/strconv/quote.go create mode 100644 src/query/models/strconv/quote_test.go delete mode 100644 src/query/policy/resolver/interface.go create mode 100644 src/query/util/writer/int_writer.go create mode 100644 src/query/util/writer/int_writer_test.go diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go index e94cfcdc39..a11508c039 100644 --- a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go @@ -83,10 +83,17 @@ func NewIngester( return nil, err } + tagOpts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite) + err = tagOpts.Validate() + if err != nil { + return nil, err + } + return &ingester{ downsamplerAndWriter: downsamplerAndWriter, opts: opts, logger: opts.InstrumentOptions.Logger(), + tagOpts: tagOpts, metrics: newCarbonIngesterMetrics( opts.InstrumentOptions.MetricsScope()), }, nil @@ -97,6 +104,7 @@ type ingester struct { opts Options logger log.Logger metrics carbonIngesterMetrics + tagOpts models.TagOptions } func (i *ingester) Handle(conn net.Conn) { @@ -140,7 +148,7 @@ func (i *ingester) Handle(conn net.Conn) { func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool { datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}} // TODO(rartoul): Pool. 
- tags, err := GenerateTagsFromName(name) + tags, err := GenerateTagsFromName(name, i.tagOpts) if err != nil { i.logger.Errorf("err generating tags from carbon name: %s, err: %s", string(name), err) @@ -196,9 +204,12 @@ type carbonIngesterMetrics struct { // __g0__:foo // __g1__:bar // __g2__:baz -func GenerateTagsFromName(name []byte) (models.Tags, error) { +func GenerateTagsFromName( + name []byte, + opts models.TagOptions, +) (models.Tags, error) { if len(name) == 0 { - return models.Tags{}, errCannotGenerateTagsFromEmptyName + return models.EmptyTags(), errCannotGenerateTagsFromEmptyName } var ( @@ -211,7 +222,8 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) { for i, charByte := range name { if charByte == carbonSeparatorByte { if i+1 < len(name) && name[i+1] == carbonSeparatorByte { - return models.Tags{}, fmt.Errorf("carbon metric: %s has duplicate separator", string(name)) + return models.EmptyTags(), + fmt.Errorf("carbon metric: %s has duplicate separator", string(name)) } tags = append(tags, models.Tag{ @@ -239,5 +251,5 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) { }) } - return models.Tags{Tags: tags}, nil + return models.NewTags(numTags, opts).AddTags(tags), nil } diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest_benchmark_test.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest_benchmark_test.go index 082dba077c..a361df84b8 100644 --- a/src/cmd/services/m3coordinator/ingest/carbon/ingest_benchmark_test.go +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest_benchmark_test.go @@ -34,8 +34,9 @@ func BenchmarkGenerateTagsFromName(b *testing.B) { err error ) + opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite) for i := 0; i < b.N; i++ { - benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName) + benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName, opts) if err != nil { panic(err) } diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go index 9cd2ace5db..78aead4084 100644 --- a/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go @@ -42,6 +42,7 @@ import ( xtime "github.com/m3db/m3x/time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -105,17 +106,20 @@ func TestIngesterHandleConn(t *testing.T) { func TestGenerateTagsFromName(t *testing.T) { testCases := []struct { name string + id string expectedTags []models.Tag expectedErr error }{ { name: "foo", + id: "foo", expectedTags: []models.Tag{ {Name: graphite.TagName(0), Value: []byte("foo")}, }, }, { name: "foo.bar.baz", + id: "foo.bar.baz", expectedTags: []models.Tag{ {Name: graphite.TagName(0), Value: []byte("foo")}, {Name: graphite.TagName(1), Value: []byte("bar")}, @@ -124,6 +128,7 @@ func TestGenerateTagsFromName(t *testing.T) { }, { name: "foo.bar.baz.", + id: "foo.bar.baz", expectedTags: []models.Tag{ {Name: graphite.TagName(0), Value: []byte("foo")}, {Name: graphite.TagName(1), Value: []byte("bar")}, @@ -131,21 +136,25 @@ func TestGenerateTagsFromName(t *testing.T) { }, }, { - name: "foo..bar..baz..", - expectedErr: fmt.Errorf("carbon metric: foo..bar..baz.. has duplicate separator"), + name: "foo..bar..baz..", + expectedErr: fmt.Errorf("carbon metric: foo..bar..baz.. has duplicate separator"), + expectedTags: []models.Tag{}, }, { - name: "foo.bar.baz..", - expectedErr: fmt.Errorf("carbon metric: foo.bar.baz.. 
has duplicate separator"), + name: "foo.bar.baz..", + expectedErr: fmt.Errorf("carbon metric: foo.bar.baz.. has duplicate separator"), + expectedTags: []models.Tag{}, }, } + opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite) for _, tc := range testCases { - tags, err := GenerateTagsFromName([]byte(tc.name)) + tags, err := GenerateTagsFromName([]byte(tc.name), opts) if tc.expectedErr != nil { require.Equal(t, tc.expectedErr, err) } else { require.NoError(t, err) + assert.Equal(t, []byte(tc.id), tags.ID()) } require.Equal(t, tc.expectedTags, tags.Tags) } @@ -245,7 +254,8 @@ func init() { metric = []byte(fmt.Sprintf("test.metric.%d", i)) - tags, err := GenerateTagsFromName(metric) + opts := models.NewTagOptions().SetIDSchemeType(models.TypeGraphite) + tags, err := GenerateTagsFromName(metric, opts) if err != nil { panic(err) } diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go index 863e83138c..8fa74b450d 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go @@ -92,6 +92,9 @@ func NewIngester( m: m, logger: opts.InstrumentOptions.Logger(), sampler: opts.Sampler, + q: storage.WriteQuery{ + Tags: models.NewTags(0, nil), + }, } op.attemptFn = op.attempt op.ingestFn = op.ingest @@ -199,11 +202,12 @@ func (op *ingestOp) resetTags() error { op.q.Tags.Tags = op.q.Tags.Tags[:0] for op.it.Next() { name, value := op.it.Current() - op.q.Tags = op.q.Tags.AddTag(models.Tag{ + op.q.Tags = op.q.Tags.AddTagWithoutNormalizing(models.Tag{ Name: name, Value: value, }.Clone()) } + op.q.Tags.Normalize() return op.it.Err() } diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go index 0bae2d20e0..3022793aa3 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go @@ -87,18 +87,18 @@ func TestIngest(t *testing.T) { Value: val, }, }, - Tags: models.Tags{ - Tags: []models.Tag{ + Tags: models.NewTags(2, nil).AddTags( + []models.Tag{ models.Tag{ Name: []byte("__name__"), Value: []byte("foo"), }, - models.Tag{ + { Name: []byte("app"), Value: []byte("bar"), }, }, - }, + ), Unit: xtime.Second, }, *appender.received[0], diff --git a/src/cmd/services/m3coordinator/ingest/write_test.go b/src/cmd/services/m3coordinator/ingest/write_test.go index f6908f8b7e..29209995bb 100644 --- a/src/cmd/services/m3coordinator/ingest/write_test.go +++ b/src/cmd/services/m3coordinator/ingest/write_test.go @@ -37,34 +37,38 @@ import ( ) var ( - testTags1 = models.Tags{Tags: []models.Tag{ - { - Name: []byte("test_1_key_1"), - Value: []byte("test_1_value_1"), - }, - { - Name: []byte("test_1_key_2"), - Value: []byte("test_1_value_2"), - }, - { - Name: []byte("test_1_key_3"), - Value: []byte("test_1_value_3"), - }, - }} - testTags2 = models.Tags{Tags: []models.Tag{ - { - Name: []byte("test_2_key_1"), - Value: []byte("test_2_value_1"), - }, - { - Name: []byte("test_2_key_2"), - Value: []byte("test_2_value_2"), + testTags1 = models.NewTags(3, nil).AddTags( + []models.Tag{ + { + Name: []byte("test_1_key_1"), + Value: []byte("test_1_value_1"), + }, + { + Name: []byte("test_1_key_2"), + Value: []byte("test_1_value_2"), + }, + { + Name: []byte("test_1_key_3"), + Value: []byte("test_1_value_3"), + }, }, - { - Name: []byte("test_2_key_3"), - Value: []byte("test_2_value_3"), + ) + testTags2 = models.NewTags(3, nil).AddTags( + []models.Tag{ + 
{ + Name: []byte("test_2_key_1"), + Value: []byte("test_2_value_1"), + }, + { + Name: []byte("test_2_key_2"), + Value: []byte("test_2_value_2"), + }, + { + Name: []byte("test_2_key_3"), + Value: []byte("test_2_value_3"), + }, }, - }} + ) testDatapoints1 = []ts.Datapoint{ { @@ -125,7 +129,7 @@ func (i *testIter) Next() bool { func (i *testIter) Current() (models.Tags, ts.Datapoints, xtime.Unit) { if len(i.entries) == 0 || i.idx < 0 || i.idx >= len(i.entries) { - return models.Tags{}, nil, 0 + return models.EmptyTags(), nil, 0 } curr := i.entries[i.idx] diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index d71f1178c1..444980a51a 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -200,8 +200,11 @@ type RPCConfiguration struct { // relevant options. type TagOptionsConfiguration struct { // MetricName specifies the tag name that corresponds to the metric's name tag - // If not provided, defaults to `__name__` + // If not provided, defaults to `__name__`. MetricName string `yaml:"metricName"` + + // Scheme determines the default ID generation scheme. Defaults to TypeLegacy. + Scheme models.IDSchemeType `yaml:"idScheme"` } // TagOptionsFromConfig translates tag option configuration into tag options. @@ -212,6 +215,11 @@ func TagOptionsFromConfig(cfg TagOptionsConfiguration) (models.TagOptions, error opts = opts.SetMetricName([]byte(name)) } + if cfg.Scheme == models.TypeDefault { + cfg.Scheme = models.TypeLegacy + } + + opts = opts.SetIDSchemeType(cfg.Scheme) if err := opts.Validate(); err != nil { return nil, err } diff --git a/src/cmd/services/m3query/config/config_test.go b/src/cmd/services/m3query/config/config_test.go index 002789b220..061c76cb76 100644 --- a/src/cmd/services/m3query/config/config_test.go +++ b/src/cmd/services/m3query/config/config_test.go @@ -23,11 +23,13 @@ package config import ( "testing" + "github.com/m3db/m3/src/query/models" xconfig "github.com/m3db/m3x/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/validator.v2" + yaml "gopkg.in/yaml.v2" ) func TestTagOptionsFromEmptyConfig(t *testing.T) { @@ -94,3 +96,22 @@ func TestConfigValidation(t *testing.T) { }) } } + +func TestDefaultTagOptionsConfig(t *testing.T) { + var cfg TagOptionsConfiguration + require.NoError(t, yaml.Unmarshal([]byte(""), &cfg)) + opts, err := TagOptionsFromConfig(cfg) + require.NoError(t, err) + assert.Equal(t, []byte("__name__"), opts.MetricName()) + assert.Equal(t, models.TypeLegacy, opts.IDSchemeType()) +} + +func TestTagOptionsConfig(t *testing.T) { + var cfg TagOptionsConfiguration + config := "metricName: abcdefg\nidScheme: prepend_meta" + require.NoError(t, yaml.Unmarshal([]byte(config), &cfg)) + opts, err := TagOptionsFromConfig(cfg) + require.NoError(t, err) + assert.Equal(t, []byte("abcdefg"), opts.MetricName()) + assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType()) +} diff --git a/src/query/api/v1/handler/graphite/render_test.go b/src/query/api/v1/handler/graphite/render_test.go index a6d57ef3c0..f35bfe5d9b 100644 --- a/src/query/api/v1/handler/graphite/render_test.go +++ b/src/query/api/v1/handler/graphite/render_test.go @@ -75,7 +75,7 @@ func TestParseQueryResults(t *testing.T) { tags = tags.AddTag(models.Tag{Name: graphite.TagName(0), Value: []byte("foo")}) tags = tags.AddTag(models.Tag{Name: graphite.TagName(1), Value: []byte("bar")}) seriesList := ts.SeriesList{ - ts.NewSeries("irrelevant_name", vals, tags), + 
ts.NewSeries([]byte("series_name"), vals, tags), } for _, series := range seriesList { series.SetResolution(resolution) @@ -96,7 +96,7 @@ func TestParseQueryResults(t *testing.T) { buf, err := ioutil.ReadAll(res.Body) require.NoError(t, err) expected := fmt.Sprintf( - `[{"target":"foo.bar","datapoints":[[3.000000,%d],`+ + `[{"target":"series_name","datapoints":[[3.000000,%d],`+ `[3.000000,%d],[3.000000,%d]],"step_size_ms":%d}]`, start.Unix(), start.Unix()+10, start.Unix()+20, resolution/time.Millisecond) @@ -116,7 +116,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { resolution := 10 * time.Second vals := ts.NewFixedStepValues(resolution, 4, 4, start) seriesList := ts.SeriesList{ - ts.NewSeries("a", vals, models.NewTags(0, nil)), + ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)), } for _, series := range seriesList { series.SetResolution(resolution) @@ -151,7 +151,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) { resolution := 10 * time.Second vals := ts.NewFixedStepValues(resolution, 3, 3, start) seriesList := ts.SeriesList{ - ts.NewSeries("a", vals, models.NewTags(0, nil)), + ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)), } for _, series := range seriesList { series.SetResolution(resolution) diff --git a/src/query/api/v1/handler/json/write.go b/src/query/api/v1/handler/json/write.go index e76ebeb296..10147eab80 100644 --- a/src/query/api/v1/handler/json/write.go +++ b/src/query/api/v1/handler/json/write.go @@ -60,6 +60,8 @@ func NewWriteJSONHandler(store storage.Storage) http.Handler { // WriteQuery represents the write request from the user // NB(braskin): support only writing one datapoint for now +// TODO: build this out to be a legitimate batched endpoint, change +// Tags to take a list of tag structs type WriteQuery struct { Tags map[string]string `json:"tags" validate:"nonzero"` Timestamp string `json:"timestamp" validate:"nonzero"` diff --git a/src/query/api/v1/handler/prometheus/native/common.go b/src/query/api/v1/handler/prometheus/native/common.go index 4511085b5a..7a051a0f1c 100644 --- a/src/query/api/v1/handler/prometheus/native/common.go +++ b/src/query/api/v1/handler/prometheus/native/common.go @@ -350,7 +350,7 @@ func renderM3QLResultsJSON( for _, s := range series { jw.BeginObject() jw.BeginObjectField("target") - jw.WriteString(s.Name()) + jw.WriteString(string(s.Name())) jw.BeginObjectField("tags") jw.BeginObject() diff --git a/src/query/api/v1/handler/prometheus/native/common_test.go b/src/query/api/v1/handler/prometheus/native/common_test.go index f23607c650..64b43e6fb1 100644 --- a/src/query/api/v1/handler/prometheus/native/common_test.go +++ b/src/query/api/v1/handler/prometheus/native/common_test.go @@ -152,14 +152,16 @@ func TestRenderResultsJSON(t *testing.T) { buffer := bytes.NewBuffer(nil) params := models.RequestParams{} series := []*ts.Series{ - ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("bar"), Value: []byte("baz")}, - models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, - })), - ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("baz"), Value: []byte("bar")}, - models.Tag{Name: []byte("qaz"), Value: []byte("qux")}, - })), + ts.NewSeries([]byte("foo"), + ts.NewFixedStepValues(10*time.Second, 2, 1, start), test.TagSliceToTags([]models.Tag{ + models.Tag{Name: []byte("bar"), Value: []byte("baz")}, + models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, 
+ })), + ts.NewSeries([]byte("bar"), + ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{ + models.Tag{Name: []byte("baz"), Value: []byte("bar")}, + models.Tag{Name: []byte("qaz"), Value: []byte("qux")}, + })), } renderResultsJSON(buffer, series, params) @@ -216,14 +218,16 @@ func TestRenderInstantaneousResultsJSON(t *testing.T) { start := time.Unix(1535948880, 0) buffer := bytes.NewBuffer(nil) series := []*ts.Series{ - ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("bar"), Value: []byte("baz")}, - models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, - })), - ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("baz"), Value: []byte("bar")}, - models.Tag{Name: []byte("qaz"), Value: []byte("qux")}, - })), + ts.NewSeries([]byte("foo"), + ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{ + models.Tag{Name: []byte("bar"), Value: []byte("baz")}, + models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, + })), + ts.NewSeries([]byte("bar"), + ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{ + models.Tag{Name: []byte("baz"), Value: []byte("bar")}, + models.Tag{Name: []byte("qaz"), Value: []byte("qux")}, + })), } renderResultsInstantaneousJSON(buffer, series) diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index 1c89d7ec1c..0b08e5f44d 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -161,7 +161,7 @@ func (i *promTSIter) Next() bool { func (i *promTSIter) Current() (models.Tags, ts.Datapoints, xtime.Unit) { if len(i.tags) == 0 || i.idx < 0 || i.idx >= len(i.tags) { - return models.Tags{}, nil, 0 + return models.EmptyTags(), nil, 0 } return i.tags[i.idx], i.datapoints[i.idx], xtime.Millisecond diff --git a/src/query/api/v1/handler/prometheus/validator/handler.go b/src/query/api/v1/handler/prometheus/validator/handler.go index a6e49f071a..58c66235b5 100644 --- a/src/query/api/v1/handler/prometheus/validator/handler.go +++ b/src/query/api/v1/handler/prometheus/validator/handler.go @@ -152,7 +152,7 @@ func tsListToMap(tsList []*ts.Series) map[string]*ts.Series { for _, series := range tsList { series.Tags = series.Tags.Normalize() id := series.Tags.ID() - tsMap[id] = series + tsMap[string(id)] = series } return tsMap diff --git a/src/query/api/v1/handler/search_test.go b/src/query/api/v1/handler/search_test.go index 4ff69db883..e553b328df 100644 --- a/src/query/api/v1/handler/search_test.go +++ b/src/query/api/v1/handler/search_test.go @@ -110,7 +110,7 @@ func TestSearchResponse(t *testing.T) { results, err := searchHandler.search(context.TODO(), generateSearchReq(), &opts) require.NoError(t, err) - assert.Equal(t, testID, results.Metrics[0].ID) + assert.Equal(t, []byte(testID), results.Metrics[0].ID) expected := test.TagSliceToTags([]models.Tag{{Name: []byte("foo"), Value: []byte("bar")}}) assert.Equal(t, expected.Tags, results.Metrics[0].Tags.Tags) } diff --git a/src/query/benchmark/benchmarker/main/convert_to_prom.go b/src/query/benchmark/benchmarker/main/convert_to_prom.go index 43f03fc7a1..400be126cb 100644 --- a/src/query/benchmark/benchmarker/main/convert_to_prom.go +++ b/src/query/benchmark/benchmarker/main/convert_to_prom.go @@ -62,7 +62,7 @@ func calculateCardinality(fromFile 
string, logger *zap.Logger) (int, error) { ts, _ := marshalTSDBToProm(tsdb) tags := storage.PromLabelsToM3Tags(ts.GetLabels(), models.NewTagOptions()) id := tags.ID() - tagsSeen[id]++ + tagsSeen[string(id)]++ read++ if read%marker == 0 { diff --git a/src/query/benchmark/common/parse_json.go b/src/query/benchmark/common/parse_json.go index fa0555534e..9cf1a540c6 100644 --- a/src/query/benchmark/common/parse_json.go +++ b/src/query/benchmark/common/parse_json.go @@ -170,11 +170,14 @@ func id(lowerCaseTags map[string]string, name string) string { } func metricsToPromTS(m Metrics) *prompb.TimeSeries { - tags := models.Tags{} + tags := models.NewTags(len(m.Tags), nil) for n, v := range m.Tags { - tags = tags.AddTag(models.Tag{Name: []byte(n), Value: []byte(v)}) + tags = tags.AddTagWithoutNormalizing( + models.Tag{Name: []byte(n), Value: []byte(v)}, + ) } + tags.Normalize() labels := storage.TagsToPromLabels(tags) samples := metricsPointsToSamples(m.Value, m.Time) return &prompb.TimeSeries{ diff --git a/src/query/block/scalar_test.go b/src/query/block/scalar_test.go index c1a0ae871d..cf7af236a1 100644 --- a/src/query/block/scalar_test.go +++ b/src/query/block/scalar_test.go @@ -98,7 +98,7 @@ func TestScalarBlock(t *testing.T) { } assert.Equal(t, 0, series.Meta.Tags.Len()) - assert.Equal(t, "", series.Meta.Name) + assert.Equal(t, []byte(nil), series.Meta.Name) require.False(t, seriesIter.Next()) series, err = seriesIter.Current() @@ -117,5 +117,5 @@ func verifyMetas(t *testing.T, meta Metadata, seriesMeta []SeriesMeta) { assert.Len(t, seriesMeta, 1) sMeta := seriesMeta[0] assert.Equal(t, 0, sMeta.Tags.Len()) - assert.Equal(t, "", sMeta.Name) + assert.Equal(t, []byte(nil), sMeta.Name) } diff --git a/src/query/block/types.go b/src/query/block/types.go index fc9acb97d6..69f4bf3d47 100644 --- a/src/query/block/types.go +++ b/src/query/block/types.go @@ -59,7 +59,7 @@ type UnconsolidatedBlock interface { // SeriesMeta is metadata data for the series type SeriesMeta struct { Tags models.Tags - Name string + Name []byte } // Iterator is the base iterator diff --git a/src/query/functions/aggregation/base.go b/src/query/functions/aggregation/base.go index 58518163ad..594e460b6a 100644 --- a/src/query/functions/aggregation/base.go +++ b/src/query/functions/aggregation/base.go @@ -121,7 +121,7 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { buckets, metas := utils.GroupSeries( params.MatchingTags, params.Without, - n.op.opType, + []byte(n.op.opType), seriesMetas, ) meta.Tags, metas = utils.DedupeMetadata(metas) diff --git a/src/query/functions/aggregation/base_test.go b/src/query/functions/aggregation/base_test.go index 5e91e7645c..0af94abfb9 100644 --- a/src/query/functions/aggregation/base_test.go +++ b/src/query/functions/aggregation/base_test.go @@ -59,6 +59,8 @@ var ( Duration: time.Minute * 5, StepSize: time.Minute, } + + typeBytes = []byte(StandardDeviationType) ) func processAggregationOp(t *testing.T, op parser.Params) *executor.SinkNode { @@ -86,9 +88,9 @@ func TestFunctionFilteringWithA(t *testing.T) { } expectedMetas := []block.SeriesMeta{ - {Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("1")}})}, - {Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("2")}})}, - {Name: StandardDeviationType, Tags: models.EmptyTags()}, + {Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("1")}})}, + {Name: typeBytes, Tags: 
test.TagSliceToTags([]models.Tag{{Name: []byte("a"), Value: []byte("2")}})}, + {Name: typeBytes, Tags: models.EmptyTags()}, } expectedMetaTags := models.EmptyTags() @@ -113,9 +115,9 @@ func TestFunctionFilteringWithoutA(t *testing.T) { } expectedMetas := []block.SeriesMeta{ - {Name: StandardDeviationType, Tags: models.EmptyTags()}, - {Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})}, - {Name: StandardDeviationType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})}, + {Name: typeBytes, Tags: models.EmptyTags()}, + {Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})}, + {Name: typeBytes, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})}, } expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}}) @@ -136,7 +138,7 @@ func TestFunctionFilteringWithD(t *testing.T) { } expectedMetas := []block.SeriesMeta{ - {Name: StandardDeviationType, Tags: models.EmptyTags()}, + {Name: typeBytes, Tags: models.EmptyTags()}, } expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}}) @@ -166,11 +168,11 @@ func TestFunctionFilteringWithoutD(t *testing.T) { } expectedMetas := []block.SeriesMeta{ - {Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})}, - {Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})}, - {Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})}, - {Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})}, - {Name: StandardDeviationType, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})}, } expectedMetaTags := models.EmptyTags() diff --git a/src/query/functions/aggregation/count_values.go b/src/query/functions/aggregation/count_values.go index 1a117832c9..62245e339d 100644 --- a/src/query/functions/aggregation/count_values.go +++ b/src/query/functions/aggregation/count_values.go @@ -152,7 +152,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error { buckets, metas := utils.GroupSeries( params.MatchingTags, params.Without, - n.op.opType, + []byte(n.op.opType), seriesMetas, ) @@ -192,7 +192,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error { for k, v := range bucketBlock.indexMapping { // Add the metas of this bucketBlock right after the previous block blockMetas[v+previousBucketBlockIndex] = block.SeriesMeta{ - Name: n.op.OpType(), + Name: []byte(n.op.opType), Tags: metas[bucketIndex].Tags.Clone().AddTag(models.Tag{ Name: []byte(n.op.params.StringParameter), Value: utils.FormatFloatToBytes(k), diff --git a/src/query/functions/aggregation/count_values_test.go b/src/query/functions/aggregation/count_values_test.go index 8e874f8981..b74bcd1a8e 100644 --- a/src/query/functions/aggregation/count_values_test.go +++ b/src/query/functions/aggregation/count_values_test.go @@ -67,7 +67,7 @@ func tagsToSeriesMeta(tags []models.Tags) 
[]block.SeriesMeta { expectedMetas := make([]block.SeriesMeta, len(tags)) for i, m := range tags { expectedMetas[i] = block.SeriesMeta{ - Name: CountValuesType, + Name: []byte(CountValuesType), Tags: m, } } diff --git a/src/query/functions/aggregation/quantile_test.go b/src/query/functions/aggregation/quantile_test.go index 545823a885..85cddce125 100644 --- a/src/query/functions/aggregation/quantile_test.go +++ b/src/query/functions/aggregation/quantile_test.go @@ -32,6 +32,10 @@ import ( "github.com/stretchr/testify/require" ) +var ( + typeBytesQuantile = []byte(QuantileType) +) + func TestQuantileFn(t *testing.T) { values := []float64{3.1, 100, 200, 300, 2.1, 800, 1.1, 4.1, 5.1} // NB Taken values by bucket: [3.1, 2.1, 1.1, 4.1] @@ -147,9 +151,9 @@ func TestQuantileFunctionFilteringWithoutA(t *testing.T) { } expectedMetas := []block.SeriesMeta{ - {Name: QuantileType, Tags: models.EmptyTags()}, - {Name: QuantileType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})}, - {Name: QuantileType, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})}, + {Name: typeBytesQuantile, Tags: models.EmptyTags()}, + {Name: typeBytesQuantile, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("b"), Value: []byte("2")}})}, + {Name: typeBytesQuantile, Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("c"), Value: []byte("3")}})}, } expectedMetaTags := test.TagSliceToTags([]models.Tag{{Name: []byte("d"), Value: []byte("4")}}) diff --git a/src/query/functions/aggregation/take.go b/src/query/functions/aggregation/take.go index 3b1524e5cb..2ff98fc732 100644 --- a/src/query/functions/aggregation/take.go +++ b/src/query/functions/aggregation/take.go @@ -123,7 +123,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error { buckets, _ := utils.GroupSeries( params.MatchingTags, params.Without, - n.op.opType, + []byte(n.op.opType), seriesMetas, ) diff --git a/src/query/functions/binary/binary_test.go b/src/query/functions/binary/binary_test.go index 3d6e40f204..1ee5a783cc 100644 --- a/src/query/functions/binary/binary_test.go +++ b/src/query/functions/binary/binary_test.go @@ -133,7 +133,7 @@ func TestScalars(t *testing.T) { assert.Equal(t, 0, sink.Meta.Tags.Len()) assert.Len(t, sink.Metas, 1) - assert.Equal(t, "", sink.Metas[0].Name) + assert.Equal(t, []byte(nil), sink.Metas[0].Name) assert.Equal(t, 0, sink.Metas[0].Tags.Len()) }) } @@ -185,7 +185,7 @@ func TestScalarsReturnBoolFalse(t *testing.T) { assert.Equal(t, 0, sink.Meta.Tags.Len()) assert.Len(t, sink.Metas, 1) - assert.Equal(t, "", sink.Metas[0].Name) + assert.Equal(t, []byte(nil), sink.Metas[0].Name) assert.Equal(t, 0, sink.Metas[0].Tags.Len()) }) } diff --git a/src/query/functions/binary/common.go b/src/query/functions/binary/common.go index f9ec76c1da..00b2f80350 100644 --- a/src/query/functions/binary/common.go +++ b/src/query/functions/binary/common.go @@ -63,10 +63,14 @@ type VectorMatching struct { // ignoring the provided labels. If on, then the given labels are only used instead. func HashFunc(on bool, names ...[]byte) func(models.Tags) uint64 { if on { - return func(tags models.Tags) uint64 { return tags.IDWithKeys(names...) } + return func(tags models.Tags) uint64 { + return tags.TagsWithKeys(names).HashedID() + } } - return func(tags models.Tags) uint64 { return tags.IDWithExcludes(names...) 
} + return func(tags models.Tags) uint64 { + return tags.TagsWithoutKeys(names).HashedID() + } } const initIndexSliceLength = 10 diff --git a/src/query/functions/binary/or_test.go b/src/query/functions/binary/or_test.go index c93c2ed70e..623eb6059d 100644 --- a/src/query/functions/binary/or_test.go +++ b/src/query/functions/binary/or_test.go @@ -100,11 +100,11 @@ func generateMetaDataWithTagsInRange(fromRange, toRange int) []block.SeriesMeta length := toRange - fromRange meta := make([]block.SeriesMeta, length) for i := 0; i < length; i++ { - strIdx := fmt.Sprint(fromRange + i) - tags := test.TagSliceToTags([]models.Tag{{Name: []byte(strIdx), Value: []byte(strIdx)}}) + idx := []byte(fmt.Sprint(fromRange + i)) + tags := test.TagSliceToTags([]models.Tag{{Name: idx, Value: idx}}) meta[i] = block.SeriesMeta{ Tags: tags, - Name: strIdx, + Name: idx, } } return meta diff --git a/src/query/functions/binary/unless_test.go b/src/query/functions/binary/unless_test.go index f59fe5485d..75a713181c 100644 --- a/src/query/functions/binary/unless_test.go +++ b/src/query/functions/binary/unless_test.go @@ -138,7 +138,7 @@ var unlessTests = []struct { test.NewSeriesMeta("a", 3)[1:], [][]float64{{3, 4}, {30, 40}}, test.NewSeriesMeta("a", 1)[0].Tags, - []block.SeriesMeta{{Tags: models.EmptyTags(), Name: "a0"}}, + []block.SeriesMeta{{Tags: models.EmptyTags(), Name: []byte("a0")}}, [][]float64{{1, 2}}, nil, }, @@ -149,7 +149,7 @@ var unlessTests = []struct { test.NewSeriesMeta("a", 4)[1:], [][]float64{{3, 4}, {30, 40}, {300, 400}}, test.NewSeriesMeta("a", 1)[0].Tags, - []block.SeriesMeta{{Tags: models.EmptyTags(), Name: "a0"}}, + []block.SeriesMeta{{Tags: models.EmptyTags(), Name: []byte("a0")}}, [][]float64{{1, 2}}, nil, }, diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index 9600340c94..c936850271 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -357,12 +357,12 @@ func TestSingleProcessRequest(t *testing.T) { boundStart := bounds.Start seriesMetas := []block.SeriesMeta{{ - Name: "s1", + Name: []byte("s1"), Tags: models.EmptyTags().AddTags([]models.Tag{{ Name: []byte("t1"), Value: []byte("v1"), }})}, { - Name: "s2", + Name: []byte("s2"), Tags: models.EmptyTags().AddTags([]models.Tag{{ Name: []byte("t1"), Value: []byte("v2"), @@ -404,14 +404,16 @@ func TestSingleProcessRequest(t *testing.T) { // Previous Block: 10 11 12 13 14 15 // i = 0; prev values [11, 12, 13, 14, 15], current values [0], sum = 50 // i = 1; prev values [12, 13, 14, 15], current values [0, 1], sum = 40 - assert.Equal(t, sink.Values[0], []float64{50, 40, 30, 20, 10}, "first series is 10 - 14 which sums to 60, the current block first series is 0-4 which sums to 10, we need 5 values per aggregation") - assert.Equal(t, sink.Values[1], []float64{75, 65, 55, 45, 35}, "second series is 15 - 19 which sums to 85 and second series is 5-9 which sums to 35") + assert.Equal(t, sink.Values[0], []float64{50, 40, 30, 20, 10}, + "first series is 10 - 14 which sums to 60, the current block first series is 0-4 which sums to 10, we need 5 values per aggregation") + assert.Equal(t, sink.Values[1], []float64{75, 65, 55, 45, 35}, + "second series is 15 - 19 which sums to 85 and second series is 5-9 which sums to 35") // processSingleRequest renames the series to use their ids; reflect this in our expectation. 
expectedSeriesMetas := make([]block.SeriesMeta, len(seriesMetas)) require.Equal(t, len(expectedSeriesMetas), copy(expectedSeriesMetas, seriesMetas)) - expectedSeriesMetas[0].Name = "t1=v1," - expectedSeriesMetas[1].Name = "t1=v2," + expectedSeriesMetas[0].Name = []byte("t1=v1,") + expectedSeriesMetas[1].Name = []byte("t1=v2,") assert.Equal(t, expectedSeriesMetas, sink.Metas, "Process should pass along series meta, renaming to the ID") } diff --git a/src/query/functions/utils/group.go b/src/query/functions/utils/group.go index abee3978d0..cc83959158 100644 --- a/src/query/functions/utils/group.go +++ b/src/query/functions/utils/group.go @@ -28,11 +28,11 @@ import ( type withKeysID func(tags models.Tags, matchingTags [][]byte) uint64 func includeKeysID(tags models.Tags, matchingTags [][]byte) uint64 { - return tags.IDWithKeys(matchingTags...) + return tags.TagsWithKeys(matchingTags).HashedID() } func excludeKeysID(tags models.Tags, matchingTags [][]byte) uint64 { - return tags.IDWithExcludes(matchingTags...) + return tags.TagsWithoutKeys(matchingTags).HashedID() } type withKeysTags func(tags models.Tags, matchingTags [][]byte) models.Tags @@ -52,7 +52,7 @@ func excludeKeysTags(tags models.Tags, matchingTags [][]byte) models.Tags { func GroupSeries( matchingTags [][]byte, without bool, - opName string, + opName []byte, metas []block.SeriesMeta, ) ([][]int, []block.SeriesMeta) { var idFunc withKeysID diff --git a/src/query/functions/utils/group_test.go b/src/query/functions/utils/group_test.go index 7abf9f86d3..2d14dd8ab0 100644 --- a/src/query/functions/utils/group_test.go +++ b/src/query/functions/utils/group_test.go @@ -250,7 +250,8 @@ func testCollect(t *testing.T, without bool) { match[i] = []byte(m) } - buckets, collected := GroupSeries(match, without, name, metas) + bName := []byte(name) + buckets, collected := GroupSeries(match, without, bName, metas) expectedTags := tt.withTagsExpectedTags expectedIndicies := tt.withTagsExpectedIndices if without { @@ -262,7 +263,7 @@ func testCollect(t *testing.T, without bool) { for i, tags := range expectedTags { expectedMetas[i] = block.SeriesMeta{ Tags: tags, - Name: name, + Name: bName, } } diff --git a/src/query/functions/utils/metadata_test.go b/src/query/functions/utils/metadata_test.go index 1445492f77..95f2971c53 100644 --- a/src/query/functions/utils/metadata_test.go +++ b/src/query/functions/utils/metadata_test.go @@ -37,18 +37,20 @@ func TestFlattenMetadata(t *testing.T) { })} seriesMetas := []block.SeriesMeta{ - {Name: "foo", Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("e"), Value: []byte("f")}})}, - {Name: "bar", Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("g"), Value: []byte("h")}})}, + {Name: []byte("foo"), + Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("e"), Value: []byte("f")}})}, + {Name: []byte("bar"), + Tags: test.TagSliceToTags([]models.Tag{{Name: []byte("g"), Value: []byte("h")}})}, } flattened := FlattenMetadata(meta, seriesMetas) expected := []block.SeriesMeta{ - {Name: "foo", Tags: test.TagSliceToTags([]models.Tag{ + {Name: []byte("foo"), Tags: test.TagSliceToTags([]models.Tag{ {Name: []byte("a"), Value: []byte("b")}, {Name: []byte("c"), Value: []byte("d")}, {Name: []byte("e"), Value: []byte("f")}, })}, - {Name: "bar", Tags: test.TagSliceToTags([]models.Tag{ + {Name: []byte("bar"), Tags: test.TagSliceToTags([]models.Tag{ {Name: []byte("a"), Value: []byte("b")}, {Name: []byte("c"), Value: []byte("d")}, {Name: []byte("g"), Value: []byte("h")}, diff --git a/src/query/graphite/storage/converter.go 
b/src/query/graphite/storage/converter.go index 7ea9219310..e1d358c501 100644 --- a/src/query/graphite/storage/converter.go +++ b/src/query/graphite/storage/converter.go @@ -21,10 +21,6 @@ package storage import ( - "bytes" - "fmt" - "strings" - "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/models" ) @@ -72,30 +68,3 @@ func matcherTerminator(count int) models.Matcher { Value: []byte(".*"), } } - -func convertTagsToMetricName(tags models.Tags) (string, error) { - var builder strings.Builder - for i, tag := range tags.Tags { - if bytes.Compare(tag.Name, graphite.TagName(i)) != 0 { - // If not in order or a completely different named tag - // then abort, we can't generate the metric name - err := fmt.Errorf("unexpected tag name: expected=%s, actual=%s", - graphite.TagName(i), tag.Name) - return "", err - } - - _, err := builder.Write(tag.Value) - if err != nil { - return "", err - } - - if i != len(tags.Tags)-1 { - _, err := builder.WriteRune('.') - if err != nil { - return "", err - } - } - } - - return builder.String(), nil -} diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index c081e50d2e..b2400d02eb 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -116,17 +116,7 @@ func translateTimeseries( values.SetValueAt(index, datapoint.Value) } - name := m3series.Name() - if tags := m3series.Tags; tags.Len() > 0 { - // Need to flatten the name back into graphite format - newName, err := convertTagsToMetricName(tags) - if err != nil { - return nil, err - } - - name = newName - } - + name := string(m3series.Name()) series[i] = ts.NewSeries(ctx, name, start, values) } diff --git a/src/query/graphite/storage/m3_wrapper_test.go b/src/query/graphite/storage/m3_wrapper_test.go index a84d8f899e..8fd7dfe60c 100644 --- a/src/query/graphite/storage/m3_wrapper_test.go +++ b/src/query/graphite/storage/m3_wrapper_test.go @@ -79,7 +79,8 @@ func TestTranslateTimeseries(t *testing.T) { seriesList := make(m3ts.SeriesList, expected) for i := 0; i < expected; i++ { vals := m3ts.NewFixedStepValues(resolution, steps, float64(i), start) - series := m3ts.NewSeries(fmt.Sprint("a", i), vals, models.NewTags(0, nil)) + series := m3ts.NewSeries([]byte(fmt.Sprint("a", i)), + vals, models.NewTags(0, nil)) series.SetResolution(resolution) seriesList[i] = series } @@ -105,7 +106,8 @@ func TestTranslateTimeseriesWithTags(t *testing.T) { seriesList := make(m3ts.SeriesList, expected) for i := 0; i < expected; i++ { vals := m3ts.NewFixedStepValues(resolution, steps, float64(i), start) - series := m3ts.NewSeries(fmt.Sprint("a", i), vals, models.NewTags(0, nil)) + series := m3ts.NewSeries([]byte(fmt.Sprint("a", i)), vals, + models.NewTags(0, nil)) series.SetResolution(resolution) seriesList[i] = series } @@ -128,7 +130,7 @@ func TestFetchByQuery(t *testing.T) { steps := 3 vals := m3ts.NewFixedStepValues(resolution, steps, 3, start) seriesList := m3ts.SeriesList{ - m3ts.NewSeries("a", vals, models.NewTags(0, nil)), + m3ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)), } for _, series := range seriesList { series.SetResolution(resolution) diff --git a/src/query/mocks/resolver.go b/src/query/mocks/resolver.go deleted file mode 100644 index 8a625ce88e..0000000000 --- a/src/query/mocks/resolver.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -// Code generated by MockGen. DO NOT EDIT. -// Source: policy/resolver/interface.go - -// Package mocks is a generated GoMock package. -package mocks - -import ( - "context" - "reflect" - "time" - - "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/tsdb" - - "github.com/golang/mock/gomock" -) - -// MockPolicyResolver is a mock of PolicyResolver interface -type MockPolicyResolver struct { - ctrl *gomock.Controller - recorder *MockPolicyResolverMockRecorder -} - -// MockPolicyResolverMockRecorder is the mock recorder for MockPolicyResolver -type MockPolicyResolverMockRecorder struct { - mock *MockPolicyResolver -} - -// NewMockPolicyResolver creates a new mock instance -func NewMockPolicyResolver(ctrl *gomock.Controller) *MockPolicyResolver { - mock := &MockPolicyResolver{ctrl: ctrl} - mock.recorder = &MockPolicyResolverMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockPolicyResolver) EXPECT() *MockPolicyResolverMockRecorder { - return m.recorder -} - -// Resolve mocks base method -func (m *MockPolicyResolver) Resolve(ctx context.Context, tagMatchers models.Matchers, startTime, endTime time.Time) ([]tsdb.FetchRequest, error) { - ret := m.ctrl.Call(m, "Resolve", ctx, tagMatchers, startTime, endTime) - ret0, _ := ret[0].([]tsdb.FetchRequest) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Resolve indicates an expected call of Resolve -func (mr *MockPolicyResolverMockRecorder) Resolve(ctx, tagMatchers, startTime, endTime interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resolve", reflect.TypeOf((*MockPolicyResolver)(nil).Resolve), ctx, tagMatchers, startTime, endTime) -} diff --git a/src/query/models/config.go b/src/query/models/config.go new file mode 100644 index 0000000000..1ba755e0ea --- /dev/null +++ b/src/query/models/config.go @@ -0,0 +1,83 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package models + +import ( + "errors" + "fmt" +) + +var validIDSchemes = []IDSchemeType{ + TypeLegacy, + TypeQuoted, + TypePrependMeta, + TypeGraphite, +} + +// Validate validates that the scheme type is valid. +func (t IDSchemeType) Validate() error { + if t == TypeDefault { + return errors.New("id scheme type not set") + } + + if t >= TypeLegacy && t <= TypeGraphite { + return nil + } + + return fmt.Errorf("invalid config id schema type '%v': should be one of %v", + t, validIDSchemes) +} + +func (t IDSchemeType) String() string { + switch t { + case TypeDefault: + return "" + case TypeLegacy: + return "legacy" + case TypeQuoted: + return "quoted" + case TypePrependMeta: + return "prepend_meta" + case TypeGraphite: + return "graphite" + default: + // Should never get here. + return "unknown" + } +} + +// UnmarshalYAML unmarshals a stored ID scheme type. +func (t *IDSchemeType) UnmarshalYAML(unmarshal func(interface{}) error) error { + var str string + if err := unmarshal(&str); err != nil { + return err + } + + for _, valid := range validIDSchemes { + if str == valid.String() { + *t = valid + return nil + } + } + + return fmt.Errorf("invalid MetricsType '%s' valid types are: %v", + str, validIDSchemes) +} diff --git a/src/query/models/config_test.go b/src/query/models/config_test.go new file mode 100644 index 0000000000..18941eb4f1 --- /dev/null +++ b/src/query/models/config_test.go @@ -0,0 +1,67 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package models + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" +) + +func TestIDSchemeValidation(t *testing.T) { + err := TypeDefault.Validate() + assert.EqualError(t, err, "id scheme type not set") + err = TypeLegacy.Validate() + assert.NoError(t, err) + err = TypePrependMeta.Validate() + assert.NoError(t, err) + err = TypeQuoted.Validate() + assert.NoError(t, err) + err = TypeGraphite.Validate() + assert.NoError(t, err) + err = IDSchemeType(5).Validate() + assert.EqualError(t, err, "invalid config id schema type 'unknown':"+ + " should be one of [legacy quoted prepend_meta graphite]") +} + +func TestMetricsTypeUnmarshalYAML(t *testing.T) { + type config struct { + Type IDSchemeType `yaml:"type"` + } + + for _, value := range validIDSchemes { + str := fmt.Sprintf("type: %s\n", value.String()) + + var cfg config + require.NoError(t, yaml.Unmarshal([]byte(str), &cfg)) + + assert.Equal(t, value, cfg.Type) + } + + var cfg config + require.Error(t, yaml.Unmarshal([]byte("type: not_a_known_type\n"), &cfg)) + + require.NoError(t, yaml.Unmarshal([]byte(""), &cfg)) + assert.Equal(t, TypeDefault, cfg.Type) +} diff --git a/src/query/models/matcher.go b/src/query/models/matcher.go index c5075ecd68..de53ec586d 100644 --- a/src/query/models/matcher.go +++ b/src/query/models/matcher.go @@ -92,7 +92,9 @@ func (m Matchers) ToTags( tags := NewTags(len(m), tagOptions) for _, v := range m { if v.Type != MatchEqual { - return Tags{}, fmt.Errorf("illegal match type, got %v, but expecting: %v", v.Type, MatchEqual) + return EmptyTags(), + fmt.Errorf("illegal match type, got %v, but expecting: %v", + v.Type, MatchEqual) } tags = tags.AddTag(Tag{Name: v.Name, Value: v.Value}).Clone() diff --git a/src/query/models/options.go b/src/query/models/options.go index 2a46db52a5..43dc923af7 100644 --- a/src/query/models/options.go +++ b/src/query/models/options.go @@ -31,22 +31,26 @@ var ( ) type tagOptions struct { + version int + idScheme IDSchemeType metricName []byte } // NewTagOptions builds a new tag options with default values. func NewTagOptions() TagOptions { return &tagOptions{ + version: 0, metricName: defaultMetricName, + idScheme: TypeLegacy, } } func (o *tagOptions) Validate() error { - if o.MetricName() == nil { + if o.metricName == nil || len(o.metricName) == 0 { return errNoName } - return nil + return o.idScheme.Validate() } func (o *tagOptions) SetMetricName(metricName []byte) TagOptions { @@ -58,3 +62,13 @@ func (o *tagOptions) SetMetricName(metricName []byte) TagOptions { func (o *tagOptions) MetricName() []byte { return o.metricName } + +func (o *tagOptions) SetIDSchemeType(scheme IDSchemeType) TagOptions { + opts := *o + opts.idScheme = scheme + return &opts +} + +func (o *tagOptions) IDSchemeType() IDSchemeType { + return o.idScheme +} diff --git a/src/query/models/options_test.go b/src/query/models/options_test.go new file mode 100644 index 0000000000..ef33719448 --- /dev/null +++ b/src/query/models/options_test.go @@ -0,0 +1,63 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package models + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultTagOptions(t *testing.T) { + opts := NewTagOptions() + assert.NoError(t, opts.Validate()) + assert.Equal(t, defaultMetricName, opts.MetricName()) + assert.Equal(t, TypeLegacy, opts.IDSchemeType()) +} + +func TestValidTagOptions(t *testing.T) { + opts := NewTagOptions(). + SetIDSchemeType(TypePrependMeta). + SetMetricName([]byte("name")) + + assert.NoError(t, opts.Validate()) + assert.Equal(t, []byte("name"), opts.MetricName()) + assert.Equal(t, TypePrependMeta, opts.IDSchemeType()) +} + +func TestBadNameTagOptions(t *testing.T) { + msg := errNoName.Error() + opts := NewTagOptions(). + SetMetricName(nil) + assert.EqualError(t, opts.Validate(), msg) + + opts = NewTagOptions(). + SetMetricName([]byte{}) + assert.EqualError(t, opts.Validate(), msg) +} + +func TestBadSchemeTagOptions(t *testing.T) { + msg := "invalid config id schema type 'unknown': should be one of" + + " [legacy quoted prepend_meta graphite]" + opts := NewTagOptions(). + SetIDSchemeType(IDSchemeType(6)) + assert.EqualError(t, opts.Validate(), msg) +} diff --git a/src/query/models/strconv/checker.go b/src/query/models/strconv/checker.go new file mode 100644 index 0000000000..f5bea9e978 --- /dev/null +++ b/src/query/models/strconv/checker.go @@ -0,0 +1,66 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package strconv + +// NB: for nicer table formatting. +const ( + // Indicates this rune does not need escaping. + ff = false + // Indicates this rune needs escaping. + tt = true +) + +// Determines valid characters that do not require escaping. +// +// NB: escape all control characters, `"`, and any character above `~`. This +// table loosely based on constants used in utf8.DecodeRune. +var escape = [256]bool{ + // 1 2 3 4 5 6 7 8 9 A B C D E F + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0x00-0x0F + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0x10-0x1F + ff, ff, tt, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x20-0x2F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x30-0x3F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x40-0x4F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x50-0x5F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x60-0x6F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, tt, // 0x70-0x7F + // 1 2 3 4 5 6 7 8 9 A B C D E F + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0x80-0x8F + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0x90-0x9F + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0xA0-0xAF + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0xB0-0xBF + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0xC0-0xCF + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0xD0-0xDF + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0xE0-0xEF + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0xF0-0xFF +} + +// NeedToEscape returns true if the byte slice contains characters that will +// need to be escaped when quoting the slice. +func NeedToEscape(bb []byte) bool { + for _, b := range bb { + if escape[b] { + return true + } + } + + return false +} diff --git a/src/query/policy/resolver/static.go b/src/query/models/strconv/checker_test.go similarity index 50% rename from src/query/policy/resolver/static.go rename to src/query/models/strconv/checker_test.go index ed40c7a99f..cbd08f5823 100644 --- a/src/query/policy/resolver/static.go +++ b/src/query/models/strconv/checker_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -18,43 +18,48 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
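A small usage sketch of the checker (import path as introduced above): plain printable ASCII takes the cheap quoting path, while quotes, control bytes, and anything past '~' force escaping.

package main

import (
	"fmt"

	"github.com/m3db/m3/src/query/models/strconv"
)

func main() {
	fmt.Println(strconv.NeedToEscape([]byte("cpu.user")))    // false: plain printable ASCII
	fmt.Println(strconv.NeedToEscape([]byte(`cpu"user`)))    // true: contains a quote
	fmt.Println(strconv.NeedToEscape([]byte("cpu\nuser")))   // true: control character
	fmt.Println(strconv.NeedToEscape([]byte("cpu\x80user"))) // true: byte above '~'
}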
-package resolver +package strconv import ( - "context" - "time" + "testing" - "github.com/m3db/m3/src/metrics/policy" - "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/tsdb" + "github.com/stretchr/testify/assert" ) -type staticResolver struct { - sp policy.StoragePolicy +func generateUnescapedSlice() []byte { + bottomBound := int(' ') + upperBound := int('~') + unescaped := make([]byte, upperBound-bottomBound) + ignore := int('"') + idx := 0 + for i := bottomBound; i <= upperBound; i++ { + if i != ignore { + unescaped[idx] = byte(i) + idx++ + } + } + + return unescaped } -// NewStaticResolver creates a static policy resolver. -func NewStaticResolver(sp policy.StoragePolicy) PolicyResolver { - return &staticResolver{sp: sp} +func TestUnescapedSliceDoesNotNeedToEscape(t *testing.T) { + unescaped := generateUnescapedSlice() + assert.False(t, NeedToEscape(unescaped)) } -func (r *staticResolver) Resolve( - // Context needed here to satisfy PolicyResolver interface - _ context.Context, - tagMatchers models.Matchers, - startTime, endTime time.Time, - tagOptions models.TagOptions, -) ([]tsdb.FetchRequest, error) { - ranges := tsdb.NewSingleRangeRequest("", startTime, endTime, r.sp).Ranges - requests := make([]tsdb.FetchRequest, 1) - tags, err := tagMatchers.ToTags(tagOptions) - if err != nil { - return nil, err - } - requests[0] = tsdb.FetchRequest{ - ID: tags.ID(), - Ranges: ranges, - } +func TestSliceWithQuoteNeedsToEscape(t *testing.T) { + unescaped := generateUnescapedSlice() + unescaped = append(unescaped, '"') + assert.True(t, NeedToEscape(unescaped)) +} + +func TestSliceWithControlCharactersNeedsToEscape(t *testing.T) { + unescaped := generateUnescapedSlice() + lowByte := byte(int(' ') - 1) + unescapedWithLowByte := append(unescaped, lowByte) + assert.True(t, NeedToEscape(unescapedWithLowByte)) - return requests, nil + highByte := byte(int('~') + 1) + unescaped = append(unescaped, highByte) + assert.True(t, NeedToEscape(unescaped)) } diff --git a/src/query/models/strconv/quote.go b/src/query/models/strconv/quote.go new file mode 100644 index 0000000000..6974889c4d --- /dev/null +++ b/src/query/models/strconv/quote.go @@ -0,0 +1,304 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package strconv + +import ( + "strconv" + "unicode/utf8" +) + +// NB: predefined strconv constants +const ( + tx = 0x80 // 1000 0000 + t2 = 0xC0 // 1100 0000 + t3 = 0xE0 // 1110 0000 + t4 = 0xF0 // 1111 0000 + + maskx = 0x3F // 0011 1111 + + rune1Max = 1<<7 - 1 + rune2Max = 1<<11 - 1 + rune3Max = 1<<16 - 1 + + runeError = '\uFFFD' // the "error" Rune or "Unicode replacement character" + maxRune = '\U0010FFFF' // Maximum valid Unicode code point. + + // NB: Code points in the surrogate range are not valid for UTF-8. + surrogateMin = 0xD800 + surrogateMax = 0xDFFF + + lowerhex = "0123456789abcdef" + + quote = byte('"') +)
+ +// encodeRune writes into dst (which must be large enough) the UTF-8 encoding +// of the given rune at the given index, returning the index after the written +// bytes. +// +// NB: based on the utf8.EncodeRune method, but instead uses indexed insertion +// into a predefined buffer. +func encodeRune(dst []byte, r rune, idx int) int { + // Negative values are erroneous. Making it unsigned addresses the problem. + switch i := uint32(r); { + case i <= rune1Max: + dst[idx] = byte(r) + return idx + 1 + case i <= rune2Max: + dst[idx] = t2 | byte(r>>6) + dst[idx+1] = tx | byte(r)&maskx + return idx + 2 + case i > maxRune, surrogateMin <= i && i <= surrogateMax: + r = runeError + fallthrough + case i <= rune3Max: + dst[idx] = t3 | byte(r>>12) + dst[idx+2] = tx | byte(r)&maskx + dst[idx+1] = tx | byte(r>>6)&maskx + return idx + 3 + default: + dst[idx] = t4 | byte(r>>18) + dst[idx+1] = tx | byte(r>>12)&maskx + dst[idx+2] = tx | byte(r>>6)&maskx + dst[idx+3] = tx | byte(r)&maskx + return idx + 4 + } +}
+ +// insertEscapedRune writes the escaped form of the given rune into dst at the +// given index, returning the index after the written bytes. +func insertEscapedRune(dst []byte, r rune, idx int) int { + if r == rune(quote) || r == '\\' { // always backslashed + dst[idx] = '\\' + dst[idx+1] = byte(r) + return idx + 2 + } + + if strconv.IsPrint(r) { + return encodeRune(dst, r, idx) + } + + switch r { + case '\a': + dst[idx] = '\\' + dst[idx+1] = 'a' + return idx + 2 + case '\b': + dst[idx] = '\\' + dst[idx+1] = 'b' + return idx + 2 + case '\f': + dst[idx] = '\\' + dst[idx+1] = 'f' + return idx + 2 + case '\n': + dst[idx] = '\\' + dst[idx+1] = 'n' + return idx + 2 + case '\r': + dst[idx] = '\\' + dst[idx+1] = 'r' + return idx + 2 + case '\t': + dst[idx] = '\\' + dst[idx+1] = 't' + return idx + 2 + case '\v': + dst[idx] = '\\' + dst[idx+1] = 'v' + return idx + 2 + default: + switch { + case r < ' ': + dst[idx] = '\\' + dst[idx+1] = 'x' + dst[idx+2] = lowerhex[byte(r)>>4] + dst[idx+3] = lowerhex[byte(r)&0xF] + return idx + 4 + case r > utf8.MaxRune: + r = 0xFFFD + fallthrough + case r < 0x10000: + dst[idx] = '\\' + dst[idx+1] = 'u' + dst[idx+2] = lowerhex[r>>uint(12)&0xF] + dst[idx+3] = lowerhex[r>>uint(8)&0xF] + dst[idx+4] = lowerhex[r>>uint(4)&0xF] + dst[idx+5] = lowerhex[r>>uint(0)&0xF] + return idx + 6 + default: + dst[idx] = '\\' + dst[idx+1] = 'U' + dst[idx+2] = lowerhex[r>>uint(28)&0xF] + dst[idx+3] = lowerhex[r>>uint(24)&0xF] + dst[idx+4] = lowerhex[r>>uint(20)&0xF] + dst[idx+5] = lowerhex[r>>uint(16)&0xF] + dst[idx+6] = lowerhex[r>>uint(12)&0xF] + dst[idx+7] = lowerhex[r>>uint(8)&0xF] + dst[idx+8] = lowerhex[r>>uint(4)&0xF] + dst[idx+9] = lowerhex[r>>uint(0)&0xF] + return idx + 10 + } + } +}
+ +// Escape copies byte slice src to dst at a given index, escaping any +// quote or control characters. It returns the index at which the copy finished. +// +// NB: ensure that dst is large enough to store src and any additional +// escape characters. +func Escape(dst, src []byte, idx int) int { + // nolint + for width := 0; len(src) > 0; src = src[width:] { + r := rune(src[0]) + width = 1 + if r >= utf8.RuneSelf { + r, width = utf8.DecodeRune(src) + } + + if width == 1 && r == utf8.RuneError { + dst[idx] = '\\' + dst[idx+1] = 'x' + dst[idx+2] = lowerhex[src[0]>>4] + dst[idx+3] = lowerhex[src[0]&0xF] + idx += 4 + continue + } + + idx = insertEscapedRune(dst, r, idx) + } + + return idx +}
+ +// Quote copies byte slice src to dst at a given index, adding +// quotation runes around the src slice and escaping any quote or control +// characters. It returns the index at which the copy finished. +// +// NB: ensure that dst is large enough to store src, additional +// quotation runes, and any additional escape characters. +// +// NB: based on strconv.Quote method, but instead uses indexed insertion +// into a predefined buffer. +func Quote(dst, src []byte, idx int) int { + dst[idx] = quote + idx++ + idx = Escape(dst, src, idx) + dst[idx] = quote + return idx + 1 +}
+ +// QuoteSimple copies byte slice src to dst at a given index, adding +// quotation runes around the src slice, but does not escape any +// characters. It returns the index at which the copy finished. +// +// NB: ensure that dst is large enough to store src and two other characters. +func QuoteSimple(dst, src []byte, idx int) int { + dst[idx] = quote + idx++ + idx += copy(dst[idx:], src) + dst[idx] = quote + return idx + 1 +}
+ +// EscapedLength computes the length required for a byte slice to hold +// an escaped byte slice. +// +// NB: essentially a dry-run of `Escape` that does not write characters, but +// instead counts total character counts for the destination byte slice. +func EscapedLength(src []byte) int { + length := 0 + // nolint + for width := 0; len(src) > 0; src = src[width:] { + r := rune(src[0]) + width = 1 + if r >= utf8.RuneSelf { + r, width = utf8.DecodeRune(src) + } + + if width == 1 && r == utf8.RuneError { + length += 4 + continue + } + + length += escapedRuneLength(r) + } + + return length +}
+ +// QuotedLength computes the length required for a byte slice to hold +// a quoted byte slice. +// +// NB: essentially a dry-run of `Quote` that does not write characters, but +// instead counts total character counts for the destination byte slice. +func QuotedLength(src []byte) int { + return 2 + EscapedLength(src) // account for opening and closing quotes +} + +func escapedRuneLength(r rune) int { + if r == rune(quote) || r == '\\' { // always backslashed + return 2 + } + + if strconv.IsPrint(r) { + switch i := uint32(r); { + case i <= rune1Max: + return 1 + case i <= rune2Max: + return 2 + case i > maxRune, surrogateMin <= i && i <= surrogateMax: + fallthrough + case i <= rune3Max: + return 3 + default: + return 4 + } + } + + switch r { + case '\a': + return 2 + case '\b': + return 2 + case '\f': + return 2 + case '\n': + return 2 + case '\r': + return 2 + case '\t': + return 2 + case '\v': + return 2 + default: + switch { + case r < ' ': + return 4 + case r > utf8.MaxRune: + fallthrough + case r < 0x10000: + return 6 + default: + return 10 + } + } +}
diff --git a/src/query/models/strconv/quote_test.go b/src/query/models/strconv/quote_test.go new file mode 100644 index 0000000000..0641fa22f2 --- /dev/null +++ b/src/query/models/strconv/quote_test.go @@ -0,0 +1,221 @@ +// Copyright (c) 2019 Uber Technologies, Inc.
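The sizing helpers and indexed writers above are intended to be used as a measure-then-write pair so that ID construction can make a single allocation; a rough sketch, using only functions introduced in this package:

package main

import (
	"fmt"

	"github.com/m3db/m3/src/query/models/strconv"
)

func main() {
	value := []byte(`v"1`)

	// Measure first, then write into a pre-sized buffer at an index.
	buf := make([]byte, strconv.QuotedLength(value))
	end := strconv.Quote(buf, value, 0)

	fmt.Println(end, string(buf)) // 6 "v\"1"
}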
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package strconv + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +type quoteTest struct { + in string + out string +} + +var quotetests = []quoteTest{ + // NB: original strconv tests + {"\a\b\f\r\n\t\v", `"\a\b\f\r\n\t\v"`}, + {"\\", `"\\"`}, + {"abc\xffdef", `"abc\xffdef"`}, + {"\u263a", `"☺"`}, + {"\U0010ffff", `"\U0010ffff"`}, + {"\x04", `"\x04"`}, + // Some non-printable but graphic runes. Final column is double-quoted. + {"!\u00a0!\u2000!\u3000!", `"!\u00a0!\u2000!\u3000!"`}, + + // NB: Additional tests + {`"tag"`, `"\"tag\""`}, + {`"t"a"g"`, `"\"t\"a\"g\""`}, +} + +func TestEscape(t *testing.T) { + for _, tt := range quotetests { + in := []byte(tt.in) + bufferLen := EscapedLength(in) + bb := make([]byte, bufferLen) + idx := Escape(bb, in, 0) + assert.Equal(t, idx, bufferLen) + expected := []byte(tt.out) + assert.Equal(t, expected[1:len(expected)-1], bb) + } +} + +func TestQuote(t *testing.T) { + for _, tt := range quotetests { + in := []byte(tt.in) + bufferLen := QuotedLength(in) + bb := make([]byte, bufferLen) + idx := Quote(bb, in, 0) + assert.Equal(t, idx, bufferLen) + assert.Equal(t, []byte(tt.out), bb) + } +} + +func TestQuoteWithOffset(t *testing.T) { + for _, tt := range quotetests { + in := []byte(tt.in) + bufferLen := QuotedLength(in) + bb := make([]byte, bufferLen+2) + bb[0] = '!' + bb[bufferLen+1] = '!' + idx := Quote(bb, in, 1) + assert.Equal(t, idx, bufferLen+1) + assert.Equal(t, []byte("!"+tt.out+"!"), bb) + } +} + +func TestSimpleQuote(t *testing.T) { + for _, tt := range quotetests { + in := []byte(tt.in) + // accounts for buffer and 2 quotation characters + bufferLen := len(in) + 2 + bb := make([]byte, bufferLen) + idx := QuoteSimple(bb, in, 0) + assert.Equal(t, idx, bufferLen) + expected := []byte("\"" + tt.in + "\"") + assert.Equal(t, expected, bb) + } +} + +func TestSimpleQuoteWithOffset(t *testing.T) { + for _, tt := range quotetests { + in := []byte(tt.in) + // accounts for buffer, additional characters, and 2 quotation characters + bufferLen := len(in) + 4 + bb := make([]byte, bufferLen) + bb[0] = '!' + bb[bufferLen-1] = '!' 
+ idx := QuoteSimple(bb, in, 1) + assert.Equal(t, idx, bufferLen-1) + expected := []byte("!\"" + tt.in + "\"!") + assert.Equal(t, expected, bb) + } +} + +func TestLongQuoteWithOffset(t *testing.T) { + for _, tt := range quotetests { + repeat := 100 + in := []byte(tt.in) + bufferLen := QuotedLength(in) + bb := make([]byte, bufferLen*repeat) + for i := 0; i < repeat; i++ { + idx := Quote(bb, in, bufferLen*i) + assert.Equal(t, idx, bufferLen*(i+1)) + expected := "" + for j := 0; j <= i; j++ { + expected += tt.out + } + + assert.Equal(t, []byte(expected), bb[0:idx]) + } + } +} + +func BenchmarkQuoteSimple(b *testing.B) { + src := []byte("\a\b\f\r\n\t\v\a\b\f\r\n\t\v\a\b\f\r\n\t\v") + dst := make([]byte, len(src)+2) + for i := 0; i < b.N; i++ { + QuoteSimple(dst, src, 0) + } +} + +func BenchmarkQuote(b *testing.B) { + src := []byte("\a\b\f\r\n\t\v\a\b\f\r\n\t\v\a\b\f\r\n\t\v") + dst := make([]byte, QuotedLength(src)) + for i := 0; i < b.N; i++ { + Quote(dst, src, 0) + } +} + +func BenchmarkQuoteWithOffset(b *testing.B) { + src := []byte("\a\b\f\r\n\t\v\a\b\f\r\n\t\v\a\b\f\r\n\t\v") + l := QuotedLength(src) + dst := make([]byte, l*100) + for i := 0; i < b.N; i++ { + Quote(dst, src, l*(i%100)) + } +} + +// NB: original utf8.EncodeRune tests +var utf8map = []struct { + r rune + str string +}{ + {0x0000, "\x00"}, + {0x0001, "\x01"}, + {0x007e, "\x7e"}, + {0x007f, "\x7f"}, + {0x0080, "\xc2\x80"}, + {0x0081, "\xc2\x81"}, + {0x00bf, "\xc2\xbf"}, + {0x00c0, "\xc3\x80"}, + {0x00c1, "\xc3\x81"}, + {0x00c8, "\xc3\x88"}, + {0x00d0, "\xc3\x90"}, + {0x00e0, "\xc3\xa0"}, + {0x00f0, "\xc3\xb0"}, + {0x00f8, "\xc3\xb8"}, + {0x00ff, "\xc3\xbf"}, + {0x0100, "\xc4\x80"}, + {0x07ff, "\xdf\xbf"}, + {0x0400, "\xd0\x80"}, + {0x0800, "\xe0\xa0\x80"}, + {0x0801, "\xe0\xa0\x81"}, + {0x1000, "\xe1\x80\x80"}, + {0xd000, "\xed\x80\x80"}, + {0xd7ff, "\xed\x9f\xbf"}, // last code point before surrogate half. + {0xe000, "\xee\x80\x80"}, // first code point after surrogate half. + {0xfffe, "\xef\xbf\xbe"}, + {0xffff, "\xef\xbf\xbf"}, + {0x10000, "\xf0\x90\x80\x80"}, + {0x10001, "\xf0\x90\x80\x81"}, + {0x40000, "\xf1\x80\x80\x80"}, + {0x10fffe, "\xf4\x8f\xbf\xbe"}, + {0x10ffff, "\xf4\x8f\xbf\xbf"}, + {0xFFFD, "\xef\xbf\xbd"}, +} + +func TestEncodeRune(t *testing.T) { + for _, m := range utf8map { + b := []byte(m.str) + var bb [10]byte + n := encodeRune(bb[:], m.r, 0) + b1 := bb[0:n] + if !bytes.Equal(b, b1) { + t.Errorf("EncodeRune(%#04x) = %q want %q", m.r, b1, b) + } + } +} + +func TestEncodeRuneWithOffset(t *testing.T) { + for _, m := range utf8map { + b := []byte("!" + m.str) + var bb [10]byte + bb[0] = '!' + n := encodeRune(bb[:], m.r, 1) + b1 := bb[0:n] + if !bytes.Equal(b, b1) { + t.Errorf("EncodeRune(%#04x) = %q want %q", m.r, b1, b) + } + } +} diff --git a/src/query/models/tags.go b/src/query/models/tags.go index efed6e5237..65e73060fa 100644 --- a/src/query/models/tags.go +++ b/src/query/models/tags.go @@ -25,17 +25,15 @@ import ( "fmt" "hash/fnv" "sort" - "strings" -) -var ( - defaultOptions = NewTagOptions() + "github.com/m3db/m3/src/query/models/strconv" + "github.com/m3db/m3/src/query/util/writer" ) // NewTags builds a tags with the given size and tag options. func NewTags(size int, opts TagOptions) Tags { if opts == nil { - opts = defaultOptions + opts = NewTagOptions() } return Tags{ @@ -50,41 +48,41 @@ func EmptyTags() Tags { return NewTags(0, nil) } -// ID returns a string representation of the tags. 
-func (t Tags) ID() string { - var ( - idLen = t.IDLen() - strBuilder = strings.Builder{} - ) - - strBuilder.Grow(idLen) - for _, tag := range t.Tags { - strBuilder.Write(tag.Name) - strBuilder.WriteByte(eq) - strBuilder.Write(tag.Value) - strBuilder.WriteByte(sep) +// ID returns a byte slice representation of the tags, using the generation +// strategy from . +func (t Tags) ID() []byte { + schemeType := t.Opts.IDSchemeType() + switch schemeType { + case TypeLegacy: + return t.legacyID() + case TypeQuoted: + return t.quotedID() + case TypePrependMeta: + return t.prependMetaID() + case TypeGraphite: + return t.graphiteID() + default: + // Default to prepending meta + return t.prependMetaID() } - - return strBuilder.String() } -// IDMarshalTo writes out the ID representation -// of the tags into the provided buffer. -func (t Tags) IDMarshalTo(b []byte) []byte { +func (t Tags) legacyID() []byte { + // TODO: pool these bytes. + id := make([]byte, t.idLen()) + idx := -1 for _, tag := range t.Tags { - b = append(b, tag.Name...) - b = append(b, eq) - b = append(b, tag.Value...) - b = append(b, sep) + idx += copy(id[idx+1:], tag.Name) + 1 + id[idx] = eq + idx += copy(id[idx+1:], tag.Value) + 1 + id[idx] = sep } - return b + return id } -// IDLen returns the length of the ID that would be -// generated from the tags. -func (t Tags) IDLen() int { - idLen := 2 * t.Len() // account for eq and sep +func (t Tags) idLen() int { + idLen := 2 * t.Len() // account for separators for _, tag := range t.Tags { idLen += len(tag.Name) idLen += len(tag.Value) @@ -93,37 +91,178 @@ func (t Tags) IDLen() int { return idLen } -// IDWithExcludes returns a string representation of the tags excluding some tag keys. -func (t Tags) IDWithExcludes(excludeKeys ...[]byte) uint64 { - b := make([]byte, 0, t.Len()) - for _, tag := range t.Tags { - // Always exclude the metric name by default - if bytes.Equal(tag.Name, t.Opts.MetricName()) { - continue - } +type tagEscaping struct { + escapeName bool + escapeValue bool +} - found := false - for _, n := range excludeKeys { - if bytes.Equal(n, tag.Name) { - found = true - break +func (t Tags) quotedID() []byte { + var ( + idLen int + needEscaping []tagEscaping + l int + escape tagEscaping + ) + + for i, tt := range t.Tags { + l, escape = tt.serializedLength() + idLen += l + if escape.escapeName || escape.escapeValue { + if needEscaping == nil { + needEscaping = make([]tagEscaping, len(t.Tags)) } - } - // Skip the key - if found { - continue + needEscaping[i] = escape } + } - b = append(b, tag.Name...) - b = append(b, eq) - b = append(b, tag.Value...) - b = append(b, sep) + tagLength := 2 * len(t.Tags) + idLen += tagLength + 1 // account for separators and brackets + if needEscaping == nil { + return t.quoteIDSimple(idLen) } - h := fnv.New64a() - h.Write(b) - return h.Sum64() + // TODO: pool these bytes + lastIndex := len(t.Tags) - 1 + id := make([]byte, idLen) + id[0] = leftBracket + idx := 1 + for i, tt := range t.Tags[:lastIndex] { + idx = tt.writeAtIndex(id, needEscaping[i], idx) + id[idx] = sep + idx++ + } + + idx = t.Tags[lastIndex].writeAtIndex(id, needEscaping[lastIndex], idx) + id[idx] = rightBracket + return id +} + +// adds quotes to tag values when no characters need escaping. +func (t Tags) quoteIDSimple(length int) []byte { + // TODO: pool these bytes. 
+ id := make([]byte, length) + id[0] = leftBracket + idx := 1 + lastIndex := len(t.Tags) - 1 + for _, tag := range t.Tags[:lastIndex] { + idx += copy(id[idx:], tag.Name) + id[idx] = eq + idx++ + idx = strconv.QuoteSimple(id, tag.Value, idx) + id[idx] = sep + idx++ + } + + tag := t.Tags[lastIndex] + idx += copy(id[idx:], tag.Name) + id[idx] = eq + idx++ + idx = strconv.QuoteSimple(id, tag.Value, idx) + id[idx] = rightBracket + + return id +} + +func (t Tag) writeAtIndex(id []byte, escape tagEscaping, idx int) int { + if escape.escapeName { + idx = strconv.Escape(id, t.Name, idx) + } else { + idx += copy(id[idx:], t.Name) + } + + // add = character + id[idx] = eq + idx++ + + if escape.escapeValue { + idx = strconv.Quote(id, t.Value, idx) + } else { + idx = strconv.QuoteSimple(id, t.Value, idx) + } + + return idx +} + +func (t Tag) serializedLength() (int, tagEscaping) { + var ( + idLen int + escaping tagEscaping + ) + if strconv.NeedToEscape(t.Name) { + idLen += strconv.EscapedLength(t.Name) + escaping.escapeName = true + } else { + idLen += len(t.Name) + } + + if strconv.NeedToEscape(t.Value) { + idLen += strconv.QuotedLength(t.Value) + escaping.escapeValue = true + } else { + idLen += len(t.Value) + 2 + } + + return idLen, escaping +} + +func (t Tags) prependMetaID() []byte { + l, metaLengths := t.prependMetaLen() + // TODO: pool these bytes. + id := make([]byte, l) + idx := writeTagLengthMeta(id, metaLengths) + for _, tag := range t.Tags { + idx += copy(id[idx:], tag.Name) + idx += copy(id[idx:], tag.Value) + } + + return id +} + +func writeTagLengthMeta(dst []byte, lengths []int) int { + idx := writer.WriteIntegers(dst, lengths, sep, 0) + dst[idx] = finish + return idx + 1 +} + +func (t Tags) prependMetaLen() (int, []int) { + idLen := 1 // account for separator + tagLengths := make([]int, len(t.Tags)*2) + for i, tag := range t.Tags { + tagLen := len(tag.Name) + tagLengths[2*i] = tagLen + idLen += tagLen + tagLen = len(tag.Value) + tagLengths[2*i+1] = tagLen + idLen += tagLen + } + + prefixLen := writer.IntsLength(tagLengths) + return idLen + prefixLen, tagLengths +} + +func (t Tags) graphiteID() []byte { + // TODO: pool these bytes. + id := make([]byte, t.idLenGraphite()) + idx := 0 + lastIndex := len(t.Tags) - 1 + for _, tag := range t.Tags[:lastIndex] { + idx += copy(id[idx:], tag.Value) + id[idx] = graphiteSep + idx++ + } + + copy(id[idx:], t.Tags[lastIndex].Value) + return id +} + +func (t Tags) idLenGraphite() int { + idLen := t.Len() - 1 // account for separators + for _, tag := range t.Tags { + idLen += len(tag.Value) + } + + return idLen } func (t Tags) tagSubset(keys [][]byte, include bool) Tags { @@ -150,26 +289,6 @@ func (t Tags) TagsWithoutKeys(excludeKeys [][]byte) Tags { return t.tagSubset(excludeKeys, false) } -// IDWithKeys returns a string representation of the tags only including the given keys. -func (t Tags) IDWithKeys(includeKeys ...[]byte) uint64 { - b := make([]byte, 0, t.Len()) - for _, tag := range t.Tags { - for _, k := range includeKeys { - if bytes.Equal(tag.Name, k) { - b = append(b, tag.Name...) - b = append(b, eq) - b = append(b, tag.Value...) - b = append(b, sep) - break - } - } - } - - h := fnv.New64a() - h.Write(b) - return h.Sum64() -} - // TagsWithKeys returns only the tags which have the given keys. func (t Tags) TagsWithKeys(includeKeys [][]byte) Tags { return t.tagSubset(includeKeys, true) @@ -211,6 +330,12 @@ func (t Tags) AddTag(tag Tag) Tags { return t.Normalize() } +// AddTagWithoutNormalizing is used to add a single tag. 
+func (t Tags) AddTagWithoutNormalizing(tag Tag) Tags { + t.Tags = append(t.Tags, tag) + return t +} + // SetName sets the metric name. func (t Tags) SetName(value []byte) Tags { return t.AddOrUpdateTag(Tag{Name: t.Opts.MetricName(), Value: value}) @@ -253,13 +378,46 @@ func (t Tags) Less(i, j int) bool { return bytes.Compare(t.Tags[i].Name, t.Tags[j].Name) == -1 } +type sortableTagsNumericallyAsc Tags + +func (t sortableTagsNumericallyAsc) Len() int { return len(t.Tags) } +func (t sortableTagsNumericallyAsc) Swap(i, j int) { + t.Tags[i], t.Tags[j] = t.Tags[j], t.Tags[i] +} +func (t sortableTagsNumericallyAsc) Less(i, j int) bool { + iName, jName := t.Tags[i].Name, t.Tags[j].Name + lenDiff := len(iName) - len(jName) + if lenDiff < 0 { + return true + } + + if lenDiff > 0 { + return false + } + + return bytes.Compare(iName, jName) == -1 +} + // Normalize normalizes the tags by sorting them in place. // In the future, it might also ensure other things like uniqueness. func (t Tags) Normalize() Tags { - sort.Sort(t) + // Graphite tags are sorted numerically rather than lexically. + if t.Opts.IDSchemeType() == TypeGraphite { + sort.Sort(sortableTagsNumericallyAsc(t)) + } else { + sort.Sort(t) + } + return t } +// HashedID returns the hashed ID for the tags. +func (t Tags) HashedID() uint64 { + h := fnv.New64a() + h.Write(t.ID()) + return h.Sum64() +} + func (t Tag) String() string { return fmt.Sprintf("%s: %s", t.Name, t.Value) } diff --git a/src/query/models/tags_test.go b/src/query/models/tags_test.go index 69cb8f0854..57b35244ac 100644 --- a/src/query/models/tags_test.go +++ b/src/query/models/tags_test.go @@ -22,78 +22,136 @@ package models import ( "bytes" + "fmt" "hash/fnv" "reflect" "testing" "unsafe" + "github.com/m3db/m3/src/query/util/writer" xtest "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func createTags(withName bool) Tags { - tags := NewTags(3, nil).AddTags([]Tag{ +func testLongTagIDOutOfOrder(t *testing.T, scheme IDSchemeType) Tags { + opts := NewTagOptions().SetIDSchemeType(scheme) + tags := NewTags(3, opts).AddTags([]Tag{ {Name: []byte("t1"), Value: []byte("v1")}, + {Name: []byte("t3"), Value: []byte("v3")}, {Name: []byte("t2"), Value: []byte("v2")}, + {Name: []byte("t4"), Value: []byte("v4")}, }) - if withName { - tags = tags.SetName([]byte("v0")) - } - return tags } -func TestTagID(t *testing.T) { - tags := createTags(false) - assert.Equal(t, "t1=v1,t2=v2,", tags.ID()) - assert.Equal(t, tags.IDLen(), len(tags.ID())) +func TestLongTagNewIDOutOfOrderLegacy(t *testing.T) { + tags := testLongTagIDOutOfOrder(t, TypeLegacy) + actual := tags.ID() + assert.Equal(t, tags.idLen(), len(actual)) + assert.Equal(t, []byte("t1=v1,t2=v2,t3=v3,t4=v4,"), actual) } -func TestTagIDMarshalTo(t *testing.T) { - var ( - tags = createTags(false) - b = tags.IDMarshalTo([]byte{}) - ) - assert.Equal(t, []byte("t1=v1,t2=v2,"), b) - assert.Equal(t, tags.IDLen(), len(b)) +func TestLongTagNewIDOutOfOrderQuoted(t *testing.T) { + tags := testLongTagIDOutOfOrder(t, TypeQuoted) + actual := tags.ID() + assert.Equal(t, []byte(`{t1="v1",t2="v2",t3="v3",t4="v4"}`), actual) } -func TestWithoutName(t *testing.T) { - tags := createTags(true) - tagsWithoutName := tags.WithoutName() +func TestLongTagNewIDOutOfOrderGraphite(t *testing.T) { + opts := NewTagOptions().SetIDSchemeType(TypeGraphite) + tags := NewTags(3, opts).AddTags([]Tag{ + {Name: []byte("__g0__"), Value: []byte("v0")}, + {Name: []byte("__g10__"), Value: []byte("v10")}, + {Name: 
[]byte("__g9__"), Value: []byte("v9")}, + {Name: []byte("__g3__"), Value: []byte("v3")}, + {Name: []byte("__g6__"), Value: []byte("v6")}, + {Name: []byte("__g11__"), Value: []byte("v11")}, + {Name: []byte("__g8__"), Value: []byte("v8")}, + {Name: []byte("__g5__"), Value: []byte("v5")}, + {Name: []byte("__g1__"), Value: []byte("v1")}, + {Name: []byte("__g7__"), Value: []byte("v7")}, + {Name: []byte("__g2__"), Value: []byte("v2")}, + {Name: []byte("__g4__"), Value: []byte("v4")}, + {Name: []byte("__g12__"), Value: []byte("v12")}, + }) - assert.Equal(t, createTags(false), tagsWithoutName) + actual := tags.ID() + assert.Equal(t, []byte("v0.v1.v2.v3.v4.v5.v6.v7.v8.v9.v10.v11.v12"), actual) } -func TestIDWithKeys(t *testing.T) { - tags := createTags(true) +func TestHashedID(t *testing.T) { + tags := testLongTagIDOutOfOrder(t, TypeLegacy) + actual := tags.HashedID() - b := []byte("__name__=v0,t1=v1,t2=v2,") h := fnv.New64a() - h.Write(b) + h.Write([]byte("t1=v1,t2=v2,t3=v3,t4=v4,")) + expected := h.Sum64() - idWithKeys := tags.IDWithKeys([]byte("t1"), []byte("t2"), tags.Opts.MetricName()) - assert.Equal(t, h.Sum64(), idWithKeys) + assert.Equal(t, expected, actual) } -func TestTagsWithKeys(t *testing.T) { - tags := createTags(true) +func TestLongTagNewIDOutOfOrderQuotedWithEscape(t *testing.T) { + tags := testLongTagIDOutOfOrder(t, TypeQuoted) + tags = tags.AddTag(Tag{Name: []byte(`t5""`), Value: []byte(`v"5`)}) + actual := tags.ID() + assert.Equal(t, []byte(`{t1="v1",t2="v2",t3="v3",t4="v4",t5\"\"="v\"5"}`), actual) +} - tagsWithKeys := tags.TagsWithKeys([][]byte{[]byte("t1")}) - assert.Equal(t, []Tag{{Name: []byte("t1"), Value: []byte("v1")}}, tagsWithKeys.Tags) +func TestQuotedCollisions(t *testing.T) { + twoTags := NewTags(2, NewTagOptions().SetIDSchemeType(TypeQuoted)). + AddTags([]Tag{ + {Name: []byte("t1"), Value: []byte("v1")}, + {Name: []byte("t2"), Value: []byte("v2")}, + }) + + tagValue := NewTags(2, NewTagOptions().SetIDSchemeType(TypeQuoted)). + AddTag(Tag{Name: []byte("t1"), Value: []byte(`"v1"t2"v2"`)}) + assert.NotEqual(t, twoTags.ID(), tagValue.ID()) + + tagName := NewTags(2, NewTagOptions().SetIDSchemeType(TypeQuoted)). + AddTag(Tag{Name: []byte(`t1"v1"t2`), Value: []byte("v2")}) + assert.NotEqual(t, twoTags.ID(), tagName.ID()) + + assert.NotEqual(t, tagValue.ID(), tagName.ID()) } -func TestIDWithExcludes(t *testing.T) { +func TestLongTagNewIDOutOfOrderPrefixed(t *testing.T) { + tags := testLongTagIDOutOfOrder(t, TypePrependMeta). 
+ AddTag(Tag{Name: []byte("t9"), Value: []byte(`"v1"t2"v2"`)}) + actual := tags.ID() + expectedLength, _ := tags.prependMetaLen() + require.Equal(t, expectedLength, len(actual)) + assert.Equal(t, []byte(`2,2,2,2,2,2,2,2,2,10!t1v1t2v2t3v3t4v4t9"v1"t2"v2"`), actual) +} + +func createTags(withName bool) Tags { + tags := NewTags(3, nil).AddTags([]Tag{ + {Name: []byte("t1"), Value: []byte("v1")}, + {Name: []byte("t2"), Value: []byte("v2")}, + }) + + if withName { + tags = tags.SetName([]byte("v0")) + } + + return tags +} + +func TestWithoutName(t *testing.T) { tags := createTags(true) + tagsWithoutName := tags.WithoutName() - b := []byte("t2=v2,") - h := fnv.New64a() - h.Write(b) + assert.Equal(t, createTags(false), tagsWithoutName) +} + +func TestTagsWithKeys(t *testing.T) { + tags := createTags(true) - idWithExcludes := tags.IDWithExcludes([]byte("t1")) - assert.Equal(t, h.Sum64(), idWithExcludes) + tagsWithKeys := tags.TagsWithKeys([][]byte{[]byte("t1")}) + assert.Equal(t, []Tag{{Name: []byte("t1"), Value: []byte("v1")}}, tagsWithKeys.Tags) } func TestTagsWithExcludes(t *testing.T) { @@ -103,19 +161,6 @@ func TestTagsWithExcludes(t *testing.T) { assert.Equal(t, []Tag{{Name: []byte("t2"), Value: []byte("v2")}}, tagsWithoutKeys.Tags) } -func TestTagsIDLen(t *testing.T) { - tags := NewTags(3, NewTagOptions().SetMetricName([]byte("N"))) - tags = tags.AddTags([]Tag{ - {Name: []byte("a"), Value: []byte("1")}, - {Name: []byte("b"), Value: []byte("2")}, - {Name: []byte("c"), Value: []byte("3")}, - }) - - tags = tags.SetName([]byte("9")) - idLen := len("a:1,b:2,c:3,N:9,") - assert.Equal(t, idLen, tags.IDLen()) -} - func TestTagsWithExcludesCustom(t *testing.T) { tags := NewTags(4, nil) tags = tags.AddTags([]Tag{ @@ -153,6 +198,28 @@ func TestAddTags(t *testing.T) { assert.Equal(t, expected, tags.Tags) } +func TestAddTagWithoutNormalizing(t *testing.T) { + tags := NewTags(4, nil) + + tagToAdd := Tag{Name: []byte("x"), Value: []byte("3")} + tags = tags.AddTagWithoutNormalizing(tagToAdd) + assert.Equal(t, []Tag{tagToAdd}, tags.Tags) + + tags = tags.AddTagWithoutNormalizing( + Tag{Name: []byte("a"), Value: []byte("1")}, + ) + expected := []Tag{ + {Name: []byte("x"), Value: []byte("3")}, + {Name: []byte("a"), Value: []byte("1")}, + } + + assert.Equal(t, expected, tags.Tags) + // Normalization should sort. 
+ tags.Normalize() + expected[0], expected[1] = expected[1], expected[0] + assert.Equal(t, expected, tags.Tags) +} + func TestUpdateName(t *testing.T) { name := []byte("!") tags := NewTags(1, NewTagOptions().SetMetricName(name)) @@ -242,3 +309,138 @@ func TestTagAppend(t *testing.T) { assert.Equal(t, expected, tags.Tags) } + +func TestWriteTagLengthMeta(t *testing.T) { + lengths := []int{0, 1, 2, 8, 10, 8, 100, 8, 101, 8, 110, 123456, 12345} + l := writer.IntsLength(lengths) + 1 // account for final character + require.Equal(t, 42, l) + buf := make([]byte, l) + count := writeTagLengthMeta(buf, lengths) + require.Equal(t, 42, count) + assert.Equal(t, []byte("0,1,2,8,10,8,100,8,101,8,110,123456,12345!"), buf) +} + +func buildTags(b *testing.B, count, length int, opts TagOptions, escape bool) Tags { + tags := make([]Tag, count) + for i := range tags { + n := []byte(fmt.Sprint("t", i)) + v := make([]byte, length) + for j := range v { + if escape { + v[j] = '"' + } else { + v[j] = 'a' + } + } + + tags[i] = Tag{Name: n, Value: v} + } + + return NewTags(count, opts).AddTags(tags) +} + +var tagBenchmarks = []struct { + name string + tagCount, tagLength int +}{ + {"10 Tags 10 Length", 10, 10}, + {"100 Tags 10 Length", 100, 10}, + {"10 Tags 100 Length", 10, 100}, + {"100 Tags 100 Length", 100, 100}, +} + +const typeQuotedEscaped = IDSchemeType(100) + +var tagIDSchemes = []struct { + name string + scheme IDSchemeType +}{ + {"___legacy", TypeLegacy}, + {"_graphite", TypeGraphite}, + {"__prepend", TypePrependMeta}, + // only simple quotable tag values. + {"___quoted", TypeQuoted}, + // only escaped tag values. + {"__esc_qtd", typeQuotedEscaped}, +} + +/* +Benchmark results: + +10__Tags_10__Length___legacy 5000000 236 ns/op 144 B/op 1 allocs/op +10__Tags_10__Length_graphite 10000000 174 ns/op 112 B/op 1 allocs/op +10__Tags_10__Length__prepend 3000000 537 ns/op 336 B/op 2 allocs/op +10__Tags_10__Length___quoted 3000000 404 ns/op 176 B/op 1 allocs/op +10__Tags_10__Length__esc_qtd 1000000 1324 ns/op 320 B/op 2 allocs/op + +100_Tags_10__Length___legacy 1000000 2026 ns/op 1536 B/op 1 allocs/op +100_Tags_10__Length_graphite 1000000 1444 ns/op 1152 B/op 1 allocs/op +100_Tags_10__Length__prepend 300000 4601 ns/op 3584 B/op 2 allocs/op +100_Tags_10__Length___quoted 500000 3791 ns/op 1792 B/op 1 allocs/op +100_Tags_10__Length__esc_qtd 100000 12620 ns/op 3280 B/op 2 allocs/op + +10__Tags_100_Length___legacy 3000000 412 ns/op 1152 B/op 1 allocs/op +10__Tags_100_Length_graphite 5000000 396 ns/op 1024 B/op 1 allocs/op +10__Tags_100_Length__prepend 2000000 803 ns/op 1312 B/op 2 allocs/op +10__Tags_100_Length___quoted 1000000 1132 ns/op 1152 B/op 1 allocs/op +10__Tags_100_Length__esc_qtd 200000 11400 ns/op 2336 B/op 2 allocs/op + +100_Tags_100_Length___legacy 500000 3582 ns/op 10880 B/op 1 allocs/op +100_Tags_100_Length_graphite 500000 3183 ns/op 10240 B/op 1 allocs/op +100_Tags_100_Length__prepend 200000 6866 ns/op 14080 B/op 2 allocs/op +100_Tags_100_Length___quoted 200000 10604 ns/op 10880 B/op 1 allocs/op +100_Tags_100_Length__esc_qtd 20000 90575 ns/op 21969 B/op 2 allocs/op +*/ + +func BenchmarkIDs(b *testing.B) { + opts := NewTagOptions() + for _, bb := range tagBenchmarks { + for _, idScheme := range tagIDSchemes { + name := bb.name + idScheme.name + b.Run(name, func(b *testing.B) { + var ( + tags Tags + ) + + if idScheme.scheme == typeQuotedEscaped { + opts = opts.SetIDSchemeType(TypeQuoted) + tags = buildTags(b, bb.tagCount, bb.tagLength, opts, true) + } else { + opts = opts.SetIDSchemeType(idScheme.scheme) + 
tags = buildTags(b, bb.tagCount, bb.tagLength, opts, false) + } + + for i := 0; i < b.N; i++ { + _ = tags.ID() + } + }) + } + fmt.Println() + } +} + +func TestSerializedLength(t *testing.T) { + tag := Tag{Name: []byte("foo"), Value: []byte("bar")} + len, escaping := tag.serializedLength() + assert.Equal(t, 8, len) + assert.False(t, escaping.escapeName) + assert.False(t, escaping.escapeValue) + + tag.Name = []byte("f\ao") + len, escaping = tag.serializedLength() + assert.Equal(t, 9, len) + assert.True(t, escaping.escapeName) + assert.False(t, escaping.escapeValue) + + tag.Value = []byte(`b"ar`) + len, escaping = tag.serializedLength() + assert.Equal(t, 11, len) + assert.True(t, escaping.escapeName) + assert.True(t, escaping.escapeValue) + + tag.Name = []byte("baz") + len, escaping = tag.serializedLength() + assert.Equal(t, 10, len) + assert.False(t, escaping.escapeName) + assert.True(t, escaping.escapeValue) +} diff --git a/src/query/models/types.go b/src/query/models/types.go index 37fc6e151c..9031a130f8 100644 --- a/src/query/models/types.go +++ b/src/query/models/types.go @@ -26,20 +26,66 @@ import ( // Separators for tags. const ( - sep = byte(',') - eq = byte('=') + graphiteSep = byte('.') + sep = byte(',') + finish = byte('!') + eq = byte('=') + leftBracket = byte('{') + rightBracket = byte('}') +) + +// IDSchemeType determines the scheme for generating +// series IDs based on their tags. +type IDSchemeType uint16 + +const ( + // TypeDefault is an invalid scheme that indicates that the default scheme + // for the tag options version option should be used. + TypeDefault IDSchemeType = iota + // TypeLegacy describes a scheme where IDs are generated by appending + // tag name/value pairs with = and , separators. Note that an additional , is + // added to the end of the ID. + // + // NB: this should not be used, and exists here as a deprecated legacy + // ID generation scheme, as it may cause collisions in situations where + // incoming tags contain the following characters: << =," >>, for example: + // {t1:v1},{t2:v2} -> t1=v1,t2=v2, + // {t1:v1,t2:v2} -> t1=v1,t2=v2, + TypeLegacy + // TypeQuoted describes a scheme where IDs are generated by appending + // tag names with explicitly quoted and escaped tag values. Tag names are + // also escaped if they contain invalid characters. + // {t1:v1},{t2:v2} -> t1"v1"t2"v2" + // {t1:v1,t2:v2} -> t1"v1,t2:v2" + TypeQuoted + // TypePrependMeta describes a scheme where IDs are generated by prepending + // the length of each tag at the start of the ID + // {t1:v1},{t2:v2} -> 44t1v1t2v2 + // {t1:v1,t2:v2} -> 10t1v1,t2:v2 + TypePrependMeta + // TypeGraphite describes a scheme where IDs are generated to match graphite + // representation of the tags. This scheme should only be used on the graphite + // ingestion path, as it ignores tag names and is very prone to collisions if + // used on non-graphite data. + // {__g0__:v1},{__g1__:v2} -> v1.v2 + // + // NB: when TypeGraphite is specified, tags are ordered numerically rather + // than lexically. + TypeGraphite ) // TagOptions describes additional options for tags. type TagOptions interface { // Validate validates these tag options. Validate() error - // SetMetricName sets the name for the `metric name` metric. SetMetricName(metricName []byte) TagOptions - - // MetricName gets the name for the `metric name `metric`. + // MetricName gets the name for the metric name `metric`. MetricName() []byte + // SetIDSchemeType sets the ID generation scheme type. 
+ SetIDSchemeType(scheme IDSchemeType) TagOptions + // IDSchemeType gets the ID generation scheme type. + IDSchemeType() IDSchemeType } // Tags represents a set of tags with options. @@ -80,7 +126,7 @@ type Matchers []Matcher // Metric is the individual metric that gets returned from the search endpoint. type Metric struct { - ID string + ID []byte Tags Tags } diff --git a/src/query/policy/resolver/interface.go b/src/query/policy/resolver/interface.go deleted file mode 100644 index e86d710f28..0000000000 --- a/src/query/policy/resolver/interface.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package resolver - -import ( - "context" - "time" - - "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/tsdb" -) - -// PolicyResolver resolves policy for a query. -type PolicyResolver interface { - // Resolve will resolve each metric ID to a FetchRequest with a list of FetchRanges. - // The list of ranges is guaranteed to cover the full [startTime, endTime). The best - // storage policy will be picked for the range with configured strategy, but there - // may still be no data retained for the range in the given storage policy. 
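To make the collision-avoidance concrete, a small sketch comparing the IDs the schemes above produce for the same two tags; expected outputs follow the scheme documentation and the tags tests in this patch, and the graphite scheme is omitted since it only applies to __gN__ style tag names.

package main

import (
	"fmt"

	"github.com/m3db/m3/src/query/models"
)

func idFor(scheme models.IDSchemeType) string {
	opts := models.NewTagOptions().SetIDSchemeType(scheme)
	tags := models.NewTags(2, opts).AddTags([]models.Tag{
		{Name: []byte("t1"), Value: []byte("v1")},
		{Name: []byte("t2"), Value: []byte("v2")},
	})
	return string(tags.ID())
}

func main() {
	fmt.Println(idFor(models.TypeLegacy))      // t1=v1,t2=v2,  (collides with the single tag t1: "v1,t2=v2")
	fmt.Println(idFor(models.TypeQuoted))      // {t1="v1",t2="v2"}
	fmt.Println(idFor(models.TypePrependMeta)) // 2,2,2,2!t1v1t2v2
}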
- Resolve( - ctx context.Context, - tagMatchers models.Matchers, - startTime, endTime time.Time, - tagOptions models.TagOptions, - ) ([]tsdb.FetchRequest, error) -} diff --git a/src/query/storage/consolidated_test.go b/src/query/storage/consolidated_test.go index 4a38137dfe..dda245d8e9 100644 --- a/src/query/storage/consolidated_test.go +++ b/src/query/storage/consolidated_test.go @@ -152,7 +152,7 @@ func TestConsolidation(t *testing.T) { seriesList := make(ts.SeriesList, len(datapoints)) for i, dp := range datapoints { seriesList[i] = ts.NewSeries( - fmt.Sprintf("name_%d", i), + []byte(fmt.Sprintf("name_%d", i)), dp, models.Tags{ Opts: models.NewTagOptions(), diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 3d7aac44e5..97cb7799c4 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -267,7 +267,7 @@ func BenchmarkFetchResultToPromResult(b *testing.B) { } series := ts.NewSeries( - fmt.Sprintf("series-%d", i), values, tags) + []byte(fmt.Sprintf("series-%d", i)), values, tags) fr.SeriesList = append(fr.SeriesList, series) } diff --git a/src/query/storage/fanout/storage_test.go b/src/query/storage/fanout/storage_test.go index 06754745ce..2be1f5c321 100644 --- a/src/query/storage/fanout/storage_test.go +++ b/src/query/storage/fanout/storage_test.go @@ -182,6 +182,7 @@ func TestFanoutWriteError(t *testing.T) { datapoints[0] = ts.Datapoint{Timestamp: time.Now(), Value: 1} err := store.Write(context.TODO(), &storage.WriteQuery{ Datapoints: datapoints, + Tags: models.NewTags(0, nil), }) assert.Error(t, err) } @@ -192,6 +193,7 @@ func TestFanoutWriteSuccess(t *testing.T) { datapoints[0] = ts.Datapoint{Timestamp: time.Now(), Value: 1} err := store.Write(context.TODO(), &storage.WriteQuery{ Datapoints: datapoints, + Tags: models.NewTags(0, nil), }) assert.NoError(t, err) } diff --git a/src/query/storage/index.go b/src/query/storage/index.go index bbbd3a83e0..b89b9bc5a9 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -35,14 +35,13 @@ func FromM3IdentToMetric( iterTags ident.TagIterator, tagOptions models.TagOptions, ) (models.Metric, error) { - id := identID.String() tags, err := FromIdentTagIteratorToTags(iterTags, tagOptions) if err != nil { return models.Metric{}, err } return models.Metric{ - ID: id, + ID: identID.Bytes(), Tags: tags, }, nil } diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index 1335d1911a..f5815aec69 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -66,7 +66,7 @@ func TestFromM3IdentToMetric(t *testing.T) { metric, err := FromM3IdentToMetric(testID, tagIters, models.NewTagOptions().SetMetricName(name)) require.NoError(t, err) - assert.Equal(t, testID.String(), metric.ID) + assert.Equal(t, testID.Bytes(), metric.ID) assert.Equal(t, testTags, metric.Tags.Tags) assert.Equal(t, name, metric.Tags.Opts.MetricName()) } diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index a47e1ea6b7..279c7d7ac0 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -413,8 +413,7 @@ func (s *m3storage) Write( var ( // TODO: Pool this once an ident pool is setup. We will have // to stop calling NoFinalize() below if we do that. 
- buf = make([]byte, 0, query.Tags.IDLen()) - idBuf = query.Tags.IDMarshalTo(buf) + idBuf = query.Tags.ID() id = ident.BytesID(idBuf) ) // Set id to NoFinalize to avoid cloning it in write operations diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index 23b55b87c4..ac938bf18c 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -538,14 +538,14 @@ func TestLocalSearchSuccess(t *testing.T) { actual := make(map[string]models.Metric) for _, m := range result.Metrics { - actual[m.ID] = m + actual[string(m.ID)] = m } for id, actual := range actual { expected, ok := expected[id] require.True(t, ok) - assert.Equal(t, expected.id, actual.ID) + assert.Equal(t, []byte(expected.id), actual.ID) assert.Equal(t, []models.Tag{{ Name: []byte(expected.tagName), Value: []byte(expected.tagValue), }}, actual.Tags.Tags) diff --git a/src/query/storage/types.go b/src/query/storage/types.go index dbfbd1328d..d392fedd85 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -133,7 +133,7 @@ type WriteQuery struct { } func (q *WriteQuery) String() string { - return q.Tags.ID() + return string(q.Tags.ID()) } // CompleteTagsQuery represents a query that returns an autocompleted diff --git a/src/query/storage/validator/storage.go b/src/query/storage/validator/storage.go index 4585b9eb49..a73cc5d213 100644 --- a/src/query/storage/validator/storage.go +++ b/src/query/storage/validator/storage.go @@ -127,7 +127,7 @@ func PromResultToSeriesList(promReadResp prometheus.PromResp, tagOptions models. } seriesList[i] = ts.NewSeries( - string(name), + name, dps, tags, ) diff --git a/src/query/storage/validator/storage_test.go b/src/query/storage/validator/storage_test.go index 567ff8da3e..5d3d7ca385 100644 --- a/src/query/storage/validator/storage_test.go +++ b/src/query/storage/validator/storage_test.go @@ -62,5 +62,5 @@ func TestConverter(t *testing.T) { assert.Equal(t, 3, tsList[0].Len()) assert.Equal(t, 10.0, tsList[0].Values().Datapoints()[0].Value) - assert.Equal(t, "test_name", tsList[0].Name()) + assert.Equal(t, []byte("test_name"), tsList[0].Name()) } diff --git a/src/query/test/block.go b/src/query/test/block.go index 2e4a4dea27..f52f3af704 100644 --- a/src/query/test/block.go +++ b/src/query/test/block.go @@ -101,10 +101,9 @@ func NewSeriesMeta(tagPrefix string, count int) []block.SeriesMeta { seriesMeta := make([]block.SeriesMeta, count) for i := range seriesMeta { tags := models.EmptyTags() - st := fmt.Sprintf("%s%d", tagPrefix, i) - t := []byte(st) - tags = tags.AddTag(models.Tag{Name: []byte("__name__"), Value: t}) - tags = tags.AddTag(models.Tag{Name: t, Value: t}) + st := []byte(fmt.Sprintf("%s%d", tagPrefix, i)) + tags = tags.AddTag(models.Tag{Name: []byte("__name__"), Value: st}) + tags = tags.AddTag(models.Tag{Name: st, Value: st}) seriesMeta[i] = block.SeriesMeta{ Name: st, Tags: tags, diff --git a/src/query/test/comparison.go b/src/query/test/comparison.go index 45f9cac894..7a17619b08 100644 --- a/src/query/test/comparison.go +++ b/src/query/test/comparison.go @@ -21,6 +21,7 @@ package test import ( + "bytes" "fmt" "math" "sort" @@ -104,7 +105,7 @@ func equalsWithDelta(t *testing.T, expected, actual, delta float64, debugMsg str type match struct { indices []int seriesTags []models.Tag - name string + name []byte values []float64 } @@ -113,7 +114,10 @@ type matches []match func (m matches) Len() int { return len(m) } func (m matches) Swap(i, j int) { m[i], m[j] = m[j], m[i] } func (m matches) Less(i, j 
int) bool { - return models.Tags{Tags: m[i].seriesTags}.ID() > models.Tags{Tags: m[j].seriesTags}.ID() + return bytes.Compare( + models.NewTags(0, nil).AddTags(m[i].seriesTags).ID(), + models.NewTags(0, nil).AddTags(m[j].seriesTags).ID(), + ) == -1 } // CompareLists compares series meta / index pairs diff --git a/src/query/ts/m3db/convert_test.go b/src/query/ts/m3db/convert_test.go index 78b6f2d213..a124ccc4c8 100644 --- a/src/query/ts/m3db/convert_test.go +++ b/src/query/ts/m3db/convert_test.go @@ -156,7 +156,7 @@ func verifyMetas( assert.Equal(t, []byte("b"), val) for i, m := range metas { - assert.Equal(t, fmt.Sprintf("abc%d", i), m.Name) + assert.Equal(t, []byte(fmt.Sprintf("abc%d", i)), m.Name) require.Equal(t, 1, m.Tags.Len()) val, found := m.Tags.Get([]byte("c")) assert.True(t, found) diff --git a/src/query/ts/m3db/encoded_block_iterator.go b/src/query/ts/m3db/encoded_block_iterator.go index 255ad244c9..fe0384d309 100644 --- a/src/query/ts/m3db/encoded_block_iterator.go +++ b/src/query/ts/m3db/encoded_block_iterator.go @@ -120,7 +120,7 @@ func (b *encodedBlock) buildSeriesMeta() error { } b.seriesMetas[i] = block.SeriesMeta{ - Name: iter.ID().String(), + Name: iter.ID().Bytes(), Tags: tags, } } diff --git a/src/query/ts/series.go b/src/query/ts/series.go index c44972fc7d..e8ceb2c089 100644 --- a/src/query/ts/series.go +++ b/src/query/ts/series.go @@ -31,13 +31,13 @@ import ( // indicating the number of milliseconds represented by each point. type Series struct { resolution time.Duration - name string + name []byte vals Values Tags models.Tags } // NewSeries creates a new Series at a given start time, backed by the provided values. -func NewSeries(name string, vals Values, tags models.Tags) *Series { +func NewSeries(name []byte, vals Values, tags models.Tags) *Series { return &Series{ name: name, vals: vals, @@ -46,7 +46,7 @@ func NewSeries(name string, vals Values, tags models.Tags) *Series { } // Name returns the name of the timeseries block -func (s *Series) Name() string { return s.name } +func (s *Series) Name() []byte { return s.name } // Len returns the number of values in the time series. Used for aggregation. func (s *Series) Len() int { return s.vals.Len() } diff --git a/src/query/ts/series_test.go b/src/query/ts/series_test.go index 5961028a0d..5a406526d0 100644 --- a/src/query/ts/series_test.go +++ b/src/query/ts/series_test.go @@ -35,9 +35,10 @@ func TestCreateNewSeries(t *testing.T) { {Name: []byte("biz"), Value: []byte("baz")}, }) values := NewFixedStepValues(1000, 10000, 1, time.Now()) - series := NewSeries("metrics", values, tags) + name := []byte("metrics") + series := NewSeries(name, values, tags) - assert.Equal(t, "metrics", series.Name()) + assert.Equal(t, name, series.Name()) assert.Equal(t, 10000, series.Len()) assert.Equal(t, 1.0, series.Values().ValueAt(0)) } diff --git a/src/query/tsdb/remote/codecs.go b/src/query/tsdb/remote/codecs.go index 6bdc1783a5..380260f8b2 100644 --- a/src/query/tsdb/remote/codecs.go +++ b/src/query/tsdb/remote/codecs.go @@ -75,7 +75,7 @@ func encodeFetchResult(results *storage.FetchResult) *rpc.FetchResponse { series[i] = &rpc.Series{ Meta: &rpc.SeriesMetadata{ - Id: []byte(result.Name()), + Id: result.Name(), }, Value: &rpc.Series_Decompressed{ Decompressed: &rpc.DecompressedSeries{ @@ -93,7 +93,7 @@ func encodeFetchResult(results *storage.FetchResult) *rpc.FetchResponse { // decodeDecompressedFetchResult decodes fetch results from a GRPC-compatible type. 
func decodeDecompressedFetchResult( - name string, + name []byte, tagOptions models.TagOptions, rpcSeries []*rpc.DecompressedSeries, ) ([]*ts.Series, error) { @@ -122,7 +122,7 @@ func decodeTags( } func decodeTs( - name string, + name []byte, tagOptions models.TagOptions, r *rpc.DecompressedSeries, ) (*ts.Series, error) { diff --git a/src/query/tsdb/remote/codecs_test.go b/src/query/tsdb/remote/codecs_test.go index 01fbd3e15d..0c2ee1ddeb 100644 --- a/src/query/tsdb/remote/codecs_test.go +++ b/src/query/tsdb/remote/codecs_test.go @@ -95,7 +95,7 @@ func createRPCSeries() []*rpc.DecompressedSeries { func TestDecodeFetchResult(t *testing.T) { rpcSeries := createRPCSeries() - name := "name" + name := []byte("name") metricName := []byte("!") tsSeries, err := decodeDecompressedFetchResult(name, models.NewTagOptions().SetMetricName(metricName), rpcSeries) diff --git a/src/query/util/execution/parallel.go b/src/query/util/execution/parallel.go index 2862aa84e2..67e8f53881 100644 --- a/src/query/util/execution/parallel.go +++ b/src/query/util/execution/parallel.go @@ -52,13 +52,10 @@ func ExecuteParallel(ctx context.Context, requests []Request) error { func processParallel(ctx context.Context, requests []Request) error { g, ctx := errgroup.WithContext(ctx) for _, req := range requests { - // Need to use a separate func since g.Go doesn't take input req := req - func() { - g.Go(func() error { - return req.Process(ctx) - }) - }() + g.Go(func() error { + return req.Process(ctx) + }) } return g.Wait() diff --git a/src/query/util/writer/int_writer.go b/src/query/util/writer/int_writer.go new file mode 100644 index 0000000000..bc6d0b448a --- /dev/null +++ b/src/query/util/writer/int_writer.go @@ -0,0 +1,88 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package writer + +// IntLength determines the number of digits in a base 10 integer. +func IntLength(i int) int { + if i == 0 { + return 1 + } + + count := 0 + for ; i > 0; i /= 10 { + count++ + } + + return count +} + +// WriteInteger writes a base 10 integer to a buffer at a given index. +// +// NB: based on fmt.Printf handling of integers, specifically base 10 case. +func WriteInteger(dst []byte, value, idx int) int { + // Because printing is easier right-to-left: format u into buf, ending at buf[i]. + // We could make things marginally faster by splitting the 32-bit case out + // into a separate block but it's not worth the duplication, so u has 64 bits. 
+	// The remembered end position is returned so that callers can continue
+	// writing into the buffer immediately after this integer.
+	idx = idx + IntLength(value)
+	finalIndex := idx
+	for value >= 10 {
+		idx--
+		dst[idx] = byte(48 + value%10) // 48 is ASCII '0'
+		next := value / 10
+		value = next
+	}
+
+	dst[idx-1] = byte(48 + value) // most significant digit
+	return finalIndex
+}
+
+// IntsLength determines the number of digits in a list of base 10 integers,
+// accounting for separators between each integer.
+func IntsLength(is []int) int {
+	// Initialize the length to account for the separators between values.
+	l := len(is) - 1
+	for _, i := range is {
+		l += IntLength(i)
+	}
+
+	return l
+}
+
+// WriteIntegers writes a slice of base 10 integers to a buffer at a given index,
+// separating each value with the given separator, returning the index at which
+// the write ends.
+//
+// NB: Ensure that there is sufficient space in the buffer to hold values and
+// separators.
+func WriteIntegers(dst []byte, values []int, sep byte, idx int) int {
+	l := len(values) - 1
+	for _, v := range values[:l] {
+		idx = WriteInteger(dst, v, idx)
+		dst[idx] = sep
+		idx++
+	}
+
+	// Write the last integer; it is not followed by a separator.
+	idx = WriteInteger(dst, values[l], idx)
+	return idx
+}
diff --git a/src/query/util/writer/int_writer_test.go b/src/query/util/writer/int_writer_test.go
new file mode 100644
index 0000000000..101f1f0840
--- /dev/null
+++ b/src/query/util/writer/int_writer_test.go
@@ -0,0 +1,139 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+ +package writer + +import ( + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIntegerLength(t *testing.T) { + for i := 0; i < 18; i++ { + small := int(math.Pow(10, float64(i))) + large := small*10 - 1 + expected := i + 1 + assert.Equal(t, expected, IntLength(small)) + assert.Equal(t, expected, IntLength(large)) + } + + assert.Equal(t, 19, IntLength(1000000000000000000)) +} + +func TestWriteInteger(t *testing.T) { + ints := make([]int, 105) + for i := range ints { + l := IntLength(i) + buf := make([]byte, l) + idx := WriteInteger(buf, i, 0) + assert.Equal(t, l, idx) + assert.Equal(t, []byte(fmt.Sprint(i)), buf) + } +} + +func TestWriteIntegersAtIndex(t *testing.T) { + l := IntLength(345) + IntLength(12) + buf := make([]byte, l) + idx := WriteInteger(buf, 12, 0) + idx = WriteInteger(buf, 345, idx) + assert.Equal(t, 5, idx) + assert.Equal(t, []byte("12345"), buf) +} + +func TestWriteIntegersSingle(t *testing.T) { + sep := byte('!') + ints := []int{1} + l := IntsLength(ints) + assert.Equal(t, 1, l) + + buf := make([]byte, l) + idx := WriteIntegers(buf, ints, sep, 0) + assert.Equal(t, l, idx) + expected := []byte("1") + assert.Equal(t, expected, buf) + + ints = []int{10} + l = IntsLength(ints) + assert.Equal(t, 2, l) + buf = make([]byte, l) + idx = WriteIntegers(buf, ints, sep, 0) + assert.Equal(t, l, idx) + expected = []byte("10") + assert.Equal(t, expected, buf) +} + +func TestWriteIntegersSingleAtIndex(t *testing.T) { + sep := byte('!') + ints := []int{1} + buf := make([]byte, 2) + buf[0] = byte('?') + idx := WriteIntegers(buf, ints, sep, 1) + assert.Equal(t, 2, idx) + expected := []byte("?1") + assert.Equal(t, expected, buf) + + idx = 0 + idx = WriteIntegers(buf, ints, sep, idx) + idx = WriteIntegers(buf, ints, sep, idx) + assert.Equal(t, 2, idx) + expected = []byte("11") + assert.Equal(t, expected, buf) +} + +func TestWriteIntegersMultiple(t *testing.T) { + sep := byte('!') + ints := []int{1, 2} + l := IntsLength(ints) + assert.Equal(t, 3, l) + + buf := make([]byte, l) + idx := WriteIntegers(buf, ints, sep, 0) + assert.Equal(t, l, idx) + expected := []byte("1!2") + require.Equal(t, expected, buf) + + ints = []int{10, 20} + l = IntsLength(ints) + assert.Equal(t, 5, l) + buf = make([]byte, l) + idx = WriteIntegers(buf, ints, sep, 0) + assert.Equal(t, l, idx) + expected = []byte("10!20") + assert.Equal(t, expected, buf) +} + +func TestWriteIntegersMultipleAtIndex(t *testing.T) { + sep := byte('!') + ints := []int{1, 20, 300, 4000, 50000} + l := IntsLength(ints) + assert.Equal(t, 19, l) + + buf := make([]byte, l+2) + buf[0] = '?' + buf[l+1] = '?' + idx := WriteIntegers(buf, ints, sep, 1) + assert.Equal(t, l+1, idx) + expected := []byte("?1!20!300!4000!50000?") + require.Equal(t, expected, buf) +}
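
Usage sketch: the snippet below is a minimal, self-contained illustration of how the
new util/writer helpers could be combined to render a length-prefixed field, in the
spirit of the collision-resistant tag ID scheme. It assumes the
github.com/m3db/m3/src/query/util/writer import path, and the tagLengths values are
purely illustrative; only IntLength, IntsLength, and WriteIntegers from the patch
above are used.

package main

import (
	"fmt"

	"github.com/m3db/m3/src/query/util/writer"
)

func main() {
	// Hypothetical per-tag lengths that a length-prefixed ID scheme might encode.
	tagLengths := []int{4, 12, 7, 3}

	// IntsLength sizes the buffer exactly: total digits plus one separator per gap.
	buf := make([]byte, writer.IntsLength(tagLengths))

	// WriteIntegers fills the buffer starting at index 0 and returns the index
	// just past the last digit written.
	end := writer.WriteIntegers(buf, tagLengths, ',', 0)

	fmt.Println(string(buf[:end])) // prints: 4,12,7,3
}

Because WriteInteger and WriteIntegers return the end index of each write, callers
can chain further writes into the same pre-sized buffer without re-measuring it.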