diff --git a/.chloggen/core-exporters-status.yaml b/.chloggen/core-exporters-status.yaml new file mode 100755 index 00000000000..068562a1850 --- /dev/null +++ b/.chloggen/core-exporters-status.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: This PR adds a WithStatusReporting option to exporterhelper to opt-in to automatic status reporting and updates the core exporters to use it. + +# One or more tracking issues or pull requests related to the change +issues: [7682] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/component/go.sum b/component/go.sum index ae955d3192b..03bb4eb3584 100644 --- a/component/go.sum +++ b/component/go.sum @@ -153,6 +153,10 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +<<<<<<< HEAD +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +======= +>>>>>>> 08f37f5e2 (Crosslink) github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -182,9 +186,17 @@ github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c/go.mod h1 github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +<<<<<<< HEAD +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +======= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +>>>>>>> 08f37f5e2 (Crosslink) github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index b5e7aa39a33..0a2b33a68ab 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -108,6 +108,24 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } +// WithStatusReporting will wrap consume functions with automatic status reporting. StatusOK will be +// reported when consume returns without an error. When it returns an error, StatusRecoverableError +// will be reported. If a component wants to report a more severe error status (e.g. +// StatusPermamentError or StatusFatalError), it can report it manually while still using this +// option. The more severe statuses will transition the component state ahead of the wrapper, making +// the automatic status reporting effectively a no-op. +func WithStatusReporting() Option { + return func(o *baseExporter) { + o.reportStatus = true + } +} + +// StatusSettings contains the settings for automatic status reporting. +type StatusSettings struct { + ReportOnStart bool + ReportOnConsume bool +} + // baseExporter contains common fields between different exporter types. type baseExporter struct { component.StartFunc @@ -117,6 +135,7 @@ type baseExporter struct { marshaler RequestMarshaler unmarshaler RequestUnmarshaler signal component.DataType + reportStatus bool set exporter.CreateSettings obsrep *ObsReport diff --git a/exporter/exporterhelper/internal/statushelper/wrappers.go b/exporter/exporterhelper/internal/statushelper/wrappers.go new file mode 100644 index 00000000000..b09c67b639f --- /dev/null +++ b/exporter/exporterhelper/internal/statushelper/wrappers.go @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package statushelper // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/statushelper" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// WrapConsumeTraces wraps a consumer.ConsumeTracesFunc to automatically report status based on the +// return value ConsumeTraces. If it returns without an error, StatusOK will be reported. If it +// with an error, StatusRecoverableError will be reported. If a component wants to report a more +// severe error status (e.g. StatusPermamentError or StatusFatalError), it can report it manually +// while still using this wrapper. The more severe statuses will transition the component state +// ahead of the wrapper, making the automatic status reporting effectively a no-op. +func WrapConsumeTraces(consumeFunc consumer.ConsumeTracesFunc, telemetry component.TelemetrySettings) consumer.ConsumeTracesFunc { + return func(ctx context.Context, td ptrace.Traces) error { + if err := consumeFunc.ConsumeTraces(ctx, td); err != nil { + telemetry.ReportStatus(component.NewRecoverableErrorEvent(err)) + return err + } + telemetry.ReportStatus(component.NewStatusEvent(component.StatusOK)) + return nil + } +} + +// WrapConsumeMetrics wraps a consumer.ConsumeMetricsFunc to automatically report status based on the +// return value ConsumeMetrics. If it returns without an error, StatusOK will be reported. If it +// with an error, StatusRecoverableError will be reported. If a component wants to report a more +// severe error status (e.g. StatusPermamentError or StatusFatalError), it can report it manually +// while still using this wrapper. The more severe statuses will transition the component state +// ahead of the wrapper, making the automatic status reporting effectively a no-op. +func WrapConsumeMetrics(consumeFunc consumer.ConsumeMetricsFunc, telemetry component.TelemetrySettings) consumer.ConsumeMetricsFunc { + return func(ctx context.Context, md pmetric.Metrics) error { + if err := consumeFunc.ConsumeMetrics(ctx, md); err != nil { + telemetry.ReportStatus(component.NewRecoverableErrorEvent(err)) + return err + } + telemetry.ReportStatus(component.NewStatusEvent(component.StatusOK)) + return nil + } +} + +// WrapConsumeLogs wraps a consumer.ConsumeLogsFunc to automatically report status based on the +// return value ConsumeLogs. If it returns without an error, StatusOK will be reported. If it +// with an error, StatusRecoverableError will be reported. If a component wants to report a more +// severe error status (e.g. StatusPermamentError or StatusFatalError), it can report it manually +// while still using this wrapper. The more severe statuses will transition the component state +// ahead of the wrapper, making the automatic status reporting effectively a no-op. +func WrapConsumeLogs(consumeFunc consumer.ConsumeLogsFunc, telemetry component.TelemetrySettings) consumer.ConsumeLogsFunc { + return func(ctx context.Context, ld plog.Logs) error { + if err := consumeFunc.ConsumeLogs(ctx, ld); err != nil { + telemetry.ReportStatus(component.NewRecoverableErrorEvent(err)) + return err + } + telemetry.ReportStatus(component.NewStatusEvent(component.StatusOK)) + return nil + } +} diff --git a/exporter/exporterhelper/internal/statushelper/wrappers_test.go b/exporter/exporterhelper/internal/statushelper/wrappers_test.go new file mode 100644 index 00000000000..5a0906936fa --- /dev/null +++ b/exporter/exporterhelper/internal/statushelper/wrappers_test.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package statushelper + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestWrapConsumeTraces(t *testing.T) { + for _, tc := range []struct { + name string + retErr error + expectedEvent *component.StatusEvent + }{ + { + name: "consume no error", + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "consume with error", + retErr: assert.AnError, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + }, + } { + t.Run(tc.name, func(t *testing.T) { + telemetrySettings := componenttest.NewNopTelemetrySettings() + var lastEvent *component.StatusEvent + telemetrySettings.ReportComponentStatus = func(ev *component.StatusEvent) error { + lastEvent = ev + return nil + } + + consumeFunc := func(ctx context.Context, td ptrace.Traces) error { + return tc.retErr + } + + wrappedConsume := WrapConsumeTraces(consumeFunc, telemetrySettings) + + assert.Equal(t, tc.retErr, wrappedConsume(context.Background(), ptrace.NewTraces())) + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + + }) + } +} + +func TestWrapConsumeMetrics(t *testing.T) { + for _, tc := range []struct { + name string + retErr error + expectedEvent *component.StatusEvent + }{ + { + name: "consume no error", + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "consume with error", + retErr: assert.AnError, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + }, + } { + t.Run(tc.name, func(t *testing.T) { + telemetrySettings := componenttest.NewNopTelemetrySettings() + var lastEvent *component.StatusEvent + telemetrySettings.ReportComponentStatus = func(ev *component.StatusEvent) error { + lastEvent = ev + return nil + } + + consumeFunc := func(ctx context.Context, md pmetric.Metrics) error { + return tc.retErr + } + + wrappedConsume := WrapConsumeMetrics(consumeFunc, telemetrySettings) + + assert.Equal(t, tc.retErr, wrappedConsume(context.Background(), pmetric.NewMetrics())) + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + + }) + } +} + +func TestWrapConsumeLogs(t *testing.T) { + for _, tc := range []struct { + name string + retErr error + expectedEvent *component.StatusEvent + }{ + { + name: "consume no error", + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "consume with error", + retErr: assert.AnError, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + }, + } { + t.Run(tc.name, func(t *testing.T) { + telemetrySettings := componenttest.NewNopTelemetrySettings() + var lastEvent *component.StatusEvent + telemetrySettings.ReportComponentStatus = func(ev *component.StatusEvent) error { + lastEvent = ev + return nil + } + + consumeFunc := func(ctx context.Context, ld plog.Logs) error { + return tc.retErr + } + + wrappedConsume := WrapConsumeLogs(consumeFunc, telemetrySettings) + + assert.Equal(t, tc.retErr, wrappedConsume(context.Background(), plog.NewLogs())) + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + + }) + } +} diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index f08bf0e6da6..36286ade0ef 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/statushelper" "go.opentelemetry.io/collector/pdata/plog" ) @@ -93,14 +94,20 @@ func NewLogsExporter( return nil, err } - lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + consumeFunc := func(ctx context.Context, ld plog.Logs) error { req := newLogsRequest(ld, pusher) serr := be.send(ctx, req) if errors.Is(serr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount())) } return serr - }, be.consumerOptions...) + } + + if be.reportStatus { + consumeFunc = statushelper.WrapConsumeLogs(consumeFunc, set.TelemetrySettings) + } + + lc, err := consumer.NewLogs(consumeFunc, be.consumerOptions...) return &logsExporter{ baseExporter: be, @@ -135,7 +142,7 @@ func NewLogsRequestExporter( return nil, err } - lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + consumeFunc := func(ctx context.Context, ld plog.Logs) error { req, cErr := converter(ctx, ld) if cErr != nil { set.Logger.Error("Failed to convert logs. Dropping data.", @@ -148,7 +155,13 @@ func NewLogsRequestExporter( be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount())) } return sErr - }, be.consumerOptions...) + } + + if be.reportStatus { + consumeFunc = statushelper.WrapConsumeLogs(consumeFunc, set.TelemetrySettings) + } + + lc, err := consumer.NewLogs(consumeFunc, be.consumerOptions...) return &logsExporter{ baseExporter: be, diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 92bbd48edac..9b572c8d22a 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -359,6 +359,125 @@ func TestLogsRequestExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, le.Shutdown(context.Background()), want) } +func TestLogsExporter_WithStatusReporting(t *testing.T) { + for _, tc := range []struct { + name string + reportStatus bool + expectedEvent *component.StatusEvent + consumeErr error + }{ + { + name: "Report status enabled / no error", + reportStatus: true, + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "Report status enabled / with error", + reportStatus: true, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + consumeErr: assert.AnError, + }, + { + name: "Report status disabled / no error", + reportStatus: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ld := plog.NewLogs() + createSettings := exportertest.NewNopCreateSettings() + + var lastEvent *component.StatusEvent + createSettings.TelemetrySettings.ReportStatus = func(ev *component.StatusEvent) { + lastEvent = ev + } + + var opts []Option + if tc.reportStatus { + opts = append(opts, WithStatusReporting()) + } + + le, err := NewLogsExporter( + context.Background(), + createSettings, + &fakeLogsExporterConfig, + newPushLogsData(tc.consumeErr), + opts..., + ) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.Nil(t, lastEvent) + assert.Equal(t, tc.consumeErr, le.ConsumeLogs(context.Background(), ld)) + assert.Equal(t, tc.expectedEvent == nil, lastEvent == nil) + + if tc.expectedEvent != nil { + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + } + }) + } +} + +func TestLogsRequestExporter_WithStatusReporting(t *testing.T) { + for _, tc := range []struct { + name string + reportStatus bool + expectedEvent *component.StatusEvent + consumeErr error + }{ + { + name: "Report status enabled / no error", + reportStatus: true, + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "Report status enabled / with error", + reportStatus: true, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + consumeErr: assert.AnError, + }, + { + name: "Report status disabled / no error", + reportStatus: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ld := plog.NewLogs() + createSettings := exportertest.NewNopCreateSettings() + + var lastEvent *component.StatusEvent + createSettings.TelemetrySettings.ReportStatus = func(ev *component.StatusEvent) { + lastEvent = ev + } + + var opts []Option + if tc.reportStatus { + opts = append(opts, WithStatusReporting()) + } + + le, err := NewLogsRequestExporter( + context.Background(), + createSettings, + (&fakeRequestConverter{requestError: tc.consumeErr}).requestFromLogsFunc, + opts..., + ) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.Nil(t, lastEvent) + assert.Equal(t, tc.consumeErr, le.ConsumeLogs(context.Background(), ld)) + assert.Equal(t, tc.expectedEvent == nil, lastEvent == nil) + + if tc.expectedEvent != nil { + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + } + }) + } +} + func newPushLogsData(retError error) consumer.ConsumeLogsFunc { return func(ctx context.Context, td plog.Logs) error { return retError diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 7360d548962..e49268b2699 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/statushelper" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -93,14 +94,20 @@ func NewMetricsExporter( return nil, err } - mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { + consumeFunc := func(ctx context.Context, md pmetric.Metrics) error { req := newMetricsRequest(md, pusher) serr := be.send(ctx, req) if errors.Is(serr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount())) } return serr - }, be.consumerOptions...) + } + + if be.reportStatus { + consumeFunc = statushelper.WrapConsumeMetrics(consumeFunc, set.TelemetrySettings) + } + + mc, err := consumer.NewMetrics(consumeFunc, be.consumerOptions...) return &metricsExporter{ baseExporter: be, @@ -135,7 +142,7 @@ func NewMetricsRequestExporter( return nil, err } - mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { + consumeFunc := func(ctx context.Context, md pmetric.Metrics) error { req, cErr := converter(ctx, md) if cErr != nil { set.Logger.Error("Failed to convert metrics. Dropping data.", @@ -148,7 +155,13 @@ func NewMetricsRequestExporter( be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount())) } return sErr - }, be.consumerOptions...) + } + + if be.reportStatus { + consumeFunc = statushelper.WrapConsumeMetrics(consumeFunc, set.TelemetrySettings) + } + + mc, err := consumer.NewMetrics(consumeFunc, be.consumerOptions...) return &metricsExporter{ baseExporter: be, diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 770ea801e48..a9291dc7cc1 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -365,6 +365,125 @@ func TestMetricsRequestExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, me.Shutdown(context.Background())) } +func TestMetricsExporter_WithStatusReporting(t *testing.T) { + for _, tc := range []struct { + name string + reportStatus bool + expectedEvent *component.StatusEvent + consumeErr error + }{ + { + name: "Report status enabled / no error", + reportStatus: true, + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "Report status enabled / with error", + reportStatus: true, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + consumeErr: assert.AnError, + }, + { + name: "Report status disabled / no error", + reportStatus: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + md := pmetric.NewMetrics() + createSettings := exportertest.NewNopCreateSettings() + + var lastEvent *component.StatusEvent + createSettings.TelemetrySettings.ReportStatus = func(ev *component.StatusEvent) { + lastEvent = ev + } + + var opts []Option + if tc.reportStatus { + opts = append(opts, WithStatusReporting()) + } + + me, err := NewMetricsExporter( + context.Background(), + createSettings, + fakeRequestConverter{}, + newPushMetricsData(tc.consumeErr), + opts..., + ) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Nil(t, lastEvent) + assert.Equal(t, tc.consumeErr, me.ConsumeMetrics(context.Background(), md)) + assert.Equal(t, tc.expectedEvent == nil, lastEvent == nil) + + if tc.expectedEvent != nil { + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + } + }) + } +} + +func TestMetricsRequestExporter_WithStatusReporting(t *testing.T) { + for _, tc := range []struct { + name string + reportStatus bool + expectedEvent *component.StatusEvent + consumeErr error + }{ + { + name: "Report status enabled / no error", + reportStatus: true, + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "Report status enabled / with error", + reportStatus: true, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + consumeErr: assert.AnError, + }, + { + name: "Report status disabled / no error", + reportStatus: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + md := pmetric.NewMetrics() + createSettings := exportertest.NewNopCreateSettings() + + var lastEvent *component.StatusEvent + createSettings.TelemetrySettings.ReportStatus = func(ev *component.StatusEvent) { + lastEvent = ev + } + + var opts []Option + if tc.reportStatus { + opts = append(opts, WithStatusReporting()) + } + + me, err := NewMetricsRequestExporter( + context.Background(), + createSettings, + (&fakeRequestConverter{requestError: tc.consumeErr}).requestFromMetricsFunc, + opts..., + ) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Nil(t, lastEvent) + assert.Equal(t, tc.consumeErr, me.ConsumeMetrics(context.Background(), md)) + assert.Equal(t, tc.expectedEvent == nil, lastEvent == nil) + + if tc.expectedEvent != nil { + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + } + }) + } +} + func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return func(ctx context.Context, td pmetric.Metrics) error { return retError diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 53c6533cc2d..d91faafc80c 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/statushelper" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -93,14 +94,20 @@ func NewTracesExporter( return nil, err } - tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + consumeFunc := func(ctx context.Context, td ptrace.Traces) error { req := newTracesRequest(td, pusher) serr := be.send(ctx, req) if errors.Is(serr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount())) } return serr - }, be.consumerOptions...) + } + + if be.reportStatus { + consumeFunc = statushelper.WrapConsumeTraces(consumeFunc, set.TelemetrySettings) + } + + tc, err := consumer.NewTraces(consumeFunc, be.consumerOptions...) return &traceExporter{ baseExporter: be, @@ -135,7 +142,7 @@ func NewTracesRequestExporter( return nil, err } - tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + consumeFunc := func(ctx context.Context, td ptrace.Traces) error { req, cErr := converter(ctx, td) if cErr != nil { set.Logger.Error("Failed to convert traces. Dropping data.", @@ -148,7 +155,13 @@ func NewTracesRequestExporter( be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount())) } return sErr - }, be.consumerOptions...) + } + + if be.reportStatus { + consumeFunc = statushelper.WrapConsumeTraces(consumeFunc, set.TelemetrySettings) + } + + tc, err := consumer.NewTraces(consumeFunc, be.consumerOptions...) return &traceExporter{ baseExporter: be, diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index d99b9025c91..ab5c0e20985 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -366,6 +366,125 @@ func TestTracesRequestExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, te.Shutdown(context.Background()), want) } +func TestTracesExporter_WithStatusReporting(t *testing.T) { + for _, tc := range []struct { + name string + reportStatus bool + expectedEvent *component.StatusEvent + consumeErr error + }{ + { + name: "Report status enabled / no error", + reportStatus: true, + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "Report status enabled / with error", + reportStatus: true, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + consumeErr: assert.AnError, + }, + { + name: "Report status disabled / no error", + reportStatus: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + td := ptrace.NewTraces() + createSettings := exportertest.NewNopCreateSettings() + + var lastEvent *component.StatusEvent + createSettings.TelemetrySettings.ReportStatus = func(ev *component.StatusEvent) { + lastEvent = ev + } + + var opts []Option + if tc.reportStatus { + opts = append(opts, WithStatusReporting()) + } + + te, err := NewTracesExporter( + context.Background(), + createSettings, + fakeRequestConverter{}, + newTraceDataPusher(tc.consumeErr), + opts..., + ) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.Nil(t, lastEvent) + assert.Equal(t, tc.consumeErr, te.ConsumeTraces(context.Background(), td)) + assert.Equal(t, tc.expectedEvent == nil, lastEvent == nil) + + if tc.expectedEvent != nil { + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + } + }) + } +} + +func TestTracesRequestExporter_WithStatusReporting(t *testing.T) { + for _, tc := range []struct { + name string + reportStatus bool + expectedEvent *component.StatusEvent + consumeErr error + }{ + { + name: "Report status enabled / no error", + reportStatus: true, + expectedEvent: component.NewStatusEvent(component.StatusOK), + }, + { + name: "Report status enabled / with error", + reportStatus: true, + expectedEvent: component.NewRecoverableErrorEvent(assert.AnError), + consumeErr: assert.AnError, + }, + { + name: "Report status enabled / no error", + reportStatus: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + td := ptrace.NewTraces() + createSettings := exportertest.NewNopCreateSettings() + + var lastEvent *component.StatusEvent + createSettings.TelemetrySettings.ReportStatus = func(ev *component.StatusEvent) { + lastEvent = ev + } + + var opts []Option + if tc.reportStatus { + opts = append(opts, WithStatusReporting()) + } + + te, err := NewTracesRequestExporter( + context.Background(), + createSettings, + (&fakeRequestConverter{requestError: tc.consumeErr}).requestFromTracesFunc, + opts..., + ) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.Nil(t, lastEvent) + assert.Equal(t, tc.consumeErr, te.ConsumeTraces(context.Background(), td)) + assert.Equal(t, tc.expectedEvent == nil, lastEvent == nil) + + if tc.expectedEvent != nil { + assert.Equal(t, tc.expectedEvent.Status(), lastEvent.Status()) + assert.Equal(t, tc.expectedEvent.Err(), lastEvent.Err()) + } + }) + } +} + func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { return func(ctx context.Context, td ptrace.Traces) error { return retError diff --git a/exporter/internal/common/factory.go b/exporter/internal/common/factory.go index 95eb888e819..c06cb94bc5d 100644 --- a/exporter/internal/common/factory.go +++ b/exporter/internal/common/factory.go @@ -37,6 +37,7 @@ func CreateTracesExporter(ctx context.Context, set exporter.CreateSettings, conf exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithShutdown(otlptext.LoggerSync(exporterLogger)), + exporterhelper.WithStatusReporting(), ) } @@ -48,6 +49,7 @@ func CreateMetricsExporter(ctx context.Context, set exporter.CreateSettings, con exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithShutdown(otlptext.LoggerSync(exporterLogger)), + exporterhelper.WithStatusReporting(), ) } @@ -59,6 +61,7 @@ func CreateLogsExporter(ctx context.Context, set exporter.CreateSettings, config exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithShutdown(otlptext.LoggerSync(exporterLogger)), + exporterhelper.WithStatusReporting(), ) } diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 95685eae5a9..d388ec22e72 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -60,7 +60,9 @@ func createTracesExporter( exporterhelper.WithRetry(oCfg.RetryConfig), exporterhelper.WithQueue(oCfg.QueueConfig), exporterhelper.WithStart(oce.start), - exporterhelper.WithShutdown(oce.shutdown)) + exporterhelper.WithShutdown(oce.shutdown), + exporterhelper.WithStatusReporting(), + ) } func createMetricsExporter( @@ -81,6 +83,7 @@ func createMetricsExporter( exporterhelper.WithQueue(oCfg.QueueConfig), exporterhelper.WithStart(oce.start), exporterhelper.WithShutdown(oce.shutdown), + exporterhelper.WithStatusReporting(), ) } @@ -102,5 +105,6 @@ func createLogsExporter( exporterhelper.WithQueue(oCfg.QueueConfig), exporterhelper.WithStart(oce.start), exporterhelper.WithShutdown(oce.shutdown), + exporterhelper.WithStatusReporting(), ) } diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 2c60c39ee4a..d78165619e3 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -92,7 +92,7 @@ func (e *baseExporter) shutdown(context.Context) error { func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { req := ptraceotlp.NewExportRequestFromTraces(td) resp, respErr := e.traceExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := processError(respErr); err != nil { + if err := e.processError(respErr); err != nil { return err } partialSuccess := resp.PartialSuccess() @@ -105,7 +105,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { req := pmetricotlp.NewExportRequestFromMetrics(md) resp, respErr := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := processError(respErr); err != nil { + if err := e.processError(respErr); err != nil { return err } partialSuccess := resp.PartialSuccess() @@ -118,7 +118,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { req := plogotlp.NewExportRequestFromLogs(ld) resp, respErr := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := processError(respErr); err != nil { + if err := e.processError(respErr); err != nil { return err } partialSuccess := resp.PartialSuccess() @@ -135,7 +135,7 @@ func (e *baseExporter) enhanceContext(ctx context.Context) context.Context { return ctx } -func processError(err error) error { +func (e *baseExporter) processError(err error) error { if err == nil { // Request is successful, we are done. return nil @@ -150,6 +150,10 @@ func processError(err error) error { // Now, this is this a real error. + if isComponentPermanentError(st) { + e.settings.ReportStatus(component.NewPermanentErrorEvent(err)) + } + retryInfo := getRetryInfo(st) if !shouldRetry(st.Code(), retryInfo) { @@ -206,3 +210,19 @@ func getThrottleDuration(t *errdetails.RetryInfo) time.Duration { } return 0 } + +// A component status of PermanentError indicates the component is in a state that will require user +// intervention to fix. Typically this is a misconfiguration detected at runtime. +func isComponentPermanentError(st *status.Status) bool { + switch st.Code() { + case codes.NotFound: + return true + case codes.PermissionDenied: + return true + case codes.Unauthenticated: + return true + default: + return false + } + +} diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index 419bb3a6467..bddb6d27322 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configopaque" @@ -651,6 +652,82 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) { }, 10*time.Second, 5*time.Millisecond, "Should retry if RetryInfo is included into status details by the server.") } +func TestComponentStatus(t *testing.T) { + for _, tc := range []struct { + name string + exportError error + componentStatus component.Status + }{ + { + name: "No Error", + exportError: nil, + componentStatus: component.StatusOK, + }, + { + name: "Permission Denied", + exportError: status.Error(codes.PermissionDenied, "permission denied"), + componentStatus: component.StatusPermanentError, + }, + { + name: "Not Found", + exportError: status.Error(codes.NotFound, "not found"), + componentStatus: component.StatusPermanentError, + }, + { + name: "Unauthenticated", + exportError: status.Error(codes.Unauthenticated, "unauthenticated"), + componentStatus: component.StatusPermanentError, + }, + { + name: "Resource Exhausted", + exportError: status.Error(codes.ResourceExhausted, "resource exhausted"), + componentStatus: component.StatusRecoverableError, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false) + rcv.setExportError(tc.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + var lastStatus component.Status + set := exportertest.NewNopCreateSettings() + set.TelemetrySettings.ReportStatus = func(ev *component.StatusEvent) { + // simulate the finite-state machine used in real world status reporting + if lastStatus != component.StatusPermanentError { + lastStatus = ev.Status() + } + } + exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + assert.NoError(t, exp.Start(context.Background(), host)) + + td := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), td) + + assert.Equal(t, tc.componentStatus != component.StatusOK, err != nil) + assert.Equal(t, tc.componentStatus, lastStatus) + }) + } +} + func startServerAndMakeRequest(t *testing.T, exp exporter.Traces, td ptrace.Traces, ln net.Listener) { rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false) defer rcv.srv.GracefulStop() diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 3ed71d626d0..8a3d236c975 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -89,7 +89,9 @@ func createTracesExporter( // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(oCfg.RetryConfig), - exporterhelper.WithQueue(oCfg.QueueConfig)) + exporterhelper.WithQueue(oCfg.QueueConfig), + exporterhelper.WithStatusReporting(), + ) } func createMetricsExporter( @@ -115,7 +117,9 @@ func createMetricsExporter( // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(oCfg.RetryConfig), - exporterhelper.WithQueue(oCfg.QueueConfig)) + exporterhelper.WithQueue(oCfg.QueueConfig), + exporterhelper.WithStatusReporting(), + ) } func createLogsExporter( @@ -141,5 +145,7 @@ func createLogsExporter( // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(oCfg.RetryConfig), - exporterhelper.WithQueue(oCfg.QueueConfig)) + exporterhelper.WithQueue(oCfg.QueueConfig), + exporterhelper.WithStatusReporting(), + ) } diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index b118b72f2a3..6ae06cc93f5 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -118,6 +118,7 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p e.logger.Debug("Preparing to make HTTP request", zap.String("url", url)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) if err != nil { + e.settings.ReportStatus(component.NewPermanentErrorEvent(err)) return consumererror.NewPermanent(err) } req.Header.Set("Content-Type", protobufContentType) @@ -169,6 +170,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(retryAfter)*time.Second) } + if isComponentPermanentError(resp.StatusCode) { + e.settings.ReportStatus(component.NewPermanentErrorEvent(formattedErr)) + } + return consumererror.NewPermanent(formattedErr) } @@ -189,6 +194,29 @@ func isRetryableStatusCode(code int) bool { } } +// A component status of PermanentError indicates the component is in a state that will require user +// intervention to fix. Typically this is a misconfiguration detected at runtime. +func isComponentPermanentError(code int) bool { + switch code { + case http.StatusUnauthorized: + return true + case http.StatusForbidden: + return true + case http.StatusNotFound: + return true + case http.StatusMethodNotAllowed: + return true + case http.StatusRequestEntityTooLarge: + return true + case http.StatusRequestURITooLong: + return true + case http.StatusRequestHeaderFieldsTooLarge: + return true + default: + return false + } +} + func readResponseBody(resp *http.Response) ([]byte, error) { if resp.ContentLength == 0 { return nil, nil diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 578e6af3ace..94f6ac60e79 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" @@ -600,6 +601,107 @@ func TestPartialSuccess_metrics(t *testing.T) { require.Error(t, err) } +func TestComponentStatus(t *testing.T) { + tests := []struct { + name string + responseStatus int + componentStatus component.Status + }{ + { + name: "200", + responseStatus: http.StatusOK, + componentStatus: component.StatusOK, + }, + { + name: "401", + responseStatus: http.StatusUnauthorized, + componentStatus: component.StatusPermanentError, + }, + { + name: "403", + responseStatus: http.StatusForbidden, + componentStatus: component.StatusPermanentError, + }, + { + name: "404", + responseStatus: http.StatusNotFound, + componentStatus: component.StatusPermanentError, + }, + { + name: "405", + responseStatus: http.StatusMethodNotAllowed, + componentStatus: component.StatusPermanentError, + }, + { + name: "413", + responseStatus: http.StatusRequestEntityTooLarge, + componentStatus: component.StatusPermanentError, + }, + { + name: "414", + responseStatus: http.StatusRequestURITooLong, + componentStatus: component.StatusPermanentError, + }, + { + name: "419", + responseStatus: http.StatusTooManyRequests, + componentStatus: component.StatusRecoverableError, + }, + { + name: "431", + responseStatus: http.StatusRequestHeaderFieldsTooLarge, + componentStatus: component.StatusPermanentError, + }, + { + name: "503", + responseStatus: http.StatusServiceUnavailable, + componentStatus: component.StatusRecoverableError, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + writer.WriteHeader(tc.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), + // Create without QueueSettings and RetrySettings so that ConsumeTraces + // returns the errors that we want to check immediately. + } + + var lastStatus component.Status + set := exportertest.NewNopCreateSettings() + set.TelemetrySettings.ReportStatus = func(ev *component.StatusEvent) { + // simulate the finite-state machine used in real world status reporting + if lastStatus != component.StatusPermanentError { + lastStatus = ev.Status() + } + } + + exp, err := createTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate traces + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + if tc.componentStatus != component.StatusOK { + assert.Error(t, err) + } + assert.Equal(t, tc.componentStatus, lastStatus) + }) + } +} + func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(endpoint, handler)