Skip to content

Commit

Permalink
Application cleanup
Browse files Browse the repository at this point in the history
Imrpove application cleanup. This might need another pass
after #112 is merged

Issue #91 #92
  • Loading branch information
ovv committed Apr 26, 2018
1 parent 850b6bf commit ddda818
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 59 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ multidict = ">=2.0"
pyquery = "*"
aiodns = "*"
websockets = "*"
async-timeout = "*"

[dev-packages]
twine = "*"
Expand Down
22 changes: 15 additions & 7 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions aiosip/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,12 @@ async def reply(*args, **kwargs):
method=msg.method,
from_details=Contact.from_header(msg.headers['To']),
to_details=Contact.from_header(msg.headers['From']),
call_id=call_id
call_id=call_id,
inbound=True
)

await dialog.reply(*args, **kwargs)
await dialog.close(fast=True)
await dialog.close()

try:
route = await self.dialplan.resolve(
Expand Down Expand Up @@ -226,7 +227,12 @@ def finish(self):
def register_on_finish(self, func, *args, **kwargs):
self._finish_callbacks.insert(0, (func, args, kwargs))

async def close(self):
async def close(self, timeout=5):
for dialog in set(self._dialogs.values()):
try:
await dialog.close(timeout=timeout)
except asyncio.TimeoutError:
pass
for connector in self._connectors.values():
await connector.close()
for task in self._tasks:
Expand Down
35 changes: 26 additions & 9 deletions aiosip/dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from collections import defaultdict
from multidict import CIMultiDict
from async_timeout import timeout as Timeout

from . import utils
from .message import Request, Response
Expand Down Expand Up @@ -151,10 +152,19 @@ def _maybe_close(self, msg):

def _close(self):
LOG.debug('Closing: %s', self)
if self._closing:
self._closing.cancel()

for transactions in self.transactions.values():
for transaction in transactions.values():
transaction.close()

# Should not be necessary once dialog are correctly tracked
try:
del self.app._dialogs[self.dialog_id]
except KeyError as e:
pass

def _connection_lost(self):
for transactions in self.transactions.values():
for transaction in transactions.values():
Expand Down Expand Up @@ -187,10 +197,11 @@ def end_transaction(self, transaction):
for item in to_delete:
del self.transactions[item[0]][item[1]]

async def request(self, method, contact_details=None, headers=None, payload=None):
async def request(self, method, contact_details=None, headers=None, payload=None, timeout=None):
msg = self._prepare_request(method, contact_details, headers, payload)
if msg.method != 'ACK':
return await self.start_unreliable_transaction(msg)
async with Timeout(timeout):
return await self.start_unreliable_transaction(msg)
else:
self.peer.send_message(msg)

Expand Down Expand Up @@ -274,15 +285,19 @@ async def refresh(self, headers=None, expires=1800, *args, **kwargs):
headers['Expires'] = int(expires)
return await self.request(self.original_msg.method, headers=headers, *args, **kwargs)

async def close(self, fast=False, headers=None, *args, **kwargs):
async def close(self, headers=None, *args, **kwargs):
if not self._closed:
self._closed = True
result = None
if not self.inbound and self.original_msg.method in ('REGISTER', 'SUBSCRIBE'):
headers = CIMultiDict(headers or {})
if 'Expires' not in headers:
headers['Expires'] = 0
result = await self.request(self.original_msg.method, headers=headers, *args, **kwargs)
try:
result = await self.request(self.original_msg.method, headers=headers, *args, **kwargs)
finally:
self._close()

self._close()
return result

Expand Down Expand Up @@ -409,7 +424,7 @@ def end_transaction(self, transaction):
for item in to_delete:
del self.transactions[item[0]][item[1]]

async def close(self):
async def close(self, timeout=None):
if not self._closed:
self._closed = True

Expand All @@ -422,9 +437,11 @@ async def close(self):
if msg:
transaction = UnreliableTransaction(self, original_msg=msg, loop=self.app.loop)
self.transactions[msg.method][msg.cseq] = transaction
await transaction.start()

self._close()
try:
async with Timeout(timeout):
await transaction.start()
finally:
self._close()

def _close(self):
pass
self._close()
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
'multidict>=2.0',
'pyquery',
'aiodns',
'websockets'
'websockets',
'async_timeout'
]

