Skip to content

Commit

Permalink
Merge pull request #6891 from drew2a/fix/6890
Browse files Browse the repository at this point in the history
EVA: Process future cancellation
  • Loading branch information
drew2a authored May 2, 2022
2 parents ac1f8d1 + 288a87d commit d0f295f
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 41 deletions.
4 changes: 4 additions & 0 deletions src/tribler/core/components/ipv8/eva/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ class ValueException(TransferException):

class TransferLimitException(TransferException):
"""Maximum simultaneous transfers limit exceeded"""


class TransferCancelledException(TransferException):
"""Raised in the case that future was cancelled"""
16 changes: 14 additions & 2 deletions src/tribler/core/components/ipv8/eva/tests/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from ipv8.test.base import TestBase
from ipv8.types import Peer

from tribler.core.components.ipv8.eva.exceptions import SizeException, TimeoutException, TransferException, \
from tribler.core.components.ipv8.eva.exceptions import SizeException, TimeoutException, TransferCancelledException, \
TransferException, \
TransferLimitException, ValueException
from tribler.core.components.ipv8.eva.payload import Acknowledgement, Data, Error, WriteRequest
from tribler.core.components.ipv8.eva.protocol import EVAProtocol
Expand Down Expand Up @@ -117,6 +118,17 @@ async def test_one_block_binary(self):
await self.alice.data_has_been_sent.wait()
assert self.alice.most_recent_sent_data == data

async def test_cancel_send_binary(self):
future = self.alice.eva.send_binary(self.bob.my_peer, b'test1', b'1234')
transfer = self.alice.eva.outgoing[self.bob.my_peer]

future.cancel()
await self.alice.error_has_been_raised.wait()

assert isinstance(self.alice.most_recent_received_exception, TransferCancelledException)
assert transfer.finished
assert not self.alice.eva.outgoing

async def test_block_count_fits_a_single_window(self):
# In this test we send three transfers from Alice to Bob to ensure that
# protocol works well in the case of all blocks roughly fits just a single window.
Expand Down Expand Up @@ -595,7 +607,7 @@ def test_send_scheduled_with_transfers_limit(eva: EVAProtocol):
assert len(eva.scheduled['peer3']) == 1


def test_send_write_request_finished_transfer(eva: EVAProtocol):
async def test_send_write_request_finished_transfer(eva: EVAProtocol):
transfer = OutgoingTransfer(container=eva.outgoing, peer=Mock(), info=b'123', data=b'456', nonce=42,
settings=EVASettings())
transfer.finished = True
Expand Down
29 changes: 23 additions & 6 deletions src/tribler/core/components/ipv8/eva/transfer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import asyncio
import logging
import time
from asyncio import Future
from typing import Dict, Optional, Type

from ipv8.types import Peer

from tribler.core.components.ipv8.eva.aliases import TransferCompleteCallback, TransferErrorCallback
from tribler.core.components.ipv8.eva.exceptions import TimeoutException, TransferException
from tribler.core.components.ipv8.eva.exceptions import TimeoutException, TransferCancelledException, TransferException
from tribler.core.components.ipv8.eva.result import TransferResult
from tribler.core.components.ipv8.eva.settings import EVASettings

