Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that the saved node is freed correctly if an exception occurs. #84

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 47 additions & 18 deletions ext/io/event/selector/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -804,9 +804,11 @@ struct select_arguments {

int count;
struct epoll_event events[EPOLL_MAX_EVENTS];

struct timespec * timeout;
struct timespec storage;

struct IO_Event_List saved;
};

static int make_timeout_ms(struct timespec * timeout) {
Expand Down Expand Up @@ -883,7 +885,7 @@ void select_internal_with_gvl(struct select_arguments *arguments) {
}

static
int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, const struct epoll_event *event)
int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, const struct epoll_event *event, struct IO_Event_List *saved)
{
int descriptor = event->data.fd;

Expand All @@ -893,29 +895,32 @@ int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, con
struct IO_Event_Selector_EPoll_Descriptor *epoll_descriptor = IO_Event_Selector_EPoll_Descriptor_lookup(selector, descriptor);
struct IO_Event_List *list = &epoll_descriptor->list;
struct IO_Event_List *node = list->tail;
struct IO_Event_List saved = {NULL, NULL};

// Reset the events back to 0 so that we can re-arm if necessary:
epoll_descriptor->waiting_events = 0;

if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d epoll_descriptor=%p\n", descriptor, ready_events, epoll_descriptor);

// It's possible (but unlikely) that the address of list will changing during iteration.
while (node != list) {
if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: node=%p list=%p type=%p\n", node, list, node->type);

struct IO_Event_Selector_EPoll_Waiting *waiting = (struct IO_Event_Selector_EPoll_Waiting *)node;

// Compute the intersection of the events we are waiting for and the events that occured:
enum IO_Event matching_events = waiting->events & ready_events;

if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d, matching_events=%d\n", descriptor, ready_events, matching_events);
if (DEBUG) fprintf(stderr, "IO_Event_Selector_EPoll_handle: descriptor=%d, ready_events=%d, waiting_events=%d, matching_events=%d\n", descriptor, ready_events, waiting->events, matching_events);

if (matching_events) {
IO_Event_List_append(node, &saved);
IO_Event_List_append(node, saved);

// Resume the fiber:
waiting->ready = matching_events;
IO_Event_Selector_fiber_transfer(waiting->fiber, 0, NULL);

node = saved.tail;
IO_Event_List_pop(&saved);
node = saved->tail;
IO_Event_List_pop(saved);
} else {
// We are still waiting for the events:
epoll_descriptor->waiting_events |= waiting->events;
Expand All @@ -926,6 +931,36 @@ int IO_Event_Selector_EPoll_handle(struct IO_Event_Selector_EPoll *selector, con
return IO_Event_Selector_EPoll_Descriptor_update(selector, epoll_descriptor->io, descriptor, epoll_descriptor);
}

static
VALUE select_handle_events(VALUE _arguments)
{
struct select_arguments *arguments = (struct select_arguments *)_arguments;
struct IO_Event_Selector_EPoll *selector = arguments->selector;

for (int i = 0; i < arguments->count; i += 1) {
const struct epoll_event *event = &arguments->events[i];
if (DEBUG) fprintf(stderr, "-> fd=%d events=%d\n", event->data.fd, event->events);

if (event->data.fd >= 0) {
IO_Event_Selector_EPoll_handle(selector, event, &arguments->saved);
} else {
IO_Event_Interrupt_clear(&selector->interrupt);
}
}

return INT2NUM(arguments->count);
}

static
VALUE select_handle_events_ensure(VALUE _arguments)
{
struct select_arguments *arguments = (struct select_arguments *)_arguments;

IO_Event_List_free(&arguments->saved);

return Qnil;
}

// TODO This function is not re-entrant and we should document and assert as such.
VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) {
struct IO_Event_Selector_EPoll *selector = NULL;
Expand All @@ -939,6 +974,7 @@ VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) {
.tv_sec = 0,
.tv_nsec = 0
},
.saved = {},
};

arguments.timeout = &arguments.storage;
Expand All @@ -960,18 +996,11 @@ VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) {
}
}

for (int i = 0; i < arguments.count; i += 1) {
const struct epoll_event *event = &arguments.events[i];
if (DEBUG) fprintf(stderr, "-> ptr=%p events=%d\n", event->data.ptr, event->events);

if (event->data.fd >= 0) {
IO_Event_Selector_EPoll_handle(selector, event);
} else {
IO_Event_Interrupt_clear(&selector->interrupt);
}
if (arguments.count) {
return rb_ensure(select_handle_events, (VALUE)&arguments, select_handle_events_ensure, (VALUE)&arguments);
} else {
return RB_INT2NUM(0);
}

return INT2NUM(arguments.count);
}