test_requirements = [
Expand Down
10 changes: 7 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import asyncio

import aiosip
import pytest

import asyncio
import itertools

pytest_plugins = ['aiosip.pytest_plugin']

Expand Down Expand Up @@ -119,3 +118,8 @@ def to_details(request):
@pytest.fixture
def loop(event_loop):
return event_loop


@pytest.fixture(params=itertools.permutations(('client', 'server')))
def close_order(request):
return request.param
34 changes: 25 additions & 9 deletions tests/test_sip_proxy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import aiosip
import pytest
import asyncio
import itertools


async def test_proxy_subscribe(test_server, test_proxy, protocol, loop, from_details, to_details):
@pytest.mark.parametrize('close_order', itertools.permutations(('client', 'server', 'proxy')))
async def test_proxy_subscribe(test_server, test_proxy, protocol, loop, from_details, to_details, close_order):
callback_complete = loop.create_future()
callback_complete_proxy = loop.create_future()

Expand Down Expand Up @@ -59,12 +62,19 @@ async def proxy_subscribe(self, request, message):
assert received_request_server.payload == received_request_proxy.payload
assert received_request_server.headers == received_request_proxy.headers

await server_app.close()
await proxy_app.close()
await app.close()
for item in close_order:
if item == 'client':
await app.close()
elif item == 'server':
await server_app.close()
elif item == 'proxy':
await proxy_app.close()
else:
raise ValueError('Invalid close_order')


async def test_proxy_notify(test_server, test_proxy, protocol, loop, from_details, to_details): # noQa: C901
@pytest.mark.parametrize('close_order', itertools.permutations(('client', 'server', 'proxy')))
async def test_proxy_notify(test_server, test_proxy, protocol, loop, from_details, to_details, close_order):
callback_complete = loop.create_future()
callback_complete_proxy = loop.create_future()

Expand Down Expand Up @@ -140,6 +150,12 @@ async def proxy_subscribe(self, request, message):
assert received_notify_server.payload == received_notify_proxy.payload
assert received_notify_server.headers == received_notify_proxy.headers

await server_app.close()
await proxy_app.close()
await app.close()
for item in close_order:
if item == 'client':
await app.close()
elif item == 'server':
await server_app.close()
elif item == 'proxy':
await proxy_app.close()
else:
raise ValueError('Invalid close_order')
66 changes: 48 additions & 18 deletions tests/test_sip_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import pytest
import asyncio

from async_timeout import timeout

async def test_notify(test_server, protocol, loop, from_details, to_details):

async def test_notify(test_server, protocol, loop, from_details, to_details, close_order):
notify_list = [0, 1, 2, 3, 4]
subscribe_future = loop.create_future()

Expand All @@ -14,13 +16,19 @@ async def resolve(self, *args, **kwargs):
return self.subscribe

async def subscribe(self, request, msg):
dialog = await request.prepare(status_code=200)
expires = int(msg.headers['Expires'])
dialog = await request.prepare(status_code=200, headers={'Expires': expires})
await asyncio.sleep(0.1)

for i in notify_list:
await dialog.notify(payload=str(i))
subscribe_future.set_result(None)

async for msg in dialog:
if msg.method == 'SUBSCRIBE':
expires = int(msg.headers['Expires'])
await dialog.reply(msg, status_code=200, headers={'Expires': expires})

app = aiosip.Application(loop=loop)
server_app = aiosip.Application(loop=loop, dialplan=Dialplan())
server = await test_server(server_app)
Expand All @@ -43,11 +51,15 @@ async def subscribe(self, request, msg):

await subscribe_future

await server_app.close()
await app.close()
if close_order[0] == 'client':
await app.close()
await server_app.close()
else:
await server_app.close()
await app.close()


async def test_authentication(test_server, protocol, loop, from_details, to_details):
async def test_authentication(test_server, protocol, loop, from_details, to_details, close_order):
password = 'abcdefg'
received_messages = list()

Expand All @@ -66,8 +78,10 @@ async def subscribe(self, request, message):

async for message in dialog:
received_messages.append(message)
assert dialog.validate_auth(message, password)
await dialog.reply(message, 200)
if dialog.validate_auth(message, password):
await dialog.reply(message, 200)
else:
await dialog.unauthorized(message)

app = aiosip.Application(loop=loop)
server_app = aiosip.Application(loop=loop, dialplan=Dialplan())
Expand All @@ -88,11 +102,15 @@ async def subscribe(self, request, message):
assert len(received_messages) == 2
assert 'Authorization' in received_messages[1].headers

await server_app.close()
await app.close()
if close_order[0] == 'client':
await app.close()
await server_app.close()
else:
await server_app.close()
await app.close()


async def test_authentication_rejection(test_server, protocol, loop, from_details, to_details):
async def test_authentication_rejection(test_server, protocol, loop, from_details, to_details, close_order):
received_messages = list()

class Dialplan(aiosip.BaseDialplan):
Expand Down Expand Up @@ -135,11 +153,15 @@ async def subscribe(self, request, message):
assert len(received_messages) == 2
assert result.status_code == 401

await server_app.close()
await app.close()
if close_order[0] == 'client':
await app.close()
await server_app.close()
else:
await server_app.close()
await app.close()


async def test_invite(test_server, protocol, loop, from_details, to_details):
async def test_invite(test_server, protocol, loop, from_details, to_details, close_order):
call_established = loop.create_future()
call_disconnected = loop.create_future()

Expand Down Expand Up @@ -189,11 +211,15 @@ async def invite(self, request, message):

assert responses == [100, 180, 200]

await app.close()
await server_app.close()
if close_order[0] == 'client':
await app.close()
await server_app.close()
else:
await server_app.close()
await app.close()


async def test_cancel(test_server, protocol, loop, from_details, to_details):
async def test_cancel(test_server, protocol, loop, from_details, to_details, close_order):
cancel_future = loop.create_future()

class Dialplan(aiosip.BaseDialplan):
Expand Down Expand Up @@ -232,5 +258,9 @@ async def cancel(self, request, message):
result = await cancel_future
assert result.method == 'CANCEL'

await app.close()
await server_app.close()
if close_order[0] == 'client':
await app.close()
await server_app.close()
else:
await server_app.close()
await app.close()
Loading

0 comments on commit ddda818

Please sign in to comment.