Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for NonAbsolute Measurement MaxSumCount #335

Merged
merged 13 commits into from
Nov 25, 2019
Merged
15 changes: 15 additions & 0 deletions api/core/number.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ const (
Uint64NumberKind
)

// Minimum returns the minimum representable value
// for a given NumberKind
func (k NumberKind) Minimum() Number {
switch k {
case Int64NumberKind:
return NewInt64Number(math.MinInt64)
case Float64NumberKind:
return NewFloat64Number(-1. * math.MaxFloat64)
case Uint64NumberKind:
return NewUint64Number(0)
default:
return Number(0)
}
}

// Number represents either an integral or a floating point value. It
// needs to be accompanied with a source of NumberKind that describes
// the actual type of the value stored within Number.
Expand Down
12 changes: 6 additions & 6 deletions exporter/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
expose.Count = count
}

// TODO: Should tolerate ErrEmptyDataSet here,
// just like ErrNoLastValue below, since
// there's a race condition between creating
// the Aggregator and updating the first
// value.

if max, err := msc.Max(); err != nil {
if err == aggregator.ErrEmptyDataSet {
// This is a special case, indicates an aggregator that
// was checkpointed before its first value was set.
return
}

aggError = err
expose.Max = "NaN"
} else {
Expand Down
33 changes: 20 additions & 13 deletions exporter/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestStdoutMaxSumCount(t *testing.T) {
checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())

desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
magg := maxsumcount.New()
magg := maxsumcount.New(desc)
aggtest.CheckedUpdate(fix.t, magg, core.NewFloat64Number(123.456), desc)
aggtest.CheckedUpdate(fix.t, magg, core.NewFloat64Number(876.543), desc)
magg.Checkpoint(fix.ctx, desc)
Expand Down Expand Up @@ -220,23 +220,30 @@ func TestStdoutMeasureFormat(t *testing.T) {
}`, fix.Output())
}

func TestStdoutAggError(t *testing.T) {
fix := newFixture(t, stdout.Options{})
func TestStdoutEmptyDataSet(t *testing.T) {
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
for name, tc := range map[string]export.Aggregator{
"ddsketch": ddsketch.New(ddsketch.NewDefaultConfig(), desc),
"maxsumcount": maxsumcount.New(desc),
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()

checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
fix := newFixture(t, stdout.Options{})

desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
magg := ddsketch.New(ddsketch.NewDefaultConfig(), desc)
magg.Checkpoint(fix.ctx, desc)
checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())

checkpointSet.Add(desc, magg)
magg := tc
magg.Checkpoint(fix.ctx, desc)

err := fix.exporter.Export(fix.ctx, checkpointSet)
checkpointSet.Add(desc, magg)

// An error is returned and NaN values are printed.
require.Error(t, err)
require.Equal(t, aggregator.ErrEmptyDataSet, err)
require.Equal(t, `{"updates":[{"name":"test.name","max":"NaN","sum":0,"count":0,"quantiles":[{"q":0.5,"v":"NaN"},{"q":0.9,"v":"NaN"},{"q":0.99,"v":"NaN"}]}]}`, fix.Output())
fix.Export(checkpointSet)

require.Equal(t, `{"updates":null}`, fix.Output())
})
}
}

func TestStdoutGaugeNotSet(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *Aggregator) Max() (core.Number, error) {
return c.Quantile(1)
}

// Min returns the mininum value in the checkpoint.
// Min returns the minimum value in the checkpoint.
func (c *Aggregator) Min() (core.Number, error) {
return c.Quantile(0)
}
Expand Down
22 changes: 18 additions & 4 deletions sdk/metric/aggregator/maxsumcount/msc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type (
Aggregator struct {
current state
checkpoint state
kind core.NumberKind
}

state struct {
Expand All @@ -50,8 +51,15 @@ var _ aggregator.MaxSumCount = &Aggregator{}
// atomic operations, which introduces the possibility that
// checkpoints are inconsistent. For greater consistency and lower
// performance, consider using Array or DDSketch aggregators.
func New() *Aggregator {
return &Aggregator{}
func New(desc *export.Descriptor) *Aggregator {
return &Aggregator{
kind: desc.NumberKind(),
current: unsetMaxSumCount(desc.NumberKind()),
}
}

func unsetMaxSumCount(kind core.NumberKind) state {
return state{max: kind.Minimum()}
}

// Sum returns the sum of values in the checkpoint.
Expand All @@ -65,15 +73,21 @@ func (c *Aggregator) Count() (int64, error) {
}

// Max returns the maximum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet
// will be returned if (due to a race condition) the checkpoint was
// computed before the first value was set.
func (c *Aggregator) Max() (core.Number, error) {
if c.checkpoint == unsetMaxSumCount(c.kind) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the MaxSumCount aggregator does not take a lock during Update, each field of the state will be independently modified, and a race with Update could happen such that the count and sum are non-zero but the max field hasn't been set. So, I think you should test c.checkpoint.max == c.kind.Minimum() here.

(Note: this means, now, that the minimum and maximum values are considered invalid. This might be worth adding comments about, i.e., that if you record a measure value which is the Minimum possible value, that the Max() function will return a no-data error. I don't think this will be a problem, but it would probably become a problem if this library supported unsigned integer instruments. Then a value of Record(0) would lead to a no-data error in Max(). If we did ever support unsigned instruments, I we'd probably want to offset by one or something. Nevermind.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return core.Number(0), aggregator.ErrEmptyDataSet
}
return c.checkpoint.max, nil
}

// Checkpoint saves the current state and resets the current state to
// the empty set. Since no locks are taken, there is a chance that
// the independent Max, Sum, and Count are not consistent with each
// other.
func (c *Aggregator) Checkpoint(ctx context.Context, _ *export.Descriptor) {
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
// N.B. There is no atomic operation that can update all three
// values at once without a memory allocation.
//
Expand All @@ -86,7 +100,7 @@ func (c *Aggregator) Checkpoint(ctx context.Context, _ *export.Descriptor) {

c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
c.checkpoint.max = c.current.max.SwapNumberAtomic(core.Number(0))
c.checkpoint.max = c.current.max.SwapNumberAtomic(desc.NumberKind().Minimum())
evantorrie marked this conversation as resolved.
Show resolved Hide resolved
}

// Update adds the recorded measurement to the current data set.
Expand Down
132 changes: 102 additions & 30 deletions sdk/metric/aggregator/maxsumcount/msc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,55 +16,104 @@ package maxsumcount

import (
"context"
"math"
"math/rand"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
)

const count = 100

type policy struct {
name string
absolute bool
sign func() int
}

var (
positiveOnly = policy{
name: "absolute",
absolute: true,
sign: func() int { return +1 },
}
negativeOnly = policy{
name: "negative",
absolute: false,
sign: func() int { return -1 },
}
positiveAndNegative = policy{
name: "positiveAndNegative",
absolute: false,
sign: func() int {
if rand.Uint32() > math.MaxUint32/2 {
return -1
}
return 1
},
}
)

func TestMaxSumCountAbsolute(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
maxSumCount(t, profile, positiveOnly)
})
}

func TestMaxSumCountNegativeOnly(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
record := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false)
maxSumCount(t, profile, negativeOnly)
})
}

agg := New()
func TestMaxSumCountPositiveAndNegative(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
maxSumCount(t, profile, positiveAndNegative)
})
}

all := test.NewNumbers(profile.NumberKind)
// Validates max, sum and count for a given profile and policy
func maxSumCount(t *testing.T, profile test.Profile, policy policy) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, !policy.absolute)

for i := 0; i < count; i++ {
x := profile.Random(+1)
all.Append(x)
test.CheckedUpdate(t, agg, x, record)
}
agg := New(descriptor)

agg.Checkpoint(ctx, record)
all := test.NewNumbers(profile.NumberKind)

all.Sort()
for i := 0; i < count; i++ {
x := profile.Random(policy.sign())
all.Append(x)
test.CheckedUpdate(t, agg, x, descriptor)
}

asum, err := agg.Sum()
require.InEpsilon(t,
all.Sum().CoerceToFloat64(profile.NumberKind),
asum.CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - absolute")
require.Nil(t, err)
agg.Checkpoint(ctx, descriptor)

count, err := agg.Count()
require.Equal(t, all.Count(), count, "Same count - absolute")
require.Nil(t, err)
all.Sort()

max, err := agg.Max()
require.Nil(t, err)
require.Equal(t,
all.Max(),
max,
"Same max - absolute")
})
asum, err := agg.Sum()
require.InEpsilon(t,
all.Sum().CoerceToFloat64(profile.NumberKind),
asum.CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - "+policy.name)
require.Nil(t, err)

count, err := agg.Count()
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.Nil(t, err)

max, err := agg.Max()
require.Nil(t, err)
require.Equal(t,
all.Max(),
max,
"Same max -"+policy.name)
}

func TestMaxSumCountMerge(t *testing.T) {
Expand All @@ -73,8 +122,8 @@ func TestMaxSumCountMerge(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false)

agg1 := New()
agg2 := New()
agg1 := New(descriptor)
agg2 := New(descriptor)

all := test.NewNumbers(profile.NumberKind)

Expand Down Expand Up @@ -116,3 +165,26 @@ func TestMaxSumCountMerge(t *testing.T) {
"Same max - absolute")
})
}

func TestMaxSumCountNotSet(t *testing.T) {
ctx := context.Background()

test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false)

agg := New(descriptor)
agg.Checkpoint(ctx, descriptor)

asum, err := agg.Sum()
require.Equal(t, core.Number(0), asum, "Empty checkpoint sum = 0")
require.Nil(t, err)

count, err := agg.Count()
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
require.Nil(t, err)

max, err := agg.Max()
require.Equal(t, aggregator.ErrEmptyDataSet, err)
require.Equal(t, core.Number(0), max)
})
}
2 changes: 1 addition & 1 deletion sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (*benchFixture) AggregatorFor(descriptor *export.Descriptor) export.Aggrega
return gauge.New()
case export.MeasureKind:
if strings.HasSuffix(descriptor.Name(), "maxsumcount") {
return maxsumcount.New()
return maxsumcount.New(descriptor)
} else if strings.HasSuffix(descriptor.Name(), "ddsketch") {
return ddsketch.New(ddsketch.NewDefaultConfig(), descriptor)
} else if strings.HasSuffix(descriptor.Name(), "array") {
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/selector/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *export.Descriptor) export.A
case export.GaugeKind:
return gauge.New()
case export.MeasureKind:
return maxsumcount.New()
return maxsumcount.New(descriptor)
default:
return counter.New()
}
Expand Down