arrow-pd-parser uses PyArrow and Pandas to read CSV, JSONL and Parquet files so that the resulting Pandas dataframe has the best available representation of each column type. It also keeps column types consistent when data is read and written back out, including when round-tripping across multiple file formats.
The package can also read data against a given MoJ-Metadata schema, producing a Pandas dataframe that best represents that schema.
It can also write data back to the supported formats, using Pandas for CSV and JSONL and PyArrow for Parquet.
pip install arrow-pd-parser
or via GitHub
pip install "arrow-pd-parser @ git+https://github.com/moj-analytical-services/mojap-arrow-pd-parser"
This package uses PyArrow and/or Pandas to parse CSV, JSONL and Parquet files into a Pandas dataframe that best represents their data types and ensures conformance between them. It can also write data back to these formats while maintaining conformance to the provided schema.
from arrow_pd_parser import reader, writer
df1 = reader.read("tests/data/all_types.csv")
df1.dtypes
# i Int64
# my_bool boolean
# my_nullable_bool boolean
# my_date object
# my_datetime object
# my_int Int64
# my_string string
df2 = reader.read("tests/data/all_types.jsonl")
df2.dtypes # Note that Pandas struggles with nullable booleans when it reads JSONL (hence it is an Int64)
# Write the dataframe to parquet
# Note: the default setting for the parquet writer is to
# compress using snappy (compression is not inferred from the filepath, but may be in future releases)
writer.write(df1, "new_output.snappy.parquet")
Note that the default behavior of this package is to use the new pandas datatypes for integers, booleans and strings, which represent nulls as pd.NA. Dates are returned as nullable objects of type datetime.date and timestamps as datetime.datetime. By default we enforce these types instead of the native pandas timestamp because the pandas timestamp has nanosecond resolution, which can cause dates to be out of bounds. See the timestamps section for more details.
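To make the difference concrete, the following sketch uses plain pandas (not this package) to compare pandas' default inference with the nullable extension dtypes described above. The column names and values are illustrative only.

```python
import datetime

import pandas as pd

# Default inference: a missing value forces an integer column to float64.
legacy = pd.Series([1, None, 3])

# Nullable extension dtypes keep the logical type and mark the gap as pd.NA.
df = pd.DataFrame(
    {
        "my_int": pd.array([1, None, 3], dtype="Int64"),
        "my_bool": pd.array([True, None, False], dtype="boolean"),
        "my_string": pd.array(["a", None, "c"], dtype="string"),
        # Dates held as plain Python objects in an object column.
        "my_date": pd.Series(
            [datetime.date(1066, 10, 14), None, datetime.date(2020, 1, 1)],
            dtype=object,
        ),
    }
)

print(legacy.dtype)
print(df.dtypes)
print(df["my_int"][1] is pd.NA)
```

The date column stays as an object column, so a date such as 1066-10-14 is stored without any bounds check.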
The reader.read() method infers the file format from the extension of your filepath; failing that, it takes the format from your metadata, if provided. By default, reader.read() uses the following readers for each file format.
File format | Reader | Writer |
---|---|---|
CSV | Pandas | Pandas |
JSONL | Pandas | Pandas |
Parquet | Arrow | Arrow |
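The lookup order described above (file extension first, then metadata) can be pictured with a small standard-library sketch. This is not the package's implementation: `infer_file_format`, the `EXTENSIONS` table and the `file_format` metadata key are all assumptions made for illustration.

```python
from pathlib import Path
from typing import Optional

# Hypothetical extension lookup, for illustration only.
EXTENSIONS = {".csv": "csv", ".jsonl": "jsonl", ".parquet": "parquet"}


def infer_file_format(path: str, metadata: Optional[dict] = None) -> str:
    """Use the final file extension, then fall back to the metadata."""
    suffix = Path(path).suffix.lower()
    if suffix in EXTENSIONS:
        return EXTENSIONS[suffix]
    # Assumed metadata key; check your schema for the real one.
    if metadata and metadata.get("file_format"):
        return metadata["file_format"]
    raise ValueError(f"Cannot infer file format for {path!r}")


print(infer_file_format("new_output.snappy.parquet"))
print(infer_file_format("data/extract", {"file_format": "csv"}))
```

Only the final suffix is inspected here, so a name such as new_output.snappy.parquet resolves to parquet and the "snappy" part is ignored.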
You can also specify the file format in the reader and writer function or specify the reader type:
from arrow_pd_parser import reader, writer
# Specifying the reader
# Both reader statements are equivalent and call the same readers
# under the hood
df1 = reader.read("tests/data/all_types.csv", file_format="csv")
df2 = reader.csv.read("tests/data/all_types.csv")
# You can also pass the reader args to the reader as kwargs
df3 = reader.csv.read("tests/data/all_types.csv", nrows = 2)
# The writer API has the same functionality
# ("new_output.parquet" is a placeholder output path)
writer.write(df1, "new_output.parquet", file_format="parquet")
writer.parquet.write(df1, "new_output.parquet")
The main usefulness of this repo comes from specifying metadata for the data you want. This section focuses on CSV and JSONL, as Parquet is more nuanced: it encodes data types, whereas CSV and JSONL leave them ambiguous.
When reading and writing you can specify metadata (using our metadata schema definitions) to ensure the data read into pandas conforms to it.
from arrow_pd_parser import reader
# If no meta is provided we let the reader infer type
df1 = reader.read("tests/data/all_types.csv")
df1.dtypes
# i Int64
# my_bool boolean
# my_nullable_bool boolean
# my_date object
# my_datetime object
# my_int Int64
# my_string string
# If metadata is provided we ensure conformance
meta = {
    "columns": [
        {"name": "my_bool", "type": "string"},
        {"name": "my_nullable_bool", "type": "bool"},
        {"name": "my_date", "type": "string"},
        {"name": "my_datetime", "type": "string"},
        {"name": "my_int", "type": "float64"},
        {"name": "my_string", "type": "string"}
    ]
}
df2 = reader.read("tests/data/all_types.csv", metadata=meta)
df2.dtypes
# i Int64
# my_bool string
# my_nullable_bool boolean
# my_date string
# my_datetime string
# my_int float64
# my_string string
df3 = reader.read("tests/data/all_types.jsonl", metadata=meta)
df3.dtypes
assert df2.dtypes.to_list() == df3.dtypes.to_list()
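As a rough picture of what conforming to metadata means, the sketch below casts a dataframe to a metadata dictionary using plain pandas. It is not the package's caster: `cast_to_meta` and `TYPE_MAP` are hypothetical names, and the mapping covers only a few types.

```python
import io

import pandas as pd

# Hypothetical mapping from metadata type names to pandas dtypes.
TYPE_MAP = {
    "string": "string",
    "bool": "boolean",
    "int64": "Int64",
    "float64": "float64",
}


def cast_to_meta(df: pd.DataFrame, meta: dict) -> pd.DataFrame:
    """Cast every column named in the metadata to its mapped dtype."""
    out = df.copy()
    for col in meta["columns"]:
        out[col["name"]] = out[col["name"]].astype(TYPE_MAP[col["type"]])
    return out


raw = pd.read_csv(io.StringIO("my_int,my_string\n1,a\n2,b\n"))
meta = {
    "columns": [
        {"name": "my_int", "type": "float64"},
        {"name": "my_string", "type": "string"},
    ]
}
df = cast_to_meta(raw, meta)
print(df.dtypes)
```

Columns missing from the metadata are left with whatever type the reader inferred, which matches the behaviour of the `i` column in the example above.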
As of the v1 release we expect the API for reading and writing to remain the same, but the caster (what is called under the hood to cast the data to the prescribed metadata) is still lacking. The caster should improve with subsequent releases.
reader.read() and writer.write() use the default reader or writer with its default settings. However, you can customise your reader/writer if you want to.
You can pass arguments to the underlying reader that is used to read data in. The example below uses the nrows argument of pd.read_csv, which is the underlying reader.
from arrow_pd_parser import reader
# Passing args to the read function.
df = reader.csv.read("tests/data/all_types.csv", nrows=1000)
The readers and writers have settings that configure how they read and write data. One of the main settings is how Pandas types are handled. We default to the new pandas Series types: StringDtype for string, BooleanDtype for bool and Int64Dtype for integer. We also force dates and timestamps to be a series of objects (see the timestamps section below). To change which pandas types are used, change the reader settings:
from arrow_pd_parser.reader import csv
from io import StringIO
csv.pd_integer = False
csv.pd_string = False
csv.bool_map = {"Yes": True, "No": False}
data = """
int_col,str_col,bool_col
1,"Hello, mate",Yes
2,Hi,No
"""
meta = {
    "columns": [
        {"name": "int_col", "type": "int64"},
        {"name": "str_col", "type": "string"},
        {"name": "bool_col", "type": "bool"},
    ]
}
f = StringIO(data)
df = csv.read(f, metadata = meta)
df.dtypes
# int_col int64
# str_col object
# bool_col boolean
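One way to picture the bool_map setting is as a value lookup followed by a cast to the nullable boolean dtype. The sketch below shows that idea in plain pandas; it is an assumption about the general technique, not a description of how the reader applies the setting internally.

```python
import pandas as pd

bool_map = {"Yes": True, "No": False}
raw = pd.Series(["Yes", "No", None, "Yes"])

# Translate the text values, then store them in the nullable boolean dtype.
# Values with no entry in the mapping (here the missing one) become <NA>.
as_bool = raw.map(bool_map).astype("boolean")

print(as_bool.dtype)
print(as_bool.tolist())
```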
#### Using your own reader
If you want your own instance of a reader rather than the default one provided, you can create it.
from arrow_pd_parser.reader import PandasCsvReader, csv
# Create your own reader and initialise its settings
specific_csv = PandasCsvReader(
    pd_integer = False,
    pd_string = False,
    bool_map = {"Yes": True, "No": False}
)
# This may come in handy if you want to have two instances of a CSV reader with different settings
df1 = csv.read("tests/data/all_types.csv") # default reader
df2 = specific_csv.read("tests/data/all_types.csv")
The default parquet reader and writer use Arrow under the hood. This also means they use a different method for casting data types (casting is done in Arrow, not in Pandas). Because parquet is stricter, any deviation from your metadata will most likely cause the cast to fail. When you provide metadata to the reader and writer, the API forces the data to that metadata. This makes sense for CSV and JSONL, but may make less sense for parquet. Therefore, when using the parquet reader and writer, consider whether you want to provide your metadata at all. Doing so may still be advantageous as a check that the parquet file matches the metadata schema you have for the data.
In some cases (see this issue) pyarrow will silently convert some data types when writing a parquet file, overriding the provided metadata. The writer will raise a warning if this happens, and you may wish to update your metadata so it matches the type that arrow expects.
from arrow_pd_parser import reader, writer
# Note all readers (CSV, JSONL and Parquet) support S3 paths in the following way
df = reader.parquet.read("s3://bucket/in.snappy.parquet")
writer.parquet.write(df, "s3://bucket/out.snappy.parquet")
print(writer.parquet.compression, writer.parquet.version)
# The default settings for the Parquet writer are snappy compression and version = '2.0'.
# These are passed to the arrow parquet write method, but can be changed either by setting the
# properties or by passing them as kwargs (which will supersede the default properties), e.g.
writer.parquet.write(df, "s3://bucket/out.parquet", compression=None, version='1.0')
When metadata is provided (or for all Arrow readers without metadata) we default to representing dates and datetimes as a series of objects rather than Pandas timestamps. This is because Pandas timestamps (currently) only support nanosecond resolution, which gives a range that is often too small for many timestamps.
import pandas as pd
pd.Timestamp.min # Timestamp('1677-09-22 00:12:43.145225')
pd.Timestamp.max # Timestamp('2262-04-11 23:47:16.854775807')
Whereas Spark 3.0 (for example) allows timestamps from 0001-01-01 00:00:00 to 9999-12-31 23:59:59.999999 (source). For this reason we do not use Pandas timestamps by default; instead we use the Python native datetime class as the default for our types (wrapped in a Pandas object column type). Users can specify another Pandas date/timestamp type using the pd_timestamp_type parameter, which can be object (default), pd_timestamp or pd_period.
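The narrow range follows from storing nanoseconds since 1970-01-01 in a signed 64-bit integer, which covers roughly 292 years either side of the epoch. The standard-library sketch below reproduces the approximate bounds; it truncates to microseconds because that is the finest resolution Python's datetime supports, so the results are approximations of the pandas limits rather than exact values.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1)
INT64_MAX = 2**63 - 1  # largest nanosecond offset a signed 64-bit integer holds

# datetime works in microseconds, so drop the sub-microsecond digits.
span = datetime.timedelta(microseconds=INT64_MAX // 1000)

latest = EPOCH + span
earliest = EPOCH - span
print(earliest, latest)

# A plain Python datetime is only limited to years 1 to 9999,
# so it can hold values far outside the nanosecond range.
battle_of_hastings = datetime.datetime(1066, 10, 14)
print(battle_of_hastings < earliest)
```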
When setting pd_timestamp_type="pd_period", arrow_pd_parser will identify the correct pandas period resolution based on the arrow column type.
from arrow_pd_parser.reader import csv
# The Pandas Period
csv.pd_timestamp_type="pd_period"
df = csv.read("tests/data/datetime_type.csv")
df.my_datetime.dtype # "period[S]"
# Use pandas timestamp
csv.pd_timestamp_type="pd_timestamp"
df = csv.read("tests/data/datetime_type.csv")
df.my_datetime.dtype # dtype('<M8[ns]')
Datasets that are too large to fit into memory can be read in chunks. If the chunksize parameter is given to reader.read, an iterator of dataframes is returned rather than a single dataframe. chunksize can be an integer giving the number of rows in each chunk, or a string giving the amount of memory each chunk should fill, e.g. "1 GB". Note that the memory size should be less than the available memory, as some overhead is required for reading and writing. The writer.write function can then take these iterators in place of a dataframe.
from arrow_pd_parser import reader, writer
df_iter = reader.read("s3://my_bucket/csv_data/my_table.csv", chunksize=10000)
writer.write(df_iter, "s3://my_bucket/parquet_data/my_table.parquet")
If the dataframe needs transforming before writing then use a generator.
from arrow_pd_parser import reader, writer
def transform(df):
    # do something
    return df
df_iter = reader.read("s3://my_bucket/csv_data/my_table.csv", chunksize="500MB")
# Using a generator comprehension
df_transformed_iter = (transform(df) for df in df_iter)
writer.write(df_transformed_iter, "s3://my_bucket/parquet_data/my_transformed_table.parquet")
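The same lazy pattern can be tried locally with plain pandas, whose read_csv also returns an iterator of dataframes when chunksize is set. The sketch below uses an in-memory CSV and an illustrative transform; it shows the general technique only, and the package's own chunking (for example string sizes such as "500MB") may work differently.

```python
import io

import pandas as pd

csv_text = "id,value\n1,10\n2,20\n3,30\n4,40\n5,50\n"


def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Illustrative transformation: add a derived column.
    return df.assign(doubled=df["value"] * 2)


# With chunksize set, pandas returns an iterator of dataframes.
chunks = pd.read_csv(io.StringIO(csv_text), chunksize=2)

# The generator is lazy: each chunk is read and transformed only when consumed.
transformed = (transform(chunk) for chunk in chunks)

chunk_lengths = []
total = 0
for chunk in transformed:
    chunk_lengths.append(len(chunk))
    total += int(chunk["doubled"].sum())

print(chunk_lengths, total)
```

Because only one chunk is held at a time, peak memory depends on the chunk size rather than on the size of the whole file.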