Skip to content

Commit

Permalink
Merge pull request #500: register Update handler dynamically
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk authored Sep 10, 2024
2 parents 28538c1 + 16b8b95 commit 17e6150
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 26 deletions.
4 changes: 0 additions & 4 deletions psalm-baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,6 @@
<MissingClosureReturnType>
<code><![CDATA[function (QueryInput $input) use ($fn) {]]></code>
</MissingClosureReturnType>
<MoreSpecificImplementedParamType>
<code><![CDATA[$handler]]></code>
<code><![CDATA[$handler]]></code>
</MoreSpecificImplementedParamType>
<PropertyNotSetInConstructor>
<code><![CDATA[$queryExecutor]]></code>
<code><![CDATA[$updateExecutor]]></code>
Expand Down
15 changes: 13 additions & 2 deletions src/Internal/Declaration/WorkflowInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public function findValidateUpdateHandler(string $name): ?\Closure

/**
* @param string $name
* @param callable(ValuesInterface):mixed $handler
* @param callable $handler
* @throws \ReflectionException
*/
public function addQueryHandler(string $name, callable $handler): void
Expand All @@ -208,7 +208,7 @@ function (QueryInput $input) use ($fn) {

/**
* @param non-empty-string $name
* @param callable(ValuesInterface):mixed $handler
* @param callable $handler
* @throws \ReflectionException
*/
public function addUpdateHandler(string $name, callable $handler): void
Expand All @@ -224,6 +224,17 @@ function (UpdateInput $input, Deferred $deferred) use ($fn) {
)(...);
}

/**
* @param non-empty-string $name
* @param callable $handler
* @throws \ReflectionException
*/
public function addValidateUpdateHandler(string $name, callable $handler): void
{
$fn = $this->createCallableHandler($handler);
$this->validateUpdateHandlers[$name] = fn(UpdateInput $input): mixed => ($this->updateValidator)($input, $fn);
}

/**
* @return string[]
*/
Expand Down
6 changes: 6 additions & 0 deletions src/Internal/Declaration/WorkflowInstanceInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public function addQueryHandler(string $name, callable $handler): void;
*/
public function addUpdateHandler(string $name, callable $handler): void;

/**
* @param non-empty-string $name
* @param callable $handler
*/
public function addValidateUpdateHandler(string $name, callable $handler): void;

/**
* @param non-empty-string $name
* @return \Closure(ValuesInterface): void
Expand Down
11 changes: 11 additions & 0 deletions src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,17 @@ public function registerSignal(string $queryType, callable $handler): WorkflowCo
return $this;
}

/**
* {@inheritDoc}
*/
public function registerUpdate(string $name, callable $handler, ?callable $validator): static
{
$this->getWorkflowInstance()->addUpdateHandler($name, $handler);
$this->getWorkflowInstance()->addValidateUpdateHandler($name, $validator ?? fn() => null);

return $this;
}

/**
* {@inheritDoc}
*/
Expand Down
64 changes: 44 additions & 20 deletions src/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -332,17 +332,12 @@ public static function getLastCompletionResult($type = null)
}

/**
* A method that allows you to dynamically register additional query
* handler in a workflow during the execution of a workflow.
* Register a Query handler in the Workflow.
*
* ```php
* #[WorkflowMethod]
* public function handler()
* {
* Workflow::registerQuery('query', function(string $argument) {
* echo sprintf('Executed query "query" with argument "%s"', $argument);
* });
* }
* Workflow::registerQuery('query', function(string $argument) {
* echo sprintf('Executed query "query" with argument "%s"', $argument);
* });
* ```
*
* The same method ({@see WorkflowStubInterface::query()}) should be used
Expand All @@ -359,19 +354,12 @@ public static function registerQuery(string $queryType, callable $handler): Scop
}

/**
* Registers a query with an additional signal handler.
*
* The method is similar to the {@see Workflow::registerQuery()}, but it
* registers an additional signal handler.
* Registers a Signal handler in the Workflow.
*
* ```php
* #[WorkflowMethod]
* public function handler()
* {
* Workflow::registerSignal('signal', function(string $argument) {
* echo sprintf('Executed signal "signal" with argument "%s"', $argument);
* });
* }
* Workflow::registerSignal('signal', function(string $argument) {
* echo sprintf('Executed signal "signal" with argument "%s"', $argument);
* });
* ```
*
* The same method ({@see WorkflowStubInterface::signal()}) should be used
Expand All @@ -387,6 +375,42 @@ public static function registerSignal(string $queryType, callable $handler): Sco
return self::getCurrentContext()->registerSignal($queryType, $handler);
}

/**
* Registers an Update method in the Workflow.
*
* ```php
* Workflow::registerUpdate(
* 'pushTask',
* fn(Task $task) => $this->queue->push($task),
* );
* ```
*
* Register an Update method with a validator:
*
* ```php
* Workflow::registerUpdate(
* 'pushTask',
* fn(Task $task) => $this->queue->push($task),
* fn(Task $task) => $this->isValidTask($task) or throw new \InvalidArgumentException('Invalid task'),
* );
* ```
*
* @param non-empty-string $name
* @param callable $handler Handler function to execute the update.
* @param callable|null $validator Validator function to check the input. It should throw an exception
* if the input is invalid.
* Note that the validator must have the same parameters as the handler.
* @throws OutOfContextException in the absence of the workflow execution context.
* @since 2.11.0
*/
public static function registerUpdate(
string $name,
callable $handler,
?callable $validator = null,
): ScopedContextInterface {
return self::getCurrentContext()->registerUpdate($name, $handler, $validator);
}

