Skip to content

Commit

Permalink
Reduce complexity of network_event_thread()
Browse files Browse the repository at this point in the history
in order to improve readability and reduce complexity,
network_event_thread uses following subfuncions
- setting UDP/TCP sockets
- receiving UDP/TCP messages

Change-Id: Idd4642ca9d83944e008a130b336378b7a6f38c36
Signed-off-by: Rami Jung <[email protected]>
Reviewed-on: https://gerrit.iotivity.org/gerrit/27529
Tested-by: IoTivity Jenkins <[email protected]>
Reviewed-by: Kishen Maloor <[email protected]>
  • Loading branch information
RamiJung authored and kmaloor committed Nov 27, 2018
1 parent c7d1754 commit f335a34
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 128 deletions.
210 changes: 109 additions & 101 deletions port/linux/ipadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -760,20 +760,9 @@ recv_msg(int sock, uint8_t *recv_buf, int recv_buf_size,
return ret;
}

static void *
network_event_thread(void *data)
static void
oc_udp_add_socks_to_fd_set(ip_context_t *dev)
{
ip_context_t *dev = (ip_context_t *)data;

fd_set setfds;
FD_ZERO(&dev->rfds);
/* Monitor network interface changes on the platform from only the 0th logical
* device
*/
if (dev->device == 0) {
FD_SET(ifchange_sock, &dev->rfds);
}
FD_SET(dev->shutdown_pipe[0], &dev->rfds);
FD_SET(dev->server_sock, &dev->rfds);
FD_SET(dev->mcast_sock, &dev->rfds);
#ifdef OC_SECURITY
Expand All @@ -787,7 +776,107 @@ network_event_thread(void *data)
FD_SET(dev->secure4_sock, &dev->rfds);
#endif /* OC_SECURITY */
#endif /* OC_IPV4 */
}

