Skip to content

Commit

Permalink
Fix database routing on retry with exclude_database_tag (influxdata#6486
Browse files Browse the repository at this point in the history
)
  • Loading branch information
danielnelson authored Oct 7, 2019
1 parent ed6f182 commit 5e31ded
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 0 deletions.
3 changes: 3 additions & 0 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

if c.config.ExcludeDatabaseTag {
// Avoid modifying the metric in case we need to retry the request.
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.config.DatabaseTag)
}

Expand Down
58 changes: 58 additions & 0 deletions plugins/outputs/influxdb/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,3 +675,61 @@ func TestHTTP_UnixSocket(t *testing.T) {
})
}
}

func TestHTTP_WriteDatabaseTagWorksOnRetry(t *testing.T) {
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
r.ParseForm()
require.Equal(t, r.Form["db"], []string{"foo"})

body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu value=42")

w.WriteHeader(http.StatusNoContent)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
}),
)
defer ts.Close()

addr := &url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
}

config := influxdb.HTTPConfig{
URL: addr,
Database: "telegraf",
DatabaseTag: "database",
ExcludeDatabaseTag: true,
Log: testutil.Logger{},
}

client, err := influxdb.NewHTTPClient(config)
require.NoError(t, err)

metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
}

ctx := context.Background()
err = client.Write(ctx, metrics)
require.NoError(t, err)
err = client.Write(ctx, metrics)
require.NoError(t, err)
}
3 changes: 3 additions & 0 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

if c.ExcludeBucketTag {
// Avoid modifying the metric in case we need to retry the request.
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.BucketTag)
}

Expand Down
64 changes: 64 additions & 0 deletions plugins/outputs/influxdb_v2/http_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package influxdb_v2_test

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/influxdata/telegraf"
influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -47,3 +54,60 @@ func TestNewHTTPClient(t *testing.T) {
}
}
}

func TestWriteBucketTagWorksOnRetry(t *testing.T) {
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v2/write":
r.ParseForm()
require.Equal(t, r.Form["bucket"], []string{"foo"})

body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu value=42")

w.WriteHeader(http.StatusNoContent)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
}),
)
defer ts.Close()

addr := &url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
}

config := &influxdb.HTTPConfig{
URL: addr,
Bucket: "telegraf",
BucketTag: "bucket",
ExcludeBucketTag: true,
}

client, err := influxdb.NewHTTPClient(config)
require.NoError(t, err)

metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"bucket": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
}

ctx := context.Background()
err = client.Write(ctx, metrics)
require.NoError(t, err)
err = client.Write(ctx, metrics)
require.NoError(t, err)
}

0 comments on commit 5e31ded

Please sign in to comment.