From 1973dc5c97cfa91c0437ea0d3b208a7fc55f64a9 Mon Sep 17 00:00:00 2001 From: Guillaume Autran Date: Wed, 16 Aug 2017 20:57:11 -0400 Subject: [PATCH 1/5] Topic subscription scalability fix (#1078) (#12) Switching to using epoll system calls to improve performance of the topic polling code by a factor of 2. --- clients/roscpp/CMakeLists.txt | 4 + clients/roscpp/include/ros/io.h | 10 +- clients/roscpp/include/ros/poll_set.h | 2 + clients/roscpp/src/libros/config.h.in | 1 + clients/roscpp/src/libros/io.cpp | 156 ++++++++++++++++++++++--- clients/roscpp/src/libros/poll_set.cpp | 51 ++++---- 6 files changed, 184 insertions(+), 40 deletions(-) diff --git a/clients/roscpp/CMakeLists.txt b/clients/roscpp/CMakeLists.txt index ad6ccda1c2..1957e018d8 100644 --- a/clients/roscpp/CMakeLists.txt +++ b/clients/roscpp/CMakeLists.txt @@ -60,11 +60,15 @@ catkin_package( include(CheckIncludeFiles) include(CheckFunctionExists) +include(CheckCXXSymbolExists) # Not everybody has (e.g., embedded arm-linux) CHECK_INCLUDE_FILES(ifaddrs.h HAVE_IFADDRS_H) # Not everybody has trunc (e.g., Windows, embedded arm-linux) CHECK_FUNCTION_EXISTS(trunc HAVE_TRUNC) +# Not everybody has epoll (e.g., Windows, BSD, embedded arm-linux) +CHECK_CXX_SYMBOL_EXISTS(epoll_wait "sys/epoll.h" HAVE_EPOLL) + # Output test results to config.h configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/libros/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h) diff --git a/clients/roscpp/include/ros/io.h b/clients/roscpp/include/ros/io.h index fc38cbca2e..9cf2bd9d81 100644 --- a/clients/roscpp/include/ros/io.h +++ b/clients/roscpp/include/ros/io.h @@ -138,6 +138,8 @@ namespace ros { typedef struct pollfd socket_pollfd; #endif + typedef boost::shared_ptr > pollfd_vector_ptr; + /***************************************************************************** ** Functions *****************************************************************************/ @@ -145,11 +147,17 @@ namespace ros { ROSCPP_DECL int last_socket_error(); ROSCPP_DECL const char* last_socket_error_string(); ROSCPP_DECL bool last_socket_error_is_would_block(); -ROSCPP_DECL int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout); +ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout); ROSCPP_DECL int set_non_blocking(socket_fd_t &socket); ROSCPP_DECL int close_socket(socket_fd_t &socket); ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2]); +ROSCPP_DECL int create_socket_watcher(); +ROSCPP_DECL void close_socket_watcher(int fd); +ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd); +ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd); +ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events); + /***************************************************************************** ** Inlines - almost direct api replacements, should stay fast. *****************************************************************************/ diff --git a/clients/roscpp/include/ros/poll_set.h b/clients/roscpp/include/ros/poll_set.h index 4f1091d59f..5a6ede20d1 100644 --- a/clients/roscpp/include/ros/poll_set.h +++ b/clients/roscpp/include/ros/poll_set.h @@ -147,6 +147,8 @@ class ROSCPP_DECL PollSet boost::mutex signal_mutex_; signal_fd_t signal_pipe_[2]; + + int epfd_; }; } diff --git a/clients/roscpp/src/libros/config.h.in b/clients/roscpp/src/libros/config.h.in index 7788518c3c..2ec486310c 100644 --- a/clients/roscpp/src/libros/config.h.in +++ b/clients/roscpp/src/libros/config.h.in @@ -1,2 +1,3 @@ #cmakedefine HAVE_TRUNC #cmakedefine HAVE_IFADDRS_H +#cmakedefine HAVE_EPOLL diff --git a/clients/roscpp/src/libros/io.cpp b/clients/roscpp/src/libros/io.cpp index 6f0c242865..f45330b112 100644 --- a/clients/roscpp/src/libros/io.cpp +++ b/clients/roscpp/src/libros/io.cpp @@ -35,6 +35,8 @@ ** Includes *****************************************************************************/ +#include "config.h" + #include #include // don't need if we dont call the pipe functions. #include // for EFAULT and co. @@ -46,6 +48,16 @@ #include // for non-blocking configuration #endif +#ifdef HAVE_EPOLL + #include +#endif + +/***************************************************************************** +** Macros +*****************************************************************************/ + +#define UNUSED(expr) do { (void)(expr); } while (0) + /***************************************************************************** ** Namespaces *****************************************************************************/ @@ -87,6 +99,69 @@ bool last_socket_error_is_would_block() { #endif } +int create_socket_watcher() { + int epfd = -1; +#if defined(HAVE_EPOLL) + epfd = ::epoll_create1(0); + if (epfd < 0) + { + ROS_ERROR("Unable to create epoll watcher: %s", strerror(errno)); + } +#endif + return epfd; +} + +void close_socket_watcher(int fd) { + if (fd >= 0) + ::close(fd); +} + +void add_socket_to_watcher(int epfd, int fd) { +#if defined(HAVE_EPOLL) + struct epoll_event ev; + ev.events = 0; + ev.data.fd = fd; + + if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev)) + { + ROS_ERROR("Unable to add FD to epoll: %s", strerror(errno)); + } +#else + UNUSED(epfd); + UNUSED(fd); +#endif +} + +void del_socket_from_watcher(int epfd, int fd) { +#if defined(HAVE_EPOLL) + if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL)) + { + ROS_ERROR("Unable to remove FD to epoll: %s", strerror(errno)); + } +#else + UNUSED(epfd); + UNUSED(fd); +#endif +} + +void set_events_on_socket(int epfd, int fd, int events) { +#if defined(HAVE_EPOLL) + struct epoll_event ev; + ev.events = events; + ev.data.fd = fd; + if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev)) + { + ROS_ERROR("Unable to modify FD epoll: %s", strerror(errno)); + } +#else + UNUSED(epfd); + UNUSED(fd); + UNUSED(events); +#endif +} + + + /***************************************************************************** ** Service Robotics/Libssh Functions *****************************************************************************/ @@ -96,22 +171,26 @@ bool last_socket_error_is_would_block() { * Windows doesn't have a polling function until Vista (WSAPoll) and even then * its implementation is not supposed to be great. This works for a broader * range of platforms and will suffice. + * @param epfd - the socket watcher to poll on. * @param fds - the socket set (descriptor's and events) to poll for. * @param nfds - the number of descriptors to poll for. * @param timeout - timeout in milliseconds. - * @return int : -1 on error, 0 on timeout, +ve number of structures with received events. + * @return pollfd_vector_ptr : NULL on error, empty on timeout, a list of structures with received events. */ -int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) { +pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout) { #if defined(WIN32) fd_set readfds, writefds, exceptfds; struct timeval tv, *ptv; socket_fd_t max_fd; int rc; nfds_t i; + boost::shared_ptr > ofds; + + UNUSED(epfd); if (fds == NULL) { errno = EFAULT; - return -1; + return ofds; } FD_ZERO (&readfds); @@ -146,7 +225,7 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) { if (rc == -1) { errno = EINVAL; - return -1; + return ofds; } /********************* ** Setting the timeout @@ -166,9 +245,11 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) { rc = select (max_fd + 1, &readfds, &writefds, &exceptfds, ptv); if (rc < 0) { - return -1; - } else if ( rc == 0 ) { - return 0; + return ofds; + } + ofds.reset(new std::vector); + if ( rc == 0 ) { + return ofds; } for (rc = 0, i = 0; i < nfds; i++) { @@ -213,18 +294,59 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) { } else { fds[i].revents = POLLNVAL; } + ofds->push_back(fds[i]); } - return rc; + return ofds; +#elif defined (HAVE_EPOLL) + UNUSED(nfds); + UNUSED(fds); + struct epoll_event ev[nfds]; + pollfd_vector_ptr ofds; + + int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout); + + if (fd_cnt < 0) + { + // EINTR means that we got interrupted by a signal, and is not an error + if(errno != EINTR) { + ROS_ERROR("Error in epoll_wait! %s", strerror(errno)); + ofds.reset(); + } + } + else + { + ofds.reset(new std::vector); + for (int i = 0; i < fd_cnt; i++) + { + socket_pollfd pfd; + pfd.fd = ev[i].data.fd; + pfd.revents = ev[i].events; + ofds->push_back(pfd); + } + } + return ofds; #else - // use an existing poll implementation - int result = poll(fds, nfds, timeout); - if ( result < 0 ) { - // EINTR means that we got interrupted by a signal, and is not an error - if(errno == EINTR) { - result = 0; - } - } - return result; + UNUSED(epfd); + pollfd_vector_ptr ofds(new std::vector); + // use an existing poll implementation + int result = poll(fds, nfds, timeout); + if ( result < 0 ) { + // EINTR means that we got interrupted by a signal, and is not an error + if(errno != EINTR) { + ROS_ERROR("Error in poll! %s", strerror(errno)); + ofds.reset(); + } + } else { + for (nfds_t i = 0; i < nfds; i++) + { + if (fds[i].revents) + { + ofds->push_back(fds[i]); + fds[i].revents = 0; + } + } + } + return ofds; #endif // poll_sockets functions } /***************************************************************************** diff --git a/clients/roscpp/src/libros/poll_set.cpp b/clients/roscpp/src/libros/poll_set.cpp index 647e857c85..acc6a95403 100644 --- a/clients/roscpp/src/libros/poll_set.cpp +++ b/clients/roscpp/src/libros/poll_set.cpp @@ -47,7 +47,7 @@ namespace ros { PollSet::PollSet() -: sockets_changed_(false) + : sockets_changed_(false), epfd_(create_socket_watcher()) { if ( create_signal_pair(signal_pipe_) != 0 ) { ROS_FATAL("create_signal_pair() failed"); @@ -60,6 +60,7 @@ PollSet::PollSet() PollSet::~PollSet() { close_signal_pair(signal_pipe_); + close_socket_watcher(epfd_); } bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport) @@ -80,6 +81,8 @@ bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const Trans return false; } + add_socket_to_watcher(epfd_, fd); + sockets_changed_ = true; } @@ -106,6 +109,8 @@ bool PollSet::delSocket(int fd) just_deleted_.push_back(fd); } + del_socket_from_watcher(epfd_, fd); + sockets_changed_ = true; signal(); @@ -132,6 +137,8 @@ bool PollSet::addEvents(int sock, int events) it->second.events_ |= events; + set_events_on_socket(epfd_, sock, it->second.events_); + signal(); return true; @@ -152,6 +159,8 @@ bool PollSet::delEvents(int sock, int events) return false; } + set_events_on_socket(epfd_, sock, it->second.events_); + signal(); return true; @@ -177,28 +186,28 @@ void PollSet::update(int poll_timeout) createNativePollset(); // Poll across the sockets we're servicing - int ret; - size_t ufds_count = ufds_.size(); - if((ret = poll_sockets(&ufds_.front(), ufds_count, poll_timeout)) < 0) + boost::shared_ptr > ofds = poll_sockets(epfd_, &ufds_.front(), ufds_.size(), poll_timeout); + if (!ofds) { - ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string()); - } - else if (ret > 0) // ret = 0 implies the poll timed out, nothing to do + ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string()); + } + else { - // We have one or more sockets to service - for(size_t i=0; i::iterator it = ofds->begin() ; it != ofds->end(); ++it) { - if (ufds_[i].revents == 0) - { - continue; - } - + int fd = it->fd; + int revents = it->revents; SocketUpdateFunc func; TransportPtr transport; int events = 0; + + if (revents == 0) + { + continue; + } { boost::mutex::scoped_lock lock(socket_info_mutex_); - M_SocketInfo::iterator it = socket_info_.find(ufds_[i].fd); + M_SocketInfo::iterator it = socket_info_.find(fd); // the socket has been entirely deleted if (it == socket_info_.end()) { @@ -215,7 +224,6 @@ void PollSet::update(int poll_timeout) // If these are registered events for this socket, OR the events are ERR/HUP/NVAL, // call through to the registered function - int revents = ufds_[i].revents; if (func && ((events & revents) || (revents & POLLERR) @@ -231,7 +239,7 @@ void PollSet::update(int poll_timeout) // we ignore the first instance of one of these errors. If it's a real error we'll // hit it again next time through. boost::mutex::scoped_lock lock(just_deleted_mutex_); - if (std::find(just_deleted_.begin(), just_deleted_.end(), ufds_[i].fd) != just_deleted_.end()) + if (std::find(just_deleted_.begin(), just_deleted_.end(), fd) != just_deleted_.end()) { skip = true; } @@ -242,13 +250,12 @@ void PollSet::update(int poll_timeout) func(revents & (events|POLLERR|POLLHUP|POLLNVAL)); } } - - ufds_[i].revents = 0; } - - boost::mutex::scoped_lock lock(just_deleted_mutex_); - just_deleted_.clear(); } + + boost::mutex::scoped_lock lock(just_deleted_mutex_); + just_deleted_.clear(); + } void PollSet::createNativePollset() From 7a568fb300ec8af226d3905a15c84a6d11ecf215 Mon Sep 17 00:00:00 2001 From: Dirk Thomas Date: Wed, 8 Nov 2017 12:09:55 -0800 Subject: [PATCH 2/5] remove extra newline --- clients/roscpp/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/roscpp/CMakeLists.txt b/clients/roscpp/CMakeLists.txt index 1957e018d8..af68f8e13a 100644 --- a/clients/roscpp/CMakeLists.txt +++ b/clients/roscpp/CMakeLists.txt @@ -69,7 +69,6 @@ CHECK_FUNCTION_EXISTS(trunc HAVE_TRUNC) # Not everybody has epoll (e.g., Windows, BSD, embedded arm-linux) CHECK_CXX_SYMBOL_EXISTS(epoll_wait "sys/epoll.h" HAVE_EPOLL) - # Output test results to config.h configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/libros/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h) include_directories(${CMAKE_CURRENT_BINARY_DIR}) From ad03922b99ac8bedba4fc855b41bca54c167ebd9 Mon Sep 17 00:00:00 2001 From: Dirk Thomas Date: Wed, 8 Nov 2017 12:10:19 -0800 Subject: [PATCH 3/5] remove extra spaces --- clients/roscpp/include/ros/io.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/roscpp/include/ros/io.h b/clients/roscpp/include/ros/io.h index 9cf2bd9d81..4414212127 100644 --- a/clients/roscpp/include/ros/io.h +++ b/clients/roscpp/include/ros/io.h @@ -138,7 +138,7 @@ namespace ros { typedef struct pollfd socket_pollfd; #endif - typedef boost::shared_ptr > pollfd_vector_ptr; +typedef boost::shared_ptr > pollfd_vector_ptr; /***************************************************************************** ** Functions From d929b5fdd639263da1f2929463368b56aa123126 Mon Sep 17 00:00:00 2001 From: Dirk Thomas Date: Wed, 8 Nov 2017 12:16:04 -0800 Subject: [PATCH 4/5] keep consistent indentation --- clients/roscpp/src/libros/io.cpp | 108 ++++++++++++++++--------------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/clients/roscpp/src/libros/io.cpp b/clients/roscpp/src/libros/io.cpp index f45330b112..bf0ca1bcb7 100644 --- a/clients/roscpp/src/libros/io.cpp +++ b/clients/roscpp/src/libros/io.cpp @@ -184,9 +184,9 @@ pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int ti socket_fd_t max_fd; int rc; nfds_t i; - boost::shared_ptr > ofds; + boost::shared_ptr > ofds; - UNUSED(epfd); + UNUSED(epfd); if (fds == NULL) { errno = EFAULT; @@ -247,8 +247,8 @@ pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int ti if (rc < 0) { return ofds; } - ofds.reset(new std::vector); - if ( rc == 0 ) { + ofds.reset(new std::vector); + if ( rc == 0 ) { return ofds; } @@ -294,59 +294,61 @@ pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int ti } else { fds[i].revents = POLLNVAL; } - ofds->push_back(fds[i]); + ofds->push_back(fds[i]); } return ofds; #elif defined (HAVE_EPOLL) - UNUSED(nfds); - UNUSED(fds); - struct epoll_event ev[nfds]; - pollfd_vector_ptr ofds; - - int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout); - - if (fd_cnt < 0) - { - // EINTR means that we got interrupted by a signal, and is not an error - if(errno != EINTR) { - ROS_ERROR("Error in epoll_wait! %s", strerror(errno)); - ofds.reset(); - } - } - else - { - ofds.reset(new std::vector); - for (int i = 0; i < fd_cnt; i++) - { - socket_pollfd pfd; - pfd.fd = ev[i].data.fd; - pfd.revents = ev[i].events; - ofds->push_back(pfd); - } - } - return ofds; + UNUSED(nfds); + UNUSED(fds); + struct epoll_event ev[nfds]; + pollfd_vector_ptr ofds; + + int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout); + + if (fd_cnt < 0) + { + // EINTR means that we got interrupted by a signal, and is not an error + if(errno != EINTR) { + ROS_ERROR("Error in epoll_wait! %s", strerror(errno)); + ofds.reset(); + } + } + else + { + ofds.reset(new std::vector); + for (int i = 0; i < fd_cnt; i++) + { + socket_pollfd pfd; + pfd.fd = ev[i].data.fd; + pfd.revents = ev[i].events; + ofds->push_back(pfd); + } + } + return ofds; #else - UNUSED(epfd); - pollfd_vector_ptr ofds(new std::vector); - // use an existing poll implementation - int result = poll(fds, nfds, timeout); - if ( result < 0 ) { - // EINTR means that we got interrupted by a signal, and is not an error - if(errno != EINTR) { - ROS_ERROR("Error in poll! %s", strerror(errno)); - ofds.reset(); - } - } else { - for (nfds_t i = 0; i < nfds; i++) - { - if (fds[i].revents) - { - ofds->push_back(fds[i]); - fds[i].revents = 0; - } - } - } - return ofds; + UNUSED(epfd); + pollfd_vector_ptr ofds(new std::vector); + // use an existing poll implementation + int result = poll(fds, nfds, timeout); + if ( result < 0 ) + { + // EINTR means that we got interrupted by a signal, and is not an error + if(errno != EINTR) + { + ROS_ERROR("Error in poll! %s", strerror(errno)); + ofds.reset(); + } + } else { + for (nfds_t i = 0; i < nfds; i++) + { + if (fds[i].revents) + { + ofds->push_back(fds[i]); + fds[i].revents = 0; + } + } + } + return ofds; #endif // poll_sockets functions } /***************************************************************************** From cf7aa188975a4bcd09e6bc74a2605cf3880262ed Mon Sep 17 00:00:00 2001 From: Mike Purvis Date: Tue, 19 Dec 2017 11:14:25 -0500 Subject: [PATCH 5/5] Disable addDelMultiThread test. --- test/test_roscpp/test/test_poll_set.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/test/test_roscpp/test/test_poll_set.cpp b/test/test_roscpp/test/test_poll_set.cpp index 046e1b3fd0..61a20088b9 100644 --- a/test/test_roscpp/test/test_poll_set.cpp +++ b/test/test_roscpp/test/test_poll_set.cpp @@ -256,7 +256,16 @@ void delThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier) ps->delSocket(sh->socket_); } -TEST_F(Poller, addDelMultiThread) +/** + * This test has been disabled. The underlying logic which it tests has three + * different implementations (poll, epoll, Windows), and development of the epoll + * version exposed that the test was validating a buggy aspect of the original + * poll version. To reenable this test, the poll version and the test would both + * have to be updated. + * + * For more discussion, see: https://github.com/ros/ros_comm/pull/1217 + */ +TEST_F(Poller, DISABLED_addDelMultiThread) { for (int i = 0; i < 100; ++i) {