feat(components): Added HuggingFace dataset components #5707

Merged
29 changes: 29 additions & 0 deletions components/datasets/HuggingFace/Load_dataset/component.py
@@ -0,0 +1,29 @@
from typing import NamedTuple

from kfp.components import create_component_from_func, OutputPath


def load_dataset_using_huggingface(
    dataset_name: str,
    dataset_dict_path: OutputPath('HuggingFaceDatasetDict'),
) -> NamedTuple('Outputs', [
    ('splits', list),
]):
    from datasets import load_dataset

    dataset_dict = load_dataset(dataset_name)
    dataset_dict.save_to_disk(dataset_dict_path)
    splits = list(dataset_dict.keys())
    return (splits,)


if __name__ == '__main__':
    load_dataset_op = create_component_from_func(
        load_dataset_using_huggingface,
        base_image='python:3.9',
        packages_to_install=['datasets==1.6.2'],
        annotations={
            'author': 'Alexey Volkov <[email protected]>',
        },
        output_component_file='component.yaml',
    )
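Once create_component_from_func has written component.yaml, the component can be loaded back and inspected before being wired into a pipeline. A minimal sketch, assuming the KFP v1 SDK and that the generated component.yaml sits in the working directory (the file path and printed values are illustrative):

from kfp import components

# Load the component definition generated above (path is illustrative).
load_dataset_op = components.load_component_from_file('component.yaml')

# The returned task factory carries the parsed component spec.
print(load_dataset_op.component_spec.name)                             # Load dataset using huggingface
print([inp.name for inp in load_dataset_op.component_spec.inputs])     # ['dataset_name']
print([out.name for out in load_dataset_op.component_spec.outputs])    # ['dataset_dict', 'splits']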
81 changes: 81 additions & 0 deletions components/datasets/HuggingFace/Load_dataset/component.yaml
@@ -0,0 +1,81 @@
name: Load dataset using huggingface
metadata:
  annotations: {author: Alexey Volkov <[email protected]>}
inputs:
- {name: dataset_name, type: String}
outputs:
- {name: dataset_dict, type: HuggingFaceDatasetDict}
- {name: splits, type: JsonArray}
implementation:
  container:
    image: python:3.9
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'datasets==1.6.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
      --quiet --no-warn-script-location 'datasets==1.6.2' --user) && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def load_dataset_using_huggingface(
          dataset_name,
          dataset_dict_path,
      ):
          from datasets import load_dataset

          dataset_dict = load_dataset(dataset_name)
          dataset_dict.save_to_disk(dataset_dict_path)
          splits = list(dataset_dict.keys())
          return (splits,)

      def _serialize_json(obj) -> str:
          if isinstance(obj, str):
              return obj
          import json
          def default_serializer(obj):
              if hasattr(obj, 'to_struct'):
                  return obj.to_struct()
              else:
                  raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__)
          return json.dumps(obj, default=default_serializer, sort_keys=True)

      import argparse
      _parser = argparse.ArgumentParser(prog='Load dataset using huggingface', description='')
      _parser.add_argument("--dataset-name", dest="dataset_name", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--dataset-dict", dest="dataset_dict_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = load_dataset_using_huggingface(**_parsed_args)

      _output_serializers = [
          _serialize_json,

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --dataset-name
    - {inputValue: dataset_name}
    - --dataset-dict
    - {outputPath: dataset_dict}
    - '----output-paths'
Contributor

This looks a little awkward. What is it?

Contributor Author

@Ark-kun Jul 15, 2021

All lightweight Python components created from functions with return values have this. https://github.com/search?q=%22----output-paths%22&type=code

The paths for all outputs are passed as a single list after the ----output-paths command-line flag. The ---- prefix was chosen to prevent collisions with the user-defined input/output names.
We could make the generated command line prettier, but the generated parsing code would be more complicated. (A sketch of how the flag is parsed follows this file's diff.)

    - {outputPath: splits}
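To make the comment above concrete, here is a minimal sketch (not the generated code verbatim; the output paths are illustrative placeholders) of how the resolved command line for this component is parsed. All output files that need serialization arrive as a single list after ----output-paths:

import argparse

parser = argparse.ArgumentParser(prog='Load dataset using huggingface')
parser.add_argument("--dataset-name", dest="dataset_name", type=str, required=True)
parser.add_argument("--dataset-dict", dest="dataset_dict_path", type=str, required=True)
# The '----' prefix cannot collide with '--<name>' flags derived from user-defined inputs/outputs.
parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)

# Hypothetical resolved command line for this component:
args = parser.parse_args([
    '--dataset-name', 'imdb',
    '--dataset-dict', '/tmp/outputs/dataset_dict/data',
    '----output-paths', '/tmp/outputs/splits/data',
])
print(args._output_paths)  # ['/tmp/outputs/splits/data']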
35 changes: 35 additions & 0 deletions components/datasets/HuggingFace/Split_dataset/component.py
@@ -0,0 +1,35 @@
from kfp.components import create_component_from_func, InputPath, OutputPath


def split_dataset_huggingface(
    dataset_dict_path: InputPath('HuggingFaceDatasetDict'),
    dataset_split_path: OutputPath('HuggingFaceDataset'),
    dataset_path: OutputPath('HuggingFaceArrowDataset'),
    # dataset_indices_path: OutputPath('HuggingFaceArrowDataset'),
    dataset_info_path: OutputPath(dict),
    dataset_state_path: OutputPath(dict),
    split_name: str = None,
):
    import os
    import shutil
    from datasets import config as datasets_config

    print(f'DatasetDict contents: {os.listdir(dataset_dict_path)}')
    shutil.copytree(os.path.join(dataset_dict_path, split_name), dataset_split_path)
    print(f'Dataset contents: {os.listdir(os.path.join(dataset_dict_path, split_name))}')
    shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_ARROW_FILENAME), dataset_path)
    # shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_INDICES_FILENAME), dataset_indices_path)
    shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_INFO_FILENAME), dataset_info_path)
    shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_STATE_JSON_FILENAME), dataset_state_path)


if __name__ == '__main__':
    split_dataset_op = create_component_from_func(
        split_dataset_huggingface,
        base_image='python:3.9',
        packages_to_install=['datasets==1.6.2'],
        annotations={
            'author': 'Alexey Volkov <[email protected]>',
        },
        output_component_file='component.yaml',
    )
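The copy logic above mirrors the on-disk layout that datasets.DatasetDict.save_to_disk produces: one subdirectory per split, each holding the Arrow data file plus the info and state JSON files. A quick local sanity check, as a sketch (the dataset name and target directory are illustrative; the exact file names come from the datasets==1.6.2 config constants referenced above):

import os

from datasets import load_dataset
from datasets import config as datasets_config

dataset_dict = load_dataset('imdb')
dataset_dict.save_to_disk('/tmp/imdb_dataset_dict')

# Each split is a subdirectory holding the files the component copies out.
for split_name in dataset_dict.keys():
    split_dir = os.path.join('/tmp/imdb_dataset_dict', split_name)
    print(split_name, sorted(os.listdir(split_dir)))
    # Expected to include datasets_config.DATASET_ARROW_FILENAME,
    # datasets_config.DATASET_INFO_FILENAME and datasets_config.DATASET_STATE_JSON_FILENAME.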
80 changes: 80 additions & 0 deletions components/datasets/HuggingFace/Split_dataset/component.yaml
@@ -0,0 +1,80 @@
name: Split dataset huggingface
metadata:
  annotations: {author: Alexey Volkov <[email protected]>}
inputs:
- {name: dataset_dict, type: HuggingFaceDatasetDict}
- {name: split_name, type: String, optional: true}
outputs:
- {name: dataset_split, type: HuggingFaceDataset}
- {name: dataset, type: HuggingFaceArrowDataset}
- {name: dataset_info, type: JsonObject}
- {name: dataset_state, type: JsonObject}
implementation:
  container:
    image: python:3.9
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'datasets==1.6.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
      --quiet --no-warn-script-location 'datasets==1.6.2' --user) && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def split_dataset_huggingface(
          dataset_dict_path,
          dataset_split_path,
          dataset_path,
          # dataset_indices_path: OutputPath('HuggingFaceArrowDataset'),
          dataset_info_path,
          dataset_state_path,
          split_name = None,
      ):
          import os
          import shutil
          from datasets import config as datasets_config

          print(f'DatasetDict contents: {os.listdir(dataset_dict_path)}')
          shutil.copytree(os.path.join(dataset_dict_path, split_name), dataset_split_path)
          print(f'Dataset contents: {os.listdir(os.path.join(dataset_dict_path, split_name))}')
          shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_ARROW_FILENAME), dataset_path)
          # shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_INDICES_FILENAME), dataset_indices_path)
          shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_INFO_FILENAME), dataset_info_path)
          shutil.copy(os.path.join(dataset_dict_path, split_name, datasets_config.DATASET_STATE_JSON_FILENAME), dataset_state_path)

      import argparse
      _parser = argparse.ArgumentParser(prog='Split dataset huggingface', description='')
      _parser.add_argument("--dataset-dict", dest="dataset_dict_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--split-name", dest="split_name", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--dataset-split", dest="dataset_split_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--dataset", dest="dataset_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--dataset-info", dest="dataset_info_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--dataset-state", dest="dataset_state_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())

      _outputs = split_dataset_huggingface(**_parsed_args)
    args:
    - --dataset-dict
    - {inputPath: dataset_dict}
    - if:
        cond: {isPresent: split_name}
        then:
        - --split-name
        - {inputValue: split_name}
    - --dataset-split
    - {outputPath: dataset_split}
    - --dataset
    - {outputPath: dataset}
    - --dataset-info
    - {outputPath: dataset_info}
    - --dataset-state
    - {outputPath: dataset_state}
24 changes: 24 additions & 0 deletions components/datasets/HuggingFace/_samples/sample.pipeline.py
@@ -0,0 +1,24 @@
from kfp import components
from kfp import dsl


load_dataset_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d0e14a1dad4b851ad2a60a0c1a8201493f3d931c/components/datasets/HuggingFace/Load_dataset/component.yaml')
split_dataset_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d0e14a1dad4b851ad2a60a0c1a8201493f3d931c/components/datasets/HuggingFace/Split_dataset/component.yaml')


def huggingface_pipeline():
dataset_dict_task = load_dataset_op(dataset_name='imdb')
with dsl.ParallelFor(dataset_dict_task.outputs['splits']) as split_name:
deataset_task = split_dataset_op(
dataset_dict=dataset_dict_task.outputs['dataset_dict'],
split_name=split_name,
)


if __name__ == '__main__':
import kfp
kfp_endpoint = None
kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
huggingface_pipeline,
arguments={}
)
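For reference, the same pipeline can also be compiled to a workflow package instead of being submitted directly to an endpoint. A minimal sketch, assuming the KFP v1 SDK; the output file name is illustrative:

from kfp import compiler

# Compile huggingface_pipeline into a workflow package that can be uploaded
# through the Kubeflow Pipelines UI or API.
compiler.Compiler().compile(huggingface_pipeline, 'huggingface_pipeline.yaml')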