/**
* Updates the behavior of an existing workflow to resolve inconsistency errors during replay.
*
Expand Down
9 changes: 9 additions & 0 deletions src/Workflow/WorkflowContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ public function registerQuery(string $queryType, callable $handler): self;
*/
public function registerSignal(string $queryType, callable $handler): self;

/**
* Registers an update method with an optional validator.
*
* @see Workflow::registerUpdate()
*
* @param non-empty-string $name
*/
public function registerUpdate(string $name, callable $handler, ?callable $validator): static;

/**
* Exchanges data between worker and host process.
*
Expand Down
91 changes: 91 additions & 0 deletions tests/Acceptance/Extra/Update/DynamicUpdateTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
<?php

declare(strict_types=1);

namespace Temporal\Tests\Acceptance\Extra\Update\DynamicUpdate;

use PHPUnit\Framework\Attributes\Test;
use Ramsey\Uuid\Uuid;
use Temporal\Client\Update\LifecycleStage;
use Temporal\Client\Update\UpdateOptions;
use Temporal\Client\WorkflowStubInterface;
use Temporal\Exception\Client\WorkflowUpdateException;
use Temporal\Exception\Failure\ApplicationFailure;
use Temporal\Tests\Acceptance\App\Attribute\Stub;
use Temporal\Tests\Acceptance\App\TestCase;
use Temporal\Workflow;
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;

class DynamicUpdateTest extends TestCase
{
#[Test]
public function addUpdateMethodWithoutValidation(
#[Stub('Extra_Update_DynamicUpdate')] WorkflowStubInterface $stub,
): void {
$idResult = $stub->update(TestWorkflow::UPDATE_METHOD)->getValue(0);
self::assertNotNull($idResult);

$id = Uuid::uuid4()->toString();
$idResult = $stub->startUpdate(
UpdateOptions::new(TestWorkflow::UPDATE_METHOD, LifecycleStage::StageCompleted)
->withUpdateId($id)
)->getResult();
self::assertSame($id, $idResult);
}

#[Test]
public function addUpdateMethodWithValidation(
#[Stub('Extra_Update_DynamicUpdate')] WorkflowStubInterface $stub,
): void {
// Valid
$result = $stub->update(TestWorkflow::UPDATE_METHOD_WV, 42)->getValue(0);
self::assertSame(42, $result);

// Invalid input
try {
$stub->update(TestWorkflow::UPDATE_METHOD_WV, -42);
} catch (WorkflowUpdateException $e) {
$previous = $e->getPrevious();
self::assertInstanceOf(ApplicationFailure::class, $previous);
self::assertSame('Value must be positive', $previous->getOriginalMessage());
}
}
}


#[WorkflowInterface]
class TestWorkflow
{
public const UPDATE_METHOD = 'update-method';
public const UPDATE_METHOD_WV = 'update-method-with-validation';

private array $result = [];
private bool $exit = false;

#[WorkflowMethod(name: "Extra_Update_DynamicUpdate")]
public function handle()
{
// Register update methods
Workflow::registerUpdate(self::UPDATE_METHOD, function () {
// Also Update context is tested
$id = Workflow::getUpdateContext()->getUpdateId();
return $this->result[self::UPDATE_METHOD] = $id;
});
// Update method with validation
Workflow::registerUpdate(
self::UPDATE_METHOD_WV,
fn(int $value): int => $value,
fn(int $value) => $value > 0 or throw new \InvalidArgumentException('Value must be positive'),
);

yield Workflow::await(fn() => $this->exit);
return $this->result;
}

#[Workflow\SignalMethod]
public function exit(): void
{
$this->exit = true;
}
}

0 comments on commit 17e6150

Please sign in to comment.