Skip to content

Commit

Permalink
Reduce duplicate code in components helper
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 20, 2020
1 parent 29859f2 commit 8e20e5d
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 138 deletions.
62 changes: 62 additions & 0 deletions component/componenthelper/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenthelper

import (
"context"

"go.opentelemetry.io/collector/component"
)

// Start specifies the function invoked when the exporter is being started.
type Start func(context.Context, component.Host) error

// Shutdown specifies the function invoked when the exporter is being shutdown.
type Shutdown func(context.Context) error

type Settings struct {
Start
Shutdown
}

// DefaultSettings returns the default settings for a component. The Start and Shutdown are no-op.
func DefaultSettings() *Settings {
return &Settings{
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
}
}

type baseComponent struct {
start Start
shutdown Shutdown
}

// Start all senders and exporter and is invoked during service start.
func (be *baseComponent) Start(ctx context.Context, host component.Host) error {
return be.start(ctx, host)
}

// Shutdown all senders and exporter and is invoked during service shutdown.
func (be *baseComponent) Shutdown(ctx context.Context) error {
return be.shutdown(ctx)
}

func NewComponent(s *Settings) component.Component {
return &baseComponent{
start: s.Start,
shutdown: s.Shutdown,
}
}
69 changes: 69 additions & 0 deletions component/componenthelper/component_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenthelper

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
)

func TestDefaultSettings(t *testing.T) {
st := DefaultSettings()
require.NotNil(t, st)
cp := NewComponent(st)
require.NoError(t, cp.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, cp.Shutdown(context.Background()))
}

func TestWithStart(t *testing.T) {
startCalled := false
st := DefaultSettings()
st.Start = func(context.Context, component.Host) error { startCalled = true; return nil }
cp := NewComponent(st)
assert.NoError(t, cp.Start(context.Background(), componenttest.NewNopHost()))
assert.True(t, startCalled)
}

func TestWithStart_ReturnError(t *testing.T) {
want := errors.New("my_error")
st := DefaultSettings()
st.Start = func(context.Context, component.Host) error { return want }
cp := NewComponent(st)
assert.Equal(t, want, cp.Start(context.Background(), componenttest.NewNopHost()))
}

func TestWithShutdown(t *testing.T) {
shutdownCalled := false
st := DefaultSettings()
st.Shutdown = func(context.Context) error { shutdownCalled = true; return nil }
cp := NewComponent(st)
assert.NoError(t, cp.Shutdown(context.Background()))
assert.True(t, shutdownCalled)
}

func TestWithShutdown_ReturnError(t *testing.T) {
want := errors.New("my_error")
st := DefaultSettings()
st.Shutdown = func(context.Context) error { return want }
cp := NewComponent(st)
assert.Equal(t, want, cp.Shutdown(context.Background()))
}
100 changes: 39 additions & 61 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ package exporterhelper

import (
"context"
"sync"
"time"

"go.opencensus.io/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
)
Expand Down Expand Up @@ -76,34 +75,26 @@ func (req *baseRequest) setContext(ctx context.Context) {
req.ctx = ctx
}

// Start specifies the function invoked when the exporter is being started.
type Start func(context.Context, component.Host) error

// Shutdown specifies the function invoked when the exporter is being shutdown.
type Shutdown func(context.Context) error

