Skip to content

Commit

Permalink
Merge pull request #94 from sangoma/redo-dialog-tracking
Browse files Browse the repository at this point in the history
Redo dialog tracking
  • Loading branch information
vodik authored Apr 18, 2018
2 parents 4770c33 + d75e997 commit 70f91af
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 95 deletions.
34 changes: 27 additions & 7 deletions aiosip/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from .dialplan import Dialplan
from .protocol import UDP, TCP, WS
from .peers import UDPConnector, TCPConnector, WSConnector
from .message import Response
from .contact import Contact
from .via import Via


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -53,6 +55,7 @@ def __init__(self, *,
self.dns = dns_resolver
self._finish_callbacks = []
self._state = {}
self._dialogs = {}
self._connectors = {UDP: UDPConnector(self, loop=loop),
TCP: TCPConnector(self, loop=loop),
WS: WSConnector(self, loop=loop)}
Expand Down Expand Up @@ -129,19 +132,36 @@ async def prepare(self, status_code, *args, **kwargs):
await route(request, msg)

async def _dispatch(self, protocol, msg, addr):
connector = self._connectors[type(protocol)]
peer = await connector.get_peer(protocol, addr)
key = msg.headers['Call-ID']
call_id = msg.headers['Call-ID']
dialog = self._dialogs.get(frozenset((msg.to_details.details,
msg.from_details.details,
call_id)))

dialog = peer._dialogs.get(key)
if dialog:
await dialog.receive_message(msg)
return

# If we got an ACK, but nowhere to deliver it, drop it
if msg.method == 'ACK':
# If we got an ACK, but nowhere to deliver it, drop it. If we
# got a response without an associated message (likely a stale
# retransmission, drop it)
if isinstance(msg, Response) or msg.method == 'ACK':
return

await self._run_dialplan(protocol, msg)

async def _run_dialplan(self, protocol, msg):
call_id = msg.headers['Call-ID']
via_header = msg.headers['Via']

# TODO: isn't multidict supposed to only return the first header?
if isinstance(via_header, list):
via_header = via_header[0]

connector = self._connectors[type(protocol)]
via = Via.from_header(via_header)
via_addr = via['host'], int(via['port'])
peer = await connector.get_peer(protocol, via_addr)

router = await self.dialplan.resolve(
username=msg.from_details['uri']['user'],
protocol=peer.protocol,
Expand All @@ -154,7 +174,7 @@ async def reply(*args, **kwargs):
method=msg.method,
from_details=Contact.from_header(msg.headers['To']),
to_details=Contact.from_header(msg.headers['From']),
call_id=key
call_id=call_id
)

await dialog.reply(*args, **kwargs)
Expand Down
31 changes: 31 additions & 0 deletions aiosip/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,37 @@ def __str__(self):
r += ';%s' % str(params)
return r

@property
def scheme(self):
return self._contact['uri']['scheme']

@property
def transport(self):
transport = self._contact['params'].get('transport')
if not transport:
return 'tcp' if self.scheme == 'sips' else 'udp'
return transport

@property
def host(self):
return self._contact['uri']['host']

@property
def port(self):
port = self._contact['uri'].get('port')
if not port:
if self.scheme == 'sips':
return 5061
elif self.transport == 'udp':
return 5060
else:
return 5080
return port

@property
def details(self):
return self.scheme, self.transport, self.host, self.port

# MutableMapping API
def __eq__(self, other):
return self is other
Expand Down
13 changes: 6 additions & 7 deletions aiosip/dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,13 @@ def __init__(self,
self._closed = False
self._closing = None

def _receive_response(self, msg):
@property
def dialog_id(self):
return frozenset((self.original_msg.to_details.details,
self.original_msg.from_details.details,
self.call_id))

def _receive_response(self, msg):
try:
transaction = self.transactions[msg.method][msg.cseq]
transaction._incoming(msg)
Expand Down Expand Up @@ -257,8 +262,6 @@ async def receive_message(self, msg):
return await self._receive_request(msg)

async def _receive_request(self, msg):
self.peer._bookkeeping(msg, self.call_id)

if 'tag' in msg.from_details['params']:
self.to_details['params']['tag'] = msg.from_details['params']['tag']

Expand All @@ -280,7 +283,6 @@ async def close(self, fast=False, headers=None, *args, **kwargs):
if 'Expires' not in headers:
headers['Expires'] = 0
result = await self.request(self.original_msg.method, headers=headers, *args, **kwargs)
self.peer._close_dialog(self.call_id)
self._close()
return result

Expand Down Expand Up @@ -368,8 +370,6 @@ async def handle_completed_state(msg):
return await self._receive_request(msg)

async def _receive_request(self, msg):
self.peer._bookkeeping(msg, self.call_id)

if 'tag' in msg.from_details['params']:
self.to_details['params']['tag'] = msg.from_details['params']['tag']

Expand Down Expand Up @@ -424,7 +424,6 @@ async def close(self):
self.transactions[msg.method][msg.cseq] = transaction
await transaction.start()

self.peer._close_dialog(self.call_id)
self._close()

def _close(self):
Expand Down
90 changes: 10 additions & 80 deletions aiosip/peers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import asyncio
import logging
import time
import uuid

from collections import defaultdict
import websockets

from . import utils
Expand All @@ -16,19 +14,12 @@
class Peer:
def __init__(self, peer_addr, app, *, loop=None):
self.peer_addr = peer_addr
self.registered = {}
self.subscriber = defaultdict(dict)
self._app = app
self._protocol = None
self._loop = loop
self._dialogs = {}
self._connected_future = asyncio.Future(loop=loop)
self._disconnected_future = asyncio.Future(loop=loop)

@property
def dialogs(self):
return self._dialogs

async def close(self):
if self._protocol is not None:
LOG.debug('Closing connection for %s', self)
Expand Down Expand Up @@ -79,7 +70,7 @@ def _create_dialog(self, method, from_details, to_details, contact_details=None,
inbound=inbound,
)
LOG.debug('Creating: %s', dialog)
self._dialogs[call_id] = dialog
self._app._dialogs[dialog.dialog_id] = dialog
return dialog

async def request(self, method, from_details, to_details, contact_details=None, password=None, call_id=None,
Expand All @@ -91,7 +82,8 @@ async def request(self, method, from_details, to_details, contact_details=None,
headers=headers,
payload=payload,
password=password,
call_id=call_id, cseq=cseq)
call_id=call_id,
cseq=cseq)
try:
response = await dialog.start()
dialog.status_code = response.status_code
Expand All @@ -108,7 +100,8 @@ async def subscribe(self, from_details, to_details, contact_details=None, passwo
to_details=to_details,
contact_details=contact_details,
password=password,
call_id=call_id, cseq=cseq)
call_id=call_id,
cseq=cseq)
try:
response = await dialog.start(expires=expires)
dialog.status_code = response.status_code
Expand All @@ -125,7 +118,8 @@ async def register(self, from_details, to_details, contact_details=None, passwor
to_details=to_details,
contact_details=contact_details,
password=password,
call_id=call_id, cseq=cseq)
call_id=call_id,
cseq=cseq)
try:
response = await dialog.start(expires=expires)
dialog.status_code = response.status_code
Expand Down Expand Up @@ -174,7 +168,7 @@ async def invite(self, from_details, to_details, contact_details=None, password=
password=None,
cseq=cseq
)
self._dialogs[call_id] = dialog
self._app._dialogs[dialog.dialog_id] = dialog
await dialog.start()
return dialog

