-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathbigquery.py
73 lines (60 loc) · 2.39 KB
/
bigquery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import os
from google.cloud import bigquery
from google.oauth2 import service_account
from google.auth.exceptions import DefaultCredentialsError
from loguru import logger
import time
from ingestion.models import PypiJobParameters, FileDownloads
import pyarrow as pa
# Default table queried by build_pypi_query: the public PyPI dataset on
# BigQuery.
PYPI_PUBLIC_DATASET = "bigquery-public-data.pypi.file_downloads"
def build_pypi_query(
    params: PypiJobParameters, pypi_public_dataset: str = PYPI_PUBLIC_DATASET
) -> str:
    """Return the SQL that pulls one project's rows from the PyPI table.

    The statement selects every column from ``pypi_public_dataset`` and keeps
    the rows whose ``project`` equals ``params.pypi_project`` and whose
    ``params.timestamp_column`` falls in the half-open window
    ``[params.start_date, params.end_date)``.

    Args:
        params: Job parameters; ``pypi_project``, ``timestamp_column``,
            ``start_date`` and ``end_date`` are read.
        pypi_public_dataset: Table identifier placed between backticks in
            the ``FROM`` clause.

    Returns:
        The query text, one clause per line, starting and ending with a
        newline.

    Note:
        Every value is interpolated into the SQL verbatim (no escaping and
        no parameter binding), so the inputs must come from a trusted source.
    """
    # WARNING: the public PyPI dataset is large, so the project and time
    # filters below must always stay in place.
    project_filter = f"project = '{params.pypi_project}'"
    window_start = f'{params.timestamp_column} >= TIMESTAMP("{params.start_date}")'
    window_end = f'{params.timestamp_column} < TIMESTAMP("{params.end_date}")'

    # Empty first/last entries reproduce the leading and trailing newline of
    # the statement.
    statement_lines = (
        "",
        "SELECT *",
        "FROM",
        f"`{pypi_public_dataset}`",
        "WHERE",
        project_filter,
        f"AND {window_start}",
        f"AND {window_end}",
        "",
    )
    return "\n".join(statement_lines)
def get_bigquery_client(project_name: str) -> bigquery.Client:
    """Create a BigQuery client authenticated with a service-account key file.

    The key file location is read from the ``GOOGLE_APPLICATION_CREDENTIALS``
    environment variable.

    Args:
        project_name: Project passed to ``bigquery.Client`` as ``project``.

    Returns:
        A ``bigquery.Client`` built from the service-account credentials.

    Raises:
        EnvironmentError: If ``GOOGLE_APPLICATION_CREDENTIALS`` is unset or
            empty.

    Any exception raised while loading the key file or constructing the
    client (``DefaultCredentialsError`` included) propagates unchanged, so no
    handler is needed here.
    """
    service_account_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    # Guard clause: without a key file path there is nothing to authenticate
    # with, so fail before touching the Google libraries.
    if not service_account_path:
        raise EnvironmentError(
            "No valid credentials found for BigQuery authentication."
        )

    credentials = service_account.Credentials.from_service_account_file(
        service_account_path
    )
    return bigquery.Client(project=project_name, credentials=credentials)
2
def get_bigquery_result(
    query_str: str, bigquery_client: bigquery.Client, model: FileDownloads
) -> pa.Table:
    """Run a query on BigQuery and return the whole result as an Arrow table.

    Args:
        query_str: SQL text to execute.
        bigquery_client: Client used to submit the query.
        model: Not used by this function; kept so existing callers keep
            working.

    Returns:
        The query result materialized with ``to_arrow()``.

    Raises:
        Exception: Any error raised while running the query or loading the
            result is logged and then re-raised unchanged.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed by
    # wall-clock adjustments the way a time.time() difference can.
    started = time.perf_counter()
    logger.info(f"Running query: {query_str}")

    # Only the BigQuery round trip is guarded, so the error log below always
    # refers to a query/load failure.
    try:
        pa_tbl = bigquery_client.query(query_str).to_arrow()
    except Exception as e:
        logger.error(f"Error running query: {e}")
        raise

    elapsed_time = time.perf_counter() - started
    logger.info(f"Query executed and data loaded in {elapsed_time:.2f} seconds")
    return pa_tbl