diff --git a/kolibri/core/content/management/commands/importcontent.py b/kolibri/core/content/management/commands/importcontent.py index b681a4db5ef..9e2271b7301 100644 --- a/kolibri/core/content/management/commands/importcontent.py +++ b/kolibri/core/content/management/commands/importcontent.py @@ -1,6 +1,7 @@ import logging import os +import concurrent.futures import requests from django.core.management.base import CommandError from le_utils.constants import content_kinds @@ -285,11 +286,10 @@ def _transfer( # noqa: max-complexity=16 with self.start_progress( total=total_bytes_to_transfer + dummy_bytes_for_annotation ) as overall_progress_update: - exception = None # Exception that is not caught by the retry logic - if method == DOWNLOAD_METHOD: session = requests.Session() + file_transfers = [] for f in files_to_download: if self.is_cancelled(): @@ -318,6 +318,7 @@ def _transfer( # noqa: max-complexity=16 filetransfer = transfer.FileDownload( url, dest, session=session, cancel_check=self.is_cancelled ) + file_transfers.append((f, filetransfer)) elif method == COPY_METHOD: try: srcpath = paths.get_content_storage_file_path( @@ -330,37 +331,59 @@ def _transfer( # noqa: max-complexity=16 filetransfer = transfer.FileCopy( srcpath, dest, cancel_check=self.is_cancelled ) - - try: - status = self._start_file_transfer( - f, filetransfer, overall_progress_update - ) - - if self.is_cancelled(): - break - - if status == FILE_SKIPPED: - number_of_skipped_files += 1 - else: - file_checksums_to_annotate.append(f.id) - transferred_file_size += f.file_size - except transfer.TransferCanceled: - break - except Exception as e: - logger.error( - "An error occurred during content import: {}".format(e) - ) - if ( - isinstance(e, requests.exceptions.HTTPError) - and e.response.status_code == 404 - ) or (isinstance(e, OSError) and e.errno == 2): - # Continue file import when the current file is not found from the source and is skipped. - overall_progress_update(f.file_size) - number_of_skipped_files += 1 - continue - else: - exception = e - break + file_transfers.append((f, filetransfer)) + + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + batch_size = 100 + # ThreadPoolExecutor allows us to download files concurrently, + # greatly reducing download time in most cases. However, loading + # all the downloads into the pool requires considerable memory, + # so we divide the downloads into batches to keep memory usage down. + # In batches of 100, total RAM usage doesn't exceed 250MB in testing. + while len(file_transfers) > 0: + future_file_transfers = {} + for i in range(batch_size): + if len(file_transfers) > 0: + f, filetransfer = file_transfers.pop() + future = executor.submit( + self._start_file_transfer, + f, + filetransfer, + overall_progress_update, + ) + future_file_transfers[future] = (f, filetransfer) + + for future in concurrent.futures.as_completed( + future_file_transfers + ): + f, filetransfer = future_file_transfers[future] + try: + status = future.result() + if self.is_cancelled(): + break + + if status == FILE_SKIPPED: + number_of_skipped_files += 1 + else: + file_checksums_to_annotate.append(f.id) + transferred_file_size += f.file_size + except transfer.TransferCanceled: + break + except Exception as e: + logger.error( + "An error occurred during content import: {}".format(e) + ) + if ( + isinstance(e, requests.exceptions.HTTPError) + and e.response.status_code == 404 + ) or (isinstance(e, OSError) and e.errno == 2): + # Continue file import when the current file is not found from the source and is skipped. + overall_progress_update(f.file_size) + number_of_skipped_files += 1 + continue + else: + self.exception = e + break with db_task_write_lock: annotation.set_content_visibility( @@ -395,8 +418,8 @@ def _transfer( # noqa: max-complexity=16 overall_progress_update(dummy_bytes_for_annotation) - if exception: - raise exception + if self.exception: + raise self.exception if self.is_cancelled(): self.cancel() @@ -408,13 +431,10 @@ def _start_file_transfer(self, f, filetransfer, overall_progress_update): * FILE_TRANSFERRED - successfully transfer the file. * FILE_SKIPPED - the file does not exist so it is skipped. """ - with filetransfer, self.start_progress( - total=filetransfer.total_size - ) as file_dl_progress_update: + with filetransfer: for chunk in filetransfer: length = len(chunk) overall_progress_update(length) - file_dl_progress_update(length) # Ensure that if for some reason the total file size for the transfer # is less than what we have marked in the database that we make up diff --git a/kolibri/core/content/test/test_import_export.py b/kolibri/core/content/test/test_import_export.py index dafb2de4b24..b4e45d67015 100644 --- a/kolibri/core/content/test/test_import_export.py +++ b/kolibri/core/content/test/test_import_export.py @@ -283,7 +283,7 @@ def test_remote_cancel_immediately( @patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel") @patch( "kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled", - side_effect=[False, True], + side_effect=[False, False, False, True, True, True], ) def test_remote_cancel_during_transfer( self, @@ -399,7 +399,7 @@ def test_local_cancel_immediately( @patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel") @patch( "kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled", - side_effect=[False, True], + side_effect=[False, True, True], ) def test_local_cancel_during_transfer( self, @@ -425,7 +425,6 @@ def test_local_cancel_during_transfer( cancel_mock.assert_called_with() annotation_mock.set_content_visibility.assert_called() - @patch("kolibri.core.content.management.commands.importcontent.len") @patch( "kolibri.core.content.utils.transfer.Transfer.next", side_effect=ConnectionError("connection error"), @@ -433,14 +432,13 @@ def test_local_cancel_during_transfer( @patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel") @patch( "kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled", - side_effect=[False, True, True], + side_effect=[False, True, True, True], ) def test_remote_cancel_during_connect_error( self, is_cancelled_mock, cancel_mock, next_mock, - len_mock, annotation_mock, files_to_transfer_mock, channel_list_status_mock, @@ -467,7 +465,6 @@ def test_remote_cancel_during_connect_error( node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"], ) cancel_mock.assert_called_with() - len_mock.assert_not_called() annotation_mock.set_content_visibility.assert_called() @patch("kolibri.core.content.management.commands.importcontent.logger.warning") @@ -527,7 +524,7 @@ def test_remote_import_httperror_404( @patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel") @patch( "kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled", - side_effect=[False, False, True, True, True], + side_effect=[False, False, True, True, True, True], ) @patch( "kolibri.core.content.management.commands.importcontent.paths.get_content_storage_file_path", @@ -552,7 +549,7 @@ def test_remote_import_httperror_502( LocalFile.objects.filter( files__contentnode__channel_id=self.the_channel_id ).update(file_size=1) - files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10) + files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10) call_command("importcontent", "network", self.the_channel_id) sleep_mock.assert_called_once() @@ -587,7 +584,6 @@ def test_remote_import_httperror_500( self.the_channel_id, [], node_ids=None, exclude_node_ids=None, public=False ) - @patch("kolibri.core.content.management.commands.importcontent.len") @patch("kolibri.core.content.utils.transfer.sleep") @patch( "kolibri.core.content.utils.transfer.Transfer.next", @@ -596,7 +592,7 @@ def test_remote_import_httperror_500( @patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel") @patch( "kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled", - side_effect=[False, False, False, True, True, True], + side_effect=[False, False, False, True, True, True, True], ) def test_remote_import_chunkedencodingerror( self, @@ -604,7 +600,6 @@ def test_remote_import_chunkedencodingerror( cancel_mock, error_mock, sleep_mock, - len_mock, annotation_mock, files_to_transfer_mock, channel_list_status_mock, @@ -630,9 +625,7 @@ def test_remote_import_chunkedencodingerror( self.the_channel_id, node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"], ) - sleep_mock.assert_called_once() cancel_mock.assert_called_with() - len_mock.assert_not_called() annotation_mock.set_content_visibility.assert_called() @patch("kolibri.core.content.management.commands.importcontent.logger.warning") @@ -642,7 +635,7 @@ def test_remote_import_chunkedencodingerror( @patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel") @patch( "kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled", - side_effect=[False, True, True], + side_effect=[False, True], ) def test_local_import_oserror_dne( self, @@ -659,7 +652,7 @@ def test_local_import_oserror_dne( LocalFile.objects.filter( files__contentnode__channel_id=self.the_channel_id ).update(file_size=1) - files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10) + files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10) call_command("importcontent", "disk", self.the_channel_id, "destination") self.assertTrue("1 files are skipped" in logger_mock.call_args_list[0][0][0]) annotation_mock.set_content_visibility.assert_called() @@ -681,7 +674,7 @@ def test_local_import_oserror_permission_denied( dest_path = tempfile.mkstemp()[1] path_mock.side_effect = [dest_path, "/test/dne"] getsize_mock.side_effect = ["1", OSError("Permission denied")] - files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10) + files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10) with self.assertRaises(OSError): call_command("importcontent", "disk", self.the_channel_id, "destination") self.assertTrue("Permission denied" in logger_mock.call_args_list[0][0][0]) @@ -698,7 +691,7 @@ def test_local_import_oserror_permission_denied( @patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel") @patch( "kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled", - side_effect=[False, False, True, True], + side_effect=[False, False, True, True, True], ) def test_local_import_source_corrupted( self, @@ -718,9 +711,11 @@ def test_local_import_source_corrupted( ).update(file_size=1) path_mock.side_effect = [local_dest_path, local_src_path] files_to_transfer_mock.return_value = ( - LocalFile.objects.filter( - files__contentnode="32a941fb77c2576e8f6b294cde4c3b0c" - ), + [ + LocalFile.objects.filter( + files__contentnode="32a941fb77c2576e8f6b294cde4c3b0c" + ).first() + ], 10, ) call_command( @@ -731,7 +726,7 @@ def test_local_import_source_corrupted( node_ids=["32a941fb77c2576e8f6b294cde4c3b0c"], ) cancel_mock.assert_called_with() - remove_mock.assert_called_with(local_dest_path) + remove_mock.assert_any_call(local_dest_path) @patch( "kolibri.core.content.management.commands.importcontent.os.path.isfile", @@ -908,7 +903,8 @@ def test_remote_import_full_import( @patch("kolibri.core.content.management.commands.importcontent.AsyncCommand.cancel") @patch( "kolibri.core.content.management.commands.importcontent.AsyncCommand.is_cancelled", - side_effect=[False] * 32 + [True, True, True], + # We have to return False for 30 1-second checks to ensure we actually retry. + side_effect=[False] * 32 + [True] * 5, ) def test_remote_import_file_compressed_on_gcs( self, @@ -930,7 +926,7 @@ def test_remote_import_file_compressed_on_gcs( LocalFile.objects.filter( files__contentnode__channel_id=self.the_channel_id ).update(file_size=1) - files_to_transfer_mock.return_value = (LocalFile.objects.all(), 10) + files_to_transfer_mock.return_value = ([LocalFile.objects.first()], 10) m = mock_open() with patch("kolibri.core.content.utils.transfer.open", m) as open_mock: diff --git a/kolibri/core/content/utils/transfer.py b/kolibri/core/content/utils/transfer.py index 71d3a2afa5f..c6525083e8b 100644 --- a/kolibri/core/content/utils/transfer.py +++ b/kolibri/core/content/utils/transfer.py @@ -82,6 +82,7 @@ def __init__( # record whether the destination file already exists, so it can be checked, but don't error out self.dest_exists = os.path.isfile(dest) + def start(self): # open the destination file for writing self.dest_file_obj = open(self.dest_tmp, "wb") @@ -159,6 +160,7 @@ def __init__(self, *args, **kwargs): super(FileDownload, self).__init__(*args, **kwargs) def start(self): + super(FileDownload, self).start() # initiate the download, check for status errors, and calculate download size try: self.response = self.session.get( @@ -276,6 +278,7 @@ def start(self): assert ( not self.started ), "File copy has already been started, and cannot be started again" + super(FileCopy, self).start() self.total_size = os.path.getsize(self.source) self.source_file_obj = open(self.source, "rb") self.started = True diff --git a/kolibri/core/tasks/management/commands/base.py b/kolibri/core/tasks/management/commands/base.py index 5a3feee7a31..8b8ba9ce0e1 100644 --- a/kolibri/core/tasks/management/commands/base.py +++ b/kolibri/core/tasks/management/commands/base.py @@ -97,6 +97,12 @@ class AsyncCommand(BaseCommand): def __init__(self, *args, **kwargs): self.progresstrackers = [] + + # The importcontent command stores an unhandled exception and re-raises it + # later. We check it in is_cancelled so that we cancel remaining tasks once + # an unhandled exception has occurred. + self.exception = None + super(AsyncCommand, self).__init__(*args, **kwargs) def _update_all_progress(self, progress_fraction, progress): @@ -129,6 +135,8 @@ def start_progress(self, total=100): return tracker def is_cancelled(self): + if self.exception is not None: + return True try: self.check_for_cancel() return False