Skip to content
This repository has been archived by the owner on Apr 6, 2019. It is now read-only.

Switch to select #9

Merged
merged 3 commits into from
Apr 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions includes/tacopie/network/io_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#ifdef _WIN32
#include <Winsock2.h>
#else
#include <poll.h>
#include <sys/select.h>
#endif /* _WIN32 */

#include <tacopie/network/self_pipe.hpp>
Expand Down Expand Up @@ -98,12 +98,12 @@ class io_service {
void poll(void);

//! init m_poll_fds_info
void init_poll_fds_info(void);
int init_poll_fds_info(void);

//! process poll detected events
void process_events(void);
void process_rd_event(const struct pollfd& poll_result, tracked_socket& socket);
void process_wr_event(const struct pollfd& poll_result, tracked_socket& socket);
void process_rd_event(const fd_t& fd, tracked_socket& socket);
void process_wr_event(const fd_t& fd, tracked_socket& socket);

private:
//! tracked sockets
Expand All @@ -121,8 +121,10 @@ class io_service {
//! thread safety
std::mutex m_tracked_sockets_mtx;

//! data structure given to poll
std::vector<struct pollfd> m_poll_fds_info;
//! data structure given to select
std::vector<fd_t> m_polled_fds;
fd_set m_rd_set;
fd_set m_wr_set;

//! condition variable to wait on removal
std::condition_variable m_wait_for_removal_condvar;
Expand Down
48 changes: 25 additions & 23 deletions sources/network/unix/io_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ io_service::poll(void) {
__TACOPIE_LOG(debug, "starting poll() worker");

while (!m_should_stop) {
init_poll_fds_info();
int ndfs = init_poll_fds_info();

__TACOPIE_LOG(debug, "polling fds");
if (::poll(const_cast<struct pollfd*>(m_poll_fds_info.data()), m_poll_fds_info.size(), -1) > 0) { process_events(); }
if (::select(ndfs, &m_rd_set, &m_wr_set, NULL, NULL) > 0) { process_events(); }
else {
__TACOPIE_LOG(debug, "poll woke up, but nothing to process");
}
Expand All @@ -107,20 +107,19 @@ io_service::process_events(void) {

__TACOPIE_LOG(debug, "processing events");

for (const auto& poll_result : m_poll_fds_info) {
if (poll_result.fd == m_notifier.get_read_fd() && poll_result.revents & POLLIN) {
for (const auto& fd : m_polled_fds) {
if (fd == m_notifier.get_read_fd() && FD_ISSET(fd, &m_rd_set)) {
m_notifier.clr_buffer();
continue;
}

auto it = m_tracked_sockets.find(poll_result.fd);

auto it = m_tracked_sockets.find(fd);
if (it == m_tracked_sockets.end()) { continue; }

auto& socket = it->second;

if (poll_result.revents & (POLLIN | POLLHUP) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(poll_result, socket); }
if (poll_result.revents & POLLOUT && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(poll_result, socket); }
if (FD_ISSET(fd, &m_rd_set) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(fd, socket); }
if (FD_ISSET(fd, &m_wr_set) && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(fd, socket); }

if (socket.marked_for_untrack && !socket.is_executing_rd_callback && !socket.is_executing_wr_callback) {
__TACOPIE_LOG(debug, "untrack socket");
Expand All @@ -133,11 +132,10 @@ io_service::process_events(void) {
}

void
io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& socket) {
io_service::process_rd_event(const fd_t& fd, tracked_socket& socket) {
__TACOPIE_LOG(debug, "processing read event");

auto rd_callback = socket.rd_callback;
auto fd = poll_result.fd;

socket.is_executing_rd_callback = true;

Expand All @@ -164,11 +162,10 @@ io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& s
}

void
io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& socket) {
io_service::process_wr_event(const fd_t& fd, tracked_socket& socket) {
__TACOPIE_LOG(debug, "processing write event");

auto wr_callback = socket.wr_callback;
auto fd = poll_result.fd;

socket.is_executing_wr_callback = true;

Expand All @@ -193,32 +190,37 @@ io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& s
}

//!
//! init m_poll_fds_info
//! init m_polled_fds
//!

void
int
io_service::init_poll_fds_info(void) {
std::lock_guard<std::mutex> lock(m_tracked_sockets_mtx);

m_poll_fds_info.clear();
m_polled_fds.clear();
FD_ZERO(&m_rd_set);
FD_ZERO(&m_wr_set);

int ndfs = m_notifier.get_read_fd();
FD_SET(m_notifier.get_read_fd(), &m_rd_set);
m_polled_fds.push_back(m_notifier.get_read_fd());

for (const auto& socket : m_tracked_sockets) {
const auto& fd = socket.first;
const auto& socket_info = socket.second;

struct pollfd poll_fd_info;
poll_fd_info.fd = fd;
poll_fd_info.events = 0;
poll_fd_info.revents = 0;
bool should_rd = socket_info.rd_callback && !socket_info.is_executing_rd_callback;
if (should_rd) { FD_SET(fd, &m_rd_set); }

if (socket_info.rd_callback && !socket_info.is_executing_rd_callback) { poll_fd_info.events |= POLLIN; }
bool should_wr = socket_info.wr_callback && !socket_info.is_executing_wr_callback;
if (should_wr) { FD_SET(fd, &m_wr_set); }

if (socket_info.wr_callback && !socket_info.is_executing_wr_callback) { poll_fd_info.events |= POLLOUT; }
if (should_rd || should_wr || socket_info.marked_for_untrack) { m_polled_fds.push_back(fd); }

if (poll_fd_info.events || socket_info.marked_for_untrack) { m_poll_fds_info.push_back(std::move(poll_fd_info)); }
if ((should_rd || should_wr) && fd > ndfs) { ndfs = fd; }
}

m_poll_fds_info.push_back({m_notifier.get_read_fd(), POLLIN, 0});
return ndfs + 1;
}

//!
Expand Down
45 changes: 24 additions & 21 deletions sources/network/windows/io_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ io_service::poll(void) {
__TACOPIE_LOG(debug, "starting poll() worker");

while (!m_should_stop) {
init_poll_fds_info();
int ndfs = init_poll_fds_info();

__TACOPIE_LOG(debug, "polling fds");
if (WSAPoll(const_cast<WSAPOLLFD*>(m_poll_fds_info.data()), m_poll_fds_info.size(), -1) > 0) { process_events(); }
if (select(ndfs, &m_rd_set, &m_wr_set, NULL, NULL) > 0) { process_events(); }
else {
__TACOPIE_LOG(debug, "poll woke up, but nothing to process");
}
Expand All @@ -107,20 +107,20 @@ io_service::process_events(void) {

__TACOPIE_LOG(debug, "processing events");

for (const auto& poll_result : m_poll_fds_info) {
if (poll_result.fd == m_notifier.get_read_fd() && poll_result.revents & POLLRDNORM) {
for (const auto& fd : m_polled_fds) {
if (fd == m_notifier.get_read_fd() && FD_ISSET(fd, &m_rd_set)) {
m_notifier.clr_buffer();
continue;
}

auto it = m_tracked_sockets.find(poll_result.fd);
auto it = m_tracked_sockets.find(fd);

if (it == m_tracked_sockets.end()) { continue; }

auto& socket = it->second;

if (poll_result.revents & (POLLRDNORM | POLLHUP) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(poll_result, socket); }
if (poll_result.revents & POLLWRNORM && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(poll_result, socket); }
if (FD_ISSET(fd, &m_rd_set) && socket.rd_callback && !socket.is_executing_rd_callback) { process_rd_event(fd, socket); }
if (FD_ISSET(fd, &m_wr_set) && socket.wr_callback && !socket.is_executing_wr_callback) { process_wr_event(fd, socket); }

if (socket.marked_for_untrack && !socket.is_executing_rd_callback && !socket.is_executing_wr_callback) {
__TACOPIE_LOG(debug, "untrack socket");
Expand All @@ -131,11 +131,10 @@ io_service::process_events(void) {
}

void
io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& socket) {
io_service::process_rd_event(const fd_t& fd, tracked_socket& socket) {
__TACOPIE_LOG(debug, "processing read event");

auto rd_callback = socket.rd_callback;
auto fd = poll_result.fd;

socket.is_executing_rd_callback = true;

Expand All @@ -162,11 +161,10 @@ io_service::process_rd_event(const struct pollfd& poll_result, tracked_socket& s
}

void
io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& socket) {
io_service::process_wr_event(const fd_t& fd, tracked_socket& socket) {
__TACOPIE_LOG(debug, "processing write event");

auto wr_callback = socket.wr_callback;
auto fd = poll_result.fd;

socket.is_executing_wr_callback = true;

Expand Down Expand Up @@ -196,29 +194,34 @@ io_service::process_wr_event(const struct pollfd& poll_result, tracked_socket& s
//! init m_poll_fds_info
//!

void
int
io_service::init_poll_fds_info(void) {
std::lock_guard<std::mutex> lock(m_tracked_sockets_mtx);

m_poll_fds_info.clear();
m_polled_fds.clear();
FD_ZERO(&m_rd_set);
FD_ZERO(&m_wr_set);

int ndfs = m_notifier.get_read_fd();
FD_SET(m_notifier.get_read_fd(), &m_rd_set);
m_polled_fds.push_back(m_notifier.get_read_fd());

for (const auto& socket : m_tracked_sockets) {
const auto& fd = socket.first;
const auto& socket_info = socket.second;

struct pollfd poll_fd_info;
poll_fd_info.fd = fd;
poll_fd_info.events = 0;
poll_fd_info.revents = 0;
bool should_rd = socket_info.rd_callback && !socket_info.is_executing_rd_callback;
if (should_rd) { FD_SET(fd, &m_rd_set); }

if (socket_info.rd_callback && !socket_info.is_executing_rd_callback) { poll_fd_info.events |= POLLRDNORM; }
bool should_wr = socket_info.wr_callback && !socket_info.is_executing_wr_callback;
if (should_wr) { FD_SET(fd, &m_wr_set); }

if (socket_info.wr_callback && !socket_info.is_executing_wr_callback) { poll_fd_info.events |= POLLWRNORM; }
if (should_rd || should_wr || socket_info.marked_for_untrack) { m_polled_fds.push_back(fd); }

if (poll_fd_info.events || socket_info.marked_for_untrack) { m_poll_fds_info.push_back(std::move(poll_fd_info)); }
if ((should_rd || should_wr) && (int) fd > ndfs) { ndfs = fd; }
}

m_poll_fds_info.push_back({m_notifier.get_read_fd(), POLLRDNORM, 0});
return ndfs + 1;
}

//!
Expand Down