Skip to content

Commit

Permalink
Add component node build methods
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jan 25, 2023
1 parent 9e4b354 commit ee3f056
Showing 1 changed file with 79 additions and 0 deletions.
79 changes: 79 additions & 0 deletions service/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"fmt"
"hash/fnv"
"strings"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)

type nodeID int64
Expand Down Expand Up @@ -103,3 +109,76 @@ func newConnectorNode(exprPipelineType, rcvrPipelineType component.DataType, con
rcvrPipelineType: rcvrPipelineType,
}
}

func (n *connectorNode) build(
ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *connector.Builder,
nexts []baseConsumer,
) error {
if len(nexts) == 0 {
return fmt.Errorf("connector %q has no next consumer", n.componentID)
}
set := connector.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
set.TelemetrySettings.Logger = components.ConnectorLogger(set.TelemetrySettings.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType)

var err error
switch n.rcvrPipelineType {
case component.DataTypeTraces:
var consumers []consumer.Traces
for _, next := range nexts {
tracesConsumer, ok := next.(consumer.Traces)
if !ok {
return fmt.Errorf("next component is not a traces consumer: %s", n.componentID)
}
consumers = append(consumers, tracesConsumer)
}
fanoutConsumer := fanoutconsumer.NewTraces(consumers)
switch n.exprPipelineType {
case component.DataTypeTraces:
n.Component, err = builder.CreateTracesToTraces(ctx, set, fanoutConsumer)
case component.DataTypeMetrics:
n.Component, err = builder.CreateMetricsToTraces(ctx, set, fanoutConsumer)
case component.DataTypeLogs:
n.Component, err = builder.CreateLogsToTraces(ctx, set, fanoutConsumer)
}
case component.DataTypeMetrics:
var consumers []consumer.Metrics
for _, next := range nexts {
metricsConsumer, ok := next.(consumer.Metrics)
if !ok {
return fmt.Errorf("next component is not a metrics consumer: %s", n.componentID)
}
consumers = append(consumers, metricsConsumer)
}
fanoutConsumer := fanoutconsumer.NewMetrics(consumers)
switch n.exprPipelineType {
case component.DataTypeTraces:
n.Component, err = builder.CreateTracesToMetrics(ctx, set, fanoutConsumer)
case component.DataTypeMetrics:
n.Component, err = builder.CreateMetricsToMetrics(ctx, set, fanoutConsumer)
case component.DataTypeLogs:
n.Component, err = builder.CreateLogsToMetrics(ctx, set, fanoutConsumer)
}
case component.DataTypeLogs:
var consumers []consumer.Logs
for _, next := range nexts {
logsConsumer, ok := next.(consumer.Logs)
if !ok {
return fmt.Errorf("next component is not a logs consumer: %s", n.componentID)
}
consumers = append(consumers, logsConsumer)
}
fanoutConsumer := fanoutconsumer.NewLogs(consumers)
switch n.exprPipelineType {
case component.DataTypeTraces:
n.Component, err = builder.CreateTracesToLogs(ctx, set, fanoutConsumer)
case component.DataTypeMetrics:
n.Component, err = builder.CreateMetricsToLogs(ctx, set, fanoutConsumer)
case component.DataTypeLogs:
n.Component, err = builder.CreateLogsToLogs(ctx, set, fanoutConsumer)
}
}
return err
}

0 comments on commit ee3f056

Please sign in to comment.