Skip to content

Commit

Permalink
Feature/420 (#454)
Browse files Browse the repository at this point in the history
* parse frictionless schema

- using frictionless-py for some of the heavy lifting
- accept yaml/json/frictionless schema files/objects directly
- frictionless becomes a new requirement for io
- apply pre-commit formatting updates to other code in pandera.io
- add test to validate schema parsing, from yaml and json sources

* improve documentation

* update docstrings per code review

Co-authored-by: Niels Bantilan <[email protected]>

* add type hints

* standardise class properties for easier re-use in future

* simplify key check

* add missing alternative type

* update docstring

* align name with Column arg

* fix NaN check

* fix type assertion

* create empty dict if constraints not provided

Co-authored-by: Niels Bantilan <[email protected]>
  • Loading branch information
TColl and cosmicBboy committed Jul 22, 2021
1 parent f668c3d commit 3a1436a
Show file tree
Hide file tree
Showing 5 changed files with 493 additions and 6 deletions.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies:
- pyyaml >=5.1
- typing_inspect >= 0.6.0
- typing_extensions >= 3.7.4.3
- frictionless

# testing and dependencies
- black >= 20.8b1
Expand Down
217 changes: 212 additions & 5 deletions pandera/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@
from collections.abc import Mapping
from functools import partial
from pathlib import Path
from typing import Dict, Optional, Union

import pandas as pd

import pandera.errors

from .checks import Check
from .dtypes import PandasDtype
from .schema_components import Column
from .schema_statistics import get_dataframe_schema_statistics
from .schemas import DataFrameSchema

# The IO helpers depend on optional third-party packages; fail early with an
# actionable message when any of them is missing.
try:
    import black
    import yaml
    from frictionless import Schema as FrictionlessSchema
except ImportError as exc:  # pragma: no cover
    raise ImportError(
        # The trailing space matters: adjacent literals are concatenated
        # verbatim, and without it the message reads "'frictionless'to be".
        "IO and formatting requires 'pyyaml', 'black' and 'frictionless' "
        "to be installed.\n"
        "You can install pandera together with the IO dependencies with:\n"
        "pip install pandera[io]\n"
    ) from exc

Expand Down Expand Up @@ -156,8 +162,6 @@ def handle_stat_dtype(stat):


def _deserialize_component_stats(serialized_component_stats):
from pandera import Check # pylint: disable=import-outside-toplevel

pandas_dtype = serialized_component_stats.get("pandas_dtype")
if pandas_dtype:
pandas_dtype = PandasDtype.from_str_alias(pandas_dtype)
Expand Down Expand Up @@ -190,7 +194,7 @@ def _deserialize_component_stats(serialized_component_stats):

def _deserialize_schema(serialized_schema):
# pylint: disable=import-outside-toplevel
from pandera import Check, Column, DataFrameSchema, Index, MultiIndex
from pandera import Index, MultiIndex

# GH#475
serialized_schema = serialized_schema if serialized_schema else {}
Expand Down Expand Up @@ -417,3 +421,206 @@ def to_script(dataframe_schema, path_or_buf=None):

with Path(path_or_buf).open("w") as f:
f.write(formatted_script)


class FrictionlessFieldParser:
    """Parses frictionless data schema field specifications so we can convert
    them to an equivalent :class:`pandera.schema_components.Column` schema.

    For this implementation, we are using field names, constraints and types
    but leaving other frictionless parameters out (e.g. foreign keys, type
    formats, titles, descriptions).

    :param field: a field object from a frictionless schema. It must expose
        ``name`` and ``constraints`` attributes and a dict-style ``get``.
    :param primary_keys: the primary keys from a frictionless schema. These
        are used to ensure primary key fields are treated properly - no
        duplicates, no missing values etc. A single field name (string), a
        collection of field names, or ``None`` are all accepted.
    """

    # Mapping of frictionless field types to pandas dtype string aliases.
    _PANDAS_DTYPES = {
        "string": "string",
        "number": "float",
        "integer": "int",
        "boolean": "bool",
        "object": "object",
        "array": "object",
        "date": "string",
        "time": "string",
        "datetime": "datetime64[ns]",
        "year": "int",
        "yearmonth": "string",
        "duration": "timedelta64[ns]",
        "geopoint": "object",
        "geojson": "object",
        "any": "string",
    }

    def __init__(self, field, primary_keys) -> None:
        self.constraints = field.constraints or {}
        self.name = field.name
        # Normalise the primary key spec before the membership test: ``in``
        # on a bare string is a substring match (so "column_1" would match a
        # key called "column_10"), and ``in None`` raises a TypeError.
        if primary_keys is None:
            primary_keys = ()
        elif isinstance(primary_keys, str):
            primary_keys = (primary_keys,)
        self.is_a_primary_key = self.name in primary_keys
        self.type = field.get("type", "string")

    @property
    def pandas_dtype(self) -> str:
        """Determine what type of field this is, so we can feed that into
        :class:`~pandera.dtypes.PandasDtype`. If no type is specified in the
        frictionless schema, we default to string values.

        :returns: the pandas-compatible representation of this field type as a
            string. Fields with a non-empty ``enum`` constraint are always
            reported as ``"category"``.
        :raises KeyError: if the field type is not a known frictionless type.
        """
        if self.constraints.get("enum", None):
            return "category"
        return self._PANDAS_DTYPES[self.type]

    @property
    def checks(self) -> Optional[Dict]:
        """Convert a set of frictionless schema field constraints into checks.

        This parses the standard set of frictionless constraints which can be
        found
        `here <https://specs.frictionlessdata.io/table-schema/#constraints>`_
        and maps them into the equivalent pandera checks.

        :returns: a dictionary of pandera :class:`pandera.checks.Check`
            specifications (check name mapped to its argument(s)) which
            capture the standard constraint logic of a frictionless schema
            field, or ``None`` when no constraint maps to a check.
        """
        if not self.constraints:
            return None
        # Work on a copy: paired constraints are popped once combined, and
        # the field's own constraints must stay untouched.
        constraints = dict(self.constraints)
        checks = {}

        def _combine_constraints(check_name, min_constraint, max_constraint):
            """Catches bounded constraints where we need to combine a min and
            max pair of constraints into a single check."""
            if min_constraint in constraints and max_constraint in constraints:
                checks[check_name] = {
                    "min_value": constraints.pop(min_constraint),
                    "max_value": constraints.pop(max_constraint),
                }

        _combine_constraints("in_range", "minimum", "maximum")
        _combine_constraints("str_length", "minLength", "maxLength")

        # Anything left is one-sided (or unpaired); constraints that have no
        # check equivalent (e.g. "required", "unique") are skipped here and
        # handled by the ``nullable`` / ``allow_duplicates`` properties.
        for constraint_type, constraint_value in constraints.items():
            if constraint_type == "maximum":
                checks["less_than_or_equal_to"] = constraint_value
            elif constraint_type == "minimum":
                checks["greater_than_or_equal_to"] = constraint_value
            elif constraint_type == "maxLength":
                checks["str_length"] = {
                    "min_value": None,
                    "max_value": constraint_value,
                }
            elif constraint_type == "minLength":
                checks["str_length"] = {
                    "min_value": constraint_value,
                    "max_value": None,
                }
            elif constraint_type == "pattern":
                # Anchor the pattern so the whole value has to match.
                checks["str_matches"] = rf"^{constraint_value}$"
            elif constraint_type == "enum":
                checks["isin"] = constraint_value
        return checks or None

    @property
    def nullable(self) -> bool:
        """Determine whether this field can contain missing values."""
        if self.is_a_primary_key:
            return False
        return not self.constraints.get("required", False)

    @property
    def allow_duplicates(self) -> bool:
        """Determine whether this field can contain duplicate values."""
        if self.is_a_primary_key:
            return False
        return not self.constraints.get("unique", False)

    @property
    def coerce(self) -> bool:
        """Determine whether values within this field should be coerced."""
        return True

    @property
    def required(self) -> bool:
        """Determine whether this field must exist within the data."""
        return True

    @property
    def regex(self) -> bool:
        """Determine whether this field name should be used for regex
        matches."""
        return False

    def to_pandera_column(self) -> Dict:
        """Export this field to a column spec dictionary."""
        return {
            "allow_duplicates": self.allow_duplicates,
            "checks": self.checks,
            "coerce": self.coerce,
            "nullable": self.nullable,
            "pandas_dtype": self.pandas_dtype,
            "required": self.required,
            "name": self.name,
            "regex": self.regex,
        }


def from_frictionless_schema(
    schema: Union[str, Path, Dict, FrictionlessSchema]
) -> DataFrameSchema:
    """Build a :class:`~pandera.schemas.DataFrameSchema` out of a frictionless
    schema.

    The schema may be given as a json/yaml file location on disk, or as a
    frictionless schema that is already loaded into memory. Anything that is
    not already a frictionless ``Schema`` instance is handed to the
    frictionless ``Schema`` constructor first.

    Every field of the frictionless schema is translated into a pandera
    column specification by :class:`~pandera.io.FrictionlessFieldParser`.

    :param schema: the frictionless schema object (or a
        string/Path to the location on disk of a schema specification) to
        parse.
    :returns: dataframe schema with frictionless field specs converted to
        pandera column checks and constraints for use as normal.

    :example:

    >>> from pandera.io import from_frictionless_schema
    >>>
    >>> FRICTIONLESS_SCHEMA = {
    ...     "fields": [
    ...         {
    ...             "name": "column_1",
    ...             "type": "integer",
    ...             "constraints": {"minimum": 10, "maximum": 99}
    ...         }
    ...     ],
    ...     "primaryKey": "column_1"
    ... }
    >>> schema = from_frictionless_schema(FRICTIONLESS_SCHEMA)
    >>> schema.columns["column_1"].checks
    [<Check in_range: in_range(10, 99)>]
    >>> schema.columns["column_1"].required
    True
    >>> schema.columns["column_1"].allow_duplicates
    False
    """
    if isinstance(schema, FrictionlessSchema):
        frictionless_schema = schema
    else:
        frictionless_schema = FrictionlessSchema(schema)

    # One column spec per frictionless field, keyed by field name.
    column_specs = {}
    for field in frictionless_schema.fields:
        parser = FrictionlessFieldParser(
            field, frictionless_schema.primary_key
        )
        column_specs[field.name] = parser.to_pandera_column()

    # Same layout as a serialized pandera schema, so the regular
    # deserializer can build the DataFrameSchema.
    return _deserialize_schema(
        {
            "columns": column_specs,
            "index": None,
            "checks": None,
            "coerce": True,
            "strict": True,
        }
    )
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ wrapt
pyyaml >=5.1
typing_inspect >= 0.6.0
typing_extensions >= 3.7.4.3
frictionless
black >= 20.8b1
isort >= 5.7.0
codecov
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
_extras_require = {
"strategies": ["hypothesis >= 5.41.1"],
"hypotheses": ["scipy"],
"io": ["pyyaml >= 5.1", "black"],
"io": ["pyyaml >= 5.1", "black", "frictionless"],
}
extras_require = {
**_extras_require,
Expand Down
Loading

0 comments on commit 3a1436a

Please sign in to comment.