Skip to content

Commit

Permalink
Replace queue factory with queue providers (#222)
Browse files Browse the repository at this point in the history
* Remove `@internal` from `QueueInterface`

* queue provider

* improve

* `QueueProviderInterfaceProxy`

* remove exceptions

* config

* Remove factory

* fix tests

* test `PrototypeQueueProvider`

* test `FactoryQueueProvider`

* test `CompositeQueueProvider`

* Apply fixes from StyleCI

* Improve `QueueFactoryQueueProvider`

* Add `AdapterFactoryQueueProvider`

* improve tests

* Rename "channel-definitions" to "channels"

* stubs phpdoc

* phpdoc

* Apply fixes from StyleCI

* readme

* fix cs

* Update src/Command/ListenAllCommand.php

Co-authored-by: Viktor Babanov <[email protected]>

* Move `DEFAULT_CHANNEL_NAME` to `QueueInterface`

* Apply fixes from StyleCI

* Extract stubs to separate namespace

* Fix `Queue`

* fix exception

* fix

* Remove QueueFactoryQueueProvider

* Fix docs

* Fix config

* Bugfixes

* Apply fixes from StyleCI

---------

Co-authored-by: StyleCI Bot <[email protected]>
Co-authored-by: Viktor Babanov <[email protected]>
  • Loading branch information
3 people authored Dec 7, 2024
1 parent af0fb00 commit 3dd8a7b
Show file tree
Hide file tree
Showing 42 changed files with 838 additions and 654 deletions.
40 changes: 24 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ The package could be installed with [Composer](https://getcomposer.org):
composer require yiisoft/queue
```

## Ready for yiisoft/config
## Ready for Yii Config

If you are using [yiisoft/config](https://github.com/yiisoft/config), you'll find out this package has some defaults
in the [`common`](config/di.php) and [`params`](config/params.php) configurations saving your time. Things you should
change to start working with the queue:

- Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation.
- And/or define channel-specific `AdapterInterface` implementations in the `channel-definitions` params key to be used
with the [queue factory](#different-queue-channels).
- And/or define channel-specific `AdapterInterface` implementations in the `channel` params key to be used
with the [queue provider](#different-queue-channels).
- Define [message handlers](docs/guide/worker.md#handler-format) in the `handlers` params key to be used with the `QueueWorker`.
- Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher).

Expand Down Expand Up @@ -159,15 +159,23 @@ $worker = new \Yiisoft\Queue\Worker\Worker(

## Different queue channels

Often we need to push to different queue channels with an only application. There is the `QueueFactory` class to make
different `Queue` objects creation for different channels. With this factory channel-specific `Queue` creation is as
simple as
Often we need to push to different queue channels with an only application. There is the `QueueProviderInterface`
interface that provides different `Queue` objects creation for different channels. With implementation of this interface
channel-specific `Queue` creation is as simple as

```php
$queue = $factory->get('channel-name');
$queue = $provider->get('channel-name');
```

The main usage strategy is with explicit definition of channel-specific adapters. Definitions are passed in
Out of the box, there are four implementations of the `QueueProviderInterface`:

- `AdapterFactoryQueueProvider`
- `PrototypeQueueProvider`
- `CompositeQueueProvider`

### `AdapterFactoryQueueProvider`

Provider based on definition of channel-specific adapters. Definitions are passed in
the `$definitions` constructor parameter of the factory, where keys are channel names and values are definitions
for the [`Yiisoft\Factory\Factory`](https://github.com/yiisoft/factory). Below are some examples:

Expand All @@ -186,19 +194,19 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter;

For more information about a definition formats available see the [factory](https://github.com/yiisoft/factory) documentation.

Another queue factory usage strategy is implicit adapter creation via `withChannel()` method call. To use this approach
you should pass some specific constructor parameters:
### `PrototypeQueueProvider`

- `true` to the `$enableRuntimeChannelDefinition`
- a default `AdapterInterface` implementation to the `$defaultAdapter`.

In this case `$factory->get('channel-name')` call will be converted
to `$this->queue->withAdapter($this->defaultAdapter->withChannel($channel))`, when there is no explicit adapter definition
in the `$definitions`.
Queue provider that only changes the channel name of the base queue. It can be useful when your queues used the same
adapter.

> Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes
> in channel names.
### `CompositeQueueProvider`

This provider allows you to combine multiple providers into one. It will try to get a queue from each provider in the
order they are passed to the constructor. The first queue found will be returned.

## Console execution

The exact way of task execution depends on the adapter used. Most adapters can be run using
Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"psr/log": "^2.0|^3.0",
"symfony/console": "^5.4|^6.0",
"yiisoft/definitions": "^1.0|^2.0|^3.0",
"yiisoft/factory": "dev-strict",
"yiisoft/friendly-exception": "^1.0",
"yiisoft/injector": "^1.0"
},
Expand All @@ -60,7 +61,8 @@
},
"autoload": {
"psr-4": {
"Yiisoft\\Queue\\": "src"
"Yiisoft\\Queue\\": "src",
"Yiisoft\\Queue\\Stubs\\": "stubs"
}
},
"autoload-dev": {
Expand Down
18 changes: 10 additions & 8 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPushInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\Worker as QueueWorker;
use Yiisoft\Queue\Worker\WorkerInterface;

/* @var array $params */

return [
AdapterFactoryQueueProvider::class => [
'__construct()' => [
'definitions' => $params['yiisoft/queue']['channels'],
],
],
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,
QueueWorker::class => [
'class' => QueueWorker::class,
'__construct()' => [$params['yiisoft/queue']['handlers']],
Expand All @@ -39,10 +45,6 @@
? $container->get(SignalLoop::class)
: $container->get(SimpleLoop::class);
},
QueueFactoryInterface::class => QueueFactory::class,
QueueFactory::class => [
'__construct()' => ['channelConfiguration' => $params['yiisoft/queue']['channel-definitions']],
],
QueueInterface::class => Queue::class,
MiddlewareFactoryPushInterface::class => MiddlewareFactoryPush::class,
MiddlewareFactoryConsumeInterface::class => MiddlewareFactoryConsume::class,
Expand All @@ -59,12 +61,12 @@
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
'channels' => array_keys($params['yiisoft/queue']['channels']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
'channels' => array_keys($params['yiisoft/queue']['channels']),
],
],
];
11 changes: 6 additions & 5 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
use Yiisoft\Queue\Command\ListenCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Debug\QueueCollector;
use Yiisoft\Queue\Debug\QueueFactoryInterfaceProxy;
use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy;
use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;

return [
Expand All @@ -22,8 +23,8 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channel-definitions' => [
QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
'channels' => [
QueueInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand All @@ -34,7 +35,7 @@
QueueCollector::class,
],
'trackedServices' => [
QueueFactoryInterface::class => [QueueFactoryInterfaceProxy::class, QueueCollector::class],
QueueProviderInterface::class => [QueueProviderInterfaceProxy::class, QueueCollector::class],
WorkerInterface::class => [QueueWorkerInterfaceProxy::class, QueueCollector::class],
],
],
Expand Down
3 changes: 1 addition & 2 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use InvalidArgumentException;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;
Expand All @@ -20,7 +19,7 @@ final class SynchronousAdapter implements AdapterInterface
public function __construct(
private WorkerInterface $worker,
private QueueInterface $queue,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
private string $channel = QueueInterface::DEFAULT_CHANNEL_NAME,
) {
}

Expand Down
17 changes: 10 additions & 7 deletions src/Command/ListenAllCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class ListenAllCommand extends Command
{
Expand All @@ -20,8 +20,11 @@ final class ListenAllCommand extends Command
'Listens all configured queues by default in case you\'re using yiisoft/config. ' .
'Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory, private LoopInterface $loop, private array $channels)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels,
) {
parent::__construct();
}

Expand All @@ -45,7 +48,7 @@ public function configure(): void
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. ' .
'Default is 0 (no limits).',
'Default is 0 (no limits).',
0,
);

Expand All @@ -57,17 +60,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueFactory->get($channel);
$queues[] = $this->queueProvider->get($channel);
}

while ($this->loop->canContinue()) {
$hasMessages = false;
foreach ($queues as $queue) {
$hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages;
$hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages;
}

if (!$hasMessages) {
$pauseSeconds = (int)$input->getOption('pause');
$pauseSeconds = (int) $input->getOption('pause');
if ($pauseSeconds < 0) {
$pauseSeconds = 1;
}
Expand Down
13 changes: 7 additions & 6 deletions src/Command/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;

final class ListenCommand extends Command
{
protected static $defaultName = 'queue:listen';
protected static $defaultDescription = 'Listens the queue and executes messages as they come. Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider
) {
parent::__construct();
}

Expand All @@ -27,13 +28,13 @@ public function configure(): void
'channel',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueFactory::DEFAULT_CHANNEL_NAME
QueueInterface::DEFAULT_CHANNEL_NAME,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueFactory
$this->queueProvider
->get($input->getArgument('channel'))
->listen();

Expand Down
10 changes: 6 additions & 4 deletions src/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class RunCommand extends Command
{
protected static $defaultName = 'queue:run';
protected static $defaultDescription = 'Runs all the existing messages in the given queues. ' .
'Exits once messages are over.';

public function __construct(private QueueFactoryInterface $queueFactory, private array $channels)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly array $channels,
) {
parent::__construct();
}

Expand All @@ -45,7 +47,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");
$count = $this->queueFactory
$count = $this->queueProvider
->get($channel)
->run((int)$input->getOption('maximum'));

Expand Down
24 changes: 0 additions & 24 deletions src/Debug/QueueFactoryInterfaceProxy.php

This file was deleted.

28 changes: 28 additions & 0 deletions src/Debug/QueueProviderInterfaceProxy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Debug;

use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;

final class QueueProviderInterfaceProxy implements QueueProviderInterface
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly QueueCollector $collector,
) {
}

public function get(string $channel): QueueInterface
{
$queue = $this->queueProvider->get($channel);
return new QueueDecorator($queue, $this->collector);
}

public function has(string $channel): bool
{
return $this->queueProvider->has($channel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

use RuntimeException;
use Yiisoft\FriendlyException\FriendlyExceptionInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueFactory;

class AdapterNotConfiguredException extends RuntimeException implements FriendlyExceptionInterface
{
Expand All @@ -21,7 +21,7 @@ public function getName(): string
public function getSolution(): ?string
{
$queueClass = Queue::class;
$factoryClass = QueueFactory::class;
$queueProviderInterface = QueueProviderInterface::class;

return <<<SOLUTION
Adapter property must be set in the Queue object before you can use it.
Expand All @@ -32,7 +32,7 @@ public function getSolution(): ?string
References:
- $queueClass::\$adapter
- $queueClass::withAdapter()
- $factoryClass::get()
- $queueProviderInterface
SOLUTION;
}
}
Loading

0 comments on commit 3dd8a7b

Please sign in to comment.