Skip to content

Commit

Permalink
Merge pull request #580 from cderici/streamline-asyncio-stuff
Browse files Browse the repository at this point in the history
#580

### Description

A bunch of background noise is happening in the concurrent asyncio events resulting in repeated error logs to build up, despite that nothing in the foreground is failing. These are often:

1. Actual exceptions that are ignored in the main task/thread.
2. Error logs bleeding from other libraries (potentially when trying to handle the ignored exceptions in (1)
(b/c `asyncio` manages their events as well).
3. Tasks that are not gathered properly at teardown.

This PR attempts to streamline all the tasks and events that are happening on our `connection.py` and make sure we're:
* not producing any background exceptions,
* not producing any unnecessary/useless/excess error logs, and 
* tearing down properly.

Hopefully will fix #576 

Jira card [#138](https://warthogs.atlassian.net/browse/JUJU-138)

### QA Steps

In the CI test runs (especially in the ones with failed tests), we should only see the pass info and exception stacktraces that are relevant to the test logic (i.e. there shouldn't be any asyncio related errors in there).

### Notes & Discussion 

* This first commit ensures the `Pinger` facade is available when the `_pinger_task` is created, by moving the `_pinger_task` creation after `_build_facade` calls.

* Also disables the `_pinger_task` completely for `debug_log` connections, because debug_log connections are not doing any RPCs, they directly connect through the WebSocketClientProtocol, so we're not getting any facades from the apiserver (like we do in the case of `_connect_with_login` etc)

* A relevant bug here is https://bugs.python.org/issue36709 : which is about `asyncio.sslproto.SSLProtocol` raising an error whenever the event_loop is closed during an alive websocket connection. So in our `client/connection.py` we avoid keeping the connection alive (by removing `ping_interval=None`).

* The websocket object (_ws) is tightly coupled with the asyncio-managed tasks (e.g. Pinger) as well as the Monitor, so interacting with it directly (e.g. doing conn._ws.close() instead of conn.close()) pushes things out of sync/control. So it shouldn't be used directly (e.g. avoid `conn.ws.close()`, instead use `conn.close()`)

* Functions such as `model.get_controller` returns a newly constructed Controller object that is already connected with the model's own connection parameters. This puts the responsibility of closing out the connection (and destroying the spawned asyncio tasks etc) to the caller. If not done, everyone (asyncio, websockets etc) yells at pylibjuju about lingering tasks and open connections. So we introduce a `ConnectedController` object that when it's used with the `async with` form automatically disconnects after done.

* asyncio loop event_handler is only called on Task exceptions when the Task object is cleared from the memory. But the garbabe collector doesn't clear the Task if we keep a reference for it (e.g. putting it in a neverending Task ---like Pinger) until the very end. So installing our own handler makes sure that the exceptions are retrieved and properly handled and logged regardless of when the Task is being destroyed.
  • Loading branch information
jujubot authored Nov 19, 2021
2 parents 5e54f0b + b045038 commit 43becc4
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 63 deletions.
1 change: 1 addition & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Next Release
^^^^^^^^^^^^

* Legacy "services" for describing "applications" within bundles are no longer supported. "applications" can be used as a direct replacement for "services" in bundles.yaml.
* The websocket (ws) in a Connection object became a read-only property.

2.9.4
^^^^^
Expand Down
63 changes: 31 additions & 32 deletions juju/client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def status(self):
return self.DISCONNECTED

# connection cleanly disconnected or not yet opened
if not connection.ws:
if not connection._ws:
return self.DISCONNECTED

# close called but not yet complete
Expand All @@ -199,7 +199,7 @@ def status(self):
else:
stopped = connection._receiver_task.cancelled()

if stopped or not connection.ws.open:
if stopped or not connection._ws.open:
return self.ERROR

# everything is fine!
Expand Down Expand Up @@ -287,7 +287,7 @@ async def connect(
# _connect_with_redirect method, but create them here
# as a reminder that they will exist.
self.addr = None
self.ws = None
self._ws = None
self.endpoint = None
self.endpoints = None
self.cacert = None
Expand Down Expand Up @@ -340,6 +340,11 @@ async def connect(
raise lastError
raise Exception("Unable to connect to websocket")

@property
def ws(self):
log.warning('Direct access to the websocket object may cause disruptions in asyncio event handling.')
return self._ws

@property
def username(self):
if not self.usertag:
Expand Down Expand Up @@ -389,11 +394,10 @@ async def _open(self, endpoint, cacert):
max_size=self.max_frame_size,
server_hostname=server_hostname,
sock=sock,
ping_interval=None
)), url, endpoint, cacert

async def close(self, to_reconnect=False):
if not self.ws:
if not self._ws:
return
self.monitor.close_called.set()

Expand All @@ -411,26 +415,20 @@ async def close(self, to_reconnect=False):
# Allow a second for tasks to be cancelled
await jasyncio.sleep(1)

if self.ws and not self.ws.closed:
ws_close_task = jasyncio.create_task(self.ws.close())
done, pending = await jasyncio.wait([ws_close_task])

assert ws_close_task in done

# close_task.exception() is None means that close_task
# (ws.close()) actually completed without any errors
assert ws_close_task.exception() is None, 'the websocket is unable to close properly, try making a new connection from scratch'
# proof that the errors we see in the output dont belong
# to us, but belongs to websockets library
self.ws = None
if self._ws and not self._ws.closed:
await self._ws.close()
self._ws = None

if self.proxy is not None:
self.proxy.close()

async def _recv(self, request_id):
if not self.is_open:
raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
return await self.messages.get(request_id)
try:
return await self.messages.get(request_id)
except GeneratorExit:
return {}

def _close_debug_log_target(self):
if self.debug_log_target is not sys.stdout:
Expand All @@ -440,7 +438,7 @@ async def _debug_logger(self):
try:
while self.is_open:
result = await utils.run_with_interrupt(
self.ws.recv(),
self._ws.recv(),
self.monitor.close_called)
if self.monitor.close_called.is_set():
break
Expand Down Expand Up @@ -476,8 +474,9 @@ async def _receiver(self):
try:
while self.is_open:
result = await utils.run_with_interrupt(
self.ws.recv(),
self.monitor.close_called)
self._ws.recv(),
self.monitor.close_called,
log=log)
if self.monitor.close_called.is_set():
break
if result is not None:
Expand Down Expand Up @@ -518,7 +517,8 @@ async def _do_ping():
while True:
await utils.run_with_interrupt(
_do_ping(),
self.monitor.close_called)
self.monitor.close_called,
log=log)
if self.monitor.close_called.is_set():
break
await jasyncio.sleep(10)
Expand Down Expand Up @@ -553,7 +553,7 @@ async def rpc(self, msg, encoder=None):
raise websockets.exceptions.ConnectionClosed(
0, 'websocket closed')
try:
await self.ws.send(outgoing)
await self._ws.send(outgoing)
break
except websockets.ConnectionClosed:
if attempt == 2:
Expand Down Expand Up @@ -690,11 +690,15 @@ async def reconnect(self):
async with monitor.reconnecting:
await self.close(to_reconnect=True)
connector = self._connect if self.is_debug_log_connection else self._connect_with_login
await connector(
res = await connector(
[(self.endpoint, self.cacert)]
if not self.endpoints else
self.endpoints
)
if connector is self._connect_with_login:
self._build_facades(res.get('facades', {}))
if not self._pinger_task:
self._pinger_task = jasyncio.create_task(self._pinger())

async def _connect(self, endpoints):
if len(endpoints) == 0:
Expand Down Expand Up @@ -737,7 +741,7 @@ async def _try_endpoint(endpoint, cacert, delay):
break
for task in tasks:
task.cancel()
self.ws = result[0]
self._ws = result[0]
self.addr = result[1]
self.endpoint = result[2]
self.cacert = result[3]
Expand All @@ -752,11 +756,6 @@ async def _try_endpoint(endpoint, cacert, delay):
elif not self.is_debug_log_connection and not self._receiver_task:
self._receiver_task = jasyncio.create_task(self._receiver())

# In any type of connection, if we don't have a _pinger_task
# yet, then schedule a new one
if not self._pinger_task:
self._pinger_task = jasyncio.create_task(self._pinger())

log.debug("Driver connected to juju %s", self.addr)
self.monitor.close_called.clear()

Expand Down Expand Up @@ -799,8 +798,6 @@ async def _connect_with_login(self, endpoints):
finally:
if not success:
await self.close()
else:
self._pinger_task = jasyncio.create_task(self._pinger())

async def _connect_with_redirect(self, endpoints):
try:
Expand All @@ -811,6 +808,8 @@ async def _connect_with_redirect(self, endpoints):
raise
login_result = await self._connect_with_login(e.endpoints)
self._build_facades(login_result.get('facades', {}))
if not self._pinger_task:
self._pinger_task = jasyncio.create_task(self._pinger())

def _build_facades(self, facades):
self.facades.clear()
Expand Down
2 changes: 2 additions & 0 deletions juju/client/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ async def connect(self, **kwargs):
assert self._connection
self._log_connection = await Connection.connect(**kwargs)
else:
if self._connection:
await self._connection.close()
self._connection = await Connection.connect(**kwargs)

async def disconnect(self):
Expand Down
24 changes: 24 additions & 0 deletions juju/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,3 +855,27 @@ async def _watcher(stop_event):
log.debug('Starting watcher task for model summaries')
jasyncio.ensure_future(_watcher(stop_event))
return stop_event


class ConnectedController(Controller):
def __init__(
self,
connection,
max_frame_size=None,
bakery_client=None,
jujudata=None,
):
super().__init__(
max_frame_size=max_frame_size,
bakery_client=bakery_client,
jujudata=jujudata)
self._conn = connection

async def __aenter__(self):
kwargs = self._conn.connect_params()
kwargs.pop('uuid')
await self._connect_direct(**kwargs)
return self

async def __aexit__(self, exc_type, exc, tb):
await self.disconnect()
35 changes: 35 additions & 0 deletions juju/jasyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import asyncio
import signal
import functools
import websockets

from asyncio import Event, TimeoutError, Queue, ensure_future, \
gather, sleep, wait_for, create_subprocess_exec, subprocess, \
Expand All @@ -44,6 +46,39 @@ def create_task(coro):
return asyncio.ensure_future(coro)


def create_task_with_handler(coro, task_name, logger):
"""Wrapper around "asyncio.create_task" to make sure the task
exceptions are handled properly.
asyncio loop event_handler is only called on task exceptions when
the Task object is cleared from memory. But the GC doesn't clear
the Task if we keep a reference for it (e.g. _pinger_task in
connection.py) until the very end.
This makes sure the exceptions are retrieved and properly
handled/logged whenever the Task is destroyed.
"""
def _task_result_exp_handler(task, task_name=task_name, logger=logger):
try:
task.result()
except CancelledError:
pass
except websockets.exceptions.ConnectionClosed:
return
except Exception as e:
# This really is an arbitrary exception we need to catch
#
# No need to re-raise, though, because after this point
# the only thing that can catch this is asyncio loop base
# event_handler, which won't do anything but yell 'Task
# exception was never retrieved' anyways.
logger.exception("Task %s raised an exception: %s" % (task_name, e))

task = create_task(coro)
task.add_done_callback(functools.partial(_task_result_exp_handler, task_name=task_name, logger=logger))
return task


def run(*steps):
"""
Helper to run one or more async functions synchronously, with graceful
Expand Down
28 changes: 14 additions & 14 deletions juju/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from .client.client import ConfigValue, Value
from .client.overrides import Caveat, Macaroon
from .constraints import parse as parse_constraints
from .controller import Controller
from .controller import Controller, ConnectedController
from .delta import get_entity_class, get_entity_delta
from .errors import JujuAPIError, JujuError, JujuModelConfigError, JujuBackupError
from .errors import JujuAppError, JujuUnitError, JujuAgentError, JujuMachineError
Expand Down Expand Up @@ -1344,8 +1344,8 @@ async def add_relation(self, relation1, relation2):
raise JujuError("cannot add relation to {}: remote endpoints not supported".format(remote_endpoint.string()))

if remote_endpoint.has_empty_source():
current = await self.get_controller()
remote_endpoint.source = current.controller_name
async with ConnectedController(self.connection()) as current:
remote_endpoint.source = current.controller_name
# consume the remote endpoint
await self.consume(remote_endpoint.string(),
application_alias=remote_endpoint.application,
Expand Down Expand Up @@ -2401,18 +2401,18 @@ async def create_offer(self, endpoint, offer_name=None, application_name=None):
@param endpoint: holds the application and endpoint you want to offer
@param offer_name: over ride the offer name to help the consumer
"""
controller = await self.get_controller()
return await controller.create_offer(self.info.uuid, endpoint,
offer_name=offer_name,
application_name=application_name)
async with ConnectedController(self.connection()) as controller:
return await controller.create_offer(self.info.uuid, endpoint,
offer_name=offer_name,
application_name=application_name)

async def list_offers(self):
"""
Offers list information about applications' endpoints that have been
shared and who is connected.
"""
controller = await self.get_controller()
return await controller.list_offers(self.info.name)
async with ConnectedController(self.connection()) as controller:
return await controller.list_offers(self.info.name)

async def remove_offer(self, endpoint, force=False):
"""
Expand All @@ -2421,8 +2421,8 @@ async def remove_offer(self, endpoint, force=False):
Offers will also remove relations to those offers, use force to do
so, without an error.
"""
controller = await self.get_controller()
return await controller.remove_offer(self.info.uuid, endpoint, force)
async with ConnectedController(self.connection()) as controller:
return await controller.remove_offer(self.info.uuid, endpoint, force)

async def consume(self, endpoint, application_alias="", controller_name=None, controller=None):
"""
Expand Down Expand Up @@ -2520,9 +2520,9 @@ async def export_bundle(self, filename=None):
async def _get_source_api(self, url, controller_name=None):
controller = Controller()
if url.has_empty_source():
current = await self.get_controller()
if current.controller_name is not None:
controller_name = current.controller_name
async with ConnectedController(self.connection()) as current:
if current.controller_name is not None:
controller_name = current.controller_name
await controller.connect(controller_name=controller_name)
return controller

Expand Down
4 changes: 2 additions & 2 deletions juju/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async def wait_for_bundle(model, bundle, **kwargs):
await model.wait_for_idle(apps, **kwargs)


async def run_with_interrupt(task, *events):
async def run_with_interrupt(task, *events, log=None):
"""
Awaits a task while allowing it to be interrupted by one or more
`asyncio.Event`s.
Expand All @@ -156,7 +156,7 @@ async def run_with_interrupt(task, *events):
:param events: One or more `asyncio.Event`s which, if set, will interrupt
`task` and cause it to be cancelled.
"""
task = jasyncio.ensure_future(task)
task = jasyncio.create_task_with_handler(task, 'tmp', log)
event_tasks = [jasyncio.ensure_future(event.wait()) for event in events]
done, pending = await jasyncio.wait([task] + event_tasks,
return_when=jasyncio.FIRST_COMPLETED)
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def test_monitor_catches_error(event_loop):
# grab the reconnect lock to prevent automatic
# reconnecting during the test
async with conn.monitor.reconnecting:
await conn.ws.close() # this could be racy with reconnect
await conn._ws.close() # this could be racy with reconnect
# if auto-reconnect is not disabled by lock, force this
# test to fail by deferring to the reconnect task via sleep
await jasyncio.sleep(0.1)
Expand Down Expand Up @@ -78,7 +78,7 @@ async def test_reconnect(event_loop):
try:
await jasyncio.sleep(0.1)
assert conn.is_open
await conn.ws.close()
await conn._ws.close()
assert not conn.is_open
await model.block_until(lambda: conn.is_open, timeout=3)
finally:
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ async def test_reset_user_password(event_loop):
pass
finally:
# No connection with old password
assert new_connection is None
if new_connection:
await new_connection.close()
raise AssertionError()


@base.bootstrapped
Expand Down
Loading

0 comments on commit 43becc4

Please sign in to comment.