Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/adding event hubs output plugin #1755

Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d0673f0
initial eventhubs implementation
polatengin Oct 20, 2020
5089e8b
event properties set to sample tags
polatengin Oct 20, 2020
2932a14
buffer and sender implemented
polatengin Oct 22, 2020
cd1cda5
option to send events one by one, added
polatengin Oct 22, 2020
a829d96
comment for hubtype structure added
polatengin Nov 26, 2020
86492c2
config test created
polatengin Nov 26, 2020
5936e48
collector test created
polatengin Nov 26, 2020
8b219fd
Merge branch 'master' into feature/adding-event-hubs-output-plugin
polatengin Dec 6, 2020
d03022b
Update stats/eventhubs/collector.go
polatengin Dec 8, 2020
7cbee51
Update stats/eventhubs/collector_test.go
polatengin Dec 8, 2020
8e132a8
Update stats/eventhubs/config.go
polatengin Dec 8, 2020
c9ad512
Update stats/eventhubs/config.go
polatengin Dec 8, 2020
117b7d9
Update stats/eventhubs/config_test.go
polatengin Dec 8, 2020
3535731
c.logger used instead of fmt
polatengin Dec 8, 2020
7c6b9ee
ran go mod vendor and go mod tidy commands
polatengin Dec 8, 2020
88e422d
Update stats/eventhubs/config.go
polatengin Dec 9, 2020
42cf0e8
Update stats/eventhubs/config_test.go
polatengin Dec 10, 2020
df5615d
sql dependency removed from config test
polatengin Jan 13, 2021
f555efb
fixing statsd mistake
polatengin Jan 20, 2021
0bf66ab
buffer option made mandatory, instead of optional
polatengin Jan 25, 2021
ee9e408
buffer test removed
polatengin Jan 25, 2021
51a5df4
sending events from buffer moved to pushMetrics from Collect method
polatengin Jan 25, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions cmd/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,22 @@ import (
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/stats/csv"
"github.com/loadimpact/k6/stats/datadog"
"github.com/loadimpact/k6/stats/eventhubs"
"github.com/loadimpact/k6/stats/influxdb"
jsonc "github.com/loadimpact/k6/stats/json"
"github.com/loadimpact/k6/stats/kafka"
"github.com/loadimpact/k6/stats/statsd"
)

const (
collectorInfluxDB = "influxdb"
collectorJSON = "json"
collectorKafka = "kafka"
collectorCloud = "cloud"
collectorStatsD = "statsd"
collectorDatadog = "datadog"
collectorCSV = "csv"
collectorInfluxDB = "influxdb"
collectorJSON = "json"
collectorKafka = "kafka"
collectorCloud = "cloud"
collectorStatsD = "statsd"
collectorDatadog = "datadog"
collectorCSV = "csv"
collectorEventHubs = "eventhubs"
)

