fix: avoid policy tags 403 error in load_table_from_dataframe #557

Merged 4 commits on Mar 19, 2021
13 changes: 11 additions & 2 deletions google/cloud/bigquery/client.py
@@ -2291,9 +2291,18 @@ def load_table_from_dataframe(
name
for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
)
# schema fields not present in the dataframe are not needed
job_config.schema = [
field for field in table.schema if field.name in columns_and_indexes
# Field description and policy tags are not needed to
# serialize a data frame.
SchemaField(
Contributor Author:

This is the actual bug fix. Rather than populate all properties of the schema field from the table schema, we just populate the minimum we need to convert to Parquet/CSV and then upload.

Contributor:

We'll need to revisit this for parameterization constraints, but that's a problem for future Tim.

field.name,
field.field_type,
mode=field.mode,
fields=field.fields,
)
# schema fields not present in the dataframe are not needed
for field in table.schema
if field.name in columns_and_indexes
]

job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
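To see the fix outside the diff: a minimal sketch (with a hypothetical table schema) of what the load path now does. Only the name, type, mode, and sub-fields are copied from the fetched table schema, so server-managed properties such as descriptions and policy tags never round-trip through the load job:

    from google.cloud.bigquery.schema import SchemaField

    # Hypothetical schema as returned by client.get_table(); descriptions and
    # policy tags on such fields are what previously triggered the 403.
    table_schema = [
        SchemaField("id", "INTEGER", mode="REQUIRED", description="primary key"),
        SchemaField("age", "INTEGER", mode="NULLABLE"),
    ]
    columns_and_indexes = frozenset(["id", "age"])

    # Copy only the properties needed to serialize the dataframe.
    job_schema = [
        SchemaField(field.name, field.field_type, mode=field.mode, fields=field.fields)
        for field in table_schema
        if field.name in columns_and_indexes
    ]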
43 changes: 21 additions & 22 deletions google/cloud/bigquery/schema.py
@@ -19,6 +19,7 @@
from google.cloud.bigquery_v2 import types


_DEFAULT_VALUE = object()
_STRUCT_TYPES = ("RECORD", "STRUCT")

# SQL types reference:
@@ -73,14 +74,18 @@ def __init__(
name,
field_type,
mode="NULLABLE",
description=None,
description=_DEFAULT_VALUE,
fields=(),
policy_tags=None,
):
self._name = name
self._field_type = field_type
self._mode = mode
self._description = description
self._properties = {
"name": name,
"type": field_type,
}
if mode is not None:
self._properties["mode"] = mode.upper()
if description is not _DEFAULT_VALUE:
Contributor Author:

@shollyman This is one of the key changes: we no longer set the resource value for "description" if it's not explicitly set.

We already omit policy_tags from the resource if it is None (though arguably it should get the same treatment, so that someone can unset policy tags from Python).

Contributor:

My default inclination would be for special handling of None values to happen at the places where it's significant, like when calling tables.update. It's also the case that schema fields can't be manipulated individually, so perhaps I'm simply not thinking this through properly.

Contributor Author:

I called that out as a possibility in #558, but that'd require updating our field mask logic to support sub-fields, which gets into some hairy string parsing (perhaps not all that hairy, as it could be as simple as splitting on '.', but it is definitely a departure from what we've been doing).

Also, it might mean that we'd have to introduce a field mask to our load job methods. Based on the error message we're seeing, it sounds like it's possible to make updates to fields like policy tags from a load job.

self._properties["description"] = description
self._fields = tuple(fields)
self._policy_tags = policy_tags
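A minimal sketch of the "split on '.'" idea from the review thread above (a hypothetical helper, not part of this PR):

    # Hypothetical: expand dotted update-mask paths into nested parts so a
    # field mask could address sub-fields such as schema.fields.policyTags.
    def split_mask_paths(paths):
        return [path.split(".") for path in paths]

    split_mask_paths(["schema.fields.policyTags", "description"])
    # -> [['schema', 'fields', 'policyTags'], ['description']]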

@@ -98,7 +103,7 @@ def from_api_repr(cls, api_repr):
"""
# Handle optional properties with default values
mode = api_repr.get("mode", "NULLABLE")
description = api_repr.get("description")
description = api_repr.get("description", _DEFAULT_VALUE)
fields = api_repr.get("fields", ())

return cls(
@@ -113,7 +118,7 @@
@property
def name(self):
"""str: The name of the field."""
return self._name
return self._properties["name"]

@property
def field_type(self):
@@ -122,7 +127,7 @@ def field_type(self):
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type
"""
return self._field_type
return self._properties["type"]

@property
def mode(self):
@@ -131,17 +136,17 @@ def mode(self):
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.mode
"""
return self._mode
return self._properties.get("mode")

@property
def is_nullable(self):
"""bool: whether 'mode' is 'nullable'."""
return self._mode == "NULLABLE"
return self.mode == "NULLABLE"

@property
def description(self):
"""Optional[str]: description for the field."""
return self._description
return self._properties.get("description")

@property
def fields(self):
@@ -164,13 +169,7 @@ def to_api_repr(self):
Returns:
Dict: A dictionary representing the SchemaField in a serialized form.
"""
# Put together the basic representation. See http://bit.ly/2hOAT5u.
answer = {
"mode": self.mode.upper(),
"name": self.name,
"type": self.field_type.upper(),
"description": self.description,
}
answer = self._properties.copy()

# If this is a RECORD type, then sub-fields are also included,
# add this to the serialized representation.
@@ -193,10 +192,10 @@ def _key(self):
Tuple: The contents of this :class:`~google.cloud.bigquery.schema.SchemaField`.
"""
return (
self._name,
self._field_type.upper(),
self._mode.upper(),
self._description,
self.name,
self.field_type.upper(),
self.mode.upper(),
self.description,
self._fields,
self._policy_tags,
)
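The _DEFAULT_VALUE = object() sentinel introduced above is the heart of this file's change: it lets SchemaField distinguish "description was never passed" from "description was explicitly set to None". A standalone sketch of the pattern (illustrative, not the library code):

    _DEFAULT_VALUE = object()  # unique sentinel; never equal to a user value

    class Field:
        def __init__(self, name, description=_DEFAULT_VALUE):
            self._properties = {"name": name}
            if description is not _DEFAULT_VALUE:
                # An explicit None is kept, so the server can unset the value.
                self._properties["description"] = description

        def to_api_repr(self):
            return self._properties.copy()

    Field("age").to_api_repr()                    # {'name': 'age'}
    Field("age", description=None).to_api_repr()  # {'name': 'age', 'description': None}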
12 changes: 2 additions & 10 deletions tests/unit/job/test_load_config.py
@@ -434,13 +434,11 @@ def test_schema_setter_fields(self):
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
}
age_repr = {
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
}
self.assertEqual(
config._properties["load"]["schema"], {"fields": [full_name_repr, age_repr]}
@@ -449,24 +447,18 @@
def test_schema_setter_valid_mappings_list(self):
config = self._get_target_class()()

schema = [
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]
config.schema = schema

full_name_repr = {
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
}
age_repr = {
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
}
schema = [full_name_repr, age_repr]
config.schema = schema
self.assertEqual(
config._properties["load"]["schema"], {"fields": [full_name_repr, age_repr]}
)
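The trimmed expectations follow from the new omit-unset behavior; for example (a sketch against the public API, assuming the post-PR behavior):

    from google.cloud.bigquery import LoadJobConfig

    config = LoadJobConfig()
    config.schema = [{"name": "age", "type": "INTEGER", "mode": "REQUIRED"}]
    # The serialized schema no longer includes "description": None.
    print(config.to_api_repr()["load"]["schema"])
    # {'fields': [{'name': 'age', 'type': 'INTEGER', 'mode': 'REQUIRED'}]}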
113 changes: 74 additions & 39 deletions tests/unit/test_client.py
@@ -1596,18 +1596,8 @@ def test_create_table_w_schema_and_query(self):
{
"schema": {
"fields": [
{
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
},
{
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
},
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]
},
"view": {"query": query},
@@ -1641,18 +1631,8 @@ def test_create_table_w_schema_and_query(self):
},
"schema": {
"fields": [
{
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
},
{
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
},
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]
},
"view": {"query": query, "useLegacySql": False},
@@ -2602,7 +2582,7 @@ def test_update_table(self):
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
"description": "New field description",
},
]
},
@@ -2613,8 +2593,10 @@
}
)
schema = [
SchemaField("full_name", "STRING", mode="REQUIRED"),
SchemaField("age", "INTEGER", mode="REQUIRED"),
SchemaField("full_name", "STRING", mode="REQUIRED", description=None),
SchemaField(
"age", "INTEGER", mode="REQUIRED", description="New field description"
),
]
creds = _make_credentials()
client = self._make_one(project=self.PROJECT, credentials=creds)
@@ -2647,7 +2629,7 @@ def test_update_table(self):
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
"description": "New field description",
},
]
},
@@ -2773,13 +2755,24 @@ def test_update_table_w_query(self):
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
"description": "this is a column",
},
{"name": "country", "type": "STRING", "mode": "NULLABLE"},
]
}
schema = [
SchemaField("full_name", "STRING", mode="REQUIRED"),
SchemaField("age", "INTEGER", mode="REQUIRED"),
SchemaField(
"full_name",
"STRING",
mode="REQUIRED",
# Explicitly unset the description.
description=None,
),
SchemaField(
"age", "INTEGER", mode="REQUIRED", description="this is a column"
),
# Omit the description to not make updates to it.
SchemaField("country", "STRING"),
]
resource = self._make_table_resource()
resource.update(
@@ -7658,18 +7651,47 @@ def test_load_table_from_file_w_invalid_job_config(self):
def test_load_table_from_dataframe(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import PolicyTagList, SchemaField

client = self._make_client()
records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
dataframe = pandas.DataFrame(records)
records = [
{"id": 1, "age": 100, "accounts": [2, 3]},
{"id": 2, "age": 60, "accounts": [5]},
{"id": 3, "age": 40, "accounts": []},
]
# Mixup column order so that we can verify sent schema matches the
# serialized order, not the table column order.
column_order = ["age", "accounts", "id"]
dataframe = pandas.DataFrame(records, columns=column_order)
table_fields = {
"id": SchemaField(
"id",
"INTEGER",
mode="REQUIRED",
description="integer column",
policy_tags=PolicyTagList(names=("foo", "bar")),
),
"age": SchemaField(
"age",
"INTEGER",
mode="NULLABLE",
description="age column",
policy_tags=PolicyTagList(names=("baz",)),
),
"accounts": SchemaField(
"accounts", "INTEGER", mode="REPEATED", description="array column",
),
}
get_table_schema = [
table_fields["id"],
table_fields["age"],
table_fields["accounts"],
]

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
return_value=mock.Mock(
schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")]
),
return_value=mock.Mock(schema=get_table_schema),
)
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
Expand All @@ -7695,8 +7717,21 @@ def test_load_table_from_dataframe(self):
sent_file = load_table_from_file.mock_calls[0][1][1]
assert sent_file.closed

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
sent_config = load_table_from_file.mock_calls[0][2]["job_config"].to_api_repr()[
"load"
]
assert sent_config["sourceFormat"] == job.SourceFormat.PARQUET
for field_index, field in enumerate(sent_config["schema"]["fields"]):
assert field["name"] == column_order[field_index]
table_field = table_fields[field["name"]]
assert field["name"] == table_field.name
assert field["type"] == table_field.field_type
assert field["mode"] == table_field.mode
assert len(field.get("fields", [])) == len(table_field.fields)
# Omit unnecessary fields when they come from getting the table
# (not passed in via job_config)
assert "description" not in field
assert "policyTags" not in field

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
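End to end, the user-visible effect exercised by the test above looks roughly like this (a usage sketch; the table name is hypothetical, and default credentials plus pandas/pyarrow are assumed):

    import pandas
    from google.cloud import bigquery

    client = bigquery.Client()
    dataframe = pandas.DataFrame({"id": [1, 2], "age": [100, 60]})

    # Hypothetical destination table whose columns carry policy tags; before
    # this fix, the load job's schema echoed those tags back and the backend
    # rejected the request with a 403.
    job = client.load_table_from_dataframe(dataframe, "my-project.my_dataset.people")
    job.result()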
9 changes: 1 addition & 8 deletions tests/unit/test_external_config.py
@@ -77,14 +77,7 @@ def test_to_api_repr_base(self):
ec.schema = [schema.SchemaField("full_name", "STRING", mode="REQUIRED")]

exp_schema = {
"fields": [
{
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
}
]
"fields": [{"name": "full_name", "type": "STRING", "mode": "REQUIRED"}]
}
got_resource = ec.to_api_repr()
exp_resource = {