Skip to content

Commit

Permalink
[CBRD-24593] replace select () with poll () in the BROKER (#4011)
Browse files Browse the repository at this point in the history
replace select () with poll () to support more than 1,024 connections
  • Loading branch information
kisoo-han authored Dec 26, 2022
1 parent bb99ca0 commit 9b73dd6
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 71 deletions.
160 changes: 90 additions & 70 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@
write_to_client(FD, (char*) &write_val, 4); \
} while (0)

#define JOB_COUNT_MAX 1000000
#define JOB_COUNT_MAX 130000000

/* num of collecting counts per monitoring interval */
#define NUM_COLLECT_COUNT_PER_INTVL 4
Expand Down Expand Up @@ -265,6 +265,8 @@ static int write_to_client (SOCKET sock_fd, char *buf, int size);
static int write_to_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec);
static int read_from_client (SOCKET sock_fd, char *buf, int size);
static int read_from_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec);
static int read_buffer_async (SOCKET sock_fd, char *buf, int size, int timeout);
static int write_buffer_async (SOCKET sock_fd, char *buf, int size, int timeout);
static int run_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
static int stop_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
static void restart_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
Expand Down Expand Up @@ -1420,50 +1422,31 @@ read_from_client (SOCKET sock_fd, char *buf, int size)
static int
read_from_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec)
{
int len = size;
int read_len;
#ifdef ASYNC_MODE
SELECT_MASK read_mask;
int nfound;
int maxfd;
struct timeval timeout_val, *timeout_ptr;

if (timeout_sec < 0)
{
timeout_ptr = NULL;
}
else
{
timeout_val.tv_sec = timeout_sec;
timeout_val.tv_usec = 0;
timeout_ptr = &timeout_val;
}
#endif

#ifdef ASYNC_MODE
FD_ZERO (&read_mask);
FD_SET (sock_fd, (fd_set *) (&read_mask));
maxfd = (int) sock_fd + 1;
nfound = select (maxfd, &read_mask, (SELECT_MASK *) 0, (SELECT_MASK *) 0, timeout_ptr);
if (nfound < 0)
if (IS_INVALID_SOCKET (sock_fd))
{
return -1;
}
#endif

#ifdef ASYNC_MODE
if (FD_ISSET (sock_fd, (fd_set *) (&read_mask)))
{
#endif
read_len = READ_FROM_SOCKET (sock_fd, buf, size);
#ifdef ASYNC_MODE
}
else
while (size > 0)
{
return -1;
read_len = read_buffer_async (sock_fd, buf, size, timeout_sec);
if (read_len <= 0)
{
return -1;
}

buf += read_len;
size -= read_len;
}
#else
read_len = READ_FROM_SOCKET (sock_fd, buf, size);
#endif

return read_len;
return len;
}

static int
Expand All @@ -1475,53 +1458,32 @@ write_to_client (SOCKET sock_fd, char *buf, int size)
static int
write_to_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec)
{
int write_len;
#ifdef ASYNC_MODE
SELECT_MASK write_mask;
int nfound;
int maxfd;
struct timeval timeout_val, *timeout_ptr;

if (timeout_sec < 0)
{
timeout_ptr = NULL;
}
else
{
timeout_val.tv_sec = timeout_sec;
timeout_val.tv_usec = 0;
timeout_ptr = &timeout_val;
}
#endif
int len = size;
int write_len = -1;

if (IS_INVALID_SOCKET (sock_fd))
return -1;

#ifdef ASYNC_MODE
FD_ZERO (&write_mask);
FD_SET (sock_fd, (fd_set *) (&write_mask));
maxfd = (int) sock_fd + 1;
nfound = select (maxfd, (SELECT_MASK *) 0, &write_mask, (SELECT_MASK *) 0, timeout_ptr);
if (nfound < 0)
{
return -1;
}
#endif

#ifdef ASYNC_MODE
if (FD_ISSET (sock_fd, (fd_set *) (&write_mask)))
while (size > 0)
{
#endif
write_len = WRITE_TO_SOCKET (sock_fd, buf, size);
#ifdef ASYNC_MODE
}
else
{
return -1;
write_len = write_buffer_async (sock_fd, buf, size, timeout_sec);

if (write_len <= 0)
{
return -1;
}

buf += write_len;
size -= write_len;
}
#else
len = WRITE_TO_SOCKET (sock_fd, buf, size);
#endif

return write_len;
return len;
}

/*
Expand Down Expand Up @@ -3367,3 +3329,61 @@ get_as_slow_log_filename (char *log_filename, int len, char *broker_name, T_APPL
log_filename[0] = '\0';
}
}

static int
read_buffer_async (SOCKET sock_fd, char *buf, int size, int timeout)
{
int read_len = -1;
struct pollfd po[1] = { {0, 0, 0} };
int ret;

po[0].fd = sock_fd;
po[0].events = POLLIN;

do
{
ret = poll (po, 1, timeout * 1000);
}
while (ret < 0 && errno == EINTR);

if (ret < 1) /* ERROR OR TIMEOUT */
{
return -1;
}

if (po[0].revents & POLLIN) /* RECEIVE NEW REQUEST */
{
read_len = READ_FROM_SOCKET (sock_fd, buf, size);
}

return read_len;
}

static int
write_buffer_async (SOCKET sock_fd, char *buf, int size, int timeout)
{
int write_len = -1;
struct pollfd po[1] = { {0, 0, 0} };
int ret;

po[0].fd = sock_fd;
po[0].events = POLLOUT;

do
{
ret = poll (po, 1, timeout * 1000);
}
while (ret < 0 && errno == EINTR);

if (ret < 1) /* ERROR OR TIMEOUT */
{
return -1;
}

if (po[0].revents & POLLOUT) /* RECEIVE NEW REQUEST */
{
write_len = WRITE_TO_SOCKET (sock_fd, buf, size);
}

return write_len;
}
2 changes: 1 addition & 1 deletion src/broker/broker_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@

#define APPL_NAME_LENGTH 128

#define JOB_QUEUE_MAX_SIZE 2048
#define JOB_QUEUE_MAX_SIZE 4096

#define MAX_CRYPT_STR_LENGTH 32

Expand Down

0 comments on commit 9b73dd6

Please sign in to comment.