Skip to content

Commit

Permalink
Merge pull request #90 from sangoma/bug-fixes
Browse files Browse the repository at this point in the history
Slew of bug fixes
  • Loading branch information
vodik authored Mar 30, 2018
2 parents 28b05ff + 1cde787 commit 4770c33
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 29 deletions.
7 changes: 6 additions & 1 deletion aiosip/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def _create_dialog(self):
method=msg.method,
from_details=Contact.from_header(msg.headers['To']),
to_details=Contact.from_header(msg.headers['From']),
call_id=call_id
call_id=call_id,
inbound=True
)
return self.dialog

Expand All @@ -137,6 +138,10 @@ async def _dispatch(self, protocol, msg, addr):
await dialog.receive_message(msg)
return

# If we got an ACK, but nowhere to deliver it, drop it
if msg.method == 'ACK':
return

router = await self.dialplan.resolve(
username=msg.from_details['uri']['user'],
protocol=peer.protocol,
Expand Down
88 changes: 62 additions & 26 deletions aiosip/dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ def __init__(self,
peer,
contact_details,
*,
headers=None,
payload=None,
password=None,
cseq=0):
cseq=0,
inbound=False):

self.app = app
self.from_details = from_details
Expand All @@ -43,12 +46,17 @@ def __init__(self,
self.peer = peer
self.password = password
self.cseq = cseq
self.inbound = inbound
self.transactions = defaultdict(dict)

# TODO: Needs to be last because we need the above attributes set
self.original_msg = self._prepare_request(method)
self.original_msg = self._prepare_request(method, headers=headers, payload=payload)

self._closed = False
self._closing = None

def _receive_response(self, msg):

try:
transaction = self.transactions[msg.method][msg.cseq]
transaction._incoming(msg)
Expand Down Expand Up @@ -87,10 +95,17 @@ def _prepare_request(self, method, contact_details=None, headers=None, payload=N

async def start(self, *, expires=None):
# TODO: this is a hack
headers = {}
if expires:
headers = self.original_msg.headers
if expires is not None:
headers['Expires'] = expires
return await self.request(self.original_msg.method, headers=headers)
return await self.request(self.original_msg.method, headers=headers, payload=self.original_msg.payload)

def ack(self, msg, headers=None, *args, **kwargs):
headers = CIMultiDict(headers or {})

headers['Via'] = msg.headers['Via']
ack = self._prepare_request('ACK', cseq=msg.cseq, to_details=msg.to_details, headers=headers, *args, **kwargs)
self.peer.send_message(ack)

async def unauthorized(self, msg):
self._nonce = utils.gen_str(10)
Expand All @@ -107,10 +122,6 @@ def validate_auth(self, msg, password):
else:
return False

def close(self, *, fast=False):
self.peer._close_dialog(self.call_id)
self._close()

def close_later(self, delay=None):
if delay is None:
delay = self.app.defaults['dialog_closing_delay']
Expand All @@ -124,7 +135,7 @@ async def closure():
self._closing = asyncio.ensure_future(closure())

def _maybe_close(self, msg):
if msg.method in ('REGISTER', 'SUBSCRIBE'):
if msg.method in ('REGISTER', 'SUBSCRIBE') and not self.inbound:
expire = int(msg.headers.get('Expires', 0))
delay = int(expire * 1.1) if expire else None
self.close_later(delay)
Expand Down Expand Up @@ -231,7 +242,6 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self._nonce = None
self._closing = None
self._incoming = asyncio.Queue()

async def receive_message(self, msg):
Expand Down Expand Up @@ -262,10 +272,17 @@ async def refresh(self, headers=None, expires=1800, *args, **kwargs):
return await self.request(self.original_msg.method, headers=headers, *args, **kwargs)

async def close(self, fast=False, headers=None, *args, **kwargs):
headers = CIMultiDict(headers or {})
if 'Expires' not in headers:
headers['Expires'] = 0
return await self.request(self.original_msg.method, headers=headers, *args, **kwargs)
if not self._closed:
self._closed = True
result = None
if not self.inbound and self.original_msg.method in ('REGISTER', 'SUBSCRIBE'):
headers = CIMultiDict(headers or {})
if 'Expires' not in headers:
headers['Expires'] = 0
result = await self.request(self.original_msg.method, headers=headers, *args, **kwargs)
self.peer._close_dialog(self.call_id)
self._close()
return result

async def notify(self, *args, headers=None, **kwargs):
headers = CIMultiDict(headers or {})
Expand Down Expand Up @@ -350,6 +367,17 @@ async def handle_completed_state(msg):
else:
return await self._receive_request(msg)

async def _receive_request(self, msg):
self.peer._bookkeeping(msg, self.call_id)

if 'tag' in msg.from_details['params']:
self.to_details['params']['tag'] = msg.from_details['params']['tag']

if msg.method == 'BYE':
self._closed = True

self._maybe_close(msg)

@property
def state(self):
return self._state
Expand All @@ -358,6 +386,9 @@ async def start(self, *, expires=None):
# TODO: this is a hack
self.peer.send_message(self.original_msg)

async def recv(self):
return await self._queue.get()

async def wait_for_terminate(self):
while not self._waiter.done():
yield await self._queue.get()
Expand All @@ -367,13 +398,6 @@ async def ready(self):
if msg.status_code != 200:
raise RuntimeError("INVITE failed with {}".format(msg.status_code))

def ack(self, msg, headers=None, *args, **kwargs):
headers = CIMultiDict(headers or {})

headers['Via'] = msg.headers['Via']
ack = self._prepare_request('ACK', cseq=msg.cseq, to_details=msg.to_details, headers=headers, *args, **kwargs)
self.peer.send_message(ack)

def end_transaction(self, transaction):
to_delete = list()
for method, values in self.transactions.items():
Expand All @@ -386,10 +410,22 @@ def end_transaction(self, transaction):
del self.transactions[item[0]][item[1]]

async def close(self):
msg = self._prepare_request('BYE')
transaction = UnreliableTransaction(self, original_msg=msg, loop=self.app.loop)
self.transactions[msg.method][msg.cseq] = transaction
return await transaction.start()
if not self._closed:
self._closed = True

msg = None
if self._state == CallState.Terminated:
msg = self._prepare_request('BYE')
elif self._state != CallState.Completed:
msg = self._prepare_request('CANCEL')

if msg:
transaction = UnreliableTransaction(self, original_msg=msg, loop=self.app.loop)
self.transactions[msg.method][msg.cseq] = transaction
await transaction.start()

self.peer._close_dialog(self.call_id)
self._close()

def _close(self):
pass
29 changes: 27 additions & 2 deletions aiosip/peers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def send_message(self, msg):
self._protocol.send_message(msg, addr=self.peer_addr)

def _create_dialog(self, method, from_details, to_details, contact_details=None, password=None, call_id=None,
cseq=0):
headers=None, payload=None, cseq=0, inbound=False):

if not call_id:
call_id = str(uuid.uuid4())
Expand Down Expand Up @@ -73,12 +73,34 @@ def _create_dialog(self, method, from_details, to_details, contact_details=None,
call_id=call_id,
peer=self,
password=password,
headers=headers,
payload=payload,
cseq=cseq,
inbound=inbound,
)
LOG.debug('Creating: %s', dialog)
self._dialogs[call_id] = dialog
return dialog

async def request(self, method, from_details, to_details, contact_details=None, password=None, call_id=None,
headers=None, payload=None, cseq=0):
dialog = self._create_dialog(method=method,
from_details=from_details,
to_details=to_details,
contact_details=contact_details,
headers=headers,
payload=payload,
password=password,
call_id=call_id, cseq=cseq)
try:
response = await dialog.start()
dialog.status_code = response.status_code
dialog.status_message = response.status_message
return dialog
except asyncio.CancelledError:
dialog.cancel()
raise

async def subscribe(self, from_details, to_details, contact_details=None, password=None, call_id=None, cseq=0,
expires=3600):
dialog = self._create_dialog(method="SUBSCRIBE",
Expand Down Expand Up @@ -113,7 +135,8 @@ async def register(self, from_details, to_details, contact_details=None, passwor
dialog.cancel()
raise

async def invite(self, from_details, to_details, contact_details=None, password=None, call_id=None, cseq=0):
async def invite(self, from_details, to_details, contact_details=None, password=None, call_id=None, headers=None,
payload=None, cseq=0):

if not call_id:
call_id = str(uuid.uuid4())
Expand Down Expand Up @@ -146,6 +169,8 @@ async def invite(self, from_details, to_details, contact_details=None, password=
call_id=call_id,
peer=self,
contact_details=contact_details,
headers=headers,
payload=payload,
password=None,
cseq=cseq
)
Expand Down

0 comments on commit 4770c33

Please sign in to comment.