Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Can't invoke activity from WorkflowMethod #35

Open
tush4hworks opened this issue Feb 20, 2023 · 2 comments
Open

Can't invoke activity from WorkflowMethod #35

tush4hworks opened this issue Feb 20, 2023 · 2 comments

Comments

@tush4hworks
Copy link

tush4hworks commented Feb 20, 2023

import logging
import time

from cadence.activity_method import activity_method
from cadence.workerfactory import WorkerFactory
from cadence.workflow import workflow_method, Workflow, WorkflowClient, signal_method, query_method

# Log at DEBUG level; the cadence.decision_loop DEBUG lines quoted in this issue rely on it.
logging.basicConfig(level=logging.DEBUG)

# Task list name shared by the activity/workflow decorators and by factory.new_worker below.
TASK_LIST = "SubsActivity-python-tasklistd3sasa4552asddasdsa1sda31"
# Domain handed to both WorkerFactory and WorkflowClient.new_client.
DOMAIN = "sample2"
# Starting quota for SubscriptionWf; renew_subscription resets the quota to this value.
QUOTA = 5


class SubscriptionActivity:
    """Activity interface with a single ``cancel`` activity bound to TASK_LIST.

    SubscriptionWf passes this class to ``Workflow.new_activity_stub`` and the
    worker registers an implementation under the name "SubscriptionActivity".
    """

    @activity_method(task_list=TASK_LIST)
    def cancel(self):
        # NOTE(review): this body duplicates SubscriptionActivityImpl.cancel; presumably
        # the decorator replaces it with a stub when invoked from a workflow - confirm.
        return "Cancel"


class SubscriptionActivityImpl(SubscriptionActivity):
    """Concrete activity; an instance is registered with the worker in the __main__ block."""

    def cancel(self):
        # Plain (undecorated) override of the interface method; returns a fixed marker string.
        return "Cancel"


class SubscriptionWfInterface:
    """Workflow interface: one workflow method, two signals and one query.

    Every method only raises NotImplementedError; SubscriptionWf supplies the
    bodies. Client stubs are created from this class, not from the implementation.
    """

    @workflow_method(task_list=TASK_LIST)
    async def manage_subscription(self, name):
        # Workflow entry point; `name` is the single argument given to client.start.
        raise NotImplementedError

    @signal_method()
    async def decrement(self):
        # Signal; appears in the worker log as "SubscriptionWfInterface::decrement".
        raise NotImplementedError

    @signal_method()
    async def renew_subscription(self):
        # Signal; the implementation resets the quota to QUOTA.
        raise NotImplementedError

    @query_method()
    async def get_quota(self):
        # Query; the implementation returns the current quota.
        raise NotImplementedError


class SubscriptionWf(SubscriptionWfInterface):
    """Workflow implementation that tracks a quota and completes once it is used up.

    The quota starts at QUOTA, is lowered by the ``decrement`` signal, reset by
    the ``renew_subscription`` signal and exposed through the ``get_quota`` query.
    """

    def __init__(self):
        self.subscription_activities: SubscriptionActivity = Workflow.new_activity_stub(SubscriptionActivity)
        self.quota = QUOTA
        # Declared here so the attribute exists before manage_subscription assigns it.
        self.name = None

    async def manage_subscription(self, name):
        """Run the workflow: block until the quota reaches zero or below.

        Args:
            name: Label for this run; stored on the instance and printed.
        """
        self.name = name
        print(f"Started workflow for {self.name}")
        print(Workflow.get_workflow_id())

        # Suspends until a signal handler has driven the quota down to <= 0.
        await Workflow.await_till(lambda: self.quota <= 0)
        print("Condition met..")
        # return await self.subscription_activities.cancel()

    async def renew_subscription(self):
        """Signal handler: restore the quota to its starting value.

        Declared ``async`` to match the interface declaration and the other
        handlers in this class; the original plain ``def`` did not.
        """
        self.quota = QUOTA

    async def decrement(self):
        """Signal handler: consume one unit of quota."""
        self.quota = self.quota - 1

    async def get_quota(self):
        """Query handler: return the current quota."""
        return self.quota


