Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/solacereceiver] Added configurable flow control interval, baggage unmarshalling #16570

Merged
merged 13 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/solace-baggage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: solacereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added baggage unmarshalling support (introduced in Solace PubSub+ Event Broker 10.2.1)

# One or more tracking issues related to the change
issues: [16570]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
16 changes: 16 additions & 0 deletions .chloggen/solace-flowctrl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: solacereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added configurable retry interval for flow control scenarios

# One or more tracking issues related to the change
issues: [16570]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
7 changes: 5 additions & 2 deletions receiver/solacereceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

| Status | |
|--------------------------|-----------|
| Stability | [alpha] |
| Stability | [beta] |
| Supported pipeline types | traces |
| Distributions | [contrib] |

Expand Down Expand Up @@ -47,6 +47,9 @@ The configuration parameters are:
- username (The username to use; required for sasl_xauth2 authentication)
- bearer (The bearer token in plain text; required for sasl_xauth2 authentication)
- sasl_external (SASL External required to be used for TLS client cert authentication. When this authentication type is chosen then tls cert_file and key_file are required)
- flow_control (Configures the behaviour to use when temporary errors are encountered from the next component)
- delayed_retry (Default flow control strategy. Sets the flow control strategy to delayed retry which will wait before trying to push the message to the next component again)
- delay (The delay, e.g. 10ms, to wait before retrying. Default is 10ms)

