From 455810f0ac8be6f771ecdc6ec26bc53ce05498da Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 5 Jul 2024 20:08:28 +0400 Subject: [PATCH 1/5] Update branch alias --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index b06fa747..2a58c8f3 100644 --- a/composer.json +++ b/composer.json @@ -91,7 +91,7 @@ }, "extra": { "branch-alias": { - "dev-master": "2.9.x-dev" + "dev-master": "2.11.x-dev" } }, "config": { From 2373309778c5aea1c2c7ec2c7f29439a18c0325a Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 5 Jul 2024 20:14:08 +0400 Subject: [PATCH 2/5] Provide Update Method resolver into coroutine to be able to resolve or reject it looking at the exception retryability --- src/Internal/Declaration/WorkflowInstance.php | 14 +++++---- .../Declaration/WorkflowInstanceInterface.php | 3 +- .../Transport/Router/InvokeUpdate.php | 20 +++++-------- src/Internal/Workflow/Process/Process.php | 7 +++-- src/Internal/Workflow/Process/Scope.php | 29 ++++++++++++++++--- 5 files changed, 48 insertions(+), 25 deletions(-) diff --git a/src/Internal/Declaration/WorkflowInstance.php b/src/Internal/Declaration/WorkflowInstance.php index 5cc99271..eab5a122 100644 --- a/src/Internal/Declaration/WorkflowInstance.php +++ b/src/Internal/Declaration/WorkflowInstance.php @@ -11,6 +11,7 @@ namespace Temporal\Internal\Declaration; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use Temporal\DataConverter\ValuesInterface; use Temporal\Interceptor\WorkflowInbound\QueryInput; @@ -23,10 +24,10 @@ /** * @psalm-import-type DispatchableHandler from InstanceInterface * @psalm-type QueryHandler = \Closure(QueryInput): mixed - * @psalm-type UpdateHandler = \Closure(UpdateInput): PromiseInterface + * @psalm-type UpdateHandler = \Closure(UpdateInput, Deferred): PromiseInterface * @psalm-type ValidateUpdateHandler = \Closure(UpdateInput): void * @psalm-type QueryExecutor = \Closure(QueryInput, callable(ValuesInterface): mixed): mixed - * @psalm-type UpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed): PromiseInterface + * @psalm-type UpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed, Deferred): PromiseInterface * @psalm-type ValidateUpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed): mixed * @psalm-type UpdateValidator = \Closure(UpdateInput, UpdateHandler): void */ @@ -85,7 +86,8 @@ public function __construct( $updateValidators = $prototype->getValidateUpdateHandlers(); foreach ($prototype->getUpdateHandlers() as $method => $reflection) { $fn = $this->createHandler($reflection); - $this->updateHandlers[$method] = fn(UpdateInput $input): mixed => ($this->updateExecutor)($input, $fn); + $this->updateHandlers[$method] = fn(UpdateInput $input, Deferred $deferred): mixed => + ($this->updateExecutor)($input, $fn, $deferred); // Register validate update handlers $this->validateUpdateHandlers[$method] = \array_key_exists($method, $updateValidators) ? fn(UpdateInput $input): mixed => ($this->updateValidator)( @@ -168,7 +170,7 @@ public function findQueryHandler(string $name): ?\Closure /** * @param non-empty-string $name * - * @return null|\Closure(UpdateInput): PromiseInterface + * @return null|\Closure(UpdateInput, Deferred): PromiseInterface * @psalm-return UpdateHandler|null */ public function findUpdateHandler(string $name): ?\Closure @@ -215,8 +217,8 @@ public function addUpdateHandler(string $name, callable $handler): void $fn = $this->createCallableHandler($handler); $this->updateHandlers[$name] = $this->pipeline->with( - function (UpdateInput $input) use ($fn) { - return ($this->updateExecutor)($input, $fn); + function (UpdateInput $input, Deferred $deferred) use ($fn) { + return ($this->updateExecutor)($input, $fn, $deferred); }, /** @see WorkflowInboundCallsInterceptor::handleUpdate() */ 'handleUpdate', diff --git a/src/Internal/Declaration/WorkflowInstanceInterface.php b/src/Internal/Declaration/WorkflowInstanceInterface.php index 1c9862f8..9422087e 100644 --- a/src/Internal/Declaration/WorkflowInstanceInterface.php +++ b/src/Internal/Declaration/WorkflowInstanceInterface.php @@ -11,6 +11,7 @@ namespace Temporal\Internal\Declaration; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use Temporal\DataConverter\ValuesInterface; use Temporal\Interceptor\WorkflowInbound\QueryInput; @@ -49,7 +50,7 @@ public function getSignalHandler(string $name): \Closure; /** * @param non-empty-string $name - * @return null|\Closure(UpdateInput): PromiseInterface + * @return null|\Closure(UpdateInput, Deferred): PromiseInterface */ public function findUpdateHandler(string $name): ?\Closure; diff --git a/src/Internal/Transport/Router/InvokeUpdate.php b/src/Internal/Transport/Router/InvokeUpdate.php index 60858c76..4d3822e7 100644 --- a/src/Internal/Transport/Router/InvokeUpdate.php +++ b/src/Internal/Transport/Router/InvokeUpdate.php @@ -89,8 +89,8 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred // Validation has passed - $promise = $handler($input); - $promise->then( + $deferred = new Deferred(); + $deferred->promise()->then( static function (mixed $value) use ($updateId, $context): void { $context->getClient()->send(new UpdateResponse( command: UpdateResponse::COMMAND_COMPLETED, @@ -108,22 +108,18 @@ static function (\Throwable $err) use ($updateId, $context): void { )); }, ); + + $handler($input, $deferred); } /** * @param non-empty-string $name - * @return \Closure(UpdateInput): PromiseInterface + * @return \Closure(UpdateInput, Deferred): PromiseInterface */ private function getUpdateHandler(WorkflowInstanceInterface $instance, string $name): \Closure { - $handler = $instance->findUpdateHandler($name); - - if ($handler === null) { - $available = \implode(' ', $instance->getUpdateHandlerNames()); - - throw new \LogicException(\sprintf(self::ERROR_HANDLER_NOT_FOUND, $name, $available)); - } - - return $handler; + return $instance->findUpdateHandler($name) ?? throw new \LogicException( + \sprintf(self::ERROR_HANDLER_NOT_FOUND, $name, \implode(' ', $instance->getUpdateHandlerNames())) + ); } } diff --git a/src/Internal/Workflow/Process/Process.php b/src/Internal/Workflow/Process/Process.php index 37440778..1fdd06ed 100644 --- a/src/Internal/Workflow/Process/Process.php +++ b/src/Internal/Workflow/Process/Process.php @@ -12,6 +12,7 @@ namespace Temporal\Internal\Workflow\Process; use JetBrains\PhpStorm\Pure; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use Temporal\DataConverter\ValuesInterface; use Temporal\Exception\DestructMemorizedInstanceException; @@ -92,7 +93,7 @@ function (UpdateInput $input) use ($handler): void { }); // Configure update handler in a mutable scope - $wfInstance->setUpdateExecutor(function (UpdateInput $input, callable $handler) use ($inboundPipeline): PromiseInterface { + $wfInstance->setUpdateExecutor(function (UpdateInput $input, callable $handler, Deferred $resolver) use ($inboundPipeline): PromiseInterface { try { // Define Context for interceptors Pipeline $scope = $this->createScope( @@ -102,7 +103,8 @@ function (UpdateInput $input) use ($handler): void { new Input($input->info, $input->arguments, $input->header), ), ); - $scope->startSignal( + + $scope->startUpdate( function () use ($handler, $inboundPipeline, $input) { Workflow::setCurrentContext($this->scopeContext); return $inboundPipeline->with( @@ -114,6 +116,7 @@ function (UpdateInput $input) use ($handler) { )($input); }, $input->arguments, + $resolver, ); return $scope->promise(); diff --git a/src/Internal/Workflow/Process/Scope.php b/src/Internal/Workflow/Process/Scope.php index a9671b5b..1b8a0fc4 100644 --- a/src/Internal/Workflow/Process/Scope.php +++ b/src/Internal/Workflow/Process/Scope.php @@ -185,13 +185,34 @@ public function start(callable $handler, ValuesInterface $values = null): void $this->next(); } + /** + * @param callable(ValuesInterface): mixed $handler Update method handler. + * @param Deferred $resolver Update method promise resolver. + */ + public function startUpdate(callable $handler, ValuesInterface $values, Deferred $resolver): void + { + $this->then( + $resolver->resolve(...), + function (\Throwable $error) use ($resolver): void { + $this->services->exceptionInterceptor->isRetryable($error) + ? $this->scopeContext->panic($error) + : $resolver->reject($error); + } + ); + + // Create a coroutine generator + $this->coroutine = $this->callSignalOrUpdateHandler($handler, $values); + $this->context->resolveConditions(); + $this->next(); + } + /** * @param callable $handler */ public function startSignal(callable $handler, ValuesInterface $values): void { // Create a coroutine generator - $this->coroutine = $this->callSignalHandler($handler, $values); + $this->coroutine = $this->callSignalOrUpdateHandler($handler, $values); $this->context->resolveConditions(); $this->next(); } @@ -371,12 +392,12 @@ protected function call(callable $handler, ValuesInterface $values): \Generator } /** - * Call a Signal method. In this case deserialization errors are skipped. + * Call a Signal or Update method. In this case deserialization errors are skipped. * - * @param callable $handler + * @param callable(ValuesInterface): mixed $handler * @return \Generator */ - protected function callSignalHandler(callable $handler, ValuesInterface $values): \Generator + protected function callSignalOrUpdateHandler(callable $handler, ValuesInterface $values): \Generator { try { $this->makeCurrent(); From d2927fd66227d891984deb6f6e07db5b6ee35b30 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 5 Jul 2024 23:21:16 +0400 Subject: [PATCH 3/5] Simplify code --- src/Internal/Workflow/Process/Process.php | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/Internal/Workflow/Process/Process.php b/src/Internal/Workflow/Process/Process.php index 1fdd06ed..1954a81d 100644 --- a/src/Internal/Workflow/Process/Process.php +++ b/src/Internal/Workflow/Process/Process.php @@ -105,12 +105,10 @@ function (UpdateInput $input) use ($handler): void { ); $scope->startUpdate( - function () use ($handler, $inboundPipeline, $input) { + function () use ($handler, $inboundPipeline, $input): mixed { Workflow::setCurrentContext($this->scopeContext); return $inboundPipeline->with( - function (UpdateInput $input) use ($handler) { - return $handler($input->arguments); - }, + static fn(UpdateInput $input): mixed => $handler($input->arguments), /** @see WorkflowInboundCallsInterceptor::handleUpdate() */ 'handleUpdate', )($input); From fc856f3e501bbc822acdc63b574ac252241b0176 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 12 Jul 2024 00:07:18 +0400 Subject: [PATCH 4/5] Add tests with exceptions in updates --- src/Internal/Transport/Server.php | 14 +-- .../src/Workflow/UpdateExceptionsWorkflow.php | 83 ++++++++++++++++ tests/Functional/Client/FailureTestCase.php | 99 ++++++++++++++++++- 3 files changed, 182 insertions(+), 14 deletions(-) create mode 100644 tests/Fixtures/src/Workflow/UpdateExceptionsWorkflow.php diff --git a/src/Internal/Transport/Server.php b/src/Internal/Transport/Server.php index d4332c78..1bf28308 100644 --- a/src/Internal/Transport/Server.php +++ b/src/Internal/Transport/Server.php @@ -17,8 +17,6 @@ use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Worker\Transport\Command\ServerRequestInterface; use Temporal\Worker\Transport\Command\SuccessResponse; -use Temporal\Worker\Transport\Command\UpdateResponse; -use Temporal\Workflow\Update\UpdateResult; /** * @psalm-import-type OnMessageHandler from ServerInterface @@ -82,17 +80,7 @@ public function dispatch(ServerRequestInterface $request, array $headers): void private function onFulfilled(ServerRequestInterface $request): \Closure { return function ($result) use ($request) { - if ($result::class === UpdateResult::class) { - $response = new UpdateResponse( - command: $result->command, - values: $result->result, - failure: $result->failure, - updateId: $request->getOptions()['updateId'] ?? null, - ); - } else { - $response = new SuccessResponse($result, $request->getID()); - } - + $response = new SuccessResponse($result, $request->getID()); $this->queue->push($response); return $response; diff --git a/tests/Fixtures/src/Workflow/UpdateExceptionsWorkflow.php b/tests/Fixtures/src/Workflow/UpdateExceptionsWorkflow.php new file mode 100644 index 00000000..268618d3 --- /dev/null +++ b/tests/Fixtures/src/Workflow/UpdateExceptionsWorkflow.php @@ -0,0 +1,83 @@ + $this->greetings !== [] || $this->exit); + if ($this->greetings === [] && $this->exit) { + return $received; + } + + $message = array_shift($this->greetings); + $received[] = $message; + } + } + + #[Workflow\UpdateMethod] + public function failWithName(string $name): void + { + $this->greetings[] = $name; + throw new \RuntimeException("Signal exception $name"); + } + + #[Workflow\UpdateMethod] + public function failInvalidArgument($name = 'foo'): void + { + $this->greetings[] = "invalidArgument $name"; + throw new InvalidArgumentException("Invalid argument $name"); + } + + #[Workflow\UpdateMethod] + public function failActivity($name = 'foo') + { + yield Workflow::newUntypedActivityStub( + ActivityOptions::new() + ->withScheduleToStartTimeout(1) + ->withRetryOptions( + RetryOptions::new()->withMaximumAttempts(1) + ) + ->withStartToCloseTimeout(1), + )->execute('nonExistingActivityName', [$name]); + } + + #[Workflow\UpdateMethod] + public function error() + { + yield Workflow::timer(CarbonInterval::millisecond(10)); + 10 / 0; + } + + #[SignalMethod] + public function exit(): void + { + $this->exit = true; + } +} diff --git a/tests/Functional/Client/FailureTestCase.php b/tests/Functional/Client/FailureTestCase.php index 229f976b..f9430b9b 100644 --- a/tests/Functional/Client/FailureTestCase.php +++ b/tests/Functional/Client/FailureTestCase.php @@ -12,12 +12,16 @@ namespace Temporal\Tests\Functional\Client; use PHPUnit\Framework\AssertionFailedError; +use Temporal\Api\Enums\V1\EventType; +use Temporal\Client\WorkflowOptions; use Temporal\Exception\Client\WorkflowFailedException; use Temporal\Exception\Client\WorkflowNotFoundException; +use Temporal\Exception\Client\WorkflowUpdateException; use Temporal\Exception\Failure\ActivityFailure; use Temporal\Exception\Failure\ApplicationFailure; use Temporal\Exception\Failure\ChildWorkflowFailure; use Temporal\Tests\Workflow\SignalExceptionsWorkflow; +use Temporal\Tests\Workflow\UpdateExceptionsWorkflow; /** * @group client @@ -148,7 +152,7 @@ public function testSignalThatThrowsInternalException() $run = $client->startWithSignal($wf, 'failActivity', ['foo']); try { - sleep(8); + sleep(3); $wf->failActivity('foo'); $this->fail('Signal must fail'); } catch (AssertionFailedError $e) { @@ -161,4 +165,97 @@ public function testSignalThatThrowsInternalException() $result = $run->getResult(); $this->fail(sprintf("Workflow must fail. Got result %s", \print_r($result, true))); } + + /** + * @group skip-on-test-server + */ + public function testUpdateThatThrowsRetryableException() + { + $client = $this->createClient(); + $wf = $client->newUntypedWorkflowStub( + 'SignalExceptions.greet', + WorkflowOptions::new()->withWorkflowRunTimeout('40 seconds') + ); + + $run = $client->start($wf); + + $wf->startUpdate('error'); + + sleep(1); + $wf->signal('exit'); + + // Check history + $e = null; + $s = null; + foreach ($client->getWorkflowHistory($run->getExecution()) as $event) { + if ($event->getEventType() === EventType::EVENT_TYPE_WORKFLOW_TASK_FAILED) { + $e = $event; + continue; + } + + if ($event->getEventType() === EventType::EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED) { + $s = $event; + continue; + } + + if ($event->getEventType() === EventType::EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED) { + $this->fail('Workflow must not complete'); + } + } + + $this->assertNotNull($e); + $this->assertNotNull($s); + } + + /** + * @group skip-on-test-server + */ + public function testUpdateThatThrowsCustomError() + { + $client = $this->createClient(); + $wf = $client->newWorkflowStub(UpdateExceptionsWorkflow::class); + $run = $client->start($wf); + + try { + $this->expectException(WorkflowUpdateException::class); + $wf->failWithName('test1'); + } finally { + $wf->exit(); + $this->assertSame(['test1'], $run->getResult()); + } + } + + /** + * @group skip-on-test-server + */ + public function testUpdateThatThrowsInvalidArgumentException() + { + try { + $client = $this->createClient(); + $wf = $client->newWorkflowStub(UpdateExceptionsWorkflow::class); + $run = $client->start($wf); + $this->expectException(WorkflowUpdateException::class); + $wf->failInvalidArgument('test1'); + } finally { + $wf->exit(); + $this->assertSame(['invalidArgument test1'], $run->getResult()); + } + } + + /** + * @group skip-on-test-server + */ + public function testUpdateThatThrowsInternalException() + { + $client = $this->createClient(); + $wf = $client->newWorkflowStub(UpdateExceptionsWorkflow::class); + $client->startWithSignal($wf, 'failActivity', ['foo']); + + try { + $this->expectException(WorkflowUpdateException::class); + $wf->failActivity('foo'); + } finally { + $wf->exit(); + } + } } From 654ef71a1179283fd7ada79a0a1e56ce1cddf1c0 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 12 Jul 2024 00:42:15 +0400 Subject: [PATCH 5/5] Fix psalm issues --- psalm-baseline.xml | 18 +----------------- src/Internal/Declaration/WorkflowInstance.php | 2 +- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/psalm-baseline.xml b/psalm-baseline.xml index aa1b9cbf..9085f551 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -1,5 +1,5 @@ - + @@ -581,9 +581,6 @@ - - - @@ -1157,12 +1154,6 @@ - - getOptions()['updateId'] ?? null]]> - - - - @@ -1232,16 +1223,9 @@ - - arguments]]> - - - - - diff --git a/src/Internal/Declaration/WorkflowInstance.php b/src/Internal/Declaration/WorkflowInstance.php index eab5a122..2c6859ba 100644 --- a/src/Internal/Declaration/WorkflowInstance.php +++ b/src/Internal/Declaration/WorkflowInstance.php @@ -130,7 +130,7 @@ public function setUpdateExecutor(\Closure $executor): self } /** - * @param UpdateValidator $validator + * @param ValidateUpdateExecutor $validator */ public function setUpdateValidator(\Closure $validator): self {