Skip to content

Commit

Permalink
Remove circular dependency between default otel and connector
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Dec 13, 2023
1 parent defc1bb commit fd26316
Show file tree
Hide file tree
Showing 35 changed files with 721 additions and 617 deletions.
13 changes: 13 additions & 0 deletions .chloggen/rmcircular.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: 'connector'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Remove circular dependency between default otel and connector"

# One or more tracking issues or pull requests related to the change
issues: [9095]
13 changes: 13 additions & 0 deletions .chloggen/router.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: "connector"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Change interfaces connector.[Traces|Metrics|Logs]Router to disallow implementation"

# One or more tracking issues or pull requests related to the change
issues: [9095]
2 changes: 0 additions & 2 deletions config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter

replace go.opentelemetry.io/collector/receiver => ../../receiver

replace go.opentelemetry.io/collector/connector => ../../connector

replace go.opentelemetry.io/collector/featuregate => ../../featuregate

replace go.opentelemetry.io/collector/pdata => ../../pdata
Expand Down
2 changes: 0 additions & 2 deletions config/confighttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter

replace go.opentelemetry.io/collector/receiver => ../../receiver

replace go.opentelemetry.io/collector/connector => ../../connector

replace go.opentelemetry.io/collector/featuregate => ../../featuregate

replace go.opentelemetry.io/collector/pdata => ../../pdata
Expand Down
19 changes: 0 additions & 19 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ type Traces interface {
consumer.Traces
}

// TracesRouter feeds the first consumer.Traces in each of the specified pipelines.
// The router will create a fanout consumer for the set of pipelines and return a uuid
type TracesRouter interface {
Consumer(...component.ID) (consumer.Traces, error)
PipelineIDs() []component.ID
}

// A Metrics connector acts as an exporter from a metrics pipeline and a receiver
// to one or more traces, metrics, or logs pipelines.
// Metrics feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data.
Expand All @@ -53,12 +46,6 @@ type Metrics interface {
consumer.Metrics
}

// MetricsRouter feeds the first consumer.Metrics in each of the specified pipelines.
type MetricsRouter interface {
Consumer(...component.ID) (consumer.Metrics, error)
PipelineIDs() []component.ID
}

// A Logs connector acts as an exporter from a logs pipeline and a receiver
// to one or more traces, metrics, or logs pipelines.
// Logs feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data.
Expand All @@ -74,12 +61,6 @@ type Logs interface {
consumer.Logs
}

// LogsRouter feeds the first consumer.Logs in each of the specified pipelines.
type LogsRouter interface {
Consumer(...component.ID) (consumer.Logs, error)
PipelineIDs() []component.ID
}

// CreateSettings configures Connector creators.
type CreateSettings struct {
// ID returns the ID of the component that will be created.
Expand Down
7 changes: 3 additions & 4 deletions connector/connectortest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
)

type TracesRouterOption struct {
Expand All @@ -32,7 +31,7 @@ func NewTracesRouter(opts ...TracesRouterOption) connector.TracesRouter {
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewTracesRouter(consumers).(connector.TracesRouter)
return connector.NewTracesRouter(consumers)
}

type MetricsRouterOption struct {
Expand All @@ -56,7 +55,7 @@ func NewMetricsRouter(opts ...MetricsRouterOption) connector.MetricsRouter {
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewMetricsRouter(consumers).(connector.MetricsRouter)
return connector.NewMetricsRouter(consumers)
}

type LogsRouterOption struct {
Expand All @@ -80,5 +79,5 @@ func NewLogsRouter(opts ...LogsRouterOption) connector.LogsRouter {
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewLogsRouter(consumers).(connector.LogsRouter)
return connector.NewLogsRouter(consumers)
}
2 changes: 1 addition & 1 deletion connector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
go.opentelemetry.io/collector/component v0.91.0
go.opentelemetry.io/collector/consumer v0.91.0
go.opentelemetry.io/collector/pdata v1.0.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
)

Expand All @@ -32,7 +33,6 @@ require (
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
72 changes: 72 additions & 0 deletions connector/logs_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package connector // import "go.opentelemetry.io/collector/connector"

import (
"fmt"

"go.opentelemetry.io/collector/internal/fanoutconsumer"

"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)

// LogsRouter feeds the first consumer.Logs in each of the specified pipelines.
type LogsRouter interface {
consumer.Logs
Consumer(...component.ID) (consumer.Logs, error)
PipelineIDs() []component.ID
privateFunc()
}

var _ LogsRouter = (*logsRouter)(nil)

type logsRouter struct {
consumer.Logs
consumers map[component.ID]consumer.Logs
}

func NewLogsRouter(cm map[component.ID]consumer.Logs) LogsRouter {
consumers := make([]consumer.Logs, 0, len(cm))
for _, consumer := range cm {
consumers = append(consumers, consumer)
}
return &logsRouter{
Logs: fanoutconsumer.NewLogs(consumers),
consumers: cm,
}
}

func (r *logsRouter) PipelineIDs() []component.ID {
ids := make([]component.ID, 0, len(r.consumers))
for id := range r.consumers {
ids = append(ids, id)
}
return ids
}

func (r *logsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Logs, error) {
if len(pipelineIDs) == 0 {
return nil, fmt.Errorf("missing consumers")
}
consumers := make([]consumer.Logs, 0, len(pipelineIDs))
var errors error
for _, pipelineID := range pipelineIDs {
c, ok := r.consumers[pipelineID]
if ok {
consumers = append(consumers, c)
} else {
errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID))
}
}
if errors != nil {
// TODO potentially this could return a NewLogs with the valid consumers
return nil, errors
}
return fanoutconsumer.NewLogs(consumers), nil
}

func (r *logsRouter) privateFunc() {}

Check warning on line 72 in connector/logs_router.go

View check run for this annotation

Codecov / codecov/patch

connector/logs_router.go#L72

Added line #L72 was not covered by tests
156 changes: 156 additions & 0 deletions connector/logs_router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package connector

import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/plog"
)

type mutatingLogsSink struct {
*consumertest.LogsSink
}

func (mts *mutatingLogsSink) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

func TestLogsMultiplexing(t *testing.T) {
var max = 20
for numIDs := 1; numIDs < max; numIDs++ {
for numCons := 1; numCons < max; numCons++ {
for numLogs := 1; numLogs < max; numLogs++ {
t.Run(
fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numLogs),
fuzzLogs(numIDs, numCons, numLogs),
)
}
}
}
}

