preview-csv-dataset #129

Merged · 20 commits · Mar 24, 2023
Changes from 5 commits
kedro-datasets/kedro_datasets/pandas/csv_dataset.py (8 additions, 0 deletions)
@@ -188,3 +188,11 @@ def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)

def _preview(self, nrows) -> Dict:
# Create a copy so it doesn't contaminate the original dataset
dataset_copy = self._copy()
dataset_copy._load_args["nrows"] = nrows
data = dataset_copy.load()

return data.to_dict()
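
For context, a minimal sketch of how this preview could be exercised (the file path and data here are illustrative, not part of the PR). DataFrame.to_dict() defaults to the column-oriented {column: {row_index: value}} shape, which is what the tests below iterate over:

import pandas as pd
from kedro_datasets.pandas import CSVDataSet

pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).to_csv("test.csv", index=False)

ds = CSVDataSet(filepath="test.csv")
preview = ds._preview(nrows=2)
# Each column maps to {row_index: value}, so every value holds nrows entries
assert preview == {"a": {0: 1, 1: 2}, "b": {0: 4, 1: 5}}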
kedro-datasets/kedro_datasets/pandas/excel_dataset.py (10 additions, 1 deletion)
@@ -182,7 +182,8 @@ def __init__(
if save_args is not None:
self._save_args.update(save_args)
self._writer_args = self._save_args.pop("writer", {}) # type: ignore
self._writer_args.setdefault("engine", engine or "openpyxl") # type: ignore
self._writer_args.setdefault(
"engine", engine or "openpyxl") # type: ignore

if version and self._writer_args.get("mode") == "a": # type: ignore
raise DataSetError(
@@ -257,3 +258,11 @@ def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)

def _preview(self, nrows) -> Dict:
# Create a copy so it doesn't contaminate the original dataset
dataset_copy = self._copy()
dataset_copy._load_args["nrows"] = nrows
data = dataset_copy.load()

return data.to_dict()
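
A short illustration of why the method mutates a copy rather than self (a hypothetical snippet, assuming an existing test.xlsx): setting nrows on the original dataset would silently truncate every later load().

from kedro_datasets.pandas import ExcelDataSet

ds = ExcelDataSet(filepath="test.xlsx")  # assumes test.xlsx exists
ds._load_args["nrows"] = 2  # mutating the original contaminates it:
ds.load()  # every subsequent load() now returns at most 2 rows
# _preview sidesteps this by setting nrows on self._copy() instead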
kedro-datasets/kedro_datasets/pandas/parquet_dataset.py (8 additions, 0 deletions)
@@ -208,3 +208,11 @@ def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)

def _preview(self, nrows) -> Dict:
# Create a copy so it doesn't contaminate the original dataset
dataset_copy = self._copy()
# pandas.read_parquet has no nrows option, so load fully and slice
data = dataset_copy.load().head(nrows)

return data.to_dict()
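
Note that unlike pd.read_csv and pd.read_excel, pandas.read_parquet accepts no nrows argument, hence the full load followed by head(nrows). If loading the whole file ever becomes a concern, a pyarrow-based sketch along these lines (preview_parquet is a hypothetical helper, assuming pyarrow >= 3.0 and a local file path) could read only the first row batch:

import pyarrow.parquet as pq

def preview_parquet(filepath: str, nrows: int) -> dict:
    # Read just the first record batch instead of the whole file
    batch = next(pq.ParquetFile(filepath).iter_batches(batch_size=nrows))
    return batch.to_pandas().to_dict()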
kedro-datasets/tests/pandas/test_csv_dataset.py (19 additions, 4 deletions)
@@ -2,6 +2,7 @@
import sys
from pathlib import Path, PurePosixPath
from time import sleep

import boto3
import pandas as pd
@@ -92,6 +93,16 @@ def test_save_and_load(self, csv_data_set, dummy_dataframe):
reloaded = csv_data_set.load()
assert_frame_equal(dummy_dataframe, reloaded)

def test_preview(self, csv_data_set, dummy_dataframe):
"""Test _preview returns the correct nrows amount."""
nrows = 2

csv_data_set.save(dummy_dataframe)
response = csv_data_set._preview(nrows=nrows)

for rows in response.values():
assert len(rows) == nrows

def test_exists(self, csv_data_set, dummy_dataframe):
"""Test `exists` method invocation for both existing and
nonexistent data set."""
Expand Down Expand Up @@ -126,7 +137,8 @@ def test_save_extra_params(self, csv_data_set, save_args):
def test_storage_options_dropped(self, load_args, save_args, caplog, tmp_path):
filepath = str(tmp_path / "test.csv")

- ds = CSVDataSet(filepath=filepath, load_args=load_args, save_args=save_args)
+ ds = CSVDataSet(filepath=filepath, load_args=load_args,
+ save_args=save_args)

records = [r for r in caplog.records if r.levelname == "WARNING"]
expected_log_message = (
@@ -234,7 +246,8 @@ def test_multiple_loads(

def test_multiple_saves(self, dummy_dataframe, filepath_csv):
"""Test multiple cycles of save followed by load for the same dataset"""
- ds_versioned = CSVDataSet(filepath=filepath_csv, version=Version(None, None))
+ ds_versioned = CSVDataSet(
+ filepath=filepath_csv, version=Version(None, None))

# first save
ds_versioned.save(dummy_dataframe)
@@ -362,9 +375,11 @@ def test_load_and_confirm(self, mocker, mocked_csv_in_s3, mocked_dataframe):
assert df._protocol == "s3"
# if Python >= 3.10, modify test procedure (see #67)
if sys.version_info[1] >= 10:
read_patch = mocker.patch("pandas.read_csv", return_value=mocked_dataframe)
read_patch = mocker.patch(
"pandas.read_csv", return_value=mocked_dataframe)
df.load()
- read_patch.assert_called_once_with(mocked_csv_in_s3, storage_options={})
+ read_patch.assert_called_once_with(
+ mocked_csv_in_s3, storage_options={})
else:
loaded = df.load()
assert_frame_equal(loaded, mocked_dataframe)
kedro-datasets/tests/pandas/test_excel_dataset.py (12 additions, 1 deletion)
@@ -63,6 +63,16 @@ def test_save_and_load(self, excel_data_set, dummy_dataframe):
reloaded = excel_data_set.load()
assert_frame_equal(dummy_dataframe, reloaded)

def test_preview(self, excel_data_set, dummy_dataframe):
"""Test _preview returns the correct nrows amount."""
nrows = 2

excel_data_set.save(dummy_dataframe)
response = excel_data_set._preview(nrows=nrows)

for rows in response.values():
assert len(rows) == nrows

def test_save_and_load_multiple_sheets(
self, excel_multisheet_data_set, dummy_dataframe, another_dummy_dataframe
):
@@ -110,7 +120,8 @@ def test_save_extra_params(self, excel_data_set, save_args):
def test_storage_options_dropped(self, load_args, save_args, caplog, tmp_path):
filepath = str(tmp_path / "test.csv")

- ds = ExcelDataSet(filepath=filepath, load_args=load_args, save_args=save_args)
+ ds = ExcelDataSet(filepath=filepath,
+ load_args=load_args, save_args=save_args)

records = [r for r in caplog.records if r.levelname == "WARNING"]
expected_log_message = (
kedro-datasets/tests/pandas/test_parquet_dataset.py (24 additions, 6 deletions)
@@ -65,6 +65,18 @@ def test_save_and_load(self, tmp_path, dummy_dataframe):
assert all(files)
assert len(files) == 1

def test_preview(self, tmp_path, dummy_dataframe):
"""Test _preview returns the correct nrows amount."""
nrows = 2

filepath = (tmp_path / FILENAME).as_posix()
parquet_data_set = ParquetDataSet(filepath=filepath)
parquet_data_set.save(dummy_dataframe)
response = parquet_data_set._preview(nrows=nrows)

for rows in response.values():
assert len(rows) == nrows

def test_save_and_load_non_existing_dir(self, tmp_path, dummy_dataframe):
"""Test saving and reloading the data set to non-existing directory."""
filepath = (tmp_path / "non-existing" / FILENAME).as_posix()
@@ -107,7 +119,8 @@ def test_save_extra_params(self, parquet_data_set, save_args):
def test_storage_options_dropped(self, load_args, save_args, caplog, tmp_path):
filepath = str(tmp_path / "test.csv")

- ds = ParquetDataSet(filepath=filepath, load_args=load_args, save_args=save_args)
+ ds = ParquetDataSet(filepath=filepath,
+ load_args=load_args, save_args=save_args)

records = [r for r in caplog.records if r.levelname == "WARNING"]
expected_log_message = (
@@ -167,7 +180,8 @@ def test_catalog_release(self, protocol, path, mocker):

def test_read_partitioned_file(self, mocker, tmp_path, dummy_dataframe):
"""Test read partitioned parquet file from local directory."""
mock_pandas_call = mocker.patch("pandas.read_parquet", wraps=pd.read_parquet)
mock_pandas_call = mocker.patch(
"pandas.read_parquet", wraps=pd.read_parquet)
dummy_dataframe.to_parquet(str(tmp_path), partition_cols=["col2"])
data_set = ParquetDataSet(filepath=tmp_path.as_posix())

@@ -239,7 +253,8 @@ def test_save_and_load(self, versioned_parquet_data_set, dummy_dataframe, mocker
the versioned data set."""
mocker.patch(
"pyarrow.fs._ensure_filesystem",
- return_value=PyFileSystem(FSSpecHandler(versioned_parquet_data_set._fs)),
+ return_value=PyFileSystem(FSSpecHandler(
+ versioned_parquet_data_set._fs)),
)
versioned_parquet_data_set.save(dummy_dataframe)
reloaded_df = versioned_parquet_data_set.load()
@@ -256,7 +271,8 @@ def test_exists(self, versioned_parquet_data_set, dummy_dataframe, mocker):
assert not versioned_parquet_data_set.exists()
mocker.patch(
"pyarrow.fs._ensure_filesystem",
- return_value=PyFileSystem(FSSpecHandler(versioned_parquet_data_set._fs)),
+ return_value=PyFileSystem(FSSpecHandler(
+ versioned_parquet_data_set._fs)),
)
versioned_parquet_data_set.save(dummy_dataframe)
assert versioned_parquet_data_set.exists()
@@ -268,7 +284,8 @@ def test_prevent_overwrite(
corresponding parquet file for a given save version already exists."""
mocker.patch(
"pyarrow.fs._ensure_filesystem",
- return_value=PyFileSystem(FSSpecHandler(versioned_parquet_data_set._fs)),
+ return_value=PyFileSystem(FSSpecHandler(
+ versioned_parquet_data_set._fs)),
)
versioned_parquet_data_set.save(dummy_dataframe)
pattern = (
@@ -300,7 +317,8 @@ def test_save_version_warning(
)
mocker.patch(
"pyarrow.fs._ensure_filesystem",
- return_value=PyFileSystem(FSSpecHandler(versioned_parquet_data_set._fs)),
+ return_value=PyFileSystem(FSSpecHandler(
+ versioned_parquet_data_set._fs)),
)
with pytest.warns(UserWarning, match=pattern):
versioned_parquet_data_set.save(dummy_dataframe)