diff --git a/include/fluent-bit/flb_io.h b/include/fluent-bit/flb_io.h index 559bc95d7b2..7c553a5e2ed 100644 --- a/include/fluent-bit/flb_io.h +++ b/include/fluent-bit/flb_io.h @@ -48,5 +48,7 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn, int flb_io_net_write(struct flb_upstream_conn *u, const void *data, size_t len, size_t *out_len); ssize_t flb_io_net_read(struct flb_upstream_conn *u, void *buf, size_t len); +int flb_io_fd_write(int fd, const void *data, size_t len, size_t *out_len); +ssize_t flb_io_fd_read(int fd, void *buf, size_t len); #endif diff --git a/src/flb_io.c b/src/flb_io.c index 0905843f2b3..1e0872f5f2b 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -118,12 +118,11 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn, return 0; } +static int fd_io_write(int fd, const void *data, size_t len, size_t *out_len); static int net_io_write(struct flb_upstream_conn *u_conn, const void *data, size_t len, size_t *out_len) { int ret; - int tries = 0; - size_t total = 0; struct flb_coro *coro; if (u_conn->fd <= 0) { @@ -134,8 +133,17 @@ static int net_io_write(struct flb_upstream_conn *u_conn, } } + return fd_io_write(u_conn->fd, data, len, out_len); +} + +static int fd_io_write(int fd, const void *data, size_t len, size_t *out_len) +{ + int ret; + int tries = 0; + size_t total = 0; + while (total < len) { - ret = send(u_conn->fd, (char *) data + total, len - total, 0); + ret = send(fd, (char *) data + total, len - total, 0); if (ret == -1) { if (FLB_WOULDBLOCK()) { /* @@ -305,12 +313,18 @@ static FLB_INLINE int net_io_write_async(struct flb_coro *co, return bytes; } +static ssize_t fd_io_read(int fd, void *buf, size_t len); static ssize_t net_io_read(struct flb_upstream_conn *u_conn, void *buf, size_t len) +{ + return fd_io_read(u_conn->fd, buf, len); +} + +static ssize_t fd_io_read(int fd, void *buf, size_t len) { int ret; - ret = recv(u_conn->fd, buf, len, 0); + ret = recv(fd, buf, len, 0); if (ret == -1) { ret = FLB_WOULDBLOCK(); if (ret) { @@ -368,6 +382,13 @@ static FLB_INLINE ssize_t net_io_read_async(struct flb_coro *co, return ret; } +/* Write data to fd. For unix socket. */ +int flb_io_fd_write(int fd, const void *data, size_t len, size_t *out_len) +{ + /* TODO: support async mode */ + return fd_io_write(fd, data, len, out_len); +} + /* Write data to an upstream connection/server */ int flb_io_net_write(struct flb_upstream_conn *u_conn, const void *data, size_t len, size_t *out_len) @@ -408,6 +429,12 @@ int flb_io_net_write(struct flb_upstream_conn *u_conn, const void *data, return ret; } +ssize_t flb_io_fd_read(int fd, void *buf, size_t len) +{ + /* TODO: support async mode */ + return fd_io_read(fd, buf, len); +} + ssize_t flb_io_net_read(struct flb_upstream_conn *u_conn, void *buf, size_t len) { int ret = -1;