Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Flytefile and Flytedirectory's copilot local execution work correctly #2887

26 changes: 24 additions & 2 deletions flytekit/core/container_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,44 @@ def _extract_command_key(self, cmd: str, **kwargs) -> Any:
return match.group(1)
return None

def _extract_path_command_key(self, cmd: str, input_data_dir: Optional[str]) -> Optional[str]:
"""
Extract the key from the path-like command using regex.
"""
import re

input_data_dir = input_data_dir or ""
input_regex = rf"{re.escape(input_data_dir)}/(.+)$"
match = re.match(input_regex, cmd)
if match:
return match.group(1)
return None

def _render_command_and_volume_binding(self, cmd: str, **kwargs) -> Tuple[str, Dict[str, Dict[str, str]]]:
"""
We support template-style references to inputs, e.g., "{{.inputs.infile}}".

For FlyteFile and FlyteDirectory commands, e.g., "/var/inputs/inputs", we extract the key from strings that begin with the specified `input_data_dir`.
"""
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

command = ""
volume_binding = {}
k = self._extract_command_key(cmd)
path_k = self._extract_path_command_key(cmd, self._input_data_dir)
k = path_k if path_k else self._extract_command_key(cmd)

if k:
input_val = kwargs.get(k)
if type(input_val) in [FlyteFile, FlyteDirectory]:
if not path_k:
raise AssertionError(
"FlyteFile and FlyteDirectory commands should not use the template syntax like this: {{.inputs.infile}}\n"
"Please use a path-like syntax, such as: /var/inputs/infile.\n"
"This requirement is due to how Flyte Propeller processes template syntax inputs."
)
local_flyte_file_or_dir_path = str(input_val)
remote_flyte_file_or_dir_path = os.path.join(self._input_data_dir, k.replace(".", "/")) # type: ignore
remote_flyte_file_or_dir_path = os.path.join(self._input_data_dir, k) # type: ignore
volume_binding[local_flyte_file_or_dir_path] = {
"bind": remote_flyte_file_or_dir_path,
"mode": "rw",
Expand Down
104 changes: 102 additions & 2 deletions tests/flytekit/unit/core/test_local_raw_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def test_flytefile_wf():
command=[
"python",
"write_flytefile.py",
"{{.inputs.inputs}}",
"/var/inputs/inputs",
"/var/outputs/out",
],
)
Expand All @@ -60,6 +60,54 @@ def flyte_file_io_wf() -> FlyteFile:
assert content == "This is flyte_file.txt file."


@pytest.mark.skipif(
sys.platform in ["darwin", "win32"],
reason="Skip if running on windows or macos due to CI Docker environment setup failure",
)
def test_flytefile_wrong_syntax():
client = docker.from_env()
path_to_dockerfile = "tests/flytekit/unit/core/"
dockerfile_name = "Dockerfile.raw_container"
client.images.build(path=path_to_dockerfile, dockerfile=dockerfile_name, tag="flytekit:rawcontainer")

flyte_file_io = ContainerTask(
name="flyte_file_io",
input_data_dir="/var/inputs",
output_data_dir="/var/outputs",
inputs=kwtypes(inputs=FlyteFile),
outputs=kwtypes(out=FlyteFile),
image="flytekit:rawcontainer",
command=[
"python",
"write_flytefile.py",
"{{.inputs.inputs}}",
"/var/outputs/out",
],
)

@task
def flyte_file_task() -> FlyteFile:
working_dir = flytekit.current_context().working_directory
write_file = os.path.join(working_dir, "flyte_file.txt")
with open(write_file, "w") as file:
file.write("This is flyte_file.txt file.")
return FlyteFile(path=write_file)

@workflow
def flyte_file_io_wf() -> FlyteFile:
ff = flyte_file_task()
return flyte_file_io(inputs=ff)

with pytest.raises(
AssertionError,
match=(
r"FlyteFile and FlyteDirectory commands should not use the template syntax like this: \{\{\.inputs\.infile\}\}\n"
r"Please use a path-like syntax, such as: /var/inputs/infile.\n"
r"This requirement is due to how Flyte Propeller processes template syntax inputs."
)
):
flyte_file_io_wf()

@pytest.mark.skipif(
sys.platform in ["darwin", "win32"],
reason="Skip if running on windows or macos due to CI Docker environment setup failure",
Expand All @@ -80,7 +128,7 @@ def test_flytedir_wf():
command=[
"python",
"write_flytedir.py",
"{{.inputs.inputs}}",
"/var/inputs/inputs",
"/var/outputs/out",
],
)
Expand Down Expand Up @@ -110,6 +158,58 @@ def flyte_dir_io_wf() -> FlyteDirectory:
assert content == "This is for flyte dir."


@pytest.mark.skipif(
sys.platform in ["darwin", "win32"],
reason="Skip if running on windows or macos due to CI Docker environment setup failure",
)
def test_flytedir_wrong_syntax():
client = docker.from_env()
path_to_dockerfile = "tests/flytekit/unit/core/"
dockerfile_name = "Dockerfile.raw_container"
client.images.build(path=path_to_dockerfile, dockerfile=dockerfile_name, tag="flytekit:rawcontainer")

flyte_dir_io = ContainerTask(
name="flyte_dir_io",
input_data_dir="/var/inputs",
output_data_dir="/var/outputs",
inputs=kwtypes(inputs=FlyteDirectory),
outputs=kwtypes(out=FlyteDirectory),
image="flytekit:rawcontainer",
command=[
"python",
"write_flytedir.py",
"{{.inputs.inputs}}",
"/var/outputs/out",
],
)

@task
def flyte_dir_task() -> FlyteDirectory:
working_dir = flytekit.current_context().working_directory
local_dir = Path(os.path.join(working_dir, "csv_files"))
local_dir.mkdir(exist_ok=True)
write_file = os.path.join(local_dir, "flyte_dir.txt")
with open(write_file, "w") as file:
file.write("This is for flyte dir.")

return FlyteDirectory(path=str(local_dir))

@workflow
def flyte_dir_io_wf() -> FlyteDirectory:
fd = flyte_dir_task()
return flyte_dir_io(inputs=fd)

with pytest.raises(
AssertionError,
match=(
r"FlyteFile and FlyteDirectory commands should not use the template syntax like this: \{\{\.inputs\.infile\}\}\n"
r"Please use a path-like syntax, such as: /var/inputs/infile.\n"
r"This requirement is due to how Flyte Propeller processes template syntax inputs."
)
):
flyte_dir_io_wf()


@pytest.mark.skipif(
sys.platform in ["darwin", "win32"],
reason="Skip if running on windows or macos due to CI Docker environment setup failure",
Expand Down
Loading