Expand All @@ -31,17 +30,25 @@ def __init__(self, container: Dict[Peer, Type[Transfer]], peer: Peer, info: byte
self.on_complete = on_complete
self.on_error = on_error
self.settings = settings
self.future = Future()
self.future = asyncio.get_running_loop().create_future()
self.logger = logging.getLogger(self.__class__.__name__)
self.updated = 0
self.attempt = 0
self.finished = False

self.future.add_done_callback(self.on_future_cancelled)

def update(self):
self.updated = time.time()

def _release(self):
if self.container:
self.container.pop(self.peer, None)
self.container = None
self.finished = True

def finish(self, *, result: Optional[TransferResult] = None, exception: Optional[TransferException] = None):
if self.finished:
if self.finished or self.future.done():
return

if exception:
Expand All @@ -60,8 +67,18 @@ def finish(self, *, result: Optional[TransferResult] = None, exception: Optional
if self.on_complete:
asyncio.create_task(self.on_complete(result))

self.finished = True
self.container = None
self._release()

def on_future_cancelled(self, _):
if not self.future.cancelled():
return

if self.on_error:
self.logger.warning('Future was cancelled')
exception = TransferCancelledException('The future was cancelled', self)
asyncio.create_task(self.on_error(self.peer, exception))

self._release()

async def terminate_by_timeout_task(self):
timeout = self.settings.timeout_interval_in_sec
Expand Down
6 changes: 2 additions & 4 deletions src/tribler/core/components/ipv8/eva/transfer/incoming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from typing import List, Optional

from tribler.core.components.ipv8.eva.exceptions import TransferException
from tribler.core.components.ipv8.eva.payload import Acknowledgement
from tribler.core.components.ipv8.eva.result import TransferResult
from tribler.core.components.ipv8.eva.transfer.base import Transfer
Expand Down Expand Up @@ -47,7 +46,6 @@ def make_acknowledgement(self) -> Acknowledgement:
self.logger.debug(f'Transfer window: {self.window}')
return Acknowledgement(self.window.start, len(self.window.blocks), self.nonce)

def finish(self, *, result: Optional[TransferResult] = None, exception: Optional[TransferException] = None):
self.container.pop(self.peer, None)
super().finish(result=result, exception=exception)
def _release(self):
super()._release()
self.data_list = None
9 changes: 4 additions & 5 deletions src/tribler/core/components/ipv8/eva/transfer/outgoing.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

import math
from typing import Iterable, Optional
from typing import Iterable

from tribler.core.components.ipv8.eva.exceptions import SizeException, TransferException
from tribler.core.components.ipv8.eva.exceptions import SizeException
from tribler.core.components.ipv8.eva.payload import Data
from tribler.core.components.ipv8.eva.result import TransferResult
from tribler.core.components.ipv8.eva.transfer.base import Transfer
Expand Down Expand Up @@ -37,9 +37,8 @@ def on_acknowledgement(self, ack_number: int, window_size: int) -> Iterable[Data
if len(block) == 0:
return

def finish(self, *, result: Optional[TransferResult] = None, exception: Optional[TransferException] = None):
self.container.pop(self.peer, None)
super().finish(result=result, exception=exception)
def _release(self):
super()._release()
self.data = None

def _get_block(self, number: int) -> bytes:
Expand Down
42 changes: 36 additions & 6 deletions src/tribler/core/components/ipv8/eva/transfer/tests/test_base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from asyncio import InvalidStateError
from unittest.mock import AsyncMock, Mock, patch

Expand All @@ -8,22 +9,26 @@
from tribler.core.components.ipv8.eva.transfer.base import Transfer


# pylint: disable=redefined-outer-name
# pylint: disable=redefined-outer-name, protected-access

@pytest.fixture
def transfer() -> Transfer:
return Transfer(container=Mock(), info=b'info', data_size=100, nonce=0, on_complete=AsyncMock(), peer=Mock(),
settings=EVASettings(block_size=2))
async def transfer() -> Transfer:
peer = Mock()
container = {}
transfer = Transfer(container=container, info=b'info', data_size=100, nonce=0, on_complete=AsyncMock(), peer=peer,
settings=EVASettings(block_size=2))
container[peer] = transfer
return transfer


@patch('time.time', Mock(return_value=42))
def test_update(transfer: Transfer):
async def test_update(transfer: Transfer):
transfer.update()

assert transfer.updated == 42


def test_finish_double_call(transfer: Transfer):
async def test_finish_double_call(transfer: Transfer):
assert not transfer.finished

# The first call of the finish method should process exception and result
Expand All @@ -39,6 +44,31 @@ def test_finish_double_call(transfer: Transfer):
assert transfer.container == 'any'


async def test_release(transfer: Transfer):
container = transfer.container
assert container

transfer._release()

assert transfer.finished
assert not transfer.container
assert not container


async def test_release_double_call(transfer: Transfer):
transfer._release()
transfer._release()

assert transfer.finished


async def test_finish_cancelled(transfer: Transfer):
transfer.future.cancel()
transfer.finish(result=Mock())
await asyncio.sleep(0.1) # sleep to process cancel callback
assert transfer.finished


async def test_finish_with_result(transfer: Transfer):
result = Mock()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


@pytest.fixture
def incoming_transfer() -> IncomingTransfer:
async def incoming_transfer() -> IncomingTransfer:
settings = EVASettings(block_size=10)
eva = EVAProtocol(Mock(), settings=settings)
peer = Mock()
Expand All @@ -25,7 +25,7 @@ def incoming_transfer() -> IncomingTransfer:
return transfer


def test_on_data_normal_packet(incoming_transfer: IncomingTransfer):
async def test_on_data_normal_packet(incoming_transfer: IncomingTransfer):
incoming_transfer.window = Mock(is_finished=Mock(return_value=False))
incoming_transfer.make_acknowledgement = Mock()
incoming_transfer.update = Mock()
Expand All @@ -39,7 +39,7 @@ def test_on_data_normal_packet(incoming_transfer: IncomingTransfer):
assert not incoming_transfer.make_acknowledgement.called


def test_on_data_window_is_finished(incoming_transfer: IncomingTransfer):
async def test_on_data_window_is_finished(incoming_transfer: IncomingTransfer):
incoming_transfer.window = Mock(is_finished=Mock(return_value=True))
incoming_transfer.make_acknowledgement = Mock()
incoming_transfer.update = Mock()
Expand All @@ -54,7 +54,7 @@ def test_on_data_window_is_finished(incoming_transfer: IncomingTransfer):
assert not incoming_transfer.finished


def test_on_data_window_is_last_and_finished(incoming_transfer: IncomingTransfer):
async def test_on_data_window_is_last_and_finished(incoming_transfer: IncomingTransfer):
incoming_transfer.window = Mock(is_finished=Mock(return_value=True))
incoming_transfer.make_acknowledgement = Mock()
incoming_transfer.update = Mock()
Expand All @@ -71,7 +71,7 @@ def test_on_data_window_is_last_and_finished(incoming_transfer: IncomingTransfer
assert incoming_transfer.finish.called


def test_on_data_final_packet(incoming_transfer: IncomingTransfer):
async def test_on_data_final_packet(incoming_transfer: IncomingTransfer):
incoming_transfer.window = TransferWindow(0, 10)
index = 3

Expand All @@ -81,7 +81,7 @@ def test_on_data_final_packet(incoming_transfer: IncomingTransfer):
assert len(incoming_transfer.window.blocks) == index + 1


def test_make_acknowledgement_no_window(incoming_transfer: IncomingTransfer):
async def test_make_acknowledgement_no_window(incoming_transfer: IncomingTransfer):
assert not incoming_transfer.window

acknowledgement = incoming_transfer.make_acknowledgement()
Expand All @@ -92,7 +92,7 @@ def test_make_acknowledgement_no_window(incoming_transfer: IncomingTransfer):
assert acknowledgement.window_size == incoming_transfer.settings.window_size


def test_make_acknowledgement_next_window(incoming_transfer: IncomingTransfer):
async def test_make_acknowledgement_next_window(incoming_transfer: IncomingTransfer):
incoming_transfer.window = TransferWindow(10, 7)
incoming_transfer.window.blocks = [b'd', b'a', b't', b'a', None, None, None]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# pylint: disable=redefined-outer-name, protected-access

@pytest.fixture
def outgoing_transfer() -> OutgoingTransfer:
async def outgoing_transfer() -> OutgoingTransfer:
settings = EVASettings(block_size=2)
eva = EVAProtocol(Mock(), settings=settings)
peer = Mock()
Expand All @@ -25,20 +25,20 @@ def outgoing_transfer() -> OutgoingTransfer:
return transfer


def test_size_exception():
async def test_size_exception():
settings = EVASettings(binary_size_limit=10)
limit = settings.binary_size_limit
with pytest.raises(SizeException):
OutgoingTransfer(container=Mock(), info=b'info', data=b'd' * (limit + 1), nonce=0, on_complete=AsyncMock(),
peer=Mock(), settings=settings)


def test_block_count(outgoing_transfer: OutgoingTransfer):
async def test_block_count(outgoing_transfer: OutgoingTransfer):
# data is b'binary_data' and block_size is `2`
assert outgoing_transfer.block_count == 6


def test_on_acknowledgement(outgoing_transfer: OutgoingTransfer):
async def test_on_acknowledgement(outgoing_transfer: OutgoingTransfer):
assert not outgoing_transfer.acknowledgement_received
assert not outgoing_transfer.updated

Expand All @@ -59,7 +59,7 @@ def test_on_acknowledgement(outgoing_transfer: OutgoingTransfer):
assert all(a.data == e.data and a.number == e.number for a, e in zip(actual, expected))


def test_on_final_acknowledgement(outgoing_transfer: OutgoingTransfer):
async def test_on_final_acknowledgement(outgoing_transfer: OutgoingTransfer):
outgoing_transfer.finish = Mock()
data_list = list(outgoing_transfer.on_acknowledgement(ack_number=10, window_size=16))
expected_result = TransferResult(peer=outgoing_transfer.peer, info=outgoing_transfer.info,
Expand All @@ -79,7 +79,7 @@ async def test_finish(outgoing_transfer: OutgoingTransfer):
assert not container


def test_get_block(outgoing_transfer: OutgoingTransfer):
async def test_get_block(outgoing_transfer: OutgoingTransfer):
assert outgoing_transfer._get_block(0) == b'bi'
assert outgoing_transfer._get_block(1) == b'na'
...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
# pylint: disable=redefined-outer-name

@pytest.fixture
def window() -> TransferWindow:
async def window() -> TransferWindow:
return TransferWindow(start=0, size=10)


def test_constructor(window: TransferWindow):
async def test_constructor(window: TransferWindow):
assert len(window.blocks) == 10
assert all(not block for block in window.blocks)


def test_add(window: TransferWindow):
async def test_add(window: TransferWindow):
window.add(0, b'first')
window.add(0, b'first')
window.add(9, b'last')
Expand All @@ -25,15 +25,15 @@ def test_add(window: TransferWindow):
assert not window.is_finished()


def test_finished(window: TransferWindow):
async def test_finished(window: TransferWindow):
for i in range(10):
window.add(i, b'block')

assert window.processed == 10
assert window.is_finished()


def test_consecutive_blocks(window: TransferWindow):
async def test_consecutive_blocks(window: TransferWindow):
window.add(0, b'first')
window.add(1, b'second')
window.add(3, b'fourth')
Expand Down

0 comments on commit d0f295f

Please sign in to comment.