Skip to content

Commit

Permalink
Adding support for new lines in influx line protocol fields. (influxd…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivorybilled authored Dec 4, 2020
1 parent 2187bac commit f7950be
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
34 changes: 34 additions & 0 deletions plugins/processors/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
)
Expand Down Expand Up @@ -117,6 +118,12 @@ func (e *Execd) Stop() error {
}

func (e *Execd) cmdReadOut(out io.Reader) {
// Prefer using the StreamParser when parsing influx format.
if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser {
e.cmdReadOutStream(out)
return
}

scanner := bufio.NewScanner(out)
scanBuf := make([]byte, 4096)
scanner.Buffer(scanBuf, 262144)
Expand All @@ -137,6 +144,33 @@ func (e *Execd) cmdReadOut(out io.Reader) {
}
}

func (e *Execd) cmdReadOutStream(out io.Reader) {
parser := influx.NewStreamParser(out)

for {
metric, err := parser.Next()

if err != nil {
// Stop parsing when we've reached the end.
if err == influx.EOF {
break
}

if parseErr, isParseError := err.(*influx.ParseError); isParseError {
// Continue past parse errors.
e.acc.AddError(parseErr)
continue
}

// Stop reading on any non-recoverable error.
e.acc.AddError(err)
return
}

e.acc.AddMetric(metric)
}
}

func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out)

Expand Down
50 changes: 50 additions & 0 deletions plugins/processors/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,56 @@ func TestExternalProcessorWorks(t *testing.T) {
}
}

func TestParseLinesWithNewLines(t *testing.T) {
e := New()
e.Log = testutil.Logger{}

exe, err := os.Executable()
require.NoError(t, err)
t.Log(exe)
e.Command = []string{exe, "-countmultiplier"}
e.RestartDelay = config.Duration(5 * time.Second)

acc := &testutil.Accumulator{}

require.NoError(t, e.Start(acc))

now := time.Now()
orig := now

m, err := metric.New("test",
map[string]string{
"author": "Mr. Gopher",
},
map[string]interface{}{
"phrase": "Gophers are amazing creatures.\nAbsolutely amazing.",
"count": 3,
},
now)

require.NoError(t, err)

e.Add(m, acc)

acc.Wait(1)
require.NoError(t, e.Stop())

processedMetric := acc.GetTelegrafMetrics()[0]

expectedMetric := testutil.MustMetric("test",
map[string]string{
"author": "Mr. Gopher",
},
map[string]interface{}{
"phrase": "Gophers are amazing creatures.\nAbsolutely amazing.",
"count": 6,
},
orig,
)

testutil.RequireMetricEqual(t, expectedMetric, processedMetric)
}

var countmultiplier = flag.Bool("countmultiplier", false,
"if true, act like line input program instead of test")

Expand Down

0 comments on commit f7950be

Please sign in to comment.