Skip to content

Commit

Permalink
Package input files (no autodownload, no multiprocessing DownloadMana…
Browse files Browse the repository at this point in the history
…ger) (#510)

* package input files and folders (backend)

* package input files and folders (frontend)

* remove "input_dir" from staging_paths dict

* ensure execution context matches the notebook directory

* update snapshots

* copy staging folder to output folder after job runs (SUCESS or FAILURE)

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* copy staging folder and side effects to output after job runs, track and redownload files

* remove staging to output copying logic from executor

* refactor output files creation logic into a separate function for clarity

* Fix job definition data model

* add packaged_files to JobDefinition and DescribeJobDefinition model

* fix existing pytests

* clarify FilesDirectoryLink title

* Dynamically display input folder in the checkbox text

* display packageInputFolder parameter as 'Files included'

* use helper text with input directory for 'include files' checkbox

* Update Playwright Snapshots

* add test side effects accountability test for execution manager

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Use "Run job with input folder" for packageInputFolder checkbox text

* Update Playwright Snapshots

* Use "Ran with input folder" in detail page

* Update src/components/input-folder-checkbox.tsx

Co-authored-by: Jason Weill <[email protected]>

* fix lint error

* Update Playwright Snapshots

* Update existing screenshots

* Update "Submit the Create Job" section mentioning “Run job with input folder” option

* Update docs/users/index.md

Co-authored-by: Jason Weill <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update src/components/input-folder-checkbox.tsx

Co-authored-by: Jason Weill <[email protected]>

* Update Playwright Snapshots

* Describe side effects behavior better

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Jason Weill <[email protected]>
  • Loading branch information
4 people authored Apr 26, 2024
1 parent 1531982 commit 4d7de94
Show file tree
Hide file tree
Showing 25 changed files with 396 additions and 31 deletions.
6 changes: 3 additions & 3 deletions docs/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ For more information on writing a custom implementation, please see the {doc}`de
### Example: Capturing side effect files

The default scheduler and execution manager classes do not capture
**side effect files**, files that are created as a side effect of executing
cells in a notebook. The `ArchivingScheduler` and `ArchivingExecutionManager`
classes do capture side effect files. If you intend to run notebooks that produce
**side effect files** (files that are created as a side effect of executing
cells in a notebook) unless “Run job with input folder” is checked. The `ArchivingScheduler` and `ArchivingExecutionManager`
classes do capture side effect files by default. If you intend to run notebooks that produce
side effect files, you can use these classes by running:

```
Expand Down
Binary file modified docs/users/images/create_job_form.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions docs/users/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ To create a _job_ or _job definition_ from an open notebook, click on a “Creat

Give your notebook job or job definition a name, choose an environment to run it in, select its output formats, and provide parameters that are set as local variables when your notebook gets executed. This parameterized execution is similar to the one used in [Papermill](https://papermill.readthedocs.io/en/latest/).

If you check "Run job with input folder", the scheduled job will have access to all files within the same folder as the input file.
The scheduler will copy all files from the input file to a staging directory at runtime, and will make these files and any side effect files created during the job run available for download after the job has finished.
Use caution with this option if your input file's directory has many large files in it.

To create a _job_ that runs once, select "Run now" in the "Schedule" section, and click "Create".
!["Create Job Form"](./images/create_job_form.png)

Expand Down
42 changes: 34 additions & 8 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,47 @@ def execute(self):
if job.parameters:
nb = add_parameters(nb, job.parameters)

staging_dir = os.path.dirname(self.staging_paths["input"])
ep = ExecutePreprocessor(
kernel_name=nb.metadata.kernelspec["name"],
store_widget_state=True,
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir
)

try:
ep.preprocess(nb)
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
except CellExecutionError as e:
raise e
finally:
for output_format in job.output_formats:
cls = nbconvert.get_exporter(output_format)
output, resources = cls().from_notebook_node(nb)
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
f.write(output)
self.add_side_effects_files(staging_dir)
self.create_output_files(job, nb)

def add_side_effects_files(self, staging_dir: str):
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
input_notebook = os.path.relpath(self.staging_paths["input"])
new_files_set = set()
for root, _, files in os.walk(staging_dir):
for file in files:
file_rel_path = os.path.relpath(os.path.join(root, file), staging_dir)
if file_rel_path != input_notebook:
new_files_set.add(file_rel_path)

if new_files_set:
with self.db_session() as session:
current_packaged_files_set = set(
session.query(Job.packaged_files).filter(Job.job_id == self.job_id).scalar()
or []
)
updated_packaged_files = list(current_packaged_files_set.union(new_files_set))
session.query(Job).filter(Job.job_id == self.job_id).update(
{"packaged_files": updated_packaged_files}
)
session.commit()

def create_output_files(self, job: DescribeJob, notebook_node):
for output_format in job.output_formats:
cls = nbconvert.get_exporter(output_format)
output, _ = cls().from_notebook_node(notebook_node)
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
f.write(output)

def supported_features(cls) -> Dict[JobFeature, bool]:
return {
Expand Down
13 changes: 11 additions & 2 deletions jupyter_scheduler/job_files_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
job = await ensure_async(self.scheduler.get_job(job_id, False))
staging_paths = await ensure_async(self.scheduler.get_staging_paths(job))
output_filenames = self.scheduler.get_job_filenames(job)
output_dir = self.scheduler.get_local_output_path(job)
output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True)

p = Process(
target=Downloader(
Expand All @@ -30,6 +30,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
staging_paths=staging_paths,
output_dir=output_dir,
redownload=redownload,
include_staging_files=job.package_input_folder,
).download
)
p.start()
Expand All @@ -43,22 +44,30 @@ def __init__(
staging_paths: Dict[str, str],
output_dir: str,
redownload: bool,
include_staging_files: bool = False,
):
self.output_formats = output_formats
self.output_filenames = output_filenames
self.staging_paths = staging_paths
self.output_dir = output_dir
self.redownload = redownload
self.include_staging_files = include_staging_files

def generate_filepaths(self):
"""A generator that produces filepaths"""
output_formats = self.output_formats + ["input"]

for output_format in output_formats:
input_filepath = self.staging_paths[output_format]
output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format])
if not os.path.exists(output_filepath) or self.redownload:
yield input_filepath, output_filepath
if self.include_staging_files:
staging_dir = os.path.dirname(self.staging_paths["input"])
for file_relative_path in self.output_filenames["files"]:
input_filepath = os.path.join(staging_dir, file_relative_path)
output_filepath = os.path.join(self.output_dir, file_relative_path)
if not os.path.exists(output_filepath) or self.redownload:
yield input_filepath, output_filepath