### Examples:
Simple single node configuration with SASL plain authentication (TLS enabled by default)
Expand Down Expand Up @@ -92,5 +95,5 @@ service:
receivers: [solace/primary,solace/backup]
```

[alpha]:https://github.com/open-telemetry/opentelemetry-collector#alpha
[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
28 changes: 24 additions & 4 deletions receiver/solacereceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package solacereceiver // import "github.com/open-telemetry/opentelemetry-collec
import (
"errors"
"strings"
"time"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtls"
Expand All @@ -28,10 +29,12 @@ const (
)

var (
errMissingAuthDetails = errors.New("authentication details are required, either for plain user name password or XOAUTH2 or client certificate")
errMissingQueueName = errors.New("queue definition is required, queue definition has format queue://<queuename>")
errMissingPlainTextParams = errors.New("missing plain text auth params: Username, Password")
errMissingXauth2Params = errors.New("missing xauth2 text auth params: Username, Bearer")
errMissingAuthDetails = errors.New("authentication details are required, either for plain user name password or XOAUTH2 or client certificate")
errMissingQueueName = errors.New("queue definition is required, queue definition has format queue://<queuename>")
errMissingPlainTextParams = errors.New("missing plain text auth params: Username, Password")
errMissingXauth2Params = errors.New("missing xauth2 text auth params: Username, Bearer")
errMissingFlowControl = errors.New("missing flow control configuration: DelayedRetry must be selected")
errInvalidDelayedRetryDelay = errors.New("delayed_retry.delay must > 0")
)

// Config defines configuration for Solace receiver.
Expand All @@ -49,6 +52,8 @@ type Config struct {
TLS configtls.TLSClientSetting `mapstructure:"tls,omitempty"`

Auth Authentication `mapstructure:"auth"`

Flow FlowControl `mapstructure:"flow_control"`
}

// Validate checks the receiver configuration is valid
Expand All @@ -59,6 +64,11 @@ func (cfg *Config) Validate() error {
if len(strings.TrimSpace(cfg.Queue)) == 0 {
return errMissingQueueName
}
if cfg.Flow.DelayedRetry == nil {
return errMissingFlowControl
} else if cfg.Flow.DelayedRetry.Delay <= 0 {
return errInvalidDelayedRetryDelay
}
return nil
}

Expand All @@ -84,3 +94,13 @@ type SaslXAuth2Config struct {
// SaslExternalConfig defines the configuration for the SASL External used in conjunction with TLS client authentication.
type SaslExternalConfig struct {
}

// FlowControl defines the configuration for what to do in backpressure scenarios, e.g. memorylimiter errors
type FlowControl struct {
DelayedRetry *FlowControlDelayedRetry `mapstructure:"delayed_retry"`
}

// FlowControlDelayedRetry represents the strategy of waiting for a defined amount of time (in time.Duration) and attempt redelivery
type FlowControlDelayedRetry struct {
Delay time.Duration `mapstructure:"delay"`
}
27 changes: 27 additions & 0 deletions receiver/solacereceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package solacereceiver // import "github.com/open-telemetry/opentelemetry-collec
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -54,6 +55,11 @@ func TestLoadConfig(t *testing.T) {
Insecure: false,
InsecureSkipVerify: false,
},
Flow: FlowControl{
DelayedRetry: &FlowControlDelayedRetry{
Delay: 1 * time.Second,
},
},
},
},
{
Expand Down Expand Up @@ -99,6 +105,27 @@ func TestConfigValidateMissingQueue(t *testing.T) {
assert.Equal(t, errMissingQueueName, err)
}

func TestConfigValidateMissingFlowControl(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Queue = "someQueue"
cfg.Auth.PlainText = &SaslPlainTextConfig{"Username", "Password"}
// this should never happen in reality, test validation anyway
cfg.Flow.DelayedRetry = nil
err := cfg.Validate()
assert.Equal(t, errMissingFlowControl, err)
}

func TestConfigValidateInvalidFlowControlDelayedRetryDelay(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Queue = "someQueue"
cfg.Auth.PlainText = &SaslPlainTextConfig{"Username", "Password"}
cfg.Flow.DelayedRetry = &FlowControlDelayedRetry{
Delay: -30 * time.Second,
}
err := cfg.Validate()
assert.Equal(t, errInvalidDelayedRetryDelay, err)
}

func TestConfigValidateSuccess(t *testing.T) {
successCases := map[string]func(*Config){
"With Plaintext Auth": func(c *Config) {
Expand Down
6 changes: 6 additions & 0 deletions receiver/solacereceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package solacereceiver // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -54,6 +55,11 @@ func createDefaultConfig() component.Config {
InsecureSkipVerify: false,
Insecure: false,
},
Flow: FlowControl{
DelayedRetry: &FlowControlDelayedRetry{
Delay: 10 * time.Millisecond,
},
},
}
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/solacereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
go.opentelemetry.io/collector/confmap v0.0.0-20221201172708-2bdff61fa52a
go.opentelemetry.io/collector/consumer v0.66.1-0.20221202005155-1c54042beb70
go.opentelemetry.io/collector/pdata v0.66.1-0.20221202005155-1c54042beb70
go.opentelemetry.io/otel v1.11.1
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.28.1
Expand All @@ -45,7 +46,6 @@ require (
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/featuregate v0.66.1-0.20221202005155-1c54042beb70 // indirect
go.opentelemetry.io/otel v1.11.1 // indirect
go.opentelemetry.io/otel/metric v0.33.0 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
61 changes: 38 additions & 23 deletions receiver/solacereceiver/model/receive_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,12 @@ syntax = "proto3";

package solace.messaging.proto.broker.trace.receive.v1;

// Version 1.0
// A message will be compatible with this specification if its topic matches:
// _telemetry/broker/trace/receive/v1[/additional/topic/levels]
//
// Messages with a topic of matching the following topic contain a v1.x
// specification of this message.
// #telemetry/broker/trace/receive/v1[/<additional/topic/levels]
// Note that the specification allows for additional topic levels to be added
// in the future. Receiving clients must not assume there are no additional
// topic levels.
//
// The versioning strategy follows semantic versioning such that all v1.x
// specifications are compatible with each other.
// Note that the topic above allows for additional topic levels to be added in
// the future. Receiving clients must not assume there are no additional topic
// levels.
//
// This message describes telemetry data that a Solace PubSub+ broker captures
// when a received message is identified as a message to be traced.
Expand All @@ -26,7 +21,7 @@ package solace.messaging.proto.broker.trace.receive.v1;
// Special priority is given to fields that can be repeated.
// - Field numbers 16+ are used for other attributes.
//
// Next available field ID: 39
// Next available field ID: 40
//
message SpanData {

Expand All @@ -40,8 +35,18 @@ message SpanData {
// If not present, this is a root span. If present, this is an 8-byte span ID
// of the parent span.
optional bytes parent_span_id = 16;

// tracestate string value, as per
// <https://www.w3.org/TR/trace-context/#tracestate-header>
optional string trace_state = 17;

// A baggage string formatted as described here:
// https://www.w3.org/TR/baggage/#x3-2-1-1-baggage-string
// This string may be truncated if the complete string, as received, would
// cause the broker's limit for application message properties to be exceeded.
// See dropped_application_message_properties for more details.
optional string baggage = 39;

// The start and end timestamps of the receive span. The start of the span is
// when Guaranteed Messaging processing begins in the broker.
sfixed64 start_time_unix_nano = 3;
Expand Down Expand Up @@ -151,13 +156,15 @@ message SpanData {
map<string, UserPropertyValue> user_properties = 14;

// Application message properties refers to the collection of:
// * trace_state
// * application_message_id
// * correlation_id
// * baggage
// * user_properties
// If dropped_application_message_properties is true, not all application
// message properties in the original message were captured in the fields
// above because limits were exceeded. The broker supports up to a total of
// 8KiB of application message properties.
// The broker supports up to a total of 8KiB of application message
// properties. If the total amount of application message properties in the
// message being traced exceeds this limit, this flag is used to indicate
// some properties were dropped.
bool dropped_application_message_properties = 37;

// If present, this indicates the message is being rejected to the publisher
Expand Down Expand Up @@ -333,13 +340,21 @@ message SpanData {
// destination, but it is not being enqueued due to the description.
optional string error_description = 4;

// This will never be set when there is not an error_desription present.
// If this is set, it indicates that all other non-errored enqueue events
// part of this span are rejected. In other words, regardless of what some
// enqueue events may indicate, the message is not enqueued to any
// destinations. When this is set, the span's error_description is always
// set and the span's error_description is always sent to the publisher in
// the rejection message.
// This flag being set on one or more enqueue events for a message implies
// that the message is not enqueued to any destination, regardless of the
// presence of successful enqueue events in the span.
//
// This will never be set when there is not an error_description present. If
// this is set, it indicates that the error described by error_description
// is a cause for the message to be rejected.
//
// Rejected non-transacted messages cause the message to be nacked to the
// publisher and rejected transacted messages result in a change in
// transacted session state that will cause a future commit attempt to fail.
//
// The cause for message rejection indicated to the client in either a
// message nack or commit failure response is the error_description of the
// first enqueue event that has this flag set.
bool rejects_all_enqueues = 5;
}
}
}
Loading