Skip to content

Commit

Permalink
Child process queue workers (#450)
Browse files Browse the repository at this point in the history
* Child process queue workers
- Remove `ShouldBroadcast` in favor of `ShouldBroadcastNow` in `Events\\ChildProcess` namespace
- Implement `QueueWorker::class`
- Add new binding to `NativeServiceProvider::class`
- Fire up workers: iterate through queue worker config in `NativeServiceProvider::configureApp()`

- Test `QueueWorker::up()`, `QueueWorker::down()`
- Test `QueueWorkerFake::class` assertions work as expected

* Prevent attempting to boot workers on CLI calls

* Remove creating workers just by string

Rely on keys of the config array to assert uniqueness of worker aliases

* Allow workers to be instantiated directly by alias

* Fix styling

* Fix tests

---------

Co-authored-by: Simon Hamp <[email protected]>
Co-authored-by: simonhamp <[email protected]>
  • Loading branch information
3 people authored Dec 29, 2024
1 parent 8267087 commit 101ddf0
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 8 deletions.
8 changes: 8 additions & 0 deletions config/nativephp.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,12 @@
],
],
],

'queue_workers' => [
'default' => [
'queues' => ['default'],
'memory_limit' => 128,
'timeout' => 60,
],
],
];
12 changes: 12 additions & 0 deletions src/Contracts/QueueWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace Native\Laravel\Contracts;

use Native\Laravel\DTOs\QueueConfig;

interface QueueWorker
{
public function up(QueueConfig $config): void;

public function down(string $alias): void;
}
35 changes: 35 additions & 0 deletions src/DTOs/QueueConfig.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Native\Laravel\DTOs;

class QueueConfig
{
/**
* @param array<int, string> $queuesToConsume
*/
public function __construct(
public readonly string $alias,
public readonly array $queuesToConsume,
public readonly int $memoryLimit,
public readonly int $timeout,
) {}

/**
* @return array<int, self>
*/
public static function fromConfigArray(array $config): array
{
return array_map(
function (array|string $worker, string $alias) {
return new self(
$alias,
$worker['queues'] ?? ['default'],
$worker['memory_limit'] ?? 128,
$worker['timeout'] ?? 60,
);
},
$config,
array_keys($config),
);
}
}
4 changes: 2 additions & 2 deletions src/Events/ChildProcess/ErrorReceived.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Native\Laravel\Events\ChildProcess;

use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class ErrorReceived implements ShouldBroadcast
class ErrorReceived implements ShouldBroadcastNow
{
use Dispatchable, SerializesModels;

Expand Down
4 changes: 2 additions & 2 deletions src/Events/ChildProcess/MessageReceived.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Native\Laravel\Events\ChildProcess;

use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class MessageReceived implements ShouldBroadcast
class MessageReceived implements ShouldBroadcastNow
{
use Dispatchable, SerializesModels;

Expand Down
4 changes: 2 additions & 2 deletions src/Events/ChildProcess/ProcessExited.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Native\Laravel\Events\ChildProcess;

use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class ProcessExited implements ShouldBroadcast
class ProcessExited implements ShouldBroadcastNow
{
use Dispatchable, SerializesModels;

Expand Down
4 changes: 2 additions & 2 deletions src/Events/ChildProcess/ProcessSpawned.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Native\Laravel\Events\ChildProcess;

use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class ProcessSpawned implements ShouldBroadcast
class ProcessSpawned implements ShouldBroadcastNow
{
use Dispatchable, SerializesModels;

Expand Down
29 changes: 29 additions & 0 deletions src/Facades/QueueWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace Native\Laravel\Facades;

use Illuminate\Support\Facades\Facade;
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
use Native\Laravel\DTOs\QueueConfig;
use Native\Laravel\Fakes\QueueWorkerFake;

/**
* @method static void up(QueueConfig $config)
* @method static void down(string $alias)
*/
class QueueWorker extends Facade
{
public static function fake()
{
return tap(static::getFacadeApplication()->make(QueueWorkerFake::class), function ($fake) {
static::swap($fake);
});
}

protected static function getFacadeAccessor(): string
{
self::clearResolvedInstance(QueueWorkerContract::class);

return QueueWorkerContract::class;
}
}
61 changes: 61 additions & 0 deletions src/Fakes/QueueWorkerFake.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

namespace Native\Laravel\Fakes;

use Closure;
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
use Native\Laravel\DTOs\QueueConfig;
use PHPUnit\Framework\Assert as PHPUnit;

class QueueWorkerFake implements QueueWorkerContract
{
/**
* @var array<int, QueueConfig>
*/
public array $ups = [];

/**
* @var array<int, string>
*/
public array $downs = [];

public function up(QueueConfig $config): void
{
$this->ups[] = $config;
}

public function down(string $alias): void
{
$this->downs[] = $alias;
}

public function assertUp(Closure $callback): void
{
$hit = empty(
array_filter(
$this->ups,
fn (QueueConfig $up) => $callback($up) === true
)
) === false;

PHPUnit::assertTrue($hit);
}

public function assertDown(string|Closure $alias): void
{
if (is_callable($alias) === false) {
PHPUnit::assertContains($alias, $this->downs);

return;
}

$hit = empty(
array_filter(
$this->downs,
fn (string $down) => $alias($down) === true
)
) === false;

PHPUnit::assertTrue($hit);
}
}
20 changes: 20 additions & 0 deletions src/NativeServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use Native\Laravel\Contracts\ChildProcess as ChildProcessContract;
use Native\Laravel\Contracts\GlobalShortcut as GlobalShortcutContract;
use Native\Laravel\Contracts\PowerMonitor as PowerMonitorContract;
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
use Native\Laravel\Contracts\WindowManager as WindowManagerContract;
use Native\Laravel\DTOs\QueueConfig;
use Native\Laravel\Events\EventWatcher;
use Native\Laravel\Exceptions\Handler;
use Native\Laravel\GlobalShortcut as GlobalShortcutImplementation;
Expand Down Expand Up @@ -73,6 +75,10 @@ public function packageRegistered()
return $app->make(PowerMonitorImplementation::class);
});

$this->app->bind(QueueWorkerContract::class, function (Foundation $app) {
return $app->make(QueueWorker::class);
});

if (config('nativephp-internal.running')) {
$this->app->singleton(
\Illuminate\Contracts\Debug\ExceptionHandler::class,
Expand Down Expand Up @@ -112,6 +118,11 @@ protected function configureApp()

config(['session.driver' => 'file']);
config(['queue.default' => 'database']);

// XXX: This logic may need to change when we ditch the internal web server
if (! $this->app->runningInConsole()) {
$this->fireUpQueueWorkers();
}
}

protected function rewriteStoragePath()
Expand Down Expand Up @@ -210,4 +221,13 @@ protected function configureDisks(): void
]);
}
}

