Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/solacereceiver] Updates Solace Receiver for compatibility with Solace PubSub+ Event Broker 10.2.0 #15244

Merged
merged 23 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0eea1f9
Use modify message instead of reject message to indicate failure
mcardy Aug 23, 2022
089a30d
Set delivery mode on propagated spans
mcardy Aug 23, 2022
db40111
Added character type to user property mapping
mcardy Aug 23, 2022
df89a98
Changed dropped_user_properties to dropped_application_message_proper…
mcardy Aug 23, 2022
72f28e2
Updated protobuf spec, incorporated transactional changes
mcardy Aug 23, 2022
bff5a48
Scoped metrics to receiver instances based on instance name if present
mcardy Aug 24, 2022
bd03456
Made delivery mode mapped strings lowercase
mcardy Aug 25, 2022
ea21b59
Use string of length 1 instead of int for character value mapping
mcardy Aug 29, 2022
da76eab
Merge remote-tracking branch 'origin/main' into phase_1
mcardy Sep 21, 2022
4715c43
Gracefully handle enums beyond the current known set
mcardy Sep 27, 2022
48b9730
Use the slice type in the attribute map when mapping user properties
mcardy Sep 28, 2022
6aab7ff
Revert "Use the slice type in the attribute map when mapping user pro…
mcardy Sep 28, 2022
87dcd11
Change name of Enqueue Event in OpenTelemetry Spans to always be '<en…
mcardy Sep 29, 2022
081214f
Merge remote-tracking branch 'origin/main' into phase_1
mcardy Sep 29, 2022
d3b1f29
Merge remote-tracking branch 'origin/main' into phase_1
mcardy Sep 30, 2022
a2af875
Handle the case of an empty payload, returning an error
mcardy Oct 11, 2022
579d8a6
Merge remote-tracking branch 'origin/main' into phase_1
mcardy Oct 11, 2022
bf61326
Fixed an issue causing AcceptMessage to hang indefinitely
mcardy Oct 14, 2022
5a0b445
Merge remote-tracking branch 'origin/main' into phase_1
mcardy Oct 14, 2022
2dfb861
Revert "Fixed an issue causing AcceptMessage to hang indefinitely"
mcardy Oct 14, 2022
ac35627
Added changelog for Solace Receiver changes
mcardy Oct 18, 2022
a2642c6
Moved changelog to .chloggen directory
mcardy Oct 18, 2022
62fe712
Fixed lint errors on tests
mcardy Oct 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/solacereceiver-improvements.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: solacereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Updates Solace Receiver with a variety of improvements and fixes for compatibility with Solace PubSub+ Event Broker 10.2.0

# One or more tracking issues related to the change
issues: [15244]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
33 changes: 33 additions & 0 deletions receiver/solacereceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -84,6 +86,37 @@ func TestCreateTracesReceiverBadConfigIncompleteAuth(t *testing.T) {
assert.Equal(t, errMissingPlainTextParams, err)
}

func TestCreateTracesReceiverBadMetrics(t *testing.T) {
// register a metric first with the same name
statName := "solacereceiver/primary/failed_reconnections"
stat := stats.Int64(statName, "", stats.UnitDimensionless)
err := view.Register(&view.View{
Name: buildReceiverCustomMetricName(statName),
Description: "some description",
Measure: stat,
Aggregation: view.Sum(),
})
require.NoError(t, err)

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(config.NewComponentIDWithName(componentType, "primary").String())
require.NoError(t, err)
require.NoError(t, config.UnmarshalReceiver(sub, cfg))

receiver, err := factory.CreateTracesReceiver(
context.Background(),
componenttest.NewNopReceiverCreateSettings(),
cfg,
consumertest.NewNop(),
)
assert.Error(t, err)
assert.Nil(t, receiver)
}

func getTestNopFactories(t *testing.T) component.Factories {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)
Expand Down
10 changes: 5 additions & 5 deletions receiver/solacereceiver/messaging_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type messagingService interface {
dial() error
close(ctx context.Context)
receiveMessage(ctx context.Context) (*inboundMessage, error)
ack(ctx context.Context, msg *inboundMessage) error
nack(ctx context.Context, msg *inboundMessage) error
accept(ctx context.Context, msg *inboundMessage) error
failed(ctx context.Context, msg *inboundMessage) error
}

// messagingServiceFactory is a factory to create new messagingService instances
Expand Down Expand Up @@ -175,12 +175,12 @@ func (m *amqpMessagingService) receiveMessage(ctx context.Context) (*inboundMess
return m.receiver.Receive(ctx)
}

