Skip to content

Commit

Permalink
support coroutine feature
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFreeman committed Nov 3, 2024
1 parent 72886f2 commit 7986b01
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 8 deletions.
28 changes: 21 additions & 7 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -930,13 +930,6 @@ EOF
AC_DEFINE(HAVE_CARES, 1, [have c-ares])
fi

if test "$PHP_IOURING" = "yes"; then
PKG_CHECK_MODULES([URING], [liburing])
PHP_EVAL_LIBLINE($URING_LIBS, SWOOLE_SHARED_LIBADD)
PHP_EVAL_INCLINE($URING_CFLAGS)
AC_DEFINE(SW_USE_IOURING, 1, [have io_uring])
fi

AC_SWOOLE_CPU_AFFINITY
AC_SWOOLE_HAVE_REUSEPORT
AC_SWOOLE_HAVE_FUTEX
Expand All @@ -959,6 +952,27 @@ EOF

dnl Check should we link to librt

if test "$PHP_IOURING" = "yes" && test "$SW_OS" = "LINUX"; then
PKG_CHECK_MODULES([URING], [liburing])
PHP_EVAL_LIBLINE($URING_LIBS, SWOOLE_SHARED_LIBADD)
PHP_EVAL_INCLINE($URING_CFLAGS)
AC_DEFINE(SW_USE_IOURING, 1, [have io_uring])

LINUX_VERSION=`uname -r | cut -d '-' -f 1`
LINUX_MAJOR_VERSION=`echo $LINUX_VERSION | cut -d '.' -f 1`
LINUX_MINIO_VERSION=`echo $LINUX_VERSION | cut -d '.' -f 2`

_PKG_CONFIG(URING_VERSION, [modversion], [liburing])
IOURING_MAJOR_VERSION=`echo $pkg_cv_URING_VERSION | cut -d '.' -f 1`
IOURING_MINOR_VERSION=`echo $pkg_cv_URING_VERSION | cut -d '.' -f 2`

if test $IOURING_MAJOR_VERSION > 2 || (test $IOURING_MAJOR_VERSION = 2 && test $IOURING_MINOR_VERSION >= 6); then
if test $LINUX_MAJOR_VERSION > 6 || (test $LINUX_MAJOR_VERSION = 6 && test $LINUX_MAJOR_VERSION >= ); then
AC_DEFINE(HAVE_IOURING_FUTEX, 1, [have io_uring futex feature])
fi
fi
fi

if test "$SW_OS" = "LINUX"; then
GLIBC_VERSION=$(getconf GNU_LIBC_VERSION | awk '{print $2}')
AC_MSG_NOTICE([glibc version: $GLIBC_VERSION])
Expand Down
11 changes: 11 additions & 0 deletions ext-src/swoole_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ using swoole::SpinLock;
#ifdef HAVE_RWLOCK
using swoole::RWLock;
#endif
#ifdef HAVE_IOURING_FUTEX
using swoole::CoroutineLock;
#endif

static zend_class_entry *swoole_lock_ce;
static zend_object_handlers swoole_lock_handlers;
Expand Down Expand Up @@ -127,6 +130,9 @@ void php_swoole_lock_minit(int module_number) {
#ifdef HAVE_SPINLOCK
SW_REGISTER_LONG_CONSTANT("SWOOLE_SPINLOCK", Lock::SPIN_LOCK);
#endif
#ifdef HAVE_IOURING_FUTEX
SW_REGISTER_LONG_CONSTANT("SWOOLE_COROUTINELOCK", Lock::COROUTINE_LOCK);
#endif
}

