Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch back to kingpin for arg parsing #2

Merged
merged 4 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ module github.com/travisjeffery/kafka-statsd

require (
github.com/Shopify/sarama v1.21.0
github.com/acroca/go-symbols v0.1.1 // indirect
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/pkg/errors v0.8.1
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect
github.com/segmentio/go-log v1.9.0
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a
golang.org/x/tools v0.0.0-20190402160749-c5ac96b4c419 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
github.com/stretchr/testify v1.3.0 // indirect
gopkg.in/alexcesaro/statsd.v2 v2.0.0
)
27 changes: 11 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/sarama v1.21.0 h1:0GKs+e8mn1RRUzfg9oUXv3v7ZieQLmOZF/bfnmmGhM8=
github.com/Shopify/sarama v1.21.0/go.mod h1:yuqtN/pe8cXRWG5zPaO7hCfNJp5MwmkoJEoLjkm5tCQ=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/acroca/go-symbols v0.1.1 h1:q3IzaMNYocw/Bnc2a8jkXf0hM3+POfLoq30x8HYuaPE=
github.com/acroca/go-symbols v0.1.1/go.mod h1:RKAIDWtcELAw6/wjNJGWRYZ7QEinSWoJeJ2H5cfK6AM=
github.com/alecthomas/kingpin v2.2.6+incompatible h1:5svnBTFgJjZvGKyYBtMB0+m5wvrbUHiqye8wRJMlnYI=
github.com/alecthomas/kingpin v2.2.6+incompatible/go.mod h1:59OFYbFVLKQKq+mqrL6Rw5bR0c3ACQaawgXx0QYndlE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
Expand All @@ -23,21 +25,14 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314 h1:86XpVGN4oVnVheHik6ioWg+1fOnWu1GgyNzV6cr2ifs=
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/segmentio/go-log v1.9.0 h1:1dhZF9aIsQG3sK2To2l41bWcipIqPa+mXMEvc0Yx9zk=
github.com/segmentio/go-log v1.9.0/go.mod h1:OaWNxWOTRpA/fIXTfr2FmimINNi+0MyArbeI1FLjOkY=
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a h1:ILoU84rj4AQ3q6cjQvtb9jBjx4xzR/Riq/zYhmDQiOk=
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a/go.mod h1:vQQATAGxVK20DC1rRubTJbZDDhhpA4QfU02pMdPxGO4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190402160749-c5ac96b4c419 h1:54qTyKL5qb43D6hmmljebk91gCft2gy9eSyTMoS3NSg=
golang.org/x/tools v0.0.0-20190402160749-c5ac96b4c419/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
96 changes: 68 additions & 28 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,76 @@
package main

import (
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"time"

// "os"
// "os/signal"
"strings"
"sync"

// "time"
"github.com/pkg/errors"

"github.com/Shopify/sarama"
"github.com/quipo/statsd"
"github.com/alecthomas/kingpin"
"github.com/pkg/errors"
"github.com/segmentio/go-log"
// "github.com/wvanbergen/kazoo-go"
"gopkg.in/alexcesaro/statsd.v2"
)

var (
brokers = flag.String("brokers", "127.0.0.1:12008", "Kafka addresses (e.g. host1:9092,host2:9092)")
statsdAddr = flag.String("statsd-addr", "127.0.0.1:8125", "Statsd address")
statsdPrefix = flag.String("statsd-prefix", "kafka.", "Statsd prefix")
interval = flag.Int("refresh-interval", 5, "Interval to refresh offset lag in seconds")
useTags = flag.Bool("use-tags", true, "Use tags if your StatsD client supports them (like DataDog and InfluxDB)")
brokers = kingpin.Flag("broker", "A kafka broker to connect to. Specify multiple times for multiple brokers. (e.g. host1:9092)").HintOptions("host1:9092").Short('b').Envar("KSTATSD_BROKERS").Required().Strings()
statsdAddr = kingpin.Flag("statsd-addr", "Statsd address").Short('s').Default("127.0.0.1").Envar("KSTATSD_STATSD_ADDR").String()
statsdPort = kingpin.Flag("statsd-port", "Statsd port").Short('P').Default("8125").Envar("KSTATSD_STATSD_PORT").String()
statsdPrefix = kingpin.Flag("statsd-prefix", "Statsd prefix").Short('p').Envar("KSTATSD_STATSD_PREFIX").String()
interval = kingpin.Flag("refresh-interval", "Interval to refresh offset lag in seconds").Short('i').Default("5").Envar("KSTATSD_INTERVAL").Int()
tagType = kingpin.Flag("tag-format", "Format to use when encoding tags (Options: none, influxdb, datadog)").HintOptions(statsdTagOptionsEnum()...).Default("none").Envar("KSTATSD_USE_TAGS").Enum(statsdTagOptionsEnum()...)
includeTags = kingpin.Flag("tag", "Tags to include. Specify multiple times for multiple tags. (e.g. tagname:value)").HintOptions("tagname:value").Envar("KSTATSD_TAGS").Strings()
)

