From 9dd3b163330a290910dec86b1ae71635842a2cd3 Mon Sep 17 00:00:00 2001 From: Markus Stenberg Date: Thu, 30 Jan 2020 14:41:48 +0200 Subject: [PATCH 1/2] [coordinator] Influxdb write endpoint tag copy fix Reusing the Tags object directly across iterations did not work. The second-best-performing option was to reuse the Tag objects within the array, but replace the Tag holding __name__ with a new one. Fixes issue #2125. --- src/query/api/v1/handler/influxdb/write.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go index ba8546e6b9..b8f84b3e4b 100644 --- a/src/query/api/v1/handler/influxdb/write.go +++ b/src/query/api/v1/handler/influxdb/write.go @@ -188,11 +188,26 @@ func (ii *ingestIterator) Next() bool { return false } +func copyTagsWithNewName(t models.Tags, name []byte) models.Tags { + copiedTags := make([]models.Tag, t.Len()) + metricName := t.Opts.MetricName() + nameHandled := false + for i, tag := range t.Tags { + if !nameHandled && bytes.Equal(tag.Name, metricName) { + copiedTags[i] = models.Tag{Name: metricName, Value: name} + nameHandled = true + } else { + copiedTags[i] = tag + } + } + return models.Tags{Tags: copiedTags, Opts: t.Opts} +} + func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, xtime.Unit, []byte) { if ii.pointIndex < len(ii.points) && ii.nextFieldIndex > 0 && len(ii.fields) > (ii.nextFieldIndex-1) { point := ii.points[ii.pointIndex] field := ii.fields[ii.nextFieldIndex-1] - tags := ii.tags.SetName(field.name) + tags := copyTagsWithNewName(ii.tags, field.name) return tags, []ts.Datapoint{ts.Datapoint{Timestamp: point.Time(), Value: field.value}}, xtime.Nanosecond, nil From 1739c846728139f78c47400f543a70c231315ede Mon Sep 17 00:00:00 2001 From: Markus Stenberg Date: Wed, 12 Feb 2020 08:56:31 +0200 Subject: [PATCH 2/2] [coordinator] Influxdb write precision estimation Even with fully per-second data points, this seems to 
shave off ~10% in my test env; not as much as I had hoped. --- src/query/api/v1/handler/influxdb/write.go | 21 ++++++++++-- .../api/v1/handler/influxdb/write_test.go | 33 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go index b8f84b3e4b..95aa614941 100644 --- a/src/query/api/v1/handler/influxdb/write.go +++ b/src/query/api/v1/handler/influxdb/write.go @@ -26,6 +26,7 @@ import ( "fmt" "io/ioutil" "net/http" + "time" "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/dbnode/client" @@ -203,14 +204,30 @@ func copyTagsWithNewName(t models.Tags, name []byte) models.Tags { return models.Tags{Tags: copiedTags, Opts: t.Opts} } +func determineTimeUnit(t time.Time) xtime.Unit { + ns := t.UnixNano() + if ns%int64(time.Second) == 0 { + return xtime.Second + } + if ns%int64(time.Millisecond) == 0 { + return xtime.Millisecond + } + if ns%int64(time.Microsecond) == 0 { + return xtime.Microsecond + } + return xtime.Nanosecond +} + func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, xtime.Unit, []byte) { if ii.pointIndex < len(ii.points) && ii.nextFieldIndex > 0 && len(ii.fields) > (ii.nextFieldIndex-1) { point := ii.points[ii.pointIndex] field := ii.fields[ii.nextFieldIndex-1] tags := copyTagsWithNewName(ii.tags, field.name) - return tags, []ts.Datapoint{ts.Datapoint{Timestamp: point.Time(), - Value: field.value}}, xtime.Nanosecond, nil + t := point.Time() + + return tags, []ts.Datapoint{ts.Datapoint{Timestamp: t, + Value: field.value}}, determineTimeUnit(t), nil } return models.EmptyTags(), nil, 0, nil } diff --git a/src/query/api/v1/handler/influxdb/write_test.go b/src/query/api/v1/handler/influxdb/write_test.go index 4314f6ff88..d3aa2c3bd1 100644 --- a/src/query/api/v1/handler/influxdb/write_test.go +++ b/src/query/api/v1/handler/influxdb/write_test.go @@ -23,7 +23,9 @@ package influxdb import ( "fmt" "testing" + 
"time" + xtime "github.com/m3db/m3/src/x/time" imodels "github.com/influxdata/influxdb/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -94,3 +96,34 @@ func TestIngestIteratorDuplicateNameTag(t *testing.T) { } require.EqualError(t, iter.Error(), "non-unique Prometheus label __name__") } + +func TestIngestIteratorIssue2125(t *testing.T) { + // In the issue, the Tags object is reused across Next()+Current() calls + s := `measure,lab=foo k1=1,k2=2 1574838670386469800 +` + points, err := imodels.ParsePoints([]byte(s)) + require.NoError(t, err) + + iter := &ingestIterator{points: points, promRewriter: newPromRewriter()} + require.NoError(t, iter.Error()) + + assert.True(t, iter.Next()) + t1, _, _, _ := iter.Current() + + assert.True(t, iter.Next()) + t2, _, _, _ := iter.Current() + require.NoError(t, iter.Error()) + + assert.Equal(t, t1.String(), "__name__: measure_k1, lab: foo") + assert.Equal(t, t2.String(), "__name__: measure_k2, lab: foo") +} + +func TestDetermineTimeUnit(t *testing.T) { + now := time.Now() + zerot := now.Add(time.Duration(-now.UnixNano() % int64(time.Second))) + assert.Equal(t, determineTimeUnit(zerot.Add(1*time.Second)), xtime.Second) + assert.Equal(t, determineTimeUnit(zerot.Add(2*time.Millisecond)), xtime.Millisecond) + assert.Equal(t, determineTimeUnit(zerot.Add(3*time.Microsecond)), xtime.Microsecond) + assert.Equal(t, determineTimeUnit(zerot.Add(4*time.Nanosecond)), xtime.Nanosecond) + +}