diff --git a/config/nativephp.php b/config/nativephp.php index b24afec..5c6e7a8 100644 --- a/config/nativephp.php +++ b/config/nativephp.php @@ -114,4 +114,12 @@ ], ], ], + + 'queue_workers' => [ + 'default' => [ + 'queues' => ['default'], + 'memory_limit' => 128, + 'timeout' => 60, + ], + ], ]; diff --git a/src/Contracts/QueueWorker.php b/src/Contracts/QueueWorker.php new file mode 100644 index 0000000..a2c4cf9 --- /dev/null +++ b/src/Contracts/QueueWorker.php @@ -0,0 +1,12 @@ + $queuesToConsume + */ + public function __construct( + public readonly string $alias, + public readonly array $queuesToConsume, + public readonly int $memoryLimit, + public readonly int $timeout, + ) {} + + /** + * @return array + */ + public static function fromConfigArray(array $config): array + { + return array_map( + function (array|string $worker, string $alias) { + return new self( + $alias, + $worker['queues'] ?? ['default'], + $worker['memory_limit'] ?? 128, + $worker['timeout'] ?? 60, + ); + }, + $config, + array_keys($config), + ); + } +} diff --git a/src/Events/ChildProcess/ErrorReceived.php b/src/Events/ChildProcess/ErrorReceived.php index 65db9c6..334e9cc 100644 --- a/src/Events/ChildProcess/ErrorReceived.php +++ b/src/Events/ChildProcess/ErrorReceived.php @@ -3,11 +3,11 @@ namespace Native\Laravel\Events\ChildProcess; use Illuminate\Broadcasting\Channel; -use Illuminate\Contracts\Broadcasting\ShouldBroadcast; +use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; -class ErrorReceived implements ShouldBroadcast +class ErrorReceived implements ShouldBroadcastNow { use Dispatchable, SerializesModels; diff --git a/src/Events/ChildProcess/MessageReceived.php b/src/Events/ChildProcess/MessageReceived.php index 5f7a432..04a51c2 100644 --- a/src/Events/ChildProcess/MessageReceived.php +++ b/src/Events/ChildProcess/MessageReceived.php @@ -3,11 +3,11 @@ namespace Native\Laravel\Events\ChildProcess; use Illuminate\Broadcasting\Channel; -use Illuminate\Contracts\Broadcasting\ShouldBroadcast; +use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; -class MessageReceived implements ShouldBroadcast +class MessageReceived implements ShouldBroadcastNow { use Dispatchable, SerializesModels; diff --git a/src/Events/ChildProcess/ProcessExited.php b/src/Events/ChildProcess/ProcessExited.php index bf570d8..0dcd589 100644 --- a/src/Events/ChildProcess/ProcessExited.php +++ b/src/Events/ChildProcess/ProcessExited.php @@ -3,11 +3,11 @@ namespace Native\Laravel\Events\ChildProcess; use Illuminate\Broadcasting\Channel; -use Illuminate\Contracts\Broadcasting\ShouldBroadcast; +use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; -class ProcessExited implements ShouldBroadcast +class ProcessExited implements ShouldBroadcastNow { use Dispatchable, SerializesModels; diff --git a/src/Events/ChildProcess/ProcessSpawned.php b/src/Events/ChildProcess/ProcessSpawned.php index 91fc917..a49b31b 100644 --- a/src/Events/ChildProcess/ProcessSpawned.php +++ b/src/Events/ChildProcess/ProcessSpawned.php @@ -3,11 +3,11 @@ namespace Native\Laravel\Events\ChildProcess; use Illuminate\Broadcasting\Channel; -use Illuminate\Contracts\Broadcasting\ShouldBroadcast; +use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; -class ProcessSpawned implements ShouldBroadcast +class ProcessSpawned implements ShouldBroadcastNow { use Dispatchable, SerializesModels; diff --git a/src/Facades/QueueWorker.php b/src/Facades/QueueWorker.php new file mode 100644 index 0000000..3b504be --- /dev/null +++ b/src/Facades/QueueWorker.php @@ -0,0 +1,29 @@ +make(QueueWorkerFake::class), function ($fake) { + static::swap($fake); + }); + } + + protected static function getFacadeAccessor(): string + { + self::clearResolvedInstance(QueueWorkerContract::class); + + return QueueWorkerContract::class; + } +} diff --git a/src/Fakes/QueueWorkerFake.php b/src/Fakes/QueueWorkerFake.php new file mode 100644 index 0000000..6482dd9 --- /dev/null +++ b/src/Fakes/QueueWorkerFake.php @@ -0,0 +1,61 @@ + + */ + public array $ups = []; + + /** + * @var array + */ + public array $downs = []; + + public function up(QueueConfig $config): void + { + $this->ups[] = $config; + } + + public function down(string $alias): void + { + $this->downs[] = $alias; + } + + public function assertUp(Closure $callback): void + { + $hit = empty( + array_filter( + $this->ups, + fn (QueueConfig $up) => $callback($up) === true + ) + ) === false; + + PHPUnit::assertTrue($hit); + } + + public function assertDown(string|Closure $alias): void + { + if (is_callable($alias) === false) { + PHPUnit::assertContains($alias, $this->downs); + + return; + } + + $hit = empty( + array_filter( + $this->downs, + fn (string $down) => $alias($down) === true + ) + ) === false; + + PHPUnit::assertTrue($hit); + } +} diff --git a/src/NativeServiceProvider.php b/src/NativeServiceProvider.php index 22e3991..6ca8d62 100644 --- a/src/NativeServiceProvider.php +++ b/src/NativeServiceProvider.php @@ -17,7 +17,9 @@ use Native\Laravel\Contracts\ChildProcess as ChildProcessContract; use Native\Laravel\Contracts\GlobalShortcut as GlobalShortcutContract; use Native\Laravel\Contracts\PowerMonitor as PowerMonitorContract; +use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract; use Native\Laravel\Contracts\WindowManager as WindowManagerContract; +use Native\Laravel\DTOs\QueueConfig; use Native\Laravel\Events\EventWatcher; use Native\Laravel\Exceptions\Handler; use Native\Laravel\GlobalShortcut as GlobalShortcutImplementation; @@ -73,6 +75,10 @@ public function packageRegistered() return $app->make(PowerMonitorImplementation::class); }); + $this->app->bind(QueueWorkerContract::class, function (Foundation $app) { + return $app->make(QueueWorker::class); + }); + if (config('nativephp-internal.running')) { $this->app->singleton( \Illuminate\Contracts\Debug\ExceptionHandler::class, @@ -112,6 +118,11 @@ protected function configureApp() config(['session.driver' => 'file']); config(['queue.default' => 'database']); + + // XXX: This logic may need to change when we ditch the internal web server + if (! $this->app->runningInConsole()) { + $this->fireUpQueueWorkers(); + } } protected function rewriteStoragePath() @@ -210,4 +221,13 @@ protected function configureDisks(): void ]); } } + + protected function fireUpQueueWorkers(): void + { + $queueConfigs = QueueConfig::fromConfigArray(config('nativephp.queue_workers')); + + foreach ($queueConfigs as $queueConfig) { + $this->app->make(QueueWorkerContract::class)->up($queueConfig); + } + } } diff --git a/src/QueueWorker.php b/src/QueueWorker.php new file mode 100644 index 0000000..1eb0c00 --- /dev/null +++ b/src/QueueWorker.php @@ -0,0 +1,47 @@ +has("nativephp.queue_workers.{$config}")) { + $config = QueueConfig::fromConfigArray([ + $config => config("nativephp.queue_workers.{$config}"), + ])[0]; + } + + if (! $config instanceof QueueConfig) { + throw new \InvalidArgumentException("Invalid queue configuration alias [$config]"); + } + + $this->childProcess->php( + [ + '-d', + "memory_limit={$config->memoryLimit}M", + 'artisan', + 'queue:work', + "--name={$config->alias}", + '--queue='.implode(',', $config->queuesToConsume), + "--memory={$config->memoryLimit}", + "--timeout={$config->timeout}", + ], + $config->alias, + persistent: true, + ); + } + + public function down(string $alias): void + { + $this->childProcess->stop($alias); + } +} diff --git a/tests/DTOs/QueueWorkerTest.php b/tests/DTOs/QueueWorkerTest.php new file mode 100644 index 0000000..bc1b764 --- /dev/null +++ b/tests/DTOs/QueueWorkerTest.php @@ -0,0 +1,66 @@ +toBeArray(); + expect($configObject)->toHaveCount(count($config)); + + foreach ($config as $alias => $worker) { + if (is_string($worker)) { + expect( + Arr::first( + array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)) + )->queuesToConsume->toBe(['default'] + ); + + expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->memoryLimit->toBe(128); + expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->timeout->toBe(60); + + continue; + } + + expect( + Arr::first( + array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)) + )->queuesToConsume->toBe($worker['queues'] ?? ['default'] + ); + + expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->memoryLimit->toBe($worker['memory_limit'] ?? 128); + expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->timeout->toBe($worker['timeout'] ?? 60); + } +})->with([ + [ + 'queue_workers' => [ + 'some_worker' => [ + 'queues' => ['default'], + 'memory_limit' => 64, + 'timeout' => 60, + ], + ], + ], + [ + 'queue_workers' => [ + 'some_worker' => [], + 'another_worker' => [], + ], + ], + [ + 'queue_workers' => [ + 'some_worker' => [ + ], + 'another_worker' => [ + 'queues' => ['default', 'another'], + ], + 'yet_another_worker' => [ + 'memory_limit' => 256, + ], + 'one_more_worker' => [ + 'timeout' => 120, + ], + ], + ], +]); diff --git a/tests/Fakes/FakeQueueWorkerTest.php b/tests/Fakes/FakeQueueWorkerTest.php new file mode 100644 index 0000000..4b22f34 --- /dev/null +++ b/tests/Fakes/FakeQueueWorkerTest.php @@ -0,0 +1,69 @@ +toBeInstanceOf(QueueWorkerFake::class); +}); + +it('asserts up using callable', function () { + swap(QueueWorkerContract::class, $fake = app(QueueWorkerFake::class)); + + $fake->up(new QueueConfig('testA', ['default'], 123, 123)); + $fake->up(new QueueConfig('testB', ['default'], 123, 123)); + + $fake->assertUp(fn (QueueConfig $up) => $up->alias === 'testA'); + $fake->assertUp(fn (QueueConfig $up) => $up->alias === 'testB'); + + try { + $fake->assertUp(fn (QueueConfig $up) => $up->alias === 'testC'); + } catch (AssertionFailedError) { + return; + } + + $this->fail('Expected assertion to fail'); +}); + +it('asserts down using string', function () { + swap(QueueWorkerContract::class, $fake = app(QueueWorkerFake::class)); + + $fake->down('testA'); + $fake->down('testB'); + + $fake->assertDown('testA'); + $fake->assertDown('testB'); + + try { + $fake->assertDown('testC'); + } catch (AssertionFailedError) { + return; + } + + $this->fail('Expected assertion to fail'); +}); + +it('asserts down using callable', function () { + swap(QueueWorkerContract::class, $fake = app(QueueWorkerFake::class)); + + $fake->down('testA'); + $fake->down('testB'); + + $fake->assertDown(fn (string $alias) => $alias === 'testA'); + $fake->assertDown(fn (string $alias) => $alias === 'testB'); + + try { + $fake->assertDown(fn (string $alias) => $alias === 'testC'); + } catch (AssertionFailedError) { + return; + } + + $this->fail('Expected assertion to fail'); +}); diff --git a/tests/QueueWorker/QueueWorkerTest.php b/tests/QueueWorker/QueueWorkerTest.php new file mode 100644 index 0000000..a3fbd57 --- /dev/null +++ b/tests/QueueWorker/QueueWorkerTest.php @@ -0,0 +1,39 @@ +toBe([ + '-d', + 'memory_limit=128M', + 'artisan', + 'queue:work', + "--name={$alias}", + '--queue=default', + '--memory=128', + '--timeout=61', + ]); + + expect($alias)->toBe('some_worker'); + expect($env)->toBeNull(); + expect($persistent)->toBeTrue(); + + return true; + }); +}); + +it('hits the child process with relevant alias spin down a queue worker', function () { + ChildProcess::fake(); + + QueueWorker::down('some_worker'); + + ChildProcess::assertStop('some_worker'); +});