Scheduled workflow raising exception: where to observe re-execution? #147

Open
chbussler opened this issue Nov 5, 2024 · 2 comments

@chbussler
Contributor

The code below shows a scheduled workflow that raises an exception on its first execution. To identify the executing workflow, a log statement prints the workflow ID (e.g., sched-print_log-2024-11-05T03:31:50+00:00).

When the next workflow is scheduled, it does not raise an exception, because the first execution set the module-level Boolean workflow_failed to True just before raising.

My expectation was that the failed workflow with ID sched-print_log-2024-11-05T03:31:50+00:00 would be re-executed, since it did not complete successfully on its first execution. Where in the DBOS schema can I observe that the failed workflow is re-executed and succeeds on the second try?

Thank you.

import sys
import time
import traceback
from datetime import datetime

from dbos import DBOS

DBOS()

workflow_failed: bool = False


@DBOS.scheduled("*/10 * * * * *")
@DBOS.workflow()
def print_log(scheduled_time: datetime,
              actual_time: datetime) -> None:
    DBOS.logger.info(f"Workflow with id {DBOS.workflow_id} start ...")
    global workflow_failed
    if not workflow_failed:
        workflow_failed = True
        raise Exception()
    DBOS.logger.info(f"... workflow with id {DBOS.workflow_id} completed")


def main(arguments: list[str]) -> None:
    DBOS().launch()

    while True:
        time.sleep(5)
        DBOS.logger.info('alive')


if __name__ == "__main__":
    try:
        main(sys.argv)
    except KeyboardInterrupt:
        DBOS.logger.warning('KeyboardInterrupt received')
    except Exception as e:
        DBOS.logger.error(traceback.format_exc())
    finally:
        DBOS.destroy()

Example log:

"C:\Users\miso\VirtualEnvironments\drag\Scripts\python.exe" "C:\Users\miso\Workspaces\drag\z_research\infinite_loop\infinite_loop.py" 
19:31:46 [    INFO] (dbos:_dbos.py:266) Initializing DBOS
19:31:48 [    INFO] (dbos:_dbos.py:394) DBOS launched
19:31:50 [    INFO] (dbos:infinite_loop.py:17) Workflow with id sched-print_log-2024-11-05T03:31:50+00:00 start ...
19:31:50 [   ERROR] (dbos:_core.py:236) Exception encountered in asynchronous workflow: Traceback (most recent call last):
  File "C:\Users\miso\VirtualEnvironments\drag\Lib\site-packages\dbos\_core.py", line 234, in _execute_workflow_wthread
    return _execute_workflow(dbos, status, func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\miso\VirtualEnvironments\drag\Lib\site-packages\dbos\_core.py", line 192, in _execute_workflow
    output = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\miso\Workspaces\drag\z_research\infinite_loop\infinite_loop.py", line 21, in print_log
    raise Exception()
Exception

19:31:53 [    INFO] (dbos:infinite_loop.py:30) alive
19:31:58 [    INFO] (dbos:infinite_loop.py:30) alive
19:32:00 [    INFO] (dbos:infinite_loop.py:17) Workflow with id sched-print_log-2024-11-05T03:32:00+00:00 start ...
19:32:00 [    INFO] (dbos:infinite_loop.py:22) ... workflow with id sched-print_log-2024-11-05T03:32:00+00:00 completed
19:32:03 [    INFO] (dbos:infinite_loop.py:30) alive
19:32:08 [    INFO] (dbos:infinite_loop.py:30) alive
19:32:10 [    INFO] (dbos:infinite_loop.py:17) Workflow with id sched-print_log-2024-11-05T03:32:10+00:00 start ...
19:32:10 [    INFO] (dbos:infinite_loop.py:22) ... workflow with id sched-print_log-2024-11-05T03:32:10+00:00 completed
19:32:12 [ WARNING] (dbos:infinite_loop.py:37) KeyboardInterrupt received

Process finished with exit code -1
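The log shows that every tick of the */10 schedule starts a workflow with its own ID, built from the function name and the scheduled time, so the run at 03:32:00 is a new workflow rather than a re-execution of the failed 03:31:50 one. The sketch below reproduces that ID format as read off the log; `scheduled_workflow_id` is a hypothetical helper, not taken from the DBOS source, and the status strings are illustrative only.

```python
from datetime import datetime, timedelta, timezone


def scheduled_workflow_id(func_name: str, scheduled_time: datetime) -> str:
    # Hypothetical helper: mimics the "sched-<function>-<ISO time>" IDs
    # visible in the log above; it is not DBOS's implementation.
    return f"sched-{func_name}-{scheduled_time.isoformat()}"


# The three ticks from the log, 10 seconds apart (UTC).
first_tick = datetime(2024, 11, 5, 3, 31, 50, tzinfo=timezone.utc)
ticks = [first_tick + timedelta(seconds=10 * i) for i in range(3)]

# Outcome of each run as reported in the log: only the first one raised.
outcomes = ["ERROR", "SUCCESS", "SUCCESS"]
status_by_id = {
    scheduled_workflow_id("print_log", t): outcome
    for t, outcome in zip(ticks, outcomes)
}

for workflow_id, status in status_by_id.items():
    print(workflow_id, status)
```

Because each tick gets a distinct ID, in this model the failed ID simply keeps its error outcome; a later tick never reuses it.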
@apoliakov
Member

apoliakov commented Nov 5, 2024

Hi @chbussler, I think the expectation isn't quite right.

A @DBOS.workflow() function that raises an exception is not retried automatically. There may be some confusion between workflows and steps: if a @DBOS.step() function is called from a workflow, you can configure retries for just that step function. It would be something like this:

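from dbos import DBOS

@DBOS.step(retries_allowed=True, max_attempts=3)
def step_foo():
    # ... code that may raise an exception, e.g. an external API call
    ...


@DBOS.workflow()
def wf_bar():
    # ... other workflow logic
    step_foo()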

If you have code like the above, you can observe step_foo() retrying, with a warning message printed to the app's log.
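To make step retries concrete without a database, here is a pure-Python model: the wrapped function is called up to `max_attempts` times, with an exponentially growing delay between attempts, and the last exception is re-raised once attempts are exhausted. `retrying_step`, `interval_seconds`, `backoff_rate`, and the injectable `sleep` are hypothetical names for this sketch; DBOS's actual retry timing and the error it raises after the final attempt may differ.

```python
import time
from functools import wraps
from typing import Callable, TypeVar

T = TypeVar("T")


def retrying_step(max_attempts: int = 3, interval_seconds: float = 1.0,
                  backoff_rate: float = 2.0,
                  sleep: Callable[[float], None] = time.sleep):
    """Hypothetical model of a step with retries enabled (not DBOS code)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            delay = interval_seconds
            attempt = 1
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt >= max_attempts:
                        raise  # attempts exhausted: surface the last error
                    print(f"attempt {attempt} failed ({exc!r}); retrying in {delay}s")
                    sleep(delay)
                    delay *= backoff_rate  # exponential backoff between attempts
                    attempt += 1
        return wrapper
    return decorator


# Usage: a step that fails twice, then succeeds on its third attempt.
delays: list[float] = []
calls = 0


@retrying_step(max_attempts=3, sleep=delays.append)  # record delays instead of sleeping
def step_foo() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(step_foo(), calls, delays)  # ok 3 [1.0, 2.0]
```

Injecting `sleep` keeps the model fast and lets the delay schedule be inspected directly.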

The design intent is to wrap individual actions (like external API calls) into separate steps. The workflow then proceeds to completion retrying steps only where appropriate.
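The granularity point can be checked with a small in-memory model: a workflow calls three steps in order, and only the middle one, which fails on its first call, is wrapped in a retry loop. The retry re-runs only that step; the steps before and after it run once, and the workflow function itself is entered once. All names are hypothetical; this models the idea, not DBOS internals.

```python
from collections import Counter
from typing import Callable

calls: Counter[str] = Counter()


def with_retries(func: Callable[[], str], max_attempts: int) -> Callable[[], str]:
    # Hypothetical retry wrapper: re-invokes only `func`, never its caller.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def wrapper() -> str:
        for attempt in range(1, max_attempts + 1):
            try:
                return func()
            except Exception:
                if attempt == max_attempts:
                    raise
        raise AssertionError("unreachable")  # the loop always returns or raises
    return wrapper


def fetch_input() -> str:
    calls["fetch_input"] += 1
    return "data"


def call_flaky_api() -> str:
    calls["call_flaky_api"] += 1
    if calls["call_flaky_api"] == 1:
        raise ConnectionError("transient outage")  # fails only on the first call
    return "response"


def store_result() -> str:
    calls["store_result"] += 1
    return "stored"


# Only the flaky action gets retries; the other steps run at most once.
call_flaky_api_with_retries = with_retries(call_flaky_api, max_attempts=3)


def wf_bar() -> list[str]:
    calls["wf_bar"] += 1
    return [fetch_input(), call_flaky_api_with_retries(), store_result()]


print(wf_bar())     # ['data', 'response', 'stored']
print(dict(calls))  # {'wf_bar': 1, 'fetch_input': 1, 'call_flaky_api': 2, 'store_result': 1}
```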

Does this help?

@chbussler
Contributor Author

Hi @apoliakov, thanks for the super-quick response, that's great!

  1. No confusion :-): pdf
  2. I created a setup based on your input and added a try/except around the step invocation. This shows that DBOS retries the step as specified in the step decorator's arguments, and that the workflow itself can catch the case where the retries are exhausted. In other words, if retrying fails, the try/except gets the next opportunity to deal with the failure. Great, thanks for outlining this, as it made me implement this case as code (added below)!
  3. However, my focus was not on step failures and re-execution, but on workflow failures and re-execution.

I need a bit more time to make my argument, as the system isn't behaving the way I expected from the DBOS documentation. I'd like to write one or more workflow use cases in code so that I can explain what I am trying to address. Please bear with me in this discussion, and I'll get back to you.

Thank you!

import sys
import time
import traceback
from datetime import datetime

from dbos import DBOS

DBOS()


@DBOS.step(retries_allowed=True, max_attempts=2)
def step_foo():
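    DBOS.logger.info("Step start ...")
    raise Exception()  # always fails, so the step is retried until max_attempts is used up
    DBOS.logger.info("... step completed")  # unreachable: the raise above always fires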


@DBOS.scheduled("*/30 * * * * *")
@DBOS.workflow()
def workflow_steps_exceptions(scheduled_time: datetime,
                              actual_time: datetime) -> int:
    DBOS.logger.info(f"Workflow with id {DBOS.workflow_id} start ...")

    try:
        step_foo()
    except Exception as e:
        DBOS.logger.error(f"Exception: {e}")

    DBOS.logger.info(f"... workflow with id {DBOS.workflow_id} completed")
    return 5


def main(arguments: list[str]) -> None:
    DBOS().launch()

    while True:
        time.sleep(5)
        DBOS.logger.info('alive')


if __name__ == "__main__":
    try:
        main(sys.argv)
    except KeyboardInterrupt:
        DBOS.logger.warning('KeyboardInterrupt received')
    except Exception as e:
        DBOS.logger.error(traceback.format_exc())
    finally:
        DBOS.destroy()
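The behavior described in point 2 (the step is retried as configured, and once attempts run out the workflow's try/except receives the failure) can be modeled in plain Python. This is a sketch, not DBOS: `run_step_with_retries` and `StepRetriesExhausted` are hypothetical, and the exact exception type DBOS raises when a step runs out of attempts is not shown in this thread.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger("retry-model")


class StepRetriesExhausted(Exception):
    """Hypothetical error raised once a step has used up all its attempts."""


def run_step_with_retries(step, max_attempts: int):
    # Call `step` up to max_attempts times and wrap the final failure.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return step()
        except Exception as exc:
            logger.warning("step attempt %d/%d failed: %r", attempt, max_attempts, exc)
            last_error = exc
    raise StepRetriesExhausted(f"step failed {max_attempts} times") from last_error


attempts = 0


def step_foo() -> None:
    global attempts
    attempts += 1
    raise Exception("simulated step failure")  # always fails, like step_foo above


def workflow_steps_exceptions() -> int:
    try:
        run_step_with_retries(step_foo, max_attempts=2)
    except StepRetriesExhausted as e:
        # The workflow, not the retry loop, decides how to proceed.
        logger.error("Exception: %s", e)
    return 5


result = workflow_steps_exceptions()
print(result, attempts)  # 5 2
```

The workflow completes with its normal return value even though the step never succeeded, which matches the outcome described above.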
