Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add broker heartbeat as an optional parameter #188

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions src/Channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ const std::string Channel::EXCHANGE_TYPE_DIRECT("direct");
const std::string Channel::EXCHANGE_TYPE_FANOUT("fanout");
const std::string Channel::EXCHANGE_TYPE_TOPIC("topic");

Channel::ptr_t Channel::CreateFromUri(const std::string &uri, int frame_max) {
Channel::ptr_t Channel::CreateFromUri(const std::string &uri, int frame_max, int broker_heartbeat) {
amqp_connection_info info;
amqp_default_connection_info(&info);

Expand All @@ -80,14 +80,14 @@ Channel::ptr_t Channel::CreateFromUri(const std::string &uri, int frame_max) {
}

return Create(std::string(info.host), info.port, std::string(info.user),
std::string(info.password), std::string(info.vhost), frame_max);
std::string(info.password), std::string(info.vhost), frame_max, broker_heartbeat);
}

Channel::ptr_t Channel::CreateSecureFromUri(
const std::string &uri, const std::string &path_to_ca_cert,
const std::string &path_to_client_key,
const std::string &path_to_client_cert, bool verify_hostname,
int frame_max) {
int frame_max, int broker_heartbeat) {
amqp_connection_info info;
amqp_default_connection_info(&info);

Expand All @@ -102,15 +102,16 @@ Channel::ptr_t Channel::CreateSecureFromUri(
return CreateSecure(path_to_ca_cert, std::string(info.host),
path_to_client_key, path_to_client_cert, info.port,
std::string(info.user), std::string(info.password),
std::string(info.vhost), frame_max, verify_hostname);
std::string(info.vhost), frame_max, verify_hostname,
broker_heartbeat);
}
throw std::runtime_error(
"CreateSecureFromUri only supports SSL-enabled URIs.");
}

