Skip to content

Commit

Permalink
Merge pull request #2 from greenbits/add-envar-overrides
Browse files Browse the repository at this point in the history
Switch back to kingpin for arg parsing
  • Loading branch information
Fiveside authored Apr 8, 2019
2 parents 74aa609 + ebca579 commit 66ede95
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 50 deletions.
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ module github.com/travisjeffery/kafka-statsd

require (
github.com/Shopify/sarama v1.21.0
github.com/acroca/go-symbols v0.1.1 // indirect
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/pkg/errors v0.8.1
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect
github.com/segmentio/go-log v1.9.0
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a
golang.org/x/tools v0.0.0-20190402160749-c5ac96b4c419 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
github.com/stretchr/testify v1.3.0 // indirect
gopkg.in/alexcesaro/statsd.v2 v2.0.0
)
27 changes: 11 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/sarama v1.21.0 h1:0GKs+e8mn1RRUzfg9oUXv3v7ZieQLmOZF/bfnmmGhM8=
github.com/Shopify/sarama v1.21.0/go.mod h1:yuqtN/pe8cXRWG5zPaO7hCfNJp5MwmkoJEoLjkm5tCQ=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/acroca/go-symbols v0.1.1 h1:q3IzaMNYocw/Bnc2a8jkXf0hM3+POfLoq30x8HYuaPE=
github.com/acroca/go-symbols v0.1.1/go.mod h1:RKAIDWtcELAw6/wjNJGWRYZ7QEinSWoJeJ2H5cfK6AM=
github.com/alecthomas/kingpin v2.2.6+incompatible h1:5svnBTFgJjZvGKyYBtMB0+m5wvrbUHiqye8wRJMlnYI=
github.com/alecthomas/kingpin v2.2.6+incompatible/go.mod h1:59OFYbFVLKQKq+mqrL6Rw5bR0c3ACQaawgXx0QYndlE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
Expand All @@ -23,21 +25,14 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314 h1:86XpVGN4oVnVheHik6ioWg+1fOnWu1GgyNzV6cr2ifs=
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/segmentio/go-log v1.9.0 h1:1dhZF9aIsQG3sK2To2l41bWcipIqPa+mXMEvc0Yx9zk=
github.com/segmentio/go-log v1.9.0/go.mod h1:OaWNxWOTRpA/fIXTfr2FmimINNi+0MyArbeI1FLjOkY=
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a h1:ILoU84rj4AQ3q6cjQvtb9jBjx4xzR/Riq/zYhmDQiOk=
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a/go.mod h1:vQQATAGxVK20DC1rRubTJbZDDhhpA4QfU02pMdPxGO4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190402160749-c5ac96b4c419 h1:54qTyKL5qb43D6hmmljebk91gCft2gy9eSyTMoS3NSg=
golang.org/x/tools v0.0.0-20190402160749-c5ac96b4c419/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
96 changes: 68 additions & 28 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,76 @@
package main

import (
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"time"

// "os"
// "os/signal"
"strings"
"sync"

// "time"
"github.com/pkg/errors"

"github.com/Shopify/sarama"
"github.com/quipo/statsd"
"github.com/alecthomas/kingpin"
"github.com/pkg/errors"
"github.com/segmentio/go-log"
// "github.com/wvanbergen/kazoo-go"
"gopkg.in/alexcesaro/statsd.v2"
)

