Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
WuJin committed Jul 17, 2024
1 parent 257b55f commit 123e9e4
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 80 deletions.
8 changes: 4 additions & 4 deletions src/net/cdk-net.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ static void _poller_manager_create(int parallel) {
if (parallel <= 0) {
abort();
}
cdk_timer_create();
cdk_list_init(&manager.poller_lst);
mtx_init(&manager.poller_mtx, mtx_plain);
cnd_init(&manager.poller_cnd);
Expand Down Expand Up @@ -356,8 +355,8 @@ void cdk_net_postevent(cdk_poller_t* poller, void (*cb)(void*), void* arg, bool
}
mtx_unlock(&poller->evmtx);

int hardcode = 1;
platform_socket_sendall(poller->evfds[0], &hardcode, sizeof(int));
bool wakeup = true;
platform_socket_send(poller->evfds[0], &wakeup, sizeof(bool));
}
}

Expand All @@ -374,13 +373,14 @@ void cdk_net_startup(cdk_conf_t* conf) {
if (atomic_flag_test_and_set(&manager.initialized)) {
return;
}
cdk_timer_create();
tls_ctx = tls_ctx_create(&conf->tls);
_poller_manager_create(conf->nthrds);
}

void cdk_net_cleanup(void) {
tls_ctx_destroy(tls_ctx);
_poller_manager_destroy();
tls_ctx_destroy(tls_ctx);
}

