Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent import file download #7521

Merged
merged 8 commits into from
Oct 19, 2020
69 changes: 39 additions & 30 deletions kolibri/core/content/management/commands/importcontent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import logging
import os

Expand Down Expand Up @@ -285,11 +286,10 @@ def _transfer( # noqa: max-complexity=16
with self.start_progress(
total=total_bytes_to_transfer + dummy_bytes_for_annotation
) as overall_progress_update:
exception = None # Exception that is not caught by the retry logic

if method == DOWNLOAD_METHOD:
session = requests.Session()

file_transfers = []
rtibbles marked this conversation as resolved.
Show resolved Hide resolved
for f in files_to_download:

if self.is_cancelled():
Expand Down Expand Up @@ -318,6 +318,7 @@ def _transfer( # noqa: max-complexity=16
filetransfer = transfer.FileDownload(
url, dest, session=session, cancel_check=self.is_cancelled
)
file_transfers.append((f, filetransfer))
elif method == COPY_METHOD:
try:
srcpath = paths.get_content_storage_file_path(
Expand All @@ -330,37 +331,45 @@ def _transfer( # noqa: max-complexity=16
filetransfer = transfer.FileCopy(
srcpath, dest, cancel_check=self.is_cancelled
)
file_transfers.append((f, filetransfer))

try:
status = self._start_file_transfer(
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
rtibbles marked this conversation as resolved.
Show resolved Hide resolved
future_file_transfers = {}
for f, filetransfer in file_transfers:
future = executor.submit(self._start_file_transfer,
kollivier marked this conversation as resolved.
Show resolved Hide resolved
f, filetransfer, overall_progress_update
)
future_file_transfers[future] = (f, filetransfer)

if self.is_cancelled():
break

if status == FILE_SKIPPED:
number_of_skipped_files += 1
else:
file_checksums_to_annotate.append(f.id)
transferred_file_size += f.file_size
except transfer.TransferCanceled:
break
except Exception as e:
logger.error(
"An error occurred during content import: {}".format(e)
)
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 404
) or (isinstance(e, OSError) and e.errno == 2):
# The current file was not found at the source: skip it and continue importing the remaining files.
overall_progress_update(f.file_size)
number_of_skipped_files += 1
continue
else:
exception = e
for future in concurrent.futures.as_completed(future_file_transfers):
f, filetransfer = future_file_transfers[future]
try:
status = future.result()
if self.is_cancelled():
break

if status == FILE_SKIPPED:
number_of_skipped_files += 1
else:
file_checksums_to_annotate.append(f.id)
transferred_file_size += f.file_size
except transfer.TransferCanceled:
break
except Exception as e:
logger.error(
"An error occurred during content import: {}".format(e)
)
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 404
) or (isinstance(e, OSError) and e.errno == 2):
# The current file was not found at the source: skip it and continue importing the remaining files.
overall_progress_update(f.file_size)
number_of_skipped_files += 1
continue
else:
self.exception = e
break

with db_task_write_lock:
annotation.set_content_visibility(
Expand Down Expand Up @@ -395,8 +404,8 @@ def _transfer( # noqa: max-complexity=16

overall_progress_update(dummy_bytes_for_annotation)

if exception:
raise exception
if self.exception:
raise self.exception

if self.is_cancelled():
self.cancel()
Expand Down
8 changes: 8 additions & 0 deletions kolibri/core/tasks/management/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class AsyncCommand(BaseCommand):

def __init__(self, *args, **kwargs):
# Initialise per-command state, then delegate to the parent initializer.
# Both attributes are assigned before the super() call below.

# Starts out as an empty list.
# NOTE(review): presumably filled by start_progress() — confirm against the full file.
self.progresstrackers = []

# The importcontent command stores an unhandled exception and re-raises it
# later. We check it in is_cancelled so that we cancel remaining tasks once
# an unhandled exception has occurred.
# None means no unhandled exception has been recorded; is_cancelled() returns
# True as soon as this is set to anything else.
self.exception = None

# All positional and keyword arguments are forwarded unchanged.
super(AsyncCommand, self).__init__(*args, **kwargs)

def _update_all_progress(self, progress_fraction, progress):
Expand Down Expand Up @@ -129,6 +135,8 @@ def start_progress(self, total=100):
return tracker

def is_cancelled(self):
if self.exception is not None:
return True
try:
self.check_for_cancel()
return False
Expand Down