-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add connector types #6689
Add connector types #6689
Conversation
Codecov ReportBase: 90.89% // Head: 90.46% // Decreases project coverage by
Additional details and impacted files@@ Coverage Diff @@
## main #6689 +/- ##
==========================================
- Coverage 90.89% 90.46% -0.43%
==========================================
Files 244 244
Lines 14161 14269 +108
==========================================
+ Hits 12872 12909 +37
- Misses 1039 1111 +72
+ Partials 250 249 -1
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report at Codecov. |
608f057
to
bc0b765
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in the previous PR. I fail to understand the need of these 20+ interfaces... Please clarify
There are nine interfaces, one for each combination of data types. We need a representation for which combination the connector supports. For a receiver used in a logs pipeline, we can just check that it is For a connector used to from a traces pipeline to a metrics pipeline, we need to know that there is an explicit implementation of consuming traces and emitting metrics. Can you help me understand which part is unclear here? Is it the existence of the interfaces in the first place? Or that they each have an associated method? Something else? |
I worked out a change that would reduce down to three connector types. This does reduce code in the core packages. However, it also increases complexity in some individual connector implementations. Specifically, connectors that support consumption of a signal in more than one way (e.g. both This can be seen in this diff @bogdandrutu, do you mind taking a look at the linked diff to let me know if you think this approach is what you have in mind? |
Try this as replacement for that file, what do you think? // Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package connector // import "go.opentelemetry.io/collector/service/internal/testcomponents"
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)
const connType = "exampleconnector"
// ExampleConnectorConfig config for ExampleConnector.
type ExampleConnectorConfig struct {
config.ConnectorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}
// ExampleConnectorFactory is factory for ExampleConnector.
var ExampleConnectorFactory = component.NewConnectorFactory(
connType,
createExampleConnectorDefaultConfig,
component.WithTracesToTracesConnector(createExampleTracesToTracesConnector, component.StabilityLevelDevelopment),
component.WithTracesToMetricsConnector(createExampleTracesToMetricsConnector, component.StabilityLevelDevelopment),
component.WithTracesToLogsConnector(createExampleTracesToLogsConnector, component.StabilityLevelDevelopment),
component.WithMetricsToTracesConnector(createExampleMetricsToTracesConnector, component.StabilityLevelDevelopment),
component.WithMetricsToMetricsConnector(createExampleMetricsToMetricsConnector, component.StabilityLevelDevelopment),
component.WithMetricsToLogsConnector(createExampleMetricsToLogsConnector, component.StabilityLevelDevelopment),
component.WithLogsToTracesConnector(createExampleLogsToTracesConnector, component.StabilityLevelDevelopment),
component.WithLogsToMetricsConnector(createExampleLogsToMetricsConnector, component.StabilityLevelDevelopment),
component.WithLogsToLogsConnector(createExampleLogsToLogsConnector, component.StabilityLevelDevelopment),
)
func createExampleConnectorDefaultConfig() component.Config {
return &ExampleConnectorConfig{
ConnectorSettings: config.NewConnectorSettings(component.NewID(typeStr)),
}
}
func createExampleTracesToTracesConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, traces consumer.Traces) (component.TracesConnector, error) {
return &ExampleConnector{
ConsumeTracesFunc: traces.ConsumeTraces,
}, nil
}
func createExampleTracesToMetricsConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, metrics consumer.Metrics) (component.TracesConnector, error) {
return &ExampleConnector{
ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error {
return metrics.ConsumeMetrics(ctx, testdata.GenerateMetrics(td.SpanCount()))
},
}, nil
}
func createExampleTracesToLogsConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, logs consumer.Logs) (component.TracesConnector, error) {
return &ExampleConnector{
ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error {
return logs.ConsumeLogs(ctx, testdata.GenerateLogs(td.SpanCount()))
},
}, nil
}
func createExampleMetricsToTracesConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, traces consumer.Traces) (component.MetricsConnector, error) {
return &ExampleConnector{
ConsumeMetricsFunc: func(ctx context.Context, md pmetric.Metrics) error {
return traces.ConsumeTraces(ctx, testdata.GenerateTraces(md.MetricCount()))
},
}, nil
}
func createExampleMetricsToMetricsConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, metrics consumer.Metrics) (component.MetricsConnector, error) {
return &ExampleConnector{
ConsumeMetricsFunc: metrics.ConsumeMetrics,
}, nil
}
func createExampleMetricsToLogsConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, logs consumer.Logs) (component.MetricsConnector, error) {
return &ExampleConnector{
ConsumeMetricsFunc: func(ctx context.Context, md pmetric.Metrics) error {
return logs.ConsumeLogs(ctx, testdata.GenerateLogs(md.MetricCount()))
},
}, nil
}
func createExampleLogsToTracesConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, traces consumer.Traces) (component.LogsConnector, error) {
return &ExampleConnector{
ConsumeLogsFunc: func(ctx context.Context, ld plog.Logs) error {
return traces.ConsumeTraces(ctx, testdata.GenerateTraces(ld.LogRecordCount()))
},
}, nil
}
func createExampleLogsToMetricsConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, metrics consumer.Metrics) (component.LogsConnector, error) {
return &ExampleConnector{
ConsumeLogsFunc: func(ctx context.Context, ld plog.Logs) error {
return metrics.ConsumeMetrics(ctx, testdata.GenerateMetrics(ld.LogRecordCount()))
},
}, nil
}
func createExampleLogsToLogsConnector(_ context.Context, _ component.ConnectorCreateSettings, _ component.Config, logs consumer.Logs) (component.LogsConnector, error) {
return &ExampleConnector{
ConsumeLogsFunc: logs.ConsumeLogs,
}, nil
}
var _ StatefulComponent = &ExampleConnector{}
type ExampleConnector struct {
consumer.ConsumeTracesFunc
consumer.ConsumeMetricsFunc
consumer.ConsumeLogsFunc
componentState
}
// Start tells the Connector to start.
func (c *ExampleConnector) Start(_ context.Context, _ component.Host) error {
c.started = true
return nil
}
// Shutdown is invoked during shutdown.
func (c *ExampleConnector) Shutdown(context.Context) error {
c.stopped = true
return nil
}
func (c *ExampleConnector) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
} |
bc0b765
to
8904de5
Compare
I see what you are saying, that the example connector can be compressed down quite a bit. I think this is somewhat of a special case though, where the Consume funcs are so trivial that we don't mind declaring them inline. In many cases, the functions may be much longer, and then I think we end up with something closer to my implementation. To me this seems like the kind of concern that the framework should solve once for all component authors, but I'm fine with the reduced interfaces and have updated this PR accordingly. |
b9a2bdb
to
f0be2b7
Compare
* Add connector types * bump
* Add connector types * bump
Part of #6372, reimplemented within
connectors
package.Conversation here questions whether this is the proper implementation, but I would like to suggest that we use it initially. It's currently localized to the
connectors
package and related functionality will be hidden behind a feature gate. Once the functionality is fully implemented, it will be easier to reconsider alternatives.