Skip to content

Commit

Permalink
Merge pull request open-telemetry#791 from MrAlias/err-handle
Browse files Browse the repository at this point in the history
Update Error Handling
  • Loading branch information
MrAlias authored Jun 2, 2020
2 parents cff4ddf + 5fa286d commit eb14b39
Show file tree
Hide file tree
Showing 17 changed files with 140 additions and 311 deletions.
1 change: 1 addition & 0 deletions exporters/metric/prometheus/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
15 changes: 1 addition & 14 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ type Exporter struct {
lock sync.RWMutex
controller *pull.Controller

onError func(error)

defaultSummaryQuantiles []float64
defaultHistogramBoundaries []float64
}
Expand Down Expand Up @@ -85,10 +83,6 @@ type Config struct {
// DefaultHistogramBoundaries defines the default histogram bucket
// boundaries.
DefaultHistogramBoundaries []float64

// OnError is a function that handle errors that may occur while exporting metrics.
// TODO: This should be refactored or even removed once we have a better error handling mechanism.
OnError func(error)
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
Expand All @@ -106,19 +100,12 @@ func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error)
config.Gatherer = config.Registry
}

if config.OnError == nil {
config.OnError = func(err error) {
fmt.Println(err.Error())
}
}

e := &Exporter{
handler: promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}),
registerer: config.Registerer,
gatherer: config.Gatherer,
defaultSummaryQuantiles: config.DefaultSummaryQuantiles,
defaultHistogramBoundaries: config.DefaultHistogramBoundaries,
onError: config.OnError,
}

c := &collector{
Expand Down Expand Up @@ -257,7 +244,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
return nil
})
if err != nil {
c.exp.onError(err)
global.Handle(err)
}
}

