Skip to content

Commit

Permalink
Allow queueing of callbacks in destructors
Browse files Browse the repository at this point in the history
  • Loading branch information
danog committed Dec 6, 2024
1 parent 358572c commit 7123e1a
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 22 deletions.
54 changes: 32 additions & 22 deletions src/EventLoop/Internal/AbstractDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ abstract class AbstractDriver implements Driver

private readonly \Closure $interruptCallback;
private readonly \Closure $queueCallback;
/** @var \Closure(): (?\Closure(): mixed) */
private readonly \Closure $runCallback;

private readonly \stdClass $internalSuspensionMarker;
Expand Down Expand Up @@ -87,13 +88,16 @@ public function __construct()
/** @psalm-suppress InvalidArgument */
$this->interruptCallback = $this->setInterrupt(...);
$this->queueCallback = $this->queue(...);
$this->runCallback = function () {
if ($this->fiber->isTerminated()) {
$this->createLoopFiber();
}
$this->runCallback =
/** @return ?\Closure(): mixed */
function (): ?\Closure {
if ($this->fiber->isTerminated()) {
$this->createLoopFiber();
}

return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start();
};
// Returns a callback that returns the value of the {main} fiber, or null in case of deadlock.
return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start();
};
}

public function run(): void
Expand Down Expand Up @@ -533,26 +537,32 @@ private function createLoopFiber(): void
{
$this->fiber = new \Fiber(function (): void {
$this->stopped = false;
do {
// Invoke microtasks if we have some
$this->invokeCallbacks();

// Invoke microtasks if we have some
$this->invokeCallbacks();

/** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
while (!$this->stopped) {
if ($this->interrupt) {
$this->invokeInterrupt();
}
/** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
while (!$this->stopped) {
if ($this->interrupt) {
$this->invokeInterrupt();
}

if ($this->isEmpty()) {
return;
}
if ($this->isEmpty()) {
while (\gc_collect_cycles());
if (!$this->microtaskQueue->isEmpty() || !$this->callbackQueue->isEmpty() || $this->interrupt) {
continue 2;
}
return;
}

$previousIdle = $this->idle;
$this->idle = true;
$previousIdle = $this->idle;
$this->idle = true;

$this->tick($previousIdle);
$this->invokeCallbacks();
}
$this->tick($previousIdle);
$this->invokeCallbacks();
}
return;
} while (true);
});
}

Expand Down
2 changes: 2 additions & 0 deletions src/EventLoop/Internal/DriverSuspension.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ final class DriverSuspension implements Suspension
private bool $deadMain = false;

/**
* @param \Closure(): (?\Closure(): mixed) $run
* @param \WeakMap<object, \WeakReference<DriverSuspension>> $suspensions
*/
public function __construct(
Expand Down Expand Up @@ -145,6 +146,7 @@ public function suspend(): mixed
throw new \Error('Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher):' . $info);
}

\assert($result !== null);
return $result();
}

Expand Down
124 changes: 124 additions & 0 deletions test/EventLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,130 @@

class EventLoopTest extends TestCase
{
public function testSuspensionResumptionWithQueueInGarbageCollection(): void
{
$suspension = EventLoop::getSuspension();

$class = new class ($suspension) {
public function __construct(public Suspension $suspension)
{
}
public function __destruct()
{
$this->suspension->resume(true);
}
};
$cycle = [$class, &$cycle];
unset($class, $cycle);

$ended = $suspension->suspend();

$this->assertTrue($ended);
}

public function testEventLoopResumptionWithQueueInGarbageCollection(): void
{
$suspension = EventLoop::getSuspension();

$class = new class ($suspension) {
public function __construct(public Suspension $suspension)
{
}
public function __destruct()
{
EventLoop::queue($this->suspension->resume(...), true);
}
};
$cycle = [$class, &$cycle];
unset($class, $cycle);

$ended = $suspension->suspend();

$this->assertTrue($ended);
}


public function testSuspensionResumptionWithQueueInGarbageCollectionNested(): void
{
$suspension = EventLoop::getSuspension();

$resumer = new class ($suspension) {
public function __construct(public Suspension $suspension)
{
}
public function __destruct()
{
$this->suspension->resume(true);
}
};

$class = new class ($resumer) {
public static ?object $staticReference = null;
public function __construct(object $resumer)
{
self::$staticReference = $resumer;
}
public function __destruct()
{
EventLoop::queue(function () {
$class = self::$staticReference;
$cycle = [$class, &$cycle];
unset($class, $cycle);

self::$staticReference = null;
});
}
};
$cycle = [$class, &$cycle];
unset($class, $resumer, $cycle);


$ended = $suspension->suspend();

$this->assertTrue($ended);
}

public function testEventLoopResumptionWithQueueInGarbageCollectionNested(): void
{
$suspension = EventLoop::getSuspension();

$resumer = new class ($suspension) {
public function __construct(public Suspension $suspension)
{
}
public function __destruct()
{
EventLoop::queue($this->suspension->resume(...), true);
}
};

$class = new class ($resumer) {
public static ?object $staticReference = null;
public function __construct(object $resumer)
{
self::$staticReference = $resumer;
}
public function __destruct()
{
EventLoop::queue(function () {
$class = self::$staticReference;
$cycle = [$class, &$cycle];
unset($class, $cycle);

self::$staticReference = null;
});
}
};
$cycle = [$class, &$cycle];
unset($class, $resumer, $cycle);


$ended = $suspension->suspend();

$this->assertTrue($ended);
}


public function testDelayWithNegativeDelay(): void
{
$this->expectException(\Error::class);
Expand Down

0 comments on commit 7123e1a

Please sign in to comment.