From d0e14a1dad4b851ad2a60a0c1a8201493f3d931c Mon Sep 17 00:00:00 2001
From: Alexey Volkov <alexey.volkov@ark-kun.com>
Date: Wed, 19 May 2021 21:05:44 -0700
Subject: [PATCH 1/2] Components - Added HuggingFace dataset components

---
 .../HuggingFace/Load_dataset/component.py     | 29 +++++++
 .../HuggingFace/Load_dataset/component.yaml   | 81 +++++++++++++++++++
 .../HuggingFace/Split_dataset/component.py    | 35 ++++++++
 .../HuggingFace/Split_dataset/component.yaml  | 80 ++++++++++++++++++
 4 files changed, 225 insertions(+)
 create mode 100644 components/datasets/HuggingFace/Load_dataset/component.py
 create mode 100644 components/datasets/HuggingFace/Load_dataset/component.yaml
 create mode 100644 components/datasets/HuggingFace/Split_dataset/component.py
 create mode 100644 components/datasets/HuggingFace/Split_dataset/component.yaml

diff --git a/components/datasets/HuggingFace/Load_dataset/component.py b/components/datasets/HuggingFace/Load_dataset/component.py
new file mode 100644
index 00000000000..1419b90bc03
--- /dev/null
+++ b/components/datasets/HuggingFace/Load_dataset/component.py
@@ -0,0 +1,29 @@
+from typing import NamedTuple
+
+from kfp.components import create_component_from_func, OutputPath
+
+
+def load_dataset_using_huggingface(
+    dataset_name: str,
+    dataset_dict_path: OutputPath('HuggingFaceDatasetDict'),
+) -> NamedTuple('Outputs', [
+    ('splits', list),
+]):
+    from datasets import load_dataset
+
+    dataset_dict = load_dataset(dataset_name)
+    dataset_dict.save_to_disk(dataset_dict_path)
+    splits = list(dataset_dict.keys())
+    return (splits,)
+
+
+if __name__ == '__main__':
+    load_dataset_op = create_component_from_func(
+        load_dataset_using_huggingface,
+        base_image='python:3.9',
+        packages_to_install=['datasets==1.6.2'],
+        annotations={
+            'author': 'Alexey Volkov <alexey.volkov@ark-kun.com>',
+        },
+        output_component_file='component.yaml',
+    )
diff --git a/components/datasets/HuggingFace/Load_dataset/component.yaml b/components/datasets/HuggingFace/Load_dataset/component.yaml
new file mode 100644
index 00000000000..0f90d7e0408
--- /dev/null
+++ b/components/datasets/HuggingFace/Load_dataset/component.yaml
@@ -0,0 +1,81 @@
+name: Load dataset using huggingface
+metadata:
+  annotations: {author: Alexey Volkov <alexey.volkov@ark-kun.com>}
+inputs:
+- {name: dataset_name, type: String}
+outputs:
+- {name: dataset_dict, type: HuggingFaceDatasetDict}
+- {name: splits, type: JsonArray}
+implementation:
+  container:
+    image: python:3.9
+    command:
+    - sh
+    - -c
+    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
+      'datasets==1.6.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
+      --quiet --no-warn-script-location 'datasets==1.6.2' --user) && "$0" "$@"
+    - sh
+    - -ec
+    - |
+      program_path=$(mktemp)
+      printf "%s" "$0" > "$program_path"
+      python3 -u "$program_path" "$@"
+    - |
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      def load_dataset_using_huggingface(
+          dataset_name,
+          dataset_dict_path,
+      ):
+          from datasets import load_dataset
+
+          dataset_dict = load_dataset(dataset_name)
+          dataset_dict.save_to_disk(dataset_dict_path)
+          splits = list(dataset_dict.keys())
+          return (splits,)
+
+      def _serialize_json(obj) -> str:
+          if isinstance(obj, str):
+              return obj
+          import json
+          def default_serializer(obj):
+              if hasattr(obj, 'to_struct'):
+                  return obj.to_struct()
+              else:
+                  raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__)
+          return json.dumps(obj, default=default_serializer, sort_keys=True)
+
+      import argparse
+      _parser = argparse.ArgumentParser(prog='Load dataset using huggingface', description='')
+      _parser.add_argument("--dataset-name", dest="dataset_name", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--dataset-dict", dest="dataset_dict_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
+      _parsed_args = vars(_parser.parse_args())
+      _output_files = _parsed_args.pop("_output_paths", [])
+
+      _outputs = load_dataset_using_huggingface(**_parsed_args)
+
+      _output_serializers = [
+          _serialize_json,
+
+      ]
+
+      import os
+      for idx, output_file in enumerate(_output_files):
+          try:
+              os.makedirs(os.path.dirname(output_file))
+          except OSError:
+              pass
+          with open(output_file, 'w') as f:
+              f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --dataset-name
+    - {inputValue: dataset_name}
+    - --dataset-dict
+    - {outputPath: dataset_dict}
+    - '----output-paths'
+    - {outputPath: splits}
diff --git a/components/datasets/HuggingFace/Split_dataset/component.py b/components/datasets/HuggingFace/Split_dataset/component.py
new file mode 100644
index 00000000000..fb932da9593
--- /dev/null
+++ b/components/datasets/HuggingFace/Split_dataset/component.py
@@ -0,0 +1,35 @@
+from kfp.components import create_component_from_func, InputPath, OutputPath
+
+
+def split_dataset_huggingface(
+    dataset_dict_path: InputPath('HuggingFaceDatasetDict'),
+    dataset_split_path: OutputPath('HuggingFaceDataset'),
+    dataset_path: OutputPath('HuggingFaceArrowDataset'),
+    # dataset_indices_path: OutputPath('HuggingFaceArrowDataset'),
+    dataset_info_path: OutputPath(dict),
+    dataset_state_path: OutputPath(dict),
+    split_name: str = None,
+):
+    import os
+    import shutil
+    from datasets import config as datasets_config
+
+    print(f'DatasetDict contents: {os.listdir(dataset_dict_path)}')
+    shutil.copytree(os.path.join(dataset_dict_path, split_name), dataset_split_path)
+    print(f'Dataset contents: {os.listdir(os.path.join(dataset_dict_path, split_name))}')
+    shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_ARROW_FILENAME), dataset_path)
+    # shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_INDICES_FILENAME), dataset_indices_path)
+    shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_INFO_FILENAME), dataset_info_path)
+    shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_STATE_JSON_FILENAME), dataset_state_path)
+
+
+if __name__ == '__main__':
+    split_dataset_op = create_component_from_func(
+        split_dataset_huggingface,
+        base_image='python:3.9',
+        packages_to_install=['datasets==1.6.2'],
+        annotations={
+            'author': 'Alexey Volkov <alexey.volkov@ark-kun.com>',
+        },
+        output_component_file='component.yaml',
+    )
diff --git a/components/datasets/HuggingFace/Split_dataset/component.yaml b/components/datasets/HuggingFace/Split_dataset/component.yaml
new file mode 100644
index 00000000000..87769e12ae9
--- /dev/null
+++ b/components/datasets/HuggingFace/Split_dataset/component.yaml
@@ -0,0 +1,80 @@
+name: Split dataset huggingface
+metadata:
+  annotations: {author: Alexey Volkov <alexey.volkov@ark-kun.com>}
+inputs:
+- {name: dataset_dict, type: HuggingFaceDatasetDict}
+- {name: split_name, type: String, optional: true}
+outputs:
+- {name: dataset_split, type: HuggingFaceDataset}
+- {name: dataset, type: HuggingFaceArrowDataset}
+- {name: dataset_info, type: JsonObject}
+- {name: dataset_state, type: JsonObject}
+implementation:
+  container:
+    image: python:3.9
+    command:
+    - sh
+    - -c
+    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
+      'datasets==1.6.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
+      --quiet --no-warn-script-location 'datasets==1.6.2' --user) && "$0" "$@"
+    - sh
+    - -ec
+    - |
+      program_path=$(mktemp)
+      printf "%s" "$0" > "$program_path"
+      python3 -u "$program_path" "$@"
+    - |
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      def split_dataset_huggingface(
+          dataset_dict_path,
+          dataset_split_path,
+          dataset_path,
+          # dataset_indices_path: OutputPath('HuggingFaceArrowDataset'),
+          dataset_info_path,
+          dataset_state_path,
+          split_name = None,
+      ):
+          import os
+          import shutil
+          from datasets import config as datasets_config
+
+          print(f'DatasetDict contents: {os.listdir(dataset_dict_path)}')
+          shutil.copytree(os.path.join(dataset_dict_path, split_name), dataset_split_path)
+          print(f'Dataset contents: {os.listdir(os.path.join(dataset_dict_path, split_name))}')
+          shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_ARROW_FILENAME), dataset_path)
+          # shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_INDICES_FILENAME), dataset_indices_path)
+          shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_INFO_FILENAME), dataset_info_path)
+          shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_STATE_JSON_FILENAME), dataset_state_path)
+
+      import argparse
+      _parser = argparse.ArgumentParser(prog='Split dataset huggingface', description='')
+      _parser.add_argument("--dataset-dict", dest="dataset_dict_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--split-name", dest="split_name", type=str, required=False, default=argparse.SUPPRESS)
+      _parser.add_argument("--dataset-split", dest="dataset_split_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--dataset", dest="dataset_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--dataset-info", dest="dataset_info_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--dataset-state", dest="dataset_state_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parsed_args = vars(_parser.parse_args())
+
+      _outputs = split_dataset_huggingface(**_parsed_args)
+    args:
+    - --dataset-dict
+    - {inputPath: dataset_dict}
+    - if:
+        cond: {isPresent: split_name}
+        then:
+        - --split-name
+        - {inputValue: split_name}
+    - --dataset-split
+    - {outputPath: dataset_split}
+    - --dataset
+    - {outputPath: dataset}
+    - --dataset-info
+    - {outputPath: dataset_info}
+    - --dataset-state
+    - {outputPath: dataset_state}

From 49b544adfcecb6750dc7cd3ec02e3cb74da3506f Mon Sep 17 00:00:00 2001
From: Alexey Volkov <alexey.volkov@ark-kun.com>
Date: Wed, 19 May 2021 21:07:19 -0700
Subject: [PATCH 2/2] Added a sample pipeline

---
 .../HuggingFace/_samples/sample.pipeline.py   | 24 +++++++++++++++++++
 1 file changed, 24 insertions(+)
 create mode 100644 components/datasets/HuggingFace/_samples/sample.pipeline.py

diff --git a/components/datasets/HuggingFace/_samples/sample.pipeline.py b/components/datasets/HuggingFace/_samples/sample.pipeline.py
new file mode 100644
index 00000000000..bf6ced5b7c8
--- /dev/null
+++ b/components/datasets/HuggingFace/_samples/sample.pipeline.py
@@ -0,0 +1,24 @@
+from kfp import components
+from kfp import dsl
+
+
+load_dataset_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d0e14a1dad4b851ad2a60a0c1a8201493f3d931c/components/datasets/HuggingFace/Load_dataset/component.yaml')
+split_dataset_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d0e14a1dad4b851ad2a60a0c1a8201493f3d931c/components/datasets/HuggingFace/Split_dataset/component.yaml')
+
+
+def huggingface_pipeline():
+    dataset_dict_task = load_dataset_op(dataset_name='imdb')
+    with dsl.ParallelFor(dataset_dict_task.outputs['splits']) as split_name:
+        dataset_task = split_dataset_op(
+            dataset_dict=dataset_dict_task.outputs['dataset_dict'],
+            split_name=split_name,
+        )
+
+
+if __name__ == '__main__':
+    import kfp
+    kfp_endpoint = None
+    kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
+        huggingface_pipeline,
+        arguments={}
+    )