Concurrent import file download #7521

Merged · 8 commits · Oct 19, 2020
98 changes: 59 additions & 39 deletions kolibri/core/content/management/commands/importcontent.py
@@ -1,6 +1,7 @@
import logging
import os

import concurrent.futures
import requests
from django.core.management.base import CommandError
from le_utils.constants import content_kinds
@@ -285,11 +286,10 @@ def _transfer( # noqa: max-complexity=16
with self.start_progress(
total=total_bytes_to_transfer + dummy_bytes_for_annotation
) as overall_progress_update:
exception = None # Exception that is not caught by the retry logic

if method == DOWNLOAD_METHOD:
session = requests.Session()

file_transfers = []
for f in files_to_download:

if self.is_cancelled():
@@ -318,6 +318,7 @@ def _transfer( # noqa: max-complexity=16
filetransfer = transfer.FileDownload(
url, dest, session=session, cancel_check=self.is_cancelled
)
file_transfers.append((f, filetransfer))
elif method == COPY_METHOD:
try:
srcpath = paths.get_content_storage_file_path(
@@ -330,37 +331,59 @@ def _transfer( # noqa: max-complexity=16
filetransfer = transfer.FileCopy(
srcpath, dest, cancel_check=self.is_cancelled
)

try:
status = self._start_file_transfer(
f, filetransfer, overall_progress_update
)

if self.is_cancelled():
break

if status == FILE_SKIPPED:
number_of_skipped_files += 1
else:
file_checksums_to_annotate.append(f.id)
transferred_file_size += f.file_size
except transfer.TransferCanceled:
break
except Exception as e:
logger.error(
"An error occurred during content import: {}".format(e)
)
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 404
) or (isinstance(e, OSError) and e.errno == 2):
# Continue file import when the current file is not found from the source and is skipped.
overall_progress_update(f.file_size)
number_of_skipped_files += 1
continue
else:
exception = e
break
file_transfers.append((f, filetransfer))

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
batch_size = 100
# ThreadPoolExecutor allows us to download files concurrently,
# greatly reducing download time in most cases. However, loading
# all the downloads into the pool requires considerable memory,
# so we divide the downloads into batches to keep memory usage down.
# In batches of 100, total RAM usage doesn't exceed 250MB in testing.
while len(file_transfers) > 0:
future_file_transfers = {}
for i in range(batch_size):
if len(file_transfers) > 0:
f, filetransfer = file_transfers.pop()
future = executor.submit(
self._start_file_transfer,
f,
filetransfer,
overall_progress_update,
)
future_file_transfers[future] = (f, filetransfer)

for future in concurrent.futures.as_completed(
future_file_transfers
):
f, filetransfer = future_file_transfers[future]
try:
status = future.result()
if self.is_cancelled():
break

if status == FILE_SKIPPED:
number_of_skipped_files += 1
else:
file_checksums_to_annotate.append(f.id)
transferred_file_size += f.file_size
except transfer.TransferCanceled:
break
except Exception as e:
logger.error(
"An error occurred during content import: {}".format(e)
)
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 404
) or (isinstance(e, OSError) and e.errno == 2):
# Continue file import when the current file is not found from the source and is skipped.
overall_progress_update(f.file_size)
number_of_skipped_files += 1
continue
else:
self.exception = e
break

with db_task_write_lock:
annotation.set_content_visibility(
@@ -395,8 +418,8 @@ def _transfer( # noqa: max-complexity=16

overall_progress_update(dummy_bytes_for_annotation)

if exception:
raise exception
if self.exception:
raise self.exception

if self.is_cancelled():
self.cancel()
@@ -408,13 +431,10 @@ def _start_file_transfer(self, f, filetransfer, overall_progress_update):
* FILE_TRANSFERRED - successfully transfer the file.
* FILE_SKIPPED - the file does not exist so it is skipped.
"""
with filetransfer, self.start_progress(
total=filetransfer.total_size
) as file_dl_progress_update:
with filetransfer:
for chunk in filetransfer:
length = len(chunk)
overall_progress_update(length)
file_dl_progress_update(length)

# Ensure that if for some reason the total file size for the transfer
# is less than what we have marked in the database that we make up
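
The batching comment in the new _transfer code above captures the core idea of this PR: download in a thread pool, but only keep a bounded batch of futures in flight at a time. A rough standalone sketch of that pattern (download_one and pending are hypothetical stand-ins, and the batch and worker sizes simply mirror the values chosen here):

import concurrent.futures


def transfer_in_batches(pending, download_one, batch_size=100, max_workers=10):
    # Pop at most `batch_size` items per round so only that many futures
    # (and their transfer objects) are alive in memory at any one time.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        while pending:
            batch = [pending.pop() for _ in range(min(batch_size, len(pending)))]
            future_to_item = {executor.submit(download_one, item): item for item in batch}
            for future in concurrent.futures.as_completed(future_to_item):
                item = future_to_item[future]
                try:
                    future.result()
                except Exception as exc:
                    # The real command decides here whether to skip the file
                    # (e.g. a 404) or record the exception and stop early.
                    print("transfer failed for {}: {}".format(item, exc))

Draining the list batch by batch, rather than submitting every transfer up front, is what keeps the reported memory usage to roughly 250MB in testing.
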
42 changes: 19 additions & 23 deletions kolibri/core/content/test/test_import_export.py
@@ -283,7 +283,7 @@ def test_remote_cancel_immediately(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True],
side_effect=[False, False, False, True, True, True],
)
def test_remote_cancel_during_transfer(
self,
@@ -399,7 +399,7 @@ def test_local_cancel_immediately(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True],
side_effect=[False, True, True],
)
def test_local_cancel_during_transfer(
self,
@@ -425,22 +425,20 @@ def test_local_cancel_during_transfer(
cancel_mock.assert_called_with()
annotation_mock.set_content_visibility.assert_called()

@patch("kolibri.core.content.management.commands.importcontent.len")
@patch(
"kolibri.core.content.utils.transfer.Transfer.next",
side_effect=ConnectionError("connection error"),
)
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True, True],
side_effect=[False, True, True, True],
)
def test_remote_cancel_during_connect_error(
self,
is_cancelled_mock,
cancel_mock,
next_mock,
len_mock,
annotation_mock,
files_to_transfer_mock,
channel_list_status_mock,
@@ -467,7 +465,6 @@ def test_remote_cancel_during_connect_error(
node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"],
)
cancel_mock.assert_called_with()
len_mock.assert_not_called()
annotation_mock.set_content_visibility.assert_called()

@patch("kolibri.core.content.management.commands.importcontent.logger.warning")
@@ -527,7 +524,7 @@ def test_remote_import_httperror_404(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, False, True, True, True],
side_effect=[False, False, True, True, True, True],
)
@patch(
"kolibri.core.content.management.commands.importcontent.paths.get_content_storage_file_path",
@@ -552,7 +549,7 @@ def test_remote_import_httperror_502(
LocalFile.objects.filter(
files__contentnode__channel_id=self.the_channel_id
).update(file_size=1)
files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10)
files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10)
call_command("importcontent", "network", self.the_channel_id)

sleep_mock.assert_called_once()
@@ -587,7 +584,6 @@ def test_remote_import_httperror_500(
self.the_channel_id, [], node_ids=None, exclude_node_ids=None, public=False
)

@patch("kolibri.core.content.management.commands.importcontent.len")
@patch("kolibri.core.content.utils.transfer.sleep")
@patch(
"kolibri.core.content.utils.transfer.Transfer.next",
@@ -596,15 +592,14 @@ def test_remote_import_httperror_500(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, False, False, True, True, True],
side_effect=[False, False, False, True, True, True, True],
)
def test_remote_import_chunkedencodingerror(
self,
is_cancelled_mock,
cancel_mock,
error_mock,
sleep_mock,
len_mock,
annotation_mock,
files_to_transfer_mock,
channel_list_status_mock,
@@ -630,9 +625,7 @@ def test_remote_import_chunkedencodingerror(
self.the_channel_id,
node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"],
)
sleep_mock.assert_called_once()
cancel_mock.assert_called_with()
len_mock.assert_not_called()
annotation_mock.set_content_visibility.assert_called()

@patch("kolibri.core.content.management.commands.importcontent.logger.warning")
@@ -642,7 +635,7 @@ def test_remote_import_chunkedencodingerror(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True, True],
side_effect=[False, True],
)
def test_local_import_oserror_dne(
self,
@@ -659,7 +652,7 @@ def test_local_import_oserror_dne(
LocalFile.objects.filter(
files__contentnode__channel_id=self.the_channel_id
).update(file_size=1)
files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10)
files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10)
call_command("importcontent", "disk", self.the_channel_id, "destination")
self.assertTrue("1 files are skipped" in logger_mock.call_args_list[0][0][0])
annotation_mock.set_content_visibility.assert_called()
@@ -681,7 +674,7 @@ def test_local_import_oserror_permission_denied(
dest_path = tempfile.mkstemp()[1]
path_mock.side_effect = [dest_path, "/test/dne"]
getsize_mock.side_effect = ["1", OSError("Permission denied")]
files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10)
files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10)
with self.assertRaises(OSError):
call_command("importcontent", "disk", self.the_channel_id, "destination")
self.assertTrue("Permission denied" in logger_mock.call_args_list[0][0][0])
@@ -698,7 +691,7 @@ def test_local_import_oserror_permission_denied(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, False, True, True],
side_effect=[False, False, True, True, True],
)
def test_local_import_source_corrupted(
self,
@@ -718,9 +711,11 @@ def test_local_import_source_corrupted(
).update(file_size=1)
path_mock.side_effect = [local_dest_path, local_src_path]
files_to_transfer_mock.return_value = (
LocalFile.objects.filter(
files__contentnode="32a941fb77c2576e8f6b294cde4c3b0c"
),
[
LocalFile.objects.filter(
files__contentnode="32a941fb77c2576e8f6b294cde4c3b0c"
).first()
],
10,
)
call_command(
@@ -731,7 +726,7 @@ def test_local_import_source_corrupted(
node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"],
)
cancel_mock.assert_called_with()
remove_mock.assert_called_with(local_dest_path)
remove_mock.assert_any_call(local_dest_path)

@patch(
"kolibri.core.content.management.commands.importcontent.os.path.isfile",
@@ -908,7 +903,8 @@ def test_remote_import_full_import(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False] * 32 + [True, True, True],
# We have to return False for 30 1-second checks to ensure we actually retry.
side_effect=[False] * 32 + [True] * 5,
)
def test_remote_import_file_compressed_on_gcs(
self,
@@ -930,7 +926,7 @@ def test_remote_import_file_compressed_on_gcs(
LocalFile.objects.filter(
files__contentnode__channel_id=self.the_channel_id
).update(file_size=1)
files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10)
files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10)

m = mock_open()
with patch("kolibri.core.content.utils.transfer.open", m) as open_mock:
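
A note on the adjusted side_effect lists above: with the concurrent flow, is_cancelled() is consulted at different points — while building the transfer list, inside each transfer via cancel_check, after each completed future, and whenever a stored exception trips it — so each mock needs enough entries to cover every call the test now triggers. A hypothetical, non-Kolibri illustration of that constraint:

from unittest.mock import patch


class Job(object):
    def is_cancelled(self):
        return False

    def run(self):
        # Loops until the (patched) cancellation check reports True.
        checks = 0
        while not self.is_cancelled():
            checks += 1
        return checks


# Every call consumes one entry from side_effect, so the list must be at least
# as long as the number of checks the code under test actually performs.
with patch.object(Job, "is_cancelled", side_effect=[False, False, True]):
    assert Job().run() == 2
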
3 changes: 3 additions & 0 deletions kolibri/core/content/utils/transfer.py
@@ -82,6 +82,7 @@ def __init__(
# record whether the destination file already exists, so it can be checked, but don't error out
self.dest_exists = os.path.isfile(dest)

def start(self):
# open the destination file for writing
self.dest_file_obj = open(self.dest_tmp, "wb")

@@ -159,6 +160,7 @@ def __init__(self, *args, **kwargs):
super(FileDownload, self).__init__(*args, **kwargs)

def start(self):
super(FileDownload, self).start()
# initiate the download, check for status errors, and calculate download size
try:
self.response = self.session.get(
@@ -276,6 +278,7 @@ def start(self):
assert (
not self.started
), "File copy has already been started, and cannot be started again"
super(FileCopy, self).start()
self.total_size = os.path.getsize(self.source)
self.source_file_obj = open(self.source, "rb")
self.started = True
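
These changes defer opening the destination file from construction time to start(). The likely motivation — inferred from the importcontent changes above, which now build every FileDownload up front and queue it for the thread pool — is that a pending transfer should not pin an open file handle before it actually runs. A minimal sketch of the same deferral pattern, under that assumption:

class LazyTransfer(object):
    def __init__(self, dest):
        self.dest = dest
        self.dest_file_obj = None  # cheap to construct; nothing opened yet

    def start(self):
        # The destination file is opened only when a worker begins the transfer,
        # so hundreds of queued transfers don't each hold a file descriptor.
        self.dest_file_obj = open(self.dest, "wb")

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *exc_info):
        if self.dest_file_obj is not None:
            self.dest_file_obj.close()
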
8 changes: 8 additions & 0 deletions kolibri/core/tasks/management/commands/base.py
@@ -97,6 +97,12 @@ class AsyncCommand(BaseCommand):

def __init__(self, *args, **kwargs):
self.progresstrackers = []

# The importcontent command stores an unhandled exception and re-raises it
# later. We check it in is_cancelled so that we cancel remaining tasks once
# an unhandled exception has occurred.
self.exception = None

super(AsyncCommand, self).__init__(*args, **kwargs)

def _update_all_progress(self, progress_fraction, progress):
@@ -129,6 +135,8 @@ def start_progress(self, total=100):
return tracker

def is_cancelled(self):
if self.exception is not None:
return True
try:
self.check_for_cancel()
return False
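
Taken together with the importcontent changes, the stored exception doubles as a cancellation signal: once any worker records an unhandled error, every later is_cancelled() check returns True, the remaining queued transfers bail out, and the exception is re-raised at the end of _transfer. A stripped-down sketch of that pattern (illustrative only, not the actual AsyncCommand):

class AbortOnError(object):
    def __init__(self):
        self.exception = None

    def is_cancelled(self):
        # An unhandled error behaves like a cancellation request.
        return self.exception is not None

    def run_one(self, work):
        if self.is_cancelled():
            return  # skip remaining work once something has already failed
        try:
            work()
        except Exception as e:
            self.exception = e  # remembered here, re-raised by the caller later

    def finish(self):
        if self.exception:
            raise self.exception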