Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sendmmsg support for UDP #1034

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ AC_CHECK_FUNCS([sendfile])
# connections.
AC_CHECK_FUNCS([getline])

# Check for sendmmsg/recvmmsg support for better throughput.
AC_CHECK_FUNCS([sendmmsg recvmmsg],
AC_DEFINE([HAVE_SEND_RECVMMSG], [1], [Have sendmmsg/recvmmsg functions.]))

# Check for packet pacing socket option (Linux only for now).
AC_CACHE_CHECK([SO_MAX_PACING_RATE socket option],
[iperf3_cv_header_so_max_pacing_rate],
Expand Down
21 changes: 21 additions & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@
typedef uint64_t iperf_size_t;
#endif // __IPERF_API_H

#ifndef HAVE_SENDMMSG
struct dummy_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of received bytes for header */
};
#endif /* HAVE_SENDMMSG */

/* macro to calculate buffer sizes used for a stream (to support hse of send/recvmmsg) */
#define STREAM_BUFSIZE(prefix) (((prefix)->settings->blksize) * (((prefix)->settings->send_recvmmsg == 0 || (prefix)->settings->burst == 0)? 1 : (prefix)->settings->burst))

struct iperf_interval_results
{
iperf_size_t bytes_transferred; /* bytes transferred in this interval */
Expand Down Expand Up @@ -168,6 +178,7 @@ struct iperf_settings
#endif // HAVE_SSL
int connect_timeout; /* socket connection timeout, in ms */
int idle_timeout; /* server idle time timeout */
int send_recvmmsg; /* whether sendmmsg/recvmmsg should be used - mainly per the -Z option */
struct iperf_time rcv_timeout; /* Timeout for receiving messages in active mode, in us */
};

Expand Down Expand Up @@ -210,6 +221,16 @@ struct iperf_stream
int omitted_outoforder_packets;
int cnt_error;
int omitted_cnt_error;

#ifdef HAVE_SENDMMSG
struct mmsghdr *msg; /* For some reason, just struct and not a pointer failes on "field ‘msg’ has incomplete type" */
#else
struct dummy_mmsghdr *msg;
#endif /* HAVE_SENDMMSG */
struct iovec *pmsg_iov; /* Pointer to the array of iov_msg per mmsg message */
int sendmmsg_buffered_packets_count; /* number of buffered packets for sendmmsg */
char *pbuf; /* Pointer to current vailable space in buffer */

uint64_t target;

struct sockaddr_storage local_addr;
Expand Down
118 changes: 104 additions & 14 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
Expand Down Expand Up @@ -102,6 +103,7 @@ static int diskfile_recv(struct iperf_stream *sp);
static int JSON_write(int fd, cJSON *json);
static void print_interval_results(struct iperf_test *test, struct iperf_stream *sp, cJSON *json_interval_streams);
static cJSON *JSON_read(int fd);
void iperf_init_send_recvmmsg(struct iperf_test *test);


/*************************** Print usage functions ****************************/
Expand Down Expand Up @@ -1612,6 +1614,9 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
1 : round(test->settings->bitrate_limit_interval/test->stats_interval) );
}

/* Set whether send/recvmmsg is supported along with related settings */
iperf_init_send_recvmmsg(test);