func fuzzLogs(numIDs, numCons, numLogs int) func(*testing.T) {
return func(t *testing.T) {
allIDs := make([]component.ID, 0, numCons)
allCons := make([]consumer.Logs, 0, numCons)
allConsMap := make(map[component.ID]consumer.Logs)

// If any consumer is mutating, the router must report mutating
for i := 0; i < numCons; i++ {
allIDs = append(allIDs, component.NewIDWithName("sink", strconv.Itoa(numCons)))
// Random chance for each consumer to be mutating
if (numCons+numLogs+i)%4 == 0 {
allCons = append(allCons, &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)})
} else {
allCons = append(allCons, new(consumertest.LogsSink))
}
allConsMap[allIDs[i]] = allCons[i]
}

r := NewLogsRouter(allConsMap)
ld := testdata.GenerateLogs(1)

// Keep track of how many logs each consumer should receive.
// This will be validated after every call to RouteLogs.
expected := make(map[component.ID]int, numCons)

for i := 0; i < numLogs; i++ {
// Build a random set of ids (no duplicates)
randCons := make(map[component.ID]bool, numIDs)
for j := 0; j < numIDs; j++ {
// This number should be pretty random and less than numCons
conNum := (numCons + numIDs + i + j) % numCons
randCons[allIDs[conNum]] = true
}

// Convert to slice, update expectations
conIDs := make([]component.ID, 0, len(randCons))
for id := range randCons {
conIDs = append(conIDs, id)
expected[id]++
}

// Route to list of consumers
fanout, err := r.Consumer(conIDs...)
assert.NoError(t, err)
assert.NoError(t, fanout.ConsumeLogs(context.Background(), ld))

// Validate expectations for all consumers
for id := range expected {
logs := []plog.Logs{}
switch con := allConsMap[id].(type) {
case *consumertest.LogsSink:
logs = con.AllLogs()
case *mutatingLogsSink:
logs = con.AllLogs()
}
assert.Len(t, logs, expected[id])
for n := 0; n < len(logs); n++ {
assert.EqualValues(t, ld, logs[n])
}
}
}
}
}

func TestLogsGetConsumers(t *testing.T) {
ctx := context.Background()
ld := testdata.GenerateLogs(1)

fooID := component.NewID("foo")
barID := component.NewID("bar")

foo := new(consumertest.LogsSink)
bar := new(consumertest.LogsSink)
r := NewLogsRouter(map[component.ID]consumer.Logs{fooID: foo, barID: bar})

rcs := r.PipelineIDs()
assert.Len(t, rcs, 2)
assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs)

assert.Len(t, foo.AllLogs(), 0)
assert.Len(t, bar.AllLogs(), 0)

both, err := r.Consumer(fooID, barID)
assert.NotNil(t, both)
assert.NoError(t, err)

assert.NoError(t, both.ConsumeLogs(ctx, ld))
assert.Len(t, foo.AllLogs(), 1)
assert.Len(t, bar.AllLogs(), 1)

fooOnly, err := r.Consumer(fooID)
assert.NotNil(t, fooOnly)
assert.NoError(t, err)

assert.NoError(t, fooOnly.ConsumeLogs(ctx, ld))
assert.Len(t, foo.AllLogs(), 2)
assert.Len(t, bar.AllLogs(), 1)

barOnly, err := r.Consumer(barID)
assert.NotNil(t, barOnly)
assert.NoError(t, err)

assert.NoError(t, barOnly.ConsumeLogs(ctx, ld))
assert.Len(t, foo.AllLogs(), 2)
assert.Len(t, bar.AllLogs(), 2)

none, err := r.Consumer()
assert.Nil(t, none)
assert.Error(t, err)

fake, err := r.Consumer(component.NewID("fake"))
assert.Nil(t, fake)
assert.Error(t, err)
}
Loading

0 comments on commit fd26316

Please sign in to comment.