Channel::Channel(const std::string &host, int port, const std::string &username,
const std::string &password, const std::string &vhost,
int frame_max)
int frame_max, int broker_heartbeat)
: m_impl(new Detail::ChannelImpl) {
m_impl->m_connection = amqp_new_connection();

Expand All @@ -123,7 +124,7 @@ Channel::Channel(const std::string &host, int port, const std::string &username,
int sock = amqp_socket_open(socket, host.c_str(), port);
m_impl->CheckForError(sock);

m_impl->DoLogin(username, password, vhost, frame_max);
m_impl->DoLogin(username, password, vhost, frame_max, broker_heartbeat);
} catch (...) {
amqp_destroy_connection(m_impl->m_connection);
throw;
Expand All @@ -135,7 +136,7 @@ Channel::Channel(const std::string &host, int port, const std::string &username,
#ifdef SAC_SSL_SUPPORT_ENABLED
Channel::Channel(const std::string &host, int port, const std::string &username,
const std::string &password, const std::string &vhost,
int frame_max, const SSLConnectionParams &ssl_params)
int frame_max, const SSLConnectionParams &ssl_params, int broker_heartbeat)
: m_impl(new Detail::ChannelImpl) {
m_impl->m_connection = amqp_new_connection();
if (NULL == m_impl->m_connection) {
Expand Down Expand Up @@ -178,7 +179,7 @@ Channel::Channel(const std::string &host, int port, const std::string &username,
status, "Error setting client certificate for socket");
}

m_impl->DoLogin(username, password, vhost, frame_max);
m_impl->DoLogin(username, password, vhost, frame_max, broker_heartbeat);
} catch (...) {
amqp_destroy_connection(m_impl->m_connection);
throw;
Expand All @@ -189,7 +190,7 @@ Channel::Channel(const std::string &host, int port, const std::string &username,
#else
Channel::Channel(const std::string &, int, const std::string &,
const std::string &, const std::string &, int,
const SSLConnectionParams &) {
const SSLConnectionParams &, int) {
throw std::logic_error(
"SSL support has not been compiled into SimpleAmqpClient");
}
Expand Down
6 changes: 2 additions & 4 deletions src/ChannelImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@

#include <string.h>

#define BROKER_HEARTBEAT 0

namespace AmqpClient {
namespace Detail {

Expand All @@ -64,7 +62,7 @@ ChannelImpl::~ChannelImpl() {}

void ChannelImpl::DoLogin(const std::string &username,
const std::string &password, const std::string &vhost,
int frame_max) {
int frame_max, int broker_heartbeat) {
amqp_table_entry_t capabilties[1];
amqp_table_entry_t capability_entry;
amqp_table_t client_properties;
Expand All @@ -84,7 +82,7 @@ void ChannelImpl::DoLogin(const std::string &username,

CheckRpcReply(
0, amqp_login_with_properties(m_connection, vhost.c_str(), 0, frame_max,
BROKER_HEARTBEAT, &client_properties,
broker_heartbeat, &client_properties,
AMQP_SASL_METHOD_PLAIN, username.c_str(),
password.c_str()));

Expand Down
21 changes: 13 additions & 8 deletions src/SimpleAmqpClient/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
static ptr_t Create(const std::string &host = "127.0.0.1", int port = 5672,
const std::string &username = "guest",
const std::string &password = "guest",
const std::string &vhost = "/", int frame_max = 131072) {
const std::string &vhost = "/", int frame_max = 131072,
int broker_heartbeat = 0) {
return boost::make_shared<Channel>(host, port, username, password, vhost,
frame_max);
frame_max, broker_heartbeat);
}

protected:
Expand Down Expand Up @@ -134,15 +135,16 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
const std::string &password = "guest",
const std::string &vhost = "/",
int frame_max = 131072,
bool verify_hostname = true) {
bool verify_hostname = true,
int broker_heartbeat = 0) {
SSLConnectionParams ssl_params;
ssl_params.path_to_ca_cert = path_to_ca_cert;
ssl_params.path_to_client_key = path_to_client_key;
ssl_params.path_to_client_cert = path_to_client_cert;
ssl_params.verify_hostname = verify_hostname;

return boost::make_shared<Channel>(host, port, username, password, vhost,
frame_max, ssl_params);
frame_max, ssl_params, broker_heartbeat);
}

/**
Expand All @@ -154,7 +156,7 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
* any frame to this value
* @returns a new Channel object
*/
static ptr_t CreateFromUri(const std::string &uri, int frame_max = 131072);
static ptr_t CreateFromUri(const std::string &uri, int frame_max = 131072, int broker_heartbeat = 0);

/**
* Create a new Channel object from an AMQP URI, secured with SSL.
Expand All @@ -177,16 +179,19 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
const std::string &path_to_client_key = "",
const std::string &path_to_client_cert = "",
bool verify_hostname = true,
int frame_max = 131072);
int frame_max = 131072,
int broker_heartbeat = 0);

explicit Channel(const std::string &host, int port,
const std::string &username, const std::string &password,
const std::string &vhost, int frame_max);
const std::string &vhost, int frame_max,
int broker_heartbeat);

explicit Channel(const std::string &host, int port,
const std::string &username, const std::string &password,
const std::string &vhost, int frame_max,
const SSLConnectionParams &ssl_params);
const SSLConnectionParams &ssl_params,
int broker_heartbeat);

public:
virtual ~Channel();
Expand Down
2 changes: 1 addition & 1 deletion src/SimpleAmqpClient/ChannelImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ChannelImpl : boost::noncopyable {
typedef channel_map_t::iterator channel_map_iterator_t;

void DoLogin(const std::string &username, const std::string &password,
const std::string &vhost, int frame_max);
const std::string &vhost, int frame_max, int broker_heartbeat);
amqp_channel_t GetChannel();
void ReturnChannel(amqp_channel_t channel);
bool IsChannelOpen(amqp_channel_t channel);
Expand Down