Skip to content

Commit

Permalink
use dask futures for files download instead of multiprocessing module
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-i committed Jun 21, 2024
1 parent dd38b8c commit 66c89c1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
4 changes: 3 additions & 1 deletion jupyter_scheduler/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def initialize_settings(self):
dask_client_future=dask_client_future,
)

job_files_manager = self.job_files_manager_class(scheduler=scheduler)
job_files_manager = self.job_files_manager_class(
scheduler=scheduler, dask_client_future=dask_client_future
)

self.settings.update(
environments_manager=environments_manager,
Expand Down
2 changes: 1 addition & 1 deletion jupyter_scheduler/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def job_files_manager(self):
if not self._job_files_manager:
self._job_files_manager = self.settings.get("job_files_manager", None)

return self._job_files_manager
return self._job_files_managerdela

@authenticated
async def get(self, job_id):
Expand Down
17 changes: 11 additions & 6 deletions jupyter_scheduler/job_files_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import random
import tarfile
from multiprocessing import Process
from typing import Dict, List, Optional, Type
from typing import Awaitable, Dict, List, Optional, Type

import fsspec
from dask.distributed import Client as DaskClient
from jupyter_server.utils import ensure_async

from jupyter_scheduler.exceptions import SchedulerError
Expand All @@ -14,17 +14,23 @@
class JobFilesManager:
scheduler = None

def __init__(self, scheduler: Type[BaseScheduler]):
def __init__(
self,
scheduler: Type[BaseScheduler],
dask_client_future: Awaitable[DaskClient],
):
self.scheduler = scheduler
self.dask_client_future = dask_client_future

async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = False):
job = await ensure_async(self.scheduler.get_job(job_id, False))
staging_paths = await ensure_async(self.scheduler.get_staging_paths(job))
output_filenames = self.scheduler.get_job_filenames(job)
output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True)

p = Process(
target=Downloader(
dask_client: DaskClient = await self.dask_client_future
dask_client.submit(
Downloader(
output_formats=job.output_formats,
output_filenames=output_filenames,
staging_paths=staging_paths,
Expand All @@ -33,7 +39,6 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
include_staging_files=job.package_input_folder,
).download
)
p.start()


class Downloader:
Expand Down

0 comments on commit 66c89c1

Please sign in to comment.