[wip] start maintaining an internal count of time series produced per metric #246

Closed. Wants to merge 7 commits; changes shown from 6 commits.
8 changes: 8 additions & 0 deletions flusher.go
@@ -57,6 +57,7 @@ func (s *Server) FlushGlobal(ctx context.Context) {
// and global counters
ms.totalLength += ms.totalSets
ms.totalLength += ms.totalGlobalCounters
ms.totalLength += ms.totalCardinalityCounts

finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)

@@ -134,6 +135,8 @@ type metricsSummary struct {
totalLocalSets int
totalLocalTimers int

totalCardinalityCounts int

totalLength int
}

@@ -165,6 +168,8 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu
ms.totalLocalHistograms += len(wm.localHistograms)
ms.totalLocalSets += len(wm.localSets)
ms.totalLocalTimers += len(wm.localTimers)

ms.totalCardinalityCounts += len(wm.cardinality.MetricCardinality)
}

s.Statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(gatherStart).Nanoseconds()), []string{"part:gather"}, 1.0)
@@ -230,6 +235,9 @@ func (s *Server) generateDDMetrics(ctx context.Context, percentiles []float64, t
finalMetrics = append(finalMetrics, s.Flush()...)
}

// cardinality counts also have no local parts to flush
finalMetrics = append(finalMetrics, wm.cardinality.Flush()...)

// also do this for global counters
// global counters have no local parts, so if we're a local veneur,
// there's nothing to flush
97 changes: 97 additions & 0 deletions samplers/samplers.go
@@ -3,8 +3,10 @@ package samplers
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"hash/fnv"
"log"
"math"
"strings"
"time"
@@ -72,6 +74,101 @@ type JSONMetric struct {
Value []byte `json:"value"`
}

const CardinalityCountName = "cardinality.count"
Contributor: I'd recommend adding comments explaining the difference here

const CardinalityCountType = "cardinality_count"

// CardinalityCount is a sampler that records an approximate cardinality count
// by metric name.
type CardinalityCount struct {
Contributor: I think it's really cool that you implemented this as a sampler!

MetricCardinality map[string]*hyperloglog.HyperLogLogPlus
}

func NewCardinalityCount() *CardinalityCount {
return &CardinalityCount{make(map[string]*hyperloglog.HyperLogLogPlus)}
}

// Sample adds a measurement to the cardinality counter. It makes the
// assumption that no two metrics with different types share a name (ie, name
// is unique across all types)
func (cc *CardinalityCount) Sample(name string, joinedTags string) {
if _, present := cc.MetricCardinality[name]; !present {
hll, _ := hyperloglog.NewPlus(18)
Contributor: can we extract 18 into a const? we can keep it unexported for now

cc.MetricCardinality[name] = hll
}
hasher := fnv.New64a()
Contributor: The UDPMetric has a Digest wherein we've already computed the hash of tags + type + name. Rather than rehashing you can just use that. Then your function can be Sample(name string, digest uint32), your map keyed by the digest and you save a hash operation.
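The suggested signature change might look like this stdlib-only sketch. It is hypothetical: a plain per-name set of digests stands in for the per-name HLL, since the point being illustrated is the keying by the precomputed digest, not the estimator:

```go
package main

import "fmt"

// DigestSampler sketches the reviewer's suggestion: reuse the UDPMetric's
// precomputed 32-bit digest instead of rehashing name+tags inside Sample.
type DigestSampler struct {
	seen map[string]map[uint32]struct{} // per-name set of observed digests
}

func NewDigestSampler() *DigestSampler {
	return &DigestSampler{seen: make(map[string]map[uint32]struct{})}
}

// Sample records one observation, keyed by the already-computed digest.
func (ds *DigestSampler) Sample(name string, digest uint32) {
	if _, ok := ds.seen[name]; !ok {
		ds.seen[name] = make(map[uint32]struct{})
	}
	ds.seen[name][digest] = struct{}{}
}

// Count returns the number of unique digests seen for a metric name.
func (ds *DigestSampler) Count(name string) int {
	return len(ds.seen[name])
}

func main() {
	ds := NewDigestSampler()
	ds.Sample("abc", 1)
	ds.Sample("abc", 2)
	ds.Sample("abc", 1) // duplicate digest: no rehash, no double count
	fmt.Println(ds.Count("abc")) // 2
}
```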

hasher.Write([]byte(joinedTags))
cc.MetricCardinality[name].Add(hasher)
}

// Export groups the CardinalityCount struct into sub structs, grouped by what
Contributor: comment about 'sub structs'

// their metric consistently hashes to
func (cc *CardinalityCount) ExportSets() ([]JSONMetric, error) {
jsonMetrics := make([]JSONMetric, 0, len(cc.MetricCardinality))
Contributor: Yes! Thank you for pre-make-ing! :D

for metricName, hll := range cc.MetricCardinality {

buf := new(bytes.Buffer)
Contributor: nit: I'm not much of a fan of new, and it's generally avoided style-wise (for no particularly good reason) in Go projects. In this case, the zero value would work if we then pass &buf everywhere; we could also do buf := &bytes.Buffer{}.

encoder := gob.NewEncoder(buf)
Contributor: We recently had #220 show up which got us into a bit of trouble because the transmission of this sort of data isn't versioned or anything. I get a bit worried when I see new gob-encoded things. This is probably not actionable now but something we should consider for the future. Should we version the import endpoint or something?
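The versioning idea isn't part of this PR, but one minimal form of it is a version byte prepended to the gob payload. Hypothetical sketch; a map[string]uint64 stands in for the HLL map actually transmitted:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// importVersion tags the wire format so a future veneur can detect and
// reject (or migrate) payloads encoded by an older incompatible version.
const importVersion byte = 1

func encodeVersioned(m map[string]uint64) ([]byte, error) {
	buf := &bytes.Buffer{}
	buf.WriteByte(importVersion)
	if err := gob.NewEncoder(buf).Encode(m); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func decodeVersioned(b []byte) (map[string]uint64, error) {
	if len(b) == 0 || b[0] != importVersion {
		return nil, fmt.Errorf("unsupported import payload version")
	}
	var m map[string]uint64
	if err := gob.NewDecoder(bytes.NewReader(b[1:])).Decode(&m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	b, _ := encodeVersioned(map[string]uint64{"abc": 101})
	m, err := decodeVersioned(b)
	fmt.Println(m["abc"], err) // 101 <nil>
}
```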


err := encoder.Encode(map[string]*hyperloglog.HyperLogLogPlus{metricName: hll})
if err != nil {
// TODO (kiran): do we return an array of metrics, instead?
log.Printf("failed to export cardinality count for metric name:%s, error:%v", metricName, err)
Contributor: At the moment, we don't use logrus (and therefore our Sentry hooks) here. Ideally, this should go to Sentry, though that'll require a little more work to configure, since this is in a subpackage.

Contributor (author): from IRL: we're going to make a custom error type to return to the caller here, and have the caller emit to Sentry via logrus, instead

}

jm := JSONMetric{
MetricKey: MetricKey{
Name: CardinalityCountName,
Type: CardinalityCountType,
JoinedTags: metricName,
},
Tags: []string{metricName},
Value: buf.Bytes(),
}
jsonMetrics = append(jsonMetrics, jm)
}
// TODO (kiran): do we return an array of metrics, instead?
return jsonMetrics, nil
}

// Combine merges cardinality count structs exported from locals
func (cc *CardinalityCount) Combine(other []byte) error {
var decodedMap map[string]*hyperloglog.HyperLogLogPlus
buf := bytes.NewReader(other)
decoder := gob.NewDecoder(buf)

err := decoder.Decode(&decodedMap)
if err != nil {
panic(err)
Contributor: return instead of panic?

}

otherCC := CardinalityCount{
MetricCardinality: decodedMap,
}
for name, hll := range otherCC.MetricCardinality {
if _, ok := cc.MetricCardinality[name]; ok {
cc.MetricCardinality[name].Merge(hll)
} else {
cc.MetricCardinality[name] = hll
}
}
return nil
}

// Flush emits the names + cardinality of all the metrics in the map
// TODO (kiran): investigate whether we'd want to emit only the top k metrics
Contributor: Could we also emit a counter here based on len(cc.MetricCardinality) to get the total # of unique time series seen?
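One way to act on that suggestion, as a hypothetical sketch. Note that len(cc.MetricCardinality) counts unique metric names, while summing the per-name HLL estimates approximates the total number of unique series; plain uint64 counts stand in for hll.Count() values here:

```go
package main

import "fmt"

// totalSeries sums per-metric-name cardinality estimates to approximate the
// total number of unique time series seen across all names.
func totalSeries(counts map[string]uint64) uint64 {
	var total uint64
	for _, c := range counts {
		total += c
	}
	return total
}

func main() {
	counts := map[string]uint64{"abc": 100, "cde": 150}
	fmt.Println(len(counts))          // unique metric names: 2
	fmt.Println(totalSeries(counts))  // approximate unique series: 250
}
```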

func (cc *CardinalityCount) Flush() []DDMetric {
ddMetrics := make([]DDMetric, 0, len(cc.MetricCardinality))
for metricName, hll := range cc.MetricCardinality {
ddMetric := DDMetric{
Name: CardinalityCountName,
Value: [1][2]float64{{float64(time.Now().Unix()), float64(hll.Count())}},
Tags: []string{metricName},
}
ddMetrics = append(ddMetrics, ddMetric)
}
return ddMetrics
}

// Counter is an accumulator
type Counter struct {
Name string
Expand Down
52 changes: 52 additions & 0 deletions samplers/samplers_test.go
@@ -76,6 +76,58 @@ func TestCounterMerge(t *testing.T) {
assert.Equal(t, float64(3.8), metrics[0].Value[0][1])
}

func TestCardinality(t *testing.T) {
rand.Seed(time.Now().Unix())
c := NewCardinalityCount()
c.Sample("abc", "tag1tag2tag3")

for i := 0; i < 100; i++ {
c.Sample("abc", strconv.Itoa(rand.Int()))
}
assert.Equal(t, uint64(101), c.MetricCardinality["abc"].Count(), "counts did not match")
}

func TestCardinalityMerge(t *testing.T) {
rand.Seed(time.Now().Unix())

cc := NewCardinalityCount()
for i := 0; i < 100; i++ {
cc.Sample("abc", strconv.Itoa(rand.Int()))
}
for i := 0; i < 150; i++ {
cc.Sample("cde", strconv.Itoa(rand.Int()))
}
assert.Equal(t, uint64(100), cc.MetricCardinality["abc"].Count(), "counts did not match")
assert.Equal(t, uint64(150), cc.MetricCardinality["cde"].Count(), "counts did not match")

jsonMetrics, err := cc.ExportSets()
assert.NoError(t, err, "should have exported successfully")
assert.NotEmpty(t, jsonMetrics, "should have returned a value")
assert.NotNil(t, jsonMetrics[0], "Need a value")

cc2 := NewCardinalityCount()
for i := 0; i < 50; i++ {
cc2.Sample("abc", strconv.Itoa(rand.Int()))
}
for i := 0; i < 50; i++ {
cc2.Sample("cde", strconv.Itoa(rand.Int()))
}

for _, jm := range jsonMetrics {
assert.NoError(t, cc2.Combine(jm.Value), "should have combined successfully")
}

// HLLs are approximate, and we've seen error of +-1 here in the past, so
// we're giving the test some room for error to reduce flakes

Contributor: Can we extract the margin of error into a constant?

Contributor: You can use InDelta here!
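In the test that would become something like assert.InDelta(t, float64(expected), float64(received), delta). The check testify performs is just an absolute-difference comparison; a stdlib sketch, with the margin extracted into a constant as the first comment asks (names are hypothetical):

```go
package main

import (
	"fmt"
	"math"
)

// hllCountDelta is the margin of error the test allows for HLL estimates.
const hllCountDelta = 2.0

// inDelta mirrors what testify's assert.InDelta verifies.
func inDelta(expected, actual, delta float64) bool {
	return math.Abs(expected-actual) <= delta
}

func main() {
	// an HLL estimate of 149 for 150 true series passes with delta 2
	fmt.Println(inDelta(150, 149, hllCountDelta)) // true
	fmt.Println(inDelta(150, 140, hllCountDelta)) // false
}
```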

receivedCount := int(cc2.MetricCardinality["abc"].Count())
countDifference := 150 - int(receivedCount)
assert.True(t, -2 <= countDifference && countDifference <= 2, "counts did not match after merging (%d and %d)", 150, receivedCount)

receivedCount = int(cc2.MetricCardinality["cde"].Count())
countDifference = 200 - int(receivedCount)
assert.True(t, -2 <= countDifference && countDifference <= 2, "counts did not match after merging (%d and %d)", 200, receivedCount)
}

func TestGauge(t *testing.T) {
g := NewGauge("a.b.c", []string{"a:b"})

10 changes: 10 additions & 0 deletions worker.go
@@ -36,6 +36,9 @@ type WorkerMetrics struct {
sets map[samplers.MetricKey]*samplers.Set
timers map[samplers.MetricKey]*samplers.Histo

// metametrics -- these store information about the metrics themselves
cardinality *samplers.CardinalityCount

// this is for counters which are globally aggregated
globalCounters map[samplers.MetricKey]*samplers.Counter

@@ -54,6 +57,7 @@ func NewWorkerMetrics() WorkerMetrics {
histograms: make(map[samplers.MetricKey]*samplers.Histo),
sets: make(map[samplers.MetricKey]*samplers.Set),
timers: make(map[samplers.MetricKey]*samplers.Histo),
cardinality: samplers.NewCardinalityCount(),
Contributor: For posterity maybe note that we can't use a map[MetricKey]HLL here because we need a per-name map not a per-name, per-unique-tag map.

localHistograms: make(map[samplers.MetricKey]*samplers.Histo),
localSets: make(map[samplers.MetricKey]*samplers.Set),
localTimers: make(map[samplers.MetricKey]*samplers.Histo),
@@ -200,6 +204,8 @@ func (w *Worker) ProcessMetric(m *samplers.UDPMetric) {
default:
log.WithField("type", m.Type).Error("Unknown metric type for processing")
}
// record the metric's cardinality
w.wm.cardinality.Sample(m.MetricKey.Name, m.MetricKey.JoinedTags)
}

// ImportMetric receives a metric from another veneur instance
@@ -234,6 +240,10 @@ func (w *Worker) ImportMetric(other samplers.JSONMetric) {
if err := w.wm.timers[other.MetricKey].Combine(other.Value); err != nil {
log.WithError(err).Error("Could not merge timers")
}
case samplers.CardinalityCountType:
if err := w.wm.cardinality.Combine(other.Value); err != nil {
log.WithError(err).Error("Could not merge cardinality")
}
default:
log.WithField("type", other.Type).Error("Unknown metric type for importing")
}
24 changes: 24 additions & 0 deletions worker_test.go
@@ -1,6 +1,8 @@
package veneur

import (
"math/rand"
"strconv"
"testing"

"github.com/Sirupsen/logrus"
@@ -78,3 +80,25 @@ func TestWorkerImportHistogram(t *testing.T) {
wm := w.Flush()
assert.Len(t, wm.histograms, 1, "number of flushed histograms")
}

func TestWorkerImportCardinality(t *testing.T) {
w := NewWorker(1, nil, logrus.New())
Contributor: I think we can actually do log here instead of logrus.New(), because it will pick up the global one initialized in server.go

Contributor (author): ooh yes


cc := samplers.NewCardinalityCount()
for i := 0; i < 100; i++ {
cc.Sample("abc", strconv.Itoa(rand.Int()))
}
for i := 0; i < 150; i++ {
cc.Sample("cde", strconv.Itoa(rand.Int()))
}

jsonMetrics, err := cc.ExportSets()
assert.NoError(t, err, "should have exported successfully")

for _, jm := range jsonMetrics {
w.ImportMetric(jm)
}

wm := w.Flush()
assert.Len(t, wm.cardinality.MetricCardinality, 2, "number of flushed outputs")
}