VALUE IO_Event_Selector_EPoll_wakeup(VALUE self) {
Expand Down
73 changes: 49 additions & 24 deletions ext/io/event/selector/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,8 @@ struct select_arguments {

struct timespec storage;
struct timespec *timeout;

struct IO_Event_List saved;
};

static
Expand Down Expand Up @@ -862,7 +864,7 @@ void select_internal_with_gvl(struct select_arguments *arguments) {
}

static
int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, uintptr_t identifier, struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor)
int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, uintptr_t identifier, struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor, struct IO_Event_List *saved)
{
// This is the mask of all events that occured for the given descriptor:
enum IO_Event ready_events = kqueue_descriptor->ready_events;
Expand All @@ -877,7 +879,6 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u

struct IO_Event_List *list = &kqueue_descriptor->list;
struct IO_Event_List *node = list->tail;
struct IO_Event_List saved = {NULL, NULL};

// Reset the events back to 0 so that we can re-arm if necessary:
kqueue_descriptor->waiting_events = 0;
Expand All @@ -891,13 +892,13 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u
if (DEBUG) fprintf(stderr, "IO_Event_Selector_KQueue_handle: identifier=%lu, ready_events=%d, matching_events=%d\n", identifier, ready_events, matching_events);

if (matching_events) {
IO_Event_List_append(node, &saved);
IO_Event_List_append(node, saved);

waiting->ready = matching_events;
IO_Event_Selector_fiber_transfer(waiting->fiber, 0, NULL);

node = saved.tail;
IO_Event_List_pop(&saved);
node = saved->tail;
IO_Event_List_pop(saved);
} else {
kqueue_descriptor->waiting_events |= waiting->events;
node = node->tail;
Expand All @@ -907,6 +908,43 @@ int IO_Event_Selector_KQueue_handle(struct IO_Event_Selector_KQueue *selector, u
return IO_Event_Selector_KQueue_Descriptor_update(selector, identifier, kqueue_descriptor);
}

static
VALUE select_handle_events(VALUE _arguments)
{
struct select_arguments *arguments = (struct select_arguments *)_arguments;
struct IO_Event_Selector_KQueue *selector = arguments->selector;

for (int i = 0; i < arguments->count; i += 1) {
if (arguments->events[i].udata) {
struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments->events[i].udata;
kqueue_descriptor->ready_events |= events_from_kevent_filter(arguments->events[i].filter);
}
}

for (int i = 0; i < arguments->count; i += 1) {
if (arguments->events[i].udata) {
struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments->events[i].udata;
IO_Event_Selector_KQueue_handle(selector, arguments->events[i].ident, kqueue_descriptor, &arguments->saved);
} else {
#ifdef IO_EVENT_SELECTOR_KQUEUE_USE_INTERRUPT
IO_Event_Interrupt_clear(&selector->interrupt);
#endif
}
}

return RB_INT2NUM(arguments->count);
}

static
VALUE select_handle_events_ensure(VALUE _arguments)
{
struct select_arguments *arguments = (struct select_arguments *)_arguments;

IO_Event_List_free(&arguments->saved);

return Qnil;
}

VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
struct IO_Event_Selector_KQueue *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, selector);
Expand All @@ -919,7 +957,8 @@ VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
.storage = {
.tv_sec = 0,
.tv_nsec = 0
}
},
.saved = {},
};

arguments.timeout = &arguments.storage;
Expand Down Expand Up @@ -951,25 +990,11 @@ VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
}
}

for (int i = 0; i < arguments.count; i += 1) {
if (arguments.events[i].udata) {
struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata;
kqueue_descriptor->ready_events |= events_from_kevent_filter(arguments.events[i].filter);
}
}

for (int i = 0; i < arguments.count; i += 1) {
if (arguments.events[i].udata) {
struct IO_Event_Selector_KQueue_Descriptor *kqueue_descriptor = arguments.events[i].udata;
IO_Event_Selector_KQueue_handle(selector, arguments.events[i].ident, kqueue_descriptor);
} else {
#ifdef IO_EVENT_SELECTOR_KQUEUE_USE_INTERRUPT
IO_Event_Interrupt_clear(&selector->interrupt);
#endif
}
if (arguments.count) {
return rb_ensure(select_handle_events, (VALUE)&arguments, select_handle_events_ensure, (VALUE)&arguments);
} else {
return RB_INT2NUM(0);
}

return RB_INT2NUM(arguments.count);
}

VALUE IO_Event_Selector_KQueue_wakeup(VALUE self) {
Expand Down
3 changes: 2 additions & 1 deletion ext/io/event/selector/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ inline static void IO_Event_List_initialize(struct IO_Event_List *list)
inline static void IO_Event_List_clear(struct IO_Event_List *list)
{
list->head = list->tail = NULL;
list->type = 0;
}

// Append an item to the end of the list.
Expand Down Expand Up @@ -64,7 +65,7 @@ inline static void IO_Event_List_pop(struct IO_Event_List *node)

inline static void IO_Event_List_free(struct IO_Event_List *node)
{
if (node->head != node->tail) {
if (node->head && node->tail) {
IO_Event_List_pop(node);
}
}
Expand Down
Loading