Skip to content

Commit

Permalink
[coordinator] Influxdb write endpoint tag copy fix (#2126)
Browse files Browse the repository at this point in the history
  • Loading branch information
fingon authored Feb 13, 2020
1 parent 914105c commit 4c6ea78
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
38 changes: 35 additions & 3 deletions src/query/api/v1/handler/influxdb/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest"
"github.com/m3db/m3/src/dbnode/client"
Expand Down Expand Up @@ -188,14 +189,45 @@ func (ii *ingestIterator) Next() bool {
return false
}

func copyTagsWithNewName(t models.Tags, name []byte) models.Tags {
copiedTags := make([]models.Tag, t.Len())
metricName := t.Opts.MetricName()
nameHandled := false
for i, tag := range t.Tags {
if !nameHandled && bytes.Equal(tag.Name, metricName) {
copiedTags[i] = models.Tag{Name: metricName, Value: name}
nameHandled = true
} else {
copiedTags[i] = tag
}
}
return models.Tags{Tags: copiedTags, Opts: t.Opts}
}

func determineTimeUnit(t time.Time) xtime.Unit {
ns := t.UnixNano()
if ns%int64(time.Second) == 0 {
return xtime.Second
}
if ns%int64(time.Millisecond) == 0 {
return xtime.Millisecond
}
if ns%int64(time.Microsecond) == 0 {
return xtime.Microsecond
}
return xtime.Nanosecond
}

func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, xtime.Unit, []byte) {
if ii.pointIndex < len(ii.points) && ii.nextFieldIndex > 0 && len(ii.fields) > (ii.nextFieldIndex-1) {
point := ii.points[ii.pointIndex]
field := ii.fields[ii.nextFieldIndex-1]
tags := ii.tags.SetName(field.name)
tags := copyTagsWithNewName(ii.tags, field.name)

t := point.Time()

return tags, []ts.Datapoint{ts.Datapoint{Timestamp: point.Time(),
Value: field.value}}, xtime.Nanosecond, nil
return tags, []ts.Datapoint{ts.Datapoint{Timestamp: t,
Value: field.value}}, determineTimeUnit(t), nil
}
return models.EmptyTags(), nil, 0, nil
}
Expand Down
33 changes: 33 additions & 0 deletions src/query/api/v1/handler/influxdb/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ package influxdb
import (
"fmt"
"testing"
"time"

xtime "github.com/m3db/m3/src/x/time"
imodels "github.com/influxdata/influxdb/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -94,3 +96,34 @@ func TestIngestIteratorDuplicateNameTag(t *testing.T) {
}
require.EqualError(t, iter.Error(), "non-unique Prometheus label __name__")
}

func TestIngestIteratorIssue2125(t *testing.T) {
// In the issue, the Tags object is reused across Next()+Current() calls
s := `measure,lab=foo k1=1,k2=2 1574838670386469800
`
points, err := imodels.ParsePoints([]byte(s))
require.NoError(t, err)

iter := &ingestIterator{points: points, promRewriter: newPromRewriter()}
require.NoError(t, iter.Error())

assert.True(t, iter.Next())
t1, _, _, _ := iter.Current()

assert.True(t, iter.Next())
t2, _, _, _ := iter.Current()
require.NoError(t, iter.Error())

assert.Equal(t, t1.String(), "__name__: measure_k1, lab: foo")
assert.Equal(t, t2.String(), "__name__: measure_k2, lab: foo")
}

func TestDetermineTimeUnit(t *testing.T) {
now := time.Now()
zerot := now.Add(time.Duration(-now.UnixNano() % int64(time.Second)))
assert.Equal(t, determineTimeUnit(zerot.Add(1*time.Second)), xtime.Second)
assert.Equal(t, determineTimeUnit(zerot.Add(2*time.Millisecond)), xtime.Millisecond)
assert.Equal(t, determineTimeUnit(zerot.Add(3*time.Microsecond)), xtime.Microsecond)
assert.Equal(t, determineTimeUnit(zerot.Add(4*time.Nanosecond)), xtime.Nanosecond)

}

0 comments on commit 4c6ea78

Please sign in to comment.