var statsdTagFormat = map[string]statsd.TagFormat{
"influxdb": statsd.InfluxDB,
"datadog": statsd.Datadog,
"none": 0,
}

func statsdTagOptionsEnum() []string {
res := make([]string, 0, len(statsdTagFormat))
for k := range statsdTagFormat {
res = append(res, k)
}
return res
}

func newStatsdClient() (*statsd.Client, error) {
tags := make([]string, 0, len(*includeTags)*2)
for _, tag := range *includeTags {
splitTag := strings.SplitN(tag, ":", 2)
tags = append(tags, splitTag...)
}

opts := []statsd.Option{
statsd.Address(strings.Join([]string{*statsdAddr, *statsdPort}, ":")),
statsd.ErrorHandler(func(err error) {
log.Error("Statsd error: %s", err)
}),
}

if *statsdPrefix != "" {
opts = append(opts, statsd.Prefix(*statsdPrefix))
}

if len(tags) > 0 {
opts = append(opts, statsd.Tags(tags...))
}

tagFormat := statsdTagFormat[*tagType]
if tagFormat != 0 {
opts = append(opts, statsd.TagsFormat(tagFormat))
}

return statsd.New(opts...)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be statsd.New(), nil or does statsd.New return an (instance, error) itself?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

statsd.New returns (instance, error)

}

type ClusterState struct {
// List of consumer groups that exist in the cluster
ConsumerGroups []string
Expand All @@ -38,20 +80,16 @@ type ClusterState struct {
}

func main() {
flag.Parse()
kingpin.Parse()

statsdClient := statsd.NewStatsdClient(*statsdAddr, *statsdPrefix)
err := statsdClient.CreateSocket()
statsdClient, err := newStatsdClient()
if err != nil {
log.Error("Error creating statsd client: %s", err)
return
}
stats := statsd.NewStatsdBuffer(time.Second, statsdClient)
defer stats.Close()
defer statsdClient.Close()

brokerList := strings.Split(*brokers, ",")

client, err := sarama.NewClient(brokerList, nil)
client, err := sarama.NewClient(*brokers, nil)
if err != nil {
log.Error("Error connecting to Kafka (client): %s", err)
return
Expand All @@ -60,7 +98,7 @@ func main() {

config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0
admin, err := sarama.NewClusterAdmin(brokerList, config)
admin, err := sarama.NewClusterAdmin(*brokers, config)
if err != nil {
log.Error("Error connecting to Kafka (admin): %s", err)
return
Expand Down Expand Up @@ -114,12 +152,14 @@ func main() {

lag := offset - partitionOffsets[partitionID]

if *useTags {
var tags []string
tags = append(tags, "topic="+topic)
tags = append(tags, fmt.Sprintf("partition=%d", partitionID))
tags = append(tags, "consumer_group="+cg)
stats.Gauge(fmt.Sprintf("consumer_lag,%s", strings.Join(tags, ",")), lag)
stats := statsdClient.Clone(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Go garbage collect automatically?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, go is a garbage collected language. The cloned client also doesn't need to be .Close()d because when the main client is closed, all cloned clients are also closed.

statsd.Tags("topic", topic),
statsd.Tags("partition", strconv.FormatInt(int64(partitionID), 10)),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any risk of this throwing? Say partitionID is nil?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionID is an int32. Int32 isn't a pointer, so it can't be nil. This is just doing a int->str conversion.

statsd.Tags("consumer_group", cg),
)

if *tagType != "none" {
stats.Gauge("consumer_lag", lag)
} else {
stats.Gauge(fmt.Sprintf("topic.%s.partition.%d.consumer_group.%s.lag", topic, partitionID, cg), lag)
}
Expand Down