var (
brokers = flag.String("brokers", "127.0.0.1:12008", "Kafka addresses (e.g. host1:9092,host2:9092)")
statsdAddr = flag.String("statsd-addr", "127.0.0.1:8125", "Statsd address")
statsdPrefix = flag.String("statsd-prefix", "kafka.", "Statsd prefix")
interval = flag.Int("refresh-interval", 5, "Interval to refresh offset lag in seconds")
useTags = flag.Bool("use-tags", true, "Use tags if your StatsD client supports them (like DataDog and InfluxDB)")
brokers = kingpin.Flag("broker", "A kafka broker to connect to. Specify multiple times for multiple brokers. (e.g. host1:9092)").HintOptions("host1:9092").Short('b').Envar("KSTATSD_BROKERS").Required().Strings()
statsdAddr = kingpin.Flag("statsd-addr", "Statsd address").Short('s').Default("127.0.0.1").Envar("KSTATSD_STATSD_ADDR").String()
statsdPort = kingpin.Flag("statsd-port", "Statsd port").Short('P').Default("8125").Envar("KSTATSD_STATSD_PORT").String()
statsdPrefix = kingpin.Flag("statsd-prefix", "Statsd prefix").Short('p').Envar("KSTATSD_STATSD_PREFIX").String()
interval = kingpin.Flag("refresh-interval", "Interval to refresh offset lag in seconds").Short('i').Default("5").Envar("KSTATSD_INTERVAL").Int()
tagType = kingpin.Flag("tag-format", "Format to use when encoding tags (Options: none, influxdb, datadog)").HintOptions(statsdTagOptionsEnum()...).Default("none").Envar("KSTATSD_USE_TAGS").Enum(statsdTagOptionsEnum()...)
includeTags = kingpin.Flag("tag", "Tags to include. Specify multiple times for multiple tags. (e.g. tagname:value)").HintOptions("tagname:value").Envar("KSTATSD_TAGS").Strings()
)

var statsdTagFormat = map[string]statsd.TagFormat{
"influxdb": statsd.InfluxDB,
"datadog": statsd.Datadog,
"none": 0,
}

func statsdTagOptionsEnum() []string {
res := make([]string, 0, len(statsdTagFormat))
for k := range statsdTagFormat {
res = append(res, k)
}
return res
}

func newStatsdClient() (*statsd.Client, error) {
tags := make([]string, 0, len(*includeTags)*2)
for _, tag := range *includeTags {
splitTag := strings.SplitN(tag, ":", 2)
tags = append(tags, splitTag...)
}

opts := []statsd.Option{
statsd.Address(strings.Join([]string{*statsdAddr, *statsdPort}, ":")),
statsd.ErrorHandler(func(err error) {
log.Error("Statsd error: %s", err)
}),
}

if *statsdPrefix != "" {
opts = append(opts, statsd.Prefix(*statsdPrefix))
}

if len(tags) > 0 {
opts = append(opts, statsd.Tags(tags...))
}

tagFormat := statsdTagFormat[*tagType]
if tagFormat != 0 {
opts = append(opts, statsd.TagsFormat(tagFormat))
}

return statsd.New(opts...)
}

type ClusterState struct {
// List of consumer groups that exist in the cluster
ConsumerGroups []string
Expand All @@ -38,20 +80,16 @@ type ClusterState struct {
}

func main() {
flag.Parse()
kingpin.Parse()

statsdClient := statsd.NewStatsdClient(*statsdAddr, *statsdPrefix)
err := statsdClient.CreateSocket()
statsdClient, err := newStatsdClient()
if err != nil {
log.Error("Error creating statsd client: %s", err)
return
}
stats := statsd.NewStatsdBuffer(time.Second, statsdClient)
defer stats.Close()
defer statsdClient.Close()

brokerList := strings.Split(*brokers, ",")

client, err := sarama.NewClient(brokerList, nil)
client, err := sarama.NewClient(*brokers, nil)
if err != nil {
log.Error("Error connecting to Kafka (client): %s", err)
return
Expand All @@ -60,7 +98,7 @@ func main() {

config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0
admin, err := sarama.NewClusterAdmin(brokerList, config)
admin, err := sarama.NewClusterAdmin(*brokers, config)
if err != nil {
log.Error("Error connecting to Kafka (admin): %s", err)
return
Expand Down Expand Up @@ -114,12 +152,14 @@ func main() {

lag := offset - partitionOffsets[partitionID]

if *useTags {
var tags []string
tags = append(tags, "topic="+topic)
tags = append(tags, fmt.Sprintf("partition=%d", partitionID))
tags = append(tags, "consumer_group="+cg)
stats.Gauge(fmt.Sprintf("consumer_lag,%s", strings.Join(tags, ",")), lag)
stats := statsdClient.Clone(
statsd.Tags("topic", topic),
statsd.Tags("partition", strconv.FormatInt(int64(partitionID), 10)),
statsd.Tags("consumer_group", cg),
)

if *tagType != "none" {
stats.Gauge("consumer_lag", lag)
} else {
stats.Gauge(fmt.Sprintf("topic.%s.partition.%d.consumer_group.%s.lag", topic, partitionID, cg), lag)
}
Expand Down

0 comments on commit 66ede95

Please sign in to comment.