From 3dbae80df658859a84b0f4142d3f9e7a63f54fe4 Mon Sep 17 00:00:00 2001 From: MARiA so cute <33935209+NathanFreeman@users.noreply.github.com> Date: Wed, 30 Oct 2024 13:35:57 +0800 Subject: [PATCH] Refactor iouring --- .github/workflows/iouring.yml | 2 +- ext-src/php_swoole.cc | 9 +- include/swoole.h | 8 +- include/swoole_async.h | 106 -------------------- include/swoole_coroutine.h | 16 --- include/swoole_coroutine_system.h | 11 -- include/swoole_iouring.h | 139 ++++++++++++++++++++++++++ src/coroutine/iouring.cc | 33 +++--- src/coroutine/system.cc | 116 +-------------------- src/os/iouring.cc | 161 +++++++++++++++++++++--------- 10 files changed, 278 insertions(+), 323 deletions(-) create mode 100644 include/swoole_iouring.h diff --git a/.github/workflows/iouring.yml b/.github/workflows/iouring.yml index 6fd5355bfbb..047bfdfca05 100644 --- a/.github/workflows/iouring.yml +++ b/.github/workflows/iouring.yml @@ -5,7 +5,7 @@ on: [push, pull_request] jobs: test-linux: if: "!contains(github.event.head_commit.message, '--filter=') || contains(github.event.head_commit.message, '[iouring]')" - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 strategy: fail-fast: false matrix: diff --git a/ext-src/php_swoole.cc b/ext-src/php_swoole.cc index 1c40d6b0855..bde895fbf39 100644 --- a/ext-src/php_swoole.cc +++ b/ext-src/php_swoole.cc @@ -17,6 +17,7 @@ #include "php_swoole_library.h" #include "php_swoole_process.h" #include "php_swoole_thread.h" +#include "swoole_iouring.h" BEGIN_EXTERN_C() #include "zend_exceptions.h" @@ -61,7 +62,7 @@ END_EXTERN_C() using swoole::Server; using swoole::network::Socket; #ifdef SW_USE_IOURING -using swoole::AsyncIouring; +using swoole::Iouring; #endif ZEND_DECLARE_MODULE_GLOBALS(swoole) @@ -723,8 +724,8 @@ PHP_MINIT_FUNCTION(swoole) { * iouring */ #ifdef SW_USE_IOURING - SW_REGISTER_LONG_CONSTANT("SWOOLE_IOURING_DEFAULT", AsyncIouring::SW_IOURING_DEFAULT); - SW_REGISTER_LONG_CONSTANT("SWOOLE_IOURING_SQPOLL", AsyncIouring::SW_IOURING_SQPOLL); + SW_REGISTER_LONG_CONSTANT("SWOOLE_IOURING_DEFAULT", Iouring::SW_IOURING_DEFAULT); + SW_REGISTER_LONG_CONSTANT("SWOOLE_IOURING_SQPOLL", Iouring::SW_IOURING_SQPOLL); #endif // clang-format on @@ -1067,7 +1068,7 @@ PHP_RINIT_FUNCTION(swoole) { * This would cause php_swoole_load_library function not to execute correctly, so it must be replaced * with the execute_ex function. */ - void (*old_zend_execute_ex)(zend_execute_data * execute_data) = nullptr; + void (*old_zend_execute_ex)(zend_execute_data *execute_data) = nullptr; if (UNEXPECTED(zend_execute_ex != execute_ex)) { old_zend_execute_ex = zend_execute_ex; zend_execute_ex = execute_ex; diff --git a/include/swoole.h b/include/swoole.h index 0ea4147d014..4f19d095cff 100644 --- a/include/swoole.h +++ b/include/swoole.h @@ -62,10 +62,6 @@ #include #include -#ifdef SW_USE_IOURING -#include -#endif - typedef unsigned long ulong_t; #ifndef PRId64 @@ -221,7 +217,7 @@ struct Address; } // namespace network class AsyncThreads; #ifdef SW_USE_IOURING -class AsyncIouring; +class Iouring; #endif namespace async { class ThreadPool; @@ -701,7 +697,7 @@ struct ThreadGlobal { MessageBus *message_bus; AsyncThreads *async_threads; #ifdef SW_USE_IOURING - AsyncIouring *async_iouring; + Iouring *iouring; #endif uint32_t signal_listener_num; uint32_t co_signal_listener_num; diff --git a/include/swoole_async.h b/include/swoole_async.h index cf4d89d9049..38633407168 100644 --- a/include/swoole_async.h +++ b/include/swoole_async.h @@ -24,10 +24,6 @@ #include #include -#ifdef SW_USE_IOURING -#include -#endif - #ifndef O_DIRECT #define O_DIRECT 040000 #endif @@ -45,32 +41,16 @@ struct AsyncRequest { struct AsyncEvent { size_t task_id; -#ifdef SW_USE_IOURING - size_t count; -#endif uint8_t canceled; int error; /** * input & output */ std::shared_ptr data; -#ifdef SW_USE_IOURING - const char *pathname; - const char *pathname2; - struct statx *statxbuf; - void *rbuf; - const void *wbuf; -#endif /** * output */ ssize_t retval; -#ifdef SW_USE_IOURING - int fd; - int flags; - int opcode; - mode_t mode; -#endif /** * internal use only */ @@ -148,92 +128,6 @@ class AsyncThreads { static int callback(Reactor *reactor, Event *event); }; -#ifdef SW_USE_IOURING -class AsyncIouring { - private: - int ring_fd; - uint64_t task_num = 0; - uint64_t entries = 8192; - struct io_uring ring; - std::queue waiting_tasks; - network::Socket *iou_socket = nullptr; - Reactor *reactor = nullptr; - - inline struct io_uring_sqe *get_iouring_sqe() { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); - // We need to reset the values of each sqe structure so that they can be used in a loop. - if (sqe) { - memset(sqe, 0, sizeof(struct io_uring_sqe)); - } - return sqe; - } - - inline bool submit_iouring_sqe(AsyncEvent *event) { - int ret = io_uring_submit(&ring); - - if (ret < 0) { - errno = -ret; - if (ret == -EAGAIN) { - waiting_tasks.push(event); - return true; - } - return false; - } - - task_num++; - return true; - } - - public: - AsyncIouring(Reactor *reactor_); - ~AsyncIouring(); - - enum opcodes { - SW_IORING_OP_OPENAT = IORING_OP_OPENAT, - SW_IORING_OP_CLOSE = IORING_OP_CLOSE, - SW_IORING_OP_STATX = IORING_OP_STATX, - SW_IORING_OP_READ = IORING_OP_READ, - SW_IORING_OP_WRITE = IORING_OP_WRITE, - SW_IORING_OP_RENAMEAT = IORING_OP_RENAMEAT, - SW_IORING_OP_UNLINKAT = IORING_OP_UNLINKAT, - SW_IORING_OP_MKDIRAT = IORING_OP_MKDIRAT, - - SW_IORING_OP_FSTAT = 1000, - SW_IORING_OP_LSTAT = 1001, - SW_IORING_OP_UNLINK_FILE = 1002, - SW_IORING_OP_UNLINK_DIR = 1003, - SW_IORING_OP_FSYNC = 1004, - SW_IORING_OP_FDATASYNC = 1005, - }; - - enum flags { - SW_IOURING_DEFAULT = 0, - SW_IOURING_SQPOLL = IORING_SETUP_SQPOLL, - }; - - void add_event(); - void delete_event(); - bool wakeup(); - bool open(AsyncEvent *event); - bool close(AsyncEvent *event); - bool wr(AsyncEvent *event); - bool statx(AsyncEvent *event); - bool mkdir(AsyncEvent *event); - bool unlink(AsyncEvent *event); - bool rename(AsyncEvent *event); - bool fsync(AsyncEvent *event); - inline bool is_empty_waiting_tasks() { - return waiting_tasks.size() == 0; - } - - inline uint64_t get_task_num() { - return task_num; - } - - static int callback(Reactor *reactor, Event *event); -}; -#endif - namespace async { typedef void (*Handler)(AsyncEvent *event); diff --git a/include/swoole_coroutine.h b/include/swoole_coroutine.h index b2d8ea2754a..b80df4eb131 100644 --- a/include/swoole_coroutine.h +++ b/include/swoole_coroutine.h @@ -301,22 +301,6 @@ class Coroutine { namespace coroutine { bool async(async::Handler handler, AsyncEvent &event, double timeout = -1); bool async(const std::function &fn, double timeout = -1); -#ifdef SW_USE_IOURING -int async(AsyncIouring::opcodes opcode, - const char *pathname, - const char *pathname2 = nullptr, - mode_t mode = 0, - int flags = 0, - struct statx *statxbuf = nullptr, - double timeout = -1); -int async(AsyncIouring::opcodes opcode, - int fd, - void *rbuf = nullptr, - const void *wbuf = nullptr, - struct statx *statxbuf = nullptr, - size_t count = 0, - double timeout = -1); -#endif bool run(const CoroutineFunc &fn, void *arg = nullptr); } // namespace coroutine //------------------------------------------------------------------------------- diff --git a/include/swoole_coroutine_system.h b/include/swoole_coroutine_system.h index bc2853f84e5..1ef45e00100 100644 --- a/include/swoole_coroutine_system.h +++ b/include/swoole_coroutine_system.h @@ -72,16 +72,5 @@ class System { static int wait_event(int fd, int events, double timeout); }; std::string gethostbyname_impl_with_async(const std::string &hostname, int domain, double timeout = -1); -//------------------------------------------------------------------------------- -struct AsyncLock { - private: - void *resource_; - public: - AsyncLock(void *resource); - ~AsyncLock(); -}; - -std::shared_ptr async_lock(void *); -//------------------------------------------------------------------------------- } // namespace coroutine } // namespace swoole diff --git a/include/swoole_iouring.h b/include/swoole_iouring.h new file mode 100644 index 00000000000..e2a09a64c1f --- /dev/null +++ b/include/swoole_iouring.h @@ -0,0 +1,139 @@ +/* + +----------------------------------------------------------------------+ + | Swoole | + +----------------------------------------------------------------------+ + | This source file is subject to version 2.0 of the Apache license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.apache.org/licenses/LICENSE-2.0.html | + | If you did not receive a copy of the Apache2.0 license and are unable| + | to obtain it through the world-wide-web, please send a note to | + | license@swoole.com so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: NathanFreeman | + +----------------------------------------------------------------------+ +*/ + +#ifndef SWOOLE_SRC_SWOOLE_IOURING_H +#define SWOOLE_SRC_SWOOLE_IOURING_H + +#ifdef SW_USE_IOURING +#include + +enum opcodes { + SW_IORING_OP_OPENAT = IORING_OP_OPENAT, + SW_IORING_OP_CLOSE = IORING_OP_CLOSE, + SW_IORING_OP_STATX = IORING_OP_STATX, + SW_IORING_OP_READ = IORING_OP_READ, + SW_IORING_OP_WRITE = IORING_OP_WRITE, + SW_IORING_OP_RENAMEAT = IORING_OP_RENAMEAT, + SW_IORING_OP_UNLINKAT = IORING_OP_UNLINKAT, + SW_IORING_OP_MKDIRAT = IORING_OP_MKDIRAT, + + SW_IORING_OP_FSTAT = 1000, + SW_IORING_OP_LSTAT = 1001, + SW_IORING_OP_UNLINK_FILE = 1002, + SW_IORING_OP_UNLINK_DIR = 1003, + SW_IORING_OP_FSYNC = 1004, + SW_IORING_OP_FDATASYNC = 1005, +}; + +namespace swoole { +struct IouringEvent { + int fd; + int flags; + int opcode; + mode_t mode; + uint64_t count; // share with offset + ssize_t result; + void *rbuf; + void *coroutine; + const void *wbuf; + const char *pathname; + const char *pathname2; + struct statx *statxbuf; +}; + +class Iouring { + private: + int ring_fd; + uint64_t task_num = 0; + uint64_t entries = 8192; + struct io_uring ring; + std::queue waiting_tasks; + network::Socket *iou_socket = nullptr; + Reactor *reactor = nullptr; + + void add_event(); + void delete_event(); + bool wakeup(); + bool open(IouringEvent *event); + bool close(IouringEvent *event); + bool wr(IouringEvent *event); + bool statx(IouringEvent *event); + bool mkdir(IouringEvent *event); + bool unlink(IouringEvent *event); + bool rename(IouringEvent *event); + bool fsync(IouringEvent *event); + + inline struct io_uring_sqe *get_iouring_sqe() { + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); + // We need to reset the values of each sqe structure so that they can be used in a loop. + if (sqe) { + memset(sqe, 0, sizeof(struct io_uring_sqe)); + } + return sqe; + } + + inline bool submit_iouring_sqe(IouringEvent *event) { + int ret = io_uring_submit(&ring); + + if (ret < 0) { + errno = -ret; + if (ret == -EAGAIN) { + waiting_tasks.push(event); + return true; + } + return false; + } + + task_num++; + return true; + } + + static Iouring *create_iouring(); + + public: + Iouring(Reactor *reactor_); + ~Iouring(); + + enum flags { + SW_IOURING_DEFAULT = 0, + SW_IOURING_SQPOLL = IORING_SETUP_SQPOLL, + }; + + inline bool is_empty_waiting_tasks() { + return waiting_tasks.size() == 0; + } + + inline uint64_t get_task_num() { + return task_num; + } + + static int async(opcodes opcode, + int fd = 0, + uint64_t count = 0, + void *rbuf = nullptr, + const void *wbuf = nullptr, + struct statx *statxbuf = nullptr); + static int async(opcodes opcode, + const char *pathname = nullptr, + const char *pathname2 = nullptr, + struct statx *statxbuf = nullptr, + int flags = 0, + mode_t mode = 0); + static int callback(Reactor *reactor, Event *event); +}; +}; // namespace swoole +#endif +#endif diff --git a/src/coroutine/iouring.cc b/src/coroutine/iouring.cc index f1de793e0ec..4727e9e2c50 100644 --- a/src/coroutine/iouring.cc +++ b/src/coroutine/iouring.cc @@ -17,13 +17,12 @@ #include #include -#include "swoole_coroutine_socket.h" -#include "swoole_coroutine_system.h" +#include "swoole_coroutine.h" +#include "swoole_iouring.h" #ifdef SW_USE_IOURING -using swoole::AsyncIouring; +using swoole::Iouring; using swoole::Coroutine; -using swoole::coroutine::async; static sw_inline bool is_no_coro() { return SwooleTG.reactor == nullptr || !Coroutine::get_current(); @@ -33,14 +32,14 @@ int swoole_coroutine_iouring_open(const char *pathname, int flags, mode_t mode) if (sw_unlikely(is_no_coro())) { return open(pathname, flags, mode); } - return async(AsyncIouring::SW_IORING_OP_OPENAT, pathname, nullptr, mode, flags); + return Iouring::async(SW_IORING_OP_OPENAT, pathname, nullptr, nullptr, flags, mode); } int swoole_coroutine_iouring_close_file(int fd) { if (sw_unlikely(is_no_coro())) { return close(fd); } - return async(AsyncIouring::SW_IORING_OP_CLOSE, fd); + return Iouring::async(SW_IORING_OP_CLOSE, fd); } ssize_t swoole_coroutine_iouring_read(int sockfd, void *buf, size_t count) { @@ -48,7 +47,7 @@ ssize_t swoole_coroutine_iouring_read(int sockfd, void *buf, size_t count) { return read(sockfd, buf, count); } - return async(AsyncIouring::SW_IORING_OP_READ, sockfd, buf, nullptr, nullptr, count); + return Iouring::async(SW_IORING_OP_READ, sockfd, count, buf); } ssize_t swoole_coroutine_iouring_write(int sockfd, const void *buf, size_t count) { @@ -56,28 +55,28 @@ ssize_t swoole_coroutine_iouring_write(int sockfd, const void *buf, size_t count return write(sockfd, buf, count); } - return async(AsyncIouring::SW_IORING_OP_WRITE, sockfd, nullptr, buf, nullptr, count);; + return Iouring::async(SW_IORING_OP_WRITE, sockfd, count, nullptr, buf); } int swoole_coroutine_iouring_rename(const char *oldpath, const char *newpath) { if (sw_unlikely(is_no_coro())) { return rename(oldpath, newpath); } - return async(AsyncIouring::SW_IORING_OP_RENAMEAT, oldpath, newpath); + return Iouring::async(SW_IORING_OP_RENAMEAT, oldpath, newpath); } int swoole_coroutine_iouring_mkdir(const char *pathname, mode_t mode) { if (sw_unlikely(is_no_coro())) { return mkdir(pathname, mode); } - return async(AsyncIouring::SW_IORING_OP_MKDIRAT, pathname, nullptr, mode); + return Iouring::async(SW_IORING_OP_MKDIRAT, pathname, nullptr, nullptr, 0, mode); } int swoole_coroutine_iouring_unlink(const char *pathname) { if (sw_unlikely(is_no_coro())) { return unlink(pathname); } - return async(AsyncIouring::SW_IORING_OP_UNLINK_FILE, pathname); + return Iouring::async(SW_IORING_OP_UNLINK_FILE, pathname); } void swoole_statx_to_stat(const struct statx *statxbuf, struct stat *statbuf) { @@ -105,7 +104,7 @@ int swoole_coroutine_iouring_fstat(int fd, struct stat *statbuf) { } struct statx statxbuf = {}; - int retval = async(AsyncIouring::SW_IORING_OP_FSTAT, fd, nullptr, nullptr, &statxbuf); + int retval = Iouring::async(SW_IORING_OP_FSTAT, fd, 0, nullptr, nullptr, &statxbuf); swoole_statx_to_stat(&statxbuf, statbuf); return retval; } @@ -116,7 +115,7 @@ int swoole_coroutine_iouring_stat(const char *path, struct stat *statbuf) { } struct statx statxbuf = {}; - int retval = async(AsyncIouring::SW_IORING_OP_LSTAT, path, nullptr, 0, 0, &statxbuf); + int retval = Iouring::async(SW_IORING_OP_LSTAT, path, nullptr, &statxbuf); swoole_statx_to_stat(&statxbuf, statbuf); return retval; } @@ -127,7 +126,7 @@ int swoole_coroutine_iouring_lstat(const char *path, struct stat *statbuf) { } struct statx statxbuf = {}; - int retval = async(AsyncIouring::SW_IORING_OP_LSTAT, path, nullptr, 0, 0, &statxbuf); + int retval = Iouring::async(SW_IORING_OP_LSTAT, path, nullptr, &statxbuf); swoole_statx_to_stat(&statxbuf, statbuf); return retval; } @@ -137,7 +136,7 @@ int swoole_coroutine_iouring_rmdir(const char *pathname) { return rmdir(pathname); } - return async(AsyncIouring::SW_IORING_OP_UNLINK_DIR, pathname); + return Iouring::async(SW_IORING_OP_UNLINK_DIR, pathname); } int swoole_coroutine_iouring_fsync(int fd) { @@ -145,7 +144,7 @@ int swoole_coroutine_iouring_fsync(int fd) { return fsync(fd); } - return async(AsyncIouring::SW_IORING_OP_FSYNC, fd); + return Iouring::async(SW_IORING_OP_FSYNC, fd); } int swoole_coroutine_iouring_fdatasync(int fd) { @@ -153,6 +152,6 @@ int swoole_coroutine_iouring_fdatasync(int fd) { return fdatasync(fd); } - return async(AsyncIouring::SW_IORING_OP_FDATASYNC, fd); + return Iouring::async(SW_IORING_OP_FDATASYNC, fd); } #endif diff --git a/src/coroutine/system.cc b/src/coroutine/system.cc index 9276b9e2850..35972fa41c8 100644 --- a/src/coroutine/system.cc +++ b/src/coroutine/system.cc @@ -605,7 +605,7 @@ void System::init_reactor(Reactor *reactor) { reactor->set_handler(SW_FD_AIO | SW_EVENT_READ, AsyncThreads::callback); #ifdef SW_USE_IOURING - reactor->set_handler(SW_FD_IOURING | SW_EVENT_READ, AsyncIouring::callback); + reactor->set_handler(SW_FD_IOURING | SW_EVENT_READ, Iouring::callback); #endif } @@ -687,119 +687,5 @@ bool async(const std::function &fn, double timeout) { return true; } } - -#ifdef SW_USE_IOURING -int async(AsyncIouring::opcodes opcode, - const char *pathname, - const char *pathname2, - mode_t mode, - int flags, - struct statx *statxbuf, - double timeout) { - if (SwooleTG.async_iouring == nullptr) { - SwooleTG.async_iouring = new AsyncIouring(SwooleTG.reactor); - SwooleTG.async_iouring->add_event(); - } - - AsyncEvent event{}; - AsyncLambdaTask task{Coroutine::get_current_safe(), nullptr}; - - event.object = &task; - event.callback = async_lambda_callback; - event.opcode = opcode; - event.pathname = pathname; - event.pathname2 = pathname2; - event.mode = mode; - event.flags = flags; - event.statxbuf = statxbuf; - - bool result = false; - AsyncIouring *iouring = SwooleTG.async_iouring; - if (opcode == AsyncIouring::SW_IORING_OP_OPENAT) { - result = iouring->open(&event); - } else if (opcode == AsyncIouring::SW_IORING_OP_MKDIRAT) { - result = iouring->mkdir(&event); - } else if (opcode == AsyncIouring::SW_IORING_OP_UNLINK_FILE || opcode == AsyncIouring::SW_IORING_OP_UNLINK_DIR) { - result = iouring->unlink(&event); - } else if (opcode == AsyncIouring::SW_IORING_OP_RENAMEAT) { - result = iouring->rename(&event); - } else if (opcode == AsyncIouring::SW_IORING_OP_FSTAT || opcode == AsyncIouring::SW_IORING_OP_LSTAT) { - result = iouring->statx(&event); - } - - if (!result || !task.co->yield_ex(timeout)) { - return 0; - } - - return event.retval; -} - -int async(AsyncIouring::opcodes opcode, - int fd, - void *rbuf, - const void *wbuf, - struct statx *statxbuf, - size_t count, - double timeout) { - if (SwooleTG.async_iouring == nullptr) { - SwooleTG.async_iouring = new AsyncIouring(SwooleTG.reactor); - SwooleTG.async_iouring->add_event(); - } - - AsyncEvent event{}; - AsyncLambdaTask task{Coroutine::get_current_safe(), nullptr}; - - event.object = &task; - event.callback = async_lambda_callback; - event.opcode = opcode; - event.fd = fd; - event.rbuf = rbuf; - event.wbuf = wbuf; - event.statxbuf = statxbuf; - event.count = count; - - bool result = false; - AsyncIouring *iouring = SwooleTG.async_iouring; - if (opcode == AsyncIouring::SW_IORING_OP_READ || opcode == AsyncIouring::SW_IORING_OP_WRITE) { - result = iouring->wr(&event); - } else if (opcode == AsyncIouring::SW_IORING_OP_CLOSE) { - result = iouring->close(&event); - } else if (opcode == AsyncIouring::SW_IORING_OP_FSTAT) { - result = iouring->statx(&event); - } else if (opcode == AsyncIouring::SW_IORING_OP_FSYNC || opcode == AsyncIouring::SW_IORING_OP_FDATASYNC) { - result = iouring->fsync(&event); - } - - if (!result || !task.co->yield_ex(timeout)) { - return 0; - } - - return event.retval; -} -#endif - -AsyncLock::AsyncLock(void *resource) { - resource_ = resource; - async_resource_map.emplace(resource, Coroutine::get_current_cid()); -} - -AsyncLock::~AsyncLock() { - async_resource_map.erase(resource_); -} - -std::shared_ptr async_lock(void *resource) { - auto iter = async_resource_map.find(resource); - if (iter != async_resource_map.end()) { - swoole_fatal_error(SW_ERROR_CO_HAS_BEEN_BOUND, - "resource(%p) has already been bound to another coroutine#%ld, " - "%s of the same resource in coroutine#%ld at the same time is not allowed", - resource, - *iter, - Coroutine::get_current_cid()); - return nullptr; - } - return std::make_shared(resource); -} - } // namespace coroutine } // namespace swoole diff --git a/src/os/iouring.cc b/src/os/iouring.cc index d14bb6e7be9..e3f1b484964 100644 --- a/src/os/iouring.cc +++ b/src/os/iouring.cc @@ -14,21 +14,15 @@ +----------------------------------------------------------------------+ */ -#include "swoole.h" -#include "swoole_api.h" -#include "swoole_socket.h" -#include "swoole_reactor.h" -#include "swoole_string.h" -#include "swoole_signal.h" -#include "swoole_pipe.h" -#include "swoole_async.h" -#include "swoole_util.h" #include "swoole_coroutine.h" +#include "swoole_iouring.h" #ifdef SW_USE_IOURING +using swoole::Coroutine; + namespace swoole { //------------------------------------------------------------------------------- -AsyncIouring::AsyncIouring(Reactor *reactor_) { +Iouring::Iouring(Reactor *reactor_) { if (!SwooleTG.reactor) { swoole_warning("no event loop, cannot initialized"); throw swoole::Exception(SW_ERROR_WRONG_OPERATION); @@ -71,24 +65,23 @@ AsyncIouring::AsyncIouring(Reactor *reactor_) { } reactor->set_exit_condition(Reactor::EXIT_CONDITION_IOURING, [](Reactor *reactor, size_t &event_num) -> bool { - if (SwooleTG.async_iouring && SwooleTG.async_iouring->get_task_num() == 0 && - SwooleTG.async_iouring->is_empty_waiting_tasks()) { + if (SwooleTG.iouring && SwooleTG.iouring->get_task_num() == 0 && SwooleTG.iouring->is_empty_waiting_tasks()) { event_num--; } return true; }); reactor->add_destroy_callback([](void *data) { - if (!SwooleTG.async_iouring) { + if (!SwooleTG.iouring) { return; } - SwooleTG.async_iouring->delete_event(); - delete SwooleTG.async_iouring; - SwooleTG.async_iouring = nullptr; + SwooleTG.iouring->delete_event(); + delete SwooleTG.iouring; + SwooleTG.iouring = nullptr; }); } -AsyncIouring::~AsyncIouring() { +Iouring::~Iouring() { if (ring_fd >= 0) { ::close(ring_fd); } @@ -100,20 +93,20 @@ AsyncIouring::~AsyncIouring() { io_uring_queue_exit(&ring); } -void AsyncIouring::add_event() { +void Iouring::add_event() { reactor->add(iou_socket, SW_EVENT_READ); } -void AsyncIouring::delete_event() { +void Iouring::delete_event() { reactor->del(iou_socket); } -bool AsyncIouring::wakeup() { +bool Iouring::wakeup() { unsigned count = 0; unsigned num = 8192; void *data = nullptr; - AsyncEvent *task = nullptr; - AsyncEvent *waiting_task = nullptr; + IouringEvent *task = nullptr; + IouringEvent *waiting_task = nullptr; struct io_uring_cqe *cqe = nullptr; struct io_uring_cqe *cqes[num]; @@ -126,7 +119,7 @@ bool AsyncIouring::wakeup() { for (unsigned i = 0; i < count; i++) { cqe = cqes[i]; data = io_uring_cqe_get_data(cqe); - task = reinterpret_cast(data); + task = reinterpret_cast(data); task_num--; if (cqe->res < 0) { errno = -(cqe->res); @@ -142,32 +135,32 @@ bool AsyncIouring::wakeup() { } } - task->retval = (cqe->res >= 0 ? cqe->res : -1); + task->result = (cqe->res >= 0 ? cqe->res : -1); io_uring_cq_advance(&ring, 1); - task->callback(task); + + Coroutine *coroutine = reinterpret_cast(task->coroutine); + coroutine->resume(); if (!is_empty_waiting_tasks()) { waiting_task = waiting_tasks.front(); waiting_tasks.pop(); - if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_OPENAT) { + if (waiting_task->opcode == SW_IORING_OP_OPENAT) { open(waiting_task); - } else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_CLOSE) { + } else if (waiting_task->opcode == SW_IORING_OP_CLOSE) { close(waiting_task); - } else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_FSTAT || - waiting_task->opcode == AsyncIouring::SW_IORING_OP_LSTAT) { + } else if (waiting_task->opcode == SW_IORING_OP_FSTAT || waiting_task->opcode == SW_IORING_OP_LSTAT) { statx(waiting_task); - } else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_READ || - waiting_task->opcode == AsyncIouring::SW_IORING_OP_WRITE) { + } else if (waiting_task->opcode == SW_IORING_OP_READ || waiting_task->opcode == SW_IORING_OP_WRITE) { wr(waiting_task); - } else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_RENAMEAT) { + } else if (waiting_task->opcode == SW_IORING_OP_RENAMEAT) { rename(waiting_task); - } else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_UNLINK_FILE || - waiting_task->opcode == AsyncIouring::SW_IORING_OP_UNLINK_DIR) { + } else if (waiting_task->opcode == SW_IORING_OP_UNLINK_FILE || + waiting_task->opcode == SW_IORING_OP_UNLINK_DIR) { unlink(waiting_task); - } else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_MKDIRAT) { + } else if (waiting_task->opcode == SW_IORING_OP_MKDIRAT) { mkdir(waiting_task); - } else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_FSYNC || - waiting_task->opcode == AsyncIouring::SW_IORING_OP_FDATASYNC) { + } else if (waiting_task->opcode == SW_IORING_OP_FSYNC || + waiting_task->opcode == SW_IORING_OP_FDATASYNC) { fsync(waiting_task); } } @@ -177,7 +170,7 @@ bool AsyncIouring::wakeup() { return true; } -bool AsyncIouring::open(AsyncEvent *event) { +bool Iouring::open(IouringEvent *event) { struct io_uring_sqe *sqe = get_iouring_sqe(); if (!sqe) { waiting_tasks.push(event); @@ -194,7 +187,7 @@ bool AsyncIouring::open(AsyncEvent *event) { return submit_iouring_sqe(event); } -bool AsyncIouring::close(AsyncEvent *event) { +bool Iouring::close(IouringEvent *event) { struct io_uring_sqe *sqe = get_iouring_sqe(); if (!sqe) { waiting_tasks.push(event); @@ -208,7 +201,7 @@ bool AsyncIouring::close(AsyncEvent *event) { return submit_iouring_sqe(event); } -bool AsyncIouring::wr(AsyncEvent *event) { +bool Iouring::wr(IouringEvent *event) { struct io_uring_sqe *sqe = get_iouring_sqe(); if (!sqe) { waiting_tasks.push(event); @@ -225,7 +218,7 @@ bool AsyncIouring::wr(AsyncEvent *event) { return submit_iouring_sqe(event); } -bool AsyncIouring::statx(AsyncEvent *event) { +bool Iouring::statx(IouringEvent *event) { struct io_uring_sqe *sqe = get_iouring_sqe(); if (!sqe) { waiting_tasks.push(event); @@ -249,7 +242,7 @@ bool AsyncIouring::statx(AsyncEvent *event) { return submit_iouring_sqe(event); } -bool AsyncIouring::mkdir(AsyncEvent *event) { +bool Iouring::mkdir(IouringEvent *event) { struct io_uring_sqe *sqe = get_iouring_sqe(); if (!sqe) { waiting_tasks.push(event); @@ -265,7 +258,7 @@ bool AsyncIouring::mkdir(AsyncEvent *event) { return submit_iouring_sqe(event); } -bool AsyncIouring::unlink(AsyncEvent *event) { +bool Iouring::unlink(IouringEvent *event) { struct io_uring_sqe *sqe = get_iouring_sqe(); if (!sqe) { waiting_tasks.push(event); @@ -284,7 +277,7 @@ bool AsyncIouring::unlink(AsyncEvent *event) { return submit_iouring_sqe(event); } -bool AsyncIouring::rename(AsyncEvent *event) { +bool Iouring::rename(IouringEvent *event) { struct io_uring_sqe *sqe = get_iouring_sqe(); if (!sqe) { waiting_tasks.push(event); @@ -302,7 +295,7 @@ bool AsyncIouring::rename(AsyncEvent *event) { return submit_iouring_sqe(event); } -bool AsyncIouring::fsync(AsyncEvent *event) { +bool Iouring::fsync(IouringEvent *event) { struct io_uring_sqe *sqe = get_iouring_sqe(); if (!sqe) { waiting_tasks.push(event); @@ -324,8 +317,82 @@ bool AsyncIouring::fsync(AsyncEvent *event) { return submit_iouring_sqe(event); } -int AsyncIouring::callback(Reactor *reactor, Event *event) { - AsyncIouring *iouring = SwooleTG.async_iouring; +Iouring *Iouring::create_iouring() { + if (SwooleTG.iouring == nullptr) { + SwooleTG.iouring = new Iouring(SwooleTG.reactor); + SwooleTG.iouring->add_event(); + } + + return SwooleTG.iouring; +} + +int Iouring::async(opcodes opcode, int fd, uint64_t count, void *rbuf, const void *wbuf, struct statx *statxbuf) { + IouringEvent event{}; + event.fd = fd; + event.rbuf = rbuf; + event.wbuf = wbuf; + event.count = count; + event.opcode = opcode; + event.statxbuf = statxbuf; + + Coroutine *coroutine = Coroutine::get_current_safe(); + event.coroutine = (void *) coroutine; + + bool result = false; + Iouring *iouring = create_iouring(); + if (opcode == SW_IORING_OP_READ || opcode == SW_IORING_OP_WRITE) { + result = iouring->wr(&event); + } else if (opcode == SW_IORING_OP_CLOSE) { + result = iouring->close(&event); + } else if (opcode == SW_IORING_OP_FSTAT) { + result = iouring->statx(&event); + } else if (opcode == SW_IORING_OP_FSYNC || opcode == SW_IORING_OP_FDATASYNC) { + result = iouring->fsync(&event); + } + + if (!result || !coroutine->yield_ex()) { + return 0; + } + + return event.result; +} + +int Iouring::async( + opcodes opcode, const char *pathname, const char *pathname2, struct statx *statxbuf, int flags, mode_t mode) { + IouringEvent event{}; + event.mode = mode; + event.flags = flags; + event.opcode = opcode; + event.pathname = pathname; + event.pathname2 = pathname2; + event.statxbuf = statxbuf; + + Coroutine *coroutine = Coroutine::get_current_safe(); + event.coroutine = (void *) coroutine; + + bool result = false; + Iouring *iouring = create_iouring(); + if (opcode == SW_IORING_OP_OPENAT) { + result = iouring->open(&event); + } else if (opcode == SW_IORING_OP_MKDIRAT) { + result = iouring->mkdir(&event); + } else if (opcode == SW_IORING_OP_UNLINK_FILE || opcode == SW_IORING_OP_UNLINK_DIR) { + result = iouring->unlink(&event); + } else if (opcode == SW_IORING_OP_RENAMEAT) { + result = iouring->rename(&event); + } else if (opcode == SW_IORING_OP_LSTAT) { + result = iouring->statx(&event); + } + + if (!result || !coroutine->yield_ex()) { + return 0; + } + + return event.result; +} + +int Iouring::callback(Reactor *reactor, Event *event) { + Iouring *iouring = SwooleTG.iouring; return iouring->wakeup() ? 1 : 0; } } // namespace swoole