Skip to content

Commit

Permalink
feat: add connect callback to the client
Browse files Browse the repository at this point in the history
  • Loading branch information
michaldziuba03 committed Aug 15, 2024
1 parent 8631db6 commit 64bb53d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 29 deletions.
25 changes: 16 additions & 9 deletions samples/tcp_echo.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ int main_old() {
return 0;
}

void on_response(pnd_tcp_t *stream) {
void on_server_response(pnd_tcp_t *stream) {
char buf[1024] = {};
ssize_t bytes = read(stream->fd, buf, 1024);
ssize_t bytes = read(stream->fd, buf, 1023);
if (bytes < 0) {
perror("read");
return;
Expand All @@ -82,23 +82,30 @@ void on_response(pnd_tcp_t *stream) {
return;
}

printf("read %ld bytes\n", bytes); fflush(stdout);
printf("Response from server: %s\n", buf);

pnd_tcp_close(stream);
}

void handle_connect(pnd_tcp_t *client, pnd_fd_t fd) {
printf("Connected to server. Client fd: %d\n", fd);

client->on_data = on_server_response;
client->on_close = handle_close;

pnd_write_t *write_op = malloc(sizeof(pnd_write_t));
pnd_tcp_write_init(write_op, strdup("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"), 35, handle_write);
pnd_tcp_write(client, write_op);
}

int main() {
printf("Starting client, pid is: %d\n", getpid());
pnd_io_t ctx;
pnd_io_init(&ctx);

pnd_tcp_t *client = malloc(sizeof(pnd_tcp_t));
pnd_tcp_init(&ctx, client);
client->on_data = on_response;
client->on_close = handle_close;
pnd_tcp_connect(client, "127.0.0.1", 3000);
pnd_write_t *write_op = malloc(sizeof(pnd_write_t));
pnd_tcp_write_init(write_op, strdup("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"), 36, handle_write);
pnd_tcp_write(client, write_op);
pnd_tcp_connect(client, "127.0.0.1", 3000, handle_connect);

pnd_io_run(&ctx);

Expand Down
2 changes: 1 addition & 1 deletion src/tcp_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,4 @@ void pnd_tcp_destroy(pnd_tcp_t *stream);
/* gracefully closes tcp stream */
void pnd_tcp_close(pnd_tcp_t *stream);

void pnd_tcp_connect(pnd_tcp_t *stream, const char *host, int port);
int pnd_tcp_connect(pnd_tcp_t *stream, const char *host, int port, void (*onconnect)(pnd_tcp_t*, int));
20 changes: 13 additions & 7 deletions src/unix/poll.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,39 +104,45 @@ void pnd_modify_event(pnd_event_t *event, int fd, uint32_t operation,
}
}

void pnd_add_event(pnd_event_t * event, pnd_fd_t fd)
void pnd_add_event_readable(pnd_event_t *event, pnd_fd_t fd)
{
printf("FD: %d\n", fd);
event->flags |= EPOLLIN;
event->ctx->handles++;
pnd_modify_event(event, fd, EPOLL_CTL_ADD, event->flags);
}

void pnd_start_reading(pnd_event_t * event, pnd_fd_t fd)
void pnd_add_event_writable(pnd_event_t *event, pnd_fd_t fd)
{
event->flags |= EPOLLOUT;
event->ctx->handles++;
pnd_modify_event(event, fd, EPOLL_CTL_ADD, event->flags);
}

void pnd_start_reading(pnd_event_t *event, pnd_fd_t fd)
{
event->flags |= EPOLLIN;
pnd_modify_event(event, fd, EPOLL_CTL_MOD, event->flags);
}

void pnd_stop_reading(pnd_event_t * event, pnd_fd_t fd)
void pnd_stop_reading(pnd_event_t *event, pnd_fd_t fd)
{
event->flags &= ~EPOLLIN;
pnd_modify_event(event, fd, EPOLL_CTL_MOD, event->flags);
}

void pnd_start_writing(pnd_event_t * event, pnd_fd_t fd)
void pnd_start_writing(pnd_event_t *event, pnd_fd_t fd)
{
event->flags |= EPOLLOUT;
pnd_modify_event(event, fd, EPOLL_CTL_MOD, event->flags);
}

void pnd_stop_writing(pnd_event_t * event, pnd_fd_t fd)
void pnd_stop_writing(pnd_event_t *event, pnd_fd_t fd)
{
event->flags &= ~EPOLLOUT;
pnd_modify_event(event, fd, EPOLL_CTL_MOD, event->flags);
}

void pnd_remove_event(pnd_event_t * event, pnd_fd_t fd)
void pnd_remove_event(pnd_event_t *event, pnd_fd_t fd)
{
event->ctx->handles--;
if (epoll_ctl(event->ctx->poll_handle, EPOLL_CTL_DEL, fd, NULL) == -1) {
Expand Down
4 changes: 3 additions & 1 deletion src/unix/poll.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ void pnd_init_event(pnd_event_t * event);

void pnd_modify_event(pnd_event_t * event, int fd, uint32_t operation, uint32_t flags);

void pnd_add_event(pnd_event_t * event, pnd_fd_t fd);
void pnd_add_event_readable(pnd_event_t *event, pnd_fd_t fd);

void pnd_add_event_writable(pnd_event_t *event, pnd_fd_t fd);

void pnd_start_reading(pnd_event_t * event, pnd_fd_t fd);

Expand Down
47 changes: 36 additions & 11 deletions src/unix/tcp_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ int pnd_tcp_listen(pnd_tcp_t *server, int port, void (*onconnect)(pnd_tcp_t*, in
server->state = PND_TCP_ACTIVE;
server->on_connect = onconnect;

pnd_add_event(&server->ev, lfd);
pnd_add_event_readable(&server->ev, lfd);

return 0;
}
Expand Down Expand Up @@ -260,7 +260,7 @@ void pnd_tcp_write_async(pnd_tcp_t *stream, pnd_write_t *write_op)
}

/* handler for I/O events from epoll/kqueue */
void pnd_tcp_client_io(struct pnd_event * event, unsigned events)
void pnd_tcp_client_io(struct pnd_event *event, unsigned events)
{
pnd_tcp_t *stream = container_of(event, pnd_tcp_t, ev);

Expand Down Expand Up @@ -291,7 +291,7 @@ void pnd_tcp_accept(pnd_tcp_t *peer, pnd_fd_t fd)
peer->fd = fd;
peer->state = PND_TCP_ACTIVE;
peer->ev.callback = pnd_tcp_client_io;
pnd_add_event(&peer->ev, fd);
pnd_add_event_readable(&peer->ev, fd);
}

void pnd_tcp_pause(pnd_tcp_t *stream)
Expand Down Expand Up @@ -341,34 +341,59 @@ void pnd_tcp_close(pnd_tcp_t *stream)
}
}

void pnd_tcp_connect(pnd_tcp_t *stream, const char *host, int port)
void pnd_tcp_connect_io(pnd_event_t *ev, unsigned events)
{
pnd_tcp_t *stream = container_of(ev, pnd_tcp_t, ev);

if (events & PND_CLOSE) {
pnd_tcp_destroy(stream);
return;
}

if (events & PND_WRITABLE) {
stream->state = PND_TCP_ACTIVE;
ev->callback = pnd_tcp_client_io;

// TODO: we should make single system call
pnd_stop_writing(ev, stream->fd);
pnd_start_reading(ev, stream->fd);

if (stream->on_connect != NULL)
stream->on_connect(stream, stream->fd);
}
}

int pnd_tcp_connect(pnd_tcp_t *stream, const char *host, int port, void (*onconnect)(pnd_tcp_t*, int))
{
pnd_fd_t fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
perror("socket");
return;
return -1;
}

if (pnd_set_nonblocking(fd) < 0) {
pnd_close_fd(fd);
return;
return -1;
}

struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(port);
address.sin_addr.s_addr = inet_addr(host);

stream->fd = fd;
stream->on_connect = onconnect;
stream->ev.callback = pnd_tcp_connect_io;

if (connect(fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
if (errno != EINPROGRESS) {
perror("connect");
pnd_close_fd(fd);
return;
// TODO: make a on_connect callback with error status?
return -1;
}
}

stream->fd = fd;
stream->state = PND_TCP_ACTIVE;
stream->ev.callback = pnd_tcp_client_io;
pnd_add_event(&stream->ev, fd);
pnd_add_event_writable(&stream->ev, fd);
return 0;
}

0 comments on commit 64bb53d

Please sign in to comment.