static PHP_METHOD(swoole_lock, __construct) {
Expand Down Expand Up @@ -158,6 +164,11 @@ static PHP_METHOD(swoole_lock, __construct) {
case Lock::MUTEX:
lock = new Mutex(Mutex::PROCESS_SHARED);
break;
#ifdef HAVE_IOURING_FUTEX
case Lock::COROUTINE_LOCK:
lock = new CoroutineLock();
break;
#endif
default:
zend_throw_exception(swoole_exception_ce, "lock type[%d] is not support", type);
RETURN_FALSE;
Expand Down
8 changes: 8 additions & 0 deletions ext-src/swoole_thread_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ using swoole::SpinLock;
#ifdef HAVE_RWLOCK
using swoole::RWLock;
#endif
#ifdef HAVE_IOURING_FUTEX
using swoole::CoroutineLock;
#endif

zend_class_entry *swoole_thread_lock_ce;
static zend_object_handlers swoole_thread_lock_handlers;
Expand All @@ -50,6 +53,11 @@ struct LockResource : public ThreadResource {
case Lock::RW_LOCK:
lock_ = new RWLock(0);
break;
#endif
#ifdef HAVE_IOURING_FUTEX
case Lock::COROUTINE_LOCK:
lock_ = new CoroutineLock();
break;
#endif
case Lock::MUTEX:
default:
Expand Down
4 changes: 4 additions & 0 deletions include/swoole_iouring.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ class Iouring {
static int rmdir(const char *pathname);
static int fsync(int fd);
static int fdatasync(int fd);
#ifdef HAVE_IOURING_FUTEX
static int wait_futex(uint32_t *futex);
static int wakeup_futex(uint32_t *futex);
#endif

static int callback(Reactor *reactor, Event *event);
};
Expand Down
24 changes: 23 additions & 1 deletion include/swoole_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class Lock {
RW_LOCK = 1,
MUTEX = 3,
SPIN_LOCK = 5,
ATOMIC_LOCK = 6,
#ifdef HAVE_IOURING_FUTEX
COROUTINE_LOCK = 6,
#endif
};
Type get_type() {
return type_;
Expand Down Expand Up @@ -106,6 +108,26 @@ class SpinLock : public Lock {
};
#endif

#ifdef HAVE_IOURING_FUTEX
class CoroutineLock : public Lock {
private:
long cid = 0;
sw_atomic_t *value = nullptr;
void *coroutine = nullptr;

int lock_impl(bool blocking = true);

public:
CoroutineLock();
~CoroutineLock();
int lock_rd() override;
int lock() override;
int unlock() override;
int trylock_rd() override;
int trylock() override;
};
#endif

#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
#define SW_USE_PTHREAD_BARRIER
#endif
Expand Down
48 changes: 48 additions & 0 deletions src/coroutine/iouring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ enum IouringOpcode {
SW_IORING_OP_WRITE = IORING_OP_WRITE,
SW_IORING_OP_RENAMEAT = IORING_OP_RENAMEAT,
SW_IORING_OP_MKDIRAT = IORING_OP_MKDIRAT,
#ifdef HAVE_IOURING_FUTEX
SW_IORING_OP_FUTEX_WAIT = IORING_OP_FUTEX_WAIT,
SW_IORING_OP_FUTEX_WAKE = IORING_OP_FUTEX_WAKE,
#endif

SW_IORING_OP_FSTAT = 1000,
SW_IORING_OP_LSTAT = 1001,
Expand All @@ -54,6 +58,9 @@ struct IouringEvent {
const char *pathname;
const char *pathname2;
struct statx *statxbuf;
#ifdef HAVE_IOURING_FUTEX
uint32_t *futex;
#endif
};

Iouring::Iouring(Reactor *_reactor) {
Expand Down Expand Up @@ -206,6 +213,11 @@ static const char *get_opcode_name(IouringOpcode opcode) {
return "FSYNC";
case SW_IORING_OP_FDATASYNC:
return "FDATASYNC";
#ifdef HAVE_IOURING_FUTEX
case SW_IORING_OP_FUTEX_WAIT:
case SW_IORING_OP_FUTEX_WAKE:
return "FUTEX";
#endif
default:
return "unknown";
}
Expand Down Expand Up @@ -329,6 +341,26 @@ bool Iouring::dispatch(IouringEvent *event) {
sqe->fsync_flags = IORING_FSYNC_DATASYNC;
}
break;
#ifdef HAVE_IOURING_FUTEX
case SW_IORING_OP_FUTEX_WAIT:
sqe->opcode = SW_IORING_OP_FUTEX_WAIT;
sqe->fd = FUTEX2_SIZE_U32;
sqe->off = 1;
sqe->addr = (unsigned long) event->futex;
sqe->len = 0;
sqe->futex_flags = 0;
sqe->addr3 = FUTEX_BITSET_MATCH_ANY;
break;
case SW_IORING_OP_FUTEX_WAKE:
sqe->opcode = SW_IORING_OP_FUTEX_WAKE;
sqe->fd = FUTEX2_SIZE_U32;
sqe->off = 1;
sqe->addr = (unsigned long) event->futex;
sqe->len = 0;
sqe->futex_flags = 0;
sqe->addr3 = FUTEX_BITSET_MATCH_ANY;
break;
#endif
default:
abort();
return false;
Expand Down Expand Up @@ -465,6 +497,22 @@ int Iouring::stat(const char *path, struct stat *statbuf) {
return retval;
}

#ifdef HAVE_IOURING_FUTEX
int Iouring::wait_futex(uint32_t *futex) {
INIT_EVENT(SW_IORING_OP_FUTEX_WAIT);
event.futex = futex;

return execute(&event);
}

int Iouring::wakeup_futex(uint32_t *futex) {
INIT_EVENT(SW_IORING_OP_FUTEX_WAKE);
event.futex = futex;

return execute(&event);
}
#endif

int Iouring::callback(Reactor *reactor, Event *event) {
Iouring *iouring = static_cast<Iouring *>(event->socket->object);
return iouring->wakeup() ? SW_OK : SW_ERR;
Expand Down
95 changes: 95 additions & 0 deletions src/lock/coroutine_lock.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
+----------------------------------------------------------------------+
| Swoole |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| [email protected] so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: NathanFreeman <[email protected]> |
+----------------------------------------------------------------------+
*/

#include "swoole.h"

#ifdef HAVE_IOURING_FUTEX
#include "swoole_coroutine.h"
#include "swoole_iouring.h"
#include "swoole_lock.h"

namespace swoole {
CoroutineLock::CoroutineLock() : Lock() {
type_ = COROUTINE_LOCK;
value = (sw_atomic_t *) sw_mem_pool()->alloc(sizeof(sw_atomic_t));
*value = 0;
}

CoroutineLock::~CoroutineLock() {
sw_mem_pool()->free((void *) value);
value = nullptr;
}

int CoroutineLock::lock() {
return lock_impl(true);
}

int CoroutineLock::trylock() {
return lock_impl(false);
}

int CoroutineLock::lock_rd() {
return lock_impl(true);
}

int CoroutineLock::trylock_rd() {
return lock_impl(false);
}

int CoroutineLock::unlock() {
Coroutine *current_coroutine = Coroutine::get_current();
if (current_coroutine == nullptr) {
swoole_warning("The coroutine lock can only be used in a coroutine environment");
return 0;
}

if (*value == 0) {
return 0;
}

*value = 0;
cid = 0;
coroutine = nullptr;
return Iouring::wakeup_futex((uint32_t *) value) > 0 ? 0 : 1;
}

int CoroutineLock::lock_impl(bool blocking) {
Coroutine *current_coroutine = Coroutine::get_current();
if (current_coroutine == nullptr) {
swoole_warning("The coroutine lock can only be used in a coroutine environment");
return 1;
}

if (current_coroutine == static_cast<Coroutine *>(coroutine) && current_coroutine->get_cid() == cid) {
return 0;
}

int result = 0;
if (!sw_atomic_cmp_set(value, 0, 1)) {
if (!blocking) {
return 1;
}

result = Iouring::wait_futex((uint32_t *) value);
*value = 1;
}

cid = current_coroutine->get_cid();
coroutine = (void *) current_coroutine;
return result;
}
} // namespace swoole
#endif
71 changes: 71 additions & 0 deletions tests/swoole_iouring/coroutine_lock.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
--TEST--
swoole_lock: coroutine lock
--SKIPIF--
<?php
require __DIR__ . '/../include/skipif.inc';
if (!defined('SWOOLE_COROUTINELOCK')) {
skip('coroutine lock require linux kernel >= 6.7 and liburing version >= 2.6');
}
?>
--FILE--
<?php
use Swoole\Lock;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
use Swoole\Coroutine\WaitGroup;

swoole_async_set([
'iouring_workers' => 32,
'iouring_entries' => 20000,
'iouring_flag' => SWOOLE_IOURING_SQPOLL
]);

$lock = new Lock(SWOOLE_COROUTINELOCK);
var_dump($lock->lock());

run(function () use ($argv, $lock) {
$waitGroup = new WaitGroup();
go(function () use ($waitGroup, $lock) {
$waitGroup->add();
$lock->lock();
$lock->lock();
sleep(10);
var_dump(1);
$lock->unlock();
$waitGroup->done();
});

go(function () use ($waitGroup, $lock) {
$waitGroup->add();
sleep(3);
$lock->lock();
var_dump(2);
$lock->unlock();
$waitGroup->done();
});

go(function () use ($waitGroup, $lock) {
$waitGroup->add();
sleep(1);
$lock->lock_read();
var_dump(3);
$lock->unlock();
$waitGroup->done();
});

go(function () use ($waitGroup) {
$waitGroup->add();
var_dump(5);
$waitGroup->done();
});

$waitGroup->wait();
});
?>
--EXPECTF--
%s
bool(false)
int(5)
int(1)
int(3)
int(2)
Loading

0 comments on commit 7986b01

Please sign in to comment.