def download_tar(self, archive_format: str = "tar"):
archive_filepath = self.staging_paths[archive_format]
Expand Down
6 changes: 6 additions & 0 deletions jupyter_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class CreateJob(BaseModel):
name: str
output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE
compute_type: Optional[str] = None
package_input_folder: Optional[bool] = None

@root_validator
def compute_input_filename(cls, values) -> Dict:
Expand Down Expand Up @@ -145,6 +146,8 @@ class DescribeJob(BaseModel):
status: Status = Status.CREATED
status_message: Optional[str] = None
downloaded: bool = False
package_input_folder: Optional[bool] = None
packaged_files: Optional[List[str]] = []

class Config:
orm_mode = True
Expand Down Expand Up @@ -209,6 +212,7 @@ class CreateJobDefinition(BaseModel):
compute_type: Optional[str] = None
schedule: Optional[str] = None
timezone: Optional[str] = None
package_input_folder: Optional[bool] = None

@root_validator
def compute_input_filename(cls, values) -> Dict:
Expand All @@ -234,6 +238,8 @@ class DescribeJobDefinition(BaseModel):
create_time: int
update_time: int
active: bool
package_input_folder: Optional[bool] = None
packaged_files: Optional[List[str]] = []

class Config:
orm_mode = True
Expand Down
2 changes: 2 additions & 0 deletions jupyter_scheduler/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class CommonColumns:
output_filename_template = Column(String(256))
update_time = Column(Integer, default=get_utc_timestamp, onupdate=get_utc_timestamp)
create_time = Column(Integer, default=get_utc_timestamp)
package_input_folder = Column(Boolean)
packaged_files = Column(JsonType, default=[])


class Job(CommonColumns, Base):
Expand Down
107 changes: 96 additions & 11 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import random
import shutil
from typing import Dict, Optional, Type, Union
from typing import Dict, List, Optional, Type, Union

import fsspec
import psutil
Expand Down Expand Up @@ -39,7 +39,11 @@
UpdateJobDefinition,
)
from jupyter_scheduler.orm import Job, JobDefinition, create_session
from jupyter_scheduler.utils import create_output_directory, create_output_filename
from jupyter_scheduler.utils import (
copy_directory,
create_output_directory,
create_output_filename,
)


class BaseScheduler(LoggingConfigurable):
Expand Down Expand Up @@ -248,7 +252,29 @@ def file_exists(self, path: str):
else:
return os.path.isfile(os_path)

def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
def dir_exists(self, path: str):
"""Returns True if the directory exists, else returns False.
API-style wrapper for os.path.isdir
Parameters
----------
path : string
The relative path to the directory (with '/' as separator)
Returns
-------
exists : bool
Whether the directory exists.
"""
root = os.path.abspath(self.root_dir)
os_path = to_os_path(path, root)
if not (os.path.abspath(os_path) + os.path.sep).startswith(root):
return False
else:
return os.path.isdir(os_path)

