Skip to content

Commit

Permalink
Add extra is cancelled check to terminate early.
Browse files Browse the repository at this point in the history
Add upper bound to number of workers.
Update tests for additional cancel checks.
  • Loading branch information
rtibbles committed Apr 1, 2022
1 parent bd90eeb commit a158f54
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
6 changes: 5 additions & 1 deletion kolibri/core/content/management/commands/importcontent.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ def _transfer( # noqa: max-complexity=16
# the actual destination file that it is moved to. To add tolerance, we divide
# the number of file descriptors that could be allocated to this task by four,
# which should give us leeway in case of unforeseen descriptor use during the process.
max_workers = min(1, max_descriptors_per_download_task // 4)
max_workers = min(
max_workers, min(1, max_descriptors_per_download_task // 4)
)

with executor(max_workers=max_workers) as executor:
batch_size = 100
Expand All @@ -345,6 +347,8 @@ def _transfer( # noqa: max-complexity=16
break
future_file_transfers = {}
for i in range(batch_size):
if self.is_cancelled():
break
if files_to_download:
f = files_to_download.pop()
filename = get_content_file_name(f)
Expand Down
26 changes: 19 additions & 7 deletions kolibri/core/content/test/test_import_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ def __eq__(self, other):
return Any()


class FalseThenTrue(object):
def __init__(self, times=1):
self.times = times
self.count = 0

def __call__(self):
self.count += 1
if self.count > self.times:
return True
return False


@patch(
"kolibri.core.content.management.commands.importchannel.channel_import.import_channel_from_local_db"
)
Expand Down Expand Up @@ -290,7 +302,7 @@ def test_remote_cancel_immediately(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, False, False, True, True, True],
side_effect=FalseThenTrue(times=6),
)
def test_remote_cancel_during_transfer(
self,
Expand Down Expand Up @@ -344,7 +356,7 @@ def test_remote_cancel_during_transfer(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True, True],
side_effect=FalseThenTrue(times=3),
)
def test_remote_cancel_after_file_copy_file_not_deleted(
self,
Expand Down Expand Up @@ -421,7 +433,7 @@ def test_local_cancel_immediately(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True, True],
side_effect=FalseThenTrue(times=3),
)
def test_local_cancel_during_transfer(
self,
Expand All @@ -438,7 +450,7 @@ def test_local_cancel_during_transfer(
fd2, local_src_path = tempfile.mkstemp()
os.close(fd1)
os.close(fd2)
local_path_mock.side_effect = [local_dest_path, local_src_path]
local_path_mock.side_effect = [local_dest_path, local_src_path] * 10
FileCopyMock.return_value.__iter__.side_effect = TransferCanceled()
get_import_export_mock.return_value = (
1,
Expand All @@ -460,7 +472,7 @@ def test_local_cancel_during_transfer(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True, True, True],
side_effect=FalseThenTrue(times=3),
)
def test_remote_cancel_during_connect_error(
self,
Expand Down Expand Up @@ -736,7 +748,7 @@ def test_remote_import_no_space_after_first_download(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, False, False, True, True, True, True],
side_effect=FalseThenTrue(times=6),
)
def test_remote_import_chunkedencodingerror(
self,
Expand Down Expand Up @@ -782,7 +794,7 @@ def test_remote_import_chunkedencodingerror(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True],
side_effect=FalseThenTrue(times=3),
)
def test_local_import_oserror_dne(
self,
Expand Down

0 comments on commit a158f54

Please sign in to comment.