Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic subscription scalability fix #1217

Merged
merged 5 commits into from
Dec 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions clients/roscpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ catkin_package(

include(CheckIncludeFiles)
include(CheckFunctionExists)
include(CheckCXXSymbolExists)

# Not everybody has <ifaddrs.h> (e.g., embedded arm-linux)
CHECK_INCLUDE_FILES(ifaddrs.h HAVE_IFADDRS_H)
# Not everybody has trunc (e.g., Windows, embedded arm-linux)
CHECK_FUNCTION_EXISTS(trunc HAVE_TRUNC)
# Not everybody has epoll (e.g., Windows, BSD, embedded arm-linux)
CHECK_CXX_SYMBOL_EXISTS(epoll_wait "sys/epoll.h" HAVE_EPOLL)

# Output test results to config.h
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/libros/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
Expand Down
10 changes: 9 additions & 1 deletion clients/roscpp/include/ros/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,26 @@ namespace ros {
typedef struct pollfd socket_pollfd;
#endif

typedef boost::shared_ptr<std::vector<socket_pollfd> > pollfd_vector_ptr;

/*****************************************************************************
** Functions
*****************************************************************************/

ROSCPP_DECL int last_socket_error();
ROSCPP_DECL const char* last_socket_error_string();
ROSCPP_DECL bool last_socket_error_is_would_block();
ROSCPP_DECL int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout);
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout);
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket);
ROSCPP_DECL int close_socket(socket_fd_t &socket);
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2]);

ROSCPP_DECL int create_socket_watcher();
ROSCPP_DECL void close_socket_watcher(int fd);
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd);
ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd);
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events);

/*****************************************************************************
** Inlines - almost direct api replacements, should stay fast.
*****************************************************************************/
Expand Down
2 changes: 2 additions & 0 deletions clients/roscpp/include/ros/poll_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class ROSCPP_DECL PollSet

boost::mutex signal_mutex_;
signal_fd_t signal_pipe_[2];

int epfd_;
};

}
Expand Down
1 change: 1 addition & 0 deletions clients/roscpp/src/libros/config.h.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#cmakedefine HAVE_TRUNC
#cmakedefine HAVE_IFADDRS_H
#cmakedefine HAVE_EPOLL
148 changes: 136 additions & 12 deletions clients/roscpp/src/libros/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
** Includes
*****************************************************************************/

#include "config.h"

#include <ros/io.h>
#include <ros/assert.h> // don't need if we dont call the pipe functions.
#include <errno.h> // for EFAULT and co.
Expand All @@ -46,6 +48,16 @@
#include <fcntl.h> // for non-blocking configuration
#endif

#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif

/*****************************************************************************
** Macros
*****************************************************************************/

#define UNUSED(expr) do { (void)(expr); } while (0)

/*****************************************************************************
** Namespaces
*****************************************************************************/
Expand Down Expand Up @@ -87,6 +99,69 @@ bool last_socket_error_is_would_block() {
#endif
}

int create_socket_watcher() {
int epfd = -1;
#if defined(HAVE_EPOLL)
epfd = ::epoll_create1(0);
if (epfd < 0)
{
ROS_ERROR("Unable to create epoll watcher: %s", strerror(errno));
}
#endif
return epfd;
}

void close_socket_watcher(int fd) {
if (fd >= 0)
::close(fd);
}

void add_socket_to_watcher(int epfd, int fd) {
#if defined(HAVE_EPOLL)
struct epoll_event ev;
ev.events = 0;
ev.data.fd = fd;

if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev))
{
ROS_ERROR("Unable to add FD to epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
#endif
}

void del_socket_from_watcher(int epfd, int fd) {
#if defined(HAVE_EPOLL)
if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL))
{
ROS_ERROR("Unable to remove FD to epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
#endif
}

void set_events_on_socket(int epfd, int fd, int events) {
#if defined(HAVE_EPOLL)
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev))
{
ROS_ERROR("Unable to modify FD epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
UNUSED(events);
#endif
}



/*****************************************************************************
** Service Robotics/Libssh Functions
*****************************************************************************/
Expand All @@ -96,22 +171,26 @@ bool last_socket_error_is_would_block() {
* Windows doesn't have a polling function until Vista (WSAPoll) and even then
* its implementation is not supposed to be great. This works for a broader
* range of platforms and will suffice.
* @param epfd - the socket watcher to poll on.
* @param fds - the socket set (descriptor's and events) to poll for.
* @param nfds - the number of descriptors to poll for.
* @param timeout - timeout in milliseconds.
* @return int : -1 on error, 0 on timeout, +ve number of structures with received events.
* @return pollfd_vector_ptr : NULL on error, empty on timeout, a list of structures with received events.
*/
int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {
pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout) {
#if defined(WIN32)
fd_set readfds, writefds, exceptfds;
struct timeval tv, *ptv;
socket_fd_t max_fd;
int rc;
nfds_t i;
boost::shared_ptr<std::vector<socket_pollfd> > ofds;

UNUSED(epfd);

if (fds == NULL) {
errno = EFAULT;
return -1;
return ofds;
}

FD_ZERO (&readfds);
Expand Down Expand Up @@ -146,7 +225,7 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {

if (rc == -1) {
errno = EINVAL;
return -1;
return ofds;
}
/*********************
** Setting the timeout
Expand All @@ -166,9 +245,11 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {

rc = select (max_fd + 1, &readfds, &writefds, &exceptfds, ptv);
if (rc < 0) {
return -1;
} else if ( rc == 0 ) {
return 0;
return ofds;
}
ofds.reset(new std::vector<socket_pollfd>);
if ( rc == 0 ) {
return ofds;
}

for (rc = 0, i = 0; i < nfds; i++) {
Expand Down Expand Up @@ -213,18 +294,61 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {
} else {
fds[i].revents = POLLNVAL;
}
ofds->push_back(fds[i]);
}
return ofds;
#elif defined (HAVE_EPOLL)
UNUSED(nfds);
UNUSED(fds);
struct epoll_event ev[nfds];
pollfd_vector_ptr ofds;

int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout);

if (fd_cnt < 0)
{
// EINTR means that we got interrupted by a signal, and is not an error
if(errno != EINTR) {
ROS_ERROR("Error in epoll_wait! %s", strerror(errno));
ofds.reset();
}
}
return rc;
else
{
ofds.reset(new std::vector<socket_pollfd>);
for (int i = 0; i < fd_cnt; i++)
{
socket_pollfd pfd;
pfd.fd = ev[i].data.fd;
pfd.revents = ev[i].events;
ofds->push_back(pfd);
}
}
return ofds;
#else
UNUSED(epfd);
pollfd_vector_ptr ofds(new std::vector<socket_pollfd>);
// use an existing poll implementation
int result = poll(fds, nfds, timeout);
if ( result < 0 ) {
if ( result < 0 )
{
// EINTR means that we got interrupted by a signal, and is not an error
if(errno == EINTR) {
result = 0;
if(errno != EINTR)
{
ROS_ERROR("Error in poll! %s", strerror(errno));
ofds.reset();
}
} else {
for (nfds_t i = 0; i < nfds; i++)
{
if (fds[i].revents)
{
ofds->push_back(fds[i]);
fds[i].revents = 0;
}
}
}
return result;
return ofds;
#endif // poll_sockets functions
}
/*****************************************************************************
Expand Down
Loading