implement dataframe types #672

Merged 3 commits on Nov 10, 2021
26 changes: 17 additions & 9 deletions README.md
@@ -3,7 +3,7 @@

<hr>

*A data validation library for scientists, engineers, and analysts seeking
*A dataframe validation library for scientists, engineers, and analysts seeking
correctness.*

<br>
@@ -22,10 +22,18 @@ correctness.*
[![Downloads](https://pepy.tech/badge/pandera/month)](https://pepy.tech/project/pandera)
[![Downloads](https://pepy.tech/badge/pandera)](https://pepy.tech/project/pandera)

`pandas` data structures contain information that `pandera` explicitly
validates at runtime. This is useful in production-critical or reproducible
research settings. With `pandera`, you can:
`pandera` provides a flexible and expressive API for performing data
validation on dataframes to make data processing pipelines more readable and
robust.

Dataframes contain information that `pandera` explicitly validates at runtime.
This is useful in production-critical or reproducible research settings. With
`pandera`, you can:

1. Define a schema once and use it to validate
[different dataframe types](https://pandera.readthedocs.io/en/stable/supported_libraries.html)
including [pandas](http://pandas.pydata.org), [dask](https://dask.org),
[modin](https://modin.readthedocs.io/), and [koalas](https://koalas.readthedocs.io).
1. [Check](https://pandera.readthedocs.io/en/stable/checks.html) the types and
properties of columns in a `DataFrame` or values in a `Series`.
1. Perform more complex statistical validation like
@@ -37,11 +45,11 @@ research settings. With `pandera`, you can:
with pydantic-style syntax and validate dataframes using the typing syntax.
1. [Synthesize data](https://pandera.readthedocs.io/en/stable/data_synthesis_strategies.html#data-synthesis-strategies)
from schema objects for property-based testing with pandas data structures.

`pandera` provides a flexible and expressive API for performing data validation
on tidy (long-form) and wide data to make data processing pipelines more
readable and robust.

1. [Lazily Validate](https://pandera.readthedocs.io/en/stable/lazy_validation.html)
dataframes so that all validation checks are executed before raising an error.
1. [Integrate](https://pandera.readthedocs.io/en/stable/integrations.html) with
a rich ecosystem of python tools like [pydantic](https://pydantic-docs.helpmanual.io)
and [mypy](http://mypy-lang.org/).
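
As a minimal sketch of the first two points above (assuming only `pandas` and
`pandera` are installed; the column names are illustrative):

```python
import pandas as pd
import pandera as pa

# define a schema once
schema = pa.DataFrameSchema({
    "state": pa.Column(str),
    "price": pa.Column(int, pa.Check.in_range(min_value=5, max_value=20)),
})

# validate a pandas dataframe at runtime; invalid data raises a SchemaError
print(schema(pd.DataFrame({"state": ["FL", "CA"], "price": [8, 16]})))
```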

## Documentation

8 changes: 7 additions & 1 deletion docs/source/conf.py
@@ -50,6 +50,7 @@
]

doctest_global_setup = """
import platform
import sys
import pandas as pd
import numpy as np
@@ -76,6 +77,8 @@
SKIP_PANDAS_LT_V1 = version.parse(pd.__version__).release < (1, 0) or PY36
SKIP_SCALING = True
SKIP_SCHEMA_MODEL = SKIP_PANDAS_LT_V1 or KOALAS_INSTALLED
SKIP_MODIN = platform.system() == "Windows"

"""

doctest_default_flags = (
@@ -175,7 +178,10 @@
intersphinx_mapping = {
"python": ("https://docs.python.org/3/", None),
"numpy": ("https://docs.scipy.org/doc/numpy/", None),
"pandas": ("http://pandas.pydata.org/pandas-docs/stable/", None),
"pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
"dask": ("https://docs.dask.org/en/latest/", None),
"koalas": ("https://koalas.readthedocs.io/en/latest/", None),
"modin": ("https://modin.readthedocs.io/en/latest/", None),
}

# strip prompts
134 changes: 134 additions & 0 deletions docs/source/dask.rst
@@ -0,0 +1,134 @@
.. currentmodule:: pandera

.. _scaling_dask:

Data Validation with Dask
=========================

*new in 0.8.0*

`Dask <https://docs.dask.org/en/latest/dataframe.html>`__ is a distributed
compute framework that offers a pandas-like dataframe API.
You can use pandera to validate :py:class:`~dask.dataframe.DataFrame`
and :py:class:`~dask.dataframe.Series` objects directly. First, install
``pandera`` with the ``dask`` extra:

.. code:: bash

pip install pandera[dask]


Then you can use pandera schemas to validate dask dataframes. In the example
below we'll use the :ref:`class-based API <schema_models>` to define a
:py:class:`SchemaModel` for validation.

.. testcode:: scaling_dask

import dask.dataframe as dd
import pandas as pd
import pandera as pa

from pandera.typing.dask import DataFrame, Series


class Schema(pa.SchemaModel):
state: Series[str]
city: Series[str]
price: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})


ddf = dd.from_pandas(
pd.DataFrame(
{
'state': ['FL','FL','FL','CA','CA','CA'],
'city': [
'Orlando',
'Miami',
'Tampa',
'San Francisco',
'Los Angeles',
'San Diego',
],
'price': [8, 12, 10, 16, 20, 18],
}
),
npartitions=2
)
pandera_ddf = Schema(ddf)

print(pandera_ddf)


.. testoutput:: scaling_dask

Dask DataFrame Structure:
state city price
npartitions=2
0 object object int64
3 ... ... ...
5 ... ... ...
Dask Name: validate, 4 tasks


As you can see, passing the dask dataframe into ``Schema`` produces
another dask dataframe that hasn't been evaluated yet. This means that
pandera will only validate the data when the dask graph is evaluated.

.. testcode:: scaling_dask

print(pandera_ddf.compute())


.. testoutput:: scaling_dask

state city price
0 FL Orlando 8
1 FL Miami 12
2 FL Tampa 10
3 CA San Francisco 16
4 CA Los Angeles 20
5 CA San Diego 18
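
Because validation is deferred, schema errors also surface only once the graph
is computed. As a rough sketch (reusing the ``Schema`` defined above; the exact
error type and message may differ depending on your pandera version):

.. code-block:: python

    from pandera.errors import SchemaError

    bad_ddf = dd.from_pandas(
        pd.DataFrame({"state": ["FL"], "city": ["Orlando"], "price": [100]}),
        npartitions=1,
    )
    lazy_result = Schema(bad_ddf)  # no error raised yet

    try:
        lazy_result.compute()  # validation runs here; price=100 is out of range
    except SchemaError as exc:
        print(exc)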


You can also use the :py:func:`~pandera.check_types` decorator to validate
dask dataframes at runtime:

.. testcode:: scaling_dask

@pa.check_types
def function(ddf: DataFrame[Schema]) -> DataFrame[Schema]:
return ddf[ddf["state"] == "CA"]

print(function(ddf).compute())


.. testoutput:: scaling_dask

state city price
3 CA San Francisco 16
4 CA Los Angeles 20
5 CA San Diego 18


And of course, you can use the object-based API to validate dask dataframes:


.. testcode:: scaling_dask

schema = pa.DataFrameSchema({
"state": pa.Column(str),
"city": pa.Column(str),
"price": pa.Column(int, pa.Check.in_range(min_value=5, max_value=20))
})
print(schema(ddf).compute())


.. testoutput:: scaling_dask

state city price
0 FL Orlando 8
1 FL Miami 12
2 FL Tampa 10
3 CA San Francisco 16
4 CA Los Angeles 20
5 CA San Diego 18
4 changes: 4 additions & 0 deletions docs/source/dataframe_schemas.rst
@@ -39,6 +39,10 @@ The :class:`~pandera.schemas.DataFrameSchema` object consists of |column|_\s and
coerce=True,
)

You can refer to :ref:`schema_models` to see how to define dataframe schemas
using the alternative pydantic/dataclass-style syntax.
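
For instance, a hypothetical schema with a single non-negative integer column
could be written in either style (the column name ``value`` is illustrative,
not taken from the schema above):

.. code-block:: python

    import pandera as pa
    from pandera.typing import Series

    # object-based API
    schema = pa.DataFrameSchema(
        {"value": pa.Column(int, pa.Check.ge(0))},
        coerce=True,
    )

    # equivalent class-based API
    class Schema(pa.SchemaModel):
        value: Series[int] = pa.Field(ge=0)

        class Config:
            coerce = True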


.. _column:

Column Validation
4 changes: 2 additions & 2 deletions docs/source/dtypes.rst
@@ -4,8 +4,8 @@

.. _dtypes:

Pandera Data Types (new)
========================
Pandera Data Types
==================

*new in 0.7.0*

39 changes: 22 additions & 17 deletions docs/source/scaling.rst → docs/source/fugue.rst
@@ -1,9 +1,9 @@
.. currentmodule:: pandera

.. _scaling:
.. _scaling_fugue:

Scaling Pandera to Big Data
=================================
Data Validation with Fugue
==========================

Validation on big data comes in two forms. The first is performing one set of
validations on data that doesn't fit in memory. The second happens when a large dataset
@@ -17,8 +17,8 @@ code can be used on top of ``Spark`` or ``Dask`` engines with
to be performed in a distributed setting. ``Fugue`` is an open source abstraction layer that
ports ``Python``, ``pandas``, and ``SQL`` code to ``Spark`` and ``Dask``.

Fugue
-----
What is Fugue?
--------------

``Fugue`` serves as an interface to distributed computing. Because of its non-invasive design,
existing ``Python`` code can be scaled to a distributed setting without significant changes.
@@ -40,17 +40,22 @@ In this example, a pandas ``DataFrame`` is created with ``state``, ``city`` and
columns. ``Pandera`` will be used to validate that the ``price`` column values are within
a certain range.

.. testcode:: scaling_pandera
.. testcode:: scaling_fugue

import pandas as pd

data = pd.DataFrame({'state': ['FL','FL','FL','CA','CA','CA'],
'city': ['Orlando', 'Miami', 'Tampa',
'San Francisco', 'Los Angeles', 'San Diego'],
'price': [8, 12, 10, 16, 20, 18]})
data = pd.DataFrame(
{
'state': ['FL','FL','FL','CA','CA','CA'],
'city': [
'Orlando', 'Miami', 'Tampa', 'San Francisco', 'Los Angeles', 'San Diego'
],
'price': [8, 12, 10, 16, 20, 18],
}
)
print(data)

.. testoutput:: scaling_pandera
.. testoutput:: scaling_fugue

state city price
0 FL Orlando 8
@@ -64,7 +69,7 @@ a certain range.
Validation is then applied using pandera. A ``price_validation`` function is
created that runs the validation. None of this will be new.

.. testcode:: scaling_pandera
.. testcode:: scaling_fugue

from pandera import Column, DataFrameSchema, Check

@@ -85,7 +90,7 @@ to run the code on top of ``Spark``. ``Fugue`` also has a ``DaskExecutionEngine`
the default pandas-based ``ExecutionEngine``. Because the ``SparkExecutionEngine`` is used, the result
becomes a ``Spark DataFrame``.

.. testcode:: scaling_pandera
.. testcode:: scaling_fugue
:skipif: SKIP_SCALING

from fugue import transform
@@ -94,7 +99,7 @@ becomes a ``Spark DataFrame``.
spark_df = transform(data, price_validation, schema="*", engine=SparkExecutionEngine)
spark_df.show()

.. testoutput:: scaling_pandera
.. testoutput:: scaling_fugue
:skipif: SKIP_SCALING

+-----+-------------+-----+
@@ -118,7 +123,7 @@ price range for the records with ``state`` FL is lower than the range for the ``
Two :class:`~pandera.schemas.DataFrameSchema` objects will be created to reflect this. Notice that their
ranges for the :class:`~pandera.checks.Check` differ.

.. testcode:: scaling_pandera
.. testcode:: scaling_fugue

price_check_FL = DataFrameSchema({
"price": Column(int, Check.in_range(min_value=7,max_value=13)),
@@ -139,7 +144,7 @@ To partition our data by ``state``, all we need to do is pass it into the ``tran
through the ``partition`` argument. This splits up the data across different workers before they
each run the ``price_validation`` function. Again, this is like a groupby-validation.

.. testcode:: scaling_pandera
.. testcode:: scaling_fugue
:skipif: SKIP_SCALING

def price_validation(df:pd.DataFrame) -> pd.DataFrame:
@@ -156,7 +161,7 @@ each run the ``price_validation`` function. Again, this is like a groupby-valida

spark_df.show()

.. testoutput:: scaling_pandera
.. testoutput:: scaling_fugue
:skipif: SKIP_SCALING

SparkDataFrame