Skip to content

Commit

Permalink
Update SDK impl
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Apr 5, 2023
1 parent 8f886ae commit 2b05ab1
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 141 deletions.
30 changes: 23 additions & 7 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,44 @@ func BenchmarkCounterAddNoAttrs(b *testing.B) {
}

func BenchmarkCounterAddOneAttr(b *testing.B) {
s := attribute.NewSet(attribute.String("K", "V"))
ctx, _, cntr := benchCounter(b)

for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.String("K", "V"))
cntr.Add(ctx, 1, instrument.WithAttributes(s))
}
}

func BenchmarkCounterAddOneInvalidAttr(b *testing.B) {
s := attribute.NewSet(
attribute.String("", "V"),
attribute.String("K", "V"),
)
ctx, _, cntr := benchCounter(b)

for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.String("", "V"), attribute.String("K", "V"))
cntr.Add(ctx, 1, instrument.WithAttributes(s))
}
}

func BenchmarkCounterAddSingleUseAttrs(b *testing.B) {
ctx, _, cntr := benchCounter(b)

for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("K", i))
s := attribute.NewSet(attribute.Int("K", i))
cntr.Add(ctx, 1, instrument.WithAttributes(s))
}
}

func BenchmarkCounterAddSingleUseInvalidAttrs(b *testing.B) {
ctx, _, cntr := benchCounter(b)

for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("", i), attribute.Int("K", i))
s := attribute.NewSet(
attribute.Int("", i),
attribute.Int("K", i),
)
cntr.Add(ctx, 1, instrument.WithAttributes(s))
}
}

Expand All @@ -83,15 +93,20 @@ func BenchmarkCounterAddSingleUseFilteredAttrs(b *testing.B) {
))

for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("L", i), attribute.Int("K", i))
s := attribute.NewSet(
attribute.Int("L", i),
attribute.Int("K", i),
)
cntr.Add(ctx, 1, instrument.WithAttributes(s))
}
}

func BenchmarkCounterCollectOneAttr(b *testing.B) {
s := attribute.NewSet(attribute.Int("K", 1))
ctx, rdr, cntr := benchCounter(b)

for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("K", 1))
cntr.Add(ctx, 1, instrument.WithAttributes(s))

