Skip to content

Commit

Permalink
Ensure that the saved node is freed correctly if an exception occurs. (
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Oct 24, 2023
1 parent b7954d6 commit be3b547
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 43 deletions.
65 changes: 47 additions & 18 deletions ext/io/event/selector/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -804,9 +804,11 @@ struct select_arguments {

int count;
struct epoll_event events[EPOLL_MAX_EVENTS];

struct timespec * timeout;
struct timespec storage;

struct IO_Event_List saved;
};

static int make_timeout_ms(struct timespec * timeout) {
Expand Down Expand Up @@ -883,7 +885,7 @@ void select_internal_with_gvl(struct select_arguments *arguments) {
}

static
int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, const struct epoll_event *event)
int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, const struct epoll_event *event, struct IO_Event_List *saved)
{
int descriptor = event->data.fd;

Expand All @@ -893,29 +895,32 @@ int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, con
struct IO_Event_Selector_EPoll_Descriptor *epoll_descriptor = IO_Event_Selector_EPoll_Descriptor_lookup(selector, descriptor);
struct IO_Event_List *list = &epoll_descriptor->list;
struct IO_Event_List *node = list->tail;
struct IO_Event_List saved = {NULL, NULL};

// Reset the events back to 0 so that we can re-arm if necessary:
epoll_descriptor->waiting_events = 0;

if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d epoll_descriptor=%p\n", descriptor, ready_events, epoll_descriptor);

// It's possible (but unlikely) that the address of list will changing during iteration.
while (node != list) {
if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: node=%p list=%p type=%p\n", node, list, node->type);

struct IO_Event_Selector_EPoll_Waiting *waiting = (struct IO_Event_Selector_EPoll_Waiting *)node;

// Compute the intersection of the events we are waiting for and the events that occured:
enum IO_Event matching_events = waiting->events & ready_events;

if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d, matching_events=%d\n", descriptor, ready_events, matching_events);
if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d, waiting_events=%d, matching_events=%d\n", descriptor, ready_events, waiting->events, matching_events);

if (matching_events) {
IO_Event_List_append(node, &saved);
IO_Event_List_append(node, saved);

// Resume the fiber:
waiting->ready = matching_events;
IO_Event_Selector_fiber_transfer(waiting->fiber, 0, NULL);

node = saved.tail;
IO_Event_List_pop(&saved);
node = saved->tail;
IO_Event_List_pop(saved);
} else {
// We are still waiting for the events:
epoll_descriptor->waiting_events |= waiting->events;
Expand All @@ -926,6 +931,36 @@ int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, con
return IO_Event_Selector_EPoll_Descriptor_update(selector, epoll_descriptor->io, descriptor, epoll_descriptor);
}

static
VALUE select_handle_events(VALUE _arguments)
{
struct select_arguments *arguments = (struct select_arguments *)_arguments;
struct IO_Event_Selector_EPoll *selector = arguments->selector;

for (int i = 0; i < arguments->count; i += 1) {
const struct epoll_event *event = &arguments->events[i];
if (DEBUG) fprintf(stderr, "-> fd=%d events=%d\n", event->data.fd, event->events);

if (event->data.fd >= 0) {
IO_Event_Selector_EPoll_handle(selector, event, &arguments->saved);
} else {
IO_Event_Interrupt_clear(&selector->interrupt);
}
}

return INT2NUM(arguments->count);
}

static
VALUE select_handle_events_ensure(VALUE _arguments)
{
struct select_arguments *arguments = (struct select_arguments *)_arguments;

IO_Event_List_free(&arguments->saved);

return Qnil;
}

// TODO This function is not re-entrant and we should document and assert as such.
VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) {
struct IO_Event_Selector_EPoll *selector = NULL;
Expand All @@ -939,6 +974,7 @@ VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) {
.tv_sec = 0,
.tv_nsec = 0
},
.saved = {},
};

arguments.timeout = &arguments.storage;
Expand All @@ -960,18 +996,11 @@ VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) {
}
}

for (int i = 0; i < arguments.count; i += 1) {
const struct epoll_event *event = &arguments.events[i];
if (DEBUG) fprintf(stderr, "-> ptr=%p events=%d\n", event->data.ptr, event->events);

if (event->data.fd >= 0) {
IO_Event_Selector_EPoll_handle(selector, event);
} else {
IO_Event_Interrupt_clear(&selector->interrupt);
}
if (arguments.count) {
return rb_ensure(select_handle_events, (VALUE)&arguments, select_handle_events_ensure, (VALUE)&arguments);
} else {
return RB_INT2NUM(0);
}

return INT2NUM(arguments.count);
}

