Skip to content

Commit

Permalink
fix(PollSet): #3248 #3249
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Apr 21, 2021
1 parent f810bd0 commit 11fe13f
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 24 deletions.
82 changes: 65 additions & 17 deletions Net/src/PollSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,8 @@ class PollSetImpl
if (it->fd == fd)
{
it->events = 0;
if (mode & PollSet::POLL_READ)
it->events |= POLLIN;
if (mode & PollSet::POLL_WRITE)
it->events |= POLLOUT;
it->revents = 0;
setMode(it->fd, it->events, mode);
}
}
}
Expand Down Expand Up @@ -307,11 +305,7 @@ class PollSetImpl
pfd.fd = it->first;
pfd.events = 0;
pfd.revents = 0;
if (it->second & PollSet::POLL_READ)
pfd.events |= POLLIN;
if (it->second & PollSet::POLL_WRITE)
pfd.events |= POLLOUT;

setMode(pfd.fd, pfd.events, it->second);
_pollfds.push_back(pfd);
}
_addMap.clear();
Expand All @@ -325,9 +319,15 @@ class PollSetImpl
{
Poco::Timestamp start;
#ifdef _WIN32
rc = WSAPoll(&_pollfds[0], static_cast<ULONG>(_pollfds.size()), static_cast<INT>(timeout.totalMilliseconds()));
rc = WSAPoll(&_pollfds[0], static_cast<ULONG>(_pollfds.size()), static_cast<INT>(remainingTime.totalMilliseconds()));
// see https://github.com/pocoproject/poco/issues/3248
if ((remainingTime > 0) && (rc > 0) && !hasSignaledFDs())
{
rc = -1;
WSASetLastError(WSAEINTR);
}
#else
rc = ::poll(&_pollfds[0], _pollfds.size(), timeout.totalMilliseconds());
rc = ::poll(&_pollfds[0], _pollfds.size(), remainingTime.totalMilliseconds());
#endif
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
{
Expand All @@ -352,16 +352,20 @@ class PollSetImpl
std::map<poco_socket_t, Socket>::const_iterator its = _socketMap.find(it->fd);
if (its != _socketMap.end())
{
if (it->revents & POLLIN)
if ((it->revents & POLLIN)
#ifdef _WIN32
|| (it->revents & POLLHUP)
#endif
)
result[its->second] |= PollSet::POLL_READ;
if (it->revents & POLLOUT)
if ((it->revents & POLLOUT)
#ifdef _WIN32
&& (_wantPOLLOUT.find(it->fd) != _wantPOLLOUT.end())
#endif
)
result[its->second] |= PollSet::POLL_WRITE;
if (it->revents & POLLERR)
result[its->second] |= PollSet::POLL_ERROR;
#ifdef _WIN32
if (it->revents & POLLHUP)
result[its->second] |= PollSet::POLL_READ;
#endif
}
it->revents = 0;
}
Expand All @@ -372,8 +376,52 @@ class PollSetImpl
}

private:

#ifdef _WIN32

void setMode(poco_socket_t fd, short& target, int mode)
{
if (mode & PollSet::POLL_READ)
target |= POLLIN;

if (mode & PollSet::POLL_WRITE)
_wantPOLLOUT.insert(fd);
else
_wantPOLLOUT.erase(fd);
target |= POLLOUT;
}

bool hasSignaledFDs()
{
for (const auto& pollfd : _pollfds)
{
if ((pollfd.revents | POLLOUT) &&
(_wantPOLLOUT.find(pollfd.fd) != _wantPOLLOUT.end()))
{
return true;
}
}
return false;
}

#else

void setMode(poco_socket_t fd, short& target, int mode)
{
if (mode & PollSet::POLL_READ)
target |= POLLIN;

if (mode & PollSet::POLL_WRITE)
target |= POLLOUT;
}

#endif

mutable Poco::FastMutex _mutex;
std::map<poco_socket_t, Socket> _socketMap;
#ifdef _WIN32
std::set<poco_socket_t> _wantPOLLOUT;
#endif
std::map<poco_socket_t, int> _addMap;
std::set<poco_socket_t> _removeSet;
std::vector<pollfd> _pollfds;
Expand Down
19 changes: 17 additions & 2 deletions Net/testsuite/src/EchoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ using Poco::Net::SocketAddress;
EchoServer::EchoServer():
_socket(SocketAddress()),
_thread("EchoServer"),
_stop(false)
_stop(false),
_done(false)
{
_thread.start(*this);
_ready.wait();
Expand All @@ -33,7 +34,8 @@ EchoServer::EchoServer():
EchoServer::EchoServer(const Poco::Net::SocketAddress& address):
_socket(address),
_thread("EchoServer"),
_stop(false)
_stop(false),
_done(false)
{
_thread.start(*this);
_ready.wait();
Expand Down Expand Up @@ -78,5 +80,18 @@ void EchoServer::run()
}
}
}
_done = true;
}


void EchoServer::stop()
{
_stop = true;
}


bool EchoServer::done()
{
return _done;
}

11 changes: 9 additions & 2 deletions Net/testsuite/src/EchoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,22 @@ class EchoServer: public Poco::Runnable
Poco::UInt16 port() const;
/// Returns the port the echo server is
/// listening on.

