Skip to content

Commit

Permalink
Revert "ensure the initOnce is always called; handle multiple errors"
Browse files Browse the repository at this point in the history
This reverts commit 3356eb5.
  • Loading branch information
Joshua MacDonald committed Sep 24, 2021
1 parent 21f7401 commit 265232b
Showing 1 changed file with 27 additions and 43 deletions.
70 changes: 27 additions & 43 deletions sdk/metric/controller/basic/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOptio
newTmp := &initMeterOnce{}
m, _ := c.libraries.LoadOrStore(library, newTmp)
mo := m.(*initMeterOnce)

return metric.WrapMeterImpl(c.initializeUniqueMeter(library, mo))
}

func (c *Controller) initializeUniqueMeter(library instrumentation.Library, mo *initMeterOnce) *registry.UniqueInstrumentMeterImpl {
mo.initOnce.Do(func() {
checkpointer := c.checkpointerFactory.NewCheckpointer()
accumulator := sdk.NewAccumulator(checkpointer)
Expand All @@ -109,19 +104,8 @@ func (c *Controller) initializeUniqueMeter(library instrumentation.Library, mo *
library: library,
})
})
return mo.unique
}

// syncMapKeyValueToAccuulatorCheckpointer encapsulates the invariants
// placed on the libraries sync.Map, which is a
// map[instrumentation.Library]*initMeterOnce where the
// registry.UniqueInstrumentMeter's implementation is a
// *accumulatorCheckpointer.
func (c *Controller) syncMapKeyValueToAccumulatorCheckpointer(key, value interface{}) *accumulatorCheckpointer {
return c.initializeUniqueMeter(
key.(instrumentation.Library),
value.(*initMeterOnce),
).MeterImpl().(*accumulatorCheckpointer)
return metric.WrapMeterImpl(mo.unique)
}

type accumulatorCheckpointer struct {
Expand Down Expand Up @@ -261,27 +245,35 @@ func (c *Controller) collect(ctx context.Context) error {
return c.export(ctx)
}

// accumulatorList returns a snapshot of current accumulators
// registered to this controller. This briefly locks the controller.
func (c *Controller) accumulatorList() []*accumulatorCheckpointer {
c.lock.Lock()
defer c.lock.Unlock()

var r []*accumulatorCheckpointer
c.libraries.Range(func(_, value interface{}) bool {
mo := value.(*initMeterOnce)
acc, ok := mo.unique.MeterImpl().(*accumulatorCheckpointer)
if ok {
r = append(r, acc)
}
return true
})
return r
}

// checkpoint calls the Accumulator and Checkpointer interfaces to
// compute the Reader. This applies the configured collection
// timeout. Note that this does not try to cancel a Collect or Export
// when Stop() is called.
func (c *Controller) checkpoint(ctx context.Context) error {
var errs []error
c.libraries.Range(func(key, value interface{}) bool {
acPair := c.syncMapKeyValueToAccumulatorCheckpointer(key, value)

if err := c.checkpointSingleAccumulator(ctx, acPair); err != nil {
errs = append(errs, err)
for _, impl := range c.accumulatorList() {
if err := c.checkpointSingleAccumulator(ctx, impl); err != nil {
return err
}
return false
})
if errs == nil {
return nil
}
if len(errs) == 1 {
return errs[0]
}
return fmt.Errorf("multiple checkpoint errors %w %v", errs[0], errs[1:])
return nil
}

// checkpointSingleAccumulator checkpoints a single instrumentation
Expand Down Expand Up @@ -337,26 +329,18 @@ func (c *Controller) export(ctx context.Context) error {

// ForEach implements export.InstrumentationLibraryReader.
func (c *Controller) ForEach(readerFunc func(l instrumentation.Library, r export.Reader) error) error {
var errs []error
c.libraries.Range(func(key, value interface{}) bool {
acPair := c.syncMapKeyValueToAccumulatorCheckpointer(key, value)
for _, acPair := range c.accumulatorList() {
reader := acPair.checkpointer.Reader()
// TODO: We should not fail fast; instead accumulate errors.
if err := func() error {
reader.RLock()
defer reader.RUnlock()
return readerFunc(acPair.library, reader)
}(); err != nil {
errs = append(errs, err)
return err
}
return false
})
if errs == nil {
return nil
}
if len(errs) == 1 {
return errs[0]
}
return fmt.Errorf("multiple ForEach errors %w %v", errs[0], errs[1:])
return nil
}

// IsRunning returns true if the controller was started via Start(),
Expand Down

0 comments on commit 265232b

Please sign in to comment.