Skip to content

Commit

Permalink
Add entity type to components
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrannajaryan committed Feb 21, 2024
1 parent 1b9ebad commit a47d4cd
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 1 deletion.
42 changes: 42 additions & 0 deletions consumer/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumer // import "go.opentelemetry.io/collector/consumer"

import (
"context"

"go.opentelemetry.io/collector/pdata/pentity"
)

// Entities is an interface that receives pentity.Entities, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Entities interface {
baseConsumer
// ConsumeEntities receives pentity.Entities for consumption.
ConsumeEntities(ctx context.Context, td pentity.Entities) error
}

// ConsumeEntitiesFunc is a helper function that is similar to ConsumeEntities.
type ConsumeEntitiesFunc func(ctx context.Context, td pentity.Entities) error

// ConsumeEntities calls f(ctx, td).
func (f ConsumeEntitiesFunc) ConsumeEntities(ctx context.Context, td pentity.Entities) error {
return f(ctx, td)
}

type baseEntities struct {
*baseImpl
ConsumeEntitiesFunc
}

// NewEntities returns a Entities configured with the provided options.
func NewEntities(consume ConsumeEntitiesFunc, options ...Option) (Entities, error) {
if consume == nil {
return nil, errNilFunc
}
return &baseEntities{
baseImpl: newBaseImpl(options...),
ConsumeEntitiesFunc: consume,
}, nil
}
55 changes: 55 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type Logs interface {
consumer.Logs
}

// Entities is an exporter that can consume entities.
type Entities interface {
component.Component
consumer.Entities
}

// CreateSettings configures exporter creators.
type CreateSettings struct {
// ID returns the ID of the component that will be created.
Expand Down Expand Up @@ -73,6 +79,14 @@ type Factory interface {
// LogsExporterStability gets the stability level of the LogsExporter.
LogsExporterStability() component.StabilityLevel

// CreateEntitiesExporter creates a EntitiesExporter based on the config.
// If the exporter type does not support entities or if the config is not valid,
// an error will be returned instead.
CreateEntitiesExporter(ctx context.Context, set CreateSettings, cfg component.Config) (Entities, error)

// EntitiesExporterStability gets the stability level of the EntitiesExporter.
EntitiesExporterStability() component.StabilityLevel

unexportedFactoryFunc()
}

Expand Down Expand Up @@ -124,6 +138,17 @@ func (f CreateLogsFunc) CreateLogsExporter(ctx context.Context, set CreateSettin
return f(ctx, set, cfg)
}

// CreateEntitiesFunc is the equivalent of Factory.CreateEntities.
type CreateEntitiesFunc func(context.Context, CreateSettings, component.Config) (Entities, error)

// CreateEntitiesExporter implements Factory.CreateEntitiesExporter().
func (f CreateEntitiesFunc) CreateEntitiesExporter(ctx context.Context, set CreateSettings, cfg component.Config) (Entities, error) {
if f == nil {
return nil, component.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg)
}

type factory struct {
cfgType component.Type
component.CreateDefaultConfigFunc
Expand All @@ -133,6 +158,8 @@ type factory struct {
metricsStabilityLevel component.StabilityLevel
CreateLogsFunc
logsStabilityLevel component.StabilityLevel
CreateEntitiesFunc
entitiesStabilityLevel component.StabilityLevel
}

func (f *factory) Type() component.Type {
Expand All @@ -153,6 +180,10 @@ func (f *factory) LogsExporterStability() component.StabilityLevel {
return f.logsStabilityLevel
}

func (f *factory) EntitiesExporterStability() component.StabilityLevel {
return f.entitiesStabilityLevel
}

// WithTraces overrides the default "error not supported" implementation for CreateTracesExporter and the default "undefined" stability level.
func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand All @@ -177,6 +208,14 @@ func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOpt
})
}

