Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added message serialization capability to AMQP. #4587

Merged
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ add_subdirectory(sdk/storage)
add_subdirectory(sdk/template)

if(ENABLE_AZURE_CORE_AMQP)
#messaging sdks need AMQP
add_subdirectory(sdk/eventhubs)
# Messaging sdks need AMQP
add_subdirectory(sdk/eventhubs)
endif( )

if(BUILD_SAMPLES)
add_subdirectory(samples/integration/vcpkg-all-smoke)
endif()
2 changes: 1 addition & 1 deletion sdk/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ endif()

# AMQP is an optional feature, don't include it unless explicitly enabled.
if (ENABLE_AZURE_CORE_AMQP)
add_subdirectory(azure-core-amqp)
add_subdirectory(azure-core-amqp)
endif()

if (BUILD_PERFORMANCE_TESTS)
Expand Down
1 change: 1 addition & 0 deletions sdk/core/azure-core-amqp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ set (AZURE_CORE_AMQP_HEADER
inc/azure/core/amqp/models/amqp_message.hpp
inc/azure/core/amqp/models/amqp_properties.hpp
inc/azure/core/amqp/models/amqp_value.hpp
inc/azure/core/amqp/models/amqp_protocol.hpp
inc/azure/core/amqp/models/message_source.hpp
inc/azure/core/amqp/models/message_target.hpp
inc/azure/core/amqp/models/messaging_values.hpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cstdint>
#include <exception>
#include <stdexcept>
#include <vector>

struct HEADER_INSTANCE_TAG;

Expand All @@ -30,6 +31,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
MessageHeader() = default;
~MessageHeader() = default;

bool operator==(MessageHeader const&) const noexcept;
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved

/** @brief True if the message is considered "durable"
*
* @remarks For more information, see [AMQP
Expand Down Expand Up @@ -65,21 +68,28 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/

std::uint32_t DeliveryCount{0};

bool ShouldSerialize() const noexcept;
static size_t GetSerializedSize(MessageHeader const& messageHeader);
static std::vector<uint8_t> Serialize(MessageHeader const& messageHeader);
static MessageHeader Deserialize(std::uint8_t const* data, size_t size);
};
std::ostream& operator<<(std::ostream&, MessageHeader const&);

}}}} // namespace Azure::Core::Amqp::Models

namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _internal {
/**
* @brief uAMQP interoperability functions to convert a MessageHeader to a uAMQP HEADER_HANDLE and
* back.
* @brief uAMQP interoperability functions to convert a MessageHeader to a uAMQP HEADER_HANDLE
* and back.
*
* @remarks This class should not be used directly. It is used by the uAMQP interoperability
* layer.
*/
class MessageHeaderFactory {
public:
struct MessageHeaderFactory
{
static MessageHeader FromUamqp(UniqueMessageHeaderHandle const& properties);
static UniqueMessageHeaderHandle ToUamqp(MessageHeader const& properties);
};

}}}}} // namespace Azure::Core::Amqp::Models::_internal
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
class AmqpMessageFactory;
}

constexpr int AmqpMessageFormatValue = 0; // Specifies the message format for an AMQP message.

