diff --git a/src/query/api/v1/handler/graphite/find_parser.go b/src/query/api/v1/handler/graphite/find_parser.go index 4c9c1eb9c1..bc2ae371ab 100644 --- a/src/query/api/v1/handler/graphite/find_parser.go +++ b/src/query/api/v1/handler/graphite/find_parser.go @@ -76,7 +76,12 @@ func parseFindParamsToQuery(r *http.Request) ( return nil, xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest) } - matchers := graphiteStorage.TranslateQueryToMatchers(query) + matchers, err := graphiteStorage.TranslateQueryToMatchers(query) + if err != nil { + return nil, xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query), + http.StatusBadRequest) + } + return &storage.FetchQuery{ Raw: query, TagMatchers: matchers, diff --git a/src/query/api/v1/handler/graphite/pickle/opcodes.go b/src/query/api/v1/handler/graphite/pickle/opcodes.go new file mode 100644 index 0000000000..668c39c2a7 --- /dev/null +++ b/src/query/api/v1/handler/graphite/pickle/opcodes.go @@ -0,0 +1,36 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pickle + +// list of opcodes required for pickling graphite query results. +const ( + opNone = 0x4e + opMark = 0x28 + opStop = 0x2e + opBinInt = 0x4a + opBinUnicode = 0x58 + opBinFloat = 0x47 + opEmptyList = 0x5d + opAppends = 0x65 + opEmptyDict = 0x7d + opSetItems = 0x75 + opProto = 0x80 +) diff --git a/src/query/api/v1/handler/graphite/pickle/pickle_writer.go b/src/query/api/v1/handler/graphite/pickle/pickle_writer.go new file mode 100644 index 0000000000..6d3c781850 --- /dev/null +++ b/src/query/api/v1/handler/graphite/pickle/pickle_writer.go @@ -0,0 +1,175 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pickle + +import ( + "bufio" + "encoding/binary" + "io" + "math" +) + +var ( + programStart = []uint8{opProto, 0x2} + programEnd = []uint8{opStop} + listStart = []uint8{opEmptyList, opMark} + dictStart = []uint8{opEmptyDict, opMark} +) + +// A Writer is capable writing out the opcodes required by the pickle format. +// Note that this is a very limited implementation of pickling; just enough for +// us to implement the opcodes required by graphite /render. +type Writer struct { + w *bufio.Writer + buf [8]byte + err error +} + +// NewWriter creates a new pickle writer. +func NewWriter(w io.Writer) *Writer { + pw := &Writer{ + w: bufio.NewWriter(w), + } + + _, pw.err = pw.w.Write(programStart) + return pw +} + +// BeginDict starts marshalling a python dict. +func (p *Writer) BeginDict() { + if p.err != nil { + return + } + + if _, p.err = p.w.Write(dictStart); p.err != nil { + return + } +} + +// WriteDictKey writes a dictionary key. +func (p *Writer) WriteDictKey(s string) { + p.WriteString(s) +} + +// EndDict ends marshalling a python dict. +func (p *Writer) EndDict() { + if p.err != nil { + return + } + + p.err = p.w.WriteByte(opSetItems) +} + +// BeginList begins writing a new python list. +func (p *Writer) BeginList() { + if p.err != nil { + return + } + + _, p.err = p.w.Write(listStart) +} + +// EndList ends writing a python list. +func (p *Writer) EndList() { + if p.err != nil { + return + } + + p.w.WriteByte(opAppends) +} + +// WriteNone writes a python `None`. +func (p *Writer) WriteNone() { + if p.err != nil { + return + } + + p.err = p.w.WriteByte(opNone) +} + +// WriteFloat64 writes a float64 value. NaNs are converted in `None`. +func (p *Writer) WriteFloat64(v float64) { + if math.IsNaN(v) { + p.WriteNone() + return + } + + if p.err != nil { + return + } + + if p.err = p.w.WriteByte(opBinFloat); p.err != nil { + return + } + + binary.BigEndian.PutUint64(p.buf[:], math.Float64bits(v)) + _, p.err = p.w.Write(p.buf[:]) +} + +// WriteString writes a python string. +func (p *Writer) WriteString(s string) { + if p.err != nil { + return + } + + if p.err = p.w.WriteByte(opBinUnicode); p.err != nil { + return + } + + binary.LittleEndian.PutUint32(p.buf[:4], uint32(len(s))) + if _, p.err = p.w.Write(p.buf[:4]); p.err != nil { + return + } + + _, p.err = p.w.WriteString(s) +} + +// WriteInt writes an int value. +func (p *Writer) WriteInt(n int) { + if p.err != nil { + return + } + + if p.err = p.w.WriteByte(opBinInt); p.err != nil { + return + } + + binary.LittleEndian.PutUint32(p.buf[:4], uint32(n)) + _, p.err = p.w.Write(p.buf[:4]) +} + +// Close closes the writer, marking the end of the stream and flushing any +// pending values. +func (p *Writer) Close() error { + if p.err != nil { + return p.err + } + + if _, p.err = p.w.Write(programEnd); p.err != nil { + return p.err + } + + if p.err = p.w.Flush(); p.err != nil { + return p.err + } + + return nil +} diff --git a/src/query/api/v1/handler/graphite/pickle/pickle_writer_test.go b/src/query/api/v1/handler/graphite/pickle/pickle_writer_test.go new file mode 100644 index 0000000000..27b72f9eda --- /dev/null +++ b/src/query/api/v1/handler/graphite/pickle/pickle_writer_test.go @@ -0,0 +1,106 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pickle + +import ( + "bytes" + "math" + "testing" + + "github.com/hydrogen18/stalecucumber" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWriteEmptyDict(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + w.BeginDict() + w.EndDict() + require.NoError(t, w.Close()) + + var m map[interface{}]interface{} + require.NoError(t, unpickle(buf.Bytes(), &m)) + assert.Equal(t, map[interface{}]interface{}{}, m) +} + +func TestWriteEmptyList(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + w.BeginList() + w.EndList() + require.NoError(t, w.Close()) + + var m []string + require.NoError(t, unpickle(buf.Bytes(), &m)) + assert.Equal(t, []string{}, m) +} + +func TestWriteComplex(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + w.BeginDict() + w.WriteDictKey("step") + w.WriteInt(3494945) + w.WriteDictKey("pi") + w.WriteFloat64(3.45E10) + w.WriteDictKey("none") + w.WriteNone() + w.WriteDictKey("noNumber") + w.WriteFloat64(math.NaN()) + w.WriteDictKey("skey") + w.WriteString("hello world") + w.WriteDictKey("nested") + w.BeginDict() + w.WriteDictKey("fooBar") + w.BeginList() + w.WriteFloat64(349439.3494) + w.WriteInt(-9459450) + w.WriteString("A Nested String") + w.EndList() + w.EndDict() + w.EndDict() + require.NoError(t, w.Close()) + + s := struct { + Step int + Pi float64 + NoNumber *float64 + Skey string + Nested struct { + FooBar []interface{} + } + }{} + + require.NoError(t, unpickle(buf.Bytes(), &s)) + assert.Equal(t, 3494945, s.Step) + assert.Equal(t, 3.45E10, s.Pi) + assert.Nil(t, s.NoNumber) + assert.Equal(t, "hello world", s.Skey) + assert.Equal(t, []interface{}{ + 349439.3494, int64(-9459450), "A Nested String", + }, s.Nested.FooBar) +} + +func unpickle(b []byte, data interface{}) error { + r := bytes.NewReader(b) + return stalecucumber.UnpackInto(data).From(stalecucumber.Unpickle(r)) +} diff --git a/src/query/api/v1/handler/graphite/render.go b/src/query/api/v1/handler/graphite/render.go index 2596eb033e..f050dd0857 100644 --- a/src/query/api/v1/handler/graphite/render.go +++ b/src/query/api/v1/handler/graphite/render.go @@ -194,6 +194,6 @@ func (h *renderHandler) serveHTTP( SortApplied: true, } - err = WriteRenderResponse(w, response) + err = WriteRenderResponse(w, response, p.Format) return respError{err: err, code: http.StatusOK} } diff --git a/src/query/api/v1/handler/graphite/render_parser.go b/src/query/api/v1/handler/graphite/render_parser.go index 5fc5d6210c..8a1e495c4a 100644 --- a/src/query/api/v1/handler/graphite/render_parser.go +++ b/src/query/api/v1/handler/graphite/render_parser.go @@ -28,6 +28,7 @@ import ( "strconv" "time" + "github.com/m3db/m3/src/query/api/v1/handler/graphite/pickle" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/graphite/ts" @@ -38,6 +39,7 @@ const ( realTimeQueryThreshold = time.Minute queryRangeShiftThreshold = 55 * time.Minute queryRangeShift = 15 * time.Second + pickleFormat = "pickle" ) var ( @@ -49,7 +51,14 @@ var ( func WriteRenderResponse( w http.ResponseWriter, series ts.SeriesList, + format string, ) error { + if format == pickleFormat { + w.Header().Set("Content-Type", "application/octet-stream") + return renderResultsPickle(w, series.Values) + } + + // NB: return json unless requesting specifically `pickleFormat` w.Header().Set("Content-Type", "application/json") return renderResultsJSON(w, series.Values) } @@ -214,3 +223,36 @@ func renderResultsJSON(w io.Writer, series []*ts.Series) error { jw.EndArray() return jw.Close() } + +func renderResultsPickle(w io.Writer, series []*ts.Series) error { + pw := pickle.NewWriter(w) + pw.BeginList() + + for _, s := range series { + pw.BeginDict() + pw.WriteDictKey("name") + pw.WriteString(s.Name()) + + pw.WriteDictKey("start") + pw.WriteInt(int(s.StartTime().UTC().Unix())) + + pw.WriteDictKey("end") + pw.WriteInt(int(s.EndTime().UTC().Unix())) + + pw.WriteDictKey("step") + pw.WriteInt(s.MillisPerStep() / 1000) + + pw.WriteDictKey("values") + pw.BeginList() + for i := 0; i < s.Len(); i++ { + pw.WriteFloat64(s.ValueAt(i)) + } + pw.EndList() + + pw.EndDict() + } + + pw.EndList() + + return pw.Close() +} diff --git a/src/query/graphite/storage/converter.go b/src/query/graphite/storage/converter.go index e1d358c501..01d4624f11 100644 --- a/src/query/graphite/storage/converter.go +++ b/src/query/graphite/storage/converter.go @@ -30,6 +30,10 @@ const ( carbonGlobRune = '*' ) +var ( + wildcard = []byte(".*") +) + func glob(metric string) []byte { globLen := len(metric) for _, c := range metric { @@ -65,6 +69,6 @@ func matcherTerminator(count int) models.Matcher { return models.Matcher{ Type: models.MatchNotRegexp, Name: graphite.TagName(count), - Value: []byte(".*"), + Value: wildcard, } } diff --git a/src/query/graphite/storage/converter_test.go b/src/query/graphite/storage/converter_test.go index dd25dec31f..c5f5fb2ae4 100644 --- a/src/query/graphite/storage/converter_test.go +++ b/src/query/graphite/storage/converter_test.go @@ -61,7 +61,7 @@ func TestConvertMetricPartToMatcher(t *testing.T) { } } -func TestGetMatcherTerminator(t *testing.T) { +func TestMatcherTerminator(t *testing.T) { for i := 0; i < 100; i++ { expected := models.Matcher{ Type: models.MatchNotRegexp, diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index bbf6481ed0..3c437f9190 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -23,6 +23,7 @@ package storage import ( "context" "errors" + "fmt" "time" "github.com/m3db/m3/src/query/cost" @@ -32,6 +33,9 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" m3ts "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/query/util/logging" + + "go.uber.org/zap" ) var ( @@ -56,18 +60,40 @@ func NewM3WrappedStorage( return &m3WrappedStore{m3: m3storage, enforcer: enforcer} } -// TranslateQueryToMatchers converts a graphite query to tag matcher pairs. -func TranslateQueryToMatchers(query string) models.Matchers { +// translates a graphite query to tag matcher pairs. +func translateQueryToMatchers( + query string, + withTerminator bool, +) (models.Matchers, error) { metricLength := graphite.CountMetricParts(query) - matchers := make(models.Matchers, metricLength) + matchersLength := metricLength + if withTerminator { + // Add space for a terminator character. + matchersLength++ + } + + matchers := make(models.Matchers, matchersLength) for i := 0; i < metricLength; i++ { metric := graphite.ExtractNthMetricPart(query, i) if len(metric) > 0 { matchers[i] = convertMetricPartToMatcher(i, metric) + } else { + return nil, fmt.Errorf("invalid matcher format: %s", query) } } - return matchers + if withTerminator { + // Add a terminator matcher at the end to ensure expansion is terminated at + // the last given metric part. + matchers[metricLength] = matcherTerminator(metricLength) + } + + return matchers, nil +} + +// TranslateQueryToMatchers converts a graphite query to tag matcher pairs. +func TranslateQueryToMatchers(query string) (models.Matchers, error) { + return translateQueryToMatchers(query, false) } // GetQueryTerminatorTagName will return the name for the terminator matcher in @@ -77,19 +103,12 @@ func GetQueryTerminatorTagName(query string) []byte { return graphite.TagName(metricLength) } -func translateQuery(query string, opts FetchOptions) *storage.FetchQuery { - metricLength := graphite.CountMetricParts(query) - matchers := make(models.Matchers, metricLength+1) - for i := 0; i < metricLength; i++ { - metric := graphite.ExtractNthMetricPart(query, i) - if len(metric) > 0 { - matchers[i] = convertMetricPartToMatcher(i, metric) - } +func translateQuery(query string, opts FetchOptions) (*storage.FetchQuery, error) { + matchers, err := translateQueryToMatchers(query, true) + if err != nil { + return nil, err } - // Add a terminator matcher at the end to ensure expansion is terminated at - // the last given metric part. - matchers[metricLength] = matcherTerminator(metricLength) return &storage.FetchQuery{ Raw: query, TagMatchers: matchers, @@ -98,7 +117,7 @@ func translateQuery(query string, opts FetchOptions) *storage.FetchQuery { // NB: interval is not used for initial consolidation step from the storage // so it's fine to use default here. Interval: time.Duration(0), - } + }, nil } func translateTimeseries( @@ -135,7 +154,18 @@ func translateTimeseries( func (s *m3WrappedStore) FetchByQuery( ctx xctx.Context, query string, opts FetchOptions, ) (*FetchResult, error) { - m3query := translateQuery(query, opts) + m3query, err := translateQuery(query, opts) + if err != nil { + // NB: error here implies the query cannot be translated; empty set expected + // rather than propagating an error. + logger := logging.WithContext(ctx.RequestContext()) + logger.Info("could not translate query, returning empty results", + zap.String("query", query)) + return &FetchResult{ + SeriesList: []*ts.Series{}, + }, nil + } + m3ctx, cancel := context.WithTimeout(ctx.RequestContext(), opts.Timeout) defer cancel() fetchOptions := storage.NewFetchOptions() diff --git a/src/query/graphite/storage/m3_wrapper_test.go b/src/query/graphite/storage/m3_wrapper_test.go index 3e5e09f87c..f6e523d01d 100644 --- a/src/query/graphite/storage/m3_wrapper_test.go +++ b/src/query/graphite/storage/m3_wrapper_test.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/mock" m3ts "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/query/util/logging" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -40,7 +41,7 @@ import ( ) func TestTranslateQuery(t *testing.T) { - query := `foo.ba[rz].q*x.terminator.will.be.back?` + query := `foo.ba[rz].q*x.terminator.will.be.*.back?` end := time.Now() start := end.Add(time.Hour * -2) opts := FetchOptions{ @@ -51,7 +52,8 @@ func TestTranslateQuery(t *testing.T) { }, } - translated := translateQuery(query, opts) + translated, err := translateQuery(query, opts) + assert.NoError(t, err) assert.Equal(t, end, translated.End) assert.Equal(t, start, translated.Start) assert.Equal(t, time.Duration(0), translated.Interval) @@ -64,13 +66,35 @@ func TestTranslateQuery(t *testing.T) { {Type: models.MatchRegexp, Name: graphite.TagName(3), Value: []byte("terminator")}, {Type: models.MatchRegexp, Name: graphite.TagName(4), Value: []byte("will")}, {Type: models.MatchRegexp, Name: graphite.TagName(5), Value: []byte("be")}, - {Type: models.MatchRegexp, Name: graphite.TagName(6), Value: []byte("back?")}, - {Type: models.MatchNotRegexp, Name: graphite.TagName(7), Value: []byte(".*")}, + {Type: models.MatchRegexp, Name: graphite.TagName(6), Value: []byte(".*")}, + {Type: models.MatchRegexp, Name: graphite.TagName(7), Value: []byte("back?")}, + {Type: models.MatchNotRegexp, Name: graphite.TagName(8), Value: []byte(".*")}, } assert.Equal(t, expected, matchers) } +func TestTranslateQueryTrailingDot(t *testing.T) { + query := `foo.` + end := time.Now() + start := end.Add(time.Hour * -2) + opts := FetchOptions{ + StartTime: start, + EndTime: end, + DataOptions: DataOptions{ + Timeout: time.Minute, + }, + } + + translated, err := translateQuery(query, opts) + assert.Nil(t, translated) + assert.Error(t, err) + + matchers, err := TranslateQueryToMatchers(query) + assert.Nil(t, matchers) + assert.Error(t, err) +} + func TestTranslateTimeseries(t *testing.T) { ctx := xctx.New() resolution := 10 * time.Second @@ -171,3 +195,24 @@ func TestFetchByQuery(t *testing.T) { // NB: ensure the fetch was called with the base enforcer's child correctly assert.Equal(t, childEnforcer, store.LastFetchOptions().Enforcer) } + +func TestFetchByInvalidQuery(t *testing.T) { + logging.InitWithCores(nil) + store := mock.NewMockStorage() + start := time.Now().Add(time.Hour * -1) + end := time.Now() + opts := FetchOptions{ + StartTime: start, + EndTime: end, + DataOptions: DataOptions{ + Timeout: time.Minute, + }, + } + + query := "a." + ctx := xctx.New() + wrapper := NewM3WrappedStorage(store, nil) + result, err := wrapper.FetchByQuery(ctx, query, opts) + assert.NoError(t, err) + require.Equal(t, 0, len(result.SeriesList)) +}