diff --git a/CMakeLists.txt b/CMakeLists.txt index cd23cd82..a313214f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,7 +20,7 @@ project(${PROJECT} CXX) ### # compilation options ### -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3") ### @@ -60,7 +60,7 @@ endforeach() # executable ### add_library(${PROJECT} SHARED ${SOURCES}) -target_link_libraries(${PROJECT} boost_system) +target_link_libraries(${PROJECT} pthread) set_target_properties(${PROJECT} PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") @@ -76,7 +76,7 @@ install (DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include) # examples ### IF (BUILD_EXAMPLES) - add_subdirectory(examples) + add_subdirectory(examples) ENDIF(BUILD_EXAMPLES) @@ -84,5 +84,5 @@ ENDIF(BUILD_EXAMPLES) # tests ### IF (BUILD_TESTS) - add_subdirectory(tests) + add_subdirectory(tests) ENDIF(BUILD_TESTS) diff --git a/LICENSE b/LICENSE index 90e9b0fb..85f04851 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2015 Simon Ninon +Copyright (c) 2015-2016 Simon Ninon Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -19,4 +19,3 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - diff --git a/README.md b/README.md index cad6a46e..b9818a4f 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,10 @@ # cpp_redis cpp_redis is C++11 Asynchronous Redis Client. -Network is based on Boost Asio library. +Network is based on raw sockets API. This, library is really lightweight. ## Requirements * C++11 -* Boost Asio ## Compiling The library uses `cmake`. In order to build the library, follow these steps: @@ -62,7 +61,7 @@ Reply callback is an `std::function`. ### Example ```cpp -#include "cpp_redis/cpp_redis" +#include #include #include @@ -72,28 +71,31 @@ cpp_redis::redis_client client; void sigint_handler(int) { - std::cout << "disconnected (sigint handler)" << std::endl; - client.disconnect(); + std::cout << "disconnected (sigint handler)" << std::endl; + client.disconnect(); + should_exit = true; } int main(void) { - client.set_disconnection_handler([] (cpp_redis::redis_client&) { - std::cout << "client disconnected (disconnection handler)" << std::endl; - should_exit = true; - }); + client.set_disconnection_handler([] (cpp_redis::redis_client&) { + std::cout << "client disconnected (disconnection handler)" << std::endl; + should_exit = true; + }); - client.connect(); + client.connect(); - client.send({"SET", "hello", "world"}); - client.send({"GET", "hello"}, [] (cpp_redis::reply& reply) { - std::cout << reply.as_string() << std::endl; - }); + client.send({"SET", "hello", "world"}, [] (cpp_redis::reply& reply) { + std::cout << reply.as_string() << std::endl; + }); + client.send({"GET", "hello"}, [] (cpp_redis::reply& reply) { + std::cout << reply.as_string() << std::endl; + }); - signal(SIGINT, &sigint_handler); - while (not should_exit); + signal(SIGINT, &sigint_handler); + while (not should_exit); - return 0; + return 0; } ``` @@ -133,7 +135,7 @@ Unsubscribe from the given pattern. ### Example ```cpp -#include "cpp_redis/cpp_redis" +#include #include #include @@ -145,6 +147,7 @@ void sigint_handler(int) { std::cout << "disconnected (sigint handler)" << std::endl; sub.disconnect(); + should_exit = true; } int diff --git a/examples/redis_client.cpp b/examples/redis_client.cpp index fd341c86..57100954 100644 --- a/examples/redis_client.cpp +++ b/examples/redis_client.cpp @@ -8,26 +8,29 @@ cpp_redis::redis_client client; void sigint_handler(int) { - std::cout << "disconnected (sigint handler)" << std::endl; - client.disconnect(); + std::cout << "disconnected (sigint handler)" << std::endl; + client.disconnect(); + should_exit = true; } int main(void) { - client.set_disconnection_handler([] (cpp_redis::redis_client&) { - std::cout << "client disconnected (disconnection handler)" << std::endl; - should_exit = true; - }); + client.set_disconnection_handler([] (cpp_redis::redis_client&) { + std::cout << "client disconnected (disconnection handler)" << std::endl; + should_exit = true; + }); - client.connect(); + client.connect(); - client.send({"SET", "hello", "world"}); - client.send({"GET", "hello"}, [] (cpp_redis::reply& reply) { - std::cout << reply.as_string() << std::endl; - }); + client.send({"SET", "hello", "world"}, [] (cpp_redis::reply& reply) { + std::cout << reply.as_string() << std::endl; + }); + client.send({"GET", "hello"}, [] (cpp_redis::reply& reply) { + std::cout << reply.as_string() << std::endl; + }); - signal(SIGINT, &sigint_handler); - while (not should_exit); + signal(SIGINT, &sigint_handler); + while (not should_exit); - return 0; + return 0; } diff --git a/examples/redis_subscriber.cpp b/examples/redis_subscriber.cpp index 390acb20..1053f4a5 100644 --- a/examples/redis_subscriber.cpp +++ b/examples/redis_subscriber.cpp @@ -8,28 +8,29 @@ cpp_redis::redis_subscriber sub; void sigint_handler(int) { - std::cout << "disconnected (sigint handler)" << std::endl; - sub.disconnect(); + std::cout << "disconnected (sigint handler)" << std::endl; + sub.disconnect(); + should_exit = true; } int main(void) { - sub.set_disconnection_handler([] (cpp_redis::redis_subscriber&) { - std::cout << "sub disconnected (disconnection handler)" << std::endl; - should_exit = true; - }); + sub.set_disconnection_handler([] (cpp_redis::redis_subscriber&) { + std::cout << "sub disconnected (disconnection handler)" << std::endl; + should_exit = true; + }); - sub.connect(); + sub.connect(); - sub.subscribe("some_chan", [] (const std::string& chan, const std::string& msg) { - std::cout << "MESSAGE " << chan << ": " << msg << std::endl; - }); - sub.psubscribe("*", [] (const std::string& chan, const std::string& msg) { - std::cout << "PMESSAGE " << chan << ": " << msg << std::endl; - }); + sub.subscribe("some_chan", [] (const std::string& chan, const std::string& msg) { + std::cout << "MESSAGE " << chan << ": " << msg << std::endl; + }); + sub.psubscribe("*", [] (const std::string& chan, const std::string& msg) { + std::cout << "PMESSAGE " << chan << ": " << msg << std::endl; + }); - signal(SIGINT, &sigint_handler); - while (not should_exit); + signal(SIGINT, &sigint_handler); + while (not should_exit); - return 0; + return 0; } diff --git a/includes/cpp_redis/builders/array_builder.hpp b/includes/cpp_redis/builders/array_builder.hpp index 87d1f4d4..726c7116 100644 --- a/includes/cpp_redis/builders/array_builder.hpp +++ b/includes/cpp_redis/builders/array_builder.hpp @@ -10,32 +10,32 @@ namespace builders { class array_builder : public builder_iface { public: - //! ctor & dtor - array_builder(void); - ~array_builder(void) = default; + //! ctor & dtor + array_builder(void); + ~array_builder(void) = default; - //! copy ctor & assignment operator - array_builder(const array_builder&) = delete; - array_builder& operator=(const array_builder&) = delete; + //! copy ctor & assignment operator + array_builder(const array_builder&) = delete; + array_builder& operator=(const array_builder&) = delete; public: - //! builder_iface impl - builder_iface& operator<<(std::string&); - bool reply_ready(void) const; - reply get_reply(void) const; + //! builder_iface impl + builder_iface& operator<<(std::string&); + bool reply_ready(void) const; + reply get_reply(void) const; private: - bool fetch_array_size(std::string& buffer); - bool build_row(std::string& buffer); + bool fetch_array_size(std::string& buffer); + bool build_row(std::string& buffer); private: - integer_builder m_int_builder; - unsigned int m_array_size; + integer_builder m_int_builder; + unsigned int m_array_size; - std::unique_ptr m_current_builder; + std::unique_ptr m_current_builder; - bool m_reply_ready; - replies::array_reply m_reply; + bool m_reply_ready; + replies::array_reply m_reply; }; } //! builders diff --git a/includes/cpp_redis/builders/builder_iface.hpp b/includes/cpp_redis/builders/builder_iface.hpp index 57ea8f30..80d9286d 100644 --- a/includes/cpp_redis/builders/builder_iface.hpp +++ b/includes/cpp_redis/builders/builder_iface.hpp @@ -12,17 +12,17 @@ namespace builders { //! interface inherited by all builders class builder_iface { public: - virtual ~builder_iface(void) = default; + virtual ~builder_iface(void) = default; - //! take data as parameter which is consumed to build the reply - //! every bytes used to build the reply must be removed from the buffer passed as parameter - virtual builder_iface& operator<<(std::string&) = 0; + //! take data as parameter which is consumed to build the reply + //! every bytes used to build the reply must be removed from the buffer passed as parameter + virtual builder_iface& operator<<(std::string&) = 0; - //! return whether the reply could be built - virtual bool reply_ready(void) const = 0; + //! return whether the reply could be built + virtual bool reply_ready(void) const = 0; - //! return reply object - virtual reply get_reply(void) const = 0; + //! return reply object + virtual reply get_reply(void) const = 0; }; } //! builders diff --git a/includes/cpp_redis/builders/bulk_string_builder.hpp b/includes/cpp_redis/builders/bulk_string_builder.hpp index b758a368..605412dd 100644 --- a/includes/cpp_redis/builders/bulk_string_builder.hpp +++ b/includes/cpp_redis/builders/bulk_string_builder.hpp @@ -10,39 +10,39 @@ namespace builders { class bulk_string_builder : public builder_iface { public: - //! ctor & dtor - bulk_string_builder(void); - ~bulk_string_builder(void) = default; + //! ctor & dtor + bulk_string_builder(void); + ~bulk_string_builder(void) = default; - //! copy ctor & assignment operator - bulk_string_builder(const bulk_string_builder&) = delete; - bulk_string_builder& operator=(const bulk_string_builder&) = delete; + //! copy ctor & assignment operator + bulk_string_builder(const bulk_string_builder&) = delete; + bulk_string_builder& operator=(const bulk_string_builder&) = delete; public: - //! builder_iface impl - builder_iface& operator<<(std::string&); - bool reply_ready(void) const; - reply get_reply(void) const; + //! builder_iface impl + builder_iface& operator<<(std::string&); + bool reply_ready(void) const; + reply get_reply(void) const; - //! getter - const std::string& get_bulk_string(void) const; - bool is_null(void) const; + //! getter + const std::string& get_bulk_string(void) const; + bool is_null(void) const; private: - void build_reply(void); - bool fetch_size(std::string& str); - void fetch_str(std::string& str); + void build_reply(void); + bool fetch_size(std::string& str); + void fetch_str(std::string& str); private: - //! used to get bulk string size - integer_builder m_int_builder; + //! used to get bulk string size + integer_builder m_int_builder; - int m_str_size; - std::string m_str; - bool m_is_null; + int m_str_size; + std::string m_str; + bool m_is_null; - bool m_reply_ready; - replies::bulk_string_reply m_reply; + bool m_reply_ready; + replies::bulk_string_reply m_reply; }; } //! builders diff --git a/includes/cpp_redis/builders/error_builder.hpp b/includes/cpp_redis/builders/error_builder.hpp index 6be5b365..9730cd48 100644 --- a/includes/cpp_redis/builders/error_builder.hpp +++ b/includes/cpp_redis/builders/error_builder.hpp @@ -10,26 +10,26 @@ namespace builders { class error_builder : public builder_iface { public: - //! ctor & dtor - error_builder(void) = default; - ~error_builder(void) = default; + //! ctor & dtor + error_builder(void) = default; + ~error_builder(void) = default; - //! copy ctor & assignment operator - error_builder(const error_builder&) = delete; - error_builder& operator=(const error_builder&) = delete; + //! copy ctor & assignment operator + error_builder(const error_builder&) = delete; + error_builder& operator=(const error_builder&) = delete; public: - //! builder_iface impl - builder_iface& operator<<(std::string&); - bool reply_ready(void) const; - reply get_reply(void) const; + //! builder_iface impl + builder_iface& operator<<(std::string&); + bool reply_ready(void) const; + reply get_reply(void) const; - //! getter - const std::string& get_error(void) const; + //! getter + const std::string& get_error(void) const; private: - simple_string_builder m_string_builder; - replies::error_reply m_reply; + simple_string_builder m_string_builder; + replies::error_reply m_reply; }; } //! builders diff --git a/includes/cpp_redis/builders/integer_builder.hpp b/includes/cpp_redis/builders/integer_builder.hpp index 3a71c273..359b1126 100644 --- a/includes/cpp_redis/builders/integer_builder.hpp +++ b/includes/cpp_redis/builders/integer_builder.hpp @@ -9,29 +9,29 @@ namespace builders { class integer_builder : public builder_iface { public: - //! ctor & dtor - integer_builder(void); - ~integer_builder(void) = default; + //! ctor & dtor + integer_builder(void); + ~integer_builder(void) = default; - //! copy ctor & assignment operator - integer_builder(const integer_builder&) = delete; - integer_builder& operator=(const integer_builder&) = delete; + //! copy ctor & assignment operator + integer_builder(const integer_builder&) = delete; + integer_builder& operator=(const integer_builder&) = delete; public: - //! builder_iface impl - builder_iface& operator<<(std::string&); - bool reply_ready(void) const; - reply get_reply(void) const; + //! builder_iface impl + builder_iface& operator<<(std::string&); + bool reply_ready(void) const; + reply get_reply(void) const; - //! getter - int get_integer(void) const; + //! getter + int get_integer(void) const; private: - int m_nbr; - char m_negative_multiplicator; - bool m_reply_ready; + int m_nbr; + char m_negative_multiplicator; + bool m_reply_ready; - replies::integer_reply m_reply; + replies::integer_reply m_reply; }; } //! builders diff --git a/includes/cpp_redis/builders/reply_builder.hpp b/includes/cpp_redis/builders/reply_builder.hpp index 64f8b1c5..ea4253f0 100644 --- a/includes/cpp_redis/builders/reply_builder.hpp +++ b/includes/cpp_redis/builders/reply_builder.hpp @@ -14,34 +14,34 @@ namespace builders { class reply_builder { public: - //! ctor & dtor - reply_builder(void); - ~reply_builder(void) = default; + //! ctor & dtor + reply_builder(void); + ~reply_builder(void) = default; - //! copy ctor & assignment operator - reply_builder(const reply_builder&) = delete; - reply_builder& operator=(const reply_builder&) = delete; + //! copy ctor & assignment operator + reply_builder(const reply_builder&) = delete; + reply_builder& operator=(const reply_builder&) = delete; public: - //! add data to reply builder - reply_builder& operator<<(const std::string& data); + //! add data to reply builder + reply_builder& operator<<(const std::string& data); - //! get reply - void operator>>(reply& reply); - const reply& get_front(void) const; - void pop_front(void); + //! get reply + void operator>>(reply& reply); + const reply& get_front(void) const; + void pop_front(void); - //! returns whether a reply is available - bool reply_available(void) const; + //! returns whether a reply is available + bool reply_available(void) const; private: - //! build reply. Return whether the reply has been fully built or not - bool build_reply(void); + //! build reply. Return whether the reply has been fully built or not + bool build_reply(void); private: - std::string m_buffer; - std::unique_ptr m_builder; - std::deque m_available_replies; + std::string m_buffer; + std::unique_ptr m_builder; + std::deque m_available_replies; }; } //! builders diff --git a/includes/cpp_redis/builders/simple_string_builder.hpp b/includes/cpp_redis/builders/simple_string_builder.hpp index a0245fc8..e4ffbb9b 100644 --- a/includes/cpp_redis/builders/simple_string_builder.hpp +++ b/includes/cpp_redis/builders/simple_string_builder.hpp @@ -11,28 +11,28 @@ namespace builders { class simple_string_builder : public builder_iface { public: - //! ctor & dtor - simple_string_builder(void); - ~simple_string_builder(void) = default; + //! ctor & dtor + simple_string_builder(void); + ~simple_string_builder(void) = default; - //! copy ctor & assignment operator - simple_string_builder(const simple_string_builder&) = delete; - simple_string_builder& operator=(const simple_string_builder&) = delete; + //! copy ctor & assignment operator + simple_string_builder(const simple_string_builder&) = delete; + simple_string_builder& operator=(const simple_string_builder&) = delete; public: - //! builder_iface impl - builder_iface& operator<<(std::string&); - bool reply_ready(void) const; - reply get_reply(void) const; + //! builder_iface impl + builder_iface& operator<<(std::string&); + bool reply_ready(void) const; + reply get_reply(void) const; - //! getter - const std::string& get_simple_string(void) const; + //! getter + const std::string& get_simple_string(void) const; private: - std::string m_str; - bool m_reply_ready; + std::string m_str; + bool m_reply_ready; - replies::simple_string_reply m_reply; + replies::simple_string_reply m_reply; }; } //! builders diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp index f20a4bc4..3cf2086d 100644 --- a/includes/cpp_redis/network/io_service.hpp +++ b/includes/cpp_redis/network/io_service.hpp @@ -1,35 +1,104 @@ #pragma once #include -#include +#include +#include +#include +#include + +#include +#include +#include namespace cpp_redis { namespace network { -//! boost io service wrapper class io_service { public: - //! ctor & dtor - io_service(void); - ~io_service(void); + //! instance getter (singleton pattern) + static io_service& get_instance(void); + +private: + //! ctor & dtor + io_service(void); + ~io_service(void); - //! copy ctor & assignment operator - io_service(const io_service&) = delete; - io_service& operator=(const io_service&) = delete; + //! copy ctor & assignment operator + io_service(const io_service&) = delete; + io_service& operator=(const io_service&) = delete; public: - //! methods - void run(void); - void post(const std::function& fct); + //! disconnection handler declaration + typedef std::function disconnection_handler_t; + + //! add or remove a given fd from the io service + //! untrack should never be called from inside a callback + void track(int fd, const disconnection_handler_t& handler); + void untrack(int fd); + + //! asynchronously read read_size bytes and append them to the given buffer + //! on completion, call the read_callback to notify of the success or failure of the operation + //! return false if another async_read operation is in progress or fd is not registered + typedef std::function read_callback_t; + bool async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); + + //! asynchronously write write_size bytes from buffer to the specified fd + //!on completion, call the write_callback to notify of the success or failure of the operation + //! return false if another async_write operation is in progress or fd is not registered + typedef std::function write_callback_t; + bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); + +private: + //! simple struct to keep track of ongoing operations on a given fd + struct fd_info { + disconnection_handler_t disconnection_handler; + + std::atomic_bool async_read; + std::vector* read_buffer; + std::size_t read_size; + read_callback_t read_callback; - //! getters - boost::asio::io_service& get(void); + std::atomic_bool async_write; + std::vector write_buffer; + std::size_t write_size; + write_callback_t write_callback; + }; private: - boost::asio::io_service m_io_service; - boost::asio::io_service::work m_work; - std::thread m_io_service_thread; + //! listen for incoming events and notify + void listen(void); + + //! notify the select call so that it can wake up to process new events + void notify_select(void); + +private: + //! select fds sets handling (init, rd/wr handling) + int init_sets(fd_set* rd_set, fd_set* wr_set); + void read_fd(int fd); + void write_fd(int fd); + void process_sets(fd_set* rd_set, fd_set* wr_set); + +private: + //! whether the worker should terminate or not + std::atomic_bool m_should_stop; + + //! worker in the background, listening for events + std::thread m_worker; + + //! tracked fds + std::unordered_map m_fds; + + //! fd associated to the pipe used to wake up the select call + int m_notif_pipe_fds[2]; + + //! mutex to protect m_fds access against race condition + //! + //! specific mutex for untrack: we dont want someone to untrack a fd while we process it + //! this behavior could cause some issues when executing callbacks in another thread + //! for example, obj is destroyed, in its dtor it untracks the fd, but at the same time + //! a callback is executed from within another thread: the untrack mutex avoid this without being costly + std::recursive_mutex m_fds_mutex; }; } //! network diff --git a/includes/cpp_redis/network/redis_connection.hpp b/includes/cpp_redis/network/redis_connection.hpp index 1c167a8a..a0a721f0 100644 --- a/includes/cpp_redis/network/redis_connection.hpp +++ b/includes/cpp_redis/network/redis_connection.hpp @@ -14,54 +14,54 @@ namespace network { class redis_connection { public: - //! ctor & dtor - redis_connection(void); - ~redis_connection(void); + //! ctor & dtor + redis_connection(void); + ~redis_connection(void); - //! copy ctor & assignment operator - redis_connection(const redis_connection&) = delete; - redis_connection& operator=(const redis_connection&) = delete; + //! copy ctor & assignment operator + redis_connection(const redis_connection&) = delete; + redis_connection& operator=(const redis_connection&) = delete; public: - //! handle connection - void connect(const std::string& host = "127.0.0.1", unsigned int port = 6379); - void disconnect(void); - bool is_connected(void); + //! handle connection + void connect(const std::string& host = "127.0.0.1", unsigned int port = 6379); + void disconnect(void); + bool is_connected(void); - //! disconnection handler - typedef std::function disconnection_handler; - void set_disconnection_handler(const disconnection_handler& handler); + //! disconnection handler + typedef std::function disconnection_handler_t; + void set_disconnection_handler(const disconnection_handler_t& handler); - //! send cmd - void send(const std::vector& redis_cmd); + //! send cmd + void send(const std::vector& redis_cmd); - //! receive handler - typedef std::function reply_callback; - void set_reply_callback(const reply_callback& handler); + //! receive handler + typedef std::function reply_callback_t; + void set_reply_callback(const reply_callback_t& handler); private: - //! receive & disconnection handlers - bool tcp_client_receive_handler(network::tcp_client&, const std::vector& buffer); - void tcp_client_disconnection_handler(network::tcp_client&); + //! receive & disconnection handlers + bool tcp_client_receive_handler(network::tcp_client&, const std::vector& buffer); + void tcp_client_disconnection_handler(network::tcp_client&); - std::string build_command(const std::vector& redis_cmd); + std::string build_command(const std::vector& redis_cmd); private: - //! tcp client for redis connection - network::tcp_client m_client; + //! tcp client for redis connection + network::tcp_client m_client; - //! reply callback - reply_callback m_reply_callback; + //! reply callback + reply_callback_t m_reply_callback; - //! user defined disconnection handler - disconnection_handler m_disconnection_handler; + //! user defined disconnection handler + disconnection_handler_t m_disconnection_handler; - //! reply builder - builders::reply_builder m_builder; + //! reply builder + builders::reply_builder m_builder; - //! thread safety - std::mutex m_disconnection_handler_mutex; - std::mutex m_reply_callback_mutex; + //! thread safety + std::mutex m_disconnection_handler_mutex; + std::mutex m_reply_callback_mutex; }; } //! network diff --git a/includes/cpp_redis/network/tcp_client.hpp b/includes/cpp_redis/network/tcp_client.hpp index ca3e9677..40b2b92b 100644 --- a/includes/cpp_redis/network/tcp_client.hpp +++ b/includes/cpp_redis/network/tcp_client.hpp @@ -1,16 +1,15 @@ #pragma once +#include +#include #include #include #include #include -#include #include "cpp_redis/network/io_service.hpp" #include "cpp_redis/redis_error.hpp" -using boost::asio::ip::tcp; - namespace cpp_redis { namespace network { @@ -19,66 +18,67 @@ namespace network { //! async tcp client based on boost asio class tcp_client { public: - //! ctor & dtor - tcp_client(void); - ~tcp_client(void); + //! ctor & dtor + tcp_client(void); + ~tcp_client(void); - //! assignment operator & copy ctor - tcp_client(const tcp_client&) = delete; - tcp_client& operator=(const tcp_client&) = delete; + //! assignment operator & copy ctor + tcp_client(const tcp_client&) = delete; + tcp_client& operator=(const tcp_client&) = delete; - //! returns whether the client is connected or not - bool is_connected(void); + //! returns whether the client is connected or not + bool is_connected(void); - //! handle connection & disconnection - void connect(const std::string& host, unsigned int port); - void disconnect(void); + //! handle connection & disconnection + void connect(const std::string& host, unsigned int port); + void disconnect(void); - //! send data - void send(const std::string& buffer); - void send(const std::vector& buffer); + //! send data + void send(const std::string& buffer); + void send(const std::vector& buffer); - //! set receive callback - //! called each time some data has been received - //! Returns true if everything is ok, false otherwise (trigger disconnection) - typedef std::function& buffer)> receive_handler; - void set_receive_handler(const receive_handler& handler); + //! set receive callback + //! called each time some data has been received + //! Returns true if everything is ok, false otherwise (trigger disconnection) + typedef std::function& buffer)> receive_handler_t; + void set_receive_handler(const receive_handler_t& handler); - //! set disconnection callback - //! called each time a disconnection has been detected - typedef std::function disconnection_handler; - void set_disconnection_handler(const disconnection_handler& handler); + //! set disconnection callback + //! called each time a disconnection has been detected + typedef std::function disconnection_handler_t; + void set_disconnection_handler(const disconnection_handler_t& handler); private: - //! make boost asio async read and write operations - void async_read(void); - void async_write(void); + //! make boost asio async read and write operations + void async_read(void); + void async_write(void); - //! close socket and call disconnect callback - //! called in case of error - void process_disconnection(void); + //! io service callback + void io_service_disconnection_handler(io_service&); private: - //! io service - static io_service m_io_service; - tcp::socket m_socket; - - //! is connected - std::atomic_bool m_is_connected; - - //! buffers - static const unsigned int READ_SIZE = 2048; - std::vector m_read_buffer; - std::vector m_write_buffer; - - //! handlers - receive_handler m_receive_handler; - disconnection_handler m_disconnection_handler; - - //! thread safety - std::mutex m_write_buffer_mutex; - std::mutex m_receive_handler_mutex; - std::mutex m_disconnection_handler_mutex; + //! io service instance + io_service& m_io_service; + + //! socket fd + int m_fd; + + //! is connected + std::atomic_bool m_is_connected; + + //! buffers + static const unsigned int READ_SIZE = 2048; + std::vector m_read_buffer; + std::vector m_write_buffer; + + //! handlers + receive_handler_t m_receive_handler; + disconnection_handler_t m_disconnection_handler; + + //! thread safety + std::mutex m_write_buffer_mutex; + std::mutex m_receive_handler_mutex; + std::mutex m_disconnection_handler_mutex; }; } //! network diff --git a/includes/cpp_redis/redis_client.hpp b/includes/cpp_redis/redis_client.hpp index 0db85de9..5c755f81 100644 --- a/includes/cpp_redis/redis_client.hpp +++ b/includes/cpp_redis/redis_client.hpp @@ -12,49 +12,49 @@ namespace cpp_redis { class redis_client { public: - //! ctor & dtor - redis_client(void); - ~redis_client(void); + //! ctor & dtor + redis_client(void); + ~redis_client(void); - //! copy ctor & assignment operator - redis_client(const redis_client&) = delete; - redis_client& operator=(const redis_client&) = delete; + //! copy ctor & assignment operator + redis_client(const redis_client&) = delete; + redis_client& operator=(const redis_client&) = delete; public: - //! handle connection - void connect(const std::string& host = "127.0.0.1", unsigned int port = 6379); - void disconnect(void); - bool is_connected(void); + //! handle connection + void connect(const std::string& host = "127.0.0.1", unsigned int port = 6379); + void disconnect(void); + bool is_connected(void); - //! disconnection handler - typedef std::function disconnection_handler; - void set_disconnection_handler(const disconnection_handler& handler); + //! disconnection handler + typedef std::function disconnection_handler_t; + void set_disconnection_handler(const disconnection_handler_t& handler); - //! send cmd - typedef std::function reply_callback; - void send(const std::vector& redis_cmd, const reply_callback& callback = nullptr); + //! send cmd + typedef std::function reply_callback_t; + void send(const std::vector& redis_cmd, const reply_callback_t& callback = nullptr); private: - //! receive & disconnection handlers - void connection_receive_handler(network::redis_connection&, reply& reply); - void connection_disconnection_handler(network::redis_connection&); + //! receive & disconnection handlers + void connection_receive_handler(network::redis_connection&, reply& reply); + void connection_disconnection_handler(network::redis_connection&); - void clear_callbacks(void); - void call_disconnection_handler(void); + void clear_callbacks(void); + void call_disconnection_handler(void); private: - //! tcp client for redis connection - network::redis_connection m_client; + //! tcp client for redis connection + network::redis_connection m_client; - //! queue of callback to process - std::queue m_callbacks; + //! queue of callback to process + std::queue m_callbacks; - //! user defined disconnection handler - disconnection_handler m_disconnection_handler; + //! user defined disconnection handler + disconnection_handler_t m_disconnection_handler; - //! thread safety - std::mutex m_disconnection_handler_mutex; - std::mutex m_callbacks_mutex; + //! thread safety + std::mutex m_disconnection_handler_mutex; + std::mutex m_callbacks_mutex; }; } //! cpp_redis diff --git a/includes/cpp_redis/redis_error.hpp b/includes/cpp_redis/redis_error.hpp index 0bb565fa..45ac40c2 100644 --- a/includes/cpp_redis/redis_error.hpp +++ b/includes/cpp_redis/redis_error.hpp @@ -6,8 +6,8 @@ namespace cpp_redis { class redis_error : public std::runtime_error { public: - using std::runtime_error::runtime_error; - using std::runtime_error::what; + using std::runtime_error::runtime_error; + using std::runtime_error::what; }; } //! cpp_redis diff --git a/includes/cpp_redis/redis_subscriber.hpp b/includes/cpp_redis/redis_subscriber.hpp index b3393f8f..8ab0fcac 100644 --- a/includes/cpp_redis/redis_subscriber.hpp +++ b/includes/cpp_redis/redis_subscriber.hpp @@ -14,7 +14,7 @@ class redis_subscriber { public: //! ctor & dtor redis_subscriber(void); - ~redis_subscriber(void) = default; + ~redis_subscriber(void); //! copy ctor & assignment operator redis_subscriber(const redis_subscriber&) = delete; @@ -27,13 +27,13 @@ class redis_subscriber { bool is_connected(void); //! disconnection handler - typedef std::function disconnection_handler; - void set_disconnection_handler(const disconnection_handler& handler); + typedef std::function disconnection_handler_t; + void set_disconnection_handler(const disconnection_handler_t& handler); //! subscribe - unsubscribe - typedef std::function subscribe_callback; - void subscribe(const std::string& channel, const subscribe_callback& callback); - void psubscribe(const std::string& pattern, const subscribe_callback& callback); + typedef std::function subscribe_callback_t; + void subscribe(const std::string& channel, const subscribe_callback_t& callback); + void psubscribe(const std::string& pattern, const subscribe_callback_t& callback); void unsubscribe(const std::string& channel); void punsubscribe(const std::string& pattern); @@ -49,11 +49,11 @@ class redis_subscriber { network::redis_connection m_client; //! (p)subscribed channels and their associated channels - std::map m_subscribed_channels; - std::map m_psubscribed_channels; + std::map m_subscribed_channels; + std::map m_psubscribed_channels; //! disconnection handler - disconnection_handler m_disconnection_handler; + disconnection_handler_t m_disconnection_handler; //! thread safety std::mutex m_disconnection_handler_mutex; diff --git a/includes/cpp_redis/replies/array_reply.hpp b/includes/cpp_redis/replies/array_reply.hpp index 0df13bcd..bc5475b7 100644 --- a/includes/cpp_redis/replies/array_reply.hpp +++ b/includes/cpp_redis/replies/array_reply.hpp @@ -12,29 +12,29 @@ namespace replies { class array_reply : public reply { public: - //! ctor & dtor - array_reply(const std::vector& rows = {}); - ~array_reply(void) = default; + //! ctor & dtor + array_reply(const std::vector& rows = {}); + ~array_reply(void) = default; - //! copy ctor & assignment operator - array_reply(const array_reply&) = default; - array_reply& operator=(const array_reply&) = default; + //! copy ctor & assignment operator + array_reply(const array_reply&) = default; + array_reply& operator=(const array_reply&) = default; public: - //! getters - unsigned int size(void) const; - const std::vector& get_rows(void) const; + //! getters + unsigned int size(void) const; + const std::vector& get_rows(void) const; - const cpp_redis::reply& get(unsigned int idx) const; - const cpp_redis::reply& operator[](unsigned int idx) const; + const cpp_redis::reply& get(unsigned int idx) const; + const cpp_redis::reply& operator[](unsigned int idx) const; - //! setters - void set_rows(const std::vector& rows); - void add_row(const cpp_redis::reply& row); - void operator<<(const cpp_redis::reply& row); + //! setters + void set_rows(const std::vector& rows); + void add_row(const cpp_redis::reply& row); + void operator<<(const cpp_redis::reply& row); private: - std::vector m_rows; + std::vector m_rows; }; } //! replies diff --git a/includes/cpp_redis/replies/bulk_string_reply.hpp b/includes/cpp_redis/replies/bulk_string_reply.hpp index 59d23c3b..81b54000 100644 --- a/includes/cpp_redis/replies/bulk_string_reply.hpp +++ b/includes/cpp_redis/replies/bulk_string_reply.hpp @@ -10,26 +10,26 @@ namespace replies { class bulk_string_reply : public reply { public: - //! ctor & dtor - bulk_string_reply(bool is_null = false, const std::string& bulk_string = ""); - ~bulk_string_reply(void) = default; + //! ctor & dtor + bulk_string_reply(bool is_null = false, const std::string& bulk_string = ""); + ~bulk_string_reply(void) = default; - //! copy ctor & assignment operator - bulk_string_reply(const bulk_string_reply&) = default; - bulk_string_reply& operator=(const bulk_string_reply&) = default; + //! copy ctor & assignment operator + bulk_string_reply(const bulk_string_reply&) = default; + bulk_string_reply& operator=(const bulk_string_reply&) = default; public: - //! getters - bool is_null(void) const; - const std::string& str(void) const; + //! getters + bool is_null(void) const; + const std::string& str(void) const; - //! setters - void is_null(bool is_null); - void str(const std::string& bulk_string); + //! setters + void is_null(bool is_null); + void str(const std::string& bulk_string); private: - std::string m_str; - bool m_is_null; + std::string m_str; + bool m_is_null; }; } //! replies diff --git a/includes/cpp_redis/replies/error_reply.hpp b/includes/cpp_redis/replies/error_reply.hpp index 48504310..8d447152 100644 --- a/includes/cpp_redis/replies/error_reply.hpp +++ b/includes/cpp_redis/replies/error_reply.hpp @@ -10,23 +10,23 @@ namespace replies { class error_reply : public reply { public: - //! ctor & dtor - error_reply(const std::string& error = ""); - ~error_reply(void) = default; + //! ctor & dtor + error_reply(const std::string& error = ""); + ~error_reply(void) = default; - //! copy ctor & assignment operator - error_reply(const error_reply&) = default; - error_reply& operator=(const error_reply&) = default; + //! copy ctor & assignment operator + error_reply(const error_reply&) = default; + error_reply& operator=(const error_reply&) = default; public: - //! getter - const std::string& str(void) const; + //! getter + const std::string& str(void) const; - //! setter - void str(const std::string& error); + //! setter + void str(const std::string& error); private: - std::string m_error; + std::string m_error; }; } //! replies diff --git a/includes/cpp_redis/replies/integer_reply.hpp b/includes/cpp_redis/replies/integer_reply.hpp index 9b9b41f6..9fcade0a 100644 --- a/includes/cpp_redis/replies/integer_reply.hpp +++ b/includes/cpp_redis/replies/integer_reply.hpp @@ -8,23 +8,23 @@ namespace replies { class integer_reply : public reply { public: - //! ctor & dtor - integer_reply(int nbr = 0); - ~integer_reply(void) = default; + //! ctor & dtor + integer_reply(int nbr = 0); + ~integer_reply(void) = default; - //! copy ctor & assignment operator - integer_reply(const integer_reply&) = default; - integer_reply& operator=(const integer_reply&) = default; + //! copy ctor & assignment operator + integer_reply(const integer_reply&) = default; + integer_reply& operator=(const integer_reply&) = default; public: - //! getter - int val(void) const; + //! getter + int val(void) const; - //! setter - void val(int nbr); + //! setter + void val(int nbr); private: - int m_nbr; + int m_nbr; }; } //! replies diff --git a/includes/cpp_redis/replies/reply.hpp b/includes/cpp_redis/replies/reply.hpp index 9788fe55..f6c5f1ea 100644 --- a/includes/cpp_redis/replies/reply.hpp +++ b/includes/cpp_redis/replies/reply.hpp @@ -12,43 +12,43 @@ class simple_string_reply; class reply { public: - //! type of reply - enum class type { - array, - bulk_string, - error, - integer, - simple_string - }; + //! type of reply + enum class type { + array, + bulk_string, + error, + integer, + simple_string + }; public: - //! ctor & dtor - reply(type reply_type); - virtual ~reply(void) = default; + //! ctor & dtor + reply(type reply_type); + virtual ~reply(void) = default; - //! copy ctor & assignment operator - reply(const reply&) = default; - reply& operator=(const reply&) = default; + //! copy ctor & assignment operator + reply(const reply&) = default; + reply& operator=(const reply&) = default; public: - type get_type(void) const; + type get_type(void) const; public: - bool is_array(void) const; - bool is_bulk_string(void) const; - bool is_error(void) const; - bool is_integer(void) const; - bool is_simple_string(void) const; + bool is_array(void) const; + bool is_bulk_string(void) const; + bool is_error(void) const; + bool is_integer(void) const; + bool is_simple_string(void) const; public: - array_reply& as_array(void); - bulk_string_reply& as_bulk_string(void); - error_reply& as_error(void); - integer_reply& as_integer(void); - simple_string_reply& as_simple_string(void); + array_reply& as_array(void); + bulk_string_reply& as_bulk_string(void); + error_reply& as_error(void); + integer_reply& as_integer(void); + simple_string_reply& as_simple_string(void); private: - type m_type; + type m_type; }; } //! replies diff --git a/includes/cpp_redis/replies/simple_string_reply.hpp b/includes/cpp_redis/replies/simple_string_reply.hpp index 48d761e0..8330754b 100644 --- a/includes/cpp_redis/replies/simple_string_reply.hpp +++ b/includes/cpp_redis/replies/simple_string_reply.hpp @@ -10,23 +10,23 @@ namespace replies { class simple_string_reply : public reply { public: - //! ctor & dtor - simple_string_reply(const std::string& simple_string = ""); - ~simple_string_reply(void) = default; + //! ctor & dtor + simple_string_reply(const std::string& simple_string = ""); + ~simple_string_reply(void) = default; - //! copy ctor & assignment operator - simple_string_reply(const simple_string_reply&) = default; - simple_string_reply& operator=(const simple_string_reply&) = default; + //! copy ctor & assignment operator + simple_string_reply(const simple_string_reply&) = default; + simple_string_reply& operator=(const simple_string_reply&) = default; public: - //! getter - const std::string& str(void) const; + //! getter + const std::string& str(void) const; - //! setter - void str(const std::string& simple_string); + //! setter + void str(const std::string& simple_string); private: - std::string m_str; + std::string m_str; }; } //! replies diff --git a/includes/cpp_redis/reply.hpp b/includes/cpp_redis/reply.hpp index d82285af..a17b3a3d 100644 --- a/includes/cpp_redis/reply.hpp +++ b/includes/cpp_redis/reply.hpp @@ -15,57 +15,57 @@ class simple_string_reply; class reply { public: - //! type of reply - enum class type { - array, - bulk_string, - error, - integer, - simple_string, - null - }; + //! type of reply + enum class type { + array, + bulk_string, + error, + integer, + simple_string, + null + }; public: - //! ctors - reply(void); - explicit reply(const replies::array_reply& array); - explicit reply(const replies::bulk_string_reply& string); - explicit reply(const replies::error_reply& string); - explicit reply(const replies::integer_reply& integer); - explicit reply(const replies::simple_string_reply& string); + //! ctors + reply(void); + explicit reply(const replies::array_reply& array); + explicit reply(const replies::bulk_string_reply& string); + explicit reply(const replies::error_reply& string); + explicit reply(const replies::integer_reply& integer); + explicit reply(const replies::simple_string_reply& string); - //! dtors & copy ctor & assignment operator - ~reply(void) = default; - reply(const reply&) = default; - reply& operator=(const reply&) = default; + //! dtors & copy ctor & assignment operator + ~reply(void) = default; + reply(const reply&) = default; + reply& operator=(const reply&) = default; - //! custom assignment operators - reply& operator=(const replies::array_reply& array); - reply& operator=(const replies::bulk_string_reply& string); - reply& operator=(const replies::error_reply& string); - reply& operator=(const replies::integer_reply& integer); - reply& operator=(const replies::simple_string_reply& string); + //! custom assignment operators + reply& operator=(const replies::array_reply& array); + reply& operator=(const replies::bulk_string_reply& string); + reply& operator=(const replies::error_reply& string); + reply& operator=(const replies::integer_reply& integer); + reply& operator=(const replies::simple_string_reply& string); public: - bool is_array(void) const; - bool is_string(void) const; - bool is_simple_string(void) const; - bool is_bulk_string(void) const; - bool is_error(void) const; - bool is_integer(void) const; - bool is_null(void) const; + bool is_array(void) const; + bool is_string(void) const; + bool is_simple_string(void) const; + bool is_bulk_string(void) const; + bool is_error(void) const; + bool is_integer(void) const; + bool is_null(void) const; - const std::vector& as_array(void) const; - const std::string& as_string(void) const; - int as_integer(void) const; + const std::vector& as_array(void) const; + const std::string& as_string(void) const; + int as_integer(void) const; - type get_type(void) const; + type get_type(void) const; private: - type m_type; - std::vector m_replies; - std::string m_str; - int m_int; + type m_type; + std::vector m_replies; + std::string m_str; + int m_int; }; } //! cpp_redis diff --git a/sources/builders/array_builder.cpp b/sources/builders/array_builder.cpp index a9fd3679..99eff60a 100644 --- a/sources/builders/array_builder.cpp +++ b/sources/builders/array_builder.cpp @@ -12,67 +12,67 @@ array_builder::array_builder(void) bool array_builder::fetch_array_size(std::string& buffer) { - if (m_int_builder.reply_ready()) - return true; + if (m_int_builder.reply_ready()) + return true; - m_int_builder << buffer; - if (not m_int_builder.reply_ready()) - return false; + m_int_builder << buffer; + if (not m_int_builder.reply_ready()) + return false; - int size = m_int_builder.get_integer(); - if (size < 0) - throw redis_error("Invalid array size"); - else if (size == 0) - m_reply_ready = true; + int size = m_int_builder.get_integer(); + if (size < 0) + throw redis_error("Invalid array size"); + else if (size == 0) + m_reply_ready = true; - m_array_size = size; + m_array_size = size; - return true; + return true; } bool array_builder::build_row(std::string& buffer) { - if (not m_current_builder) { - m_current_builder = create_builder(buffer.front()); - buffer.erase(0, 1); - } + if (not m_current_builder) { + m_current_builder = create_builder(buffer.front()); + buffer.erase(0, 1); + } - *m_current_builder << buffer; - if (not m_current_builder->reply_ready()) - return false; + *m_current_builder << buffer; + if (not m_current_builder->reply_ready()) + return false; - m_reply << m_current_builder->get_reply(); - m_current_builder = nullptr; + m_reply << m_current_builder->get_reply(); + m_current_builder = nullptr; - if (m_reply.size() == m_array_size) - m_reply_ready = true; + if (m_reply.size() == m_array_size) + m_reply_ready = true; - return true; + return true; } builder_iface& array_builder::operator<<(std::string& buffer) { - if (m_reply_ready) - return *this; + if (m_reply_ready) + return *this; - if (not fetch_array_size(buffer)) - return *this; + if (not fetch_array_size(buffer)) + return *this; - while (buffer.size() and not m_reply_ready) - if (not build_row(buffer)) - return *this; + while (buffer.size() and not m_reply_ready) + if (not build_row(buffer)) + return *this; - return *this; + return *this; } bool array_builder::reply_ready(void) const { - return m_reply_ready; + return m_reply_ready; } reply array_builder::get_reply(void) const { - return reply{ m_reply }; + return reply{ m_reply }; } } //! builders diff --git a/sources/builders/builders_factory.cpp b/sources/builders/builders_factory.cpp index e32b895b..0503d2eb 100644 --- a/sources/builders/builders_factory.cpp +++ b/sources/builders/builders_factory.cpp @@ -12,20 +12,20 @@ namespace builders { std::unique_ptr create_builder(char id) { - switch (id) { - case '+': - return std::unique_ptr{ new simple_string_builder() }; - case '-': - return std::unique_ptr{ new error_builder() }; - case ':': - return std::unique_ptr{ new integer_builder() }; - case '$': - return std::unique_ptr{ new bulk_string_builder() }; - case '*': - return std::unique_ptr{ new array_builder() }; - default: - throw redis_error("Invalid data"); - } + switch (id) { + case '+': + return std::unique_ptr{ new simple_string_builder() }; + case '-': + return std::unique_ptr{ new error_builder() }; + case ':': + return std::unique_ptr{ new integer_builder() }; + case '$': + return std::unique_ptr{ new bulk_string_builder() }; + case '*': + return std::unique_ptr{ new array_builder() }; + default: + throw redis_error("Invalid data"); + } } } //! builders diff --git a/sources/builders/bulk_string_builder.cpp b/sources/builders/bulk_string_builder.cpp index d4544776..c6f7325c 100644 --- a/sources/builders/bulk_string_builder.cpp +++ b/sources/builders/bulk_string_builder.cpp @@ -13,74 +13,74 @@ bulk_string_builder::bulk_string_builder(void) void bulk_string_builder::build_reply(void) { - m_reply.str(m_str); - m_reply.is_null(m_is_null); - m_reply_ready = true; + m_reply.str(m_str); + m_reply.is_null(m_is_null); + m_reply_ready = true; } bool bulk_string_builder::fetch_size(std::string& buffer) { - if (m_int_builder.reply_ready()) - return true; + if (m_int_builder.reply_ready()) + return true; - m_int_builder << buffer; - if (not m_int_builder.reply_ready()) - return false; + m_int_builder << buffer; + if (not m_int_builder.reply_ready()) + return false; - m_str_size = m_int_builder.get_integer(); - if (m_str_size == -1) { - m_is_null = true; - build_reply(); - } + m_str_size = m_int_builder.get_integer(); + if (m_str_size == -1) { + m_is_null = true; + build_reply(); + } - return true; + return true; } void bulk_string_builder::fetch_str(std::string& buffer) { - if (buffer.size() < static_cast(m_str_size) + 2) // also wait for end sequence - return ; + if (buffer.size() < static_cast(m_str_size) + 2) // also wait for end sequence + return ; - if (buffer[m_str_size] != '\r' or buffer[m_str_size + 1] != '\n') - throw redis_error("Wrong ending sequence"); + if (buffer[m_str_size] != '\r' or buffer[m_str_size + 1] != '\n') + throw redis_error("Wrong ending sequence"); - m_str = buffer.substr(0, m_str_size); - buffer.erase(0, m_str_size + 2); - build_reply(); + m_str = buffer.substr(0, m_str_size); + buffer.erase(0, m_str_size + 2); + build_reply(); } builder_iface& bulk_string_builder::operator<<(std::string& buffer) { - if (m_reply_ready) - return *this; + if (m_reply_ready) + return *this; - //! if we don't have the size, try to get it with the current buffer - if (not fetch_size(buffer) or m_reply_ready) - return *this; + //! if we don't have the size, try to get it with the current buffer + if (not fetch_size(buffer) or m_reply_ready) + return *this; - fetch_str(buffer); + fetch_str(buffer); - return *this; + return *this; } bool bulk_string_builder::reply_ready(void) const { - return m_reply_ready; + return m_reply_ready; } reply bulk_string_builder::get_reply(void) const { - return reply{ m_reply }; + return reply{ m_reply }; } const std::string& bulk_string_builder::get_bulk_string(void) const { - return m_str; + return m_str; } bool bulk_string_builder::is_null(void) const { - return m_is_null; + return m_is_null; } } //! builders diff --git a/sources/builders/error_builder.cpp b/sources/builders/error_builder.cpp index adc7736d..1323f047 100644 --- a/sources/builders/error_builder.cpp +++ b/sources/builders/error_builder.cpp @@ -7,27 +7,27 @@ namespace builders { builder_iface& error_builder::operator<<(std::string& buffer) { - m_string_builder << buffer; + m_string_builder << buffer; - if (m_string_builder.reply_ready()) - m_reply.str(m_string_builder.get_simple_string()); + if (m_string_builder.reply_ready()) + m_reply.str(m_string_builder.get_simple_string()); - return *this; + return *this; } bool error_builder::reply_ready(void) const { - return m_string_builder.reply_ready(); + return m_string_builder.reply_ready(); } reply error_builder::get_reply(void) const { - return reply{ m_reply }; + return reply{ m_reply }; } const std::string& error_builder::get_error(void) const { - return m_string_builder.get_simple_string(); + return m_string_builder.get_simple_string(); } } //! builders diff --git a/sources/builders/integer_builder.cpp b/sources/builders/integer_builder.cpp index c7d3d75e..63b348b8 100644 --- a/sources/builders/integer_builder.cpp +++ b/sources/builders/integer_builder.cpp @@ -12,47 +12,47 @@ integer_builder::integer_builder(void) builder_iface& integer_builder::operator<<(std::string& buffer) { - if (m_reply_ready) - return *this; - - auto end_sequence = buffer.find("\r\n"); - if (end_sequence == std::string::npos) - return *this; - - unsigned int i; - for (i = 0; i < end_sequence; i++) { - //! check for negative numbers - if (not i and m_negative_multiplicator == 1 and buffer[i] == '-') { - m_negative_multiplicator = -1; - continue; - } - else if (not std::isdigit(buffer[i])) - throw redis_error("Invalid character for integer redis reply"); - - m_nbr *= 10; - m_nbr += buffer[i] - '0'; + if (m_reply_ready) + return *this; + + auto end_sequence = buffer.find("\r\n"); + if (end_sequence == std::string::npos) + return *this; + + unsigned int i; + for (i = 0; i < end_sequence; i++) { + //! check for negative numbers + if (not i and m_negative_multiplicator == 1 and buffer[i] == '-') { + m_negative_multiplicator = -1; + continue; } + else if (not std::isdigit(buffer[i])) + throw redis_error("Invalid character for integer redis reply"); - buffer.erase(0, end_sequence + 2); - m_reply.val(m_negative_multiplicator * m_nbr); - m_reply_ready = true; + m_nbr *= 10; + m_nbr += buffer[i] - '0'; + } - return *this; + buffer.erase(0, end_sequence + 2); + m_reply.val(m_negative_multiplicator * m_nbr); + m_reply_ready = true; + + return *this; } bool integer_builder::reply_ready(void) const { - return m_reply_ready; + return m_reply_ready; } reply integer_builder::get_reply(void) const { - return reply{ m_reply }; + return reply{ m_reply }; } int integer_builder::get_integer(void) const { - return m_negative_multiplicator * m_nbr; + return m_negative_multiplicator * m_nbr; } } //! builders diff --git a/sources/builders/reply_builder.cpp b/sources/builders/reply_builder.cpp index 61b97b5a..8a5166a2 100644 --- a/sources/builders/reply_builder.cpp +++ b/sources/builders/reply_builder.cpp @@ -11,59 +11,59 @@ reply_builder::reply_builder(void) reply_builder& reply_builder::operator<<(const std::string& data) { - m_buffer += data; + m_buffer += data; - while (build_reply()); + while (build_reply()); - return *this; + return *this; } bool reply_builder::build_reply(void) { - if (not m_buffer.size()) - return false; + if (not m_buffer.size()) + return false; - if (not m_builder) { - m_builder = create_builder(m_buffer.front()); - m_buffer.erase(0, 1); - } + if (not m_builder) { + m_builder = create_builder(m_buffer.front()); + m_buffer.erase(0, 1); + } - *m_builder << m_buffer; + *m_builder << m_buffer; - if (m_builder->reply_ready()) { - m_available_replies.push_back(m_builder->get_reply()); - m_builder = nullptr; + if (m_builder->reply_ready()) { + m_available_replies.push_back(m_builder->get_reply()); + m_builder = nullptr; - return true; - } + return true; + } - return false; + return false; } void reply_builder::operator>>(reply& reply) { - reply = get_front(); + reply = get_front(); } const reply& reply_builder::get_front(void) const { - if (not reply_available()) - throw redis_error("No available reply"); + if (not reply_available()) + throw redis_error("No available reply"); - return m_available_replies.front(); + return m_available_replies.front(); } void reply_builder::pop_front(void) { - if (not reply_available()) - throw redis_error("No available reply"); + if (not reply_available()) + throw redis_error("No available reply"); - m_available_replies.pop_front(); + m_available_replies.pop_front(); } bool reply_builder::reply_available(void) const { - return m_available_replies.size() > 0; + return m_available_replies.size() > 0; } } //! builders diff --git a/sources/builders/simple_string_builder.cpp b/sources/builders/simple_string_builder.cpp index c68a02f4..66415611 100644 --- a/sources/builders/simple_string_builder.cpp +++ b/sources/builders/simple_string_builder.cpp @@ -11,34 +11,34 @@ simple_string_builder::simple_string_builder(void) builder_iface& simple_string_builder::operator<<(std::string& buffer) { - if (m_reply_ready) - return *this; + if (m_reply_ready) + return *this; - auto end_sequence = buffer.find("\r\n"); - if (end_sequence == std::string::npos) - return *this; + auto end_sequence = buffer.find("\r\n"); + if (end_sequence == std::string::npos) + return *this; - m_str = buffer.substr(0, end_sequence); - m_reply.str(m_str); - buffer.erase(0, end_sequence + 2); - m_reply_ready = true; + m_str = buffer.substr(0, end_sequence); + m_reply.str(m_str); + buffer.erase(0, end_sequence + 2); + m_reply_ready = true; - return *this; + return *this; } bool simple_string_builder::reply_ready(void) const { - return m_reply_ready; + return m_reply_ready; } reply simple_string_builder::get_reply(void) const { - return reply{ m_reply }; + return reply{ m_reply }; } const std::string& simple_string_builder::get_simple_string(void) const { - return m_str; + return m_str; } } //! builders diff --git a/sources/network/io_service.cpp b/sources/network/io_service.cpp index 421b4726..31c482b3 100644 --- a/sources/network/io_service.cpp +++ b/sources/network/io_service.cpp @@ -1,33 +1,215 @@ #include "cpp_redis/network/io_service.hpp" +#include "cpp_redis/redis_error.hpp" + +#include namespace cpp_redis { namespace network { +io_service& +io_service::get_instance(void) { + static io_service instance; + return instance; +} + io_service::io_service(void) -: m_work(m_io_service) {} +: m_should_stop(false) +{ + if (pipe(m_notif_pipe_fds) == -1) + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, pipe() failure"); + + int flags = fcntl(m_notif_pipe_fds[1], F_GETFL, 0); + if (flags == -1 or fcntl(m_notif_pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1) + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, fcntl() failure"); + + m_worker = std::thread(&io_service::listen, this); +} io_service::~io_service(void) { - if (m_io_service_thread.joinable()) { - m_io_service.stop(); - m_io_service_thread.join(); + m_should_stop = true; + notify_select(); + + m_worker.join(); + + close(m_notif_pipe_fds[0]); + close(m_notif_pipe_fds[1]); +} + +int +io_service::init_sets(fd_set* rd_set, fd_set* wr_set) { + int max_fd = m_notif_pipe_fds[0]; + + FD_ZERO(rd_set); + FD_ZERO(wr_set); + FD_SET(m_notif_pipe_fds[0], rd_set); + + std::lock_guard lock(m_fds_mutex); + for (const auto& fd : m_fds) { + if (fd.second.async_read) + FD_SET(fd.first, rd_set); + + if (fd.second.async_write) + FD_SET(fd.first, wr_set); + + if ((fd.second.async_read or fd.second.async_write) and fd.first > max_fd) + max_fd = fd.first; + } + + return max_fd; +} + +void +io_service::read_fd(int fd) { + std::lock_guard lock(m_fds_mutex); + + auto fd_it = m_fds.find(fd); + if (fd_it == m_fds.end()) + return ; + + auto& buffer = *fd_it->second.read_buffer; + int original_buffer_size = buffer.size(); + buffer.resize(original_buffer_size + fd_it->second.read_size); + + int nb_bytes_read = recv(fd_it->first, buffer.data() + original_buffer_size, fd_it->second.read_size, 0); + fd_it->second.async_read = false; + + if (nb_bytes_read <= 0) { + buffer.resize(original_buffer_size); + fd_it->second.disconnection_handler(*this); + m_fds.erase(fd_it); + } + else { + buffer.resize(original_buffer_size + nb_bytes_read); + fd_it->second.read_callback(nb_bytes_read); + } +} + +void +io_service::write_fd(int fd) { + std::lock_guard lock(m_fds_mutex); + + auto fd_it = m_fds.find(fd); + if (fd_it == m_fds.end()) + return ; + + int nb_bytes_written = send(fd_it->first, fd_it->second.write_buffer.data(), fd_it->second.write_size, 0); + fd_it->second.async_write = false; + + if (nb_bytes_written <= 0) { + fd_it->second.disconnection_handler(*this); + m_fds.erase(fd_it); + } + else + fd_it->second.write_callback(nb_bytes_written); +} + +void +io_service::process_sets(fd_set* rd_set, fd_set* wr_set) { + std::vector fds_to_read; + std::vector fds_to_write; + + //! Quickly fetch fds that are readable/writeable + //! This reduce lock time and avoid possible deadlock in callbacks + { + std::lock_guard lock(m_fds_mutex); + + for (const auto& fd : m_fds) { + if (fd.second.async_read and FD_ISSET(fd.first, rd_set)) + fds_to_read.push_back(fd.first); + + if (fd.second.async_write and FD_ISSET(fd.first, wr_set)) + fds_to_write.push_back(fd.first); } + } + + for (int fd : fds_to_read) { read_fd(fd); } + for (int fd : fds_to_write) { write_fd(fd); } + + if (FD_ISSET(m_notif_pipe_fds[0], rd_set)) { + char buf[1024]; + read(m_notif_pipe_fds[0], buf, 1024); + } } void -io_service::run(void) { - if (not m_io_service_thread.joinable()) - m_io_service_thread = std::thread([this]() { m_io_service.run(); }); +io_service::listen(void) { + fd_set rd_set; + fd_set wr_set; + + while (not m_should_stop) { + int max_fd = init_sets(&rd_set, &wr_set); + + if (select(max_fd + 1, &rd_set, &wr_set, nullptr, nullptr) > 0) + process_sets(&rd_set, &wr_set); + } } void -io_service::post(const std::function& fct) { - m_io_service.post(fct); +io_service::track(int fd, const disconnection_handler_t& handler) { + std::lock_guard lock(m_fds_mutex); + + auto& info = m_fds[fd]; + info.async_read = false; + info.async_write = false; + info.disconnection_handler = handler; + + notify_select(); } -boost::asio::io_service& -io_service::get(void) { - return m_io_service; +void +io_service::untrack(int fd) { + std::lock_guard lock(m_fds_mutex); + m_fds.erase(fd); +} + +bool +io_service::async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { + std::lock_guard lock(m_fds_mutex); + + auto reg_fd_it = m_fds.find(fd); + if (reg_fd_it == m_fds.end()) + return false; + + auto& reg_fd = reg_fd_it->second; + bool expected = false; + if (not reg_fd.async_read.compare_exchange_strong(expected, true)) + return false; + + reg_fd.read_buffer = &buffer; + reg_fd.read_size = read_size; + reg_fd.read_callback = callback; + + notify_select(); + + return true; +} + +bool +io_service::async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { + std::lock_guard lock(m_fds_mutex); + + auto reg_fd_it = m_fds.find(fd); + if (reg_fd_it == m_fds.end()) + return false; + + auto& reg_fd = reg_fd_it->second; + bool expected = false; + if (not reg_fd.async_write.compare_exchange_strong(expected, true)) + return false; + + reg_fd.write_buffer = buffer; + reg_fd.write_size = write_size; + reg_fd.write_callback = callback; + + notify_select(); + + return true; +} + +void +io_service::notify_select(void) { + write(m_notif_pipe_fds[1], "a", 1); } } //! network diff --git a/sources/network/redis_connection.cpp b/sources/network/redis_connection.cpp index 21208a9f..361a54ee 100644 --- a/sources/network/redis_connection.cpp +++ b/sources/network/redis_connection.cpp @@ -8,90 +8,90 @@ redis_connection::redis_connection(void) : m_reply_callback(nullptr) , m_disconnection_handler(nullptr) { - auto disconnection_handler = std::bind(&redis_connection::tcp_client_disconnection_handler, this, std::placeholders::_1); - m_client.set_disconnection_handler(disconnection_handler); + auto disconnection_handler = std::bind(&redis_connection::tcp_client_disconnection_handler, this, std::placeholders::_1); + m_client.set_disconnection_handler(disconnection_handler); - auto receive_handler = std::bind(&redis_connection::tcp_client_receive_handler, this, std::placeholders::_1, std::placeholders::_2); - m_client.set_receive_handler(receive_handler); + auto receive_handler = std::bind(&redis_connection::tcp_client_receive_handler, this, std::placeholders::_1, std::placeholders::_2); + m_client.set_receive_handler(receive_handler); } redis_connection::~redis_connection(void) { - if (is_connected()) - disconnect(); + if (is_connected()) + disconnect(); } void redis_connection::connect(const std::string& host, unsigned int port) { - m_client.connect(host, port); + m_client.connect(host, port); } void redis_connection::disconnect(void) { - m_client.disconnect(); + m_client.disconnect(); } bool redis_connection::is_connected(void) { - return m_client.is_connected(); + return m_client.is_connected(); } std::string redis_connection::build_command(const std::vector& redis_cmd) { - std::string cmd = "*" + std::to_string(redis_cmd.size()) + "\r\n"; + std::string cmd = "*" + std::to_string(redis_cmd.size()) + "\r\n"; - for (const auto& cmd_part : redis_cmd) - cmd += "$" + std::to_string(cmd_part.length()) + "\r\n" + cmd_part + "\r\n"; + for (const auto& cmd_part : redis_cmd) + cmd += "$" + std::to_string(cmd_part.length()) + "\r\n" + cmd_part + "\r\n"; - return cmd; + return cmd; } void redis_connection::send(const std::vector& redis_cmd) { - m_client.send(build_command(redis_cmd)); + m_client.send(build_command(redis_cmd)); } void -redis_connection::set_disconnection_handler(const disconnection_handler& handler) { - std::lock_guard lock(m_disconnection_handler_mutex); +redis_connection::set_disconnection_handler(const disconnection_handler_t& handler) { + std::lock_guard lock(m_disconnection_handler_mutex); - m_disconnection_handler = handler; + m_disconnection_handler = handler; } void -redis_connection::set_reply_callback(const reply_callback& handler) { - std::lock_guard lock(m_reply_callback_mutex); +redis_connection::set_reply_callback(const reply_callback_t& handler) { + std::lock_guard lock(m_reply_callback_mutex); - m_reply_callback = handler; + m_reply_callback = handler; } bool redis_connection::tcp_client_receive_handler(network::tcp_client&, const std::vector& buffer) { - try { - m_builder << std::string(buffer.begin(), buffer.end()); - } - catch (const redis_error& e) { - return false; - } - - while (m_builder.reply_available()) { - std::lock_guard lock(m_reply_callback_mutex); + try { + m_builder << std::string(buffer.begin(), buffer.end()); + } + catch (const redis_error& e) { + return false; + } + + while (m_builder.reply_available()) { + std::lock_guard lock(m_reply_callback_mutex); - auto reply = m_builder.get_front(); - m_builder.pop_front(); + auto reply = m_builder.get_front(); + m_builder.pop_front(); - if (m_reply_callback) - m_reply_callback(*this, reply); - } + if (m_reply_callback) + m_reply_callback(*this, reply); + } - return true; + return true; } void redis_connection::tcp_client_disconnection_handler(network::tcp_client&) { - std::lock_guard lock(m_disconnection_handler_mutex); + std::lock_guard lock(m_disconnection_handler_mutex); - if (m_disconnection_handler) - m_disconnection_handler(*this); + if (m_disconnection_handler) + m_disconnection_handler(*this); } } //! network diff --git a/sources/network/tcp_client.cpp b/sources/network/tcp_client.cpp index f5f4d762..ed5dd6bf 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/tcp_client.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include "cpp_redis/network/tcp_client.hpp" @@ -6,172 +8,158 @@ namespace cpp_redis { namespace network { -io_service tcp_client::m_io_service; - +//! note that we call io_service::get_instance in the init list +//! +//! this will force force io_service instance creation +//! this is a workaround to handle static object destructions order +//! +//! that way, any object containing a tcp_client has an attribute (or through its attributes) +//! is guaranteed to be destructed before the io_service is destructed, even if it is global tcp_client::tcp_client(void) -: m_socket(m_io_service.get()) +: m_io_service(io_service::get_instance()) +, m_fd(-1) , m_is_connected(false) -, m_read_buffer(READ_SIZE) {} +, m_receive_handler(nullptr) +, m_disconnection_handler(nullptr) +{} tcp_client::~tcp_client(void) { - if (m_is_connected) - disconnect(); + if (m_is_connected) + disconnect(); } void tcp_client::connect(const std::string& host, unsigned int port) { - if (m_is_connected) - return ; - - std::condition_variable conn_cond_var; - - //! resolve host name - boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); - - //! async connect - std::atomic_bool is_notified(false); - m_socket.async_connect(endpoint, [&](boost::system::error_code error) { - if (not error) { - m_is_connected = true; - async_read(); - } - - is_notified = true; - conn_cond_var.notify_one(); - }); - - //! start loop and wait for async connect result - std::mutex conn_mutex; - std::unique_lock lock(conn_mutex); - m_io_service.run(); - - if (not is_notified) - conn_cond_var.wait(lock); - - if (not m_is_connected) - throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); + if (m_is_connected) + return ; + + //! create the socket + m_fd = socket(AF_INET, SOCK_STREAM, 0); + if (m_fd < 0) + throw redis_error("Can't open a socket"); + + //! get the server's DNS entry + struct hostent *server = gethostbyname(host.c_str()); + if (not server) + throw redis_error("No such host: " + host); + + //! build the server's Internet address + struct sockaddr_in server_addr; + std::memset(&server_addr, 0, sizeof(server_addr)); + std::memcpy(server->h_addr, &server_addr.sin_addr.s_addr, server->h_length); + server_addr.sin_port = htons(port); + server_addr.sin_family = AF_INET; + + //! create a connection with the server + if (::connect(m_fd, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) + throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); + + //! add fd to the io_service and set the disconnection_handler + m_io_service.track(m_fd, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); + m_is_connected = true; + + //! start async read + async_read(); } void tcp_client::disconnect(void) { - if (not m_is_connected) - return ; - - m_is_connected = false; - - std::mutex close_socket_mutex; - std::condition_variable close_socket_cond_var; - std::unique_lock lock(close_socket_mutex); - - std::atomic_bool is_notified(false); - m_io_service.post([this, &close_socket_cond_var, &is_notified]() { - m_socket.close(); - - is_notified = true; - close_socket_cond_var.notify_one(); - }); - - if (not is_notified) - close_socket_cond_var.wait(lock); -} + if (not m_is_connected) + return ; -void -tcp_client::async_read(void) { - boost::asio::async_read(m_socket, boost::asio::buffer(m_read_buffer.data(), READ_SIZE), - [](const boost::system::error_code& error, std::size_t bytes) -> std::size_t { - //! break if bytes have been received, continue otherwise - return error or bytes ? 0 : READ_SIZE; - }, - [=](boost::system::error_code error, std::size_t length) { - if (error) { - process_disconnection(); - return ; - } - - std::lock_guard lock(m_receive_handler_mutex); - if (m_receive_handler) - if (not m_receive_handler(*this, { m_read_buffer.begin(), m_read_buffer.begin() + length })) { - process_disconnection(); - return ; - } - - //! keep waiting for incoming bytes - async_read(); - }); + m_is_connected = false; + m_io_service.untrack(m_fd); + close(m_fd); } void tcp_client::send(const std::string& buffer) { - send(std::vector{ buffer.begin(), buffer.end() }); + send(std::vector{ buffer.begin(), buffer.end() }); } void tcp_client::send(const std::vector& buffer) { - if (not m_is_connected) - throw redis_error("Not connected"); + if (not m_is_connected) + throw redis_error("Not connected"); - if (not buffer.size()) - return ; + if (not buffer.size()) + return ; - std::lock_guard lock(m_write_buffer_mutex); + std::lock_guard lock(m_write_buffer_mutex); - bool bytes_in_buffer = m_write_buffer.size() > 0; + bool bytes_in_buffer = m_write_buffer.size() > 0; - //! concat buffer - m_write_buffer.insert(m_write_buffer.end(), buffer.begin(), buffer.end()); + //! concat buffer + m_write_buffer.insert(m_write_buffer.end(), buffer.begin(), buffer.end()); - //! if there were already bytes in buffer, simply return - //! async_write callback will process the new buffer - if (bytes_in_buffer) - return; + //! if there were already bytes in buffer, simply return + //! async_write callback will process the new buffer + if (bytes_in_buffer) + return ; - async_write(); + async_write(); } void -tcp_client::async_write(void) { - boost::asio::async_write(m_socket, boost::asio::buffer(m_write_buffer.data(), m_write_buffer.size()), - [this](boost::system::error_code error, std::size_t length) { - if (error) { - process_disconnection(); - return ; - } - - std::lock_guard lock(m_write_buffer_mutex); - m_write_buffer.erase(m_write_buffer.begin(), m_write_buffer.begin() + length); - - if (m_write_buffer.size()) - async_write(); - }); +tcp_client::async_read(void) { + m_io_service.async_read(m_fd, m_read_buffer, READ_SIZE, + [&](std::size_t length) { + std::lock_guard lock(m_receive_handler_mutex); + + if (m_receive_handler) + if (not m_receive_handler(*this, { m_read_buffer.begin(), m_read_buffer.begin() + length })) { + disconnect(); + return ; + } + + //! clear read buffer keep waiting for incoming bytes + m_read_buffer.clear(); + + if (m_is_connected) + async_read(); + }); } void -tcp_client::process_disconnection(void) { - m_is_connected = false; - m_socket.close(); +tcp_client::async_write(void) { + m_io_service.async_write(m_fd, m_write_buffer, m_write_buffer.size(), + [&](std::size_t length) { + std::lock_guard lock(m_write_buffer_mutex); - std::lock_guard lock(m_disconnection_handler_mutex); - if (m_disconnection_handler) - m_disconnection_handler(*this); + m_write_buffer.erase(m_write_buffer.begin(), m_write_buffer.begin() + length); + + if (m_is_connected and m_write_buffer.size()) + async_write(); + }); } void -tcp_client::set_receive_handler(const receive_handler& handler) { - std::lock_guard lock(m_receive_handler_mutex); +tcp_client::set_receive_handler(const receive_handler_t& handler) { + std::lock_guard lock(m_receive_handler_mutex); - m_receive_handler = handler; + m_receive_handler = handler; } void -tcp_client::set_disconnection_handler(const disconnection_handler& handler) { - std::lock_guard lock(m_disconnection_handler_mutex); +tcp_client::set_disconnection_handler(const disconnection_handler_t& handler) { + std::lock_guard lock(m_disconnection_handler_mutex); - m_disconnection_handler = handler; + m_disconnection_handler = handler; } bool tcp_client::is_connected(void) { - return m_is_connected; + return m_is_connected; +} + +void +tcp_client::io_service_disconnection_handler(network::io_service&) { + m_is_connected = false; + close(m_fd); + + std::lock_guard lock(m_disconnection_handler_mutex); + if (m_disconnection_handler) + m_disconnection_handler(*this); } } //! network diff --git a/sources/redis_client.cpp b/sources/redis_client.cpp index b56715d0..35e77825 100644 --- a/sources/redis_client.cpp +++ b/sources/redis_client.cpp @@ -4,81 +4,85 @@ namespace cpp_redis { redis_client::redis_client(void) { - auto disconnection_handler = std::bind(&redis_client::connection_disconnection_handler, this, std::placeholders::_1); - m_client.set_disconnection_handler(disconnection_handler); + auto disconnection_handler = std::bind(&redis_client::connection_disconnection_handler, this, std::placeholders::_1); + m_client.set_disconnection_handler(disconnection_handler); - auto receive_handler = std::bind(&redis_client::connection_receive_handler, this, std::placeholders::_1, std::placeholders::_2); - m_client.set_reply_callback(receive_handler); + auto receive_handler = std::bind(&redis_client::connection_receive_handler, this, std::placeholders::_1, std::placeholders::_2); + m_client.set_reply_callback(receive_handler); } redis_client::~redis_client(void) { - if (is_connected()) - disconnect(); + if (not is_connected()) + return ; + + disconnect(); + m_client.set_disconnection_handler(nullptr); + m_client.set_reply_callback(nullptr); } void redis_client::connect(const std::string& host, unsigned int port) { - m_client.connect(host, port); + m_client.connect(host, port); } void redis_client::disconnect(void) { - m_client.disconnect(); + m_client.disconnect(); } bool redis_client::is_connected(void) { - return m_client.is_connected(); + return m_client.is_connected(); } void -redis_client::send(const std::vector& redis_cmd, const reply_callback& callback) { - std::lock_guard lock(m_callbacks_mutex); +redis_client::send(const std::vector& redis_cmd, const reply_callback_t& callback) { + std::lock_guard lock(m_callbacks_mutex); - m_client.send(redis_cmd); - m_callbacks.push(callback); + m_client.send(redis_cmd); + m_callbacks.push(callback); } void -redis_client::set_disconnection_handler(const disconnection_handler& handler) { - std::lock_guard lock(m_disconnection_handler_mutex); +redis_client::set_disconnection_handler(const disconnection_handler_t& handler) { + std::lock_guard lock(m_disconnection_handler_mutex); - m_disconnection_handler = handler; + m_disconnection_handler = handler; } void redis_client::connection_receive_handler(network::redis_connection&, reply& reply) { - std::lock_guard lock(m_callbacks_mutex); + std::lock_guard lock(m_callbacks_mutex); - if (not m_callbacks.size()) - return ; + if (not m_callbacks.size()) + return ; - if (m_callbacks.front()) - m_callbacks.front()(reply); + if (m_callbacks.front()) + m_callbacks.front()(reply); - m_callbacks.pop(); + m_callbacks.pop(); } void redis_client::clear_callbacks(void) { - std::lock_guard lock(m_callbacks_mutex); + std::lock_guard lock(m_callbacks_mutex); - std::queue empty; - std::swap(m_callbacks, empty); + std::queue empty; + std::swap(m_callbacks, empty); } void redis_client::call_disconnection_handler(void) { - std::lock_guard lock(m_disconnection_handler_mutex); + std::lock_guard lock(m_disconnection_handler_mutex); - if (m_disconnection_handler) - m_disconnection_handler(*this); + if (m_disconnection_handler) + m_disconnection_handler(*this); } void redis_client::connection_disconnection_handler(network::redis_connection&) { - clear_callbacks(); - call_disconnection_handler(); + clear_callbacks(); + call_disconnection_handler(); } } //! cpp_redis diff --git a/sources/redis_subscriber.cpp b/sources/redis_subscriber.cpp index 5e4f97e9..1a31afc1 100644 --- a/sources/redis_subscriber.cpp +++ b/sources/redis_subscriber.cpp @@ -6,152 +6,161 @@ namespace cpp_redis { redis_subscriber::redis_subscriber(void) : m_disconnection_handler(nullptr) { - auto disconnection_handler = std::bind(&redis_subscriber::connection_disconnection_handler, this, std::placeholders::_1); - m_client.set_disconnection_handler(disconnection_handler); + auto disconnection_handler = std::bind(&redis_subscriber::connection_disconnection_handler, this, std::placeholders::_1); + m_client.set_disconnection_handler(disconnection_handler); - auto receive_handler = std::bind(&redis_subscriber::connection_receive_handler, this, std::placeholders::_1, std::placeholders::_2); - m_client.set_reply_callback(receive_handler); + auto receive_handler = std::bind(&redis_subscriber::connection_receive_handler, this, std::placeholders::_1, std::placeholders::_2); + m_client.set_reply_callback(receive_handler); +} + +redis_subscriber::~redis_subscriber(void) { + if (not is_connected()) + return ; + + disconnect(); + m_client.set_disconnection_handler(nullptr); + m_client.set_reply_callback(nullptr); } void redis_subscriber::connect(const std::string& host, unsigned int port) { - m_client.connect(host, port); + m_client.connect(host, port); } void redis_subscriber::disconnect(void) { - m_client.disconnect(); + m_client.disconnect(); } bool redis_subscriber::is_connected(void) { - return m_client.is_connected(); + return m_client.is_connected(); } void -redis_subscriber::set_disconnection_handler(const disconnection_handler& handler) { - std::lock_guard lock(m_disconnection_handler_mutex); +redis_subscriber::set_disconnection_handler(const disconnection_handler_t& handler) { + std::lock_guard lock(m_disconnection_handler_mutex); - m_disconnection_handler = handler; + m_disconnection_handler = handler; } void -redis_subscriber::subscribe(const std::string& channel, const subscribe_callback& callback) { - std::lock_guard lock(m_subscribed_channels_mutex); +redis_subscriber::subscribe(const std::string& channel, const subscribe_callback_t& callback) { + std::lock_guard lock(m_subscribed_channels_mutex); - m_subscribed_channels[channel] = callback; - m_client.send({ "SUBSCRIBE", channel }); + m_subscribed_channels[channel] = callback; + m_client.send({ "SUBSCRIBE", channel }); } void -redis_subscriber::psubscribe(const std::string& pattern, const subscribe_callback& callback) { - std::lock_guard lock(m_psubscribed_channels_mutex); +redis_subscriber::psubscribe(const std::string& pattern, const subscribe_callback_t& callback) { + std::lock_guard lock(m_psubscribed_channels_mutex); - m_psubscribed_channels[pattern] = callback; - m_client.send({ "PSUBSCRIBE", pattern }); + m_psubscribed_channels[pattern] = callback; + m_client.send({ "PSUBSCRIBE", pattern }); } void redis_subscriber::unsubscribe(const std::string& channel) { - std::lock_guard lock(m_subscribed_channels_mutex); + std::lock_guard lock(m_subscribed_channels_mutex); - auto it = m_subscribed_channels.find(channel); - if (it == m_subscribed_channels.end()) - return ; + auto it = m_subscribed_channels.find(channel); + if (it == m_subscribed_channels.end()) + return ; - m_client.send({ "UNSUBSCRIBE", channel }); - m_subscribed_channels.erase(it); + m_client.send({ "UNSUBSCRIBE", channel }); + m_subscribed_channels.erase(it); } void redis_subscriber::punsubscribe(const std::string& pattern) { - std::lock_guard lock(m_psubscribed_channels_mutex); + std::lock_guard lock(m_psubscribed_channels_mutex); - auto it = m_psubscribed_channels.find(pattern); - if (it == m_psubscribed_channels.end()) - return ; + auto it = m_psubscribed_channels.find(pattern); + if (it == m_psubscribed_channels.end()) + return ; - m_client.send({ "PUNSUBSCRIBE", pattern }); - m_psubscribed_channels.erase(it); + m_client.send({ "PUNSUBSCRIBE", pattern }); + m_psubscribed_channels.erase(it); } void redis_subscriber::handle_subscribe_reply(const std::vector& reply) { - if (reply.size() != 3) - return ; + if (reply.size() != 3) + return ; - const auto& title = reply[0]; - const auto& channel = reply[1]; - const auto& message = reply[2]; + const auto& title = reply[0]; + const auto& channel = reply[1]; + const auto& message = reply[2]; - if (not title.is_string() - or not channel.is_string() - or not message.is_string()) - return ; + if (not title.is_string() + or not channel.is_string() + or not message.is_string()) + return ; - if (title.as_string() != "message") - return ; + if (title.as_string() != "message") + return ; - std::lock_guard lock(m_subscribed_channels_mutex); + std::lock_guard lock(m_subscribed_channels_mutex); - auto it = m_subscribed_channels.find(channel.as_string()); - if (it == m_subscribed_channels.end()) - return ; + auto it = m_subscribed_channels.find(channel.as_string()); + if (it == m_subscribed_channels.end()) + return ; - it->second(channel.as_string(), message.as_string()); + it->second(channel.as_string(), message.as_string()); } void redis_subscriber::handle_psubscribe_reply(const std::vector& reply) { - if (reply.size() != 4) - return ; + if (reply.size() != 4) + return ; - const auto& title = reply[0]; - const auto& pchannel = reply[1]; - const auto& channel = reply[2]; - const auto& message = reply[3]; + const auto& title = reply[0]; + const auto& pchannel = reply[1]; + const auto& channel = reply[2]; + const auto& message = reply[3]; - if (not title.is_string() - or not pchannel.is_string() - or not channel.is_string() - or not message.is_string()) - return ; + if (not title.is_string() + or not pchannel.is_string() + or not channel.is_string() + or not message.is_string()) + return ; - if (title.as_string() != "pmessage") - return ; + if (title.as_string() != "pmessage") + return ; - std::lock_guard lock(m_psubscribed_channels_mutex); + std::lock_guard lock(m_psubscribed_channels_mutex); - auto it = m_psubscribed_channels.find(pchannel.as_string()); - if (it == m_psubscribed_channels.end()) - return ; + auto it = m_psubscribed_channels.find(pchannel.as_string()); + if (it == m_psubscribed_channels.end()) + return ; - it->second(channel.as_string(), message.as_string()); + it->second(channel.as_string(), message.as_string()); } void redis_subscriber::connection_receive_handler(network::redis_connection&, reply& reply) { - //! alaway return an array - if (not reply.is_array()) - return ; - - auto& array = reply.as_array(); - - //! Array size of 3 -> SUBSCRIBE - //! Array size of 4 -> PSUBSCRIBE - //! Otherwise -> unexepcted reply - if (array.size() == 3) - handle_subscribe_reply(array); - else if (array.size() == 4) - handle_psubscribe_reply(array); + //! alaway return an array + if (not reply.is_array()) + return ; + + auto& array = reply.as_array(); + + //! Array size of 3 -> SUBSCRIBE + //! Array size of 4 -> PSUBSCRIBE + //! Otherwise -> unexepcted reply + if (array.size() == 3) + handle_subscribe_reply(array); + else if (array.size() == 4) + handle_psubscribe_reply(array); } void redis_subscriber::connection_disconnection_handler(network::redis_connection&) { - std::lock_guard lock(m_disconnection_handler_mutex); + std::lock_guard lock(m_disconnection_handler_mutex); - if (m_disconnection_handler) - m_disconnection_handler(*this); + if (m_disconnection_handler) + m_disconnection_handler(*this); } } //! cpp_redis diff --git a/sources/replies/array_reply.cpp b/sources/replies/array_reply.cpp index 61ffba0e..cf678d86 100644 --- a/sources/replies/array_reply.cpp +++ b/sources/replies/array_reply.cpp @@ -12,40 +12,40 @@ array_reply::array_reply(const std::vector& rows) unsigned int array_reply::size(void) const { - return m_rows.size(); + return m_rows.size(); } const std::vector& array_reply::get_rows(void) const { - return m_rows; + return m_rows; } const cpp_redis::reply& array_reply::get(unsigned int idx) const { - if (idx > size()) - throw redis_error("Index out of range"); + if (idx > size()) + throw redis_error("Index out of range"); - return *std::next(m_rows.begin(), idx); + return *std::next(m_rows.begin(), idx); } const cpp_redis::reply& array_reply::operator[](unsigned int idx) const { - return get(idx); + return get(idx); } void array_reply::set_rows(const std::vector& rows) { - m_rows = rows; + m_rows = rows; } void array_reply::add_row(const cpp_redis::reply& row) { - m_rows.push_back(row); + m_rows.push_back(row); } void array_reply::operator<<(const cpp_redis::reply& row) { - add_row(row); + add_row(row); } } //! replies diff --git a/sources/replies/bulk_string_reply.cpp b/sources/replies/bulk_string_reply.cpp index 88092661..df145cc1 100644 --- a/sources/replies/bulk_string_reply.cpp +++ b/sources/replies/bulk_string_reply.cpp @@ -11,22 +11,22 @@ bulk_string_reply::bulk_string_reply(bool is_null, const std::string& bulk_strin bool bulk_string_reply::is_null(void) const { - return m_is_null; + return m_is_null; } const std::string& bulk_string_reply::str(void) const { - return m_str; + return m_str; } void bulk_string_reply::is_null(bool is_null) { - m_is_null = is_null; + m_is_null = is_null; } void bulk_string_reply::str(const std::string& bulk_string) { - m_str = bulk_string; + m_str = bulk_string; } } //! replies diff --git a/sources/replies/error_reply.cpp b/sources/replies/error_reply.cpp index 41f62a18..5b227cbf 100644 --- a/sources/replies/error_reply.cpp +++ b/sources/replies/error_reply.cpp @@ -10,12 +10,12 @@ error_reply::error_reply(const std::string& error) const std::string& error_reply::str(void) const { - return m_error; + return m_error; } void error_reply::str(const std::string& error) { - m_error = error; + m_error = error; } } //! replies diff --git a/sources/replies/integer_reply.cpp b/sources/replies/integer_reply.cpp index 1317ffb1..59ba64b7 100644 --- a/sources/replies/integer_reply.cpp +++ b/sources/replies/integer_reply.cpp @@ -10,12 +10,12 @@ integer_reply::integer_reply(int nbr) int integer_reply::val(void) const { - return m_nbr; + return m_nbr; } void integer_reply::val(int nbr) { - m_nbr = nbr; + m_nbr = nbr; } } //! replies diff --git a/sources/replies/reply.cpp b/sources/replies/reply.cpp index d4dc29fe..b6d159f5 100644 --- a/sources/replies/reply.cpp +++ b/sources/replies/reply.cpp @@ -15,72 +15,72 @@ reply::reply(type reply_type) reply::type reply::get_type(void) const { - return m_type; + return m_type; } bool reply::is_array(void) const { - return m_type == type::array; + return m_type == type::array; } bool reply::is_bulk_string(void) const { - return m_type == type::bulk_string; + return m_type == type::bulk_string; } bool reply::is_error(void) const { - return m_type == type::error; + return m_type == type::error; } bool reply::is_integer(void) const { - return m_type == type::integer; + return m_type == type::integer; } bool reply::is_simple_string(void) const { - return m_type == type::simple_string; + return m_type == type::simple_string; } array_reply& reply::as_array(void) { - if (not is_array()) - throw redis_error("Reply is not an array"); + if (not is_array()) + throw redis_error("Reply is not an array"); - return *dynamic_cast(this); + return *dynamic_cast(this); } bulk_string_reply& reply::as_bulk_string(void) { - if (not is_bulk_string()) - throw redis_error("Reply is not a bulk string"); + if (not is_bulk_string()) + throw redis_error("Reply is not a bulk string"); - return *dynamic_cast(this); + return *dynamic_cast(this); } error_reply& reply::as_error(void) { - if (not is_error()) - throw redis_error("Reply is not an error"); + if (not is_error()) + throw redis_error("Reply is not an error"); - return *dynamic_cast(this); + return *dynamic_cast(this); } integer_reply& reply::as_integer(void) { - if (not is_integer()) - throw redis_error("Reply is not an integer"); + if (not is_integer()) + throw redis_error("Reply is not an integer"); - return *dynamic_cast(this); + return *dynamic_cast(this); } simple_string_reply& reply::as_simple_string(void) { - if (not is_simple_string()) - throw redis_error("Reply is not a simple string"); + if (not is_simple_string()) + throw redis_error("Reply is not a simple string"); - return *dynamic_cast(this); + return *dynamic_cast(this); } } //! replies diff --git a/sources/replies/simple_string_reply.cpp b/sources/replies/simple_string_reply.cpp index 40775393..26724b68 100644 --- a/sources/replies/simple_string_reply.cpp +++ b/sources/replies/simple_string_reply.cpp @@ -10,12 +10,12 @@ simple_string_reply::simple_string_reply(const std::string& simple_string) const std::string& simple_string_reply::str(void) const { - return m_str; + return m_str; } void simple_string_reply::str(const std::string& simple_string) { - m_str = simple_string; + m_str = simple_string; } } //! replies diff --git a/sources/reply.cpp b/sources/reply.cpp index 32902d52..c7d7872e 100644 --- a/sources/reply.cpp +++ b/sources/reply.cpp @@ -32,92 +32,92 @@ reply::reply(const replies::simple_string_reply& string) reply& reply::operator=(const replies::array_reply& array) { - m_type = type::array; - m_replies = array.get_rows(); - return *this; + m_type = type::array; + m_replies = array.get_rows(); + return *this; } reply& reply::operator=(const replies::bulk_string_reply& string) { - m_type = string.is_null() ? type::null : type::bulk_string; - m_str = string.str(); - return *this; + m_type = string.is_null() ? type::null : type::bulk_string; + m_str = string.str(); + return *this; } reply& reply::operator=(const replies::error_reply& string) { - m_type = type::error; - m_str = string.str(); - return *this; + m_type = type::error; + m_str = string.str(); + return *this; } reply& reply::operator=(const replies::integer_reply& integer) { - m_type = type::integer; - m_str = integer.val(); - return *this; + m_type = type::integer; + m_str = integer.val(); + return *this; } reply& reply::operator=(const replies::simple_string_reply& string) { - m_type = type::simple_string; - m_str = string.str(); - return *this; + m_type = type::simple_string; + m_str = string.str(); + return *this; } bool reply::is_array(void) const { - return m_type == type::array; + return m_type == type::array; } bool reply::is_string(void) const { - return is_simple_string() or is_bulk_string() or is_error(); + return is_simple_string() or is_bulk_string() or is_error(); } bool reply::is_simple_string(void) const { - return m_type == type::simple_string; + return m_type == type::simple_string; } bool reply::is_bulk_string(void) const { - return m_type == type::bulk_string; + return m_type == type::bulk_string; } bool reply::is_error(void) const { - return m_type == type::error; + return m_type == type::error; } bool reply::is_integer(void) const { - return m_type == type::integer; + return m_type == type::integer; } bool reply::is_null(void) const { - return m_type == type::null; + return m_type == type::null; } const std::vector& reply::as_array(void) const { - return m_replies; + return m_replies; } const std::string& reply::as_string(void) const { - return m_str; + return m_str; } int reply::as_integer(void) const { - return m_int; + return m_int; } reply::type reply::get_type(void) const { - return m_type; + return m_type; } } //! cpp_redis