Skip to content

Commit

Permalink
Copy user files that were imported by workflow in pyflyte run (flyteo…
Browse files Browse the repository at this point in the history
…rg#2663)

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Signed-off-by: Mecoli1219 <michaellai901026@gmail.com>
  • Loading branch information
thomasjpfan authored and Mecoli1219 committed Aug 14, 2024
1 parent cbe052b commit 4f86504
Showing 4 changed files with 246 additions and 69 deletions.
5 changes: 3 additions & 2 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@
from flytekit.remote import FlyteLaunchPlan, FlyteRemote, FlyteTask, FlyteWorkflow, remote_fs
from flytekit.remote.executions import FlyteWorkflowExecution
from flytekit.tools import module_loader
from flytekit.tools.script_mode import _find_project_root, compress_scripts
from flytekit.tools.script_mode import _find_project_root, compress_scripts, get_all_modules
from flytekit.tools.translator import Options


@@ -493,7 +493,8 @@ def _update_flyte_context(params: RunLevelParams) -> FlyteContext.Builder:
if output_prefix and ctx.file_access.is_remote(output_prefix):
with tempfile.TemporaryDirectory() as tmp_dir:
archive_fname = pathlib.Path(os.path.join(tmp_dir, "script_mode.tar.gz"))
compress_scripts(params.computed_params.project_root, str(archive_fname), params.computed_params.module)
modules = get_all_modules(params.computed_params.project_root, params.computed_params.module)
compress_scripts(params.computed_params.project_root, str(archive_fname), modules)
remote_dir = file_access.get_random_remote_directory()
remote_archive_fname = f"{remote_dir}/script_mode.tar.gz"
file_access.put_data(str(archive_fname), remote_archive_fname)
4 changes: 2 additions & 2 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@
from flytekit.remote.remote_fs import get_flyte_fs
from flytekit.tools.fast_registration import FastPackageOptions, fast_package
from flytekit.tools.interactive import ipython_check
from flytekit.tools.script_mode import _find_project_root, compress_scripts, hash_file
from flytekit.tools.script_mode import _find_project_root, compress_scripts, get_all_modules, hash_file
from flytekit.tools.translator import (
FlyteControlPlaneEntity,
FlyteLocalEntity,
@@ -1049,7 +1049,7 @@ def register_script(
)
else:
archive_fname = pathlib.Path(os.path.join(tmp_dir, "script_mode.tar.gz"))
compress_scripts(source_path, str(archive_fname), module_name)
compress_scripts(source_path, str(archive_fname), get_all_modules(source_path, module_name))
md5_bytes, upload_native_url = self.upload_file(
archive_fname, project or self.default_project, domain or self.default_domain
)
159 changes: 98 additions & 61 deletions flytekit/tools/script_mode.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import gzip
import hashlib
import importlib
import os
import shutil
import site
import sys
import tarfile
import tempfile
import typing
from pathlib import Path
from types import ModuleType
from typing import List, Optional

from flytekit import PythonFunctionTask
from flytekit.core.tracker import get_full_module_path
from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase


def compress_scripts(source_path: str, destination: str, module_name: str):
def compress_scripts(source_path: str, destination: str, modules: List[ModuleType]):
"""
Compresses the single script while maintaining the folder structure for that file.
@@ -25,27 +24,28 @@ def compress_scripts(source_path: str, destination: str, module_name: str):
│   ├── example.py
│   ├── another_example.py
│   ├── yet_another_example.py
│   ├── unused_example.py
│   └── __init__.py
Let's say you want to compress `example.py`. In that case we specify the the full module name as
flyte.workflows.example and that will produce a tar file that contains only that file alongside
with the folder structure, i.e.:
Let's say you want to compress `example.py` imports `another_example.py`. And `another_example.py`
imports on `yet_another_example.py`. This will produce a tar file that contains only that
file alongside with the folder structure, i.e.:
.
├── flyte
│   ├── __init__.py
│   └── workflows
│   ├── example.py
│   ├── another_example.py
│   ├── yet_another_example.py
│   └── __init__.py
Note: If `example.py` didn't import tasks or workflows from `another_example.py` and `yet_another_example.py`, these files were not copied to the destination..
"""
with tempfile.TemporaryDirectory() as tmp_dir:
destination_path = os.path.join(tmp_dir, "code")
os.mkdir(destination_path)
add_imported_modules_from_source(source_path, destination_path, modules)

visited: typing.List[str] = []
copy_module_to_destination(source_path, destination_path, module_name, visited)
tar_path = os.path.join(tmp_dir, "tmp.tar")
with tarfile.open(tar_path, "w") as tar:
tmp_path: str = os.path.join(tmp_dir, "code")
@@ -57,54 +57,6 @@ def compress_scripts(source_path: str, destination: str, module_name: str):
gzipped.write(tar_file.read())


def copy_module_to_destination(
original_source_path: str, original_destination_path: str, module_name: str, visited: typing.List[str]
):
"""
Copy the module (file) to the destination directory. If the module relative imports other modules, flytekit will
recursively copy them as well.
"""
mod = importlib.import_module(module_name)
full_module_name = get_full_module_path(mod, mod.__name__)
if full_module_name in visited:
return
visited.append(full_module_name)

source_path = original_source_path
destination_path = original_destination_path
pkgs = full_module_name.split(".")

for p in pkgs[:-1]:
os.makedirs(os.path.join(destination_path, p), exist_ok=True)
destination_path = os.path.join(destination_path, p)
source_path = os.path.join(source_path, p)
init_file = Path(os.path.join(source_path, "__init__.py"))
if init_file.exists():
shutil.copy(init_file, Path(os.path.join(destination_path, "__init__.py")))

# Ensure destination path exists to cover the case of a single file and no modules.
os.makedirs(destination_path, exist_ok=True)
script_file = Path(source_path, f"{pkgs[-1]}.py")
script_file_destination = Path(destination_path, f"{pkgs[-1]}.py")
# Build the final script relative path and copy it to a known place.
shutil.copy(
script_file,
script_file_destination,
)

# Try to copy other files to destination if tasks or workflows aren't in the same file
for flyte_entity_name in mod.__dict__:
flyte_entity = mod.__dict__[flyte_entity_name]
if (
isinstance(flyte_entity, (PythonFunctionTask, WorkflowBase))
and not isinstance(flyte_entity, ImperativeWorkflow)
and flyte_entity.instantiated_in
):
copy_module_to_destination(
original_source_path, original_destination_path, flyte_entity.instantiated_in, visited
)


# Takes in a TarInfo and returns the modified TarInfo:
# https://docs.python.org/3/library/tarfile.html#tarinfo-objects
# intended to be passed as a filter to tarfile.add
@@ -127,6 +79,91 @@ def tar_strip_file_attributes(tar_info: tarfile.TarInfo) -> tarfile.TarInfo:
return tar_info


def add_imported_modules_from_source(source_path: str, destination: str, modules: List[ModuleType]):
"""Copies modules into destination that are in modules. The module files are copied only if:
1. Not a site-packages. These are installed packages and not user files.
2. Not in the bin. These are also installed and not user files.
3. Does not share a common path with the source_path.
"""

site_packages = site.getsitepackages()
site_packages_set = set(site_packages)
bin_directory = os.path.dirname(sys.executable)

for mod in modules:
try:
mod_file = mod.__file__
except AttributeError:
continue

if mod_file is None:
continue

# Check to see if mod_file is in site_packages or bin_directory, which are
# installed packages & libraries that are not user files. This happens when
# there is a virtualenv like `.venv` in the working directory.
try:
if os.path.commonpath(site_packages + [mod_file]) in site_packages_set:
# Do not upload files from site-packages
continue

if os.path.commonpath([bin_directory, mod_file]) == bin_directory:
# Do not upload from the bin directory
continue

except ValueError:
# ValueError is raised by windows if the paths are not from the same drive
# If the files are not in the same drive, then mod_file is not
# in the site-packages or bin directory.
pass

try:
common_path = os.path.commonpath([mod_file, source_path])
if common_path != source_path:
# Do not upload files that do not share a common directory with the source
continue
except ValueError:
# ValueError is raised by windows if the paths are not from the same drive
# If they are not in the same directory, then they do not share a common path,
# so we do not upload the file.
continue

relative_path = os.path.relpath(mod_file, start=source_path)
new_destination = os.path.join(destination, relative_path)

if os.path.exists(new_destination):
# No need to copy if it already exists
continue

os.makedirs(os.path.dirname(new_destination), exist_ok=True)
shutil.copy(mod_file, new_destination)


def get_all_modules(source_path: str, module_name: Optional[str]) -> List[ModuleType]:
"""Import python file with module_name in source_path and return all modules."""
sys_modules = list(sys.modules.values())
if module_name is None or module_name in sys.modules:
# module already exists, there is no need to import it again
return sys_modules

full_module = os.path.join(source_path, *module_name.split("."))
full_module_path = f"{full_module}.py"

is_python_file = os.path.exists(full_module_path) and os.path.isfile(full_module_path)
if not is_python_file:
return sys_modules

from flytekit.core.tracker import import_module_from_file

try:
new_module = import_module_from_file(module_name, full_module_path)
return sys_modules + [new_module]
except Exception:
# Import failed so we fallback to `sys_modules`
return sys_modules


def hash_file(file_path: typing.Union[os.PathLike, str]) -> (bytes, str, int):
"""
Hash a file and produce a digest to be used as a version
Loading

0 comments on commit 4f86504

Please sign in to comment.