_ = rdr.Collect(ctx, nil)
}
Expand All @@ -102,7 +117,8 @@ func BenchmarkCounterCollectTenAttrs(b *testing.B) {

for i := 0; i < b.N; i++ {
for j := 0; j < 10; j++ {
cntr.Add(ctx, 1, attribute.Int("K", j))
s := attribute.NewSet(attribute.Int("K", j))
cntr.Add(ctx, 1, instrument.WithAttributes(s))
}
_ = rdr.Collect(ctx, nil)
}
Expand Down
69 changes: 44 additions & 25 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,40 +170,63 @@ type streamID struct {
Number string
}

type instrumentImpl[N int64 | float64] struct {
aggregators []internal.Aggregator[N]
type int64Inst struct {
aggregators []internal.Aggregator[int64]

embedded.Float64Counter
embedded.Float64UpDownCounter
embedded.Float64Histogram
embedded.Int64Counter
embedded.Int64UpDownCounter
embedded.Int64Histogram
}

var _ instrument.Float64Counter = (*instrumentImpl[float64])(nil)
var _ instrument.Float64UpDownCounter = (*instrumentImpl[float64])(nil)
var _ instrument.Float64Histogram = (*instrumentImpl[float64])(nil)
var _ instrument.Int64Counter = (*instrumentImpl[int64])(nil)
var _ instrument.Int64UpDownCounter = (*instrumentImpl[int64])(nil)
var _ instrument.Int64Histogram = (*instrumentImpl[int64])(nil)
var _ instrument.Int64Counter = (*int64Inst)(nil)
var _ instrument.Int64UpDownCounter = (*int64Inst)(nil)
var _ instrument.Int64Histogram = (*int64Inst)(nil)

func (i *int64Inst) Add(ctx context.Context, val int64, opts ...instrument.Int64AddOption) {
c := instrument.NewInt64AddConfig(opts...)
i.aggregate(ctx, val, c.Attributes())
}

func (i *int64Inst) Record(ctx context.Context, val int64, opts ...instrument.Int64RecordOption) {
c := instrument.NewInt64RecordConfig(opts...)
i.aggregate(ctx, val, c.Attributes())
}

func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) {
if err := ctx.Err(); err != nil {
return
}
for _, agg := range i.aggregators {
agg.Aggregate(val, s)
}
}

type float64Inst struct {
aggregators []internal.Aggregator[float64]

embedded.Float64Counter
embedded.Float64UpDownCounter
embedded.Float64Histogram
}

var _ instrument.Float64Counter = (*float64Inst)(nil)
var _ instrument.Float64UpDownCounter = (*float64Inst)(nil)
var _ instrument.Float64Histogram = (*float64Inst)(nil)

func (i *instrumentImpl[N]) Add(ctx context.Context, val N, attrs ...attribute.KeyValue) {
i.aggregate(ctx, val, attrs)
func (i *float64Inst) Add(ctx context.Context, val float64, opts ...instrument.Float64AddOption) {
c := instrument.NewFloat64AddConfig(opts...)
i.aggregate(ctx, val, c.Attributes())
}

func (i *instrumentImpl[N]) Record(ctx context.Context, val N, attrs ...attribute.KeyValue) {
i.aggregate(ctx, val, attrs)
func (i *float64Inst) Record(ctx context.Context, val float64, opts ...instrument.Float64RecordOption) {
c := instrument.NewFloat64RecordConfig(opts...)
i.aggregate(ctx, val, c.Attributes())
}

func (i *instrumentImpl[N]) aggregate(ctx context.Context, val N, attrs []attribute.KeyValue) {
func (i *float64Inst) aggregate(ctx context.Context, val float64, s attribute.Set) {
if err := ctx.Err(); err != nil {
return
}
// Do not use single attribute.Sortable and attribute.NewSetWithSortable,
// this method needs to be concurrent safe. Let the sync.Pool in the
// attribute package handle allocations of the Sortable.
s := attribute.NewSet(attrs...)
for _, agg := range i.aggregators {
agg.Aggregate(val, s)
}
Expand Down Expand Up @@ -277,11 +300,7 @@ func newObservable[N int64 | float64](scope instrumentation.Scope, kind Instrume
}

// observe records the val for the set of attrs.
func (o *observable[N]) observe(val N, attrs []attribute.KeyValue) {
// Do not use single attribute.Sortable and attribute.NewSetWithSortable,
// this method needs to be concurrent safe. Let the sync.Pool in the
// attribute package handle allocations of the Sortable.
s := attribute.NewSet(attrs...)
func (o *observable[N]) observe(val N, s attribute.Set) {
for _, agg := range o.aggregators {
agg.Aggregate(val, s)
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ import (
)

func BenchmarkInstrument(b *testing.B) {
attr := func(id int) []attribute.KeyValue {
return []attribute.KeyValue{
attr := func(id int) attribute.Set {
return attribute.NewSet(
attribute.String("user", "Alice"),
attribute.Bool("admin", true),
attribute.Int("id", id),
}
)
}

b.Run("instrumentImpl/aggregate", func(b *testing.B) {
inst := instrumentImpl[int64]{aggregators: []internal.Aggregator[int64]{
inst := int64Inst{aggregators: []internal.Aggregator[int64]{
internal.NewLastValue[int64](),
internal.NewCumulativeSum[int64](true),
internal.NewDeltaSum[int64](true),
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregator_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ type inst struct {
embedded.Int64Histogram
}

func (inst) Add(context.Context, int64, ...attribute.KeyValue) {}
func (inst) Record(context.Context, int64, ...attribute.KeyValue) {}
func (inst) Add(context.Context, int64, ...instrument.Int64AddOption) {}
func (inst) Record(context.Context, int64, ...instrument.Int64RecordOption) {}

func Example() {
m := meter{}
Expand Down
77 changes: 54 additions & 23 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
Expand All @@ -38,8 +37,8 @@ type meter struct {
scope instrumentation.Scope
pipes pipelines

int64IP *instProvider[int64]
float64IP *instProvider[float64]
int64IP *int64InstProvider
float64IP *float64InstProvider
}

func newMeter(s instrumentation.Scope, p pipelines) *meter {
Expand All @@ -50,8 +49,8 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
return &meter{
scope: s,
pipes: p,
int64IP: newInstProvider[int64](s, p, &viewCache),
float64IP: newInstProvider[float64](s, p, &viewCache),
int64IP: newInt64InstProvider(s, p, &viewCache),
float64IP: newFloat64InstProvider(s, p, &viewCache),
}
}

Expand Down Expand Up @@ -297,7 +296,7 @@ var (
errUnregObserver = errors.New("observable instrument not registered for callback")
)

func (r observer) ObserveFloat64(o instrument.Float64Observable, v float64, a ...attribute.KeyValue) {
func (r observer) ObserveFloat64(o instrument.Float64Observable, v float64, opts ...instrument.Float64ObserveOption) {
var oImpl float64Observable
switch conv := o.(type) {
case float64Observable:
Expand Down Expand Up @@ -326,10 +325,11 @@ func (r observer) ObserveFloat64(o instrument.Float64Observable, v float64, a ..
)
return
}
oImpl.observe(v, a)
c := instrument.NewFloat64ObserveConfig(opts...)
oImpl.observe(v, c.Attributes())
}

func (r observer) ObserveInt64(o instrument.Int64Observable, v int64, a ...attribute.KeyValue) {
func (r observer) ObserveInt64(o instrument.Int64Observable, v int64, opts ...instrument.Int64ObserveOption) {
var oImpl int64Observable
switch conv := o.(type) {
case int64Observable:
Expand Down Expand Up @@ -358,7 +358,8 @@ func (r observer) ObserveInt64(o instrument.Int64Observable, v int64, a ...attri
)
return
}
oImpl.observe(v, a)
c := instrument.NewInt64ObserveConfig(opts...)
oImpl.observe(v, c.Attributes())
}

type noopRegister struct{ embedded.Registration }
Expand All @@ -367,18 +368,18 @@ func (noopRegister) Unregister() error {
return nil
}

// instProvider provides all OpenTelemetry instruments.
type instProvider[N int64 | float64] struct {
// int64InstProvider provides int64 OpenTelemetry instruments.
type int64InstProvider struct {
scope instrumentation.Scope
pipes pipelines
resolve resolver[N]
resolve resolver[int64]
}

func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver[N](p, c)}
func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *int64InstProvider {
return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)}
}

func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc, u string) ([]internal.Aggregator[N], error) {
func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]internal.Aggregator[int64], error) {
inst := Instrument{
Name: name,
Description: desc,
Expand All @@ -390,12 +391,40 @@ func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc, u string) ([]int
}

// lookup returns the resolved instrumentImpl.
func (p *instProvider[N]) lookup(kind InstrumentKind, name, desc, u string) (*instrumentImpl[N], error) {
func (p *int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &instrumentImpl[N]{aggregators: aggs}, err
return &int64Inst{aggregators: aggs}, err
}

type int64ObservProvider struct{ *instProvider[int64] }
// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct {
scope instrumentation.Scope
pipes pipelines
resolve resolver[float64]
}

func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *float64InstProvider {
return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)}
}

func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]internal.Aggregator[float64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
}
return p.resolve.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p *float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{aggregators: aggs}, err
}

type int64ObservProvider struct{ *int64InstProvider }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
aggs, err := p.aggs(kind, name, desc, u)
Expand Down Expand Up @@ -423,11 +452,12 @@ type int64Observer struct {
int64Observable
}

func (o int64Observer) Observe(val int64, attrs ...attribute.KeyValue) {
o.observe(val, attrs)
func (o int64Observer) Observe(val int64, opts ...instrument.Int64ObserveOption) {
c := instrument.NewInt64ObserveConfig(opts...)
o.observe(val, c.Attributes())
}

type float64ObservProvider struct{ *instProvider[float64] }
type float64ObservProvider struct{ *float64InstProvider }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
aggs, err := p.aggs(kind, name, desc, u)
Expand Down Expand Up @@ -455,6 +485,7 @@ type float64Observer struct {
float64Observable
}

func (o float64Observer) Observe(val float64, attrs ...attribute.KeyValue) {
o.observe(val, attrs)
func (o float64Observer) Observe(val float64, opts ...instrument.Float64ObserveOption) {
c := instrument.NewFloat64ObserveConfig(opts...)
o.observe(val, c.Attributes())
}
Loading

0 comments on commit 2b05ab1

Please sign in to comment.