static adapter_receive_state_t
oc_udp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
{
if (FD_ISSET(dev->server_sock, fds)) {
int count = recv_msg(dev->server_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6;
FD_CLR(dev->server_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}

if (FD_ISSET(dev->mcast_sock, fds)) {
int count = recv_msg(dev->mcast_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6 | MULTICAST;
FD_CLR(dev->mcast_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}

#ifdef OC_IPV4
if (FD_ISSET(dev->server4_sock, fds)) {
int count = recv_msg(dev->server4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4;
FD_CLR(dev->server4_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}

if (FD_ISSET(dev->mcast4_sock, fds)) {
int count = recv_msg(dev->mcast4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4 | MULTICAST;
FD_CLR(dev->mcast4_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
#endif /* OC_IPV4 */

#ifdef OC_SECURITY
if (FD_ISSET(dev->secure_sock, fds)) {
int count = recv_msg(dev->secure_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6 | SECURED;
FD_CLR(dev->secure_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
#ifdef OC_IPV4
if (FD_ISSET(dev->secure4_sock, fds)) {
int count = recv_msg(dev->secure4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4 | SECURED;
FD_CLR(dev->secure4_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
#endif /* OC_IPV4 */
#endif /* OC_SECURITY */

return ADAPTER_STATUS_NONE;
}

static void *
network_event_thread(void *data)
{
ip_context_t *dev = (ip_context_t *)data;

fd_set setfds;
FD_ZERO(&dev->rfds);
/* Monitor network interface changes on the platform from only the 0th logical
* device
*/
if (dev->device == 0) {
FD_SET(ifchange_sock, &dev->rfds);
}
FD_SET(dev->shutdown_pipe[0], &dev->rfds);

oc_udp_add_socks_to_fd_set(dev);
#ifdef OC_TCP
oc_tcp_add_socks_to_fd_set(dev);
#endif /* OC_TCP */
Expand Down Expand Up @@ -829,101 +918,20 @@ network_event_thread(void *data)

message->endpoint.device = dev->device;

if (FD_ISSET(dev->server_sock, &setfds)) {
int count = recv_msg(dev->server_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6;
FD_CLR(dev->server_sock, &setfds);
goto common;
}

if (FD_ISSET(dev->mcast_sock, &setfds)) {
int count = recv_msg(dev->mcast_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6 | MULTICAST;
FD_CLR(dev->mcast_sock, &setfds);
goto common;
}

#ifdef OC_IPV4
if (FD_ISSET(dev->server4_sock, &setfds)) {
int count = recv_msg(dev->server4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4;
FD_CLR(dev->server4_sock, &setfds);
goto common;
}

if (FD_ISSET(dev->mcast4_sock, &setfds)) {
int count = recv_msg(dev->mcast4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4 | MULTICAST;
FD_CLR(dev->mcast4_sock, &setfds);
goto common;
}
#endif /* OC_IPV4 */

#ifdef OC_SECURITY
if (FD_ISSET(dev->secure_sock, &setfds)) {
int count = recv_msg(dev->secure_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6 | SECURED;
FD_CLR(dev->secure_sock, &setfds);
if (oc_udp_receive_message(dev, &setfds, message) ==
ADAPTER_STATUS_RECEIVE) {
goto common;
}
#ifdef OC_IPV4
if (FD_ISSET(dev->secure4_sock, &setfds)) {
int count = recv_msg(dev->secure4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4 | SECURED;
FD_CLR(dev->secure4_sock, &setfds);
goto common;
}
#endif /* OC_IPV4 */
#endif /* OC_SECURITY */

#ifdef OC_TCP
tcp_receive_state_t tcp_status = oc_tcp_receive_message(dev,
&setfds,
message);
if (tcp_status == TCP_STATUS_RECEIVE) {
if (oc_tcp_receive_message(dev, &setfds, message) ==
ADAPTER_STATUS_RECEIVE) {
goto common;
} else {
oc_message_unref(message);
continue;
}
#endif /* OC_TCP */

oc_message_unref(message);
continue;

common:
#ifdef OC_DEBUG
PRINT("Incoming message of size %d bytes from ", message->length);
Expand Down
10 changes: 9 additions & 1 deletion port/linux/ipcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,16 @@ extern "C"
{
#endif

typedef enum {
ADAPTER_STATUS_NONE = 0, /* Nothing happens */
ADAPTER_STATUS_ACCEPT, /* Receiving no meaningful data */
ADAPTER_STATUS_RECEIVE, /* Receiving meaningful data */
ADAPTER_STATUS_ERROR /* Error */
} adapter_receive_state_t;

#ifdef OC_TCP
typedef struct tcp_context_t {
typedef struct tcp_context_t
{
struct sockaddr_storage server;
int server_sock;
uint16_t port;
Expand Down
34 changes: 17 additions & 17 deletions port/linux/tcpadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ get_total_length_from_header(oc_message_t *message, oc_endpoint_t *endpoint)
return total_length;
}

tcp_receive_state_t
adapter_receive_state_t
oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
{
pthread_mutex_lock(&dev->tcp.mutex);
Expand All @@ -309,62 +309,62 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
ret = status; \
goto oc_tcp_receive_message_done

tcp_receive_state_t ret = TCP_STATUS_ERROR;
adapter_receive_state_t ret = ADAPTER_STATUS_ERROR;
message->endpoint.device = dev->device;

if (FD_ISSET(dev->tcp.server_sock, fds)) {
message->endpoint.flags = IPV6 | TCP;
if (accept_new_session(dev, dev->tcp.server_sock, fds, &message->endpoint) <
0) {
OC_ERR("accept new session fail");
ret_with_code(TCP_STATUS_ERROR);
ret_with_code(ADAPTER_STATUS_ERROR);
}
ret_with_code(TCP_STATUS_ACCEPT);
ret_with_code(ADAPTER_STATUS_ACCEPT);
#ifdef OC_SECURITY
} else if (FD_ISSET(dev->tcp.secure_sock, fds)) {
message->endpoint.flags = IPV6 | SECURED | TCP;
if (accept_new_session(dev, dev->tcp.secure_sock, fds, &message->endpoint) <
0) {
OC_ERR("accept new session fail");
ret_with_code(TCP_STATUS_ERROR);
ret_with_code(ADAPTER_STATUS_ERROR);
}
ret_with_code(TCP_STATUS_ACCEPT);
ret_with_code(ADAPTER_STATUS_ACCEPT);
#endif /* OC_SECURITY */
#ifdef OC_IPV4
} else if (FD_ISSET(dev->tcp.server4_sock, fds)) {
message->endpoint.flags = IPV4 | TCP;
if (accept_new_session(dev, dev->tcp.server4_sock, fds,
&message->endpoint) < 0) {
OC_ERR("accept new session fail");
ret_with_code(TCP_STATUS_ERROR);
ret_with_code(ADAPTER_STATUS_ERROR);
}
ret_with_code(TCP_STATUS_ACCEPT);
ret_with_code(ADAPTER_STATUS_ACCEPT);
#ifdef OC_SECURITY
} else if (FD_ISSET(dev->tcp.secure4_sock, fds)) {
message->endpoint.flags = IPV4 | SECURED | TCP;
if (accept_new_session(dev, dev->tcp.secure4_sock, fds,
&message->endpoint) < 0) {
OC_ERR("accept new session fail");
ret_with_code(TCP_STATUS_ERROR);
ret_with_code(ADAPTER_STATUS_ERROR);
}
ret_with_code(TCP_STATUS_ACCEPT);
ret_with_code(ADAPTER_STATUS_ACCEPT);
#endif /* OC_SECURITY */
#endif /* OC_IPV4 */
} else if (FD_ISSET(dev->tcp.connect_pipe[0], fds)) {
ssize_t len = read(dev->tcp.connect_pipe[0], message->data, OC_PDU_SIZE);
if (len < 0) {
OC_ERR("read error! %d", errno);
ret_with_code(TCP_STATUS_ERROR);
ret_with_code(ADAPTER_STATUS_ERROR);
}
FD_CLR(dev->tcp.connect_pipe[0], fds);
ret_with_code(TCP_STATUS_NONE);
ret_with_code(ADAPTER_STATUS_NONE);
}

// find session.
tcp_session_t *session = get_ready_to_read_session(fds);
if (!session) {
OC_DBG("could not find TCP session socket in fd set");
ret_with_code(TCP_STATUS_NONE);
ret_with_code(ADAPTER_STATUS_NONE);
}

// receive message.
Expand All @@ -379,13 +379,13 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)

free_tcp_session(session);

ret_with_code(TCP_STATUS_ERROR);
ret_with_code(ADAPTER_STATUS_ERROR);
} else if (count == 0) {
OC_DBG("peer closed TCP session\n");

free_tcp_session(session);

ret_with_code(TCP_STATUS_NONE);
ret_with_code(ADAPTER_STATUS_NONE);
}

OC_DBG("recv(): %d bytes.", count);
Expand All @@ -399,7 +399,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
OC_ERR("total receive length(%ld) is bigger than max pdu size(%ld)",
total_length, (OC_MAX_APP_DATA_SIZE + COAP_MAX_HEADER_SIZE));
OC_ERR("It may occur buffer overflow.");
ret_with_code(TCP_STATUS_ERROR);
ret_with_code(ADAPTER_STATUS_ERROR);
}
OC_DBG("tcp packet total length : %ld bytes.", total_length);

Expand All @@ -410,7 +410,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
memcpy(&message->endpoint, &session->endpoint, sizeof(oc_endpoint_t));

FD_CLR(session->sock, fds);
ret = TCP_STATUS_RECEIVE;
ret = ADAPTER_STATUS_RECEIVE;

oc_tcp_receive_message_done:
pthread_mutex_unlock(&dev->tcp.mutex);
Expand Down
Loading

0 comments on commit f335a34

Please sign in to comment.