Skip to content

Commit

Permalink
Update proxy for dialog tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
ovv committed Apr 25, 2018
1 parent b417a24 commit b51ad8e
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 162 deletions.
19 changes: 16 additions & 3 deletions aiosip/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from collections import MutableMapping

from . import __version__
from .dialog import Dialog
from .dialog import Dialog, ProxyDialog
from .dialplan import BaseDialplan
from .protocol import UDP, TCP, WS
from .peers import UDPConnector, TCPConnector, WSConnector
Expand Down Expand Up @@ -105,15 +105,16 @@ def __init__(self):
self.app = app
self.dialog = None

def _create_dialog(self, dialog_factory=Dialog):
def _create_dialog(self, dialog_factory=Dialog, **kwargs):
if not self.dialog:
self.dialog = peer._create_dialog(
method=msg.method,
from_details=Contact.from_header(msg.headers['To']),
to_details=Contact.from_header(msg.headers['From']),
call_id=call_id,
inbound=True,
dialog_factory=dialog_factory
dialog_factory=dialog_factory,
**kwargs
)
return self.dialog

Expand All @@ -127,6 +128,17 @@ async def prepare(self, status_code, *args, **kwargs):

return dialog

async def proxy(self, message, proxy_peer=None, dialog_factory=ProxyDialog):
if not proxy_peer:
proxy_peer = await self.app.connect(
remote_addr=(message.to_details.host, message.to_details.port),
protocol=peer.protocol
)

dialog = self._create_dialog(dialog_factory=dialog_factory, proxy_peer=proxy_peer)
dialog.proxy(message)
return dialog

request = Request()
await route(request, msg)

Expand All @@ -153,6 +165,7 @@ async def _dispatch(self, protocol, msg, addr):
# got a response without an associated message (likely a stale
# retransmission, drop it)
if isinstance(msg, Response) or msg.method == 'ACK':
LOG.debug('Discarding incoming message: %s', msg)
return

await self._run_dialplan(protocol, msg)
Expand Down
49 changes: 47 additions & 2 deletions aiosip/dialog.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import asyncio
import enum
import asyncio
import logging

from collections import defaultdict
from multidict import CIMultiDict
from collections import defaultdict

from . import utils
from .message import Request, Response
Expand Down Expand Up @@ -442,3 +442,48 @@ async def close(self):

def _close(self):
pass


class ProxyDialog(DialogBase):
def __init__(self, *args, proxy_peer, **kwargs):
super().__init__(*args, **kwargs)
self.proxy_peer = proxy_peer
self._incoming = asyncio.Queue()

@property
def dialog_id(self):
return frozenset((self.to_details['params']['tag'],
self.from_details['params'].get('tag'),
self.call_id))

async def receive_message(self, msg):
if 'tag' not in self.from_details['params'] and 'tag' in msg.to_details['params']:
del self.app._dialogs[self.dialog_id]
self.from_details['params']['tag'] = msg.to_details['params']['tag']
self.app._dialogs[self.dialog_id] = self

await self._incoming.put(msg)

async def recv(self):
return await self._incoming.get()

def proxy(self, message):
# TODO: should be cleaner
if not isinstance(message.headers['Via'], list):
message.headers['Via'] = [message.headers['Via'], ]

if f'{self.peer.peer_addr[0]}:{self.peer.peer_addr[1]}' in message.headers['Via'][0]:
message.headers['Via'].insert(0, self.proxy_peer.generate_via_headers())
self.proxy_peer.send_message(message)
elif f'{self.peer.local_addr[0]}:{self.peer.local_addr[1]}' in message.headers['Via'][0]:
message.headers['Via'].pop(0)
self.proxy_peer.send_message(message)
elif f'{self.proxy_peer.peer_addr[0]}:{self.proxy_peer.peer_addr[1]}' in message.headers['Via'][0]:
message.headers['Via'].insert(0, self.peer.generate_via_headers())
self.peer.send_message(message)
elif f'{self.proxy_peer.local_addr[0]}:{self.proxy_peer.local_addr[1]}' in message.headers['Via'][0]:
message.headers['Via'].pop(0)
self.peer.send_message(message)
else:
message.headers['Via'].insert(0, self.proxy_peer.generate_via_headers())
self.proxy_peer.send_message(message)
16 changes: 14 additions & 2 deletions aiosip/peers.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import uuid
import asyncio
import logging
import ipaddress
import websockets

from multidict import CIMultiDict

from . import utils
from .contact import Contact
from .protocol import UDP, TCP, WS
from .dialog import Dialog, InviteDialog
from .dialog import Dialog, InviteDialog, ProxyDialog

LOG = logging.getLogger(__name__)