Expand All @@ -183,13 +177,13 @@ async def proxy_request(self, dialog, msg, timeout=5):
self.send_message(msg)
return

proxy_dialog = self._dialogs.get(dialog.call_id)
proxy_dialog = self._app._dialogs.get(dialog.call_id)
if not proxy_dialog:
proxy_dialog = self._create_dialog(
method=msg.method,
from_details=dialog.from_details,
to_details=dialog.to_details,
call_id=dialog.call_id,
call_id=dialog.call_id
)
elif msg.cseq in proxy_dialog.transactions[msg.method]:
proxy_dialog.transactions[msg.method][msg.cseq].retransmit()
Expand All @@ -214,59 +208,10 @@ async def proxy_request(self, dialog, msg, timeout=5):

proxy_dialog._maybe_close(msg)

def _bookkeeping(self, msg, call_id):
if msg.method not in ('REGISTER', 'SUBSCRIBE'):
return

expires = int(msg.headers.get('Expires', 0))

if msg.method == 'REGISTER' and expires:
self.registered[msg.contact_details['uri']['user']] = {
'expires': time.time() + expires,
'dialog': call_id
}
elif msg.method == 'SUBSCRIBE' and expires:
self.subscriber[msg.contact_details['uri']['user']][msg.to_details['uri']['user']] = {
'expires': time.time() + expires,
'dialog': call_id
}
if msg.method == 'REGISTER' and not expires:
try:
del self.registered[msg.contact_details['uri']['user']]
except KeyError:
pass
elif msg.method == 'SUBSCRIBE' and not expires:
try:
del self.subscriber[msg.contact_details['uri']['user']][msg.to_details['uri']['user']]
except KeyError:
pass

