Skip to content

Commit

Permalink
Extract UpdateWorkflow exception when Workflow was started
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk committed Nov 22, 2024
1 parent dea4b90 commit f735154
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 50 deletions.
1 change: 1 addition & 0 deletions src/Client/WorkflowClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public function updateWithStart(
$startArgs,
);

// todo: set execution if UpdateWorkflow was failed but WF was started
$workflowStub->setExecution($handle->getExecution());

return $handle;
Expand Down
80 changes: 80 additions & 0 deletions src/Internal/Client/ResponseToResultMapper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php

declare(strict_types=1);

namespace Temporal\Internal\Client;

use Temporal\Api\Workflowservice\V1\UpdateWorkflowExecutionResponse;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\DataConverter\EncodedValues;
use Temporal\Exception\Client\WorkflowUpdateException;
use Temporal\Exception\Failure\FailureConverter;
use Temporal\Interceptor\WorkflowClient\StartUpdateOutput;
use Temporal\Interceptor\WorkflowClient\UpdateRef;
use Temporal\Workflow\WorkflowExecution;

/**
* @internal
*/
final class ResponseToResultMapper
{
public function __construct(
private readonly DataConverterInterface $converter,
) {}

public function mapUpdateWorkflowResponse(
UpdateWorkflowExecutionResponse $result,
string $updateName,
string $workflowType,
WorkflowExecution $workflowExecution,
): StartUpdateOutput {
$outcome = $result->getOutcome();
$updateRef = $result->getUpdateRef();
\assert($updateRef !== null);
$updateRefDto = new UpdateRef(
new WorkflowExecution(
(string)$updateRef->getWorkflowExecution()?->getWorkflowId(),
$updateRef->getWorkflowExecution()?->getRunId(),
),
$updateRef->getUpdateId(),
);

if ($outcome === null) {
// Not completed
return new StartUpdateOutput($updateRefDto, false, null);
}

$failure = $outcome->getFailure();
$success = $outcome->getSuccess();


if ($success !== null) {
return new StartUpdateOutput(
$updateRefDto,
true,
EncodedValues::fromPayloads($success, $this->converter),
);
}

if ($failure !== null) {
$execution = $updateRef->getWorkflowExecution();
throw new WorkflowUpdateException(
null,
$execution === null
? $workflowExecution
: new WorkflowExecution($execution->getWorkflowId(), $execution->getRunId()),
workflowType: $workflowType,
updateId: $updateRef->getUpdateId(),
updateName: $updateName,
previous: FailureConverter::mapFailureToException($failure, $this->converter),
);
}

throw new \RuntimeException(
\sprintf(
'Received unexpected outcome from update request: %s',
$outcome->getValue(),
),
);
}
}
30 changes: 26 additions & 4 deletions src/Internal/Client/WorkflowStarter.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Temporal\Api\Update\V1\Request as UpdateRequestMessage;
use Temporal\Api\Workflowservice\V1\ExecuteMultiOperationRequest;
use Temporal\Api\Workflowservice\V1\ExecuteMultiOperationRequest\Operation;
use Temporal\Api\Workflowservice\V1\ExecuteMultiOperationResponse\Response;
use Temporal\Api\Workflowservice\V1\SignalWithStartWorkflowExecutionRequest;
use Temporal\Api\Workflowservice\V1\StartWorkflowExecutionRequest;
use Temporal\Api\Workflowservice\V1\UpdateWorkflowExecutionRequest;
Expand Down Expand Up @@ -187,7 +188,7 @@ function (UpdateWithStartInput $input): UpdateHandle {
];

try {
$this->serviceClient->ExecuteMultiOperation(
$response = $this->serviceClient->ExecuteMultiOperation(
(new ExecuteMultiOperationRequest())
->setNamespace($this->clientOptions->namespace)
->setOperations($ops),
Expand Down Expand Up @@ -222,16 +223,37 @@ function (UpdateWithStartInput $input): UpdateHandle {
);
}

// Extract result
/** @var \ArrayAccess<int, Response> $responses */
$responses = $response->getResponses();

// Start Workflow: get execution
$startResponse = $responses[0]->getStartWorkflow();
\assert($startResponse !== null);
$execution = new WorkflowExecution($input->workflowStartInput->workflowId, $startResponse->getRunId());

// Update Workflow: get handler
$updateResponse = $responses[1]->getUpdateWorkflow();
\assert($updateResponse !== null);

$updateResult = (new \Temporal\Internal\Client\ResponseToResultMapper($this->converter))
->mapUpdateWorkflowResponse(
$updateResponse,
updateName: $input->updateInput->updateName,
workflowType: $input->workflowStartInput->workflowType,
workflowExecution: $execution,
);

return new UpdateHandle(
client: $this->serviceClient,
clientOptions: $this->clientOptions,
converter: $this->converter,
execution: $input->updateInput->workflowExecution,
execution: $updateResult->getReference()->workflowExecution,
workflowType: $input->updateInput->workflowType,
updateName: $input->updateInput->updateName,
resultType: $input->updateInput->resultType,
updateId: $input->updateInput->updateId,
result: null,
updateId: $updateResult->getReference()->updateId,
result: $updateResult->getResult(),
);
},
/** @see WorkflowClientCallsInterceptor::updateWithStart() */
Expand Down
51 changes: 6 additions & 45 deletions src/Internal/Client/WorkflowStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -338,52 +338,13 @@ static function (
throw new WorkflowServiceException(null, $input->workflowExecution, $input->workflowType, $e);
}

$outcome = $result->getOutcome();
$updateRef = $result->getUpdateRef();
\assert($updateRef !== null);
$updateRefDto = new UpdateRef(
new WorkflowExecution(
(string) $updateRef->getWorkflowExecution()?->getWorkflowId(),
$updateRef->getWorkflowExecution()?->getRunId(),
),
$updateRef->getUpdateId(),
);

if ($outcome === null) {
// Not completed
return new StartUpdateOutput($updateRefDto, false, null);
}

$failure = $outcome->getFailure();
$success = $outcome->getSuccess();


if ($success !== null) {
return new StartUpdateOutput(
$updateRefDto,
true,
EncodedValues::fromPayloads($success, $converter),
return (new \Temporal\Internal\Client\ResponseToResultMapper($converter))
->mapUpdateWorkflowResponse(
$result,
$input->updateName,
$input->workflowType,
$input->workflowExecution,
);
}

if ($failure !== null) {
$execution = $updateRef->getWorkflowExecution();
throw new WorkflowUpdateException(
null,
$execution === null
? $input->workflowExecution
: new WorkflowExecution($execution->getWorkflowId(), $execution->getRunId()),
workflowType: $input->workflowType,
updateId: $updateRef->getUpdateId(),
updateName: $input->updateName,
previous: FailureConverter::mapFailureToException($failure, $converter),
);
}

throw new \RuntimeException(\sprintf(
'Received unexpected outcome from update request: %s',
$outcome->getValue(),
));
},
/** @see WorkflowClientCallsInterceptor::update() */
'update',
Expand Down
3 changes: 2 additions & 1 deletion tests/Acceptance/Extra/Update/UpdateWithStartTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ public function failWithBadUpdateName(
);

try {
$this->expectException(WorkflowUpdateException::class);
$client->updateWithStart($stub, 'await1234', ['key']);
$this->fail('Update must fail');
} catch (WorkflowUpdateException $e) {
$this->assertStringContainsString('await1234', $e->getPrevious()->getMessage());
} finally {
try {
$stub->getResult(timeout: 1);
Expand Down

0 comments on commit f735154

Please sign in to comment.