// WithEntities overrides the default "error not supported" implementation for CreateEntitiesExporter and the default "undefined" stability level.
func WithEntities(createEntities CreateEntitiesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.entitiesStabilityLevel = sl
o.CreateEntitiesFunc = createEntities
})
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down Expand Up @@ -261,6 +300,22 @@ func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings) (Logs, err
return f.CreateLogsExporter(ctx, set, cfg)
}

// CreateEntities creates a Entities exporter based on the settings and config.
func (b *Builder) CreateEntities(ctx context.Context, set CreateSettings) (Entities, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("exporter %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("exporter factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.EntitiesExporterStability())
return f.CreateEntitiesExporter(ctx, set, cfg)
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}
Expand Down
5 changes: 4 additions & 1 deletion exporter/internal/otlptext/resource.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package otlptext
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlptext // import "go.opentelemetry.io/collector/exporter/internal/otlptext"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down
60 changes: 60 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type Logs interface {
consumer.Logs
}

// Entities is a processor that can consume entities.
type Entities interface {
component.Component
consumer.Entities
}

// CreateSettings is passed to Create* functions in Factory.
type CreateSettings struct {
// ID returns the ID of the component that will be created.
Expand Down Expand Up @@ -73,6 +79,14 @@ type Factory interface {
// LogsProcessorStability gets the stability level of the LogsProcessor.
LogsProcessorStability() component.StabilityLevel

// CreateEntitiesProcessor creates a EntitiesProcessor based on the config.
// If the processor type does not support entities or if the config is not valid,
// an error will be returned instead.
CreateEntitiesProcessor(ctx context.Context, set CreateSettings, cfg component.Config, nextConsumer consumer.Entities) (Entities, error)

// EntitiesProcessorStability gets the stability level of the EntitiesProcessor.
EntitiesProcessorStability() component.StabilityLevel

unexportedFactoryFunc()
}

Expand Down Expand Up @@ -138,6 +152,22 @@ func (f CreateLogsFunc) CreateLogsProcessor(
return f(ctx, set, cfg, nextConsumer)
}

// CreateEntitiesFunc is the equivalent of Factory.CreateEntities().
type CreateEntitiesFunc func(context.Context, CreateSettings, component.Config, consumer.Entities) (Entities, error)

// CreateEntitiesProcessor implements Factory.CreateEntitiesProcessor().
func (f CreateEntitiesFunc) CreateEntitiesProcessor(
ctx context.Context,
set CreateSettings,
cfg component.Config,
nextConsumer consumer.Entities,
) (Entities, error) {
if f == nil {
return nil, component.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

type factory struct {
cfgType component.Type
component.CreateDefaultConfigFunc
Expand All @@ -147,6 +177,8 @@ type factory struct {
metricsStabilityLevel component.StabilityLevel
CreateLogsFunc
logsStabilityLevel component.StabilityLevel
CreateEntitiesFunc
entitiesStabilityLevel component.StabilityLevel
}

func (f *factory) Type() component.Type {
Expand All @@ -167,6 +199,10 @@ func (f factory) LogsProcessorStability() component.StabilityLevel {
return f.logsStabilityLevel
}

func (f factory) EntitiesProcessorStability() component.StabilityLevel {
return f.entitiesStabilityLevel
}

// WithTraces overrides the default "error not supported" implementation for CreateTraces and the default "undefined" stability level.
func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand All @@ -191,6 +227,14 @@ func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOpt
})
}

// WithEntities overrides the default "error not supported" implementation for CreateEntities and the default "undefined" stability level.
func WithEntities(createEntities CreateEntitiesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.entitiesStabilityLevel = sl
o.CreateEntitiesFunc = createEntities
})
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down Expand Up @@ -275,6 +319,22 @@ func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consu
return f.CreateLogsProcessor(ctx, set, cfg, next)
}