void cdk_net_startup2(void) {
Expand Down
26 changes: 21 additions & 5 deletions src/net/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,27 +531,43 @@ void channel_connect(cdk_channel_t* channel) {
}

void channel_enable_write(cdk_channel_t* channel) {
platform_event_add(channel->poller->pfd, channel->fd, EVENT_TYPE_W, channel);
if (channel->events) {
platform_event_mod(channel->poller->pfd, channel->fd, channel->events | EVENT_TYPE_W, channel);
} else {
platform_event_add(channel->poller->pfd, channel->fd, channel->events | EVENT_TYPE_W, channel);
}
channel->events |= EVENT_TYPE_W;
}

void channel_enable_read(cdk_channel_t* channel) {
platform_event_add(channel->poller->pfd, channel->fd, EVENT_TYPE_R, channel);
if (channel->events) {
platform_event_mod(channel->poller->pfd, channel->fd, channel->events | EVENT_TYPE_R, channel);
} else {
platform_event_add(channel->poller->pfd, channel->fd, channel->events | EVENT_TYPE_R, channel);
}
channel->events |= EVENT_TYPE_R;
}

void channel_disable_write(cdk_channel_t* channel) {
platform_event_del(channel->poller->pfd, channel->fd, EVENT_TYPE_W, channel);
if (channel->events) {
platform_event_mod(channel->poller->pfd, channel->fd, channel->events & ~EVENT_TYPE_W, channel);
} else {
platform_event_del(channel->poller->pfd, channel->fd);
}
channel->events &= ~EVENT_TYPE_W;
}

void channel_disable_read(cdk_channel_t* channel) {
platform_event_del(channel->poller->pfd, channel->fd, EVENT_TYPE_R, channel);
if (channel->events) {
platform_event_mod(channel->poller->pfd, channel->fd, channel->events & ~EVENT_TYPE_R, channel);
} else {
platform_event_del(channel->poller->pfd, channel->fd);
}
channel->events &= ~EVENT_TYPE_R;
}

void channel_disable_all(cdk_channel_t* channel) {
platform_event_del(channel->poller->pfd, channel->fd, channel->events, channel);
platform_event_del(channel->poller->pfd, channel->fd);
channel->events = 0;
}

Expand Down
49 changes: 20 additions & 29 deletions src/net/poller.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,42 @@
#include "net/channel.h"
#include "net/txlist.h"

static inline void _eventfd_read(cdk_channel_t* channel, void* buf, size_t len) {
mtx_lock(&channel->poller->evmtx);
static inline void _eventfd_read(cdk_poller_t* poller) {
bool wakeup;
platform_socket_recv(poller->evfds[1], (char*)(&wakeup), sizeof(bool));

mtx_lock(&poller->evmtx);
cdk_event_t* e = NULL;
if (!cdk_list_empty(&channel->poller->evlist)) {
e = cdk_list_data(cdk_list_head(&channel->poller->evlist), cdk_event_t, node);
if (!cdk_list_empty(&poller->evlist)) {
e = cdk_list_data(cdk_list_head(&poller->evlist), cdk_event_t, node);
cdk_list_remove(&e->node);
}
mtx_unlock(&channel->poller->evmtx);
mtx_unlock(&poller->evmtx);
if (e) {
e->cb(e->arg);
free(e);
e = NULL;
}
}

static cdk_unpack_t eventfd_unpacker = {
.type = UNPACK_TYPE_FIXEDLEN,
.fixedlen.len = sizeof(int)
};

static cdk_handler_t eventfd_handler = {
.tcp.on_read = _eventfd_read,
.tcp.unpacker = &eventfd_unpacker
};

void poller_poll(cdk_poller_t* poller) {
cdk_pollevent_t* events = malloc(sizeof(cdk_pollevent_t) * MAX_PROCESS_EVENTS);
if (events) {
while (poller->active) {
int nevents = platform_event_wait(poller->pfd, events);
for (int i = 0; i < nevents; i++) {
cdk_channel_t* channel = events[i].ptr;
uint32_t mask = events[i].events;
if (*((int*)events[i].ptr) == poller->evfds[1]) {
_eventfd_read(poller);
} else {
cdk_channel_t* channel = events[i].ptr;
uint32_t mask = events[i].events;

if (mask & EVENT_TYPE_R) {
(channel->type == SOCK_STREAM) ? ((channel->tcp.accepting) ? channel_accept(channel) : channel_recv(channel)) : channel_recv(channel);
}
if (mask & EVENT_TYPE_W) {
(channel->type == SOCK_STREAM) ? ((channel->tcp.connecting) ? channel_connect(channel) : channel_send(channel)) : channel_send(channel);
if (mask & EVENT_TYPE_R) {
(channel->type == SOCK_STREAM) ? ((channel->tcp.accepting) ? channel_accept(channel) : channel_recv(channel)) : channel_recv(channel);
}
if (mask & EVENT_TYPE_W) {
(channel->type == SOCK_STREAM) ? ((channel->tcp.connecting) ? channel_connect(channel) : channel_send(channel)) : channel_send(channel);
}
}
}
}
Expand All @@ -85,13 +82,7 @@ cdk_poller_t* poller_create(void) {
mtx_init(&poller->evmtx, mtx_plain);
platform_socket_socketpair(AF_INET, SOCK_STREAM, 0, poller->evfds);
platform_socket_nonblock(poller->evfds[1]);

cdk_channel_t* wakeup = channel_create(poller, poller->evfds[1], &eventfd_handler);
if (wakeup) {
if (!channel_is_reading(wakeup)) {
channel_enable_read(wakeup);
}
}
platform_event_add(poller->pfd, poller->evfds[1], EVENT_TYPE_R, &poller->evfds[1]);
}
return poller;
}
Expand Down
5 changes: 3 additions & 2 deletions src/platform/platform-event.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ typedef struct cdk_pollevent_s {
void* ptr;
}cdk_pollevent_t;

extern void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud);
extern void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud);
extern void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud);
extern void platform_event_mod(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud);
extern void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd);
extern int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events);
49 changes: 27 additions & 22 deletions src/platform/unix/platform-event.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,32 @@

#if defined(__linux__)

void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) {
void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) {
struct epoll_event ee;
memset(&ee, 0, sizeof(struct epoll_event));

int op = ud->events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
events |= ud->events;
if (events & EVENT_TYPE_R) {
ee.events |= EPOLLIN;
}
if (events & EVENT_TYPE_W) {
ee.events |= EPOLLOUT;
}
ee.data.ptr = ud;

epoll_ctl(pfd, op, sfd, (struct epoll_event*)&ee);
epoll_ctl(pfd, EPOLL_CTL_ADD, sfd, (struct epoll_event*)&ee);
}