func parseCollector(s string) (t, arg string) {
Expand Down Expand Up @@ -140,6 +142,14 @@ func getCollector(

return csv.New(logger, afero.NewOsFs(), conf.SystemTags.Map(), config)

case collectorEventHubs:
config := eventhubs.NewConfig().Apply(conf.Collectors.EventHubs)
if err := envconfig.Process("k6_eventhubs", &config); err != nil {
return nil, err
}

return eventhubs.New(logger, config)

default:
return nil, errors.Errorf("unknown output type: %s", collectorName)
}
Expand Down
16 changes: 10 additions & 6 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/stats/csv"
"github.com/loadimpact/k6/stats/datadog"
"github.com/loadimpact/k6/stats/eventhubs"
"github.com/loadimpact/k6/stats/influxdb"
"github.com/loadimpact/k6/stats/kafka"
"github.com/loadimpact/k6/stats/statsd"
Expand Down Expand Up @@ -74,12 +75,13 @@ type Config struct {
SummaryExport null.String `json:"summaryExport" envconfig:"K6_SUMMARY_EXPORT"`

Collectors struct {
InfluxDB influxdb.Config `json:"influxdb"`
Kafka kafka.Config `json:"kafka"`
Cloud cloud.Config `json:"cloud"`
StatsD statsd.Config `json:"statsd"`
Datadog datadog.Config `json:"datadog"`
CSV csv.Config `json:"csv"`
InfluxDB influxdb.Config `json:"influxdb"`
Kafka kafka.Config `json:"kafka"`
Cloud cloud.Config `json:"cloud"`
StatsD common.Config `json:"statsd"`
Datadog datadog.Config `json:"datadog"`
CSV csv.Config `json:"csv"`
EventHubs eventhubs.Config `json:"eventhubs"`
} `json:"collectors"`
}

Expand Down Expand Up @@ -118,6 +120,7 @@ func (c Config) Apply(cfg Config) Config {
c.Collectors.StatsD = c.Collectors.StatsD.Apply(cfg.Collectors.StatsD)
c.Collectors.Datadog = c.Collectors.Datadog.Apply(cfg.Collectors.Datadog)
c.Collectors.CSV = c.Collectors.CSV.Apply(cfg.Collectors.CSV)
c.Collectors.EventHubs = c.Collectors.EventHubs.Apply(cfg.Collectors.EventHubs)
return c
}

Expand Down Expand Up @@ -220,6 +223,7 @@ func getConsolidatedConfig(fs afero.Fs, cliConf Config, runner lib.Runner) (conf
cliConf.Collectors.Kafka = kafka.NewConfig().Apply(cliConf.Collectors.Kafka)
cliConf.Collectors.StatsD = statsd.NewConfig().Apply(cliConf.Collectors.StatsD)
cliConf.Collectors.Datadog = datadog.NewConfig().Apply(cliConf.Collectors.Datadog)
cliConf.Collectors.EventHubs = eventhubs.NewConfig().Apply(cliConf.Collectors.EventHubs)

fileConf, _, err := readDiskConfig(fs)
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/loadimpact/k6
go 1.14

require (
github.com/Azure/azure-event-hubs-go/v3 v3.3.1
github.com/Azure/go-ntlmssp v0.0.0-20180810175552-4a21cbd618b4
github.com/DataDog/datadog-go v0.0.0-20180330214955-e67964b4021a
github.com/GeertJohan/go.rice v0.0.0-20170420135705-c02ca9a983da
Expand Down Expand Up @@ -47,21 +48,21 @@ require (
github.com/mattn/go-colorable v0.0.9
github.com/mattn/go-isatty v0.0.4
github.com/mccutchen/go-httpbin v1.1.2-0.20190116014521-c5cb2f4802fa
github.com/mitchellh/mapstructure v0.0.0-20180220230111-00c29f56e238
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
github.com/mitchellh/mapstructure v1.3.3
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d // indirect
github.com/onsi/ginkgo v1.14.0 // indirect
github.com/oxtoacart/bpool v0.0.0-20150712133111-4e1c5567d7c2
github.com/pierrec/lz4 v1.0.2-0.20171218195038-2fcda4cb7018 // indirect
github.com/pierrec/xxHash v0.1.1 // indirect
github.com/pkg/errors v0.8.0
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 // indirect
github.com/serenize/snaker v0.0.0-20171204205717-a683aaf2d516
github.com/sirupsen/logrus v1.6.0
github.com/spf13/afero v1.1.1
github.com/spf13/cobra v0.0.4-0.20180629152535-a114f312e075
github.com/spf13/pflag v1.0.1
github.com/stretchr/testify v1.2.2
github.com/stretchr/testify v1.6.1
github.com/tidwall/gjson v1.6.1
github.com/tidwall/pretty v1.0.2
github.com/ugorji/go v1.1.7 // indirect
Expand All @@ -71,6 +72,7 @@ require (
github.com/zyedidia/highlight v0.0.0-20170330143449-201131ce5cf5
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/net v0.0.0-20201008223702-a5fa9d4b7c91
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 // indirect
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f // indirect
golang.org/x/text v0.3.3
golang.org/x/time v0.0.0-20170927054726-6dc17368e09b
Expand Down
176 changes: 176 additions & 0 deletions stats/eventhubs/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2017 Load Impact
polatengin marked this conversation as resolved.
Show resolved Hide resolved
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package eventhubs

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/stats"

eh "github.com/Azure/azure-event-hubs-go/v3"
)

// Collector sends result data to the Load Impact cloud service.
type Collector struct {
Comment on lines +40 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update all documentation comments for this collector.

config Config

client *eh.Hub

ctx context.Context

buffer []*eh.Event
bufferLock sync.Mutex

logger logrus.FieldLogger
opts lib.Options
}

// Verify that Collector implements lib.Collector
var _ lib.Collector = &Collector{}

// New creates a new cloud collector
func New(logger logrus.FieldLogger, conf Config) (*Collector, error) {
hub, _ := eh.NewHubFromConnectionString(conf.ConnectionString.String)
return &Collector{
config: conf,
client: hub,
buffer: make([]*eh.Event, 0),
logger: logger,
}, nil
}

// Init is called between the collector's creation and the call to Run().
// You should do any lengthy setup here rather than in New.
func (c *Collector) Init() error {
return nil
}

// Link return a link that is shown to the user.
func (c *Collector) Link() string {
return ""
}

// Run is called in a goroutine and starts the collector. Should commit samples to the backend
// at regular intervals and when the context is terminated.
func (c *Collector) Run(ctx context.Context) {
c.ctx = ctx

if c.config.BufferEnabled.Bool {
ticker := time.NewTicker(time.Duration(c.config.PushInterval.Duration))

for {
select {
case <-ticker.C:
c.pushMetrics()
case <-ctx.Done():
c.pushMetrics()
c.finish()
return
}
}
}
}

// HubEvent holds data to be sent to EventHubs
type HubEvent struct {
Time time.Time `json:"time"`
Value float64 `json:"value"`
Tags *stats.SampleTags `json:"tags"`
Name string `json:"name"`
Contains string `json:"contains"`
}

// Collect receives a set of samples. This method is never called concurrently, and only while
// the context for Run() is valid, but should defer as much work as possible to Run().
func (c *Collector) Collect(sampleContainers []stats.SampleContainer) {
for _, sampleContainer := range sampleContainers {
for _, sample := range sampleContainer.GetSamples() {
if &sample == nil {
fmt.Println("sample is null")
polatengin marked this conversation as resolved.
Show resolved Hide resolved

continue
}

data := HubEvent{
Time: sample.Time,
Value: sample.Value,
Tags: sample.Tags,
Name: sample.Metric.Name,
Contains: sample.Metric.Contains.String(),
}

m, _ := json.Marshal(data)

p := make(map[string]interface{})
for key, value := range sample.Tags.CloneTags() {
p[key] = value
}

event := eh.NewEvent(m)
event.Properties = p

if c.config.BufferEnabled.Bool {
c.buffer = append(c.buffer, event)
} else {
c.client.Send(c.ctx, event)
}
}
}
}

func (c *Collector) pushMetrics() {
c.bufferLock.Lock()
if len(c.buffer) == 0 {
c.bufferLock.Unlock()
return
}
buffer := c.buffer
c.buffer = nil
c.bufferLock.Unlock()

fmt.Printf("pushing (%d)......\n", len(buffer))

c.client.SendBatch(c.ctx, eh.NewEventBatchIterator(buffer...))
}

func (c *Collector) finish() {
// Close when context is done

fmt.Printf("done (%d)......\n", len(c.buffer))

c.client.Close(c.ctx)
}

// GetRequiredSystemTags returns which sample tags are needed by this collector
func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet {
return stats.TagName | stats.TagMethod | stats.TagStatus | stats.TagError | stats.TagCheck | stats.TagGroup
}

// SetRunStatus Set run status
func (c *Collector) SetRunStatus(status lib.RunStatus) {
}
57 changes: 57 additions & 0 deletions stats/eventhubs/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2016 Load Impact
polatengin marked this conversation as resolved.
Show resolved Hide resolved
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package eventhubs

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/loadimpact/k6/stats"
)

func TestSampleToRow(t *testing.T) {
sample := &stats.Sample{
Time: time.Unix(1562324644, 0),
Metric: stats.New("my_metric", stats.Gauge),
Value: 1,
}

expected := HubEvent{
Time: time.Unix(1562324644, 0),
Value: 1,
Tags: (*stats.SampleTags)(nil),
Name: "my_metric",
Contains: "\"default\"",
}

t.Run("test", func(t *testing.T) {
row := HubEvent{
Time: sample.Time,
Value: sample.Value,
Tags: sample.Tags,
Name: sample.Metric.Name,
Contains: sample.Metric.Contains.String(),
}
assert.Equal(t, expected, row)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clear what you're testing here... SampleToRow() is a CSV collector function, and here you're comparing two structs for equality, so I don't see how it could fail. 😕

In this file I would expect to see tests of the Collector methods. If you can't start a mock server to test against, consider abstracting away the c.client calls, e.g. by passing some interface you can mock out in the tests.

})
}
Loading