Skip to content

Commit

Permalink
Optimize the details of iouring, Fix core tests
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Nov 1, 2024
1 parent b0b32c4 commit e5b1c90
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 68 deletions.
7 changes: 2 additions & 5 deletions core-tests/src/coroutine/async.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#include "test_coroutine.h"
#ifdef HAVE_SWOOLE_DIR
#include "swoole_async.h"
#else
#include "swoole/swoole_async.h"
#endif

#include <iostream>
#include <regex>

Expand Down Expand Up @@ -57,7 +54,7 @@ TEST(coroutine_async, error) {
coroutine::run([](void *arg) {
int retval = 0x7009501;
const char *test_file = "/tmp/swoole_core_test_file_not_exists";
swoole::coroutine::async([&](void) { retval = open(test_file, O_RDONLY); }, -1);
swoole::coroutine::async([&](void) { retval = open(test_file, O_RDONLY); });
ASSERT_EQ(retval, -1);
ASSERT_EQ(errno, ENOENT);
});
Expand Down
4 changes: 2 additions & 2 deletions examples/runtime/file.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php
Swoole\Runtime::enableCoroutine();
go(function() {
$fp = fopen('data.txt', 'w');
Co\run(function() {
$fp = fopen('data.txt', 'w+');
echo "open\n";
fwrite($fp, str_repeat('A', 1024));
fwrite($fp, str_repeat('B', 1024));
Expand Down
4 changes: 2 additions & 2 deletions ext-src/swoole_socket_coro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,7 @@ static void socket_coro_write_vector(INTERNAL_FUNCTION_PARAMETERS, const bool al
iovcnt = zend_array_count(vht);

if (iovcnt > IOV_MAX) {
sw_tg_buffer()->length = sw_snprintf(sw_tg_buffer()->str, sw_tg_buffer()->size, IOV_MAX_ERROR_MSG, IOV_MAX);
sw_tg_buffer()->length = sw_snprintf(sw_tg_buffer()->str, sw_tg_buffer()->size, SW_IOV_MAX_ERROR_MSG, IOV_MAX);
sock->socket->set_err(EINVAL, sw_tg_buffer()->to_std_string());
RETURN_FALSE;
}
Expand Down Expand Up @@ -1639,7 +1639,7 @@ static void socket_coro_read_vector(INTERNAL_FUNCTION_PARAMETERS, const bool all
iovcnt = zend_array_count(vht);

if (iovcnt > IOV_MAX) {
sw_tg_buffer()->length = sw_snprintf(sw_tg_buffer()->str, sw_tg_buffer()->size, IOV_MAX_ERROR_MSG, IOV_MAX);
sw_tg_buffer()->length = sw_snprintf(sw_tg_buffer()->str, sw_tg_buffer()->size, SW_IOV_MAX_ERROR_MSG, IOV_MAX);
sock->socket->set_err(EINVAL, sw_tg_buffer()->to_std_string());
RETURN_FALSE;
}
Expand Down
1 change: 1 addition & 0 deletions ext-src/swoole_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sys/resource.h>

#include <unordered_map>
#include <atomic>

#include "swoole_thread.h"

Expand Down
21 changes: 15 additions & 6 deletions include/swoole_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ class Client {
std::shared_ptr<SSLContext> ssl_context = nullptr;
#endif

std::function<void (Client *cli)> onConnect = nullptr;
std::function<void (Client *cli)> onError = nullptr;
std::function<void (Client *cli, const char *, size_t)> onReceive = nullptr;
std::function<void (Client *cli)> onClose = nullptr;
std::function<void (Client *cli)> onBufferFull = nullptr;
std::function<void (Client *cli)> onBufferEmpty = nullptr;
std::function<void(Client *cli)> onConnect = nullptr;
std::function<void(Client *cli)> onError = nullptr;
std::function<void(Client *cli, const char *, size_t)> onReceive = nullptr;
std::function<void(Client *cli)> onClose = nullptr;
std::function<void(Client *cli)> onBufferFull = nullptr;
std::function<void(Client *cli)> onBufferEmpty = nullptr;

int (*connect)(Client *cli, const char *host, int port, double _timeout, int sock_flag) = nullptr;
ssize_t (*send)(Client *cli, const char *data, size_t length, int flags) = nullptr;
Expand All @@ -133,6 +133,15 @@ class Client {
return socket->socket_type;
}

const std::string *get_http_proxy_host_name() {
#ifdef SW_USE_OPENSSL
if (ssl_context && !ssl_context->tls_host_name.empty()) {
return &ssl_context->tls_host_name;
}
#endif
return &http_proxy->target_host;
}

int sleep();
int wakeup();
int shutdown(int __how);
Expand Down
4 changes: 3 additions & 1 deletion include/swoole_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@
#define IOV_MAX 16
#endif

#define IOV_MAX_ERROR_MSG "The maximum of iov count is %d"
#define SW_IOV_MAX_ERROR_MSG "The maximum of iov count is %d"

#define SW_IOURING_CQES_SIZE 8192

/**
* HTTP Protocol
Expand Down
1 change: 0 additions & 1 deletion scripts/make.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ COMPILE_PARAMS="--enable-openssl \
--enable-swoole-curl \
--enable-cares \
--enable-swoole-pgsql \
--enable-iouring \
--with-swoole-odbc=unixODBC,/usr \
--enable-swoole-sqlite"

Expand Down
80 changes: 56 additions & 24 deletions src/coroutine/iouring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Iouring::Iouring(Reactor *_reactor) {
}

if (SwooleG.iouring_workers > 0) {
unsigned int workers[2] = {SwooleG.iouring_workers, SwooleG.iouring_workers};
uint32_t workers[2] = {SwooleG.iouring_workers, SwooleG.iouring_workers};
ret = io_uring_register_iowq_max_workers(&ring, workers);

if (ret < 0) {
Expand All @@ -93,6 +93,7 @@ Iouring::Iouring(Reactor *_reactor) {
}

ring_socket = make_socket(ring.ring_fd, SW_FD_IOURING);
ring_socket->object = this;

reactor->set_exit_condition(Reactor::EXIT_CONDITION_IOURING, [](Reactor *reactor, size_t &event_num) -> bool {
if (SwooleTG.iouring && SwooleTG.iouring->get_task_num() == 0 && SwooleTG.iouring->is_empty_waiting_tasks()) {
Expand All @@ -113,14 +114,16 @@ Iouring::Iouring(Reactor *_reactor) {
}

Iouring::~Iouring() {
if (ring_socket) {
if (!ring_socket->removed) {
reactor->del(ring_socket);
}
ring_socket->move_fd();
ring_socket->free();
ring_socket = nullptr;
if (!ring_socket) {
return;
}

if (!ring_socket->removed) {
reactor->del(ring_socket);
}
ring_socket->move_fd();
ring_socket->free();
ring_socket = nullptr;

io_uring_queue_exit(&ring);
}
Expand All @@ -130,24 +133,18 @@ bool Iouring::ready() {
}

bool Iouring::wakeup() {
unsigned count = 0;
unsigned num = 8192;
void *data = nullptr;
IouringEvent *event = nullptr;
IouringEvent *waiting_task = nullptr;
struct io_uring_cqe *cqe = nullptr;
struct io_uring_cqe *cqes[num];
struct io_uring_cqe *cqes[SW_IOURING_CQES_SIZE];

while (true) {
count = io_uring_peek_batch_cqe(&ring, cqes, num);
auto count = io_uring_peek_batch_cqe(&ring, cqes, SW_IOURING_CQES_SIZE);
if (count == 0) {
return true;
}

for (unsigned i = 0; i < count; i++) {
cqe = cqes[i];
data = io_uring_cqe_get_data(cqe);
event = static_cast<IouringEvent *>(data);
for (decltype(count) i = 0; i < count; i++) {
struct io_uring_cqe *cqe = cqes[i];
IouringEvent *task = static_cast<IouringEvent *>(io_uring_cqe_get_data(cqe));
task_num--;
if (cqe->res < 0) {
errno = -(cqe->res);
Expand All @@ -158,15 +155,15 @@ bool Iouring::wakeup() {
*/
if (cqe->res == -EAGAIN) {
io_uring_cq_advance(&ring, 1);
waiting_tasks.push(event);
waiting_tasks.push(task);
continue;
}
}

event->result = (cqe->res >= 0 ? cqe->res : -1);
task->result = (cqe->res >= 0 ? cqe->res : -1);
io_uring_cq_advance(&ring, 1);

event->coroutine->resume();
task->coroutine->resume();

if (!is_empty_waiting_tasks()) {
waiting_task = waiting_tasks.front();
Expand All @@ -181,7 +178,42 @@ bool Iouring::wakeup() {
return true;
}

static const char *get_opcode_name(IouringOpcode opcode) {
switch (opcode) {
case SW_IORING_OP_OPENAT:
return "OPENAT";
case SW_IORING_OP_CLOSE:
return "CLOSE";
case SW_IORING_OP_STATX:
return "STATX";
case SW_IORING_OP_READ:
return "READ";
case SW_IORING_OP_WRITE:
return "WRITE";
case SW_IORING_OP_RENAMEAT:
return "RENAMEAT";
case SW_IORING_OP_MKDIRAT:
return "MKDIRAT";
case SW_IORING_OP_FSTAT:
return "FSTAT";
case SW_IORING_OP_LSTAT:
return "LSTAT";
case SW_IORING_OP_UNLINK_FILE:
return "UNLINK_FILE";
case SW_IORING_OP_UNLINK_DIR:
return "UNLINK_DIR";
case SW_IORING_OP_FSYNC:
return "FSYNC";
case SW_IORING_OP_FDATASYNC:
return "FDATASYNC";
default:
return "unknown";
}
}

bool Iouring::submit(IouringEvent *event) {
swoole_trace("opcode=%s, fd=%d, path=%s", get_opcode_name(event->opcode), event->fd, event->pathname);

int ret = io_uring_submit(&ring);

if (ret < 0) {
Expand Down Expand Up @@ -434,8 +466,8 @@ int Iouring::stat(const char *path, struct stat *statbuf) {
}

int Iouring::callback(Reactor *reactor, Event *event) {
Iouring *iouring = SwooleTG.iouring;
return iouring->wakeup() ? 1 : 0;
Iouring *iouring = static_cast<Iouring *>(event->socket->object);
return iouring->wakeup() ? SW_OK : SW_ERR;
}
} // namespace swoole
#endif
36 changes: 9 additions & 27 deletions src/network/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,6 @@ static int Client_tcp_connect_sync(Client *cli, const char *host, int port, doub
if (ret >= 0) {
cli->active = 1;

// socks5 proxy
if (cli->socks5_proxy) {
char buf[1024];
Socks5Proxy::pack(buf, cli->socks5_proxy->username.empty() ? 0x00 : 0x02);
Expand All @@ -581,12 +580,7 @@ static int Client_tcp_connect_sync(Client *cli, const char *host, int port, doub
}
} else if (cli->http_proxy) {
auto proxy_buf = sw_tg_buffer();
const std::string *host_name = &cli->http_proxy->target_host;
#ifdef SW_USE_OPENSSL
if (cli->ssl_context && !cli->ssl_context->tls_host_name.empty()) {
host_name = &cli->ssl_context->tls_host_name;
}
#endif
const std::string *host_name = cli->get_http_proxy_host_name();
size_t n_write = cli->http_proxy->pack(proxy_buf, host_name);
if (cli->send(cli, proxy_buf->str, n_write, 0) < 0) {
return SW_ERR;
Expand Down Expand Up @@ -919,7 +913,7 @@ static int Client_onStreamRead(Reactor *reactor, Event *event) {
if (cli->http_proxy && cli->http_proxy->state != SW_HTTP_PROXY_STATE_READY) {
n = event->socket->recv(buf, buf_size, 0);
if (n <= 0) {
_connect_fail:
_connect_fail:
cli->active = 0;
cli->close();
if (cli->onError) {
Expand All @@ -929,8 +923,7 @@ static int Client_onStreamRead(Reactor *reactor, Event *event) {
}
cli->buffer->length += n;
if (!cli->http_proxy->handshake(cli->buffer)) {
swoole_error_log(
SW_LOG_NOTICE, SW_ERROR_HTTP_PROXY_HANDSHAKE_ERROR, "failed to handshake with http proxy");
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_HTTP_PROXY_HANDSHAKE_ERROR, "failed to handshake with http proxy");
goto _connect_fail;
}
cli->http_proxy->state = SW_HTTP_PROXY_STATE_READY;
Expand All @@ -957,7 +950,7 @@ static int Client_onStreamRead(Reactor *reactor, Event *event) {
}

#ifdef SW_USE_OPENSSL
if (cli->open_ssl && cli->socket->ssl_state == SW_SSL_STATE_WAIT_STREAM) {
if (cli->open_ssl && cli->socket->ssl_state != SW_SSL_STATE_READY) {
if (cli->ssl_handshake() < 0) {
goto _connect_fail;
}
Expand Down Expand Up @@ -1132,22 +1125,11 @@ static int Client_onWrite(Reactor *reactor, Event *event) {
}
// http proxy
if (cli->http_proxy && cli->http_proxy->state == SW_HTTP_PROXY_STATE_WAIT) {
#ifdef SW_USE_OPENSSL
if (cli->open_ssl) {
cli->http_proxy->state = SW_HTTP_PROXY_STATE_HANDSHAKE;
auto proxy_buf = sw_tg_buffer();
const std::string *host_name = &cli->http_proxy->target_host;
#ifdef SW_USE_OPENSSL
if (cli->ssl_context && !cli->ssl_context->tls_host_name.empty()) {
host_name = &cli->ssl_context->tls_host_name;
}
#endif
size_t n = cli->http_proxy->pack(proxy_buf, host_name);
swoole_trace_log(SW_TRACE_HTTP_CLIENT, "proxy request: <<EOF\n%.*sEOF", (int) n, proxy_buf->str);

return cli->send(cli, proxy_buf->str, n, 0);
}
#endif
auto proxy_buf = sw_tg_buffer();
const std::string *host_name = cli->get_http_proxy_host_name();
size_t n = cli->http_proxy->pack(proxy_buf, host_name);
swoole_trace_log(SW_TRACE_HTTP_CLIENT, "proxy request: <<EOF\n%.*sEOF", (int) n, proxy_buf->str);
return cli->send(cli, proxy_buf->str, n, 0);
}
#ifdef SW_USE_OPENSSL
if (cli->open_ssl) {
Expand Down

0 comments on commit e5b1c90

Please sign in to comment.