Skip to content

Commit

Permalink
Merge pull request #473: Divide errors and failures in updates
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk authored Jul 12, 2024
2 parents adcfc27 + 654ef71 commit c1107fd
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 62 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
},
"extra": {
"branch-alias": {
"dev-master": "2.9.x-dev"
"dev-master": "2.11.x-dev"
}
},
"config": {
Expand Down
18 changes: 1 addition & 17 deletions psalm-baseline.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="5.24.0@462c80e31c34e58cc4f750c656be3927e80e550e">
<files psalm-version="5.25.0@01a8eb06b9e9cc6cfb6a320bf9fb14331919d505">
<file src="src/Activity.php">
<ImplicitToStringCast>
<code><![CDATA[$type]]></code>
Expand Down Expand Up @@ -581,9 +581,6 @@
</RedundantConditionGivenDocblockType>
</file>
<file src="src/Internal/Declaration/WorkflowInstance.php">
<InvalidPropertyAssignmentValue>
<code><![CDATA[$validator]]></code>
</InvalidPropertyAssignmentValue>
<MissingClosureReturnType>
<code><![CDATA[function (QueryInput $input) use ($fn) {]]></code>
</MissingClosureReturnType>
Expand Down Expand Up @@ -1157,12 +1154,6 @@
<code><![CDATA[$result]]></code>
<code><![CDATA[$result]]></code>
</MissingClosureParamType>
<PossiblyNullArgument>
<code><![CDATA[$request->getOptions()['updateId'] ?? null]]></code>
</PossiblyNullArgument>
<UndefinedClass>
<code><![CDATA[UpdateResult]]></code>
</UndefinedClass>
</file>
<file src="src/Internal/Workflow/ActivityProxy.php">
<ArgumentTypeCoercion>
Expand Down Expand Up @@ -1232,16 +1223,9 @@
</PropertyTypeCoercion>
</file>
<file src="src/Internal/Workflow/Process/Process.php">
<InvalidArgument>
<code><![CDATA[$input->arguments]]></code>
</InvalidArgument>
<MissingClosureParamType>
<code><![CDATA[$result]]></code>
</MissingClosureParamType>
<MissingClosureReturnType>
<code><![CDATA[function () use ($handler, $inboundPipeline, $input) {]]></code>
<code><![CDATA[function (UpdateInput $input) use ($handler) {]]></code>
</MissingClosureReturnType>
<PropertyNotSetInConstructor>
<code><![CDATA[Process]]></code>
</PropertyNotSetInConstructor>
Expand Down
16 changes: 9 additions & 7 deletions src/Internal/Declaration/WorkflowInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Temporal\Internal\Declaration;

use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\DataConverter\ValuesInterface;
use Temporal\Interceptor\WorkflowInbound\QueryInput;
Expand All @@ -23,10 +24,10 @@
/**
* @psalm-import-type DispatchableHandler from InstanceInterface
* @psalm-type QueryHandler = \Closure(QueryInput): mixed
* @psalm-type UpdateHandler = \Closure(UpdateInput): PromiseInterface
* @psalm-type UpdateHandler = \Closure(UpdateInput, Deferred): PromiseInterface
* @psalm-type ValidateUpdateHandler = \Closure(UpdateInput): void
* @psalm-type QueryExecutor = \Closure(QueryInput, callable(ValuesInterface): mixed): mixed
* @psalm-type UpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed): PromiseInterface
* @psalm-type UpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed, Deferred): PromiseInterface
* @psalm-type ValidateUpdateExecutor = \Closure(UpdateInput, callable(ValuesInterface): mixed): mixed
* @psalm-type UpdateValidator = \Closure(UpdateInput, UpdateHandler): void
*/
Expand Down Expand Up @@ -85,7 +86,8 @@ public function __construct(
$updateValidators = $prototype->getValidateUpdateHandlers();
foreach ($prototype->getUpdateHandlers() as $method => $reflection) {
$fn = $this->createHandler($reflection);
$this->updateHandlers[$method] = fn(UpdateInput $input): mixed => ($this->updateExecutor)($input, $fn);
$this->updateHandlers[$method] = fn(UpdateInput $input, Deferred $deferred): mixed =>
($this->updateExecutor)($input, $fn, $deferred);
// Register validate update handlers
$this->validateUpdateHandlers[$method] = \array_key_exists($method, $updateValidators)
? fn(UpdateInput $input): mixed => ($this->updateValidator)(
Expand Down Expand Up @@ -128,7 +130,7 @@ public function setUpdateExecutor(\Closure $executor): self
}

/**
* @param UpdateValidator $validator
* @param ValidateUpdateExecutor $validator
*/
public function setUpdateValidator(\Closure $validator): self
{
Expand Down Expand Up @@ -168,7 +170,7 @@ public function findQueryHandler(string $name): ?\Closure
/**
* @param non-empty-string $name
*
* @return null|\Closure(UpdateInput): PromiseInterface
* @return null|\Closure(UpdateInput, Deferred): PromiseInterface
* @psalm-return UpdateHandler|null
*/
public function findUpdateHandler(string $name): ?\Closure
Expand Down Expand Up @@ -215,8 +217,8 @@ public function addUpdateHandler(string $name, callable $handler): void
$fn = $this->createCallableHandler($handler);

$this->updateHandlers[$name] = $this->pipeline->with(
function (UpdateInput $input) use ($fn) {
return ($this->updateExecutor)($input, $fn);
function (UpdateInput $input, Deferred $deferred) use ($fn) {
return ($this->updateExecutor)($input, $fn, $deferred);
},
/** @see WorkflowInboundCallsInterceptor::handleUpdate() */
'handleUpdate',
Expand Down
3 changes: 2 additions & 1 deletion src/Internal/Declaration/WorkflowInstanceInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Temporal\Internal\Declaration;

use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\DataConverter\ValuesInterface;
use Temporal\Interceptor\WorkflowInbound\QueryInput;
Expand Down Expand Up @@ -49,7 +50,7 @@ public function getSignalHandler(string $name): \Closure;

/**
* @param non-empty-string $name
* @return null|\Closure(UpdateInput): PromiseInterface
* @return null|\Closure(UpdateInput, Deferred): PromiseInterface
*/
public function findUpdateHandler(string $name): ?\Closure;

Expand Down
20 changes: 8 additions & 12 deletions src/Internal/Transport/Router/InvokeUpdate.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred

// Validation has passed

$promise = $handler($input);
$promise->then(
$deferred = new Deferred();
$deferred->promise()->then(
static function (mixed $value) use ($updateId, $context): void {
$context->getClient()->send(new UpdateResponse(
command: UpdateResponse::COMMAND_COMPLETED,
Expand All @@ -108,22 +108,18 @@ static function (\Throwable $err) use ($updateId, $context): void {
));
},
);

$handler($input, $deferred);
}

/**
* @param non-empty-string $name
* @return \Closure(UpdateInput): PromiseInterface
* @return \Closure(UpdateInput, Deferred): PromiseInterface
*/
private function getUpdateHandler(WorkflowInstanceInterface $instance, string $name): \Closure
{
$handler = $instance->findUpdateHandler($name);

if ($handler === null) {
$available = \implode(' ', $instance->getUpdateHandlerNames());

throw new \LogicException(\sprintf(self::ERROR_HANDLER_NOT_FOUND, $name, $available));
}

return $handler;
return $instance->findUpdateHandler($name) ?? throw new \LogicException(
\sprintf(self::ERROR_HANDLER_NOT_FOUND, $name, \implode(' ', $instance->getUpdateHandlerNames()))
);
}
}
14 changes: 1 addition & 13 deletions src/Internal/Transport/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Worker\Transport\Command\ServerRequestInterface;
use Temporal\Worker\Transport\Command\SuccessResponse;
use Temporal\Worker\Transport\Command\UpdateResponse;
use Temporal\Workflow\Update\UpdateResult;

/**
* @psalm-import-type OnMessageHandler from ServerInterface
Expand Down Expand Up @@ -82,17 +80,7 @@ public function dispatch(ServerRequestInterface $request, array $headers): void
private function onFulfilled(ServerRequestInterface $request): \Closure
{
return function ($result) use ($request) {
if ($result::class === UpdateResult::class) {
$response = new UpdateResponse(
command: $result->command,
values: $result->result,
failure: $result->failure,
updateId: $request->getOptions()['updateId'] ?? null,
);
} else {
$response = new SuccessResponse($result, $request->getID());
}

$response = new SuccessResponse($result, $request->getID());
$this->queue->push($response);

return $response;
Expand Down
13 changes: 7 additions & 6 deletions src/Internal/Workflow/Process/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Temporal\Internal\Workflow\Process;

use JetBrains\PhpStorm\Pure;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\DataConverter\ValuesInterface;
use Temporal\Exception\DestructMemorizedInstanceException;
Expand Down Expand Up @@ -92,7 +93,7 @@ function (UpdateInput $input) use ($handler): void {
});

// Configure update handler in a mutable scope
$wfInstance->setUpdateExecutor(function (UpdateInput $input, callable $handler) use ($inboundPipeline): PromiseInterface {
$wfInstance->setUpdateExecutor(function (UpdateInput $input, callable $handler, Deferred $resolver) use ($inboundPipeline): PromiseInterface {
try {
// Define Context for interceptors Pipeline
$scope = $this->createScope(
Expand All @@ -102,18 +103,18 @@ function (UpdateInput $input) use ($handler): void {
new Input($input->info, $input->arguments, $input->header),
),
);
$scope->startSignal(
function () use ($handler, $inboundPipeline, $input) {

$scope->startUpdate(
function () use ($handler, $inboundPipeline, $input): mixed {
Workflow::setCurrentContext($this->scopeContext);
return $inboundPipeline->with(
function (UpdateInput $input) use ($handler) {
return $handler($input->arguments);
},
static fn(UpdateInput $input): mixed => $handler($input->arguments),
/** @see WorkflowInboundCallsInterceptor::handleUpdate() */
'handleUpdate',
)($input);
},
$input->arguments,
$resolver,
);

return $scope->promise();
Expand Down
29 changes: 25 additions & 4 deletions src/Internal/Workflow/Process/Scope.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,34 @@ public function start(callable $handler, ValuesInterface $values = null): void
$this->next();
}

/**
* @param callable(ValuesInterface): mixed $handler Update method handler.
* @param Deferred $resolver Update method promise resolver.
*/
public function startUpdate(callable $handler, ValuesInterface $values, Deferred $resolver): void
{
$this->then(
$resolver->resolve(...),
function (\Throwable $error) use ($resolver): void {
$this->services->exceptionInterceptor->isRetryable($error)
? $this->scopeContext->panic($error)
: $resolver->reject($error);
}
);

// Create a coroutine generator
$this->coroutine = $this->callSignalOrUpdateHandler($handler, $values);
$this->context->resolveConditions();
$this->next();
}

/**
* @param callable $handler
*/
public function startSignal(callable $handler, ValuesInterface $values): void
{
// Create a coroutine generator
$this->coroutine = $this->callSignalHandler($handler, $values);
$this->coroutine = $this->callSignalOrUpdateHandler($handler, $values);
$this->context->resolveConditions();
$this->next();
}
Expand Down Expand Up @@ -371,12 +392,12 @@ protected function call(callable $handler, ValuesInterface $values): \Generator
}

/**
* Call a Signal method. In this case deserialization errors are skipped.
* Call a Signal or Update method. In this case deserialization errors are skipped.
*
* @param callable $handler
* @param callable(ValuesInterface): mixed $handler
* @return \Generator
*/
protected function callSignalHandler(callable $handler, ValuesInterface $values): \Generator
protected function callSignalOrUpdateHandler(callable $handler, ValuesInterface $values): \Generator
{
try {
$this->makeCurrent();
Expand Down
83 changes: 83 additions & 0 deletions tests/Fixtures/src/Workflow/UpdateExceptionsWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Tests\Workflow;

use Carbon\CarbonInterval;
use InvalidArgumentException;
use Temporal\Activity\ActivityOptions;
use Temporal\Common\RetryOptions;
use Temporal\Workflow;
use Temporal\Workflow\SignalMethod;
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;

#[WorkflowInterface]
class UpdateExceptionsWorkflow
{
private array $greetings = [];
private bool $exit = false;

#[WorkflowMethod(name: "UpdateExceptionsWorkflow.greet")]
public function greet()
{
$received = [];
while (true) {
yield Workflow::await(fn() => $this->greetings !== [] || $this->exit);
if ($this->greetings === [] && $this->exit) {
return $received;
}

$message = array_shift($this->greetings);
$received[] = $message;
}
}

#[Workflow\UpdateMethod]
public function failWithName(string $name): void
{
$this->greetings[] = $name;
throw new \RuntimeException("Signal exception $name");
}

#[Workflow\UpdateMethod]
public function failInvalidArgument($name = 'foo'): void
{
$this->greetings[] = "invalidArgument $name";
throw new InvalidArgumentException("Invalid argument $name");
}

#[Workflow\UpdateMethod]
public function failActivity($name = 'foo')
{
yield Workflow::newUntypedActivityStub(
ActivityOptions::new()
->withScheduleToStartTimeout(1)
->withRetryOptions(
RetryOptions::new()->withMaximumAttempts(1)
)
->withStartToCloseTimeout(1),
)->execute('nonExistingActivityName', [$name]);
}

#[Workflow\UpdateMethod]
public function error()
{
yield Workflow::timer(CarbonInterval::millisecond(10));
10 / 0;
}

#[SignalMethod]
public function exit(): void
{
$this->exit = true;
}
}
Loading

0 comments on commit c1107fd

Please sign in to comment.