diff --git a/RELEASE.md b/RELEASE.md index 17d36cdb12..b6bfca49fe 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -20,6 +20,7 @@ * Deprecated `kedro.framework.context.load_context`, it will be removed in 0.18.0 release. * Relax `pyspark` requirements to allow for installation of `pyspark` 3.0. * Deprecated `kedro.framework.cli.get_project_context`, it will be removed in 0.18.0 release. +* Added a `--fs-args` option to the `kedro pipeline pull` command to specify a configuration file with arguments for the `fsspec` filesystem used when pulling modular pipelines from non-PyPI locations. ## Breaking changes to the API * `kedro.io.DataCatalog.exists()` returns `False` when the dataset does not exist, as opposed to raising an exception. diff --git a/docs/source/06_nodes_and_pipelines/03_modular_pipelines.md b/docs/source/06_nodes_and_pipelines/03_modular_pipelines.md index 3acff3fbf9..2ca188a9cd 100644 --- a/docs/source/06_nodes_and_pipelines/03_modular_pipelines.md +++ b/docs/source/06_nodes_and_pipelines/03_modular_pipelines.md @@ -160,6 +160,21 @@ kedro pipeline pull https://.s3..amazonaws.com/ ``` +If you are pulling the pipeline from a location that isn't PyPI, Kedro uses [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to locate and pull down your pipeline. If you need to provide any `fsspec`-specific arguments (say, if you're pulling your pipeline down from an S3 bucket and want to provide the S3 credentials inline, or from a local server that requires tokens in the header), you can use the `--fs-args` option to point to a YAML file (or a file in any other `anyconfig`-supported format) that contains the required configuration. 
+ +```bash +kedro pipeline pull https://<url-to-pipeline.whl> --fs-args pipeline_pull_args.yml +``` + +where + +``` +# pipeline_pull_args.yml +client_kwargs: + headers: + Authorization: token <token> +``` + ## A modular pipeline example template Here is an example of a modular pipeline which combines all of these concepts within a Kedro project: diff --git a/kedro/framework/cli/pipeline.py b/kedro/framework/cli/pipeline.py index e1e3f5913c..2f6eee5150 100644 --- a/kedro/framework/cli/pipeline.py +++ b/kedro/framework/cli/pipeline.py @@ -33,7 +33,7 @@ import tempfile from pathlib import Path from textwrap import indent -from typing import Any, List, NamedTuple, Tuple, Union +from typing import Any, List, NamedTuple, Optional, Tuple, Union from zipfile import ZipFile import click @@ -233,9 +233,17 @@ def describe_pipeline( callback=_check_pipeline_name, help="Alternative name to unpackage under.", ) +@click.option( + "--fs-args", + type=click.Path( + exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True + ), + default=None, + help="Location of a configuration file for the fsspec filesystem used to pull the package.", +) @click.pass_obj # this will pass the metadata as first argument def pull_package( - metadata: ProjectMetadata, package_path, env, alias, **kwargs + metadata: ProjectMetadata, package_path, env, alias, fs_args, **kwargs ): # pylint:disable=unused-argument """Pull a modular pipeline package, unpack it and install the files to corresponding locations. 
@@ -244,7 +252,7 @@ def pull_package( with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir).resolve() - _unpack_wheel(package_path, temp_dir_path) + _unpack_wheel(package_path, temp_dir_path, fs_args) dist_info_file = list(temp_dir_path.glob("*.dist-info")) if len(dist_info_file) != 1: @@ -260,6 +268,19 @@ def pull_package( _install_files(metadata, package_name, temp_dir_path, env, alias) +def _get_fsspec_filesystem(location: str, fs_args: Optional[str]): + # pylint: disable=import-outside-toplevel + import anyconfig + import fsspec + + from kedro.io.core import get_protocol_and_path + + protocol, _ = get_protocol_and_path(location) + fs_args_config = anyconfig.load(fs_args) if fs_args else {} + + return fsspec.filesystem(protocol, **fs_args_config) + + @pipeline.command("package") @env_option( help="Environment where the pipeline configuration lives. Defaults to `base`." @@ -313,14 +334,8 @@ def _echo_deletion_warning(message: str, **paths: List[Path]): click.echo(indent(paths_str, " " * 2)) -def _unpack_wheel(location: str, destination: Path) -> None: - # pylint: disable=import-outside-toplevel - import fsspec - - from kedro.io.core import get_protocol_and_path - - protocol, _ = get_protocol_and_path(location) - filesystem = fsspec.filesystem(protocol) +def _unpack_wheel(location: str, destination: Path, fs_args: Optional[str]) -> None: + filesystem = _get_fsspec_filesystem(location, fs_args) if location.endswith(".whl") and filesystem.exists(location): with filesystem.open(location) as fs_file: diff --git a/tests/framework/cli/test_pipeline_packaging.py b/tests/framework/cli/test_pipeline_packaging.py index 59987bdbd0..e1358f6320 100644 --- a/tests/framework/cli/test_pipeline_packaging.py +++ b/tests/framework/cli/test_pipeline_packaging.py @@ -30,6 +30,7 @@ from zipfile import ZipFile import pytest +import yaml from click import ClickException from click.testing import CliRunner @@ -466,6 +467,37 @@ def test_pull_local_whl_compare( 
assert not filecmp.dircmp(test_path, test_dest).diff_files assert source_params_config.read_bytes() == dest_params_config.read_bytes() + def test_pull_whl_fs_args( + self, fake_project_cli, fake_repo_path, mocker, tmp_path, fake_metadata + ): + """ + Test for pulling a wheel file with custom fs_args specified. + """ + self.call_pipeline_create(fake_project_cli.cli, fake_metadata) + self.call_pipeline_package(fake_project_cli.cli, fake_metadata) + self.call_pipeline_delete(fake_project_cli.cli, fake_metadata) + + fs_args_config = tmp_path / "fs_args_config.yml" + with fs_args_config.open(mode="w") as f: + yaml.dump({"fs_arg_1": 1, "fs_arg_2": {"fs_arg_2_nested_1": 2}}, f) + mocked_filesystem = mocker.patch("fsspec.filesystem") + + wheel_file = ( + fake_repo_path + / "src" + / "dist" + / _get_wheel_name(name=PIPELINE_NAME, version="0.1") + ) + + options = ["--fs-args", str(fs_args_config)] + CliRunner().invoke( + fake_project_cli.cli, ["pipeline", "pull", str(wheel_file), *options] + ) + + mocked_filesystem.assert_called_once_with( + "file", fs_arg_1=1, fs_arg_2=dict(fs_arg_2_nested_1=2) + ) + def test_pull_two_dist_info( self, fake_project_cli, fake_repo_path, mocker, tmp_path, fake_metadata ):