From 1282d64868b9c560c074b9c9630391f3b18ef633 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Thu, 23 Aug 2012 01:14:41 +0200 Subject: [PATCH] unix: remove dependency on libev --- README.md | 4 +- include/uv-private/uv-bsd.h | 4 +- include/uv-private/uv-darwin.h | 4 +- include/uv-private/uv-unix.h | 29 ++-- include/uv.h | 28 ++-- src/unix/async.c | 11 +- src/unix/core.c | 153 ++++++++++++++----- src/unix/darwin.c | 3 + src/unix/freebsd.c | 2 +- src/unix/fsevents.c | 6 +- src/unix/internal.h | 51 +++++-- src/unix/kqueue.c | 263 +++++++++++++++++++++++++++++---- src/unix/linux/inotify.c | 15 +- src/unix/linux/linux-core.c | 167 ++++++++++++++++++++- src/unix/loop.c | 39 +++-- src/unix/netbsd.c | 2 +- src/unix/openbsd.c | 2 +- src/unix/pipe.c | 40 +++-- src/unix/poll.c | 24 ++- src/unix/process.c | 2 +- src/unix/signal.c | 9 +- src/unix/stream.c | 143 +++++++++--------- src/unix/sunos.c | 163 +++++++++++++++++++- src/unix/tcp.c | 88 ++++------- src/unix/tty.c | 6 +- src/unix/udp.c | 131 ++++++++-------- 26 files changed, 988 insertions(+), 401 deletions(-) diff --git a/README.md b/README.md index e553fe81dd..2d8b4367c0 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # libuv [![Build Status](https://secure.travis-ci.org/joyent/libuv.png)](http://travis-ci.org/joyent/libuv) libuv is a new platform layer for Node. Its purpose is to abstract IOCP on -Windows and libev on Unix systems. We intend to eventually contain all -platform differences in this library. +Windows and epoll/kqueue/event ports/etc. on Unix systems. We intend to +eventually contain all platform differences in this library. http://nodejs.org/ diff --git a/include/uv-private/uv-bsd.h b/include/uv-private/uv-bsd.h index e4039abf31..fc253aa89b 100644 --- a/include/uv-private/uv-bsd.h +++ b/include/uv-private/uv-bsd.h @@ -23,8 +23,6 @@ #define UV_BSD_H #define UV_PLATFORM_FS_EVENT_FIELDS \ - ev_io event_watcher; \ - int fflags; \ - int fd; \ + uv__io_t event_watcher; \ #endif /* UV_BSD_H */ diff --git a/include/uv-private/uv-darwin.h b/include/uv-private/uv-darwin.h index 6a3ab4a3fd..c4bfd0ff9e 100644 --- a/include/uv-private/uv-darwin.h +++ b/include/uv-private/uv-darwin.h @@ -39,9 +39,7 @@ ngx_queue_t cf_signals; \ #define UV_PLATFORM_FS_EVENT_FIELDS \ - ev_io event_watcher; \ - int fflags; \ - int fd; \ + uv__io_t event_watcher; \ char* realpath; \ int realpath_len; \ int cf_flags; \ diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 7b3554a301..683a9c9298 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -24,8 +24,6 @@ #include "ngx-queue.h" -#include "ev.h" - #include #include #include @@ -46,11 +44,18 @@ struct uv__io_s; struct uv_loop_s; +typedef void (*uv__io_cb)(struct uv_loop_s* loop, + struct uv__io_s* w, + unsigned int events); typedef struct uv__io_s uv__io_t; -typedef void (*uv__io_cb)(struct uv_loop_s* loop, uv__io_t* handle, int events); struct uv__io_s { - ev_io io_watcher; + uv__io_cb cb; + ngx_queue_t pending_queue; + ngx_queue_t watcher_queue; + unsigned int pevents; /* Pending event mask i.e. mask at next tick. */ + unsigned int events; /* Current event mask. */ + int fd; }; struct uv__work { @@ -135,7 +140,12 @@ typedef struct { #define UV_LOOP_PRIVATE_FIELDS \ unsigned long flags; \ - struct ev_loop* ev; \ + int backend_fd; \ + ngx_queue_t pending_queue; \ + ngx_queue_t watcher_queue; \ + uv__io_t** watchers; \ + unsigned int nwatchers; \ + unsigned int nfds; \ ngx_queue_t wq; \ uv_mutex_t wq_mutex; \ uv_async_t wq_async; \ @@ -193,23 +203,19 @@ typedef struct { #define UV_STREAM_PRIVATE_FIELDS \ uv_connect_t *connect_req; \ uv_shutdown_t *shutdown_req; \ - uv__io_t read_watcher; \ - uv__io_t write_watcher; \ + uv__io_t io_watcher; \ ngx_queue_t write_queue; \ ngx_queue_t write_completed_queue; \ uv_connection_cb connection_cb; \ int delayed_error; \ int accepted_fd; \ - int fd; \ #define UV_TCP_PRIVATE_FIELDS /* empty */ #define UV_UDP_PRIVATE_FIELDS \ - int fd; \ uv_alloc_cb alloc_cb; \ uv_udp_recv_cb recv_cb; \ - uv__io_t read_watcher; \ - uv__io_t write_watcher; \ + uv__io_t io_watcher; \ ngx_queue_t write_queue; \ ngx_queue_t write_completed_queue; \ @@ -217,7 +223,6 @@ typedef struct { const char* pipe_fname; /* strdup'ed */ #define UV_POLL_PRIVATE_FIELDS \ - int fd; \ uv__io_t io_watcher; #define UV_PREPARE_PRIVATE_FIELDS \ diff --git a/include/uv.h b/include/uv.h index 7f3bc589a5..b187d6cb35 100644 --- a/include/uv.h +++ b/include/uv.h @@ -1044,9 +1044,8 @@ UV_EXTERN int uv_poll_stop(uv_poll_t* handle); /* * uv_prepare_t is a subclass of uv_handle_t. * - * libev wrapper. Every active prepare handle gets its callback called - * exactly once per loop iteration, just before the system blocks to wait - * for completed i/o. + * Every active prepare handle gets its callback called exactly once per loop + * iteration, just before the system blocks to wait for completed i/o. */ struct uv_prepare_s { UV_HANDLE_FIELDS @@ -1063,8 +1062,8 @@ UV_EXTERN int uv_prepare_stop(uv_prepare_t* prepare); /* * uv_check_t is a subclass of uv_handle_t. * - * libev wrapper. Every active check handle gets its callback called exactly - * once per loop iteration, just after the system returns from blocking. + * Every active check handle gets its callback called exactly once per loop + * iteration, just after the system returns from blocking. */ struct uv_check_s { UV_HANDLE_FIELDS @@ -1081,10 +1080,10 @@ UV_EXTERN int uv_check_stop(uv_check_t* check); /* * uv_idle_t is a subclass of uv_handle_t. * - * libev wrapper. Every active idle handle gets its callback called - * repeatedly until it is stopped. This happens after all other types of - * callbacks are processed. When there are multiple "idle" handles active, - * their callbacks are called in turn. + * Every active idle handle gets its callback called repeatedly until it is + * stopped. This happens after all other types of callbacks are processed. + * When there are multiple "idle" handles active, their callbacks are called + * in turn. */ struct uv_idle_s { UV_HANDLE_FIELDS @@ -1101,12 +1100,11 @@ UV_EXTERN int uv_idle_stop(uv_idle_t* idle); /* * uv_async_t is a subclass of uv_handle_t. * - * libev wrapper. uv_async_send wakes up the event - * loop and calls the async handle's callback There is no guarantee that - * every uv_async_send call leads to exactly one invocation of the callback; - * The only guarantee is that the callback function is called at least once - * after the call to async_send. Unlike all other libuv functions, - * uv_async_send can be called from another thread. + * uv_async_send wakes up the event loop and calls the async handle's callback. + * There is no guarantee that every uv_async_send call leads to exactly one + * invocation of the callback; the only guarantee is that the callback function + * is called at least once after the call to async_send. Unlike all other + * libuv functions, uv_async_send can be called from another thread. */ struct uv_async_s { UV_HANDLE_FIELDS diff --git a/src/unix/async.c b/src/unix/async.c index 4d3cd94f17..683084e6a4 100644 --- a/src/unix/async.c +++ b/src/unix/async.c @@ -27,7 +27,7 @@ #include static int uv__async_init(uv_loop_t* loop); -static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events); +static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); static int uv__async_make_pending(volatile sig_atomic_t* ptr) { @@ -104,17 +104,14 @@ static int uv__async_init(uv_loop_t* loop) { if (uv__make_pipe(loop->async_pipefd, UV__F_NONBLOCK)) return -1; - uv__io_init(&loop->async_watcher, - uv__async_io, - loop->async_pipefd[0], - UV__IO_READ); - uv__io_start(loop, &loop->async_watcher); + uv__io_init(&loop->async_watcher, uv__async_io, loop->async_pipefd[0]); + uv__io_start(loop, &loop->async_watcher, UV__IO_READ); return 0; } -static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events) { +static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { char buf[1024]; ngx_queue_t* q; uv_async_t* h; diff --git a/src/unix/core.c b/src/unix/core.c index d7ccb68b0c..0be7e33584 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -35,7 +35,7 @@ #include #include #include -#include /* PATH_MAX */ +#include /* INT_MAX, PATH_MAX */ #include /* writev */ #ifdef __linux__ @@ -60,6 +60,8 @@ # include #endif +static void uv__run_pending(uv_loop_t* loop); + static uv_loop_t default_loop_struct; static uv_loop_t* default_loop_ptr; @@ -167,9 +169,6 @@ static void uv__finish_close(uv_handle_t* handle) { case UV_NAMED_PIPE: case UV_TCP: case UV_TTY: - assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher)); - assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher)); - assert(((uv_stream_t*)handle)->fd == -1); uv__stream_destroy((uv_stream_t*)handle); break; @@ -263,20 +262,13 @@ static unsigned int uv__poll_timeout(uv_loop_t* loop) { } -static void uv__poll(uv_loop_t* loop) { - void ev__run(EV_P_ ev_tstamp waittime); - ev_invoke_pending(loop->ev); - ev__run(loop->ev, uv__poll_timeout(loop) / 1000.); - ev_invoke_pending(loop->ev); -} - - static int uv__run(uv_loop_t* loop) { uv_update_time(loop); uv__run_timers(loop); uv__run_idle(loop); uv__run_prepare(loop); - uv__poll(loop); + uv__run_pending(loop); + uv__io_poll(loop, uv__poll_timeout(loop)); uv__run_check(loop); uv__run_closing_handles(loop); return uv__has_active_handles(loop) || uv__has_active_reqs(loop); @@ -534,49 +526,136 @@ void uv_disable_stdio_inheritance(void) { } -static void uv__io_set_cb(uv__io_t* handle, uv__io_cb cb) { - union { void* data; uv__io_cb cb; } u; - u.cb = cb; - handle->io_watcher.data = u.data; +static void uv__run_pending(uv_loop_t* loop) { + ngx_queue_t* q; + uv__io_t* w; + + while (!ngx_queue_empty(&loop->pending_queue)) { + q = ngx_queue_head(&loop->pending_queue); + ngx_queue_remove(q); + ngx_queue_init(q); + + w = ngx_queue_data(q, uv__io_t, pending_queue); + w->cb(loop, w, UV__IO_WRITE); + } } -static void uv__io_rw(struct ev_loop* ev, ev_io* w, int events) { - union { void* data; uv__io_cb cb; } u; - uv_loop_t* loop = ev_userdata(ev); - uv__io_t* handle = container_of(w, uv__io_t, io_watcher); - u.data = handle->io_watcher.data; - u.cb(loop, handle, events & (EV_READ|EV_WRITE|EV_ERROR)); +static unsigned int next_power_of_two(unsigned int val) { + val -= 1; + val |= val >> 1; + val |= val >> 2; + val |= val >> 4; + val |= val >> 8; + val |= val >> 16; + val += 1; + return val; } +static void maybe_resize(uv_loop_t* loop, unsigned int len) { + uv__io_t** watchers; + unsigned int nwatchers; + unsigned int i; -void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events) { - ev_io_init(&handle->io_watcher, uv__io_rw, fd, events & (EV_READ|EV_WRITE)); - uv__io_set_cb(handle, cb); + if (len <= loop->nwatchers) + return; + + nwatchers = next_power_of_two(len); + watchers = realloc(loop->watchers, nwatchers * sizeof(loop->watchers[0])); + + if (watchers == NULL) + abort(); + + for (i = loop->nwatchers; i < nwatchers; i++) + watchers[i] = NULL; + + loop->watchers = watchers; + loop->nwatchers = nwatchers; } -void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events) { - ev_io_set(&handle->io_watcher, fd, events); - uv__io_set_cb(handle, cb); +void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) { + assert(cb != NULL); + assert(fd >= -1); + ngx_queue_init(&w->pending_queue); + ngx_queue_init(&w->watcher_queue); + w->cb = cb; + w->fd = fd; + w->events = 0; + w->pevents = 0; } -void uv__io_start(uv_loop_t* loop, uv__io_t* handle) { - ev_io_start(loop->ev, &handle->io_watcher); +/* Note that uv__io_start() and uv__io_stop() can't simply remove the watcher + * from the queue when the new event mask equals the old one. The event ports + * backend operates exclusively in single-shot mode and needs to rearm all fds + * before each call to port_getn(). It's up to the individual backends to + * filter out superfluous event mask modifications. + */ + + +void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) { + assert(0 == (events & ~(UV__IO_READ | UV__IO_WRITE))); + assert(0 != events); + assert(w->fd >= 0); + assert(w->fd < INT_MAX); + + w->pevents |= events; + maybe_resize(loop, w->fd + 1); + + if (ngx_queue_empty(&w->watcher_queue)) + ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); + + if (loop->watchers[w->fd] == NULL) { + loop->watchers[w->fd] = w; + loop->nfds++; + } } -void uv__io_stop(uv_loop_t* loop, uv__io_t* handle) { - ev_io_stop(loop->ev, &handle->io_watcher); +void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) { + assert(0 == (events & ~(UV__IO_READ | UV__IO_WRITE))); + assert(0 != events); + + if (w->fd == -1) + return; + + assert(w->fd >= 0); + + /* Happens when uv__io_stop() is called on a handle that was never started. */ + if ((unsigned) w->fd >= loop->nwatchers) + return; + + w->pevents &= ~events; + + if (w->pevents == 0) { + ngx_queue_remove(&w->pending_queue); + ngx_queue_init(&w->pending_queue); + + ngx_queue_remove(&w->watcher_queue); + ngx_queue_init(&w->watcher_queue); + + if (loop->watchers[w->fd] != NULL) { + assert(loop->watchers[w->fd] == w); + assert(loop->nfds > 0); + loop->watchers[w->fd] = NULL; + loop->nfds--; + w->events = 0; + } + } + else if (ngx_queue_empty(&w->watcher_queue)) + ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); } -void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event) { - ev_feed_event(loop->ev, &handle->io_watcher, event); +void uv__io_feed(uv_loop_t* loop, uv__io_t* w) { + if (ngx_queue_empty(&w->pending_queue)) + ngx_queue_insert_tail(&loop->pending_queue, &w->pending_queue); } -int uv__io_active(uv__io_t* handle) { - return ev_is_active(&handle->io_watcher); +int uv__io_active(const uv__io_t* w, unsigned int events) { + assert(0 == (events & ~(UV__IO_READ | UV__IO_WRITE))); + assert(0 != events); + return 0 != (w->pevents & events); } diff --git a/src/unix/darwin.c b/src/unix/darwin.c index 9de27db961..f40efb2fcd 100644 --- a/src/unix/darwin.c +++ b/src/unix/darwin.c @@ -55,6 +55,9 @@ int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { CFRunLoopSourceContext ctx; int r; + if (uv__kqueue_init(loop)) + return -1; + loop->cf_loop = NULL; if ((r = uv_mutex_init(&loop->cf_mutex))) return r; diff --git a/src/unix/freebsd.c b/src/unix/freebsd.c index 76f2793ade..6619893b9b 100644 --- a/src/unix/freebsd.c +++ b/src/unix/freebsd.c @@ -55,7 +55,7 @@ static char *process_title; int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { - return 0; + return uv__kqueue_init(loop); } diff --git a/src/unix/fsevents.c b/src/unix/fsevents.c index 02565f3a1d..b6d274675a 100644 --- a/src/unix/fsevents.c +++ b/src/unix/fsevents.c @@ -81,12 +81,14 @@ void uv__fsevents_cb(uv_async_t* cb, int status) { handle = cb->data; UV__FSEVENTS_WALK(handle, { - if (handle->fd != -1) + if (handle->event_watcher.fd != -1) handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0); }); - if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 && handle->fd == -1) + if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 && + handle->event_watcher.fd == -1) { uv__fsevents_close(handle); + } } diff --git a/src/unix/internal.h b/src/unix/internal.h index 80e12a32b9..834a67d522 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -82,9 +82,35 @@ } \ while (0) -#define UV__IO_READ EV_READ -#define UV__IO_WRITE EV_WRITE -#define UV__IO_ERROR EV_ERROR +#if defined(__linux__) +# define UV__IO_READ UV__EPOLLIN +# define UV__IO_WRITE UV__EPOLLOUT +# define UV__IO_ERROR UV__EPOLLERR +# define UV__IO_HUP UV__EPOLLHUP +#endif + +#if defined(__sun) +# define UV__IO_READ POLLIN +# define UV__IO_WRITE POLLOUT +# define UV__IO_ERROR POLLERR +# define UV__IO_HUP POLLHUP +#endif + +#ifndef UV__IO_READ +# define UV__IO_READ 1 +#endif + +#ifndef UV__IO_WRITE +# define UV__IO_WRITE 2 +#endif + +#ifndef UV__IO_ERROR +# define UV__IO_ERROR 4 +#endif + +#ifndef UV__IO_HUP +# define UV__IO_HUP 8 +#endif /* handle flags */ enum { @@ -118,12 +144,12 @@ int uv__dup(int fd); int uv_async_stop(uv_async_t* handle); void uv__make_close_pending(uv_handle_t* handle); -void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events); -void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events); -void uv__io_start(uv_loop_t* loop, uv__io_t* handle); -void uv__io_stop(uv_loop_t* loop, uv__io_t* handle); -void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event); -int uv__io_active(uv__io_t* handle); +void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd); +void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events); +void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events); +void uv__io_feed(uv_loop_t* loop, uv__io_t* w); +int uv__io_active(const uv__io_t* w, unsigned int events); +void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */ /* loop */ int uv__loop_init(uv_loop_t* loop, int default_loop); @@ -141,13 +167,13 @@ void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type); int uv__stream_open(uv_stream_t*, int fd, int flags); void uv__stream_destroy(uv_stream_t* stream); -void uv__server_io(uv_loop_t* loop, uv__io_t* watcher, int events); +void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); int uv__accept(int sockfd); /* tcp */ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb); -int uv__tcp_nodelay(uv_tcp_t* handle, int enable); -int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay); +int uv__tcp_nodelay(int fd, int on); +int uv__tcp_keepalive(int fd, int on, unsigned int delay); /* pipe */ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); @@ -169,6 +195,7 @@ void uv__work_submit(uv_loop_t* loop, void uv__work_done(uv_async_t* handle, int status); /* platform specific */ +int uv__kqueue_init(uv_loop_t* loop); int uv__platform_loop_init(uv_loop_t* loop, int default_loop); void uv__platform_loop_delete(uv_loop_t* loop); diff --git a/src/unix/kqueue.c b/src/unix/kqueue.c index 46b9da2a27..70c8aea93a 100644 --- a/src/unix/kqueue.c +++ b/src/unix/kqueue.c @@ -29,57 +29,254 @@ #include #include #include +#include #include #include #include -static void uv__fs_event(EV_P_ ev_io* w, int revents); +static void uv__fs_event(uv_loop_t* loop, uv__io_t* w, unsigned int fflags); -static void uv__fs_event_start(uv_fs_event_t* handle) { - ev_io_init(&handle->event_watcher, - uv__fs_event, - handle->fd, - EV_LIBUV_KQUEUE_HACK); - ev_io_start(handle->loop->ev, &handle->event_watcher); +int uv__kqueue_init(uv_loop_t* loop) { + loop->backend_fd = kqueue(); + + if (loop->backend_fd == -1) + return -1; + + uv__cloexec(loop->backend_fd, 1); + + return 0; } -static void uv__fs_event_stop(uv_fs_event_t* handle) { - ev_io_stop(handle->loop->ev, &handle->event_watcher); +void uv__io_poll(uv_loop_t* loop, int timeout) { + struct kevent events[1024]; + struct kevent* ev; + struct timespec spec; + unsigned int nevents; + unsigned int revents; + ngx_queue_t* q; + uint64_t base; + uint64_t diff; + uv__io_t* w; + int filter; + int fflags; + int count; + int nfds; + int fd; + int op; + int i; + + if (loop->nfds == 0) { + assert(ngx_queue_empty(&loop->watcher_queue)); + return; + } + + nevents = 0; + + while (!ngx_queue_empty(&loop->watcher_queue)) { + q = ngx_queue_head(&loop->watcher_queue); + ngx_queue_remove(q); + ngx_queue_init(q); + + w = ngx_queue_data(q, uv__io_t, watcher_queue); + assert(w->pevents != 0); + assert(w->fd >= 0); + assert(w->fd < (int) loop->nwatchers); + + /* Filter out no-op changes. This is for compatibility with the event ports + * backend, see uv__io_start(). + */ + if (w->events == w->pevents) + continue; + + if ((w->events & UV__IO_READ) == 0 && (w->pevents & UV__IO_READ) != 0) { + filter = EVFILT_READ; + fflags = 0; + op = EV_ADD; + + if (w->cb == uv__fs_event) { + filter = EVFILT_VNODE; + fflags = NOTE_ATTRIB | NOTE_WRITE | NOTE_RENAME + | NOTE_DELETE | NOTE_EXTEND | NOTE_REVOKE; + op = EV_ADD | EV_ONESHOT; /* Stop the event from firing repeatedly. */ + } + + EV_SET(events + nevents, w->fd, filter, op, fflags, 0, 0); + + if (++nevents == ARRAY_SIZE(events)) { + if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL)) + abort(); + nevents = 0; + } + } + + if ((w->events & UV__IO_WRITE) == 0 && (w->pevents & UV__IO_WRITE) != 0) { + EV_SET(events + nevents, w->fd, EVFILT_WRITE, EV_ADD, 0, 0, 0); + + if (++nevents == ARRAY_SIZE(events)) { + if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL)) + abort(); + nevents = 0; + } + } + + w->events = w->pevents; + } + + assert(timeout >= -1); + base = loop->time; + count = 48; /* Benchmarks suggest this gives the best throughput. */ + + for (;; nevents = 0) { + if (timeout != -1) { + spec.tv_sec = timeout / 1000; + spec.tv_nsec = (timeout % 1000) * 1000000; + } + + nfds = kevent(loop->backend_fd, + events, + nevents, + events, + ARRAY_SIZE(events), + timeout == -1 ? NULL : &spec); + + if (nfds == 0) { + assert(timeout != -1); + return; + } + + if (nfds == -1) { + if (errno != EINTR) + abort(); + + if (timeout == 0) + return; + + if (timeout == -1) + continue; + + /* Interrupted by a signal. Update timeout and poll again. */ + goto update_timeout; + } + + nevents = 0; + + for (i = 0; i < nfds; i++) { + ev = events + i; + fd = ev->ident; + w = loop->watchers[fd]; + + if (w == NULL) { + /* File descriptor that we've stopped watching, disarm it. */ + /* TODO batch up */ + struct kevent events[1]; + + EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0); + if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL)) + if (errno != EBADF && errno != ENOENT) + abort(); + + continue; + } + + if (ev->filter == EVFILT_VNODE) { + assert(w->events == UV__IO_READ); + assert(w->pevents == UV__IO_READ); + w->cb(loop, w, ev->fflags); /* XXX always uv__fs_event() */ + nevents++; + continue; + } + + revents = 0; + + if (ev->filter == EVFILT_READ) { + if (w->events & UV__IO_READ) + revents |= UV__IO_READ; + else { + /* TODO batch up */ + struct kevent events[1]; + EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0); + if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL)) abort(); + } + } + + if (ev->filter == EVFILT_WRITE) { + if (w->events & UV__IO_WRITE) + revents |= UV__IO_WRITE; + else { + /* TODO batch up */ + struct kevent events[1]; + EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0); + if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL)) abort(); + } + } + + if (ev->flags & EV_ERROR) + revents |= UV__IO_ERROR; + + if (revents == 0) + continue; + + w->cb(loop, w, revents); + nevents++; + } + + if (nevents != 0) { + if (nfds == ARRAY_SIZE(events) && --count != 0) { + /* Poll for more events but don't block this time. */ + timeout = 0; + continue; + } + return; + } + + if (timeout == 0) + return; + + if (timeout == -1) + continue; + +update_timeout: + assert(timeout > 0); + + diff = uv_hrtime() / 1000000; + assert(diff >= base); + diff -= base; + + if (diff >= (uint64_t) timeout) + return; + + timeout -= diff; + } } -static void uv__fs_event(EV_P_ ev_io* w, int revents) { +static void uv__fs_event(uv_loop_t* loop, uv__io_t* w, unsigned int fflags) { uv_fs_event_t* handle; + struct kevent ev; int events; - assert(revents == EV_LIBUV_KQUEUE_HACK); - handle = container_of(w, uv_fs_event_t, event_watcher); - if (handle->fflags & (NOTE_ATTRIB | NOTE_EXTEND)) + if (fflags & (NOTE_ATTRIB | NOTE_EXTEND)) events = UV_CHANGE; else events = UV_RENAME; handle->cb(handle, NULL, events, 0); - if (handle->fd == -1) + if (handle->event_watcher.fd == -1) return; - /* File watcher operates in one-shot mode, re-arm it. */ - uv__fs_event_stop(handle); - uv__fs_event_start(handle); -} - + /* Watcher operates in one-shot mode, re-arm it. */ + fflags = NOTE_ATTRIB | NOTE_WRITE | NOTE_RENAME + | NOTE_DELETE | NOTE_EXTEND | NOTE_REVOKE; -/* Called by libev, don't touch. */ -void uv__kqueue_hack(EV_P_ int fflags, ev_io *w) { - uv_fs_event_t* handle; + EV_SET(&ev, w->fd, EVFILT_VNODE, EV_ADD | EV_ONESHOT, fflags, 0, 0); - handle = container_of(w, uv_fs_event_t, event_watcher); - handle->fflags = fflags; + if (kevent(loop->backend_fd, &ev, 1, NULL, 0, NULL)) + abort(); } @@ -88,10 +285,10 @@ int uv_fs_event_init(uv_loop_t* loop, const char* filename, uv_fs_event_cb cb, int flags) { - int fd; #if defined(__APPLE__) struct stat statbuf; #endif /* defined(__APPLE__) */ + int fd; /* TODO open asynchronously - but how do we report back errors? */ if ((fd = open(filename, O_RDONLY)) == -1) { @@ -101,10 +298,9 @@ int uv_fs_event_init(uv_loop_t* loop, uv__handle_init(loop, (uv_handle_t*)handle, UV_FS_EVENT); uv__handle_start(handle); /* FIXME shouldn't start automatically */ + uv__io_init(&handle->event_watcher, uv__fs_event, fd); handle->filename = strdup(filename); - handle->fflags = 0; handle->cb = cb; - handle->fd = fd; #if defined(__APPLE__) /* Nullify field to perform checks later */ @@ -124,7 +320,7 @@ int uv_fs_event_init(uv_loop_t* loop, fallback: #endif /* defined(__APPLE__) */ - uv__fs_event_start(handle); + uv__io_start(loop, &handle->event_watcher, UV__IO_READ); return 0; } @@ -133,13 +329,16 @@ int uv_fs_event_init(uv_loop_t* loop, void uv__fs_event_close(uv_fs_event_t* handle) { #if defined(__APPLE__) if (uv__fsevents_close(handle)) - uv__fs_event_stop(handle); + uv__io_stop(handle->loop, &handle->event_watcher, UV__IO_READ); #else - uv__fs_event_stop(handle); + uv__io_stop(handle->loop, &handle->event_watcher, UV__IO_READ); #endif /* defined(__APPLE__) */ uv__handle_stop(handle); + free(handle->filename); - close(handle->fd); - handle->fd = -1; + handle->filename = NULL; + + close(handle->event_watcher.fd); + handle->event_watcher.fd = -1; } diff --git a/src/unix/linux/inotify.c b/src/unix/linux/inotify.c index 97231db9a8..b44623ef76 100644 --- a/src/unix/linux/inotify.c +++ b/src/unix/linux/inotify.c @@ -64,7 +64,9 @@ static int compare_watchers(const struct watcher_list* a, RB_GENERATE_STATIC(watcher_root, watcher_list, entry, compare_watchers) -static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int revents); +static void uv__inotify_read(uv_loop_t* loop, + uv__io_t* w, + unsigned int revents); static int new_inotify_fd(void) { @@ -98,11 +100,8 @@ static int init_inotify(uv_loop_t* loop) { return -1; } - uv__io_init(&loop->inotify_read_watcher, - uv__inotify_read, - loop->inotify_fd, - UV__IO_READ); - uv__io_start(loop, &loop->inotify_read_watcher); + uv__io_init(&loop->inotify_read_watcher, uv__inotify_read, loop->inotify_fd); + uv__io_start(loop, &loop->inotify_read_watcher, UV__IO_READ); return 0; } @@ -115,7 +114,9 @@ static struct watcher_list* find_watcher(uv_loop_t* loop, int wd) { } -static void uv__inotify_read(uv_loop_t* loop, uv__io_t* dummy, int events) { +static void uv__inotify_read(uv_loop_t* loop, + uv__io_t* dummy, + unsigned int events) { const struct uv__inotify_event* e; struct watcher_list* w; uv_fs_event_t* h; diff --git a/src/unix/linux/linux-core.c b/src/unix/linux/linux-core.c index e92869e3b0..488898cbd1 100644 --- a/src/unix/linux/linux-core.c +++ b/src/unix/linux/linux-core.c @@ -78,20 +78,183 @@ static void free_args_mem(void) { int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { - loop->inotify_watchers = NULL; + int fd; + + fd = uv__epoll_create1(UV__EPOLL_CLOEXEC); + + /* epoll_create1() can fail either because it's not implemented (old kernel) + * or because it doesn't understand the EPOLL_CLOEXEC flag. + */ + if (fd == -1 && (errno == ENOSYS || errno == EINVAL)) { + fd = uv__epoll_create(256); + + if (fd != -1) + uv__cloexec(fd, 1); + } + + loop->backend_fd = fd; loop->inotify_fd = -1; + loop->inotify_watchers = NULL; + + if (fd == -1) + return -1; + return 0; } void uv__platform_loop_delete(uv_loop_t* loop) { if (loop->inotify_fd == -1) return; - uv__io_stop(loop, &loop->inotify_read_watcher); + uv__io_stop(loop, &loop->inotify_read_watcher, UV__IO_READ); close(loop->inotify_fd); loop->inotify_fd = -1; } +void uv__io_poll(uv_loop_t* loop, int timeout) { + struct uv__epoll_event events[1024]; + struct uv__epoll_event* pe; + struct uv__epoll_event e; + ngx_queue_t* q; + uv__io_t* w; + uint64_t base; + uint64_t diff; + int nevents; + int count; + int nfds; + int fd; + int op; + int i; + + if (loop->nfds == 0) { + assert(ngx_queue_empty(&loop->watcher_queue)); + return; + } + + while (!ngx_queue_empty(&loop->watcher_queue)) { + q = ngx_queue_head(&loop->watcher_queue); + ngx_queue_remove(q); + ngx_queue_init(q); + + w = ngx_queue_data(q, uv__io_t, watcher_queue); + assert(w->pevents != 0); + assert(w->fd >= 0); + assert(w->fd < (int) loop->nwatchers); + + /* Filter out no-op changes. This is for compatibility with the event ports + * backend, see the comment in uv__io_start(). + */ + if (w->events == w->pevents) + continue; + + e.events = w->pevents; + e.data = w->fd; + + if (w->events == 0) + op = UV__EPOLL_CTL_ADD; + else + op = UV__EPOLL_CTL_MOD; + + /* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching + * events, skip the syscall and squelch the events after epoll_wait(). + */ + if (uv__epoll_ctl(loop->backend_fd, op, w->fd, &e)) { + if (errno != EEXIST) + abort(); + + assert(op == UV__EPOLL_CTL_ADD); + + /* We've reactivated a file descriptor that's been watched before. */ + if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_MOD, w->fd, &e)) + abort(); + } + + w->events = w->pevents; + } + + assert(timeout >= -1); + base = loop->time; + count = 48; /* Benchmarks suggest this gives the best throughput. */ + + for (;;) { + nfds = uv__epoll_wait(loop->backend_fd, + events, + ARRAY_SIZE(events), + timeout); + + if (nfds == 0) { + assert(timeout != -1); + return; + } + + if (nfds == -1) { + if (errno != EINTR) + abort(); + + if (timeout == -1) + continue; + + if (timeout == 0) + return; + + /* Interrupted by a signal. Update timeout and poll again. */ + goto update_timeout; + } + + nevents = 0; + + for (i = 0; i < nfds; i++) { + pe = events + i; + fd = pe->data; + + assert(fd >= 0); + assert((unsigned) fd < loop->nwatchers); + + w = loop->watchers[fd]; + + if (w == NULL) { + /* File descriptor that we've stopped watching, disarm it. */ + if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_DEL, fd, pe)) + if (errno != EBADF && errno != ENOENT) + abort(); + + continue; + } + + w->cb(loop, w, pe->events); + nevents++; + } + + if (nevents != 0) { + if (nfds == ARRAY_SIZE(events) && --count != 0) { + /* Poll for more events but don't block this time. */ + timeout = 0; + continue; + } + return; + } + + if (timeout == 0) + return; + + if (timeout == -1) + continue; + +update_timeout: + assert(timeout > 0); + + diff = uv_hrtime() / 1000000; + assert(diff >= base); + diff -= base; + + if (diff >= (uint64_t) timeout) + return; + + timeout -= diff; + } +} + + uint64_t uv_hrtime() { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); diff --git a/src/unix/loop.c b/src/unix/loop.c index c70513f3a9..e63f12a622 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -29,16 +29,9 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { unsigned int i; - int flags; uv__signal_global_once_init(); -#if HAVE_KQUEUE - flags = EVBACKEND_KQUEUE; -#else - flags = EVFLAG_AUTO; -#endif - memset(loop, 0, sizeof(*loop)); RB_INIT(&loop->timer_handles); ngx_queue_init(&loop->wq); @@ -48,15 +41,24 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { ngx_queue_init(&loop->check_handles); ngx_queue_init(&loop->prepare_handles); ngx_queue_init(&loop->handle_queue); + + loop->nfds = 0; + loop->watchers = NULL; + loop->nwatchers = 0; + ngx_queue_init(&loop->pending_queue); + ngx_queue_init(&loop->watcher_queue); + loop->closing_handles = NULL; loop->time = uv_hrtime() / 1000000; loop->async_pipefd[0] = -1; loop->async_pipefd[1] = -1; loop->signal_pipefd[0] = -1; loop->signal_pipefd[1] = -1; + loop->backend_fd = -1; loop->emfile_fd = -1; - loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags); - ev_set_userdata(loop->ev, loop); + + if (uv__platform_loop_init(loop, default_loop)) + return -1; uv_signal_init(loop, &loop->child_watcher); uv__handle_unref(&loop->child_watcher); @@ -74,9 +76,6 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { uv__handle_unref(&loop->wq_async); loop->wq_async.flags |= UV__HANDLE_INTERNAL; - if (uv__platform_loop_init(loop, default_loop)) - return -1; - return 0; } @@ -84,7 +83,6 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { void uv__loop_delete(uv_loop_t* loop) { uv__signal_loop_cleanup(loop); uv__platform_loop_delete(loop); - ev_loop_destroy(loop->ev); if (loop->async_pipefd[0] != -1) { close(loop->async_pipefd[0]); @@ -101,8 +99,23 @@ void uv__loop_delete(uv_loop_t* loop) { loop->emfile_fd = -1; } + if (loop->backend_fd != -1) { + close(loop->backend_fd); + loop->backend_fd = -1; + } + uv_mutex_lock(&loop->wq_mutex); assert(ngx_queue_empty(&loop->wq) && "thread pool work queue not empty!"); uv_mutex_unlock(&loop->wq_mutex); uv_mutex_destroy(&loop->wq_mutex); + +#if 0 + assert(ngx_queue_empty(&loop->pending_queue)); + assert(ngx_queue_empty(&loop->watcher_queue)); + assert(loop->nfds == 0); +#endif + + free(loop->watchers); + loop->watchers = NULL; + loop->nwatchers = 0; } diff --git a/src/unix/netbsd.c b/src/unix/netbsd.c index dab1b696be..d9fb426d65 100644 --- a/src/unix/netbsd.c +++ b/src/unix/netbsd.c @@ -48,7 +48,7 @@ static char *process_title; int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { - return 0; + return uv__kqueue_init(loop); } diff --git a/src/unix/openbsd.c b/src/unix/openbsd.c index 4319270912..88f20b0153 100644 --- a/src/unix/openbsd.c +++ b/src/unix/openbsd.c @@ -44,7 +44,7 @@ static char *process_title; int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { - return 0; + return uv__kqueue_init(loop); } diff --git a/src/unix/pipe.c b/src/unix/pipe.c index 860f7d2d53..7a839bec99 100644 --- a/src/unix/pipe.c +++ b/src/unix/pipe.c @@ -29,7 +29,7 @@ #include #include -static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events); +static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events); int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { @@ -57,7 +57,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { bound = 0; /* Already bound? */ - if (handle->fd >= 0) { + if (handle->io_watcher.fd >= 0) { uv__set_artificial_error(handle->loop, UV_EINVAL); goto out; } @@ -89,7 +89,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { /* Success. */ handle->pipe_fname = pipe_fname; /* Is a strdup'ed copy. */ - handle->fd = sockfd; + handle->io_watcher.fd = sockfd; status = 0; out: @@ -117,21 +117,18 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { saved_errno = errno; status = -1; - if (handle->fd == -1) { + if (handle->io_watcher.fd == -1) { uv__set_artificial_error(handle->loop, UV_EINVAL); goto out; } - assert(handle->fd >= 0); + assert(handle->io_watcher.fd >= 0); - if ((status = listen(handle->fd, backlog)) == -1) { + if ((status = listen(handle->io_watcher.fd, backlog)) == -1) { uv__set_sys_error(handle->loop, errno); } else { handle->connection_cb = cb; - uv__io_init(&handle->read_watcher, - uv__pipe_accept, - handle->fd, - UV__IO_READ); - uv__io_start(handle->loop, &handle->read_watcher); + handle->io_watcher.cb = uv__pipe_accept; + uv__io_start(handle->loop, &handle->io_watcher, UV__IO_READ); } out: @@ -175,11 +172,11 @@ void uv_pipe_connect(uv_connect_t* req, int r; saved_errno = errno; - new_sock = (handle->fd == -1); + new_sock = (handle->io_watcher.fd == -1); err = -1; if (new_sock) - if ((handle->fd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) + if ((handle->io_watcher.fd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) goto out; memset(&saddr, 0, sizeof saddr); @@ -190,7 +187,7 @@ void uv_pipe_connect(uv_connect_t* req, * is either there or not. */ do { - r = connect(handle->fd, (struct sockaddr*)&saddr, sizeof saddr); + r = connect(handle->io_watcher.fd, (struct sockaddr*)&saddr, sizeof saddr); } while (r == -1 && errno == EINTR); @@ -199,12 +196,11 @@ void uv_pipe_connect(uv_connect_t* req, if (new_sock) if (uv__stream_open((uv_stream_t*)handle, - handle->fd, + handle->io_watcher.fd, UV_STREAM_READABLE | UV_STREAM_WRITABLE)) goto out; - uv__io_start(handle->loop, &handle->read_watcher); - uv__io_start(handle->loop, &handle->write_watcher); + uv__io_start(handle->loop, &handle->io_watcher, UV__IO_READ|UV__IO_WRITE); err = 0; out: @@ -217,7 +213,7 @@ void uv_pipe_connect(uv_connect_t* req, ngx_queue_init(&req->queue); /* Run callback on next tick. */ - uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE); + uv__io_feed(handle->loop, &handle->io_watcher); /* Mimic the Windows pipe implementation, always * return 0 and let the callback handle errors. @@ -227,17 +223,17 @@ void uv_pipe_connect(uv_connect_t* req, /* TODO merge with uv__server_io()? */ -static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) { +static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_pipe_t* pipe; int saved_errno; int sockfd; saved_errno = errno; - pipe = container_of(w, uv_pipe_t, read_watcher); + pipe = container_of(w, uv_pipe_t, io_watcher); assert(pipe->type == UV_NAMED_PIPE); - sockfd = uv__accept(pipe->fd); + sockfd = uv__accept(pipe->io_watcher.fd); if (sockfd == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { uv__set_sys_error(pipe->loop, errno); @@ -248,7 +244,7 @@ static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) { pipe->connection_cb((uv_stream_t*)pipe, 0); if (pipe->accepted_fd == sockfd) { /* The user hasn't called uv_accept() yet */ - uv__io_stop(pipe->loop, &pipe->read_watcher); + uv__io_stop(pipe->loop, &pipe->io_watcher, UV__IO_READ); } } diff --git a/src/unix/poll.c b/src/unix/poll.c index 8d6e299d32..afdac3e681 100644 --- a/src/unix/poll.c +++ b/src/unix/poll.c @@ -27,15 +27,14 @@ #include -static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) { +static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_poll_t* handle; int pevents; handle = container_of(w, uv_poll_t, io_watcher); if (events & UV__IO_ERROR) { - /* An error happened. Libev has implicitly stopped the watcher, but we */ - /* need to fix the refcount. */ + uv__io_stop(loop, w, UV__IO_READ | UV__IO_WRITE); uv__handle_stop(handle); uv__set_sys_error(handle->loop, EBADF); handle->poll_cb(handle, -1, 0); @@ -54,10 +53,8 @@ static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) { int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) { uv__handle_init(loop, (uv_handle_t*) handle, UV_POLL); - handle->fd = fd; + uv__io_init(&handle->io_watcher, uv__poll_io, fd); handle->poll_cb = NULL; - uv__io_init(&handle->io_watcher, uv__poll_io, fd, 0); - return 0; } @@ -69,7 +66,7 @@ int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, static void uv__poll_stop(uv_poll_t* handle) { - uv__io_stop(handle->loop, &handle->io_watcher); + uv__io_stop(handle->loop, &handle->io_watcher, UV__IO_READ | UV__IO_WRITE); uv__handle_stop(handle); } @@ -87,10 +84,10 @@ int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) { assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0); assert(!(handle->flags & (UV_CLOSING | UV_CLOSED))); - if (pevents == 0) { - uv__poll_stop(handle); + uv__poll_stop(handle); + + if (pevents == 0) return 0; - } events = 0; if (pevents & UV_READABLE) @@ -98,12 +95,9 @@ int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) { if (pevents & UV_WRITABLE) events |= UV__IO_WRITE; - uv__io_stop(handle->loop, &handle->io_watcher); - uv__io_set(&handle->io_watcher, uv__poll_io, handle->fd, events); - uv__io_start(handle->loop, &handle->io_watcher); - - handle->poll_cb = poll_cb; + uv__io_start(handle->loop, &handle->io_watcher, events); uv__handle_start(handle); + handle->poll_cb = poll_cb; return 0; } diff --git a/src/unix/process.c b/src/unix/process.c index 8b00af5dd8..9ff722c0c1 100644 --- a/src/unix/process.c +++ b/src/unix/process.c @@ -204,7 +204,7 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) { if (container->flags & UV_INHERIT_FD) { fd = container->data.fd; } else { - fd = container->data.stream->fd; + fd = container->data.stream->io_watcher.fd; } if (fd == -1) { diff --git a/src/unix/signal.c b/src/unix/signal.c index c24bb783e9..ed2cf6e1bb 100644 --- a/src/unix/signal.c +++ b/src/unix/signal.c @@ -38,7 +38,7 @@ RB_HEAD(uv__signal_tree_s, uv_signal_s); static int uv__signal_unlock(); -static void uv__signal_event(uv_loop_t* loop, uv__io_t* watcher, int events); +static void uv__signal_event(uv_loop_t* loop, uv__io_t* w, unsigned int events); static int uv__signal_compare(uv_signal_t* w1, uv_signal_t* w2); static void uv__signal_stop(uv_signal_t* handle); @@ -212,9 +212,8 @@ static int uv__signal_loop_once_init(uv_loop_t* loop) { uv__io_init(&loop->signal_io_watcher, uv__signal_event, - loop->signal_pipefd[0], - UV__IO_READ); - uv__io_start(loop, &loop->signal_io_watcher); + loop->signal_pipefd[0]); + uv__io_start(loop, &loop->signal_io_watcher, UV__IO_READ); return 0; } @@ -329,7 +328,7 @@ int uv_signal_start(uv_signal_t* handle, uv_signal_cb signal_cb, int signum) { } -static void uv__signal_event(uv_loop_t* loop, uv__io_t* watcher, int events) { +static void uv__signal_event(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv__signal_msg_t* msg; uv_signal_t* handle; char buf[sizeof(uv__signal_msg_t) * 32]; diff --git a/src/unix/stream.c b/src/unix/stream.c index ed489f779e..c664d33456 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -38,7 +38,7 @@ static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); static void uv__read(uv_stream_t* stream); -static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events); +static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); /* Used by the accept() EMFILE party trick. */ @@ -88,7 +88,6 @@ void uv__stream_init(uv_loop_t* loop, stream->connect_req = NULL; stream->shutdown_req = NULL; stream->accepted_fd = -1; - stream->fd = -1; stream->delayed_error = 0; ngx_queue_init(&stream->write_queue); ngx_queue_init(&stream->write_completed_queue); @@ -97,8 +96,7 @@ void uv__stream_init(uv_loop_t* loop, if (loop->emfile_fd == -1) loop->emfile_fd = uv__open_cloexec("/", O_RDONLY); - uv__io_init(&stream->read_watcher, uv__stream_io, -1, 0); - uv__io_init(&stream->write_watcher, uv__stream_io, -1, 0); + uv__io_init(&stream->io_watcher, uv__stream_io, -1); } @@ -106,33 +104,24 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) { socklen_t yes; assert(fd >= 0); - stream->fd = fd; - stream->flags |= flags; if (stream->type == UV_TCP) { /* Reuse the port address if applicable. */ yes = 1; - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) { - uv__set_sys_error(stream->loop, errno); - return -1; - } - if ((stream->flags & UV_TCP_NODELAY) && - uv__tcp_nodelay((uv_tcp_t*)stream, 1)) { - return -1; - } + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) + return uv__set_sys_error(stream->loop, errno); + + if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) + return uv__set_sys_error(stream->loop, errno); /* TODO Use delay the user passed in. */ - if ((stream->flags & UV_TCP_KEEPALIVE) && - uv__tcp_keepalive((uv_tcp_t*)stream, 1, 60)) { - return -1; - } + if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60)) + return uv__set_sys_error(stream->loop, errno); } - /* Associate the fd with each watcher. */ - uv__io_set(&stream->read_watcher, uv__stream_io, fd, UV__IO_READ); - uv__io_set(&stream->write_watcher, uv__stream_io, fd, UV__IO_WRITE); + stream->io_watcher.fd = fd; return 0; } @@ -142,6 +131,7 @@ void uv__stream_destroy(uv_stream_t* stream) { uv_write_t* req; ngx_queue_t* q; + assert(!uv__io_active(&stream->io_watcher, UV__IO_READ | UV__IO_WRITE)); assert(stream->flags & UV_CLOSED); if (stream->connect_req) { @@ -232,27 +222,26 @@ static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) { } -void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) { +void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { static int use_emfile_trick = -1; uv_stream_t* stream; int fd; int r; - stream = container_of(w, uv_stream_t, read_watcher); + stream = container_of(w, uv_stream_t, io_watcher); assert(events == UV__IO_READ); + assert(stream->accepted_fd == -1); assert(!(stream->flags & UV_CLOSING)); - if (stream->accepted_fd >= 0) { - uv__io_stop(loop, &stream->read_watcher); - return; - } + if (stream->accepted_fd == -1) + uv__io_start(stream->loop, &stream->io_watcher, UV__IO_READ); /* connection_cb can close the server socket while we're * in the loop so check it on each iteration. */ - while (stream->fd != -1) { - assert(stream->accepted_fd < 0); - fd = uv__accept(stream->fd); + while (stream->io_watcher.fd != -1) { + assert(stream->accepted_fd == -1); + fd = uv__accept(stream->io_watcher.fd); if (fd == -1) { switch (errno) { @@ -273,7 +262,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) { } if (use_emfile_trick) { - SAVE_ERRNO(r = uv__emfile_trick(loop, stream->fd)); + SAVE_ERRNO(r = uv__emfile_trick(loop, stream->io_watcher.fd)); if (r == 0) continue; } @@ -292,7 +281,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) { if (stream->accepted_fd != -1) { /* The user hasn't yet accepted called uv_accept() */ - uv__io_stop(loop, &stream->read_watcher); + uv__io_stop(loop, &stream->io_watcher, UV__IO_READ); return; } @@ -333,7 +322,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { goto out; } - uv__io_start(streamServer->loop, &streamServer->read_watcher); + uv__io_start(streamServer->loop, &streamServer->io_watcher, UV__IO_READ); streamServer->accepted_fd = -1; status = 0; @@ -393,7 +382,7 @@ static void uv__drain(uv_stream_t* stream) { assert(!uv_write_queue_head(stream)); assert(stream->write_queue_size == 0); - uv__io_stop(stream->loop, &stream->write_watcher); + uv__io_stop(stream->loop, &stream->io_watcher, UV__IO_WRITE); /* Shutdown? */ if ((stream->flags & UV_STREAM_SHUTTING) && @@ -405,7 +394,7 @@ static void uv__drain(uv_stream_t* stream) { stream->shutdown_req = NULL; uv__req_unregister(stream->loop, req); - if (shutdown(stream->fd, SHUT_WR)) { + if (shutdown(stream->io_watcher.fd, SHUT_WR)) { /* Error. Report it. User should call uv_close(). */ uv__set_sys_error(stream->loop, errno); if (req->cb) { @@ -447,7 +436,7 @@ static void uv__write_req_finish(uv_write_t* req) { * callback called in the near future. */ ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue); - uv__io_feed(stream->loop, &stream->write_watcher, UV__IO_WRITE); + uv__io_feed(stream->loop, &stream->io_watcher); } @@ -469,7 +458,7 @@ static void uv__write(uv_stream_t* stream) { start: - assert(stream->fd >= 0); + assert(stream->io_watcher.fd >= 0); /* Get the request at the head of the queue. */ req = uv_write_queue_head(stream); @@ -497,7 +486,7 @@ static void uv__write(uv_stream_t* stream) { struct msghdr msg; char scratch[64]; struct cmsghdr *cmsg; - int fd_to_send = req->send_handle->fd; + int fd_to_send = req->send_handle->io_watcher.fd; assert(fd_to_send >= 0); @@ -523,15 +512,15 @@ static void uv__write(uv_stream_t* stream) { } do { - n = sendmsg(stream->fd, &msg, 0); + n = sendmsg(stream->io_watcher.fd, &msg, 0); } while (n == -1 && errno == EINTR); } else { do { if (iovcnt == 1) { - n = write(stream->fd, iov[0].iov_base, iov[0].iov_len); + n = write(stream->io_watcher.fd, iov[0].iov_base, iov[0].iov_len); } else { - n = writev(stream->fd, iov, iovcnt); + n = writev(stream->io_watcher.fd, iov, iovcnt); } } while (n == -1 && errno == EINTR); @@ -603,7 +592,7 @@ static void uv__write(uv_stream_t* stream) { assert(!(stream->flags & UV_STREAM_BLOCKING)); /* We're not done. */ - uv__io_start(stream->loop, &stream->write_watcher); + uv__io_start(stream->loop, &stream->io_watcher, UV__IO_WRITE); } @@ -680,11 +669,11 @@ static void uv__read(uv_stream_t* stream) { assert(buf.len > 0); assert(buf.base); - assert(stream->fd >= 0); + assert(stream->io_watcher.fd >= 0); if (stream->read_cb) { do { - nread = read(stream->fd, buf.base, buf.len); + nread = read(stream->io_watcher.fd, buf.base, buf.len); } while (nread < 0 && errno == EINTR); } else { @@ -700,7 +689,7 @@ static void uv__read(uv_stream_t* stream) { msg.msg_control = (void *) cmsg_space; do { - nread = recvmsg(stream->fd, &msg, 0); + nread = recvmsg(stream->io_watcher.fd, &msg, 0); } while (nread < 0 && errno == EINTR); } @@ -711,7 +700,7 @@ static void uv__read(uv_stream_t* stream) { if (errno == EAGAIN || errno == EWOULDBLOCK) { /* Wait for the next one. */ if (stream->flags & UV_STREAM_READING) { - uv__io_start(stream->loop, &stream->read_watcher); + uv__io_start(stream->loop, &stream->io_watcher, UV__IO_READ); } uv__set_sys_error(stream->loop, EAGAIN); @@ -732,15 +721,16 @@ static void uv__read(uv_stream_t* stream) { stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); } - assert(!uv__io_active(&stream->read_watcher)); + assert(!uv__io_active(&stream->io_watcher, UV__IO_READ)); return; } } else if (nread == 0) { /* EOF */ uv__set_artificial_error(stream->loop, UV_EOF); - uv__io_stop(stream->loop, &stream->read_watcher); - if (!uv__io_active(&stream->write_watcher)) + uv__io_stop(stream->loop, &stream->io_watcher, UV__IO_READ); + + if (!uv__io_active(&stream->io_watcher, UV__IO_WRITE)) uv__handle_stop(stream); if (stream->read_cb) { @@ -808,7 +798,7 @@ static void uv__read(uv_stream_t* stream) { int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) && "uv_shutdown (unix) only supports uv_handle_t right now"); - assert(stream->fd >= 0); + assert(stream->io_watcher.fd >= 0); if (!(stream->flags & UV_STREAM_WRITABLE) || stream->flags & UV_STREAM_SHUT || @@ -825,36 +815,38 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { stream->shutdown_req = req; stream->flags |= UV_STREAM_SHUTTING; - uv__io_start(stream->loop, &stream->write_watcher); + uv__io_start(stream->loop, &stream->io_watcher, UV__IO_WRITE); return 0; } -static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events) { +static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_stream_t* stream; - /* either UV__IO_READ or UV__IO_WRITE but not both */ - assert(!!(events & UV__IO_READ) ^ !!(events & UV__IO_WRITE)); - - if (events & UV__IO_READ) - stream = container_of(w, uv_stream_t, read_watcher); - else - stream = container_of(w, uv_stream_t, write_watcher); + stream = container_of(w, uv_stream_t, io_watcher); assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); assert(!(stream->flags & UV_CLOSING)); - if (stream->connect_req) + if (stream->connect_req) { uv__stream_connect(stream); - else if (events & UV__IO_READ) { - assert(stream->fd >= 0); + return; + } + + if (events & UV__IO_READ) { + assert(stream->io_watcher.fd >= 0); + uv__read(stream); + + if (stream->io_watcher.fd == -1) + return; /* read_cb closed stream. */ } - else { - assert(stream->fd >= 0); + + if (events & UV__IO_WRITE) { + assert(stream->io_watcher.fd >= 0); uv__write(stream); uv__write_callbacks(stream); } @@ -883,8 +875,8 @@ static void uv__stream_connect(uv_stream_t* stream) { stream->delayed_error = 0; } else { /* Normal situation: we need to get the socket error from the kernel. */ - assert(stream->fd >= 0); - getsockopt(stream->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize); + assert(stream->io_watcher.fd >= 0); + getsockopt(stream->io_watcher.fd, SOL_SOCKET, SO_ERROR, &error, &errorsize); } if (error == EINPROGRESS) @@ -914,7 +906,7 @@ int uv_write2(uv_write_t* req, stream->type == UV_TTY) && "uv_write (unix) does not yet support other types of streams"); - if (stream->fd < 0) { + if (stream->io_watcher.fd < 0) { uv__set_sys_error(stream->loop, EBADF); return -1; } @@ -966,7 +958,7 @@ int uv_write2(uv_write_t* req, * sufficiently flushed in uv__write. */ assert(!(stream->flags & UV_STREAM_BLOCKING)); - uv__io_start(stream->loop, &stream->write_watcher); + uv__io_start(stream->loop, &stream->io_watcher, UV__IO_WRITE); } return 0; @@ -1001,14 +993,14 @@ int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb, /* TODO: keep track of tcp state. If we've gotten a EOF then we should * not start the IO watcher. */ - assert(stream->fd >= 0); + assert(stream->io_watcher.fd >= 0); assert(alloc_cb); stream->read_cb = read_cb; stream->read2_cb = read2_cb; stream->alloc_cb = alloc_cb; - uv__io_start(stream->loop, &stream->read_watcher); + uv__io_start(stream->loop, &stream->io_watcher, UV__IO_READ); uv__handle_start(stream); return 0; @@ -1028,7 +1020,7 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, int uv_read_stop(uv_stream_t* stream) { - uv__io_stop(stream->loop, &stream->read_watcher); + uv__io_stop(stream->loop, &stream->io_watcher, UV__IO_READ); uv__handle_stop(stream); stream->flags &= ~UV_STREAM_READING; stream->read_cb = NULL; @@ -1050,16 +1042,15 @@ int uv_is_writable(const uv_stream_t* stream) { void uv__stream_close(uv_stream_t* handle) { uv_read_stop(handle); - uv__io_stop(handle->loop, &handle->write_watcher); + uv__io_stop(handle->loop, &handle->io_watcher, UV__IO_WRITE); - close(handle->fd); - handle->fd = -1; + close(handle->io_watcher.fd); + handle->io_watcher.fd = -1; if (handle->accepted_fd >= 0) { close(handle->accepted_fd); handle->accepted_fd = -1; } - assert(!uv__io_active(&handle->read_watcher)); - assert(!uv__io_active(&handle->write_watcher)); + assert(!uv__io_active(&handle->io_watcher, UV__IO_READ|UV__IO_WRITE)); } diff --git a/src/unix/sunos.c b/src/unix/sunos.c index bf1352480d..21766203c6 100644 --- a/src/unix/sunos.c +++ b/src/unix/sunos.c @@ -65,14 +65,163 @@ int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { loop->fs_fd = -1; + loop->backend_fd = port_create(); + + if (loop->backend_fd == -1) + return -1; + + uv__cloexec(loop->backend_fd, 1); + return 0; } void uv__platform_loop_delete(uv_loop_t* loop) { - if (loop->fs_fd == -1) return; - close(loop->fs_fd); - loop->fs_fd = -1; + if (loop->fs_fd != -1) { + close(loop->fs_fd); + loop->fs_fd = -1; + } + + if (loop->backend_fd != -1) { + close(loop->backend_fd); + loop->backend_fd = -1; + } +} + + +void uv__io_poll(uv_loop_t* loop, int timeout) { + struct port_event events[1024]; + struct port_event* pe; + struct timespec spec; + ngx_queue_t* q; + uv__io_t* w; + uint64_t base; + uint64_t diff; + unsigned int nfds; + unsigned int i; + int saved_errno; + int nevents; + int count; + int fd; + + if (loop->nfds == 0) { + assert(ngx_queue_empty(&loop->watcher_queue)); + return; + } + + while (!ngx_queue_empty(&loop->watcher_queue)) { + q = ngx_queue_head(&loop->watcher_queue); + ngx_queue_remove(q); + ngx_queue_init(q); + + w = ngx_queue_data(q, uv__io_t, watcher_queue); + assert(w->pevents != 0); + + if (port_associate(loop->backend_fd, PORT_SOURCE_FD, w->fd, w->pevents, 0)) + abort(); + + w->events = w->pevents; + } + + assert(timeout >= -1); + base = loop->time; + count = 48; /* Benchmarks suggest this gives the best throughput. */ + + for (;;) { + if (timeout != -1) { + spec.tv_sec = timeout / 1000; + spec.tv_nsec = (timeout % 1000) * 1000000; + } + + /* Work around a kernel bug where nfds is not updated. */ + events[0].portev_source = 0; + + nfds = 1; + saved_errno = 0; + if (port_getn(loop->backend_fd, + events, + ARRAY_SIZE(events), + &nfds, + timeout == -1 ? NULL : &spec)) { + /* Work around another kernel bug: port_getn() may return events even + * on error. + */ + if (errno == EINTR || errno == ETIME) + saved_errno = errno; + else + abort(); + } + + if (events[0].portev_source == 0) { + if (timeout == 0) + return; + + if (timeout == -1) + continue; + + goto update_timeout; + } + + if (nfds == 0) { + assert(timeout != -1); + return; + } + + nevents = 0; + + for (i = 0; i < nfds; i++) { + pe = events + i; + fd = pe->portev_object; + + assert(fd >= 0); + assert((unsigned) fd < loop->nwatchers); + + w = loop->watchers[fd]; + + /* File descriptor that we've stopped watching, ignore. */ + if (w == NULL) + continue; + + w->cb(loop, w, pe->portev_events); + nevents++; + + /* Events Ports operates in oneshot mode, rearm timer on next run. */ + if (w->pevents != 0 && ngx_queue_empty(&w->watcher_queue)) + ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); + } + + if (nevents != 0) { + if (nfds == ARRAY_SIZE(events) && --count != 0) { + /* Poll for more events but don't block this time. */ + timeout = 0; + continue; + } + return; + } + + if (saved_errno == ETIME) { + assert(timeout != -1); + return; + } + + if (timeout == 0) + return; + + if (timeout == -1) + continue; + +update_timeout: + assert(timeout > 0); + + diff = uv_hrtime() / 1000000; + assert(diff >= base); + diff -= base; + + if (diff >= (uint64_t) timeout) + return; + + timeout -= diff; + } } @@ -139,7 +288,9 @@ static void uv__fs_event_rearm(uv_fs_event_t *handle) { } -static void uv__fs_event_read(uv_loop_t* loop, uv__io_t* w, int revents) { +static void uv__fs_event_read(uv_loop_t* loop, + uv__io_t* w, + unsigned int revents) { uv_fs_event_t *handle = NULL; timespec_t timeout; port_event_t pe; @@ -216,8 +367,8 @@ int uv_fs_event_init(uv_loop_t* loop, uv__fs_event_rearm(handle); if (first_run) { - uv__io_init(&loop->fs_event_watcher, uv__fs_event_read, portfd, UV__IO_READ); - uv__io_start(loop, &loop->fs_event_watcher); + uv__io_init(&loop->fs_event_watcher, uv__fs_event_read, portfd); + uv__io_start(loop, &loop->fs_event_watcher, UV__IO_READ); } return 0; diff --git a/src/unix/tcp.c b/src/unix/tcp.c index 4325c5b120..0809238cf1 100644 --- a/src/unix/tcp.c +++ b/src/unix/tcp.c @@ -37,7 +37,7 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) { static int maybe_new_socket(uv_tcp_t* handle, int domain, int flags) { int sockfd; - if (handle->fd != -1) + if (handle->io_watcher.fd != -1) return 0; sockfd = uv__socket(domain, SOCK_STREAM, 0); @@ -68,7 +68,7 @@ static int uv__bind(uv_tcp_t* tcp, return -1; tcp->delayed_error = 0; - if (bind(tcp->fd, addr, addrsize) == -1) { + if (bind(tcp->io_watcher.fd, addr, addrsize) == -1) { if (errno == EADDRINUSE) { tcp->delayed_error = errno; } else { @@ -105,7 +105,7 @@ static int uv__connect(uv_connect_t* req, handle->delayed_error = 0; do - r = connect(handle->fd, addr, addrlen); + r = connect(handle->io_watcher.fd, addr, addrlen); while (r == -1 && errno == EINTR); if (r == -1) { @@ -127,10 +127,10 @@ static int uv__connect(uv_connect_t* req, ngx_queue_init(&req->queue); handle->connect_req = req; - uv__io_start(handle->loop, &handle->write_watcher); + uv__io_start(handle->loop, &handle->io_watcher, UV__IO_WRITE); if (handle->delayed_error) - uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE); + uv__io_feed(handle->loop, &handle->io_watcher); return 0; } @@ -174,7 +174,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name, goto out; } - if (handle->fd < 0) { + if (handle->io_watcher.fd < 0) { uv__set_sys_error(handle->loop, EINVAL); rv = -1; goto out; @@ -183,7 +183,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name, /* sizeof(socklen_t) != sizeof(int) on some systems. */ socklen = (socklen_t)*namelen; - if (getsockname(handle->fd, name, &socklen) == -1) { + if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) { uv__set_sys_error(handle->loop, errno); rv = -1; } else { @@ -211,7 +211,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name, goto out; } - if (handle->fd < 0) { + if (handle->io_watcher.fd < 0) { uv__set_sys_error(handle->loop, EINVAL); rv = -1; goto out; @@ -220,7 +220,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name, /* sizeof(socklen_t) != sizeof(int) on some systems. */ socklen = (socklen_t)*namelen; - if (getpeername(handle->fd, name, &socklen) == -1) { + if (getpeername(handle->io_watcher.fd, name, &socklen) == -1) { uv__set_sys_error(handle->loop, errno); rv = -1; } else { @@ -250,14 +250,14 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { if (maybe_new_socket(tcp, AF_INET, UV_STREAM_READABLE)) return -1; - if (listen(tcp->fd, backlog)) + if (listen(tcp->io_watcher.fd, backlog)) return uv__set_sys_error(tcp->loop, errno); tcp->connection_cb = cb; /* Start listening for connections. */ - uv__io_set(&tcp->read_watcher, uv__server_io, tcp->fd, UV__IO_READ); - uv__io_start(tcp->loop, &tcp->read_watcher); + tcp->io_watcher.cb = uv__server_io; + uv__io_start(tcp->loop, &tcp->io_watcher, UV__IO_READ); return 0; } @@ -293,63 +293,38 @@ int uv__tcp_connect6(uv_connect_t* req, } -int uv__tcp_nodelay(uv_tcp_t* handle, int enable) { - if (setsockopt(handle->fd, - IPPROTO_TCP, - TCP_NODELAY, - &enable, - sizeof enable) == -1) { - uv__set_sys_error(handle->loop, errno); - return -1; - } - return 0; +int uv__tcp_nodelay(int fd, int on) { + return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); } -int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) { - if (setsockopt(handle->fd, - SOL_SOCKET, - SO_KEEPALIVE, - &enable, - sizeof enable) == -1) { - uv__set_sys_error(handle->loop, errno); +int uv__tcp_keepalive(int fd, int on, unsigned int delay) { + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on))) return -1; - } #ifdef TCP_KEEPIDLE - if (enable && setsockopt(handle->fd, - IPPROTO_TCP, - TCP_KEEPIDLE, - &delay, - sizeof delay) == -1) { - uv__set_sys_error(handle->loop, errno); + if (on && setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &delay, sizeof(delay))) return -1; - } #endif /* Solaris/SmartOS, if you don't support keep-alive, * then don't advertise it in your system headers... */ #if defined(TCP_KEEPALIVE) && !defined(__sun) - if (enable && setsockopt(handle->fd, - IPPROTO_TCP, - TCP_KEEPALIVE, - &delay, - sizeof delay) == -1) { - uv__set_sys_error(handle->loop, errno); + if (on && setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &delay, sizeof(delay))) return -1; - } #endif return 0; } -int uv_tcp_nodelay(uv_tcp_t* handle, int enable) { - if (handle->fd != -1 && uv__tcp_nodelay(handle, enable)) - return -1; +int uv_tcp_nodelay(uv_tcp_t* handle, int on) { + if (handle->io_watcher.fd != -1) + if (uv__tcp_nodelay(handle->io_watcher.fd, on)) + return -1; - if (enable) + if (on) handle->flags |= UV_TCP_NODELAY; else handle->flags &= ~UV_TCP_NODELAY; @@ -358,25 +333,26 @@ int uv_tcp_nodelay(uv_tcp_t* handle, int enable) { } -int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) { - if (handle->fd != -1 && uv__tcp_keepalive(handle, enable, delay)) - return -1; +int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) { + if (handle->io_watcher.fd != -1) + if (uv__tcp_keepalive(handle->io_watcher.fd, on, delay)) + return -1; - if (enable) + if (on) handle->flags |= UV_TCP_KEEPALIVE; else handle->flags &= ~UV_TCP_KEEPALIVE; - /* TODO Store delay if handle->fd == -1 but don't want to enlarge - * uv_tcp_t with an int that's almost never used... + /* TODO Store delay if handle->io_watcher.fd == -1 but don't want to enlarge + * uv_tcp_t with an int that's almost never used... */ return 0; } -int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) { - if (enable) +int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int on) { + if (on) handle->flags |= UV_TCP_SINGLE_ACCEPT; else handle->flags &= ~UV_TCP_SINGLE_ACCEPT; diff --git a/src/unix/tty.c b/src/unix/tty.c index 3ef9064a10..5c1e3609af 100644 --- a/src/unix/tty.c +++ b/src/unix/tty.c @@ -51,8 +51,10 @@ int uv_tty_init(uv_loop_t* loop, uv_tty_t* tty, int fd, int readable) { int uv_tty_set_mode(uv_tty_t* tty, int mode) { - int fd = tty->fd; struct termios raw; + int fd; + + fd = tty->io_watcher.fd; if (mode && tty->mode == 0) { /* on */ @@ -103,7 +105,7 @@ int uv_tty_set_mode(uv_tty_t* tty, int mode) { int uv_tty_get_winsize(uv_tty_t* tty, int* width, int* height) { struct winsize ws; - if (ioctl(tty->fd, TIOCGWINSZ, &ws) < 0) { + if (ioctl(tty->io_watcher.fd, TIOCGWINSZ, &ws) < 0) { uv__set_sys_error(tty->loop, errno); return -1; } diff --git a/src/unix/udp.c b/src/unix/udp.c index 6f5a3f14e7..b90033435e 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -31,41 +31,19 @@ static void uv__udp_run_completed(uv_udp_t* handle); static void uv__udp_run_pending(uv_udp_t* handle); -static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents); -static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents); +static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents); +static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents); +static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents); static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain); static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb); -static void uv__udp_start_watcher(uv_udp_t* handle, - uv__io_t* w, - uv__io_cb cb, - int events) { - if (uv__io_active(w)) return; - uv__io_init(w, cb, handle->fd, events); - uv__io_start(handle->loop, w); - uv__handle_start(handle); -} - - -static void uv__udp_stop_watcher(uv_udp_t* handle, uv__io_t* w) { - if (!uv__io_active(w)) return; - uv__io_stop(handle->loop, w); - - if (!uv__io_active(&handle->read_watcher) && - !uv__io_active(&handle->write_watcher)) - { - uv__handle_stop(handle); - } -} - - void uv__udp_close(uv_udp_t* handle) { - uv__udp_stop_watcher(handle, &handle->write_watcher); - uv__udp_stop_watcher(handle, &handle->read_watcher); - close(handle->fd); - handle->fd = -1; + uv__io_stop(handle->loop, &handle->io_watcher, UV__IO_READ|UV__IO_WRITE); + uv__handle_stop(handle); + close(handle->io_watcher.fd); + handle->io_watcher.fd = -1; } @@ -73,9 +51,8 @@ void uv__udp_finish_close(uv_udp_t* handle) { uv_udp_send_t* req; ngx_queue_t* q; - assert(!uv__io_active(&handle->write_watcher)); - assert(!uv__io_active(&handle->read_watcher)); - assert(handle->fd == -1); + assert(!uv__io_active(&handle->io_watcher, UV__IO_READ|UV__IO_WRITE)); + assert(handle->io_watcher.fd == -1); uv__udp_run_completed(handle); @@ -126,7 +103,7 @@ static void uv__udp_run_pending(uv_udp_t* handle) { h.msg_iovlen = req->bufcnt; do { - size = sendmsg(handle->fd, &h, 0); + size = sendmsg(handle->io_watcher.fd, &h, 0); } while (size == -1 && errno == EINTR); @@ -194,7 +171,18 @@ static void uv__udp_run_completed(uv_udp_t* handle) { } -static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { +static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) { + if (revents & UV__IO_READ) + uv__udp_recvmsg(loop, w, revents); + + if (revents & UV__IO_WRITE) + uv__udp_sendmsg(loop, w, revents); +} + + +static void uv__udp_recvmsg(uv_loop_t* loop, + uv__io_t* w, + unsigned int revents) { struct sockaddr_storage peer; struct msghdr h; uv_udp_t* handle; @@ -203,7 +191,7 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { int flags; int count; - handle = container_of(w, uv_udp_t, read_watcher); + handle = container_of(w, uv_udp_t, io_watcher); assert(handle->type == UV_UDP); assert(revents & UV__IO_READ); @@ -228,7 +216,7 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { h.msg_iovlen = 1; do { - nread = recvmsg(handle->fd, &h, 0); + nread = recvmsg(handle->io_watcher.fd, &h, 0); } while (nread == -1 && errno == EINTR); @@ -258,15 +246,17 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { /* recv_cb callback may decide to pause or close the handle */ while (nread != -1 && count-- > 0 - && handle->fd != -1 + && handle->io_watcher.fd != -1 && handle->recv_cb != NULL); } -static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents) { +static void uv__udp_sendmsg(uv_loop_t* loop, + uv__io_t* w, + unsigned int revents) { uv_udp_t* handle; - handle = container_of(w, uv_udp_t, write_watcher); + handle = container_of(w, uv_udp_t, io_watcher); assert(handle->type == UV_UDP); assert(revents & UV__IO_WRITE); @@ -281,11 +271,14 @@ static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents) { if (!ngx_queue_empty(&handle->write_completed_queue)) { /* Schedule completion callbacks. */ - uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE); + uv__io_feed(handle->loop, &handle->io_watcher); } else if (ngx_queue_empty(&handle->write_queue)) { /* Pending queue and completion queue empty, stop watcher. */ - uv__udp_stop_watcher(handle, &handle->write_watcher); + uv__io_stop(loop, &handle->io_watcher, UV__IO_WRITE); + + if (!uv__io_active(&handle->io_watcher, UV__IO_READ)) + uv__handle_stop(handle); } } @@ -316,15 +309,15 @@ static int uv__bind(uv_udp_t* handle, goto out; } - if (handle->fd == -1) { + if (handle->io_watcher.fd == -1) { if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) { uv__set_sys_error(handle->loop, errno); goto out; } - handle->fd = fd; + handle->io_watcher.fd = fd; } - fd = handle->fd; + fd = handle->io_watcher.fd; yes = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) { uv__set_sys_error(handle->loop, errno); @@ -365,12 +358,13 @@ static int uv__bind(uv_udp_t* handle, goto out; } + handle->io_watcher.fd = fd; status = 0; out: if (status) { - close(handle->fd); - handle->fd = -1; + close(handle->io_watcher.fd); + handle->io_watcher.fd = -1; } errno = saved_errno; @@ -384,7 +378,7 @@ static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) { assert(domain == AF_INET || domain == AF_INET6); - if (handle->fd != -1) + if (handle->io_watcher.fd != -1) return 0; switch (domain) { @@ -442,14 +436,11 @@ static int uv__udp_send(uv_udp_send_t* req, uv__set_sys_error(handle->loop, ENOMEM); return -1; } - memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0])); + memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0])); ngx_queue_insert_tail(&handle->write_queue, &req->queue); - - uv__udp_start_watcher(handle, - &handle->write_watcher, - uv__udp_sendmsg, - UV__IO_WRITE); + uv__io_start(handle->loop, &handle->io_watcher, UV__IO_WRITE); + uv__handle_start(handle); return 0; } @@ -459,9 +450,10 @@ int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) { memset(handle, 0, sizeof *handle); uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP); - handle->fd = -1; + handle->io_watcher.fd = -1; ngx_queue_init(&handle->write_queue); ngx_queue_init(&handle->write_completed_queue); + uv__io_init(&handle->io_watcher, uv__udp_io, -1); return 0; } @@ -494,7 +486,7 @@ int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) { status = -1; /* Check for already active socket. */ - if (handle->fd != -1) { + if (handle->io_watcher.fd != -1) { uv__set_artificial_error(handle->loop, UV_EALREADY); goto out; } @@ -521,7 +513,7 @@ int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) { } #endif - handle->fd = sock; + handle->io_watcher.fd = sock; status = 0; out: @@ -557,7 +549,7 @@ int uv_udp_set_membership(uv_udp_t* handle, const char* multicast_addr, return -1; } - if (setsockopt(handle->fd, IPPROTO_IP, optname, (void*) &mreq, sizeof mreq) == -1) { + if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, optname, (void*) &mreq, sizeof mreq) == -1) { uv__set_sys_error(handle->loop, errno); return -1; } @@ -576,7 +568,7 @@ static int uv__setsockopt_maybe_char(uv_udp_t* handle, int option, int val) { if (val < 0 || val > 255) return uv__set_sys_error(handle->loop, EINVAL); - if (setsockopt(handle->fd, IPPROTO_IP, option, &arg, sizeof(arg))) + if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, option, &arg, sizeof(arg))) return uv__set_sys_error(handle->loop, errno); return 0; @@ -584,7 +576,7 @@ static int uv__setsockopt_maybe_char(uv_udp_t* handle, int option, int val) { int uv_udp_set_broadcast(uv_udp_t* handle, int on) { - if (setsockopt(handle->fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) + if (setsockopt(handle->io_watcher.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) return uv__set_sys_error(handle->loop, errno); return 0; @@ -595,7 +587,7 @@ int uv_udp_set_ttl(uv_udp_t* handle, int ttl) { if (ttl < 1 || ttl > 255) return uv__set_sys_error(handle->loop, EINVAL); - if (setsockopt(handle->fd, IPPROTO_IP, IP_TTL, &ttl, sizeof(ttl))) + if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, IP_TTL, &ttl, sizeof(ttl))) return uv__set_sys_error(handle->loop, errno); return 0; @@ -620,7 +612,7 @@ int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, int* namelen) { /* Don't clobber errno. */ saved_errno = errno; - if (handle->fd < 0) { + if (handle->io_watcher.fd == -1) { uv__set_sys_error(handle->loop, EINVAL); rv = -1; goto out; @@ -629,7 +621,7 @@ int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, int* namelen) { /* sizeof(socklen_t) != sizeof(int) on some systems. */ socklen = (socklen_t)*namelen; - if (getsockname(handle->fd, name, &socklen) == -1) { + if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) { uv__set_sys_error(handle->loop, errno); rv = -1; } else { @@ -682,7 +674,7 @@ int uv_udp_recv_start(uv_udp_t* handle, return -1; } - if (uv__io_active(&handle->read_watcher)) { + if (uv__io_active(&handle->io_watcher, UV__IO_READ)) { uv__set_artificial_error(handle->loop, UV_EALREADY); return -1; } @@ -693,18 +685,21 @@ int uv_udp_recv_start(uv_udp_t* handle, handle->alloc_cb = alloc_cb; handle->recv_cb = recv_cb; - uv__udp_start_watcher(handle, - &handle->read_watcher, - uv__udp_recvmsg, - UV__IO_READ); + uv__io_start(handle->loop, &handle->io_watcher, UV__IO_READ); + uv__handle_start(handle); return 0; } int uv_udp_recv_stop(uv_udp_t* handle) { - uv__udp_stop_watcher(handle, &handle->read_watcher); + uv__io_stop(handle->loop, &handle->io_watcher, UV__IO_READ); + + if (!uv__io_active(&handle->io_watcher, UV__IO_WRITE)) + uv__handle_stop(handle); + handle->alloc_cb = NULL; handle->recv_cb = NULL; + return 0; }