Fix adapting tests to fsspec
cmutel committed May 28, 2024
1 parent b0a209f commit 00c2c25
Showing 7 changed files with 44 additions and 178 deletions.
README.md: 8 changes (4 additions & 4 deletions)
@@ -107,10 +107,10 @@ class ExampleArrayInterface:
Serialized datapackages cannot contain executable code, both because of our chosen data formats and for security reasons. Therefore, when loading a datapackage with an interface, that interface object needs to be reconstituted as Python code - we call this cycle dehydration and rehydration. Dehydration happens automatically when a datapackage is finalized with `finalize_serialization()`, but rehydration needs to be done manually using `rehydrate_interface()`. For example:

```python
-from fs.zipfs import ZipFS
+from fsspec.implementations.zip import ZipFileSystem
from bw_processing import load_datapackage

-my_dp = load_datapackage(ZipFS("some-path.zip"))
+my_dp = load_datapackage(ZipFileSystem("some-path.zip"))
my_dp.rehydrate_interface("some-resource-name", ExampleVectorInterface())
```

@@ -119,7 +119,7 @@ You can list the dehydrated interfaces present with `.dehydrated_interfaces()`.
You can store useful information for the interface object initialization under the resource key `config`. This can be used in instantiating an interface if you pass `initialize_with_config`:

```python
-from fs.zipfs import ZipFS
+from fsspec.implementations.zip import ZipFileSystem
from bw_processing import load_datapackage
import requests
import numpy as np
@@ -133,7 +133,7 @@ class MyInterface:
return np.array(requests.get(self.url).json())


-my_dp = load_datapackage(ZipFS("some-path.zip"))
+my_dp = load_datapackage(ZipFileSystem("some-path.zip"))
data_obj, resource_metadata = my_dp.get_resource("some-interface")
print(resource_metadata['config'])
>>> {"url": "example.com"}
```
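
The hunk is truncated before the config is actually used. For orientation, a minimal sketch of the rehydration step this config enables, assuming `rehydrate_interface` accepts an interface class plus `initialize_with_config=True` as the surrounding README text suggests; this call is an assumption, not a line from the diff:

```python
from fsspec.implementations.zip import ZipFileSystem

from bw_processing import load_datapackage

my_dp = load_datapackage(ZipFileSystem("some-path.zip"))
# Assumption: passing the MyInterface class from the example above (not
# an instance) with initialize_with_config=True lets bw_processing build
# MyInterface(**resource_metadata["config"]) itself.
my_dp.rehydrate_interface("some-interface", MyInterface, initialize_with_config=True)
```
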
bw_processing/examples/parquet_files.py: 12 changes (6 additions & 6 deletions)
@@ -5,10 +5,10 @@
from pathlib import Path

import numpy as np
-from fs.osfs import OSFS
-from fs.zipfs import ZipFS
+from fsspec.implementations.zip import ZipFileSystem

import bw_processing as bwp
+from bw_processing.io_helpers import generic_directory_filesystem

if __name__ == "__main__":
print("This is a basic example on how to use parquet files.")
@@ -30,13 +30,13 @@
# VERSION OSFS
# Directory must exist for OSFS otherwise use OSFS(dirpath, create=True)!
# Every created object will be saved in that same directory
-dp_dir = OSFS(str(dirpath / "datapackage_1"), create=True)
+dp_dir = generic_directory_filesystem(dirpath=dirpath / "datapackage_1", create=True)
dp = bwp.create_datapackage(
fs=dp_dir, matrix_serialize_format_type=bwp.MatrixSerializeFormat.NUMPY
)
else:
# VERSION ZIP
-dp_zip_file = ZipFS(str(dirpath / "datapackage_2.zip"), mode="w")
+dp_zip_file = ZipFileSystem(str(dirpath / "datapackage_2.zip"), mode="w")
dp = bwp.create_datapackage(
fs=dp_zip_file, matrix_serialize_format_type=bwp.MatrixSerializeFormat.NUMPY
) # bwp.create_datapackage(fs=dp_zip_file, serialize_type=SerializeENum.parquet)
@@ -98,10 +98,10 @@
# OSFS must be open! (and it was closed with finalize_serialization())

if USE_OSFS:
-dp_dir = OSFS(str(dirpath / "datapackage_1"))
+dp_dir = generic_directory_filesystem(dirpath=dirpath / "datapackage_1")
dp2 = bwp.load_datapackage(fs_or_obj=dp_dir)
else:
-dp_zip_file = ZipFS(str(dirpath / "datapackage_2.zip"))
+dp_zip_file = ZipFileSystem(dirpath / "datapackage_2.zip")
dp2 = bwp.load_datapackage(fs_or_obj=dp_zip_file)

print("Done!")
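
The example now routes directory access through `generic_directory_filesystem` from `bw_processing.io_helpers`, whose body is not part of this diff. A minimal sketch of what such a helper could look like on top of fsspec, assuming it wraps `DirFileSystem` over the local filesystem; the real helper may differ:

```python
from pathlib import Path

from fsspec.implementations.dirfs import DirFileSystem
from fsspec.implementations.local import LocalFileSystem


def generic_directory_filesystem(*, dirpath: Path, create: bool = False) -> DirFileSystem:
    """Hypothetical sketch: scope an fsspec filesystem to one local directory."""
    if create:
        # Mirror the OSFS(..., create=True) behaviour the example relied on.
        dirpath.mkdir(parents=True, exist_ok=True)
    return DirFileSystem(path=str(dirpath), fs=LocalFileSystem())
```
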
bw_processing/io_parquet_helpers.py: 11 changes (5 additions & 6 deletions)
@@ -7,12 +7,11 @@
import os

# for annotation
-from io import BufferedWriter, RawIOBase
+from io import BufferedWriter, IOBase, RawIOBase

import numpy
import numpy as np
import pyarrow.parquet as pq
-from fs.iotools import RawWrapper

from .errors import WrongDatatype
from .io_pyarrow_helpers import (
@@ -59,12 +58,12 @@ def write_ndarray_to_parquet_file(
pq.write_table(table, file)


-def read_parquet_file_to_ndarray(file: RawWrapper) -> numpy.ndarray:
+def read_parquet_file_to_ndarray(file: RawIOBase) -> numpy.ndarray:
"""
Read an `ndarray` from a `parquet` file.
Args:
-file (fs.iotools.RawWrapper): File to read from.
+file (io.RawIOBase or fsspec file object): File to read from.
Raises:
`WrongDatatype` if the correct metadata is not found in the `parquet` file.
@@ -122,12 +121,12 @@ def save_arr_to_parquet(file: RawIOBase, arr: np.ndarray, meta_object: str, meta
write_ndarray_to_parquet_file(fid, arr, meta_object=meta_object, meta_type=meta_type)


-def load_ndarray_from_parquet(file: RawWrapper) -> np.ndarray:
+def load_ndarray_from_parquet(file: RawIOBase) -> np.ndarray:
"""
Deserialize a `numpy` `ndarray` from a `parquet` `file`.
Parameters
-file (fs.iotools.RawWrapper): File to read from.
+file (io.RawIOBase or fsspec file object): File to read from.
Returns
The corresponding `numpy` `ndarray`.
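
With the annotations moved from `fs.iotools.RawWrapper` to `io.RawIOBase`, any readable binary file object should work. A hedged round-trip sketch: the `meta_object` and `meta_type` keywords appear in the hunk above, but their accepted values do not, so the strings below are illustrative guesses only:

```python
import numpy as np

from bw_processing.io_parquet_helpers import (
    load_ndarray_from_parquet,
    save_arr_to_parquet,
)

arr = np.arange(6, dtype=np.float64).reshape(2, 3)

# "vector"/"generic" are guesses for illustration; the accepted values
# live in io_pyarrow_helpers and are not shown in this diff.
with open("example.parquet", "wb") as f:
    save_arr_to_parquet(f, arr, meta_object="vector", meta_type="generic")

with open("example.parquet", "rb") as f:
    restored = load_ndarray_from_parquet(f)

assert np.array_equal(restored, arr)
```
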
pyproject.toml: 1 change (1 addition & 0 deletions)
@@ -32,6 +32,7 @@ dependencies = [
# dependencies as strings with quotes, e.g. "foo"
# You can add version requirements
"fsspec",
"morefs",
"numpy",
"pandas",

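
`morefs` is the one new dependency. A plausible reading, stated as an assumption: it provides fsspec-compatible in-memory filesystems (for example `MemFS`), filling the role PyFilesystem2's `MemoryFS` played for throwaway datapackages. A minimal sketch under that assumption:

```python
# Assumption: morefs.memory.MemFS stands in for PyFilesystem2's
# fs.memoryfs.MemoryFS when building an in-memory datapackage.
from morefs.memory import MemFS

import bw_processing as bwp

dp = bwp.create_datapackage(fs=MemFS(), name="in-memory-example")
```
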
tests/calculation_package.py: 148 changes (0 additions & 148 deletions)

This file was deleted.

tests/indexing.py: 24 changes (19 additions & 5 deletions)
@@ -3,12 +3,12 @@
import numpy as np
import pandas as pd
import pytest
-from fs.osfs import OSFS

from bw_processing import create_datapackage, load_datapackage
from bw_processing.constants import INDICES_DTYPE
from bw_processing.errors import NonUnique
from bw_processing.indexing import reindex, reset_index
+from bw_processing.io_helpers import generic_directory_filesystem

### Fixture

@@ -66,14 +66,22 @@ def add_data(dp, id_field="id"):

@pytest.fixture
def fixture():
-dp = load_datapackage(OSFS(str(Path(__file__).parent.resolve() / "fixtures" / "indexing")))
+dp = load_datapackage(
+    generic_directory_filesystem(
+        dirpath=Path(__file__).parent.resolve() / "fixtures" / "indexing"
+    )
+)
dp_unchanged(dp)
return dp


def dp_unchanged(dp=None):
if dp is None:
-dp = load_datapackage(OSFS(str(Path(__file__).parent.resolve() / "fixtures" / "indexing")))
+dp = load_datapackage(
+    generic_directory_filesystem(
+        dirpath=Path(__file__).parent.resolve() / "fixtures" / "indexing"
+    )
+)

array, _ = dp.get_resource("vector.indices")
assert np.allclose(array["row"], np.array([11, 11, 13]))
@@ -124,7 +132,11 @@ def test_reset_index_modified(fixture):
reset_index(fixture, "vector-csv-rows")
assert fixture._modified == set([fixture._get_index("vector.indices")])

-fixture = load_datapackage(OSFS(str(Path(__file__).parent.resolve() / "fixtures" / "indexing")))
+fixture = load_datapackage(
+    generic_directory_filesystem(
+        dirpath=Path(__file__).parent.resolve() / "fixtures" / "indexing"
+    )
+)
assert not fixture._modified

reset_index(fixture, "csv-multiple")
@@ -364,7 +376,9 @@ def test_reindex_data_iterable_wrong_type(fixture):
(dirpath / "indexing").mkdir(exist_ok=True)

dp = create_datapackage(
-    fs=OSFS(str(dirpath / "indexing")), name="indexing-fixture", id_="fixture-i"
+    fs=generic_directory_filesystem(dirpath=dirpath / "indexing"),
+    name="indexing-fixture",
+    id_="fixture-i",
)
add_data(dp)
dp.finalize_serialization()