diff --git a/environment.yml b/environment.yml
index b4da34088..f127a28ab 100644
--- a/environment.yml
+++ b/environment.yml
@@ -16,6 +16,7 @@ dependencies:
   - pyyaml >=5.1
   - typing_inspect >= 0.6.0
   - typing_extensions >= 3.7.4.3
+  - frictionless
 
   # testing and dependencies
   - black >= 20.8b1
diff --git a/pandera/io.py b/pandera/io.py
index bff044cdf..8f9506eb5 100644
--- a/pandera/io.py
+++ b/pandera/io.py
@@ -3,19 +3,25 @@
 import warnings
 from functools import partial
 from pathlib import Path
+from typing import Dict, Optional, Union
 
 import pandas as pd
 
+from .checks import Check
 from .dtypes import PandasDtype
+from .schema_components import Column
 from .schema_statistics import get_dataframe_schema_statistics
+from .schemas import DataFrameSchema
 
 try:
     import black
     import yaml
+    from frictionless import Schema as FrictionlessSchema
 except ImportError as exc:  # pragma: no cover
     raise ImportError(
-        'IO and formatting requires "pyyaml" and "black" to be installed. \n'
-        "You can install pandera together with the IO dependencies with: \n"
+        "IO and formatting requires 'pyyaml', 'black' and 'frictionless' "
+        "to be installed.\n"
+        "You can install pandera together with the IO dependencies with:\n"
         "pip install pandera[io]\n"
     ) from exc
 
@@ -152,8 +158,6 @@ def handle_stat_dtype(stat):
 
 
 def _deserialize_component_stats(serialized_component_stats):
-    from pandera import Check  # pylint: disable=import-outside-toplevel
-
     pandas_dtype = serialized_component_stats.get("pandas_dtype")
     if pandas_dtype:
         pandas_dtype = PandasDtype.from_str_alias(pandas_dtype)
@@ -188,7 +192,7 @@ def _deserialize_component_stats(serialized_component_stats):
 
 def _deserialize_schema(serialized_schema):
     # pylint: disable=import-outside-toplevel
-    from pandera import Check, Column, DataFrameSchema, Index, MultiIndex
+    from pandera import Index, MultiIndex
 
     columns, index, checks = None, None, None
     if serialized_schema["columns"] is not None:
@@ -309,8 +313,7 @@ def _format_checks(checks_dict):
             )
         else:
             args = ", ".join(
-                "{}={}".format(k, v.__repr__())
-                for k, v in check_kwargs.items()
+                f"{k}={v.__repr__()}" for k, v in check_kwargs.items()
             )
             checks.append(f"Check.{check_name}({args})")
     return f"[{', '.join(checks)}]"
@@ -381,7 +384,7 @@ def to_script(dataframe_schema, path_or_buf=None):
         else _format_index(statistics["index"])
     )
 
