Skip to content

Commit

Permalink
Send metrics in FIFO order (#7814)
Browse files Browse the repository at this point in the history
Metrics are send from older to newer metrics, even when outputs is
failing.  In case of buffer full, we still drop the oldest metrics, but
non-dropped metrics are send in the order they are received.

(cherry picked from commit 3ec3f1b)
  • Loading branch information
PierreF authored and danielnelson committed Jul 14, 2020
1 parent 9fb026c commit a0fa8a7
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
- [#7558](https://github.com/influxdata/telegraf/issues/7558): Remove trailing backslash from tag keys/values in influx serializer.
- [#7715](https://github.com/influxdata/telegraf/issues/7715): Fix incorrect Azure SQL DB server properties.
- [#7431](https://github.com/influxdata/telegraf/issues/7431): Fix json unmarshal error in the kibana input.
- [#5633](https://github.com/influxdata/telegraf/issues/5633): Send metrics in FIFO order.

## v1.14.5 [2020-06-30]

Expand Down
60 changes: 27 additions & 33 deletions models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (b *Buffer) add(m telegraf.Metric) int {
b.metricDropped(b.buf[b.last])
dropped++

if b.last == b.batchFirst && b.batchSize > 0 {
if b.batchSize > 0 {
b.batchSize--
b.batchFirst = b.next(b.batchFirst)
}
Expand Down Expand Up @@ -146,8 +146,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) int {
return dropped
}

// Batch returns a slice containing up to batchSize of the most recently added
// metrics. Metrics are ordered from newest to oldest in the batch. The
// Batch returns a slice containing up to batchSize of the oldest metrics not
// yet dropped. Metrics are ordered from oldest to newest in the batch. The
// batch must not be modified by the client.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.Lock()
Expand All @@ -159,18 +159,17 @@ func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
return out
}

b.batchFirst = b.cap + b.last - outLen
b.batchFirst %= b.cap
b.batchFirst = b.first
b.batchSize = outLen

batchIndex := b.batchFirst
for i := range out {
out[len(out)-1-i] = b.buf[batchIndex]
out[i] = b.buf[batchIndex]
b.buf[batchIndex] = nil
batchIndex = b.next(batchIndex)
}

b.last = b.batchFirst
b.first = b.nextby(b.first, b.batchSize)
b.size -= outLen
return out
}
Expand Down Expand Up @@ -198,38 +197,22 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
return
}

older := b.dist(b.first, b.batchFirst)
free := b.cap - b.size
restore := min(len(batch), free+older)
restore := min(len(batch), free)
skip := len(batch) - restore

// Rotate newer metrics forward the number of metrics that we can restore.
rb := b.batchFirst
rp := b.last
re := b.nextby(rp, restore)
b.last = re
b.first = b.prevby(b.first, restore)
b.size = min(b.size+restore, b.cap)

for rb != rp && rp != re {
rp = b.prev(rp)
re = b.prev(re)
re := b.first

if b.buf[re] != nil {
b.metricDropped(b.buf[re])
b.first = b.next(b.first)
}

b.buf[re] = b.buf[rp]
b.buf[rp] = nil
}

// Copy metrics from the batch back into the buffer; recall that the
// batch is in reverse order compared to b.buf
// Copy metrics from the batch back into the buffer
for i := range batch {
if i < restore {
re = b.prev(re)
b.buf[re] = batch[i]
b.size = min(b.size+1, b.cap)
} else {
if i < skip {
b.metricDropped(batch[i])
} else {
b.buf[re] = batch[i]
re = b.next(re)
}
}

Expand Down Expand Up @@ -273,6 +256,17 @@ func (b *Buffer) prev(index int) int {
return index
}

// prevby returns the index that is count older with wrapping.
func (b *Buffer) prevby(index, count int) int {
index -= count
for index < 0 {
index += b.cap
}

index %= b.cap
return index
}

func (b *Buffer) resetBatch() {
b.batchFirst = 0
b.batchSize = 0
Expand Down
98 changes: 49 additions & 49 deletions models/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestBuffer_BatchLatest(t *testing.T) {

testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(3),
MetricTime(1),
MetricTime(2),
}, batch)
}
Expand All @@ -177,8 +177,8 @@ func TestBuffer_BatchLatestWrap(t *testing.T) {

testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(2),
MetricTime(3),
}, batch)
}

Expand All @@ -193,17 +193,17 @@ func TestBuffer_MultipleBatch(t *testing.T) {
batch := b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch)
b.Accept(batch)
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(1),
MetricTime(6),
}, batch)
b.Accept(batch)
}
Expand All @@ -223,11 +223,11 @@ func TestBuffer_RejectWithRoom(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch)
}

Expand All @@ -246,11 +246,11 @@ func TestBuffer_RejectNothingNewFull(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch)
}

Expand All @@ -275,11 +275,11 @@ func TestBuffer_RejectNoRoom(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(8),
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(5),
MetricTime(6),
MetricTime(7),
MetricTime(8),
}, batch)
}

Expand All @@ -299,11 +299,11 @@ func TestBuffer_RejectRoomExact(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch)
}

Expand All @@ -324,11 +324,11 @@ func TestBuffer_RejectRoomOverwriteOld(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
MetricTime(6),
}, batch)
}

Expand All @@ -351,11 +351,11 @@ func TestBuffer_RejectPartialRoom(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(4),
MetricTime(5),
MetricTime(6),
MetricTime(7),
}, batch)
}

Expand Down Expand Up @@ -394,11 +394,11 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(12),
MetricTime(13),
MetricTime(14),
MetricTime(15),
}, batch)
}

Expand All @@ -425,11 +425,11 @@ func TestBuffer_RejectWrapped(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(9),
MetricTime(8),
MetricTime(9),
MetricTime(10),
MetricTime(11),
MetricTime(12),
}, batch)
}

Expand Down Expand Up @@ -467,16 +467,16 @@ func TestBuffer_RejectAdjustFirst(t *testing.T) {
batch = b.Batch(10)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(19),
MetricTime(18),
MetricTime(17),
MetricTime(16),
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(11),
MetricTime(12),
MetricTime(13),
MetricTime(14),
MetricTime(15),
MetricTime(16),
MetricTime(17),
MetricTime(18),
MetricTime(19),
}, batch)
}

Expand Down
10 changes: 5 additions & 5 deletions models/running_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
// Verify that 10 metrics were written
assert.Len(t, m.Metrics(), 10)
// Verify that they are in order
expected := append(reverse(next5), reverse(first5)...)
expected := append(first5, next5...)
assert.Equal(t, expected, m.Metrics())
}

Expand Down Expand Up @@ -421,9 +421,9 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
// Verify that 20 metrics were written
assert.Len(t, m.Metrics(), 20)
// Verify that they are in order
expected := append(reverse(next5), reverse(first5)...)
expected = append(expected, reverse(next5)...)
expected = append(expected, reverse(first5)...)
expected := append(first5, next5...)
expected = append(expected, first5...)
expected = append(expected, next5...)
assert.Equal(t, expected, m.Metrics())
}

Expand Down Expand Up @@ -464,7 +464,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
// Verify that 6 metrics were written
assert.Len(t, m.Metrics(), 6)
// Verify that they are in order
expected := []telegraf.Metric{next5[0], first5[4], first5[3], first5[2], first5[1], first5[0]}
expected := []telegraf.Metric{first5[0], first5[1], first5[2], first5[3], first5[4], next5[0]}
assert.Equal(t, expected, m.Metrics())
}

Expand Down

0 comments on commit a0fa8a7

Please sign in to comment.