Skip to content

Commit

Permalink
Update the buffer_size internal metric after writes (influxdata#5314)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Jean-Louis Dupond committed Apr 22, 2019
1 parent a519517 commit 8c5d9a2
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 45 deletions.
29 changes: 26 additions & 3 deletions internal/models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Buffer struct {
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
}

// NewBuffer returns a new empty Buffer with the given capacity.
Expand All @@ -53,7 +55,19 @@ func NewBuffer(name string, capacity int) *Buffer {
"metrics_dropped",
map[string]string{"output": name},
),
BufferSize: selfstat.Register(
"write",
"buffer_size",
map[string]string{"output": name},
),
BufferLimit: selfstat.Register(
"write",
"buffer_limit",
map[string]string{"output": name},
),
}
b.BufferSize.Set(int64(0))
b.BufferLimit.Set(int64(capacity))
return b
}

Expand All @@ -62,7 +76,11 @@ func (b *Buffer) Len() int {
b.Lock()
defer b.Unlock()

return b.size
return b.length()
}

func (b *Buffer) length() int {
return min(b.size+b.batchSize, b.cap)
}

func (b *Buffer) metricAdded() {
Expand Down Expand Up @@ -112,6 +130,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i := range metrics {
b.add(metrics[i])
}

b.BufferSize.Set(int64(b.length()))
}

// Batch returns a slice containing up to batchSize of the most recently added
Expand Down Expand Up @@ -153,6 +173,7 @@ func (b *Buffer) Accept(batch []telegraf.Metric) {
}

b.resetBatch()
b.BufferSize.Set(int64(b.length()))
}

// Reject returns the batch, acquired from Batch(), to the buffer and marks it
Expand All @@ -176,6 +197,7 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {

if b.buf[re] != nil {
b.metricDropped(b.buf[re])
b.first = b.next(b.first)
}

b.buf[re] = b.buf[rp]
Expand All @@ -188,13 +210,14 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
if i < restore {
re = b.prev(re)
b.buf[re] = batch[i]
b.size++
b.size = min(b.size+1, b.cap)
} else {
b.metricDropped(batch[i])
}
}

b.resetBatch()
b.BufferSize.Set(int64(b.length()))
}

// dist returns the distance between two indexes. Because this data structure
Expand All @@ -204,7 +227,7 @@ func (b *Buffer) dist(begin, end int) int {
if begin <= end {
return end - begin
} else {
return b.cap - begin - 1 + end
return b.cap - begin + end
}
}

Expand Down
82 changes: 80 additions & 2 deletions internal/models/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func TestBuffer_RejectPartialRoom(t *testing.T) {
}, batch)
}

func TestBuffer_RejectWrapped(t *testing.T) {
func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
Expand Down Expand Up @@ -402,6 +402,84 @@ func TestBuffer_RejectWrapped(t *testing.T) {
}, batch)
}

func TestBuffer_RejectWrapped(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))

b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Add(MetricTime(8))
batch := b.Batch(3)

b.Add(MetricTime(9))
b.Add(MetricTime(10))
b.Add(MetricTime(11))
b.Add(MetricTime(12))

b.Reject(batch)

batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(9),
MetricTime(8),
}, batch)
}

func TestBuffer_RejectAdjustFirst(t *testing.T) {
b := setup(NewBuffer("test", 10))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(3)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Reject(batch)

b.Add(MetricTime(7))
b.Add(MetricTime(8))
b.Add(MetricTime(9))
batch = b.Batch(3)
b.Add(MetricTime(10))
b.Add(MetricTime(11))
b.Add(MetricTime(12))
b.Reject(batch)

b.Add(MetricTime(13))
b.Add(MetricTime(14))
b.Add(MetricTime(15))
batch = b.Batch(3)
b.Add(MetricTime(16))
b.Add(MetricTime(17))
b.Add(MetricTime(18))
b.Reject(batch)

b.Add(MetricTime(19))

batch = b.Batch(10)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(19),
MetricTime(18),
MetricTime(17),
MetricTime(16),
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(10),
}, batch)
}

