Skip to content

Commit

Permalink
Replace the EndPoint::PrepareIO() functions with stored state
Browse files Browse the repository at this point in the history
#### Problem

Currently, the socket-based networking implementation calls `PrepareIO()` on each endpoint to determine file descriptor sets for `select()`. This usually does redundant work (since the conditions only change on endpoint state changes) and is not usable by event-based interfaces (e.g. epoll/kqueue and libraries like libevent/libev/libuv) – see issue project-chip#5556 _rework system layer event loop_.

#### Summary of Changes

- Maintain the required flags in an endpoint member, `mRequestIO`, in place of recalculating them in `PrepareIO()`. (The points where `mRequestIO` is changed are the points where an event-based interface would be invoked.)
- For UDP and Raw endpoints, pass callbacks to `Listen()` rather than setting members separately, to avoid any possiblity of races in a less synchronous environment.

Part of issue project-chip#5556 _rework system layer event loop_.
  • Loading branch information
kpschoedel committed Apr 16, 2021
1 parent 2b6cc31 commit 7eb4869
Show file tree
Hide file tree
Showing 16 changed files with 111 additions and 89 deletions.
1 change: 1 addition & 0 deletions src/inet/EndPointBasis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void EndPointBasis::InitEndPointBasis(InetLayer & aInetLayer, void * aAppState)
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
mSocket = INET_INVALID_SOCKET_FD;
mPendingIO.Clear();
mRequestIO.Clear();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
}

Expand Down
3 changes: 2 additions & 1 deletion src/inet/EndPointBasis.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class DLL_EXPORT EndPointBasis : public InetLayerBasis
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
int mSocket; /**< Encapsulated socket descriptor. */
IPAddressType mAddrType; /**< Protocol family, i.e. IPv4 or IPv6. */
SocketEvents mPendingIO; /**< Socket event masks */
SocketEvents mPendingIO; /**< Socket event masks (read/write/error) currently available */
SocketEvents mRequestIO; /**< Socket event masks (read/write) to wait for */
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

#if CHIP_SYSTEM_CONFIG_USE_LWIP
Expand Down
10 changes: 0 additions & 10 deletions src/inet/IPEndPointBasis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1078,16 +1078,6 @@ INET_ERROR IPEndPointBasis::GetSocket(IPAddressType aAddressType, int aType, int
return INET_NO_ERROR;
}

SocketEvents IPEndPointBasis::PrepareIO()
{
SocketEvents res;

if (mState == kState_Listening && OnMessageReceived != nullptr)
res.SetRead();

return res;
}