def proxy_response(self, msg):
msg.headers['Via'].pop(0)
return self.send_message(msg)

def _close_dialog(self, call_id):
try:
del self._dialogs[call_id]
except KeyError:
pass

to_del = list()
for user, value in self.registered.items():
if value['dialog'] == call_id:
to_del.append(user)
for user in to_del:
del self.registered[user]

to_del = list()
for user, subscriptions in self.subscriber.items():
for subscribe, values in subscriptions.items():
if values['dialog'] == call_id:
to_del.append((user, subscribe))

for v in to_del:
del self.subscriber[v[0]][v[1]]

@property
def protocol(self):
return type(self._protocol)
Expand All @@ -286,14 +231,6 @@ def _connected(self, protocol):

def _disconnected(self):
LOG.debug('Lost connection for %s', self)

for dialog in self._dialogs.values():
dialog._close()
self._dialogs = {}

self.registered = {}
self.subscriber = defaultdict(dict)

self._protocol = None
self._disconnected_future.set_result(None)

Expand All @@ -304,13 +241,6 @@ def local_addr(self):
else:
return None, None

@property
def contacts(self):
for contact in self.registered:
yield contact
for contact in self.subscriber:
yield contact

def __repr__(self):
return '<{0} {1[0]}:{1[1]} {2}, local_addr={3[0]}:{3[1]}>'.format(
self.__class__.__name__, self.peer_addr, self.protocol.__name__, self.local_addr)
Expand Down
58 changes: 58 additions & 0 deletions aiosip/via.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import re
import logging

from collections import MutableMapping

from .param import Param


VIA_PATTERNS = [
re.compile('SIP/2.0/(?P<protocol>[a-zA-Z]+)'
'[ \t]*'
'(?P<sentby>[^;]+)'
'(?:;(?P<params>.*))'),
]


LOG = logging.getLogger(__name__)


class Via(MutableMapping):
def __init__(self, *args, **kwargs):
self._via = dict(*args, **kwargs)

params = self._via.get('params')
if not params:
self._via['params'] = Param()
if not isinstance(params, Param):
self._via['params'] = Param(self._via['params'])

self._via['host'], self._via['port'] = self._via['sentby'].rsplit(':', 1)

@classmethod
def from_header(cls, via):
for s in VIA_PATTERNS:
m = s.match(via)
if m:
return cls(m.groupdict())
else:
raise ValueError('Not valid via address')

# MutableMapping API
def __eq__(self, other):
return self is other

def __getitem__(self, key):
return self._via[key]

def __setitem__(self, key, value):
self._via[key] = value

def __delitem__(self, key):
del self._via[key]

def __len__(self):
return len(self._via)

def __iter__(self):
return iter(self._via)
Loading

0 comments on commit 70f91af

Please sign in to comment.