From 48f79ce9ffda82e1f9e6021f2ca05d0a8ad7d65c Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Thu, 6 Sep 2018 14:03:00 -0600 Subject: [PATCH 01/11] Add output plugin for influx2.0 --- plugins/outputs/all/all.go | 1 + plugins/outputs/influxdb_v2/http.go | 258 ++++++++++++++++++++++++ plugins/outputs/influxdb_v2/influxdb.go | 194 ++++++++++++++++++ 3 files changed, 453 insertions(+) create mode 100644 plugins/outputs/influxdb_v2/http.go create mode 100644 plugins/outputs/influxdb_v2/influxdb.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 037807c22575c..fc58ec0ccc363 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -14,6 +14,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/graylog" _ "github.com/influxdata/telegraf/plugins/outputs/http" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" + _ "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" _ "github.com/influxdata/telegraf/plugins/outputs/instrumental" _ "github.com/influxdata/telegraf/plugins/outputs/kafka" _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go new file mode 100644 index 0000000000000..5fa6df3284a63 --- /dev/null +++ b/plugins/outputs/influxdb_v2/http.go @@ -0,0 +1,258 @@ +package influxdb_v2 + +import ( + "compress/gzip" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "path" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers/influx" +) + +type APIErrorType int + +type APIError struct { + StatusCode int + Title string + Description string + Type APIErrorType +} + +func (e APIError) Error() string { + if e.Description != "" { + return fmt.Sprintf("%s: %s", e.Title, e.Description) + } + return e.Title +} + +const ( + defaultRequestTimeout = time.Second * 5 + defaultDatabase = "telegraf" + defaultUserAgent = "telegraf" +) + +// WriteResponse is the response body from the /write endpoint +type WriteResponse struct { + Err string `json:"error,omitempty"` +} + +func (r WriteResponse) Error() string { + return r.Err +} + +type HTTPConfig struct { + URL *url.URL + Token string + Organization string + Bucket string + Timeout time.Duration + Headers map[string]string + Proxy *url.URL + UserAgent string + ContentEncoding string + TLSConfig *tls.Config + + Serializer *influx.Serializer +} + +type httpClient struct { + WriteURL string + ContentEncoding string + Timeout time.Duration + Headers map[string]string + + client *http.Client + serializer *influx.Serializer + url *url.URL +} + +func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { + if config.URL == nil { + return nil, ErrMissingURL + } + + timeout := config.Timeout + if timeout == 0 { + timeout = defaultRequestTimeout + } + + userAgent := config.UserAgent + if userAgent == "" { + userAgent = defaultUserAgent + } + + var headers = make(map[string]string, len(config.Headers)+2) + headers["User-Agent"] = userAgent + headers["Authorization"] = "Token " + config.Token + for k, v := range config.Headers { + headers[k] = v + } + + var proxy func(*http.Request) (*url.URL, error) + if config.Proxy != nil { + proxy = http.ProxyURL(config.Proxy) + } else { + proxy = http.ProxyFromEnvironment + } + + serializer := config.Serializer + if serializer == nil { + serializer = influx.NewSerializer() + } + + writeURL, err := makeWriteURL( + *config.URL, + config.Organization, + config.Bucket) + if err != nil { + return nil, err + } + + var transport *http.Transport + switch config.URL.Scheme { + case "http", "https": + transport = &http.Transport{ + Proxy: proxy, + TLSClientConfig: config.TLSConfig, + } + case "unix": + transport = &http.Transport{ + Dial: func(_, _ string) (net.Conn, error) { + return net.DialTimeout( + config.URL.Scheme, + config.URL.Path, + defaultRequestTimeout, + ) + }, + } + default: + return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme) + } + + client := &httpClient{ + serializer: serializer, + client: &http.Client{ + Timeout: timeout, + Transport: transport, + }, + url: config.URL, + WriteURL: writeURL, + ContentEncoding: config.ContentEncoding, + Timeout: timeout, + Headers: headers, + } + return client, nil +} + +// URL returns the origin URL that this client connects too. +func (c *httpClient) URL() string { + return c.url.String() +} + +func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { + var err error + + reader := influx.NewReader(metrics, c.serializer) + req, err := c.makeWriteRequest(reader) + if err != nil { + return err + } + + resp, err := c.client.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNoContent { + return nil + } + + writeResp := &WriteResponse{} + dec := json.NewDecoder(resp.Body) + + var desc string + err = dec.Decode(writeResp) + if err == nil { + desc = writeResp.Err + } else { + desc = resp.Header.Get("X-Influx-Error") + } + + return &APIError{ + StatusCode: resp.StatusCode, + Title: resp.Status, + Description: desc, + } +} + +func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { + var err error + if c.ContentEncoding == "gzip" { + body, err = compressWithGzip(body) + if err != nil { + return nil, err + } + } + + req, err := http.NewRequest("POST", c.WriteURL, body) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "text/plain; charset=utf-8") + c.addHeaders(req) + + if c.ContentEncoding == "gzip" { + req.Header.Set("Content-Encoding", "gzip") + } + + return req, nil +} + +func (c *httpClient) addHeaders(req *http.Request) { + for header, value := range c.Headers { + req.Header.Set(header, value) + } +} + +func compressWithGzip(data io.Reader) (io.Reader, error) { + pipeReader, pipeWriter := io.Pipe() + gzipWriter := gzip.NewWriter(pipeWriter) + var err error + + go func() { + _, err = io.Copy(gzipWriter, data) + gzipWriter.Close() + pipeWriter.Close() + }() + + return pipeReader, err +} + +func makeWriteURL(loc url.URL, org, bucket string) (string, error) { + params := url.Values{} + params.Set("bucket", bucket) + params.Set("org", org) + + switch loc.Scheme { + case "unix": + loc.Scheme = "http" + loc.Host = "127.0.0.1" + loc.Path = "v2/write" + case "http", "https": + loc.Path = path.Join(loc.Path, "v2/write") + default: + return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) + } + loc.RawQuery = params.Encode() + return loc.String(), nil +} diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go new file mode 100644 index 0000000000000..20eeb7d9895f3 --- /dev/null +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -0,0 +1,194 @@ +package influxdb_v2 + +import ( + "context" + "errors" + "fmt" + "log" + "math/rand" + "net/url" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers/influx" +) + +var ( + defaultURL = "http://localhost:8086" + + ErrMissingURL = errors.New("missing URL") +) + +var sampleConfig = ` + ## The URLs of the InfluxDB cluster nodes. + ## + ## Multiple URLs can be specified for a single cluster, only ONE of the + ## urls will be written to each interval. + urls = ["http://127.0.0.1:8086"] + + ## Token for authentication. + token = "" + + ## Organization is the name of the organization you wish to write to. + organization = "" + + ## Bucket to the name fo the bucketwrite into; must exist. + bucket = "" + + ## Timeout for HTTP messages. + # timeout = "5s" + + ## Additional HTTP headers + # http_headers = {"X-Special-Header" = "Special-Value"} + + ## HTTP Proxy override, if unset values the standard proxy environment + ## variables are consulted to determine which proxy, if any, should be used. + # http_proxy = "http://corporate.proxy:3128" + + ## HTTP User-Agent + # user_agent = "telegraf" + + ## Content-Encoding for write request body, can be set to "gzip" to + ## compress body or "identity" to apply no encoding. + # content_encoding = "identity" + + ## Optional TLS Config for use on HTTP connections. + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +type Client interface { + Write(context.Context, []telegraf.Metric) error + + URL() string // for logging +} + +type InfluxDB struct { + URLs []string `toml:"urls"` + Token string `toml:"token"` + Organization string `toml:"organization"` + Bucket string `toml:"bucket"` + Timeout internal.Duration `toml:"timeout"` + HTTPHeaders map[string]string `toml:"http_headers"` + HTTPProxy string `toml:"http_proxy"` + UserAgent string `toml:"user_agent"` + ContentEncoding string `toml:"content_encoding"` + tls.ClientConfig + + clients []Client + serializer *influx.Serializer +} + +func (i *InfluxDB) Connect() error { + ctx := context.Background() + + if len(i.URLs) == 0 { + i.URLs = append(i.URLs, defaultURL) + } + + i.serializer = influx.NewSerializer() + i.serializer.SetFieldTypeSupport(influx.UintSupport) + + for _, u := range i.URLs { + parts, err := url.Parse(u) + if err != nil { + return fmt.Errorf("error parsing url [%q]: %v", u, err) + } + + var proxy *url.URL + if len(i.HTTPProxy) > 0 { + proxy, err = url.Parse(i.HTTPProxy) + if err != nil { + return fmt.Errorf("error parsing proxy_url [%s]: %v", i.HTTPProxy, err) + } + } + + switch parts.Scheme { + case "http", "https", "unix": + c, err := i.getHTTPClient(ctx, parts, proxy) + if err != nil { + return err + } + + i.clients = append(i.clients, c) + default: + return fmt.Errorf("unsupported scheme [%q]: %q", u, parts.Scheme) + } + } + + return nil +} + +func (i *InfluxDB) Close() error { + return nil +} + +func (i *InfluxDB) Description() string { + return "Configuration for sending metrics to InfluxDB" +} + +func (i *InfluxDB) SampleConfig() string { + return sampleConfig +} + +// Write sends metrics to one of the configured servers, logging each +// unsuccessful. If all servers fail, return an error. +func (i *InfluxDB) Write(metrics []telegraf.Metric) error { + ctx := context.Background() + + var err error + p := rand.Perm(len(i.clients)) + for _, n := range p { + client := i.clients[n] + err = client.Write(ctx, metrics) + if err == nil { + return nil + } + + log.Printf("E! [outputs.influxdb] when writing to [%s]: %v", client.URL(), err) + } + + return errors.New("could not write any address") +} + +func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.URL) (Client, error) { + tlsConfig, err := i.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + config := &HTTPConfig{ + URL: url, + Token: i.Token, + Organization: i.Organization, + Bucket: i.Bucket, + Timeout: i.Timeout.Duration, + Headers: i.HTTPHeaders, + Proxy: proxy, + UserAgent: i.UserAgent, + ContentEncoding: i.ContentEncoding, + TLSConfig: tlsConfig, + Serializer: i.serializer, + } + + c, err := NewHTTPClient(config) + if err != nil { + return nil, fmt.Errorf("error creating HTTP client [%s]: %v", url, err) + } + + return c, nil +} + +func init() { + outputs.Add("influxdb_v2", func() telegraf.Output { + return &InfluxDB{ + Timeout: internal.Duration{Duration: time.Second * 5}, + } + }) +} From ae8498be7093b394e803b9eaf5bb07bae6ef2b83 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Thu, 6 Sep 2018 16:43:17 -0600 Subject: [PATCH 02/11] Add tests --- plugins/outputs/influxdb_v2/http.go | 21 +--- .../outputs/influxdb_v2/http_internal_test.go | 59 ++++++++++ plugins/outputs/influxdb_v2/http_test.go | 49 +++++++++ plugins/outputs/influxdb_v2/influxdb.go | 4 +- plugins/outputs/influxdb_v2/influxdb_test.go | 103 ++++++++++++++++++ 5 files changed, 214 insertions(+), 22 deletions(-) create mode 100644 plugins/outputs/influxdb_v2/http_internal_test.go create mode 100644 plugins/outputs/influxdb_v2/http_test.go create mode 100644 plugins/outputs/influxdb_v2/influxdb_test.go diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 5fa6df3284a63..38762bcd9864a 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -4,7 +4,6 @@ import ( "compress/gzip" "context" "crypto/tls" - "encoding/json" "fmt" "io" "net" @@ -39,15 +38,6 @@ const ( defaultUserAgent = "telegraf" ) -// WriteResponse is the response body from the /write endpoint -type WriteResponse struct { - Err string `json:"error,omitempty"` -} - -func (r WriteResponse) Error() string { - return r.Err -} - type HTTPConfig struct { URL *url.URL Token string @@ -176,16 +166,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error return nil } - writeResp := &WriteResponse{} - dec := json.NewDecoder(resp.Body) - - var desc string - err = dec.Decode(writeResp) - if err == nil { - desc = writeResp.Err - } else { - desc = resp.Header.Get("X-Influx-Error") - } + desc := resp.Header.Get("X-Influx-Error") return &APIError{ StatusCode: resp.StatusCode, diff --git a/plugins/outputs/influxdb_v2/http_internal_test.go b/plugins/outputs/influxdb_v2/http_internal_test.go new file mode 100644 index 0000000000000..5df51fc85f1e7 --- /dev/null +++ b/plugins/outputs/influxdb_v2/http_internal_test.go @@ -0,0 +1,59 @@ +package influxdb_v2 + +import ( + "io" + "net/url" + "testing" + + "github.com/stretchr/testify/require" +) + +func genURL(u string) *url.URL { + URL, _ := url.Parse(u) + return URL +} + +func TestMakeWriteURL(t *testing.T) { + tests := []struct { + err bool + url *url.URL + act string + }{ + { + url: genURL("http://localhost:9999"), + act: "http://localhost:9999/v2/write?bucket=telegraf&org=influx", + }, + { + url: genURL("unix://var/run/influxd.sock"), + act: "http://127.0.0.1/v2/write?bucket=telegraf&org=influx", + }, + { + err: true, + url: genURL("udp://localhost:9999"), + }, + } + + for i := range tests { + rURL, err := makeWriteURL(*tests[i].url, "influx", "telegraf") + if !tests[i].err { + require.NoError(t, err) + } else { + require.Error(t, err) + t.Log(err) + } + if err == nil { + require.Equal(t, tests[i].act, rURL) + } + } +} + +func TestMakeWriteRequest(t *testing.T) { + reader, _ := io.Pipe() + cli := httpClient{ + WriteURL: "http://localhost:9999/v2/write?bucket=telegraf&org=influx", + ContentEncoding: "gzip", + Headers: map[string]string{"x": "y"}, + } + _, err := cli.makeWriteRequest(reader) + require.NoError(t, err) +} diff --git a/plugins/outputs/influxdb_v2/http_test.go b/plugins/outputs/influxdb_v2/http_test.go new file mode 100644 index 0000000000000..33ff9e24b90e3 --- /dev/null +++ b/plugins/outputs/influxdb_v2/http_test.go @@ -0,0 +1,49 @@ +package influxdb_v2_test + +import ( + "net/url" + "testing" + + influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" + "github.com/stretchr/testify/require" +) + +func genURL(u string) *url.URL { + URL, _ := url.Parse(u) + return URL +} +func TestNewHTTPClient(t *testing.T) { + tests := []struct { + err bool + cfg *influxdb.HTTPConfig + }{ + { + err: true, + cfg: &influxdb.HTTPConfig{}, + }, + { + err: true, + cfg: &influxdb.HTTPConfig{ + URL: genURL("udp://localhost:9999"), + }, + }, + { + cfg: &influxdb.HTTPConfig{ + URL: genURL("unix://var/run/influxd.sock"), + }, + }, + } + + for i := range tests { + client, err := influxdb.NewHTTPClient(tests[i].cfg) + if !tests[i].err { + require.NoError(t, err) + } else { + require.Error(t, err) + t.Log(err) + } + if err == nil { + client.URL() + } + } +} diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go index 20eeb7d9895f3..f8ff7d8afbc70 100644 --- a/plugins/outputs/influxdb_v2/influxdb.go +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -17,7 +17,7 @@ import ( ) var ( - defaultURL = "http://localhost:8086" + defaultURL = "http://localhost:9999" ErrMissingURL = errors.New("missing URL") ) @@ -27,7 +27,7 @@ var sampleConfig = ` ## ## Multiple URLs can be specified for a single cluster, only ONE of the ## urls will be written to each interval. - urls = ["http://127.0.0.1:8086"] + urls = ["http://127.0.0.1:9999"] ## Token for authentication. token = "" diff --git a/plugins/outputs/influxdb_v2/influxdb_test.go b/plugins/outputs/influxdb_v2/influxdb_test.go new file mode 100644 index 0000000000000..3702b4309d774 --- /dev/null +++ b/plugins/outputs/influxdb_v2/influxdb_test.go @@ -0,0 +1,103 @@ +package influxdb_v2_test + +import ( + "testing" + + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/outputs" + influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" + "github.com/stretchr/testify/require" +) + +func TestDefaultURL(t *testing.T) { + output := influxdb.InfluxDB{} + err := output.Connect() + require.NoError(t, err) + if len(output.URLs) < 1 { + t.Fatal("Default URL failed to get set") + } + require.Equal(t, "http://localhost:9999", output.URLs[0]) +} +func TestConnect(t *testing.T) { + tests := []struct { + err bool + out influxdb.InfluxDB + }{ + { + out: influxdb.InfluxDB{ + URLs: []string{"http://localhost:1234"}, + HTTPProxy: "http://localhost:9999", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{"!@#$qwert"}, + HTTPProxy: "http://localhost:9999", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{"http://localhost:1234"}, + HTTPProxy: "!@#$%^&*()_+", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{"!@#$%^&*()_+"}, + HTTPProxy: "http://localhost:9999", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{":::@#$qwert"}, + HTTPProxy: "http://localhost:9999", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{"https://localhost:8080"}, + ClientConfig: tls.ClientConfig{ + TLSCA: "thing", + }, + }, + }, + } + + for i := range tests { + err := tests[i].out.Connect() + if !tests[i].err { + require.NoError(t, err) + } else { + require.Error(t, err) + t.Log(err) + } + } +} + +func TestUnused(t *testing.T) { + thing := influxdb.InfluxDB{} + thing.Close() + thing.Description() + thing.SampleConfig() + outputs.Outputs["influxdb_v2"]() +} From 93a47fff31f603287e375fe15849e3c3cdd8a83a Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Thu, 6 Sep 2018 16:48:02 -0600 Subject: [PATCH 03/11] Add Readme --- plugins/outputs/influxdb_v2/README.md | 48 +++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 plugins/outputs/influxdb_v2/README.md diff --git a/plugins/outputs/influxdb_v2/README.md b/plugins/outputs/influxdb_v2/README.md new file mode 100644 index 0000000000000..23693d268b893 --- /dev/null +++ b/plugins/outputs/influxdb_v2/README.md @@ -0,0 +1,48 @@ +# InfluxDB Output Plugin + +This InfluxDB output plugin writes metrics to the [InfluxDB 2.0](https://github.com/influxdata/platform) HTTP service. + +### Configuration: + +```toml +# Configuration for sending metrics to InfluxDB 2.0 +[[outputs.influxdb_v2]] + ## The URLs of the InfluxDB cluster nodes. + ## + ## Multiple URLs can be specified for a single cluster, only ONE of the + ## urls will be written to each interval. + urls = ["http://127.0.0.1:9999"] + + ## Token for authentication. + token = "" + + ## Organization is the name of the organization you wish to write to. + organization = "" + + ## Bucket to the name fo the bucketwrite into; must exist. + bucket = "" + + ## Timeout for HTTP messages. + # timeout = "5s" + + ## Additional HTTP headers + # http_headers = {"X-Special-Header" = "Special-Value"} + + ## HTTP Proxy override, if unset values the standard proxy environment + ## variables are consulted to determine which proxy, if any, should be used. + # http_proxy = "http://corporate.proxy:3128" + + ## HTTP User-Agent + # user_agent = "telegraf" + + ## Content-Encoding for write request body, can be set to "gzip" to + ## compress body or "identity" to apply no encoding. + # content_encoding = "identity" + + ## Optional TLS Config for use on HTTP connections. + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +``` From 5b4958d1c2e4d733992b3074f6b7db7b78babfd3 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Tue, 11 Sep 2018 12:34:18 -0600 Subject: [PATCH 04/11] Implement remaining spec --- plugins/outputs/influxdb_v2/http.go | 54 +++++++++++++++++++++++-- plugins/outputs/influxdb_v2/influxdb.go | 7 +++- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 38762bcd9864a..2635547343d64 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -4,12 +4,14 @@ import ( "compress/gzip" "context" "crypto/tls" + "encoding/json" "fmt" "io" "net" "net/http" "net/url" "path" + "strconv" "time" "github.com/influxdata/telegraf" @@ -43,6 +45,7 @@ type HTTPConfig struct { Token string Organization string Bucket string + Precision string Timeout time.Duration Headers map[string]string Proxy *url.URL @@ -101,7 +104,8 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { writeURL, err := makeWriteURL( *config.URL, config.Organization, - config.Bucket) + config.Bucket, + config.Precision) if err != nil { return nil, err } @@ -119,7 +123,7 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { return net.DialTimeout( config.URL.Scheme, config.URL.Path, - defaultRequestTimeout, + timeout, ) }, } @@ -147,6 +151,19 @@ func (c *httpClient) URL() string { return c.url.String() } +type genericRespError struct { + Code string + Message string + Op string + Err string + Line int32 + MaxLength int32 +} + +func (g genericRespError) String() string { + return fmt.Sprintf("%s: %s", g.Code, g.Message) +} + func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { var err error @@ -166,7 +183,33 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error return nil } - desc := resp.Header.Get("X-Influx-Error") + writeResp := &genericRespError{} + json.NewDecoder(resp.Body).Decode(writeResp) + var desc string + + switch resp.StatusCode { + case http.StatusBadRequest: // 400 + // LineProtocolError + desc = fmt.Sprintf("%s - %s;%d;%s", writeResp, writeResp.Op, writeResp.Line, writeResp.Err) + case http.StatusUnauthorized, http.StatusForbidden: // 401, 403 + // Error + desc = fmt.Sprintf("%s - %s;%s", writeResp, writeResp.Op, writeResp.Err) + case http.StatusRequestEntityTooLarge: // 413 + // LineProtocolLengthError + desc = fmt.Sprintf("%s - %s;%d", writeResp, writeResp.Op, writeResp.MaxLength) + case http.StatusTooManyRequests, http.StatusServiceUnavailable: // 429, 503 + retryAfter := resp.Header.Get("Retry-After") + retry, err := strconv.Atoi(retryAfter) + if err != nil { + return fmt.Errorf("Bad value for 'Retry-After': %s", err.Error()) + } + time.Sleep(time.Second * time.Duration(retry)) + c.Write(ctx, metrics) + } + + if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" { + desc = fmt.Sprintf("%s - %s", desc, xErr) + } return &APIError{ StatusCode: resp.StatusCode, @@ -219,10 +262,13 @@ func compressWithGzip(data io.Reader) (io.Reader, error) { return pipeReader, err } -func makeWriteURL(loc url.URL, org, bucket string) (string, error) { +func makeWriteURL(loc url.URL, org, bucket, precision string) (string, error) { params := url.Values{} params.Set("bucket", bucket) params.Set("org", org) + if precision != "" { + params.Set("precision", precision) + } switch loc.Scheme { case "unix": diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go index f8ff7d8afbc70..73c4a808c2499 100644 --- a/plugins/outputs/influxdb_v2/influxdb.go +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -32,12 +32,15 @@ var sampleConfig = ` ## Token for authentication. token = "" - ## Organization is the name of the organization you wish to write to. + ## Organization is the name of the organization you wish to write to; must exist. organization = "" ## Bucket to the name fo the bucketwrite into; must exist. bucket = "" + ## Precision for the unix timestamps within the body line-protocol. + # precision = "ns" + ## Timeout for HTTP messages. # timeout = "5s" @@ -74,6 +77,7 @@ type InfluxDB struct { Token string `toml:"token"` Organization string `toml:"organization"` Bucket string `toml:"bucket"` + Precision string `toml:"precision"` Timeout internal.Duration `toml:"timeout"` HTTPHeaders map[string]string `toml:"http_headers"` HTTPProxy string `toml:"http_proxy"` @@ -168,6 +172,7 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U Token: i.Token, Organization: i.Organization, Bucket: i.Bucket, + Precision: i.Precision, Timeout: i.Timeout.Duration, Headers: i.HTTPHeaders, Proxy: proxy, From a68e828fce1733d812648d773fb9e9d3b96a3d14 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 12 Sep 2018 10:19:13 -0600 Subject: [PATCH 05/11] Address feedback Unimplement non-important parts of the spec --- plugins/outputs/influxdb_v2/http.go | 60 +++++++++++++------------ plugins/outputs/influxdb_v2/influxdb.go | 5 --- 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 2635547343d64..e8c406c3543a3 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "log" "net" "net/http" "net/url" @@ -36,6 +37,7 @@ func (e APIError) Error() string { const ( defaultRequestTimeout = time.Second * 5 + defaultMaxWait = 10 // seconds defaultDatabase = "telegraf" defaultUserAgent = "telegraf" ) @@ -45,7 +47,6 @@ type HTTPConfig struct { Token string Organization string Bucket string - Precision string Timeout time.Duration Headers map[string]string Proxy *url.URL @@ -104,8 +105,7 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { writeURL, err := makeWriteURL( *config.URL, config.Organization, - config.Bucket, - config.Precision) + config.Bucket) if err != nil { return nil, err } @@ -154,19 +154,21 @@ func (c *httpClient) URL() string { type genericRespError struct { Code string Message string - Op string - Err string - Line int32 - MaxLength int32 + Line *int32 + MaxLength *int32 } -func (g genericRespError) String() string { - return fmt.Sprintf("%s: %s", g.Code, g.Message) +func (g genericRespError) Error() string { + errString := fmt.Sprintf("%s: %s", g.Code, g.Message) + if g.Line != nil { + return fmt.Sprintf("%s - line[%d]", errString, g.Line) + } else if g.MaxLength != nil { + return fmt.Sprintf("%s - maxlen[%d]", errString, g.MaxLength) + } + return errString } func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { - var err error - reader := influx.NewReader(metrics, c.serializer) req, err := c.makeWriteRequest(reader) if err != nil { @@ -184,31 +186,36 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } writeResp := &genericRespError{} - json.NewDecoder(resp.Body).Decode(writeResp) - var desc string + err = json.NewDecoder(resp.Body).Decode(writeResp) + desc := writeResp.Error() + if err != nil { + desc = err.Error() + } switch resp.StatusCode { - case http.StatusBadRequest: // 400 - // LineProtocolError - desc = fmt.Sprintf("%s - %s;%d;%s", writeResp, writeResp.Op, writeResp.Line, writeResp.Err) - case http.StatusUnauthorized, http.StatusForbidden: // 401, 403 - // Error - desc = fmt.Sprintf("%s - %s;%s", writeResp, writeResp.Op, writeResp.Err) - case http.StatusRequestEntityTooLarge: // 413 - // LineProtocolLengthError - desc = fmt.Sprintf("%s - %s;%d", writeResp, writeResp.Op, writeResp.MaxLength) - case http.StatusTooManyRequests, http.StatusServiceUnavailable: // 429, 503 + case http.StatusBadRequest, http.StatusUnauthorized, + http.StatusForbidden, http.StatusRequestEntityTooLarge: + log.Printf("E! [outputs.influxdb_v2] Failed to write metric: %s\n", desc) + return nil + case http.StatusTooManyRequests, http.StatusServiceUnavailable: retryAfter := resp.Header.Get("Retry-After") retry, err := strconv.Atoi(retryAfter) if err != nil { return fmt.Errorf("Bad value for 'Retry-After': %s", err.Error()) } + if retry > defaultMaxWait { + log.Println("E! [outputs.influxdb_v2] Failed to write metric: retry interval too long") + return nil + } + // TODO: Don't sleep and write (#2919) time.Sleep(time.Second * time.Duration(retry)) c.Write(ctx, metrics) } + // This is only until platform spec is fully implemented. As of the + // time of writing, there is no error body returned. if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" { - desc = fmt.Sprintf("%s - %s", desc, xErr) + desc = fmt.Sprintf("%s; %s", desc, xErr) } return &APIError{ @@ -262,13 +269,10 @@ func compressWithGzip(data io.Reader) (io.Reader, error) { return pipeReader, err } -func makeWriteURL(loc url.URL, org, bucket, precision string) (string, error) { +func makeWriteURL(loc url.URL, org, bucket string) (string, error) { params := url.Values{} params.Set("bucket", bucket) params.Set("org", org) - if precision != "" { - params.Set("precision", precision) - } switch loc.Scheme { case "unix": diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go index 73c4a808c2499..6c73bd6a802d0 100644 --- a/plugins/outputs/influxdb_v2/influxdb.go +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -38,9 +38,6 @@ var sampleConfig = ` ## Bucket to the name fo the bucketwrite into; must exist. bucket = "" - ## Precision for the unix timestamps within the body line-protocol. - # precision = "ns" - ## Timeout for HTTP messages. # timeout = "5s" @@ -77,7 +74,6 @@ type InfluxDB struct { Token string `toml:"token"` Organization string `toml:"organization"` Bucket string `toml:"bucket"` - Precision string `toml:"precision"` Timeout internal.Duration `toml:"timeout"` HTTPHeaders map[string]string `toml:"http_headers"` HTTPProxy string `toml:"http_proxy"` @@ -172,7 +168,6 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U Token: i.Token, Organization: i.Organization, Bucket: i.Bucket, - Precision: i.Precision, Timeout: i.Timeout.Duration, Headers: i.HTTPHeaders, Proxy: proxy, From 295b990bfe4bc89e52776953bd8b53d16863eba7 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 12 Sep 2018 12:06:19 -0600 Subject: [PATCH 06/11] Better retry-time handling --- plugins/outputs/influxdb_v2/http.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index e8c406c3543a3..37f88b1b72115 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "log" @@ -66,6 +67,7 @@ type httpClient struct { client *http.Client serializer *influx.Serializer url *url.URL + retryTime time.Time } func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { @@ -169,6 +171,9 @@ func (g genericRespError) Error() string { } func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { + if c.retryTime.After(time.Now()) { + return errors.New("E! Retry time has not elapsed") + } reader := influx.NewReader(metrics, c.serializer) req, err := c.makeWriteRequest(reader) if err != nil { @@ -207,9 +212,8 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error log.Println("E! [outputs.influxdb_v2] Failed to write metric: retry interval too long") return nil } - // TODO: Don't sleep and write (#2919) - time.Sleep(time.Second * time.Duration(retry)) - c.Write(ctx, metrics) + c.retryTime = time.Now().Add(time.Duration(retry) * time.Second) + return fmt.Errorf("Waiting %ds for server before sending metric again", retry) } // This is only until platform spec is fully implemented. As of the From c3e3c42c29585f31c225cae59317497492dcb48b Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 12 Sep 2018 12:24:24 -0600 Subject: [PATCH 07/11] Ignore if no retry-after header is returned --- plugins/outputs/influxdb_v2/http.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 37f88b1b72115..6980b5659e9b8 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -172,7 +172,7 @@ func (g genericRespError) Error() string { func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { if c.retryTime.After(time.Now()) { - return errors.New("E! Retry time has not elapsed") + return errors.New("Retry time has not elapsed") } reader := influx.NewReader(metrics, c.serializer) req, err := c.makeWriteRequest(reader) @@ -206,7 +206,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error retryAfter := resp.Header.Get("Retry-After") retry, err := strconv.Atoi(retryAfter) if err != nil { - return fmt.Errorf("Bad value for 'Retry-After': %s", err.Error()) + retry = 0 } if retry > defaultMaxWait { log.Println("E! [outputs.influxdb_v2] Failed to write metric: retry interval too long") From 3bf912f9bea92a26ffe8f38050091f2ea12d6355 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 12 Sep 2018 14:59:30 -0600 Subject: [PATCH 08/11] Update default encoding, set retry if greater than max --- plugins/outputs/influxdb_v2/http.go | 3 +-- plugins/outputs/influxdb_v2/influxdb.go | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 6980b5659e9b8..1e7061a270a39 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -209,8 +209,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error retry = 0 } if retry > defaultMaxWait { - log.Println("E! [outputs.influxdb_v2] Failed to write metric: retry interval too long") - return nil + retry = defaultMaxWait } c.retryTime = time.Now().Add(time.Duration(retry) * time.Second) return fmt.Errorf("Waiting %ds for server before sending metric again", retry) diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go index 6c73bd6a802d0..a161b326cc878 100644 --- a/plugins/outputs/influxdb_v2/influxdb.go +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -53,7 +53,7 @@ var sampleConfig = ` ## Content-Encoding for write request body, can be set to "gzip" to ## compress body or "identity" to apply no encoding. - # content_encoding = "identity" + # content_encoding = "gzip" ## Optional TLS Config for use on HTTP connections. # tls_ca = "/etc/telegraf/ca.pem" @@ -188,7 +188,8 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U func init() { outputs.Add("influxdb_v2", func() telegraf.Output { return &InfluxDB{ - Timeout: internal.Duration{Duration: time.Second * 5}, + Timeout: internal.Duration{Duration: time.Second * 5}, + ContentEncoding: "gzip", } }) } From b938d419aa8d2dd896fe050a5296b8a4c23b204e Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 12 Sep 2018 15:00:12 -0600 Subject: [PATCH 09/11] Update readme --- plugins/outputs/influxdb_v2/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/outputs/influxdb_v2/README.md b/plugins/outputs/influxdb_v2/README.md index 23693d268b893..2793e5b6e566a 100644 --- a/plugins/outputs/influxdb_v2/README.md +++ b/plugins/outputs/influxdb_v2/README.md @@ -37,7 +37,7 @@ This InfluxDB output plugin writes metrics to the [InfluxDB 2.0](https://github. ## Content-Encoding for write request body, can be set to "gzip" to ## compress body or "identity" to apply no encoding. - # content_encoding = "identity" + # content_encoding = "gzip" ## Optional TLS Config for use on HTTP connections. # tls_ca = "/etc/telegraf/ca.pem" From 66839d6f2b795315bf186aca6efecce88dc8f8fb Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 12 Sep 2018 15:31:22 -0600 Subject: [PATCH 10/11] Force no uint support --- plugins/outputs/influxdb_v2/README.md | 3 +++ plugins/outputs/influxdb_v2/influxdb.go | 9 ++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/plugins/outputs/influxdb_v2/README.md b/plugins/outputs/influxdb_v2/README.md index 2793e5b6e566a..a79d717fbd190 100644 --- a/plugins/outputs/influxdb_v2/README.md +++ b/plugins/outputs/influxdb_v2/README.md @@ -39,6 +39,9 @@ This InfluxDB output plugin writes metrics to the [InfluxDB 2.0](https://github. ## compress body or "identity" to apply no encoding. # content_encoding = "gzip" + ## Enable or disable uint support for writing uints influxdb 2.0. + # uint_support = "disable" + ## Optional TLS Config for use on HTTP connections. # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go index a161b326cc878..a284464c71c9d 100644 --- a/plugins/outputs/influxdb_v2/influxdb.go +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -55,6 +55,9 @@ var sampleConfig = ` ## compress body or "identity" to apply no encoding. # content_encoding = "gzip" + ## Enable or disable uint support for writing uints influxdb 2.0. + # uint_support = "disable" + ## Optional TLS Config for use on HTTP connections. # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" @@ -79,6 +82,7 @@ type InfluxDB struct { HTTPProxy string `toml:"http_proxy"` UserAgent string `toml:"user_agent"` ContentEncoding string `toml:"content_encoding"` + UintSupport string `toml:"uint_support"` tls.ClientConfig clients []Client @@ -93,7 +97,9 @@ func (i *InfluxDB) Connect() error { } i.serializer = influx.NewSerializer() - i.serializer.SetFieldTypeSupport(influx.UintSupport) + if i.UintSupport != "disabled" { + i.serializer.SetFieldTypeSupport(influx.UintSupport) + } for _, u := range i.URLs { parts, err := url.Parse(u) @@ -190,6 +196,7 @@ func init() { return &InfluxDB{ Timeout: internal.Duration{Duration: time.Second * 5}, ContentEncoding: "gzip", + UintSupport: "disabled", } }) } From d561ef393a37eb7da8362753fcfd7c824414b150 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 12 Sep 2018 15:36:53 -0600 Subject: [PATCH 11/11] Match old pluign --- plugins/outputs/influxdb_v2/README.md | 2 +- plugins/outputs/influxdb_v2/influxdb.go | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/plugins/outputs/influxdb_v2/README.md b/plugins/outputs/influxdb_v2/README.md index a79d717fbd190..795f4467c415b 100644 --- a/plugins/outputs/influxdb_v2/README.md +++ b/plugins/outputs/influxdb_v2/README.md @@ -40,7 +40,7 @@ This InfluxDB output plugin writes metrics to the [InfluxDB 2.0](https://github. # content_encoding = "gzip" ## Enable or disable uint support for writing uints influxdb 2.0. - # uint_support = "disable" + # influx_uint_support = false ## Optional TLS Config for use on HTTP connections. # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go index a284464c71c9d..886907c035ebd 100644 --- a/plugins/outputs/influxdb_v2/influxdb.go +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -56,7 +56,7 @@ var sampleConfig = ` # content_encoding = "gzip" ## Enable or disable uint support for writing uints influxdb 2.0. - # uint_support = "disable" + # influx_uint_support = false ## Optional TLS Config for use on HTTP connections. # tls_ca = "/etc/telegraf/ca.pem" @@ -82,7 +82,7 @@ type InfluxDB struct { HTTPProxy string `toml:"http_proxy"` UserAgent string `toml:"user_agent"` ContentEncoding string `toml:"content_encoding"` - UintSupport string `toml:"uint_support"` + UintSupport bool `toml:"influx_uint_support"` tls.ClientConfig clients []Client @@ -97,7 +97,7 @@ func (i *InfluxDB) Connect() error { } i.serializer = influx.NewSerializer() - if i.UintSupport != "disabled" { + if i.UintSupport { i.serializer.SetFieldTypeSupport(influx.UintSupport) } @@ -196,7 +196,6 @@ func init() { return &InfluxDB{ Timeout: internal.Duration{Duration: time.Second * 5}, ContentEncoding: "gzip", - UintSupport: "disabled", } }) }