Skip to content

Commit

Permalink
Application cleanup
Browse files Browse the repository at this point in the history
Imrpove application cleanup. This might need another pass
after #112 is merged

Issue #91 #92
  • Loading branch information
ovv committed Apr 27, 2018
1 parent 25ebf63 commit af98795
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 59 deletions.
2 changes: 2 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ multidict = ">=2.0"
pyquery = "*"
aiodns = "*"
websockets = "*"
async-timeout = "*"

[dev-packages]
twine = "*"
pytest-asyncio = "*"
"flake8" = "*"

[requires]
python_version = "3.6"
53 changes: 46 additions & 7 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions aiosip/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,12 @@ async def reply(*args, **kwargs):
method=msg.method,
from_details=Contact.from_header(msg.headers['To']),
to_details=Contact.from_header(msg.headers['From']),
call_id=call_id
call_id=call_id,
inbound=True
)

await dialog.reply(*args, **kwargs)
await dialog.close(fast=True)
await dialog.close()

try:
route = await self.dialplan.resolve(
Expand Down Expand Up @@ -226,7 +227,12 @@ def finish(self):
def register_on_finish(self, func, *args, **kwargs):
self._finish_callbacks.insert(0, (func, args, kwargs))

async def close(self):
async def close(self, timeout=5):
for dialog in set(self._dialogs.values()):
try:
await dialog.close(timeout=timeout)
except asyncio.TimeoutError:
pass
for connector in self._connectors.values():
await connector.close()
for task in self._tasks:
Expand Down
35 changes: 26 additions & 9 deletions aiosip/dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from collections import defaultdict
from multidict import CIMultiDict
from async_timeout import timeout as Timeout

from . import utils
from .message import Request, Response
Expand Down Expand Up @@ -151,10 +152,19 @@ def _maybe_close(self, msg):

def _close(self):
LOG.debug('Closing: %s', self)
if self._closing:
self._closing.cancel()

for transactions in self.transactions.values():
for transaction in transactions.values():
transaction.close()

# Should not be necessary once dialog are correctly tracked
try:
del self.app._dialogs[self.dialog_id]
except KeyError as e:
pass

def _connection_lost(self):
for transactions in self.transactions.values():
for transaction in transactions.values():
Expand Down Expand Up @@ -187,10 +197,11 @@ def end_transaction(self, transaction):
for item in to_delete:
del self.transactions[item[0]][item[1]]

async def request(self, method, contact_details=None, headers=None, payload=None):
async def request(self, method, contact_details=None, headers=None, payload=None, timeout=None):
msg = self._prepare_request(method, contact_details, headers, payload)
if msg.method != 'ACK':
return await self.start_unreliable_transaction(msg)
async with Timeout(timeout):
return await self.start_unreliable_transaction(msg)
else:
self.peer.send_message(msg)

Expand Down Expand Up @@ -274,15 +285,19 @@ async def refresh(self, headers=None, expires=1800, *args, **kwargs):
headers['Expires'] = int(expires)
return await self.request(self.original_msg.method, headers=headers, *args, **kwargs)

async def close(self, fast=False, headers=None, *args, **kwargs):
async def close(self, headers=None, *args, **kwargs):
if not self._closed:
self._closed = True
result = None
if not self.inbound and self.original_msg.method in ('REGISTER', 'SUBSCRIBE'):
headers = CIMultiDict(headers or {})
if 'Expires' not in headers:
headers['Expires'] = 0
result = await self.request(self.original_msg.method, headers=headers, *args, **kwargs)
try:
result = await self.request(self.original_msg.method, headers=headers, *args, **kwargs)
finally:
self._close()

self._close()
return result

Expand Down Expand Up @@ -409,7 +424,7 @@ def end_transaction(self, transaction):
for item in to_delete:
del self.transactions[item[0]][item[1]]

async def close(self):
async def close(self, timeout=None):
if not self._closed:
self._closed = True

Expand All @@ -422,9 +437,11 @@ async def close(self):
if msg:
transaction = UnreliableTransaction(self, original_msg=msg, loop=self.app.loop)
self.transactions[msg.method][msg.cseq] = transaction
await transaction.start()

self._close()
try:
async with Timeout(timeout):
await transaction.start()
finally:
self._close()

def _close(self):
pass
self._close()
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
'multidict>=2.0',
'pyquery',
'aiodns',
'websockets'
'websockets',
'async_timeout'
]

test_requirements = [
Expand Down
10 changes: 7 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import asyncio

import aiosip
import pytest

import asyncio
import itertools

pytest_plugins = ['aiosip.pytest_plugin']

Expand Down Expand Up @@ -119,3 +118,8 @@ def to_details(request):
@pytest.fixture
def loop(event_loop):
return event_loop


@pytest.fixture(params=itertools.permutations(('client', 'server')))
def close_order(request):
return request.param
34 changes: 25 additions & 9 deletions tests/test_sip_proxy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import aiosip
import pytest
import asyncio
import itertools


async def test_proxy_subscribe(test_server, test_proxy, protocol, loop, from_details, to_details):
@pytest.mark.parametrize('close_order', itertools.permutations(('client', 'server', 'proxy'))) # noQa C901: too complex
async def test_proxy_subscribe(test_server, test_proxy, protocol, loop, from_details, to_details, close_order):
callback_complete = loop.create_future()
callback_complete_proxy = loop.create_future()

Expand Down Expand Up @@ -59,12 +62,19 @@ async def proxy_subscribe(self, request, message):
assert received_request_server.payload == received_request_proxy.payload
assert received_request_server.headers == received_request_proxy.headers

await server_app.close()
await proxy_app.close()
await app.close()
for item in close_order:
if item == 'client':
await app.close()
elif item == 'server':
await server_app.close()
elif item == 'proxy':
await proxy_app.close()
else:
raise ValueError('Invalid close_order')


async def test_proxy_notify(test_server, test_proxy, protocol, loop, from_details, to_details): # noQa: C901
@pytest.mark.parametrize('close_order', itertools.permutations(('client', 'server', 'proxy'))) # noQa C901: too complex
async def test_proxy_notify(test_server, test_proxy, protocol, loop, from_details, to_details, close_order):
callback_complete = loop.create_future()
callback_complete_proxy = loop.create_future()

Expand Down Expand Up @@ -140,6 +150,12 @@ async def proxy_subscribe(self, request, message):
assert received_notify_server.payload == received_notify_proxy.payload
assert received_notify_server.headers == received_notify_proxy.headers

await server_app.close()
await proxy_app.close()
await app.close()
for item in close_order:
if item == 'client':
await app.close()
elif item == 'server':
await server_app.close()
elif item == 'proxy':
await proxy_app.close()
else:
raise ValueError('Invalid close_order')
Loading

0 comments on commit af98795

Please sign in to comment.