Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatic status reporting for exporterhelper and core exporters #8684

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/core-exporters-status.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: This PR adds a WithStatusReporting option to exporterhelper to opt-in to automatic status reporting and updates the core exporters to use it.

# One or more tracking issues or pull requests related to the change
issues: [7682]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
12 changes: 12 additions & 0 deletions component/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
}
}

// WithStatusReporting will wrap consume functions with automatic status reporting. StatusOK will be
// reported when consume returns without an error. When it returns an error, StatusRecoverableError
// will be reported. If a component wants to report a more severe error status (e.g.
// StatusPermamentError or StatusFatalError), it can report it manually while still using this
// option. The more severe statuses will transition the component state ahead of the wrapper, making
// the automatic status reporting effectively a no-op.
func WithStatusReporting() Option {
return func(o *baseExporter) {
o.reportStatus = true
}
}

// StatusSettings contains the settings for automatic status reporting.
type StatusSettings struct {
ReportOnStart bool
ReportOnConsume bool
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.StartFunc
Expand All @@ -117,6 +135,7 @@ type baseExporter struct {
marshaler RequestMarshaler
unmarshaler RequestUnmarshaler
signal component.DataType
reportStatus bool

set exporter.CreateSettings
obsrep *ObsReport
Expand Down
65 changes: 65 additions & 0 deletions exporter/exporterhelper/internal/statushelper/wrappers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package statushelper // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/statushelper"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// WrapConsumeTraces wraps a consumer.ConsumeTracesFunc to automatically report status based on the
// return value ConsumeTraces. If it returns without an error, StatusOK will be reported. If it
// with an error, StatusRecoverableError will be reported. If a component wants to report a more
// severe error status (e.g. StatusPermamentError or StatusFatalError), it can report it manually
// while still using this wrapper. The more severe statuses will transition the component state
// ahead of the wrapper, making the automatic status reporting effectively a no-op.
func WrapConsumeTraces(consumeFunc consumer.ConsumeTracesFunc, telemetry component.TelemetrySettings) consumer.ConsumeTracesFunc {
return func(ctx context.Context, td ptrace.Traces) error {
if err := consumeFunc.ConsumeTraces(ctx, td); err != nil {
telemetry.ReportStatus(component.NewRecoverableErrorEvent(err))
return err
}
telemetry.ReportStatus(component.NewStatusEvent(component.StatusOK))
return nil
}
}

// WrapConsumeMetrics wraps a consumer.ConsumeMetricsFunc to automatically report status based on the
// return value ConsumeMetrics. If it returns without an error, StatusOK will be reported. If it
// with an error, StatusRecoverableError will be reported. If a component wants to report a more
// severe error status (e.g. StatusPermamentError or StatusFatalError), it can report it manually
// while still using this wrapper. The more severe statuses will transition the component state
// ahead of the wrapper, making the automatic status reporting effectively a no-op.
func WrapConsumeMetrics(consumeFunc consumer.ConsumeMetricsFunc, telemetry component.TelemetrySettings) consumer.ConsumeMetricsFunc {
return func(ctx context.Context, md pmetric.Metrics) error {
if err := consumeFunc.ConsumeMetrics(ctx, md); err != nil {
telemetry.ReportStatus(component.NewRecoverableErrorEvent(err))
return err
}
telemetry.ReportStatus(component.NewStatusEvent(component.StatusOK))
return nil
}
}

// WrapConsumeLogs wraps a consumer.ConsumeLogsFunc to automatically report status based on the
// return value ConsumeLogs. If it returns without an error, StatusOK will be reported. If it
// with an error, StatusRecoverableError will be reported. If a component wants to report a more
// severe error status (e.g. StatusPermamentError or StatusFatalError), it can report it manually
// while still using this wrapper. The more severe statuses will transition the component state
// ahead of the wrapper, making the automatic status reporting effectively a no-op.
func WrapConsumeLogs(consumeFunc consumer.ConsumeLogsFunc, telemetry component.TelemetrySettings) consumer.ConsumeLogsFunc {
return func(ctx context.Context, ld plog.Logs) error {
if err := consumeFunc.ConsumeLogs(ctx, ld); err != nil {
telemetry.ReportStatus(component.NewRecoverableErrorEvent(err))
return err
}
telemetry.ReportStatus(component.NewStatusEvent(component.StatusOK))
return nil
}
}
131 changes: 131 additions & 0 deletions exporter/exporterhelper/internal/statushelper/wrappers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package statushelper

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestWrapConsumeTraces(t *testing.T) {
for _, tc := range []struct {
name string
retErr error
expectedEvent *component.StatusEvent
}{
{
name: "consume no error",
expectedEvent: component.NewStatusEvent(component.StatusOK),
},
{
name: "consume with error",
retErr: assert.AnError,
expectedEvent: component.NewRecoverableErrorEvent(assert.AnError),
},
} {
t.Run(tc.name, func(t *testing.T) {
telemetrySettings := componenttest.NewNopTelemetrySettings()
var lastEvent *component.StatusEvent
telemetrySettings.ReportComponentStatus = func(ev *component.StatusEvent) error {
lastEvent = ev
return nil
}

consumeFunc := func(ctx context.Context, td ptrace.Traces) error {
return tc.retErr
}

wrappedConsume := WrapConsumeTraces(consumeFunc, telemetrySettings)

assert.Equal(t, tc.retErr, wrappedConsume(context.Background(), ptrace.NewTraces()))
assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status())
assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err())

})
}
}