VALUE IO_Event_Selector_EPoll_wakeup(VALUE self) {
Expand Down
73 changes: 49 additions & 24 deletions ext/io/event/selector/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,8 @@ struct select_arguments {

struct timespec storage;
struct timespec *timeout;

struct IO_Event_List saved;
};

static
Expand Down Expand Up @@ -862,7 +864,7 @@ void select_internal_with_gvl(struct select_arguments *arguments) {
}

static
int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, uintptr_t identifier, struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor)
int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, uintptr_t identifier, struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor, struct IO_Event_List *saved)
{
// This is the mask of all events that occured for the given descriptor:
enum IO_Event ready_events = kqueue_descriptor->ready_events;
Expand All @@ -877,7 +879,6 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u

struct IO_Event_List *list = &kqueue_descriptor->list;
struct IO_Event_List *node = list->tail;
struct IO_Event_List saved = {NULL, NULL};

// Reset the events back to 0 so that we can re-arm if necessary:
kqueue_descriptor->waiting_events = 0;
Expand All @@ -891,13 +892,13 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u
if (DEBUG) fprintf(stderr, "IO_Event_Selector_KQueue_handle: identifier=%lu, ready_events=%d, matching_events=%d\n", identifier, ready_events, matching_events);

if (matching_events) {
IO_Event_List_append(node, &saved);
IO_Event_List_append(node, saved);

waiting->ready = matching_events;
IO_Event_Selector_fiber_transfer(waiting->fiber, 0, NULL);

node = saved.tail;
IO_Event_List_pop(&saved);
node = saved->tail;
IO_Event_List_pop(saved);
} else {
kqueue_descriptor->waiting_events |= waiting->events;
node = node->tail;
Expand All @@ -907,6 +908,43 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u
return IO_Event_Selector_KQueue_Descriptor_update(selector, identifier, kqueue_descriptor);
}

static
VALUE select_handle_events(VALUE _arguments)
{
struct select_arguments *arguments = (struct select_arguments *)_arguments;
struct IO_Event_Selector_KQueue *selector = arguments->selector;

for (int i = 0; i < arguments->count; i += 1) {
if (arguments->events[i].udata) {
struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments->events[i].udata;
kqueue_descriptor->ready_events |= events_from_kevent_filter(arguments->events[i].filter);
}
}

for (int i = 0; i < arguments->count; i += 1) {
if (arguments->events[i].udata) {
struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments->events[i].udata;
IO_Event_Selector_KQueue_handle(selector, arguments->events[i].ident, kqueue_descriptor, &arguments->saved);
} else {
#ifdef IO_EVENT_SELECTOR_KQUEUE_USE_INTERRUPT
IO_Event_Interrupt_clear(&selector->interrupt);
#endif
}
}

return RB_INT2NUM(arguments->count);
}

static
VALUE select_handle_events_ensure(VALUE _arguments)
{
struct select_arguments *arguments = (struct select_arguments *)_arguments;

IO_Event_List_free(&arguments->saved);

return Qnil;
}

VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
struct IO_Event_Selector_KQueue *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, selector);
Expand All @@ -919,7 +957,8 @@ VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
.storage = {
.tv_sec = 0,
.tv_nsec = 0
}
},
.saved = {},
};

arguments.timeout = &arguments.storage;
Expand Down Expand Up @@ -951,25 +990,11 @@ VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
}
}

for (int i = 0; i < arguments.count; i += 1) {
if (arguments.events[i].udata) {
struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata;
kqueue_descriptor->ready_events |= events_from_kevent_filter(arguments.events[i].filter);
}
}

for (int i = 0; i < arguments.count; i += 1) {
if (arguments.events[i].udata) {
struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata;
IO_Event_Selector_KQueue_handle(selector, arguments.events[i].ident, kqueue_descriptor);
} else {
#ifdef IO_EVENT_SELECTOR_KQUEUE_USE_INTERRUPT
IO_Event_Interrupt_clear(&selector->interrupt);
#endif
}
if (arguments.count) {
return rb_ensure(select_handle_events, (VALUE)&arguments, select_handle_events_ensure, (VALUE)&arguments);
} else {
return RB_INT2NUM(0);
}

return RB_INT2NUM(arguments.count);
}

VALUE IO_Event_Selector_KQueue_wakeup(VALUE self) {
Expand Down
3 changes: 2 additions & 1 deletion ext/io/event/selector/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ inline static void IO_Event_List_initialize(struct IO_Event_List *list)
inline static void IO_Event_List_clear(struct IO_Event_List *list)
{
list->head = list->tail = NULL;
list->type = 0;
}

// Append an item to the end of the list.
Expand Down Expand Up @@ -64,7 +65,7 @@ inline static void IO_Event_List_pop(struct IO_Event_List *node)

inline static void IO_Event_List_free(struct IO_Event_List *node)
{
if (node->head != node->tail) {
if (node->head && node->tail) {
IO_Event_List_pop(node);
}
}
Expand Down

0 comments on commit be3b547

Please sign in to comment.