Skip to content

Commit

Permalink
Remove circular dependency between default otel and connector
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Dec 14, 2023
1 parent 1d0d1ef commit deb315f
Show file tree
Hide file tree
Showing 36 changed files with 746 additions and 629 deletions.
13 changes: 13 additions & 0 deletions .chloggen/rmcircular.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'deprecated'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: 'connector'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Deprecate [Metrics|Logs|Traces]Router in favour of [Metrics|Logs|Traces]TracesRouterAndConsumer"

# One or more tracking issues or pull requests related to the change
issues: [9095]
13 changes: 13 additions & 0 deletions .chloggen/router.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: "connector"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Change interfaces connector.[Traces|Metrics|Logs]Router to disallow implementation"

# One or more tracking issues or pull requests related to the change
issues: [9095]
2 changes: 0 additions & 2 deletions config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter

replace go.opentelemetry.io/collector/receiver => ../../receiver

replace go.opentelemetry.io/collector/connector => ../../connector

replace go.opentelemetry.io/collector/featuregate => ../../featuregate

replace go.opentelemetry.io/collector/pdata => ../../pdata
Expand Down
2 changes: 0 additions & 2 deletions config/confighttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter

replace go.opentelemetry.io/collector/receiver => ../../receiver

replace go.opentelemetry.io/collector/connector => ../../connector

replace go.opentelemetry.io/collector/featuregate => ../../featuregate

replace go.opentelemetry.io/collector/pdata => ../../pdata
Expand Down
19 changes: 0 additions & 19 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ type Traces interface {
consumer.Traces
}

// TracesRouter feeds the first consumer.Traces in each of the specified pipelines.
// The router will create a fanout consumer for the set of pipelines and return a uuid
type TracesRouter interface {
Consumer(...component.ID) (consumer.Traces, error)
PipelineIDs() []component.ID
}

// A Metrics connector acts as an exporter from a metrics pipeline and a receiver
// to one or more traces, metrics, or logs pipelines.
// Metrics feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data.
Expand All @@ -53,12 +46,6 @@ type Metrics interface {
consumer.Metrics
}

// MetricsRouter feeds the first consumer.Metrics in each of the specified pipelines.
type MetricsRouter interface {
Consumer(...component.ID) (consumer.Metrics, error)
PipelineIDs() []component.ID
}

// A Logs connector acts as an exporter from a logs pipeline and a receiver
// to one or more traces, metrics, or logs pipelines.
// Logs feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data.
Expand All @@ -74,12 +61,6 @@ type Logs interface {
consumer.Logs
}

// LogsRouter feeds the first consumer.Logs in each of the specified pipelines.
type LogsRouter interface {
Consumer(...component.ID) (consumer.Logs, error)
PipelineIDs() []component.ID
}

// CreateSettings configures Connector creators.
type CreateSettings struct {
// ID returns the ID of the component that will be created.
Expand Down
28 changes: 15 additions & 13 deletions connector/connectortest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,77 +8,79 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
)

// Deprecated: [v0.92.0] use connector.NewTracesRouter.
type TracesRouterOption struct {
id component.ID
cons consumer.Traces
}

// WithNopTraces creates a nop consumer for a connector.TracesRouter
// Deprecated: [v0.92.0] use connector.NewTracesRouter.
func WithNopTraces(id component.ID) TracesRouterOption {
return TracesRouterOption{id: id, cons: consumertest.NewNop()}
}

// WithTracesSink adds a consumer to a connector.TracesRouter
// Deprecated: [v0.92.0] use connector.NewTracesRouter.
func WithTracesSink(id component.ID, sink *consumertest.TracesSink) TracesRouterOption {
return TracesRouterOption{id: id, cons: sink}
}

// NewTracesRouter returns a connector.TracesRouter with sinks based on the options provided
// Deprecated: [v0.92.0] use connector.NewTracesRouter.
func NewTracesRouter(opts ...TracesRouterOption) connector.TracesRouter {
consumers := make(map[component.ID]consumer.Traces)
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewTracesRouter(consumers).(connector.TracesRouter)
return connector.NewTracesRouter(consumers)
}

// Deprecated: [v0.92.0] use connector.NewMetricsRouter.
type MetricsRouterOption struct {
id component.ID
cons consumer.Metrics
}

