From 63fd47564392af34d33bec66aebb36c067e05eba Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 24 Oct 2023 16:49:28 +1300 Subject: [PATCH 1/2] Ensure that the saved node is freed correctly if an exception occurs. --- ext/io/event/selector/epoll.c | 65 +++++++++++++++++++++--------- ext/io/event/selector/kqueue.c | 73 +++++++++++++++++++++++----------- ext/io/event/selector/list.h | 3 +- 3 files changed, 98 insertions(+), 43 deletions(-) diff --git a/ext/io/event/selector/epoll.c b/ext/io/event/selector/epoll.c index dcacedc7..37ea26ac 100644 --- a/ext/io/event/selector/epoll.c +++ b/ext/io/event/selector/epoll.c @@ -804,9 +804,11 @@ struct select_arguments { int count; struct epoll_event events[EPOLL_MAX_EVENTS]; - + struct timespec * timeout; struct timespec storage; + + struct IO_Event_List saved; }; static int make_timeout_ms(struct timespec * timeout) { @@ -883,7 +885,7 @@ void select_internal_with_gvl(struct select_arguments *arguments) { } static -int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, const struct epoll_event *event) +int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, const struct epoll_event *event, struct IO_Event_List *saved) { int descriptor = event->data.fd; @@ -893,29 +895,32 @@ int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, con struct IO_Event_Selector_EPoll_Descriptor *epoll_descriptor = IO_Event_Selector_EPoll_Descriptor_lookup(selector, descriptor); struct IO_Event_List *list = &epoll_descriptor->list; struct IO_Event_List *node = list->tail; - struct IO_Event_List saved = {NULL, NULL}; // Reset the events back to 0 so that we can re-arm if necessary: epoll_descriptor->waiting_events = 0; + if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d epoll_descriptor=%p\n", descriptor, ready_events, epoll_descriptor); + // It's possible (but unlikely) that the address of list will change during iteration. 
while (node != list) { + if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: node=%p list=%p type=%p\n", node, list, node->type); + struct IO_Event_Selector_EPoll_Waiting *waiting = (struct IO_Event_Selector_EPoll_Waiting *)node; // Compute the intersection of the events we are waiting for and the events that occured: enum IO_Event matching_events = waiting->events & ready_events; - if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d, matching_events=%d\n", descriptor, ready_events, matching_events); + if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d, waiting_events=%d, matching_events=%d\n", descriptor, ready_events, waiting->events, matching_events); if (matching_events) { - IO_Event_List_append(node, &saved); + IO_Event_List_append(node, saved); // Resume the fiber: waiting->ready = matching_events; IO_Event_Selector_fiber_transfer(waiting->fiber, 0, NULL); - node = saved.tail; - IO_Event_List_pop(&saved); + node = saved->tail; + IO_Event_List_pop(saved); } else { // We are still waiting for the events: epoll_descriptor->waiting_events |= waiting->events; @@ -926,6 +931,36 @@ int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, con return IO_Event_Selector_EPoll_Descriptor_update(selector, epoll_descriptor->io, descriptor, epoll_descriptor); } +static +VALUE select_handle_events(VALUE _arguments) +{ + struct select_arguments *arguments = (struct select_arguments *)_arguments; + struct IO_Event_Selector_EPoll *selector = arguments->selector; + + for (int i = 0; i < arguments->count; i += 1) { + const struct epoll_event *event = &arguments->events[i]; + if (DEBUG) fprintf(stderr, "-> fd=%d events=%d\n", event->data.fd, event->events); + + if (event->data.fd >= 0) { + IO_Event_Selector_EPoll_handle(selector, event, &arguments->saved); + } else { + IO_Event_Interrupt_clear(&selector->interrupt); + } + } + + return INT2NUM(arguments->count); +} + +static 
+VALUE select_handle_events_ensure(VALUE _arguments) +{ + struct select_arguments *arguments = (struct select_arguments *)_arguments; + + IO_Event_List_free(&arguments->saved); + + return Qnil; +} + // TODO This function is not re-entrant and we should document and assert as such. VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) { struct IO_Event_Selector_EPoll *selector = NULL; @@ -939,6 +974,7 @@ VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) { .tv_sec = 0, .tv_nsec = 0 }, + .saved = {}, }; arguments.timeout = &arguments.storage; @@ -960,18 +996,11 @@ VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) { } } - for (int i = 0; i < arguments.count; i += 1) { - const struct epoll_event *event = &arguments.events[i]; - if (DEBUG) fprintf(stderr, "-> ptr=%p events=%d\n", event->data.ptr, event->events); - - if (event->data.fd >= 0) { - IO_Event_Selector_EPoll_handle(selector, event); - } else { - IO_Event_Interrupt_clear(&selector->interrupt); - } + if (arguments.count) { + return rb_ensure(select_handle_events, (VALUE)&arguments, select_handle_events_ensure, (VALUE)&arguments); + } else { + return RB_INT2NUM(0); } - - return INT2NUM(arguments.count); } VALUE IO_Event_Selector_EPoll_wakeup(VALUE self) { diff --git a/ext/io/event/selector/kqueue.c b/ext/io/event/selector/kqueue.c index 2b95b0de..6c14fa65 100644 --- a/ext/io/event/selector/kqueue.c +++ b/ext/io/event/selector/kqueue.c @@ -821,6 +821,8 @@ struct select_arguments { struct timespec storage; struct timespec *timeout; + + struct IO_Event_List saved; }; static @@ -862,7 +864,7 @@ void select_internal_with_gvl(struct select_arguments *arguments) { } static -int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, uintptr_t identifier, struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor) +int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, uintptr_t identifier, struct IO_Event_Selector_KQueue_Descriptor 
*kqueue_descriptor, struct IO_Event_List *saved) { // This is the mask of all events that occured for the given descriptor: enum IO_Event ready_events = kqueue_descriptor->ready_events; @@ -877,7 +879,6 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u struct IO_Event_List *list = &kqueue_descriptor->list; struct IO_Event_List *node = list->tail; - struct IO_Event_List saved = {NULL, NULL}; // Reset the events back to 0 so that we can re-arm if necessary: kqueue_descriptor->waiting_events = 0; @@ -891,13 +892,13 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u if (DEBUG) fprintf(stderr, "IO_Event_Selector_KQueue_handle: identifier=%lu, ready_events=%d, matching_events=%d\n", identifier, ready_events, matching_events); if (matching_events) { - IO_Event_List_append(node, &saved); + IO_Event_List_append(node, saved); waiting->ready = matching_events; IO_Event_Selector_fiber_transfer(waiting->fiber, 0, NULL); - node = saved.tail; - IO_Event_List_pop(&saved); + node = saved->tail; + IO_Event_List_pop(saved); } else { kqueue_descriptor->waiting_events |= waiting->events; node = node->tail; @@ -907,6 +908,43 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u return IO_Event_Selector_KQueue_Descriptor_update(selector, identifier, kqueue_descriptor); } +static +VALUE select_handle_events(VALUE _arguments) +{ + struct select_arguments *arguments = (struct select_arguments *)_arguments; + struct IO_Event_Selector_KQueue *selector = arguments->selector; + + for (int i = 0; i < arguments.count; i += 1) { + if (arguments.events[i].udata) { + struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata; + kqueue_descriptor->ready_events |= events_from_kevent_filter(arguments.events[i].filter); + } + } + + for (int i = 0; i < arguments.count; i += 1) { + if (arguments.events[i].udata) { + struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = 
arguments.events[i].udata; + IO_Event_Selector_KQueue_handle(selector, arguments.events[i].ident, kqueue_descriptor, &arguments.saved); + } else { +#ifdef IO_EVENT_SELECTOR_KQUEUE_USE_INTERRUPT + IO_Event_Interrupt_clear(&selector->interrupt); +#endif + } + } + + return RB_INT2NUM(arguments.count); +} + +static +VALUE select_handle_events_ensure(VALUE _arguments) +{ + struct select_arguments *arguments = (struct select_arguments *)_arguments; + + IO_Event_List_free(arguments->saved); + + return Qnil; +} + VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) { struct IO_Event_Selector_KQueue *selector = NULL; TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, selector); @@ -919,7 +957,8 @@ VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) { .storage = { .tv_sec = 0, .tv_nsec = 0 - } + }, + .saved = {}, }; arguments.timeout = &arguments.storage; @@ -951,25 +990,11 @@ VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) { } } - for (int i = 0; i < arguments.count; i += 1) { - if (arguments.events[i].udata) { - struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata; - kqueue_descriptor->ready_events |= events_from_kevent_filter(arguments.events[i].filter); - } - } - - for (int i = 0; i < arguments.count; i += 1) { - if (arguments.events[i].udata) { - struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata; - IO_Event_Selector_KQueue_handle(selector, arguments.events[i].ident, kqueue_descriptor); - } else { -#ifdef IO_EVENT_SELECTOR_KQUEUE_USE_INTERRUPT - IO_Event_Interrupt_clear(&selector->interrupt); -#endif - } + if (arguments.count) { + return rb_ensure(select_handle_events, (VALUE)&arguments, select_handle_events_ensure, (VALUE)&arguments); + } else { + return RB_INT2NUM(0); } - - return RB_INT2NUM(arguments.count); } VALUE IO_Event_Selector_KQueue_wakeup(VALUE self) { diff --git a/ext/io/event/selector/list.h 
b/ext/io/event/selector/list.h index 45bb5841..1eac1225 100644 --- a/ext/io/event/selector/list.h +++ b/ext/io/event/selector/list.h @@ -21,6 +21,7 @@ inline static void IO_Event_List_initialize(struct IO_Event_List *list) inline static void IO_Event_List_clear(struct IO_Event_List *list) { list->head = list->tail = NULL; + list->type = 0; } // Append an item to the end of the list. @@ -64,7 +65,7 @@ inline static void IO_Event_List_pop(struct IO_Event_List *node) inline static void IO_Event_List_free(struct IO_Event_List *node) { - if (node->head != node->tail) { + if (node->head && node->tail) { IO_Event_List_pop(node); } } From 586f1fbb10315b0a5ba9576e3dfd1d0994d9cad4 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 24 Oct 2023 16:56:03 +1300 Subject: [PATCH 2/2] Fix pointer usage in kqueue implementation. --- ext/io/event/selector/kqueue.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ext/io/event/selector/kqueue.c b/ext/io/event/selector/kqueue.c index 6c14fa65..53fa8253 100644 --- a/ext/io/event/selector/kqueue.c +++ b/ext/io/event/selector/kqueue.c @@ -914,17 +914,17 @@ VALUE select_handle_events(VALUE _arguments) struct select_arguments *arguments = (struct select_arguments *)_arguments; struct IO_Event_Selector_KQueue *selector = arguments->selector; - for (int i = 0; i < arguments.count; i += 1) { - if (arguments.events[i].udata) { - struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata; - kqueue_descriptor->ready_events |= events_from_kevent_filter(arguments.events[i].filter); + for (int i = 0; i < arguments->count; i += 1) { + if (arguments->events[i].udata) { + struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments->events[i].udata; + kqueue_descriptor->ready_events |= events_from_kevent_filter(arguments->events[i].filter); } } - for (int i = 0; i < arguments.count; i += 1) { - if (arguments.events[i].udata) { - struct 
IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata; - IO_Event_Selector_KQueue_handle(selector, arguments.events[i].ident, kqueue_descriptor, &arguments.saved); + for (int i = 0; i < arguments->count; i += 1) { + if (arguments->events[i].udata) { + struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments->events[i].udata; + IO_Event_Selector_KQueue_handle(selector, arguments->events[i].ident, kqueue_descriptor, &arguments->saved); } else { #ifdef IO_EVENT_SELECTOR_KQUEUE_USE_INTERRUPT IO_Event_Interrupt_clear(&selector->interrupt); @@ -932,7 +932,7 @@ VALUE select_handle_events(VALUE _arguments) } } - return RB_INT2NUM(arguments.count); + return RB_INT2NUM(arguments->count); } static @@ -940,7 +940,7 @@ VALUE select_handle_events_ensure(VALUE _arguments) { struct select_arguments *arguments = (struct select_arguments *)_arguments; - IO_Event_List_free(arguments->saved); + IO_Event_List_free(&arguments->saved); return Qnil; }