Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial creation of EventHubs service by @gearama #4755

Merged
merged 81 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
96eb81c
Sync eng/common directory with azure-sdk-tools for PR 5608 (#4411) (#…
LarryOsterman Mar 31, 2023
0726d84
Merge branch 'feature/amqp' of https://github.com/Azure/azure-sdk-for…
LarryOsterman Apr 3, 2023
881552e
Initial cut at AMQP protocol implementation. (#4498)
LarryOsterman Apr 3, 2023
9c07c88
Merge remote-tracking branch 'upstream/main' into feature/amqp
LarryOsterman Apr 11, 2023
290c2ae
Merge remote-tracking branch 'upstream/main' into feature/amqp
LarryOsterman Apr 11, 2023
cf02575
Merge remote-tracking branch 'upstream/main' into feature/amqp
LarryOsterman Apr 11, 2023
64182f7
Setup Eventhubs service (#4536)
gearama Apr 12, 2023
67c415b
Cleaned up AmqpValue, MessageProperties, MessageHeader and Message to…
LarryOsterman Apr 14, 2023
409b19e
Create AMQP Management APIs; significant cleanup of Impl types to rem…
LarryOsterman Apr 20, 2023
7133941
Added message serialization capability to AMQP. (#4587)
LarryOsterman Apr 28, 2023
4d0efd8
Merge remote-tracking branch 'upstream/main' into larryo/mergemaintoamqp
LarryOsterman Apr 28, 2023
2868b4a
Merge remote-tracking branch 'upstream/main' into feature/amqp
LarryOsterman May 10, 2023
ebf2d08
merge fixes
LarryOsterman May 10, 2023
50e1b53
more bad merge fixes
LarryOsterman May 10, 2023
87cf868
even more bad merge fixes
LarryOsterman May 10, 2023
08c4f68
Merge remote-tracking branch 'upstream/main' into feature/amqp
LarryOsterman May 16, 2023
892bac8
Merge main to feature/amqp starit up merge (#4665)
gearama May 30, 2023
0600744
Merge remote-tracking branch 'upstream/main' into feature/amqp
LarryOsterman Jun 22, 2023
5b283f7
merge fixes
LarryOsterman Jun 22, 2023
ce9fa23
More merge fixes
LarryOsterman Jun 22, 2023
917d1f7
more merge fixes 2
LarryOsterman Jun 22, 2023
268086e
more merge fixes 3
LarryOsterman Jun 22, 2023
b0fb657
more merge fixes 4
LarryOsterman Jun 22, 2023
68801c9
more merge fixes 5
LarryOsterman Jun 22, 2023
dfcd8db
[feature branch] Update eventhubs unit test CMakeLists.txt to remove …
ahsonkhan Jun 26, 2023
2056201
Merge remote-tracking branch 'upstream/main' into feature/amqp
LarryOsterman Jul 5, 2023
2821bb6
Copied over EventHubs implementation from gearama/amqp_gearama2
LarryOsterman Jul 6, 2023
6c2e161
Removed some unused files
LarryOsterman Jul 6, 2023
6ec09fe
Created CHANGELOG
LarryOsterman Jul 6, 2023
f436ffd
Doxygen fixes
LarryOsterman Jul 6, 2023
ef09341
Mac fixes
LarryOsterman Jul 6, 2023
2477985
Fixed documentation location
LarryOsterman Jul 6, 2023
916447e
More mac fixes
LarryOsterman Jul 6, 2023
e0e9080
More mac fixes 2
LarryOsterman Jul 6, 2023
e806696
More mac fixes 3
LarryOsterman Jul 6, 2023
355a0e4
More mac fixes 3
LarryOsterman Jul 6, 2023
7825e0e
clang-format
LarryOsterman Jul 6, 2023
8848b76
clang-format 2
LarryOsterman Jul 6, 2023
f3a55da
Line coverage at 22% not 24% :(
LarryOsterman Jul 6, 2023
b27e473
Renamed messaging-eventhubs back to azure-messaging-eventhubs
LarryOsterman Jul 6, 2023
cdd036a
updated ci trigger filter
LarryOsterman Jul 6, 2023
9f9929e
Fixed path to eng/pipelines
LarryOsterman Jul 6, 2023
efcea81
gcc9 fix
LarryOsterman Jul 6, 2023
d5dff0f
clang-format 3
LarryOsterman Jul 6, 2023
8644848
code review
LarryOsterman Jul 6, 2023
5d220d3
Removed all the changes for the eventhub_reader_sample.cpp
LarryOsterman Jul 6, 2023
510dcd2
clang-format
LarryOsterman Jul 6, 2023
dab3669
doxygen fix
LarryOsterman Jul 6, 2023
0d908e4
clang-format
LarryOsterman Jul 6, 2023
9ff7cd0
Started moving options out of Models namespace
LarryOsterman Jul 7, 2023
87bca38
clang-format
LarryOsterman Jul 7, 2023
6b36f3e
update vcpkg baseline
LarryOsterman Jul 7, 2023
ea2f28e
cspell fixes; mac build fixes
LarryOsterman Jul 7, 2023
7aeb5b9
clang-format
LarryOsterman Jul 7, 2023
90d3490
clang-format
LarryOsterman Jul 7, 2023
4d8afbb
clang-format again
LarryOsterman Jul 7, 2023
6742135
More model type fixes
LarryOsterman Jul 7, 2023
acfcb2c
Context goes at the end of the parameters, not the middle
LarryOsterman Jul 7, 2023
c45f7d1
More model type fixes; CreateXxx not NewXxx
LarryOsterman Jul 7, 2023
6f8eae5
cspell
LarryOsterman Jul 7, 2023
d3246c5
clang-format
LarryOsterman Jul 7, 2023
f211ae5
diagnostics for checkpoint store test; doxygen fix
LarryOsterman Jul 7, 2023
f089378
fixed azure-core vcpkg loading
LarryOsterman Jul 7, 2023
a46442c
clang-format
LarryOsterman Jul 7, 2023
8dbfd4f
minor method cleanup; fixed includes for apiview
LarryOsterman Jul 7, 2023
f604b98
Added cmaketestoptions to storage
LarryOsterman Jul 8, 2023
69e5e62
Backed out storage change because it failed worse than expected
LarryOsterman Jul 8, 2023
279cb6a
clang-format
LarryOsterman Jul 10, 2023
80acc8c
clang-format;doxygen
LarryOsterman Jul 10, 2023
e11f642
Merge branch 'main' into larryo/createeventhubs
LarryOsterman Jul 10, 2023
3f40909
Pull request feedback
LarryOsterman Jul 12, 2023
043c010
live test fixes
LarryOsterman Jul 12, 2023
24b5358
pull request feedback
LarryOsterman Jul 12, 2023
25b1517
cmake fix
LarryOsterman Jul 12, 2023
754331e
unit test fixes; doxygen fixes
LarryOsterman Jul 12, 2023
ebb1d6c
clang fix
LarryOsterman Jul 12, 2023
3f1c51b
clang fix 2
LarryOsterman Jul 12, 2023
e529228
mac fix
LarryOsterman Jul 12, 2023
ee728af
clang-format
LarryOsterman Jul 12, 2023
cadb33e
Merge branch 'main' into larryo/createeventhubs
LarryOsterman Jul 13, 2023
05f512e
Fixed copyright and license text
LarryOsterman Jul 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved

Initial Release.

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
],
"additionalCompilerSwitches": [
],
"allowInternal": true,
"includeInternal": false,
"allowInternal": false,
"includeDetail": false,
"includePrivate": false,
"filterNamespace": "Azure::Messaging",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@
#include "azure/messaging/eventhubs/processor.hpp"
#include "azure/messaging/eventhubs/processor_load_balancer.hpp"
#include "azure/messaging/eventhubs/producer_client.hpp"
#include "azure/messaging/eventhubs/retry_operation.hpp"
#include "azure/messaging/eventhubs/rtti.hpp"
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,8 @@
#include <vector>

namespace Azure { namespace Messaging { namespace EventHubs {
/**@brief ListCheckpointsOptions contains optional parameters for the ListCheckpoints
* function
*/
struct ListCheckpointsOptions
{
// For future expansion
};

/**@brief ListOwnershipOptions contains optional parameters for the ListOwnership function
*/
struct ListOwnershipOptions
{
// For future expansion
};

/**@brief UpdateCheckpointOptions contains optional parameters for the UpdateCheckpoint
* function
*/
struct UpdateCheckpointOptions
{
// For future expansion
};

/**@brief ClaimOwnershipOptions contains optional parameters for the ClaimOwnership function
*/
struct ClaimOwnershipOptions
{
// For future expansion
};

/**@brief CheckpointStore is used by multiple consumers to coordinate progress and ownership for
/**@brief CheckpointStore is used by multiple consumers to coordinate progress and ownership for
* partitions.
*/
class CheckpointStore {
Expand All @@ -62,12 +33,10 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*/
virtual std::vector<Models::Ownership> ClaimOwnership(
std::vector<Models::Ownership> partitionOwnership,
ClaimOwnershipOptions const& options = {},
Azure::Core::Context ctx = {})
Core::Context const& context = {})
{
(void)partitionOwnership;
(void)ctx;
(void)options;
(void)context;
throw std::runtime_error("Not Implemented");
}

Expand All @@ -77,14 +46,12 @@ namespace Azure { namespace Messaging { namespace EventHubs {
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
ListCheckpointsOptions options = {},
Azure::Core::Context ctx = {})
Core::Context const& context = {})
{
(void)fullyQualifiedNamespace;
(void)consumerGroup;
(void)eventHubName;
(void)ctx;
(void)options;
(void)context;
throw std::runtime_error("Not Implemented");
}

Expand All @@ -94,27 +61,23 @@ namespace Azure { namespace Messaging { namespace EventHubs {
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
ListOwnershipOptions options = {},
Azure::Core::Context ctx = {})
Core::Context const& context = {})
{
(void)fullyQualifiedNamespace;
(void)eventHubName;
(void)consumerGroup;
(void)ctx;
(void)options;
(void)context;
throw std::runtime_error("Not Implemented");
}

/**@brief UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
*/
virtual void UpdateCheckpoint(
Models::Checkpoint const& checkpoint,
UpdateCheckpointOptions options = {},
Azure::Core::Context ctx = {})
Core::Context const& context = {})
{
(void)checkpoint;
(void)ctx;
(void)options;
(void)context;
throw std::runtime_error("Not Implemented");
}

Expand All @@ -124,7 +87,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
/** @brief BlobCheckpointStore is an implementation of a CheckpointStore backed by Azure Blob
* Storage.
*/
class BlobCheckpointStore : public CheckpointStore {
class BlobCheckpointStore final : public CheckpointStore {

std::string m_connectionString;
std::string m_containerName;
Expand All @@ -144,7 +107,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
std::string const& blobName,
Azure::Storage::Metadata const& metadata,
Azure::ETag const& etag,
Azure::Core::Context const& context = Azure::Core::Context());
Core::Context const& context = {});

public:
/** @brief Construct a BlobCheckpointStore from another BlobCheckpointStore.
Expand All @@ -171,30 +134,26 @@ namespace Azure { namespace Messaging { namespace EventHubs {

std::vector<Models::Ownership> ClaimOwnership(
std::vector<Models::Ownership> partitionOwnership,
ClaimOwnershipOptions const& options = {},
Azure::Core::Context ctx = {}) override;
Core::Context const& context = {}) override;

std::vector<Models::Checkpoint> ListCheckpoints(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
ListCheckpointsOptions options = {},
Azure::Core::Context ctx = {}) override;
Core::Context const& context = {}) override;

/**@brief ListOwnership lists all ownerships.
*/
std::vector<Models::Ownership> ListOwnership(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
ListOwnershipOptions options = {},
Azure::Core::Context ctx = {}) override;
Core::Context const& context = {}) override;

/**@brief UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
*/
void UpdateCheckpoint(
Models::Checkpoint const& checkpoint,
UpdateCheckpointOptions options = {},
Azure::Core::Context ctx = {}) override;
Core::Context const& context = {}) override;
};
}}} // namespace Azure::Messaging::EventHubs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
namespace Azure { namespace Messaging { namespace EventHubs {
/**@brief Contains options for the ConsumerClient creation
*/
struct ConsumerClientOptions
struct ConsumerClientOptions final
{
/**@brief ApplicationID is used as the identifier when setting the User-Agent property.
*/
Expand All @@ -34,65 +34,39 @@ namespace Azure { namespace Messaging { namespace EventHubs {
Azure::Core::Amqp::_internal::MessageReceiverOptions ReceiverOptions{};
};

/**@brief Contains credentials for the ConsumerClient creation
/**
* @brief The ConsumerClient class is a high level class used to consume events from an Event Hub.
*
* @details The ConsumerClient class uses a #Azure::Messaging::EventHubs::PartitionClient to
* receive events from a specific partition of an Event Hub. The
* #Azure::Messaging::EventHubs::PartitionClient is created by the #ConsumerClient and is
* available via the NewPartitionClient method. The ConsumerClient is also responsible for
* managing the connection to the Event Hub and will reconnect as necessary.
*/
struct ConsumerClientCreds
{
class ConsumerClient final {
/// The connection string for the Event Hubs namespace
std::string ConnectionString;
std::string m_connectionString;

/// the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net)
std::string HostName;
std::string m_hostName;

/// The name of the Event Hub
std::string EventHub;
std::string m_eventHub;

/// The name of the consumer group
std::string ConsumerGroup;
std::string m_consumerGroup;

/// Credentials to be used to authenticate the client.
std::shared_ptr<Core::Credentials::TokenCredential> Credential{};
std::shared_ptr<Core::Credentials::TokenCredential> m_credential{};
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved

/// The URL to the Event Hubs namespace
std::string HostUrl{};
};

/**@brief GetEventHubPropertiesOptions contains optional parameters for the GetEventHubProperties
* function
*/
struct GetEventHubPropertiesOptions
{
// For future expansion
};

/**@brief GetPartitionPropertiesOptions contains optional parameters for the
* GetPartitionProperties function
*/
struct GetPartitionPropertiesOptions
{
// For future expansion
};
std::string m_hostUrl{};
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief The ConsumerClient class is a high level class used to consume events from an Event Hub.
*
* @details The ConsumerClient class uses a #Azure::Messaging::EventHubs::PartitionClient to
* receive events from a specific partition of an Event Hub. The
* #Azure::Messaging::EventHubs::PartitionClient is created by the #ConsumerClient and is
* available via the NewPartitionClient method. The ConsumerClient is also responsible for
* managing the connection to the Event Hub and will reconnect as necessary.
*/
class ConsumerClient {

protected:
/// @brief The message receivers used to receive messages for a given partition.
std::map<std::string, Azure::Core::Amqp::_internal::MessageReceiver> m_receivers{};
/// @brief The AMQP Sessions used to receive messages for a given partition.
std::map<std::string, Azure::Core::Amqp::_internal::Session> m_sessions{};

/// @brief the credentials used to connect to the event hub.
ConsumerClientCreds m_credentials;

/// @brief The options used to configure the consumer client.
ConsumerClientOptions m_consumerClientOptions;

Expand All @@ -107,31 +81,31 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*
* @returns Event hub name for client
*/
std::string const& GetEventHubName() { return m_credentials.EventHub; }
std::string const& GetEventHubName() const { return m_eventHub; }

/** @brief Getter for consumer group name
*
* @returns Consumer group name for client
*/
std::string const& GetConsumerGroup() { return m_credentials.ConsumerGroup; }
std::string const& GetConsumerGroup() const { return m_consumerGroup; }

/** @brief Getter FQDN
*
* @returns FQDN client
*/
std::string const& GetHostName() { return m_credentials.HostName; }
std::string const& GetHostName() const { return m_hostName; }

/** @brief Getter for client id
*
* @returns Clientid for client
*/
std::string const& GetClientId() { return m_consumerClientOptions.ApplicationID; }
std::string const& GetClientId() const { return m_consumerClientOptions.ApplicationID; }

/** @brief Getter for client details
*
* @returns Client details for client
*/
Models::ConsumerClientDetails GetDetails()
Models::ConsumerClientDetails GetDetails() const
{
Models::ConsumerClientDetails details;
details.ClientID = GetClientId();
Expand Down Expand Up @@ -170,6 +144,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
std::string const& eventHub = {},
std::string const& consumerGroup = _detail::DefaultConsumerGroup,
ConsumerClientOptions const& options = {});

/** @brief creates a ConsumerClient from a token credential.
*
* @param fullyQualifiedNamespace fully qualified namespace name (e.g.
Expand Down Expand Up @@ -201,7 +176,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*
* @param options Additional options for getting partition properties
*/
Models::EventHubProperties GetEventHubProperties(GetEventHubPropertiesOptions options = {});
Models::EventHubProperties GetEventHubProperties(Core::Context const& context = {});

/**@brief GetPartitionProperties gets properties for a specific partition. This includes data
* like the last enqueued sequence number, the first sequence number and when an event was last
Expand All @@ -212,6 +187,6 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*/
Models::EventHubPartitionProperties GetPartitionProperties(
std::string const& partitionID,
GetPartitionPropertiesOptions options = {});
Core::Context const& context = {});
};
}}} // namespace Azure::Messaging::EventHubs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
* @remark If both PartitionKey and PartitionID are nil, Event Hubs will choose an arbitrary
* partition for any events in this [EventDataBatch].
*/
struct EventDataBatchOptions
struct EventDataBatchOptions final
{

/** @brief MaxBytes overrides the max size (in bytes) for a batch.
Expand All @@ -48,7 +48,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
* [ProducerClient.CreateEventDataBatch], which will create them with the proper size limit for
* your Event Hub.
*/
class EventDataBatch {
class EventDataBatch final {
private:
const std::string anyPartitionId = "";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
constexpr const char* SequenceNumberAnnotation = "x-opt-sequence-number";
constexpr const char* OffsetNumberAnnotation = "x-opt-offset";
constexpr const char* EnqueuedTimeAnnotation = "x-opt-enqueued-time";
}}}} // namespace Azure::Messaging::EventHubs::_detail
}}}} // namespace Azure::Messaging::EventHubs::_detail
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {

/**@brief Contains options for the ConsumerClient creation
*/
struct ConsumerClientDetails
struct ConsumerClientDetails final
{
/**@brief The Fully Qualified Namespace that the Event Hub exists in.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
/** @brief The type of the body of an EventData Message.
*
*/
struct EventDataBody
struct EventDataBody final
{
/** @brief Value is encoded / decoded as the amqp - value section in the body.
*
Expand Down Expand Up @@ -61,6 +61,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
std::map<std::string, Azure::Core::Amqp::Models::AmqpValue> Properties;

EventData() = default;
virtual ~EventData() = default;
};

/** @brief Represents an event received from the Azure Event Hubs service.
Expand All @@ -69,7 +70,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
* specifically the date and time that the event was enqueued, the offset of the event data within
* the partition, and the partition key for sending a message to a partition.
*/
class ReceivedEventData : public EventData {
class ReceivedEventData final : public EventData {
public:
/** @brief The date and time that the event was enqueued.
*
Expand Down
Loading