Refactor: drop ability to configure BigQuery offline store export parquet file size (#17)

* Drop BigQuery offline store export parquet file size functionality

* Use lazy formatting for log messages

* Fix formatting

* Black format off for specific line
KarolisKont authored Nov 13, 2023
1 parent f80913d commit e735eb7
Showing 1 changed file with 17 additions and 23 deletions.
sdk/python/feast/infra/offline_stores/bigquery.py
@@ -20,7 +20,7 @@
 import pandas as pd
 import pyarrow
 import pyarrow.parquet
-from pydantic import ConstrainedStr, Field, StrictStr, validator
+from pydantic import ConstrainedStr, StrictStr, validator
 from pydantic.typing import Literal
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

@@ -47,6 +47,7 @@
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
 from feast.usage import get_user_agent, log_exceptions_and_usage
+
 from .bigquery_source import (
     BigQueryLoggingDestination,
     BigQuerySource,
@@ -105,9 +106,6 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
     gcs_staging_location: Optional[str] = None
     """ (optional) GCS location used for offloading BigQuery results as parquet files."""

-    gcs_staging_file_size_mb: Optional[int] = Field(None, ge=1, le=1000)
-    """ (optional) Specify the staging file size in Megabytes. If it is not set, the BigQuery export function will determine the export file size automatically."""
-
     table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
     """ (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""

@@ -579,29 +577,19 @@ def to_remote_storage(self) -> List[str]:

         table = None
         try:
-            logger.info(f"Starting data export to '{self._gcs_path}'")
+            logger.info("Starting data export to '%s'", self._gcs_path)
             table = self.to_bigquery()
-            logger.info(f"Data exported to table '{table}'")
-
-            if self.config.offline_store.gcs_staging_file_size_mb is not None:
-                table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
-                number_of_files = max(
-                    1,
-                    int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
-                )
-                destination_uris = [
-                    f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
-                ]
-            else:
-                destination_uris = [f"{self._gcs_path}/*.parquet"]
+            logger.info("Data exported to table '%s'", table)

             job_config = bigquery.job.ExtractJobConfig()
             job_config.destination_format = "PARQUET"

-            logger.info(f"Starting data extraction from '{table}' to '{self._gcs_path}'")
+            logger.info(
+                "Starting data extraction from '%s' to '%s'", table, self._gcs_path
+            )
             extract_job = self.client.extract_table(
                 table,
-                destination_uris=destination_uris,
+                destination_uris=[f"{self._gcs_path}/*.parquet"],
                 location=self.config.offline_store.location,
                 job_config=job_config,
             )
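The export path is now unconditional: a single wildcard URI lets BigQuery pick the shard count itself, so the hand-rolled file-count math above could be deleted. A standalone sketch of that pattern with google-cloud-bigquery, using hypothetical project, dataset, and bucket names:

```python
from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.job.ExtractJobConfig()
job_config.destination_format = "PARQUET"

# BigQuery decides how many shards to write and substitutes "*"
# with a zero-padded sequence number per exported file.
extract_job = client.extract_table(
    "my-project.my_dataset.my_table",  # hypothetical table
    destination_uris=["gs://my-bucket/staging/*.parquet"],
    job_config=job_config,
)
extract_job.result()  # block until the export job completes
```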
@@ -610,10 +598,14 @@
             bucket: str
             prefix: str
             if self.config.offline_store.billing_project_id:
-                storage_client = StorageClient(project=self.config.offline_store.project_id)
+                storage_client = StorageClient(
+                    project=self.config.offline_store.project_id
+                )
             else:
                 storage_client = StorageClient(project=self.client.project)
+            # fmt: off
             bucket, prefix = self._gcs_path[len("gs://"):].split("/", 1)
+            # fmt: on
             if prefix.startswith("/"):
                 prefix = prefix[1:]
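The new `# fmt: off` / `# fmt: on` pair shields the slice from Black, which would otherwise reformat the non-trivial bound as `[len("gs://") :]` with a space before the colon. A small runnable sketch of the same parsing, path hypothetical:

```python
gcs_path = "gs://my-bucket/staging/exports"  # hypothetical path

# fmt: off
bucket, prefix = gcs_path[len("gs://"):].split("/", 1)
# fmt: on

print(bucket)  # my-bucket
print(prefix)  # staging/exports
```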

@@ -622,11 +614,13 @@
             for b in blobs:
                 results.append(f"gs://{b.bucket.name}/{b.name}")

-            logger.info(f"Data extraction completed. Extracted to {len(results)} files")
+            logger.info(
+                "Data extraction completed. Extracted to %s files", len(results)
+            )
             return results
         finally:
             if table:
-                logger.info(f"Cleanup: Deleting temporary table '{table}'")
+                logger.info("Cleanup: Deleting temporary table '%s'", table)
                 self.client.delete_table(table=table, not_found_ok=True)
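The remaining hunks convert eager f-string log calls to lazy `%s` formatting, which hands interpolation to the logging framework so the cost is only paid when a record is actually emitted. A self-contained sketch of the difference, with a placeholder table name:

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

table = "my-project.my_dataset.temp_table"  # placeholder value

# Eager: the f-string is built even though INFO is filtered out here.
logger.info(f"Data exported to table '{table}'")

# Lazy: the "%s" substitution runs only if a handler emits the record,
# so suppressed levels pay no formatting cost.
logger.info("Data exported to table '%s'", table)
```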