Expand Down
24 changes: 1 addition & 23 deletions exporters/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package jaeger
import (
"context"
"encoding/binary"
"log"

"go.opentelemetry.io/otel/api/kv/value"

Expand All @@ -37,11 +36,6 @@ type Option func(*options)

// options are the options to be used when initializing a Jaeger export.
type options struct {
// OnError is the hook to be called when there is
// an error occurred when uploading the span data.
// If no custom hook is set, errors are logged.
OnError func(err error)

// Process contains the information about the exporting process.
Process Process

Expand All @@ -55,15 +49,6 @@ type options struct {
RegisterGlobal bool
}

// WithOnError sets the hook to be called when there is
// an error occurred when uploading the span data.
// If no custom hook is set, errors are logged.
func WithOnError(onError func(err error)) Option {
return func(o *options) {
o.OnError = onError
}
}

// WithProcess sets the process with the information about the exporting process.
func WithProcess(process Process) Option {
return func(o *options) {
Expand Down Expand Up @@ -106,13 +91,6 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
opt(&o)
}

onError := func(err error) {
if o.OnError != nil {
o.OnError(err)
return
}
log.Printf("Error when uploading spans to Jaeger: %v", err)
}
service := o.Process.ServiceName
if service == "" {
service = defaultServiceName
Expand All @@ -134,7 +112,7 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
}
bundler := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) {
if err := e.upload(bundle.([]*gen.Span)); err != nil {
onError(err)
global.Handle(err)
}
})

Expand Down
27 changes: 7 additions & 20 deletions internal/metric/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"context"
"errors"
"fmt"
"os"
"sync"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
)
Expand All @@ -41,11 +41,9 @@ type AsyncCollector interface {
type AsyncInstrumentState struct {
lock sync.Mutex

// errorHandler will be called in case of an invalid
// metric.AsyncRunner, i.e., one that does not implement
// either the single-or batch-runner interfaces.
errorHandler func(error)
errorOnce sync.Once
// errorOnce will use the global.Handler to report an error
// once in case of an invalid runner attempting to run.
errorOnce sync.Once

// runnerMap keeps the set of runners that will run each
// collection interval. Singletons are entered with a real
Expand Down Expand Up @@ -79,20 +77,9 @@ type asyncRunnerPair struct {
// NewAsyncInstrumentState returns a new *AsyncInstrumentState, for
// use by MeterImpl to manage running the set of observer callbacks in
// the correct order.
//
// errorHandler is used to print an error condition. If errorHandler
// nil, the default error handler will be used that prints to
// os.Stderr. Only the first error is passed to the handler, after
// which errors are skipped.
func NewAsyncInstrumentState(errorHandler func(error)) *AsyncInstrumentState {
if errorHandler == nil {
errorHandler = func(err error) {
fmt.Fprintln(os.Stderr, "Metrics Async state error:", err)
}
}
func NewAsyncInstrumentState() *AsyncInstrumentState {
return &AsyncInstrumentState{
errorHandler: errorHandler,
runnerMap: map[asyncRunnerPair]struct{}{},
runnerMap: map[asyncRunnerPair]struct{}{},
}
}

Expand Down Expand Up @@ -155,7 +142,7 @@ func (a *AsyncInstrumentState) Run(ctx context.Context, collector AsyncCollector
}

a.errorOnce.Do(func() {
a.errorHandler(fmt.Errorf("%w: type %T (reported once)", ErrInvalidAsyncRunner, rp))
global.Handle(fmt.Errorf("%w: type %T (reported once)", ErrInvalidAsyncRunner, rp))
})
}
}
2 changes: 1 addition & 1 deletion internal/metric/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (m *MeterImpl) doRecordSingle(ctx context.Context, labels []kv.KeyValue, in

func NewProvider() (*MeterImpl, apimetric.Provider) {
impl := &MeterImpl{
asyncInstruments: NewAsyncInstrumentState(nil),
asyncInstruments: NewAsyncInstrumentState(),
}
return impl, registry.NewProvider(impl)
}
Expand Down
21 changes: 3 additions & 18 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,12 @@

package metric

import "go.opentelemetry.io/otel/sdk/resource"
import (
"go.opentelemetry.io/otel/sdk/resource"
)

// Config contains configuration for an SDK.
type Config struct {
// ErrorHandler is the function called when the SDK encounters an error.
//
// This option can be overridden after instantiation of the SDK
// with the `SetErrorHandler` method.
ErrorHandler ErrorHandler

// Resource describes all the metric records processed by the
// Accumulator.
Resource *resource.Resource
Expand All @@ -35,17 +31,6 @@ type Option interface {
Apply(*Config)
}

// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn ErrorHandler) Option {
return errorHandlerOption(fn)
}

type errorHandlerOption ErrorHandler

func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = ErrorHandler(o)
}

// WithResource sets the Resource configuration option of a Config.
func WithResource(res *resource.Resource) Option {
return resourceOption{res}
Expand Down
45 changes: 0 additions & 45 deletions sdk/metric/config_test.go

This file was deleted.

17 changes: 0 additions & 17 deletions sdk/metric/controller/pull/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,11 @@ package pull
import (
"time"

sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

// Config contains configuration for a push Controller.
type Config struct {
// ErrorHandler is the function called when the Controller encounters an error.
//
// This option can be overridden after instantiation of the Controller
// with the `SetErrorHandler` method.
ErrorHandler sdk.ErrorHandler

// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
Expand All @@ -52,17 +46,6 @@ type Option interface {
Apply(*Config)
}

// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn sdk.ErrorHandler) Option {
return errorHandlerOption(fn)
}

type errorHandlerOption sdk.ErrorHandler

func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = sdk.ErrorHandler(o)
}

// WithResource sets the Resource configuration option of a Config.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}
Expand Down
6 changes: 2 additions & 4 deletions sdk/metric/controller/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ type Controller struct {
// New returns a *Controller configured with an aggregation selector and options.
func New(selector export.AggregationSelector, options ...Option) *Controller {
config := &Config{
Resource: resource.Empty(),
ErrorHandler: sdk.DefaultErrorHandler,
CachePeriod: DefaultCachePeriod,
Resource: resource.Empty(),
CachePeriod: DefaultCachePeriod,
}
for _, opt := range options {
opt.Apply(config)
Expand All @@ -58,7 +57,6 @@ func New(selector export.AggregationSelector, options ...Option) *Controller {
accum := sdk.NewAccumulator(
integrator,
sdk.WithResource(config.Resource),
sdk.WithErrorHandler(config.ErrorHandler),
)
return &Controller{
accumulator: accum,
Expand Down
18 changes: 0 additions & 18 deletions sdk/metric/controller/push/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@ package push
import (
"time"

sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

// Config contains configuration for a push Controller.
type Config struct {
// ErrorHandler is the function called when the Controller encounters an error.
//
// This option can be overridden after instantiation of the Controller
// with the `SetErrorHandler` method.
ErrorHandler sdk.ErrorHandler

// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
Resource *resource.Resource
Expand All @@ -53,17 +46,6 @@ type Option interface {
Apply(*Config)
}

// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn sdk.ErrorHandler) Option {
return errorHandlerOption(fn)
}

type errorHandlerOption sdk.ErrorHandler

func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = sdk.ErrorHandler(o)
}

// WithResource sets the Resource configuration option of a Config.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}
Expand Down
Loading

0 comments on commit eb14b39

Please sign in to comment.