Skip to content

Commit

Permalink
Metric histogram aggregator: Swap in SynchronizedMove to avoid alloca…
Browse files Browse the repository at this point in the history
…tions (open-telemetry#1435)

* Move emptyState() allocations outside lock

* Add more testing

* Re-comment; add CHANGELOG

* Add CHANGELOG PR number

* Update CHANGELOG.md

Co-authored-by: Sam Xie <[email protected]>

Co-authored-by: Bogdan Drutu <[email protected]>
Co-authored-by: Sam Xie <[email protected]>
Co-authored-by: Anthony Mirabella <[email protected]>
  • Loading branch information
4 people authored Jan 14, 2021
1 parent c29c6fd commit 207587b
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369)
- Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369)
- Unify endpoint API that related to OTel exporter. (#1401)
- Optimize metric histogram aggregator to re-use its slice of buckets. (#1435)
- Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430)
- `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432)
- Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420)
Expand Down
36 changes: 29 additions & 7 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type (
lock sync.Mutex
boundaries []float64
kind number.Kind
state state
state *state
}

// state represents the state of a histogram, consisting of
Expand Down Expand Up @@ -78,8 +78,8 @@ func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator {
aggs[i] = Aggregator{
kind: desc.NumberKind(),
boundaries: sortedBoundaries,
state: emptyState(sortedBoundaries),
}
aggs[i].state = aggs[i].newState()
}
return aggs
}
Expand Down Expand Up @@ -123,20 +123,42 @@ func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descrip
return aggregator.NewInconsistentAggregatorError(c, oa)
}

if o != nil {
// Swap case: This is the ordinary case for a
// synchronous instrument, where the SDK allocates two
// Aggregators and lock contention is anticipated.
// Reset the target state before swapping it under the
// lock below.
o.clearState()
}

c.lock.Lock()
if o != nil {
o.state = c.state
c.state, o.state = o.state, c.state
} else {
// No swap case: This is the ordinary case for an
// asynchronous instrument, where the SDK allocates a
// single Aggregator and there is no anticipated lock
// contention.
c.clearState()
}
c.state = emptyState(c.boundaries)
c.lock.Unlock()

return nil
}

func emptyState(boundaries []float64) state {
return state{
bucketCounts: make([]uint64, len(boundaries)+1),
func (c *Aggregator) newState() *state {
return &state{
bucketCounts: make([]uint64, len(c.boundaries)+1),
}
}

func (c *Aggregator) clearState() {
for i := range c.state.bucketCounts {
c.state.bucketCounts[i] = 0
}
c.state.sum = 0
c.state.count = 0
}

// Update adds the recorded measurement to the current data set.
Expand Down
108 changes: 46 additions & 62 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,42 +115,23 @@ func testHistogram(t *testing.T, profile aggregatortest.Profile, policy policy)

agg, ckpt := new2(descriptor)

all := aggregatortest.NewNumbers(profile.NumberKind)

for i := 0; i < count; i++ {
x := profile.Random(policy.sign())
all.Append(x)
aggregatortest.CheckedUpdate(t, agg, x, descriptor)
}

require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

checkZero(t, agg, descriptor)

all.Sort()

asum, err := ckpt.Sum()
sum := all.Sum()
require.InEpsilon(t,
sum.CoerceToFloat64(profile.NumberKind),
asum.CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - "+policy.name)
require.NoError(t, err)
// This needs to repeat at least 3 times to uncover a failure to reset
// for the overall sum and count fields, since the third time through
// is the first time a `histogram.state` object is reused.
for repeat := 0; repeat < 3; repeat++ {
all := aggregatortest.NewNumbers(profile.NumberKind)

count, err := ckpt.Count()
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.NoError(t, err)
for i := 0; i < count; i++ {
x := profile.Random(policy.sign())
all.Append(x)
aggregatortest.CheckedUpdate(t, agg, x, descriptor)
}

buckets, err := ckpt.Histogram()
require.NoError(t, err)
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
checkZero(t, agg, descriptor)

counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := uint64(buckets.Counts[i])
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
checkHistogram(t, all, profile, ckpt)
}
}

Expand Down Expand Up @@ -191,31 +172,7 @@ func TestHistogramMerge(t *testing.T) {

aggregatortest.CheckedMerge(t, ckpt1, ckpt2, descriptor)

all.Sort()

asum, err := ckpt1.Sum()
sum := all.Sum()
require.InEpsilon(t,
sum.CoerceToFloat64(profile.NumberKind),
asum.CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - absolute")
require.NoError(t, err)

count, err := ckpt1.Count()
require.Equal(t, all.Count(), count, "Same count - absolute")
require.NoError(t, err)

buckets, err := ckpt1.Histogram()
require.NoError(t, err)

require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")

counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := uint64(buckets.Counts[i])
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
}
checkHistogram(t, all, profile, ckpt1)
})
}

Expand All @@ -233,22 +190,49 @@ func TestHistogramNotSet(t *testing.T) {
})
}

func calcBuckets(points []number.Number, profile aggregatortest.Profile) []uint64 {
sortedBoundaries := make([]float64, len(boundaries))
// checkHistogram ensures the correct aggregated state between `all`
// (test aggregator) and `agg` (code under test).
func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregatortest.Profile, agg *histogram.Aggregator) {

all.Sort()

asum, err := agg.Sum()
require.NoError(t, err)

sum := all.Sum()
require.InEpsilon(t,
sum.CoerceToFloat64(profile.NumberKind),
asum.CoerceToFloat64(profile.NumberKind),
0.000000001)

count, err := agg.Count()
require.NoError(t, err)
require.Equal(t, all.Count(), count)

buckets, err := agg.Histogram()
require.NoError(t, err)

require.Equal(t, len(buckets.Counts), len(boundaries)+1,
"There should be b + 1 counts, where b is the number of boundaries")

sortedBoundaries := make([]float64, len(boundaries))
copy(sortedBoundaries, boundaries)
sort.Float64s(sortedBoundaries)

require.EqualValues(t, sortedBoundaries, buckets.Boundaries)

counts := make([]uint64, len(sortedBoundaries)+1)
idx := 0
for _, p := range points {
for _, p := range all.Points() {
for idx < len(sortedBoundaries) && p.CoerceToFloat64(profile.NumberKind) >= sortedBoundaries[idx] {
idx++
}
counts[idx]++
}

return counts
for i, v := range counts {
bCount := uint64(buckets.Counts[i])
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
}
}

func TestSynchronizedMoveReset(t *testing.T) {
Expand Down

0 comments on commit 207587b

Please sign in to comment.