Skip to content

Commit

Permalink
Add 'sendmmg' support for UDP by setting '-Z' and using '-b' burt size
Browse files Browse the repository at this point in the history
  • Loading branch information
davidBar-On committed Sep 18, 2021
1 parent 6f8918e commit 635e879
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 60 deletions.
4 changes: 4 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ AC_CHECK_FUNCS([sendfile])
# connections.
AC_CHECK_FUNCS([getline])

# Check for sendmmsg/recvmmsg support for better throughput.
AC_CHECK_FUNCS([sendmmsg recvmmsg],
AC_DEFINE([HAVE_SEND_RECVMMSG], [1], [Have sendmmsg/recvmmsg functions.]))

# Check for packet pacing socket option (Linux only for now).
AC_CACHE_CHECK([SO_MAX_PACING_RATE socket option],
[iperf3_cv_header_so_max_pacing_rate],
Expand Down
21 changes: 21 additions & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@
typedef uint64_t iperf_size_t;
#endif // __IPERF_API_H

#ifndef HAVE_SENDMMSG
struct dummy_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of received bytes for header */
};
#endif /* HAVE_SENDMMSG */

/* macro to calculate buffer sizes used for a stream (to support hse of send/recvmmsg) */
#define STREAM_BUFSIZE(prefix) (((prefix)->settings->blksize) * (((prefix)->settings->send_recvmmsg == 0 || (prefix)->settings->burst == 0)? 1 : (prefix)->settings->burst))

struct iperf_interval_results
{
iperf_size_t bytes_transferred; /* bytes transferred in this interval */
Expand Down Expand Up @@ -168,6 +178,7 @@ struct iperf_settings
#endif // HAVE_SSL
int connect_timeout; /* socket connection timeout, in ms */
int idle_timeout; /* server idle time timeout */
int send_recvmmsg; /* whether sendmmsg/recvmmsg should be used - mainly per the -Z option */
struct iperf_time rcv_timeout; /* Timeout for receiving messages in active mode, in us */
};

Expand Down Expand Up @@ -210,6 +221,16 @@ struct iperf_stream
int omitted_outoforder_packets;
int cnt_error;
int omitted_cnt_error;

#ifdef HAVE_SENDMMSG
struct mmsghdr *msg; /* For some reason, just struct and not a pointer failes on "field ‘msg’ has incomplete type" */
#else
struct dummy_mmsghdr *msg;
#endif /* HAVE_SENDMMSG */
struct iovec *pmsg_iov; /* Pointer to the array of iov_msg per mmsg message */
int sendmmsg_buffered_packets_count; /* number of buffered packets for sendmmsg */
char *pbuf; /* Pointer to current vailable space in buffer */

uint64_t target;

struct sockaddr_storage local_addr;
Expand Down
118 changes: 104 additions & 14 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
Expand Down Expand Up @@ -102,6 +103,7 @@ static int diskfile_recv(struct iperf_stream *sp);
static int JSON_write(int fd, cJSON *json);
static void print_interval_results(struct iperf_test *test, struct iperf_stream *sp, cJSON *json_interval_streams);
static cJSON *JSON_read(int fd);
void iperf_init_send_recvmmsg(struct iperf_test *test);


/*************************** Print usage functions ****************************/
Expand Down Expand Up @@ -1612,6 +1614,9 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
1 : round(test->settings->bitrate_limit_interval/test->stats_interval) );
}

/* Set whether send/recvmmsg is supported along with related settings */
iperf_init_send_recvmmsg(test);

