Skip to content
This repository has been archived by the owner on Apr 6, 2019. It is now read-only.

Commit

Permalink
[2.2.0] Switch to select (#9)
Browse files Browse the repository at this point in the history
* switch from poll to select (unix)

* windows impl

* clang format
  • Loading branch information
Cylix authored Apr 5, 2017
1 parent b8d17d0 commit cd809d0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 50 deletions.
14 changes: 8 additions & 6 deletions includes/tacopie/network/io_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#ifdef _WIN32
#include <Winsock2.h>
#else
#include <poll.h>
#include <sys/select.h>
#endif /* _WIN32 */

#include <tacopie/network/self_pipe.hpp>
Expand Down Expand Up @@ -98,12 +98,12 @@ class io_service {
void poll(void);

//! init m_poll_fds_info
void init_poll_fds_info(void);
int init_poll_fds_info(void);

//! process poll detected events
void process_events(void);
void process_rd_event(const struct pollfd& poll_result, tracked_socket& socket);
void process_wr_event(const struct pollfd& poll_result, tracked_socket& socket);
void process_rd_event(const fd_t& fd, tracked_socket& socket);
void process_wr_event(const fd_t& fd, tracked_socket& socket);

private:
//! tracked sockets
Expand All @@ -121,8 +121,10 @@ class io_service {
//! thread safety
std::mutex m_tracked_sockets_mtx;

//! data structure given to poll
std::vector<struct pollfd> m_poll_fds_info;
//! data structure given to select
std::vector<fd_t> m_polled_fds;
fd_set m_rd_set;
fd_set m_wr_set;

//! condition variable to wait on removal
std::condition_variable m_wait_for_removal_condvar;
Expand Down
48 changes: 25 additions & 23 deletions sources/network/unix/io_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ io_service::poll(void) {
__TACOPIE_LOG(debug, "starting poll() worker");

while (!m_should_stop) {
init_poll_fds_info();
int ndfs = init_poll_fds_info();

__TACOPIE_LOG(debug, "polling fds");
if (::poll(const_cast<struct pollfd*>(m_poll_fds_info.data()), m_poll_fds_info.size(), -1) > 0) { process_events(); }
if (::select(ndfs, &m_rd_set, &m_wr_set, NULL, NULL) > 0) { process_events(); }
else {
__TACOPIE_LOG(debug, "poll woke up, but nothing to process");
}
Expand All @@ -107,20 +107,19 @@ io_service::process_events(void) {

__TACOPIE_LOG(debug, "processing events");

for (const auto& poll_result : m_poll_fds_info) {
if (poll_result.fd == m_notifier.get_read_fd() && poll_result.revents & POLLIN) {
for (const auto& fd : m_polled_fds) {
if (fd == m_notifier.get_read_fd() && FD_ISSET(fd, &m_rd_set)) {
m_notifier.clr_buffer();
continue;
}

auto it = m_tracked_sockets.find(poll_result.fd);

auto it = m_tracked_sockets.find(fd);
if (it == m_tracked_sockets.end()) { continue; }

auto& socket = it->second;

if (poll_result.revents & (POLLIN | POLLHUP) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(poll_result, socket); }
if (poll_result.revents & POLLOUT && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(poll_result, socket); }
if (FD_ISSET(fd, &m_rd_set) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(fd, socket); }
if (FD_ISSET(fd, &m_wr_set) && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(fd, socket); }

if (socket.marked_for_untrack && !socket.is_executing_rd_callback && !socket.is_executing_wr_callback) {
__TACOPIE_LOG(debug, "untrack socket");
Expand All @@ -133,11 +132,10 @@ io_service::process_events(void) {
}

void
io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& socket) {
io_service::process_rd_event(const fd_t& fd, tracked_socket& socket) {
__TACOPIE_LOG(debug, "processing read event");

auto rd_callback = socket.rd_callback;
auto fd = poll_result.fd;

socket.is_executing_rd_callback = true;

Expand All @@ -164,11 +162,10 @@ io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& s
}

void
io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& socket) {
io_service::process_wr_event(const fd_t& fd, tracked_socket& socket) {
__TACOPIE_LOG(debug, "processing write event");

auto wr_callback = socket.wr_callback;
auto fd = poll_result.fd;

socket.is_executing_wr_callback = true;

Expand All @@ -193,32 +190,37 @@ io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& s
}

//!
//! init m_poll_fds_info
//! init m_polled_fds
//!

void
int
io_service::init_poll_fds_info(void) {
std::lock_guard<std::mutex> lock(m_tracked_sockets_mtx);

m_poll_fds_info.clear();
m_polled_fds.clear();
FD_ZERO(&m_rd_set);
FD_ZERO(&m_wr_set);

int ndfs = m_notifier.get_read_fd();
FD_SET(m_notifier.get_read_fd(), &m_rd_set);
m_polled_fds.push_back(m_notifier.get_read_fd());

for (const auto& socket : m_tracked_sockets) {
const auto& fd = socket.first;
const auto& socket_info = socket.second;

struct pollfd poll_fd_info;
poll_fd_info.fd = fd;
poll_fd_info.events = 0;
poll_fd_info.revents = 0;
bool should_rd = socket_info.rd_callback && !socket_info.is_executing_rd_callback;
if (should_rd) { FD_SET(fd, &m_rd_set); }

if (socket_info.rd_callback && !socket_info.is_executing_rd_callback) { poll_fd_info.events |= POLLIN; }
bool should_wr = socket_info.wr_callback && !socket_info.is_executing_wr_callback;
if (should_wr) { FD_SET(fd, &m_wr_set); }

if (socket_info.wr_callback && !socket_info.is_executing_wr_callback) { poll_fd_info.events |= POLLOUT; }
if (should_rd || should_wr || socket_info.marked_for_untrack) { m_polled_fds.push_back(fd); }

if (poll_fd_info.events || socket_info.marked_for_untrack) { m_poll_fds_info.push_back(std::move(poll_fd_info)); }
if ((should_rd || should_wr) && fd > ndfs) { ndfs = fd; }
}

m_poll_fds_info.push_back({m_notifier.get_read_fd(), POLLIN, 0});
return ndfs + 1;
}

//!
Expand Down
45 changes: 24 additions & 21 deletions sources/network/windows/io_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ io_service::poll(void) {
__TACOPIE_LOG(debug, "starting poll() worker");

while (!m_should_stop) {
init_poll_fds_info();
int ndfs = init_poll_fds_info();

__TACOPIE_LOG(debug, "polling fds");
if (WSAPoll(const_cast<WSAPOLLFD*>(m_poll_fds_info.data()), m_poll_fds_info.size(), -1) > 0) { process_events(); }
if (select(ndfs, &m_rd_set, &m_wr_set, NULL, NULL) > 0) { process_events(); }
else {
__TACOPIE_LOG(debug, "poll woke up, but nothing to process");
}
Expand All @@ -107,20 +107,20 @@ io_service::process_events(void) {

__TACOPIE_LOG(debug, "processing events");

for (const auto& poll_result : m_poll_fds_info) {
if (poll_result.fd == m_notifier.get_read_fd() && poll_result.revents & POLLRDNORM) {
for (const auto& fd : m_polled_fds) {
if (fd == m_notifier.get_read_fd() && FD_ISSET(fd, &m_rd_set)) {
m_notifier.clr_buffer();
continue;
}

auto it = m_tracked_sockets.find(poll_result.fd);
auto it = m_tracked_sockets.find(fd);

if (it == m_tracked_sockets.end()) { continue; }

auto& socket = it->second;

if (poll_result.revents & (POLLRDNORM | POLLHUP) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(poll_result, socket); }
if (poll_result.revents & POLLWRNORM && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(poll_result, socket); }
if (FD_ISSET(fd, &m_rd_set) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(fd, socket); }
if (FD_ISSET(fd, &m_wr_set) && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(fd, socket); }

if (socket.marked_for_untrack && !socket.is_executing_rd_callback && !socket.is_executing_wr_callback) {
__TACOPIE_LOG(debug, "untrack socket");
Expand All @@ -131,11 +131,10 @@ io_service::process_events(void) {
}

void
io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& socket) {
io_service::process_rd_event(const fd_t& fd, tracked_socket& socket) {
__TACOPIE_LOG(debug, "processing read event");

auto rd_callback = socket.rd_callback;
auto fd = poll_result.fd;

socket.is_executing_rd_callback = true;

Expand All @@ -162,11 +161,10 @@ io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& s
}

void
io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& socket) {
io_service::process_wr_event(const fd_t& fd, tracked_socket& socket) {
__TACOPIE_LOG(debug, "processing write event");

auto wr_callback = socket.wr_callback;
auto fd = poll_result.fd;

socket.is_executing_wr_callback = true;

Expand Down Expand Up @@ -196,29 +194,34 @@ io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& s
//! init m_poll_fds_info
//!

void
int
io_service::init_poll_fds_info(void) {
std::lock_guard<std::mutex> lock(m_tracked_sockets_mtx);

m_poll_fds_info.clear();
m_polled_fds.clear();
FD_ZERO(&m_rd_set);
FD_ZERO(&m_wr_set);

int ndfs = m_notifier.get_read_fd();
FD_SET(m_notifier.get_read_fd(), &m_rd_set);
m_polled_fds.push_back(m_notifier.get_read_fd());

for (const auto& socket : m_tracked_sockets) {
const auto& fd = socket.first;
const auto& socket_info = socket.second;

struct pollfd poll_fd_info;
poll_fd_info.fd = fd;
poll_fd_info.events = 0;
poll_fd_info.revents = 0;
bool should_rd = socket_info.rd_callback && !socket_info.is_executing_rd_callback;
if (should_rd) { FD_SET(fd, &m_rd_set); }

if (socket_info.rd_callback && !socket_info.is_executing_rd_callback) { poll_fd_info.events |= POLLRDNORM; }
bool should_wr = socket_info.wr_callback && !socket_info.is_executing_wr_callback;
if (should_wr) { FD_SET(fd, &m_wr_set); }

if (socket_info.wr_callback && !socket_info.is_executing_wr_callback) { poll_fd_info.events |= POLLWRNORM; }
if (should_rd || should_wr || socket_info.marked_for_untrack) { m_polled_fds.push_back(fd); }

if (poll_fd_info.events || socket_info.marked_for_untrack) { m_poll_fds_info.push_back(std::move(poll_fd_info)); }
if ((should_rd || should_wr) && (int) fd > ndfs) { ndfs = fd; }
}

m_poll_fds_info.push_back({m_notifier.get_read_fd(), POLLRDNORM, 0});
return ndfs + 1;
}

//!
Expand Down

0 comments on commit cd809d0

Please sign in to comment.