From f80f05f5d2042c6fff533b0dc85612acda5a7417 Mon Sep 17 00:00:00 2001 From: vmallya-123 Date: Sat, 1 Oct 2022 22:45:56 +0800 Subject: [PATCH] feat: Adding billing_project_id in BigQueryOfflineStoreConfig (#3253) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adding_billing_project_in_config Signed-off-by: “Varun * Fix lint Signed-off-by: Danny Chiao Signed-off-by: “Varun Signed-off-by: Danny Chiao Co-authored-by: Danny Chiao --- .../feast/infra/offline_stores/bigquery.py | 60 ++++++++++++++----- 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index c570b8d38a..bf010c82aa 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -19,7 +19,7 @@ import pandas as pd import pyarrow import pyarrow.parquet -from pydantic import StrictStr +from pydantic import StrictStr, validator from pydantic.typing import Literal from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed @@ -83,7 +83,8 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel): project_id: Optional[StrictStr] = None """ (optional) GCP project name used for the BigQuery offline store """ - + billing_project_id: Optional[StrictStr] = None + """ (optional) GCP project name used to run the bigquery jobs at """ location: Optional[StrictStr] = None """ (optional) GCP location name used for the BigQuery offline store. Examples of location names include ``US``, ``EU``, ``us-central1``, ``us-west4``. @@ -94,6 +95,14 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel): gcs_staging_location: Optional[str] = None """ (optional) GCS location used for offloading BigQuery results as parquet files.""" + @validator("billing_project_id") + def project_id_exists(cls, v, values, **kwargs): + if v and not values["project_id"]: + raise ValueError( + "please specify project_id if billing_project_id is specified" + ) + return v + class BigQueryOfflineStore(OfflineStore): @staticmethod @@ -122,9 +131,11 @@ def pull_latest_from_table_or_query( timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" field_string = ", ".join(join_key_columns + feature_name_columns + timestamps) - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, + project=project_id, location=config.offline_store.location, ) query = f""" @@ -162,9 +173,11 @@ def pull_all_from_table_or_query( assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) assert isinstance(data_source, BigQuerySource) from_expression = data_source.get_table_query_string() - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, + project=project_id, location=config.offline_store.location, ) field_string = ", ".join( @@ -197,17 +210,22 @@ def get_historical_features( assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) for fv in feature_views: assert isinstance(fv.batch_source, BigQuerySource) - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, + project=project_id, location=config.offline_store.location, ) assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) - + if config.offline_store.billing_project_id: + dataset_project = str(config.offline_store.project_id) + else: + dataset_project = client.project table_reference = _get_table_reference_for_new_entity( client, - client.project, + dataset_project, config.offline_store.dataset, config.offline_store.location, ) @@ -295,9 +313,11 @@ def write_logged_features( ): destination = logging_config.destination assert isinstance(destination, BigQueryLoggingDestination) - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, + project=project_id, location=config.offline_store.location, ) @@ -353,9 +373,11 @@ def offline_write_batch( if table.schema != pa_schema: table = table.cast(pa_schema) - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, + project=project_id, location=config.offline_store.location, ) @@ -451,7 +473,10 @@ def to_bigquery( if not job_config: today = date.today().strftime("%Y%m%d") rand_id = str(uuid.uuid4())[:7] - path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" + if self.config.offline_store.billing_project_id: + path = f"{self.config.offline_store.project_id}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" + else: + path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" job_config = bigquery.QueryJobConfig(destination=path) if not job_config.dry_run and self.on_demand_feature_views: @@ -525,7 +550,10 @@ def to_remote_storage(self) -> List[str]: bucket: str prefix: str - storage_client = StorageClient(project=self.client.project) + if self.config.offline_store.billing_project_id: + storage_client = StorageClient(project=self.config.offline_store.project_id) + else: + storage_client = StorageClient(project=self.client.project) bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1) prefix = prefix.rsplit("/", 1)[0] if prefix.startswith("/"):