From 04e1379ce62154d1b9de59d485cbc5c63d37a063 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Mon, 12 Aug 2019 14:15:48 +0300 Subject: [PATCH] chore(influxdb): update the import path of influxdb lib to the new one MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This also upgrades it to the latest version and adds a benchmark which shows some less allocation: name old time/op new time/op delta Influxdb1Second-4 3.56ms ± 0% 3.56ms ± 1% ~ (p=0.841 n=5+5) Influxdb2Second-4 12.1ms ± 0% 12.1ms ± 0% ~ (p=0.095 n=5+5) Influxdb100Milliseconds-4 341µs ± 1% 333µs ± 1% -2.34% (p=0.008 n=5+5) name old alloc/op new alloc/op delta Influxdb1Second-4 19.1kB ± 0% 16.6kB ± 0% -13.37% (p=0.008 n=5+5) Influxdb2Second-4 18.8kB ± 0% 16.2kB ± 0% -13.63% (p=0.016 n=5+4) Influxdb100Milliseconds-4 18.7kB ± 0% 16.1kB ± 0% -13.72% (p=0.008 n=5+5) name old allocs/op new allocs/op delta Influxdb1Second-4 291 ± 0% 231 ± 0% -20.62% (p=0.008 n=5+5) Influxdb2Second-4 291 ± 0% 231 ± 0% -20.62% (p=0.008 n=5+5) Influxdb100Milliseconds-4 291 ± 0% 231 ± 0% -20.62% (p=0.008 n=5+5) The same test as in fce3884b gets 6909 RPS with 2.5GB of memory usage so a considerate bump in rps for small amount of memory. --- Gopkg.lock | 12 +- stats/influxdb/bench_test.go | 59 +++ stats/influxdb/collector.go | 6 +- stats/influxdb/util.go | 2 +- stats/influxdb/util_test.go | 2 +- vendor/github.com/influxdata/influxdb/LICENSE | 20 -- .../influxdb/LICENSE_OF_DEPENDENCIES.md | 62 ---- .../influxdata/influxdb/models/consistency.go | 48 --- .../influxdata/influxdb1-client/LICENSE | 21 ++ .../models/inline_fnv.go | 2 +- .../models/inline_strconv_parse.go | 2 +- .../models/points.go | 339 +++++++++++------- .../models/rows.go | 0 .../models/statistic.go | 0 .../models/time.go | 0 .../models/uint_support.go | 0 .../pkg/escape/bytes.go | 2 +- .../pkg/escape/strings.go | 0 .../client => influxdb1-client}/v2/client.go | 216 +++++++---- .../client => influxdb1-client}/v2/udp.go | 4 + 20 files changed, 460 insertions(+), 337 deletions(-) create mode 100644 stats/influxdb/bench_test.go delete mode 100644 vendor/github.com/influxdata/influxdb/LICENSE delete mode 100644 vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md delete mode 100644 vendor/github.com/influxdata/influxdb/models/consistency.go create mode 100644 vendor/github.com/influxdata/influxdb1-client/LICENSE rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/inline_fnv.go (91%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/inline_strconv_parse.go (94%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/points.go (91%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/rows.go (100%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/statistic.go (100%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/time.go (100%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/uint_support.go (100%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/pkg/escape/bytes.go (96%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/pkg/escape/strings.go (100%) rename vendor/github.com/influxdata/{influxdb/client => influxdb1-client}/v2/client.go (83%) rename vendor/github.com/influxdata/{influxdb/client => influxdb1-client}/v2/udp.go (94%) diff --git a/Gopkg.lock b/Gopkg.lock index 459b3e502b0..955039335a0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -256,16 +256,16 @@ version = "v1.0" [[projects]] - digest = "1:32b1db7dd0c6cee1e65ab3841b7f8d8a9f0a6d97f85229137efee95a0dfcfddd" - name = "github.com/influxdata/influxdb" + branch = "master" + digest = "1:50708c8fc92aec981df5c446581cf9f90ba9e2a5692118e0ce75d4534aaa14a2" + name = "github.com/influxdata/influxdb1-client" packages = [ - "client/v2", "models", "pkg/escape", + "v2", ] pruneopts = "NUT" - revision = "6ac835404e7e64ea7299a6eebcce1ab1ef15fe3c" - version = "v1.5.0" + revision = "8ff2fc3824fcb533795f9a2f233275f0bb18d6c5" [[projects]] branch = "master" @@ -697,7 +697,7 @@ "github.com/dustin/go-humanize", "github.com/fatih/color", "github.com/gorilla/websocket", - "github.com/influxdata/influxdb/client/v2", + "github.com/influxdata/influxdb1-client/v2", "github.com/julienschmidt/httprouter", "github.com/kelseyhightower/envconfig", "github.com/klauspost/compress/zstd", diff --git a/stats/influxdb/bench_test.go b/stats/influxdb/bench_test.go new file mode 100644 index 00000000000..e3f3b046fa8 --- /dev/null +++ b/stats/influxdb/bench_test.go @@ -0,0 +1,59 @@ +package influxdb + +import ( + "io" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/loadimpact/k6/stats" +) + +func benchmarkInfluxdb(b *testing.B, t time.Duration) { + testCollectorCycle(b, func(rw http.ResponseWriter, r *http.Request) { + for { + time.Sleep(t) + m, _ := io.CopyN(ioutil.Discard, r.Body, 1<<18) // read 1/4 mb a time + if m == 0 { + break + } + } + rw.WriteHeader(204) + }, func(tb testing.TB, c *Collector) { + b = tb.(*testing.B) + b.ResetTimer() + + var samples = make(stats.Samples, 10) + for i := 0; i < len(samples); i++ { + samples[i] = stats.Sample{ + Metric: stats.New("testGauge", stats.Gauge), + Time: time.Now(), + Tags: stats.NewSampleTags(map[string]string{ + "something": "else", + "VU": "21", + "else": "something", + }), + Value: 2.0, + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.Collect([]stats.SampleContainer{samples}) + time.Sleep(time.Nanosecond * 20) + } + }) +} + +func BenchmarkInfluxdb1Second(b *testing.B) { + benchmarkInfluxdb(b, time.Second) +} + +func BenchmarkInfluxdb2Second(b *testing.B) { + benchmarkInfluxdb(b, 2*time.Second) +} + +func BenchmarkInfluxdb100Milliseconds(b *testing.B) { + benchmarkInfluxdb(b, 100*time.Millisecond) +} diff --git a/stats/influxdb/collector.go b/stats/influxdb/collector.go index a6414c733f3..0b9bb9e6584 100644 --- a/stats/influxdb/collector.go +++ b/stats/influxdb/collector.go @@ -26,11 +26,10 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/client/v2" - "github.com/sirupsen/logrus" - + client "github.com/influxdata/influxdb1-client/v2" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/stats" + "github.com/sirupsen/logrus" ) // Verify that Collector implements lib.Collector @@ -116,6 +115,7 @@ func (c *Collector) commit() { <-c.semaphoreCh }() logrus.Debug("InfluxDB: Committing...") + logrus.WithField("samples", len(samples)).Debug("InfluxDB: Writing...") batch, err := c.batchFromSamples(samples) if err != nil { diff --git a/stats/influxdb/util.go b/stats/influxdb/util.go index 4f88975836a..eeaa31cf4c9 100644 --- a/stats/influxdb/util.go +++ b/stats/influxdb/util.go @@ -23,7 +23,7 @@ package influxdb import ( "strings" - client "github.com/influxdata/influxdb/client/v2" + client "github.com/influxdata/influxdb1-client/v2" null "gopkg.in/guregu/null.v3" ) diff --git a/stats/influxdb/util_test.go b/stats/influxdb/util_test.go index 2ad746d1082..29bef1b7e0f 100644 --- a/stats/influxdb/util_test.go +++ b/stats/influxdb/util_test.go @@ -23,7 +23,7 @@ package influxdb import ( "testing" - client "github.com/influxdata/influxdb/client/v2" + client "github.com/influxdata/influxdb1-client/v2" "github.com/stretchr/testify/assert" null "gopkg.in/guregu/null.v3" ) diff --git a/vendor/github.com/influxdata/influxdb/LICENSE b/vendor/github.com/influxdata/influxdb/LICENSE deleted file mode 100644 index 63cef79ba6f..00000000000 --- a/vendor/github.com/influxdata/influxdb/LICENSE +++ /dev/null @@ -1,20 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2013-2016 Errplane Inc. - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software is furnished to do so, -subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md b/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md deleted file mode 100644 index ea6fc69f30d..00000000000 --- a/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md +++ /dev/null @@ -1,62 +0,0 @@ -- # List -- bootstrap 3.3.5 [MIT LICENSE](https://github.com/twbs/bootstrap/blob/master/LICENSE) -- collectd.org [ISC LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE) -- github.com/BurntSushi/toml [MIT LICENSE](https://github.com/BurntSushi/toml/blob/master/COPYING) -- github.com/RoaringBitmap/roaring [APACHE LICENSE](https://github.com/RoaringBitmap/roaring/blob/master/LICENSE) -- github.com/beorn7/perks [MIT LICENSE](https://github.com/beorn7/perks/blob/master/LICENSE) -- github.com/bmizerany/pat [MIT LICENSE](https://github.com/bmizerany/pat#license) -- github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE) -- github.com/cespare/xxhash [MIT LICENSE](https://github.com/cespare/xxhash/blob/master/LICENSE.txt) -- github.com/clarkduvall/hyperloglog [MIT LICENSE](https://github.com/clarkduvall/hyperloglog/blob/master/LICENSE) -- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE) -- github.com/dgrijalva/jwt-go [MIT LICENSE](https://github.com/dgrijalva/jwt-go/blob/master/LICENSE) -- github.com/dgryski/go-bits [MIT LICENSE](https://github.com/dgryski/go-bits/blob/master/LICENSE) -- github.com/dgryski/go-bitstream [MIT LICENSE](https://github.com/dgryski/go-bitstream/blob/master/LICENSE) -- github.com/glycerine/go-unsnap-stream [MIT LICENSE](https://github.com/glycerine/go-unsnap-stream/blob/master/LICENSE) -- github.com/gogo/protobuf/proto [BSD LICENSE](https://github.com/gogo/protobuf/blob/master/LICENSE) -- github.com/golang/protobuf [BSD LICENSE](https://github.com/golang/protobuf/blob/master/LICENSE) -- github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE) -- github.com/google/go-cmp [BSD LICENSE](https://github.com/google/go-cmp/blob/master/LICENSE) -- github.com/influxdata/influxql [MIT LICENSE](https://github.com/influxdata/influxql/blob/master/LICENSE) -- github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt) -- github.com/influxdata/yamux [MOZILLA PUBLIC LICENSE](https://github.com/influxdata/yamux/blob/master/LICENSE) -- github.com/influxdata/yarpc [MIT LICENSE](https://github.com/influxdata/yarpc/blob/master/LICENSE) -- github.com/jsternberg/zap-logfmt [MIT LICENSE](https://github.com/jsternberg/zap-logfmt/blob/master/LICENSE) -- github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE) -- github.com/mattn/go-isatty [MIT LICENSE](https://github.com/mattn/go-isatty/blob/master/LICENSE) -- github.com/matttproud/golang_protobuf_extensions [APACHE LICENSE](https://github.com/matttproud/golang_protobuf_extensions/blob/master/LICENSE) -- github.com/opentracing/opentracing-go [MIT LICENSE](https://github.com/opentracing/opentracing-go/blob/master/LICENSE) -- github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE) -- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) -- github.com/philhofer/fwd [MIT LICENSE](https://github.com/philhofer/fwd/blob/master/LICENSE.md) -- github.com/prometheus/client_golang [MIT LICENSE](https://github.com/prometheus/client_golang/blob/master/LICENSE) -- github.com/prometheus/client_model [MIT LICENSE](https://github.com/prometheus/client_model/blob/master/LICENSE) -- github.com/prometheus/common [APACHE LICENSE](https://github.com/prometheus/common/blob/master/LICENSE) -- github.com/prometheus/procfs [APACHE LICENSE](https://github.com/prometheus/procfs/blob/master/LICENSE) -- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) -- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE) -- github.com/tinylib/msgp [MIT LICENSE](https://github.com/tinylib/msgp/blob/master/LICENSE) -- go.uber.org/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt) -- go.uber.org/multierr [MIT LICENSE](https://github.com/uber-go/multierr/blob/master/LICENSE.txt) -- go.uber.org/zap [MIT LICENSE](https://github.com/uber-go/zap/blob/master/LICENSE.txt) -- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) -- golang.org/x/net [BSD LICENSE](https://github.com/golang/net/blob/master/LICENSE) -- golang.org/x/sys [BSD LICENSE](https://github.com/golang/sys/blob/master/LICENSE) -- golang.org/x/text [BSD LICENSE](https://github.com/golang/text/blob/master/LICENSE) -- golang.org/x/time [BSD LICENSE](https://github.com/golang/time/blob/master/LICENSE) -- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt) -- github.com/xlab/treeprint [MIT LICENSE](https://github.com/xlab/treeprint/blob/master/LICENSE) - - - - - - - - - - - - - - diff --git a/vendor/github.com/influxdata/influxdb/models/consistency.go b/vendor/github.com/influxdata/influxdb/models/consistency.go deleted file mode 100644 index 2a3269bca11..00000000000 --- a/vendor/github.com/influxdata/influxdb/models/consistency.go +++ /dev/null @@ -1,48 +0,0 @@ -package models - -import ( - "errors" - "strings" -) - -// ConsistencyLevel represent a required replication criteria before a write can -// be returned as successful. -// -// The consistency level is handled in open-source InfluxDB but only applicable to clusters. -type ConsistencyLevel int - -const ( - // ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet. - ConsistencyLevelAny ConsistencyLevel = iota - - // ConsistencyLevelOne requires at least one data node acknowledged a write. - ConsistencyLevelOne - - // ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write. - ConsistencyLevelQuorum - - // ConsistencyLevelAll requires all data nodes to acknowledge a write. - ConsistencyLevelAll -) - -var ( - // ErrInvalidConsistencyLevel is returned when parsing the string version - // of a consistency level. - ErrInvalidConsistencyLevel = errors.New("invalid consistency level") -) - -// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const. -func ParseConsistencyLevel(level string) (ConsistencyLevel, error) { - switch strings.ToLower(level) { - case "any": - return ConsistencyLevelAny, nil - case "one": - return ConsistencyLevelOne, nil - case "quorum": - return ConsistencyLevelQuorum, nil - case "all": - return ConsistencyLevelAll, nil - default: - return 0, ErrInvalidConsistencyLevel - } -} diff --git a/vendor/github.com/influxdata/influxdb1-client/LICENSE b/vendor/github.com/influxdata/influxdb1-client/LICENSE new file mode 100644 index 00000000000..83bafde92ee --- /dev/null +++ b/vendor/github.com/influxdata/influxdb1-client/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 InfluxData + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/influxdata/influxdb/models/inline_fnv.go b/vendor/github.com/influxdata/influxdb1-client/models/inline_fnv.go similarity index 91% rename from vendor/github.com/influxdata/influxdb/models/inline_fnv.go rename to vendor/github.com/influxdata/influxdb1-client/models/inline_fnv.go index eec1ae8b013..177885d0cb8 100644 --- a/vendor/github.com/influxdata/influxdb/models/inline_fnv.go +++ b/vendor/github.com/influxdata/influxdb1-client/models/inline_fnv.go @@ -1,4 +1,4 @@ -package models // import "github.com/influxdata/influxdb/models" +package models // import "github.com/influxdata/influxdb1-client/models" // from stdlib hash/fnv/fnv.go const ( diff --git a/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go b/vendor/github.com/influxdata/influxdb1-client/models/inline_strconv_parse.go similarity index 94% rename from vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go rename to vendor/github.com/influxdata/influxdb1-client/models/inline_strconv_parse.go index 8db4837384a..7d171b31332 100644 --- a/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go +++ b/vendor/github.com/influxdata/influxdb1-client/models/inline_strconv_parse.go @@ -1,4 +1,4 @@ -package models // import "github.com/influxdata/influxdb/models" +package models // import "github.com/influxdata/influxdb1-client/models" import ( "reflect" diff --git a/vendor/github.com/influxdata/influxdb/models/points.go b/vendor/github.com/influxdata/influxdb1-client/models/points.go similarity index 91% rename from vendor/github.com/influxdata/influxdb/models/points.go rename to vendor/github.com/influxdata/influxdb1-client/models/points.go index 7cfebd0717c..f51163070d5 100644 --- a/vendor/github.com/influxdata/influxdb/models/points.go +++ b/vendor/github.com/influxdata/influxdb1-client/models/points.go @@ -1,5 +1,5 @@ // Package models implements basic objects used throughout the TICK stack. -package models // import "github.com/influxdata/influxdb/models" +package models // import "github.com/influxdata/influxdb1-client/models" import ( "bytes" @@ -12,20 +12,27 @@ import ( "strconv" "strings" "time" + "unicode" + "unicode/utf8" - "github.com/influxdata/influxdb/pkg/escape" + "github.com/influxdata/influxdb1-client/pkg/escape" ) +type escapeSet struct { + k [1]byte + esc [2]byte +} + var ( - measurementEscapeCodes = map[byte][]byte{ - ',': []byte(`\,`), - ' ': []byte(`\ `), + measurementEscapeCodes = [...]escapeSet{ + {k: [1]byte{','}, esc: [2]byte{'\\', ','}}, + {k: [1]byte{' '}, esc: [2]byte{'\\', ' '}}, } - tagEscapeCodes = map[byte][]byte{ - ',': []byte(`\,`), - ' ': []byte(`\ `), - '=': []byte(`\=`), + tagEscapeCodes = [...]escapeSet{ + {k: [1]byte{','}, esc: [2]byte{'\\', ','}}, + {k: [1]byte{' '}, esc: [2]byte{'\\', ' '}}, + {k: [1]byte{'='}, esc: [2]byte{'\\', '='}}, } // ErrPointMustHaveAField is returned when operating on a point that does not have any fields. @@ -64,6 +71,9 @@ type Point interface { // Tags returns the tag set for the point. Tags() Tags + // ForEachTag iterates over each tag invoking fn. If fn return false, iteration stops. + ForEachTag(fn func(k, v []byte) bool) + // AddTag adds or replaces a tag value for a point. AddTag(key, value string) @@ -263,36 +273,46 @@ func ParsePointsString(buf string) ([]Point, error) { // NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf. // This can have the unintended effect preventing buf from being garbage collected. func ParseKey(buf []byte) (string, Tags) { - meas, tags := ParseKeyBytes(buf) - return string(meas), tags + name, tags := ParseKeyBytes(buf) + return string(name), tags } func ParseKeyBytes(buf []byte) ([]byte, Tags) { + return ParseKeyBytesWithTags(buf, nil) +} + +func ParseKeyBytesWithTags(buf []byte, tags Tags) ([]byte, Tags) { // Ignore the error because scanMeasurement returns "missing fields" which we ignore // when just parsing a key state, i, _ := scanMeasurement(buf, 0) - var tags Tags + var name []byte if state == tagKeyState { - tags = parseTags(buf) + tags = parseTags(buf, tags) // scanMeasurement returns the location of the comma if there are tags, strip that off - return buf[:i-1], tags + name = buf[:i-1] + } else { + name = buf[:i] } - return buf[:i], tags + return unescapeMeasurement(name), tags } func ParseTags(buf []byte) Tags { - return parseTags(buf) + return parseTags(buf, nil) } -func ParseName(buf []byte) ([]byte, error) { +func ParseName(buf []byte) []byte { // Ignore the error because scanMeasurement returns "missing fields" which we ignore // when just parsing a key state, i, _ := scanMeasurement(buf, 0) + var name []byte if state == tagKeyState { - return buf[:i-1], nil + name = buf[:i-1] + } else { + name = buf[:i] } - return buf[:i], nil + + return unescapeMeasurement(name) } // ParsePointsWithPrecision is similar to ParsePoints, but allows the @@ -315,7 +335,6 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin continue } - // lines which start with '#' are comments start := skipWhitespace(block, 0) // If line is all whitespace, just skip it @@ -323,6 +342,7 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin continue } + // lines which start with '#' are comments if block[start] == '#' { continue } @@ -348,7 +368,7 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin } func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { - // scan the first block which is measurement[,tag1=value1,tag2=value=2...] + // scan the first block which is measurement[,tag1=value1,tag2=value2...] pos, key, err := scanKey(buf, 0) if err != nil { return nil, err @@ -375,7 +395,7 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err } var maxKeyErr error - walkFields(fields, func(k, v []byte) bool { + err = walkFields(fields, func(k, v []byte) bool { if sz := seriesKeySize(key, k); sz > MaxKeyLength { maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength) return false @@ -383,6 +403,10 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err return true }) + if err != nil { + return nil, err + } + if maxKeyErr != nil { return nil, maxKeyErr } @@ -1199,23 +1223,33 @@ func scanFieldValue(buf []byte, i int) (int, []byte) { } func EscapeMeasurement(in []byte) []byte { - for b, esc := range measurementEscapeCodes { - in = bytes.Replace(in, []byte{b}, esc, -1) + for _, c := range measurementEscapeCodes { + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.k[:], c.esc[:], -1) + } } return in } func unescapeMeasurement(in []byte) []byte { - for b, esc := range measurementEscapeCodes { - in = bytes.Replace(in, esc, []byte{b}, -1) + if bytes.IndexByte(in, '\\') == -1 { + return in + } + + for i := range measurementEscapeCodes { + c := &measurementEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.esc[:], c.k[:], -1) + } } return in } func escapeTag(in []byte) []byte { - for b, esc := range tagEscapeCodes { - if bytes.IndexByte(in, b) != -1 { - in = bytes.Replace(in, []byte{b}, esc, -1) + for i := range tagEscapeCodes { + c := &tagEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.k[:], c.esc[:], -1) } } return in @@ -1226,9 +1260,10 @@ func unescapeTag(in []byte) []byte { return in } - for b, esc := range tagEscapeCodes { - if bytes.IndexByte(in, b) != -1 { - in = bytes.Replace(in, esc, []byte{b}, -1) + for i := range tagEscapeCodes { + c := &tagEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.esc[:], c.k[:], -1) } } return in @@ -1280,7 +1315,8 @@ func unescapeStringField(in string) string { } // NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If -// an unsupported field value (NaN) or out of range time is passed, this function returns an error. +// an unsupported field value (NaN, or +/-Inf) or out of range time is passed, this function +// returns an error. func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) { key, err := pointKey(name, tags, fields, t) if err != nil { @@ -1311,11 +1347,17 @@ func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte switch value := value.(type) { case float64: // Ensure the caller validates and handles invalid field values + if math.IsInf(value, 0) { + return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key) + } if math.IsNaN(value) { return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) } case float32: // Ensure the caller validates and handles invalid field values + if math.IsInf(float64(value), 0) { + return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key) + } if math.IsNaN(float64(value)) { return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) } @@ -1441,10 +1483,14 @@ func (p *point) Tags() Tags { if p.cachedTags != nil { return p.cachedTags } - p.cachedTags = parseTags(p.key) + p.cachedTags = parseTags(p.key, nil) return p.cachedTags } +func (p *point) ForEachTag(fn func(k, v []byte) bool) { + walkTags(p.key, fn) +} + func (p *point) HasTag(tag []byte) bool { if len(p.key) == 0 { return false @@ -1504,11 +1550,14 @@ func walkTags(buf []byte, fn func(key, value []byte) bool) { // walkFields walks each field key and value via fn. If fn returns false, the iteration // is stopped. The values are the raw byte slices and not the converted types. -func walkFields(buf []byte, fn func(key, value []byte) bool) { +func walkFields(buf []byte, fn func(key, value []byte) bool) error { var i int var key, val []byte for len(buf) > 0 { i, key = scanTo(buf, 0, '=') + if i > len(buf)-2 { + return fmt.Errorf("invalid value: field-key=%s", key) + } buf = buf[i+1:] i, val = scanFieldValue(buf, 0) buf = buf[i:] @@ -1521,29 +1570,52 @@ func walkFields(buf []byte, fn func(key, value []byte) bool) { buf = buf[1:] } } + return nil } -func parseTags(buf []byte) Tags { +// parseTags parses buf into the provided destination tags, returning destination +// Tags, which may have a different length and capacity. +func parseTags(buf []byte, dst Tags) Tags { if len(buf) == 0 { return nil } - tags := make(Tags, bytes.Count(buf, []byte(","))) - p := 0 + n := bytes.Count(buf, []byte(",")) + if cap(dst) < n { + dst = make(Tags, n) + } else { + dst = dst[:n] + } + + // Ensure existing behaviour when point has no tags and nil slice passed in. + if dst == nil { + dst = Tags{} + } + + // Series keys can contain escaped commas, therefore the number of commas + // in a series key only gives an estimation of the upper bound on the number + // of tags. + var i int walkTags(buf, func(key, value []byte) bool { - tags[p].Key = key - tags[p].Value = value - p++ + dst[i].Key, dst[i].Value = key, value + i++ return true }) - return tags + return dst[:i] } // MakeKey creates a key for a set of tags. func MakeKey(name []byte, tags Tags) []byte { + return AppendMakeKey(nil, name, tags) +} + +// AppendMakeKey appends the key derived from name and tags to dst and returns the extended buffer. +func AppendMakeKey(dst []byte, name []byte, tags Tags) []byte { // unescape the name and then re-escape it to avoid double escaping. // The key should always be stored in escaped form. - return append(EscapeMeasurement(unescapeMeasurement(name)), tags.HashKey()...) + dst = append(dst, EscapeMeasurement(unescapeMeasurement(name))...) + dst = tags.AppendHashKey(dst) + return dst } // SetTags replaces the tags for the point. @@ -1868,28 +1940,80 @@ func NewTags(m map[string]string) Tags { return a } -// Keys returns the list of keys for a tag set. -func (a Tags) Keys() []string { - if len(a) == 0 { - return nil - } - keys := make([]string, len(a)) - for i, tag := range a { - keys[i] = string(tag.Key) +// HashKey hashes all of a tag's keys. +func (a Tags) HashKey() []byte { + return a.AppendHashKey(nil) +} + +func (a Tags) needsEscape() bool { + for i := range a { + t := &a[i] + for j := range tagEscapeCodes { + c := &tagEscapeCodes[j] + if bytes.IndexByte(t.Key, c.k[0]) != -1 || bytes.IndexByte(t.Value, c.k[0]) != -1 { + return true + } + } } - return keys + return false } -// Values returns the list of values for a tag set. -func (a Tags) Values() []string { +// AppendHashKey appends the result of hashing all of a tag's keys and values to dst and returns the extended buffer. +func (a Tags) AppendHashKey(dst []byte) []byte { + // Empty maps marshal to empty bytes. if len(a) == 0 { - return nil + return dst + } + + // Type invariant: Tags are sorted + + sz := 0 + var escaped Tags + if a.needsEscape() { + var tmp [20]Tag + if len(a) < len(tmp) { + escaped = tmp[:len(a)] + } else { + escaped = make(Tags, len(a)) + } + + for i := range a { + t := &a[i] + nt := &escaped[i] + nt.Key = escapeTag(t.Key) + nt.Value = escapeTag(t.Value) + sz += len(nt.Key) + len(nt.Value) + } + } else { + sz = a.Size() + escaped = a } - values := make([]string, len(a)) - for i, tag := range a { - values[i] = string(tag.Value) + + sz += len(escaped) + (len(escaped) * 2) // separators + + // Generate marshaled bytes. + if cap(dst)-len(dst) < sz { + nd := make([]byte, len(dst), len(dst)+sz) + copy(nd, dst) + dst = nd } - return values + buf := dst[len(dst) : len(dst)+sz] + idx := 0 + for i := range escaped { + k := &escaped[i] + if len(k.Value) == 0 { + continue + } + buf[idx] = ',' + idx++ + copy(buf[idx:], k.Key) + idx += len(k.Key) + buf[idx] = '=' + idx++ + copy(buf[idx:], k.Value) + idx += len(k.Value) + } + return dst[:len(dst)+idx] } // String returns the string representation of the tags. @@ -1911,8 +2035,8 @@ func (a Tags) String() string { // for data structures or delimiters for example. func (a Tags) Size() int { var total int - for _, t := range a { - total += t.Size() + for i := range a { + total += a[i].Size() } return total } @@ -2008,18 +2132,6 @@ func (a *Tags) SetString(key, value string) { a.Set([]byte(key), []byte(value)) } -// Delete removes a tag by key. -func (a *Tags) Delete(key []byte) { - for i, t := range *a { - if bytes.Equal(t.Key, key) { - copy((*a)[i:], (*a)[i+1:]) - (*a)[len(*a)-1] = Tag{} - *a = (*a)[:len(*a)-1] - return - } - } -} - // Map returns a map representation of the tags. func (a Tags) Map() map[string]string { m := make(map[string]string, len(a)) @@ -2029,60 +2141,6 @@ func (a Tags) Map() map[string]string { return m } -// Merge merges the tags combining the two. If both define a tag with the -// same key, the merged value overwrites the old value. -// A new map is returned. -func (a Tags) Merge(other map[string]string) Tags { - merged := make(map[string]string, len(a)+len(other)) - for _, t := range a { - merged[string(t.Key)] = string(t.Value) - } - for k, v := range other { - merged[k] = v - } - return NewTags(merged) -} - -// HashKey hashes all of a tag's keys. -func (a Tags) HashKey() []byte { - // Empty maps marshal to empty bytes. - if len(a) == 0 { - return nil - } - - // Type invariant: Tags are sorted - - escaped := make(Tags, 0, len(a)) - sz := 0 - for _, t := range a { - ek := escapeTag(t.Key) - ev := escapeTag(t.Value) - - if len(ev) > 0 { - escaped = append(escaped, Tag{Key: ek, Value: ev}) - sz += len(ek) + len(ev) - } - } - - sz += len(escaped) + (len(escaped) * 2) // separators - - // Generate marshaled bytes. - b := make([]byte, sz) - buf := b - idx := 0 - for _, k := range escaped { - buf[idx] = ',' - idx++ - copy(buf[idx:idx+len(k.Key)], k.Key) - idx += len(k.Key) - buf[idx] = '=' - idx++ - copy(buf[idx:idx+len(k.Value)], k.Value) - idx += len(k.Value) - } - return b[:idx] -} - // CopyTags returns a shallow copy of tags. func CopyTags(a Tags) Tags { other := make(Tags, len(a)) @@ -2326,3 +2384,30 @@ func appendField(b []byte, k string, v interface{}) []byte { return b } + +// ValidKeyToken returns true if the token used for measurement, tag key, or tag +// value is a valid unicode string and only contains printable, non-replacement characters. +func ValidKeyToken(s string) bool { + if !utf8.ValidString(s) { + return false + } + for _, r := range s { + if !unicode.IsPrint(r) || r == unicode.ReplacementChar { + return false + } + } + return true +} + +// ValidKeyTokens returns true if the measurement name and all tags are valid. +func ValidKeyTokens(name string, tags Tags) bool { + if !ValidKeyToken(name) { + return false + } + for _, tag := range tags { + if !ValidKeyToken(string(tag.Key)) || !ValidKeyToken(string(tag.Value)) { + return false + } + } + return true +} diff --git a/vendor/github.com/influxdata/influxdb/models/rows.go b/vendor/github.com/influxdata/influxdb1-client/models/rows.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/models/rows.go rename to vendor/github.com/influxdata/influxdb1-client/models/rows.go diff --git a/vendor/github.com/influxdata/influxdb/models/statistic.go b/vendor/github.com/influxdata/influxdb1-client/models/statistic.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/models/statistic.go rename to vendor/github.com/influxdata/influxdb1-client/models/statistic.go diff --git a/vendor/github.com/influxdata/influxdb/models/time.go b/vendor/github.com/influxdata/influxdb1-client/models/time.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/models/time.go rename to vendor/github.com/influxdata/influxdb1-client/models/time.go diff --git a/vendor/github.com/influxdata/influxdb/models/uint_support.go b/vendor/github.com/influxdata/influxdb1-client/models/uint_support.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/models/uint_support.go rename to vendor/github.com/influxdata/influxdb1-client/models/uint_support.go diff --git a/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go b/vendor/github.com/influxdata/influxdb1-client/pkg/escape/bytes.go similarity index 96% rename from vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go rename to vendor/github.com/influxdata/influxdb1-client/pkg/escape/bytes.go index f3b31f42d36..39a33f6742c 100644 --- a/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go +++ b/vendor/github.com/influxdata/influxdb1-client/pkg/escape/bytes.go @@ -1,6 +1,6 @@ // Package escape contains utilities for escaping parts of InfluxQL // and InfluxDB line protocol. -package escape // import "github.com/influxdata/influxdb/pkg/escape" +package escape // import "github.com/influxdata/influxdb1-client/pkg/escape" import ( "bytes" diff --git a/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go b/vendor/github.com/influxdata/influxdb1-client/pkg/escape/strings.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/pkg/escape/strings.go rename to vendor/github.com/influxdata/influxdb1-client/pkg/escape/strings.go diff --git a/vendor/github.com/influxdata/influxdb/client/v2/client.go b/vendor/github.com/influxdata/influxdb1-client/v2/client.go similarity index 83% rename from vendor/github.com/influxdata/influxdb/client/v2/client.go rename to vendor/github.com/influxdata/influxdb1-client/v2/client.go index 6c2c56a114a..0cf7b5f21a2 100644 --- a/vendor/github.com/influxdata/influxdb/client/v2/client.go +++ b/vendor/github.com/influxdata/influxdb1-client/v2/client.go @@ -1,5 +1,5 @@ // Package client (v2) is the current official Go client for InfluxDB. -package client // import "github.com/influxdata/influxdb/client/v2" +package client // import "github.com/influxdata/influxdb1-client/v2" import ( "bytes" @@ -17,7 +17,7 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb1-client/models" ) // HTTPConfig is the config data needed to create an HTTP Client. @@ -45,6 +45,9 @@ type HTTPConfig struct { // TLSConfig allows the user to set their own TLS config for the HTTP // Client. If set, this option overrides InsecureSkipVerify. TLSConfig *tls.Config + + // Proxy configures the Proxy function on the HTTP client. + Proxy func(req *http.Request) (*url.URL, error) } // BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct. @@ -75,6 +78,10 @@ type Client interface { // the UDP client. Query(q Query) (*Response, error) + // QueryAsChunk makes an InfluxDB Query on the database. This will fail if using + // the UDP client. + QueryAsChunk(q Query) (*ChunkedResponse, error) + // Close releases any resources a Client may be using. Close() error } @@ -99,6 +106,7 @@ func NewHTTPClient(conf HTTPConfig) (Client, error) { TLSClientConfig: &tls.Config{ InsecureSkipVerify: conf.InsecureSkipVerify, }, + Proxy: conf.Proxy, } if conf.TLSConfig != nil { tr.TLSClientConfig = conf.TLSConfig @@ -153,7 +161,7 @@ func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) { } if resp.StatusCode != http.StatusNoContent { - var err = fmt.Errorf(string(body)) + var err = errors.New(string(body)) return 0, "", err } @@ -359,6 +367,9 @@ func (c *client) Write(bp BatchPoints) error { var b bytes.Buffer for _, p := range bp.Points() { + if p == nil { + continue + } if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil { return err } @@ -400,7 +411,7 @@ func (c *client) Write(bp BatchPoints) error { } if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { - var err = fmt.Errorf(string(body)) + var err = errors.New(string(body)) return err } @@ -409,12 +420,13 @@ func (c *client) Write(bp BatchPoints) error { // Query defines a query to send to the server. type Query struct { - Command string - Database string - Precision string - Chunked bool - ChunkSize int - Parameters map[string]interface{} + Command string + Database string + RetentionPolicy string + Precision string + Chunked bool + ChunkSize int + Parameters map[string]interface{} } // NewQuery returns a query object. @@ -428,6 +440,19 @@ func NewQuery(command, database, precision string) Query { } } +// NewQueryWithRP returns a query object. +// The database, retention policy, and precision arguments can be empty strings if they are not needed +// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater. +func NewQueryWithRP(command, database, retentionPolicy, precision string) Query { + return Query{ + Command: command, + Database: database, + RetentionPolicy: retentionPolicy, + Precision: precision, + Parameters: make(map[string]interface{}), + } +} + // NewQueryWithParameters returns a query object. // The database and precision arguments can be empty strings if they are not needed for the query. // parameters is a map of the parameter names used in the command to their values. @@ -450,11 +475,11 @@ type Response struct { // It returns nil if no errors occurred on any statements. func (r *Response) Error() error { if r.Err != "" { - return fmt.Errorf(r.Err) + return errors.New(r.Err) } for _, result := range r.Results { if result.Err != "" { - return fmt.Errorf(result.Err) + return errors.New(result.Err) } } return nil @@ -475,72 +500,26 @@ type Result struct { // Query sends a command to the server and returns the Response. func (c *client) Query(q Query) (*Response, error) { - u := c.url - u.Path = path.Join(u.Path, "query") - - jsonParameters, err := json.Marshal(q.Parameters) - - if err != nil { - return nil, err - } - - req, err := http.NewRequest("POST", u.String(), nil) + req, err := c.createDefaultRequest(q) if err != nil { return nil, err } - - req.Header.Set("Content-Type", "") - req.Header.Set("User-Agent", c.useragent) - - if c.username != "" { - req.SetBasicAuth(c.username, c.password) - } - params := req.URL.Query() - params.Set("q", q.Command) - params.Set("db", q.Database) - params.Set("params", string(jsonParameters)) if q.Chunked { params.Set("chunked", "true") if q.ChunkSize > 0 { params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) } + req.URL.RawQuery = params.Encode() } - - if q.Precision != "" { - params.Set("epoch", q.Precision) - } - req.URL.RawQuery = params.Encode() - resp, err := c.httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() - // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb - // but instead some other service. If the error code is also a 500+ code, then some - // downstream loadbalancer/proxy/etc had an issue and we should report that. - if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError { - body, err := ioutil.ReadAll(resp.Body) - if err != nil || len(body) == 0 { - return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode) - } - - return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body) - } - - // If we get an unexpected content type, then it is also not from influx direct and therefore - // we want to know what we received and what status code was returned for debugging purposes. - if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" { - // Read up to 1kb of the body to help identify downstream errors and limit the impact of things - // like downstream serving a large file - body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) - if err != nil || len(body) == 0 { - return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) - } - - return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) + if err := checkResponse(resp); err != nil { + return nil, err } var response Response @@ -549,6 +528,9 @@ func (c *client) Query(q Query) (*Response, error) { for { r, err := cr.NextResponse() if err != nil { + if err == io.EOF { + break + } // If we got an error while decoding the response, send that back. return nil, err } @@ -586,10 +568,99 @@ func (c *client) Query(q Query) (*Response, error) { return &response, nil } +// QueryAsChunk sends a command to the server and returns the Response. +func (c *client) QueryAsChunk(q Query) (*ChunkedResponse, error) { + req, err := c.createDefaultRequest(q) + if err != nil { + return nil, err + } + params := req.URL.Query() + params.Set("chunked", "true") + if q.ChunkSize > 0 { + params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) + } + req.URL.RawQuery = params.Encode() + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + + if err := checkResponse(resp); err != nil { + return nil, err + } + return NewChunkedResponse(resp.Body), nil +} + +func checkResponse(resp *http.Response) error { + // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb + // but instead some other service. If the error code is also a 500+ code, then some + // downstream loadbalancer/proxy/etc had an issue and we should report that. + if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError { + body, err := ioutil.ReadAll(resp.Body) + if err != nil || len(body) == 0 { + return fmt.Errorf("received status code %d from downstream server", resp.StatusCode) + } + + return fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body) + } + + // If we get an unexpected content type, then it is also not from influx direct and therefore + // we want to know what we received and what status code was returned for debugging purposes. + if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" { + // Read up to 1kb of the body to help identify downstream errors and limit the impact of things + // like downstream serving a large file + body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) + if err != nil || len(body) == 0 { + return fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) + } + + return fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) + } + return nil +} + +func (c *client) createDefaultRequest(q Query) (*http.Request, error) { + u := c.url + u.Path = path.Join(u.Path, "query") + + jsonParameters, err := json.Marshal(q.Parameters) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("q", q.Command) + params.Set("db", q.Database) + if q.RetentionPolicy != "" { + params.Set("rp", q.RetentionPolicy) + } + params.Set("params", string(jsonParameters)) + + if q.Precision != "" { + params.Set("epoch", q.Precision) + } + req.URL.RawQuery = params.Encode() + + return req, nil + +} + // duplexReader reads responses and writes it to another writer while // satisfying the reader interface. type duplexReader struct { - r io.Reader + r io.ReadCloser w io.Writer } @@ -601,6 +672,11 @@ func (r *duplexReader) Read(p []byte) (n int, err error) { return n, err } +// Close closes the response. +func (r *duplexReader) Close() error { + return r.r.Close() +} + // ChunkedResponse represents a response from the server that // uses chunking to stream the output. type ChunkedResponse struct { @@ -611,8 +687,12 @@ type ChunkedResponse struct { // NewChunkedResponse reads a stream and produces responses from the stream. func NewChunkedResponse(r io.Reader) *ChunkedResponse { + rc, ok := r.(io.ReadCloser) + if !ok { + rc = ioutil.NopCloser(r) + } resp := &ChunkedResponse{} - resp.duplex = &duplexReader{r: r, w: &resp.buf} + resp.duplex = &duplexReader{r: rc, w: &resp.buf} resp.dec = json.NewDecoder(resp.duplex) resp.dec.UseNumber() return resp @@ -621,10 +701,9 @@ func NewChunkedResponse(r io.Reader) *ChunkedResponse { // NextResponse reads the next line of the stream and returns a response. func (r *ChunkedResponse) NextResponse() (*Response, error) { var response Response - if err := r.dec.Decode(&response); err != nil { if err == io.EOF { - return nil, nil + return nil, err } // A decoding error happened. This probably means the server crashed // and sent a last-ditch error message to us. Ensure we have read the @@ -636,3 +715,8 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) { r.buf.Reset() return &response, nil } + +// Close closes the response. +func (r *ChunkedResponse) Close() error { + return r.duplex.Close() +} diff --git a/vendor/github.com/influxdata/influxdb/client/v2/udp.go b/vendor/github.com/influxdata/influxdb1-client/v2/udp.go similarity index 94% rename from vendor/github.com/influxdata/influxdb/client/v2/udp.go rename to vendor/github.com/influxdata/influxdb1-client/v2/udp.go index 779a28b33f3..9867868b41c 100644 --- a/vendor/github.com/influxdata/influxdb/client/v2/udp.go +++ b/vendor/github.com/influxdata/influxdb1-client/v2/udp.go @@ -107,6 +107,10 @@ func (uc *udpclient) Query(q Query) (*Response, error) { return nil, fmt.Errorf("Querying via UDP is not supported") } +func (uc *udpclient) QueryAsChunk(q Query) (*ChunkedResponse, error) { + return nil, fmt.Errorf("Querying via UDP is not supported") +} + func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) { return 0, "", nil }