diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 44b83caff1..f331d1b768 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -94,8 +94,14 @@ def get_historical_features(
     expected_join_keys = _get_join_keys(project, feature_views, registry)
 
     assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
+    dataset_project = config.offline_store.project_id or client.project
+
     table = _upload_entity_df_into_bigquery(
-        client, config.project, config.offline_store.dataset, entity_df
+        client=client,
+        project=config.project,
+        dataset_name=config.offline_store.dataset,
+        dataset_project=dataset_project,
+        entity_df=entity_df,
     )
 
     entity_df_event_timestamp_col = _infer_event_timestamp_from_bigquery_query(
@@ -225,7 +231,8 @@ def _block_until_done():
 
         today = date.today().strftime("%Y%m%d")
         rand_id = str(uuid.uuid4())[:7]
-        path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
+        dataset_project = self.config.offline_store.project_id or self.client.project
+        path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
         job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run)
         bq_job = self.client.query(self.query, job_config=job_config)
 
@@ -261,12 +268,12 @@ class FeatureViewQueryContext:
 
 
 def _get_table_id_for_new_entity(
-    client: Client, project: str, dataset_name: str
+    client: Client, project: str, dataset_name: str, dataset_project: str
 ) -> str:
     """Gets the table_id for the new entity to be uploaded."""
 
     # First create the BigQuery dataset if it doesn't exist
-    dataset = bigquery.Dataset(f"{client.project}.{dataset_name}")
+    dataset = bigquery.Dataset(f"{dataset_project}.{dataset_name}")
     dataset.location = "US"
 
     try:
@@ -275,18 +282,21 @@
         # Only create the dataset if it does not exist
         client.create_dataset(dataset, exists_ok=True)
 
-    return f"{client.project}.{dataset_name}.entity_df_{project}_{int(time.time())}"
+    return f"{dataset_project}.{dataset_name}.entity_df_{project}_{int(time.time())}"
 
 
 def _upload_entity_df_into_bigquery(
     client: Client,
     project: str,
     dataset_name: str,
+    dataset_project: str,
     entity_df: Union[pandas.DataFrame, str],
 ) -> Table:
     """Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""
 
-    table_id = _get_table_id_for_new_entity(client, project, dataset_name)
+    table_id = _get_table_id_for_new_entity(
+        client, project, dataset_name, dataset_project
+    )
 
     if type(entity_df) is str:
         job = client.query(f"CREATE TABLE {table_id} AS ({entity_df})")
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index 25c55dcc2b..7c72fce944 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -50,7 +50,10 @@ class BigQueryOfflineStoreConfig(FeastBaseModel):
     """ Offline store type selector"""
 
     dataset: StrictStr = "feast"
-    """ (optional) BigQuery Dataset name for temporary tables """
+    """ (optional) BigQuery dataset name used for the BigQuery offline store """
+
+    project_id: Optional[StrictStr] = None
+    """ (optional) GCP project name used for the BigQuery offline store """
 
 
 OfflineStoreConfig = Union[FileOfflineStoreConfig, BigQueryOfflineStoreConfig]
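
For reviewers, here is a minimal sketch (not part of the patch) of how the new `project_id` option resolves against the BigQuery client's default project. The helper name `resolve_dataset_project` and the example project ids are hypothetical; the fallback expression mirrors the `config.offline_store.project_id or client.project` logic added above.

```python
# Minimal sketch, assuming google-cloud-bigquery and the Feast SDK from this
# repo are installed, and that valid GCP credentials are available at runtime.
# resolve_dataset_project is a hypothetical helper, not part of the patch.
from google.cloud import bigquery

from feast.repo_config import BigQueryOfflineStoreConfig


def resolve_dataset_project(
    config: BigQueryOfflineStoreConfig, client: bigquery.Client
) -> str:
    # Mirrors the fallback added in the patch: prefer the explicitly
    # configured GCP project, otherwise fall back to the client's default
    # project (derived from the ambient credentials).
    return config.project_id or client.project


client = bigquery.Client()

# With project_id set, entity/historical tables land in that project:
cfg = BigQueryOfflineStoreConfig(project_id="my-analytics-project")  # hypothetical id
assert resolve_dataset_project(cfg, client) == "my-analytics-project"

# Without project_id, behavior is unchanged and the client's project is used:
cfg_default = BigQueryOfflineStoreConfig()
assert resolve_dataset_project(cfg_default, client) == client.project
```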