Skip to content

Commit

Permalink
Reserve file descriptors for tasks. Limit download workers to prevent…
Browse files Browse the repository at this point in the history
… file descriptor overload.
  • Loading branch information
rtibbles committed Mar 31, 2022
1 parent 6767113 commit a638ccd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
35 changes: 34 additions & 1 deletion kolibri/core/content/management/commands/importcontent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from kolibri.core.tasks.management.commands.base import AsyncCommand
from kolibri.core.tasks.utils import get_current_job
from kolibri.utils import conf
from kolibri.utils.options import FD_PER_THREAD
from kolibri.utils.system import get_fd_limit
from kolibri.utils.system import get_free_space

# constants to specify the transfer method to be used
Expand Down Expand Up @@ -300,7 +302,38 @@ def _transfer( # noqa: max-complexity=16
if method == DOWNLOAD_METHOD:
session = requests.Session()

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
executor = (
concurrent.futures.ProcessPoolExecutor
if conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"]
else concurrent.futures.ThreadPoolExecutor
)

max_workers = 10

if not conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"]:
# If we're not using multiprocessing for workers, we may need
# to limit the number of workers depending on the number of allowed
# file descriptors.
# This is a heuristic method, where we know there can be issues if
# the max number of file descriptors for a process is 256, and we use 10
# workers, with potentially 4 concurrent tasks downloading files.
# The number of concurrent tasks that might be downloading files is determined
# by the number of regular workers running in the task runner
# (although the high priority task queue could also be running a channel database download).
server_reserved_fd_count = (
FD_PER_THREAD * conf.OPTIONS["Server"]["CHERRYPY_THREAD_POOL"]
)
max_descriptors_per_download_task = (
get_fd_limit() - server_reserved_fd_count
) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"]
# Each download task only needs to have a maximum of two open file descriptors at once:
# The temporary download file that the file is streamed to initially, and then
# the actual destination file that it is moved to. To add tolerance, we divide
# the number of file descriptors that could be allocated to this task by four,
# which should give us leeway in case of unforeseen descriptor use during the process.
max_workers = min(1, max_descriptors_per_download_task // 4)

with executor(max_workers=max_workers) as executor:
batch_size = 100
# ThreadPoolExecutor allows us to download files concurrently,
# greatly reducing download time in most cases. However, loading
Expand Down
9 changes: 7 additions & 2 deletions kolibri/utils/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
)
)

# Reserve some file descriptors for file operations happening in asynchronous tasks
# when the server is running with threaded task runners.
MIN_RESERVED_FD = 64


def calculate_thread_pool():
"""
Expand Down Expand Up @@ -79,8 +83,9 @@ def calculate_thread_pool():
pool_size = MAX_POOL

# ensure (number of threads) x (open file descriptors) < (fd limit)
max_threads = get_fd_limit() // FD_PER_THREAD
return min(pool_size, max_threads)
max_threads = (get_fd_limit() - MIN_RESERVED_FD) // FD_PER_THREAD
# Ensure that the number of threads never goes below 1
return max(1, min(pool_size, max_threads))


ALL_LANGUAGES = "kolibri-all"
Expand Down

0 comments on commit a638ccd

Please sign in to comment.