Skip to content

Commit

Permalink
fix: keyerror when the load_table_from_dataframe accesses an unmapped …
Browse files Browse the repository at this point in the history
…dtype dataframe index (#1535)
  • Loading branch information
chelsea-lin authored Mar 28, 2023
1 parent 3c92580 commit a69348a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 28 deletions.
4 changes: 2 additions & 2 deletions google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
# pandas dtype.
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
if bq_type is None:
sample_data = _first_valid(dataframe[column])
sample_data = _first_valid(dataframe.reset_index()[column])
if (
isinstance(sample_data, _BaseGeometry)
and sample_data is not None # Paranoia
Expand Down Expand Up @@ -544,7 +544,7 @@ def augment_schema(dataframe, current_bq_schema):
augmented_schema.append(field)
continue

arrow_table = pyarrow.array(dataframe[field.name])
arrow_table = pyarrow.array(dataframe.reset_index()[field.name])

if pyarrow.types.is_list(arrow_table.type):
# `pyarrow.ListType`
Expand Down
106 changes: 80 additions & 26 deletions tests/unit/test__pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,32 +930,6 @@ def test_list_columns_and_indexes_with_multiindex(module_under_test):
assert columns_and_indexes == expected


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_dict_sequence(module_under_test):
    """Schema given as a sequence of dicts is merged with detected columns.

    Only ``str_column`` and ``bool_column`` are described by the user-supplied
    schema; the test expects ``int_column`` to be filled in as a NULLABLE
    INTEGER field while the two supplied fields keep their given type/mode.
    """
    # Column order matters for the expected schema, hence the OrderedDict.
    columns = collections.OrderedDict()
    columns["str_column"] = ["hello", "world"]
    columns["int_column"] = [42, 8]
    columns["bool_column"] = [True, False]
    frame = pandas.DataFrame(columns)

    partial_schema = [
        dict(name="str_column", type="STRING", mode="NULLABLE"),
        dict(name="bool_column", type="BOOL", mode="REQUIRED"),
    ]

    detected = module_under_test.dataframe_to_bq_schema(frame, partial_schema)

    wanted = (
        schema.SchemaField("str_column", "STRING", "NULLABLE"),
        schema.SchemaField("int_column", "INTEGER", "NULLABLE"),
        schema.SchemaField("bool_column", "BOOL", "REQUIRED"),
    )
    assert detected == wanted


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_arrow_with_multiindex(module_under_test):
bq_schema = (
Expand Down Expand Up @@ -1190,6 +1164,86 @@ def test_dataframe_to_parquet_compression_method(module_under_test):
assert call_args.kwargs.get("compression") == "ZSTD"


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_w_named_index(module_under_test):
    """A named string index is expected as the first field of the schema.

    No user schema is supplied (empty list), so every field in the expected
    result is NULLABLE, with the index field preceding the regular columns.
    """
    # Column order matters for the expected schema, hence the OrderedDict.
    columns = collections.OrderedDict()
    columns["str_column"] = ["hello", "world"]
    columns["int_column"] = [42, 8]
    columns["bool_column"] = [True, False]
    frame = pandas.DataFrame(
        columns,
        index=pandas.Index(["a", "b"], name="str_index"),
    )

    detected = module_under_test.dataframe_to_bq_schema(frame, [])

    # (field name, BigQuery type) pairs in the order they should be reported.
    name_type_pairs = [
        ("str_index", "STRING"),
        ("str_column", "STRING"),
        ("int_column", "INTEGER"),
        ("bool_column", "BOOLEAN"),
    ]
    wanted = tuple(
        schema.SchemaField(field_name, bq_type, "NULLABLE")
        for field_name, bq_type in name_type_pairs
    )
    assert detected == wanted


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_w_multiindex(module_under_test):
    """Each level of a named MultiIndex is expected as its own schema field.

    The index has string, integer and datetime levels; the expected schema
    lists those three levels first, followed by the regular columns, all
    NULLABLE because no user schema is supplied (empty list).
    """
    # Column order matters for the expected schema, hence the OrderedDict.
    columns = collections.OrderedDict()
    columns["str_column"] = ["hello", "world"]
    columns["int_column"] = [42, 8]
    columns["bool_column"] = [True, False]

    index_rows = [
        ("a", 0, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)),
        ("a", 0, datetime.datetime(2000, 1, 1, 0, 0, 0)),
    ]
    multi_index = pandas.MultiIndex.from_tuples(
        index_rows,
        names=["str_index", "int_index", "dt_index"],
    )
    frame = pandas.DataFrame(columns, index=multi_index)

    detected = module_under_test.dataframe_to_bq_schema(frame, [])

    # (field name, BigQuery type) pairs in the order they should be reported.
    name_type_pairs = [
        ("str_index", "STRING"),
        ("int_index", "INTEGER"),
        ("dt_index", "DATETIME"),
        ("str_column", "STRING"),
        ("int_column", "INTEGER"),
        ("bool_column", "BOOLEAN"),
    ]
    wanted = tuple(
        schema.SchemaField(field_name, bq_type, "NULLABLE")
        for field_name, bq_type in name_type_pairs
    )
    assert detected == wanted


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_w_bq_schema(module_under_test):
    """A partial user schema (list of dicts) is combined with detected fields.

    ``str_column`` and ``bool_column`` come from the supplied schema and are
    expected to keep their type and mode; ``int_column`` is not described and
    is expected to appear as a NULLABLE INTEGER field.
    """
    # Column order matters for the expected schema, hence the OrderedDict.
    frame = pandas.DataFrame(
        collections.OrderedDict(
            (
                ("str_column", ["hello", "world"]),
                ("int_column", [42, 8]),
                ("bool_column", [True, False]),
            )
        )
    )

    str_field = {"name": "str_column", "type": "STRING", "mode": "NULLABLE"}
    bool_field = {"name": "bool_column", "type": "BOOL", "mode": "REQUIRED"}
    user_schema = [str_field, bool_field]

    detected = module_under_test.dataframe_to_bq_schema(frame, user_schema)

    wanted = (
        schema.SchemaField("str_column", "STRING", "NULLABLE"),
        schema.SchemaField("int_column", "INTEGER", "NULLABLE"),
        schema.SchemaField("bool_column", "BOOL", "REQUIRED"),
    )
    assert detected == wanted


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_fallback_needed_wo_pyarrow(module_under_test):
dataframe = pandas.DataFrame(
Expand Down

0 comments on commit a69348a

Please sign in to comment.