From 7418f428254086481e2bb68c7ebcbfb7c18adaf1 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Aug 2024 12:32:27 -0700 Subject: [PATCH 01/17] mid-debugging Signed-off-by: Yee Hing Tong --- chart | 32 +++++ flytekit/clis/sdk_in_container/register.py | 19 +++ flytekit/tools/fast_registration.py | 56 ++++++++- flytekit/tools/ignore.py | 4 + flytekit/tools/module_loader.py | 10 +- flytekit/tools/repo.py | 101 +++++++++++++--- flytekit/tools/script_mode.py | 133 ++++++++++++++++++++- 7 files changed, 329 insertions(+), 26 deletions(-) create mode 100644 chart diff --git a/chart b/chart new file mode 100644 index 0000000000..27a59398fc --- /dev/null +++ b/chart @@ -0,0 +1,32 @@ + + + Local Container + ========== ============= + + myproject/ /root + conf/ + data/ + src/ + lib/ + tasks/ + wfs/ + + + + + + + + + + + + + + + + + + + + diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index e578f06a17..d2e067cfdf 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -12,6 +12,7 @@ from flytekit.interaction.click_types import key_value_callback from flytekit.loggers import logger from flytekit.tools import repo +from flytekit.tools.fast_registration import CopyFileDetection _register_help = """ This command is similar to ``package`` but instead of producing a zip file, all your Flyte entities are compiled, @@ -95,6 +96,13 @@ is_flag=True, help="Skip zipping and uploading the package", ) +@click.option( + "--copy", + required=False, + type=click.Choice(["all", "auto", "none"], case_sensitive=False), + default="auto", + help="Skip zipping and uploading the package", +) @click.option( "--dry-run", default=False, @@ -139,6 +147,7 @@ def register( version: typing.Optional[str], deref_symlinks: bool, non_fast: bool, + copy: str, package_or_module: typing.Tuple[str], dry_run: bool, activate_launchplans: bool, @@ -148,6 +157,15 @@ def register( """ see help """ + # move 
to callback later + print(f"Register copy {copy} ----") + if copy == "auto": + copy_style = CopyFileDetection.LOADED_MODULES + elif copy == "all": + copy_style = CopyFileDetection.ALL + else: + copy_style = None + pkgs = ctx.obj[constants.CTX_PACKAGES] if not pkgs: logger.debug("No pkgs") @@ -190,6 +208,7 @@ def register( version, deref_symlinks, fast=not non_fast, + copy_style=copy_style, package_or_module=package_or_module, remote=remote, env=env, diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index d17bbe8994..f50f158a68 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -5,10 +5,12 @@ import os import posixpath import subprocess +import sys import tarfile import tempfile import typing from dataclasses import dataclass +from enum import Enum from typing import Optional import click @@ -18,12 +20,17 @@ from flytekit.exceptions.user import FlyteDataNotFoundException from flytekit.loggers import logger from flytekit.tools.ignore import DockerIgnore, FlyteIgnore, GitIgnore, Ignore, IgnoreGroup, StandardIgnore -from flytekit.tools.script_mode import tar_strip_file_attributes +from flytekit.tools.script_mode import ls_files, tar_strip_file_attributes FAST_PREFIX = "fast" FAST_FILEENDING = ".tar.gz" +class CopyFileDetection(Enum): + LOADED_MODULES = 1 + ALL = 2 + + @dataclass(frozen=True) class FastPackageOptions: """ @@ -32,6 +39,26 @@ class FastPackageOptions: ignores: list[Ignore] keep_default_ignores: bool = True + copy_style: CopyFileDetection = CopyFileDetection.LOADED_MODULES + + +""" +clarify tar behavior +- tar doesn't seem to add the folder, just the files, but extracts okay +- this doesn't work for empty folders (but edge case because gitignore ignores them anyways?) +changes to tar +- will now add files one at a time. +- the list will compute the list and the digest +- could have used tar list but seems less powerful, also does more than we want. 
+still need to +- hook up the actual commands args +- in order to support auto, we have to load all the modules first, then trigger copying, then serialize post-upload +- make each create a separate tar +- have a separate path for old and new commands +process +- like to beta and update docs and test before deprecate +- so basically have to preserve both styles of code - worth it to do this? or just a long beta. +""" def fast_package( @@ -39,6 +66,7 @@ def fast_package( output_dir: os.PathLike, deref_symlinks: bool = False, options: Optional[FastPackageOptions] = None, + # use_old: bool = False, ) -> os.PathLike: """ Takes a source directory and packages everything not covered by common ignores into a tarball @@ -67,6 +95,29 @@ def fast_package( archive_fname = os.path.join(output_dir, archive_fname) + if options and options.copy_style == CopyFileDetection.LOADED_MODULES: + sys_modules = list(sys.modules.values()) + ls, ls_digest = ls_files(str(source), sys_modules, deref_symlinks, ignore) + else: + ls, ls_digest = ls_files(str(source), [], deref_symlinks, ignore) + print(f"Digest check: old {digest} ==? new {ls_digest} -- {digest == ls_digest}") + + with tempfile.TemporaryDirectory() as tmp_dir: + tar_path = os.path.join(tmp_dir, "tmp.tar") + print(f"test tmp dir: {tar_path=}") + with tarfile.open(tar_path, "w", dereference=True) as tar: + for ws_file in ls: + rel_path = os.path.relpath(ws_file, start=source) + # not adding explicit folders, but extracting okay. 
+ tar.add( + os.path.join(source, ws_file), + arcname=rel_path, + filter=lambda x: tar_strip_file_attributes(x), + ) + print("New tar file contents: ======================") + tar.list(verbose=True) + # breakpoint() + with tempfile.TemporaryDirectory() as tmp_dir: tar_path = os.path.join(tmp_dir, "tmp.tar") with tarfile.open(tar_path, "w", dereference=deref_symlinks) as tar: @@ -77,6 +128,9 @@ def fast_package( arcname=ws_file, filter=lambda x: ignore.tar_filter(tar_strip_file_attributes(x)), ) + print("Old tar file contents: ======================") + tar.list(verbose=True) + # breakpoint() with gzip.GzipFile(filename=archive_fname, mode="wb", mtime=0) as gzipped: with open(tar_path, "rb") as tar_file: gzipped.write(tar_file.read()) diff --git a/flytekit/tools/ignore.py b/flytekit/tools/ignore.py index e41daf0904..2761b79364 100644 --- a/flytekit/tools/ignore.py +++ b/flytekit/tools/ignore.py @@ -54,15 +54,19 @@ def _list_ignored(self) -> Dict: return {} def _is_ignored(self, path: str) -> bool: + # print(f"\nTesting {path=}") if self.ignored: # git-ls-files uses POSIX paths if Path(path).as_posix() in self.ignored: + # print(f"Path {path} is ignored") return True # Ignore empty directories if os.path.isdir(os.path.join(self.root, path)) and all( [self.is_ignored(os.path.join(path, f)) for f in os.listdir(os.path.join(self.root, path))] ): + # print(f"Path {path} is ignored") return True + # print(f"Path {path} is NOT ignored") return False diff --git a/flytekit/tools/module_loader.py b/flytekit/tools/module_loader.py index dc3a6bb9f4..977a194fbd 100644 --- a/flytekit/tools/module_loader.py +++ b/flytekit/tools/module_loader.py @@ -17,6 +17,12 @@ def add_sys_path(path: Union[str, os.PathLike]) -> Iterator[None]: sys.path.remove(path) +def module_load_error_handler(*args, **kwargs): + from flytekit import logger + + logger.info(f"Error walking package structure when loading: {args}, {kwargs}") + + def just_load_modules(pkgs: List[str]): """ This one differs from 
the above in that we don't yield anything, just load all the modules. @@ -29,7 +35,9 @@ def just_load_modules(pkgs: List[str]): continue # Note that walk_packages takes an onerror arg and swallows import errors silently otherwise - for _, name, _ in pkgutil.walk_packages(package.__path__, prefix=f"{package_name}."): + for _, name, _ in pkgutil.walk_packages( + package.__path__, prefix=f"{package_name}.", onerror=module_load_error_handler + ): importlib.import_module(name) diff --git a/flytekit/tools/repo.py b/flytekit/tools/repo.py index 5dd68b4261..8676bcf4a7 100644 --- a/flytekit/tools/repo.py +++ b/flytekit/tools/repo.py @@ -52,6 +52,48 @@ def serialize( return registrable_entities +def serialize_load_only( + pkgs: typing.List[str], + settings: SerializationSettings, + local_source_root: typing.Optional[str] = None, +): + """ + See :py:class:`flytekit.models.core.identifier.ResourceType` to match the trailing index in the file name with the + entity type. + :param settings: SerializationSettings to be used + :param pkgs: Dot-delimited Python packages/subpackages to look into for serialization. + :param local_source_root: Where to start looking for the code. + """ + settings.source_root = local_source_root + ctx_builder = FlyteContextManager.current_context().with_serialization_settings(settings) + with FlyteContextManager.with_context(ctx_builder): + # Scan all modules. 
the act of loading populates the global singleton that contains all objects + with module_loader.add_sys_path(local_source_root): + click.secho(f"Loading packages {pkgs} under source root {local_source_root}", fg="yellow") + module_loader.just_load_modules(pkgs=pkgs) + + +def serialize_get_control_plane_entities( + settings: SerializationSettings, + local_source_root: typing.Optional[str] = None, + options: typing.Optional[Options] = None, +) -> typing.List[FlyteControlPlaneEntity]: + """ + See :py:class:`flytekit.models.core.identifier.ResourceType` to match the trailing index in the file name with the + entity type. + :param options: + :param settings: SerializationSettings to be used + :param pkgs: Dot-delimited Python packages/subpackages to look into for serialization. + :param local_source_root: Where to start looking for the code. + """ + settings.source_root = local_source_root + ctx_builder = FlyteContextManager.current_context().with_serialization_settings(settings) + with FlyteContextManager.with_context(ctx_builder) as ctx: + registrable_entities = get_registrable_entities(ctx, options=options) + click.secho(f"Successfully serialized {len(registrable_entities)} flyte objects", fg="green") + return registrable_entities + + def serialize_to_folder( pkgs: typing.List[str], settings: SerializationSettings, @@ -118,6 +160,7 @@ def serialize_and_package( """ Fist serialize and then package all entities """ + # copy and remove as well. 
serializable_entities = serialize(pkgs, settings, source, options=options) package(serializable_entities, source, output, fast, deref_symlinks) @@ -147,12 +190,13 @@ def find_common_root( return project_root +# to be renamed, get module names def load_packages_and_modules( ss: SerializationSettings, project_root: Path, pkgs_or_mods: typing.List[str], options: typing.Optional[Options] = None, -) -> typing.List[FlyteControlPlaneEntity]: +) -> typing.List[str]: """ The project root is added as the first entry to sys.path, and then all the specified packages and modules given are loaded with all submodules. The reason for prepending the entry is to ensure that the name that @@ -169,7 +213,7 @@ def load_packages_and_modules( :return: The common detected root path, the output of _find_project_root """ ss.git_repo = _get_git_repo_url(project_root) - pkgs_and_modules = [] + pkgs_and_modules: typing.List[str] = [] for pm in pkgs_or_mods: p = Path(pm).resolve() rel_path_from_root = p.relative_to(project_root) @@ -182,9 +226,11 @@ def load_packages_and_modules( ) pkgs_and_modules.append(dot_delineated) - registrable_entities = serialize(pkgs_and_modules, ss, str(project_root), options) + return pkgs_and_modules - return registrable_entities + # registrable_entities = serialize(pkgs_and_modules, ss, str(project_root), options) + # + # return registrable_entities def secho(i: Identifier, state: str = "success", reason: str = None, op: str = "Registration"): @@ -221,6 +267,7 @@ def register( fast: bool, package_or_module: typing.Tuple[str], remote: FlyteRemote, + copy_style: typing.Optional[fast_registration.CopyFileDetection], env: typing.Optional[typing.Dict[str, str]], dry_run: bool = False, activate_launchplans: bool = False, @@ -228,14 +275,6 @@ def register( ): detected_root = find_common_root(package_or_module) click.secho(f"Detected Root {detected_root}, using this to create deployable package...", fg="yellow") - fast_serialization_settings = None - if fast: - md5_bytes, 
native_url = remote.fast_package(detected_root, deref_symlinks, output) - fast_serialization_settings = FastSerializationSettings( - enabled=True, - destination_dir=destination_dir, - distribution_location=native_url, - ) # Create serialization settings # Todo: Rely on default Python interpreter for now, this will break custom Spark containers @@ -244,28 +283,52 @@ def register( domain=domain, version=version, image_config=image_config, - fast_serialization_settings=fast_serialization_settings, + fast_serialization_settings=None, env=env, ) + # should probably add incomplete fast settings - if not version and fast: - version = remote._version_from_hash(md5_bytes, serialization_settings, service_account, raw_data_prefix) # noqa - click.secho(f"Computed version is {version}", fg="yellow") - elif not version: + if not version and not fast: click.secho("Version is required.", fg="red") return b = serialization_settings.new_builder() - b.version = version serialization_settings = b.build() options = Options.default_from(k8s_service_account=service_account, raw_data_prefix=raw_data_prefix) # Load all the entities FlyteContextManager.push_context(remote.context) - registrable_entities = load_packages_and_modules( + pkgs_and_modules = load_packages_and_modules( # to be renamed serialization_settings, detected_root, list(package_or_module), options ) + + # NB: The change here is that the loading of user code _cannot_ depend on fast register information (the computed + # version, upload native url, hash digest, etc.). 
+ serialize_load_only(pkgs_and_modules, serialization_settings, str(detected_root)) + + # Move the fast registration stuff here + if fast: + md5_bytes, native_url = remote.fast_package( + detected_root, + deref_symlinks, + output, + options=fast_registration.FastPackageOptions([], copy_style=copy_style), + ) + fast_serialization_settings = FastSerializationSettings( + enabled=True, + destination_dir=destination_dir, + distribution_location=native_url, + ) + # update serialization settings from fast + serialization_settings.fast_serialization_settings = fast_serialization_settings + if not version: + version = remote._version_from_hash(md5_bytes, serialization_settings, service_account, raw_data_prefix) # noqa + serialization_settings.version = version + click.secho(f"Computed version is {version}", fg="yellow") + + registrable_entities = serialize_get_control_plane_entities(serialization_settings, str(detected_root), options) + FlyteContextManager.pop_context() if len(registrable_entities) == 0: click.secho("No Flyte entities were detected. 
Aborting!", fg="red") diff --git a/flytekit/tools/script_mode.py b/flytekit/tools/script_mode.py index 9d91731389..b616b7ffe2 100644 --- a/flytekit/tools/script_mode.py +++ b/flytekit/tools/script_mode.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import gzip import hashlib import os @@ -9,7 +11,10 @@ import typing from pathlib import Path from types import ModuleType -from typing import List, Optional +from typing import List, Optional, Tuple, Union + +from flytekit.loggers import logger +from flytekit.tools.ignore import IgnoreGroup def compress_scripts(source_path: str, destination: str, modules: List[ModuleType]): @@ -79,17 +84,114 @@ def tar_strip_file_attributes(tar_info: tarfile.TarInfo) -> tarfile.TarInfo: return tar_info -def add_imported_modules_from_source(source_path: str, destination: str, modules: List[ModuleType]): +def ls_files( + source_path: str, + modules: List[ModuleType], + deref_symlinks: bool = False, + ignore_group: Optional[IgnoreGroup] = None, +) -> Tuple[List[str], str]: + """ + user_modules_and_packages is a list of the Python modules and packages, expressed as absolute paths, that the + user has run this pyflyte command with. For pyflyte run for instance, this is just a list of one. + This is used for two reasons. + - Everything in this list needs to be returned. Files are returned and folders are walked. + - A common source path is derived from this is, which is just the common folder that contains everything in the + list. For ex. if you do + $ pyflyte --pkgs a.b,a.c package + Then the common root is just the folder a/. The modules list is filtered against this root. Only files + representing modules under this root are included + + + If the modules list should be a list of all the + + needs to compute digest as well. + """ + + # Unlike the below, the value error here is useful and should be returned to the user, like if absolute and + # relative paths are mixed. 
+ + # This is --copy auto + if modules: + all_files = list_imported_modules_as_files(source_path, modules) + # this is --copy all + else: + all_files = list_all_files(source_path, deref_symlinks, ignore_group) + + hasher = hashlib.md5() + for abspath in all_files: + relpath = os.path.relpath(abspath, source_path) + _filehash_update(abspath, hasher) + _pathhash_update(relpath, hasher) + + digest = hasher.hexdigest() + + return all_files, digest + + +def _filehash_update(path: Union[os.PathLike, str], hasher: hashlib._Hash) -> None: + blocksize = 65536 + with open(path, "rb") as f: + bytes = f.read(blocksize) + while bytes: + hasher.update(bytes) + bytes = f.read(blocksize) + + +def _pathhash_update(path: Union[os.PathLike, str], hasher: hashlib._Hash) -> None: + path_list = path.split(os.sep) + hasher.update("".join(path_list).encode("utf-8")) + + +def list_all_files(source_path: str, deref_symlinks, ignore_group: Optional[IgnoreGroup] = None) -> List[str]: + all_files = [] + + # This is needed to prevent infinite recursion when walking with followlinks + visited_inodes = set() + + for root, dirnames, files in os.walk(source_path, topdown=True, followlinks=deref_symlinks): + if deref_symlinks: + inode = os.stat(root).st_ino + if inode in visited_inodes: + continue + visited_inodes.add(inode) + + ff = [] + files.sort() + for fname in files: + abspath = os.path.join(root, fname) + # Only consider files that exist (e.g. 
disregard symlinks that point to non-existent files) + if not os.path.exists(abspath): + logger.info(f"Skipping non-existent file {abspath}") + continue + if ignore_group: + if ignore_group.is_ignored(abspath): + continue + + ff.append(abspath) + all_files.extend(ff) + + # Remove directories that we've already visited from dirnames + if deref_symlinks: + dirnames[:] = [d for d in dirnames if os.stat(os.path.join(root, d)).st_ino not in visited_inodes] + + return all_files + + +def list_imported_modules_as_files(source_path: str, modules: List[ModuleType]) -> List[str]: """Copies modules into destination that are in modules. The module files are copied only if: 1. Not a site-packages. These are installed packages and not user files. 2. Not in the bin. These are also installed and not user files. 3. Does not share a common path with the source_path. """ + # source path is the folder holding the main script. + # but in register/package case, there are multiple folders. + # identify a common root amongst the packages listed? site_packages = site.getsitepackages() site_packages_set = set(site_packages) bin_directory = os.path.dirname(sys.executable) + files = [] for mod in modules: try: @@ -129,7 +231,26 @@ def add_imported_modules_from_source(source_path: str, destination: str, modules # so we do not upload the file. continue - relative_path = os.path.relpath(mod_file, start=source_path) + files.append(mod_file) + + return files + + +def add_imported_modules_from_source(source_path: str, destination: str, modules: List[ModuleType]): + """Copies modules into destination that are in modules. The module files are copied only if: + + 1. Not a site-packages. These are installed packages and not user files. + 2. Not in the bin. These are also installed and not user files. + 3. Does not share a common path with the source_path. + """ + # source path is the folder holding the main script. + # but in register/package case, there are multiple folders. 
+ # identify a common root amongst the packages listed? + + files = list_imported_modules_as_files(source_path, modules) + print("files", files) + for file in files: + relative_path = os.path.relpath(file, start=source_path) new_destination = os.path.join(destination, relative_path) if os.path.exists(new_destination): @@ -137,7 +258,7 @@ def add_imported_modules_from_source(source_path: str, destination: str, modules continue os.makedirs(os.path.dirname(new_destination), exist_ok=True) - shutil.copy(mod_file, new_destination) + shutil.copy(file, new_destination) def get_all_modules(source_path: str, module_name: Optional[str]) -> List[ModuleType]: @@ -154,12 +275,14 @@ def get_all_modules(source_path: str, module_name: Optional[str]) -> List[Module if not is_python_file: return sys_modules + # should move it here probably from flytekit.core.tracker import import_module_from_file try: new_module = import_module_from_file(module_name, full_module_path) return sys_modules + [new_module] - except Exception: + except Exception as exc: + logger.error(f"Using system modules, failed to import {module_name} from {full_module_path}: {str(exc)}") # Import failed so we fallback to `sys_modules` return sys_modules From afee7ea33bd0952333b208848e6ffd0adf5c4d51 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Aug 2024 17:00:58 -0700 Subject: [PATCH 02/17] pipe through Signed-off-by: Yee Hing Tong --- chart | 32 ------- flytekit/clis/sdk_in_container/helpers.py | 12 +++ flytekit/clis/sdk_in_container/package.py | 14 ++- flytekit/clis/sdk_in_container/register.py | 25 +++-- flytekit/clis/sdk_in_container/run.py | 19 +++- flytekit/remote/remote.py | 2 +- flytekit/tools/fast_registration.py | 105 ++++++++++++--------- flytekit/tools/repo.py | 24 +++-- 8 files changed, 130 insertions(+), 103 deletions(-) delete mode 100644 chart diff --git a/chart b/chart deleted file mode 100644 index 27a59398fc..0000000000 --- a/chart +++ /dev/null @@ -1,32 +0,0 @@ - - - Local Container - 
========== ============= - - myproject/ /root - conf/ - data/ - src/ - lib/ - tasks/ - wfs/ - - - - - - - - - - - - - - - - - - - - diff --git a/flytekit/clis/sdk_in_container/helpers.py b/flytekit/clis/sdk_in_container/helpers.py index 5ec4b9b262..d2b78983fc 100644 --- a/flytekit/clis/sdk_in_container/helpers.py +++ b/flytekit/clis/sdk_in_container/helpers.py @@ -7,6 +7,7 @@ from flytekit.configuration import ImageConfig from flytekit.configuration.plugin import get_plugin from flytekit.remote.remote import FlyteRemote +from flytekit.tools.fast_registration import CopyFileDetection FLYTE_REMOTE_INSTANCE_KEY = "flyte_remote" @@ -61,3 +62,14 @@ def patch_image_config(config_file: Optional[str], image_config: ImageConfig) -> if addl.name not in additional_image_names: new_additional_images.append(addl) return replace(image_config, default_image=new_default, images=new_additional_images) + + +def parse_copy(ctx, param, value) -> Optional[CopyFileDetection]: + if value == "auto": + copy_style = CopyFileDetection.LOADED_MODULES + elif value == "all": + copy_style = CopyFileDetection.ALL + else: + copy_style = None + + return copy_style diff --git a/flytekit/clis/sdk_in_container/package.py b/flytekit/clis/sdk_in_container/package.py index c61b02a16d..71ba386b7b 100644 --- a/flytekit/clis/sdk_in_container/package.py +++ b/flytekit/clis/sdk_in_container/package.py @@ -1,9 +1,11 @@ import os +import typing import rich_click as click from flytekit.clis.helpers import display_help_with_error from flytekit.clis.sdk_in_container import constants +from flytekit.clis.sdk_in_container.helpers import parse_copy from flytekit.configuration import ( DEFAULT_RUNTIME_PYTHON_INTERPRETER, FastSerializationSettings, @@ -11,6 +13,7 @@ SerializationSettings, ) from flytekit.interaction.click_types import key_value_callback +from flytekit.tools.fast_registration import CopyFileDetection from flytekit.tools.repo import NoSerializableEntitiesError, serialize_and_package @@ -53,6 +56,14 @@ 
help="This flag enables fast packaging, that allows `no container build` deploys of flyte workflows and tasks. " "Note this needs additional configuration, refer to the docs.", ) +@click.option( + "--copy", + required=False, + type=click.Choice(["all", "auto", "none"], case_sensitive=False), + default="none", # this will be changed to "all" after removing non-fast option + callback=parse_copy, + help="Specify how and whether to use fast register", +) @click.option( "-f", "--force", @@ -100,6 +111,7 @@ def package( source, output, force, + copy: typing.Optional[CopyFileDetection], fast, in_container_source_path, python_interpreter, @@ -136,6 +148,6 @@ def package( display_help_with_error(ctx, "No packages to scan for flyte entities. Aborting!") try: - serialize_and_package(pkgs, serialization_settings, source, output, fast, deref_symlinks) + serialize_and_package(pkgs, serialization_settings, source, output, fast, deref_symlinks, copy_style=copy) except NoSerializableEntitiesError: click.secho(f"No flyte objects found in packages {pkgs}", fg="yellow") diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index d2e067cfdf..673608055c 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -5,7 +5,11 @@ from flytekit.clis.helpers import display_help_with_error from flytekit.clis.sdk_in_container import constants -from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context, patch_image_config +from flytekit.clis.sdk_in_container.helpers import ( + get_and_save_remote_with_click_context, + parse_copy, + patch_image_config, +) from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec from flytekit.configuration import ImageConfig from flytekit.configuration.default_images import DefaultImages @@ -100,8 +104,9 @@ "--copy", required=False, type=click.Choice(["all", "auto", "none"], case_sensitive=False), - 
default="auto", - help="Skip zipping and uploading the package", + default="none", # this will be changed to "all" after removing non-fast option + callback=parse_copy, + help="Specify how and whether to use fast register", ) @click.option( "--dry-run", @@ -147,7 +152,7 @@ def register( version: typing.Optional[str], deref_symlinks: bool, non_fast: bool, - copy: str, + copy: typing.Optional[CopyFileDetection], package_or_module: typing.Tuple[str], dry_run: bool, activate_launchplans: bool, @@ -157,14 +162,8 @@ def register( """ see help """ - # move to callback later - print(f"Register copy {copy} ----") - if copy == "auto": - copy_style = CopyFileDetection.LOADED_MODULES - elif copy == "all": - copy_style = CopyFileDetection.ALL - else: - copy_style = None + + # Add error handling for fast/copy conflicts pkgs = ctx.obj[constants.CTX_PACKAGES] if not pkgs: @@ -208,7 +207,7 @@ def register( version, deref_symlinks, fast=not non_fast, - copy_style=copy_style, + copy_style=copy, package_or_module=package_or_module, remote=remote, env=env, diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index d8c215a598..22b6cd441d 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -16,7 +16,10 @@ from typing_extensions import get_origin from flytekit import Annotations, FlyteContext, FlyteContextManager, Labels, Literal -from flytekit.clis.sdk_in_container.helpers import patch_image_config +from flytekit.clis.sdk_in_container.helpers import ( + parse_copy, + patch_image_config, +) from flytekit.clis.sdk_in_container.utils import ( PyFlyteParams, domain_option, @@ -44,6 +47,7 @@ from flytekit.remote import FlyteLaunchPlan, FlyteRemote, FlyteTask, FlyteWorkflow, remote_fs from flytekit.remote.executions import FlyteWorkflowExecution from flytekit.tools import module_loader +from flytekit.tools.fast_registration import CopyFileDetection, FastPackageOptions from flytekit.tools.script_mode import 
_find_project_root, compress_scripts, get_all_modules from flytekit.tools.translator import Options @@ -88,6 +92,17 @@ class RunLevelParams(PyFlyteParams): help="Copy all files in the source root directory to the destination directory", ) ) + copy: typing.Optional[CopyFileDetection] = make_click_option_field( + click.Option( + param_decls=["--copy"], + required=False, + default="none", + type=click.Choice(["all", "auto", "none"], case_sensitive=False), + show_default=True, + callback=parse_copy, + help="Specifies how to detect which files to copy into image", + ) + ) image_config: ImageConfig = make_click_option_field( click.Option( param_decls=["-i", "--image", "image_config"], @@ -598,6 +613,7 @@ def _run(*args, **kwargs): image_config = patch_image_config(config_file, image_config) with context_manager.FlyteContextManager.with_context(remote.context.new_builder()): + fast_package_options = FastPackageOptions([], copy_style=run_level_params.copy) remote_entity = remote.register_script( entity, project=run_level_params.project, @@ -607,6 +623,7 @@ def _run(*args, **kwargs): source_path=run_level_params.computed_params.project_root, module_name=run_level_params.computed_params.module, copy_all=run_level_params.copy_all, + fast_package_options=fast_package_options, ) run_remote( diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index dd0d50b8af..0bd4e0fe00 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -1014,7 +1014,7 @@ def register_script( image_config = ImageConfig.auto_default_image() with tempfile.TemporaryDirectory() as tmp_dir: - if copy_all: + if copy_all or (fast_package_options and fast_package_options.copy_style): md5_bytes, upload_native_url = self.fast_package( pathlib.Path(source_path), False, tmp_dir, fast_package_options ) diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index f50f158a68..4b3e414e52 100644 --- a/flytekit/tools/fast_registration.py +++ 
b/flytekit/tools/fast_registration.py @@ -39,7 +39,7 @@ class FastPackageOptions: ignores: list[Ignore] keep_default_ignores: bool = True - copy_style: CopyFileDetection = CopyFileDetection.LOADED_MODULES + copy_style: Optional[CopyFileDetection] = None """ @@ -50,14 +50,13 @@ class FastPackageOptions: - will now add files one at a time. - the list will compute the list and the digest - could have used tar list but seems less powerful, also does more than we want. -still need to -- hook up the actual commands args - in order to support auto, we have to load all the modules first, then trigger copying, then serialize post-upload +- hook up the actual commands args - make each create a separate tar - have a separate path for old and new commands -process -- like to beta and update docs and test before deprecate -- so basically have to preserve both styles of code - worth it to do this? or just a long beta. + +- finish deprecating serialize function +- update comment """ @@ -66,7 +65,6 @@ def fast_package( output_dir: os.PathLike, deref_symlinks: bool = False, options: Optional[FastPackageOptions] = None, - # use_old: bool = False, ) -> os.PathLike: """ Takes a source directory and packages everything not covered by common ignores into a tarball @@ -74,6 +72,7 @@ def fast_package( :param os.PathLike source: :param os.PathLike output_dir: :param bool deref_symlinks: Enables dereferencing symlinks when packaging directory + :param options: The CopyFileDetection option set to None :return os.PathLike: """ default_ignores = [GitIgnore, DockerIgnore, StandardIgnore, FlyteIgnore] @@ -87,53 +86,65 @@ def fast_package( ignore = IgnoreGroup(source, ignores) digest = compute_digest(source, ignore.is_ignored) - archive_fname = f"{FAST_PREFIX}{digest}{FAST_FILEENDING}" + # Compute where the archive should be written + archive_fname = f"{FAST_PREFIX}{digest}{FAST_FILEENDING}" if output_dir is None: output_dir = tempfile.mkdtemp() click.secho(f"No output path provided, using a 
temporary directory at {output_dir} instead", fg="yellow") - archive_fname = os.path.join(output_dir, archive_fname) - if options and options.copy_style == CopyFileDetection.LOADED_MODULES: - sys_modules = list(sys.modules.values()) - ls, ls_digest = ls_files(str(source), sys_modules, deref_symlinks, ignore) + # This function is temporarily split into two, to support the creation of the tar file in both the old way, + # copying the underlying items in the source dir by doing a listdir, and the new way, relying on a list of files. + if options and ( + options.copy_style == CopyFileDetection.LOADED_MODULES or options.copy_style == CopyFileDetection.ALL + ): + if options.copy_style == CopyFileDetection.LOADED_MODULES: + # This is the 'auto' semantic by default used for pyflyte run, it only copies loaded .py files. + sys_modules = list(sys.modules.values()) + ls, ls_digest = ls_files(str(source), sys_modules, deref_symlinks, ignore) + else: + # This triggers listing of all files, mimicking the old way of creating the tar file. + ls, ls_digest = ls_files(str(source), [], deref_symlinks, ignore) + + print(f"Digest check: old {digest} ==? new {ls_digest} -- {digest == ls_digest}") + + with tempfile.TemporaryDirectory() as tmp_dir: + tar_path = os.path.join(tmp_dir, "tmp.tar") + print(f"test tmp dir: {tar_path=}") + with tarfile.open(tar_path, "w", dereference=True) as tar: + for ws_file in ls: + rel_path = os.path.relpath(ws_file, start=source) + tar.add( + os.path.join(source, ws_file), + arcname=rel_path, + filter=lambda x: tar_strip_file_attributes(x), + ) + print("New tar file contents: ======================") + tar.list(verbose=True) + + with gzip.GzipFile(filename=archive_fname, mode="wb", mtime=0) as gzipped: + with open(tar_path, "rb") as tar_file: + gzipped.write(tar_file.read()) + + # Original tar command else: - ls, ls_digest = ls_files(str(source), [], deref_symlinks, ignore) - print(f"Digest check: old {digest} ==? 
new {ls_digest} -- {digest == ls_digest}") - - with tempfile.TemporaryDirectory() as tmp_dir: - tar_path = os.path.join(tmp_dir, "tmp.tar") - print(f"test tmp dir: {tar_path=}") - with tarfile.open(tar_path, "w", dereference=True) as tar: - for ws_file in ls: - rel_path = os.path.relpath(ws_file, start=source) - # not adding explicit folders, but extracting okay. - tar.add( - os.path.join(source, ws_file), - arcname=rel_path, - filter=lambda x: tar_strip_file_attributes(x), - ) - print("New tar file contents: ======================") - tar.list(verbose=True) - # breakpoint() - - with tempfile.TemporaryDirectory() as tmp_dir: - tar_path = os.path.join(tmp_dir, "tmp.tar") - with tarfile.open(tar_path, "w", dereference=deref_symlinks) as tar: - files: typing.List[str] = os.listdir(source) - for ws_file in files: - tar.add( - os.path.join(source, ws_file), - arcname=ws_file, - filter=lambda x: ignore.tar_filter(tar_strip_file_attributes(x)), - ) - print("Old tar file contents: ======================") - tar.list(verbose=True) - # breakpoint() - with gzip.GzipFile(filename=archive_fname, mode="wb", mtime=0) as gzipped: - with open(tar_path, "rb") as tar_file: - gzipped.write(tar_file.read()) + with tempfile.TemporaryDirectory() as tmp_dir: + tar_path = os.path.join(tmp_dir, "tmp.tar") + with tarfile.open(tar_path, "w", dereference=deref_symlinks) as tar: + files: typing.List[str] = os.listdir(source) + for ws_file in files: + tar.add( + os.path.join(source, ws_file), + arcname=ws_file, + filter=lambda x: ignore.tar_filter(tar_strip_file_attributes(x)), + ) + print("Old tar file contents: ======================") + tar.list(verbose=True) + + with gzip.GzipFile(filename=archive_fname, mode="wb", mtime=0) as gzipped: + with open(tar_path, "rb") as tar_file: + gzipped.write(tar_file.read()) return archive_fname diff --git a/flytekit/tools/repo.py b/flytekit/tools/repo.py index 8676bcf4a7..4cc60b8ea2 100644 --- a/flytekit/tools/repo.py +++ b/flytekit/tools/repo.py @@ -116,6 
+116,7 @@ def package( output: str = "./flyte-package.tgz", fast: bool = False, deref_symlinks: bool = False, + copy_style: typing.Optional[fast_registration.CopyFileDetection] = None, ): """ Package the given entities and the source code (if fast is enabled) into a package with the given name in output @@ -124,6 +125,7 @@ def package( :param output: output package name with suffix :param fast: fast enabled implies source code is bundled :param deref_symlinks: if enabled then symlinks are dereferenced during packaging + :param copy_style: """ if not serializable_entities: raise NoSerializableEntitiesError("Nothing to package") @@ -137,7 +139,9 @@ def package( if os.path.abspath(output).startswith(os.path.abspath(source)) and os.path.exists(output): click.secho(f"{output} already exists within {source}, deleting and re-creating it", fg="yellow") os.remove(output) - archive_fname = fast_registration.fast_package(source, output_tmpdir, deref_symlinks) + archive_fname = fast_registration.fast_package( + source, output_tmpdir, deref_symlinks, fast_registration.FastPackageOptions([], copy_style=copy_style) + ) click.secho(f"Fast mode enabled: compressed archive {archive_fname}", dim=True) with tarfile.open(output, "w:gz") as tar: @@ -156,13 +160,14 @@ def serialize_and_package( fast: bool = False, deref_symlinks: bool = False, options: typing.Optional[Options] = None, + copy_style: typing.Optional[fast_registration.CopyFileDetection] = None, ): """ Fist serialize and then package all entities """ - # copy and remove as well. 
- serializable_entities = serialize(pkgs, settings, source, options=options) - package(serializable_entities, source, output, fast, deref_symlinks) + serialize_load_only(pkgs, settings, source) + serializable_entities = serialize_get_control_plane_entities(settings, source, options=options) + package(serializable_entities, source, output, fast, deref_symlinks, copy_style) def find_common_root( @@ -273,6 +278,10 @@ def register( activate_launchplans: bool = False, skip_errors: bool = False, ): + """ + Temporarily, for fast register, specify both the fast arg as well as copy_style. + fast == True with copy_style == None means use the old fast register tar'ring method. + """ detected_root = find_common_root(package_or_module) click.secho(f"Detected Root {detected_root}, using this to create deployable package...", fg="yellow") @@ -283,10 +292,9 @@ def register( domain=domain, version=version, image_config=image_config, - fast_serialization_settings=None, + fast_serialization_settings=None, # should probably add incomplete fast settings env=env, ) - # should probably add incomplete fast settings if not version and not fast: click.secho("Version is required.", fg="red") @@ -307,7 +315,7 @@ def register( # version, upload native url, hash digest, etc.). 
serialize_load_only(pkgs_and_modules, serialization_settings, str(detected_root)) - # Move the fast registration stuff here + # Fast registration is handled after module loading if fast: md5_bytes, native_url = remote.fast_package( detected_root, @@ -315,12 +323,12 @@ def register( output, options=fast_registration.FastPackageOptions([], copy_style=copy_style), ) + # update serialization settings from fast register output fast_serialization_settings = FastSerializationSettings( enabled=True, destination_dir=destination_dir, distribution_location=native_url, ) - # update serialization settings from fast serialization_settings.fast_serialization_settings = fast_serialization_settings if not version: version = remote._version_from_hash(md5_bytes, serialization_settings, service_account, raw_data_prefix) # noqa From 4f1d8a3ab009e03a626bd565c4bca29d587d62b9 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Aug 2024 17:09:40 -0700 Subject: [PATCH 03/17] delete serialize function Signed-off-by: Yee Hing Tong --- flytekit/tools/fast_registration.py | 1 - flytekit/tools/repo.py | 36 +++++++---------------------- 2 files changed, 8 insertions(+), 29 deletions(-) diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index 4b3e414e52..e9d9e1b248 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -54,7 +54,6 @@ class FastPackageOptions: - hook up the actual commands args - make each create a separate tar - have a separate path for old and new commands - - finish deprecating serialize function - update comment """ diff --git a/flytekit/tools/repo.py b/flytekit/tools/repo.py index 4cc60b8ea2..8d4d70704c 100644 --- a/flytekit/tools/repo.py +++ b/flytekit/tools/repo.py @@ -25,33 +25,6 @@ class NoSerializableEntitiesError(Exception): pass -def serialize( - pkgs: typing.List[str], - settings: SerializationSettings, - local_source_root: typing.Optional[str] = None, - options: typing.Optional[Options] 
= None, -) -> typing.List[FlyteControlPlaneEntity]: - """ - See :py:class:`flytekit.models.core.identifier.ResourceType` to match the trailing index in the file name with the - entity type. - :param options: - :param settings: SerializationSettings to be used - :param pkgs: Dot-delimited Python packages/subpackages to look into for serialization. - :param local_source_root: Where to start looking for the code. - """ - settings.source_root = local_source_root - ctx = FlyteContextManager.current_context().with_serialization_settings(settings) - with FlyteContextManager.with_context(ctx) as ctx: - # Scan all modules. the act of loading populates the global singleton that contains all objects - with module_loader.add_sys_path(local_source_root): - click.secho(f"Loading packages {pkgs} under source root {local_source_root}", fg="yellow") - module_loader.just_load_modules(pkgs=pkgs) - - registrable_entities = get_registrable_entities(ctx, options=options) - click.secho(f"Successfully serialized {len(registrable_entities)} flyte objects", fg="green") - return registrable_entities - - def serialize_load_only( pkgs: typing.List[str], settings: SerializationSettings, @@ -106,7 +79,8 @@ def serialize_to_folder( """ if folder is None: folder = "." - loaded_entities = serialize(pkgs, settings, local_source_root, options=options) + serialize_load_only(pkgs, settings, local_source_root) + loaded_entities = serialize_get_control_plane_entities(settings, local_source_root, options=options) persist_registrable_entities(loaded_entities, folder) @@ -126,6 +100,10 @@ def package( :param fast: fast enabled implies source code is bundled :param deref_symlinks: if enabled then symlinks are dereferenced during packaging :param copy_style: + + Temporarily, for fast register, specify both the fast arg as well as copy_style fast == True with + copy_style == None means use the old fast register tar'ring method. 
+ In the future the fast bool will be removed, and copy_style == None will mean do not fast register. """ if not serializable_entities: raise NoSerializableEntitiesError("Nothing to package") @@ -164,6 +142,8 @@ def serialize_and_package( ): """ Fist serialize and then package all entities + Temporarily for fast package, specify both the fast arg as well as copy_style. + fast == True with copy_style == None means use the old fast register tar'ring method. """ serialize_load_only(pkgs, settings, source) serializable_entities = serialize_get_control_plane_entities(settings, source, options=options) From 5b812642d9e86c6d153094af9a2d235f7861cd6d Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Aug 2024 18:45:57 -0700 Subject: [PATCH 04/17] update help messages and future hints Signed-off-by: Yee Hing Tong --- flytekit/clis/sdk_in_container/helpers.py | 2 ++ flytekit/clis/sdk_in_container/package.py | 16 ++++++++--- flytekit/clis/sdk_in_container/register.py | 21 ++++++++++---- flytekit/clis/sdk_in_container/run.py | 10 ++++--- flytekit/tools/fast_registration.py | 33 ++++++++++++++++------ 5 files changed, 61 insertions(+), 21 deletions(-) diff --git a/flytekit/clis/sdk_in_container/helpers.py b/flytekit/clis/sdk_in_container/helpers.py index d2b78983fc..766af2cfec 100644 --- a/flytekit/clis/sdk_in_container/helpers.py +++ b/flytekit/clis/sdk_in_container/helpers.py @@ -69,6 +69,8 @@ def parse_copy(ctx, param, value) -> Optional[CopyFileDetection]: copy_style = CopyFileDetection.LOADED_MODULES elif value == "all": copy_style = CopyFileDetection.ALL + elif value == "none": + copy_style = CopyFileDetection.TEMP_NO_COPY else: copy_style = None diff --git a/flytekit/clis/sdk_in_container/package.py b/flytekit/clis/sdk_in_container/package.py index 71ba386b7b..38b216312d 100644 --- a/flytekit/clis/sdk_in_container/package.py +++ b/flytekit/clis/sdk_in_container/package.py @@ -53,16 +53,18 @@ is_flag=True, default=False, required=False, - help="This flag enables 
fast packaging, that allows `no container build` deploys of flyte workflows and tasks. " - "Note this needs additional configuration, refer to the docs.", + help="[Will be deprecated, see --copy] This flag enables fast packaging, that allows `no container build`" + " deploys of flyte workflows and tasks. You can specify --copy all/auto instead" + " Note this needs additional configuration, refer to the docs.", ) @click.option( "--copy", required=False, type=click.Choice(["all", "auto", "none"], case_sensitive=False), - default="none", # this will be changed to "all" after removing non-fast option + default=None, # this will be changed to "none" after removing fast option callback=parse_copy, - help="Specify how and whether to use fast register", + help="[Beta] Specify how and whether to use fast register" + " 'all' will behave as the current fast flag copying all files, 'auto' copies only loaded Python modules", ) @click.option( "-f", @@ -125,6 +127,12 @@ def package( object contains the WorkflowTemplate, along with the relevant tasks for that workflow. This serialization step will set the name of the tasks to the fully qualified name of the task function. """ + if copy == CopyFileDetection.TEMP_NO_COPY: + raise ValueError("--copy none doesn't need to be specified, package by default does not copy files") + elif copy == CopyFileDetection.ALL or copy == CopyFileDetection.LOADED_MODULES: + # for those migrating, who only set --copy all/auto but don't have --fast set. 
+ fast = True + if os.path.exists(output) and not force: raise click.BadParameter( click.style( diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 673608055c..12ea050940 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -98,15 +98,16 @@ "--non-fast", default=False, is_flag=True, - help="Skip zipping and uploading the package", + help="[Will be deprecated, see --copy] Skip zipping and uploading the package. You can specify --copy none instead", ) @click.option( "--copy", required=False, type=click.Choice(["all", "auto", "none"], case_sensitive=False), - default="none", # this will be changed to "all" after removing non-fast option + default=None, # this will be changed to "all" after removing non-fast option callback=parse_copy, - help="Specify how and whether to use fast register", + help="[Beta] Specify how and whether to use fast register" + " 'all' is the current behavior copying all files from root, 'auto' copies only loaded Python modules", ) @click.option( "--dry-run", @@ -163,7 +164,17 @@ def register( see help """ - # Add error handling for fast/copy conflicts + # Error handling for non-fast/copy conflicts + if copy == CopyFileDetection.TEMP_NO_COPY: + non_fast = True + # Set this to None because downstream logic currently detects None to mean old logic. 
+ copy = None + elif copy == CopyFileDetection.ALL: + if non_fast: + raise ValueError("Conflicting options: cannot specify both --non-fast and --copy all") + elif copy == CopyFileDetection.LOADED_MODULES: + if non_fast: + raise ValueError("Conflicting options: cannot specify both --non-fast and --copy auto") pkgs = ctx.obj[constants.CTX_PACKAGES] if not pkgs: @@ -172,7 +183,7 @@ def register( raise ValueError("Unimplemented, just specify pkgs like folder/files as args at the end of the command") if non_fast and not version: - raise ValueError("Version is a required parameter in case --non-fast is specified.") + raise ValueError("Version is a required parameter in case --non-fast/--copy none is specified.") if len(package_or_module) == 0: display_help_with_error( diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index 22b6cd441d..c26491fb60 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -89,18 +89,20 @@ class RunLevelParams(PyFlyteParams): is_flag=True, default=False, show_default=True, - help="Copy all files in the source root directory to the destination directory", + help="[Will be deprecated, see --copy] Copy all files in the source root directory to" + " the destination directory. You can specify --copy all instead", ) ) copy: typing.Optional[CopyFileDetection] = make_click_option_field( click.Option( param_decls=["--copy"], required=False, - default="none", - type=click.Choice(["all", "auto", "none"], case_sensitive=False), + default=None, # this will change to "auto" after removing copy_all option + type=click.Choice(["all", "auto"], case_sensitive=False), show_default=True, callback=parse_copy, - help="Specifies how to detect which files to copy into image", + help="[Beta] Specifies how to detect which files to copy into image." 
+ " 'all' will behave as the current copy-all flag, 'auto' copies only loaded Python modules", ) ) image_config: ImageConfig = make_click_option_field( diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index e9d9e1b248..0735b23b2a 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -29,6 +29,11 @@ class CopyFileDetection(Enum): LOADED_MODULES = 1 ALL = 2 + # This is a temporary option to be removed in the future. In the future this value of the enum should simply + # be Python None. Here now to distinguish between users explicitly setting --copy none and not setting the flag. + # This is only used for register, not for package or run because run doesn't have a no-fast-register option and + # package is by default non-fast. + TEMP_NO_COPY = 3 @dataclass(frozen=True) @@ -56,6 +61,11 @@ class FastPackageOptions: - have a separate path for old and new commands - finish deprecating serialize function - update comment +- move compute of old digest and fname construction + +- update click help +- move prints to debug, add click +- add manual option checking """ @@ -86,13 +96,6 @@ def fast_package( digest = compute_digest(source, ignore.is_ignored) - # Compute where the archive should be written - archive_fname = f"{FAST_PREFIX}{digest}{FAST_FILEENDING}" - if output_dir is None: - output_dir = tempfile.mkdtemp() - click.secho(f"No output path provided, using a temporary directory at {output_dir} instead", fg="yellow") - archive_fname = os.path.join(output_dir, archive_fname) - # This function is temporarily split into two, to support the creation of the tar file in both the old way, # copying the underlying items in the source dir by doing a listdir, and the new way, relying on a list of files. if options and ( @@ -108,6 +111,13 @@ def fast_package( print(f"Digest check: old {digest} ==? 
new {ls_digest} -- {digest == ls_digest}") + # Compute where the archive should be written + archive_fname = f"{FAST_PREFIX}{ls_digest}{FAST_FILEENDING}" + if output_dir is None: + output_dir = tempfile.mkdtemp() + click.secho(f"No output path provided, using a temporary directory at {output_dir} instead", fg="yellow") + archive_fname = os.path.join(output_dir, archive_fname) + with tempfile.TemporaryDirectory() as tmp_dir: tar_path = os.path.join(tmp_dir, "tmp.tar") print(f"test tmp dir: {tar_path=}") @@ -126,8 +136,15 @@ def fast_package( with open(tar_path, "rb") as tar_file: gzipped.write(tar_file.read()) - # Original tar command + # Original tar command - This condition to be removed in the future. else: + # Compute where the archive should be written + archive_fname = f"{FAST_PREFIX}{digest}{FAST_FILEENDING}" + if output_dir is None: + output_dir = tempfile.mkdtemp() + click.secho(f"No output path provided, using a temporary directory at {output_dir} instead", fg="yellow") + archive_fname = os.path.join(output_dir, archive_fname) + with tempfile.TemporaryDirectory() as tmp_dir: tar_path = os.path.join(tmp_dir, "tmp.tar") with tarfile.open(tar_path, "w", dereference=deref_symlinks) as tar: From 9710b394f9d667fd05b69335793034a0221f3477 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Aug 2024 18:47:48 -0700 Subject: [PATCH 05/17] personal log Signed-off-by: Yee Hing Tong --- flytekit/tools/fast_registration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index 0735b23b2a..d9de907d62 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -62,10 +62,10 @@ class FastPackageOptions: - finish deprecating serialize function - update comment - move compute of old digest and fname construction - - update click help -- move prints to debug, add click - add manual option checking + +- move prints to debug, add click """ From 
eca80d5bee1533594830657db535023dbb506d72 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 19 Aug 2024 17:09:24 -0700 Subject: [PATCH 06/17] ls workfing Signed-off-by: Yee Hing Tong --- flytekit/clis/sdk_in_container/package.py | 16 +++++- flytekit/clis/sdk_in_container/register.py | 10 ++++ flytekit/clis/sdk_in_container/run.py | 14 ++++- flytekit/tools/fast_registration.py | 61 +++++++++++++--------- flytekit/tools/repo.py | 15 +++--- 5 files changed, 79 insertions(+), 37 deletions(-) diff --git a/flytekit/clis/sdk_in_container/package.py b/flytekit/clis/sdk_in_container/package.py index 38b216312d..932079eaaa 100644 --- a/flytekit/clis/sdk_in_container/package.py +++ b/flytekit/clis/sdk_in_container/package.py @@ -13,7 +13,7 @@ SerializationSettings, ) from flytekit.interaction.click_types import key_value_callback -from flytekit.tools.fast_registration import CopyFileDetection +from flytekit.tools.fast_registration import CopyFileDetection, FastPackageOptions from flytekit.tools.repo import NoSerializableEntitiesError, serialize_and_package @@ -66,6 +66,14 @@ help="[Beta] Specify how and whether to use fast register" " 'all' will behave as the current fast flag copying all files, 'auto' copies only loaded Python modules", ) +@click.option( + "--ls-files", + required=False, + is_flag=True, + default=False, + show_default=True, + help="List the files copied into the image (valid only for new --copy switch)", +) @click.option( "-f", "--force", @@ -114,6 +122,7 @@ def package( output, force, copy: typing.Optional[CopyFileDetection], + ls_files: bool, fast, in_container_source_path, python_interpreter, @@ -156,6 +165,9 @@ def package( display_help_with_error(ctx, "No packages to scan for flyte entities. 
Aborting!") try: - serialize_and_package(pkgs, serialization_settings, source, output, fast, deref_symlinks, copy_style=copy) + fast_options = FastPackageOptions([], copy_style=copy, ls_files=ls_files) + serialize_and_package( + pkgs, serialization_settings, source, output, fast, deref_symlinks, fast_options=fast_options + ) except NoSerializableEntitiesError: click.secho(f"No flyte objects found in packages {pkgs}", fg="yellow") diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 12ea050940..b6e87e90d1 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -109,6 +109,14 @@ help="[Beta] Specify how and whether to use fast register" " 'all' is the current behavior copying all files from root, 'auto' copies only loaded Python modules", ) +@click.option( + "--ls-files", + required=False, + is_flag=True, + default=False, + show_default=True, + help="List the files copied into the image (valid only for new --copy switch)", +) @click.option( "--dry-run", default=False, @@ -154,6 +162,7 @@ def register( deref_symlinks: bool, non_fast: bool, copy: typing.Optional[CopyFileDetection], + ls_files: bool, package_or_module: typing.Tuple[str], dry_run: bool, activate_launchplans: bool, @@ -225,4 +234,5 @@ def register( dry_run=dry_run, activate_launchplans=activate_launchplans, skip_errors=skip_errors, + ls_files=ls_files, ) diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index c26491fb60..193b22880b 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -105,6 +105,16 @@ class RunLevelParams(PyFlyteParams): " 'all' will behave as the current copy-all flag, 'auto' copies only loaded Python modules", ) ) + ls_files: bool = make_click_option_field( + click.Option( + param_decls=["--ls-files", "ls_files"], + required=False, + is_flag=True, + default=False, + show_default=True, + help="List 
the files copied into the image (valid only for new --copy switch)", + ) + ) image_config: ImageConfig = make_click_option_field( click.Option( param_decls=["-i", "--image", "image_config"], @@ -615,7 +625,9 @@ def _run(*args, **kwargs): image_config = patch_image_config(config_file, image_config) with context_manager.FlyteContextManager.with_context(remote.context.new_builder()): - fast_package_options = FastPackageOptions([], copy_style=run_level_params.copy) + fast_package_options = FastPackageOptions( + [], copy_style=run_level_params.copy, ls_files=run_level_params.ls_files + ) remote_entity = remote.register_script( entity, project=run_level_params.project, diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index d9de907d62..c945040cba 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -3,6 +3,7 @@ import gzip import hashlib import os +import pathlib import posixpath import subprocess import sys @@ -14,6 +15,8 @@ from typing import Optional import click +from rich import print as rich_print +from rich.tree import Tree from flytekit.core.context_manager import FlyteContextManager from flytekit.core.utils import timeit @@ -45,28 +48,33 @@ class FastPackageOptions: ignores: list[Ignore] keep_default_ignores: bool = True copy_style: Optional[CopyFileDetection] = None + ls_files: bool = False -""" -clarify tar behavior -- tar doesn't seem to add the folder, just the files, but extracts okay -- this doesn't work for empty folders (but edge case because gitignore ignores them anyways?) -changes to tar -- will now add files one at a time. -- the list will compute the list and the digest -- could have used tar list but seems less powerful, also does more than we want. 
-- in order to support auto, we have to load all the modules first, then trigger copying, then serialize post-upload -- hook up the actual commands args -- make each create a separate tar -- have a separate path for old and new commands -- finish deprecating serialize function -- update comment -- move compute of old digest and fname construction -- update click help -- add manual option checking - -- move prints to debug, add click -""" +def print_ls_tree(source: os.PathLike, ls: typing.List[str]): + click.secho("Files to be copied for fast registration...", fg="bright_blue") + fff = [] + + tree_root = Tree( + f":open_file_folder: [link file://{source}]{source} (detected source root)", + guide_style="bold bright_blue", + ) + trees = {pathlib.Path(source): tree_root} + + for f in ls: + rpath = os.path.relpath(f, start=source) + fff.append(rpath) + fpp = pathlib.Path(f) + if fpp.parent not in trees: + # add trees for all intermediate folders + current = tree_root + current_path = pathlib.Path(source) + for subdir in fpp.parent.relative_to(source).parts: + current = current.add(f"{subdir}", guide_style="bold bright_blue") + current_path = current_path / subdir + trees[current_path] = current + trees[fpp.parent].add(f"{fpp.name}", guide_style="bold bright_blue") + rich_print(tree_root) def fast_package( @@ -94,6 +102,7 @@ def fast_package( ignores = default_ignores ignore = IgnoreGroup(source, ignores) + # Remove this after original tar command is removed. digest = compute_digest(source, ignore.is_ignored) # This function is temporarily split into two, to support the creation of the tar file in both the old way, @@ -109,8 +118,12 @@ def fast_package( # This triggers listing of all files, mimicking the old way of creating the tar file. ls, ls_digest = ls_files(str(source), [], deref_symlinks, ignore) - print(f"Digest check: old {digest} ==? 
new {ls_digest} -- {digest == ls_digest}") + logger.debug(f"Hash digest: {ls_digest}", fg="green") + + if options.ls_files: + print_ls_tree(source, ls) + # print(f"Digest check: old {digest} ==? new {ls_digest} -- {digest == ls_digest}") # Compute where the archive should be written archive_fname = f"{FAST_PREFIX}{ls_digest}{FAST_FILEENDING}" if output_dir is None: @@ -120,7 +133,6 @@ def fast_package( with tempfile.TemporaryDirectory() as tmp_dir: tar_path = os.path.join(tmp_dir, "tmp.tar") - print(f"test tmp dir: {tar_path=}") with tarfile.open(tar_path, "w", dereference=True) as tar: for ws_file in ls: rel_path = os.path.relpath(ws_file, start=source) @@ -129,8 +141,6 @@ def fast_package( arcname=rel_path, filter=lambda x: tar_strip_file_attributes(x), ) - print("New tar file contents: ======================") - tar.list(verbose=True) with gzip.GzipFile(filename=archive_fname, mode="wb", mtime=0) as gzipped: with open(tar_path, "rb") as tar_file: @@ -155,8 +165,7 @@ def fast_package( arcname=ws_file, filter=lambda x: ignore.tar_filter(tar_strip_file_attributes(x)), ) - print("Old tar file contents: ======================") - tar.list(verbose=True) + # tar.list(verbose=True) with gzip.GzipFile(filename=archive_fname, mode="wb", mtime=0) as gzipped: with open(tar_path, "rb") as tar_file: diff --git a/flytekit/tools/repo.py b/flytekit/tools/repo.py index 8d4d70704c..ca212841cb 100644 --- a/flytekit/tools/repo.py +++ b/flytekit/tools/repo.py @@ -90,7 +90,7 @@ def package( output: str = "./flyte-package.tgz", fast: bool = False, deref_symlinks: bool = False, - copy_style: typing.Optional[fast_registration.CopyFileDetection] = None, + fast_options: typing.Optional[fast_registration.FastPackageOptions] = None, ): """ Package the given entities and the source code (if fast is enabled) into a package with the given name in output @@ -99,7 +99,7 @@ def package( :param output: output package name with suffix :param fast: fast enabled implies source code is bundled :param 
deref_symlinks: if enabled then symlinks are dereferenced during packaging - :param copy_style: + :param fast_options: Temporarily, for fast register, specify both the fast arg as well as copy_style fast == True with copy_style == None means use the old fast register tar'ring method. @@ -117,9 +117,7 @@ def package( if os.path.abspath(output).startswith(os.path.abspath(source)) and os.path.exists(output): click.secho(f"{output} already exists within {source}, deleting and re-creating it", fg="yellow") os.remove(output) - archive_fname = fast_registration.fast_package( - source, output_tmpdir, deref_symlinks, fast_registration.FastPackageOptions([], copy_style=copy_style) - ) + archive_fname = fast_registration.fast_package(source, output_tmpdir, deref_symlinks, options=fast_options) click.secho(f"Fast mode enabled: compressed archive {archive_fname}", dim=True) with tarfile.open(output, "w:gz") as tar: @@ -138,7 +136,7 @@ def serialize_and_package( fast: bool = False, deref_symlinks: bool = False, options: typing.Optional[Options] = None, - copy_style: typing.Optional[fast_registration.CopyFileDetection] = None, + fast_options: typing.Optional[fast_registration.FastPackageOptions] = None, ): """ Fist serialize and then package all entities @@ -147,7 +145,7 @@ def serialize_and_package( """ serialize_load_only(pkgs, settings, source) serializable_entities = serialize_get_control_plane_entities(settings, source, options=options) - package(serializable_entities, source, output, fast, deref_symlinks, copy_style) + package(serializable_entities, source, output, fast, deref_symlinks, fast_options) def find_common_root( @@ -257,6 +255,7 @@ def register( dry_run: bool = False, activate_launchplans: bool = False, skip_errors: bool = False, + ls_files: bool = False, ): """ Temporarily, for fast register, specify both the fast arg as well as copy_style. 
@@ -301,7 +300,7 @@ def register( detected_root, deref_symlinks, output, - options=fast_registration.FastPackageOptions([], copy_style=copy_style), + options=fast_registration.FastPackageOptions([], copy_style=copy_style, ls_files=ls_files), ) # update serialization settings from fast register output fast_serialization_settings = FastSerializationSettings( From aa1b562367e1a0215793d05e28307dd973ce7290 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 19 Aug 2024 17:24:02 -0700 Subject: [PATCH 07/17] rename function Signed-off-by: Yee Hing Tong --- flytekit/tools/repo.py | 28 ++++++-------------------- tests/flytekit/unit/tools/test_repo.py | 4 ++-- 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/flytekit/tools/repo.py b/flytekit/tools/repo.py index ca212841cb..1acc9fba34 100644 --- a/flytekit/tools/repo.py +++ b/flytekit/tools/repo.py @@ -173,29 +173,18 @@ def find_common_root( return project_root -# to be renamed, get module names -def load_packages_and_modules( - ss: SerializationSettings, +def list_packages_and_modules( project_root: Path, pkgs_or_mods: typing.List[str], - options: typing.Optional[Options] = None, ) -> typing.List[str]: """ - The project root is added as the first entry to sys.path, and then all the specified packages and modules - given are loaded with all submodules. The reason for prepending the entry is to ensure that the name that - the various modules are loaded under are the fully-resolved name. - - For example, using flytesnacks cookbook, if you are in core/ and you call this function with - ``flyte_basics/hello_world.py control_flow/``, the ``hello_world`` module would be loaded - as ``core.flyte_basics.hello_world`` even though you're already in the core/ folder. + This is a helper function that returns the input list of python packages/modules as a dot delineated list + relative to the given project_root. 
- :param ss: :param project_root: :param pkgs_or_mods: - :param options: - :return: The common detected root path, the output of _find_project_root + :return: List of packages/modules, dot delineated. """ - ss.git_repo = _get_git_repo_url(project_root) pkgs_and_modules: typing.List[str] = [] for pm in pkgs_or_mods: p = Path(pm).resolve() @@ -211,10 +200,6 @@ def load_packages_and_modules( return pkgs_and_modules - # registrable_entities = serialize(pkgs_and_modules, ss, str(project_root), options) - # - # return registrable_entities - def secho(i: Identifier, state: str = "success", reason: str = None, op: str = "Registration"): state_ind = "[ ]" @@ -286,9 +271,8 @@ def register( # Load all the entities FlyteContextManager.push_context(remote.context) - pkgs_and_modules = load_packages_and_modules( # to be renamed - serialization_settings, detected_root, list(package_or_module), options - ) + serialization_settings.git_repo = _get_git_repo_url(str(detected_root)) + pkgs_and_modules = list_packages_and_modules(detected_root, list(package_or_module)) # NB: The change here is that the loading of user code _cannot_ depend on fast register information (the computed # version, upload native url, hash digest, etc.). 
diff --git a/tests/flytekit/unit/tools/test_repo.py b/tests/flytekit/unit/tools/test_repo.py index 8bb6bd773a..eefcaeb3be 100644 --- a/tests/flytekit/unit/tools/test_repo.py +++ b/tests/flytekit/unit/tools/test_repo.py @@ -7,7 +7,7 @@ import flytekit.configuration from flytekit.configuration import DefaultImages, ImageConfig -from flytekit.tools.repo import find_common_root, load_packages_and_modules +from flytekit.tools.repo import find_common_root, list_packages_and_modules task_text = """ from flytekit import task @@ -66,5 +66,5 @@ def test_module_loading(mock_entities, mock_entities_2): image_config=ImageConfig.auto(img_name=DefaultImages.default_image()), ) - x = load_packages_and_modules(serialization_settings, pathlib.Path(root), [bottom_level]) + x = list_packages_and_modules(pathlib.Path(root), [bottom_level]) assert len(x) == 1 From cde0ad77fd3f845b0ba8a408d74f9685d55d3c2f Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 20 Aug 2024 09:21:20 -0700 Subject: [PATCH 08/17] delete duplicate helpers Signed-off-by: Yee Hing Tong --- flytekit/clis/sdk_in_container/helpers.py | 1 + flytekit/tools/fast_registration.py | 16 +--------------- tests/flytekit/unit/cli/test_cli_helpers.py | 9 +++++++++ 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/flytekit/clis/sdk_in_container/helpers.py b/flytekit/clis/sdk_in_container/helpers.py index 766af2cfec..526ad0c341 100644 --- a/flytekit/clis/sdk_in_container/helpers.py +++ b/flytekit/clis/sdk_in_container/helpers.py @@ -65,6 +65,7 @@ def patch_image_config(config_file: Optional[str], image_config: ImageConfig) -> def parse_copy(ctx, param, value) -> Optional[CopyFileDetection]: + """Helper function to parse cmd line args into enum""" if value == "auto": copy_style = CopyFileDetection.LOADED_MODULES elif value == "all": diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index c945040cba..f23af77432 100644 --- a/flytekit/tools/fast_registration.py +++ 
b/flytekit/tools/fast_registration.py @@ -23,7 +23,7 @@ from flytekit.exceptions.user import FlyteDataNotFoundException from flytekit.loggers import logger from flytekit.tools.ignore import DockerIgnore, FlyteIgnore, GitIgnore, Ignore, IgnoreGroup, StandardIgnore -from flytekit.tools.script_mode import ls_files, tar_strip_file_attributes +from flytekit.tools.script_mode import _filehash_update, _pathhash_update, ls_files, tar_strip_file_attributes FAST_PREFIX = "fast" FAST_FILEENDING = ".tar.gz" @@ -202,20 +202,6 @@ def compute_digest(source: os.PathLike, filter: Optional[callable] = None) -> st return hasher.hexdigest() -def _filehash_update(path: os.PathLike, hasher: hashlib._Hash) -> None: - blocksize = 65536 - with open(path, "rb") as f: - bytes = f.read(blocksize) - while bytes: - hasher.update(bytes) - bytes = f.read(blocksize) - - -def _pathhash_update(path: os.PathLike, hasher: hashlib._Hash) -> None: - path_list = path.split(os.sep) - hasher.update("".join(path_list).encode("utf-8")) - - def get_additional_distribution_loc(remote_location: str, identifier: str) -> str: """ :param Text remote_location: diff --git a/tests/flytekit/unit/cli/test_cli_helpers.py b/tests/flytekit/unit/cli/test_cli_helpers.py index 455979943c..af0c63a312 100644 --- a/tests/flytekit/unit/cli/test_cli_helpers.py +++ b/tests/flytekit/unit/cli/test_cli_helpers.py @@ -1,3 +1,4 @@ +import mock import flyteidl.admin.launch_plan_pb2 as _launch_plan_pb2 import flyteidl.admin.task_pb2 as _task_pb2 import flyteidl.admin.workflow_pb2 as _workflow_pb2 @@ -8,6 +9,8 @@ from flytekit.clis import helpers from flytekit.clis.helpers import _hydrate_identifier, _hydrate_workflow_template_nodes, hydrate_registration_parameters +from flytekit.clis.sdk_in_container.helpers import parse_copy +from flytekit.tools.fast_registration import CopyFileDetection def test_parse_args_into_dict(): @@ -426,3 +429,9 @@ def test_hydrate_registration_parameters__subworkflows(): name="subworkflow", version="12345", ) + 
+ +def test_parse_copy(): + click_current_ctx = mock.MagicMock + assert parse_copy(click_current_ctx, None, "auto") == CopyFileDetection.LOADED_MODULES + assert parse_copy(click_current_ctx, None, "all") == CopyFileDetection.ALL From f4a5037ed5eaac9718f2befc30cf2b6724ae2ad8 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 20 Aug 2024 10:49:36 -0700 Subject: [PATCH 09/17] add a test for ls files Signed-off-by: Yee Hing Tong --- .../unit/cli/pyflyte/test_script_mode.py | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 tests/flytekit/unit/cli/pyflyte/test_script_mode.py diff --git a/tests/flytekit/unit/cli/pyflyte/test_script_mode.py b/tests/flytekit/unit/cli/pyflyte/test_script_mode.py new file mode 100644 index 0000000000..215432fa5e --- /dev/null +++ b/tests/flytekit/unit/cli/pyflyte/test_script_mode.py @@ -0,0 +1,40 @@ +import pathlib +import pytest +import tempfile + +from flytekit.tools.script_mode import ls_files + + +# a pytest fixture that creates a tmp directory and creates +# a small file structure in it +@pytest.fixture +def dummy_dir_structure(): + # Create a temporary directory + with tempfile.TemporaryDirectory() as tmp_path: + + # Create directories + tmp_path = pathlib.Path(tmp_path) + subdir1 = tmp_path / "subdir1" + subdir2 = tmp_path / "subdir2" + subdir1.mkdir() + subdir2.mkdir() + + # Create files in the root of the temporary directory + (tmp_path / "file1.txt").write_text("This is file 1") + (tmp_path / "file2.txt").write_text("This is file 2") + + # Create files in subdir1 + (subdir1 / "file3.txt").write_text("This is file 3 in subdir1") + (subdir1 / "file4.txt").write_text("This is file 4 in subdir1") + + # Create files in subdir2 + (subdir2 / "file5.txt").write_text("This is file 5 in subdir2") + + # Return the path to the temporary directory + yield tmp_path + + +def test_list_dir(dummy_dir_structure): + files, d = ls_files(str(dummy_dir_structure), []) + assert len(files) == 5 + assert d == 
"c092f1b85f7c6b2a71881a946c00a855" From bfb70f1fe4c06a6a3892db36c25292c65f067edd Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 20 Aug 2024 10:51:43 -0700 Subject: [PATCH 10/17] add a test Signed-off-by: Yee Hing Tong --- tests/flytekit/unit/cli/pyflyte/test_script_mode.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/flytekit/unit/cli/pyflyte/test_script_mode.py b/tests/flytekit/unit/cli/pyflyte/test_script_mode.py index 215432fa5e..70cfb9b13c 100644 --- a/tests/flytekit/unit/cli/pyflyte/test_script_mode.py +++ b/tests/flytekit/unit/cli/pyflyte/test_script_mode.py @@ -38,3 +38,11 @@ def test_list_dir(dummy_dir_structure): files, d = ls_files(str(dummy_dir_structure), []) assert len(files) == 5 assert d == "c092f1b85f7c6b2a71881a946c00a855" + + +def test_list_filtered_on_modules(dummy_dir_structure): + import sys # any module will do + files, d = ls_files(str(dummy_dir_structure), [sys]) + # because none of the files are python modules, nothing should be returned + assert len(files) == 0 + assert d == "d41d8cd98f00b204e9800998ecf8427e" From 438d67f2043761c0980aca1417bcca33fff31303 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 20 Aug 2024 11:22:42 -0700 Subject: [PATCH 11/17] exclude nt from digest check Signed-off-by: Yee Hing Tong --- tests/flytekit/unit/cli/pyflyte/test_script_mode.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/flytekit/unit/cli/pyflyte/test_script_mode.py b/tests/flytekit/unit/cli/pyflyte/test_script_mode.py index 70cfb9b13c..dcccda0cd2 100644 --- a/tests/flytekit/unit/cli/pyflyte/test_script_mode.py +++ b/tests/flytekit/unit/cli/pyflyte/test_script_mode.py @@ -1,3 +1,4 @@ +import os import pathlib import pytest import tempfile @@ -37,7 +38,8 @@ def dummy_dir_structure(): def test_list_dir(dummy_dir_structure): files, d = ls_files(str(dummy_dir_structure), []) assert len(files) == 5 - assert d == "c092f1b85f7c6b2a71881a946c00a855" + if os.name != "nt": + assert d == 
"c092f1b85f7c6b2a71881a946c00a855" def test_list_filtered_on_modules(dummy_dir_structure): @@ -45,4 +47,5 @@ def test_list_filtered_on_modules(dummy_dir_structure): files, d = ls_files(str(dummy_dir_structure), [sys]) # because none of the files are python modules, nothing should be returned assert len(files) == 0 - assert d == "d41d8cd98f00b204e9800998ecf8427e" + if os.name != "nt": + assert d == "d41d8cd98f00b204e9800998ecf8427e" From e6acc78d7aae30e152d067cad3a4da58e760fd63 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 20 Aug 2024 13:07:16 -0700 Subject: [PATCH 12/17] prints Signed-off-by: Yee Hing Tong --- flytekit/tools/ignore.py | 4 ---- flytekit/tools/script_mode.py | 1 - 2 files changed, 5 deletions(-) diff --git a/flytekit/tools/ignore.py b/flytekit/tools/ignore.py index 2761b79364..e41daf0904 100644 --- a/flytekit/tools/ignore.py +++ b/flytekit/tools/ignore.py @@ -54,19 +54,15 @@ def _list_ignored(self) -> Dict: return {} def _is_ignored(self, path: str) -> bool: - # print(f"\nTesting {path=}") if self.ignored: # git-ls-files uses POSIX paths if Path(path).as_posix() in self.ignored: - # print(f"Path {path} is ignored") return True # Ignore empty directories if os.path.isdir(os.path.join(self.root, path)) and all( [self.is_ignored(os.path.join(path, f)) for f in os.listdir(os.path.join(self.root, path))] ): - # print(f"Path {path} is ignored") return True - # print(f"Path {path} is NOT ignored") return False diff --git a/flytekit/tools/script_mode.py b/flytekit/tools/script_mode.py index b616b7ffe2..adbcd313f4 100644 --- a/flytekit/tools/script_mode.py +++ b/flytekit/tools/script_mode.py @@ -248,7 +248,6 @@ def add_imported_modules_from_source(source_path: str, destination: str, modules # identify a common root amongst the packages listed? 
files = list_imported_modules_as_files(source_path, modules) - print("files", files) for file in files: relative_path = os.path.relpath(file, start=source_path) new_destination = os.path.join(destination, relative_path) From 63c8943203e2a8591f95bb4fdafcfa33764888e6 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 21 Aug 2024 15:09:45 -0700 Subject: [PATCH 13/17] pr comments, first round Signed-off-by: Yee Hing Tong --- flytekit/clis/sdk_in_container/helpers.py | 2 +- flytekit/clis/sdk_in_container/package.py | 20 +++++++------------- flytekit/clis/sdk_in_container/register.py | 12 ++---------- flytekit/clis/sdk_in_container/run.py | 15 ++++----------- flytekit/tools/fast_registration.py | 14 ++++++-------- 5 files changed, 20 insertions(+), 43 deletions(-) diff --git a/flytekit/clis/sdk_in_container/helpers.py b/flytekit/clis/sdk_in_container/helpers.py index 526ad0c341..6ed5072c36 100644 --- a/flytekit/clis/sdk_in_container/helpers.py +++ b/flytekit/clis/sdk_in_container/helpers.py @@ -71,7 +71,7 @@ def parse_copy(ctx, param, value) -> Optional[CopyFileDetection]: elif value == "all": copy_style = CopyFileDetection.ALL elif value == "none": - copy_style = CopyFileDetection.TEMP_NO_COPY + copy_style = CopyFileDetection.NO_COPY else: copy_style = None diff --git a/flytekit/clis/sdk_in_container/package.py b/flytekit/clis/sdk_in_container/package.py index 932079eaaa..98c57c0b5a 100644 --- a/flytekit/clis/sdk_in_container/package.py +++ b/flytekit/clis/sdk_in_container/package.py @@ -63,16 +63,8 @@ type=click.Choice(["all", "auto", "none"], case_sensitive=False), default=None, # this will be changed to "none" after removing fast option callback=parse_copy, - help="[Beta] Specify how and whether to use fast register" - " 'all' will behave as the current fast flag copying all files, 'auto' copies only loaded Python modules", -) -@click.option( - "--ls-files", - required=False, - is_flag=True, - default=False, - show_default=True, - help="List the files copied 
into the image (valid only for new --copy switch)", + help="[Beta] Specify whether local files should be copied and uploaded so task containers have up-to-date code" + " 'all' will behave as the current 'fast' flag, copying all files, 'auto' copies only loaded Python modules", ) @click.option( "-f", @@ -122,7 +114,6 @@ def package( output, force, copy: typing.Optional[CopyFileDetection], - ls_files: bool, fast, in_container_source_path, python_interpreter, @@ -136,8 +127,8 @@ def package( object contains the WorkflowTemplate, along with the relevant tasks for that workflow. This serialization step will set the name of the tasks to the fully qualified name of the task function. """ - if copy == CopyFileDetection.TEMP_NO_COPY: - raise ValueError("--copy none doesn't need to be specified, package by default does not copy files") + if copy is not None and fast: + raise ValueError("--fast and --copy cannot be used together. Please use --copy instead.") elif copy == CopyFileDetection.ALL or copy == CopyFileDetection.LOADED_MODULES: # for those migrating, who only set --copy all/auto but don't have --fast set. fast = True @@ -165,6 +156,9 @@ def package( display_help_with_error(ctx, "No packages to scan for flyte entities. 
Aborting!") try: + # verbosity greater than 0 means to print the files + ls_files = ctx.obj[constants.CTX_VERBOSE] > 0 + fast_options = FastPackageOptions([], copy_style=copy, ls_files=ls_files) serialize_and_package( pkgs, serialization_settings, source, output, fast, deref_symlinks, fast_options=fast_options diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index b6e87e90d1..1def589f48 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -109,14 +109,6 @@ help="[Beta] Specify how and whether to use fast register" " 'all' is the current behavior copying all files from root, 'auto' copies only loaded Python modules", ) -@click.option( - "--ls-files", - required=False, - is_flag=True, - default=False, - show_default=True, - help="List the files copied into the image (valid only for new --copy switch)", -) @click.option( "--dry-run", default=False, @@ -162,7 +154,6 @@ def register( deref_symlinks: bool, non_fast: bool, copy: typing.Optional[CopyFileDetection], - ls_files: bool, package_or_module: typing.Tuple[str], dry_run: bool, activate_launchplans: bool, @@ -174,7 +165,7 @@ def register( """ # Error handling for non-fast/copy conflicts - if copy == CopyFileDetection.TEMP_NO_COPY: + if copy == CopyFileDetection.NO_COPY: non_fast = True # Set this to None because downstream logic currently detects None to mean old logic. 
copy = None @@ -184,6 +175,7 @@ def register( elif copy == CopyFileDetection.LOADED_MODULES: if non_fast: raise ValueError("Conflicting options: cannot specify both --non-fast and --copy auto") + ls_files = ctx.obj[constants.CTX_VERBOSE] > 0 pkgs = ctx.obj[constants.CTX_PACKAGES] if not pkgs: diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index 193b22880b..fa3eed896e 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -105,16 +105,6 @@ class RunLevelParams(PyFlyteParams): " 'all' will behave as the current copy-all flag, 'auto' copies only loaded Python modules", ) ) - ls_files: bool = make_click_option_field( - click.Option( - param_decls=["--ls-files", "ls_files"], - required=False, - is_flag=True, - default=False, - show_default=True, - help="List the files copied into the image (valid only for new --copy switch)", - ) - ) image_config: ImageConfig = make_click_option_field( click.Option( param_decls=["-i", "--image", "image_config"], @@ -625,8 +615,11 @@ def _run(*args, **kwargs): image_config = patch_image_config(config_file, image_config) with context_manager.FlyteContextManager.with_context(remote.context.new_builder()): + ls_files = run_level_params.verbose > 0 fast_package_options = FastPackageOptions( - [], copy_style=run_level_params.copy, ls_files=run_level_params.ls_files + [], + copy_style=run_level_params.copy, + ls_files=ls_files, ) remote_entity = remote.register_script( entity, diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index f23af77432..e4b5bb81dc 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -32,11 +32,12 @@ class CopyFileDetection(Enum): LOADED_MODULES = 1 ALL = 2 - # This is a temporary option to be removed in the future. In the future this value of the enum should simply - # be Python None. 
Here now to distinguish between users explicitly setting --copy none and not setting the flag. - # This is only used for register, not for package or run because run doesn't have a no-fast-register option and - # package is by default non-fast. - TEMP_NO_COPY = 3 + # This option's meaning will change in the future. In the future this will mean that no files should be copied + # (i.e. no fast registration is used). For now, both this value and setting this Enum to Python None are both + # valid to distinguish between users explicitly setting --copy none and not setting the flag. + # Currently, this is only used for register, not for package or run because run doesn't have a no-fast-register + # option and package is by default non-fast. + NO_COPY = 3 @dataclass(frozen=True) @@ -53,7 +54,6 @@ class FastPackageOptions: def print_ls_tree(source: os.PathLike, ls: typing.List[str]): click.secho("Files to be copied for fast registration...", fg="bright_blue") - fff = [] tree_root = Tree( f":open_file_folder: [link file://{source}]{source} (detected source root)", @@ -62,8 +62,6 @@ def print_ls_tree(source: os.PathLike, ls: typing.List[str]): trees = {pathlib.Path(source): tree_root} for f in ls: - rpath = os.path.relpath(f, start=source) - fff.append(rpath) fpp = pathlib.Path(f) if fpp.parent not in trees: # add trees for all intermediate folders From 303bbd55be152cc38b981fb734540b736a171e69 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 21 Aug 2024 15:12:03 -0700 Subject: [PATCH 14/17] add another warning Signed-off-by: Yee Hing Tong --- flytekit/clis/sdk_in_container/package.py | 2 +- flytekit/clis/sdk_in_container/register.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/flytekit/clis/sdk_in_container/package.py b/flytekit/clis/sdk_in_container/package.py index 98c57c0b5a..c087d38634 100644 --- a/flytekit/clis/sdk_in_container/package.py +++ b/flytekit/clis/sdk_in_container/package.py @@ -128,7 +128,7 @@ def package( This serialization 
step will set the name of the tasks to the fully qualified name of the task function. """ if copy is not None and fast: - raise ValueError("--fast and --copy cannot be used together. Please use --copy instead.") + raise ValueError("--fast and --copy cannot be used together. Please use --copy all instead.") elif copy == CopyFileDetection.ALL or copy == CopyFileDetection.LOADED_MODULES: # for those migrating, who only set --copy all/auto but don't have --fast set. fast = True diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 1def589f48..fc935171b2 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -163,6 +163,8 @@ def register( """ see help """ + if copy is not None and non_fast: + raise ValueError("--non-fast and --copy cannot be used together. Use --copy none instead.") # Error handling for non-fast/copy conflicts if copy == CopyFileDetection.NO_COPY: From c22119489a09ca55b3407eb1d8a547e3a1af23f3 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 26 Aug 2024 17:20:17 -0700 Subject: [PATCH 15/17] add help Signed-off-by: Yee Hing Tong --- flytekit/clis/sdk_in_container/register.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index fc935171b2..8b3b4e868c 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -107,7 +107,8 @@ default=None, # this will be changed to "all" after removing non-fast option callback=parse_copy, help="[Beta] Specify how and whether to use fast register" - " 'all' is the current behavior copying all files from root, 'auto' copies only loaded Python modules", + " 'all' is the current behavior copying all files from root, 'auto' copies only loaded Python modules" + " 'none' means no files are copied, i.e. 
don't use fast register", ) @click.option( "--dry-run", From a91f9a1abdf340e8993ed1a566c6fc919e2af8c2 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 30 Aug 2024 14:50:44 -0700 Subject: [PATCH 16/17] remove duplicate error checks, print Signed-off-by: Yee Hing Tong --- flytekit/clis/sdk_in_container/register.py | 8 +------- flytekit/tools/fast_registration.py | 1 - 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 8b3b4e868c..810c7a59e0 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -167,17 +167,11 @@ def register( if copy is not None and non_fast: raise ValueError("--non-fast and --copy cannot be used together. Use --copy none instead.") - # Error handling for non-fast/copy conflicts + # Handle the new case where the copy flag is used instead of non-fast if copy == CopyFileDetection.NO_COPY: non_fast = True # Set this to None because downstream logic currently detects None to mean old logic. copy = None - elif copy == CopyFileDetection.ALL: - if non_fast: - raise ValueError("Conflicting options: cannot specify both --non-fast and --copy all") - elif copy == CopyFileDetection.LOADED_MODULES: - if non_fast: - raise ValueError("Conflicting options: cannot specify both --non-fast and --copy auto") ls_files = ctx.obj[constants.CTX_VERBOSE] > 0 pkgs = ctx.obj[constants.CTX_PACKAGES] diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index e4b5bb81dc..a994563117 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -121,7 +121,6 @@ def fast_package( if options.ls_files: print_ls_tree(source, ls) - # print(f"Digest check: old {digest} ==? 
new {ls_digest} -- {digest == ls_digest}") # Compute where the archive should be written archive_fname = f"{FAST_PREFIX}{ls_digest}{FAST_FILEENDING}" if output_dir is None: From 750e4fbdc2035c4bf12365473f04292655fdf04b Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 30 Aug 2024 14:59:22 -0700 Subject: [PATCH 17/17] rename variable from ls_files to show_files but keeping function name Signed-off-by: Yee Hing Tong --- flytekit/clis/sdk_in_container/package.py | 4 ++-- flytekit/clis/sdk_in_container/register.py | 4 ++-- flytekit/clis/sdk_in_container/run.py | 4 ++-- flytekit/tools/fast_registration.py | 4 ++-- flytekit/tools/repo.py | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/flytekit/clis/sdk_in_container/package.py b/flytekit/clis/sdk_in_container/package.py index c087d38634..6decbc32e1 100644 --- a/flytekit/clis/sdk_in_container/package.py +++ b/flytekit/clis/sdk_in_container/package.py @@ -157,9 +157,9 @@ def package( try: # verbosity greater than 0 means to print the files - ls_files = ctx.obj[constants.CTX_VERBOSE] > 0 + show_files = ctx.obj[constants.CTX_VERBOSE] > 0 - fast_options = FastPackageOptions([], copy_style=copy, ls_files=ls_files) + fast_options = FastPackageOptions([], copy_style=copy, show_files=show_files) serialize_and_package( pkgs, serialization_settings, source, output, fast, deref_symlinks, fast_options=fast_options ) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 810c7a59e0..dfbbd23d00 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -172,7 +172,7 @@ def register( non_fast = True # Set this to None because downstream logic currently detects None to mean old logic. 
copy = None - ls_files = ctx.obj[constants.CTX_VERBOSE] > 0 + show_files = ctx.obj[constants.CTX_VERBOSE] > 0 pkgs = ctx.obj[constants.CTX_PACKAGES] if not pkgs: @@ -223,5 +223,5 @@ def register( dry_run=dry_run, activate_launchplans=activate_launchplans, skip_errors=skip_errors, - ls_files=ls_files, + show_files=show_files, ) diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index 1e6b9dbb4a..1ab04452ee 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -643,11 +643,11 @@ def _run(*args, **kwargs): image_config = patch_image_config(config_file, image_config) with context_manager.FlyteContextManager.with_context(remote.context.new_builder()): - ls_files = run_level_params.verbose > 0 + show_files = run_level_params.verbose > 0 fast_package_options = FastPackageOptions( [], copy_style=run_level_params.copy, - ls_files=ls_files, + show_files=show_files, ) remote_entity = remote.register_script( entity, diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index a994563117..a65d24a740 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -49,7 +49,7 @@ class FastPackageOptions: ignores: list[Ignore] keep_default_ignores: bool = True copy_style: Optional[CopyFileDetection] = None - ls_files: bool = False + show_files: bool = False def print_ls_tree(source: os.PathLike, ls: typing.List[str]): @@ -118,7 +118,7 @@ def fast_package( logger.debug(f"Hash digest: {ls_digest}", fg="green") - if options.ls_files: + if options.show_files: print_ls_tree(source, ls) # Compute where the archive should be written diff --git a/flytekit/tools/repo.py b/flytekit/tools/repo.py index 1acc9fba34..6160823920 100644 --- a/flytekit/tools/repo.py +++ b/flytekit/tools/repo.py @@ -240,7 +240,7 @@ def register( dry_run: bool = False, activate_launchplans: bool = False, skip_errors: bool = False, - ls_files: bool = False, + show_files: 
bool = False, ): """ Temporarily, for fast register, specify both the fast arg as well as copy_style. @@ -284,7 +284,7 @@ def register( detected_root, deref_symlinks, output, - options=fast_registration.FastPackageOptions([], copy_style=copy_style, ls_files=ls_files), + options=fast_registration.FastPackageOptions([], copy_style=copy_style, show_files=show_files), ) # update serialization settings from fast register output fast_serialization_settings = FastSerializationSettings(