void IPEndPointBasis::HandlePendingIO(uint16_t aPort)
{
INET_ERROR lStatus = INET_NO_ERROR;
Expand Down
14 changes: 7 additions & 7 deletions src/inet/IPEndPointBasis.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ class DLL_EXPORT IPEndPointBasis : public EndPointBasis
typedef void (*OnMessageReceivedFunct)(IPEndPointBasis * endPoint, chip::System::PacketBufferHandle msg,
const IPPacketInfo * pktInfo);

/** The endpoint's message reception event handling function delegate. */
OnMessageReceivedFunct OnMessageReceived;

/**
* @brief Type of reception error event handling function.
*
Expand All @@ -109,16 +106,19 @@ class DLL_EXPORT IPEndPointBasis : public EndPointBasis
*/
typedef void (*OnReceiveErrorFunct)(IPEndPointBasis * endPoint, INET_ERROR err, const IPPacketInfo * pktInfo);

/** The endpoint's receive error event handling function delegate. */
OnReceiveErrorFunct OnReceiveError;

INET_ERROR SetMulticastLoopback(IPVersion aIPVersion, bool aLoopback);
INET_ERROR JoinMulticastGroup(InterfaceId aInterfaceId, const IPAddress & aAddress);
INET_ERROR LeaveMulticastGroup(InterfaceId aInterfaceId, const IPAddress & aAddress);

protected:
void Init(InetLayer * aInetLayer);

/** The endpoint's message reception event handling function delegate. */
OnMessageReceivedFunct OnMessageReceived;

/** The endpoint's receive error event handling function delegate. */
OnReceiveErrorFunct OnReceiveError;

#if CHIP_SYSTEM_CONFIG_USE_LWIP
public:
static struct netif * FindNetifFromInterfaceId(InterfaceId aInterfaceId);
Expand All @@ -138,7 +138,7 @@ class DLL_EXPORT IPEndPointBasis : public EndPointBasis
INET_ERROR BindInterface(IPAddressType aAddressType, InterfaceId aInterfaceId);
INET_ERROR SendMsg(const IPPacketInfo * aPktInfo, chip::System::PacketBufferHandle aBuffer, uint16_t aSendFlags);
INET_ERROR GetSocket(IPAddressType aAddressType, int aType, int aProtocol);
SocketEvents PrepareIO();
SocketEvents PrepareIO() const { return mRequestIO; }
void HandlePendingIO(uint16_t aPort);
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

Expand Down
16 changes: 10 additions & 6 deletions src/inet/InetLayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,13 @@ INET_ERROR InetLayer::GetLinkLocalAddr(InterfaceId link, IPAddress * llAddr)
*
* @param[in] ipProto A protocol within the IP family (e.g., ICMPv4 or ICMPv6).
*
* @param[in,out] retEndPoint A pointer to a pointer of the RawEndPoint object that is
* a return parameter upon completion of the object creation.
* *retEndPoint is NULL if creation fails.
* @param[in] onMessageReceived The endpoint's message reception event handling function delegate.
*
* @param[in] OnReceiveError The endpoint's receive error event handling function delegate.
*
* @param[in,out] retEndPoint A pointer to a pointer of the RawEndPoint object that is
* a return parameter upon completion of the object creation.
* *retEndPoint is NULL if creation fails.
*
* @retval #INET_ERROR_INCORRECT_STATE If the InetLayer object is not initialized.
* @retval #INET_ERROR_NO_ENDPOINTS If the InetLayer RawEndPoint pool is full and no new
Expand Down Expand Up @@ -628,9 +632,9 @@ INET_ERROR InetLayer::NewTCPEndPoint(TCPEndPoint ** retEndPoint)
* This function gets a free UDPEndPoint object from a pre-allocated pool
* and also calls the explicit initializer on the new object.
*
* @param[in,out] retEndPoint A pointer to a pointer of the UDPEndPoint object that is
* a return parameter upon completion of the object creation.
* *retEndPoint is NULL if creation fails.
* @param[in,out] retEndPoint A pointer to a pointer of the UDPEndPoint object that is
* a return parameter upon completion of the object creation.
* *retEndPoint is NULL if creation fails.
*
* @retval #INET_ERROR_INCORRECT_STATE If the InetLayer object is not initialized.
* @retval #INET_ERROR_NO_ENDPOINTS If the InetLayer UDPEndPoint pool is full and no new
Expand Down
2 changes: 1 addition & 1 deletion src/inet/InetLayerBasis.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ inline void InetLayerBasis::InitInetLayerBasis(InetLayer & aInetLayer, void * aA
class SocketEvents
{
public:
enum
enum : uint8_t
{
kRead = 0x01, /**< Bit flag indicating if there is a read event on a socket. */
kWrite = 0x02, /**< Bit flag indicating if there is a write event on a socket. */
Expand Down
24 changes: 18 additions & 6 deletions src/inet/RawEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ INET_ERROR RawEndPoint::BindIPv6LinkLocal(InterfaceId intfId, const IPAddress &
/**
* @brief Prepare the endpoint to receive ICMP messages.
*
* @param[in] onMessageReceived The endpoint's message reception event handling function delegate.
* @param[in] onReceiveError The endpoint's receive error event handling function delegate.
* @param[in] appState Application state pointer.
*
* @retval INET_NO_ERROR always returned.
*
* @details
Expand All @@ -374,7 +378,8 @@ INET_ERROR RawEndPoint::BindIPv6LinkLocal(InterfaceId intfId, const IPAddress &
* On LwIP, this method must not be called with the LwIP stack lock
* already acquired
*/
INET_ERROR RawEndPoint::Listen()
INET_ERROR RawEndPoint::Listen(IPEndPointBasis::OnMessageReceivedFunct onMessageReceived,
IPEndPointBasis::OnReceiveErrorFunct onReceiveError, void * appState)
{
INET_ERROR res = INET_NO_ERROR;

Expand All @@ -394,6 +399,10 @@ INET_ERROR RawEndPoint::Listen()
goto exit;
}

OnMessageReceived = onMessageReceived;
OnReceiveError = onReceiveError;
AppState = appState;

#if CHIP_SYSTEM_CONFIG_USE_LWIP

// Lock LwIP stack
Expand Down Expand Up @@ -423,6 +432,11 @@ INET_ERROR RawEndPoint::Listen()
if (res == INET_NO_ERROR)
{
mState = kState_Listening;

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
// Wait for ability to read on this endpoint.
mRequestIO.SetRead();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
}

exit:
Expand Down Expand Up @@ -478,6 +492,9 @@ void RawEndPoint::Close()
// Clear any results from select() that indicate pending I/O for the socket.
mPendingIO.Clear();

// Do not wait for I/O on this endpoint.
mRequestIO.Clear();

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

mState = kState_Closed;
Expand Down Expand Up @@ -1049,11 +1066,6 @@ INET_ERROR RawEndPoint::GetSocket(IPAddressType aAddressType)
return (lRetval);
}

SocketEvents RawEndPoint::PrepareIO()
{
return (IPEndPointBasis::PrepareIO());
}

void RawEndPoint::HandlePendingIO()
{
if (mState == kState_Listening && OnMessageReceived != nullptr && mPendingIO.IsReadable())
Expand Down
4 changes: 2 additions & 2 deletions src/inet/RawEndPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class DLL_EXPORT RawEndPoint : public IPEndPointBasis
INET_ERROR BindIPv6LinkLocal(InterfaceId intfId, const IPAddress & addr);
INET_ERROR BindInterface(IPAddressType addrType, InterfaceId intfId);
InterfaceId GetBoundInterface();
INET_ERROR Listen();
INET_ERROR Listen(IPEndPointBasis::OnMessageReceivedFunct onMessageReceived,
IPEndPointBasis::OnReceiveErrorFunct onReceiveError, void * appState = nullptr);
INET_ERROR SendTo(const IPAddress & addr, chip::System::PacketBufferHandle && msg, uint16_t sendFlags = 0);
INET_ERROR SendTo(const IPAddress & addr, InterfaceId intfId, chip::System::PacketBufferHandle && msg, uint16_t sendFlags = 0);
INET_ERROR SendMsg(const IPPacketInfo * pktInfo, chip::System::PacketBufferHandle msg, uint16_t sendFlags = 0);
Expand Down Expand Up @@ -107,7 +108,6 @@ class DLL_EXPORT RawEndPoint : public IPEndPointBasis

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
INET_ERROR GetSocket(IPAddressType addrType);
SocketEvents PrepareIO();
void HandlePendingIO();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
};
Expand Down
68 changes: 42 additions & 26 deletions src/inet/TCPEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ INET_ERROR TCPEndPoint::Bind(IPAddressType addrType, const IPAddress & addr, uin
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

if (res == INET_NO_ERROR)
{
State = kState_Bound;
}

return res;
}
Expand All @@ -254,10 +256,6 @@ INET_ERROR TCPEndPoint::Listen(uint16_t backlog)
{
INET_ERROR res = INET_NO_ERROR;

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
chip::System::Layer & lSystemLayer = SystemLayer();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

if (State != kState_Bound)
return INET_ERROR_INCORRECT_STATE;

Expand All @@ -278,8 +276,11 @@ INET_ERROR TCPEndPoint::Listen(uint16_t backlog)
if (listen(mSocket, backlog) != 0)
res = chip::System::MapErrorPOSIX(errno);

// Wait for ability to read on this endpoint.
mRequestIO.SetRead();

// Wake the thread calling select so that it recognizes the new socket.
lSystemLayer.WakeSelect();
SystemLayer().WakeSelect();

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

Expand Down Expand Up @@ -493,11 +494,17 @@ INET_ERROR TCPEndPoint::Connect(const IPAddress & addr, uint16_t port, Interface
if (conRes == 0)
{
State = kState_Connected;
// Wait for ability to read on this endpoint.
mRequestIO.SetRead();
if (OnConnectComplete != nullptr)
OnConnectComplete(this, INET_NO_ERROR);
}
else
{
State = kState_Connecting;
// Wait for ability to write on this endpoint.
mRequestIO.SetWrite();
}

// Wake the thread calling select so that it recognizes the new socket.
lSystemLayer.WakeSelect();
Expand Down Expand Up @@ -699,9 +706,17 @@ INET_ERROR TCPEndPoint::Send(System::PacketBufferHandle data, bool push)
}

if (mSendQueue.IsNull())
{
mSendQueue = std::move(data);
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
// Wait for ability to write on this endpoint.
mRequestIO.SetWrite();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
}
else
{
mSendQueue->AddToEnd(std::move(data));
}

#if CHIP_SYSTEM_CONFIG_USE_LWIP

Expand Down Expand Up @@ -1318,9 +1333,18 @@ INET_ERROR TCPEndPoint::DriveSending()
MarkActive();

if (lenSent < bufLen)
{
mSendQueue->ConsumeHead(lenSent);
}
else
{
mSendQueue.FreeHead();
if (mSendQueue.IsNull())
{
// Do not wait for ability to write on this endpoint.
mRequestIO.ClearWrite();
}
}

if (OnDataSent != nullptr)
OnDataSent(this, lenSent);
Expand Down Expand Up @@ -1418,6 +1442,13 @@ void TCPEndPoint::HandleConnectComplete(INET_ERROR err)
MarkActive();

State = kState_Connected;

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
// Wait for ability to read or write on this endpoint.
mRequestIO.SetRead();
mRequestIO.SetWrite();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

if (OnConnectComplete != nullptr)
OnConnectComplete(this, INET_NO_ERROR);
}
Expand Down Expand Up @@ -1547,6 +1578,7 @@ INET_ERROR TCPEndPoint::DoClose(INET_ERROR err, bool suppressCallback)
if (close(mSocket) != 0 && err == INET_NO_ERROR)
err = chip::System::MapErrorPOSIX(errno);
mSocket = INET_INVALID_SOCKET_FD;
mRequestIO.Clear();

// Wake the thread calling select so that it recognizes the socket is closed.
lSystemLayer.WakeSelect();
Expand Down Expand Up @@ -2335,26 +2367,6 @@ INET_ERROR TCPEndPoint::GetSocket(IPAddressType addrType)
return INET_NO_ERROR;
}

SocketEvents TCPEndPoint::PrepareIO()
{
SocketEvents ioType;

// If initiating a new connection...
// OR if connected and there is data to be sent...
// THEN arrange for the kernel to alert us when the socket is ready to be written.
if (State == kState_Connecting || (IsConnected() && !mSendQueue.IsNull()))
ioType.SetWrite();

// If listening for incoming connections and the app is ready to receive a connection...
// OR if in a state where receiving is allowed, and the app is ready to receive data...
// THEN arrange for the kernel to alert us when the socket is ready to be read.
if ((State == kState_Listening && OnConnectionReceived != nullptr) ||
((State == kState_Connected || State == kState_SendShutdown) && ReceiveEnabled && OnDataReceived != nullptr))
ioType.SetRead();

return ioType;
}

void TCPEndPoint::HandlePendingIO()
{
// Prevent the end point from being freed while in the middle of a callback.
Expand Down Expand Up @@ -2504,7 +2516,8 @@ void TCPEndPoint::ReceiveData()
State = kState_ReceiveShutdown;
else
State = kState_Closing;

// Do not wait for ability to read on this endpoint.
mRequestIO.ClearRead();
// Call the app's OnPeerClose.
if (OnPeerClose != nullptr)
OnPeerClose(this);
Expand Down Expand Up @@ -2601,6 +2614,9 @@ void TCPEndPoint::HandleIncomingConnection()
#endif // !INET_CONFIG_ENABLE_IPV4
conEP->Retain();

// Wait for ability to read on this endpoint.
conEP->mRequestIO.SetRead();

// Call the app's callback function.
OnConnectionReceived(this, conEP, peerAddr, peerPort);
}
Expand Down
2 changes: 1 addition & 1 deletion src/inet/TCPEndPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ class DLL_EXPORT TCPEndPoint : public EndPointBasis

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
INET_ERROR GetSocket(IPAddressType addrType);
SocketEvents PrepareIO();
SocketEvents PrepareIO() const { return mRequestIO; }
void HandlePendingIO();
void ReceiveData();
void HandleIncomingConnection();
Expand Down
Loading

0 comments on commit 7eb4869

Please sign in to comment.