From 842938cdfdf4f57988d312fb1ea7494adec16734 Mon Sep 17 00:00:00 2001 From: Jose Antonio Santos Cadenas Date: Fri, 27 Nov 2015 12:29:07 +0100 Subject: [PATCH] Remove rabbitmq support We are not using them and it is not manained Change-Id: Ic701307c42190e0f994c59ce18cacf98f9041e39 --- CMakeLists.txt | 1 - debian/control | 3 +- kurento.conf.info | 8 - kurento.conf.ini | 8 - kurento.conf.json | 10 - server/transport/CMakeLists.txt | 6 +- server/transport/TransportFactory.cpp | 3 - server/transport/rabbitmq/CMakeLists.txt | 40 -- .../rabbitmq/ExponentialBackoffStrategy.cpp | 85 ---- .../rabbitmq/ExponentialBackoffStrategy.hpp | 49 --- .../transport/rabbitmq/RabbitMQConnection.cpp | 381 ------------------ .../transport/rabbitmq/RabbitMQConnection.hpp | 147 ------- .../rabbitmq/RabbitMQEventHandler.cpp | 72 ---- .../rabbitmq/RabbitMQEventHandler.hpp | 57 --- .../transport/rabbitmq/RabbitMQListener.cpp | 212 ---------- .../transport/rabbitmq/RabbitMQListener.hpp | 78 ---- .../transport/rabbitmq/RabbitMQPipeline.cpp | 213 ---------- .../transport/rabbitmq/RabbitMQPipeline.hpp | 63 --- .../rabbitmq/RabbitMQReconnectStrategy.hpp | 31 -- .../transport/rabbitmq/RabbitMQTransport.cpp | 187 --------- .../transport/rabbitmq/RabbitMQTransport.hpp | 65 --- .../rabbitmq/RabbitMQTransportFactory.cpp | 28 -- .../rabbitmq/RabbitMQTransportFactory.hpp | 41 -- 23 files changed, 2 insertions(+), 1786 deletions(-) delete mode 100644 server/transport/rabbitmq/CMakeLists.txt delete mode 100644 server/transport/rabbitmq/ExponentialBackoffStrategy.cpp delete mode 100644 server/transport/rabbitmq/ExponentialBackoffStrategy.hpp delete mode 100644 server/transport/rabbitmq/RabbitMQConnection.cpp delete mode 100644 server/transport/rabbitmq/RabbitMQConnection.hpp delete mode 100644 server/transport/rabbitmq/RabbitMQEventHandler.cpp delete mode 100644 server/transport/rabbitmq/RabbitMQEventHandler.hpp delete mode 100644 server/transport/rabbitmq/RabbitMQListener.cpp delete mode 100644 server/transport/rabbitmq/RabbitMQListener.hpp delete mode 100644 server/transport/rabbitmq/RabbitMQPipeline.cpp delete mode 100644 server/transport/rabbitmq/RabbitMQPipeline.hpp delete mode 100644 server/transport/rabbitmq/RabbitMQReconnectStrategy.hpp delete mode 100644 server/transport/rabbitmq/RabbitMQTransport.cpp delete mode 100644 server/transport/rabbitmq/RabbitMQTransport.hpp delete mode 100644 server/transport/rabbitmq/RabbitMQTransportFactory.cpp delete mode 100644 server/transport/rabbitmq/RabbitMQTransportFactory.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4301740f6..1ba021663 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,7 +41,6 @@ pkg_check_modules(GSTREAMER REQUIRED gstreamer-1.5) pkg_check_modules(GSTREAMER REQUIRED gstreamer-1.5>=1.7.0~0) pkg_check_modules(GSTREAMER_SDP REQUIRED gstreamer-sdp-1.5>=1.7.0~0) pkg_check_modules(EVENT REQUIRED libevent>=2.0.16-stable) -pkg_check_modules(RABBITMQ REQUIRED librabbitmq>=0.4.1) add_subdirectory(server) diff --git a/debian/control b/debian/control index 7962c6612..043103dc0 100644 --- a/debian/control +++ b/debian/control @@ -16,8 +16,7 @@ Build-Depends: libboost-thread-dev, libboost-log-dev, libevent-dev, - libssl-dev, - librabbitmq-dev + libssl-dev Standards-Version: 3.9.4 Package: kurento-media-server-6.0 diff --git a/kurento.conf.info b/kurento.conf.info index ba82621f3..d3f1b9ccd 100644 --- a/kurento.conf.info +++ b/kurento.conf.info @@ -23,13 +23,5 @@ mediaServer ; password "" ; } } -; rabbitmq -; { -; address 127.0.0.1 -; port 5672 -; username guest -; password guest -; vhost / -; } } } diff --git a/kurento.conf.ini b/kurento.conf.ini index 980911001..2b459d32f 100644 --- a/kurento.conf.ini +++ b/kurento.conf.ini @@ -19,11 +19,3 @@ threads=10 ; [mediaServer.net.websocket.registrar] ; address=ws://localhost:9090 ; localAddress=localhost - -; [mediaServer.net.rabbitmq] -; address=127.0.0.1 -; port=5672 -; username=guest -; password=guest -; vhost=/ - diff --git a/kurento.conf.json b/kurento.conf.json index bf1bfd846..e95469d17 100644 --- a/kurento.conf.json +++ b/kurento.conf.json @@ -9,16 +9,6 @@ "garbageCollectorPeriod": 240 }, "net" : { - // Uncomment just one of them - /* - "rabbitmq": { - "address" : "127.0.0.1", - "port" : 5672, - "username" : "guest", - "password" : "guest", - "vhost" : "/" - } - */ "websocket": { "port": 8888, //"secure": { diff --git a/server/transport/CMakeLists.txt b/server/transport/CMakeLists.txt index 7e737fbcc..09a7b1a5d 100644 --- a/server/transport/CMakeLists.txt +++ b/server/transport/CMakeLists.txt @@ -7,12 +7,11 @@ set (TRANSPORT_SOURCES add_library (transport ${TRANSPORT_SOURCES}) -add_dependencies(transport rabbitMQTransport) +add_dependencies(transport websocketTransport) target_link_libraries(transport ${GSTREAMER_LIBRARIES} ${KMSCORE_LIBRARIES} - rabbitMQTransport websocketTransport ) @@ -20,13 +19,10 @@ set_property (TARGET transport PROPERTY INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/server/signalHandler ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/rabbitmq/ ${CMAKE_CURRENT_SOURCE_DIR}/websocket/ ${GSTREAMER_INCLUDE_DIRS} ${GLIBMM_INCLUDE_DIRS} ${KMSCORE_INCLUDE_DIRS} ) -add_subdirectory(rabbitmq) - add_subdirectory(websocket) diff --git a/server/transport/TransportFactory.cpp b/server/transport/TransportFactory.cpp index 34d29ef62..2ef4dc6ae 100644 --- a/server/transport/TransportFactory.cpp +++ b/server/transport/TransportFactory.cpp @@ -20,7 +20,6 @@ GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); #define GST_DEFAULT_NAME "KurentoTransportFactory" -#include #include namespace kurento @@ -60,8 +59,6 @@ TransportFactory::StaticConstructor::StaticConstructor() { GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, GST_DEFAULT_NAME); - TransportFactory::registerFactory (std::shared_ptr - (new RabbitMQTransportFactory() ) ); TransportFactory::registerFactory (std::shared_ptr (new WebSocketTransportFactory() ) ); } diff --git a/server/transport/rabbitmq/CMakeLists.txt b/server/transport/rabbitmq/CMakeLists.txt deleted file mode 100644 index cfe53f374..000000000 --- a/server/transport/rabbitmq/CMakeLists.txt +++ /dev/null @@ -1,40 +0,0 @@ -set (RABBIT_MQ_TRANSPORT_SOURCES - RabbitMQTransport.cpp - RabbitMQTransport.hpp - RabbitMQPipeline.cpp - RabbitMQPipeline.hpp - RabbitMQListener.cpp - RabbitMQListener.hpp - RabbitMQEventHandler.cpp - RabbitMQEventHandler.hpp - RabbitMQConnection.cpp - RabbitMQConnection.hpp - RabbitMQTransportFactory.cpp - RabbitMQTransportFactory.hpp - RabbitMQReconnectStrategy.hpp - ExponentialBackoffStrategy.cpp - ExponentialBackoffStrategy.hpp -) - -add_library (rabbitMQTransport - ${RABBIT_MQ_TRANSPORT_SOURCES} -) - -add_dependencies(rabbitMQTransport signalHandler) - -target_link_libraries(rabbitMQTransport - ${GSTREAMER_LIBRARIES} - ${RABBITMQ_LIBRARIES} - ${EVENT_LIBRARIES} - signalHandler -) - -set_property (TARGET rabbitMQTransport - PROPERTY INCLUDE_DIRECTORIES - ${CMAKE_SOURCE_DIR}/server/transport - ${CMAKE_SOURCE_DIR}/server/signalHandler - ${GSTREAMER_INCLUDE_DIRS} - ${KMSCORE_INCLUDE_DIRS} - ${RABBITMQ_INCLUDE_DIRS} - ${EVENT_INCLUDE_DIRS} -) diff --git a/server/transport/rabbitmq/ExponentialBackoffStrategy.cpp b/server/transport/rabbitmq/ExponentialBackoffStrategy.cpp deleted file mode 100644 index 8085906fe..000000000 --- a/server/transport/rabbitmq/ExponentialBackoffStrategy.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#include -#include -#include -#include -#include -#include "ExponentialBackoffStrategy.hpp" - -#define DEFAULT_LIMIT (30 * 1000) /* seconds */ - -#define GST_CAT_DEFAULT kurento_rabbitmq_exponential_backoff_strategy -GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); -#define GST_DEFAULT_NAME "KurentoExponentialBackoffStrategy" - -namespace kurento -{ - -ExponentialBackoffStrategy::ExponentialBackoffStrategy (int max) -{ - if (limit > 0) { - limit = max; - } else { - limit = DEFAULT_LIMIT; - } - - /* initialize random seed */ - srand (time (NULL) ); - - reset (); -} - -void -ExponentialBackoffStrategy::reset () -{ - attemp = 1; -} - -int -ExponentialBackoffStrategy::getTimeout () -{ - int timeout, max; - - max = (int) ( (pow (2.0, attemp++) - 1) * 1000); - - /* generate a random interval of time between 0 and 2^attemp - 1 */ - timeout = rand() % max; - - if (timeout > limit) { - timeout = limit; - } - - return timeout; -} - -ExponentialBackoffStrategy::~ExponentialBackoffStrategy() -{ -} - -ExponentialBackoffStrategy::StaticConstructor -ExponentialBackoffStrategy::staticConstructor; - -ExponentialBackoffStrategy::StaticConstructor::StaticConstructor() -{ - GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, - GST_DEFAULT_NAME); -} - -} /* kurento */ - - - diff --git a/server/transport/rabbitmq/ExponentialBackoffStrategy.hpp b/server/transport/rabbitmq/ExponentialBackoffStrategy.hpp deleted file mode 100644 index 063eda7da..000000000 --- a/server/transport/rabbitmq/ExponentialBackoffStrategy.hpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#ifndef __EXPONENTIAL_BACKOFF_STRATEGY_HPP__ -#define __EXPONENTIAL_BACKOFF_STRATEGY_HPP__ - -#include "RabbitMQReconnectStrategy.hpp" - -namespace kurento -{ - -class ExponentialBackoffStrategy: public RabbitMQReconnectStrategy -{ -public: - ExponentialBackoffStrategy (int max); - virtual ~ExponentialBackoffStrategy(); - - virtual int getTimeout(); - virtual void reset(); - -private: - - int attemp; - int limit; - - class StaticConstructor - { - public: - StaticConstructor(); - }; - - static StaticConstructor staticConstructor; -}; - -} /* kurento */ - -#endif /* __EXPONENTIAL_BACKOFF_STRATEGY_HPP__ */ \ No newline at end of file diff --git a/server/transport/rabbitmq/RabbitMQConnection.cpp b/server/transport/rabbitmq/RabbitMQConnection.cpp deleted file mode 100644 index ca2cd4d50..000000000 --- a/server/transport/rabbitmq/RabbitMQConnection.cpp +++ /dev/null @@ -1,381 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#include -#include "RabbitMQConnection.hpp" -#include -#include -#include -#include - -#include -#include - -#define GST_CAT_DEFAULT kurento_rabbitmq_connection -GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); -#define GST_DEFAULT_NAME "KurentoRabbitMQConnection" - -namespace kurento -{ - -static void -exception_on_error (amqp_rpc_reply_t x, const char *context) -{ - std::string ctx = context; - - switch (x.reply_type) { - case AMQP_RESPONSE_NORMAL: - return; - - case AMQP_RESPONSE_NONE: - throw RabbitMQException (ctx + ": missing RPC reply type"); - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - if (AMQP_STATUS_TIMEOUT == x.library_error) { - throw RabbitMQTimeoutException (ctx + ": Tiemout exception: " + - amqp_error_string2 (x.library_error) ); - } else { - throw RabbitMQException (ctx + ": Library exception: " + - amqp_error_string2 (x.library_error) ); - } - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; - std::string message; - gchar *error_message; - - error_message = - g_strdup_printf ("%s: server connection error %d, message: %.*s\n", context, - m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); - - message = error_message; - g_free (error_message); - - throw RabbitMQException (message); - } - - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; - std::string message; - gchar *error_message; - - error_message = g_strdup_printf ("%s: server channel error %d, message: %.*s\n", - context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); - - message = error_message; - g_free (error_message); - - throw RabbitMQException (message); - break; - } - - default: - throw RabbitMQException (ctx + ": Channel Unknown Error"); - break; - } - - break; - } -} - -static void -makeSocketLinger (int fd) -{ - struct linger ling; - - ling.l_onoff = 1; - ling.l_linger = 30; - - if (setsockopt (fd, SOL_SOCKET, SO_LINGER, &ling, sizeof (ling) ) < 0) { - GST_WARNING ("Could not configure SO_LINGER option on RabbitMQ socket"); - } -} - -RabbitMQConnection::RabbitMQConnection (const std::string &address, int port) : - address (address), port (port) -{ - conn = amqp_new_connection(); - - socket = amqp_tcp_socket_new (conn); - - if (!socket) { - throw Glib::IOChannelError (Glib::IOChannelError::Code::FAILED, - "Cannot create TCP socket"); - } - - if (amqp_socket_open (socket, address.c_str(), port) ) { - throw Glib::IOChannelError (Glib::IOChannelError::Code::FAILED, - "Cannot open TCP socket"); - } - - makeSocketLinger (amqp_socket_get_sockfd (socket) ); - - exception_on_error (amqp_login (conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, - "guest", "guest"), "Loging in"); - amqp_channel_open (conn, 1); - exception_on_error (amqp_get_rpc_reply (conn), "Opening channel"); -} - -RabbitMQConnection::~RabbitMQConnection() -{ - int fd; - - if (conn == NULL) { - GST_DEBUG ("transport already stopped"); - return; - } - - fd = amqp_socket_get_sockfd (socket); - - /* Errors are ignored during close */ - if (!closeOnRelease) { - /* close socket */ - close (fd); - } - - amqp_channel_close (conn, 1, AMQP_REPLY_SUCCESS); - amqp_connection_close (conn, AMQP_REPLY_SUCCESS); - amqp_destroy_connection (conn); - - if (closeOnRelease) { - /* inform remote side that we are done */ - shutdown (fd, SHUT_WR); - } - - conn = NULL; -} - -int -RabbitMQConnection::getFd() -{ - return amqp_socket_get_sockfd (socket); -} - -void RabbitMQConnection::declareQueue (const std::string &queue_name, - bool durable, int ttl) -{ - amqp_bytes_t queue = amqp_cstring_bytes (queue_name.c_str() ); - amqp_table_entry_t entries[1]; - amqp_table_t table; - - table.entries = entries; - - if (ttl > 0) { - table.num_entries = 1; - - entries[0].key = amqp_cstring_bytes ("x-expires"); - entries[0].value.kind = AMQP_FIELD_KIND_I32; - entries[0].value.value.i32 = ttl; - } else { - table.num_entries = 0; - } - - amqp_queue_declare (conn, 1, - queue, /* passive */ false, durable, /* exclusive */ false, - /* autodelete */ false, table); - exception_on_error (amqp_get_rpc_reply (conn), "Declaring queue"); -} - -void RabbitMQConnection::deleteQueue (const std::string &queue_name, - bool ifUnused, bool ifEmpty) -{ - amqp_bytes_t queue = amqp_cstring_bytes (queue_name.c_str() ); - - amqp_queue_delete (conn, 1, queue, ifUnused, ifEmpty); -} - -void RabbitMQConnection::declareExchange (const std::string &exchange_name, - const std::string &type, bool durable, const int ttl) -{ - amqp_bytes_t exchange = amqp_cstring_bytes (exchange_name.c_str() ); - amqp_bytes_t exchange_type = amqp_cstring_bytes (type.c_str() ); - amqp_table_entry_t entries[1]; - amqp_table_t table; - - table.entries = entries; - - if (ttl > 0) { - table.num_entries = 1; - - entries[0].key = amqp_cstring_bytes ("x-expires"); - entries[0].value.kind = AMQP_FIELD_KIND_I32; - entries[0].value.value.i32 = ttl; - } else { - table.num_entries = 0; - } - - amqp_exchange_declare (conn, 1, exchange, exchange_type, - /* passive */ false, durable, table); - exception_on_error (amqp_get_rpc_reply (conn), "Declaring exchange"); -} - -void RabbitMQConnection::deleteExchange (const std::string &exchange_name, - bool ifUnused) -{ - amqp_bytes_t exchange = amqp_cstring_bytes (exchange_name.c_str() ); - - amqp_exchange_delete (conn, 1, exchange, ifUnused); -} - - -void -RabbitMQConnection::bindQueue (const std::string &queue_name, - const std::string &exchange_name) -{ - amqp_bytes_t queue = amqp_cstring_bytes (queue_name.c_str() ); - amqp_bytes_t exchange = amqp_cstring_bytes (exchange_name.c_str() ); - - amqp_queue_bind (conn, 1, queue, exchange, amqp_empty_bytes, - amqp_empty_table); - exception_on_error (amqp_get_rpc_reply (conn), "Binding queue"); -} - -void -RabbitMQConnection::consumeQueue (const std::string &queue_name, - const std::string &tag) -{ - amqp_bytes_t queue = amqp_cstring_bytes (queue_name.c_str() ); - amqp_bytes_t tag_id = amqp_cstring_bytes (tag.c_str() ); - - amqp_basic_consume (conn, 1, queue, tag_id, /* no_local */ false, - /* no_ack */ false, /* exclusive */ false, - amqp_empty_table); - exception_on_error (amqp_get_rpc_reply (conn), "Consuming"); -} - -void -RabbitMQConnection::readMessage (struct timeval *timeout, - std::function process) -{ - RabbitMQMessage message (shared_from_this () ); - - exception_on_error (amqp_consume_message (conn, &message.envelope, timeout, 0), - "Reading message"); - message.valid = true; - - try { - process (message); - } catch (...) { - GST_WARNING ("Error processing message"); - } -} - -void -RabbitMQConnection::sendReply (const amqp_envelope_t &envelope, - const amqp_bytes_t &reply) -{ - sendMessage (reply, amqp_empty_bytes, envelope.message.properties.reply_to, - envelope.message.properties.correlation_id); -} - -void -RabbitMQConnection::sendMessage (const std::string &message, - const std::string &exchange, const std::string &routingKey, - const std::string &correlationID) -{ - sendMessage (amqp_cstring_bytes (message.c_str() ), - amqp_cstring_bytes (exchange.c_str() ), - amqp_cstring_bytes (routingKey.c_str() ), - amqp_cstring_bytes (correlationID.c_str() ) ); -} - -void -RabbitMQConnection::sendMessage (const amqp_bytes_t &message, - const amqp_bytes_t &exchange, const amqp_bytes_t &routingKey, - const amqp_bytes_t &correlationID) -{ - amqp_basic_properties_t props; - int ret; - - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; - props.content_type = amqp_cstring_bytes ("text/plain"); - props.delivery_mode = 2; /* persistent delivery mode */ - - if (correlationID.len > 0) { - props._flags |= AMQP_BASIC_CORRELATION_ID_FLAG; - props.correlation_id = correlationID; - } - - ret = amqp_basic_publish (conn, 1, exchange, - routingKey, /* mandatory */ false, /* inmediate */ false, &props, message); - - if (ret != AMQP_STATUS_OK) { - GST_ERROR ("Error ret value: %d", ret); - } -} - -RabbitMQMessage::RabbitMQMessage (std::shared_ptr - connection) : connection (connection) -{ - -} - -RabbitMQMessage::~RabbitMQMessage() -{ - if (!acked && valid) { - GST_WARNING ("Rejecting message because it is not acked"); - amqp_basic_reject (connection->conn, 1, - envelope.delivery_tag, /* requeue */ true); - } - - amqp_destroy_envelope (&envelope); -} - -void -RabbitMQMessage::reply (std::shared_ptr< RabbitMQConnection > conn, - const std::string &response) -{ - conn->sendReply (envelope, amqp_cstring_bytes (response.c_str() ) ); -} - -void -RabbitMQMessage::reply (const std::string &response) -{ - reply (connection, response); -} - -std::string -RabbitMQMessage::getData() -{ - std::string data (reinterpret_cast - (envelope.message.body.bytes), envelope.message.body.len ); - - return data; -} - -void -RabbitMQMessage::ack() -{ - amqp_basic_ack (connection->conn, 1, - envelope.delivery_tag, /* multiple */ false); - - acked = true; -} - -RabbitMQConnection::StaticConstructor RabbitMQConnection::staticConstructor; - -RabbitMQConnection::StaticConstructor::StaticConstructor() -{ - GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, - GST_DEFAULT_NAME); -} - -const std::string RabbitMQConnection::EXCHANGE_TYPE_DIRECT ("direct"); -const std::string RabbitMQConnection::EXCHANGE_TYPE_FANOUT ("fanout"); -const std::string RabbitMQConnection::EXCHANGE_TYPE_TOPIC ("topic"); - -} /* kurento */ diff --git a/server/transport/rabbitmq/RabbitMQConnection.hpp b/server/transport/rabbitmq/RabbitMQConnection.hpp deleted file mode 100644 index c315a79eb..000000000 --- a/server/transport/rabbitmq/RabbitMQConnection.hpp +++ /dev/null @@ -1,147 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#ifndef __RABBITMQ_CONNECTION_HPP__ -#define __RABBITMQ_CONNECTION_HPP__ - -#include -#include -#include -#include -#include -#include - -namespace kurento -{ - -class RabbitMQMessage; - -class RabbitMQException : public std::exception -{ - -public: - RabbitMQException (std::string desc) : desc (desc) { - } - virtual ~RabbitMQException() {} - - virtual const char *what() const throw() { - return desc.c_str(); - } - -private: - std::string desc; -}; - -class RabbitMQTimeoutException : public RabbitMQException -{ - -public: - RabbitMQTimeoutException (std::string desc) : RabbitMQException (desc) { - } - virtual ~RabbitMQTimeoutException() {} -}; - -class RabbitMQConnection : public - std::enable_shared_from_this -{ -public: - RabbitMQConnection (const std::string &address, int port); - virtual ~RabbitMQConnection() throw (); - - std::string getAddress() { - return address; - } - - int getPort () { - return port; - } - - int getFd(); - void declareQueue (const std::string &queue_name, bool durable = false, - int ttl = -1); - void deleteQueue (const std::string &queue_name, bool ifUnused = false, - bool ifEmpty = false); - void declareExchange (const std::string &exchange_name, const std::string &type, - bool durable = false, const int ttl = -1); - void deleteExchange (const std::string &exchange_name, bool ifUnused = false); - void bindQueue (const std::string &queue_name, - const std::string &exchange_name); - void consumeQueue (const std::string &queue_name, const std::string &tag); - void readMessage (struct timeval *timeout, - std::function process); - void sendMessage (const std::string &message, const std::string &exchange, - const std::string &routingKey, const std::string &correlationID = ""); - - void noCloseOnRelease() { - closeOnRelease = false; - } - - static const std::string EXCHANGE_TYPE_DIRECT; - static const std::string EXCHANGE_TYPE_FANOUT; - static const std::string EXCHANGE_TYPE_TOPIC; -private: - - void sendMessage (const amqp_bytes_t &message, const amqp_bytes_t &exchange, - const amqp_bytes_t &routingKey, - const amqp_bytes_t &correlationID = amqp_empty_bytes); - void sendReply (const amqp_envelope_t &envelope, const amqp_bytes_t &message); - - std::string address; - int port; - - bool closeOnRelease = true; - amqp_connection_state_t conn; - amqp_socket_t *socket; - Glib::RefPtr source; - - class StaticConstructor - { - public: - StaticConstructor(); - }; - - static StaticConstructor staticConstructor; - - friend RabbitMQMessage; -}; - -class RabbitMQMessage -{ -public: - RabbitMQMessage (std::shared_ptr connection); - virtual ~RabbitMQMessage() throw (); - - void reply (const std::string &response); - void reply (std::shared_ptr conn, - const std::string &response); - std::string getData(); - void ack(); - void noRejectOnRelease() { - acked = true; - } - -private: - - bool acked = false; - bool valid = false; - amqp_envelope_t envelope; - std::shared_ptr connection; - - friend RabbitMQConnection; -}; - -} /* kurento */ - -#endif /* __RABBITMQ_CONNECTION_HPP__ */ diff --git a/server/transport/rabbitmq/RabbitMQEventHandler.cpp b/server/transport/rabbitmq/RabbitMQEventHandler.cpp deleted file mode 100644 index 411a885c7..000000000 --- a/server/transport/rabbitmq/RabbitMQEventHandler.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#include "RabbitMQEventHandler.hpp" -#include -#include - -#define GST_CAT_DEFAULT kurento_rabbitmq_event_handler -GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); -#define GST_DEFAULT_NAME "KurentoRabbitMQEventHandler" - -namespace kurento -{ - -RabbitMQEventHandler::RabbitMQEventHandler (std::shared_ptr< MediaObjectImpl > - obj, - const std::string &address, int port, - const std::string &exchange, const std::string &routingKey) : - EventHandler (obj), connection (address, port), exchange (exchange), - routingKey (routingKey) -{ -} - -RabbitMQEventHandler::~RabbitMQEventHandler() -{ -} - -void -RabbitMQEventHandler::sendEvent (Json::Value &value) -{ - try { - Json::FastWriter writer; - Json::Value rpc; - Json::Value event; - - event ["value"] = value; - - rpc [JSON_RPC_PROTO] = JSON_RPC_PROTO_VERSION; - rpc [JSON_RPC_METHOD] = "onEvent"; - rpc [JSON_RPC_PARAMS] = event; - - GST_DEBUG ("Sending event: %s -> %s", writer.write (rpc).c_str(), - exchange.c_str() ); - connection.sendMessage (writer.write (rpc), exchange, routingKey); - } catch (std::exception &e) { - GST_WARNING ("Error sending event to MediaHandler"); - } catch (...) { - GST_WARNING ("Error sending event to MediaHandler"); - } -} - -RabbitMQEventHandler::StaticConstructor::StaticConstructor() -{ - GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, - GST_DEFAULT_NAME); -} - -RabbitMQEventHandler::StaticConstructor RabbitMQEventHandler::staticConstructor; - -} /* kurento */ diff --git a/server/transport/rabbitmq/RabbitMQEventHandler.hpp b/server/transport/rabbitmq/RabbitMQEventHandler.hpp deleted file mode 100644 index af8dcd54c..000000000 --- a/server/transport/rabbitmq/RabbitMQEventHandler.hpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#ifndef __RABBITMQ_EVENT_HANDLER_HPP__ -#define __RABBITMQ_EVENT_HANDLER_HPP__ - -#include -#include "RabbitMQConnection.hpp" - -namespace kurento -{ - -class RabbitMQEventHandler : public EventHandler -{ -public: - RabbitMQEventHandler (std::shared_ptr obj, - const std::string &address, int port, - const std::string &exchange, - const std::string &routingKey); - - std::string getRoutingKey () { - return routingKey; - } - - virtual ~RabbitMQEventHandler(); - - virtual void sendEvent (Json::Value &value); - -private: - RabbitMQConnection connection; - std::string exchange; - std::string routingKey; - - class StaticConstructor - { - public: - StaticConstructor(); - }; - - static StaticConstructor staticConstructor; -}; - -} /* kurento */ - -#endif /* __RABBITMQ_EVENT_HANDLER_HPP__ */ diff --git a/server/transport/rabbitmq/RabbitMQListener.cpp b/server/transport/rabbitmq/RabbitMQListener.cpp deleted file mode 100644 index 94bdb97ac..000000000 --- a/server/transport/rabbitmq/RabbitMQListener.cpp +++ /dev/null @@ -1,212 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#include -#include "RabbitMQListener.hpp" -#include "ExponentialBackoffStrategy.hpp" -#include -#include - -#define GST_CAT_DEFAULT kurento_rabbitmq_listener -GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); -#define GST_DEFAULT_NAME "KurentoRabbitMQListener" - -/* This is included to avoid problems with slots and lamdas */ -#include -#include -#include - -#define MAX_TIMEOUT (30 * 1000) /* 30 seconds */ - -namespace sigc -{ -template -struct functor_trait { - typedef decltype (::sigc::mem_fun (std::declval (), - &Functor::operator() ) ) _intermediate; - - typedef typename _intermediate::result_type result_type; - typedef Functor functor_type; -}; -} - -namespace kurento -{ - -RabbitMQListener::RabbitMQListener () -{ - timeoutStrategy = std::shared_ptr - (new ExponentialBackoffStrategy (MAX_TIMEOUT) ); -}; - -RabbitMQListener::~RabbitMQListener() -{ - if (reconnectSrc) { - reconnectSrc->destroy(); - } -} - -void RabbitMQListener::setConfig (const std::string &address, int port) -{ - this->address = address; - this->port = port; -} - -void -RabbitMQListener::reconnect () -{ - int timeout; - - timeout = timeoutStrategy->getTimeout(); - - GST_DEBUG ("Reconnecting to RabbitMQ broker in %d ms.", timeout); - - reconnectSrc = Glib::TimeoutSource::create (timeout); - reconnectSrc->connect ( [this] () -> bool { - try { - connection = std::shared_ptr (new RabbitMQConnection ( - address, port) ); - } catch (Glib::IOChannelError &e) - { - reconnect (); - return false; - } - - timeoutStrategy->reset (); - - try { - eventReconnectHandler (); - } catch (std::bad_function_call &e) - { - /* No one is managing this event */ - } - - return false; - }); - - reconnectSrc->attach(); -} - -bool -RabbitMQListener::readMessages (Glib::IOCondition cond) -{ - if (cond & (Glib::IO_HUP | Glib::IO_NVAL | Glib::IO_ERR) ) { - goto error; - } - - readMessages(); - - return true; - -error: - - if (cond & Glib::IO_HUP) { - GST_DEBUG ("Connection hung up"); - reconnect (); - } - - return false; -} - -bool -RabbitMQListener::readMessages () -{ - struct timeval timeout; - - if (getpid () != pid) { - return false; - } - - timeout.tv_sec = 0; - timeout.tv_usec = 0; - - try { - connection->readMessage (&timeout, [this] (RabbitMQMessage & message) { - processMessage (message); - }); - } catch (RabbitMQTimeoutException &e) { - return false; - } catch (RabbitMQException &e) { - GST_ERROR ("%s", e.what() ); - } - - return true; -} - -void RabbitMQListener::listenQueue (const std::string &queue, bool durable, - int ttl) -{ - Glib::RefPtr idle; - int fd; - - if (connection == NULL) { - connection = std::shared_ptr (new RabbitMQConnection ( - address, port) ); - } - - connection->declareQueue (queue, durable, ttl); - connection->declareExchange (queue, - RabbitMQConnection::EXCHANGE_TYPE_DIRECT, durable, ttl); - connection->bindQueue (queue, queue); - connection->consumeQueue (queue, ""); - - fd = connection->getFd(); - - channel = Glib::IOChannel::create_from_fd (fd); - channel->set_close_on_unref (false); - channel->set_encoding (""); - channel->set_buffered (false); - - source = channel->create_watch (Glib::IO_IN | Glib::IO_HUP | Glib::IO_ERR | - Glib::IO_NVAL); - - // TODO: Investigate why std::bind cannot be used here - source->connect ([this] (Glib::IOCondition cond) -> bool { - return readMessages (cond); - }); - - source->attach (Glib::MainContext::get_default() ); - - /* Add idle source to read all message that are already in the queue, - * This is needed because the Channel callback is not being - * triggered when the data is already in the socket when the main loop - * starts */ - - idle = Glib::IdleSource::create(); - - idle->connect ([this] () -> bool { - return readMessages (); - }); - - /* Only the same process that creates the channel can read it */ - pid = getpid (); - idle->attach (); -} - -void RabbitMQListener::stopListen () -{ - channel->flush(); - source->destroy(); -} - -RabbitMQListener::StaticConstructor RabbitMQListener::staticConstructor; - -RabbitMQListener::StaticConstructor::StaticConstructor() -{ - GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, - GST_DEFAULT_NAME); -} - -} /* kurento */ diff --git a/server/transport/rabbitmq/RabbitMQListener.hpp b/server/transport/rabbitmq/RabbitMQListener.hpp deleted file mode 100644 index 1f3f234c7..000000000 --- a/server/transport/rabbitmq/RabbitMQListener.hpp +++ /dev/null @@ -1,78 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#ifndef __RABBITMQ_LISTENER_HPP__ -#define __RABBITMQ_LISTENER_HPP__ - -#include "RabbitMQConnection.hpp" -#include "RabbitMQReconnectStrategy.hpp" - -namespace kurento -{ - -class RabbitMQListener -{ -public: - RabbitMQListener (); - RabbitMQListener (const std::string &address, int port); - virtual ~RabbitMQListener() throw (); - - void setConfig (const std::string &address, int port); - void listenQueue (const std::string &queue, bool durable = false, int ttl = -1); - void stopListen (); - -protected: - virtual void processMessage (RabbitMQMessage &message) = 0; - - std::shared_ptr getConnection () { - return connection; - } - - void setEventReconnectHandler (std::function < void ( void ) > e) { - eventReconnectHandler = e; - } - -private: - bool readMessages (Glib::IOCondition cond); - bool readMessages (); - void reconnect (); - - Glib::RefPtr channel; - std::shared_ptr connection; - Glib::RefPtr source; - - Glib::RefPtr reconnectSrc; - - std::string address; - int port; - - int pid = 0; - - std::shared_ptr timeoutStrategy; - - std::function eventReconnectHandler; - - class StaticConstructor - { - public: - StaticConstructor(); - }; - - static StaticConstructor staticConstructor; -}; - -} /* kurento */ - -#endif /* __RABBITMQ_LISTENER_HPP__ */ diff --git a/server/transport/rabbitmq/RabbitMQPipeline.cpp b/server/transport/rabbitmq/RabbitMQPipeline.cpp deleted file mode 100644 index cb3479eee..000000000 --- a/server/transport/rabbitmq/RabbitMQPipeline.cpp +++ /dev/null @@ -1,213 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#include -#include "RabbitMQPipeline.hpp" -#include "RabbitMQEventHandler.hpp" - -#include -#include - -#include -#include -#include - -#define GST_CAT_DEFAULT kurento_rabbitmq_pipeline -GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); -#define GST_DEFAULT_NAME "KurentoRabbitMQPipeline" - -/* This is included to avoid problems with slots and lamdas */ -#include -#include -#include -namespace sigc -{ -template -struct functor_trait { - typedef decltype (::sigc::mem_fun (std::declval (), - &Functor::operator() ) ) _intermediate; - - typedef typename _intermediate::result_type result_type; - typedef Functor functor_type; -}; -} - -// #define PIPELINE_QUEUE_PREFIX "media_pipeline_" -#define PIPELINE_QUEUE_PREFIX "" -#define EVENT_EXCHANGE_PREFIX "event_" - -#define PIPELINE_QUEUE_TTL 240000 - -namespace kurento -{ - -static std::string -generateUUID() -{ - std::stringstream ss; - boost::uuids::uuid uuid = boost::uuids::random_generator() (); - - ss << uuid; - return ss.str(); -} - -RabbitMQPipeline::RabbitMQPipeline (const boost::property_tree::ptree &config, - const std::string &address, const int port, - std::shared_ptr processor) : processor (processor) -{ - setConfig (address, port); - - MediaSet::getMediaSet()->signalEmpty.connect ([] () { - GST_INFO ("Mediaset is empty, exiting from child process"); - kill (getpid(), SIGINT); - }); - - processor->setEventSubscriptionHandler (std::bind ( - &RabbitMQPipeline::processSubscription, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3, std::placeholders::_4) ); - - setEventReconnectHandler (std::bind (&RabbitMQPipeline::reconnect, this) ); -} - -RabbitMQPipeline::~RabbitMQPipeline() -{ - stopListen(); - - if (!pipelineId.empty() ) { - getConnection()->deleteQueue (PIPELINE_QUEUE_PREFIX + pipelineId); - getConnection()->deleteExchange (PIPELINE_QUEUE_PREFIX + pipelineId); - getConnection()->deleteExchange (EVENT_EXCHANGE_PREFIX + pipelineId); - } -} - -void -RabbitMQPipeline::processMessage (RabbitMQMessage &message) -{ - std::string data = message.getData(); - std::string response; - - GST_DEBUG ("Message: >%s<", data.c_str() ); - processor->process (data, response); - GST_DEBUG ("Response: >%s<", response.c_str() ); - - message.ack(); - message.reply (response); -} - -void -RabbitMQPipeline::reconnect () -{ - GST_DEBUG ("Reconnected"); - listenQueue (PIPELINE_QUEUE_PREFIX + pipelineId, false, PIPELINE_QUEUE_TTL); - getConnection()->declareExchange (EVENT_EXCHANGE_PREFIX + pipelineId, - RabbitMQConnection::EXCHANGE_TYPE_FANOUT, - false, PIPELINE_QUEUE_TTL); -} - -void -RabbitMQPipeline::startRequest (RabbitMQMessage &message) -{ - Json::Value responseJson; - Json::Reader reader; - std::string response; - std::string request = message.getData(); - - GST_DEBUG ("Message: >%s<", request.c_str() ); - processor->process (request, response); - - reader.parse (response, responseJson); - message.ack(); - - if (responseJson.isObject() && responseJson.isMember ("result") - && responseJson["result"].isObject() - && responseJson["result"].isMember ("value") ) { - pipelineId = responseJson["result"]["value"].asString(); - - listenQueue (PIPELINE_QUEUE_PREFIX + pipelineId, false, PIPELINE_QUEUE_TTL); - getConnection()->declareExchange (EVENT_EXCHANGE_PREFIX + pipelineId, - RabbitMQConnection::EXCHANGE_TYPE_FANOUT, - false, PIPELINE_QUEUE_TTL); - } - - message.reply (getConnection(), response); - - GST_DEBUG ("Response: >%s<", response.c_str() ); - - if (MediaSet::getMediaSet()->empty() ) { - GST_ERROR ("Error creating media pipeline, terminating process"); - kill (getpid(), SIGINT); - } -} - -std::string -RabbitMQPipeline::processSubscription (std::shared_ptr< MediaObjectImpl > obj, - const std::string &sessionId, - const std::string &eventType, - const Json::Value ¶ms) -{ - std::string subscriptionId; - std::string eventId = obj->getId() + "/" + eventType; - std::shared_ptr handler; - std::unique_lock lock (mutex); - - if (handlers.find (eventId) != handlers.end() ) { - handler = handlers[eventId].lock(); - } - - if (!handler) { - handler = std::shared_ptr (new RabbitMQEventHandler (obj, - getConnection()->getAddress(), getConnection()->getPort(), - EVENT_EXCHANGE_PREFIX + pipelineId, eventId), - std::bind (&RabbitMQPipeline::destroyHandler, this, std::placeholders::_1) ); - - subscriptionId = processor->connectEventHandler (obj, sessionId, eventType, - handler); - handlers[eventId] = std::weak_ptr (handler); - } else { - subscriptionId = generateUUID(); - processor->registerEventHandler (obj, sessionId, subscriptionId, handler); - } - - return subscriptionId; -} - -void RabbitMQPipeline::destroyHandler (EventHandler *handler) -{ - RabbitMQEventHandler *rabbitHandler = dynamic_cast - (handler); - - if (rabbitHandler != NULL) { - std::string eventId; - std::unique_lock lock (mutex); - - eventId = rabbitHandler->getRoutingKey(); - - if (handlers.find (eventId) != handlers.end() ) { - handlers.erase (eventId); - } - } - - delete handler; -} - -RabbitMQPipeline::StaticConstructor RabbitMQPipeline::staticConstructor; - -RabbitMQPipeline::StaticConstructor::StaticConstructor() -{ - GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, - GST_DEFAULT_NAME); -} - -} /* kurento */ diff --git a/server/transport/rabbitmq/RabbitMQPipeline.hpp b/server/transport/rabbitmq/RabbitMQPipeline.hpp deleted file mode 100644 index 3ba064a52..000000000 --- a/server/transport/rabbitmq/RabbitMQPipeline.hpp +++ /dev/null @@ -1,63 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#ifndef __RABBITMQ_PIPELINE_HPP__ -#define __RABBITMQ_PIPELINE_HPP__ - -#include "RabbitMQListener.hpp" -#include -#include - -namespace kurento -{ - -class RabbitMQPipeline: private RabbitMQListener -{ -public: - RabbitMQPipeline (const boost::property_tree::ptree &config, - const std::string &address, - const int port, std::shared_ptr processor); - virtual ~RabbitMQPipeline() throw (); - virtual void startRequest (RabbitMQMessage &message); - - virtual std::string processSubscription (std::shared_ptr obj, - const std::string &sessionId, const std::string &eventType, - const Json::Value ¶ms); - -protected: - virtual void processMessage (RabbitMQMessage &message); - -private: - - void destroyHandler (EventHandler *handler); - virtual void reconnect (); - - std::shared_ptr processor; - std::map > handlers; - std::mutex mutex; - std::string pipelineId; - - class StaticConstructor - { - public: - StaticConstructor(); - }; - - static StaticConstructor staticConstructor; -}; - -} /* kurento */ - -#endif /* __RABBITMQ_PIPELINE_HPP__ */ diff --git a/server/transport/rabbitmq/RabbitMQReconnectStrategy.hpp b/server/transport/rabbitmq/RabbitMQReconnectStrategy.hpp deleted file mode 100644 index 87e83a71b..000000000 --- a/server/transport/rabbitmq/RabbitMQReconnectStrategy.hpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#ifndef __RABBITMQ_RECONNECT_STRATEGY_HPP__ -#define __RABBITMQ_RECONNECT_STRATEGY_HPP__ - -namespace kurento -{ - -class RabbitMQReconnectStrategy -{ -public: - virtual int getTimeout() = 0; - virtual void reset() = 0; -}; - -} /* kurento */ - -#endif /* __RABBITMQ_RECONNECT_STRATEGY_HPP__ */ \ No newline at end of file diff --git a/server/transport/rabbitmq/RabbitMQTransport.cpp b/server/transport/rabbitmq/RabbitMQTransport.cpp deleted file mode 100644 index fc4e1db06..000000000 --- a/server/transport/rabbitmq/RabbitMQTransport.cpp +++ /dev/null @@ -1,187 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#include -#include "RabbitMQTransport.hpp" - -#include -#include -#include -#include - -#define GST_CAT_DEFAULT kurento_rabbitmq_transport -GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); -#define GST_DEFAULT_NAME "KurentoRabbitMQTransport" - -/* This is included to avoid problems with slots and lamdas */ -#include -#include -#include -namespace sigc -{ -template -struct functor_trait { - typedef decltype (::sigc::mem_fun (std::declval (), - &Functor::operator() ) ) _intermediate; - - typedef typename _intermediate::result_type result_type; - typedef Functor functor_type; -}; -} - -/* Default config values */ -#define RABBITMQ_SERVER_ADDRESS_DEFAULT "127.0.0.1" -#define RABBITMQ_SERVER_PORT_DEFAULT 5672 - -#define PIPELINE_CREATION "pipeline_creation" - -namespace kurento -{ - -static void -check_port (int port) -{ - if (port <= 0 || port > G_MAXUSHORT) { - throw boost::property_tree::ptree_bad_data ("Invalid port value", port); - } -} - -RabbitMQTransport::RabbitMQTransport (const boost::property_tree::ptree &config, - std::shared_ptr processor) : processor (processor), config (config) -{ - sigset_t mask; - std::string address; - int port; - - try { - address = config.get ("mediaServer.net.rabbitmq.address"); - } catch (const boost::property_tree::ptree_error &err) { - GST_WARNING ("Setting default address %s to media server", - RABBITMQ_SERVER_ADDRESS_DEFAULT); - address = RABBITMQ_SERVER_ADDRESS_DEFAULT; - } - - try { - port = config.get ("mediaServer.net.rabbitmq.port"); - check_port (port); - } catch (const boost::property_tree::ptree_error &err) { - GST_WARNING ("Setting default port %d to media server", - RABBITMQ_SERVER_PORT_DEFAULT); - port = RABBITMQ_SERVER_PORT_DEFAULT; - } - - this->address = address; - this->port = port; - - setConfig (address, port); - - sigemptyset (&mask); - sigaddset (&mask, SIGCHLD); - signalHandler = std::shared_ptr (new SignalHandler (mask, - std::bind (&RabbitMQTransport::childSignal, this, std::placeholders::_1) ) ); - - setEventReconnectHandler (std::bind (&RabbitMQTransport::reconnect, this) ); - - // TODO: Set event processor -} - -RabbitMQTransport::~RabbitMQTransport() -{ -} - -void -RabbitMQTransport::processMessage (RabbitMQMessage &message) -{ - int pid; - - /* TODO: */ - /* Fork is an expensive operation so we should make sure that this */ - /* request is a valid creation pipeline request before forking the */ - /* process. Checking this stuff here does not seems a clean fix */ - /* because we would mixing protocol stuff in the transport layer. */ - - pid = fork(); - - if (pid < 0) { - throw RabbitMQException ("Proccess cannot be forked"); - } else if (pid == 0) { - /* Child process */ - childs.clear(); - stopListen(); - getConnection()->noCloseOnRelease(); - - pipeline = std::shared_ptr (new RabbitMQPipeline (config, - address, - port, processor) ); - pipeline->startRequest (message); - - } else { - /* Parent process */ - childs.push_back (pid); - message.noRejectOnRelease(); - } -} - -void -RabbitMQTransport::reconnect () -{ - GST_DEBUG ("Reconnected"); - listenQueue (PIPELINE_CREATION, true); -} - -void RabbitMQTransport::start () -{ - listenQueue (PIPELINE_CREATION, true); -} - -void RabbitMQTransport::stop () -{ - stopListen(); - GST_DEBUG ("stop transport"); - - if (pipeline) { - pipeline.reset(); - } - - for (auto pid : childs) { - GST_DEBUG ("Killing child %d", pid); - kill (pid, SIGINT); - } -} - -void -RabbitMQTransport::childSignal (uint32_t signal) -{ - int pid; - int status; - - pid = waitpid (-1, &status, WNOHANG); - - if (pid > 0) { - GST_DEBUG ("Child %d terminated", pid); - childs.remove (pid); - } -} - - -RabbitMQTransport::StaticConstructor RabbitMQTransport::staticConstructor; - -RabbitMQTransport::StaticConstructor::StaticConstructor() -{ - GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, - GST_DEFAULT_NAME); -} - -} /* kurento */ diff --git a/server/transport/rabbitmq/RabbitMQTransport.hpp b/server/transport/rabbitmq/RabbitMQTransport.hpp deleted file mode 100644 index a6032ba60..000000000 --- a/server/transport/rabbitmq/RabbitMQTransport.hpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#ifndef __RABBITMQ_TRANSPORT_HPP__ -#define __RABBITMQ_TRANSPORT_HPP__ - -#include "Transport.hpp" -#include "RabbitMQListener.hpp" -#include "RabbitMQPipeline.hpp" -#include - -namespace kurento -{ - -class RabbitMQTransport: private RabbitMQListener, public Transport -{ -public: - RabbitMQTransport (const boost::property_tree::ptree &config, - std::shared_ptr processor); - virtual ~RabbitMQTransport() throw (); - virtual void start (); - virtual void stop (); - -protected: - virtual void processMessage (RabbitMQMessage &message); - -private: - void childSignal (uint32_t signal); - void reconnect (); - - std::shared_ptr processor; - std::list childs; - std::shared_ptr pipeline; - - std::string address; - int port; - - const boost::property_tree::ptree &config; - - std::shared_ptr signalHandler; - - class StaticConstructor - { - public: - StaticConstructor(); - }; - - static StaticConstructor staticConstructor; -}; - -} /* kurento */ - -#endif /* __RABBITMQ_TRANSPORT_HPP__ */ diff --git a/server/transport/rabbitmq/RabbitMQTransportFactory.cpp b/server/transport/rabbitmq/RabbitMQTransportFactory.cpp deleted file mode 100644 index bec3603a9..000000000 --- a/server/transport/rabbitmq/RabbitMQTransportFactory.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#include "RabbitMQTransportFactory.hpp" -#include "RabbitMQTransport.hpp" - -namespace kurento -{ - -std::shared_ptr RabbitMQTransportFactory::create ( - const boost::property_tree::ptree &config, std::shared_ptr processor) -{ - return std::shared_ptr (new RabbitMQTransport (config, processor) ); -} - -} /* kurento */ diff --git a/server/transport/rabbitmq/RabbitMQTransportFactory.hpp b/server/transport/rabbitmq/RabbitMQTransportFactory.hpp deleted file mode 100644 index 51ee86cb1..000000000 --- a/server/transport/rabbitmq/RabbitMQTransportFactory.hpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * (C) Copyright 2014 Kurento (http://kurento.org/) - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the GNU Lesser General Public License - * (LGPL) version 2.1 which accompanies this distribution, and is available at - * http://www.gnu.org/licenses/lgpl-2.1.html - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - */ - -#ifndef __RABBITMQ_TRANSPORT_FACTORY_HPP__ -#define __RABBITMQ_TRANSPORT_FACTORY_HPP__ - -#include - -namespace kurento -{ - -class RabbitMQTransportFactory: public TransportFactory -{ -public: - RabbitMQTransportFactory () {}; - virtual ~RabbitMQTransportFactory() throw () {}; - - virtual std::string getName () { - return "rabbitmq"; - } - - virtual std::shared_ptr create (const boost::property_tree::ptree - &config, std::shared_ptr processor); - -}; - -} /* kurento */ - -#endif /* __RABBITMQ_TRANSPORT_FACTORY_HPP__ */