From 869368f6145f37a7a572a7f1004a33f149033c80 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Tue, 24 Dec 2019 00:14:36 -0300 Subject: [PATCH 01/14] histogram aggregator draft --- sdk/export/metric/aggregator/aggregator.go | 12 ++ sdk/metric/aggregator/histogram/histogram.go | 123 ++++++++++++ .../aggregator/histogram/histogram_test.go | 180 ++++++++++++++++++ 3 files changed, 315 insertions(+) create mode 100644 sdk/metric/aggregator/histogram/histogram.go create mode 100644 sdk/metric/aggregator/histogram/histogram_test.go diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 107d7b7fff4..4c64ba1811a 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -63,6 +63,18 @@ type ( Points() ([]core.Number, error) } + // Quantile returns an exact or estimated quantile over the + // set of values that were aggregated. + Histogram interface { + Buckets() (HistogramValue, error) + } + + HistogramValue struct { + Buckets []core.Number + Count core.Number + Sum core.Number + } + // MinMaxSumCount supports the Min, Max, Sum, and Count interfaces. MinMaxSumCount interface { Min diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go new file mode 100644 index 00000000000..e1b18d52206 --- /dev/null +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -0,0 +1,123 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package histogram // import "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" + +import ( + "context" + + "go.opentelemetry.io/otel/api/core" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" +) + +type ( + // Aggregator aggregates measure events, keeping only the le, + // sum, and count. + Aggregator struct { + current aggregator.HistogramValue + checkpoint aggregator.HistogramValue + bounds []float64 + kind core.NumberKind + } +) + +var _ export.Aggregator = &Aggregator{} +var _ aggregator.Sum = &Aggregator{} +var _ aggregator.Count = &Aggregator{} +var _ aggregator.Histogram = &Aggregator{} + +// New returns a new measure aggregator for computing count, sum and buckets. +// +// Note that this aggregator maintains each value using independent +// atomic operations, which introduces the possibility that +// checkpoints are inconsistent. +func New(desc *export.Descriptor, bounds []float64) *Aggregator { + return &Aggregator{ + kind: desc.NumberKind(), + current: aggregator.HistogramValue{ + Buckets: make([]core.Number, len(bounds)+1), + }, + bounds: bounds, + } +} + +// Count returns the number of values in the checkpoint. +func (c *Aggregator) Sum() (core.Number, error) { + return c.checkpoint.Sum, nil +} + +// Count returns the number of values in the checkpoint. +func (c *Aggregator) Count() (int64, error) { + return int64(c.checkpoint.Count.AsUint64()), nil +} + +func (c *Aggregator) Buckets() (aggregator.HistogramValue, error) { + return c.checkpoint, nil +} + +// Checkpoint saves the current bucket and resets the current bucket to +// the empty set. Since no locks are taken, there is a chance that +// the independent Min, Max, Sum, and Count are not consistent with each +// other. +func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { + // N.B. There is no atomic operation that can update all three + // values at once without a memory allocation. + // + // This aggregator is intended to trade this correctness for + // speed. + // + // Therefore, atomically swap fields independently, knowing + // that individually the three parts of this aggregation could + // be spread across multiple collections in rare cases. + + c.checkpoint.Count.SetUint64(c.current.Count.SwapUint64Atomic(0)) + c.checkpoint.Sum = c.current.Sum.SwapNumberAtomic(core.Number(0)) + c.checkpoint.Buckets = c.current.Buckets + c.current.Buckets = make([]core.Number, len(c.bounds)+1) +} + +// Update adds the recorded measurement to the current data set. +func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { + kind := desc.NumberKind() + + c.current.Count.AddUint64Atomic(1) + c.current.Sum.AddNumberAtomic(kind, number) + + for i, boundary := range c.bounds { + if number.CoerceToFloat64(kind) <= boundary { + c.current.Buckets[i].AddUint64Atomic(1) + return nil + } + } + + c.current.Buckets[len(c.bounds)].AddUint64Atomic(1) + return nil +} + +// Merge combines two data sets into one. +func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error { + o, _ := oa.(*Aggregator) + if o == nil { + return aggregator.NewInconsistentMergeError(c, oa) + } + + c.checkpoint.Sum.AddNumber(desc.NumberKind(), o.checkpoint.Sum) + c.checkpoint.Count.AddNumber(core.Uint64NumberKind, o.checkpoint.Count) + + for i := 0; i < len(c.current.Buckets); i++ { + c.checkpoint.Buckets[i].AddNumber(desc.NumberKind(), o.checkpoint.Buckets[i]) + } + return nil +} diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go new file mode 100644 index 00000000000..21ea3e639df --- /dev/null +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -0,0 +1,180 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package histogram + +import ( + "context" + "fmt" + "math" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/core" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/test" +) + +const count = 100 + +type policy struct { + name string + absolute bool + sign func() int +} + +var ( + positiveOnly = policy{ + name: "absolute", + absolute: true, + sign: func() int { return +1 }, + } + negativeOnly = policy{ + name: "negative", + absolute: false, + sign: func() int { return -1 }, + } + positiveAndNegative = policy{ + name: "positiveAndNegative", + absolute: false, + sign: func() int { + if rand.Uint32() > math.MaxUint32/2 { + return -1 + } + return 1 + }, + } +) + +func TestHistogramAbsolute(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + histogram(t, profile, positiveOnly) + }) +} + +func TestHistogramNegativeOnly(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + histogram(t, profile, negativeOnly) + }) +} + +func TestHistogramPositiveAndNegative(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + histogram(t, profile, positiveAndNegative) + }) +} + +// Validates count, sum and buckets for a given profile and policy +func histogram(t *testing.T, profile test.Profile, policy policy) { + ctx := context.Background() + descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, !policy.absolute) + + agg := New(descriptor, []float64{250, 500, 700}) + + all := test.NewNumbers(profile.NumberKind) + + for i := 0; i < count; i++ { + x := profile.Random(policy.sign()) + all.Append(x) + test.CheckedUpdate(t, agg, x, descriptor) + } + + agg.Checkpoint(ctx, descriptor) + + all.Sort() + + asum, err := agg.Sum() + require.InEpsilon(t, + all.Sum().CoerceToFloat64(profile.NumberKind), + asum.CoerceToFloat64(profile.NumberKind), + 0.000000001, + "Same sum - "+policy.name) + require.Nil(t, err) + + count, err := agg.Count() + require.Equal(t, all.Count(), count, "Same count -"+policy.name) + require.Nil(t, err) + + for _, p := range all.Points() { + fmt.Print(p.Emit(profile.NumberKind), " ") + } + fmt.Println() + fmt.Println(agg.checkpoint) +} + +func TestHistogramMerge(t *testing.T) { + ctx := context.Background() + + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) + + agg1 := New(descriptor, []float64{250, 500, 700}) + agg2 := New(descriptor, []float64{250, 500, 700}) + + all := test.NewNumbers(profile.NumberKind) + + for i := 0; i < count; i++ { + x := profile.Random(+1) + all.Append(x) + test.CheckedUpdate(t, agg1, x, descriptor) + } + for i := 0; i < count; i++ { + x := profile.Random(+1) + all.Append(x) + test.CheckedUpdate(t, agg2, x, descriptor) + } + + agg1.Checkpoint(ctx, descriptor) + agg2.Checkpoint(ctx, descriptor) + + test.CheckedMerge(t, agg1, agg2, descriptor) + + all.Sort() + + asum, err := agg1.Sum() + require.InEpsilon(t, + all.Sum().CoerceToFloat64(profile.NumberKind), + asum.CoerceToFloat64(profile.NumberKind), + 0.000000001, + "Same sum - absolute") + require.Nil(t, err) + + count, err := agg1.Count() + require.Equal(t, all.Count(), count, "Same count - absolute") + require.Nil(t, err) + + }) +} + +func TestHistogramNotSet(t *testing.T) { + ctx := context.Background() + + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) + + agg := New(descriptor, []float64{250, 500, 700}) + agg.Checkpoint(ctx, descriptor) + + asum, err := agg.Sum() + require.Equal(t, core.Number(0), asum, "Empty checkpoint sum = 0") + require.Nil(t, err) + + count, err := agg.Count() + require.Equal(t, int64(0), count, "Empty checkpoint count = 0") + require.Nil(t, err) + + }) +} From 784d9cdbf5645535432996f743f3767420ea5552 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Sat, 11 Jan 2020 00:34:00 -0300 Subject: [PATCH 02/14] add tests for buckets --- .../aggregator/histogram/histogram_test.go | 52 +++++++++++++++---- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 21ea3e639df..b8392442d11 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -16,7 +16,6 @@ package histogram import ( "context" - "fmt" "math" "math/rand" "testing" @@ -57,6 +56,8 @@ var ( return 1 }, } + + boundaries = []float64{250, 500, 700} ) func TestHistogramAbsolute(t *testing.T) { @@ -82,7 +83,7 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { ctx := context.Background() descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, !policy.absolute) - agg := New(descriptor, []float64{250, 500, 700}) + agg := New(descriptor, boundaries) all := test.NewNumbers(profile.NumberKind) @@ -97,8 +98,9 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { all.Sort() asum, err := agg.Sum() + sum := all.Sum() require.InEpsilon(t, - all.Sum().CoerceToFloat64(profile.NumberKind), + sum.CoerceToFloat64(profile.NumberKind), asum.CoerceToFloat64(profile.NumberKind), 0.000000001, "Same sum - "+policy.name) @@ -108,11 +110,13 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { require.Equal(t, all.Count(), count, "Same count -"+policy.name) require.Nil(t, err) - for _, p := range all.Points() { - fmt.Print(p.Emit(profile.NumberKind), " ") + require.Equal(t, len(agg.checkpoint.Buckets), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + + counts := calcBuckets(all.Points(), profile) + for i, v := range counts { + bCount := agg.checkpoint.Buckets[i].AsUint64() + require.Equal(t, v, bCount, "Wrong bucket #%d count", i) } - fmt.Println() - fmt.Println(agg.checkpoint) } func TestHistogramMerge(t *testing.T) { @@ -121,8 +125,8 @@ func TestHistogramMerge(t *testing.T) { test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) - agg1 := New(descriptor, []float64{250, 500, 700}) - agg2 := New(descriptor, []float64{250, 500, 700}) + agg1 := New(descriptor, boundaries) + agg2 := New(descriptor, boundaries) all := test.NewNumbers(profile.NumberKind) @@ -145,8 +149,9 @@ func TestHistogramMerge(t *testing.T) { all.Sort() asum, err := agg1.Sum() + sum := all.Sum() require.InEpsilon(t, - all.Sum().CoerceToFloat64(profile.NumberKind), + sum.CoerceToFloat64(profile.NumberKind), asum.CoerceToFloat64(profile.NumberKind), 0.000000001, "Same sum - absolute") @@ -156,6 +161,13 @@ func TestHistogramMerge(t *testing.T) { require.Equal(t, all.Count(), count, "Same count - absolute") require.Nil(t, err) + require.Equal(t, len(agg1.checkpoint.Buckets), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + + counts := calcBuckets(all.Points(), profile) + for i, v := range counts { + bCount := agg1.checkpoint.Buckets[i].AsUint64() + require.Equal(t, v, bCount, "Wrong bucket #%d count", i) + } }) } @@ -165,7 +177,7 @@ func TestHistogramNotSet(t *testing.T) { test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) - agg := New(descriptor, []float64{250, 500, 700}) + agg := New(descriptor, boundaries) agg.Checkpoint(ctx, descriptor) asum, err := agg.Sum() @@ -176,5 +188,23 @@ func TestHistogramNotSet(t *testing.T) { require.Equal(t, int64(0), count, "Empty checkpoint count = 0") require.Nil(t, err) + require.Equal(t, len(agg.checkpoint.Buckets), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + for i, bCount := range agg.checkpoint.Buckets { + require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i) + } }) } + +func calcBuckets(points []core.Number, profile test.Profile) []uint64 { + counts := make([]uint64, len(boundaries)+1) + idx := 0 + for _, p := range points { + v := p.CoerceToFloat64(profile.NumberKind) + if idx < len(boundaries) && v > boundaries[idx] { + idx++ + } + counts[idx]++ + } + + return counts +} From f26f8f035d539c735857dbb748a67bf135c71263 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Sat, 11 Jan 2020 00:56:53 -0300 Subject: [PATCH 03/14] naming stuffs --- sdk/export/metric/aggregator/aggregator.go | 9 ++------- sdk/metric/aggregator/histogram/histogram.go | 20 +++++++++++++------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 4c64ba1811a..58460695903 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -21,6 +21,7 @@ import ( "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" ) // These interfaces describe the various ways to access state from an @@ -66,13 +67,7 @@ type ( // Quantile returns an exact or estimated quantile over the // set of values that were aggregated. Histogram interface { - Buckets() (HistogramValue, error) - } - - HistogramValue struct { - Buckets []core.Number - Count core.Number - Sum core.Number + Histogram() (histogram.State, error) } // MinMaxSumCount supports the Min, Max, Sum, and Count interfaces. diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index e1b18d52206..1d013292577 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -23,14 +23,20 @@ import ( ) type ( - // Aggregator aggregates measure events, keeping only the le, - // sum, and count. + // Aggregator aggregates measure events and calculates + // sum, count and buckets count. Aggregator struct { - current aggregator.HistogramValue - checkpoint aggregator.HistogramValue + current State + checkpoint State bounds []float64 kind core.NumberKind } + + State struct { + Buckets []core.Number + Count core.Number + Sum core.Number + } ) var _ export.Aggregator = &Aggregator{} @@ -38,7 +44,7 @@ var _ aggregator.Sum = &Aggregator{} var _ aggregator.Count = &Aggregator{} var _ aggregator.Histogram = &Aggregator{} -// New returns a new measure aggregator for computing count, sum and buckets. +// New returns a new measure aggregator for computing count, sum and buckets count. // // Note that this aggregator maintains each value using independent // atomic operations, which introduces the possibility that @@ -46,7 +52,7 @@ var _ aggregator.Histogram = &Aggregator{} func New(desc *export.Descriptor, bounds []float64) *Aggregator { return &Aggregator{ kind: desc.NumberKind(), - current: aggregator.HistogramValue{ + current: State{ Buckets: make([]core.Number, len(bounds)+1), }, bounds: bounds, @@ -63,7 +69,7 @@ func (c *Aggregator) Count() (int64, error) { return int64(c.checkpoint.Count.AsUint64()), nil } -func (c *Aggregator) Buckets() (aggregator.HistogramValue, error) { +func (c *Aggregator) Histogram() (State, error) { return c.checkpoint, nil } From 7d3692375ef31ada4dc55b99d5999f71a3c1b653 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Sat, 11 Jan 2020 01:54:58 -0300 Subject: [PATCH 04/14] docs --- sdk/metric/aggregator/histogram/histogram.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 1d013292577..30fe40dfc0b 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -23,8 +23,8 @@ import ( ) type ( - // Aggregator aggregates measure events and calculates - // sum, count and buckets count. + // Aggregator observe events and counts them in pre-determined boundaries. + // It also calculates the sum and count of all events. Aggregator struct { current State checkpoint State @@ -32,6 +32,9 @@ type ( kind core.NumberKind } + // State represents the state of a histogram, consisting of + // the sum and counts for all observed values and + // the less than equal bucket count for the pre-determined boundaries. State struct { Buckets []core.Number Count core.Number @@ -73,9 +76,9 @@ func (c *Aggregator) Histogram() (State, error) { return c.checkpoint, nil } -// Checkpoint saves the current bucket and resets the current bucket to +// Checkpoint saves the current state and resets the current state to // the empty set. Since no locks are taken, there is a chance that -// the independent Min, Max, Sum, and Count are not consistent with each +// the independent Sum, Count and Bucket Count are not consistent with each // other. func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { // N.B. There is no atomic operation that can update all three @@ -90,8 +93,7 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { c.checkpoint.Count.SetUint64(c.current.Count.SwapUint64Atomic(0)) c.checkpoint.Sum = c.current.Sum.SwapNumberAtomic(core.Number(0)) - c.checkpoint.Buckets = c.current.Buckets - c.current.Buckets = make([]core.Number, len(c.bounds)+1) + c.checkpoint.Buckets, c.current.Buckets = c.current.Buckets, make([]core.Number, len(c.bounds)+1) } // Update adds the recorded measurement to the current data set. From 9504b9af5ed090b1ea180d3c8bc2515ad96ad279 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Thu, 16 Jan 2020 02:16:09 -0300 Subject: [PATCH 05/14] add tests for buckets --- sdk/export/metric/aggregator/aggregator.go | 15 +++-- sdk/metric/aggregator/histogram/histogram.go | 65 ++++++++++++------- .../aggregator/histogram/histogram_test.go | 17 +++-- 3 files changed, 60 insertions(+), 37 deletions(-) diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 58460695903..4bfe471ed0b 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -21,7 +21,6 @@ import ( "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" ) // These interfaces describe the various ways to access state from an @@ -64,10 +63,18 @@ type ( Points() ([]core.Number, error) } - // Quantile returns an exact or estimated quantile over the - // set of values that were aggregated. + // Buckets represents histogram buckets boundaries and counts. + // + // For a Histogram with N defined boundaries, e.g, [x, y, z]. + // There are N+1 counts: [-inf, x], (x, y], (y, z], (z, +inf) + Buckets struct { + Boundaries []core.Number + Counts []core.Number + } + + // Histogram returns the count of events in pre-determined buckets. Histogram interface { - Histogram() (histogram.State, error) + Histogram() (Buckets, error) } // MinMaxSumCount supports the Min, Max, Sum, and Count interfaces. diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 30fe40dfc0b..9d873809ed2 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -23,20 +23,20 @@ import ( ) type ( - // Aggregator observe events and counts them in pre-determined boundaries. + // Aggregator observe events and counts them in pre-determined buckets. // It also calculates the sum and count of all events. Aggregator struct { - current State - checkpoint State - bounds []float64 + current state + checkpoint state + boundaries []core.Number kind core.NumberKind } - // State represents the state of a histogram, consisting of + // state represents the state of a histogram, consisting of // the sum and counts for all observed values and // the less than equal bucket count for the pre-determined boundaries. - State struct { - Buckets []core.Number + state struct { + Buckets aggregator.Buckets Count core.Number Sum core.Number } @@ -47,22 +47,35 @@ var _ aggregator.Sum = &Aggregator{} var _ aggregator.Count = &Aggregator{} var _ aggregator.Histogram = &Aggregator{} -// New returns a new measure aggregator for computing count, sum and buckets count. +// New returns a new measure aggregator for computing Histograms. +// +// A Histogram observe events and counts them in pre-defined buckets. +// And also provides the total sum and count of all observations. // // Note that this aggregator maintains each value using independent // atomic operations, which introduces the possibility that // checkpoints are inconsistent. -func New(desc *export.Descriptor, bounds []float64) *Aggregator { - return &Aggregator{ - kind: desc.NumberKind(), - current: State{ - Buckets: make([]core.Number, len(bounds)+1), +func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator { + agg := Aggregator{ + kind: desc.NumberKind(), + boundaries: boundaries, + current: state{ + Buckets: aggregator.Buckets{ + Boundaries: boundaries, + Counts: make([]core.Number, len(boundaries)+1), + }, + }, + checkpoint: state{ + Buckets: aggregator.Buckets{ + Boundaries: boundaries, + Counts: make([]core.Number, len(boundaries)+1), + }, }, - bounds: bounds, } + return &agg } -// Count returns the number of values in the checkpoint. +// Sum returns the sum of all values in the checkpoint. func (c *Aggregator) Sum() (core.Number, error) { return c.checkpoint.Sum, nil } @@ -72,8 +85,8 @@ func (c *Aggregator) Count() (int64, error) { return int64(c.checkpoint.Count.AsUint64()), nil } -func (c *Aggregator) Histogram() (State, error) { - return c.checkpoint, nil +func (c *Aggregator) Histogram() (aggregator.Buckets, error) { + return c.checkpoint.Buckets, nil } // Checkpoint saves the current state and resets the current state to @@ -93,7 +106,10 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { c.checkpoint.Count.SetUint64(c.current.Count.SwapUint64Atomic(0)) c.checkpoint.Sum = c.current.Sum.SwapNumberAtomic(core.Number(0)) - c.checkpoint.Buckets, c.current.Buckets = c.current.Buckets, make([]core.Number, len(c.bounds)+1) + + for i := 0; i < len(c.checkpoint.Buckets.Counts); i++ { + c.checkpoint.Buckets.Counts[i].SetUint64(c.current.Buckets.Counts[i].SwapUint64Atomic(0)) + } } // Update adds the recorded measurement to the current data set. @@ -103,14 +119,15 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export. c.current.Count.AddUint64Atomic(1) c.current.Sum.AddNumberAtomic(kind, number) - for i, boundary := range c.bounds { - if number.CoerceToFloat64(kind) <= boundary { - c.current.Buckets[i].AddUint64Atomic(1) + for i, boundary := range c.boundaries { + if number.CompareNumber(kind, boundary) < 1 { + c.current.Buckets.Counts[i].AddUint64Atomic(1) return nil } } - c.current.Buckets[len(c.bounds)].AddUint64Atomic(1) + // Observed event is bigger than every boundary. + c.current.Buckets.Counts[len(c.boundaries)].AddUint64Atomic(1) return nil } @@ -124,8 +141,8 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error c.checkpoint.Sum.AddNumber(desc.NumberKind(), o.checkpoint.Sum) c.checkpoint.Count.AddNumber(core.Uint64NumberKind, o.checkpoint.Count) - for i := 0; i < len(c.current.Buckets); i++ { - c.checkpoint.Buckets[i].AddNumber(desc.NumberKind(), o.checkpoint.Buckets[i]) + for i := 0; i < len(c.current.Buckets.Counts); i++ { + c.checkpoint.Buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.Buckets.Counts[i]) } return nil } diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index b8392442d11..55ded7879e2 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -57,7 +57,7 @@ var ( }, } - boundaries = []float64{250, 500, 700} + boundaries = []core.Number{core.NewFloat64Number(250), core.NewFloat64Number(500), core.NewFloat64Number(700)} ) func TestHistogramAbsolute(t *testing.T) { @@ -110,11 +110,11 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { require.Equal(t, all.Count(), count, "Same count -"+policy.name) require.Nil(t, err) - require.Equal(t, len(agg.checkpoint.Buckets), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(agg.checkpoint.Buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") counts := calcBuckets(all.Points(), profile) for i, v := range counts { - bCount := agg.checkpoint.Buckets[i].AsUint64() + bCount := agg.checkpoint.Buckets.Counts[i].AsUint64() require.Equal(t, v, bCount, "Wrong bucket #%d count", i) } } @@ -161,11 +161,11 @@ func TestHistogramMerge(t *testing.T) { require.Equal(t, all.Count(), count, "Same count - absolute") require.Nil(t, err) - require.Equal(t, len(agg1.checkpoint.Buckets), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(agg1.checkpoint.Buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") counts := calcBuckets(all.Points(), profile) for i, v := range counts { - bCount := agg1.checkpoint.Buckets[i].AsUint64() + bCount := agg1.checkpoint.Buckets.Counts[i].AsUint64() require.Equal(t, v, bCount, "Wrong bucket #%d count", i) } }) @@ -188,8 +188,8 @@ func TestHistogramNotSet(t *testing.T) { require.Equal(t, int64(0), count, "Empty checkpoint count = 0") require.Nil(t, err) - require.Equal(t, len(agg.checkpoint.Buckets), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") - for i, bCount := range agg.checkpoint.Buckets { + require.Equal(t, len(agg.checkpoint.Buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + for i, bCount := range agg.checkpoint.Buckets.Counts { require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i) } }) @@ -199,8 +199,7 @@ func calcBuckets(points []core.Number, profile test.Profile) []uint64 { counts := make([]uint64, len(boundaries)+1) idx := 0 for _, p := range points { - v := p.CoerceToFloat64(profile.NumberKind) - if idx < len(boundaries) && v > boundaries[idx] { + if idx < len(boundaries) && p.CompareNumber(profile.NumberKind, boundaries[idx]) == 1 { idx++ } counts[idx]++ From 595a947b300ba46a419e79f0c268e0570b27d28e Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Thu, 16 Jan 2020 02:17:35 -0300 Subject: [PATCH 06/14] fix doc --- sdk/export/metric/aggregator/aggregator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 4bfe471ed0b..6a1b9c3cfb6 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -66,7 +66,7 @@ type ( // Buckets represents histogram buckets boundaries and counts. // // For a Histogram with N defined boundaries, e.g, [x, y, z]. - // There are N+1 counts: [-inf, x], (x, y], (y, z], (z, +inf) + // There are N+1 counts: [-inf, x], (x, y], (y, z], (z, +inf] Buckets struct { Boundaries []core.Number Counts []core.Number From a58785e505978cd9bf4350e15b24171a942e989a Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Thu, 16 Jan 2020 02:18:51 -0300 Subject: [PATCH 07/14] update year --- sdk/metric/aggregator/histogram/histogram.go | 2 +- sdk/metric/aggregator/histogram/histogram_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 9d873809ed2..7bd6407d78d 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -1,4 +1,4 @@ -// Copyright 2019, OpenTelemetry Authors +// Copyright 2020, OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 55ded7879e2..4c9934ca72d 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -1,4 +1,4 @@ -// Copyright 2019, OpenTelemetry Authors +// Copyright 2020, OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 0db4700e8c4fbd5ab5f31caceb2ae855513928ab Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Thu, 16 Jan 2020 02:26:18 -0300 Subject: [PATCH 08/14] adds docs for Histogram --- sdk/metric/aggregator/histogram/histogram.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 7bd6407d78d..c9c54c33561 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -85,6 +85,7 @@ func (c *Aggregator) Count() (int64, error) { return int64(c.checkpoint.Count.AsUint64()), nil } +// Histogram returns the count of events in pre-determined buckets. func (c *Aggregator) Histogram() (aggregator.Buckets, error) { return c.checkpoint.Buckets, nil } From 551cf20285edd939e863757dcc6d37683d58cc74 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Thu, 16 Jan 2020 02:28:52 -0300 Subject: [PATCH 09/14] docs for boundaries. --- sdk/metric/aggregator/histogram/histogram.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index c9c54c33561..4f6c9bb4ff7 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -52,6 +52,9 @@ var _ aggregator.Histogram = &Aggregator{} // A Histogram observe events and counts them in pre-defined buckets. // And also provides the total sum and count of all observations. // +// Boundaries MUST be ordered otherwise the histogram could not +// be properly computed. +// // Note that this aggregator maintains each value using independent // atomic operations, which introduces the possibility that // checkpoints are inconsistent. @@ -127,7 +130,7 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export. } } - // Observed event is bigger than every boundary. + // Observed event is bigger than all defined boundaries. c.current.Buckets.Counts[len(c.boundaries)].AddUint64Atomic(1) return nil } From fee0c3ee071eb6254e4600813cea30009df8d4b8 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Sun, 19 Jan 2020 22:28:38 -0300 Subject: [PATCH 10/14] addresses review comments Change to less-than buckets. Add offset checks. Unexport fields that don't need to be exported. Fix tests when running on profile with int64 number kind. --- sdk/export/metric/aggregator/aggregator.go | 2 +- sdk/metric/aggregator/histogram/histogram.go | 48 +++++++------ .../aggregator/histogram/histogram_test.go | 68 +++++++++++++++---- 3 files changed, 82 insertions(+), 36 deletions(-) diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 6a1b9c3cfb6..e627ccc5a1f 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -66,7 +66,7 @@ type ( // Buckets represents histogram buckets boundaries and counts. // // For a Histogram with N defined boundaries, e.g, [x, y, z]. - // There are N+1 counts: [-inf, x], (x, y], (y, z], (z, +inf] + // There are N+1 counts: [-inf, x), [x, y), [y, z), [z, +inf] Buckets struct { Boundaries []core.Number Counts []core.Number diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 4f6c9bb4ff7..9c321256864 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -26,7 +26,9 @@ type ( // Aggregator observe events and counts them in pre-determined buckets. // It also calculates the sum and count of all events. Aggregator struct { - current state + // state needs to be aligned for 64-bit atomic operations. + current state + // checkpoint needs to be aligned for 64-bit atomic operations. checkpoint state boundaries []core.Number kind core.NumberKind @@ -36,9 +38,11 @@ type ( // the sum and counts for all observed values and // the less than equal bucket count for the pre-determined boundaries. state struct { - Buckets aggregator.Buckets - Count core.Number - Sum core.Number + a uint8 + // all fields have to be aligned for 64-bit atomic operations. + buckets aggregator.Buckets + count core.Number + sum core.Number } ) @@ -63,13 +67,13 @@ func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator { kind: desc.NumberKind(), boundaries: boundaries, current: state{ - Buckets: aggregator.Buckets{ + buckets: aggregator.Buckets{ Boundaries: boundaries, Counts: make([]core.Number, len(boundaries)+1), }, }, checkpoint: state{ - Buckets: aggregator.Buckets{ + buckets: aggregator.Buckets{ Boundaries: boundaries, Counts: make([]core.Number, len(boundaries)+1), }, @@ -80,17 +84,17 @@ func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator { // Sum returns the sum of all values in the checkpoint. func (c *Aggregator) Sum() (core.Number, error) { - return c.checkpoint.Sum, nil + return c.checkpoint.sum, nil } // Count returns the number of values in the checkpoint. func (c *Aggregator) Count() (int64, error) { - return int64(c.checkpoint.Count.AsUint64()), nil + return int64(c.checkpoint.count.AsUint64()), nil } // Histogram returns the count of events in pre-determined buckets. func (c *Aggregator) Histogram() (aggregator.Buckets, error) { - return c.checkpoint.Buckets, nil + return c.checkpoint.buckets, nil } // Checkpoint saves the current state and resets the current state to @@ -108,11 +112,11 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { // that individually the three parts of this aggregation could // be spread across multiple collections in rare cases. - c.checkpoint.Count.SetUint64(c.current.Count.SwapUint64Atomic(0)) - c.checkpoint.Sum = c.current.Sum.SwapNumberAtomic(core.Number(0)) + c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0)) + c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0)) - for i := 0; i < len(c.checkpoint.Buckets.Counts); i++ { - c.checkpoint.Buckets.Counts[i].SetUint64(c.current.Buckets.Counts[i].SwapUint64Atomic(0)) + for i := 0; i < len(c.checkpoint.buckets.Counts); i++ { + c.checkpoint.buckets.Counts[i].SetUint64(c.current.buckets.Counts[i].SwapUint64Atomic(0)) } } @@ -120,18 +124,18 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { kind := desc.NumberKind() - c.current.Count.AddUint64Atomic(1) - c.current.Sum.AddNumberAtomic(kind, number) + c.current.count.AddUint64Atomic(1) + c.current.sum.AddNumberAtomic(kind, number) for i, boundary := range c.boundaries { - if number.CompareNumber(kind, boundary) < 1 { - c.current.Buckets.Counts[i].AddUint64Atomic(1) + if number.CompareNumber(kind, boundary) < 0 { + c.current.buckets.Counts[i].AddUint64Atomic(1) return nil } } // Observed event is bigger than all defined boundaries. - c.current.Buckets.Counts[len(c.boundaries)].AddUint64Atomic(1) + c.current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1) return nil } @@ -142,11 +146,11 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error return aggregator.NewInconsistentMergeError(c, oa) } - c.checkpoint.Sum.AddNumber(desc.NumberKind(), o.checkpoint.Sum) - c.checkpoint.Count.AddNumber(core.Uint64NumberKind, o.checkpoint.Count) + c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum) + c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count) - for i := 0; i < len(c.current.Buckets.Counts); i++ { - c.checkpoint.Buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.Buckets.Counts[i]) + for i := 0; i < len(c.current.buckets.Counts); i++ { + c.checkpoint.buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.buckets.Counts[i]) } return nil } diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 4c9934ca72d..4ed3ecdb344 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -16,11 +16,15 @@ package histogram import ( "context" + "fmt" "math" "math/rand" + "os" "testing" + "unsafe" "github.com/stretchr/testify/require" + ottest "go.opentelemetry.io/otel/internal/testing" "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" @@ -57,9 +61,47 @@ var ( }, } - boundaries = []core.Number{core.NewFloat64Number(250), core.NewFloat64Number(500), core.NewFloat64Number(700)} + boundaries = map[core.NumberKind][]core.Number{ + core.Float64NumberKind: {core.NewFloat64Number(250), core.NewFloat64Number(500), core.NewFloat64Number(750)}, + core.Int64NumberKind: {core.NewInt64Number(250), core.NewInt64Number(500), core.NewInt64Number(750)}, + } ) +// Ensure struct alignment prior to running tests. +func TestMain(m *testing.M) { + fields := []ottest.FieldOffset{ + { + Name: "Aggregator.current", + Offset: unsafe.Offsetof(Aggregator{}.current), + }, + { + Name: "Aggregator.checkpoint", + Offset: unsafe.Offsetof(Aggregator{}.checkpoint), + }, + { + Name: "state.buckets", + Offset: unsafe.Offsetof(state{}.buckets), + }, + { + Name: "state.sum", + Offset: unsafe.Offsetof(state{}.sum), + }, + { + Name: "state.count", + Offset: unsafe.Offsetof(state{}.count), + }, + } + fmt.Println(fields) + + if !ottest.Aligned8Byte(fields, os.Stderr) { + fmt.Println("QUEBROU") + + os.Exit(1) + } + + os.Exit(m.Run()) +} + func TestHistogramAbsolute(t *testing.T) { test.RunProfiles(t, func(t *testing.T, profile test.Profile) { histogram(t, profile, positiveOnly) @@ -83,7 +125,7 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { ctx := context.Background() descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, !policy.absolute) - agg := New(descriptor, boundaries) + agg := New(descriptor, boundaries[profile.NumberKind]) all := test.NewNumbers(profile.NumberKind) @@ -110,11 +152,11 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { require.Equal(t, all.Count(), count, "Same count -"+policy.name) require.Nil(t, err) - require.Equal(t, len(agg.checkpoint.Buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") counts := calcBuckets(all.Points(), profile) for i, v := range counts { - bCount := agg.checkpoint.Buckets.Counts[i].AsUint64() + bCount := agg.checkpoint.buckets.Counts[i].AsUint64() require.Equal(t, v, bCount, "Wrong bucket #%d count", i) } } @@ -125,8 +167,8 @@ func TestHistogramMerge(t *testing.T) { test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) - agg1 := New(descriptor, boundaries) - agg2 := New(descriptor, boundaries) + agg1 := New(descriptor, boundaries[profile.NumberKind]) + agg2 := New(descriptor, boundaries[profile.NumberKind]) all := test.NewNumbers(profile.NumberKind) @@ -161,11 +203,11 @@ func TestHistogramMerge(t *testing.T) { require.Equal(t, all.Count(), count, "Same count - absolute") require.Nil(t, err) - require.Equal(t, len(agg1.checkpoint.Buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") counts := calcBuckets(all.Points(), profile) for i, v := range counts { - bCount := agg1.checkpoint.Buckets.Counts[i].AsUint64() + bCount := agg1.checkpoint.buckets.Counts[i].AsUint64() require.Equal(t, v, bCount, "Wrong bucket #%d count", i) } }) @@ -177,7 +219,7 @@ func TestHistogramNotSet(t *testing.T) { test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) - agg := New(descriptor, boundaries) + agg := New(descriptor, boundaries[profile.NumberKind]) agg.Checkpoint(ctx, descriptor) asum, err := agg.Sum() @@ -188,18 +230,18 @@ func TestHistogramNotSet(t *testing.T) { require.Equal(t, int64(0), count, "Empty checkpoint count = 0") require.Nil(t, err) - require.Equal(t, len(agg.checkpoint.Buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") - for i, bCount := range agg.checkpoint.Buckets.Counts { + require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + for i, bCount := range agg.checkpoint.buckets.Counts { require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i) } }) } func calcBuckets(points []core.Number, profile test.Profile) []uint64 { - counts := make([]uint64, len(boundaries)+1) + counts := make([]uint64, len(boundaries[profile.NumberKind])+1) idx := 0 for _, p := range points { - if idx < len(boundaries) && p.CompareNumber(profile.NumberKind, boundaries[idx]) == 1 { + for idx < len(boundaries[profile.NumberKind]) && p.CompareNumber(profile.NumberKind, boundaries[profile.NumberKind][idx]) != -1 { idx++ } counts[idx]++ From 5741b831d4c69698930879d3344f09cb8883663d Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Sun, 19 Jan 2020 22:54:28 -0300 Subject: [PATCH 11/14] sort boundaries --- sdk/metric/aggregator/histogram/histogram.go | 35 +++++++++++++++++-- .../aggregator/histogram/histogram_test.go | 22 ++++++++---- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 9c321256864..069bcc837d0 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -16,6 +16,7 @@ package histogram // import "go.opentelemetry.io/otel/sdk/metric/aggregator/hist import ( "context" + "sort" "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" @@ -56,13 +57,21 @@ var _ aggregator.Histogram = &Aggregator{} // A Histogram observe events and counts them in pre-defined buckets. // And also provides the total sum and count of all observations. // -// Boundaries MUST be ordered otherwise the histogram could not -// be properly computed. -// // Note that this aggregator maintains each value using independent // atomic operations, which introduces the possibility that // checkpoints are inconsistent. func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator { + // Boundaries MUST be ordered otherwise the histogram could not + // be properly computed. + sortedBoundaries := numbers{ + numbers: make([]core.Number, len(boundaries)), + kind: desc.NumberKind(), + } + + copy(sortedBoundaries.numbers, boundaries) + sort.Sort(&sortedBoundaries) + boundaries = sortedBoundaries.numbers + agg := Aggregator{ kind: desc.NumberKind(), boundaries: boundaries, @@ -154,3 +163,23 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error } return nil } + +// numbers is an auxiliary struct to order histogram bucket boundaries (slice of core.Number) +type numbers struct { + numbers []core.Number + kind core.NumberKind +} + +var _ sort.Interface = (*numbers)(nil) + +func (n *numbers) Len() int { + return len(n.numbers) +} + +func (n *numbers) Less(i, j int) bool { + return -1 == n.numbers[i].CompareNumber(n.kind, n.numbers[j]) +} + +func (n *numbers) Swap(i, j int) { + n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i] +} diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 4ed3ecdb344..25ce7c22f44 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -20,6 +20,7 @@ import ( "math" "math/rand" "os" + "sort" "testing" "unsafe" @@ -62,8 +63,8 @@ var ( } boundaries = map[core.NumberKind][]core.Number{ - core.Float64NumberKind: {core.NewFloat64Number(250), core.NewFloat64Number(500), core.NewFloat64Number(750)}, - core.Int64NumberKind: {core.NewInt64Number(250), core.NewInt64Number(500), core.NewInt64Number(750)}, + core.Float64NumberKind: {core.NewFloat64Number(500), core.NewFloat64Number(250), core.NewFloat64Number(750)}, + core.Int64NumberKind: {core.NewInt64Number(500), core.NewInt64Number(250), core.NewInt64Number(750)}, } ) @@ -157,7 +158,7 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { counts := calcBuckets(all.Points(), profile) for i, v := range counts { bCount := agg.checkpoint.buckets.Counts[i].AsUint64() - require.Equal(t, v, bCount, "Wrong bucket #%d count", i) + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts) } } @@ -208,7 +209,7 @@ func TestHistogramMerge(t *testing.T) { counts := calcBuckets(all.Points(), profile) for i, v := range counts { bCount := agg1.checkpoint.buckets.Counts[i].AsUint64() - require.Equal(t, v, bCount, "Wrong bucket #%d count", i) + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts) } }) } @@ -238,10 +239,19 @@ func TestHistogramNotSet(t *testing.T) { } func calcBuckets(points []core.Number, profile test.Profile) []uint64 { - counts := make([]uint64, len(boundaries[profile.NumberKind])+1) + sortedBoundaries := numbers{ + numbers: make([]core.Number, len(boundaries[profile.NumberKind])), + kind: profile.NumberKind, + } + + copy(sortedBoundaries.numbers, boundaries[profile.NumberKind]) + sort.Sort(&sortedBoundaries) + boundaries := sortedBoundaries.numbers + + counts := make([]uint64, len(boundaries)+1) idx := 0 for _, p := range points { - for idx < len(boundaries[profile.NumberKind]) && p.CompareNumber(profile.NumberKind, boundaries[profile.NumberKind][idx]) != -1 { + for idx < len(boundaries) && p.CompareNumber(profile.NumberKind, boundaries[idx]) != -1 { idx++ } counts[idx]++ From 0fb71fba70bc033fdc25e17a19a784692a1d69c6 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Sun, 19 Jan 2020 22:55:30 -0300 Subject: [PATCH 12/14] remove testing field --- sdk/metric/aggregator/histogram/histogram.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 069bcc837d0..652d46072d6 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -39,7 +39,6 @@ type ( // the sum and counts for all observed values and // the less than equal bucket count for the pre-determined boundaries. state struct { - a uint8 // all fields have to be aligned for 64-bit atomic operations. buckets aggregator.Buckets count core.Number From ceccec2e8c50f982751b6fbfa706f3e91700143c Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Sun, 19 Jan 2020 22:58:14 -0300 Subject: [PATCH 13/14] fixes import order --- sdk/metric/aggregator/histogram/histogram_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 25ce7c22f44..65f7d3c470f 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -25,9 +25,9 @@ import ( "unsafe" "github.com/stretchr/testify/require" - ottest "go.opentelemetry.io/otel/internal/testing" "go.opentelemetry.io/otel/api/core" + ottest "go.opentelemetry.io/otel/internal/testing" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/test" ) From d294e22936a375461776e5373206fd6ba5cea072 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Mon, 20 Jan 2020 15:59:09 -0300 Subject: [PATCH 14/14] =?UTF-8?q?remove=20print=20=F0=9F=99=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdk/metric/aggregator/histogram/histogram_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 65f7d3c470f..c70e6667be0 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -95,8 +95,6 @@ func TestMain(m *testing.M) { fmt.Println(fields) if !ottest.Aligned8Byte(fields, os.Stderr) { - fmt.Println("QUEBROU") - os.Exit(1) }