Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add convenience interface in consumertest that implements all consumers #2878

Merged
merged 2 commits into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
- `batch` processor: - Support max batch size for logs (#2736)
- Use `Endpoint` for health check extension (#2782)
- Use `confignet.TCPAddr` for `pprof` and `zpages` extensions (#2829)
- Deprecate `consumetest.New[${SIGNAL}]Nop` in favor of `consumetest.NewNop` (#2878)
- Deprecate `consumetest.New[${SIGNAL}]Err` in favor of `consumetest.NewErr` (#2878)

## 🧰 Bug fixes 🧰

Expand Down
9 changes: 2 additions & 7 deletions component/componenttest/nop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
)

Expand Down Expand Up @@ -75,15 +74,11 @@ func (f *nopExporterFactory) CreateLogsExporter(

var nopExporterInstance = &nopExporter{
Component: componenthelper.New(),
Traces: consumertest.NewTracesNop(),
Metrics: consumertest.NewMetricsNop(),
Logs: consumertest.NewLogsNop(),
Consumer: consumertest.NewNop(),
}

// nopExporter stores consumed traces and metrics for testing purposes.
type nopExporter struct {
component.Component
consumer.Traces
consumer.Metrics
consumer.Logs
consumertest.Consumer
}
8 changes: 2 additions & 6 deletions component/componenttest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,13 @@ func (f *nopProcessorFactory) CreateLogsProcessor(

var nopProcessorInstance = &nopProcessor{
Component: componenthelper.New(),
Traces: consumertest.NewTracesNop(),
Metrics: consumertest.NewMetricsNop(),
Logs: consumertest.NewLogsNop(),
Consumer: consumertest.NewNop(),
}

// nopProcessor stores consumed traces and metrics for testing purposes.
type nopProcessor struct {
component.Component
consumer.Traces
consumer.Metrics
consumer.Logs
consumertest.Consumer
}

func (*nopProcessor) GetCapabilities() component.ProcessorCapabilities {
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/nop_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ func TestNewNopProcessorFactory(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &config.ProcessorSettings{TypeVal: factory.Type()}, cfg)

traces, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
traces, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, traces.GetCapabilities())
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewMetricsNop())
metrics, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, metrics.GetCapabilities())
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop())
logs, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, logs.GetCapabilities())
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/nop_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ func TestNewNopReceiverFactory(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &config.ReceiverSettings{TypeVal: factory.Type()}, cfg)

traces, err := factory.CreateTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewTracesNop())
traces, err := factory.CreateTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewMetricsNop())
metrics, err := factory.CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewLogsNop())
logs, err := factory.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.Shutdown(context.Background()))
Expand Down
40 changes: 40 additions & 0 deletions consumer/consumertest/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumertest

import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
)

// Consumer is a convenience interface that implements all consumer interfaces.
// It has a private function on it to forbid external users to implement it,
// to allow us to add extra functions without breaking compatibility because
// nobody else implements this interface.
type Consumer interface {
// ConsumeTraces to implement the consumer.Traces.
ConsumeTraces(context.Context, pdata.Traces) error
// ConsumeMetrics to implement the consumer.Metrics.
ConsumeMetrics(context.Context, pdata.Metrics) error
// ConsumeLogs to implement the consumer.Logs.
ConsumeLogs(context.Context, pdata.Logs) error
unexported()
}

var _ consumer.Logs = (Consumer)(nil)
var _ consumer.Metrics = (Consumer)(nil)
var _ consumer.Traces = (Consumer)(nil)
10 changes: 10 additions & 0 deletions consumer/consumertest/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type errConsumer struct {
err error
}

func (er *errConsumer) unexported() {}

func (er *errConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
return er.err
}
Expand All @@ -37,17 +39,25 @@ func (er *errConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
return er.err
}

// NewErr returns a Consumer that just drops all received data and returns no error.
func NewErr(err error) Consumer {
return &errConsumer{err: err}
}

// NewTracesErr returns a consumer.Traces that just drops all received data and returns the given error.
// Deprecated: Use NewErr().
func NewTracesErr(err error) consumer.Traces {
return &errConsumer{err: err}
}

// NewMetricsErr returns a consumer.Metrics that just drops all received data and returns the given error.
// Deprecated: Use NewErr().
func NewMetricsErr(err error) consumer.Metrics {
return &errConsumer{err: err}
}

// NewLogsErr returns a consumer.Logs that just drops all received data and returns the given error.
// Deprecated: Use NewErr().
func NewLogsErr(err error) consumer.Logs {
return &errConsumer{err: err}
}
10 changes: 10 additions & 0 deletions consumer/consumertest/err_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

