Skip to content

Commit

Permalink
ARROW-2136: [Python] Check null counts for non-nullable fields when c…
Browse files Browse the repository at this point in the history
…onverting from pandas.DataFrame with supplied schema

Author: Wes McKinney <[email protected]>

Closes #4683 from wesm/ARROW-2136 and squashes the following commits:

4bcae1b <Wes McKinney> Check null counts for non-nullable fields when converting from pandas DataFrame with supplied schema
  • Loading branch information
wesm authored and kszucs committed Jun 25, 2019
1 parent b72544f commit a7f354f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
44 changes: 27 additions & 17 deletions python/pyarrow/pandas_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,11 @@ def _get_columns_to_convert(df, schema, preserve_index, columns):
columns = _resolve_columns_of_interest(df, schema, columns)

column_names = []
type = None

index_levels = _get_index_level_values(df.index) if preserve_index else []

columns_to_convert = []
convert_types = []
convert_fields = []

if not df.columns.is_unique:
raise ValueError(
Expand All @@ -350,10 +349,11 @@ def _get_columns_to_convert(df, schema, preserve_index, columns):

if schema is not None:
field = schema.field_by_name(name)
type = getattr(field, "type", None)
else:
field = None

columns_to_convert.append(col)
convert_types.append(type)
convert_fields.append(field)
column_names.append(name)

index_descriptors = []
Expand All @@ -364,7 +364,7 @@ def _get_columns_to_convert(df, schema, preserve_index, columns):
descr = _get_range_index_descriptor(index_level)
else:
columns_to_convert.append(index_level)
convert_types.append(None)
convert_fields.append(None)
descr = name
index_column_names.append(name)
index_descriptors.append(descr)
Expand All @@ -380,10 +380,10 @@ def _get_columns_to_convert(df, schema, preserve_index, columns):
# index_levels : the extracted index level values
# columns_to_convert : assembled raw data (both data columns and indexes)
# to be converted to Arrow format
# columns_types : specified column types to use for coercion / casting
# columns_fields : specified column to use for coercion / casting
# during serialization, if a Schema was provided
return (all_names, column_names, index_column_names, index_descriptors,
index_levels, columns_to_convert, convert_types)
index_levels, columns_to_convert, convert_fields)


def _get_range_index_descriptor(level):
Expand Down Expand Up @@ -452,8 +452,8 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
index_descriptors,
index_columns,
columns_to_convert,
convert_types) = _get_columns_to_convert(df, schema, preserve_index,
columns)
convert_fields) = _get_columns_to_convert(df, schema, preserve_index,
columns)

# NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
# using a thread pool is worth it. Currently the heuristic is whether the
Expand All @@ -465,26 +465,36 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
else:
nthreads = 1

def convert_column(col, ty):
def convert_column(col, field):
if field is None:
field_nullable = True
type_ = None
else:
field_nullable = field.nullable
type_ = field.type

try:
return pa.array(col, type=ty, from_pandas=True, safe=safe)
result = pa.array(col, type=type_, from_pandas=True, safe=safe)
except (pa.ArrowInvalid,
pa.ArrowNotImplementedError,
pa.ArrowTypeError) as e:
e.args += ("Conversion failed for column {0!s} with type {1!s}"
.format(col.name, col.dtype),)
raise e
if not field_nullable and result.null_count > 0:
raise ValueError("Field {} was non-nullable but pandas column "
"had {} null values".format(str(field),
result.null_count))
return result

if nthreads == 1:
arrays = [convert_column(c, t)
for c, t in zip(columns_to_convert,
convert_types)]
arrays = [convert_column(c, f)
for c, f in zip(columns_to_convert, convert_fields)]
else:
from concurrent import futures
with futures.ThreadPoolExecutor(nthreads) as executor:
arrays = list(executor.map(convert_column,
columns_to_convert,
convert_types))
arrays = list(executor.map(convert_column, columns_to_convert,
convert_fields))

types = [x.type for x in arrays]

Expand Down
11 changes: 11 additions & 0 deletions python/pyarrow/tests/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2542,6 +2542,17 @@ def test_to_pandas_deduplicate_date_time():

# ---------------------------------------------------------------------

def test_table_from_pandas_checks_field_nullability():
# ARROW-2136
df = pd.DataFrame({'a': [1.2, 2.1, 3.1],
'b': [np.nan, 'string', 'foo']})
schema = pa.schema([pa.field('a', pa.float64(), nullable=False),
pa.field('b', pa.utf8(), nullable=False)])

with pytest.raises(ValueError):
pa.Table.from_pandas(df, schema=schema)


def test_table_from_pandas_keeps_column_order_of_dataframe():
df1 = pd.DataFrame(OrderedDict([
('partition', [0, 0, 1, 1]),
Expand Down

0 comments on commit a7f354f

Please sign in to comment.