void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) {
void platform_event_mod(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) {
struct epoll_event ee;
memset(&ee, 0, sizeof(struct epoll_event));

int mask = ud->events & (~events);
int op = mask == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;

if (mask & EVENT_TYPE_R) {
if (events & EVENT_TYPE_R) {
ee.events |= EPOLLIN;
}
if (mask & EVENT_TYPE_W) {
if (events & EVENT_TYPE_W) {
ee.events |= EPOLLOUT;
}
ee.data.ptr = ud;
epoll_ctl(pfd, EPOLL_CTL_MOD, sfd, (struct epoll_event*)&ee);
}

epoll_ctl(pfd, op, sfd, &ee);
void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd) {
epoll_ctl(pfd, EPOLL_CTL_DEL, sfd, NULL);
}

int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events) {
Expand Down Expand Up @@ -83,9 +76,8 @@ int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events) {
#endif

#if defined(__APPLE__)
void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) {
void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) {
struct kevent ke;

if (events & EVENT_TYPE_R) {
EV_SET(&ke, sfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ud);
kevent(pfd, &ke, 1, NULL, 0, NULL);
Expand All @@ -100,19 +92,32 @@ void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channe
}
}

void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) {
(void)ud;
void platform_event_mod(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) {
struct kevent ke;
if (events & EVENT_TYPE_R) {
EV_SET(&ke, sfd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&ke, sfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ud);
kevent(pfd, &ke, 1, NULL, 0, NULL);
EV_SET(&ke, sfd, EVFILT_WRITE, EV_ADD | EV_DISABLE, 0, 0, ud);
kevent(pfd, &ke, 1, NULL, 0, NULL);
}
if (events & EVENT_TYPE_W) {
EV_SET(&ke, sfd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
EV_SET(&ke, sfd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ud);
kevent(pfd, &ke, 1, NULL, 0, NULL);
EV_SET(&ke, sfd, EVFILT_READ, EV_ADD | EV_DISABLE, 0, 0, ud);
kevent(pfd, &ke, 1, NULL, 0, NULL);
}
}

void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd) {
struct kevent ke;

EV_SET(&ke, sfd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(pfd, &ke, 1, NULL, 0, NULL);

EV_SET(&ke, sfd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
kevent(pfd, &ke, 1, NULL, 0, NULL);
}

int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events) {
struct kevent __events[MAX_PROCESS_EVENTS];

Expand Down
29 changes: 11 additions & 18 deletions src/platform/win/platform-event.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,32 @@
#include "platform/platform-event.h"
#include "wepoll/wepoll.h"

void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) {
struct epoll_event ee;
memset(&ee, 0, sizeof(struct epoll_event));

int op = ud->events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
events |= ud->events;
void platform_event_add(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) {
struct epoll_event ee = {0};
if (events & EVENT_TYPE_R) {
ee.events |= EPOLLIN;
}
if (events & EVENT_TYPE_W) {
ee.events |= EPOLLOUT;
}
ee.data.ptr = ud;

epoll_ctl(pfd, op, sfd, (struct epoll_event*)&ee);
epoll_ctl(pfd, EPOLL_CTL_ADD, sfd, (struct epoll_event*)&ee);
}

void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, cdk_channel_t* ud) {
struct epoll_event ee;
memset(&ee, 0, sizeof(struct epoll_event));

int mask = ud->events & (~events);
int op = mask == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;

if (mask & EVENT_TYPE_R) {
void platform_event_mod(cdk_pollfd_t pfd, cdk_sock_t sfd, int events, void* ud) {
struct epoll_event ee = { 0 };
if (events & EVENT_TYPE_R) {
ee.events |= EPOLLIN;
}
if (mask & EVENT_TYPE_W) {
if (events & EVENT_TYPE_W) {
ee.events |= EPOLLOUT;
}
ee.data.ptr = ud;
epoll_ctl(pfd, EPOLL_CTL_MOD, sfd, (struct epoll_event*)&ee);
}

epoll_ctl(pfd, op, sfd, &ee);
void platform_event_del(cdk_pollfd_t pfd, cdk_sock_t sfd) {
epoll_ctl(pfd, EPOLL_CTL_DEL, sfd, NULL);
}

int platform_event_wait(cdk_pollfd_t pfd, cdk_pollevent_t* events) {
Expand Down

0 comments on commit 123e9e4

Please sign in to comment.