Skip to content

Commit

Permalink
UDP: Introduce an experimental (undocumented for now) public API for …
Browse files Browse the repository at this point in the history
…UDP. (#1838)

This exposes the UDP methods as nng_ methods, and adds support for Multicast Membership,
which is useful in a variety of situations.

No documentation is provided, and applications should consider thios API experimental.
  • Loading branch information
gdamore authored Jun 3, 2024
1 parent 890d489 commit 603282f
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 98 deletions.
33 changes: 33 additions & 0 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,39 @@ NNG_DECL int nng_stream_listener_set_ptr(
NNG_DECL int nng_stream_listener_set_addr(
nng_stream_listener *, const char *, const nng_sockaddr *);

// UDP operations. These are provided for convenience,
// and should be considered somewhat experimental.

// nng_udp represents a socket / file descriptor for use with UDP
typedef struct nng_udp nng_udp;

// nng_udp_open initializes a UDP socket. The socket is bound
// to the specified address.
NNG_DECL int nng_udp_open(nng_udp **udpp, nng_sockaddr *sa);

// nng_udp_close closes the underlying UDP socket.
NNG_DECL void nng_udp_close(nng_udp *udp);

// nng_udp_sockname determines the locally bound address.
// This is useful to determine a chosen port after binding to port 0.
NNG_DECL int nng_udp_sockname(nng_udp *udp, nng_sockaddr *sa);

// nng_udp_send sends the data in the aio to the the
// destination specified in the nng_aio. The iovs are the UDP payload.
// The destination address is the first input (0th) for the aio.
NNG_DECL void nng_udp_send(nng_udp *udp, nng_aio *aio);

// nng_udp_recv receives a message, storing it in the iovs
// from the UDP payload. If the UDP payload will not fit, then
// NNG_EMSGSIZE results. The senders address is stored in the
// socket address (nng_sockaddr), which should have been specified
// in the aio's first input.
NNG_DECL void nng_udp_recv(nng_udp *udp, nng_aio *aio);

// nng_udp_membership provides for joining or leaving multicast groups.
NNG_DECL int nng_udp_multicast_membership(
nng_udp *udp, nng_sockaddr *sa, bool join);

#ifndef NNG_ELIDE_DEPRECATED
// These are legacy APIs that have been deprecated.
// Their use is strongly discouraged.
Expand Down
4 changes: 4 additions & 0 deletions src/core/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ extern void nni_plat_udp_send(nni_plat_udp *, nni_aio *);
// NNG_EMSGSIZE results.
extern void nni_plat_udp_recv(nni_plat_udp *, nni_aio *);

// nni_plat_udp_membership provides for joining or leaving multicast groups.
extern int nni_plat_udp_multicast_membership(
nni_plat_udp *udp, nni_sockaddr *sa, bool join);

//
// Notification Pipe Pairs
//
Expand Down
39 changes: 39 additions & 0 deletions src/nng.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "nng/nng.h"
#include "core/nng_impl.h"
#include "core/platform.h"

// This file provides the "public" API. This is a thin wrapper around
// internal API functions. We use the public prefix instead of internal,
Expand Down Expand Up @@ -2174,3 +2175,41 @@ nng_socket_pair(int fds[2])
{
return (nni_socket_pair(fds));
}

int
nng_udp_open(nng_udp **udp, nng_sockaddr *sa)
{
(void) nni_init();
return (nni_plat_udp_open((nni_plat_udp **) udp, sa));
}

void
nng_udp_close(nng_udp *udp)
{
nni_plat_udp_close((nni_plat_udp *) udp);
}

int
nng_udp_sockname(nng_udp *udp, nng_sockaddr *sa)
{
return (nni_plat_udp_sockname((nni_plat_udp *) udp, sa));
}

void
nng_udp_send(nng_udp *udp, nng_aio *aio)
{
nni_plat_udp_send((nni_plat_udp *) udp, aio);
}

void
nng_udp_recv(nng_udp *udp, nng_aio *aio)
{
nni_plat_udp_recv((nni_plat_udp *) udp, aio);
}

int
nng_udp_multicast_membership(nng_udp *udp, nng_sockaddr *sa, bool join)
{
return (
nni_plat_udp_multicast_membership((nni_plat_udp *) udp, sa, join));
}
128 changes: 121 additions & 7 deletions src/platform/posix/posix_udp.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2020 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand All @@ -9,6 +9,9 @@
//

#include "core/nng_impl.h"
#include "nng/nng.h"
#include "platform/posix/posix_impl.h"
#include <sys/errno.h>

#ifdef NNG_PLATFORM_POSIX
#include "platform/posix/posix_pollq.h"
Expand All @@ -27,6 +30,22 @@
#define MSG_NOSIGNAL 0
#endif

#ifndef NNG_HAVE_INET6
#undef NNG_ENABLE_IPV6
#endif

// Linux has IPV6_ADD_MEMBERSHIP and IPV6_DROP_MEMBERSHIP
#ifndef IPV6_JOIN_GROUP
#ifdef IPV6_ADD_MEMBERSHIP
#define IPV6_JOIN_GROUP IPV6_ADD_MEMBERSHIP
#endif
#endif
#ifndef IPV6_LEAVE_GROUP
#ifdef IPV6_DROP_MEMBERSHIP
#define IPV6_LEAVE_GROUP IPV6_DROP_MEMBERSHIP
#endif
#endif

struct nni_plat_udp {
nni_posix_pfd *udp_pfd;
int udp_fd;
Expand Down Expand Up @@ -56,15 +75,15 @@ nni_posix_udp_doclose(nni_plat_udp *udp)
static void
nni_posix_udp_dorecv(nni_plat_udp *udp)
{
nni_aio * aio;
nni_aio *aio;
nni_list *q = &udp->udp_recvq;
// While we're able to recv, do so.
while ((aio = nni_list_first(q)) != NULL) {
struct iovec iov[4];
unsigned niov;
nni_iov * aiov;
nni_iov *aiov;
struct sockaddr_storage ss;
nng_sockaddr * sa;
nng_sockaddr *sa;
struct msghdr hdr = { .msg_name = NULL };
int rv = 0;
int cnt = 0;
Expand Down Expand Up @@ -102,7 +121,7 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
static void
nni_posix_udp_dosend(nni_plat_udp *udp)
{
nni_aio * aio;
nni_aio *aio;
nni_list *q = &udp->udp_sendq;

// While we're able to send, do so.
Expand All @@ -118,7 +137,7 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
rv = NNG_EADDRINVAL;
} else {
unsigned niov;
nni_iov * aiov;
nni_iov *aiov;
struct iovec iov[16];

nni_aio_get_iov(aio, &niov, &aiov);
Expand Down Expand Up @@ -192,7 +211,7 @@ nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
int
nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
{
nni_plat_udp * udp;
nni_plat_udp *udp;
int salen;
struct sockaddr_storage sa;
int rv;
Expand Down Expand Up @@ -323,4 +342,99 @@ nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa)
return (nni_posix_sockaddr2nn(sa, &ss, sz));
}

// Joining a multicast group is different than binding to a multicast
// group. This allows to receive both unicast and multicast at the given
// address.
static int
ip4_multicast_member(nni_plat_udp *udp, struct sockaddr *sa, bool join)
{
struct ip_mreq mreq;
struct sockaddr_in *sin;
struct sockaddr_storage local;
socklen_t sz = sizeof(local);

if (getsockname(udp->udp_fd, (struct sockaddr *) &local, &sz) >= 0) {
if (local.ss_family != AF_INET) {
// address families have to match
return (NNG_EADDRINVAL);
}
sin = (struct sockaddr_in *) &local;
mreq.imr_interface.s_addr = sin->sin_addr.s_addr;
} else {
mreq.imr_interface.s_addr = INADDR_ANY;
}

// Determine our local interface
sin = (struct sockaddr_in *) sa;

mreq.imr_multiaddr.s_addr = sin->sin_addr.s_addr;
if (setsockopt(udp->udp_fd, IPPROTO_IP,
join ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP, &mreq,
sizeof(mreq)) == 0) {
return (0);
}
return (nni_plat_errno(errno));
}

#ifdef NNG_ENABLE_IPV6
static int
ip6_multicast_member(nni_plat_udp *udp, struct sockaddr *sa, bool join)
{
struct ipv6_mreq mreq;
struct sockaddr_in6 *sin6;
struct sockaddr_storage local;
socklen_t sz = sizeof(local);

if (getsockname(udp->udp_fd, (struct sockaddr *) &local, &sz) >= 0) {
if (local.ss_family != AF_INET6) {
// address families have to match
return (NNG_EADDRINVAL);
}
sin6 = (struct sockaddr_in6 *) &local;
mreq.ipv6mr_interface = sin6->sin6_scope_id;
} else {
mreq.ipv6mr_interface = 0;
}

// Determine our local interface
sin6 = (struct sockaddr_in6 *) sa;

mreq.ipv6mr_multiaddr = sin6->sin6_addr;
if (setsockopt(udp->udp_fd, IPPROTO_IPV6,
join ? IPV6_JOIN_GROUP : IPV6_LEAVE_GROUP, &mreq,
sizeof(mreq)) == 0) {
return (0);
}
return (nni_plat_errno(errno));
}
#endif

int
nni_plat_udp_multicast_membership(
nni_plat_udp *udp, nni_sockaddr *sa, bool join)
{
struct sockaddr_storage ss;
socklen_t sz;
int rv;

sz = nni_posix_nn2sockaddr(&ss, sa);
if (sz < 1) {
return (NNG_EADDRINVAL);
}
switch (ss.ss_family) {
case AF_INET:
rv = ip4_multicast_member(udp, (struct sockaddr *) &ss, join);
break;
#ifdef NNG_ENABLE_IPV6
case AF_INET6:
rv = ip6_multicast_member(udp, (struct sockaddr *) &ss, join);
break;
#endif
default:
rv = NNG_EADDRINVAL;
}

return (rv);
}

#endif // NNG_PLATFORM_POSIX
Loading

0 comments on commit 603282f

Please sign in to comment.