func TestErr(t *testing.T) {
err := errors.New("my error")
ec := NewErr(err)
require.NotNil(t, ec)
assert.NotPanics(t, ec.unexported)
assert.Equal(t, err, ec.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.Equal(t, err, ec.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.Equal(t, err, ec.ConsumeTraces(context.Background(), pdata.NewTraces()))
}

func TestTracesErr(t *testing.T) {
err := errors.New("my error")
nt := NewTracesErr(err)
Expand Down
10 changes: 10 additions & 0 deletions consumer/consumertest/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (

type nopConsumer struct{}

func (nc *nopConsumer) unexported() {}

func (nc *nopConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
return nil
}
Expand All @@ -39,17 +41,25 @@ func (nc *nopConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
return nil
}

// NewNop returns a Consumer that just drops all received data and returns no error.
func NewNop() Consumer {
return nopInstance
}

// NewTracesNop returns a consumer.Traces that just drops all received data and returns no error.
// Deprecated: Use NewNop().
func NewTracesNop() consumer.Traces {
return nopInstance
}

// NewMetricsNop returns a consumer.Metrics that just drops all received data and returns no error.
// Deprecated: Use NewNop().
func NewMetricsNop() consumer.Metrics {
return nopInstance
}

// NewLogsNop returns a consumer.Logs that just drops all received data and returns no error.
// Deprecated: Use NewNop().
func NewLogsNop() consumer.Logs {
return nopInstance
}
9 changes: 9 additions & 0 deletions consumer/consumertest/nop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

func TestNop(t *testing.T) {
nc := NewNop()
require.NotNil(t, nc)
assert.NotPanics(t, nc.unexported)
assert.NoError(t, nc.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, nc.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, nc.ConsumeTraces(context.Background(), pdata.NewTraces()))
}

func TestTracesNop(t *testing.T) {
nt := NewTracesNop()
require.NotNil(t, nt)
Expand Down
6 changes: 3 additions & 3 deletions consumer/fanoutconsumer/cloningconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func TestTraceProcessorCloningNotMultiplexing(t *testing.T) {
nop := consumertest.NewTracesNop()
nop := consumertest.NewNop()
tfc := NewTracesCloning([]consumer.Traces{nop})
assert.Same(t, nop, tfc)
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) {
}

func TestMetricsProcessorCloningNotMultiplexing(t *testing.T) {
nop := consumertest.NewMetricsNop()
nop := consumertest.NewNop()
mfc := NewMetrics([]consumer.Metrics{nop})
assert.Same(t, nop, mfc)
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) {
}

func TestLogsProcessorCloningNotMultiplexing(t *testing.T) {
nop := consumertest.NewLogsNop()
nop := consumertest.NewNop()
lfc := NewLogsCloning([]consumer.Logs{nop})
assert.Same(t, nop, lfc)
}
Expand Down
12 changes: 6 additions & 6 deletions consumer/fanoutconsumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestTracesProcessorNotMultiplexing(t *testing.T) {
nop := consumertest.NewTracesNop()
nop := consumertest.NewNop()
tfc := NewTraces([]consumer.Traces{nop})
assert.Same(t, nop, tfc)
}
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1] = consumertest.NewTracesErr(errors.New("my error"))
processors[1] = consumertest.NewErr(errors.New("my error"))

tfc := NewTraces(processors)
td := testdata.GenerateTraceDataOneSpan()
Expand All @@ -81,7 +81,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) {
}

func TestMetricsProcessorNotMultiplexing(t *testing.T) {
nop := consumertest.NewMetricsNop()
nop := consumertest.NewNop()
mfc := NewMetrics([]consumer.Metrics{nop})
assert.Same(t, nop, mfc)
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1] = consumertest.NewMetricsErr(errors.New("my error"))
processors[1] = consumertest.NewErr(errors.New("my error"))

mfc := NewMetrics(processors)
md := testdata.GenerateMetricsOneMetric()
Expand All @@ -135,7 +135,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) {
}

func TestLogsProcessorNotMultiplexing(t *testing.T) {
nop := consumertest.NewLogsNop()
nop := consumertest.NewNop()
lfc := NewLogs([]consumer.Logs{nop})
assert.Same(t, nop, lfc)
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestLogsProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1] = consumertest.NewLogsErr(errors.New("my error"))
processors[1] = consumertest.NewErr(errors.New("my error"))

lfc := NewLogs(processors)
ld := testdata.GenerateLogDataOneLog()
Expand Down
6 changes: 3 additions & 3 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestTraceInvalidUrl(t *testing.T) {
func TestTraceError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

startTraceReceiver(t, addr, consumertest.NewTracesErr(errors.New("my_error")))
startTraceReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startTraceExporter(t, "", fmt.Sprintf("http://%s/v1/traces", addr))

td := testdata.GenerateTraceDataOneSpan()
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestCompressionOptions(t *testing.T) {
func TestMetricsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

startMetricsReceiver(t, addr, consumertest.NewMetricsErr(errors.New("my_error")))
startMetricsReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startMetricsExporter(t, "", fmt.Sprintf("http://%s/v1/metrics", addr))

md := testdata.GenerateMetricsOneMetric()
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestMetricsRoundTrip(t *testing.T) {
func TestLogsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

startLogsReceiver(t, addr, consumertest.NewLogsErr(errors.New("my_error")))
startLogsReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startLogsExporter(t, "", fmt.Sprintf("http://%s/v1/logs", addr))

md := testdata.GenerateLogDataOneLog()
Expand Down
Loading