Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Johny b/561 non context manager #600

Closed
wants to merge 14 commits into from
76 changes: 44 additions & 32 deletions yapapi/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ def __init__(
self._services: Set[asyncio.Task] = set()
self._stack = AsyncExitStack()

# changed in _start/_stop methods
self._operative: bool = False

async def create_demand_builder(
self, expiration_time: datetime, payload: Payload
) -> DemandBuilder:
Expand Down Expand Up @@ -229,53 +232,65 @@ def subnet_tag(self) -> Optional[str]:
"""Return the name of the subnet used by this engine, or `None` if it is not set."""
return self._subnet

@property
def operative(self) -> bool:
return self._operative

def emit(self, event: events.Event) -> None:
"""Emit an event to be consumed by this engine's event consumer."""
if self._wrapped_consumer:
self._wrapped_consumer.async_call(event)

async def __aenter__(self) -> "_Engine":
"""Initialize resources and start background services used by this engine."""

try:
stack = self._stack
await self._start()
return self
except:
await self._stop(*sys.exc_info())
raise

await stack.enter_async_context(self._wrapped_consumer)
async def __aexit__(self, *exc_info) -> Optional[bool]:
return await self._stop(*exc_info)

def report_shutdown(*exc_info):
if any(item for item in exc_info):
self.emit(events.ShutdownFinished(exc_info=exc_info)) # noqa
else:
self.emit(events.ShutdownFinished())
async def _stop(self, *exc_info) -> Optional[bool]:
self._operative = False
return await self._stack.__aexit__(*exc_info)

stack.push(report_shutdown)
async def _start(self) -> None:
self._operative = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we set this to True at the end of the method? If any error occurs before the end of initialization we may end up with a Golem instance that is operative but not properly initialized.

Copy link
Contributor Author

@johny-b johny-b Aug 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning was that we have to guard against more than one start running at the same time, as in:

golem = Golem(...)
loop.create_task(golem.start())
loop.create_task(golem.start())

(I don't think anyone will write exactly this but this might somehow happen in a more complex code, I guess?).

We could store two bits of information, start_started, start_ended.

But maybe we should renameoperative? It is now if fact started_not_stopped. Or maybe just remove a public property?

Copy link
Contributor

@azawlocki azawlocki Aug 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johny-b You've raised a good point. Consider again your example:

golem = Golem(...)
loop.create_task(golem.start())   # 1
loop.create_task(golem.start())   # 2

Suppose golem.start() in line #1 is in progress and golem.start() in #2 starts. We don't want the second instance of golem._start() to run concurrently with the first one, and your current solution will prevent that. But we also don't want golem.start() in line #2 exiting immediately, as if golem were already properly started, while _start() is still in progress! This is even more important when we replace both golem.start()'s with golem.execute_tasks().

What we do want is for the second golem.start() to wait until the first one finishes. So we need some kind of task synchronisation mechanism, e.g. an asyncio.Lock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really good point. I need to think about this : )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 0829514

Copy link
Contributor

@azawlocki azawlocki Aug 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johny-b Thanks. Now it probably works well, but the solution using asyncio.Futures is a bit hard to follow, at least for me.

Why not something more straightforward, with asyncio.Lock:

def __init__():
    self._started = False
    self._stopped = False
    self._lock = asyncio.Lock()

async def start():
    async with self._lock:
       if not self._started:
           await self._start()
           self._started = True

# Similarly for stop()

@property
def operative():
    return self._started and not self._stopped

EDIT: Perhaps stop() should differ from start() in that self._stopped = True goes before await self.stop(). And there's the complication with the exc_info tuple being passed around from __aexit__().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, if you put start() and stop() in _Engine instead of Golem, you could define _Engine's __aenter__() and __aexit__() in terms of start() and stop(). That would also protect us from concurrent executions of async with golem:... in two different tasks.

stack = self._stack

market_client = await stack.enter_async_context(self._api_config.market())
self._market_api = rest.Market(market_client)
await stack.enter_async_context(self._wrapped_consumer)

activity_client = await stack.enter_async_context(self._api_config.activity())
self._activity_api = rest.Activity(activity_client)
def report_shutdown(*exc_info):
if any(item for item in exc_info):
self.emit(events.ShutdownFinished(exc_info=exc_info)) # noqa
else:
self.emit(events.ShutdownFinished())

payment_client = await stack.enter_async_context(self._api_config.payment())
self._payment_api = rest.Payment(payment_client)
stack.push(report_shutdown)

self.payment_decorator = _Engine.PaymentDecorator(await self._create_allocations())
market_client = await stack.enter_async_context(self._api_config.market())
self._market_api = rest.Market(market_client)

# TODO: make the method starting the process_invoices() task an async context manager
# to simplify code in __aexit__()
loop = asyncio.get_event_loop()
self._process_invoices_job = loop.create_task(self._process_invoices())
self._services.add(self._process_invoices_job)
self._services.add(loop.create_task(self._process_debit_notes()))
activity_client = await stack.enter_async_context(self._api_config.activity())
self._activity_api = rest.Activity(activity_client)

self._storage_manager = await stack.enter_async_context(gftp.provider())
payment_client = await stack.enter_async_context(self._api_config.payment())
self._payment_api = rest.Payment(payment_client)

stack.push_async_exit(self._shutdown)
self.payment_decorator = _Engine.PaymentDecorator(await self._create_allocations())

return self
except:
await self.__aexit__(*sys.exc_info())
raise
# TODO: make the method starting the process_invoices() task an async context manager
# to simplify code in __aexit__()
loop = asyncio.get_event_loop()
self._process_invoices_job = loop.create_task(self._process_invoices())
self._services.add(self._process_invoices_job)
self._services.add(loop.create_task(self._process_debit_notes()))

self._storage_manager = await stack.enter_async_context(gftp.provider())

stack.push_async_exit(self._shutdown)

def _unpaid_agreement_ids(self) -> Set[AgreementId]:
"""Return the set of all yet unpaid agreement ids."""
Expand Down Expand Up @@ -340,9 +355,6 @@ async def _shutdown(self, *exc_info):
except Exception:
logger.debug("Got error when waiting for services to finish", exc_info=True)

async def __aexit__(self, *exc_info) -> Optional[bool]:
return await self._stack.__aexit__(*exc_info)

async def _create_allocations(self) -> rest.payment.MarketDecoration:

if not self._budget_allocations:
Expand Down
23 changes: 23 additions & 0 deletions yapapi/golem.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ class Golem(_Engine):
at any given time.
"""

async def start(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weren't these methods supposed to be implemented on the _Engine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. All logic is on _Engine.
  2. Those are public methods.
    --> I think this is 100% in line with the Twist discussion

"""Initialize resources and start background services used by this engine.

Calling this method is not necessary, it will be called either way internally
when the engine is used for the first time.
"""
if not self.operative:
await self._start()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've noticed that there's a difference between:

async with Golem(...) as golem:
   do_stuff_with(golem)

and

golem = Golem(...)
await golem.start()
do_stuff_with(golem)
await golem.stop()

in case an exception is raised in golem._start().

With context manager, golem._stop() will be called before re-raising the exception.

With the start()/stop() combo, the developer has to make sure that stop() is called when start() raises an exception. That's fine, if it's intentional and we document this clearly.

Copy link
Contributor Author

@johny-b johny-b Aug 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if the developer has any code that ensures golem is stopped when an exception is raised in do_stuff_with(golem) it will also work for start?

E.g. our "recommended" way of doing things without context manager would be

golem = Golem(...)
try:
    do_stuff_with(golem)
finally:
    golem.stop()

(I skipped 'start` because it is optional and I don't think we need it in our examples)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johny-b As I said, that's fine for me. But let's remember about this when creating documentation for start()/stop().

Copy link
Contributor Author

@johny-b johny-b Aug 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'll write the docs the way asyncio.Lock is documented, so
"the preferred way is"
"which is equivalent to"


async def stop(self) -> Optional[bool]:
"""Stop the engine in a graceful way.

This **must** be called when using Golem in a non-contextmanager way.
"""
if self.operative:
return await self._stop(None, None, None)
return None

async def execute_tasks(
self,
worker: Callable[
Expand Down Expand Up @@ -111,6 +129,8 @@ async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
print(completed.result.stdout)
```
"""
if not self.operative:
await self.start()

kwargs: Dict[str, Any] = {"payload": payload}
if max_workers:
Expand Down Expand Up @@ -203,6 +223,9 @@ async def main():
await asyncio.sleep(REFRESH_INTERVAL_SEC)
```
"""
if not self.operative:
await self.start()

payload = payload or await service_class.get_payload()

if not payload:
Expand Down