diff --git a/kolibri/core/content/management/commands/importcontent.py b/kolibri/core/content/management/commands/importcontent.py index 23c4d230c92..c10e8ad2776 100644 --- a/kolibri/core/content/management/commands/importcontent.py +++ b/kolibri/core/content/management/commands/importcontent.py @@ -22,6 +22,8 @@ from kolibri.core.tasks.management.commands.base import AsyncCommand from kolibri.core.tasks.utils import get_current_job from kolibri.utils import conf +from kolibri.utils.options import FD_PER_THREAD +from kolibri.utils.system import get_fd_limit from kolibri.utils.system import get_free_space # constants to specify the transfer method to be used @@ -300,7 +302,38 @@ def _transfer( # noqa: max-complexity=16 if method == DOWNLOAD_METHOD: session = requests.Session() - with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + executor = ( + concurrent.futures.ProcessPoolExecutor + if conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"] + else concurrent.futures.ThreadPoolExecutor + ) + + max_workers = 10 + + if not conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"]: + # If we're not using multiprocessing for workers, we may need + # to limit the number of workers depending on the number of allowed + # file descriptors. + # This is a heuristic method, where we know there can be issues if + # the max number of file descriptors for a process is 256, and we use 10 + # workers, with potentially 4 concurrent tasks downloading files. + # The number of concurrent tasks that might be downloading files is determined + # by the number of regular workers running in the task runner + # (although the high priority task queue could also be running a channel database download). + server_reserved_fd_count = ( + FD_PER_THREAD * conf.OPTIONS["Server"]["CHERRYPY_THREAD_POOL"] + ) + max_descriptors_per_download_task = ( + get_fd_limit() - server_reserved_fd_count + ) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"] + # Each download task only needs to have a maximum of two open file descriptors at once: + # The temporary download file that the file is streamed to initially, and then + # the actual destination file that it is moved to. To add tolerance, we divide + # the number of file descriptors that could be allocated to this task by four, + # which should give us leeway in case of unforeseen descriptor use during the process. + max_workers = min(1, max_descriptors_per_download_task // 4) + + with executor(max_workers=max_workers) as executor: batch_size = 100 # ThreadPoolExecutor allows us to download files concurrently, # greatly reducing download time in most cases. However, loading diff --git a/kolibri/utils/options.py b/kolibri/utils/options.py index 9154c5fd21c..660778edb93 100644 --- a/kolibri/utils/options.py +++ b/kolibri/utils/options.py @@ -47,6 +47,10 @@ ) ) +# Reserve some file descriptors for file operations happening in asynchronous tasks +# when the server is running with threaded task runners. +MIN_RESERVED_FD = 64 + def calculate_thread_pool(): """ @@ -79,8 +83,9 @@ def calculate_thread_pool(): pool_size = MAX_POOL # ensure (number of threads) x (open file descriptors) < (fd limit) - max_threads = get_fd_limit() // FD_PER_THREAD - return min(pool_size, max_threads) + max_threads = (get_fd_limit() - MIN_RESERVED_FD) // FD_PER_THREAD + # Ensure that the number of threads never goes below 1 + return max(1, min(pool_size, max_threads)) ALL_LANGUAGES = "kolibri-all"