Skip to content

Commit

Permalink
Provide the user with more options for setting the to_bigquery config (#1661)

Browse files Browse the repository at this point in the history

* Provide more options for to_bigquery config

Signed-off-by: Cody Lin <[email protected]>

* Fix default job_config when none; remove excessive testing

Signed-off-by: Cody Lin <[email protected]>

* Add param type and docstring

Signed-off-by: Cody Lin <[email protected]>

* add docstrings and typing

Signed-off-by: Cody Lin <[email protected]>

* Apply docstring suggestions from code review

Co-authored-by: Willem Pienaar <[email protected]>
Signed-off-by: Cody Lin <[email protected]>

Co-authored-by: Willem Pienaar <[email protected]>
  • Loading branch information
codyjlin and woop authored Jun 28, 2021
1 parent 1761b10 commit b9dd955
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 31 deletions.
48 changes: 34 additions & 14 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,31 +239,51 @@ def to_df(self):
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
return df

def to_bigquery(self, dry_run=False) -> Optional[str]:
def to_sql(self) -> str:
    """Return the raw SQL that BigQuery will run to build the historical feature table.

    No query is executed here; the stored query text is handed back verbatim
    so callers can inspect it or run it themselves.
    """
    sql_query = self.query
    return sql_query

def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
    """
    Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.

    Args:
        job_config: An optional bigquery.QueryJobConfig to specify options like the
            destination table, dry run, etc.

    Returns:
        The fully qualified destination table name, or None if job_config.dry_run is True.
    """

    @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
    def _block_until_done():
        # tenacity's @retry only re-invokes the wrapped callable when it RAISES;
        # a truthy return value does not trigger a retry. Raise while the job is
        # still in flight so we actually poll every 10s (up to 30 minutes) instead
        # of checking once and falling through.
        if self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]:
            raise RuntimeError(f"BigQuery job {bq_job.job_id} is still running")

    if not job_config:
        # No caller-supplied config: write to an auto-named table in the
        # configured offline-store dataset (date + short uuid keeps names unique).
        today = date.today().strftime("%Y%m%d")
        rand_id = str(uuid.uuid4())[:7]
        dataset_project = (
            self.config.offline_store.project_id or self.client.project
        )
        path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
        job_config = bigquery.QueryJobConfig(destination=path)

    bq_job = self.client.query(self.query, job_config=job_config)

    if job_config.dry_run:
        # A dry-run query is validated but never executed, so there is no job
        # to poll — short-circuit BEFORE _block_until_done(), which would fail
        # looking up a job id that was never created.
        print(
            "This query will process {} bytes.".format(bq_job.total_bytes_processed)
        )
        return None

    _block_until_done()

    if bq_job.exception():
        raise bq_job.exception()

    print(f"Done writing to '{job_config.destination}'.")
    return str(job_config.destination)


@dataclass(frozen=True)
Expand Down
18 changes: 1 addition & 17 deletions sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,29 +438,13 @@ def test_historical_features_from_bigquery_sources(
],
)

# Just a dry run, should not create table
bq_dry_run = job_from_sql.to_bigquery(dry_run=True)
assert bq_dry_run is None

bq_temp_table_path = job_from_sql.to_bigquery()
assert bq_temp_table_path.split(".")[0] == gcp_project

if provider_type == "gcp_custom_offline_config":
assert bq_temp_table_path.split(".")[1] == "foo"
else:
assert bq_temp_table_path.split(".")[1] == bigquery_dataset

# Check that this table actually exists
actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path)
assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1]

start_time = datetime.utcnow()
actual_df_from_sql_entities = job_from_sql.to_df()
end_time = datetime.utcnow()
with capsys.disabled():
print(
str(
f"\nTime to execute job_from_df.to_df() = '{(end_time - start_time)}'"
f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'"
)
)

Expand Down

0 comments on commit b9dd955

Please sign in to comment.