From e735eb78f07a0c341624f3dc7f5701aaa6146ac0 Mon Sep 17 00:00:00 2001
From: Karolis Kontrimas <56597152+KarolisKont@users.noreply.github.com>
Date: Mon, 13 Nov 2023 09:29:19 +0200
Subject: [PATCH] Refactor: drop ability to configure BigQuery offline store
 export parquet file size (#17)

* Drop BigQuery offline store export parquet file size functionality

* Use lazy formatting for log messages

* Fix formatting

* Black format off for specific line
---
 .../feast/infra/offline_stores/bigquery.py | 40 ++++++++-----------
 1 file changed, 17 insertions(+), 23 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 8e21503501..36b7cc63a9 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -20,7 +20,7 @@
 import pandas as pd
 import pyarrow
 import pyarrow.parquet
-from pydantic import ConstrainedStr, Field, StrictStr, validator
+from pydantic import ConstrainedStr, StrictStr, validator
 from pydantic.typing import Literal
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
 
@@ -47,6 +47,7 @@
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
 from feast.usage import get_user_agent, log_exceptions_and_usage
+
 from .bigquery_source import (
     BigQueryLoggingDestination,
     BigQuerySource,
@@ -105,9 +106,6 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
     gcs_staging_location: Optional[str] = None
     """ (optional) GCS location used for offloading BigQuery results as parquet files."""
 
-    gcs_staging_file_size_mb: Optional[int] = Field(None, ge=1, le=1000)
-    """ (optional) Specify the staging file size in Megabytes. If it is not set, the BigQuery export function will determine the export file size automatically."""
-
     table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
     """ (optional) Specifies whether the job is allowed to create new tables.
     The default value is CREATE_IF_NEEDED."""
@@ -579,29 +577,19 @@ def to_remote_storage(self) -> List[str]:
 
         table = None
         try:
-            logger.info(f"Starting data export to '{self._gcs_path}'")
+            logger.info("Starting data export to '%s'", self._gcs_path)
             table = self.to_bigquery()
-            logger.info(f"Data exported to table '{table}'")
-
-            if self.config.offline_store.gcs_staging_file_size_mb is not None:
-                table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
-                number_of_files = max(
-                    1,
-                    int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
-                )
-                destination_uris = [
-                    f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
-                ]
-            else:
-                destination_uris = [f"{self._gcs_path}/*.parquet"]
+            logger.info("Data exported to table '%s'", table)
 
             job_config = bigquery.job.ExtractJobConfig()
             job_config.destination_format = "PARQUET"
-            logger.info(f"Starting data extraction from '{table}' to '{self._gcs_path}'")
+            logger.info(
+                "Starting data extraction from '%s' to '%s'", table, self._gcs_path
+            )
             extract_job = self.client.extract_table(
                 table,
-                destination_uris=destination_uris,
+                destination_uris=[f"{self._gcs_path}/*.parquet"],
                 location=self.config.offline_store.location,
                 job_config=job_config,
             )
 
@@ -610,10 +598,14 @@ def to_remote_storage(self) -> List[str]:
             bucket: str
             prefix: str
             if self.config.offline_store.billing_project_id:
-                storage_client = StorageClient(project=self.config.offline_store.project_id)
+                storage_client = StorageClient(
+                    project=self.config.offline_store.project_id
+                )
             else:
                 storage_client = StorageClient(project=self.client.project)
+            # fmt: off
             bucket, prefix = self._gcs_path[len("gs://"):].split("/", 1)
+            # fmt: on
             if prefix.startswith("/"):
                 prefix = prefix[1:]
 
@@ -622,11 +614,13 @@
 
             for b in blobs:
                 results.append(f"gs://{b.bucket.name}/{b.name}")
 
-            logger.info(f"Data extraction completed. Extracted to {len(results)} files")
+            logger.info(
+                "Data extraction completed. Extracted to %s files", len(results)
+            )
 
             return results
         finally:
             if table:
-                logger.info(f"Cleanup: Deleting temporary table '{table}'")
+                logger.info("Cleanup: Deleting temporary table '%s'", table)
                 self.client.delete_table(table=table, not_found_ok=True)
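
Note (illustrative, not part of the applied diff): the "lazy formatting" change above defers string interpolation to the logging framework, so the message is only built when the log record is actually emitted. A minimal standalone sketch of the pattern, using a hypothetical gcs_path value:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    gcs_path = "gs://example-bucket/staging"  # hypothetical value, for illustration only

    # Eager: the f-string is evaluated even when INFO is disabled.
    logger.info(f"Starting data export to '{gcs_path}'")

    # Lazy: the %-style arguments are interpolated by the logging
    # framework only if the record passes the level check.
    logger.info("Starting data export to '%s'", gcs_path)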