class AmqpMessage final {
public:
/** @brief Construct a new AMQP Message object. */
Expand All @@ -60,6 +62,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
/** @brief Move an AMQP message object to another object. @returns A reference to this.*/
AmqpMessage& operator=(AmqpMessage&&) noexcept = default;

bool operator==(AmqpMessage const& other) const noexcept;

AmqpMessage(std::nullptr_t) : m_hasValue{false} {}
operator bool() const noexcept { return m_hasValue; }

Expand All @@ -70,13 +74,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
MessageHeader Header;

/** @brief Specifies the 'format' of the message.
*
* For more information, see [AMQP Transfer
* performative](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-transfer)
*/
Azure::Nullable<uint32_t> MessageFormat;

/** @brief Delivery Annotations for the message.
*
* For more information, see [AMQP Delivery
Expand Down Expand Up @@ -123,21 +120,42 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
MessageBodyType BodyType{MessageBodyType::None};

/** @brief Set the body of the message.
/** @brief Sets the body of the message to a list of sequence sections.
*
* An AMQP Message Body can be one of the following formats:
*
* - One or more binary data sections
* - One or more sequence sections.
* - A single AMQP Value.
*
* This method sets the body of the message to a sequence of sections. See [Amqp
* This method appends the bodySequence value to the sequence of sections. See [Amqp
* Sequence](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-sequence)
* for more information.
*
* @param bodySequence - the list of AMQP values which make up the body of the message.
*
*/
void SetBody(std::vector<AmqpList> const& bodySequence);

/** @brief Appends a list to the body of the message.
*
* An AMQP Message Body can be one of the following formats:
*
* - One or more binary data sections
* - One or more sequence sections.
* - A single AMQP Value.
*
* This method appends the bodySequence value to the sequence of sections. See [Amqp
* Sequence](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-sequence)
* for more information.
*
* @param bodySequence - the list of AMQP values which make up the body of the message.
*
* @remarks This is a convenience method to make it simpler to append a single binary value to
* the message body.
*
*
*/
void SetBody(AmqpList const& bodySequence);

/** @brief Set the body of the message.
Expand All @@ -158,7 +176,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
void SetBody(std::vector<AmqpBinaryData> const& bodyBinarySequence);

/** @brief Set the body of the message.
/** @brief Appends a binary value to the body of the message.
*
* An AMQP Message Body can be one of the following formats:
*
Expand All @@ -172,7 +190,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*
* @param bodyBinary - a single value binary data.
*
* @remarks This is a convenience method to make it simpler to set a single binary value for
* @remarks This is a convenience method to make it simpler to append a single binary value to
* the message body.
*
*/
Expand All @@ -195,11 +213,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
void SetBody(AmqpValue const& bodyValue);

/** @brief Returns an Amqp Sequence message body.
/** @brief Returns a list of Amqp Sequence values.
*
* @remarks This API will fail if BodyType is not MessageBodyType::Sequence.
*/
AmqpList GetBodyAsAmqpList() const;
std::vector<AmqpList> GetBodyAsAmqpList() const;

/** @brief Returns an Amqp Value message body.
*
Expand All @@ -209,15 +227,33 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {

/** @brief Returns an Amqp Binary message body.
*
* @remarks This API will fail if BodyType is not MessageBodyType::Data.
* @remarks This API will fail if BodyType is not MessageBodyType::Binary.
*/
std::vector<AmqpBinaryData> GetBodyAsBinary() const;

/** @brief Returns the serialized size of the message.
*
* @remarks This API will fail if BodyType is not set.
*/
static size_t GetSerializedSize(AmqpMessage const& message);

/** @brief Serialize the message into a buffer.
*
* @remarks This API will fail if BodyType is not set.
*/
static std::vector<uint8_t> Serialize(AmqpMessage const& message);

/** @brief Deserialize the message from a buffer.
*
* @remarks This API will fail if BodyType is not set.
*/
static AmqpMessage Deserialize(std::uint8_t const* buffer, size_t size);

friend class _internal::AmqpMessageFactory;

private:
std::vector<AmqpBinaryData> m_binaryDataBody;
AmqpList m_amqpSequenceBody;
std::vector<AmqpList> m_amqpSequenceBody;
AmqpValue m_amqpValueBody;
bool m_hasValue{true}; // By default, an AmqpMessage has a value.
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
Azure::Nullable<std::string> GroupId;
Azure::Nullable<uint32_t> GroupSequence;
Azure::Nullable<std::string> ReplyToGroupId;

bool operator==(MessageProperties const&) const noexcept;
bool ShouldSerialize() const noexcept;

static size_t GetSerializedSize(MessageProperties const& properties);
static std::vector<uint8_t> Serialize(MessageProperties const& properties);
static MessageProperties Deserialize(uint8_t const* data, size_t size);
};
std::ostream& operator<<(std::ostream&, MessageProperties const&);
}}}} // namespace Azure::Core::Amqp::Models
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT

//
// This file contains protocol level definitions for the AMQP protocol.

#pragma once

#include <cstdint>

struct AMQPVALUE_DECODER_HANDLE_DATA_TAG;

template <> struct Azure::Core::_internal::UniqueHandleHelper<AMQPVALUE_DECODER_HANDLE_DATA_TAG>
{
static void FreeAmqpDecoder(AMQPVALUE_DECODER_HANDLE_DATA_TAG* obj);

using type = Azure::Core::_internal::
BasicUniqueHandle<AMQPVALUE_DECODER_HANDLE_DATA_TAG, FreeAmqpDecoder>;
};

namespace Azure { namespace Core { namespace Amqp { namespace _detail {
using UniqueAmqpDecoderHandle
= Azure::Core::_internal::UniqueHandleHelper<AMQPVALUE_DECODER_HANDLE_DATA_TAG>::type;

/** @brief AMQP Descriptor values. Note that the AMQP descriptor is technically a tuple of
* domain+id, the domain for internal-to-amqp is defined to be 0x00000000.
*
* See [AMQP Descriptor
* values](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#section-descriptor-values)
* for more information.
*/
enum class AmqpDescriptors : std::int64_t
{
// AMQP Performative descriptors.
Error = 0x1d,
Open = 0x10,
Begin = 0x11,
Attach = 0x12,
Flow = 0x13,
Transfer = 0x14,
Disposition = 0x15,
Detach = 0x16,
End = 0x17,
Close = 0x18,

// Message Dispositions.
Received = 0x23,
Accepted = 0x24,
Rejected = 0x25,
Released = 0x26,
Modified = 0x27,

// Terminus related descriptors.
Source = 0x28,
Target = 0x29,

// AMQP Sasl descriptors.
SaslMechanism = 0x40,
SaslInit = 0x41,
SaslChallenge = 0x42,
SaslResponse = 0x43,
SaslOutcome = 0x44,

// Message related descriptors.
Header = 0x70,
DeliveryAnnotations = 0x71,
MessageAnnotations = 0x72,
Properties = 0x73,
ApplicationProperties = 0x74,
DataBinary = 0x75,
DataAmqpSequence = 0x76,
DataAmqpValue = 0x77,
Footer = 0x78,
};

}}}} // namespace Azure::Core::Amqp::_detail
Loading