Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Reduce dry runs from read_gbq with table #1129

Merged
merged 3 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 45 additions & 38 deletions bigframes/session/_io/bigquery/read_gbq_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def get_table_metadata(
return cached_table

table = bqclient.get_table(table_ref)
# local time will lag a little bit do to network latency
# make sure it is at least table creation time.
# This is relevant if the table was created immediately before loading it here.
if (table.created is not None) and (table.created > bq_time):
bq_time = table.created

cached_table = (bq_time, table)
cache[table_ref] = cached_table
Expand All @@ -85,64 +90,66 @@ def get_table_metadata(

def validate_table(
bqclient: bigquery.Client,
table_ref: bigquery.table.TableReference,
table: bigquery.table.Table,
columns: Optional[Sequence[str]],
snapshot_time: datetime.datetime,
table_type: str,
filter_str: Optional[str] = None,
) -> bool:
"""Validates that the table can be read, returns True iff snapshot is supported."""
# First run without snapshot to verify table can be read
sql = bigframes.session._io.bigquery.to_query(
query_or_table=f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
columns=columns or (),
sql_predicate=filter_str,
)
dry_run_config = bigquery.QueryJobConfig()
dry_run_config.dry_run = True
try:
bqclient.query_and_wait(sql, job_config=dry_run_config)
except google.api_core.exceptions.Forbidden as ex:
if "Drive credentials" in ex.message:
ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
raise

time_travel_not_found = False
# Anonymous dataset, does not support snapshot ever
if table_ref.dataset_id.startswith("_"):
return False

# Materialized views,does not support snapshot
if table_type == "MATERIALIZED_VIEW":
warnings.warn(
"Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that as materialized views "
"are updated periodically, modifications to the underlying data in the view may "
"result in errors or unexpected behavior.",
category=bigframes.exceptions.TimeTravelDisabledWarning,
if table.dataset_id.startswith("_"):
pass
# Only true tables support time travel
elif table.table_type != "TABLE":
if table.table_type == "MATERIALIZED_VIEW":
warnings.warn(
"Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that as materialized views "
"are updated periodically, modifications to the underlying data in the view may "
"result in errors or unexpected behavior.",
category=bigframes.exceptions.TimeTravelDisabledWarning,
)
else:
# table might support time travel, lets do a dry-run query with time travel
snapshot_sql = bigframes.session._io.bigquery.to_query(
query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
columns=columns or (),
sql_predicate=filter_str,
time_travel_timestamp=snapshot_time,
)
return False
try:
# If this succeeds, we don't need to query without time travel, that would surely succeed
bqclient.query_and_wait(
snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
)
return True
except google.api_core.exceptions.NotFound:
# note that a notfound caused by a simple typo will be
# caught above when the metadata is fetched, not here
time_travel_not_found = True

# Second, try with snapshot to verify table supports this feature
# At this point, time travel is known to fail, but can we query without time travel?
snapshot_sql = bigframes.session._io.bigquery.to_query(
query_or_table=f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
columns=columns or (),
sql_predicate=filter_str,
time_travel_timestamp=snapshot_time,
time_travel_timestamp=None,
)
try:
bqclient.query_and_wait(snapshot_sql, job_config=dry_run_config)
return True
except google.api_core.exceptions.NotFound:
# note that a notfound caused by a simple typo will be
# caught above when the metadata is fetched, not here
# Any erorrs here should just be raised to user
bqclient.query_and_wait(
snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
)
if time_travel_not_found:
warnings.warn(
"NotFound error when reading table with time travel."
" Attempting query without time travel. Warning: Without"
" time travel, modifications to the underlying table may"
" result in errors or unexpected behavior.",
category=bigframes.exceptions.TimeTravelDisabledWarning,
)
return False
return False


def are_index_cols_unique(
Expand Down
20 changes: 12 additions & 8 deletions bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,18 @@ def read_gbq_table(
else (*columns, *[col for col in index_cols if col not in columns])
)

enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
self._bqclient,
table_ref,
all_columns,
time_travel_timestamp,
table.table_type,
filter_str,
)
try:
enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
self._bqclient,
table,
all_columns,
time_travel_timestamp,
filter_str,
)
except google.api_core.exceptions.Forbidden as ex:
if "Drive credentials" in ex.message:
ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
raise

# ----------------------------
# Create ordering and validate
Expand Down
1 change: 1 addition & 0 deletions tests/unit/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def test_read_gbq_cached_table():
table._properties["location"] = session._location
table._properties["numRows"] = "1000000000"
table._properties["location"] = session._location
table._properties["type"] = "TABLE"
session._loader._df_snapshot[table_ref] = (
datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
table,
Expand Down