// internalOptions represents all the options that users can configure.
type internalOptions struct {
// baseSettings represents all the options that users can configure.
type baseSettings struct {
*componenthelper.Settings
TimeoutSettings
QueueSettings
RetrySettings
ResourceToTelemetrySettings
Start
Shutdown
}

// fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
// fromOptions returns the internal options starting from the default and applying all configured options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &internalOptions{
opts := &baseSettings{
Settings: componenthelper.DefaultSettings(),
TimeoutSettings: CreateDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call CreateDefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
ResourceToTelemetrySettings: createDefaultResourceToTelemetrySettings(),
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
}

for _, op := range options {
Expand All @@ -113,79 +104,75 @@ func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
return opts
}

// ExporterOption apply changes to internalOptions.
type ExporterOption func(*internalOptions)
// Option apply changes to baseSettings.
type Option func(*baseSettings)

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown Shutdown) ExporterOption {
return func(o *internalOptions) {
func WithShutdown(shutdown componenthelper.Shutdown) Option {
return func(o *baseSettings) {
o.Shutdown = shutdown
}
}

// WithStart overrides the default Start function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithStart(start Start) ExporterOption {
return func(o *internalOptions) {
func WithStart(start componenthelper.Start) Option {
return func(o *baseSettings) {
o.Start = start
}
}

// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption {
return func(o *internalOptions) {
func WithTimeout(timeoutSettings TimeoutSettings) Option {
return func(o *baseSettings) {
o.TimeoutSettings = timeoutSettings
}
}

// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) ExporterOption {
return func(o *internalOptions) {
func WithRetry(retrySettings RetrySettings) Option {
return func(o *baseSettings) {
o.RetrySettings = retrySettings
}
}

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
func WithQueue(queueSettings QueueSettings) ExporterOption {
return func(o *internalOptions) {
func WithQueue(queueSettings QueueSettings) Option {
return func(o *baseSettings) {
o.QueueSettings = queueSettings
}
}

// WithResourceToTelemetryConversion overrides the default ResourceToTelemetrySettings for an exporter.
// The default ResourceToTelemetrySettings is to disable resource attributes to metric labels conversion.
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) ExporterOption {
return func(o *internalOptions) {
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) Option {
return func(o *baseSettings) {
o.ResourceToTelemetrySettings = resourceToTelemetrySettings
}
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
cfg configmodels.Exporter
sender requestSender
qrSender *queuedRetrySender
start Start
shutdown Shutdown
startOnce sync.Once
shutdownOnce sync.Once
convertResourceToTelemetry bool
}

func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...ExporterOption) *baseExporter {
opts := fromConfiguredOptions(options...)
func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...Option) *baseExporter {
bs := fromOptions(options)
be := &baseExporter{
Component: componenthelper.NewComponent(bs.Settings),
cfg: cfg,
start: opts.Start,
shutdown: opts.Shutdown,
convertResourceToTelemetry: opts.ResourceToTelemetrySettings.Enabled,
convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled,
}

be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}, logger)
be.qrSender = newQueuedRetrySender(bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
be.sender = be.qrSender

return be
Expand All @@ -199,31 +186,22 @@ func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) reques

// Start all senders and exporter and is invoked during service start.
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
err := componenterror.ErrAlreadyStarted
be.startOnce.Do(func() {
// First start the wrapped exporter.
err = be.start(ctx, host)
if err != nil {
// TODO: Log errors, or check if it is recorded by the caller.
return
}
// First start the wrapped exporter.
if err := be.Component.Start(ctx, host); err != nil {
return err
}

// If no error then start the queuedRetrySender.
be.qrSender.start()
})
return err
// If no error then start the queuedRetrySender.
be.qrSender.start()
return nil
}

// Shutdown all senders and exporter and is invoked during service shutdown.
func (be *baseExporter) Shutdown(ctx context.Context) error {
err := componenterror.ErrAlreadyStopped
be.shutdownOnce.Do(func() {
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
err = be.shutdown(ctx)
})
return err
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
return be.Component.Shutdown(ctx)
}

// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
Expand Down
9 changes: 5 additions & 4 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ func TestBaseExporter(t *testing.T) {
}

func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be := newBaseExporter(
defaultExporterCfg,
zap.NewNop(),
WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }),
WithShutdown(func(ctx context.Context) error { return errors.New("my error") }),
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithResourceToTelemetryConversion(createDefaultResourceToTelemetrySettings()),
WithTimeout(CreateDefaultTimeoutSettings()),
)
require.Error(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, be.Shutdown(context.Background()))
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
}

func errToStatus(err error) trace.Status {
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/logshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewLogsExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
pushLogsData PushLogsData,
options ...ExporterOption,
options ...Option,
) (component.LogsExporter, error) {
if cfg == nil {
return nil, errNilConfig
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewMetricsExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
pushMetricsData PushMetricsData,
options ...ExporterOption,
options ...Option,
) (component.MetricsExporter, error) {
if cfg == nil {
return nil, errNilConfig
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/tracehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewTraceExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
dataPusher traceDataPusher,
options ...ExporterOption,
options ...Option,
) (component.TracesExporter, error) {

if cfg == nil {
Expand Down
Loading

0 comments on commit 8e20e5d

Please sign in to comment.