Skip to content

Commit

Permalink
[receiver/solacereceiver] Updates Solace Receiver for compatibility w…
Browse files Browse the repository at this point in the history
…ith Solace PubSub+ Event Broker 10.2.0 (#15244)

* Use modify message instead of reject message to indicate failure

SOL-75279

* Set delivery mode on propagated spans

SOL-75196

* Added character type to user property mapping

SOL-75145

* Changed dropped_user_properties to dropped_application_message_properties

SOL-74920

* Updated protobuf spec, incorporated transactional changes

SOL-75362

* Scoped metrics to receiver instances based on instance name if present

SOL-75870, SOL-74853

* Made delivery mode mapped strings lowercase

SOL-75196

* Use string of length 1 instead of int for character value mapping

SOL-75145

* Gracefully handle enums beyond the current known set

This allows forward compatibility in the event that new entries are added in the future

SOL-78861

* Use the slice type in the attribute map when mapping user properties

OpenTelemetry+backends do not support the Bytes type, and instead support a slice of integers

SOL-78624

* Revert "Use the slice type in the attribute map when mapping user properties"

This reverts commit 48b9730.

* Change name of Enqueue Event in OpenTelemetry Spans to always be '<endpoint name> enqueue', even for anonymous endpoints

Also removes messaging.destination as an attribute on enqueue events as its now redundant

SOL-79098

* Handle the case of an empty payload, returning an error

In this case, the message will be dropped and stats incremented

SOL-79796

* Fixed an issue causing AcceptMessage to hang indefinitely

Working around Azure/go-amqp#126 by batching dispositions

SOL-79806

* Revert "Fixed an issue causing AcceptMessage to hang indefinitely"

This reverts commit bf61326.
  • Loading branch information
mcardy authored Oct 18, 2022
1 parent a0126a5 commit fa933d6
Show file tree
Hide file tree
Showing 12 changed files with 898 additions and 644 deletions.
16 changes: 16 additions & 0 deletions .chloggen/solacereceiver-improvements.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: solacereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Updates Solace Receiver with a variety of improvements and fixes for compatibility with Solace PubSub+ Event Broker 10.2.0

# One or more tracking issues related to the change
issues: [15244]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
33 changes: 33 additions & 0 deletions receiver/solacereceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -84,6 +86,37 @@ func TestCreateTracesReceiverBadConfigIncompleteAuth(t *testing.T) {
assert.Equal(t, errMissingPlainTextParams, err)
}

func TestCreateTracesReceiverBadMetrics(t *testing.T) {
// register a metric first with the same name
statName := "solacereceiver/primary/failed_reconnections"
stat := stats.Int64(statName, "", stats.UnitDimensionless)
err := view.Register(&view.View{
Name: buildReceiverCustomMetricName(statName),
Description: "some description",
Measure: stat,
Aggregation: view.Sum(),
})
require.NoError(t, err)

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(config.NewComponentIDWithName(componentType, "primary").String())
require.NoError(t, err)
require.NoError(t, config.UnmarshalReceiver(sub, cfg))

receiver, err := factory.CreateTracesReceiver(
context.Background(),
componenttest.NewNopReceiverCreateSettings(),
cfg,
consumertest.NewNop(),
)
assert.Error(t, err)
assert.Nil(t, receiver)
}

func getTestNopFactories(t *testing.T) component.Factories {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)
Expand Down
10 changes: 5 additions & 5 deletions receiver/solacereceiver/messaging_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type messagingService interface {
dial() error
close(ctx context.Context)
receiveMessage(ctx context.Context) (*inboundMessage, error)
ack(ctx context.Context, msg *inboundMessage) error
nack(ctx context.Context, msg *inboundMessage) error
accept(ctx context.Context, msg *inboundMessage) error
failed(ctx context.Context, msg *inboundMessage) error
}

// messagingServiceFactory is a factory to create new messagingService instances
Expand Down Expand Up @@ -175,12 +175,12 @@ func (m *amqpMessagingService) receiveMessage(ctx context.Context) (*inboundMess
return m.receiver.Receive(ctx)
}

func (m *amqpMessagingService) ack(ctx context.Context, msg *inboundMessage) error {
func (m *amqpMessagingService) accept(ctx context.Context, msg *inboundMessage) error {
return m.receiver.AcceptMessage(ctx, msg)
}

func (m *amqpMessagingService) nack(ctx context.Context, msg *inboundMessage) error {
return m.receiver.RejectMessage(ctx, msg, nil)
func (m *amqpMessagingService) failed(ctx context.Context, msg *inboundMessage) error {
return m.receiver.ModifyMessage(ctx, msg, true, false, nil)
}

// Allow for substitution in testing to assert correct data is passed to AMQP
Expand Down
10 changes: 5 additions & 5 deletions receiver/solacereceiver/messaging_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,28 +364,28 @@ func TestAMQPAcknowledgeMessage(t *testing.T) {
close(writeCalled)
return len(b), nil
}
err = service.ack(context.Background(), msg)
err = service.accept(context.Background(), msg)
assert.NoError(t, err)
assertChannelClosed(t, writeCalled)
closeMockedAMQPService(t, service, conn)
}

func TestAMQPRejectMessage(t *testing.T) {
func TestAMQPModifyMessage(t *testing.T) {
service, conn := startMockedService(t)
conn.nextData <- []byte(amqpHelloWorldMsg)
msg, err := service.receiveMessage(context.Background())
assert.NoError(t, err)
writeCalled := make(chan struct{})
// Expected reject from AMQP frame for first received message
// Expected modify from AMQP frame for first received message
// "\x00\x00\x00\x1c\x02\x00\x00\x00\x00\x53\x15\xd0\x00\x00\x00\x0c\x00\x00\x00\x05\x41\x43\x40\x41\x00\x53\x25\x45"
conn.writeHandle = func(b []byte) (n int, err error) {
// assert that a disposition is written
assert.Equal(t, byte(0x15), b[10])
assert.Equal(t, byte(0x25), b[26]) // 0x25 at the 27th byte in this case means reject
assert.Equal(t, byte(0x27), b[26]) // 0x27 at the 27th byte in this case means modify
close(writeCalled)
return len(b), nil
}
err = service.nack(context.Background(), msg)
err = service.failed(context.Background(), msg)
assert.NoError(t, err)
select {
case <-writeCalled:
Expand Down
67 changes: 52 additions & 15 deletions receiver/solacereceiver/model/receive_v1.proto
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,17 @@ message SpanData {
// NON_PERSISTENT when it is enqueued.
DeliveryMode delivery_mode = 19;

// The receiving broker's router-name and message-vpn-name.
//
// These fields may be removed in the future. If they are removed, this will
// not result in a major version change since the initial specification makes
// no promise to provide them.
// The receiving broker's router-name at the time the message was received.
string router_name = 20;

// The receiving broker's message-vpn name. This field may be removed in the
// future without a major version change since the field is specified as
// optional.
//
// Rather than rely on them, receiving clients should use an SMF API to
// extract the PEER_ROUTER_NAME and VPN_NAME_IN_USE from the API's Session
// object.
optional string router_name = 20;
// Rather than rely on this field, receiving clients should obtain the VPN
// by using an SMF API to extract the VPN_NAME_IN_USE from the API's Session
// object. The message_vpn_name of all messages received from via an SMF
// API's session will match the session's VPN_NAME_IN_USE.
optional string message_vpn_name = 21;

// The receiving broker's SolOS version when the message was initially
Expand Down Expand Up @@ -149,10 +150,15 @@ message SpanData {
// captured.
map<string, UserPropertyValue> user_properties = 14;

// If true, not all user properties in the original message were captured in
// the user_properties map because limits were exceeded. The broker supports
// up to a total of 8KiB of user property data.
bool dropped_user_properties = 37;
// Application message properties refers to the collection of:
// * application_message_id
// * correlation_id
// * user_properties
// If dropped_application_message_properties is true, not all application
// message properties in the original message were captured in the fields
// above because limits were exceeded. The broker supports up to a total of
// 8KiB of application message properties.
bool dropped_application_message_properties = 37;

// If present, this indicates the message is being rejected to the publisher
// and matches the error string provided back to the publisher as an error.
Expand Down Expand Up @@ -238,17 +244,48 @@ message SpanData {
message TransactionEvent {
sfixed64 time_unix_nano = 1;
enum Type {
// COMMIT and ROLLBACK are always initiated by either a CLIENT or ADMIN.
// The initiator is ADMIN when the management interface is used to
// to perform a heuristic commit or rollback.
COMMIT = 0;
ROLLBACK = 1;
// PREPARE and END can only occur with a CLIENT initiator, and spans for
// these operations are only generated if the operation fails. Therefore,
// the error_description of the TransactionEvent will always be present
// for END and PREPARE.
END = 2;
PREPARE = 3;

// The initiator of a SESSION_TIMEOUT is always BROKER. All messages
// received as part of the transaction are discarded.
SESSION_TIMEOUT = 4;

// The initiator of ROLLBACK_ONLY is always BROKER. The first such event
// in a transaction always has an error_description in the span,
// indicating there was a problem processing the message when it was
// received, and the message is being discarded. This also transitions
// the transaction itself to a "rollback only" state, which causes
// all subsequent messages received as part of the transaction to also
// be discarded. Spans generated by these subsequent discards will not
// have the span's error_description set, but all ROLLBACK_ONLY
// transaction events will have an error_description set, which indicate
// the transaction's error.
//
// Since the only record of these messages in the context of the
// transaction has been discarded, no further span can be generated in
// the context of a client, admin, or session timeout operation. When a
// subsequent operation such as rollback or commit occurs on a
// transaction marked rollback only, only messages received prior to the
// error triggering the transition to rollback only will generate
// receive spans.
ROLLBACK_ONLY = 5;
}
Type type = 2;

enum Initiator {
CLIENT = 0;
ADMIN = 1;
SESSION_TIMEOUT = 2;
BROKER = 2;
}
Initiator initiator = 3;

Expand Down Expand Up @@ -305,4 +342,4 @@ message SpanData {
// the rejection message.
bool rejects_all_enqueues = 5;
}
}
}
Loading

0 comments on commit fa933d6

Please sign in to comment.