-    column_str = ", ".join("'{}': {}".format(k, v) for k, v in columns.items())
+    column_str = ", ".join(f"'{k}': {v}" for k, v in columns.items())
 
     script = SCRIPT_TEMPLATE.format(
         columns=column_str,
@@ -404,3 +407,206 @@ def to_script(dataframe_schema, path_or_buf=None):
 
     with Path(path_or_buf).open("w") as f:
         f.write(formatted_script)
+
+
+class FrictionlessFieldParser:
+    """Parses frictionless data schema field specifications so we can convert
+    them to an equivalent :class:`pandera.schema_components.Column` schema.
+
+    For this implementation, we are using field names, constraints and types
+    but leaving other frictionless parameters out (e.g. foreign keys, type
+    formats, titles, descriptions).
+
+    :param field: a field object from a frictionless schema.
+    :param primary_keys: the primary keys from a frictionless schema. These
+        are used to ensure primary key fields are treated properly - no
+        duplicates, no missing values etc.
+ """ + + def __init__(self, field, primary_keys) -> None: + self.constraints = field.constraints or {} + self.name = field.name + self.is_a_primary_key = self.name in primary_keys + self.type = field.get("type", "string") + + @property + def pandas_dtype(self) -> str: + """Determine what type of field this is, so we can feed that into + :class:`~pandera.dtypes.PandasDtype`. If no type is specified in the + frictionless schema, we default to string values. + + :returns: the pandas-compatible representation of this field type as a + string. + """ + types = { + "string": "string", + "number": "float", + "integer": "int", + "boolean": "bool", + "object": "object", + "array": "object", + "date": "string", + "time": "string", + "datetime": "datetime64[ns]", + "year": "int", + "yearmonth": "string", + "duration": "timedelta64[ns]", + "geopoint": "object", + "geojson": "object", + "any": "string", + } + return ( + "category" + if self.constraints.get("enum", None) + else types[self.type] + ) + + @property + def checks(self) -> Optional[Dict]: + """Convert a set of frictionless schema field constraints into checks. + + This parses the standard set of frictionless constraints which can be + found + `here `_ + and maps them into the equivalent pandera checks. + + :returns: a dictionary of pandera :class:`pandera.checks.Check` + objects which capture the standard constraint logic of a + frictionless schema field. + """ + if not self.constraints: + return None + constraints = self.constraints.copy() + checks = {} + + def _combine_constraints(check_name, min_constraint, max_constraint): + """Catches bounded constraints where we need to combine a min and max + pair of constraints into a single check.""" + if min_constraint in constraints and max_constraint in constraints: + checks[check_name] = { + "min_value": constraints.pop(min_constraint), + "max_value": constraints.pop(max_constraint), + } + + _combine_constraints("in_range", "minimum", "maximum") + _combine_constraints("str_length", "minLength", "maxLength") + + for constraint_type, constraint_value in constraints.items(): + if constraint_type == "maximum": + checks["less_than_or_equal_to"] = constraint_value + elif constraint_type == "minimum": + checks["greater_than_or_equal_to"] = constraint_value + elif constraint_type == "maxLength": + checks["str_length"] = { + "min_value": None, + "max_value": constraint_value, + } + elif constraint_type == "minLength": + checks["str_length"] = { + "min_value": constraint_value, + "max_value": None, + } + elif constraint_type == "pattern": + checks["str_matches"] = rf"^{constraint_value}$" + elif constraint_type == "enum": + checks["isin"] = constraint_value + return checks or None + + @property + def nullable(self) -> bool: + """Determine whether this field can contain missing values.""" + if self.is_a_primary_key: + return False + return not self.constraints.get("required", False) + + @property + def allow_duplicates(self) -> bool: + """Determine whether this field can contain duplicate values.""" + if self.is_a_primary_key: + return False + return not self.constraints.get("unique", False) + + @property + def coerce(self) -> bool: + """Determine whether values within this field should be coerced.""" + return True + + @property + def required(self) -> bool: + """Determine whether this field must exist within the data.""" + return True + + @property + def regex(self) -> bool: + """Determine whether this field name should be used for regex matches.""" + return False + + def to_pandera_column(self) -> Dict: + 
"""Export this field to a column spec dictionary.""" + return { + "allow_duplicates": self.allow_duplicates, + "checks": self.checks, + "coerce": self.coerce, + "nullable": self.nullable, + "pandas_dtype": self.pandas_dtype, + "required": self.required, + "name": self.name, + "regex": self.regex, + } + + +def from_frictionless_schema( + schema: Union[str, Path, Dict, FrictionlessSchema] +) -> DataFrameSchema: + """Create a :class:`~pandera.schemas.DataFrameSchema` from a frictionless + json/yaml schema file on disk, or a frictionless schema already loaded + into memory. + + Each field from the frictionless schema will be converted to a pandera + column specification using :class:`~pandera.io.FrictionlessFieldParser` + to map field characteristics to pandera column specifications. + + :param schema: the frictionless schema object (or a + string/Path to the location on disk of a schema specification) to + parse. + :returns: dataframe schema with frictionless field specs converted to + pandera column checks and constraints for use as normal. + + :example: + + >>> from pandera.io import from_frictionless_schema + >>> + >>> FRICTIONLESS_SCHEMA = { + ... "fields": [ + ... { + ... "name": "column_1", + ... "type": "integer", + ... "constraints": {"minimum": 10, "maximum": 99} + ... } + ... ], + ... "primaryKey": "column_1" + ... } + >>> schema = from_frictionless_schema(FRICTIONLESS_SCHEMA) + >>> schema.columns["column_1"].checks + [] + >>> schema.columns["column_1"].required + True + >>> schema.columns["column_1"].allow_duplicates + False + """ + if not isinstance(schema, FrictionlessSchema): + schema = FrictionlessSchema(schema) + + assembled_schema = { + "columns": { + field.name: FrictionlessFieldParser( + field, schema.primary_key + ).to_pandera_column() + for field in schema.fields + }, + "index": None, + "checks": None, + "coerce": True, + "strict": True, + } + return _deserialize_schema(assembled_schema) diff --git a/requirements-dev.txt b/requirements-dev.txt index 7a56dc0f1..c5522879e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -11,6 +11,7 @@ wrapt pyyaml >=5.1 typing_inspect >= 0.6.0 typing_extensions >= 3.7.4.3 +frictionless black >= 20.8b1 isort >= 5.7.0 codecov diff --git a/setup.py b/setup.py index f61101613..915a1669c 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ _extras_require = { "strategies": ["hypothesis >= 5.41.1"], "hypotheses": ["scipy"], - "io": ["pyyaml >= 5.1", "black"], + "io": ["pyyaml >= 5.1", "black", "frictionless"], } extras_require = { **_extras_require, diff --git a/tests/io/test_io.py b/tests/io/test_io.py index 0c8b33f84..3b9f70351 100644 --- a/tests/io/test_io.py +++ b/tests/io/test_io.py @@ -600,3 +600,281 @@ def unregistered_check(self, _): with pytest.warns(UserWarning, match=".*registered checks.*"): CheckedSchemaModel.to_yaml() + + +FRICTIONLESS_YAML = yaml.safe_load( + """ +fields: + - constraints: + maximum: 99 + minimum: 10 + name: integer_col + type: integer + - constraints: + maximum: 30 + name: integer_col_2 + type: integer + - constraints: + maxLength: 80 + minLength: 3 + name: string_col + - constraints: + pattern: \\d{3}[A-Z] + name: string_col_2 + - constraints: + minLength: 3 + name: string_col_3 + - constraints: + maxLength: 3 + name: string_col_4 + - constraints: + enum: + - 1.0 + - 2.0 + - 3.0 + required: true + name: float_col + type: number + - constraints: + name: float_col_2 + type: number + - constraints: + minimum: "20201231" + name: date_col +primaryKey: integer_col +""" +) + +FRICTIONLESS_JSON = { + 
"fields": [ + { + "name": "integer_col", + "type": "integer", + "constraints": {"minimum": 10, "maximum": 99}, + }, + { + "name": "integer_col_2", + "type": "integer", + "constraints": {"maximum": 30}, + }, + { + "name": "string_col", + "constraints": {"maxLength": 80, "minLength": 3}, + }, + { + "name": "string_col_2", + "constraints": {"pattern": r"\d{3}[A-Z]"}, + }, + { + "name": "string_col_3", + "constraints": {"minLength": 3}, + }, + { + "name": "string_col_4", + "constraints": {"maxLength": 3}, + }, + { + "name": "float_col", + "type": "number", + "constraints": {"enum": [1.0, 2.0, 3.0], "required": True}, + }, + { + "name": "float_col_2", + "type": "number", + }, + { + "name": "date_col", + "type": "date", + "constraints": {"minimum": "20201231"}, + }, + ], + "primaryKey": "integer_col", +} + +# pandas dtype aliases to support testing across multiple pandas versions: +STR_DTYPE = pa.dtypes.PandasDtype.from_str_alias("string").value +STR_DTYPE_ALIAS = pa.dtypes.PandasDtype.from_str_alias("string").str_alias +INT_DTYPE = pa.dtypes.PandasDtype.from_str_alias("int").value +INT_DTYPE_ALIAS = pa.dtypes.PandasDtype.from_str_alias("int").str_alias + +YAML_FROM_FRICTIONLESS = f""" +schema_type: dataframe +version: {pa.__version__} +columns: + integer_col: + pandas_dtype: {INT_DTYPE} + nullable: false + checks: + in_range: + min_value: 10 + max_value: 99 + allow_duplicates: false + coerce: true + required: true + regex: false + integer_col_2: + pandas_dtype: {INT_DTYPE} + nullable: true + checks: + less_than_or_equal_to: 30 + allow_duplicates: true + coerce: true + required: true + regex: false + string_col: + pandas_dtype: {STR_DTYPE} + nullable: true + checks: + str_length: + min_value: 3 + max_value: 80 + allow_duplicates: true + coerce: true + required: true + regex: false + string_col_2: + pandas_dtype: {STR_DTYPE} + nullable: true + checks: + str_matches: ^\\d{{3}}[A-Z]$ + allow_duplicates: true + coerce: true + required: true + regex: false + string_col_3: + pandas_dtype: {STR_DTYPE} + nullable: true + checks: + str_length: 3 + allow_duplicates: true + coerce: true + required: true + regex: false + string_col_4: + pandas_dtype: {STR_DTYPE} + nullable: true + checks: + str_length: 3 + allow_duplicates: true + coerce: true + required: true + regex: false + float_col: + pandas_dtype: category + nullable: false + checks: + isin: + - 1.0 + - 2.0 + - 3.0 + allow_duplicates: true + coerce: true + required: true + regex: false + float_col_2: + pandas_dtype: float + nullable: true + checks: null + allow_duplicates: true + coerce: true + required: true + regex: false + date_col: + pandas_dtype: {STR_DTYPE} + nullable: true + checks: + greater_than_or_equal_to: '20201231' + allow_duplicates: true + coerce: true + required: true + regex: false +checks: null +index: null +coerce: true +strict: true +""" + +VALID_FRICTIONLESS_DF = pd.DataFrame( + { + "integer_col": [10, 11, 12, 13, 14], + "integer_col_2": [1, 2, 3, 3, 1], + "string_col": ["aaa", None, "ccc", "ddd", "eee"], + "string_col_2": ["123A", "456B", None, "789C", "101D"], + "string_col_3": ["123ABC", "456B", None, "78a9C", "1A3F01D"], + "string_col_4": ["23A", "46B", None, "78C", "1D"], + "float_col": [1.0, 1.0, 1.0, 2.0, 3.0], + "float_col_2": [1, 1, None, 2, 3], + "date_col": [ + "20210101", + "20210102", + "20210103", + "20210104", + "20210105", + ], + } +) + +INVALID_FRICTIONLESS_DF = pd.DataFrame( + { + "integer_col": [1, 180, 12, 12, 18], + "integer_col_2": [10, 11, 12, 113, 14], + "string_col": ["a", "bbb", "ccc", "d" * 100, "eee"], 
+ "string_col_2": ["123A", "456B", None, "789c", "101D"], + "string_col_3": ["1A", "456B", None, "789c", "101D"], + "string_col_4": ["123A", "4B", None, "c", "1D"], + "float_col": [1.0, 1.1, None, 3.0, 3.8], + "float_col_2": ["a", 1, None, 3.0, 3.8], + "unexpected_column": [1, 2, 3, 4, 5], + } +) + + +@pytest.mark.parametrize( + "frictionless_schema", [FRICTIONLESS_YAML, FRICTIONLESS_JSON] +) +def test_frictionless_schema_parses_correctly(frictionless_schema): + """Test parsing frictionless schema from yaml and json.""" + schema = pa.io.from_frictionless_schema(frictionless_schema) + + assert str(schema.to_yaml()).strip() == YAML_FROM_FRICTIONLESS.strip() + + assert isinstance( + schema, pa.schemas.DataFrameSchema + ), "schema object not loaded successfully" + + df = schema.validate(VALID_FRICTIONLESS_DF) + assert dict(df.dtypes) == { + "integer_col": INT_DTYPE_ALIAS, + "integer_col_2": INT_DTYPE_ALIAS, + "string_col": STR_DTYPE_ALIAS, + "string_col_2": STR_DTYPE_ALIAS, + "string_col_3": STR_DTYPE_ALIAS, + "string_col_4": STR_DTYPE_ALIAS, + "float_col": pd.CategoricalDtype( + categories=[1.0, 2.0, 3.0], ordered=False + ), + "float_col_2": "float64", + "date_col": STR_DTYPE_ALIAS, + }, "dtypes not parsed correctly from frictionless schema" + + with pytest.raises(pa.errors.SchemaErrors) as err: + schema.validate(INVALID_FRICTIONLESS_DF, lazy=True) + # check we're capturing all errors according to the frictionless schema: + assert err.value.failure_cases[["check", "failure_case"]].fillna( + "NaN" + ).to_dict(orient="records") == [ + {"check": "column_in_schema", "failure_case": "unexpected_column"}, + {"check": "column_in_dataframe", "failure_case": "date_col"}, + {"check": "coerce_dtype('float64')", "failure_case": "object"}, + {"check": "no_duplicates", "failure_case": 12}, + {"check": "less_than_or_equal_to(30)", "failure_case": 113}, + {"check": "str_length(3, 80)", "failure_case": "a"}, + {"check": "str_length(3, 80)", "failure_case": "d" * 100}, + { + "check": "str_matches(re.compile('^\\\\d{3}[A-Z]$'))", + "failure_case": "789c", + }, + {"check": "str_length(3, None)", "failure_case": "1A"}, + {"check": "str_length(None, 3)", "failure_case": "123A"}, + {"check": "not_nullable", "failure_case": "NaN"}, + ], "validation failure cases not as expected"