[receiver/solacereceiver] Added configurable flow control interval, baggage unmarshalling (open-telemetry#16570)

* Added Flow Control (open-telemetry#1614)

* Added new configuration for backpressure

SOL-79626, SOL-79630

* Added handling for delayed retry in receiver

SOL-79627, SOL-79631

* Added new flow control metrics to aid in flow control tuning

SOL-79628, SOL-79629, SOL-79632

* Added flowcontrol info to readme

Also fixed some linting errors

SOL-80997

* Updated protobuf spec to contain baggage string, added handling+tests for baggage (open-telemetry#1615)

SOL-79113, SOL-79114, SOL-79115

* Require an upgrade when receiving a telemetry message with an unsupported topic

Only applies to messages starting with _telemetry

SOL-81980

* Change stability from Alpha to Beta as configuration is now stable

SOL-81470

* Added changelog entries

* Fixed linting failure in tests for unchecked error

* Updated protobuf spec with latest comments
mcardy authored and shalper2 committed Dec 6, 2022
1 parent f290c79 commit 3a570e0
Showing 16 changed files with 891 additions and 371 deletions.
16 changes: 16 additions & 0 deletions .chloggen/solace-baggage.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: solacereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added baggage unmarshalling support (introduced in Solace PubSub+ Event Broker 10.2.1)

# One or more tracking issues related to the change
issues: [16570]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
16 changes: 16 additions & 0 deletions .chloggen/solace-flowctrl.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: solacereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added configurable retry interval for flow control scenarios

# One or more tracking issues related to the change
issues: [16570]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
7 changes: 5 additions & 2 deletions receiver/solacereceiver/README.md
@@ -2,7 +2,7 @@

| Status | |
|--------------------------|-----------|
-| Stability | [alpha] |
+| Stability | [beta] |
| Supported pipeline types | traces |
| Distributions | [contrib] |

@@ -47,6 +47,9 @@ The configuration parameters are:
- username (The username to use; required for sasl_xauth2 authentication)
- bearer (The bearer token in plain text; required for sasl_xauth2 authentication)
- sasl_external (SASL External required to be used for TLS client cert authentication. When this authentication type is chosen then tls cert_file and key_file are required)
- flow_control (Configures the behaviour to use when temporary errors are encountered from the next component)
- delayed_retry (Default flow control strategy. Sets the flow control strategy to delayed retry which will wait before trying to push the message to the next component again)
- delay (The delay, e.g. 10ms, to wait before retrying. Default is 10ms)

### Examples:
Simple single node configuration with SASL plain authentication (TLS enabled by default)
@@ -92,5 +95,5 @@ service:
receivers: [solace/primary,solace/backup]
```

-[alpha]:https://github.com/open-telemetry/opentelemetry-collector#alpha
+[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
28 changes: 24 additions & 4 deletions receiver/solacereceiver/config.go
@@ -17,6 +17,7 @@ package solacereceiver // import "github.com/open-telemetry/opentelemetry-collec
import (
"errors"
"strings"
"time"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtls"
@@ -28,10 +29,12 @@ const (
)

var (
-errMissingAuthDetails = errors.New("authentication details are required, either for plain user name password or XOAUTH2 or client certificate")
-errMissingQueueName = errors.New("queue definition is required, queue definition has format queue://<queuename>")
-errMissingPlainTextParams = errors.New("missing plain text auth params: Username, Password")
-errMissingXauth2Params = errors.New("missing xauth2 text auth params: Username, Bearer")
+errMissingAuthDetails = errors.New("authentication details are required, either for plain user name password or XOAUTH2 or client certificate")
+errMissingQueueName = errors.New("queue definition is required, queue definition has format queue://<queuename>")
+errMissingPlainTextParams = errors.New("missing plain text auth params: Username, Password")
+errMissingXauth2Params = errors.New("missing xauth2 text auth params: Username, Bearer")
+errMissingFlowControl = errors.New("missing flow control configuration: DelayedRetry must be selected")
+errInvalidDelayedRetryDelay = errors.New("delayed_retry.delay must > 0")
)

// Config defines configuration for Solace receiver.
@@ -49,6 +52,8 @@ type Config struct {
TLS configtls.TLSClientSetting `mapstructure:"tls,omitempty"`

Auth Authentication `mapstructure:"auth"`

Flow FlowControl `mapstructure:"flow_control"`
}

// Validate checks the receiver configuration is valid
@@ -59,6 +64,11 @@ func (cfg *Config) Validate() error {
if len(strings.TrimSpace(cfg.Queue)) == 0 {
return errMissingQueueName
}
if cfg.Flow.DelayedRetry == nil {
return errMissingFlowControl
} else if cfg.Flow.DelayedRetry.Delay <= 0 {
return errInvalidDelayedRetryDelay
}
return nil
}

@@ -84,3 +94,13 @@ type SaslXAuth2Config struct {
// SaslExternalConfig defines the configuration for the SASL External used in conjunction with TLS client authentication.
type SaslExternalConfig struct {
}

// FlowControl defines the configuration for what to do in backpressure scenarios, e.g. memorylimiter errors
type FlowControl struct {
DelayedRetry *FlowControlDelayedRetry `mapstructure:"delayed_retry"`
}

// FlowControlDelayedRetry represents the strategy of waiting for a defined amount of time (in time.Duration) and attempt redelivery
type FlowControlDelayedRetry struct {
Delay time.Duration `mapstructure:"delay"`
}
27 changes: 27 additions & 0 deletions receiver/solacereceiver/config_test.go
@@ -17,6 +17,7 @@ package solacereceiver // import "github.com/open-telemetry/opentelemetry-collec
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -54,6 +55,11 @@ func TestLoadConfig(t *testing.T) {
Insecure: false,
InsecureSkipVerify: false,
},
Flow: FlowControl{
DelayedRetry: &FlowControlDelayedRetry{
Delay: 1 * time.Second,
},
},
},
},
{
@@ -99,6 +105,27 @@ func TestConfigValidateMissingQueue(t *testing.T) {
assert.Equal(t, errMissingQueueName, err)
}

func TestConfigValidateMissingFlowControl(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Queue = "someQueue"
cfg.Auth.PlainText = &SaslPlainTextConfig{"Username", "Password"}
// this should never happen in reality, test validation anyway
cfg.Flow.DelayedRetry = nil
err := cfg.Validate()
assert.Equal(t, errMissingFlowControl, err)
}

func TestConfigValidateInvalidFlowControlDelayedRetryDelay(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Queue = "someQueue"
cfg.Auth.PlainText = &SaslPlainTextConfig{"Username", "Password"}
cfg.Flow.DelayedRetry = &FlowControlDelayedRetry{
Delay: -30 * time.Second,
}
err := cfg.Validate()
assert.Equal(t, errInvalidDelayedRetryDelay, err)
}

func TestConfigValidateSuccess(t *testing.T) {
successCases := map[string]func(*Config){
"With Plaintext Auth": func(c *Config) {
6 changes: 6 additions & 0 deletions receiver/solacereceiver/factory.go
@@ -16,6 +16,7 @@ package solacereceiver // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
@@ -54,6 +55,11 @@ func createDefaultConfig() component.Config {
InsecureSkipVerify: false,
Insecure: false,
},
Flow: FlowControl{
DelayedRetry: &FlowControlDelayedRetry{
Delay: 10 * time.Millisecond,
},
},
}
}

2 changes: 1 addition & 1 deletion receiver/solacereceiver/go.mod
@@ -25,6 +25,7 @@ require (
go.opentelemetry.io/collector/confmap v0.0.0-20221201172708-2bdff61fa52a
go.opentelemetry.io/collector/consumer v0.66.1-0.20221202005155-1c54042beb70
go.opentelemetry.io/collector/pdata v0.66.1-0.20221202005155-1c54042beb70
+go.opentelemetry.io/otel v1.11.1
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.28.1
@@ -45,7 +46,6 @@ require (
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/featuregate v0.66.1-0.20221202005155-1c54042beb70 // indirect
-go.opentelemetry.io/otel v1.11.1 // indirect
go.opentelemetry.io/otel/metric v0.33.0 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.uber.org/multierr v1.8.0 // indirect
61 changes: 38 additions & 23 deletions receiver/solacereceiver/model/receive_v1.proto
@@ -2,17 +2,12 @@ syntax = "proto3";

package solace.messaging.proto.broker.trace.receive.v1;

-// Version 1.0
+// A message will be compatible with this specification if its topic matches:
+// _telemetry/broker/trace/receive/v1[/additional/topic/levels]
//
-// Messages with a topic of matching the following topic contain a v1.x
-// specification of this message.
-// #telemetry/broker/trace/receive/v1[/<additional/topic/levels]
-// Note that the specification allows for additional topic levels to be added
-// in the future. Receiving clients must not assume there are no additional
-// topic levels.
-//
-// The versioning strategy follows semantic versioning such that all v1.x
-// specifications are compatible with each other.
+// Note that the topic above allows for additional topic levels to be added in
+// the future. Receiving clients must not assume there are no additional topic
+// levels.
//
// This message describes telemetry data that a Solace PubSub+ broker captures
// when a received message is identified as a message to be traced.
@@ -26,7 +21,7 @@ package solace.messaging.proto.broker.trace.receive.v1;
// Special priority is given to fields that can be repeated.
// - Field numbers 16+ are used for other attributes.
//
-// Next available field ID: 39
+// Next available field ID: 40
//
message SpanData {

@@ -40,8 +35,18 @@ message SpanData {
// If not present, this is a root span. If present, this is an 8-byte span ID
// of the parent span.
optional bytes parent_span_id = 16;

// tracestate string value, as per
// <https://www.w3.org/TR/trace-context/#tracestate-header>
optional string trace_state = 17;

// A baggage string formatted as described here:
// https://www.w3.org/TR/baggage/#x3-2-1-1-baggage-string
// This string may be truncated if the complete string, as received, would
// cause the broker's limit for application message properties to be exceeded.
// See dropped_application_message_properties for more details.
optional string baggage = 39;

// The start and end timestamps of the receive span. The start of the span is
// when Guaranteed Messaging processing begins in the broker.
sfixed64 start_time_unix_nano = 3;
@@ -151,13 +156,15 @@ message SpanData {
map<string, UserPropertyValue> user_properties = 14;

// Application message properties refers to the collection of:
// * trace_state
// * application_message_id
// * correlation_id
// * baggage
// * user_properties
-// If dropped_application_message_properties is true, not all application
-// message properties in the original message were captured in the fields
-// above because limits were exceeded. The broker supports up to a total of
-// 8KiB of application message properties.
+// The broker supports up to a total of 8KiB of application message
+// properties. If the total amount of application message properties in the
+// message being traced exceeds this limit, this flag is used to indicate
+// some properties were dropped.
bool dropped_application_message_properties = 37;

// If present, this indicates the message is being rejected to the publisher
@@ -333,13 +340,21 @@ message SpanData {
// destination, but it is not being enqueued due to the description.
optional string error_description = 4;

-// This will never be set when there is not an error_desription present.
-// If this is set, it indicates that all other non-errored enqueue events
-// part of this span are rejected. In other words, regardless of what some
-// enqueue events may indicate, the message is not enqueued to any
-// destinations. When this is set, the span's error_description is always
-// set and the span's error_description is always sent to the publisher in
-// the rejection message.
+// This flag being set on one or more enqueue events for a message implies
+// that the message is not enqueued to any destination, regardless of the
+// presence of successful enqueue events in the span.
+//
+// This will never be set when there is not an error_description present. If
+// this is set, it indicates that the error described by error_description
+// is a cause for the message to be rejected.
+//
+// Rejected non-transacted messages cause the message to be nacked to the
+// publisher and rejected transacted messages result in a change in
+// transacted session state that will cause a future commit attempt to fail.
+//
+// The cause for message rejection indicated to the client in either a
+// message nack or commit failure response is the error_description of the
+// first enqueue event that has this flag set.
bool rejects_all_enqueues = 5;
}
}
}
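The `baggage` field added to `SpanData` in this spec carries a W3C baggage string, which the spec comments note may arrive truncated. As a rough illustration of what unmarshalling such a string involves, the Go sketch below splits it into key/value pairs using only the standard library. It is a simplified assumption-laden example, not the receiver's implementation: `parseBaggage` is a hypothetical helper, it discards member properties (the part after `;`), and it silently skips members without an `=` sign, such as a key cut off by truncation. The `go.opentelemetry.io/otel/baggage` package provides a fuller parser.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseBaggage is a hypothetical, simplified parser for a W3C baggage
// string of the form "k1=v1,k2=v2;prop=x". It returns the key/value
// pairs and ignores any member properties.
func parseBaggage(s string) map[string]string {
	out := make(map[string]string)
	for _, member := range strings.Split(s, ",") {
		// Drop optional properties that follow the first ';'.
		kv, _, _ := strings.Cut(member, ";")
		key, value, ok := strings.Cut(kv, "=")
		key = strings.TrimSpace(key)
		if !ok || key == "" {
			// Malformed or truncated member: skip it.
			continue
		}
		value = strings.TrimSpace(value)
		// Baggage values are percent-encoded; keep the raw value if decoding fails.
		if decoded, err := url.PathUnescape(value); err == nil {
			value = decoded
		}
		out[key] = value
	}
	return out
}

func main() {
	// The last member has no '=' and stands in for a key lost to truncation.
	b := parseBaggage("userId=alice, serverNode=DF%2028;prop=1,isProd")
	fmt.Println(len(b), b["userId"], b["serverNode"])
}
```

Note that a member whose value was cut short by truncation would still be accepted by this sketch with a partial value, so a consumer that needs exact values may want to check `dropped_application_message_properties` before trusting the last entry.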