diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go index ba8546e6b9..95aa614941 100644 --- a/src/query/api/v1/handler/influxdb/write.go +++ b/src/query/api/v1/handler/influxdb/write.go @@ -26,6 +26,7 @@ import ( "fmt" "io/ioutil" "net/http" + "time" "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/dbnode/client" @@ -188,14 +189,45 @@ func (ii *ingestIterator) Next() bool { return false } +func copyTagsWithNewName(t models.Tags, name []byte) models.Tags { + copiedTags := make([]models.Tag, t.Len()) + metricName := t.Opts.MetricName() + nameHandled := false + for i, tag := range t.Tags { + if !nameHandled && bytes.Equal(tag.Name, metricName) { + copiedTags[i] = models.Tag{Name: metricName, Value: name} + nameHandled = true + } else { + copiedTags[i] = tag + } + } + return models.Tags{Tags: copiedTags, Opts: t.Opts} +} + +func determineTimeUnit(t time.Time) xtime.Unit { + ns := t.UnixNano() + if ns%int64(time.Second) == 0 { + return xtime.Second + } + if ns%int64(time.Millisecond) == 0 { + return xtime.Millisecond + } + if ns%int64(time.Microsecond) == 0 { + return xtime.Microsecond + } + return xtime.Nanosecond +} + func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, xtime.Unit, []byte) { if ii.pointIndex < len(ii.points) && ii.nextFieldIndex > 0 && len(ii.fields) > (ii.nextFieldIndex-1) { point := ii.points[ii.pointIndex] field := ii.fields[ii.nextFieldIndex-1] - tags := ii.tags.SetName(field.name) + tags := copyTagsWithNewName(ii.tags, field.name) + + t := point.Time() - return tags, []ts.Datapoint{ts.Datapoint{Timestamp: point.Time(), - Value: field.value}}, xtime.Nanosecond, nil + return tags, []ts.Datapoint{ts.Datapoint{Timestamp: t, + Value: field.value}}, determineTimeUnit(t), nil } return models.EmptyTags(), nil, 0, nil } diff --git a/src/query/api/v1/handler/influxdb/write_test.go b/src/query/api/v1/handler/influxdb/write_test.go index 4314f6ff88..d3aa2c3bd1 100644 --- a/src/query/api/v1/handler/influxdb/write_test.go +++ b/src/query/api/v1/handler/influxdb/write_test.go @@ -23,7 +23,9 @@ package influxdb import ( "fmt" "testing" + "time" + xtime "github.com/m3db/m3/src/x/time" imodels "github.com/influxdata/influxdb/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -94,3 +96,34 @@ func TestIngestIteratorDuplicateNameTag(t *testing.T) { } require.EqualError(t, iter.Error(), "non-unique Prometheus label __name__") } + +func TestIngestIteratorIssue2125(t *testing.T) { + // In the issue, the Tags object is reused across Next()+Current() calls + s := `measure,lab=foo k1=1,k2=2 1574838670386469800 +` + points, err := imodels.ParsePoints([]byte(s)) + require.NoError(t, err) + + iter := &ingestIterator{points: points, promRewriter: newPromRewriter()} + require.NoError(t, iter.Error()) + + assert.True(t, iter.Next()) + t1, _, _, _ := iter.Current() + + assert.True(t, iter.Next()) + t2, _, _, _ := iter.Current() + require.NoError(t, iter.Error()) + + assert.Equal(t, t1.String(), "__name__: measure_k1, lab: foo") + assert.Equal(t, t2.String(), "__name__: measure_k2, lab: foo") +} + +func TestDetermineTimeUnit(t *testing.T) { + now := time.Now() + zerot := now.Add(time.Duration(-now.UnixNano() % int64(time.Second))) + assert.Equal(t, determineTimeUnit(zerot.Add(1*time.Second)), xtime.Second) + assert.Equal(t, determineTimeUnit(zerot.Add(2*time.Millisecond)), xtime.Millisecond) + assert.Equal(t, determineTimeUnit(zerot.Add(3*time.Microsecond)), xtime.Microsecond) + assert.Equal(t, determineTimeUnit(zerot.Add(4*time.Nanosecond)), xtime.Nanosecond) + +}