void run();
/// Does the work.


void stop();
/// Sets the stop flag.

bool done();
/// Retruns true if if server is done.

private:
Poco::Net::ServerSocket _socket;
Poco::Thread _thread;
Poco::Event _ready;
bool _stop;
bool _done;
};


Expand Down
72 changes: 69 additions & 3 deletions Net/testsuite/src/PollSetTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ using Poco::Net::ConnectionRefusedException;
using Poco::Net::PollSet;
using Poco::Timespan;
using Poco::Stopwatch;
using Poco::Thread;


PollSetTest::PollSetTest(const std::string& name): CppUnit::TestCase(name)
Expand Down Expand Up @@ -76,7 +77,7 @@ void PollSetTest::testPoll()
assertTrue (sm.find(ss1) != sm.end());
assertTrue (sm.find(ss2) == sm.end());
assertTrue (sm.find(ss1)->second == PollSet::POLL_WRITE);
assertTrue (sw.elapsed() < 100000);
assertTrue (sw.elapsed() < 1100000);

ps.update(ss1, PollSet::POLL_READ);

Expand All @@ -87,7 +88,7 @@ void PollSetTest::testPoll()
assertTrue (sm.find(ss1) != sm.end());
assertTrue (sm.find(ss2) == sm.end());
assertTrue (sm.find(ss1)->second == PollSet::POLL_READ);
assertTrue (sw.elapsed() < 100000);
assertTrue (sw.elapsed() < 1100000);

int n = ss1.receiveBytes(buffer, sizeof(buffer));
assertTrue (n == 5);
Expand All @@ -100,7 +101,7 @@ void PollSetTest::testPoll()
assertTrue (sm.find(ss1) == sm.end());
assertTrue (sm.find(ss2) != sm.end());
assertTrue (sm.find(ss2)->second == PollSet::POLL_READ);
assertTrue (sw.elapsed() < 100000);
assertTrue (sw.elapsed() < 1100000);

n = ss2.receiveBytes(buffer, sizeof(buffer));
assertTrue (n == 5);
Expand All @@ -125,6 +126,69 @@ void PollSetTest::testPoll()
}


void PollSetTest::testPollNoServer()
{
StreamSocket ss1;
StreamSocket ss2;

ss1.connectNB(SocketAddress("127.0.0.1", 0xFEFE));
ss2.connectNB(SocketAddress("127.0.0.1", 0xFEFF));
PollSet ps;
assertTrue(ps.empty());
ps.add(ss1, PollSet::POLL_READ);
ps.add(ss2, PollSet::POLL_READ);
assertTrue(!ps.empty());
assertTrue(ps.has(ss1));
assertTrue(ps.has(ss2));
PollSet::SocketModeMap sm;
Stopwatch sw; sw.start();
do
{
sm = ps.poll(Timespan(1000000));
if (sw.elapsedSeconds() > 10) fail();
} while (sm.size() < 2);
assertTrue(sm.size() == 2);
for (auto s : sm)
assertTrue(0 != (s.second | PollSet::POLL_ERROR));
}


void PollSetTest::testPollClosedServer()
{
EchoServer echoServer1;
EchoServer echoServer2;
StreamSocket ss1;
StreamSocket ss2;

ss1.connectNB(SocketAddress("127.0.0.1", echoServer1.port()));
ss2.connectNB(SocketAddress("127.0.0.1", echoServer2.port()));
PollSet ps;
assertTrue(ps.empty());
ps.add(ss1, PollSet::POLL_READ);
ps.add(ss2, PollSet::POLL_READ);
assertTrue(!ps.empty());
assertTrue(ps.has(ss1));
assertTrue(ps.has(ss2));

echoServer1.stop();
ss1.sendBytes("HELLO", 5);
while (!echoServer1.done()) Thread::sleep(10);
echoServer2.stop();
ss2.sendBytes("HELLO", 5);
while (!echoServer2.done()) Thread::sleep(10);
PollSet::SocketModeMap sm;
Stopwatch sw; sw.start();
do
{
sm = ps.poll(Timespan(1000000));
if (sw.elapsedSeconds() > 10) fail();
} while (sm.size() < 2);
assertTrue(sm.size() == 2);
assertTrue(0 == ss1.receiveBytes(0, 0));
assertTrue(0 == ss2.receiveBytes(0, 0));
}


void PollSetTest::setUp()
{
}
Expand All @@ -140,6 +204,8 @@ CppUnit::Test* PollSetTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("PollSetTest");

CppUnit_addTest(pSuite, PollSetTest, testPoll);
CppUnit_addTest(pSuite, PollSetTest, testPollNoServer);
CppUnit_addTest(pSuite, PollSetTest, testPollClosedServer);

return pSuite;
}
2 changes: 2 additions & 0 deletions Net/testsuite/src/PollSetTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class PollSetTest: public CppUnit::TestCase
~PollSetTest();

void testPoll();
void testPollNoServer();
void testPollClosedServer();

void setUp();
void tearDown();
Expand Down

0 comments on commit 11fe13f

Please sign in to comment.