func TestWrapConsumeMetrics(t *testing.T) {
for _, tc := range []struct {
name string
retErr error
expectedEvent *component.StatusEvent
}{
{
name: "consume no error",
expectedEvent: component.NewStatusEvent(component.StatusOK),
},
{
name: "consume with error",
retErr: assert.AnError,
expectedEvent: component.NewRecoverableErrorEvent(assert.AnError),
},
} {
t.Run(tc.name, func(t *testing.T) {
telemetrySettings := componenttest.NewNopTelemetrySettings()
var lastEvent *component.StatusEvent
telemetrySettings.ReportComponentStatus = func(ev *component.StatusEvent) error {
lastEvent = ev
return nil
}

consumeFunc := func(ctx context.Context, md pmetric.Metrics) error {
return tc.retErr
}

wrappedConsume := WrapConsumeMetrics(consumeFunc, telemetrySettings)

assert.Equal(t, tc.retErr, wrappedConsume(context.Background(), pmetric.NewMetrics()))
assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status())
assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err())

})
}
}

func TestWrapConsumeLogs(t *testing.T) {
for _, tc := range []struct {
name string
retErr error
expectedEvent *component.StatusEvent
}{
{
name: "consume no error",
expectedEvent: component.NewStatusEvent(component.StatusOK),
},
{
name: "consume with error",
retErr: assert.AnError,
expectedEvent: component.NewRecoverableErrorEvent(assert.AnError),
},
} {
t.Run(tc.name, func(t *testing.T) {
telemetrySettings := componenttest.NewNopTelemetrySettings()
var lastEvent *component.StatusEvent
telemetrySettings.ReportComponentStatus = func(ev *component.StatusEvent) error {
lastEvent = ev
return nil
}

consumeFunc := func(ctx context.Context, ld plog.Logs) error {
return tc.retErr
}

wrappedConsume := WrapConsumeLogs(consumeFunc, telemetrySettings)

assert.Equal(t, tc.retErr, wrappedConsume(context.Background(), plog.NewLogs()))
assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status())
assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err())

})
}
}
21 changes: 17 additions & 4 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/statushelper"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand Down Expand Up @@ -93,14 +94,20 @@ func NewLogsExporter(
return nil, err
}

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
consumeFunc := func(ctx context.Context, ld plog.Logs) error {
req := newLogsRequest(ld, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return serr
}, be.consumerOptions...)
}

if be.reportStatus {
consumeFunc = statushelper.WrapConsumeLogs(consumeFunc, set.TelemetrySettings)
}

lc, err := consumer.NewLogs(consumeFunc, be.consumerOptions...)

return &logsExporter{
baseExporter: be,
Expand Down Expand Up @@ -135,7 +142,7 @@ func NewLogsRequestExporter(
return nil, err
}

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
consumeFunc := func(ctx context.Context, ld plog.Logs) error {
req, cErr := converter(ctx, ld)
if cErr != nil {
set.Logger.Error("Failed to convert logs. Dropping data.",
Expand All @@ -148,7 +155,13 @@ func NewLogsRequestExporter(
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return sErr
}, be.consumerOptions...)
}

if be.reportStatus {
consumeFunc = statushelper.WrapConsumeLogs(consumeFunc, set.TelemetrySettings)
}

lc, err := consumer.NewLogs(consumeFunc, be.consumerOptions...)

return &logsExporter{
baseExporter: be,
Expand Down
Loading
Loading