Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/pulsarreceiver] enhance pulsar receiver #28681

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/pulsar_receiver_enhance.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pulsarreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: remove `topic`,`subscription`,`encoding`,`consumer_name` configuration items but introduce `trace`,`metric`,`log` items to make pulsar receiver more configurable.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [28685]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
93 changes: 61 additions & 32 deletions receiver/pulsarreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,78 @@ Pulsar receiver receives logs, metrics, and traces from Pulsar.
## Getting Started

The following settings can be optionally configured:

- `endpoint` (default = pulsar://localhost:6650): The url of pulsar cluster.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the pulsar topic to consume from.
- `encoding` (default = otlp_proto): The encoding of the payload sent to pulsar. Available encodings:
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
- `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
- `consumer_name`: specifies the consumer name.
- `trace`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to make this breaking change if we still want to migrate to the encoding extensions after that? I think it's better to avoid introducing this intermediary config interface.

Also, we need to try to avoid such breaking changes. Ideally, we need to keep the old interface functioning for some time with deprecation warning logs.

If you have any other changes not breaking the user config like refactoring, feel free to submit other PRs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The introduction of new encoding methods is only a small part of the reason, I created an issue for the PR #28685, please take a look.

The component currently in the Alpha phase, I think the configuration changes comply with the agreement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even for alpha, we should try to avoid breaking changes if possible.

- `topic`: The name of the pulsar topic. (default = ``, skip start traces exporter is topic not set)
- `encoding`
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`.
- `otlp_json`: the payload is json deserialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
- `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
- `subscription`: (default = otlp_subscription): the subscription name of consumer.
- `consumer_name`: specifies the consumer name.
- `metric`
- `topic`: The name of the pulsar topic. (default = ``, skip start metrics exporter is topic not set)
- `encoding`
- `otlp_proto`: the payload is deserialized to `ExportMetricServiceRequest`.
- `otlp_json`: the payload is json deserialized to `ExportMetricServiceRequest`.
- `subscription`: (default = otlp_subscription): the subscription name of consumer.
- `consumer_name`: specifies the consumer name.
- `log`
- `topic`: The name of the pulsar topic. (default = ``, skip start logs exporter is topic not set)
- `encoding`
- `otlp_proto`: the payload is deserialized to `ExportLogServiceRequest`.
- `otlp_json`: the payload is json deserialized to `ExportLogServiceRequest`.
- `subscription`: (default = otlp_subscription): the subscription name of consumer.
- `consumer_name`: specifies the consumer name.
- `auth`
- `tls`
- `cert_file`:
- `key_file`:
- `token`
- `tls`
- `cert_file`:
- `key_file`:
- `token`
- `oauth2`
- `issuer_url`:
- `client_id`:
- `audience`:
- `athenz`
- `provider_domain`:
- `tenant_domain`:
- `tenant_service`:
- `private_key`:
- `key_id`:
- `principal_header`:
- `zts_url`:
- `subscription` (default = otlp_subscription): the subscription name of consumer.
- `token`
- `oauth2`
- `issuer_url`:
- `client_id`:
- `audience`:
- `athenz`
- `provider_domain`:
- `tenant_domain`:
- `tenant_service`:
- `private_key`:
- `key_id`:
- `principal_header`:
- `zts_url`:
- `tls_trust_certs_file_path`: path to the CA cert. For a client this verifies the server certificate. Should
only be used if `insecure` is set to true.
- `tls_allow_insecure_connection`: configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)

- `tls_allow_insecure_connection`: configure whether the Pulsar client accept untrusted TLS certificate from broker (
default: false)

Example configuration:

```yaml
receivers:
pulsar:
endpoint: pulsar://localhost:6650
topic: otlp-spans
subscription: otlp_spans_sub
consumer_name: otlp_spans_sub_1
encoding: otlp_proto
trace:
topic: pulsar://public/default/otlp-spans
subscription: otlp_spans_sub
consumer_name: otlp_spans_sub_1
encoding: otlp_proto
metric:
topic: pulsar://public/default/otlp-metrics
subscription: otlp_metrics_sub
consumer_name: otlp_metrics_sub_1
encoding: otlp_proto
log:
topic: pulsar://public/default/otlp-logs
subscription: otlp_logs_sub
consumer_name: otlp_logs_sub_1
encoding: otlp_proto
auth:
tls:
cert_file: cert.pem
Expand Down
67 changes: 49 additions & 18 deletions receiver/pulsarreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,14 @@ import (
"go.opentelemetry.io/collector/config/configopaque"
)

var errMissTopicName = errors.New("miss topic name")

type Config struct {
// Configure the service URL for the Pulsar service.
Endpoint string `mapstructure:"endpoint"`
// The topic of pulsar to consume logs,metrics,traces. (default = "otlp_traces" for traces,
// "otlp_metrics" for metrics, "otlp_logs" for logs)
Topic string `mapstructure:"topic"`
// The Subscription that receiver will be consuming messages from (default "otlp_subscription")
Subscription string `mapstructure:"subscription"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`
// Name specifies the consumer name.
ConsumerName string `mapstructure:"consumer_name"`
Endpoint string `mapstructure:"endpoint"`
Trace ReceiverOption `mapstructure:"trace"`
Log ReceiverOption `mapstructure:"log"`
Metric ReceiverOption `mapstructure:"metric"`
// Set the path to the trusted TLS certificate file
TLSTrustCertsFilePath string `mapstructure:"tls_trust_certs_file_path"`
// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
Expand Down Expand Up @@ -62,6 +58,30 @@ type OAuth2 struct {
Audience string `mapstructure:"audience"`
}

type ReceiverOption struct {
// The topic of pulsar to consume logs,metrics,traces. (default = "")
Topic string `mapstructure:"topic"`
// The Subscription that receiver will be consuming messages from (default "otlp_subscription")
Subscription string `mapstructure:"subscription"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`
// Name specifies the consumer name.
ConsumerName string `mapstructure:"consumer_name"`
}

func (opt *ReceiverOption) validate() error {
if len(opt.Encoding) == 0 {
opt.Encoding = defaultEncoding
}
if len(opt.Topic) == 0 {
return errMissTopicName
}
if len(opt.Subscription) == 0 {
opt.Subscription = defaultSubscription
}
return nil
}

var _ component.Config = (*Config)(nil)

// Validate checks the receiver configuration is valid
Expand Down Expand Up @@ -118,20 +138,31 @@ func (cfg *Config) clientOptions() pulsar.ClientOptions {
return options
}

func (cfg *Config) consumerOptions() (pulsar.ConsumerOptions, error) {
func (cfg *Config) consumerOptions(option ReceiverOption) pulsar.ConsumerOptions {
options := pulsar.ConsumerOptions{
Type: pulsar.Failover,
Topic: cfg.Topic,
SubscriptionName: cfg.Subscription,
Topic: option.Topic,
SubscriptionName: option.Subscription,
}

if len(cfg.ConsumerName) > 0 {
options.Name = cfg.ConsumerName
if len(option.ConsumerName) > 0 {
options.Name = option.ConsumerName
}
return options
}

if options.SubscriptionName == "" || options.Topic == "" {
return options, errors.New("topic and subscription is required")
func (cfg *Config) createConsumer(option ReceiverOption) (pulsar.Client, pulsar.Consumer, error) {
client, err := pulsar.NewClient(cfg.clientOptions())
if err != nil {
return nil, nil, err
}

return options, nil
consumerOpts := cfg.consumerOptions(option)
consumer, err := client.Subscribe(consumerOpts)
if err != nil {
// Close the client if err happens
client.Close()
return nil, nil, err
}
return client, consumer, nil
}
24 changes: 19 additions & 5 deletions receiver/pulsarreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,25 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.Equal(t, &Config{
Topic: "otel-pulsar",
Endpoint: "pulsar://localhost:6500",
ConsumerName: "otel-collector",
Subscription: "otel-collector",
Encoding: defaultEncoding,
Endpoint: "pulsar://localhost:6500",
Trace: ReceiverOption{
Topic: "otel-pulsar",
ConsumerName: "otel-collector",
Subscription: "otel-collector",
Encoding: defaultEncoding,
},
Metric: ReceiverOption{
Topic: "otel-pulsar",
ConsumerName: "otel-collector",
Subscription: "otel-collector",
Encoding: defaultEncoding,
},
Log: ReceiverOption{
Topic: "otel-pulsar",
ConsumerName: "otel-collector",
Subscription: "otel-collector",
Encoding: defaultEncoding,
},
TLSTrustCertsFilePath: "ca.pem",
Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}},
},
Expand Down
Loading
Loading