Skip to content

Commit

Permalink
Merge pull request #7521 from kollivier/sonic_downloads
Browse files Browse the repository at this point in the history
 Concurrent import file download
  • Loading branch information
rtibbles authored Oct 19, 2020
2 parents f287108 + 5530496 commit ef27835
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 62 deletions.
98 changes: 59 additions & 39 deletions kolibri/core/content/management/commands/importcontent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os

import concurrent.futures
import requests
from django.core.management.base import CommandError
from le_utils.constants import content_kinds
Expand Down Expand Up @@ -285,11 +286,10 @@ def _transfer( # noqa: max-complexity=16
with self.start_progress(
total=total_bytes_to_transfer + dummy_bytes_for_annotation
) as overall_progress_update:
exception = None # Exception that is not caught by the retry logic

if method == DOWNLOAD_METHOD:
session = requests.Session()

file_transfers = []
for f in files_to_download:

if self.is_cancelled():
Expand Down Expand Up @@ -318,6 +318,7 @@ def _transfer( # noqa: max-complexity=16
filetransfer = transfer.FileDownload(
url, dest, session=session, cancel_check=self.is_cancelled
)
file_transfers.append((f, filetransfer))
elif method == COPY_METHOD:
try:
srcpath = paths.get_content_storage_file_path(
Expand All @@ -330,37 +331,59 @@ def _transfer( # noqa: max-complexity=16
filetransfer = transfer.FileCopy(
srcpath, dest, cancel_check=self.is_cancelled
)

try:
status = self._start_file_transfer(
f, filetransfer, overall_progress_update
)

if self.is_cancelled():
break

if status == FILE_SKIPPED:
number_of_skipped_files += 1
else:
file_checksums_to_annotate.append(f.id)
transferred_file_size += f.file_size
except transfer.TransferCanceled:
break
except Exception as e:
logger.error(
"An error occurred during content import: {}".format(e)
)
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 404
) or (isinstance(e, OSError) and e.errno == 2):
# Continue file import when the current file is not found from the source and is skipped.
overall_progress_update(f.file_size)
number_of_skipped_files += 1
continue
else:
exception = e
break
file_transfers.append((f, filetransfer))

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
batch_size = 100
# ThreadPoolExecutor allows us to download files concurrently,
# greatly reducing download time in most cases. However, loading
# all the downloads into the pool requires considerable memory,
# so we divide the downloads into batches to keep memory usage down.
# In batches of 100, total RAM usage doesn't exceed 250MB in testing.
while len(file_transfers) > 0:
future_file_transfers = {}
for i in range(batch_size):
if len(file_transfers) > 0:
f, filetransfer = file_transfers.pop()
future = executor.submit(
self._start_file_transfer,
f,
filetransfer,
overall_progress_update,
)
future_file_transfers[future] = (f, filetransfer)

for future in concurrent.futures.as_completed(
future_file_transfers
):
f, filetransfer = future_file_transfers[future]
try:
status = future.result()
if self.is_cancelled():
break

if status == FILE_SKIPPED:
number_of_skipped_files += 1
else:
file_checksums_to_annotate.append(f.id)
transferred_file_size += f.file_size
except transfer.TransferCanceled:
break
except Exception as e:
logger.error(
"An error occurred during content import: {}".format(e)
)
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 404
) or (isinstance(e, OSError) and e.errno == 2):
# Continue file import when the current file is not found from the source and is skipped.
overall_progress_update(f.file_size)
number_of_skipped_files += 1
continue
else:
self.exception = e
break

with db_task_write_lock:
annotation.set_content_visibility(
Expand Down Expand Up @@ -395,8 +418,8 @@ def _transfer( # noqa: max-complexity=16

overall_progress_update(dummy_bytes_for_annotation)

if exception:
raise exception
if self.exception:
raise self.exception

if self.is_cancelled():
self.cancel()
Expand All @@ -408,13 +431,10 @@ def _start_file_transfer(self, f, filetransfer, overall_progress_update):
* FILE_TRANSFERRED - successfully transfer the file.
* FILE_SKIPPED - the file does not exist so it is skipped.
"""
with filetransfer, self.start_progress(
total=filetransfer.total_size
) as file_dl_progress_update:
with filetransfer:
for chunk in filetransfer:
length = len(chunk)
overall_progress_update(length)
file_dl_progress_update(length)

# Ensure that if for some reason the total file size for the transfer
# is less than what we have marked in the database that we make up
Expand Down
42 changes: 19 additions & 23 deletions kolibri/core/content/test/test_import_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def test_remote_cancel_immediately(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True],
side_effect=[False, False, False, True, True, True],
)
def test_remote_cancel_during_transfer(
self,
Expand Down Expand Up @@ -399,7 +399,7 @@ def test_local_cancel_immediately(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True],
side_effect=[False, True, True],
)
def test_local_cancel_during_transfer(
self,
Expand All @@ -425,22 +425,20 @@ def test_local_cancel_during_transfer(
cancel_mock.assert_called_with()
annotation_mock.set_content_visibility.assert_called()

@patch("kolibri.core.content.management.commands.importcontent.len")
@patch(
"kolibri.core.content.utils.transfer.Transfer.next",
side_effect=ConnectionError("connection error"),
)
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True, True],
side_effect=[False, True, True, True],
)
def test_remote_cancel_during_connect_error(
self,
is_cancelled_mock,
cancel_mock,
next_mock,
len_mock,
annotation_mock,
files_to_transfer_mock,
channel_list_status_mock,
Expand All @@ -467,7 +465,6 @@ def test_remote_cancel_during_connect_error(
node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"],
)
cancel_mock.assert_called_with()
len_mock.assert_not_called()
annotation_mock.set_content_visibility.assert_called()

@patch("kolibri.core.content.management.commands.importcontent.logger.warning")
Expand Down Expand Up @@ -527,7 +524,7 @@ def test_remote_import_httperror_404(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, False, True, True, True],
side_effect=[False, False, True, True, True, True],
)
@patch(
"kolibri.core.content.management.commands.importcontent.paths.get_content_storage_file_path",
Expand All @@ -552,7 +549,7 @@ def test_remote_import_httperror_502(
LocalFile.objects.filter(
files__contentnode__channel_id=self.the_channel_id
).update(file_size=1)
files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10)
files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10)
call_command("importcontent", "network", self.the_channel_id)

sleep_mock.assert_called_once()
Expand Down Expand Up @@ -587,7 +584,6 @@ def test_remote_import_httperror_500(
self.the_channel_id, [], node_ids=None, exclude_node_ids=None, public=False
)

@patch("kolibri.core.content.management.commands.importcontent.len")
@patch("kolibri.core.content.utils.transfer.sleep")
@patch(
"kolibri.core.content.utils.transfer.Transfer.next",
Expand All @@ -596,15 +592,14 @@ def test_remote_import_httperror_500(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, False, False, True, True, True],
side_effect=[False, False, False, True, True, True, True],
)
def test_remote_import_chunkedencodingerror(
self,
is_cancelled_mock,
cancel_mock,
error_mock,
sleep_mock,
len_mock,
annotation_mock,
files_to_transfer_mock,
channel_list_status_mock,
Expand All @@ -630,9 +625,7 @@ def test_remote_import_chunkedencodingerror(
self.the_channel_id,
node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"],
)
sleep_mock.assert_called_once()
cancel_mock.assert_called_with()
len_mock.assert_not_called()
annotation_mock.set_content_visibility.assert_called()

@patch("kolibri.core.content.management.commands.importcontent.logger.warning")
Expand All @@ -642,7 +635,7 @@ def test_remote_import_chunkedencodingerror(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, True, True],
side_effect=[False, True],
)
def test_local_import_oserror_dne(
self,
Expand All @@ -659,7 +652,7 @@ def test_local_import_oserror_dne(
LocalFile.objects.filter(
files__contentnode__channel_id=self.the_channel_id
).update(file_size=1)
files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10)
files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10)
call_command("importcontent", "disk", self.the_channel_id, "destination")
self.assertTrue("1 files are skipped" in logger_mock.call_args_list[0][0][0])
annotation_mock.set_content_visibility.assert_called()
Expand All @@ -681,7 +674,7 @@ def test_local_import_oserror_permission_denied(
dest_path = tempfile.mkstemp()[1]
path_mock.side_effect = [dest_path, "/test/dne"]
getsize_mock.side_effect = ["1", OSError("Permission denied")]
files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10)
files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10)
with self.assertRaises(OSError):
call_command("importcontent", "disk", self.the_channel_id, "destination")
self.assertTrue("Permission denied" in logger_mock.call_args_list[0][0][0])
Expand All @@ -698,7 +691,7 @@ def test_local_import_oserror_permission_denied(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False, False, True, True],
side_effect=[False, False, True, True, True],
)
def test_local_import_source_corrupted(
self,
Expand All @@ -718,9 +711,11 @@ def test_local_import_source_corrupted(
).update(file_size=1)
path_mock.side_effect = [local_dest_path, local_src_path]
files_to_transfer_mock.return_value = (
LocalFile.objects.filter(
files__contentnode="32a941fb77c2576e8f6b294cde4c3b0c"
),
[
LocalFile.objects.filter(
files__contentnode="32a941fb77c2576e8f6b294cde4c3b0c"
).first()
],
10,
)
call_command(
Expand All @@ -731,7 +726,7 @@ def test_local_import_source_corrupted(
node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"],
)
cancel_mock.assert_called_with()
remove_mock.assert_called_with(local_dest_path)
remove_mock.assert_any_call(local_dest_path)

@patch(
"kolibri.core.content.management.commands.importcontent.os.path.isfile",
Expand Down Expand Up @@ -908,7 +903,8 @@ def test_remote_import_full_import(
@patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel")
@patch(
"kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled",
side_effect=[False] * 32 + [True, True, True],
# We have to return False for 30 1-second checks to ensure we actually retry.
side_effect=[False] * 32 + [True] * 5,
)
def test_remote_import_file_compressed_on_gcs(
self,
Expand All @@ -930,7 +926,7 @@ def test_remote_import_file_compressed_on_gcs(
LocalFile.objects.filter(
files__contentnode__channel_id=self.the_channel_id
).update(file_size=1)
files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10)
files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10)

m = mock_open()
with patch("kolibri.core.content.utils.transfer.open", m) as open_mock:
Expand Down
3 changes: 3 additions & 0 deletions kolibri/core/content/utils/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __init__(
# record whether the destination file already exists, so it can be checked, but don't error out
self.dest_exists = os.path.isfile(dest)

def start(self):
# open the destination file for writing
self.dest_file_obj = open(self.dest_tmp, "wb")

Expand Down Expand Up @@ -159,6 +160,7 @@ def __init__(self, *args, **kwargs):
super(FileDownload, self).__init__(*args, **kwargs)

def start(self):
super(FileDownload, self).start()
# initiate the download, check for status errors, and calculate download size
try:
self.response = self.session.get(
Expand Down Expand Up @@ -276,6 +278,7 @@ def start(self):
assert (
not self.started
), "File copy has already been started, and cannot be started again"
super(FileCopy, self).start()
self.total_size = os.path.getsize(self.source)
self.source_file_obj = open(self.source, "rb")
self.started = True
Expand Down
8 changes: 8 additions & 0 deletions kolibri/core/tasks/management/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class AsyncCommand(BaseCommand):

def __init__(self, *args, **kwargs):
self.progresstrackers = []

# The importcontent command stores an unhandled exception and re-raises it
# later. We check it in is_cancelled so that we cancel remaining tasks once
# an unhandled exception has occurred.
self.exception = None

super(AsyncCommand, self).__init__(*args, **kwargs)

def _update_all_progress(self, progress_fraction, progress):
Expand Down Expand Up @@ -129,6 +135,8 @@ def start_progress(self, total=100):
return tracker

def is_cancelled(self):
if self.exception is not None:
return True
try:
self.check_for_cancel()
return False
Expand Down

0 comments on commit ef27835

Please sign in to comment.