protected function fireUpQueueWorkers(): void
{
$queueConfigs = QueueConfig::fromConfigArray(config('nativephp.queue_workers'));

foreach ($queueConfigs as $queueConfig) {
$this->app->make(QueueWorkerContract::class)->up($queueConfig);
}
}
}
47 changes: 47 additions & 0 deletions src/QueueWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Native\Laravel;

use Native\Laravel\Contracts\ChildProcess as ChildProcessContract;
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
use Native\Laravel\DTOs\QueueConfig;

class QueueWorker implements QueueWorkerContract
{
public function __construct(
private readonly ChildProcessContract $childProcess,
) {}

public function up(string|QueueConfig $config): void
{
if (is_string($config) && config()->has("nativephp.queue_workers.{$config}")) {
$config = QueueConfig::fromConfigArray([
$config => config("nativephp.queue_workers.{$config}"),
])[0];
}

if (! $config instanceof QueueConfig) {
throw new \InvalidArgumentException("Invalid queue configuration alias [$config]");
}

$this->childProcess->php(
[
'-d',
"memory_limit={$config->memoryLimit}M",
'artisan',
'queue:work',
"--name={$config->alias}",
'--queue='.implode(',', $config->queuesToConsume),
"--memory={$config->memoryLimit}",
"--timeout={$config->timeout}",
],
$config->alias,
persistent: true,
);
}

public function down(string $alias): void
{
$this->childProcess->stop($alias);
}
}
66 changes: 66 additions & 0 deletions tests/DTOs/QueueWorkerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

use Illuminate\Support\Arr;
use Native\Laravel\DTOs\QueueConfig;

test('the factory method generates an array of config objects for several formats', function (array $config) {
$configObject = QueueConfig::fromConfigArray($config);

expect($configObject)->toBeArray();
expect($configObject)->toHaveCount(count($config));

foreach ($config as $alias => $worker) {
if (is_string($worker)) {
expect(
Arr::first(
array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker))
)->queuesToConsume->toBe(['default']
);

expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->memoryLimit->toBe(128);
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->timeout->toBe(60);

continue;
}

expect(
Arr::first(
array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias))
)->queuesToConsume->toBe($worker['queues'] ?? ['default']
);

expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->memoryLimit->toBe($worker['memory_limit'] ?? 128);
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->timeout->toBe($worker['timeout'] ?? 60);
}
})->with([
[
'queue_workers' => [
'some_worker' => [
'queues' => ['default'],
'memory_limit' => 64,
'timeout' => 60,
],
],
],
[
'queue_workers' => [
'some_worker' => [],
'another_worker' => [],
],
],
[
'queue_workers' => [
'some_worker' => [
],
'another_worker' => [
'queues' => ['default', 'another'],
],
'yet_another_worker' => [
'memory_limit' => 256,
],
'one_more_worker' => [
'timeout' => 120,
],
],
],
]);
Loading

0 comments on commit 101ddf0

Please sign in to comment.