From 5762e98e761cadb93ab8a01eb01e3e8d90c6a9e0 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 12 Dec 2023 16:24:53 -0800 Subject: [PATCH] Remove circular dependency between default otel and connector Signed-off-by: Bogdan Drutu --- .chloggen/deprecatehelpers.yaml | 13 ++ .chloggen/rmcircular.yaml | 13 ++ config/configgrpc/go.mod | 2 - config/confighttp/go.mod | 2 - connector/connector.go | 19 --- connector/connectortest/router.go | 28 ++-- connector/go.mod | 2 +- connector/logs_router.go | 75 +++++++++ connector/logs_router_test.go | 156 ++++++++++++++++++ connector/metrics_router.go | 75 +++++++++ connector/metrics_router_test.go | 156 ++++++++++++++++++ connector/traces_router.go | 75 +++++++++ connector/traces_router_test.go | 156 ++++++++++++++++++ consumer/go.mod | 2 - exporter/debugexporter/go.mod | 2 - exporter/go.mod | 2 - exporter/loggingexporter/go.mod | 2 - exporter/otlpexporter/go.mod | 2 - exporter/otlphttpexporter/go.mod | 2 - extension/ballastextension/go.mod | 2 - extension/zpagesextension/go.mod | 2 - go.mod | 3 - internal/fanoutconsumer/logs.go | 50 ------ internal/fanoutconsumer/logs_test.go | 134 --------------- internal/fanoutconsumer/metrics.go | 50 ------ internal/fanoutconsumer/metrics_test.go | 134 --------------- internal/fanoutconsumer/traces.go | 50 ------ internal/fanoutconsumer/traces_test.go | 134 --------------- processor/batchprocessor/go.mod | 2 - processor/go.mod | 2 - processor/memorylimiterprocessor/go.mod | 2 - receiver/go.mod | 2 - receiver/otlpreceiver/go.mod | 4 +- service/internal/graph/nodes.go | 6 +- .../internal/testcomponents/example_router.go | 6 +- .../testcomponents/example_router_test.go | 8 +- 36 files changed, 746 insertions(+), 629 deletions(-) create mode 100755 .chloggen/deprecatehelpers.yaml create mode 100755 .chloggen/rmcircular.yaml create mode 100644 connector/logs_router.go create mode 100644 connector/logs_router_test.go create mode 100644 connector/metrics_router.go create mode 100644 connector/metrics_router_test.go create mode 100644 connector/traces_router.go create mode 100644 connector/traces_router_test.go diff --git a/.chloggen/deprecatehelpers.yaml b/.chloggen/deprecatehelpers.yaml new file mode 100755 index 00000000000..f206a0b51ba --- /dev/null +++ b/.chloggen/deprecatehelpers.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'deprecation' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: 'connectortest' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Deprecate connectortest.New[Metrics|Logs|Traces]Router in favour of connector.New[Metrics|Logs|Traces]Router" + +# One or more tracking issues or pull requests related to the change +issues: [9095] diff --git a/.chloggen/rmcircular.yaml b/.chloggen/rmcircular.yaml new file mode 100755 index 00000000000..7fa147cecb4 --- /dev/null +++ b/.chloggen/rmcircular.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'deprecation' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: 'connector' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Deprecate [Metrics|Logs|Traces]Router in favour of [Metrics|Logs|Traces]RouterAndConsumer" + +# One or more tracking issues or pull requests related to the change +issues: [9095] diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index 2ad1e24b493..fd7dc3d068a 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -102,8 +102,6 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter replace go.opentelemetry.io/collector/receiver => ../../receiver -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/featuregate => ../../featuregate replace go.opentelemetry.io/collector/pdata => ../../pdata diff --git a/config/confighttp/go.mod b/config/confighttp/go.mod index 30b07deb7fb..7ab2ba6f887 100644 --- a/config/confighttp/go.mod +++ b/config/confighttp/go.mod @@ -79,8 +79,6 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter replace go.opentelemetry.io/collector/receiver => ../../receiver -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/featuregate => ../../featuregate replace go.opentelemetry.io/collector/pdata => ../../pdata diff --git a/connector/connector.go b/connector/connector.go index dccf21fa49d..9fc576cb54e 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -30,13 +30,6 @@ type Traces interface { consumer.Traces } -// TracesRouter feeds the first consumer.Traces in each of the specified pipelines. -// The router will create a fanout consumer for the set of pipelines and return a uuid -type TracesRouter interface { - Consumer(...component.ID) (consumer.Traces, error) - PipelineIDs() []component.ID -} - // A Metrics connector acts as an exporter from a metrics pipeline and a receiver // to one or more traces, metrics, or logs pipelines. // Metrics feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data. @@ -53,12 +46,6 @@ type Metrics interface { consumer.Metrics } -// MetricsRouter feeds the first consumer.Metrics in each of the specified pipelines. -type MetricsRouter interface { - Consumer(...component.ID) (consumer.Metrics, error) - PipelineIDs() []component.ID -} - // A Logs connector acts as an exporter from a logs pipeline and a receiver // to one or more traces, metrics, or logs pipelines. // Logs feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data. @@ -74,12 +61,6 @@ type Logs interface { consumer.Logs } -// LogsRouter feeds the first consumer.Logs in each of the specified pipelines. -type LogsRouter interface { - Consumer(...component.ID) (consumer.Logs, error) - PipelineIDs() []component.ID -} - // CreateSettings configures Connector creators. type CreateSettings struct { // ID returns the ID of the component that will be created. diff --git a/connector/connectortest/router.go b/connector/connectortest/router.go index 390e9502c59..c4a73c17300 100644 --- a/connector/connectortest/router.go +++ b/connector/connectortest/router.go @@ -8,77 +8,79 @@ import ( "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/internal/fanoutconsumer" ) +// Deprecated: [v0.92.0] use connector.NewTracesRouter. type TracesRouterOption struct { id component.ID cons consumer.Traces } -// WithNopTraces creates a nop consumer for a connector.TracesRouter +// Deprecated: [v0.92.0] use connector.NewTracesRouter. func WithNopTraces(id component.ID) TracesRouterOption { return TracesRouterOption{id: id, cons: consumertest.NewNop()} } -// WithTracesSink adds a consumer to a connector.TracesRouter +// Deprecated: [v0.92.0] use connector.NewTracesRouter. func WithTracesSink(id component.ID, sink *consumertest.TracesSink) TracesRouterOption { return TracesRouterOption{id: id, cons: sink} } -// NewTracesRouter returns a connector.TracesRouter with sinks based on the options provided +// Deprecated: [v0.92.0] use connector.NewTracesRouter. func NewTracesRouter(opts ...TracesRouterOption) connector.TracesRouter { consumers := make(map[component.ID]consumer.Traces) for _, opt := range opts { consumers[opt.id] = opt.cons } - return fanoutconsumer.NewTracesRouter(consumers).(connector.TracesRouter) + return connector.NewTracesRouter(consumers) } +// Deprecated: [v0.92.0] use connector.NewMetricsRouter. type MetricsRouterOption struct { id component.ID cons consumer.Metrics } -// WithNopMetrics creates a nop consumer for a connector.MetricsRouter +// Deprecated: [v0.92.0] use connector.NewMetricsRouter. func WithNopMetrics(id component.ID) MetricsRouterOption { return MetricsRouterOption{id: id, cons: consumertest.NewNop()} } -// WithMetricsSink adds a consumer to a connector.MetricsRouter +// Deprecated: [v0.92.0] use connector.NewMetricsRouter. func WithMetricsSink(id component.ID, sink *consumertest.MetricsSink) MetricsRouterOption { return MetricsRouterOption{id: id, cons: sink} } -// NewMetricsRouter returns a connector.MetricsRouter with sinks based on the options provided +// Deprecated: [v0.92.0] use connector.NewMetricsRouter. func NewMetricsRouter(opts ...MetricsRouterOption) connector.MetricsRouter { consumers := make(map[component.ID]consumer.Metrics) for _, opt := range opts { consumers[opt.id] = opt.cons } - return fanoutconsumer.NewMetricsRouter(consumers).(connector.MetricsRouter) + return connector.NewMetricsRouter(consumers) } +// Deprecated: [v0.92.0] use connector.NewLogsRouter. type LogsRouterOption struct { id component.ID cons consumer.Logs } -// WithNopLogs creates a nop consumer for a connector.LogsRouter +// Deprecated: [v0.92.0] use connector.NewLogsRouter. func WithNopLogs(id component.ID) LogsRouterOption { return LogsRouterOption{id: id, cons: consumertest.NewNop()} } -// WithLogsSink adds a consumer to a connector.LogsRouter +// Deprecated: [v0.92.0] use connector.NewLogsRouter. func WithLogsSink(id component.ID, sink *consumertest.LogsSink) LogsRouterOption { return LogsRouterOption{id: id, cons: sink} } -// NewLogsRouter returns a connector.LogsRouter with sinks based on the options provided +// Deprecated: [v0.92.0] use connector.NewLogsRouter. func NewLogsRouter(opts ...LogsRouterOption) connector.LogsRouter { consumers := make(map[component.ID]consumer.Logs) for _, opt := range opts { consumers[opt.id] = opt.cons } - return fanoutconsumer.NewLogsRouter(consumers).(connector.LogsRouter) + return connector.NewLogsRouter(consumers) } diff --git a/connector/go.mod b/connector/go.mod index 1d19c3611c8..66d28697fac 100644 --- a/connector/go.mod +++ b/connector/go.mod @@ -8,6 +8,7 @@ require ( go.opentelemetry.io/collector/component v0.91.0 go.opentelemetry.io/collector/consumer v0.91.0 go.opentelemetry.io/collector/pdata v1.0.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 ) @@ -32,7 +33,6 @@ require ( go.opentelemetry.io/otel v1.21.0 // indirect go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/trace v1.21.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/connector/logs_router.go b/connector/logs_router.go new file mode 100644 index 00000000000..e5594327ffc --- /dev/null +++ b/connector/logs_router.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connector // import "go.opentelemetry.io/collector/connector" + +import ( + "fmt" + + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/internal/fanoutconsumer" +) + +// Deprecated: [v0.92.0] use LogsRouterAndConsumer +type LogsRouter interface { + Consumer(...component.ID) (consumer.Logs, error) + PipelineIDs() []component.ID +} + +// LogsRouterAndConsumer feeds the first consumer.Logs in each of the specified pipelines. +type LogsRouterAndConsumer interface { + consumer.Logs + Consumer(...component.ID) (consumer.Logs, error) + PipelineIDs() []component.ID + privateFunc() +} + +type logsRouter struct { + consumer.Logs + consumers map[component.ID]consumer.Logs +} + +func NewLogsRouter(cm map[component.ID]consumer.Logs) LogsRouterAndConsumer { + consumers := make([]consumer.Logs, 0, len(cm)) + for _, consumer := range cm { + consumers = append(consumers, consumer) + } + return &logsRouter{ + Logs: fanoutconsumer.NewLogs(consumers), + consumers: cm, + } +} + +func (r *logsRouter) PipelineIDs() []component.ID { + ids := make([]component.ID, 0, len(r.consumers)) + for id := range r.consumers { + ids = append(ids, id) + } + return ids +} + +func (r *logsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Logs, error) { + if len(pipelineIDs) == 0 { + return nil, fmt.Errorf("missing consumers") + } + consumers := make([]consumer.Logs, 0, len(pipelineIDs)) + var errors error + for _, pipelineID := range pipelineIDs { + c, ok := r.consumers[pipelineID] + if ok { + consumers = append(consumers, c) + } else { + errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) + } + } + if errors != nil { + // TODO potentially this could return a NewLogs with the valid consumers + return nil, errors + } + return fanoutconsumer.NewLogs(consumers), nil +} + +func (r *logsRouter) privateFunc() {} diff --git a/connector/logs_router_test.go b/connector/logs_router_test.go new file mode 100644 index 00000000000..3fd0d701cc0 --- /dev/null +++ b/connector/logs_router_test.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connector + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/internal/testdata" + "go.opentelemetry.io/collector/pdata/plog" +) + +type mutatingLogsSink struct { + *consumertest.LogsSink +} + +func (mts *mutatingLogsSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func TestLogsRouterMultiplexing(t *testing.T) { + var max = 20 + for numIDs := 1; numIDs < max; numIDs++ { + for numCons := 1; numCons < max; numCons++ { + for numLogs := 1; numLogs < max; numLogs++ { + t.Run( + fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numLogs), + fuzzLogs(numIDs, numCons, numLogs), + ) + } + } + } +} + +func fuzzLogs(numIDs, numCons, numLogs int) func(*testing.T) { + return func(t *testing.T) { + allIDs := make([]component.ID, 0, numCons) + allCons := make([]consumer.Logs, 0, numCons) + allConsMap := make(map[component.ID]consumer.Logs) + + // If any consumer is mutating, the router must report mutating + for i := 0; i < numCons; i++ { + allIDs = append(allIDs, component.NewIDWithName("sink", strconv.Itoa(numCons))) + // Random chance for each consumer to be mutating + if (numCons+numLogs+i)%4 == 0 { + allCons = append(allCons, &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}) + } else { + allCons = append(allCons, new(consumertest.LogsSink)) + } + allConsMap[allIDs[i]] = allCons[i] + } + + r := NewLogsRouter(allConsMap) + ld := testdata.GenerateLogs(1) + + // Keep track of how many logs each consumer should receive. + // This will be validated after every call to RouteLogs. + expected := make(map[component.ID]int, numCons) + + for i := 0; i < numLogs; i++ { + // Build a random set of ids (no duplicates) + randCons := make(map[component.ID]bool, numIDs) + for j := 0; j < numIDs; j++ { + // This number should be pretty random and less than numCons + conNum := (numCons + numIDs + i + j) % numCons + randCons[allIDs[conNum]] = true + } + + // Convert to slice, update expectations + conIDs := make([]component.ID, 0, len(randCons)) + for id := range randCons { + conIDs = append(conIDs, id) + expected[id]++ + } + + // Route to list of consumers + fanout, err := r.Consumer(conIDs...) + assert.NoError(t, err) + assert.NoError(t, fanout.ConsumeLogs(context.Background(), ld)) + + // Validate expectations for all consumers + for id := range expected { + logs := []plog.Logs{} + switch con := allConsMap[id].(type) { + case *consumertest.LogsSink: + logs = con.AllLogs() + case *mutatingLogsSink: + logs = con.AllLogs() + } + assert.Len(t, logs, expected[id]) + for n := 0; n < len(logs); n++ { + assert.EqualValues(t, ld, logs[n]) + } + } + } + } +} + +func TestLogsRouterConsumers(t *testing.T) { + ctx := context.Background() + ld := testdata.GenerateLogs(1) + + fooID := component.NewID("foo") + barID := component.NewID("bar") + + foo := new(consumertest.LogsSink) + bar := new(consumertest.LogsSink) + r := NewLogsRouter(map[component.ID]consumer.Logs{fooID: foo, barID: bar}) + + rcs := r.PipelineIDs() + assert.Len(t, rcs, 2) + assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs) + + assert.Len(t, foo.AllLogs(), 0) + assert.Len(t, bar.AllLogs(), 0) + + both, err := r.Consumer(fooID, barID) + assert.NotNil(t, both) + assert.NoError(t, err) + + assert.NoError(t, both.ConsumeLogs(ctx, ld)) + assert.Len(t, foo.AllLogs(), 1) + assert.Len(t, bar.AllLogs(), 1) + + fooOnly, err := r.Consumer(fooID) + assert.NotNil(t, fooOnly) + assert.NoError(t, err) + + assert.NoError(t, fooOnly.ConsumeLogs(ctx, ld)) + assert.Len(t, foo.AllLogs(), 2) + assert.Len(t, bar.AllLogs(), 1) + + barOnly, err := r.Consumer(barID) + assert.NotNil(t, barOnly) + assert.NoError(t, err) + + assert.NoError(t, barOnly.ConsumeLogs(ctx, ld)) + assert.Len(t, foo.AllLogs(), 2) + assert.Len(t, bar.AllLogs(), 2) + + none, err := r.Consumer() + assert.Nil(t, none) + assert.Error(t, err) + + fake, err := r.Consumer(component.NewID("fake")) + assert.Nil(t, fake) + assert.Error(t, err) +} diff --git a/connector/metrics_router.go b/connector/metrics_router.go new file mode 100644 index 00000000000..8e2b8e900a1 --- /dev/null +++ b/connector/metrics_router.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connector // import "go.opentelemetry.io/collector/connector" + +import ( + "fmt" + + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/internal/fanoutconsumer" +) + +// Deprecated: [v0.92.0] use MetricsRouterAndConsumer. +type MetricsRouter interface { + Consumer(...component.ID) (consumer.Metrics, error) + PipelineIDs() []component.ID +} + +// MetricsRouterAndConsumer feeds the first consumer.Metrics in each of the specified pipelines. +type MetricsRouterAndConsumer interface { + consumer.Metrics + Consumer(...component.ID) (consumer.Metrics, error) + PipelineIDs() []component.ID + privateFunc() +} + +type metricsRouter struct { + consumer.Metrics + consumers map[component.ID]consumer.Metrics +} + +func NewMetricsRouter(cm map[component.ID]consumer.Metrics) MetricsRouterAndConsumer { + consumers := make([]consumer.Metrics, 0, len(cm)) + for _, cons := range cm { + consumers = append(consumers, cons) + } + return &metricsRouter{ + Metrics: fanoutconsumer.NewMetrics(consumers), + consumers: cm, + } +} + +func (r *metricsRouter) PipelineIDs() []component.ID { + ids := make([]component.ID, 0, len(r.consumers)) + for id := range r.consumers { + ids = append(ids, id) + } + return ids +} + +func (r *metricsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Metrics, error) { + if len(pipelineIDs) == 0 { + return nil, fmt.Errorf("missing consumers") + } + consumers := make([]consumer.Metrics, 0, len(pipelineIDs)) + var errors error + for _, pipelineID := range pipelineIDs { + c, ok := r.consumers[pipelineID] + if ok { + consumers = append(consumers, c) + } else { + errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) + } + } + if errors != nil { + // TODO potentially this could return a NewMetrics with the valid consumers + return nil, errors + } + return fanoutconsumer.NewMetrics(consumers), nil +} + +func (r *metricsRouter) privateFunc() {} diff --git a/connector/metrics_router_test.go b/connector/metrics_router_test.go new file mode 100644 index 00000000000..69126b04ada --- /dev/null +++ b/connector/metrics_router_test.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connector + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/internal/testdata" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type mutatingMetricsSink struct { + *consumertest.MetricsSink +} + +func (mts *mutatingMetricsSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func TestMetricsRouterMultiplexing(t *testing.T) { + var max = 20 + for numIDs := 1; numIDs < max; numIDs++ { + for numCons := 1; numCons < max; numCons++ { + for numMetrics := 1; numMetrics < max; numMetrics++ { + t.Run( + fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numMetrics), + fuzzMetrics(numIDs, numCons, numMetrics), + ) + } + } + } +} + +func fuzzMetrics(numIDs, numCons, numMetrics int) func(*testing.T) { + return func(t *testing.T) { + allIDs := make([]component.ID, 0, numCons) + allCons := make([]consumer.Metrics, 0, numCons) + allConsMap := make(map[component.ID]consumer.Metrics) + + // If any consumer is mutating, the router must report mutating + for i := 0; i < numCons; i++ { + allIDs = append(allIDs, component.NewIDWithName("sink", strconv.Itoa(numCons))) + // Random chance for each consumer to be mutating + if (numCons+numMetrics+i)%4 == 0 { + allCons = append(allCons, &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}) + } else { + allCons = append(allCons, new(consumertest.MetricsSink)) + } + allConsMap[allIDs[i]] = allCons[i] + } + + r := NewMetricsRouter(allConsMap) + md := testdata.GenerateMetrics(1) + + // Keep track of how many logs each consumer should receive. + // This will be validated after every call to RouteMetrics. + expected := make(map[component.ID]int, numCons) + + for i := 0; i < numMetrics; i++ { + // Build a random set of ids (no duplicates) + randCons := make(map[component.ID]bool, numIDs) + for j := 0; j < numIDs; j++ { + // This number should be pretty random and less than numCons + conNum := (numCons + numIDs + i + j) % numCons + randCons[allIDs[conNum]] = true + } + + // Convert to slice, update expectations + conIDs := make([]component.ID, 0, len(randCons)) + for id := range randCons { + conIDs = append(conIDs, id) + expected[id]++ + } + + // Route to list of consumers + fanout, err := r.Consumer(conIDs...) + assert.NoError(t, err) + assert.NoError(t, fanout.ConsumeMetrics(context.Background(), md)) + + // Validate expectations for all consumers + for id := range expected { + metrics := []pmetric.Metrics{} + switch con := allConsMap[id].(type) { + case *consumertest.MetricsSink: + metrics = con.AllMetrics() + case *mutatingMetricsSink: + metrics = con.AllMetrics() + } + assert.Len(t, metrics, expected[id]) + for n := 0; n < len(metrics); n++ { + assert.EqualValues(t, md, metrics[n]) + } + } + } + } +} + +func TestMetricsRouterConsumers(t *testing.T) { + ctx := context.Background() + md := testdata.GenerateMetrics(1) + + fooID := component.NewID("foo") + barID := component.NewID("bar") + + foo := new(consumertest.MetricsSink) + bar := new(consumertest.MetricsSink) + r := NewMetricsRouter(map[component.ID]consumer.Metrics{fooID: foo, barID: bar}) + + rcs := r.PipelineIDs() + assert.Len(t, rcs, 2) + assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs) + + assert.Len(t, foo.AllMetrics(), 0) + assert.Len(t, bar.AllMetrics(), 0) + + both, err := r.Consumer(fooID, barID) + assert.NotNil(t, both) + assert.NoError(t, err) + + assert.NoError(t, both.ConsumeMetrics(ctx, md)) + assert.Len(t, foo.AllMetrics(), 1) + assert.Len(t, bar.AllMetrics(), 1) + + fooOnly, err := r.Consumer(fooID) + assert.NotNil(t, fooOnly) + assert.NoError(t, err) + + assert.NoError(t, fooOnly.ConsumeMetrics(ctx, md)) + assert.Len(t, foo.AllMetrics(), 2) + assert.Len(t, bar.AllMetrics(), 1) + + barOnly, err := r.Consumer(barID) + assert.NotNil(t, barOnly) + assert.NoError(t, err) + + assert.NoError(t, barOnly.ConsumeMetrics(ctx, md)) + assert.Len(t, foo.AllMetrics(), 2) + assert.Len(t, bar.AllMetrics(), 2) + + none, err := r.Consumer() + assert.Nil(t, none) + assert.Error(t, err) + + fake, err := r.Consumer(component.NewID("fake")) + assert.Nil(t, fake) + assert.Error(t, err) +} diff --git a/connector/traces_router.go b/connector/traces_router.go new file mode 100644 index 00000000000..97c31d7920c --- /dev/null +++ b/connector/traces_router.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connector // import "go.opentelemetry.io/collector/connector" + +import ( + "fmt" + + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/internal/fanoutconsumer" +) + +// Deprecated: [v0.92.0] use TracesRouterAndConsumer +type TracesRouter interface { + Consumer(...component.ID) (consumer.Traces, error) + PipelineIDs() []component.ID +} + +// TracesRouterAndConsumer feeds the first consumer.Traces in each of the specified pipelines. +type TracesRouterAndConsumer interface { + consumer.Traces + Consumer(...component.ID) (consumer.Traces, error) + PipelineIDs() []component.ID + privateFunc() +} + +type tracesRouter struct { + consumer.Traces + consumers map[component.ID]consumer.Traces +} + +func NewTracesRouter(cm map[component.ID]consumer.Traces) TracesRouterAndConsumer { + consumers := make([]consumer.Traces, 0, len(cm)) + for _, c := range cm { + consumers = append(consumers, c) + } + return &tracesRouter{ + Traces: fanoutconsumer.NewTraces(consumers), + consumers: cm, + } +} + +func (r *tracesRouter) PipelineIDs() []component.ID { + ids := make([]component.ID, 0, len(r.consumers)) + for id := range r.consumers { + ids = append(ids, id) + } + return ids +} + +func (r *tracesRouter) Consumer(pipelineIDs ...component.ID) (consumer.Traces, error) { + if len(pipelineIDs) == 0 { + return nil, fmt.Errorf("missing consumers") + } + consumers := make([]consumer.Traces, 0, len(pipelineIDs)) + var errors error + for _, pipelineID := range pipelineIDs { + c, ok := r.consumers[pipelineID] + if ok { + consumers = append(consumers, c) + } else { + errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) + } + } + if errors != nil { + // TODO potentially this could return a NewTraces with the valid consumers + return nil, errors + } + return fanoutconsumer.NewTraces(consumers), nil +} + +func (r *tracesRouter) privateFunc() {} diff --git a/connector/traces_router_test.go b/connector/traces_router_test.go new file mode 100644 index 00000000000..a89a687f9bc --- /dev/null +++ b/connector/traces_router_test.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connector + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/internal/testdata" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type mutatingTracesSink struct { + *consumertest.TracesSink +} + +func (mts *mutatingTracesSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func TestTracesRouterMultiplexing(t *testing.T) { + var max = 20 + for numIDs := 1; numIDs < max; numIDs++ { + for numCons := 1; numCons < max; numCons++ { + for numTraces := 1; numTraces < max; numTraces++ { + t.Run( + fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numTraces), + fuzzTraces(numIDs, numCons, numTraces), + ) + } + } + } +} + +func fuzzTraces(numIDs, numCons, numTraces int) func(*testing.T) { + return func(t *testing.T) { + allIDs := make([]component.ID, 0, numCons) + allCons := make([]consumer.Traces, 0, numCons) + allConsMap := make(map[component.ID]consumer.Traces) + + // If any consumer is mutating, the router must report mutating + for i := 0; i < numCons; i++ { + allIDs = append(allIDs, component.NewIDWithName("sink", strconv.Itoa(numCons))) + // Random chance for each consumer to be mutating + if (numCons+numTraces+i)%4 == 0 { + allCons = append(allCons, &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}) + } else { + allCons = append(allCons, new(consumertest.TracesSink)) + } + allConsMap[allIDs[i]] = allCons[i] + } + + r := NewTracesRouter(allConsMap) + td := testdata.GenerateTraces(1) + + // Keep track of how many logs each consumer should receive. + // This will be validated after every call to RouteTraces. + expected := make(map[component.ID]int, numCons) + + for i := 0; i < numTraces; i++ { + // Build a random set of ids (no duplicates) + randCons := make(map[component.ID]bool, numIDs) + for j := 0; j < numIDs; j++ { + // This number should be pretty random and less than numCons + conNum := (numCons + numIDs + i + j) % numCons + randCons[allIDs[conNum]] = true + } + + // Convert to slice, update expectations + conIDs := make([]component.ID, 0, len(randCons)) + for id := range randCons { + conIDs = append(conIDs, id) + expected[id]++ + } + + // Route to list of consumers + fanout, err := r.Consumer(conIDs...) + assert.NoError(t, err) + assert.NoError(t, fanout.ConsumeTraces(context.Background(), td)) + + // Validate expectations for all consumers + for id := range expected { + traces := []ptrace.Traces{} + switch con := allConsMap[id].(type) { + case *consumertest.TracesSink: + traces = con.AllTraces() + case *mutatingTracesSink: + traces = con.AllTraces() + } + assert.Len(t, traces, expected[id]) + for n := 0; n < len(traces); n++ { + assert.EqualValues(t, td, traces[n]) + } + } + } + } +} + +func TestTracesRouterConsumer(t *testing.T) { + ctx := context.Background() + td := testdata.GenerateTraces(1) + + fooID := component.NewID("foo") + barID := component.NewID("bar") + + foo := new(consumertest.TracesSink) + bar := new(consumertest.TracesSink) + r := NewTracesRouter(map[component.ID]consumer.Traces{fooID: foo, barID: bar}) + + rcs := r.PipelineIDs() + assert.Len(t, rcs, 2) + assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs) + + assert.Len(t, foo.AllTraces(), 0) + assert.Len(t, bar.AllTraces(), 0) + + both, err := r.Consumer(fooID, barID) + assert.NotNil(t, both) + assert.NoError(t, err) + + assert.NoError(t, both.ConsumeTraces(ctx, td)) + assert.Len(t, foo.AllTraces(), 1) + assert.Len(t, bar.AllTraces(), 1) + + fooOnly, err := r.Consumer(fooID) + assert.NotNil(t, fooOnly) + assert.NoError(t, err) + + assert.NoError(t, fooOnly.ConsumeTraces(ctx, td)) + assert.Len(t, foo.AllTraces(), 2) + assert.Len(t, bar.AllTraces(), 1) + + barOnly, err := r.Consumer(barID) + assert.NotNil(t, barOnly) + assert.NoError(t, err) + + assert.NoError(t, barOnly.ConsumeTraces(ctx, td)) + assert.Len(t, foo.AllTraces(), 2) + assert.Len(t, bar.AllTraces(), 2) + + none, err := r.Consumer() + assert.Nil(t, none) + assert.Error(t, err) + + fake, err := r.Consumer(component.NewID("fake")) + assert.Nil(t, fake) + assert.Error(t, err) +} diff --git a/consumer/go.mod b/consumer/go.mod index 9cc8e80c794..e7b0219260b 100644 --- a/consumer/go.mod +++ b/consumer/go.mod @@ -50,6 +50,4 @@ retract ( v0.69.0 // Release failed, use v0.69.1 ) -replace go.opentelemetry.io/collector/connector => ../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../config/configtelemetry diff --git a/exporter/debugexporter/go.mod b/exporter/debugexporter/go.mod index 1b9e82f856b..747eb68b28c 100644 --- a/exporter/debugexporter/go.mod +++ b/exporter/debugexporter/go.mod @@ -65,8 +65,6 @@ replace go.opentelemetry.io/collector/receiver => ../../receiver replace go.opentelemetry.io/collector/extension => ../../extension -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/processor => ../../processor replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry diff --git a/exporter/go.mod b/exporter/go.mod index d97458ef059..e797de14825 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -84,6 +84,4 @@ replace go.opentelemetry.io/collector/receiver => ../receiver retract v0.76.0 // Depends on retracted pdata v1.0.0-rc10 module -replace go.opentelemetry.io/collector/connector => ../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../config/configtelemetry diff --git a/exporter/loggingexporter/go.mod b/exporter/loggingexporter/go.mod index 08bd7924ab2..082e5ae2360 100644 --- a/exporter/loggingexporter/go.mod +++ b/exporter/loggingexporter/go.mod @@ -73,6 +73,4 @@ retract ( v0.69.0 // Release failed, use v0.69.1 ) -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 4b0b2f785f3..481d05f5821 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -112,8 +112,6 @@ retract ( v0.69.0 // Release failed, use v0.69.1 ) -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/confighttp => ../../config/confighttp diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index 119b89b743d..d46c5f0d101 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -95,8 +95,6 @@ replace go.opentelemetry.io/collector/config/internal => ../../config/internal replace go.opentelemetry.io/collector/confmap => ../../confmap -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/exporter => ../ replace go.opentelemetry.io/collector/extension => ../../extension diff --git a/extension/ballastextension/go.mod b/extension/ballastextension/go.mod index 9a1f98e4d52..82f9051b641 100644 --- a/extension/ballastextension/go.mod +++ b/extension/ballastextension/go.mod @@ -71,6 +71,4 @@ retract ( replace go.opentelemetry.io/collector/processor => ../../processor -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry diff --git a/extension/zpagesextension/go.mod b/extension/zpagesextension/go.mod index 08fe1b4b744..f474e48e88a 100644 --- a/extension/zpagesextension/go.mod +++ b/extension/zpagesextension/go.mod @@ -73,6 +73,4 @@ retract ( v0.69.0 // Release failed, use v0.69.1 ) -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry diff --git a/go.mod b/go.mod index 74de59716f3..d832601f9b0 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.91.0 go.opentelemetry.io/collector/config/configtelemetry v0.91.0 - go.opentelemetry.io/collector/connector v0.91.0 go.opentelemetry.io/collector/consumer v0.91.0 go.opentelemetry.io/collector/exporter v0.91.0 go.opentelemetry.io/collector/featuregate v1.0.0 @@ -81,8 +80,6 @@ replace go.opentelemetry.io/collector/confmap => ./confmap replace go.opentelemetry.io/collector/config/configtelemetry => ./config/configtelemetry -replace go.opentelemetry.io/collector/connector => ./connector - replace go.opentelemetry.io/collector/consumer => ./consumer replace go.opentelemetry.io/collector/exporter => ./exporter diff --git a/internal/fanoutconsumer/logs.go b/internal/fanoutconsumer/logs.go index 5047fbbf70a..bff5c1a897e 100644 --- a/internal/fanoutconsumer/logs.go +++ b/internal/fanoutconsumer/logs.go @@ -7,12 +7,9 @@ package fanoutconsumer // import "go.opentelemetry.io/collector/internal/fanoutc import ( "context" - "fmt" "go.uber.org/multierr" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" ) @@ -84,50 +81,3 @@ func cloneLogs(ld plog.Logs) plog.Logs { ld.CopyTo(clonedLogs) return clonedLogs } - -var _ connector.LogsRouter = (*logsRouter)(nil) - -type logsRouter struct { - consumer.Logs - consumers map[component.ID]consumer.Logs -} - -func NewLogsRouter(cm map[component.ID]consumer.Logs) consumer.Logs { - consumers := make([]consumer.Logs, 0, len(cm)) - for _, consumer := range cm { - consumers = append(consumers, consumer) - } - return &logsRouter{ - Logs: NewLogs(consumers), - consumers: cm, - } -} - -func (r *logsRouter) PipelineIDs() []component.ID { - ids := make([]component.ID, 0, len(r.consumers)) - for id := range r.consumers { - ids = append(ids, id) - } - return ids -} - -func (r *logsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Logs, error) { - if len(pipelineIDs) == 0 { - return nil, fmt.Errorf("missing consumers") - } - consumers := make([]consumer.Logs, 0, len(pipelineIDs)) - var errors error - for _, pipelineID := range pipelineIDs { - c, ok := r.consumers[pipelineID] - if ok { - consumers = append(consumers, c) - } else { - errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) - } - } - if errors != nil { - // TODO potentially this could return a NewLogs with the valid consumers - return nil, errors - } - return NewLogs(consumers), nil -} diff --git a/internal/fanoutconsumer/logs_test.go b/internal/fanoutconsumer/logs_test.go index 472239b3456..8c247cb1232 100644 --- a/internal/fanoutconsumer/logs_test.go +++ b/internal/fanoutconsumer/logs_test.go @@ -6,18 +6,13 @@ package fanoutconsumer import ( "context" "errors" - "fmt" - "strconv" "testing" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/pdata/plog" ) func TestLogsNotMultiplexing(t *testing.T) { @@ -250,132 +245,3 @@ type mutatingErr struct { func (mts mutatingErr) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } - -func TestLogsRouterMultiplexing(t *testing.T) { - var max = 20 - for numIDs := 1; numIDs < max; numIDs++ { - for numCons := 1; numCons < max; numCons++ { - for numLogs := 1; numLogs < max; numLogs++ { - t.Run( - fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numLogs), - fuzzLogsRouter(numIDs, numCons, numLogs), - ) - } - } - } -} - -func fuzzLogsRouter(numIDs, numCons, numLogs int) func(*testing.T) { - return func(t *testing.T) { - allIDs := make([]component.ID, 0, numCons) - allCons := make([]consumer.Logs, 0, numCons) - allConsMap := make(map[component.ID]consumer.Logs) - - // If any consumer is mutating, the router must report mutating - for i := 0; i < numCons; i++ { - allIDs = append(allIDs, component.NewIDWithName("sink", strconv.Itoa(numCons))) - // Random chance for each consumer to be mutating - if (numCons+numLogs+i)%4 == 0 { - allCons = append(allCons, &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}) - } else { - allCons = append(allCons, new(consumertest.LogsSink)) - } - allConsMap[allIDs[i]] = allCons[i] - } - - r := NewLogsRouter(allConsMap).(connector.LogsRouter) - ld := testdata.GenerateLogs(1) - - // Keep track of how many logs each consumer should receive. - // This will be validated after every call to RouteLogs. - expected := make(map[component.ID]int, numCons) - - for i := 0; i < numLogs; i++ { - // Build a random set of ids (no duplicates) - randCons := make(map[component.ID]bool, numIDs) - for j := 0; j < numIDs; j++ { - // This number should be pretty random and less than numCons - conNum := (numCons + numIDs + i + j) % numCons - randCons[allIDs[conNum]] = true - } - - // Convert to slice, update expectations - conIDs := make([]component.ID, 0, len(randCons)) - for id := range randCons { - conIDs = append(conIDs, id) - expected[id]++ - } - - // Route to list of consumers - fanout, err := r.Consumer(conIDs...) - assert.NoError(t, err) - assert.NoError(t, fanout.ConsumeLogs(context.Background(), ld)) - - // Validate expectations for all consumers - for id := range expected { - logs := []plog.Logs{} - switch con := allConsMap[id].(type) { - case *consumertest.LogsSink: - logs = con.AllLogs() - case *mutatingLogsSink: - logs = con.AllLogs() - } - assert.Len(t, logs, expected[id]) - for n := 0; n < len(logs); n++ { - assert.EqualValues(t, ld, logs[n]) - } - } - } - } -} - -func TestLogsRouterGetConsumers(t *testing.T) { - ctx := context.Background() - ld := testdata.GenerateLogs(1) - - fooID := component.NewID("foo") - barID := component.NewID("bar") - - foo := new(consumertest.LogsSink) - bar := new(consumertest.LogsSink) - r := NewLogsRouter(map[component.ID]consumer.Logs{fooID: foo, barID: bar}).(connector.LogsRouter) - - rcs := r.PipelineIDs() - assert.Len(t, rcs, 2) - assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs) - - assert.Len(t, foo.AllLogs(), 0) - assert.Len(t, bar.AllLogs(), 0) - - both, err := r.Consumer(fooID, barID) - assert.NotNil(t, both) - assert.NoError(t, err) - - assert.NoError(t, both.ConsumeLogs(ctx, ld)) - assert.Len(t, foo.AllLogs(), 1) - assert.Len(t, bar.AllLogs(), 1) - - fooOnly, err := r.Consumer(fooID) - assert.NotNil(t, fooOnly) - assert.NoError(t, err) - - assert.NoError(t, fooOnly.ConsumeLogs(ctx, ld)) - assert.Len(t, foo.AllLogs(), 2) - assert.Len(t, bar.AllLogs(), 1) - - barOnly, err := r.Consumer(barID) - assert.NotNil(t, barOnly) - assert.NoError(t, err) - - assert.NoError(t, barOnly.ConsumeLogs(ctx, ld)) - assert.Len(t, foo.AllLogs(), 2) - assert.Len(t, bar.AllLogs(), 2) - - none, err := r.Consumer() - assert.Nil(t, none) - assert.Error(t, err) - - fake, err := r.Consumer(component.NewID("fake")) - assert.Nil(t, fake) - assert.Error(t, err) -} diff --git a/internal/fanoutconsumer/metrics.go b/internal/fanoutconsumer/metrics.go index 023db2a70e3..32d9514561d 100644 --- a/internal/fanoutconsumer/metrics.go +++ b/internal/fanoutconsumer/metrics.go @@ -5,12 +5,9 @@ package fanoutconsumer // import "go.opentelemetry.io/collector/internal/fanoutc import ( "context" - "fmt" "go.uber.org/multierr" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -82,50 +79,3 @@ func cloneMetrics(md pmetric.Metrics) pmetric.Metrics { md.CopyTo(clonedMetrics) return clonedMetrics } - -var _ connector.MetricsRouter = (*metricsRouter)(nil) - -type metricsRouter struct { - consumer.Metrics - consumers map[component.ID]consumer.Metrics -} - -func NewMetricsRouter(cm map[component.ID]consumer.Metrics) consumer.Metrics { - consumers := make([]consumer.Metrics, 0, len(cm)) - for _, consumer := range cm { - consumers = append(consumers, consumer) - } - return &metricsRouter{ - Metrics: NewMetrics(consumers), - consumers: cm, - } -} - -func (r *metricsRouter) PipelineIDs() []component.ID { - ids := make([]component.ID, 0, len(r.consumers)) - for id := range r.consumers { - ids = append(ids, id) - } - return ids -} - -func (r *metricsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Metrics, error) { - if len(pipelineIDs) == 0 { - return nil, fmt.Errorf("missing consumers") - } - consumers := make([]consumer.Metrics, 0, len(pipelineIDs)) - var errors error - for _, pipelineID := range pipelineIDs { - c, ok := r.consumers[pipelineID] - if ok { - consumers = append(consumers, c) - } else { - errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) - } - } - if errors != nil { - // TODO potentially this could return a NewMetrics with the valid consumers - return nil, errors - } - return NewMetrics(consumers), nil -} diff --git a/internal/fanoutconsumer/metrics_test.go b/internal/fanoutconsumer/metrics_test.go index cf6455d0d07..efb39ec552a 100644 --- a/internal/fanoutconsumer/metrics_test.go +++ b/internal/fanoutconsumer/metrics_test.go @@ -6,18 +6,13 @@ package fanoutconsumer import ( "context" "errors" - "fmt" - "strconv" "testing" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/pdata/pmetric" ) func TestMetricsNotMultiplexing(t *testing.T) { @@ -242,132 +237,3 @@ type mutatingMetricsSink struct { func (mts *mutatingMetricsSink) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } - -func TestMetricsRouterMultiplexing(t *testing.T) { - var max = 20 - for numIDs := 1; numIDs < max; numIDs++ { - for numCons := 1; numCons < max; numCons++ { - for numMetrics := 1; numMetrics < max; numMetrics++ { - t.Run( - fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numMetrics), - fuzzMetricsRouter(numIDs, numCons, numMetrics), - ) - } - } - } -} - -func fuzzMetricsRouter(numIDs, numCons, numMetrics int) func(*testing.T) { - return func(t *testing.T) { - allIDs := make([]component.ID, 0, numCons) - allCons := make([]consumer.Metrics, 0, numCons) - allConsMap := make(map[component.ID]consumer.Metrics) - - // If any consumer is mutating, the router must report mutating - for i := 0; i < numCons; i++ { - allIDs = append(allIDs, component.NewIDWithName("sink", strconv.Itoa(numCons))) - // Random chance for each consumer to be mutating - if (numCons+numMetrics+i)%4 == 0 { - allCons = append(allCons, &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}) - } else { - allCons = append(allCons, new(consumertest.MetricsSink)) - } - allConsMap[allIDs[i]] = allCons[i] - } - - r := NewMetricsRouter(allConsMap).(connector.MetricsRouter) - md := testdata.GenerateMetrics(1) - - // Keep track of how many logs each consumer should receive. - // This will be validated after every call to RouteMetrics. - expected := make(map[component.ID]int, numCons) - - for i := 0; i < numMetrics; i++ { - // Build a random set of ids (no duplicates) - randCons := make(map[component.ID]bool, numIDs) - for j := 0; j < numIDs; j++ { - // This number should be pretty random and less than numCons - conNum := (numCons + numIDs + i + j) % numCons - randCons[allIDs[conNum]] = true - } - - // Convert to slice, update expectations - conIDs := make([]component.ID, 0, len(randCons)) - for id := range randCons { - conIDs = append(conIDs, id) - expected[id]++ - } - - // Route to list of consumers - fanout, err := r.Consumer(conIDs...) - assert.NoError(t, err) - assert.NoError(t, fanout.ConsumeMetrics(context.Background(), md)) - - // Validate expectations for all consumers - for id := range expected { - metrics := []pmetric.Metrics{} - switch con := allConsMap[id].(type) { - case *consumertest.MetricsSink: - metrics = con.AllMetrics() - case *mutatingMetricsSink: - metrics = con.AllMetrics() - } - assert.Len(t, metrics, expected[id]) - for n := 0; n < len(metrics); n++ { - assert.EqualValues(t, md, metrics[n]) - } - } - } - } -} - -func TestMetricsRouterGetConsumers(t *testing.T) { - ctx := context.Background() - md := testdata.GenerateMetrics(1) - - fooID := component.NewID("foo") - barID := component.NewID("bar") - - foo := new(consumertest.MetricsSink) - bar := new(consumertest.MetricsSink) - r := NewMetricsRouter(map[component.ID]consumer.Metrics{fooID: foo, barID: bar}).(connector.MetricsRouter) - - rcs := r.PipelineIDs() - assert.Len(t, rcs, 2) - assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs) - - assert.Len(t, foo.AllMetrics(), 0) - assert.Len(t, bar.AllMetrics(), 0) - - both, err := r.Consumer(fooID, barID) - assert.NotNil(t, both) - assert.NoError(t, err) - - assert.NoError(t, both.ConsumeMetrics(ctx, md)) - assert.Len(t, foo.AllMetrics(), 1) - assert.Len(t, bar.AllMetrics(), 1) - - fooOnly, err := r.Consumer(fooID) - assert.NotNil(t, fooOnly) - assert.NoError(t, err) - - assert.NoError(t, fooOnly.ConsumeMetrics(ctx, md)) - assert.Len(t, foo.AllMetrics(), 2) - assert.Len(t, bar.AllMetrics(), 1) - - barOnly, err := r.Consumer(barID) - assert.NotNil(t, barOnly) - assert.NoError(t, err) - - assert.NoError(t, barOnly.ConsumeMetrics(ctx, md)) - assert.Len(t, foo.AllMetrics(), 2) - assert.Len(t, bar.AllMetrics(), 2) - - none, err := r.Consumer() - assert.Nil(t, none) - assert.Error(t, err) - - fake, err := r.Consumer(component.NewID("fake")) - assert.Nil(t, fake) - assert.Error(t, err) -} diff --git a/internal/fanoutconsumer/traces.go b/internal/fanoutconsumer/traces.go index e13068a656f..f9a34027017 100644 --- a/internal/fanoutconsumer/traces.go +++ b/internal/fanoutconsumer/traces.go @@ -5,12 +5,9 @@ package fanoutconsumer // import "go.opentelemetry.io/collector/internal/fanoutc import ( "context" - "fmt" "go.uber.org/multierr" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -82,50 +79,3 @@ func cloneTraces(td ptrace.Traces) ptrace.Traces { td.CopyTo(clonedTraces) return clonedTraces } - -var _ connector.TracesRouter = (*tracesRouter)(nil) - -type tracesRouter struct { - consumer.Traces - consumers map[component.ID]consumer.Traces -} - -func NewTracesRouter(cm map[component.ID]consumer.Traces) consumer.Traces { - consumers := make([]consumer.Traces, 0, len(cm)) - for _, consumer := range cm { - consumers = append(consumers, consumer) - } - return &tracesRouter{ - Traces: NewTraces(consumers), - consumers: cm, - } -} - -func (r *tracesRouter) PipelineIDs() []component.ID { - ids := make([]component.ID, 0, len(r.consumers)) - for id := range r.consumers { - ids = append(ids, id) - } - return ids -} - -func (r *tracesRouter) Consumer(pipelineIDs ...component.ID) (consumer.Traces, error) { - if len(pipelineIDs) == 0 { - return nil, fmt.Errorf("missing consumers") - } - consumers := make([]consumer.Traces, 0, len(pipelineIDs)) - var errors error - for _, pipelineID := range pipelineIDs { - c, ok := r.consumers[pipelineID] - if ok { - consumers = append(consumers, c) - } else { - errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) - } - } - if errors != nil { - // TODO potentially this could return a NewTraces with the valid consumers - return nil, errors - } - return NewTraces(consumers), nil -} diff --git a/internal/fanoutconsumer/traces_test.go b/internal/fanoutconsumer/traces_test.go index 966fdcf627b..120c6524715 100644 --- a/internal/fanoutconsumer/traces_test.go +++ b/internal/fanoutconsumer/traces_test.go @@ -6,18 +6,13 @@ package fanoutconsumer import ( "context" "errors" - "fmt" - "strconv" "testing" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/pdata/ptrace" ) func TestTracesNotMultiplexing(t *testing.T) { @@ -243,132 +238,3 @@ type mutatingTracesSink struct { func (mts *mutatingTracesSink) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } - -func TestTracesRouterMultiplexing(t *testing.T) { - var max = 20 - for numIDs := 1; numIDs < max; numIDs++ { - for numCons := 1; numCons < max; numCons++ { - for numTraces := 1; numTraces < max; numTraces++ { - t.Run( - fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numTraces), - fuzzTracesRouter(numIDs, numCons, numTraces), - ) - } - } - } -} - -func fuzzTracesRouter(numIDs, numCons, numTraces int) func(*testing.T) { - return func(t *testing.T) { - allIDs := make([]component.ID, 0, numCons) - allCons := make([]consumer.Traces, 0, numCons) - allConsMap := make(map[component.ID]consumer.Traces) - - // If any consumer is mutating, the router must report mutating - for i := 0; i < numCons; i++ { - allIDs = append(allIDs, component.NewIDWithName("sink", strconv.Itoa(numCons))) - // Random chance for each consumer to be mutating - if (numCons+numTraces+i)%4 == 0 { - allCons = append(allCons, &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}) - } else { - allCons = append(allCons, new(consumertest.TracesSink)) - } - allConsMap[allIDs[i]] = allCons[i] - } - - r := NewTracesRouter(allConsMap).(connector.TracesRouter) - td := testdata.GenerateTraces(1) - - // Keep track of how many logs each consumer should receive. - // This will be validated after every call to RouteTraces. - expected := make(map[component.ID]int, numCons) - - for i := 0; i < numTraces; i++ { - // Build a random set of ids (no duplicates) - randCons := make(map[component.ID]bool, numIDs) - for j := 0; j < numIDs; j++ { - // This number should be pretty random and less than numCons - conNum := (numCons + numIDs + i + j) % numCons - randCons[allIDs[conNum]] = true - } - - // Convert to slice, update expectations - conIDs := make([]component.ID, 0, len(randCons)) - for id := range randCons { - conIDs = append(conIDs, id) - expected[id]++ - } - - // Route to list of consumers - fanout, err := r.Consumer(conIDs...) - assert.NoError(t, err) - assert.NoError(t, fanout.ConsumeTraces(context.Background(), td)) - - // Validate expectations for all consumers - for id := range expected { - traces := []ptrace.Traces{} - switch con := allConsMap[id].(type) { - case *consumertest.TracesSink: - traces = con.AllTraces() - case *mutatingTracesSink: - traces = con.AllTraces() - } - assert.Len(t, traces, expected[id]) - for n := 0; n < len(traces); n++ { - assert.EqualValues(t, td, traces[n]) - } - } - } - } -} - -func TestTracesRouterGetConsumer(t *testing.T) { - ctx := context.Background() - td := testdata.GenerateTraces(1) - - fooID := component.NewID("foo") - barID := component.NewID("bar") - - foo := new(consumertest.TracesSink) - bar := new(consumertest.TracesSink) - r := NewTracesRouter(map[component.ID]consumer.Traces{fooID: foo, barID: bar}).(connector.TracesRouter) - - rcs := r.PipelineIDs() - assert.Len(t, rcs, 2) - assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs) - - assert.Len(t, foo.AllTraces(), 0) - assert.Len(t, bar.AllTraces(), 0) - - both, err := r.Consumer(fooID, barID) - assert.NotNil(t, both) - assert.NoError(t, err) - - assert.NoError(t, both.ConsumeTraces(ctx, td)) - assert.Len(t, foo.AllTraces(), 1) - assert.Len(t, bar.AllTraces(), 1) - - fooOnly, err := r.Consumer(fooID) - assert.NotNil(t, fooOnly) - assert.NoError(t, err) - - assert.NoError(t, fooOnly.ConsumeTraces(ctx, td)) - assert.Len(t, foo.AllTraces(), 2) - assert.Len(t, bar.AllTraces(), 1) - - barOnly, err := r.Consumer(barID) - assert.NotNil(t, barOnly) - assert.NoError(t, err) - - assert.NoError(t, barOnly.ConsumeTraces(ctx, td)) - assert.Len(t, foo.AllTraces(), 2) - assert.Len(t, bar.AllTraces(), 2) - - none, err := r.Consumer() - assert.Nil(t, none) - assert.Error(t, err) - - fake, err := r.Consumer(component.NewID("fake")) - assert.Nil(t, fake) - assert.Error(t, err) -} diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index 91be1255cb0..d875971fc54 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -87,6 +87,4 @@ retract ( v0.69.0 // Release failed, use v0.69.1 ) -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry diff --git a/processor/go.mod b/processor/go.mod index 4d43ea9e8e3..a434df475dc 100644 --- a/processor/go.mod +++ b/processor/go.mod @@ -79,6 +79,4 @@ replace go.opentelemetry.io/collector/pdata => ../pdata replace go.opentelemetry.io/collector/receiver => ../receiver -replace go.opentelemetry.io/collector/connector => ../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../config/configtelemetry diff --git a/processor/memorylimiterprocessor/go.mod b/processor/memorylimiterprocessor/go.mod index e6fe03a5955..1657f21ce37 100644 --- a/processor/memorylimiterprocessor/go.mod +++ b/processor/memorylimiterprocessor/go.mod @@ -76,6 +76,4 @@ retract ( v0.69.0 // Release failed, use v0.69.1 ) -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry diff --git a/receiver/go.mod b/receiver/go.mod index 2d32e0d673e..78eda9378f7 100644 --- a/receiver/go.mod +++ b/receiver/go.mod @@ -81,6 +81,4 @@ replace go.opentelemetry.io/collector/processor => ../processor retract v0.76.0 // Depends on retracted pdata v1.0.0-rc10 module -replace go.opentelemetry.io/collector/connector => ../connector - replace go.opentelemetry.io/collector/config/configtelemetry => ../config/configtelemetry diff --git a/receiver/otlpreceiver/go.mod b/receiver/otlpreceiver/go.mod index 1f6ada09e05..147a26a4306 100644 --- a/receiver/otlpreceiver/go.mod +++ b/receiver/otlpreceiver/go.mod @@ -11,7 +11,6 @@ require ( go.opentelemetry.io/collector/config/configgrpc v0.91.0 go.opentelemetry.io/collector/config/confighttp v0.91.0 go.opentelemetry.io/collector/config/confignet v0.91.0 - go.opentelemetry.io/collector/config/configtelemetry v0.91.0 go.opentelemetry.io/collector/config/configtls v0.91.0 go.opentelemetry.io/collector/confmap v0.91.0 go.opentelemetry.io/collector/consumer v0.91.0 @@ -62,6 +61,7 @@ require ( go.opentelemetry.io/collector/config/configauth v0.91.0 // indirect go.opentelemetry.io/collector/config/configcompression v0.91.0 // indirect go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect go.opentelemetry.io/collector/config/internal v0.91.0 // indirect go.opentelemetry.io/collector/extension v0.91.0 // indirect go.opentelemetry.io/collector/extension/auth v0.91.0 // indirect @@ -107,8 +107,6 @@ replace go.opentelemetry.io/collector/config/internal => ../../config/internal replace go.opentelemetry.io/collector/confmap => ../../confmap -replace go.opentelemetry.io/collector/connector => ../../connector - replace go.opentelemetry.io/collector/exporter => ../../exporter replace go.opentelemetry.io/collector/extension => ../../extension diff --git a/service/internal/graph/nodes.go b/service/internal/graph/nodes.go index eb8e886b554..79d627d5bf7 100644 --- a/service/internal/graph/nodes.go +++ b/service/internal/graph/nodes.go @@ -244,7 +244,7 @@ func (n *connectorNode) buildComponent( consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces) capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData } - next := fanoutconsumer.NewTracesRouter(consumers) + next := connector.NewTracesRouter(consumers) switch n.exprPipelineType { case component.DataTypeTraces: @@ -280,7 +280,7 @@ func (n *connectorNode) buildComponent( consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics) capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData } - next := fanoutconsumer.NewMetricsRouter(consumers) + next := connector.NewMetricsRouter(consumers) switch n.exprPipelineType { case component.DataTypeTraces: @@ -315,7 +315,7 @@ func (n *connectorNode) buildComponent( consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs) capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData } - next := fanoutconsumer.NewLogsRouter(consumers) + next := connector.NewLogsRouter(consumers) switch n.exprPipelineType { case component.DataTypeTraces: diff --git a/service/internal/testcomponents/example_router.go b/service/internal/testcomponents/example_router.go index 67d97dbebcb..45d2a17c561 100644 --- a/service/internal/testcomponents/example_router.go +++ b/service/internal/testcomponents/example_router.go @@ -42,7 +42,7 @@ func createExampleRouterDefaultConfig() component.Config { func createExampleTracesRouter(_ context.Context, _ connector.CreateSettings, cfg component.Config, traces consumer.Traces) (connector.Traces, error) { c := cfg.(ExampleRouterConfig) - r := traces.(connector.TracesRouter) + r := traces.(connector.TracesRouterAndConsumer) left, _ := r.Consumer(c.Traces.Left) right, _ := r.Consumer(c.Traces.Right) return &ExampleRouter{ @@ -53,7 +53,7 @@ func createExampleTracesRouter(_ context.Context, _ connector.CreateSettings, cf func createExampleMetricsRouter(_ context.Context, _ connector.CreateSettings, cfg component.Config, metrics consumer.Metrics) (connector.Metrics, error) { c := cfg.(ExampleRouterConfig) - r := metrics.(connector.MetricsRouter) + r := metrics.(connector.MetricsRouterAndConsumer) left, _ := r.Consumer(c.Metrics.Left) right, _ := r.Consumer(c.Metrics.Right) return &ExampleRouter{ @@ -64,7 +64,7 @@ func createExampleMetricsRouter(_ context.Context, _ connector.CreateSettings, c func createExampleLogsRouter(_ context.Context, _ connector.CreateSettings, cfg component.Config, logs consumer.Logs) (connector.Logs, error) { c := cfg.(ExampleRouterConfig) - r := logs.(connector.LogsRouter) + r := logs.(connector.LogsRouterAndConsumer) left, _ := r.Consumer(c.Logs.Left) right, _ := r.Consumer(c.Logs.Right) return &ExampleRouter{ diff --git a/service/internal/testcomponents/example_router_test.go b/service/internal/testcomponents/example_router_test.go index bcf4f5b97b4..a34a70160e0 100644 --- a/service/internal/testcomponents/example_router_test.go +++ b/service/internal/testcomponents/example_router_test.go @@ -11,10 +11,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/connector/connectortest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/internal/fanoutconsumer" "go.opentelemetry.io/collector/internal/testdata" ) @@ -40,7 +40,7 @@ func TestTracesRouter(t *testing.T) { // The service will build a router to give to every connector. // Many connectors will just call router.ConsumeTraces, // but some implementation will call RouteTraces instead. - router := fanoutconsumer.NewTracesRouter( + router := connector.NewTracesRouter( map[component.ID]consumer.Traces{ leftID: sinkLeft, rightID: sinkRight, @@ -79,7 +79,7 @@ func TestMetricsRouter(t *testing.T) { // The service will build a router to give to every connector. // Many connectors will just call router.ConsumeMetrics, // but some implementation will call RouteMetrics instead. - router := fanoutconsumer.NewMetricsRouter( + router := connector.NewMetricsRouter( map[component.ID]consumer.Metrics{ leftID: sinkLeft, rightID: sinkRight, @@ -118,7 +118,7 @@ func TestLogsRouter(t *testing.T) { // The service will build a router to give to every connector. // Many connectors will just call router.ConsumeLogs, // but some implementation will call RouteLogs instead. - router := fanoutconsumer.NewLogsRouter( + router := connector.NewLogsRouter( map[component.ID]consumer.Logs{ leftID: sinkLeft, rightID: sinkRight,