feat(influxdb): queue batches to the influxdb if it's slowing down
Previously, k6 would write to InfluxDB every second, but if that write
took more than 1 second it wouldn't start a second write and would
instead wait for the first one to finish. This generally led to the
write times getting bigger and bigger as more and more data accumulated,
until the maximum body size that InfluxDB accepts was reached, at which
point InfluxDB returned an error and k6 dropped that data.

With this commit there is a configurable number of parallel writes
(10 by default) that are triggered every 1 second (also now
configurable). If those get exhausted, the samples from each second are
queued separately instead of being combined and written as one big
chunk, which would risk hitting the max body size.

I tested with a simple script doing batch requests for an empty local
file with 40 VUs. Without an output it was getting 8.1K RPS with 650MB of
memory usage.

Prior to this commit the RAM usage was ~5.7GB for 5736 RPS, and
practically all the data gets lost if you don't raise the max body size;
even then a lot of the data is lost while the memory usage keeps going up.

After this commit the RAM usage was ~2.4GB (or less in some of the
tests) with 6273 RPS and there was no loss of data.

Even with this commit, running that simple script for 2 hours dies after
1 hour and 35 minutes, using around 15GB (the test system has 16). I
can't be sure about data loss, as InfluxDB ate 32GB of memory trying to
visualize it.

Some minor problems with this solution are that:
1. We use a lot of goroutines if things start slowing down - probably
not a big problem
2. We can probably batch things better if we add/keep all the unsent
samples together
3. By far the biggest: because the writes are slow, if the test is
stopped (with Ctrl+C) or finishes naturally, waiting for those writes
can take a considerable amount of time - in the above example the
4-minute tests generally took around 5 minutes :(

All of those can be handled better with some more sophisticated queueing
code at a later time.
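
For illustration only, here is a minimal, self-contained Go sketch of the
pattern described above (hypothetical names, not the actual k6 code): a
ticker triggers a flush on every push interval, every flush runs in its own
goroutine, a buffered channel works as the semaphore that caps the number of
in-flight writes, and a WaitGroup lets shutdown wait for the writes that are
still running:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// flusher is a simplified stand-in for the collector: it drains a buffer on
// every tick and caps the number of in-flight writes with a buffered-channel
// semaphore.
type flusher struct {
	mu          sync.Mutex
	buffer      []string
	wg          sync.WaitGroup
	semaphoreCh chan struct{} // one slot per allowed concurrent write
}

func newFlusher(concurrentWrites int) *flusher {
	return &flusher{semaphoreCh: make(chan struct{}, concurrentWrites)}
}

func (f *flusher) add(sample string) {
	f.mu.Lock()
	f.buffer = append(f.buffer, sample)
	f.mu.Unlock()
}

func (f *flusher) commit() {
	defer f.wg.Done()
	// Grab the buffered samples first, then wait for a free write slot, so
	// later ticks queue their own batches instead of growing one huge body.
	f.mu.Lock()
	samples := f.buffer
	f.buffer = nil
	f.mu.Unlock()

	f.semaphoreCh <- struct{}{}
	defer func() { <-f.semaphoreCh }()

	time.Sleep(3 * time.Second) // simulate a slow InfluxDB write
	fmt.Printf("wrote %d samples\n", len(samples))
}

func (f *flusher) run(ctx context.Context, pushInterval time.Duration) {
	ticker := time.NewTicker(pushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			f.wg.Add(1)
			go f.commit()
		case <-ctx.Done():
			f.wg.Add(1)
			go f.commit()
			f.wg.Wait() // drain all in-flight writes before exiting
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	f := newFlusher(2) // at most 2 parallel writes
	go func() { // produce samples continuously
		for i := 0; ; i++ {
			f.add(fmt.Sprintf("sample-%d", i))
			time.Sleep(100 * time.Millisecond)
		}
	}()
	f.run(ctx, time.Second)
}
```

With a 3-second write and a 1-second push interval, the third and later
flushes wait their turn on the semaphore instead of piling into one
ever-growing batch, and the final wg.Wait() is exactly the shutdown delay
mentioned in problem 3 above.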

closes #1081, fixes #1100, fixes #182
mstoykov committed Aug 13, 2019
1 parent f8a88b6 commit 69a3454
Showing 4 changed files with 179 additions and 21 deletions.
37 changes: 24 additions & 13 deletions stats/influxdb/collector.go
@@ -22,6 +22,7 @@ package influxdb

import (
"context"
"errors"
"sync"
"time"

@@ -31,10 +32,6 @@ import (
log "github.com/sirupsen/logrus"
)

const (
pushInterval = 1 * time.Second
)

// Verify that Collector implements lib.Collector
var _ lib.Collector = &Collector{}

@@ -43,8 +40,10 @@ type Collector struct {
Config Config
BatchConf client.BatchPointsConfig

buffer []stats.Sample
bufferLock sync.Mutex
buffer []stats.Sample
bufferLock sync.Mutex
wg sync.WaitGroup
semaphoreCh chan struct{}
}

func New(conf Config) (*Collector, error) {
@@ -53,10 +52,14 @@ func New(conf Config) (*Collector, error) {
return nil, err
}
batchConf := MakeBatchConfig(conf)
if conf.ConcurrentWrites.Int64 <= 0 {
return nil, errors.New("influxdb's ConcurrentWrites must be a positive number")
}
return &Collector{
Client: cl,
Config: conf,
BatchConf: batchConf,
Client: cl,
Config: conf,
BatchConf: batchConf,
semaphoreCh: make(chan struct{}, conf.ConcurrentWrites.Int64),
}, nil
}

@@ -73,13 +76,16 @@ func (c *Collector) Init() error {

func (c *Collector) Run(ctx context.Context) {
log.Debug("InfluxDB: Running!")
ticker := time.NewTicker(pushInterval)
ticker := time.NewTicker(time.Duration(c.Config.PushInterval.Duration))
for {
select {
case <-ticker.C:
c.commit()
c.wg.Add(1)
go c.commit()
case <-ctx.Done():
c.commit()
c.wg.Add(1)
go c.commit()
c.wg.Wait()
return
}
}
@@ -98,11 +104,16 @@ func (c *Collector) Link() string {
}

func (c *Collector) commit() {
defer c.wg.Done()
c.bufferLock.Lock()
samples := c.buffer
c.buffer = nil
c.bufferLock.Unlock()

// let's first get the data and then wait for our turn
c.semaphoreCh <- struct{}{}
defer func() {
<-c.semaphoreCh
}()
log.Debug("InfluxDB: Committing...")

batch, err := c.batchFromSamples(samples)
118 changes: 118 additions & 0 deletions stats/influxdb/collector_test.go
@@ -0,0 +1,118 @@
package influxdb

import (
"bytes"
"context"
"io"
"net"
"net/http"
"sync"
"testing"
"time"

"github.com/loadimpact/k6/stats"
"github.com/stretchr/testify/require"
null "gopkg.in/guregu/null.v3"
)

func TestBadConcurrentWrites(t *testing.T) {
c := NewConfig()
t.Run("0", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(0)
_, err := New(*c)
require.Error(t, err)
require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number")
})

t.Run("-2", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(-2)
_, err := New(*c)
require.Error(t, err)
require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number")
})

t.Run("2", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(2)
_, err := New(*c)
require.NoError(t, err)
})
}

func testCollectorCycle(t testing.TB, handler http.HandlerFunc, body func(testing.TB, *Collector)) {
s := &http.Server{
Addr: ":",
Handler: handler,
MaxHeaderBytes: 1 << 20,
}
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer func() {
_ = l.Close()
}()

defer func() {
require.NoError(t, s.Shutdown(context.Background()))
}()

go func() {
require.Equal(t, http.ErrServerClosed, s.Serve(l))
}()

config := NewConfig()
config.Addr = null.StringFrom("http://" + l.Addr().String())
c, err := New(*config)
require.NoError(t, err)

require.NoError(t, c.Init())
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
c.Run(ctx)
}()

body(t, c)

cancel()
wg.Wait()
}
func TestCollector(t *testing.T) {
var samplesRead int
defer func() {
require.Equal(t, samplesRead, 20)
}()
testCollectorCycle(t, func(rw http.ResponseWriter, r *http.Request) {
var b = bytes.NewBuffer(nil)
_, _ = io.Copy(b, r.Body)
for {
s, err := b.ReadString('\n')
if len(s) > 0 {
samplesRead++
}
if err != nil {
break
}
}

rw.WriteHeader(204)
}, func(tb testing.TB, c *Collector) {
var samples = make(stats.Samples, 10)
for i := 0; i < len(samples); i++ {
samples[i] = stats.Sample{
Metric: stats.New("testGauge", stats.Gauge),
Time: time.Now(),
Tags: stats.NewSampleTags(map[string]string{
"something": "else",
"VU": "21",
"else": "something",
}),
Value: 2.0,
}
}
c.Collect([]stats.SampleContainer{samples})
c.Collect([]stats.SampleContainer{samples})
})

}
44 changes: 36 additions & 8 deletions stats/influxdb/config.go
@@ -24,6 +24,7 @@ import (
"net/url"
"strconv"
"strings"
"time"

"github.com/kubernetes/helm/pkg/strvals"
"github.com/loadimpact/k6/lib/types"
@@ -34,11 +35,13 @@ import (

type Config struct {
// Connection.
Addr null.String `json:"addr" envconfig:"INFLUXDB_ADDR"`
Username null.String `json:"username,omitempty" envconfig:"INFLUXDB_USERNAME"`
Password null.String `json:"password,omitempty" envconfig:"INFLUXDB_PASSWORD"`
Insecure null.Bool `json:"insecure,omitempty" envconfig:"INFLUXDB_INSECURE"`
PayloadSize null.Int `json:"payloadSize,omitempty" envconfig:"INFLUXDB_PAYLOAD_SIZE"`
Addr null.String `json:"addr" envconfig:"INFLUXDB_ADDR"`
Username null.String `json:"username,omitempty" envconfig:"INFLUXDB_USERNAME"`
Password null.String `json:"password,omitempty" envconfig:"INFLUXDB_PASSWORD"`
Insecure null.Bool `json:"insecure,omitempty" envconfig:"INFLUXDB_INSECURE"`
PayloadSize null.Int `json:"payloadSize,omitempty" envconfig:"INFLUXDB_PAYLOAD_SIZE"`
PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"INFLUXDB_PUSH_INTERVAL"`
ConcurrentWrites null.Int `json:"concurrentWrites,omitempty" envconfig:"INFLUXDB_CONCURRENT_WRITES"`

// Samples.
DB null.String `json:"db" envconfig:"INFLUXDB_DB"`
@@ -50,9 +53,11 @@

func NewConfig() *Config {
c := &Config{
Addr: null.NewString("http://localhost:8086", false),
DB: null.NewString("k6", false),
TagsAsFields: []string{"vu", "iter", "url"},
Addr: null.NewString("http://localhost:8086", false),
DB: null.NewString("k6", false),
TagsAsFields: []string{"vu", "iter", "url"},
ConcurrentWrites: null.NewInt(10, false),
PushInterval: types.NewNullDuration(time.Second, false),
}
return c
}
@@ -88,6 +93,13 @@ func (c Config) Apply(cfg Config) Config {
if len(cfg.TagsAsFields) > 0 {
c.TagsAsFields = cfg.TagsAsFields
}
if cfg.PushInterval.Valid {
c.PushInterval = cfg.PushInterval
}

if cfg.ConcurrentWrites.Valid {
c.ConcurrentWrites = cfg.ConcurrentWrites
}
return c
}

@@ -154,13 +166,29 @@ func ParseURL(text string) (Config, error) {
case "payload_size":
var size int
size, err = strconv.Atoi(vs[0])
if err != nil {
return c, err
}
c.PayloadSize = null.IntFrom(int64(size))
case "precision":
c.Precision = null.StringFrom(vs[0])
case "retention":
c.Retention = null.StringFrom(vs[0])
case "consistency":
c.Consistency = null.StringFrom(vs[0])

case "pushInterval":
err = c.PushInterval.UnmarshalText([]byte(vs[0]))
if err != nil {
return c, err
}
case "concurrentWrites":
var writes int
writes, err = strconv.Atoi(vs[0])
if err != nil {
return c, err
}
c.ConcurrentWrites = null.IntFrom(int64(writes))
case "tagsAsFields":
c.TagsAsFields = vs
default:
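
As a usage sketch of the new options (assuming the package is importable as
github.com/loadimpact/k6/stats/influxdb; the same settings can also come from
the INFLUXDB_PUSH_INTERVAL and INFLUXDB_CONCURRENT_WRITES environment
variables per the envconfig tags above, with NewConfig providing the 1s/10
defaults), the query parameters map onto the new Config fields like this:

```go
package main

import (
	"fmt"
	"time"

	"github.com/loadimpact/k6/stats/influxdb"
)

func main() {
	// pushInterval and concurrentWrites are handled by the cases added to
	// ParseURL above.
	conf, err := influxdb.ParseURL("http://localhost:8086/k6?pushInterval=2s&concurrentWrites=4")
	if err != nil {
		panic(err)
	}
	fmt.Println(time.Duration(conf.PushInterval.Duration), conf.ConcurrentWrites.Int64) // 2s 4
}
```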
1 change: 1 addition & 0 deletions stats/kafka/collector_test.go
@@ -61,6 +61,7 @@ func TestRun(t *testing.T) {

func TestFormatSamples(t *testing.T) {
c := Collector{}
c.Config.InfluxDBConfig.ConcurrentWrites = null.IntFrom(10)
metric := stats.New("my_metric", stats.Gauge)
samples := stats.Samples{
{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})},
