Load subset into dataframe #54

Merged · 7 commits · May 2, 2023

32 changes: 8 additions & 24 deletions fondant/dataset.py
@@ -9,7 +9,7 @@
 import dask.dataframe as dd

 from fondant.component_spec import FondantComponentSpec
-from fondant.manifest import Manifest, Index
+from fondant.manifest import Manifest
 from fondant.schema import Type, Field

 logger = logging.getLogger(__name__)
@@ -30,9 +30,7 @@ def __init__(self, manifest: Manifest):
             "__null_dask_index__": "int64",
         }

-    def _load_subset(
-        self, name: str, fields: t.List[str], index: t.Optional[Index] = None
-    ) -> dd.DataFrame:
+    def _load_subset(self, name: str, fields: t.List[str]) -> dd.DataFrame:
         # get subset from the manifest
         subset = self.manifest.subsets[name]
         # get remote path
@@ -49,13 +47,6 @@ def _load_subset(
             columns=fields,
         )

-        # filter on default index of manifest if no index is provided
-        if index is None:
-            index_df = self._load_index()
-            ids = index_df["id"].compute()
-            sources = index_df["source"].compute()
-            df = df[df["id"].isin(ids) & df["source"].isin(sources)]
-
         # add subset prefix to columns
         df = df.rename(
             columns={
@@ -70,26 +61,19 @@ def _load_index(self):
         index = self.manifest.index
         # get remote path
         remote_path = index.location
-
-        df = dd.read_parquet(remote_path)
-
-        if list(df.columns) != ["id", "source"]:
-            raise ValueError(
-                f"Index columns should be 'id' and 'source', found {df.columns}"
-            )
+        # load index from parquet, expecting id and source columns
+        df = dd.read_parquet(remote_path, columns=["id", "source"])

         return df

     def load_dataframe(self, spec: FondantComponentSpec) -> dd.DataFrame:
-        subset_dfs = []
+        # load index into dataframe
+        df = self._load_index()
         for name, subset in spec.input_subsets.items():
             fields = list(subset.fields.keys())
             subset_df = self._load_subset(name, fields)
-            subset_dfs.append(subset_df)
-
-        # return a single dataframe with column_names called subset_field
-        # TODO perhaps leverage dd.merge here instead
-        df = dd.concat(subset_dfs)
+            # left joins -> filter on index
+            df = dd.merge(df, subset_df, on=["id", "source"], how="left")

         logging.info("Columns of dataframe:", list(df.columns))

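To see how the new load_dataframe flow uses left joins as a filter on the index, here is a minimal, self-contained dask sketch (toy data and names are illustrative, not part of this PR):

import pandas as pd
import dask.dataframe as dd

# toy index: only the (id, source) pairs listed here should survive
index_df = dd.from_pandas(
    pd.DataFrame({"id": [1, 2, 3], "source": ["s1", "s1", "s1"]}), npartitions=1
)

# toy subset: the row with id=4 is not part of the index
subset_df = dd.from_pandas(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "source": ["s1", "s1", "s1", "s1"],
            "properties_Name": ["a", "b", "c", "d"],
        }
    ),
    npartitions=1,
)

# left join on the index, as in the new load_dataframe loop: rows absent from
# the index are dropped, and the subset's columns are appended
df = dd.merge(index_df, subset_df, on=["id", "source"], how="left")
print(df.compute())  # 3 rows with columns: id, source, properties_Name

Repeating the merge per subset keeps the index as the spine of the final dataframe, which is why the old dd.concat call and the per-subset index filtering in _load_subset could be dropped.
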
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -41,8 +41,9 @@ classifiers = [
 [tool.poetry.dependencies]
 python = "^3.8"
 jsonschema = "^4.17.3"
-dask = "^2022.2.0"
+dask = {extras = ["dataframe"], version = "^2023.4.1"}

+pyarrow = "^11.0.0"
Review comment (Member):
Nit: can we move this up from the optional to the required dependencies?

kfp = { version = "^1.8.19", optional = true }
kubernetes = { version = "^18.20.0", optional = true }
pandas = { version = "^1.3.5", optional = true }
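For context on the dependency change: the "dataframe" extra pulls in what dask.dataframe needs, and pyarrow is presumably the parquet engine behind the dd.read_parquet calls in fondant/dataset.py. A quick, illustrative import check (assuming the pinned versions are installed):

# sanity check that dask[dataframe] and pyarrow resolve after installing the project
import dask
import dask.dataframe as dd  # fails without the "dataframe" extra
import pyarrow

print(dask.__version__, pyarrow.__version__)
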
21 changes: 21 additions & 0 deletions tests/example_data/components/1.yaml
@@ -0,0 +1,21 @@
name: Test component 1
description: This is an example component
image: example_component:latest

input_subsets:
  properties:
    fields:
      Name:
        type: "utf8"
      HP:
        type: "int32"
  types:
    fields:
      Type 1:
        type: "utf8"
      Type 2:
        type: "utf8"
args:
  storage_args:
    description: Storage arguments
    type: str
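As a rough sketch of how this component spec is consumed (mirroring the load_dataframe loop in fondant/dataset.py; the path is the test fixture above):

from fondant.component_spec import FondantComponentSpec

spec = FondantComponentSpec.from_file("tests/example_data/components/1.yaml")

# iterate the declared input subsets and their fields, as load_dataframe does
for name, subset in spec.input_subsets.items():
    fields = list(subset.fields.keys())
    print(name, fields)  # e.g. properties -> ['Name', 'HP'], types -> ['Type 1', 'Type 2']
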
34 changes: 34 additions & 0 deletions tests/example_data/manifest.json
@@ -0,0 +1,34 @@
{
  "metadata": {
    "base_path": "tests/example_data/subsets",
    "run_id": "12345",
    "component_id": "67890"
  },
  "index": {
    "location": "/index"
  },
  "subsets": {
    "properties": {
      "location": "/properties",
      "fields": {
        "Name": {
          "type": "utf8"
        },
        "HP": {
          "type": "int32"
        }
      }
    },
    "types": {
      "location": "/types",
      "fields": {
        "Type 1": {
          "type": "utf8"
        },
        "Type 2": {
          "type": "utf8"
        }
      }
    }
  }
}
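A small sketch of how this manifest is read back by FondantDataset (the subset and index locations are assumed to resolve against metadata.base_path, which is what the test data layout suggests):

from fondant.manifest import Manifest

manifest = Manifest.from_file("tests/example_data/manifest.json")

# the index and each subset expose the parquet location used by dd.read_parquet above
print(manifest.index.location)
print(manifest.subsets["properties"].location)
print(manifest.subsets["types"].location)
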
39 changes: 39 additions & 0 deletions tests/example_data/raw/split.py
@@ -0,0 +1,39 @@
"""
This is a small script to split the raw data into different subsets to be used while testing.

The data is the 151 first pokemon and the following fields are available:

'id', 'Name', 'Type 1', 'Type 2', 'Total', 'HP', 'Attack', 'Defense',
'Sp. Atk', 'Sp. Def', 'Speed', 'source', 'Legendary'


"""
from pathlib import Path
import dask.dataframe as dd

data_path = Path(__file__).parent
output_path = Path(__file__).parent.parent / "subsets/"


def split_into_subsets():
# read in complete dataset
master_df = dd.read_parquet(path=data_path / "testset.parquet")

# create index subset
index_df = master_df[["id", "source"]]
index_df.set_index("id")
index_df.to_parquet(output_path / "index")

# create properties subset
properies_df = master_df[["id", "source", "Name", "HP"]]
properies_df.set_index("id")
properies_df.to_parquet(output_path / "properties")

# create types subset
types_df = master_df[["id", "source", "Type 1", "Type 2"]]
types_df.set_index("id")
types_df.to_parquet(output_path / "types")


if __name__ == "__main__":
split_into_subsets()
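A quick way to sanity-check the generated subsets (illustrative; paths follow the layout written by this script):

import dask.dataframe as dd

# every subset keeps "id" and "source", so load_dataframe can merge it onto the index
types_df = dd.read_parquet("tests/example_data/subsets/types")
print(list(types_df.columns))  # expect ["id", "source", "Type 1", "Type 2"]
print(len(types_df))           # expect 151 rows
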
Binary file added tests/example_data/raw/testset.parquet
Binary file not shown.
Binary file added tests/example_data/subsets/index/part.0.parquet
Binary file not shown.
Binary file added tests/example_data/subsets/properties/part.0.parquet
Binary file not shown.
Binary file added tests/example_data/subsets/types/part.0.parquet
Binary file not shown.
40 changes: 40 additions & 0 deletions tests/test_dataset.py
@@ -0,0 +1,40 @@
import json
import pytest
import dask.dataframe as dd
from pathlib import Path

from fondant.manifest import Manifest
from fondant.dataset import FondantDataset
from fondant.component_spec import FondantComponentSpec

manifest_path = Path(__file__).parent / "example_data/manifest.json"
component_spec_path = Path(__file__).parent / "example_data/components/1.yaml"


@pytest.fixture
def manifest():
    return Manifest.from_file(manifest_path)


@pytest.fixture
def component_spec():
    return FondantComponentSpec.from_file(component_spec_path)


def test_load_index(manifest):
    fds = FondantDataset(manifest)
    assert len(fds._load_index()) == 151


def test_merge_subsets(manifest, component_spec):
    fds = FondantDataset(manifest=manifest)
    df = fds.load_dataframe(spec=component_spec)
    assert len(df) == 151
    assert list(df.columns) == [
        "id",
        "source",
        "properties_Name",
        "properties_HP",
        "types_Type 1",
        "types_Type 2",
    ]