From 123e9e40390f5ef8158cbc5c11bba0e3c32f7f71 Mon Sep 17 00:00:00 2001 From: WuJin Date: Wed, 17 Jul 2024 15:08:06 +0800 Subject: [PATCH] update --- src/net/cdk-net.c | 8 ++--- src/net/channel.c | 26 +++++++++++++--- src/net/poller.c | 49 ++++++++++++------------------ src/platform/platform-event.h | 5 +-- src/platform/unix/platform-event.c | 49 ++++++++++++++++-------------- src/platform/win/platform-event.c | 29 +++++++----------- 6 files changed, 86 insertions(+), 80 deletions(-) diff --git a/src/net/cdk-net.c b/src/net/cdk-net.c index 7d35729..45dacc0 100644 --- a/src/net/cdk-net.c +++ b/src/net/cdk-net.c @@ -106,7 +106,6 @@ static void _poller_manager_create(int parallel) { if (parallel <= 0) { abort(); } - cdk_timer_create(); cdk_list_init(&manager.poller_lst); mtx_init(&manager.poller_mtx, mtx_plain); cnd_init(&manager.poller_cnd); @@ -356,8 +355,8 @@ void cdk_net_postevent(cdk_poller_t* poller, void (*cb)(void*), void* arg, bool } mtx_unlock(&poller->evmtx); - int hardcode = 1; - platform_socket_sendall(poller->evfds[0], &hardcode, sizeof(int)); + bool wakeup = true; + platform_socket_send(poller->evfds[0], &wakeup, sizeof(bool)); } } @@ -374,13 +373,14 @@ void cdk_net_startup(cdk_conf_t* conf) { if (atomic_flag_test_and_set(&manager.initialized)) { return; } + cdk_timer_create(); tls_ctx = tls_ctx_create(&conf->tls); _poller_manager_create(conf->nthrds); } void cdk_net_cleanup(void) { - tls_ctx_destroy(tls_ctx); _poller_manager_destroy(); + tls_ctx_destroy(tls_ctx); } void cdk_net_startup2(void) { diff --git a/src/net/channel.c b/src/net/channel.c index 4dd0dbe..d4daaa8 100644 --- a/src/net/channel.c +++ b/src/net/channel.c @@ -531,27 +531,43 @@ void channel_connect(cdk_channel_t* channel) { } void channel_enable_write(cdk_channel_t* channel) { - platform_event_add(channel->poller->pfd, channel->fd, EVENT_TYPE_W, channel); + if (channel->events) { + platform_event_mod(channel->poller->pfd, channel->fd, channel->events | EVENT_TYPE_W, channel); + } else { + platform_event_add(channel->poller->pfd, channel->fd, channel->events | EVENT_TYPE_W, channel); + } channel->events |= EVENT_TYPE_W; } void channel_enable_read(cdk_channel_t* channel) { - platform_event_add(channel->poller->pfd, channel->fd, EVENT_TYPE_R, channel); + if (channel->events) { + platform_event_mod(channel->poller->pfd, channel->fd, channel->events | EVENT_TYPE_R, channel); + } else { + platform_event_add(channel->poller->pfd, channel->fd, channel->events | EVENT_TYPE_R, channel); + } channel->events |= EVENT_TYPE_R; } void channel_disable_write(cdk_channel_t* channel) { - platform_event_del(channel->poller->pfd, channel->fd, EVENT_TYPE_W, channel); + if (channel->events) { + platform_event_mod(channel->poller->pfd, channel->fd, channel->events & ~EVENT_TYPE_W, channel); + } else { + platform_event_del(channel->poller->pfd, channel->fd); + } channel->events &= ~EVENT_TYPE_W; } void channel_disable_read(cdk_channel_t* channel) { - platform_event_del(channel->poller->pfd, channel->fd, EVENT_TYPE_R, channel); + if (channel->events) { + platform_event_mod(channel->poller->pfd, channel->fd, channel->events & ~EVENT_TYPE_R, channel); + } else { + platform_event_del(channel->poller->pfd, channel->fd); + } channel->events &= ~EVENT_TYPE_R; } void channel_disable_all(cdk_channel_t* channel) { - platform_event_del(channel->poller->pfd, channel->fd, channel->events, channel); + platform_event_del(channel->poller->pfd, channel->fd); channel->events = 0; } diff --git a/src/net/poller.c b/src/net/poller.c index 155ddf5..e5f7326 100644 --- a/src/net/poller.c +++ b/src/net/poller.c @@ -25,14 +25,17 @@ #include "net/channel.h" #include "net/txlist.h" -static inline void _eventfd_read(cdk_channel_t* channel, void* buf, size_t len) { - mtx_lock(&channel->poller->evmtx); +static inline void _eventfd_read(cdk_poller_t* poller) { + bool wakeup; + platform_socket_recv(poller->evfds[1], (char*)(&wakeup), sizeof(bool)); + + mtx_lock(&poller->evmtx); cdk_event_t* e = NULL; - if (!cdk_list_empty(&channel->poller->evlist)) { - e = cdk_list_data(cdk_list_head(&channel->poller->evlist), cdk_event_t, node); + if (!cdk_list_empty(&poller->evlist)) { + e = cdk_list_data(cdk_list_head(&poller->evlist), cdk_event_t, node); cdk_list_remove(&e->node); } - mtx_unlock(&channel->poller->evmtx); + mtx_unlock(&poller->evmtx); if (e) { e->cb(e->arg); free(e); @@ -40,30 +43,24 @@ static inline void _eventfd_read(cdk_channel_t* channel, void* buf, size_t len) } } -static cdk_unpack_t eventfd_unpacker = { - .type = UNPACK_TYPE_FIXEDLEN, - .fixedlen.len = sizeof(int) -}; - -static cdk_handler_t eventfd_handler = { - .tcp.on_read = _eventfd_read, - .tcp.unpacker = &eventfd_unpacker -}; - void poller_poll(cdk_poller_t* poller) { cdk_pollevent_t* events = malloc(sizeof(cdk_pollevent_t) * MAX_PROCESS_EVENTS); if (events) { while (poller->active) { int nevents = platform_event_wait(poller->pfd, events); for (int i = 0; i < nevents; i++) { - cdk_channel_t* channel = events[i].ptr; - uint32_t mask = events[i].events; + if (*((int*)events[i].ptr) == poller->evfds[1]) { + _eventfd_read(poller); + } else { + cdk_channel_t* channel = events[i].ptr; + uint32_t mask = events[i].events; - if (mask & EVENT_TYPE_R) { - (channel->type == SOCK_STREAM) ? ((channel->tcp.accepting) ? channel_accept(channel) : channel_recv(channel)) : channel_recv(channel); - } - if (mask & EVENT_TYPE_W) { - (channel->type == SOCK_STREAM) ? ((channel->tcp.connecting) ? channel_connect(channel) : channel_send(channel)) : channel_send(channel); + if (mask & EVENT_TYPE_R) { + (channel->type == SOCK_STREAM) ? ((channel->tcp.accepting) ? channel_accept(channel) : channel_recv(channel)) : channel_recv(channel); + } + if (mask & EVENT_TYPE_W) { + (channel->type == SOCK_STREAM) ? ((channel->tcp.connecting) ? channel_connect(channel) : channel_send(channel)) : channel_send(channel); + } } } } @@ -85,13 +82,7 @@ cdk_poller_t* poller_create(void) { mtx_init(&poller->evmtx, mtx_plain); platform_socket_socketpair(AF_INET, SOCK_STREAM, 0, poller->evfds); platform_socket_nonblock(poller->evfds[1]); - - cdk_channel_t* wakeup = channel_create(poller, poller->evfds[1], &eventfd_handler); - if (wakeup) { - if (!channel_is_reading(wakeup)) { - channel_enable_read(wakeup); - } - } + platform_event_add(poller->pfd, poller->evfds[1], EVENT_TYPE_R, &poller->evfds[1]); } return poller; } diff --git a/src/platform/platform-event.h b/src/platform/platform-event.h index f4b721c..d6f9a75 100644 --- a/src/platform/platform-event.h +++ b/src/platform/platform-event.h @@ -30,6 +30,7 @@ typedef struct cdk_pollevent_s { void* ptr; }cdk_pollevent_t; -extern void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud); -extern void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud); +extern void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud); +extern void platform_event_mod(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud); +extern void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd); extern int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events); \ No newline at end of file diff --git a/src/platform/unix/platform-event.c b/src/platform/unix/platform-event.c index e40d9e7..5661ff2 100644 --- a/src/platform/unix/platform-event.c +++ b/src/platform/unix/platform-event.c @@ -23,12 +23,8 @@ #if defined(__linux__) -void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) { +void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) { struct epoll_event ee; - memset(&ee, 0, sizeof(struct epoll_event)); - - int op = ud->events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - events |= ud->events; if (events & EVENT_TYPE_R) { ee.events |= EPOLLIN; } @@ -36,26 +32,23 @@ void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channe ee.events |= EPOLLOUT; } ee.data.ptr = ud; - - epoll_ctl(pfd, op, sfd, (struct epoll_event*)&ee); + epoll_ctl(pfd, EPOLL_CTL_ADD, sfd, (struct epoll_event*)&ee); } -void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) { +void platform_event_mod(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) { struct epoll_event ee; - memset(&ee, 0, sizeof(struct epoll_event)); - - int mask = ud->events & (~events); - int op = mask == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; - - if (mask & EVENT_TYPE_R) { + if (events & EVENT_TYPE_R) { ee.events |= EPOLLIN; } - if (mask & EVENT_TYPE_W) { + if (events & EVENT_TYPE_W) { ee.events |= EPOLLOUT; } ee.data.ptr = ud; + epoll_ctl(pfd, EPOLL_CTL_MOD, sfd, (struct epoll_event*)&ee); +} - epoll_ctl(pfd, op, sfd, &ee); +void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd) { + epoll_ctl(pfd, EPOLL_CTL_DEL, sfd, NULL); } int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events) { @@ -83,9 +76,8 @@ int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events) { #endif #if defined(__APPLE__) -void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) { +void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) { struct kevent ke; - if (events & EVENT_TYPE_R) { EV_SET(&ke, sfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ud); kevent(pfd, &ke, 1, NULL, 0, NULL); @@ -100,19 +92,32 @@ void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channe } } -void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) { - (void)ud; +void platform_event_mod(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) { struct kevent ke; if (events & EVENT_TYPE_R) { - EV_SET(&ke, sfd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + EV_SET(&ke, sfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ud); + kevent(pfd, &ke, 1, NULL, 0, NULL); + EV_SET(&ke, sfd, EVFILT_WRITE, EV_ADD | EV_DISABLE, 0, 0, ud); kevent(pfd, &ke, 1, NULL, 0, NULL); } if (events & EVENT_TYPE_W) { - EV_SET(&ke, sfd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + EV_SET(&ke, sfd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ud); + kevent(pfd, &ke, 1, NULL, 0, NULL); + EV_SET(&ke, sfd, EVFILT_READ, EV_ADD | EV_DISABLE, 0, 0, ud); kevent(pfd, &ke, 1, NULL, 0, NULL); } } +void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd) { + struct kevent ke; + + EV_SET(&ke, sfd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + kevent(pfd, &ke, 1, NULL, 0, NULL); + + EV_SET(&ke, sfd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + kevent(pfd, &ke, 1, NULL, 0, NULL); +} + int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events) { struct kevent __events[MAX_PROCESS_EVENTS]; diff --git a/src/platform/win/platform-event.c b/src/platform/win/platform-event.c index 861eed8..823d9a0 100644 --- a/src/platform/win/platform-event.c +++ b/src/platform/win/platform-event.c @@ -22,12 +22,8 @@ #include "platform/platform-event.h" #include "wepoll/wepoll.h" -void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) { - struct epoll_event ee; - memset(&ee, 0, sizeof(struct epoll_event)); - - int op = ud->events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - events |= ud->events; +void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) { + struct epoll_event ee = {0}; if (events & EVENT_TYPE_R) { ee.events |= EPOLLIN; } @@ -35,26 +31,23 @@ void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channe ee.events |= EPOLLOUT; } ee.data.ptr = ud; - - epoll_ctl(pfd, op, sfd, (struct epoll_event*)&ee); + epoll_ctl(pfd, EPOLL_CTL_ADD, sfd, (struct epoll_event*)&ee); } -void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) { - struct epoll_event ee; - memset(&ee, 0, sizeof(struct epoll_event)); - - int mask = ud->events & (~events); - int op = mask == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; - - if (mask & EVENT_TYPE_R) { +void platform_event_mod(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) { + struct epoll_event ee = { 0 }; + if (events & EVENT_TYPE_R) { ee.events |= EPOLLIN; } - if (mask & EVENT_TYPE_W) { + if (events & EVENT_TYPE_W) { ee.events |= EPOLLOUT; } ee.data.ptr = ud; + epoll_ctl(pfd, EPOLL_CTL_MOD, sfd, (struct epoll_event*)&ee); +} - epoll_ctl(pfd, op, sfd, &ee); +void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd) { + epoll_ctl(pfd, EPOLL_CTL_DEL, sfd, NULL); } int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events) {