diff --git a/glide.lock b/glide.lock index a0c7c35dbc..4279b9cc3f 100644 --- a/glide.lock +++ b/glide.lock @@ -650,10 +650,6 @@ imports: - stylecheck - unused - version -- name: github.com/influxdata/influxdb - version: 01c8dd416270f424ab0c40f9291e269ac6921964 - subpackages: - - models testImports: - name: github.com/glycerine/go-unsnap-stream version: 98d31706395aaac22e29676617f2ee37bee55b5a diff --git a/glide.yaml b/glide.yaml index 31403de3ad..d65d9d9bde 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,10 +1,5 @@ package: github.com/m3db/m3 import: - - package: github.com/influxdata/influxdb - version: 01c8dd416270f424ab0c40f9291e269ac6921964 - subpackages: - - models - - package: github.com/m3db/bitset version: 07973db6b78acb62ac207d0538055e874b49d90d diff --git a/src/query/api/v1/handler/influxdb/rewrite.go b/src/query/api/v1/handler/influxdb/rewrite.go deleted file mode 100644 index 615ad1b6c8..0000000000 --- a/src/query/api/v1/handler/influxdb/rewrite.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright (c) 2019 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package influxdb - -import ( - "regexp" -) - -type regexpRewriter struct { - okStart, okRest [256]bool - replacement byte -} - -func newRegexpRewriter(startRe, restRe string) *regexpRewriter { - createArray := func(okRe string) (ret [256]bool) { - re := regexp.MustCompile(okRe) - // Check for only 7 bit non-control ASCII characters - for i := 32; i < 128; i++ { - if re.Match([]byte{byte(i)}) { - ret[i] = true - } - } - return - } - return ®expRewriter{okStart: createArray(startRe), okRest: createArray(restRe), replacement: byte('_')} -} - -func (rr *regexpRewriter) rewrite(input []byte) { - if len(input) == 0 { - return - } - if !rr.okStart[input[0]] { - input[0] = rr.replacement - } - for i := 1; i < len(input); i++ { - if !rr.okRest[input[i]] { - input[i] = rr.replacement - } - } -} - -// Utility, which handles both __name__ ('metric') tag, as well as -// rest of tags ('labels') -// -// It allow using any influxdb client, rewriting the tag names + the -// magic __name__ tag to match what Prometheus expects -type promRewriter struct { - metric, metricTail, label *regexpRewriter -} - -func newPromRewriter() *promRewriter { - return &promRewriter{ - metric: newRegexpRewriter( - "[a-zA-Z_:]", - "[a-zA-Z0-9_:]"), - metricTail: newRegexpRewriter( - "[a-zA-Z0-9_:]", - "[a-zA-Z0-9_:]"), - label: newRegexpRewriter( - "[a-zA-Z_]", "[a-zA-Z0-9_]")} -} - -func (pr *promRewriter) rewriteMetric(data []byte) { - pr.metric.rewrite(data) -} - -func (pr *promRewriter) rewriteMetricTail(data []byte) { - pr.metricTail.rewrite(data) -} - -func (pr *promRewriter) rewriteLabel(data []byte) { - pr.label.rewrite(data) -} diff --git a/src/query/api/v1/handler/influxdb/rewrite_test.go b/src/query/api/v1/handler/influxdb/rewrite_test.go deleted file mode 100644 index 213a7b7239..0000000000 --- a/src/query/api/v1/handler/influxdb/rewrite_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (c) 2019 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package influxdb - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -type test struct { - in, outMetric, outMetricTail, outLabel string -} - -func TestPromRewriter(t *testing.T) { - r := newPromRewriter() - tests := []test{{"foo", "foo", "foo", "foo"}, - {".bar", "_bar", "_bar", "_bar"}, - {"b.ar", "b_ar", "b_ar", "b_ar"}, - {":bar", ":bar", ":bar", "_bar"}, - {"ba:r", "ba:r", "ba:r", "ba_r"}, - {"9bar", "_bar", "9bar", "_bar"}, - } - for _, test := range tests { - in1 := []byte(test.in) - r.rewriteMetric(in1) - assert.Equal(t, test.outMetric, string(in1)) - in2 := []byte(test.in) - r.rewriteMetricTail(in2) - assert.Equal(t, test.outMetricTail, string(in2)) - in3 := []byte(test.in) - r.rewriteLabel(in3) - assert.Equal(t, test.outLabel, string(in3)) - } -} diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go deleted file mode 100644 index ba8546e6b9..0000000000 --- a/src/query/api/v1/handler/influxdb/write.go +++ /dev/null @@ -1,290 +0,0 @@ -// Copyright (c) 2019 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package influxdb - -import ( - "bytes" - "errors" - "fmt" - "io/ioutil" - "net/http" - - "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" - "github.com/m3db/m3/src/dbnode/client" - "github.com/m3db/m3/src/query/api/v1/handler" - "github.com/m3db/m3/src/query/api/v1/options" - "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/ts" - "github.com/m3db/m3/src/query/util/logging" - - xerrors "github.com/m3db/m3/src/x/errors" - xhttp "github.com/m3db/m3/src/x/net/http" - xtime "github.com/m3db/m3/src/x/time" - imodels "github.com/influxdata/influxdb/models" - "go.uber.org/zap" -) - -const ( - // InfluxWriteURL is the Influx DB write handler URL - InfluxWriteURL = handler.RoutePrefixV1 + "/influxdb/write" - - // InfluxWriteHTTPMethod is the HTTP method used with this resource - InfluxWriteHTTPMethod = http.MethodPost -) - -type ingestWriteHandler struct { - handlerOpts options.HandlerOptions - tagOpts models.TagOptions - promRewriter *promRewriter -} - -type ingestField struct { - name []byte // to be stored in __name__; rest of tags stay constant for the Point - value float64 -} - -type ingestIterator struct { - // what is being iterated (comes from outside) - points []imodels.Point - tagOpts models.TagOptions - promRewriter *promRewriter - - // internal - pointIndex int - err xerrors.MultiError - - // following entries are within current point, and initialized - // when we go to the first entry in the current point - fields []*ingestField - nextFieldIndex int - tags models.Tags -} - -func (ii *ingestIterator) populateFields() bool { - point := ii.points[ii.pointIndex] - it := point.FieldIterator() - n := 0 - ii.fields = make([]*ingestField, 0, 10) - bname := make([]byte, 0, len(point.Name())+1) - bname = append(bname, point.Name()...) - bname = append(bname, byte('_')) - bnamelen := len(bname) - ii.promRewriter.rewriteMetric(bname) - for it.Next() { - var value float64 = 0 - n += 1 - switch it.Type() { - case imodels.Boolean: - v, err := it.BooleanValue() - if err != nil { - ii.err = ii.err.Add(err) - continue - } - if v { - value = 1.0 - } - case imodels.Integer: - v, err := it.IntegerValue() - if err != nil { - ii.err = ii.err.Add(err) - continue - } - value = float64(v) - case imodels.Unsigned: - v, err := it.UnsignedValue() - if err != nil { - ii.err = ii.err.Add(err) - continue - } - value = float64(v) - case imodels.Float: - v, err := it.FloatValue() - if err != nil { - ii.err = ii.err.Add(err) - continue - } - value = v - default: - // TBD if we should stick strings as - // tags or not; to prevent cardinality - // explosion, we drop them for now - continue - } - tail := it.FieldKey() - name := make([]byte, 0, bnamelen+len(tail)) - name = append(name, bname...) - name = append(name, tail...) - ii.promRewriter.rewriteMetricTail(name[bnamelen:]) - ii.fields = append(ii.fields, &ingestField{name: name, value: value}) - } - return n > 0 -} - -func (ii *ingestIterator) Next() bool { - for len(ii.points) > ii.pointIndex { - if ii.nextFieldIndex == 0 { - // Populate tags only if we have fields we care about - if ii.populateFields() { - point := ii.points[ii.pointIndex] - ptags := point.Tags() - tags := models.NewTags(len(ptags), ii.tagOpts) - for _, tag := range ptags { - name := make([]byte, len(tag.Key)) - copy(name, tag.Key) - ii.promRewriter.rewriteLabel(name) - tags = tags.AddTagWithoutNormalizing(models.Tag{Name: name, Value: tag.Value}) - } - // sanity check no duplicate Name's; - // after Normalize, they are sorted so - // can just check them sequentially - valid := true - if len(tags.Tags) > 0 { - // Dummy w/o value set; used for dupe check and value is rewrittein in-place in SetName later on - tags = tags.AddTag(models.Tag{Name: tags.Opts.MetricName()}) - name := tags.Tags[0].Name - for i := 1; i < len(tags.Tags); i++ { - iname := tags.Tags[i].Name - if bytes.Equal(name, iname) { - ii.err = ii.err.Add(fmt.Errorf("non-unique Prometheus label %v", string(iname))) - valid = false - break - } - name = iname - } - } - if !valid { - ii.pointIndex += 1 - continue - } - ii.tags = tags - } - } - ii.nextFieldIndex += 1 - if ii.nextFieldIndex > len(ii.fields) { - ii.pointIndex += 1 - ii.nextFieldIndex = 0 - continue - } - return true - } - return false -} - -func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, xtime.Unit, []byte) { - if ii.pointIndex < len(ii.points) && ii.nextFieldIndex > 0 && len(ii.fields) > (ii.nextFieldIndex-1) { - point := ii.points[ii.pointIndex] - field := ii.fields[ii.nextFieldIndex-1] - tags := ii.tags.SetName(field.name) - - return tags, []ts.Datapoint{ts.Datapoint{Timestamp: point.Time(), - Value: field.value}}, xtime.Nanosecond, nil - } - return models.EmptyTags(), nil, 0, nil -} - -func (ii *ingestIterator) Reset() error { - ii.pointIndex = 0 - ii.nextFieldIndex = 0 - ii.err = xerrors.NewMultiError() - return nil -} - -func (ii *ingestIterator) Error() error { - return ii.err.FinalError() -} - -func NewInfluxWriterHandler(options options.HandlerOptions) http.Handler { - return &ingestWriteHandler{handlerOpts: options, - tagOpts: options.TagOptions(), - promRewriter: newPromRewriter()} -} - -func (iwh *ingestWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - bytes, err := ioutil.ReadAll(r.Body) - if err != nil { - xhttp.Error(w, err, http.StatusInternalServerError) - return - } - points, err := imodels.ParsePoints(bytes) - if err != nil { - xhttp.Error(w, err, http.StatusInternalServerError) - return - } - opts := ingest.WriteOptions{} - iter := &ingestIterator{points: points, tagOpts: iwh.tagOpts, promRewriter: iwh.promRewriter} - batchErr := iwh.handlerOpts.DownsamplerAndWriter().WriteBatch(r.Context(), iter, opts) - if batchErr == nil { - w.WriteHeader(http.StatusNoContent) - return - } - var ( - errs = batchErr.Errors() - lastRegularErr string - lastBadRequestErr string - numRegular int - numBadRequest int - ) - for _, err := range errs { - switch { - case client.IsBadRequestError(err): - numBadRequest++ - lastBadRequestErr = err.Error() - case xerrors.IsInvalidParams(err): - numBadRequest++ - lastBadRequestErr = err.Error() - default: - numRegular++ - lastRegularErr = err.Error() - } - } - - var status int - switch { - case numBadRequest == len(errs): - status = http.StatusBadRequest - default: - status = http.StatusInternalServerError - } - - logger := logging.WithContext(r.Context(), iwh.handlerOpts.InstrumentOpts()) - logger.Error("write error", - zap.String("remoteAddr", r.RemoteAddr), - zap.Int("httpResponseStatusCode", status), - zap.Int("numRegularErrors", numRegular), - zap.Int("numBadRequestErrors", numBadRequest), - zap.String("lastRegularError", lastRegularErr), - zap.String("lastBadRequestErr", lastBadRequestErr)) - - var resultErr string - if lastRegularErr != "" { - resultErr = fmt.Sprintf("retryable_errors: count=%d, last=%s", - numRegular, lastRegularErr) - } - if lastBadRequestErr != "" { - var sep string - if lastRegularErr != "" { - sep = ", " - } - resultErr = fmt.Sprintf("%s%sbad_request_errors: count=%d, last=%s", - resultErr, sep, numBadRequest, lastBadRequestErr) - } - xhttp.Error(w, errors.New(resultErr), status) -} diff --git a/src/query/api/v1/handler/influxdb/write_test.go b/src/query/api/v1/handler/influxdb/write_test.go deleted file mode 100644 index 4314f6ff88..0000000000 --- a/src/query/api/v1/handler/influxdb/write_test.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright (c) 2019 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package influxdb - -import ( - "fmt" - "testing" - - imodels "github.com/influxdata/influxdb/models" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// human-readable string out of what the iterator produces; -// they are easiest for human to handle -func (self *ingestIterator) pop(t *testing.T) string { - if self.Next() { - tags, dp, _, _ := self.Current() - assert.Equal(t, 1, len(dp)) - - return fmt.Sprintf("%s %v %s", tags.String(), dp[0].Value, dp[0].Timestamp) - } - return "" -} - -func TestIngestIterator(t *testing.T) { - // test prometheus-illegal measure and label components (should be _s) - // as well as all value types influxdb supports - s := `?measure:!,?tag1:!=tval1,?tag2:!=tval2 ?key1:!=3,?key2:!=2i 1574838670386469800 -?measure:!,?tag1:!=tval1,?tag2:!=tval2 ?key3:!="string",?key4:!=T 1574838670386469801 -` - points, err := imodels.ParsePoints([]byte(s)) - require.NoError(t, err) - iter := &ingestIterator{points: points, promRewriter: newPromRewriter()} - require.NoError(t, iter.Error()) - for _, line := range []string{ - "__name__: _measure:___key1:_, _tag1__: tval1, _tag2__: tval2 3 2019-11-27 07:11:10.3864698 +0000 UTC", - "__name__: _measure:___key2:_, _tag1__: tval1, _tag2__: tval2 2 2019-11-27 07:11:10.3864698 +0000 UTC", - "__name__: _measure:___key4:_, _tag1__: tval1, _tag2__: tval2 1 2019-11-27 07:11:10.386469801 +0000 UTC", - "", - "", - } { - assert.Equal(t, line, iter.pop(t)) - } - require.NoError(t, iter.Error()) -} - -func TestIngestIteratorDuplicateTag(t *testing.T) { - // Ensure that duplicate tag causes error and no metrics entries - s := `measure,lab!=2,lab?=3 key=2i 1574838670386469800 -` - points, err := imodels.ParsePoints([]byte(s)) - require.NoError(t, err) - iter := &ingestIterator{points: points, promRewriter: newPromRewriter()} - require.NoError(t, iter.Error()) - for _, line := range []string{ - "", - } { - assert.Equal(t, line, iter.pop(t)) - } - require.EqualError(t, iter.Error(), "non-unique Prometheus label lab_") -} - -func TestIngestIteratorDuplicateNameTag(t *testing.T) { - // Ensure that duplicate name tag causes error and no metrics entries - s := `measure,__name__=x key=2i 1574838670386469800 -` - points, err := imodels.ParsePoints([]byte(s)) - require.NoError(t, err) - iter := &ingestIterator{points: points, promRewriter: newPromRewriter()} - require.NoError(t, iter.Error()) - for _, line := range []string{ - "", - } { - assert.Equal(t, line, iter.pop(t)) - } - require.EqualError(t, iter.Error(), "non-unique Prometheus label __name__") -} diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 6a37244a32..da2e4f07f2 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -31,7 +31,6 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/database" "github.com/m3db/m3/src/query/api/v1/handler/graphite" - "github.com/m3db/m3/src/query/api/v1/handler/influxdb" m3json "github.com/m3db/m3/src/query/api/v1/handler/json" "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/query/api/v1/handler/openapi" @@ -170,10 +169,6 @@ func (h *Handler) RegisterRoutes() error { wrapped(native.NewPromReadInstantHandler(h.options)).ServeHTTP, ).Methods(native.PromReadInstantHTTPMethods...) - // InfluxDB write endpoint. - h.router.HandleFunc(influxdb.InfluxWriteURL, - wrapped(influxdb.NewInfluxWriterHandler(h.options)).ServeHTTP).Methods(influxdb.InfluxWriteHTTPMethod) - // Native M3 search and write endpoints. h.router.HandleFunc(handler.SearchURL, wrapped(handler.NewSearchHandler(h.options)).ServeHTTP,