Skip to content

Commit

Permalink
Merge pull request kedro-org#913 from quantumblacklabs/merge-master-t…
Browse files Browse the repository at this point in the history
…o-develop

Merge master into develop via merge-master-to-develop
  • Loading branch information
idanov authored Dec 10, 2020
2 parents e500f99 + f2265a3 commit bdfc68b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 11 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Deprecated `kedro.framework.context.load_context`, it will be removed in 0.18.0 release.
* Relax `pyspark` requirements to allow for installation of `pyspark` 3.0.
* Deprecated `kedro.framework.cli.get_project_context`, it will be removed in 0.18.0 release.
* Added a `--fs-args` option to the `kedro pipeline pull` command to specify a configuration file with arguments for the `fsspec` filesystem used when pulling modular pipelines from non-PyPI locations.

## Breaking changes to the API
* `kedro.io.DataCatalog.exists()` returns `False` when the dataset does not exist, as opposed to raising an exception.
Expand Down
15 changes: 15 additions & 0 deletions docs/source/06_nodes_and_pipelines/03_modular_pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ kedro pipeline pull https://<bucket_name>.s3.<aws-region>.amazonaws.com/<pipelin
kedro pipeline pull <pypi-package-name>
```

If you are pulling the pipeline from a location that isn't PyPI, Kedro uses [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to locate and pull down your pipeline. If you need to provide any `fsspec`-specific arguments (for example, if you're pulling your pipeline down from an S3 bucket and want to provide the S3 credentials inline, or from a local server that requires tokens in the header), then you can use the `--fs-args` option to point to a YAML file (or a file in any other `anyconfig`-supported configuration format) that contains the required configuration.

```bash
kedro pipeline pull https://<url-to-pipeline.whl> --fs-args pipeline_pull_args.yml
```

where

```yaml
# pipeline_pull_args.yml
client_kwargs:
  headers:
    Authorization: token <token>
```

## A modular pipeline example template

Here is an example of a modular pipeline which combines all of these concepts within a Kedro project:
Expand Down
37 changes: 26 additions & 11 deletions kedro/framework/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import tempfile
from pathlib import Path
from textwrap import indent
from typing import Any, List, NamedTuple, Tuple, Union
from typing import Any, List, NamedTuple, Optional, Tuple, Union
from zipfile import ZipFile

import click
Expand Down Expand Up @@ -233,9 +233,17 @@ def describe_pipeline(
callback=_check_pipeline_name,
help="Alternative name to unpackage under.",
)
@click.option(
"--fs-args",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True
),
default=None,
help="Location of a configuration file for the fsspec filesystem used to pull the package.",
)
@click.pass_obj # this will pass the metadata as first argument
def pull_package(
metadata: ProjectMetadata, package_path, env, alias, **kwargs
metadata: ProjectMetadata, package_path, env, alias, fs_args, **kwargs
): # pylint:disable=unused-argument
"""Pull a modular pipeline package, unpack it and install the files to corresponding
locations.
Expand All @@ -244,7 +252,7 @@ def pull_package(
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir).resolve()

_unpack_wheel(package_path, temp_dir_path)
_unpack_wheel(package_path, temp_dir_path, fs_args)

dist_info_file = list(temp_dir_path.glob("*.dist-info"))
if len(dist_info_file) != 1:
Expand All @@ -260,6 +268,19 @@ def pull_package(
_install_files(metadata, package_name, temp_dir_path, env, alias)


def _get_fsspec_filesystem(location: str, fs_args: Optional[str]):
    """Create the ``fsspec`` filesystem for the protocol of ``location``.

    Args:
        location: Path or URL of the package to pull; it is only used here
            to determine the protocol via ``get_protocol_and_path``.
        fs_args: Path to a configuration file loaded with ``anyconfig.load``;
            its contents are unpacked as keyword arguments into
            ``fsspec.filesystem``. When falsy (``None`` or an empty string),
            no extra keyword arguments are passed.

    Returns:
        The object returned by ``fsspec.filesystem`` for the protocol.
    """
    # pylint: disable=import-outside-toplevel
    import anyconfig
    import fsspec

    from kedro.io.core import get_protocol_and_path

    # Only the protocol is needed; the path component is discarded.
    protocol, _path = get_protocol_and_path(location)

    if fs_args:
        filesystem_kwargs = anyconfig.load(fs_args)
    else:
        filesystem_kwargs = {}

    return fsspec.filesystem(protocol, **filesystem_kwargs)


@pipeline.command("package")
@env_option(
help="Environment where the pipeline configuration lives. Defaults to `base`."
Expand Down Expand Up @@ -313,14 +334,8 @@ def _echo_deletion_warning(message: str, **paths: List[Path]):
click.echo(indent(paths_str, " " * 2))


def _unpack_wheel(location: str, destination: Path) -> None:
# pylint: disable=import-outside-toplevel
import fsspec

from kedro.io.core import get_protocol_and_path

protocol, _ = get_protocol_and_path(location)
filesystem = fsspec.filesystem(protocol)
def _unpack_wheel(location: str, destination: Path, fs_args: Optional[str]) -> None:
filesystem = _get_fsspec_filesystem(location, fs_args)

if location.endswith(".whl") and filesystem.exists(location):
with filesystem.open(location) as fs_file:
Expand Down
32 changes: 32 additions & 0 deletions tests/framework/cli/test_pipeline_packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from zipfile import ZipFile

import pytest
import yaml
from click import ClickException
from click.testing import CliRunner

Expand Down Expand Up @@ -466,6 +467,37 @@ def test_pull_local_whl_compare(
assert not filecmp.dircmp(test_path, test_dest).diff_files
assert source_params_config.read_bytes() == dest_params_config.read_bytes()

def test_pull_whl_fs_args(
    self, fake_project_cli, fake_repo_path, mocker, tmp_path, fake_metadata
):
    """
    Check that pulling a wheel with ``--fs-args`` forwards the contents of
    the configuration file to ``fsspec.filesystem`` as keyword arguments.
    """
    cli = fake_project_cli.cli

    # Create, package and then delete the pipeline through the CLI helpers.
    self.call_pipeline_create(cli, fake_metadata)
    self.call_pipeline_package(cli, fake_metadata)
    self.call_pipeline_delete(cli, fake_metadata)

    # Write the filesystem arguments to a YAML file for `--fs-args`.
    config_path = tmp_path / "fs_args_config.yml"
    config_path.write_text(
        yaml.dump({"fs_arg_1": 1, "fs_arg_2": {"fs_arg_2_nested_1": 2}})
    )
    filesystem_mock = mocker.patch("fsspec.filesystem")

    wheel_name = _get_wheel_name(name=PIPELINE_NAME, version="0.1")
    wheel_path = fake_repo_path / "src" / "dist" / wheel_name

    # The invocation result is intentionally not inspected; only the call
    # made to the patched `fsspec.filesystem` is asserted below.
    CliRunner().invoke(
        cli, ["pipeline", "pull", str(wheel_path), "--fs-args", str(config_path)]
    )

    filesystem_mock.assert_called_once_with(
        "file", fs_arg_1=1, fs_arg_2={"fs_arg_2_nested_1": 2}
    )

def test_pull_two_dist_info(
self, fake_project_cli, fake_repo_path, mocker, tmp_path, fake_metadata
):
Expand Down

0 comments on commit bdfc68b

Please sign in to comment.