From 7d03122d8278840c3c6d54f83364d41173cf051a Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Sun, 17 Dec 2023 10:30:14 -0800 Subject: [PATCH 1/2] [chore] move k8sattributes processor to generated lifecycle tests --- .../generated_component_test.go | 122 ++++++++++++++++++ processor/k8sattributesprocessor/go.mod | 5 +- processor/k8sattributesprocessor/go.sum | 6 +- .../k8sattributesprocessor/metadata.yaml | 3 + 4 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 processor/k8sattributesprocessor/generated_component_test.go diff --git a/processor/k8sattributesprocessor/generated_component_test.go b/processor/k8sattributesprocessor/generated_component_test.go new file mode 100644 index 000000000000..6679cd2f6c3d --- /dev/null +++ b/processor/k8sattributesprocessor/generated_component_test.go @@ -0,0 +1,122 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package k8sattributesprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" + + "go.opentelemetry.io/collector/confmap/confmaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" +) + +// assertNoErrorHost implements a component.Host that asserts that there were no errors. +type assertNoErrorHost struct { + component.Host + *testing.T +} + +var _ component.Host = (*assertNoErrorHost)(nil) + +// newAssertNoErrorHost returns a new instance of assertNoErrorHost. +func newAssertNoErrorHost(t *testing.T) component.Host { + return &assertNoErrorHost{ + componenttest.NewNopHost(), + t, + } +} + +func (aneh *assertNoErrorHost) ReportFatalError(err error) { + assert.NoError(aneh, err) +} + +func Test_ComponentLifecycle(t *testing.T) { + factory := NewFactory() + + tests := []struct { + name string + createFn func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) + }{ + + { + name: "logs", + createFn: func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsProcessor(ctx, set, cfg, consumertest.NewNop()) + }, + }, + + { + name: "metrics", + createFn: func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateMetricsProcessor(ctx, set, cfg, consumertest.NewNop()) + }, + }, + + { + name: "traces", + createFn: func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateTracesProcessor(ctx, set, cfg, consumertest.NewNop()) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, component.UnmarshalConfig(sub, cfg)) + + for _, test := range tests { + t.Run(test.name+"-shutdown", func(t *testing.T) { + c, err := test.createFn(context.Background(), processortest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + + t.Run(test.name+"-lifecycle", func(t *testing.T) { + + c, err := test.createFn(context.Background(), processortest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + host := newAssertNoErrorHost(t) + err = c.Start(context.Background(), host) + require.NoError(t, err) + assert.NotPanics(t, func() { + switch e := c.(type) { + case processor.Logs: + logs := testdata.GenerateLogsManyLogRecordsSameResource(2) + if !e.Capabilities().MutatesData { + logs.MarkReadOnly() + } + err = e.ConsumeLogs(context.Background(), logs) + case processor.Metrics: + metrics := testdata.GenerateMetricsTwoMetrics() + if !e.Capabilities().MutatesData { + metrics.MarkReadOnly() + } + err = e.ConsumeMetrics(context.Background(), metrics) + case processor.Traces: + traces := testdata.GenerateTracesTwoSpansSameResource() + if !e.Capabilities().MutatesData { + traces.MarkReadOnly() + } + err = e.ConsumeTraces(context.Background(), traces) + } + }) + assert.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + } +} diff --git a/processor/k8sattributesprocessor/go.mod b/processor/k8sattributesprocessor/go.mod index 43cefe5dfb43..4ab05520ad7d 100644 --- a/processor/k8sattributesprocessor/go.mod +++ b/processor/k8sattributesprocessor/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.5.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest v0.92.0 github.com/stretchr/testify v1.8.4 @@ -68,7 +69,7 @@ require ( github.com/mostynb/go-grpc-compression v1.2.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect - github.com/opencontainers/image-spec v1.0.2 // indirect + github.com/opencontainers/image-spec v1.1.0-rc5 // indirect github.com/openshift/api v3.9.0+incompatible // indirect github.com/openshift/client-go v0.0.0-20210521082421-73d9475a9142 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -127,3 +128,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8ste // ambiguous import: found package cloud.google.com/go/compute/metadata in multiple modules replace cloud.google.com/go v0.54.0 => cloud.google.com/go v0.110.10 + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal diff --git a/processor/k8sattributesprocessor/go.sum b/processor/k8sattributesprocessor/go.sum index 3a8595f44db6..3dbddd42fcd8 100644 --- a/processor/k8sattributesprocessor/go.sum +++ b/processor/k8sattributesprocessor/go.sum @@ -1134,10 +1134,12 @@ github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.91.0 h1:I3MFZXcQdnATObbeKseHLEWOWMFt1jHhHCbeunBw3mE= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.91.0/go.mod h1:xHPYTciFeEEE2HnPu65FMgsCQFYNns66mqiHsMqb+HM= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= -github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= -github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= +github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= +github.com/opencontainers/image-spec v1.1.0-rc5/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8= github.com/openshift/api v0.0.0-20180801171038-322a19404e37 h1:05irGU4HK4IauGGDbsk+ZHrm1wOzMLYjMlfaiqMrBYc= github.com/openshift/api v0.0.0-20180801171038-322a19404e37/go.mod h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY= github.com/openshift/api v0.0.0-20210521075222-e273a339932a/go.mod h1:izBmoXbUu3z5kUa4FjZhvekTsyzIWiOoaIgJiZBBMQs= diff --git a/processor/k8sattributesprocessor/metadata.yaml b/processor/k8sattributesprocessor/metadata.yaml index 6791bb24f94a..30cc9f9f7e5e 100644 --- a/processor/k8sattributesprocessor/metadata.yaml +++ b/processor/k8sattributesprocessor/metadata.yaml @@ -97,3 +97,6 @@ resource_attributes: description: Container image tag. Requires container.id or k8s.container.name. type: string enabled: true + +tests: + config: From f7b2292d4ebcd8ae8b90755ca37b4850b3e9aa9e Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 9 Jan 2024 23:40:34 -0800 Subject: [PATCH 2/2] [processor/k8sattributes] implement lifecycle tests and report errors through component status --- .chloggen/k8sattributes_lifecycle.yaml | 27 +++++++++ processor/k8sattributesprocessor/factory.go | 39 +++---------- processor/k8sattributesprocessor/go.mod | 8 ++- processor/k8sattributesprocessor/go.sum | 6 +- processor/k8sattributesprocessor/processor.go | 36 +++++++++--- .../k8sattributesprocessor/processor_test.go | 58 ++++++++++++------- 6 files changed, 111 insertions(+), 63 deletions(-) create mode 100755 .chloggen/k8sattributes_lifecycle.yaml diff --git a/.chloggen/k8sattributes_lifecycle.yaml b/.chloggen/k8sattributes_lifecycle.yaml new file mode 100755 index 000000000000..52a793a630e8 --- /dev/null +++ b/.chloggen/k8sattributes_lifecycle.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sattributesprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Apply lifecycle tests to k8sprocessor, change its behavior to report fatal error + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30387] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go index 2786851c7c8c..e994ed6df256 100644 --- a/processor/k8sattributesprocessor/factory.go +++ b/processor/k8sattributesprocessor/factory.go @@ -75,10 +75,7 @@ func createTracesProcessorWithOptions( next consumer.Traces, options ...option, ) (processor.Traces, error) { - kp, err := createKubernetesProcessor(set, cfg, options...) - if err != nil { - return nil, err - } + kp := createKubernetesProcessor(set, cfg, options...) return processorhelper.NewTracesProcessor( ctx, @@ -98,10 +95,7 @@ func createMetricsProcessorWithOptions( nextMetricsConsumer consumer.Metrics, options ...option, ) (processor.Metrics, error) { - kp, err := createKubernetesProcessor(set, cfg, options...) - if err != nil { - return nil, err - } + kp := createKubernetesProcessor(set, cfg, options...) return processorhelper.NewMetricsProcessor( ctx, @@ -121,10 +115,7 @@ func createLogsProcessorWithOptions( nextLogsConsumer consumer.Logs, options ...option, ) (processor.Logs, error) { - kp, err := createKubernetesProcessor(set, cfg, options...) - if err != nil { - return nil, err - } + kp := createKubernetesProcessor(set, cfg, options...) return processorhelper.NewLogsProcessor( ctx, @@ -141,26 +132,14 @@ func createKubernetesProcessor( params processor.CreateSettings, cfg component.Config, options ...option, -) (*kubernetesprocessor, error) { - kp := &kubernetesprocessor{logger: params.Logger} - - allOptions := append(createProcessorOpts(cfg), options...) - - for _, opt := range allOptions { - if err := opt(kp); err != nil { - return nil, err - } - } - - // This might have been set by an option already - if kp.kc == nil { - err := kp.initKubeClient(kp.logger, kubeClientProvider) - if err != nil { - return nil, err - } +) *kubernetesprocessor { + kp := &kubernetesprocessor{logger: params.Logger, + cfg: cfg, + options: options, + telemetrySettings: params.TelemetrySettings, } - return kp, nil + return kp } func createProcessorOpts(cfg component.Config) []option { diff --git a/processor/k8sattributesprocessor/go.mod b/processor/k8sattributesprocessor/go.mod index 4ab05520ad7d..2a1ce3d31376 100644 --- a/processor/k8sattributesprocessor/go.mod +++ b/processor/k8sattributesprocessor/go.mod @@ -35,7 +35,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/docker v24.0.7+incompatible // indirect - github.com/docker/go-connections v0.4.0 // indirect + github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect @@ -130,3 +130,9 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8ste replace cloud.google.com/go v0.54.0 => cloud.google.com/go v0.110.10 replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/processor/k8sattributesprocessor/go.sum b/processor/k8sattributesprocessor/go.sum index 3dbddd42fcd8..9215a7b6bc82 100644 --- a/processor/k8sattributesprocessor/go.sum +++ b/processor/k8sattributesprocessor/go.sum @@ -827,8 +827,8 @@ github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m3 github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= -github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= -github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= @@ -1134,8 +1134,6 @@ github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= -github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.91.0 h1:I3MFZXcQdnATObbeKseHLEWOWMFt1jHhHCbeunBw3mE= -github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.91.0/go.mod h1:xHPYTciFeEEE2HnPu65FMgsCQFYNns66mqiHsMqb+HM= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index e158a3e62f05..136d224e0a8b 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -25,14 +25,17 @@ const ( ) type kubernetesprocessor struct { - logger *zap.Logger - apiConfig k8sconfig.APIConfig - kc kube.Client - passthroughMode bool - rules kube.ExtractionRules - filters kube.Filters - podAssociations []kube.Association - podIgnore kube.Excludes + cfg component.Config + options []option + telemetrySettings component.TelemetrySettings + logger *zap.Logger + apiConfig k8sconfig.APIConfig + kc kube.Client + passthroughMode bool + rules kube.ExtractionRules + filters kube.Filters + podAssociations []kube.Association + podIgnore kube.Excludes } func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kube.ClientProvider) error { @@ -50,6 +53,23 @@ func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kub } func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error { + allOptions := append(createProcessorOpts(kp.cfg), kp.options...) + + for _, opt := range allOptions { + if err := opt(kp); err != nil { + kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) + return nil + } + } + + // This might have been set by an option already + if kp.kc == nil { + err := kp.initKubeClient(kp.logger, kubeClientProvider) + if err != nil { + kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) + return nil + } + } if !kp.passthroughMode { go kp.kc.Start() } diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index ac7bf6587dc0..94d6ee91d7fd 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -42,36 +42,48 @@ func newPodIdentifier(from string, name string, value string) kube.PodIdentifier } } -func newTracesProcessor(cfg component.Config, next consumer.Traces, options ...option) (processor.Traces, error) { +func newTracesProcessor(cfg component.Config, next consumer.Traces, errFunc func(error), options ...option) (processor.Traces, error) { opts := options opts = append(opts, withKubeClientProvider(newFakeClient)) + set := processortest.NewNopCreateSettings() + set.ReportStatus = func(event *component.StatusEvent) { + errFunc(event.Err()) + } return createTracesProcessorWithOptions( context.Background(), - processortest.NewNopCreateSettings(), + set, cfg, next, opts..., ) } -func newMetricsProcessor(cfg component.Config, nextMetricsConsumer consumer.Metrics, options ...option) (processor.Metrics, error) { +func newMetricsProcessor(cfg component.Config, nextMetricsConsumer consumer.Metrics, errFunc func(error), options ...option) (processor.Metrics, error) { opts := options opts = append(opts, withKubeClientProvider(newFakeClient)) + set := processortest.NewNopCreateSettings() + set.ReportStatus = func(event *component.StatusEvent) { + errFunc(event.Err()) + } return createMetricsProcessorWithOptions( context.Background(), - processortest.NewNopCreateSettings(), + set, cfg, nextMetricsConsumer, opts..., ) } -func newLogsProcessor(cfg component.Config, nextLogsConsumer consumer.Logs, options ...option) (processor.Logs, error) { +func newLogsProcessor(cfg component.Config, nextLogsConsumer consumer.Logs, errFunc func(error), options ...option) (processor.Logs, error) { opts := options opts = append(opts, withKubeClientProvider(newFakeClient)) + set := processortest.NewNopCreateSettings() + set.ReportStatus = func(event *component.StatusEvent) { + errFunc(event.Err()) + } return createLogsProcessorWithOptions( context.Background(), - processortest.NewNopCreateSettings(), + set, cfg, nextLogsConsumer, opts..., @@ -122,31 +134,28 @@ func newMultiTest( nextLogs: new(consumertest.LogsSink), } - tp, err := newTracesProcessor(cfg, m.nextTrace, append(options, withExtractKubernetesProcessorInto(&m.kpTrace))...) + tp, err := newTracesProcessor(cfg, m.nextTrace, errFunc, append(options, withExtractKubernetesProcessorInto(&m.kpTrace))...) + require.NoError(t, err) + err = tp.Start(context.Background(), componenttest.NewNopHost()) if errFunc == nil { assert.NotNil(t, tp) require.NoError(t, err) - } else { - assert.Nil(t, tp) - errFunc(err) } - mp, err := newMetricsProcessor(cfg, m.nextMetrics, append(options, withExtractKubernetesProcessorInto(&m.kpMetrics))...) + mp, err := newMetricsProcessor(cfg, m.nextMetrics, errFunc, append(options, withExtractKubernetesProcessorInto(&m.kpMetrics))...) + require.NoError(t, err) + err = mp.Start(context.Background(), componenttest.NewNopHost()) if errFunc == nil { assert.NotNil(t, mp) require.NoError(t, err) - } else { - assert.Nil(t, mp) - errFunc(err) } - lp, err := newLogsProcessor(cfg, m.nextLogs, append(options, withExtractKubernetesProcessorInto(&m.kpLogs))...) + lp, err := newLogsProcessor(cfg, m.nextLogs, errFunc, append(options, withExtractKubernetesProcessorInto(&m.kpLogs))...) + require.NoError(t, err) + err = lp.Start(context.Background(), componenttest.NewNopHost()) if errFunc == nil { assert.NotNil(t, lp) require.NoError(t, err) - } else { - assert.Nil(t, lp) - errFunc(err) } m.tp = tp @@ -220,7 +229,7 @@ func TestProcessorBadClientProvider(t *testing.T) { } newMultiTest(t, NewFactory().CreateDefaultConfig(), func(err error) { - assert.Error(t, err) + require.Error(t, err) assert.Equal(t, "bad client error", err.Error()) }, withKubeClientProvider(clientProvider)) } @@ -1180,10 +1189,13 @@ func TestMetricsProcessorHostname(t *testing.T) { p, err := newMetricsProcessor( NewFactory().CreateDefaultConfig(), next, + nil, withExtractMetadata(conventions.AttributeK8SPodName), withExtractKubernetesProcessorInto(&kp), ) require.NoError(t, err) + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) kc := kp.kc.(*fakeClient) // invalid ip should not be used to lookup k8s pod @@ -1250,10 +1262,13 @@ func TestMetricsProcessorHostnameWithPodAssociation(t *testing.T) { p, err := newMetricsProcessor( NewFactory().CreateDefaultConfig(), next, + nil, withExtractMetadata(conventions.AttributeK8SPodName), withExtractKubernetesProcessorInto(&kp), ) require.NoError(t, err) + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) kc := kp.kc.(*fakeClient) kp.podAssociations = []kube.Association{ { @@ -1332,6 +1347,7 @@ func TestPassthroughStart(t *testing.T) { p, err := newTracesProcessor( NewFactory().CreateDefaultConfig(), next, + nil, opts..., ) require.NoError(t, err) @@ -1346,7 +1362,7 @@ func TestRealClient(t *testing.T) { t, NewFactory().CreateDefaultConfig(), func(err error) { - assert.Error(t, err) + require.Error(t, err) assert.Equal(t, "unable to load k8s config, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined", err.Error()) }, withKubeClientProvider(kubeClientProvider), @@ -1358,6 +1374,7 @@ func TestCapabilities(t *testing.T) { p, err := newTracesProcessor( NewFactory().CreateDefaultConfig(), consumertest.NewNop(), + nil, ) assert.NoError(t, err) caps := p.Capabilities() @@ -1369,6 +1386,7 @@ func TestStartStop(t *testing.T) { p, err := newTracesProcessor( NewFactory().CreateDefaultConfig(), consumertest.NewNop(), + nil, withExtractKubernetesProcessorInto(&kp), ) require.NoError(t, err)