Expand All @@ -34,7 +35,8 @@ def send_message(self, msg):
def _create_dialog(self, method, from_details, to_details, contact_details=None, password=None, call_id=None,
headers=None, payload=None, cseq=0, inbound=False, dialog_factory=Dialog, **kwargs):

from_details.add_tag()
if not issubclass(dialog_factory, ProxyDialog):
from_details.add_tag()

if not call_id:
call_id = str(uuid.uuid4())
Expand Down Expand Up @@ -74,6 +76,7 @@ def _create_dialog(self, method, from_details, to_details, contact_details=None,
inbound=inbound,
**kwargs
)

LOG.debug('Creating: %s', dialog)
self._app._dialogs[dialog.dialog_id] = dialog
return dialog
Expand Down Expand Up @@ -186,6 +189,9 @@ def _disconnected(self):
self._protocol = None
self._disconnected_future.set_result(None)

def generate_via_headers(self, branch=utils.gen_branch()):
return f'SIP/2.0/{self._protocol.via} {self.local_addr[0]}:{self.local_addr[1]};branch={branch}'

@property
def local_addr(self):
if self._protocol:
Expand All @@ -211,6 +217,12 @@ async def create_server(self, local_addr, sock):
return await self._create_server(local_addr, sock)

async def create_peer(self, peer_addr, local_addr=None):
try:
peer_addr = ipaddress.ip_address(peer_addr[0]).exploded, peer_addr[1]
except ValueError:
dns = await self._app.dns.query(peer_addr[0], 'A')
peer_addr = dns[0].host, peer_addr[1]

try:
if not local_addr:
peer = [peer for key, peer in self._peers.items() if key[0] == peer_addr][0]
Expand Down
22 changes: 12 additions & 10 deletions aiosip/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
class UDP(asyncio.DatagramProtocol):
def __init__(self, app, loop):
self.app = app
self.via = 'UDP'
self.loop = loop
self.transport = None
self.ready = asyncio.Future()

def send_message(self, msg, addr):
if isinstance(msg.headers['Via'], str):
msg.headers['Via'] %= {'protocol': 'UDP'}
msg.headers['Via'] %= {'protocol': self.via}
else:
msg.headers['Via'][0] %= {'protocol': 'UDP'}
msg.headers['Via'][0] %= {'protocol': self.via}

LOG.log(5, 'Sending to: "%s" via UDP: "%s"', addr, msg)
self.transport.sendto(msg.encode(), addr)
Expand All @@ -44,16 +45,17 @@ def datagram_received(self, data, addr):
class TCP(asyncio.Protocol):
def __init__(self, app, loop):
self.app = app
self.via = 'TCP'
self.loop = loop
self.transport = None
self.ready = asyncio.Future()
self._data = b''

def send_message(self, msg, addr=None):
if isinstance(msg.headers['Via'], str):
msg.headers['Via'] %= {'protocol': 'TCP'}
msg.headers['Via'] %= {'protocol': self.via}
else:
msg.headers['Via'][0] %= {'protocol': 'TCP'}
msg.headers['Via'][0] %= {'protocol': self.via}

LOG.log(5, 'Sent via TCP: "%s"', msg)
self.transport.write(msg.encode())
Expand Down Expand Up @@ -90,9 +92,9 @@ def __init__(self, app, loop, local_addr, peer_addr, websocket):
self.local_addr = local_addr
self.peer_addr = peer_addr
if isinstance(peer_addr, str) and peer_addr.startswith('wss:'):
self.protocol = 'WSS'
self.via = 'WSS'
else:
self.protocol = 'WS'
self.via = 'WS'
self.transport = self
self.websocket = websocket
self.websocket_pump = asyncio.ensure_future(self.run())
Expand All @@ -108,11 +110,11 @@ def get_extra_info(self, key):

def send_message(self, msg, addr):
if isinstance(msg.headers['Via'], str):
msg.headers['Via'] %= {'protocol': self.protocol}
msg.headers['Via'] %= {'protocol': self.via}
else:
msg.headers['Via'][0] %= {'protocol': self.protocol}
msg.headers['Via'][0] %= {'protocol': self.via}

LOG.log(5, 'Sending via %s: "%s"', self.protocol, msg)
LOG.log(5, 'Sending via %s: "%s"', self.via, msg)
asyncio.ensure_future(self.websocket.send(msg.encode().decode('utf8')))

async def run(self):
Expand All @@ -126,7 +128,7 @@ async def run(self):
headers, data = data.split(b'\r\n\r\n', 1)
msg = message.Message.from_raw_headers(headers)
msg._raw_payload = data
LOG.log(5, 'Received via %s: "%s"', self.protocol, msg)
LOG.log(5, 'Received via %s: "%s"', self.via, msg)
asyncio.ensure_future(self.app._dispatch(self, msg, self.peer_addr))

await self.websocket.close()
Expand Down
Loading

0 comments on commit b51ad8e

Please sign in to comment.