Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zaza utils #2

Merged
merged 8 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 249 additions & 0 deletions cou/zaza_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
# Copyright 2023 Canonical Limited.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Functions to support converting async function to a sync equivalent."""
import asyncio
import concurrent.futures
import inspect
import logging
import threading
import time
from pkgutil import extend_path
from sys import version_info

__path__ = extend_path(__path__, __name__)

# This flag is for testing, but can be used to control whether libjuju runs in
# a background thread or is simplified.
RUN_LIBJUJU_IN_THREAD = True

# Hold the libjuju thread so we can interact with it.
_libjuju_thread = None
_libjuju_loop = None
_libjuju_run = False

# Timeout for loop to close. This is set to 30 seconds. If there is a non
# async call in the async thread then it could stall the thread for more than
# 30 seconds (e.g. an errant subprocess call). This will cause a runtime error
# if the timeout is exceeded. This shouldn't normally be the case as there is
# only one 'start' and 'stop' of the thread during a zaza runtime
LOOP_CLOSE_TIMEOUT = 30.0


def get_or_create_libjuju_thread():
"""Get (or Create) the thread that libjuju asyncio is running in.

:returns: the thread that libjuju is running in.
:rtype: threading.Thread
"""
global _libjuju_thread, _libjuju_loop, _libjuju_run
if _libjuju_thread is None:
_libjuju_run = True
_libjuju_thread = threading.Thread(target=libjuju_thread_run)
_libjuju_thread.start()
# There's a race hazard for _libjuju_loop becoming available, so let's
# wait for that to happen.
now = time.time()
while True:
if _libjuju_loop is not None and _libjuju_loop.is_running():
break
time.sleep(0.01)
# allow 5 seconds for thead to start
if time.time() > now + 5.0:
raise RuntimeError("Async thread didn't start!")
# enable async subprocess calls in the libjuju thread to work
asyncio.get_child_watcher().attach_loop(_libjuju_loop)
return _libjuju_thread


def libjuju_thread_run():
"""Run the libjuju async thread.

The thread that contains the runtime for libjuju asyncio futures.

zaza runs libjuju in a background thread so that it can make progress as
needed with the model(s) that are connected. The sync functions run in the
foreground thread, and the asyncio libjuju functions run in the background
thread. `run_coroutine_threadsafe` is used to cross from sync to asyncio
code in the background thread to enable access to the libjuju.

Note: it's very important that libjuju objects are not updated in the sync
thread; it's advisable that they are copied into neutral objects and handed
back. e.g. always use unit_name, rather than handling a libjuju 'unit'
object in a sync function.
"""
global _libjuju_loop

async def looper():
global _libjuju_run
while _libjuju_run:
# short spinner to ensure that foreground tasks 'happen' so that
# background tasks can complete (e.g. during model disconnection).
# loop.run_forever() doesn't work as there is no foreground async
# task to make progress, and thus the background tasks do nothing.
await asyncio.sleep(0.1)

_libjuju_loop = asyncio.new_event_loop()
asyncio.set_event_loop(_libjuju_loop)
try:
_libjuju_loop.run_until_complete(_libjuju_loop.create_task(looper()))
finally:
while True:
# issue #445 - asyncio.Task.all_tasks() deprecated in 3.7
if version_info.major == 3 and version_info.minor >= 7:
try:
tasklist = asyncio.all_tasks()
except RuntimeError:
# no running event loop
break
else:
tasklist = asyncio.Task.all_tasks()
pending_tasks = [p for p in tasklist if not p.done()]
if pending_tasks:
logging.info(
"async -> sync. cleaning up pending tasks: len: {}".format(len(pending_tasks))
)
for pending_task in pending_tasks:
pending_task.cancel()
try:
_libjuju_loop.run_until_complete(pending_task)
except asyncio.CancelledError:
pass
except Exception as e:
logging.error("A pending task caused an exception: {}".format(str(e)))
else:
break
_libjuju_loop.close()


def join_libjuju_thread():
"""Stop and cleanup the asyncio tasks on the loop, and then join it."""
global _libjuju_thread, _libjuju_loop, _libjuju_run
if _libjuju_thread is not None:
logging.debug("stopping the event loop")
# remove the child watcher (that was for subprocess calls) when
# dropping the thread.
asyncio.get_child_watcher().attach_loop(None)
_libjuju_run = False
# wait up to 30 seconds for loop to close.
now = time.time()
while not _libjuju_loop.is_closed():
logging.debug("Closing ...")
time.sleep(0.1)
if time.time() > now + LOOP_CLOSE_TIMEOUT:
raise RuntimeError(
"Exceeded {} seconds for loop to close".format(LOOP_CLOSE_TIMEOUT)
)
logging.debug("joining the loop")
_libjuju_thread.join(timeout=30.0)
if _libjuju_thread.is_alive():
logging.error("The thread didn't die")
raise RuntimeError("libjuju async thread didn't finish after 30seconds")
_libjuju_thread = None


def clean_up_libjuju_thread():
"""Clean up the libjuju thread and any models that are still running."""
global _libjuju_loop, _libjuju_run
if _libjuju_loop is not None:
# circular import; tricky to remove
from . import model

sync_wrapper(model.remove_models_memo)()
join_libjuju_thread()
_libjuju_run = False
_libjuju_loop = None


def sync_wrapper(f, timeout=None):
"""Convert the async function into one that runs in the async thread.

This is only to be called from sync code. It wraps the given async
co-routine in some sync logic that allows it to be injected into the async
libjuju thread. This is then waited until there is a result, in which case
the result is returned.

:param f: The async function that when called is a co-routine
e.g. `async def some_function(...)` then `some_function` should be
passed as `f`.
:type f: function
:param timeout: The timeout to apply, None for no timeout
:type timeout: Optional[float]
:returns: The de-async'd function
:rtype: function
"""

def _wrapper(*args, **kwargs):
global _libjuju_loop

async def _runner():
return await f(*args, **kwargs)

if not RUN_LIBJUJU_IN_THREAD:
# run it in this thread's event loop:
loop = asyncio.get_event_loop()
return loop.run_until_complete(_runner())

# ensure that the thread is created
get_or_create_libjuju_thread()
assert (
_libjuju_loop is not None and _libjuju_loop.is_running()
), "Background thread must be running by now, so this is a bug"

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(_runner(), _libjuju_loop)
try:
return future.result(timeout)
except concurrent.futures.TimeoutError:
logging.error("The coroutine took too long, cancelling the task...")
future.cancel()
raise

return _wrapper


def run(*steps):
"""Run the given steps in an asyncio loop.

Note: previous versions of this function would ensure that any other
futures spawned by any of the steps are cleaned up after each step is
performed. However, as the async thread runs continuously in a background
thread (unless RUN_LIBJUJU_IN_THREAD is not True), then these other tasks
are not cleaned up until the whole thread ends.

:param steps: List of async functions, coroutines or literals
:type steps: List[function]
:returns: The result of the last async function called
:rtype: Any
"""
if not steps:
return None

async def maybe_call(f):
if inspect.iscoroutine(f):
return await f
elif inspect.iscoroutinefunction(f):
return await f()
elif inspect.isfunction(f):
return f()
else:
return f

async def _runner():
for step in steps[:-1]:
await maybe_call(step)
return await maybe_call(steps[-1])

return sync_wrapper(_runner)()
Loading