From 10d6530688516badcd33d4aeaa61abebc815a088 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Tue, 4 Apr 2017 22:49:59 -0700 Subject: [PATCH 1/3] switch from poll to select (unix) --- includes/tacopie/network/io_service.hpp | 14 ++++---- sources/network/unix/io_service.cpp | 48 +++++++++++++------------ 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/includes/tacopie/network/io_service.hpp b/includes/tacopie/network/io_service.hpp index 810bb10..c2f8a01 100644 --- a/includes/tacopie/network/io_service.hpp +++ b/includes/tacopie/network/io_service.hpp @@ -34,7 +34,7 @@ #ifdef _WIN32 #include #else -#include +#include #endif /* _WIN32 */ #include @@ -98,12 +98,12 @@ class io_service { void poll(void); //! init m_poll_fds_info - void init_poll_fds_info(void); + int init_poll_fds_info(void); //! process poll detected events void process_events(void); - void process_rd_event(const struct pollfd& poll_result, tracked_socket& socket); - void process_wr_event(const struct pollfd& poll_result, tracked_socket& socket); + void process_rd_event(const fd_t& fd, tracked_socket& socket); + void process_wr_event(const fd_t& fd, tracked_socket& socket); private: //! tracked sockets @@ -121,8 +121,10 @@ class io_service { //! thread safety std::mutex m_tracked_sockets_mtx; - //! data structure given to poll - std::vector m_poll_fds_info; + //! data structure given to select + std::vector m_polled_fds; + fd_set m_rd_set; + fd_set m_wr_set; //! condition variable to wait on removal std::condition_variable m_wait_for_removal_condvar; diff --git a/sources/network/unix/io_service.cpp b/sources/network/unix/io_service.cpp index 7987fa9..0799e4b 100644 --- a/sources/network/unix/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -85,10 +85,10 @@ io_service::poll(void) { __TACOPIE_LOG(debug, "starting poll() worker"); while (!m_should_stop) { - init_poll_fds_info(); + int ndfs = init_poll_fds_info(); __TACOPIE_LOG(debug, "polling fds"); - if (::poll(const_cast(m_poll_fds_info.data()), m_poll_fds_info.size(), -1) > 0) { process_events(); } + if (::select(ndfs, &m_rd_set, &m_wr_set, NULL, NULL) > 0) { process_events(); } else { __TACOPIE_LOG(debug, "poll woke up, but nothing to process"); } @@ -107,20 +107,19 @@ io_service::process_events(void) { __TACOPIE_LOG(debug, "processing events"); - for (const auto& poll_result : m_poll_fds_info) { - if (poll_result.fd == m_notifier.get_read_fd() && poll_result.revents & POLLIN) { + for (const auto& fd : m_polled_fds) { + if (fd == m_notifier.get_read_fd() && FD_ISSET(fd, &m_rd_set)) { m_notifier.clr_buffer(); continue; } - auto it = m_tracked_sockets.find(poll_result.fd); - + auto it = m_tracked_sockets.find(fd); if (it == m_tracked_sockets.end()) { continue; } auto& socket = it->second; - if (poll_result.revents & (POLLIN | POLLHUP) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(poll_result, socket); } - if (poll_result.revents & POLLOUT && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(poll_result, socket); } + if (FD_ISSET(fd, &m_rd_set) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(fd, socket); } + if (FD_ISSET(fd, &m_wr_set) && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(fd, socket); } if (socket.marked_for_untrack && !socket.is_executing_rd_callback && !socket.is_executing_wr_callback) { __TACOPIE_LOG(debug, "untrack socket"); @@ -133,11 +132,10 @@ io_service::process_events(void) { } void -io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& socket) { +io_service::process_rd_event(const fd_t& fd, tracked_socket& socket) { __TACOPIE_LOG(debug, "processing read event"); auto rd_callback = socket.rd_callback; - auto fd = poll_result.fd; socket.is_executing_rd_callback = true; @@ -164,11 +162,10 @@ io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& s } void -io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& socket) { +io_service::process_wr_event(const fd_t& fd, tracked_socket& socket) { __TACOPIE_LOG(debug, "processing write event"); auto wr_callback = socket.wr_callback; - auto fd = poll_result.fd; socket.is_executing_wr_callback = true; @@ -193,32 +190,37 @@ io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& s } //! -//! init m_poll_fds_info +//! init m_polled_fds //! -void +int io_service::init_poll_fds_info(void) { std::lock_guard lock(m_tracked_sockets_mtx); - m_poll_fds_info.clear(); + m_polled_fds.clear(); + FD_ZERO(&m_rd_set); + FD_ZERO(&m_wr_set); + + int ndfs = m_notifier.get_read_fd(); + FD_SET(m_notifier.get_read_fd(), &m_rd_set); + m_polled_fds.push_back(m_notifier.get_read_fd()); for (const auto& socket : m_tracked_sockets) { const auto& fd = socket.first; const auto& socket_info = socket.second; - struct pollfd poll_fd_info; - poll_fd_info.fd = fd; - poll_fd_info.events = 0; - poll_fd_info.revents = 0; + bool should_rd = socket_info.rd_callback && !socket_info.is_executing_rd_callback; + if (should_rd) { FD_SET(fd, &m_rd_set); } - if (socket_info.rd_callback && !socket_info.is_executing_rd_callback) { poll_fd_info.events |= POLLIN; } + bool should_wr = socket_info.wr_callback && !socket_info.is_executing_wr_callback; + if (should_wr) { FD_SET(fd, &m_wr_set); } - if (socket_info.wr_callback && !socket_info.is_executing_wr_callback) { poll_fd_info.events |= POLLOUT; } + if (should_rd || should_wr || socket_info.marked_for_untrack) { m_polled_fds.push_back(fd); } - if (poll_fd_info.events || socket_info.marked_for_untrack) { m_poll_fds_info.push_back(std::move(poll_fd_info)); } + if ((should_rd || should_wr) && fd > ndfs) { ndfs = fd; } } - m_poll_fds_info.push_back({m_notifier.get_read_fd(), POLLIN, 0}); + return ndfs + 1; } //! From eee11c0893fa5b226a47cb00accf5e0071c8b52c Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Tue, 4 Apr 2017 23:29:07 -0700 Subject: [PATCH 2/3] windows impl --- sources/network/windows/io_service.cpp | 45 ++++++++++++++------------ 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index bba993b..ea7b619 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -85,10 +85,10 @@ io_service::poll(void) { __TACOPIE_LOG(debug, "starting poll() worker"); while (!m_should_stop) { - init_poll_fds_info(); + int ndfs = init_poll_fds_info(); __TACOPIE_LOG(debug, "polling fds"); - if (WSAPoll(const_cast(m_poll_fds_info.data()), m_poll_fds_info.size(), -1) > 0) { process_events(); } + if (select(ndfs, &m_rd_set, &m_wr_set, NULL, NULL) > 0) { process_events(); } else { __TACOPIE_LOG(debug, "poll woke up, but nothing to process"); } @@ -107,20 +107,20 @@ io_service::process_events(void) { __TACOPIE_LOG(debug, "processing events"); - for (const auto& poll_result : m_poll_fds_info) { - if (poll_result.fd == m_notifier.get_read_fd() && poll_result.revents & POLLRDNORM) { + for (const auto& fd : m_polled_fds) { + if (fd == m_notifier.get_read_fd() && FD_ISSET(fd, &m_rd_set)) { m_notifier.clr_buffer(); continue; } - auto it = m_tracked_sockets.find(poll_result.fd); + auto it = m_tracked_sockets.find(fd); if (it == m_tracked_sockets.end()) { continue; } auto& socket = it->second; - if (poll_result.revents & (POLLRDNORM | POLLHUP) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(poll_result, socket); } - if (poll_result.revents & POLLWRNORM && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(poll_result, socket); } + if (FD_ISSET(fd, &m_rd_set) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(fd, socket); } + if (FD_ISSET(fd, &m_wr_set) && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(fd, socket); } if (socket.marked_for_untrack && !socket.is_executing_rd_callback && !socket.is_executing_wr_callback) { __TACOPIE_LOG(debug, "untrack socket"); @@ -131,11 +131,10 @@ io_service::process_events(void) { } void -io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& socket) { +io_service::process_rd_event(const fd_t& fd, tracked_socket& socket) { __TACOPIE_LOG(debug, "processing read event"); auto rd_callback = socket.rd_callback; - auto fd = poll_result.fd; socket.is_executing_rd_callback = true; @@ -162,11 +161,10 @@ io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& s } void -io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& socket) { +io_service::process_wr_event(const fd_t& fd, tracked_socket& socket) { __TACOPIE_LOG(debug, "processing write event"); auto wr_callback = socket.wr_callback; - auto fd = poll_result.fd; socket.is_executing_wr_callback = true; @@ -196,29 +194,34 @@ io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& s //! init m_poll_fds_info //! -void +int io_service::init_poll_fds_info(void) { std::lock_guard lock(m_tracked_sockets_mtx); - m_poll_fds_info.clear(); + m_polled_fds.clear(); + FD_ZERO(&m_rd_set); + FD_ZERO(&m_wr_set); + + int ndfs = m_notifier.get_read_fd(); + FD_SET(m_notifier.get_read_fd(), &m_rd_set); + m_polled_fds.push_back(m_notifier.get_read_fd()); for (const auto& socket : m_tracked_sockets) { const auto& fd = socket.first; const auto& socket_info = socket.second; - struct pollfd poll_fd_info; - poll_fd_info.fd = fd; - poll_fd_info.events = 0; - poll_fd_info.revents = 0; + bool should_rd = socket_info.rd_callback && !socket_info.is_executing_rd_callback; + if (should_rd) { FD_SET(fd, &m_rd_set); } - if (socket_info.rd_callback && !socket_info.is_executing_rd_callback) { poll_fd_info.events |= POLLRDNORM; } + bool should_wr = socket_info.wr_callback && !socket_info.is_executing_wr_callback; + if (should_wr) { FD_SET(fd, &m_wr_set); } - if (socket_info.wr_callback && !socket_info.is_executing_wr_callback) { poll_fd_info.events |= POLLWRNORM; } + if (should_rd || should_wr || socket_info.marked_for_untrack) { m_polled_fds.push_back(fd); } - if (poll_fd_info.events || socket_info.marked_for_untrack) { m_poll_fds_info.push_back(std::move(poll_fd_info)); } + if ((should_rd || should_wr) && (int)fd > ndfs) { ndfs = fd; } } - m_poll_fds_info.push_back({m_notifier.get_read_fd(), POLLRDNORM, 0}); + return ndfs + 1; } //! From 64146224f9e5c01eb422af5c8f9cfdea38496e91 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Tue, 4 Apr 2017 23:45:53 -0700 Subject: [PATCH 3/3] clang format --- sources/network/windows/io_service.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index ea7b619..8d58561 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -210,15 +210,15 @@ io_service::init_poll_fds_info(void) { const auto& fd = socket.first; const auto& socket_info = socket.second; - bool should_rd = socket_info.rd_callback && !socket_info.is_executing_rd_callback; - if (should_rd) { FD_SET(fd, &m_rd_set); } + bool should_rd = socket_info.rd_callback && !socket_info.is_executing_rd_callback; + if (should_rd) { FD_SET(fd, &m_rd_set); } - bool should_wr = socket_info.wr_callback && !socket_info.is_executing_wr_callback; - if (should_wr) { FD_SET(fd, &m_wr_set); } + bool should_wr = socket_info.wr_callback && !socket_info.is_executing_wr_callback; + if (should_wr) { FD_SET(fd, &m_wr_set); } if (should_rd || should_wr || socket_info.marked_for_untrack) { m_polled_fds.push_back(fd); } - if ((should_rd || should_wr) && (int)fd > ndfs) { ndfs = fd; } + if ((should_rd || should_wr) && (int) fd > ndfs) { ndfs = fd; } } return ndfs + 1;