if __name__ == '__main__':
    # Host the activity and the workflow implementation on one local worker.
    factory = WorkerFactory("localhost", 7933, DOMAIN)
    worker = factory.new_worker(TASK_LIST)
    worker.register_activities_implementation(SubscriptionActivityImpl(), "SubscriptionActivity")
    worker.register_workflow_implementation_type(SubscriptionWf)
    factory.start()

    # Start a workflow execution through a stub built from the interface class.
    client = WorkflowClient.new_client(domain=DOMAIN)
    subs_workflow: SubscriptionWf = client.new_workflow_stub(SubscriptionWfInterface)
    wf_execution = client.start(subs_workflow.manage_subscription, "tddd3wsdsasdassdaeqsdsaw")
    time.sleep(5)

    # Re-attach to the running execution by id in order to send signals and queries.
    workflow_id = wf_execution.workflow_execution.workflow_id
    subs_workflow2: SubscriptionWf = client.new_workflow_stub_from_workflow_id(SubscriptionWfInterface,
                                                                               workflow_id)

    # Five decrements in total (1 + 3 + 1), matching QUOTA, interleaved with quota queries.
    subs_workflow2.decrement()
    time.sleep(5)
    subs_workflow2.get_quota()
    for _ in range(3):
        subs_workflow2.decrement()
    subs_workflow2.get_quota()
    subs_workflow2.decrement()
    subs_workflow2.get_quota()
    # subs_workflow2.decrement()
    # time.sleep(1)
    # worker.stop()
    # print("Workers stopped...")
    # sys.exit(0)

In the dummy code above, if I uncomment and invoke `return await self.subscription_activities.cancel()`, it throws errors such as:

DEBUG:cadence.decision_loop:[signal-task-WorkflowExecution(workflow_id='3b1cd958-4af4-4f7b-9446-af7d489be482', run_id='e9c472e6-693c-4529-8265-e8c204a9a6e2')-SubscriptionWfInterface::decrement] Created
DEBUG:cadence.decision_loop:[signal-task-WorkflowExecution(workflow_id='3b1cd958-4af4-4f7b-9446-af7d489be482', run_id='e9c472e6-693c-4529-8265-e8c204a9a6e2')-SubscriptionWfInterface::decrement] Running
INFO:cadence.decision_loop:Invoking signal SubscriptionWfInterface::decrement()
INFO:cadence.decision_loop:Signal SubscriptionWfInterface::decrement() returned None
Started workflow for tddd3wsdsasdassdaeqsdsaw
3b1cd958-4af4-4f7b-9446-af7d489be482
Condition met..
DEBUG:cadence.decision_loop:RespondDecisionTaskCompleted: RespondDecisionTaskCompletedResponse(decision_task=None)
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved

I'm not sure what's happening here. I've tried multiple approaches but I'm still stuck.
@tush4hworks
Copy link
Author

I need to add some logic (for example, updating a DB entry) that runs once the workflow moves past Workflow.await_till. Is there another way to do this without calling an activity? If we decorate a method with @activity_method, then I guess it needs the await syntax to call that activity?

@tush4hworks
Copy link
Author

I was able to get the behavior I wanted with something like the code below, but I'm no longer using an activity interface or @activity_method.
Since asyncio is single-process and single-threaded, maybe once Workflow.await_till has been called and has returned, it somehow can't call create_task on the same loop? This is just speculation; I only have a basic knowledge of asyncio.

import asyncio
import sys
import logging
import time

from cadence.activity_method import activity_method
from cadence.workerfactory import WorkerFactory
from cadence.workflow import workflow_method, Workflow, WorkflowClient, signal_method, query_method

# Log at DEBUG level.
logging.basicConfig(level=logging.DEBUG)

# Task list name shared by the workflow_method decorator and by factory.new_worker below.
TASK_LIST = "SubsActivity-python-tasklistd3sasa455sadsa2sdaasddasdsdasa1sda31"
# Domain handed to both WorkerFactory and WorkflowClient.new_client.
DOMAIN = "sample2"
# Starting quota for SubscriptionWf; renew_subscription resets the quota to this value.
QUOTA = 5


