Skip to content

Commit

Permalink
[feature][experimental] add MessageTask to schedule messenger messages (
Browse files Browse the repository at this point in the history
  • Loading branch information
kbond authored Nov 2, 2020
1 parent 0675d73 commit 59fbaeb
Show file tree
Hide file tree
Showing 16 changed files with 563 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/composer.lock
/phpunit.xml
/vendor/
/build/
/.php_cs.cache
/.phpunit.result.cache
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ Task Scheduling feature](https://laravel.com/docs/master/scheduling).
1. [CommandTask](doc/define-tasks.md#commandtask)
2. [CallbackTask](doc/define-tasks.md#callbacktask)
3. [ProcessTask](doc/define-tasks.md#processtask)
3. [PingTask](doc/define-tasks.md#pingtask)
4. [CompoundTask](doc/define-tasks.md#compoundtask)
3. [MessageTask](doc/define-tasks.md#messagetask)
4. [PingTask](doc/define-tasks.md#pingtask)
5. [CompoundTask](doc/define-tasks.md#compoundtask)
2. [Task Description](doc/define-tasks.md#task-description)
3. [Frequency](doc/define-tasks.md#frequency)
1. [Cron Expression](doc/define-tasks.md#cron-expression)
Expand Down Expand Up @@ -140,6 +141,12 @@ zenstruck_schedule:
# The default timezone for tasks (override at task level), null for system default
timezone: null # Example: America/New_York

messenger:
enabled: false

# The message bus to use
message_bus: message_bus

mailer:
enabled: false

Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"symfony/console": "^3.4|^4.0|^5.0",
"symfony/dependency-injection": "^3.4|^4.0|^5.0",
"symfony/event-dispatcher": "^3.4|^4.0|^5.0",
"symfony/http-kernel": "^3.4|^4.0|^5.0"
"symfony/http-kernel": "^3.4|^4.0|^5.0",
"symfony/polyfill-php80": "^1.15"
},
"require-dev": {
"lorisleiva/cron-translator": "^0.1.0",
Expand All @@ -26,6 +27,7 @@
"symfony/http-client": "^4.3|^5.0",
"symfony/lock": "^4.4|^5.0",
"symfony/mailer": "^4.4|^5.0",
"symfony/messenger": "^4.4|^5.0",
"symfony/phpunit-bridge": "^5.0",
"symfony/process": "^4.2|^5.0"
},
Expand Down
40 changes: 40 additions & 0 deletions doc/define-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,46 @@ zenstruck_schedule:
frequency: '0 * * * *'
```

### MessageTask

*This task type is experimental and may experience BC breaks.*

This task sends a message to a Symfony Messenger message bus.

**Define in [PHP](define-schedule.md#schedulebuilder-service):**

```php
use Symfony\Component\Messenger\Stamp\DelayStamp;
/* @var \Zenstruck\ScheduleBundle\Schedule $schedule */
$schedule->addMessage(new MyMessage('argument'));
// with stamps
$schedule->addMessage(new MyMessage('argument'), [new DelayStamp(10)]);
```

**Notes**:

1. This task type requires `symfony/messenger`:

```console
$ composer require symfony/messenger
```

2. You must enable this task type:

```yaml
# config/packages/zenstruck_schedule.yaml
zenstruck_schedule:
messenger: ~
# optionally configure the message bus (uses "message_bus" by default)
messenger:
message_bus: my_bus
```

### PingTask

This task pings the provided URL. This task type is useful for Cron health monitoring
Expand Down
2 changes: 2 additions & 0 deletions doc/extending.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ it handles.
As an example, let's create a Task that sends a *Message* to your *MessageBus* (`symfony/messenger`
required).

**NOTE**: There is now a [MessageTask in core](define-tasks.md#messagetask).

First, let's create the task:

```php
Expand Down
10 changes: 10 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ public function getConfigTreeBuilder(): TreeBuilder
->thenInvalid('Timezone %s is not available')
->end()
->end()
->arrayNode('messenger')
->canBeEnabled()
->children()
->scalarNode('message_bus')
->defaultValue('message_bus')
->cannotBeEmpty()
->info('The message bus to use')
->end()
->end()
->end()
->arrayNode('mailer')
->canBeEnabled()
->children()
Expand Down
10 changes: 10 additions & 0 deletions src/DependencyInjection/ZenstruckScheduleExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use Zenstruck\ScheduleBundle\Schedule\Extension\SingleServerExtension;
use Zenstruck\ScheduleBundle\Schedule\ScheduleBuilder;
use Zenstruck\ScheduleBundle\Schedule\SelfSchedulingCommand;
use Zenstruck\ScheduleBundle\Schedule\Task\Runner\MessageTaskRunner;
use Zenstruck\ScheduleBundle\Schedule\Task\Runner\PingTaskRunner;
use Zenstruck\ScheduleBundle\Schedule\Task\TaskRunner;

Expand Down Expand Up @@ -106,6 +107,15 @@ protected function loadInternal(array $mergedConfig, ContainerBuilder $container
;
}

if ($mergedConfig['messenger']['enabled']) {
$loader->load('messenger.xml');

$container
->getDefinition(MessageTaskRunner::class)
->setArgument(0, new Reference($mergedConfig['messenger']['message_bus']))
;
}

if ($mergedConfig['mailer']['enabled']) {
$loader->load('mailer.xml');

Expand Down
12 changes: 12 additions & 0 deletions src/Resources/config/messenger.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd">

<services>
<service id="Zenstruck\ScheduleBundle\Schedule\Task\Runner\MessageTaskRunner">
<tag name="schedule.task_runner" />
</service>
</services>
</container>
9 changes: 9 additions & 0 deletions src/Schedule.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Zenstruck\ScheduleBundle\Schedule\Task\CallbackTask;
use Zenstruck\ScheduleBundle\Schedule\Task\CommandTask;
use Zenstruck\ScheduleBundle\Schedule\Task\CompoundTask;
use Zenstruck\ScheduleBundle\Schedule\Task\MessageTask;
use Zenstruck\ScheduleBundle\Schedule\Task\PingTask;
use Zenstruck\ScheduleBundle\Schedule\Task\ProcessTask;

Expand Down Expand Up @@ -86,6 +87,14 @@ public function addPing(string $url, string $method = 'GET', array $options = []
return $this->add(new PingTask($url, $method, $options));
}

/**
* @see MessageTask::__construct()
*/
public function addMessage(object $message, array $stamps = []): MessageTask
{
return $this->add(new MessageTask($message, $stamps));
}

public function addCompound(): CompoundTask
{
return $this->add(new CompoundTask());
Expand Down
10 changes: 10 additions & 0 deletions src/Schedule/Task/CompoundTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public function addPing(string $url, string $method = 'GET', array $options = []
return $this->addWithDescription(new PingTask($url, $method, $options), $description);
}

/**
* @see MessageTask::__construct()
*
* @param string|null $description optional description
*/
public function addMessage(object $message, array $stamps = [], ?string $description = null): self
{
return $this->addWithDescription(new MessageTask($message, $stamps), $description);
}

/**
* @return Task[]
*/
Expand Down
77 changes: 77 additions & 0 deletions src/Schedule/Task/MessageTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

namespace Zenstruck\ScheduleBundle\Schedule\Task;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Zenstruck\ScheduleBundle\Schedule\HasMissingDependencyMessage;
use Zenstruck\ScheduleBundle\Schedule\Task;

/**
* @experimental This is experimental and may experience BC breaks
*
* @author Kevin Bond <[email protected]>
*/
final class MessageTask extends Task implements HasMissingDependencyMessage
{
private $message;
private $stamps;

/**
* @param object|Envelope $message
* @param StampInterface[] $stamps
*/
public function __construct(object $message, array $stamps = [])
{
$this->message = $message;
$this->stamps = $stamps;

parent::__construct($this->messageClass());
}

/**
* @return object|Envelope
*/
public function getMessage(): object
{
return $this->message;
}

/**
* @return StampInterface[]
*/
public function getStamps(): array
{
return $this->stamps;
}

public function getContext(): array
{
$stamps = \array_merge(
$this->message instanceof Envelope ? \array_keys($this->message->all()) : [],
\array_map(static function(StampInterface $stamp) { return \get_class($stamp); }, $this->stamps)
);
$stamps = \array_map(
static function($stamp) {
return (new \ReflectionClass($stamp))->getShortName();
},
$stamps
);
$stamps = \implode(', ', \array_unique($stamps));

return [
'Message' => $this->messageClass(),
'Stamps' => $stamps ?: '(none)',
];
}

public static function getMissingDependencyMessage(): string
{
return 'To use the message task you must install symfony/messenger (composer require symfony/messenger) and enable (config path: "zenstruck_schedule.messenger").';
}

private function messageClass(): string
{
return $this->message instanceof Envelope ? \get_class($this->message->getMessage()) : \get_class($this->message);
}
}
79 changes: 79 additions & 0 deletions src/Schedule/Task/Runner/MessageTaskRunner.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

namespace Zenstruck\ScheduleBundle\Schedule\Task\Runner;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Zenstruck\ScheduleBundle\Schedule\Task;
use Zenstruck\ScheduleBundle\Schedule\Task\MessageTask;
use Zenstruck\ScheduleBundle\Schedule\Task\Result;
use Zenstruck\ScheduleBundle\Schedule\Task\TaskRunner;

/**
* @experimental This is experimental and may experience BC breaks
*
* @author Kevin Bond <[email protected]>
*/
final class MessageTaskRunner implements TaskRunner
{
private $bus;

public function __construct(MessageBusInterface $bus)
{
$this->bus = $bus;
}

/**
* @param MessageTask|Task $task
*/
public function __invoke(Task $task): Result
{
$envelope = $this->bus->dispatch($task->getMessage(), $task->getStamps());
$output = $this->handlerOutput($envelope);

if (empty($output)) {
return Result::failure($task, 'Message not handled or sent to transport.');
}

return Result::successful($task, \implode("\n", $output));
}

public function supports(Task $task): bool
{
return $task instanceof MessageTask;
}

private function handlerOutput(Envelope $envelope): array
{
$output = [];

foreach ($envelope->all(HandledStamp::class) as $stamp) {
/* @var HandledStamp $stamp */
$output[] = \sprintf('Handled by: "%s", return: %s', $stamp->getHandlerName(), $this->handledStampReturn($stamp));
}

foreach ($envelope->all(SentStamp::class) as $stamp) {
/* @var SentStamp $stamp */
$output[] = \sprintf('Sent to: "%s"', $stamp->getSenderClass());
}

return $output;
}

private function handledStampReturn(HandledStamp $stamp): string
{
$result = $stamp->getResult();

switch (true) {
case null === $result:
return '(none)';

case \is_scalar($result):
return \sprintf('(%s) "%s"', get_debug_type($result), $result);
}

return \sprintf('(%s)', get_debug_type($result));
}
}
23 changes: 23 additions & 0 deletions tests/DependencyInjection/ZenstruckScheduleExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Zenstruck\ScheduleBundle\Schedule\ScheduleRunner;
use Zenstruck\ScheduleBundle\Schedule\Task\Runner\CallbackTaskRunner;
use Zenstruck\ScheduleBundle\Schedule\Task\Runner\CommandTaskRunner;
use Zenstruck\ScheduleBundle\Schedule\Task\Runner\MessageTaskRunner;
use Zenstruck\ScheduleBundle\Schedule\Task\Runner\PingTaskRunner;
use Zenstruck\ScheduleBundle\Schedule\Task\Runner\ProcessTaskRunner;

Expand Down Expand Up @@ -150,6 +151,28 @@ public function can_configure_http_client()
$this->assertContainerBuilderHasServiceDefinitionWithTag(PingTaskRunner::class, 'schedule.task_runner');
}

/**
* @test
*/
public function can_enable_messenger_with_default_bus(): void
{
$this->load(['messenger' => null]);

$this->assertContainerBuilderHasServiceDefinitionWithArgument(MessageTaskRunner::class, 0, 'message_bus');
$this->assertContainerBuilderHasServiceDefinitionWithTag(MessageTaskRunner::class, 'schedule.task_runner');
}

/**
* @test
*/
public function can_enable_messenger_with_custom_bus(): void
{
$this->load(['messenger' => ['message_bus' => 'my_bus']]);

$this->assertContainerBuilderHasServiceDefinitionWithArgument(MessageTaskRunner::class, 0, 'my_bus');
$this->assertContainerBuilderHasServiceDefinitionWithTag(MessageTaskRunner::class, 'schedule.task_runner');
}

/**
* @test
*/
Expand Down
Loading

0 comments on commit 59fbaeb

Please sign in to comment.