/* Show warning if JSON output is used with explicit report format */
if ((test->json_output) && (test->settings->unit_format != 'a')) {
warning("Report format (-f) flag ignored with JSON output (-J)");
Expand Down Expand Up @@ -1742,10 +1747,10 @@ iperf_send(struct iperf_test *test, fd_set *write_setP)
SLIST_FOREACH(sp, &test->streams, streams) {
if ((sp->green_light && sp->sender &&
(write_setP == NULL || FD_ISSET(sp->socket, write_setP)))) {
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
break;
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
break;
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
break;
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
break;
if ((r = sp->snd(sp)) < 0) {
if (r == NET_SOFTERROR)
break;
Expand Down Expand Up @@ -1915,6 +1920,9 @@ iperf_exchange_parameters(struct iperf_test *test)

if (get_parameters(test) < 0)
return -1;

/* Set whether send/recvmmsg is supported along with related settings */
iperf_init_send_recvmmsg(test);

#if defined(HAVE_SSL)
if (test_is_authorized(test) < 0){
Expand Down Expand Up @@ -2048,6 +2056,8 @@ send_parameters(struct iperf_test *test)
cJSON_AddNumberToObject(j, "udp_counters_64bit", iperf_get_test_udp_counters_64bit(test));
if (test->repeating_payload)
cJSON_AddNumberToObject(j, "repeating_payload", test->repeating_payload);
if (test->zerocopy)
cJSON_AddNumberToObject(j, "zerocopy", test->zerocopy);
#if defined(HAVE_DONT_FRAGMENT)
if (test->settings->dont_fragment)
cJSON_AddNumberToObject(j, "dont_fragment", test->settings->dont_fragment);
Expand Down Expand Up @@ -2160,6 +2170,8 @@ get_parameters(struct iperf_test *test)
iperf_set_test_udp_counters_64bit(test, 1);
if ((j_p = cJSON_GetObjectItem(j, "repeating_payload")) != NULL)
test->repeating_payload = 1;
if ((j_p = cJSON_GetObjectItem(j, "zerocopy")) != NULL)
test->zerocopy = 1;
#if defined(HAVE_DONT_FRAGMENT)
if ((j_p = cJSON_GetObjectItem(j, "dont_fragment")) != NULL)
test->settings->dont_fragment = j_p->valueint;
Expand Down Expand Up @@ -2677,6 +2689,8 @@ iperf_defaults(struct iperf_test *testp)
testp->settings->rcv_timeout.secs = DEFAULT_NO_MSG_RCVD_TIMEOUT / SEC_TO_mS;
testp->settings->rcv_timeout.usecs = (DEFAULT_NO_MSG_RCVD_TIMEOUT % SEC_TO_mS) * mS_TO_US;

testp->zerocopy = 0;
testp->settings->send_recvmmsg = 0;
memset(testp->cookie, 0, COOKIE_SIZE);

testp->multisend = 10; /* arbitrary */
Expand Down Expand Up @@ -2963,6 +2977,8 @@ iperf_reset_test(struct iperf_test *test)
test->settings->mss = 0;
test->settings->tos = 0;
test->settings->dont_fragment = 0;
test->zerocopy = 0;
test->settings->send_recvmmsg = 0;

#if defined(HAVE_SSL)
if (test->settings->authtoken) {
Expand Down Expand Up @@ -3024,6 +3040,8 @@ iperf_reset_stats(struct iperf_test *test)
sp->omitted_cnt_error = sp->cnt_error;
sp->omitted_outoforder_packets = sp->outoforder_packets;
sp->jitter = 0;
sp->sendmmsg_buffered_packets_count = 0;
sp->pbuf = sp->buffer;
rp = sp->result;
rp->bytes_sent_omit = rp->bytes_sent;
rp->bytes_received = 0;
Expand Down Expand Up @@ -3991,9 +4009,15 @@ void
iperf_free_stream(struct iperf_stream *sp)
{
struct iperf_interval_results *irp, *nirp;
int r;

/* XXX: need to free interval list too! */
munmap(sp->buffer, sp->test->settings->blksize);
r = munmap(sp->buffer, STREAM_BUFSIZE(sp->test));
if (r < 0)
printf("Failed to release stream buffer at %p with size %d - errno=%d: %s)\n", sp->buffer, STREAM_BUFSIZE(sp->test), errno, strerror(errno));
else if (sp->test->debug)
printf("Successfully released stream buffer at %p with size %d\n", sp->buffer, STREAM_BUFSIZE(sp->test));

close(sp->buffer_fd);
if (sp->diskfile_fd >= 0)
close(sp->diskfile_fd);
Expand All @@ -4004,6 +4028,11 @@ iperf_free_stream(struct iperf_stream *sp)
free(sp->result);
if (sp->send_timer != NULL)
tmr_cancel(sp->send_timer);

if (sp->pmsg_iov != NULL)
free(sp->pmsg_iov);
if (sp->msg != NULL)
free(sp->msg);
free(sp);
}

Expand All @@ -4013,6 +4042,9 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)
{
struct iperf_stream *sp;
int ret = 0;
int burst;
int i;
char *pbuf;

char template[1024];
if (test->tmp_template) {
Expand Down Expand Up @@ -4067,19 +4099,50 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)
free(sp);
return NULL;
}
if (ftruncate(sp->buffer_fd, test->settings->blksize) < 0) {
if (ftruncate(sp->buffer_fd, STREAM_BUFSIZE(sp)) < 0) {
i_errno = IECREATESTREAM;
free(sp->result);
free(sp);
return NULL;
}
sp->buffer = (char *) mmap(NULL, test->settings->blksize, PROT_READ|PROT_WRITE, MAP_PRIVATE, sp->buffer_fd, 0);

sp->buffer = (char *) mmap(NULL, STREAM_BUFSIZE(sp), PROT_READ|PROT_WRITE, MAP_PRIVATE, sp->buffer_fd, 0);
if (sp->buffer == MAP_FAILED) {
i_errno = IECREATESTREAM;
free(sp->result);
free(sp);
return NULL;
}
else if (sp->test->debug)
printf("Successfully allocated stream buffer at addr %p with length %d\n", sp->buffer, STREAM_BUFSIZE(sp));

/* Allocate and initialize pointers for the messages in the buffer for UDP multi-msg send/receive */
if (test->protocol->id == Pudp) {
burst = (sp->settings->send_recvmmsg == 0)? 1 : sp->settings->burst;
sp->msg = malloc(burst * sizeof(*sp->msg));
if (sp->msg == NULL) {
i_errno = IECREATESTREAM;
iperf_free_stream(sp);
return NULL;
}
memset(sp->msg, 0, burst * sizeof(*sp->msg));

sp->pmsg_iov = malloc(burst * sizeof(struct iovec));
if (sp->pmsg_iov == NULL) {
i_errno = IECREATESTREAM;
iperf_free_stream(sp);
return NULL;
}
memset(sp->pmsg_iov, 0, burst * sizeof(struct iovec));

for (i = 0, pbuf = sp->buffer; i < burst; i++, pbuf += sp->settings->blksize) {
sp->msg[i].msg_hdr.msg_iov = &sp->pmsg_iov[i];
sp->msg[i].msg_hdr.msg_iov->iov_base = pbuf;
sp->msg[i].msg_hdr.msg_iov->iov_len = sp->settings->blksize;
sp->msg[i].msg_hdr.msg_iovlen = 1;
}
}

sp->pending_size = 0;

/* Set socket */
Expand All @@ -4088,11 +4151,14 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)
sp->snd = test->protocol->send;
sp->rcv = test->protocol->recv;

sp->sendmmsg_buffered_packets_count = 0;
sp->pbuf = sp->buffer;

if (test->diskfile_name != (char*) 0) {
sp->diskfile_fd = open(test->diskfile_name, sender ? O_RDONLY : (O_WRONLY|O_CREAT|O_TRUNC), S_IRUSR|S_IWUSR);
if (sp->diskfile_fd == -1) {
i_errno = IEFILE;
munmap(sp->buffer, sp->test->settings->blksize);
munmap(sp->buffer, STREAM_BUFSIZE(sp));
free(sp->result);
free(sp);
return NULL;
Expand All @@ -4106,13 +4172,13 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)

/* Initialize stream */
if (test->repeating_payload)
fill_with_repeating_pattern(sp->buffer, test->settings->blksize);
fill_with_repeating_pattern(sp->buffer, STREAM_BUFSIZE(sp));
else
ret = readentropy(sp->buffer, test->settings->blksize);
ret = readentropy(sp->buffer, STREAM_BUFSIZE(sp));

if ((ret < 0) || (iperf_init_stream(sp, test) < 0)) {
close(sp->buffer_fd);
munmap(sp->buffer, sp->test->settings->blksize);
munmap(sp->buffer, STREAM_BUFSIZE(sp));
free(sp->result);
free(sp);
return NULL;
Expand Down Expand Up @@ -4241,7 +4307,7 @@ diskfile_send(struct iperf_stream *sp)

/* if needed, read enough data from the disk to fill up the buffer */
if (sp->diskfile_left < sp->test->settings->blksize && !sp->test->done) {
r = read(sp->diskfile_fd, sp->buffer, sp->test->settings->blksize -
r = read(sp->diskfile_fd, sp->buffer, STREAM_BUFSIZE(sp) -
sp->diskfile_left);
buffer_left += r;
rtot += r;
Expand All @@ -4251,8 +4317,8 @@ diskfile_send(struct iperf_stream *sp)

// If the buffer doesn't contain a full buffer at this point,
// adjust the size of the data to send.
if (buffer_left != sp->test->settings->blksize) {
if (sp->test->debug)
if (buffer_left != STREAM_BUFSIZE(sp)) {
if (sp->test->debug)
printf("possible eof\n");
// setting data size to be sent,
// which is less than full block/buffer size
Expand Down Expand Up @@ -4651,3 +4717,27 @@ iflush(struct iperf_test *test)
{
return fflush(test->outfile);
}

/*
* Initialize whether sed/recvmmsg is supported for the test,
* along with initializing other related settings.
*/
void
iperf_init_send_recvmmsg(struct iperf_test *test)
{
#ifdef HAVE_SENDMMSG
/* Set UDP `send_recvmmsg` from `zerocopy` settings (not used for disk file) */
if (test->protocol->id == Pudp && test->zerocopy && test->diskfile_name == (char *)0) {
test->settings->send_recvmmsg = 1;
/* Ensure non-zero burst number of packets when sendmmsg/recvmmsg are used */
if (test->settings->burst == 0)
test->settings->burst = 1;
#ifdef UIO_MAXIOV
/* Ensure burst size is appropriate for sendmmsg() */
else if (test->settings->burst > UIO_MAXIOV)
test->settings->burst = UIO_MAXIOV;
#endif /* UIO_MAXIOV */
}
#endif /* HAVE_SENDMMSG */
return;
}
6 changes: 5 additions & 1 deletion src/iperf_locale.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,11 @@ const char usage_longstr[] = "Usage: iperf3 [-s|-c host] [options]\n"
#if defined(HAVE_FLOWLABEL)
" -L, --flowlabel N set the IPv6 flow label (only supported on Linux)\n"
#endif /* HAVE_FLOWLABEL */
" -Z, --zerocopy use a 'zero copy' method of sending data\n"
" -Z, --zerocopy TCP: use a 'zero copy' method of sending data\n"
#if defined(HAVE_SENDMMSG)
" UDP: use `sendmmsg` to send `-b` bust number of messages;\n"
" (not supported when reading from a file).\n"
#endif /* HAVE_SENDMMSG */
" -O, --omit N omit the first n seconds\n"
" -T, --title str prefix every output line with this string\n"
" --extra-data str data string to include in client and server JSON\n"
Expand Down
Loading