[Bug] Exception in converter encoding doesn't propagate as expected #540

Closed
assafz-q opened this issue Jun 4, 2024 · 5 comments · Fixed by #591
Labels
bug Something isn't working

Comments


assafz-q commented Jun 4, 2024

What are you really trying to do?

I am writing a Temporal workflow and want to handle the exceptions that occur during its execution: the workflow should catch any unexpected error and fail the entire workflow in a non-retryable way.

Describe the bug

I catch all exceptions during the workflow run and re-raise them as non-retryable ApplicationErrors. Everything works as expected except when the exception is thrown while encoding the data sent to an activity.

Running a workflow that sends an unserializable object to an activity results in the following history, which contains no failure other than the 2-second timeout:
[Screenshot: workflow event history]

and the following warning in the logs:

WARN temporal_sdk_core::worker::workflow: Error while completing workflow activation error=status: InvalidArgument, message: "invalid TaskQueue on ScheduleActivityTaskCommand: missing task queue name. ActivityId=1 ActivityType=some_activity", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }

I would expect the workflow to fail because of the exception (and log it in the history) before the timeout occurs, in the same way it happens if an exception is thrown in the middle of the workflow execution.

Minimal Reproduction

```python
import asyncio
import datetime
import uuid

import temporalio.activity
import temporalio.client
import temporalio.common
import temporalio.exceptions
import temporalio.worker
import temporalio.workflow


#### activities
class SomeObj:  # not serializable by default, so sending it to an activity should fail
    def __init__(self, value: str):
        self.value = value


@temporalio.activity.defn
async def some_activity(obj: SomeObj) -> str:
    return obj.value


#### workflow
@temporalio.workflow.defn
class FailingWorkflowExceptionDuringActivitySend:
    @temporalio.workflow.run
    async def run(self) -> str:
        try:
            obj = SomeObj("value")
            value = await temporalio.workflow.execute_activity(
                some_activity,
                obj,
                start_to_close_timeout=datetime.timedelta(seconds=10),
            )
            return value
        except Exception as e:
            # Turn any error into a non-retryable workflow failure
            raise temporalio.exceptions.ApplicationError(
                "Failed during activity send as expected", non_retryable=True
            ) from e


####


async def _main():
    client = await temporalio.client.Client.connect(
        target_host="localhost:7233",
    )

    task_queue = "task-queue-name"
    workflow = FailingWorkflowExceptionDuringActivitySend

    async with temporalio.worker.Worker(
        client,
        task_queue=task_queue,
        workflows=[workflow],
        activities=[some_activity],
    ):
        try:
            handle = await client.start_workflow(
                workflow.run,
                id=f"{workflow.__name__}-{uuid.uuid4().hex}",
                task_queue=task_queue,
                run_timeout=datetime.timedelta(seconds=2),
                execution_timeout=datetime.timedelta(seconds=2),
                retry_policy=temporalio.common.RetryPolicy(
                    maximum_attempts=1,
                ),
            )

            result = await handle.result()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Workflow: {workflow.__name__} failed: {e.__cause__}")


if __name__ == "__main__":
    asyncio.run(_main())
```

Environment/Versions

  • OS and processor: Linux
  • Temporal Version: sdk version: temporalio==1.6.0, temporal cli (temporal --version) temporal version 0.12.0 (server 1.23.0) (ui 2.26.2)
  • Are you using Docker or Kubernetes or building Temporal from source? No
assafz-q added the bug label Jun 4, 2024
cretz (Member) commented Jun 4, 2024

> I would expect the workflow to fail because of the exception (and log it in the history) before the timeout occurs, in the same way it happens if an exception is thrown in the middle of the workflow execution.

Common exceptions do not fail the workflow by default; they "suspend" it. This is the same as when an exception is thrown in the middle of workflow execution. See https://github.com/temporalio/sdk-python?tab=readme-ov-file#exceptions. You can customize this behavior and/or customize the converter to throw certain exceptions.
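To make the converter idea concrete, here is a stdlib-only sketch of a payload converter that turns encoding failures into a non-retryable error. It is only a model: in the real SDK one would presumably subclass a converter such as temporalio.converter.DefaultPayloadConverter, plug it in through temporalio.converter.DataConverter(payload_converter_class=...), and raise temporalio.exceptions.ApplicationError(..., non_retryable=True). NonRetryableError and StrictJSONPayloadConverter are hypothetical names, and plain bytes stand in for Temporal Payload messages. Given the command-ordering problem diagnosed later in this thread, this alone would likely not have avoided the timeout seen here.

```python
import json
from typing import Any, List, Sequence


class NonRetryableError(Exception):
    """Hypothetical stand-in for ApplicationError(..., non_retryable=True)."""

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.non_retryable = True


class StrictJSONPayloadConverter:
    """Hypothetical converter: encodes each value as JSON bytes and turns
    encoding failures into a non-retryable error instead of a bare TypeError."""

    def to_payloads(self, values: Sequence[Any]) -> List[bytes]:
        payloads: List[bytes] = []
        for index, value in enumerate(values):
            try:
                payloads.append(json.dumps(value).encode("utf-8"))
            except TypeError as err:
                # json.dumps raises TypeError for objects it cannot encode.
                raise NonRetryableError(
                    f"argument {index} of type {type(value).__name__} "
                    "is not serializable"
                ) from err
        return payloads


class SomeObj:  # same shape as in the reproduction: not JSON-serializable
    def __init__(self, value: str) -> None:
        self.value = value


converter = StrictJSONPayloadConverter()
print(converter.to_payloads(["value", 1]))  # [b'"value"', b'1']
try:
    converter.to_payloads([SomeObj("value")])
except NonRetryableError as err:
    print(err)  # argument 0 of type SomeObj is not serializable
```

The original TypeError is kept as the cause via `from err`, so the underlying reason stays visible in the failure chain.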

assafz-q (Author) commented Jun 4, 2024

> > I would expect the workflow to fail because of the exception (and log it in the history) before the timeout occurs, in the same way it happens if an exception is thrown in the middle of the workflow execution.
>
> Common exceptions do not fail the workflow by default; they "suspend" it. This is the same as when an exception is thrown in the middle of workflow execution. See https://github.com/temporalio/sdk-python?tab=readme-ov-file#exceptions. You can customize this behavior and/or customize the converter to throw certain exceptions.

I know; that is why I catch the exception and re-raise it as a non-retryable ApplicationError.

cretz (Member) commented Jun 4, 2024

Right, so the exception raised by the data converter is similar to raising the exception in the workflow, but it occurs on a different path (in the payload converter, not the workflow code). It's a bit advanced, but you can customize the converter the same way you customize the workflow to catch and re-raise in a certain way. As an alternative to both, you can just put failure_exception_types=[Exception] or similar in the @workflow.defn, and it should fail the workflow on any exception instead of failing the task (i.e., suspending it pending a code fix).
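The rule described above can be summarized as a simplified, stdlib-only model. In the SDK itself the opt-in is written as @workflow.defn(failure_exception_types=[Exception]), as suggested; the classes below are local stand-ins named after temporalio.exceptions.FailureError and ApplicationError, not the real ones, and the model ignores special cases such as cancellation.

```python
from typing import Sequence, Type


class FailureError(Exception):
    """Stand-in for temporalio.exceptions.FailureError."""


class ApplicationError(FailureError):
    """Stand-in for temporalio.exceptions.ApplicationError."""


def outcome(
    exc: BaseException,
    failure_exception_types: Sequence[Type[BaseException]] = (),
) -> str:
    """Simplified model of what an exception escaping the workflow does."""
    # Failure errors (such as ApplicationError) always fail the workflow.
    if isinstance(exc, FailureError):
        return "fail workflow"
    # Types opted in via failure_exception_types also fail the workflow.
    if isinstance(exc, tuple(failure_exception_types)):
        return "fail workflow"
    # Anything else fails only the workflow task, which is retried, so the
    # workflow stays "suspended" until the code is fixed.
    return "fail workflow task"


print(outcome(TypeError("not serializable")))               # fail workflow task
print(outcome(TypeError("not serializable"), [Exception]))  # fail workflow
print(outcome(ApplicationError("boom")))                    # fail workflow
```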

assafz-q (Author) commented Jun 4, 2024

> Right, so the exception raised by the data converter is similar to raising the exception in the workflow, but it occurs on a different path (in the payload converter, not the workflow code). It's a bit advanced, but you can customize the converter the same way you customize the workflow to catch and re-raise in a certain way. As an alternative to both, you can just put failure_exception_types=[Exception] or similar in the @workflow.defn, and it should fail the workflow on any exception instead of failing the task (i.e., suspending it pending a code fix).

Thanks!
But I am able to catch the exception in the workflow's try/except, so that part behaves as I expected.
I think the problem is related to the warning log I attached to the issue:

WARN temporal_sdk_core::worker::workflow: Error while completing workflow activation error=status: InvalidArgument, message: "invalid TaskQueue on ScheduleActivityTaskCommand: missing task queue name. ActivityId=1 ActivityType=some_activity", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }

I was able to (maybe) fix that locally in temporalio.worker._workflow_instance._ActivityHandle._apply_schedule_command, which currently looks like this:

```python
def _apply_schedule_command(
    self,
    command: temporalio.bridge.proto.workflow_commands.WorkflowCommand,
    local_backoff: Optional[
        temporalio.bridge.proto.activity_result.DoBackoff
    ] = None,
) -> None:
    # TODO(cretz): Why can't MyPy infer this?
    v: Union[
        temporalio.bridge.proto.workflow_commands.ScheduleActivity,
        temporalio.bridge.proto.workflow_commands.ScheduleLocalActivity,
    ] = (
        command.schedule_local_activity
        if isinstance(self._input, StartLocalActivityInput)
        else command.schedule_activity
    )
    v.seq = self._seq
    v.activity_id = self._input.activity_id or str(self._seq)
    v.activity_type = self._input.activity
    if self._input.headers:
        temporalio.common._apply_headers(self._input.headers, v.headers)
    if self._input.args:
        v.arguments.extend(
            self._instance._payload_converter.to_payloads(self._input.args)
        )
    if self._input.schedule_to_close_timeout:
        v.schedule_to_close_timeout.FromTimedelta(
            self._input.schedule_to_close_timeout
        )
    if self._input.schedule_to_start_timeout:
        v.schedule_to_start_timeout.FromTimedelta(
            self._input.schedule_to_start_timeout
        )
    if self._input.start_to_close_timeout:
        v.start_to_close_timeout.FromTimedelta(self._input.start_to_close_timeout)
    if self._input.retry_policy:
        self._input.retry_policy.apply_to_proto(v.retry_policy)
    v.cancellation_type = cast(
        "temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType",
        int(self._input.cancellation_type),
    )
    # Things specific to local or remote
    if isinstance(self._input, StartActivityInput):
        command.schedule_activity.task_queue = (
            self._input.task_queue or self._instance._info.task_queue
        )
        if self._input.heartbeat_timeout:
            command.schedule_activity.heartbeat_timeout.FromTimedelta(
                self._input.heartbeat_timeout
            )
        command.schedule_activity.do_not_eagerly_execute = (
            self._input.disable_eager_execution
        )
        if self._input.versioning_intent:
            command.schedule_activity.versioning_intent = (
                self._input.versioning_intent._to_proto()
            )
    if isinstance(self._input, StartLocalActivityInput):
        if self._input.local_retry_threshold:
            command.schedule_local_activity.local_retry_threshold.FromTimedelta(
                self._input.local_retry_threshold
            )
        if local_backoff:
            command.schedule_local_activity.attempt = local_backoff.attempt
            command.schedule_local_activity.original_schedule_time.CopyFrom(
                local_backoff.original_schedule_time
            )
        # TODO(cretz): Remove when https://github.com/temporalio/sdk-core/issues/316 fixed
        command.schedule_local_activity.retry_policy.SetInParent()
```

I moved the part that handles the payloads:

```python
if self._input.args:
    v.arguments.extend(
        self._instance._payload_converter.to_payloads(self._input.args)
    )
```

to the end of the function (_apply_schedule_command).

I am not too familiar with the code, but it seems that the task queue name is only set after the payload conversion, so when the payload conversion raised an exception, the command was left without a task queue and the Temporal server rejected it.

cretz (Member) commented Jun 27, 2024

> I was able to (maybe) fix that locally in (temporalio.worker._workflow_instance._ActivityHandle._apply_schedule_command)
>
> to the end of the function (_apply_schedule_command)

I see the issue: we create the command before we try to serialize its contents. So if the serialization error does not fail the workflow task (for example, because the workflow code catches it), the incomplete command is still sent, since the exception was thrown partway through building it. I have opened #564 to track this (it is easier to state the problem clearly in a separate issue), but will keep this issue open.
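The ordering problem can be reproduced with a simplified, stdlib-only model. Commands are plain dicts here and json.dumps stands in for the payload converter; schedule_build_first, schedule_serialize_first and simulate are hypothetical names, and this is neither the SDK's code nor the actual change made in #591.

```python
import json
from typing import Any, Callable, Dict, List

Command = Dict[str, Any]


def schedule_build_first(commands: List[Command], args: List[Any]) -> None:
    """Problematic order: register the command, then fill it in."""
    command: Command = {"type": "ScheduleActivity"}
    commands.append(command)  # already queued for sending
    command["arguments"] = [json.dumps(a) for a in args]  # may raise TypeError
    command["task_queue"] = "task-queue-name"  # skipped if the line above raises


def schedule_serialize_first(commands: List[Command], args: List[Any]) -> None:
    """Safer order: serialize first, register only a complete command."""
    arguments = [json.dumps(a) for a in args]  # raises before anything is queued
    commands.append(
        {"type": "ScheduleActivity", "arguments": arguments, "task_queue": "task-queue-name"}
    )


class SomeObj:  # not JSON-serializable, as in the reproduction
    pass


def simulate(schedule: Callable[[List[Command], List[Any]], None]) -> List[Command]:
    commands: List[Command] = []
    try:
        schedule(commands, [SomeObj()])
    except TypeError:
        pass  # the workflow code catches the error, like the reporter's try/except
    return commands


print(simulate(schedule_build_first))      # [{'type': 'ScheduleActivity'}], i.e. no task queue
print(simulate(schedule_serialize_first))  # []
```

In the first ordering the half-built command survives the caught exception, which mirrors the "missing task queue name" rejection in the log; in the second, a serialization failure leaves nothing behind to send.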

cretz added a commit to cretz/temporal-sdk-python that referenced this issue Jul 23, 2024
cretz added a commit that referenced this issue Aug 9, 2024
cretz closed this as completed in #591 Aug 9, 2024
2 participants