diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 107d7b7fff4..e627ccc5a1f 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -63,6 +63,20 @@ type ( Points() ([]core.Number, error) } + // Buckets represents histogram buckets boundaries and counts. + // + // For a Histogram with N defined boundaries, e.g, [x, y, z]. + // There are N+1 counts: [-inf, x), [x, y), [y, z), [z, +inf] + Buckets struct { + Boundaries []core.Number + Counts []core.Number + } + + // Histogram returns the count of events in pre-determined buckets. + Histogram interface { + Histogram() (Buckets, error) + } + // MinMaxSumCount supports the Min, Max, Sum, and Count interfaces. MinMaxSumCount interface { Min diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go new file mode 100644 index 00000000000..652d46072d6 --- /dev/null +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -0,0 +1,184 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package histogram // import "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" + +import ( + "context" + "sort" + + "go.opentelemetry.io/otel/api/core" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" +) + +type ( + // Aggregator observe events and counts them in pre-determined buckets. + // It also calculates the sum and count of all events. + Aggregator struct { + // state needs to be aligned for 64-bit atomic operations. + current state + // checkpoint needs to be aligned for 64-bit atomic operations. + checkpoint state + boundaries []core.Number + kind core.NumberKind + } + + // state represents the state of a histogram, consisting of + // the sum and counts for all observed values and + // the less than equal bucket count for the pre-determined boundaries. + state struct { + // all fields have to be aligned for 64-bit atomic operations. + buckets aggregator.Buckets + count core.Number + sum core.Number + } +) + +var _ export.Aggregator = &Aggregator{} +var _ aggregator.Sum = &Aggregator{} +var _ aggregator.Count = &Aggregator{} +var _ aggregator.Histogram = &Aggregator{} + +// New returns a new measure aggregator for computing Histograms. +// +// A Histogram observe events and counts them in pre-defined buckets. +// And also provides the total sum and count of all observations. +// +// Note that this aggregator maintains each value using independent +// atomic operations, which introduces the possibility that +// checkpoints are inconsistent. +func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator { + // Boundaries MUST be ordered otherwise the histogram could not + // be properly computed. + sortedBoundaries := numbers{ + numbers: make([]core.Number, len(boundaries)), + kind: desc.NumberKind(), + } + + copy(sortedBoundaries.numbers, boundaries) + sort.Sort(&sortedBoundaries) + boundaries = sortedBoundaries.numbers + + agg := Aggregator{ + kind: desc.NumberKind(), + boundaries: boundaries, + current: state{ + buckets: aggregator.Buckets{ + Boundaries: boundaries, + Counts: make([]core.Number, len(boundaries)+1), + }, + }, + checkpoint: state{ + buckets: aggregator.Buckets{ + Boundaries: boundaries, + Counts: make([]core.Number, len(boundaries)+1), + }, + }, + } + return &agg +} + +// Sum returns the sum of all values in the checkpoint. +func (c *Aggregator) Sum() (core.Number, error) { + return c.checkpoint.sum, nil +} + +// Count returns the number of values in the checkpoint. +func (c *Aggregator) Count() (int64, error) { + return int64(c.checkpoint.count.AsUint64()), nil +} + +// Histogram returns the count of events in pre-determined buckets. +func (c *Aggregator) Histogram() (aggregator.Buckets, error) { + return c.checkpoint.buckets, nil +} + +// Checkpoint saves the current state and resets the current state to +// the empty set. Since no locks are taken, there is a chance that +// the independent Sum, Count and Bucket Count are not consistent with each +// other. +func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { + // N.B. There is no atomic operation that can update all three + // values at once without a memory allocation. + // + // This aggregator is intended to trade this correctness for + // speed. + // + // Therefore, atomically swap fields independently, knowing + // that individually the three parts of this aggregation could + // be spread across multiple collections in rare cases. + + c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0)) + c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0)) + + for i := 0; i < len(c.checkpoint.buckets.Counts); i++ { + c.checkpoint.buckets.Counts[i].SetUint64(c.current.buckets.Counts[i].SwapUint64Atomic(0)) + } +} + +// Update adds the recorded measurement to the current data set. +func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { + kind := desc.NumberKind() + + c.current.count.AddUint64Atomic(1) + c.current.sum.AddNumberAtomic(kind, number) + + for i, boundary := range c.boundaries { + if number.CompareNumber(kind, boundary) < 0 { + c.current.buckets.Counts[i].AddUint64Atomic(1) + return nil + } + } + + // Observed event is bigger than all defined boundaries. + c.current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1) + return nil +} + +// Merge combines two data sets into one. +func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error { + o, _ := oa.(*Aggregator) + if o == nil { + return aggregator.NewInconsistentMergeError(c, oa) + } + + c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum) + c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count) + + for i := 0; i < len(c.current.buckets.Counts); i++ { + c.checkpoint.buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.buckets.Counts[i]) + } + return nil +} + +// numbers is an auxiliary struct to order histogram bucket boundaries (slice of core.Number) +type numbers struct { + numbers []core.Number + kind core.NumberKind +} + +var _ sort.Interface = (*numbers)(nil) + +func (n *numbers) Len() int { + return len(n.numbers) +} + +func (n *numbers) Less(i, j int) bool { + return -1 == n.numbers[i].CompareNumber(n.kind, n.numbers[j]) +} + +func (n *numbers) Swap(i, j int) { + n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i] +} diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go new file mode 100644 index 00000000000..c70e6667be0 --- /dev/null +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -0,0 +1,259 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package histogram + +import ( + "context" + "fmt" + "math" + "math/rand" + "os" + "sort" + "testing" + "unsafe" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/core" + ottest "go.opentelemetry.io/otel/internal/testing" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/test" +) + +const count = 100 + +type policy struct { + name string + absolute bool + sign func() int +} + +var ( + positiveOnly = policy{ + name: "absolute", + absolute: true, + sign: func() int { return +1 }, + } + negativeOnly = policy{ + name: "negative", + absolute: false, + sign: func() int { return -1 }, + } + positiveAndNegative = policy{ + name: "positiveAndNegative", + absolute: false, + sign: func() int { + if rand.Uint32() > math.MaxUint32/2 { + return -1 + } + return 1 + }, + } + + boundaries = map[core.NumberKind][]core.Number{ + core.Float64NumberKind: {core.NewFloat64Number(500), core.NewFloat64Number(250), core.NewFloat64Number(750)}, + core.Int64NumberKind: {core.NewInt64Number(500), core.NewInt64Number(250), core.NewInt64Number(750)}, + } +) + +// Ensure struct alignment prior to running tests. +func TestMain(m *testing.M) { + fields := []ottest.FieldOffset{ + { + Name: "Aggregator.current", + Offset: unsafe.Offsetof(Aggregator{}.current), + }, + { + Name: "Aggregator.checkpoint", + Offset: unsafe.Offsetof(Aggregator{}.checkpoint), + }, + { + Name: "state.buckets", + Offset: unsafe.Offsetof(state{}.buckets), + }, + { + Name: "state.sum", + Offset: unsafe.Offsetof(state{}.sum), + }, + { + Name: "state.count", + Offset: unsafe.Offsetof(state{}.count), + }, + } + fmt.Println(fields) + + if !ottest.Aligned8Byte(fields, os.Stderr) { + os.Exit(1) + } + + os.Exit(m.Run()) +} + +func TestHistogramAbsolute(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + histogram(t, profile, positiveOnly) + }) +} + +func TestHistogramNegativeOnly(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + histogram(t, profile, negativeOnly) + }) +} + +func TestHistogramPositiveAndNegative(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + histogram(t, profile, positiveAndNegative) + }) +} + +// Validates count, sum and buckets for a given profile and policy +func histogram(t *testing.T, profile test.Profile, policy policy) { + ctx := context.Background() + descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, !policy.absolute) + + agg := New(descriptor, boundaries[profile.NumberKind]) + + all := test.NewNumbers(profile.NumberKind) + + for i := 0; i < count; i++ { + x := profile.Random(policy.sign()) + all.Append(x) + test.CheckedUpdate(t, agg, x, descriptor) + } + + agg.Checkpoint(ctx, descriptor) + + all.Sort() + + asum, err := agg.Sum() + sum := all.Sum() + require.InEpsilon(t, + sum.CoerceToFloat64(profile.NumberKind), + asum.CoerceToFloat64(profile.NumberKind), + 0.000000001, + "Same sum - "+policy.name) + require.Nil(t, err) + + count, err := agg.Count() + require.Equal(t, all.Count(), count, "Same count -"+policy.name) + require.Nil(t, err) + + require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + + counts := calcBuckets(all.Points(), profile) + for i, v := range counts { + bCount := agg.checkpoint.buckets.Counts[i].AsUint64() + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts) + } +} + +func TestHistogramMerge(t *testing.T) { + ctx := context.Background() + + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) + + agg1 := New(descriptor, boundaries[profile.NumberKind]) + agg2 := New(descriptor, boundaries[profile.NumberKind]) + + all := test.NewNumbers(profile.NumberKind) + + for i := 0; i < count; i++ { + x := profile.Random(+1) + all.Append(x) + test.CheckedUpdate(t, agg1, x, descriptor) + } + for i := 0; i < count; i++ { + x := profile.Random(+1) + all.Append(x) + test.CheckedUpdate(t, agg2, x, descriptor) + } + + agg1.Checkpoint(ctx, descriptor) + agg2.Checkpoint(ctx, descriptor) + + test.CheckedMerge(t, agg1, agg2, descriptor) + + all.Sort() + + asum, err := agg1.Sum() + sum := all.Sum() + require.InEpsilon(t, + sum.CoerceToFloat64(profile.NumberKind), + asum.CoerceToFloat64(profile.NumberKind), + 0.000000001, + "Same sum - absolute") + require.Nil(t, err) + + count, err := agg1.Count() + require.Equal(t, all.Count(), count, "Same count - absolute") + require.Nil(t, err) + + require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + + counts := calcBuckets(all.Points(), profile) + for i, v := range counts { + bCount := agg1.checkpoint.buckets.Counts[i].AsUint64() + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts) + } + }) +} + +func TestHistogramNotSet(t *testing.T) { + ctx := context.Background() + + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) + + agg := New(descriptor, boundaries[profile.NumberKind]) + agg.Checkpoint(ctx, descriptor) + + asum, err := agg.Sum() + require.Equal(t, core.Number(0), asum, "Empty checkpoint sum = 0") + require.Nil(t, err) + + count, err := agg.Count() + require.Equal(t, int64(0), count, "Empty checkpoint count = 0") + require.Nil(t, err) + + require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + for i, bCount := range agg.checkpoint.buckets.Counts { + require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i) + } + }) +} + +func calcBuckets(points []core.Number, profile test.Profile) []uint64 { + sortedBoundaries := numbers{ + numbers: make([]core.Number, len(boundaries[profile.NumberKind])), + kind: profile.NumberKind, + } + + copy(sortedBoundaries.numbers, boundaries[profile.NumberKind]) + sort.Sort(&sortedBoundaries) + boundaries := sortedBoundaries.numbers + + counts := make([]uint64, len(boundaries)+1) + idx := 0 + for _, p := range points { + for idx < len(boundaries) && p.CompareNumber(profile.NumberKind, boundaries[idx]) != -1 { + idx++ + } + counts[idx]++ + } + + return counts +}