// CreateEntities creates a Entities processor based on the settings and config.
func (b *Builder) CreateEntities(ctx context.Context, set CreateSettings, next consumer.Entities) (Entities, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.EntitiesProcessorStability())
return f.CreateEntitiesProcessor(ctx, set, cfg, next)
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}
Expand Down
63 changes: 63 additions & 0 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ type Logs interface {
component.Component
}

// Entities receiver receives entities.
// Its purpose is to translate data from any format to the collector's internal entities data format.
// EntitiesReceiver feeds a consumer.Entities with data.
//
// For example, it could be a receiver that reads sysentities and convert them into plog.Entities.
type Entities interface {
component.Component
}

// CreateSettings configures Receiver creators.
type CreateSettings struct {
// ID returns the ID of the component that will be created.
Expand Down Expand Up @@ -82,6 +91,14 @@ type Factory interface {
// LogsReceiverStability gets the stability level of the LogsReceiver.
LogsReceiverStability() component.StabilityLevel

// CreateEntitiesReceiver creates a EntitiesReceiver based on this config.
// If the receiver type does not support the data type or if the config is not valid
// an error will be returned instead.
CreateEntitiesReceiver(ctx context.Context, set CreateSettings, cfg component.Config, nextConsumer consumer.Entities) (Entities, error)

// EntitiesReceiverStability gets the stability level of the EntitiesReceiver.
EntitiesReceiverStability() component.StabilityLevel

unexportedFactoryFunc()
}

Expand Down Expand Up @@ -145,6 +162,22 @@ func (f CreateLogsFunc) CreateLogsReceiver(
return f(ctx, set, cfg, nextConsumer)
}

// CreateEntitiesFunc is the equivalent of ReceiverFactory.CreateEntitiesReceiver().
type CreateEntitiesFunc func(context.Context, CreateSettings, component.Config, consumer.Entities) (Entities, error)

// CreateEntitiesReceiver implements Factory.CreateEntitiesReceiver().
func (f CreateEntitiesFunc) CreateEntitiesReceiver(
ctx context.Context,
set CreateSettings,
cfg component.Config,
nextConsumer consumer.Entities,
) (Entities, error) {
if f == nil {
return nil, component.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

type factory struct {
cfgType component.Type
component.CreateDefaultConfigFunc
Expand All @@ -154,6 +187,8 @@ type factory struct {
metricsStabilityLevel component.StabilityLevel
CreateLogsFunc
logsStabilityLevel component.StabilityLevel
CreateEntitiesFunc
entitiesStabilityLevel component.StabilityLevel
}

func (f *factory) Type() component.Type {
Expand All @@ -174,6 +209,10 @@ func (f *factory) LogsReceiverStability() component.StabilityLevel {
return f.logsStabilityLevel
}

func (f *factory) EntitiesReceiverStability() component.StabilityLevel {
return f.entitiesStabilityLevel
}

// WithTraces overrides the default "error not supported" implementation for CreateTracesReceiver and the default "undefined" stability level.
func WithTraces(createTracesReceiver CreateTracesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand All @@ -198,6 +237,14 @@ func WithLogs(createLogsReceiver CreateLogsFunc, sl component.StabilityLevel) Fa
})
}

// WithEntities overrides the default "error not supported" implementation for CreateEntitiesReceiver and the default "undefined" stability level.
func WithEntities(createEntitiesReceiver CreateEntitiesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.entitiesStabilityLevel = sl
o.CreateEntitiesFunc = createEntitiesReceiver
})
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down Expand Up @@ -282,6 +329,22 @@ func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consu
return f.CreateLogsReceiver(ctx, set, cfg, next)
}

// CreateEntities creates a Entities receiver based on the settings and config.
func (b *Builder) CreateEntities(ctx context.Context, set CreateSettings, next consumer.Entities) (Entities, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.EntitiesReceiverStability())
return f.CreateEntitiesReceiver(ctx, set, cfg, next)
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}
Expand Down

0 comments on commit a47d4cd

Please sign in to comment.