func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", 5))
Expand Down Expand Up @@ -509,7 +587,7 @@ func TestBuffer_BatchNotRemoved(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(m, m, m, m, m)
b.Batch(2)
require.Equal(t, 3, b.Len())
require.Equal(t, 5, b.Len())
}

func TestBuffer_BatchRejectAcceptNoop(t *testing.T) {
Expand Down
54 changes: 14 additions & 40 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"log"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -29,23 +30,23 @@ type OutputConfig struct {

// RunningOutput contains the output configuration
type RunningOutput struct {
// Must be 64-bit aligned
newMetricsCount int64

Name string
Output telegraf.Output
Config *OutputConfig
MetricBufferLimit int
MetricBatchSize int

MetricsFiltered selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
WriteTime selfstat.Stat

batch []telegraf.Metric
buffer *Buffer
BatchReady chan time.Time

aggMutex sync.Mutex
batchMutex sync.Mutex
buffer *Buffer

aggMutex sync.Mutex
}

func NewRunningOutput(
Expand All @@ -69,7 +70,6 @@ func NewRunningOutput(
}
ro := &RunningOutput{
Name: name,
batch: make([]telegraf.Metric, 0, batchSize),
buffer: NewBuffer(name, bufferLimit),
BatchReady: make(chan time.Time, 1),
Output: output,
Expand All @@ -81,24 +81,13 @@ func NewRunningOutput(
"metrics_filtered",
map[string]string{"output": name},
),
BufferSize: selfstat.Register(
"write",
"buffer_size",
map[string]string{"output": name},
),
BufferLimit: selfstat.Register(
"write",
"buffer_limit",
map[string]string{"output": name},
),
WriteTime: selfstat.RegisterTiming(
"write",
"write_time_ns",
map[string]string{"output": name},
),
}

ro.BufferLimit.Set(int64(ro.MetricBufferLimit))
return ro
}

Expand Down Expand Up @@ -129,28 +118,16 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
return
}

ro.batchMutex.Lock()

ro.batch = append(ro.batch, metric)
if len(ro.batch) == ro.MetricBatchSize {
ro.addBatchToBuffer()

nBuffer := ro.buffer.Len()
ro.BufferSize.Set(int64(nBuffer))
ro.buffer.Add(metric)

count := atomic.AddInt64(&ro.newMetricsCount, 1)
if count == int64(ro.MetricBatchSize) {
atomic.StoreInt64(&ro.newMetricsCount, 0)
select {
case ro.BatchReady <- time.Now():
default:
}
}

ro.batchMutex.Unlock()
}

// AddBatchToBuffer moves the metrics from the batch into the metric buffer.
func (ro *RunningOutput) addBatchToBuffer() {
ro.buffer.Add(ro.batch...)
ro.batch = ro.batch[:0]
}

// Write writes all metrics to the output, stopping when all have been sent on
Expand All @@ -163,15 +140,12 @@ func (ro *RunningOutput) Write() error {
output.Reset()
ro.aggMutex.Unlock()
}
// add and write can be called concurrently
ro.batchMutex.Lock()
ro.addBatchToBuffer()
ro.batchMutex.Unlock()

nBuffer := ro.buffer.Len()
atomic.StoreInt64(&ro.newMetricsCount, 0)

// Only process the metrics in the buffer now. Metrics added while we are
// writing will be sent on the next call.
nBuffer := ro.buffer.Len()
nBatches := nBuffer/ro.MetricBatchSize + 1
for i := 0; i < nBatches; i++ {
batch := ro.buffer.Batch(ro.MetricBatchSize)
Expand All @@ -189,7 +163,7 @@ func (ro *RunningOutput) Write() error {
return nil
}

// WriteBatch writes only the batch metrics to the output.
// WriteBatch writes a single batch of metrics to the output.
func (ro *RunningOutput) WriteBatch() error {
batch := ro.buffer.Batch(ro.MetricBatchSize)
if len(batch) == 0 {
Expand Down

0 comments on commit 8c5d9a2

Please sign in to comment.