Skip to content

Commit

Permalink
Fix feather (#442)
Browse files Browse the repository at this point in the history
* check feather dep

* foggeddaboutdid

* adjust warning message

* make pyarrow optional

* add test

* fix typo
  • Loading branch information
yannikschaelte authored Apr 8, 2021
1 parent 65e5218 commit 23c08bf
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 19 deletions.
63 changes: 44 additions & 19 deletions pyabc/storage/dataframe_bytes_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,43 @@
from io import StringIO, BytesIO
import csv
import numpy as np
import pyarrow
import pyarrow.parquet as parquet
import warnings

try:
import pyarrow
import pyarrow.parquet as parquet
except ImportError:
pyarrow = parquet = None
warnings.warn(
"Cannot find pyarrow, thus falling back to the less efficient csv "
"format to store pandas.DataFrame data.",
)


class DataFrameLoadException(Exception):
    """Raised when a bytes payload cannot be decoded into a pandas.DataFrame."""
    pass


def df_to_bytes_csv_(df: pd.DataFrame) -> bytes:
def df_to_bytes_csv(df: pd.DataFrame) -> bytes:
    """Serialize a DataFrame to UTF-8 encoded csv bytes.

    Non-numeric fields are quoted (``csv.QUOTE_NONNUMERIC``) so that the
    csv reader can distinguish strings from numbers on the way back.
    """
    text = df.to_csv(quoting=csv.QUOTE_NONNUMERIC)
    return text.encode()


def df_from_bytes_csv_(bytes_: bytes) -> pd.DataFrame:
def df_from_bytes_csv(bytes_: bytes) -> pd.DataFrame:
    """Deserialize a DataFrame from csv bytes.

    Inverse of ``df_to_bytes_csv``. ``float_precision="round_trip"``
    ensures floats survive the text round trip bit-exactly.

    Raises
    ------
    DataFrameLoadException
        If the payload is not utf-8 decodable text, i.e. not csv data.
    """
    # The span contained leftover duplicated statements (an unreachable
    # second return and raise, plus the "Not a DataFram" typo); this is
    # the cleaned-up single code path.
    try:
        s = StringIO(bytes_.decode())
        s.seek(0)
        return pd.read_csv(
            s, index_col=0, header=0, float_precision="round_trip",
            quotechar='"')
    except UnicodeDecodeError:
        raise DataFrameLoadException("Not a DataFrame")


def df_to_bytes_msgpack_(df: pd.DataFrame) -> bytes:
def df_to_bytes_msgpack(df: pd.DataFrame) -> bytes:
    """Serialize a DataFrame via msgpack.

    NOTE(review): per the test comment in this change, later pandas
    versions dropped the msgpack methods, so this raises AttributeError
    on modern pandas — presumably kept only to read/write data from old
    database versions; confirm the supported pandas range.
    """
    return df.to_msgpack()


def df_from_bytes_msgpack_(bytes_: bytes) -> pd.DataFrame:
def df_from_bytes_msgpack(bytes_: bytes) -> pd.DataFrame:
try:
df = pd.read_msgpack(BytesIO(bytes_))
except UnicodeDecodeError:
Expand All @@ -39,15 +48,15 @@ def df_from_bytes_msgpack_(bytes_: bytes) -> pd.DataFrame:
return df


def df_to_bytes_json_(df: pd.DataFrame) -> bytes:
def df_to_bytes_json(df: pd.DataFrame) -> bytes:
    """Serialize a DataFrame to UTF-8 encoded JSON bytes."""
    as_text = df.to_json()
    return as_text.encode()


def df_from_bytes_json_(bytes_: bytes) -> pd.DataFrame:
def df_from_bytes_json(bytes_: bytes) -> pd.DataFrame:
    """Deserialize a DataFrame from JSON bytes (inverse of df_to_bytes_json)."""
    decoded = bytes_.decode()
    return pd.read_json(decoded)


def df_to_bytes_parquet_(df: pd.DataFrame) -> bytes:
def df_to_bytes_parquet(df: pd.DataFrame) -> bytes:
"""
pyarrow parquet is the standard conversion method of pandas
DataFrames since pyabc 0.9.14, because msgpack became
Expand All @@ -60,7 +69,7 @@ def df_to_bytes_parquet_(df: pd.DataFrame) -> bytes:
return b.read()


def df_from_bytes_parquet_(bytes_: bytes) -> pd.DataFrame:
def df_from_bytes_parquet(bytes_: bytes) -> pd.DataFrame:
"""
Since pyabc 0.9.14, pandas DataFrames are converted using
pyarrow parquet. If the conversion to DataFrame fails,
Expand All @@ -74,24 +83,40 @@ def df_from_bytes_parquet_(bytes_: bytes) -> pd.DataFrame:
table = parquet.read_table(b)
df = table.to_pandas()
except pyarrow.lib.ArrowIOError:
df = df_from_bytes_msgpack_(bytes_)
df = df_from_bytes_msgpack(bytes_)
return df


def df_to_bytes_np_records_(df: pd.DataFrame) -> bytes:
def df_to_bytes_np_records(df: pd.DataFrame) -> bytes:
    """Serialize a DataFrame via numpy structured records (``np.save``).

    Uses ``allow_pickle=False``, so this only supports dtypes numpy can
    store natively — object (e.g. string) columns are rejected.
    """
    records = df.to_records()
    buffer = BytesIO()
    np.save(buffer, records, allow_pickle=False)
    buffer.seek(0)
    return buffer.read()


def df_from_np_records_(bytes_: bytes) -> pd.DataFrame:
def df_from_bytes_np_records(bytes_: bytes) -> pd.DataFrame:
    """Deserialize a DataFrame stored via ``df_to_bytes_np_records``."""
    structured = np.load(BytesIO(bytes_))
    # "index" was added by DataFrame.to_records(); restore it as the index
    return pd.DataFrame.from_records(structured, index="index")


df_to_bytes = df_to_bytes_parquet_
df_from_bytes = df_from_bytes_parquet_
def df_to_bytes(df: pd.DataFrame) -> bytes:
    """Write a dataframe to bytes.

    Prefers the efficient pyarrow parquet backend; falls back to the csv
    format when pyarrow could not be imported (see the module-level
    import guard).
    """
    if pyarrow is not None:
        return df_to_bytes_parquet(df)
    return df_to_bytes_csv(df)


def df_from_bytes(bytes_: bytes) -> pd.DataFrame:
    """Read a dataframe from bytes.

    Mirrors ``df_to_bytes``: uses pyarrow parquet when available,
    otherwise the csv fallback.
    """
    if pyarrow is not None:
        return df_from_bytes_parquet(bytes_)
    return df_from_bytes_csv(bytes_)
40 changes: 40 additions & 0 deletions test/base/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@
from pyabc.population import Particle, Population
from pyabc import History
from pyabc.storage.df_to_file import sumstat_to_json
from pyabc.storage.dataframe_bytes_storage import (
df_to_bytes_parquet,
df_from_bytes_parquet,
df_to_bytes_csv,
df_from_bytes_csv,
df_to_bytes_json,
df_from_bytes_json,
df_to_bytes_np_records,
df_from_bytes_np_records,
DataFrameLoadException,
)


def example_df():
Expand Down Expand Up @@ -458,3 +469,32 @@ def test_create_db():
os.remove(file_)
with pytest.raises(ValueError):
pyabc.History("sqlite:///" + file_, create=False)


def test_dataframe_formats():
    """Test correct behavior of the different dataframe storage methods."""
    df = pd.DataFrame(
        {'a': [6.57, 7],
         'b': [True, False],
         'c': ['hola', 'hej']},
    )

    df_parquet = df_to_bytes_parquet(df)
    df_csv = df_to_bytes_csv(df)
    df_json = df_to_bytes_json(df)

    # np.save(allow_pickle=False) does not allow object arrays, so the
    # record-based format is exercised with numeric columns only
    df_float = pd.DataFrame({'a': [4.32, 5], 'b': [4, 1.24]})
    df_np_records = df_to_bytes_np_records(df_float)

    assert (df == df_from_bytes_parquet(df_parquet)).all(axis=None)
    assert (df == df_from_bytes_csv(df_csv)).all(axis=None)
    assert (df == df_from_bytes_json(df_json)).all(axis=None)
    assert (df_float == df_from_bytes_np_records(df_np_records)).all(axis=None)

    # parquet bytes are not utf-8 decodable, so the csv reader must fail
    with pytest.raises(DataFrameLoadException):
        df_from_bytes_csv(df_parquet)

    # will interpret as msgpack, but later pandas versions dropped that method
    with pytest.raises(AttributeError):
        df_from_bytes_parquet(df_csv)

0 comments on commit 23c08bf

Please sign in to comment.