Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][graph] Separate node types #11321

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions service/internal/graph/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/pipeline"
)

const capabilitiesSeed = "capabilities"

var _ consumerNode = (*capabilitiesNode)(nil)

// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
// There are two purposes for this node:
// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data.
// 2. Present a consistent "first consumer" for each pipeline.
// The nodeID is derived from "pipeline ID".
type capabilitiesNode struct {
nodeID
pipelineID pipeline.ID
baseConsumer
consumer.ConsumeTracesFunc
consumer.ConsumeMetricsFunc
consumer.ConsumeLogsFunc
consumerprofiles.ConsumeProfilesFunc
}

func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
return &capabilitiesNode{
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
pipelineID: pipelineID,
}
}

func (n *capabilitiesNode) getConsumer() baseConsumer {
return n
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,215 +5,20 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"context"
"fmt"
"hash/fnv"
"strings"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentprofiles"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/connectorprofiles"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/components"
)

const (
receiverSeed = "receiver"
processorSeed = "processor"
exporterSeed = "exporter"
connectorSeed = "connector"
capabilitiesSeed = "capabilities"
fanOutToExporters = "fanout_to_exporters"
)

// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
type baseConsumer interface {
Capabilities() consumer.Capabilities
}

type nodeID int64

func (n nodeID) ID() int64 {
return int64(n)
}

func newNodeID(parts ...string) nodeID {
h := fnv.New64a()
h.Write([]byte(strings.Join(parts, "|")))
return nodeID(h.Sum64())
}

type consumerNode interface {
getConsumer() baseConsumer
}

// A receiver instance can be shared by multiple pipelines of the same type.
// Therefore, nodeID is derived from "pipeline type" and "component ID".
type receiverNode struct {
nodeID
componentID component.ID
pipelineType pipeline.Signal
component.Component
}

func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode {
return &receiverNode{
nodeID: newNodeID(receiverSeed, pipelineType.String(), recvID.String()),
componentID: recvID,
pipelineType: pipelineType,
}
}

func (n *receiverNode) buildComponent(ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ReceiverBuilder,
nexts []baseConsumer,
) error {
tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType)
set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
var err error
switch n.pipelineType {
case pipeline.SignalTraces:
var consumers []consumer.Traces
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers))
case pipeline.SignalMetrics:
var consumers []consumer.Metrics
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Metrics))
}
n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers))
case pipeline.SignalLogs:
var consumers []consumer.Logs
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers))
case componentprofiles.SignalProfiles:
var consumers []consumerprofiles.Profiles
for _, next := range nexts {
consumers = append(consumers, next.(consumerprofiles.Profiles))
}
n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers))
default:
return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType)
}
if err != nil {
return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, n.pipelineType, err)
}
return nil
}

var _ consumerNode = (*processorNode)(nil)

// Every processor instance is unique to one pipeline.
// Therefore, nodeID is derived from "pipeline ID" and "component ID".
type processorNode struct {
nodeID
componentID component.ID
pipelineID pipeline.ID
component.Component
}

func newProcessorNode(pipelineID pipeline.ID, procID component.ID) *processorNode {
return &processorNode{
nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()),
componentID: procID,
pipelineID: pipelineID,
}
}

func (n *processorNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *processorNode) buildComponent(ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ProcessorBuilder,
next baseConsumer,
) error {
tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID)
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
var err error
switch n.pipelineID.Signal() {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTraces(ctx, set, next.(consumer.Traces))
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetrics(ctx, set, next.(consumer.Metrics))
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogs(ctx, set, next.(consumer.Logs))
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfiles(ctx, set, next.(consumerprofiles.Profiles))
default:
return fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, n.pipelineID.String(), n.pipelineID.Signal())
}
if err != nil {
return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err)
}
return nil
}

var _ consumerNode = (*exporterNode)(nil)

// An exporter instance can be shared by multiple pipelines of the same type.
// Therefore, nodeID is derived from "pipeline type" and "component ID".
type exporterNode struct {
nodeID
componentID component.ID
pipelineType pipeline.Signal
component.Component
}

func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
return &exporterNode{
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
componentID: exprID,
pipelineType: pipelineType,
}
}

func (n *exporterNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *exporterNode) buildComponent(
ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ExporterBuilder,
) error {
tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType)
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
var err error
switch n.pipelineType {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTraces(ctx, set)
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetrics(ctx, set)
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogs(ctx, set)
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfiles(ctx, set)
default:
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType)
}
if err != nil {
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
}
return nil
}
const connectorSeed = "connector"

var _ consumerNode = (*connectorNode)(nil)

Expand Down Expand Up @@ -420,52 +225,3 @@ func (n *connectorNode) buildComponent(
}
return nil
}

var _ consumerNode = (*capabilitiesNode)(nil)

// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
// There are two purposes for this node:
// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data.
// 2. Present a consistent "first consumer" for each pipeline.
// The nodeID is derived from "pipeline ID".
type capabilitiesNode struct {
nodeID
pipelineID pipeline.ID
baseConsumer
consumer.ConsumeTracesFunc
consumer.ConsumeMetricsFunc
consumer.ConsumeLogsFunc
consumerprofiles.ConsumeProfilesFunc
}

func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
return &capabilitiesNode{
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
pipelineID: pipelineID,
}
}

func (n *capabilitiesNode) getConsumer() baseConsumer {
return n
}

var _ consumerNode = (*fanOutNode)(nil)

// Each pipeline has one fan-out node before exporters.
// Therefore, nodeID is derived from "pipeline ID".
type fanOutNode struct {
nodeID
pipelineID pipeline.ID
baseConsumer
}

func newFanOutNode(pipelineID pipeline.ID) *fanOutNode {
return &fanOutNode{
nodeID: newNodeID(fanOutToExporters, pipelineID.String()),
pipelineID: pipelineID,
}
}

func (n *fanOutNode) getConsumer() baseConsumer {
return n.baseConsumer
}
17 changes: 17 additions & 0 deletions service/internal/graph/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"go.opentelemetry.io/collector/consumer"
)

// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
type baseConsumer interface {
Capabilities() consumer.Capabilities
}

type consumerNode interface {
getConsumer() baseConsumer
}
68 changes: 68 additions & 0 deletions service/internal/graph/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentprofiles"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/components"
)

const exporterSeed = "exporter"

var _ consumerNode = (*exporterNode)(nil)

// An exporter instance can be shared by multiple pipelines of the same type.
// Therefore, nodeID is derived from "pipeline type" and "component ID".
type exporterNode struct {
nodeID
componentID component.ID
pipelineType pipeline.Signal
component.Component
}

func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
return &exporterNode{
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
componentID: exprID,
pipelineType: pipelineType,
}
}

func (n *exporterNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *exporterNode) buildComponent(
ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ExporterBuilder,
) error {
tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType)
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
var err error
switch n.pipelineType {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTraces(ctx, set)
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetrics(ctx, set)
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogs(ctx, set)
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfiles(ctx, set)
default:
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType)
}
if err != nil {

Check warning on line 64 in service/internal/graph/exporter.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/exporter.go#L61-L64

Added lines #L61 - L64 were not covered by tests
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
}
return nil
}
Loading
Loading