func (m *amqpMessagingService) ack(ctx context.Context, msg *inboundMessage) error {
func (m *amqpMessagingService) accept(ctx context.Context, msg *inboundMessage) error {
return m.receiver.AcceptMessage(ctx, msg)
}

func (m *amqpMessagingService) nack(ctx context.Context, msg *inboundMessage) error {
return m.receiver.RejectMessage(ctx, msg, nil)
func (m *amqpMessagingService) failed(ctx context.Context, msg *inboundMessage) error {
return m.receiver.ModifyMessage(ctx, msg, true, false, nil)
}

// Allow for substitution in testing to assert correct data is passed to AMQP
Expand Down
10 changes: 5 additions & 5 deletions receiver/solacereceiver/messaging_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,28 +364,28 @@ func TestAMQPAcknowledgeMessage(t *testing.T) {
close(writeCalled)
return len(b), nil
}
err = service.ack(context.Background(), msg)
err = service.accept(context.Background(), msg)
assert.NoError(t, err)
assertChannelClosed(t, writeCalled)
closeMockedAMQPService(t, service, conn)
}

func TestAMQPRejectMessage(t *testing.T) {
func TestAMQPModifyMessage(t *testing.T) {
service, conn := startMockedService(t)
conn.nextData <- []byte(amqpHelloWorldMsg)
msg, err := service.receiveMessage(context.Background())
assert.NoError(t, err)
writeCalled := make(chan struct{})
// Expected reject from AMQP frame for first received message
// Expected modify from AMQP frame for first received message
// "\x00\x00\x00\x1c\x02\x00\x00\x00\x00\x53\x15\xd0\x00\x00\x00\x0c\x00\x00\x00\x05\x41\x43\x40\x41\x00\x53\x25\x45"
conn.writeHandle = func(b []byte) (n int, err error) {
// assert that a disposition is written
assert.Equal(t, byte(0x15), b[10])
assert.Equal(t, byte(0x25), b[26]) // 0x25 at the 27th byte in this case means reject
assert.Equal(t, byte(0x27), b[26]) // 0x27 at the 27th byte in this case means modify
close(writeCalled)
return len(b), nil
}
err = service.nack(context.Background(), msg)
err = service.failed(context.Background(), msg)
assert.NoError(t, err)
select {
case <-writeCalled:
Expand Down
67 changes: 52 additions & 15 deletions receiver/solacereceiver/model/receive_v1.proto
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,17 @@ message SpanData {
// NON_PERSISTENT when it is enqueued.
DeliveryMode delivery_mode = 19;

// The receiving broker's router-name and message-vpn-name.
//
// These fields may be removed in the future. If they are removed, this will
// not result in a major version change since the initial specification makes
// no promise to provide them.
// The receiving broker's router-name at the time the message was received.
string router_name = 20;

// The receiving broker's message-vpn name. This field may be removed in the
// future without a major version change since the field is specified as
// optional.
//
// Rather than rely on them, receiving clients should use an SMF API to
// extract the PEER_ROUTER_NAME and VPN_NAME_IN_USE from the API's Session
// object.
optional string router_name = 20;
// Rather than rely on this field, receiving clients should obtain the VPN
// by using an SMF API to extract the VPN_NAME_IN_USE from the API's Session
// object. The message_vpn_name of all messages received from via an SMF
// API's session will match the session's VPN_NAME_IN_USE.
optional string message_vpn_name = 21;

// The receiving broker's SolOS version when the message was initially
Expand Down Expand Up @@ -149,10 +150,15 @@ message SpanData {
// captured.
map<string, UserPropertyValue> user_properties = 14;

// If true, not all user properties in the original message were captured in
// the user_properties map because limits were exceeded. The broker supports
// up to a total of 8KiB of user property data.
bool dropped_user_properties = 37;
// Application message properties refers to the collection of:
// * application_message_id
// * correlation_id
// * user_properties
// If dropped_application_message_properties is true, not all application
// message properties in the original message were captured in the fields
// above because limits were exceeded. The broker supports up to a total of
// 8KiB of application message properties.
bool dropped_application_message_properties = 37;

// If present, this indicates the message is being rejected to the publisher
// and matches the error string provided back to the publisher as an error.
Expand Down Expand Up @@ -238,17 +244,48 @@ message SpanData {
message TransactionEvent {
sfixed64 time_unix_nano = 1;
enum Type {
// COMMIT and ROLLBACK are always initiated by either a CLIENT or ADMIN.
// The initiator is ADMIN when the management interface is used to
// to perform a heuristic commit or rollback.
COMMIT = 0;
ROLLBACK = 1;
// PREPARE and END can only occur with a CLIENT initiator, and spans for
// these operations are only generated if the operation fails. Therefore,
// the error_description of the TransactionEvent will always be present
// for END and PREPARE.
END = 2;
PREPARE = 3;

// The initiator of a SESSION_TIMEOUT is always BROKER. All messages
// received as part of the transaction are discarded.
SESSION_TIMEOUT = 4;

// The initiator of ROLLBACK_ONLY is always BROKER. The first such event
// in a transaction always has an error_description in the span,
// indicating there was a problem processing the message when it was
// received, and the message is being discarded. This also transitions
// the transaction itself to a "rollback only" state, which causes
// all subsequent messages received as part of the transaction to also
// be discarded. Spans generated by these subsequent discards will not
// have the span's error_description set, but all ROLLBACK_ONLY
// transaction events will have an error_description set, which indicate
// the transaction's error.
//
// Since the only record of these messages in the context of the
// transaction has been discarded, no further span can be generated in
// the context of a client, admin, or session timeout operation. When a
// subsequent operation such as rollback or commit occurs on a
// transaction marked rollback only, only messages received prior to the
// error triggering the transition to rollback only will generate
// receive spans.
ROLLBACK_ONLY = 5;
}
Type type = 2;

enum Initiator {
CLIENT = 0;
ADMIN = 1;
SESSION_TIMEOUT = 2;
BROKER = 2;
}
Initiator initiator = 3;

Expand Down Expand Up @@ -305,4 +342,4 @@ message SpanData {
// the rejection message.
bool rejects_all_enqueues = 5;
}
}
}
Loading