/* Show warning if JSON output is used with explicit report format */
if ((test->json_output) && (test->settings->unit_format != 'a')) {
warning("Report format (-f) flag ignored with JSON output (-J)");
Expand Down Expand Up @@ -1742,10 +1747,10 @@ iperf_send(struct iperf_test *test, fd_set *write_setP)
SLIST_FOREACH(sp, &test->streams, streams) {
if ((sp->green_light && sp->sender &&
(write_setP == NULL || FD_ISSET(sp->socket, write_setP)))) {
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
break;
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
break;
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
break;
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
break;
if ((r = sp->snd(sp)) < 0) {
if (r == NET_SOFTERROR)
break;
Expand Down Expand Up @@ -1915,6 +1920,9 @@ iperf_exchange_parameters(struct iperf_test *test)

if (get_parameters(test) < 0)
return -1;

/* Set whether send/recvmmsg is supported along with related settings */
iperf_init_send_recvmmsg(test);

#if defined(HAVE_SSL)
if (test_is_authorized(test) < 0){
Expand Down Expand Up @@ -2048,6 +2056,8 @@ send_parameters(struct iperf_test *test)
cJSON_AddNumberToObject(j, "udp_counters_64bit", iperf_get_test_udp_counters_64bit(test));
if (test->repeating_payload)
cJSON_AddNumberToObject(j, "repeating_payload", test->repeating_payload);
if (test->zerocopy)
cJSON_AddNumberToObject(j, "zerocopy", test->zerocopy);
#if defined(HAVE_DONT_FRAGMENT)
if (test->settings->dont_fragment)
cJSON_AddNumberToObject(j, "dont_fragment", test->settings->dont_fragment);
Expand Down Expand Up @@ -2160,6 +2170,8 @@ get_parameters(struct iperf_test *test)
iperf_set_test_udp_counters_64bit(test, 1);
if ((j_p = cJSON_GetObjectItem(j, "repeating_payload")) != NULL)
test->repeating_payload = 1;
if ((j_p = cJSON_GetObjectItem(j, "zerocopy")) != NULL)
test->zerocopy = 1;
#if defined(HAVE_DONT_FRAGMENT)
if ((j_p = cJSON_GetObjectItem(j, "dont_fragment")) != NULL)
test->settings->dont_fragment = j_p->valueint;
Expand Down Expand Up @@ -2677,6 +2689,8 @@ iperf_defaults(struct iperf_test *testp)
testp->settings->rcv_timeout.secs = DEFAULT_NO_MSG_RCVD_TIMEOUT / SEC_TO_mS;
testp->settings->rcv_timeout.usecs = (DEFAULT_NO_MSG_RCVD_TIMEOUT % SEC_TO_mS) * mS_TO_US;

testp->zerocopy = 0;
testp->settings->send_recvmmsg = 0;
memset(testp->cookie, 0, COOKIE_SIZE);

testp->multisend = 10; /* arbitrary */
Expand Down Expand Up @@ -2963,6 +2977,8 @@ iperf_reset_test(struct iperf_test *test)
test->settings->mss = 0;
test->settings->tos = 0;
test->settings->dont_fragment = 0;
test->zerocopy = 0;
test->settings->send_recvmmsg = 0;

#if defined(HAVE_SSL)
if (test->settings->authtoken) {
Expand Down Expand Up @@ -3024,6 +3040,8 @@ iperf_reset_stats(struct iperf_test *test)
sp->omitted_cnt_error = sp->cnt_error;
sp->omitted_outoforder_packets = sp->outoforder_packets;
sp->jitter = 0;
sp->sendmmsg_buffered_packets_count = 0;
sp->pbuf = sp->buffer;
rp = sp->result;
rp->bytes_sent_omit = rp->bytes_sent;
rp->bytes_received = 0;
Expand Down Expand Up @@ -3991,9 +4009,15 @@ void
iperf_free_stream(struct iperf_stream *sp)
{
struct iperf_interval_results *irp, *nirp;
int r;

/* XXX: need to free interval list too! */
munmap(sp->buffer, sp->test->settings->blksize);
r = munmap(sp->buffer, STREAM_BUFSIZE(sp->test));
if (r < 0)
printf("Failed to release stream buffer at %p with size %d - errno=%d: %s)\n", sp->buffer, STREAM_BUFSIZE(sp->test), errno, strerror(errno));
else if (sp->test->debug)
printf("Successfully released stream buffer at %p with size %d\n", sp->buffer, STREAM_BUFSIZE(sp->test));

close(sp->buffer_fd);
if (sp->diskfile_fd >= 0)
close(sp->diskfile_fd);
Expand All @@ -4004,6 +4028,11 @@ iperf_free_stream(struct iperf_stream *sp)
free(sp->result);
if (sp->send_timer != NULL)
tmr_cancel(sp->send_timer);

if (sp->pmsg_iov != NULL)
free(sp->pmsg_iov);
if (sp->msg != NULL)
free(sp->msg);
free(sp);
}

Expand All @@ -4013,6 +4042,9 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)
{
struct iperf_stream *sp;
int ret = 0;
int burst;
int i;
char *pbuf;

char template[1024];
if (test->tmp_template) {
Expand Down Expand Up @@ -4067,19 +4099,50 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)
free(sp);
return NULL;
}
if (ftruncate(sp->buffer_fd, test->settings->blksize) < 0) {
if (ftruncate(sp->buffer_fd, STREAM_BUFSIZE(sp)) < 0) {
i_errno = IECREATESTREAM;
free(sp->result);
free(sp);
return NULL;
}
sp->buffer = (char *) mmap(NULL, test->settings->blksize, PROT_READ|PROT_WRITE, MAP_PRIVATE, sp->buffer_fd, 0);

sp->buffer = (char *) mmap(NULL, STREAM_BUFSIZE(sp), PROT_READ|PROT_WRITE, MAP_PRIVATE, sp->buffer_fd, 0);
if (sp->buffer == MAP_FAILED) {
i_errno = IECREATESTREAM;
free(sp->result);
free(sp);
return NULL;
}
else if (sp->test->debug)
printf("Successfully allocated stream buffer at addr %p with length %d\n", sp->buffer, STREAM_BUFSIZE(sp));

/* Allocate and initialize pointers for the messages in the buffer for UDP multi-msg send/receive */
if (test->protocol->id == Pudp) {
burst = (sp->settings->send_recvmmsg == 0)? 1 : sp->settings->burst;
sp->msg = malloc(burst * sizeof(*sp->msg));
if (sp->msg == NULL) {
i_errno = IECREATESTREAM;
iperf_free_stream(sp);
return NULL;
}
memset(sp->msg, 0, burst * sizeof(*sp->msg));

sp->pmsg_iov = malloc(burst * sizeof(struct iovec));
if (sp->pmsg_iov == NULL) {
i_errno = IECREATESTREAM;
iperf_free_stream(sp);
return NULL;
}
memset(sp->pmsg_iov, 0, burst * sizeof(struct iovec));

for (i = 0, pbuf = sp->buffer; i < burst; i++, pbuf += sp->settings->blksize) {
sp->msg[i].msg_hdr.msg_iov = &sp->pmsg_iov[i];
sp->msg[i].msg_hdr.msg_iov->iov_base = pbuf;
sp->msg[i].msg_hdr.msg_iov->iov_len = sp->settings->blksize;
sp->msg[i].msg_hdr.msg_iovlen = 1;
}
}

sp->pending_size = 0;

/* Set socket */
Expand All @@ -4088,11 +4151,14 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)
sp->snd = test->protocol->send;
sp->rcv = test->protocol->recv;

sp->sendmmsg_buffered_packets_count = 0;
sp->pbuf = sp->buffer;

if (test->diskfile_name != (char*) 0) {
sp->diskfile_fd = open(test->diskfile_name, sender ? O_RDONLY : (O_WRONLY|O_CREAT|O_TRUNC), S_IRUSR|S_IWUSR);
if (sp->diskfile_fd == -1) {
i_errno = IEFILE;
munmap(sp->buffer, sp->test->settings->blksize);
munmap(sp->buffer, STREAM_BUFSIZE(sp));
free(sp->result);
free(sp);
return NULL;
Expand All @@ -4106,13 +4172,13 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)

/* Initialize stream */
if (test->repeating_payload)
fill_with_repeating_pattern(sp->buffer, test->settings->blksize);
fill_with_repeating_pattern(sp->buffer, STREAM_BUFSIZE(sp));
else
ret = readentropy(sp->buffer, test->settings->blksize);
ret = readentropy(sp->buffer, STREAM_BUFSIZE(sp));

if ((ret < 0) || (iperf_init_stream(sp, test) < 0)) {
close(sp->buffer_fd);
munmap(sp->buffer, sp->test->settings->blksize);
munmap(sp->buffer, STREAM_BUFSIZE(sp));
free(sp->result);
free(sp);
return NULL;
Expand Down Expand Up @@ -4241,7 +4307,7 @@ diskfile_send(struct iperf_stream *sp)

/* if needed, read enough data from the disk to fill up the buffer */
if (sp->diskfile_left < sp->test->settings->blksize && !sp->test->done) {
r = read(sp->diskfile_fd, sp->buffer, sp->test->settings->blksize -
r = read(sp->diskfile_fd, sp->buffer, STREAM_BUFSIZE(sp) -
sp->diskfile_left);
buffer_left += r;
rtot += r;
Expand All @@ -4251,8 +4317,8 @@ diskfile_send(struct iperf_stream *sp)

// If the buffer doesn't contain a full buffer at this point,
// adjust the size of the data to send.
if (buffer_left != sp->test->settings->blksize) {
if (sp->test->debug)
if (buffer_left != STREAM_BUFSIZE(sp)) {
if (sp->test->debug)
printf("possible eof\n");
// setting data size to be sent,
// which is less than full block/buffer size
Expand Down Expand Up @@ -4651,3 +4717,27 @@ iflush(struct iperf_test *test)
{
return fflush(test->outfile);
}

/*
* Initialize whether sed/recvmmsg is supported for the test,
* along with initializing other related settings.
*/
void
iperf_init_send_recvmmsg(struct iperf_test *test)
{
#ifdef HAVE_SENDMMSG
/* Set UDP `send_recvmmsg` from `zerocopy` settings (not used for disk file) */
if (test->protocol->id == Pudp && test->zerocopy && test->diskfile_name == (char *)0) {
test->settings->send_recvmmsg = 1;
/* Ensure non-zero burst number of packets when sendmmsg/recvmmsg are used */
if (test->settings->burst == 0)
test->settings->burst = 1;
#ifdef UIO_MAXIOV
/* Ensure burst size is appropriate for sendmmsg() */
else if (test->settings->burst > UIO_MAXIOV)
test->settings->burst = UIO_MAXIOV;
#endif /* UIO_MAXIOV */
}
#endif /* HAVE_SENDMMSG */
return;
}
6 changes: 5 additions & 1 deletion src/iperf_locale.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,11 @@ const char usage_longstr[] = "Usage: iperf3 [-s|-c host] [options]\n"
#if defined(HAVE_FLOWLABEL)
" -L, --flowlabel N set the IPv6 flow label (only supported on Linux)\n"
#endif /* HAVE_FLOWLABEL */
" -Z, --zerocopy use a 'zero copy' method of sending data\n"
" -Z, --zerocopy TCP: use a 'zero copy' method of sending data\n"
#if defined(HAVE_SENDMMSG)
" UDP: use `sendmmsg` to send `-b` bust number of messages;\n"
" (not supported when reading from a file).\n"
#endif /* HAVE_SENDMMSG */
" -O, --omit N omit the first n seconds\n"
" -T, --title str prefix every output line with this string\n"
" --extra-data str data string to include in client and server JSON\n"
Expand Down
Loading

0 comments on commit 635e879

Please sign in to comment.