class SubscriptionActivity:
    """Plain helper (no activity decorator) that decides whether to cancel."""

    def cancel(self, quota):
        """Report whether the quota is exhausted.

        Prints "Cancelled" and returns True when ``quota`` is zero or negative;
        otherwise returns False without printing anything.
        """
        exhausted = quota <= 0
        if not exhausted:
            return False
        print("Cancelled")
        return True


class SubscriptionWfInterface:
    """Workflow interface: one workflow method, two signals and one query.

    Every method only raises NotImplementedError; SubscriptionWf supplies the
    bodies. Client stubs are created from this class, not from the implementation.
    """

    @workflow_method(task_list=TASK_LIST)
    async def manage_subscription(self, name):
        # Workflow entry point; `name` is the single argument given to client.start.
        raise NotImplementedError

    @signal_method()
    async def decrement(self):
        # Signal; the implementation lowers the quota by one.
        raise NotImplementedError

    @signal_method()
    async def renew_subscription(self):
        # Signal; the implementation resets the quota to QUOTA.
        raise NotImplementedError

    @query_method()
    async def get_quota(self):
        # Query; the implementation returns the current quota.
        raise NotImplementedError


class SubscriptionWf(SubscriptionWfInterface):
    """Workflow implementation that tracks a quota and completes once it is used up.

    The quota starts at QUOTA, is lowered by the ``decrement`` signal, reset by
    the ``renew_subscription`` signal and exposed through the ``get_quota`` query.
    """

    def __init__(self):
        self.subscription_activities: SubscriptionActivity = Workflow.new_activity_stub(SubscriptionActivity)
        self.quota = QUOTA
        # Declared here so the attribute exists before manage_subscription assigns it.
        self.name = None

    async def manage_subscription(self, name):
        """Run the workflow: block until ``cancel`` reports the quota as exhausted.

        Args:
            name: Label for this run; stored on the instance and printed.
        """
        self.name = name
        print(f"Started workflow for {self.name}:{self.quota}")
        print(Workflow.get_workflow_id())

        # The wait condition calls cancel() on the stub synchronously with the current quota.
        await Workflow.await_till(lambda: self.subscription_activities.cancel(self.quota))
        print("Condition met..")

    async def renew_subscription(self):
        """Signal handler: restore the quota to its starting value.

        Declared ``async`` to match the interface declaration and the other
        handlers in this class; the original plain ``def`` did not.
        """
        self.quota = QUOTA

    async def decrement(self):
        """Signal handler: consume one unit of quota."""
        self.quota = self.quota - 1

    async def get_quota(self):
        """Query handler: return the current quota."""
        return self.quota


if __name__ == '__main__':
    # Host the activity object and the workflow implementation on one local worker.
    factory = WorkerFactory("localhost", 7933, DOMAIN)
    worker = factory.new_worker(TASK_LIST)
    worker.register_activities_implementation(SubscriptionActivity(), "SubscriptionActivity")
    worker.register_workflow_implementation_type(SubscriptionWf)
    factory.start()

    # Start a workflow execution through a stub built from the interface class.
    client = WorkflowClient.new_client(domain=DOMAIN)
    subs_workflow: SubscriptionWf = client.new_workflow_stub(SubscriptionWfInterface)
    wf_execution = client.start(subs_workflow.manage_subscription, "tddd3wssdadsasdassdaeqsdsaw")
    time.sleep(5)

    # Re-attach to the running execution by id in order to send signals and queries.
    workflow_id = wf_execution.workflow_execution.workflow_id
    subs_workflow2: SubscriptionWf = client.new_workflow_stub_from_workflow_id(SubscriptionWfInterface,
                                                                               workflow_id)

    # Five decrements in total (1 + 3 + 1), matching QUOTA, interleaved with quota queries.
    subs_workflow2.decrement()
    time.sleep(5)
    subs_workflow2.get_quota()
    for _ in range(3):
        subs_workflow2.decrement()
    subs_workflow2.get_quota()
    subs_workflow2.decrement()
    subs_workflow2.get_quota()
    # subs_workflow2.decrement()
    # time.sleep(1)
    # worker.stop()
    # print("Workers stopped...")
    # sys.exit(0)

cc @firdaus

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant