Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/420 #454

Merged
merged 12 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies:
- pyyaml >=5.1
- typing_inspect >= 0.6.0
- typing_extensions >= 3.7.4.3
- frictionless

# testing and dependencies
- black >= 20.8b1
Expand Down
222 changes: 214 additions & 8 deletions pandera/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@
import warnings
from functools import partial
from pathlib import Path
from typing import Dict, Optional, Union

import pandas as pd

from .checks import Check
from .dtypes import PandasDtype
from .schema_components import Column
from .schema_statistics import get_dataframe_schema_statistics
from .schemas import DataFrameSchema

try:
    # optional dependencies: only required for schema IO / script formatting
    import black
    import yaml
    from frictionless import Schema as FrictionlessSchema
except ImportError as exc:  # pragma: no cover
    raise ImportError(
        # NOTE: trailing space before the closing quote is required so the
        # adjacent string literals don't fuse into "...frictionless"to be...
        'IO and formatting requires "pyyaml", "black" and "frictionless" '
        "to be installed.\n"
        "You can install pandera together with the IO dependencies with:\n"
        "pip install pandera[io]\n"
    ) from exc

Expand Down Expand Up @@ -152,8 +158,6 @@ def handle_stat_dtype(stat):


def _deserialize_component_stats(serialized_component_stats):
from pandera import Check # pylint: disable=import-outside-toplevel

pandas_dtype = serialized_component_stats.get("pandas_dtype")
if pandas_dtype:
pandas_dtype = PandasDtype.from_str_alias(pandas_dtype)
Expand Down Expand Up @@ -188,7 +192,7 @@ def _deserialize_component_stats(serialized_component_stats):

def _deserialize_schema(serialized_schema):
# pylint: disable=import-outside-toplevel
from pandera import Check, Column, DataFrameSchema, Index, MultiIndex
from pandera import Index, MultiIndex

columns, index, checks = None, None, None
if serialized_schema["columns"] is not None:
Expand Down Expand Up @@ -309,8 +313,7 @@ def _format_checks(checks_dict):
)
else:
args = ", ".join(
"{}={}".format(k, v.__repr__())
for k, v in check_kwargs.items()
f"{k}={v.__repr__()}" for k, v in check_kwargs.items()
)
checks.append(f"Check.{check_name}({args})")
return f"[{', '.join(checks)}]"
Expand Down Expand Up @@ -381,7 +384,7 @@ def to_script(dataframe_schema, path_or_buf=None):
else _format_index(statistics["index"])
)

column_str = ", ".join("'{}': {}".format(k, v) for k, v in columns.items())
column_str = ", ".join(f"'{k}': {v}" for k, v in columns.items())

script = SCRIPT_TEMPLATE.format(
columns=column_str,
Expand All @@ -404,3 +407,206 @@ def to_script(dataframe_schema, path_or_buf=None):

with Path(path_or_buf).open("w") as f:
f.write(formatted_script)


class FrictionlessFieldParser:
    """Parses frictionless data schema field specifications so we can convert
    them to an equivalent :class:`pandera.schema_components.Column` schema.

    Only the field's name, type and constraints are consumed here; the other
    frictionless field attributes (e.g. foreign keys, type formats, titles,
    descriptions) are intentionally left out for this implementation.

    :param field: a field object from a frictionless schema.
    :primary_keys: the primary keys from a frictionless schema. These are used
        to ensure primary key fields are treated properly - no duplicates,
        no missing values etc.
    """

    # Mapping from frictionless field types to pandas-compatible dtype
    # strings understood by :class:`~pandera.dtypes.PandasDtype`.
    _FRICTIONLESS_TO_PANDAS_DTYPE = {
        "string": "string",
        "number": "float",
        "integer": "int",
        "boolean": "bool",
        "object": "object",
        "array": "object",
        "date": "string",
        "time": "string",
        "datetime": "datetime64[ns]",
        "year": "int",
        "yearmonth": "string",
        "duration": "timedelta64[ns]",
        "geopoint": "object",
        "geojson": "object",
        "any": "string",
    }

    def __init__(self, field, primary_keys) -> None:
        self.constraints = field.constraints or {}
        self.name = field.name
        self.is_a_primary_key = self.name in primary_keys
        # frictionless treats untyped fields as strings
        self.type = field.get("type", "string")

    @property
    def pandas_dtype(self) -> str:
        """Determine what type of field this is, so we can feed that into
        :class:`~pandera.dtypes.PandasDtype`. If no type is specified in the
        frictionless schema, we default to string values.

        :returns: the pandas-compatible representation of this field type as a
            string.
        """
        # an enum constraint wins over the declared field type
        if self.constraints.get("enum", None):
            return "category"
        return self._FRICTIONLESS_TO_PANDAS_DTYPE[self.type]

    @property
    def checks(self) -> Optional[Dict]:
        """Convert a set of frictionless schema field constraints into checks.

        This parses the standard set of frictionless constraints which can be
        found
        `here <https://specs.frictionlessdata.io/table-schema/#constraints>`_
        and maps them into the equivalent pandera checks.

        :returns: a dictionary of pandera :class:`pandera.checks.Check`
            objects which capture the standard constraint logic of a
            frictionless schema field.
        """
        if not self.constraints:
            return None

        remaining = dict(self.constraints)
        parsed: Dict = {}

        # when both ends of a bounded pair are present, fold them into a
        # single range-style check instead of two one-sided ones
        for check_name, lower_key, upper_key in (
            ("in_range", "minimum", "maximum"),
            ("str_length", "minLength", "maxLength"),
        ):
            if lower_key in remaining and upper_key in remaining:
                parsed[check_name] = {
                    "min_value": remaining.pop(lower_key),
                    "max_value": remaining.pop(upper_key),
                }

        # one-sided / standalone constraints; "required" and "unique" are
        # deliberately absent - they map to nullable/allow_duplicates instead
        for constraint_type, constraint_value in remaining.items():
            if constraint_type == "maximum":
                parsed["less_than_or_equal_to"] = constraint_value
            elif constraint_type == "minimum":
                parsed["greater_than_or_equal_to"] = constraint_value
            elif constraint_type == "maxLength":
                parsed["str_length"] = {
                    "min_value": None,
                    "max_value": constraint_value,
                }
            elif constraint_type == "minLength":
                parsed["str_length"] = {
                    "min_value": constraint_value,
                    "max_value": None,
                }
            elif constraint_type == "pattern":
                # frictionless patterns match the full cell value, so anchor
                parsed["str_matches"] = rf"^{constraint_value}$"
            elif constraint_type == "enum":
                parsed["isin"] = constraint_value

        return parsed or None

    @property
    def nullable(self) -> bool:
        """Determine whether this field can contain missing values."""
        # primary key fields can never be missing
        return (
            False
            if self.is_a_primary_key
            else not self.constraints.get("required", False)
        )

    @property
    def allow_duplicates(self) -> bool:
        """Determine whether this field can contain duplicate values."""
        # primary key fields must be unique
        return (
            False
            if self.is_a_primary_key
            else not self.constraints.get("unique", False)
        )

    @property
    def coerce(self) -> bool:
        """Determine whether values within this field should be coerced."""
        return True

    @property
    def required(self) -> bool:
        """Determine whether this field must exist within the data."""
        return True

    @property
    def regex(self) -> bool:
        """Determine whether this field name should be used for regex matches."""
        return False

    def to_pandera_column(self) -> Dict:
        """Export this field to a column spec dictionary."""
        return {
            "allow_duplicates": self.allow_duplicates,
            "checks": self.checks,
            "coerce": self.coerce,
            "nullable": self.nullable,
            "pandas_dtype": self.pandas_dtype,
            "required": self.required,
            "name": self.name,
            "regex": self.regex,
        }


def from_frictionless_schema(
    schema: Union[str, Path, Dict, FrictionlessSchema]
) -> DataFrameSchema:
    """Create a :class:`~pandera.schemas.DataFrameSchema` from a frictionless
    json/yaml schema file on disk, or a frictionless schema already loaded
    into memory.

    Each field from the frictionless schema will be converted to a pandera
    column specification using :class:`~pandera.io.FrictionlessFieldParser`
    to map field characteristics to pandera column specifications.

    :param schema: the frictionless schema object (or a
        string/Path to the location on disk of a schema specification) to
        parse.
    :returns: dataframe schema with frictionless field specs converted to
        pandera column checks and constraints for use as normal.

    :example:

    >>> from pandera.io import from_frictionless_schema
    >>>
    >>> FRICTIONLESS_SCHEMA = {
    ...     "fields": [
    ...         {
    ...             "name": "column_1",
    ...             "type": "integer",
    ...             "constraints": {"minimum": 10, "maximum": 99}
    ...         }
    ...     ],
    ...     "primaryKey": "column_1"
    ... }
    >>> schema = from_frictionless_schema(FRICTIONLESS_SCHEMA)
    >>> schema.columns["column_1"].checks
    [<Check in_range: in_range(10, 99)>]
    >>> schema.columns["column_1"].required
    True
    >>> schema.columns["column_1"].allow_duplicates
    False
    """
    # normalize the input: paths/strings/dicts all round-trip through the
    # frictionless Schema constructor; pass an existing Schema straight through
    frictionless_schema = (
        schema
        if isinstance(schema, FrictionlessSchema)
        else FrictionlessSchema(schema)
    )

    # one pandera column spec per frictionless field, with primary-key
    # fields forced to be unique and non-nullable by the parser
    columns = {
        field.name: FrictionlessFieldParser(
            field, frictionless_schema.primary_key
        ).to_pandera_column()
        for field in frictionless_schema.fields
    }

    return _deserialize_schema(
        {
            "columns": columns,
            "index": None,
            "checks": None,
            "coerce": True,
            "strict": True,
        }
    )
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ wrapt
pyyaml >=5.1
typing_inspect >= 0.6.0
typing_extensions >= 3.7.4.3
frictionless
black >= 20.8b1
isort >= 5.7.0
codecov
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
_extras_require = {
"strategies": ["hypothesis >= 5.41.1"],
"hypotheses": ["scipy"],
"io": ["pyyaml >= 5.1", "black"],
"io": ["pyyaml >= 5.1", "black", "frictionless"],
}
extras_require = {
**_extras_require,
Expand Down
Loading