// WithNopMetrics creates a nop consumer for a connector.MetricsRouter
// Deprecated: [v0.92.0] use connector.NewMetricsRouter.
func WithNopMetrics(id component.ID) MetricsRouterOption {
return MetricsRouterOption{id: id, cons: consumertest.NewNop()}
}

// WithMetricsSink adds a consumer to a connector.MetricsRouter
// Deprecated: [v0.92.0] use connector.NewMetricsRouter.
func WithMetricsSink(id component.ID, sink *consumertest.MetricsSink) MetricsRouterOption {
return MetricsRouterOption{id: id, cons: sink}
}

// NewMetricsRouter returns a connector.MetricsRouter with sinks based on the options provided
// Deprecated: [v0.92.0] use connector.NewMetricsRouter.
func NewMetricsRouter(opts ...MetricsRouterOption) connector.MetricsRouter {
consumers := make(map[component.ID]consumer.Metrics)
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewMetricsRouter(consumers).(connector.MetricsRouter)
return connector.NewMetricsRouter(consumers)
}

// Deprecated: [v0.92.0] use connector.NewLogsRouter.
type LogsRouterOption struct {
id component.ID
cons consumer.Logs
}

// WithNopLogs creates a nop consumer for a connector.LogsRouter
// Deprecated: [v0.92.0] use connector.NewLogsRouter.
func WithNopLogs(id component.ID) LogsRouterOption {
return LogsRouterOption{id: id, cons: consumertest.NewNop()}
}

// WithLogsSink adds a consumer to a connector.LogsRouter
// Deprecated: [v0.92.0] use connector.NewLogsRouter.
func WithLogsSink(id component.ID, sink *consumertest.LogsSink) LogsRouterOption {
return LogsRouterOption{id: id, cons: sink}
}

// NewLogsRouter returns a connector.LogsRouter with sinks based on the options provided
// Deprecated: [v0.92.0] use connector.NewLogsRouter.
func NewLogsRouter(opts ...LogsRouterOption) connector.LogsRouter {
consumers := make(map[component.ID]consumer.Logs)
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewLogsRouter(consumers).(connector.LogsRouter)
return connector.NewLogsRouter(consumers)
}
2 changes: 1 addition & 1 deletion connector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
go.opentelemetry.io/collector/component v0.91.0
go.opentelemetry.io/collector/consumer v0.91.0
go.opentelemetry.io/collector/pdata v1.0.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
)

Expand All @@ -32,7 +33,6 @@ require (
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
75 changes: 75 additions & 0 deletions connector/logs_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package connector // import "go.opentelemetry.io/collector/connector"

import (
"fmt"

"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
)

// Deprecated: [v0.92.0] use LogsRouterAndConsumer
type LogsRouter interface {
Consumer(...component.ID) (consumer.Logs, error)
PipelineIDs() []component.ID
}

// LogsRouterAndConsumer feeds the first consumer.Logs in each of the specified pipelines.
type LogsRouterAndConsumer interface {
consumer.Logs
Consumer(...component.ID) (consumer.Logs, error)
PipelineIDs() []component.ID
privateFunc()
}

type logsRouter struct {
consumer.Logs
consumers map[component.ID]consumer.Logs
}

func NewLogsRouter(cm map[component.ID]consumer.Logs) LogsRouterAndConsumer {
consumers := make([]consumer.Logs, 0, len(cm))
for _, consumer := range cm {
consumers = append(consumers, consumer)
}
return &logsRouter{
Logs: fanoutconsumer.NewLogs(consumers),
consumers: cm,
}
}

func (r *logsRouter) PipelineIDs() []component.ID {
ids := make([]component.ID, 0, len(r.consumers))
for id := range r.consumers {
ids = append(ids, id)
}
return ids
}

func (r *logsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Logs, error) {
if len(pipelineIDs) == 0 {
return nil, fmt.Errorf("missing consumers")
}
consumers := make([]consumer.Logs, 0, len(pipelineIDs))
var errors error
for _, pipelineID := range pipelineIDs {
c, ok := r.consumers[pipelineID]
if ok {
consumers = append(consumers, c)
} else {
errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID))
}
}
if errors != nil {
// TODO potentially this could return a NewLogs with the valid consumers
return nil, errors
}
return fanoutconsumer.NewLogs(consumers), nil
}

func (r *logsRouter) privateFunc() {}
Loading

0 comments on commit deb315f

Please sign in to comment.