diff --git a/aiosip/application.py b/aiosip/application.py index 3a37477..de2a184 100644 --- a/aiosip/application.py +++ b/aiosip/application.py @@ -35,7 +35,6 @@ class Application(MutableMapping): def __init__(self, *, loop=None, - dialog_factory=Dialog, middleware=(), defaults=None, debug=False, @@ -63,7 +62,6 @@ def __init__(self, *, self._tasks = list() self.dialplan = dialplan - self.dialog_factory = dialog_factory self.loop = loop @property @@ -107,14 +105,16 @@ def __init__(self): self.app = app self.dialog = None - def _create_dialog(self): + def _create_dialog(self, dialog_factory=Dialog, **kwargs): if not self.dialog: self.dialog = peer._create_dialog( method=msg.method, from_details=Contact.from_header(msg.headers['To']), to_details=Contact.from_header(msg.headers['From']), call_id=call_id, - inbound=True + inbound=True, + dialog_factory=dialog_factory, + **kwargs ) return self.dialog @@ -133,11 +133,20 @@ async def prepare(self, status_code, *args, **kwargs): async def _dispatch(self, protocol, msg, addr): call_id = msg.headers['Call-ID'] - dialog = self._dialogs.get(frozenset((msg.to_details.details, - msg.from_details.details, - call_id))) + dialog = None - if dialog: + # First incoming request of dialogs do not yet have a tag in to headers + if 'tag' in msg.to_details['params']: + dialog = self._dialogs.get(frozenset((msg.to_details['params']['tag'], + msg.from_details['params']['tag'], + call_id))) + + # First response of dialogs have a tag in the to header but the dialog is not + # yet aware of it. Try to match only with the from header tag + if dialog is None: + dialog = self._dialogs.get(frozenset((None, msg.from_details['params']['tag'], call_id))) + + if dialog is not None: await dialog.receive_message(msg) return @@ -145,6 +154,7 @@ async def _dispatch(self, protocol, msg, addr): # got a response without an associated message (likely a stale # retransmission, drop it) if isinstance(msg, Response) or msg.method == 'ACK': + LOG.debug('Discarding incoming message: %s', msg) return await self._run_dialplan(protocol, msg) diff --git a/aiosip/dialog.py b/aiosip/dialog.py index 84a43dd..ea9c1fa 100644 --- a/aiosip/dialog.py +++ b/aiosip/dialog.py @@ -1,16 +1,15 @@ -import asyncio import enum +import asyncio import logging -from collections import defaultdict from multidict import CIMultiDict +from collections import defaultdict from async_timeout import timeout as Timeout from . import utils -from .message import Request, Response -from .transaction import UnreliableTransaction, ProxyTransaction - from .auth import Auth +from .message import Request, Response +from .transaction import UnreliableTransaction LOG = logging.getLogger(__name__) @@ -58,11 +57,17 @@ def __init__(self, @property def dialog_id(self): - return frozenset((self.original_msg.to_details.details, - self.original_msg.from_details.details, + return frozenset((self.original_msg.to_details['params'].get('tag'), + self.original_msg.from_details['params']['tag'], self.call_id)) def _receive_response(self, msg): + + if 'tag' not in self.to_details['params']: + del self.app._dialogs[self.dialog_id] + self.to_details['params']['tag'] = msg.to_details['params']['tag'] + self.app._dialogs[self.dialog_id] = self + try: transaction = self.transactions[msg.method][msg.cseq] transaction._incoming(msg) @@ -74,7 +79,7 @@ def _receive_response(self, msg): LOG.debug('Response without Request. The Transaction may already be closed. \n%s', msg) def _prepare_request(self, method, contact_details=None, headers=None, payload=None, cseq=None, to_details=None): - self.from_details.add_tag() + if not cseq: self.cseq += 1 @@ -175,17 +180,6 @@ async def start_unreliable_transaction(self, msg, method=None): self.transactions[method or msg.method][msg.cseq] = transaction return await transaction.start() - async def start_proxy_transaction(self, msg, timeout=5): - if msg.cseq not in self.transactions[msg.method]: - transaction = ProxyTransaction(dialog=self, original_msg=msg, loop=self.app.loop, timeout=timeout) - self.transactions[msg.method][msg.cseq] = transaction - async for response in transaction.start(): - yield response - else: - LOG.debug('Message already transmitted: %s %s, %s', msg.cseq, msg.method, msg.headers['Call-ID']) - self.transactions[msg.method][msg.cseq].retransmit() - return - def end_transaction(self, transaction): to_delete = list() for method, values in self.transactions.items(): @@ -211,7 +205,6 @@ async def reply(self, request, status_code, status_message=None, payload=None, h def _prepare_response(self, request, status_code, status_message=None, payload=None, headers=None, contact_details=None): - self.from_details.add_tag() if contact_details: self.contact_details = contact_details @@ -273,9 +266,6 @@ async def receive_message(self, msg): return await self._receive_request(msg) async def _receive_request(self, msg): - if 'tag' in msg.from_details['params']: - self.to_details['params']['tag'] = msg.from_details['params']['tag'] - await self._incoming.put(msg) self._maybe_close(msg) @@ -325,13 +315,25 @@ async def recv(self): class InviteDialog(DialogBase): def __init__(self, *args, **kwargs): - super().__init__(method="INVITE", *args, **kwargs) + + if 'method' not in kwargs: + kwargs['method'] = 'INVITE' + elif kwargs['method'] != 'INVITE': + raise ValueError('method must be INVITE') + + super().__init__(*args, **kwargs) self._queue = asyncio.Queue() self._state = CallState.Calling self._waiter = asyncio.Future() async def receive_message(self, msg): # noqa: C901 + + if 'tag' not in self.to_details['params']: + del self.app._dialogs[self.dialog_id] + self.to_details['params']['tag'] = msg.to_details['params']['tag'] + self.app._dialogs[self.dialog_id] = self + async def set_result(msg): self.ack(msg) if not self._waiter.done(): diff --git a/aiosip/peers.py b/aiosip/peers.py index 50408da..a650bc8 100644 --- a/aiosip/peers.py +++ b/aiosip/peers.py @@ -1,12 +1,15 @@ +import uuid import asyncio import logging -import uuid - +import ipaddress import websockets +from multidict import CIMultiDict + from . import utils -from .protocol import UDP, TCP, WS from .contact import Contact +from .protocol import UDP, TCP, WS +from .dialog import Dialog, InviteDialog LOG = logging.getLogger(__name__) @@ -30,7 +33,9 @@ def send_message(self, msg): self._protocol.send_message(msg, addr=self.peer_addr) def _create_dialog(self, method, from_details, to_details, contact_details=None, password=None, call_id=None, - headers=None, payload=None, cseq=0, inbound=False): + headers=None, payload=None, cseq=0, inbound=False, dialog_factory=Dialog, **kwargs): + + from_details.add_tag() if not call_id: call_id = str(uuid.uuid4()) @@ -55,7 +60,7 @@ def _create_dialog(self, method, from_details, to_details, contact_details=None, } ) - dialog = self._app.dialog_factory( + dialog = dialog_factory( method=method, app=self._app, from_details=from_details, @@ -68,13 +73,16 @@ def _create_dialog(self, method, from_details, to_details, contact_details=None, payload=payload, cseq=cseq, inbound=inbound, + **kwargs ) + LOG.debug('Creating: %s', dialog) self._app._dialogs[dialog.dialog_id] = dialog return dialog async def request(self, method, from_details, to_details, contact_details=None, password=None, call_id=None, - headers=None, cseq=0, payload=None): + headers=None, cseq=0, payload=None, dialog_factory=Dialog, **kwargs): + dialog = self._create_dialog(method=method, from_details=from_details, to_details=to_details, @@ -83,7 +91,9 @@ async def request(self, method, from_details, to_details, contact_details=None, payload=payload, password=password, call_id=call_id, - cseq=cseq) + cseq=cseq, + dialog_factory=dialog_factory, + **kwargs) try: response = await dialog.start() dialog.status_code = response.status_code @@ -93,126 +103,30 @@ async def request(self, method, from_details, to_details, contact_details=None, dialog.cancel() raise - async def subscribe(self, from_details, to_details, contact_details=None, password=None, call_id=None, headers=None, - cseq=0, expires=3600): - dialog = self._create_dialog(method="SUBSCRIBE", - from_details=from_details, - to_details=to_details, - contact_details=contact_details, - password=password, - call_id=call_id, - cseq=cseq, - headers=headers) - try: - response = await dialog.start(expires=expires) - dialog.status_code = response.status_code - dialog.status_message = response.status_message - return dialog - except asyncio.CancelledError: - dialog.cancel() - raise + async def subscribe(self, expires=3600, **kwargs): - async def register(self, from_details, to_details, contact_details=None, password=None, call_id=None, headers=None, - cseq=0, expires=3600): - dialog = self._create_dialog(method="REGISTER", - from_details=from_details, - to_details=to_details, - contact_details=contact_details, - password=password, - call_id=call_id, - cseq=cseq) - try: - response = await dialog.start(expires=expires) - dialog.status_code = response.status_code - dialog.status_message = response.status_message - return dialog - except asyncio.CancelledError: - dialog.cancel() - raise + if expires: + headers = kwargs.get('headers', CIMultiDict()) + headers['Expires'] = expires + kwargs['headers'] = headers - async def invite(self, from_details, to_details, contact_details=None, password=None, call_id=None, headers=None, - cseq=0, payload=None): + return await self.request('SUBSCRIBE', **kwargs) - if not call_id: - call_id = str(uuid.uuid4()) + async def register(self, expires=3600, **kwargs): - if not contact_details: - host, port = self.local_addr + if expires: + headers = kwargs.get('headers', CIMultiDict()) + headers['Expires'] = expires + kwargs['headers'] = headers - # No way to get the public local addr in UDP. Allow an override or select the From host - # Maybe with https://bugs.python.org/issue31203 - if self._app.defaults['override_contact_host']: - host = self._app.defaults['override_contact_host'] - elif host == '0.0.0.0' or host.startswith('127.'): - host = from_details['uri']['host'] + return await self.request('REGISTER', **kwargs) - contact_details = Contact( - { - 'uri': 'sip:{username}@{host_and_port};transport={protocol}'.format( - username=from_details['uri']['user'], - host_and_port=utils.format_host_and_port(host, port), - protocol=type(self._protocol).__name__.lower() - ) - } - ) + async def invite(self, dialog_factory=InviteDialog, **kwargs): - from .dialog import InviteDialog - dialog = InviteDialog( - app=self._app, - from_details=from_details, - to_details=to_details, - call_id=call_id, - peer=self, - contact_details=contact_details, - headers=headers, - payload=payload, - password=None, - cseq=cseq - ) - self._app._dialogs[dialog.dialog_id] = dialog + dialog = self._create_dialog(dialog_factory=dialog_factory, method='INVITE', **kwargs) await dialog.start() return dialog - async def proxy_request(self, dialog, msg, timeout=5): - if msg.method == 'ACK': - self.send_message(msg) - return - - proxy_dialog = self._app._dialogs.get(dialog.call_id) - if not proxy_dialog: - proxy_dialog = self._create_dialog( - method=msg.method, - from_details=dialog.from_details, - to_details=dialog.to_details, - call_id=dialog.call_id - ) - elif msg.cseq in proxy_dialog.transactions[msg.method]: - proxy_dialog.transactions[msg.method][msg.cseq].retransmit() - return - - if isinstance(msg.headers['Via'], str): - msg.headers['Via'] = [msg.headers['Via']] - - host, port = self.local_addr - if self._app.defaults['override_contact_host']: - host = self._app.defaults['override_contact_host'] - - msg.headers['Via'].insert(0, 'SIP/2.0/%(protocol)s {host}:{port};branch={branch}'.format( - host=host, - port=port, - branch=utils.gen_branch(10) - ) - ) - - async for response in proxy_dialog.start_proxy_transaction(msg, timeout=timeout): - yield response - - proxy_dialog._maybe_close(msg) - - def proxy_response(self, msg): - msg.headers['Via'].pop(0) - return self.send_message(msg) - @property def protocol(self): return type(self._protocol) @@ -235,6 +149,9 @@ def _disconnected(self): self._protocol = None self._disconnected_future.set_result(None) + def generate_via_headers(self, branch=utils.gen_branch()): + return f'SIP/2.0/{self._protocol.via} {self.local_addr[0]}:{self.local_addr[1]};branch={branch}' + @property def local_addr(self): if self._protocol: @@ -260,6 +177,12 @@ async def create_server(self, local_addr, sock, **kwargs): return await self._create_server(local_addr, sock, **kwargs) async def create_peer(self, peer_addr, local_addr=None, **kwargs): + try: + peer_addr = ipaddress.ip_address(peer_addr[0]).exploded, peer_addr[1] + except ValueError: + dns = await self._app.dns.query(peer_addr[0], 'A') + peer_addr = dns[0].host, peer_addr[1] + try: if not local_addr: peer = [peer for key, peer in self._peers.items() if key[0] == peer_addr][0] diff --git a/aiosip/protocol.py b/aiosip/protocol.py index 5e53edc..b26e44a 100644 --- a/aiosip/protocol.py +++ b/aiosip/protocol.py @@ -10,15 +10,16 @@ class UDP(asyncio.DatagramProtocol): def __init__(self, app, loop): self.app = app + self.via = 'UDP' self.loop = loop self.transport = None self.ready = asyncio.Future() def send_message(self, msg, addr): if isinstance(msg.headers['Via'], str): - msg.headers['Via'] %= {'protocol': 'UDP'} + msg.headers['Via'] %= {'protocol': self.via} else: - msg.headers['Via'][0] %= {'protocol': 'UDP'} + msg.headers['Via'][0] %= {'protocol': self.via} LOG.log(5, 'Sending to: "%s" via UDP: "%s"', addr, msg) self.transport.sendto(msg.encode(), addr) @@ -44,6 +45,7 @@ def datagram_received(self, data, addr): class TCP(asyncio.Protocol): def __init__(self, app, loop): self.app = app + self.via = 'TCP' self.loop = loop self.transport = None self.ready = asyncio.Future() @@ -51,9 +53,9 @@ def __init__(self, app, loop): def send_message(self, msg, addr=None): if isinstance(msg.headers['Via'], str): - msg.headers['Via'] %= {'protocol': 'TCP'} + msg.headers['Via'] %= {'protocol': self.via} else: - msg.headers['Via'][0] %= {'protocol': 'TCP'} + msg.headers['Via'][0] %= {'protocol': self.via} LOG.log(5, 'Sent via TCP: "%s"', msg) self.transport.write(msg.encode()) @@ -90,9 +92,9 @@ def __init__(self, app, loop, local_addr, peer_addr, websocket): self.local_addr = local_addr self.peer_addr = peer_addr if isinstance(peer_addr, str) and peer_addr.startswith('wss:'): - self.protocol = 'WSS' + self.via = 'WSS' else: - self.protocol = 'WS' + self.via = 'WS' self.transport = self self.websocket = websocket self.websocket_pump = asyncio.ensure_future(self.run()) @@ -108,11 +110,11 @@ def get_extra_info(self, key): def send_message(self, msg, addr): if isinstance(msg.headers['Via'], str): - msg.headers['Via'] %= {'protocol': self.protocol} + msg.headers['Via'] %= {'protocol': self.via} else: - msg.headers['Via'][0] %= {'protocol': self.protocol} + msg.headers['Via'][0] %= {'protocol': self.via} - LOG.log(5, 'Sending via %s: "%s"', self.protocol, msg) + LOG.log(5, 'Sending via %s: "%s"', self.via, msg) asyncio.ensure_future(self.websocket.send(msg.encode().decode('utf8'))) async def run(self): @@ -126,7 +128,7 @@ async def run(self): headers, data = data.split(b'\r\n\r\n', 1) msg = message.Message.from_raw_headers(headers) msg._raw_payload = data - LOG.log(5, 'Received via %s: "%s"', self.protocol, msg) + LOG.log(5, 'Received via %s: "%s"', self.via, msg) asyncio.ensure_future(self.app._dispatch(self, msg, self.peer_addr)) await self.websocket.close() diff --git a/aiosip/transaction.py b/aiosip/transaction.py index 4f2b0ab..d0d6c47 100644 --- a/aiosip/transaction.py +++ b/aiosip/transaction.py @@ -163,47 +163,3 @@ def close(self): if self._running and not self._future.done(): self.dialog.cancel(cseq=self.original_msg.cseq) super().close() - - -class ProxyTransaction(BaseTransaction): - def __init__(self, timeout=5, *args, **kwargs): - super().__init__(*args, **kwargs) - self._incomings = asyncio.Queue() - self._closing = None - self._timeout = timeout - - async def start(self): - self.dialog.peer.send_message(self.original_msg) - while self._running: - response = await self._incomings.get() - if isinstance(response, BaseException): - self.dialog.end_transaction(self) - raise response - elif response is None: - self.dialog.end_transaction(self) - return - elif 100 <= response.status_code < 200: - yield response - else: - if self._closing: - self._closing.cancel() - yield response - self._closing = self.dialog.app.loop.call_later(self._timeout, self.dialog.end_transaction, self) - self._running = False - - def _incoming(self, msg): - self._result(msg) - - def _error(self, error): - self._incomings.put_nowait(error) - - def _result(self, msg): - self._incomings.put_nowait(msg) - - def retransmit(self): - self.dialog.peer.send_message(self.original_msg) - - def close(self): - if self._running: - super().close() - self._incomings.put_nowait(None) diff --git a/tests/test_sip_proxy.py b/tests/test_sip_proxy.py index 9dae680..74c2ba1 100644 --- a/tests/test_sip_proxy.py +++ b/tests/test_sip_proxy.py @@ -1,161 +1,145 @@ -import aiosip -import pytest -import asyncio -import itertools - - -@pytest.mark.parametrize('close_order', itertools.permutations(('client', 'server', 'proxy'))) # noQa C901: too complex -async def test_proxy_subscribe(test_server, test_proxy, protocol, loop, from_details, to_details, close_order): - callback_complete = loop.create_future() - callback_complete_proxy = loop.create_future() - - class ServerDialplan(aiosip.BaseDialplan): - - async def resolve(self, *args, **kwargs): - await super().resolve(*args, **kwargs) - - return self.subscribe - - async def subscribe(self, request, message): - await request.prepare(status_code=200) - callback_complete.set_result(message) - - class ProxyDialplan(aiosip.BaseDialplan): - async def resolve(self, *args, **kwargs): - await super().resolve(*args, **kwargs) - - return self.proxy_subscribe - - async def proxy_subscribe(self, request, message): - dialog = request._create_dialog() - peer = await aiosip.utils.get_proxy_peer(dialog, message) - - async for proxy_response in peer.proxy_request(dialog, message, 0.1): - if proxy_response: - dialog.peer.proxy_response(proxy_response) - - callback_complete_proxy.set_result(message) - - app = aiosip.Application(loop=loop, debug=True) - - server_app = aiosip.Application(loop=loop, debug=True, dialplan=ServerDialplan()) - await test_server(server_app) - - proxy_app = aiosip.Application(loop=loop, dialplan=ProxyDialplan()) - proxy = await test_proxy(proxy_app) - - peer = await app.connect( - protocol=protocol, - remote_addr=(proxy.sip_config['server_host'], proxy.sip_config['server_port']) - ) - - await peer.subscribe( - expires=1800, - from_details=aiosip.Contact.from_header(from_details), - to_details=aiosip.Contact.from_header(to_details), - ) - - received_request_server = await asyncio.wait_for(callback_complete, timeout=2) - received_request_proxy = await asyncio.wait_for(callback_complete_proxy, timeout=2) - - assert received_request_server.method == 'SUBSCRIBE' - assert received_request_server.payload == received_request_proxy.payload - assert received_request_server.headers == received_request_proxy.headers - - for item in close_order: - if item == 'client': - await app.close() - elif item == 'server': - await server_app.close() - elif item == 'proxy': - await proxy_app.close() - else: - raise ValueError('Invalid close_order') - - -@pytest.mark.parametrize('close_order', itertools.permutations(('client', 'server', 'proxy'))) # noQa C901: too complex -async def test_proxy_notify(test_server, test_proxy, protocol, loop, from_details, to_details, close_order): - callback_complete = loop.create_future() - callback_complete_proxy = loop.create_future() - - class ServerDialpan(aiosip.BaseDialplan): - - async def resolve(self, *args, **kwargs): - await super().resolve(*args, **kwargs) - - return self.subscribe - - async def subscribe(self, request, message): - dialog = await request.prepare(status_code=200) - await asyncio.sleep(0.2) - await dialog.notify(payload='1') - - class ProxyDialplan(aiosip.BaseDialplan): - async def resolve(self, *args, **kwargs): - await super().resolve(*args, **kwargs) - - return self.proxy_subscribe - - async def proxy_subscribe(self, request, message): - dialog = request._create_dialog() - peer = await aiosip.utils.get_proxy_peer(dialog, message) - - async for proxy_response in peer.proxy_request(dialog, message, 0.1): - if proxy_response: - dialog.peer.proxy_response(proxy_response) - - # TODO: refactor - subscription = request.app._dialogs[frozenset(( - message.to_details.details, - message.from_details.details, - message.headers['Call-ID'] - ))] - - async for msg in subscription: - async for proxy_response in dialog.peer.proxy_request(subscription, msg): - if proxy_response: - peer.proxy_response(proxy_response) - callback_complete_proxy.set_result(msg) - - app = aiosip.Application(loop=loop, debug=True) - - server_app = aiosip.Application(loop=loop, debug=True, dialplan=ServerDialpan()) - await test_server(server_app) - - proxy_app = aiosip.Application(loop=loop, debug=True, dialplan=ProxyDialplan()) - proxy = await test_proxy(proxy_app) - - peer = await app.connect( - protocol=protocol, - remote_addr=(proxy.sip_config['server_host'], proxy.sip_config['server_port']) - ) - - subscription = await peer.subscribe( - expires=1800, - from_details=aiosip.Contact.from_header(from_details), - to_details=aiosip.Contact.from_header(to_details) - ) - - async for msg in subscription: - await subscription.reply(msg, 200) - callback_complete.set_result(msg) - break # We only expect a single message - - received_notify_server = await asyncio.wait_for(callback_complete, timeout=2) - received_notify_proxy = await asyncio.wait_for(callback_complete_proxy, timeout=2) - - assert received_notify_server.method == 'NOTIFY' - assert received_notify_server.payload == '1' - - assert received_notify_server.payload == received_notify_proxy.payload - assert received_notify_server.headers == received_notify_proxy.headers - - for item in close_order: - if item == 'client': - await app.close() - elif item == 'server': - await server_app.close() - elif item == 'proxy': - await proxy_app.close() - else: - raise ValueError('Invalid close_order') +# import aiosip +# import pytest +# import asyncio +# import itertools +# +# +# @pytest.mark.parametrize('close_order', itertools.permutations(('client', 'server', 'proxy'))) # noQa C901: too complex +# async def test_proxy_subscribe(test_server, test_proxy, protocol, loop, from_details, to_details, close_order): +# callback_complete = loop.create_future() +# callback_complete_proxy = loop.create_future() +# +# class ServerDialplan(aiosip.BaseDialplan): +# +# async def resolve(self, *args, **kwargs): +# await super().resolve(*args, **kwargs) +# return self.subscribe +# +# async def subscribe(self, request, message): +# await request.prepare(status_code=200) +# callback_complete.set_result(message) +# +# class ProxyDialplan(aiosip.BaseDialplan): +# async def resolve(self, *args, **kwargs): +# await super().resolve(*args, **kwargs) +# +# return self.proxy_subscribe +# +# async def proxy_subscribe(self, request, message): +# dialog = await request.proxy(message) +# callback_complete_proxy.set_result(message) +# async for message in dialog: +# dialog.proxy(message) +# +# app = aiosip.Application(loop=loop, debug=True) +# +# server_app = aiosip.Application(loop=loop, debug=True, dialplan=ServerDialplan()) +# await test_server(server_app) +# +# proxy_app = aiosip.Application(loop=loop, dialplan=ProxyDialplan()) +# proxy = await test_proxy(proxy_app) +# +# peer = await app.connect( +# protocol=protocol, +# remote_addr=(proxy.sip_config['server_host'], proxy.sip_config['server_port']) +# ) +# +# await peer.subscribe( +# expires=1800, +# from_details=aiosip.Contact.from_header(from_details), +# to_details=aiosip.Contact.from_header(to_details), +# ) +# +# received_request_server = await asyncio.wait_for(callback_complete, timeout=2) +# received_request_proxy = await asyncio.wait_for(callback_complete_proxy, timeout=2) +# +# assert received_request_server.method == 'SUBSCRIBE' +# assert received_request_server.payload == received_request_proxy.payload +# assert received_request_server.headers == received_request_proxy.headers +# +# for item in close_order: +# if item == 'client': +# await app.close() +# elif item == 'server': +# await server_app.close() +# elif item == 'proxy': +# await proxy_app.close() +# else: +# raise ValueError('Invalid close_order') +# +# +# @pytest.mark.parametrize('close_order', itertools.permutations(('client', 'server', 'proxy'))) # noQa C901: too complex +# async def test_proxy_notify(test_server, test_proxy, protocol, loop, from_details, to_details, close_order): +# +# callback_complete = loop.create_future() +# callback_complete_proxy = loop.create_future() +# +# class ServerDialpan(aiosip.BaseDialplan): +# +# async def resolve(self, *args, **kwargs): +# await super().resolve(*args, **kwargs) +# +# return self.subscribe +# +# async def subscribe(self, request, message): +# dialog = await request.prepare(status_code=200) +# await asyncio.sleep(0.2) +# await dialog.notify(payload='1') +# +# class ProxyDialplan(aiosip.BaseDialplan): +# async def resolve(self, *args, **kwargs): +# await super().resolve(*args, **kwargs) +# +# return self.proxy_subscribe +# +# async def proxy_subscribe(self, request, message): +# dialog = await request.proxy(message) +# +# async for message in dialog: +# dialog.proxy(message) +# +# if message.method == 'NOTIFY': +# callback_complete_proxy.set_result(message) +# +# app = aiosip.Application(loop=loop, debug=True) +# +# server_app = aiosip.Application(loop=loop, debug=True, dialplan=ServerDialpan()) +# await test_server(server_app) +# +# proxy_app = aiosip.Application(loop=loop, debug=True, dialplan=ProxyDialplan()) +# proxy = await test_proxy(proxy_app) +# +# peer = await app.connect( +# protocol=protocol, +# remote_addr=(proxy.sip_config['server_host'], proxy.sip_config['server_port']) +# ) +# +# subscription = await peer.subscribe( +# expires=1800, +# from_details=aiosip.Contact.from_header(from_details), +# to_details=aiosip.Contact.from_header(to_details) +# ) +# +# async for msg in subscription: +# await subscription.reply(msg, 200) +# callback_complete.set_result(msg) +# break # We only expect a single message +# +# received_notify_server = await asyncio.wait_for(callback_complete, timeout=2) +# received_notify_proxy = await asyncio.wait_for(callback_complete_proxy, timeout=2) +# +# assert received_notify_server.method == 'NOTIFY' +# assert received_notify_server.payload == '1' +# +# assert received_notify_server.payload == received_notify_proxy.payload +# assert received_notify_server.headers == received_notify_proxy.headers +# +# for item in close_order: +# if item == 'client': +# await app.close() +# elif item == 'server': +# await server_app.close() +# elif item == 'proxy': +# await proxy_app.close() +# else: +# raise ValueError('Invalid close_order')