def get_job_filenames(self, model: DescribeJob) -> Dict[str, Union[str, List[str]]]:
"""Returns dictionary mapping output formats to
the job filenames in the JupyterLab workspace.
Expand All @@ -265,7 +291,8 @@ def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
{
'ipynb': 'helloworld-2022-10-10.ipynb',
'html': 'helloworld-2022-10-10.html',
'input': 'helloworld.ipynb'
'input': 'helloworld.ipynb',
'files': ['data/helloworld.csv', 'images/helloworld.png']
}
"""
Expand All @@ -278,6 +305,9 @@ def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:

filenames["input"] = model.input_filename

if model.package_input_folder and model.packaged_files:
filenames["files"] = [relative_path for relative_path in model.packaged_files]

return filenames

def add_job_files(self, model: DescribeJob):
Expand All @@ -289,7 +319,8 @@ def add_job_files(self, model: DescribeJob):
mapping = self.environments_manager.output_formats_mapping()
job_files = []
output_filenames = self.get_job_filenames(model)
output_dir = os.path.relpath(self.get_local_output_path(model), self.root_dir)
output_dir = self.get_local_output_path(model, root_dir_relative=True)

for output_format in model.output_formats:
filename = output_filenames[output_format]
output_path = os.path.join(output_dir, filename)
Expand All @@ -313,16 +344,42 @@ def add_job_files(self, model: DescribeJob):
)
)

# Add link to output folder with packaged input files and side effects
if model.package_input_folder and model.packaged_files:
job_files.append(
JobFile(
display_name="Files",
file_format="files",
file_path=output_dir if self.dir_exists(output_dir) else None,
)
)

model.job_files = job_files
model.downloaded = all(job_file.file_path for job_file in job_files)

def get_local_output_path(self, model: DescribeJob) -> str:
packaged_files = []
if model.package_input_folder and model.packaged_files:
packaged_files = [
os.path.join(output_dir, packaged_file_rel_path)
for packaged_file_rel_path in model.packaged_files
]
model.downloaded = all(job_file.file_path for job_file in job_files) and all(
self.file_exists(file_path) for file_path in packaged_files
)

def get_local_output_path(
self, model: DescribeJob, root_dir_relative: Optional[bool] = False
) -> str:
"""Returns the local output directory path
where all the job files will be downloaded
from the staging location.
"""
output_dir_name = create_output_directory(model.input_filename, model.job_id)
return os.path.join(self.root_dir, self.output_directory, output_dir_name)
if root_dir_relative:
return os.path.relpath(
os.path.join(self.root_dir, self.output_directory, output_dir_name), self.root_dir
)
else:
return os.path.join(self.root_dir, self.output_directory, output_dir_name)


class Scheduler(BaseScheduler):
Expand Down Expand Up @@ -371,6 +428,15 @@ def copy_input_file(self, input_uri: str, copy_to_path: str):
with fsspec.open(copy_to_path, "wb") as output_file:
output_file.write(input_file.read())

def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
"""Copies the input file along with the input directory to the staging directory, returns the list of copied files relative to the staging directory"""
input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri))
staging_dir = os.path.dirname(nb_copy_to_path)
return copy_directory(
source_dir=input_dir_path,
destination_dir=staging_dir,
)

def create_job(self, model: CreateJob) -> str:
if not model.job_definition_id and not self.file_exists(model.input_uri):
raise InputUriError(model.input_uri)
Expand All @@ -397,11 +463,20 @@ def create_job(self, model: CreateJob) -> str:
model.output_formats = []

job = Job(**model.dict(exclude_none=True, exclude={"input_uri"}))

session.add(job)
session.commit()

staging_paths = self.get_staging_paths(DescribeJob.from_orm(job))
self.copy_input_file(model.input_uri, staging_paths["input"])
if model.package_input_folder:
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
input_notebook_filename = os.path.basename(model.input_uri)
job.packaged_files = [
file for file in copied_files if file != input_notebook_filename
]
session.commit()
else:
self.copy_input_file(model.input_uri, staging_paths["input"])

# The MP context forces new processes to not be forked on Linux.
# This is necessary because `asyncio.get_event_loop()` is bugged in
Expand Down Expand Up @@ -538,12 +613,22 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
session.add(job_definition)
session.commit()

# copy values for use after session is closed to avoid DetachedInstanceError
job_definition_id = job_definition.job_definition_id
job_definition_schedule = job_definition.schedule

staging_paths = self.get_staging_paths(DescribeJobDefinition.from_orm(job_definition))
self.copy_input_file(model.input_uri, staging_paths["input"])
if model.package_input_folder:
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
input_notebook_filename = os.path.basename(model.input_uri)
job_definition.packaged_files = [
file for file in copied_files if file != input_notebook_filename
]
session.commit()
else:
self.copy_input_file(model.input_uri, staging_paths["input"])

if self.task_runner and job_definition.schedule:
if self.task_runner and job_definition_schedule:
self.task_runner.add_job_definition(job_definition_id)

return job_definition_id
Expand Down
Loading

0 comments on commit 4d7de94

Please sign in to comment.