Merge pull request #3444 from NicholasTurner23/update-fix/optimize_historical_hourly_measurements

Update fix/optimize historical hourly measurements
Baalmart authored Sep 16, 2024
2 parents 5b4cb05 + a97dfdf commit b4ef239
Showing 6 changed files with 190 additions and 95 deletions.
26 changes: 6 additions & 20 deletions src/workflows/airqo_etl_utils/airqo_utils.py
@@ -117,40 +117,26 @@ def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame:
return not_duplicated_data

@staticmethod
def extract_aggregated_raw_data(start_date_time, end_date_time) -> pd.DataFrame:
def extract_aggregated_raw_data(
start_date_time: str, end_date_time: str, dynamic_query: bool = False
) -> pd.DataFrame:
"""
Retrieves raw PM2.5 sensor data from BigQuery and computes averages for the numeric columns, grouped by device_number, device_id, and site_id.
"""
bigquery_api = BigQueryApi()

measurements = bigquery_api.query_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=bigquery_api.raw_measurements_table,
tenant=Tenant.AIRQO,
dynamic_query=dynamic_query,
)

if measurements.empty:
return pd.DataFrame([])

measurements = measurements.dropna(subset=["timestamp"])
measurements["timestamp"] = pd.to_datetime(measurements["timestamp"])
averaged_measurements_list: List[pd.DataFrame] = []

for (device_number, device_id, site_id), device_site in measurements.groupby(
["device_number", "device_id", "site_id"]
):
data = device_site.sort_index(axis=0)
numeric_columns = data.select_dtypes(include="number").columns
averages = data.resample("1H", on="timestamp")[numeric_columns].mean()
averages["timestamp"] = averages.index
averages["device_number"] = device_number
averages["device_id"] = device_id
averages["site_id"] = site_id
averaged_measurements_list.append(averages)

averaged_measurements = pd.concat(averaged_measurements_list, ignore_index=True)

return averaged_measurements
return measurements

@staticmethod
def flatten_field_8(device_category: DeviceCategory, field_8: str = None):
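
With the hourly averaging now pushed down into BigQuery (via query_data(dynamic_query=True), see the bigquery_api.py changes below), a caller gets pre-averaged rows straight from this helper. A minimal usage sketch, not part of the commit — the class name AirQoDataUtils and the date strings are assumptions for illustration:

    from airqo_etl_utils.airqo_utils import AirQoDataUtils  # assumed class/module name

    # Hourly per-device/site averages are computed by BigQuery itself when
    # dynamic_query=True, so no pandas resampling happens in this process.
    hourly = AirQoDataUtils.extract_aggregated_raw_data(
        start_date_time="2024-09-01T00:00:00Z",
        end_date_time="2024-09-02T00:00:00Z",
        dynamic_query=True,
    )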
198 changes: 151 additions & 47 deletions src/workflows/airqo_etl_utils/bigquery_api.py
@@ -12,10 +12,15 @@

from typing import List

import logging

logger = logging.getLogger(__name__)


class BigQueryApi:
def __init__(self):
self.client = bigquery.Client()
self.schema_mapping = configuration.SCHEMA_FILE_MAPPING
self.hourly_measurements_table = configuration.BIGQUERY_HOURLY_EVENTS_TABLE
# TODO: Remove later
self.hourly_measurements_table_prod = (
@@ -131,19 +136,19 @@ def validate_data(
date_time_columns = (
date_time_columns
if date_time_columns
else self.get_columns(table=table, column_type=ColumnDataType.TIMESTAMP)
else self.get_columns(table=table, column_type=[ColumnDataType.TIMESTAMP])
)

float_columns = (
float_columns
if float_columns
else self.get_columns(table=table, column_type=ColumnDataType.FLOAT)
else self.get_columns(table=table, column_type=[ColumnDataType.FLOAT])
)

integer_columns = (
integer_columns
if integer_columns
else self.get_columns(table=table, column_type=ColumnDataType.INTEGER)
else self.get_columns(table=table, column_type=[ColumnDataType.INTEGER])
)

from .data_validator import DataValidationUtils
@@ -158,46 +163,21 @@ def validate_data(
return dataframe.drop_duplicates(keep="first")

def get_columns(
self, table: str = "all", column_type: ColumnDataType = ColumnDataType.NONE
self,
table: str = "all",
column_type: List[ColumnDataType] = [ColumnDataType.NONE],
) -> List[str]:
"""
Retrieves a list of columns from the schema of a given table, optionally filtered by data type. The schemas should match the tables in BigQuery.
Args:
table(project.data_set.table): Data asset name as appears in bigquery.
column_type: One of the predetermined ColumnDataType Enums
table (str): The data asset name as it appears in BigQuery, in the format 'project.dataset.table'.
column_type (List[ColumnDataType]): A list of predetermined ColumnDataType Enums to filter by. Defaults to [ColumnDataType.NONE].
Returns:
A list of column names that match the passed specifications.
List[str]: A list of column names that match the passed specifications.
"""
# TODO Find an approach that pushes this mapping to config or make it more dynamic
schema_mapping = {
self.hourly_measurements_table: "measurements.json",
self.daily_measurements_table: "measurements.json",
self.raw_measurements_table: "raw_measurements.json",
self.hourly_weather_table: "weather_data.json",
self.raw_weather_table: "weather_data.json",
self.latest_measurements_table: "latest_measurements.json",
self.consolidated_data_table: "data_warehouse.json",
self.airqlouds_table: "airqlouds.json",
self.airqlouds_sites_table: "airqlouds_sites.json",
self.grids_table: "grids.json",
self.cohorts_table: "cohorts.json",
self.grids_sites_table: "grids_sites.json",
self.cohorts_devices_table: "cohorts_devices.json",
self.sites_table: "sites.json",
self.sites_meta_data_table: "sites_meta_data.json",
self.sensor_positions_table: "sensor_positions.json",
self.devices_table: "devices.json",
self.clean_mobile_raw_measurements_table: "mobile_measurements.json",
self.unclean_mobile_raw_measurements_table: "mobile_measurements.json",
self.airqo_mobile_measurements_table: "airqo_mobile_measurements.json",
self.bam_measurements_table: "bam_measurements.json",
self.raw_bam_measurements_table: "bam_raw_measurements.json",
"all": None,
}

schema_file = schema_mapping.get(table)
schema_file = self.schema_mapping.get(table, None)

if schema_file is None and table != "all":
raise Exception("Invalid table")
@@ -223,13 +203,17 @@ def get_columns(
file_schema = Utils.load_schema(file_name=f"{file}.json")
schema.extend(file_schema)

# Convert column_type list to strings for comparison
column_type_strings = [str(ct) for ct in column_type]

# Retrieve columns that match any of the specified types or match ColumnDataType.NONE
columns: List[str] = list(
set(
[
column["name"]
for column in schema
if column_type == ColumnDataType.NONE
or column["type"] == str(column_type)
if ColumnDataType.NONE in column_type
or column["type"] in column_type_strings
]
)
)
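
Because column_type is now a list, callers can request several data types in one pass. A hedged sketch of the new call shape, not part of the commit — the table attribute used here is only illustrative:

    # Fetch FLOAT and INTEGER columns of the raw measurements table in a single call.
    bigquery_api = BigQueryApi()
    numeric_columns = bigquery_api.get_columns(
        table=bigquery_api.raw_measurements_table,
        column_type=[ColumnDataType.FLOAT, ColumnDataType.INTEGER],
    )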
@@ -531,6 +515,28 @@ def compose_query(
null_cols: list = None,
columns: list = None,
) -> str:
"""
Composes a SQL query for BigQuery based on the query type (GET or DELETE),
and optionally includes a dynamic selection and aggregation of numeric columns.
Args:
query_type (QueryType): The type of query (GET or DELETE).
table (str): The BigQuery table to query.
start_date_time (str): The start datetime for filtering records.
end_date_time (str): The end datetime for filtering records.
tenant (Tenant): The tenant or ownership information (e.g., to filter data).
where_fields (dict): Optional dictionary of fields to filter on.
null_cols (list): Optional list of columns to check for null values.
columns (list): Optional list of columns to select. If None, selects all.
exclude_columns (list): List of columns to exclude from aggregation if dynamically selecting numeric columns.
Returns:
str: The composed SQL query as a string.
Raises:
Exception: If an invalid column is provided in `where_fields` or `null_cols`,
or if the `query_type` is not supported.
"""
null_cols = [] if null_cols is None else null_cols
where_fields = {} if where_fields is None else where_fields
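
An illustrative call, mirroring how query_data drives compose_query further down; the filter value is a placeholder and not taken from the commit:

    # Compose a GET query for AirQo hourly data, filtering on one field.
    query = BigQueryApi().compose_query(
        QueryType.GET,
        table=BigQueryApi().hourly_measurements_table,
        tenant=Tenant.AIRQO,
        start_date_time="2024-09-01T00:00:00Z",
        end_date_time="2024-09-02T00:00:00Z",
        where_fields={"device_id": "aq_001"},  # field must exist in the table's schema
        null_cols=None,
        columns=None,
    )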

@@ -608,30 +614,128 @@ def query_data(
end_date_time: str,
table: str,
tenant: Tenant,
dynamic_query: bool = False,
columns: list = None,
where_fields: dict = None,
null_cols: list = None,
time_granularity: str = "HOUR",
) -> pd.DataFrame:
query = self.compose_query(
QueryType.GET,
table=table,
tenant=tenant,
start_date_time=start_date_time,
end_date_time=end_date_time,
where_fields=where_fields,
null_cols=null_cols,
columns=columns,
)
"""
Queries data from a specified BigQuery table based on the provided parameters.
Args:
start_date_time (str): The start datetime for the data query in ISO format.
end_date_time (str): The end datetime for the data query in ISO format.
table (str): The name of the table from which to retrieve the data.
tenant (Tenant): An Enum representing the site ownership. Defaults to `Tenant.ALL` if not supplied, representing all tenants.
dynamic_query (bool): A boolean value to signal bypassing the automatic query composition to a more dynamic averaging approach.
columns (list, optional): A list of column names to include in the query. If None, all columns are included. Defaults to None.
where_fields (dict, optional): A dictionary of additional WHERE clause filters where the key is the field name and the value is the filter value. Defaults to None.
null_cols (list, optional): A list of columns to filter out null values for. Defaults to None.
Returns:
pd.DataFrame: A pandas DataFrame containing the queried data, with duplicates removed and timestamps converted to `datetime` format. If no data is retrieved, an empty DataFrame is returned.
"""
if not dynamic_query:
query = self.compose_query(
QueryType.GET,
table=table,
tenant=tenant,
start_date_time=start_date_time,
end_date_time=end_date_time,
where_fields=where_fields,
null_cols=null_cols,
columns=columns,
)
else:
query = self.dynamic_averaging_query(
table, start_date_time, end_date_time, time_granularity=time_granularity
)

dataframe = self.client.query(query=query).result().to_dataframe()

if dataframe.empty:
return pd.DataFrame()

dataframe.rename(columns={time_granularity.lower(): "timestamp"}, inplace=True)
dataframe["timestamp"] = dataframe["timestamp"].apply(pd.to_datetime)

return dataframe.drop_duplicates(keep="first")
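
A short sketch of the two paths, not part of the commit (table choice and dates are placeholders): the default branch composes a plain GET, while dynamic_query=True asks BigQuery to average numeric columns per device/site at the requested granularity:

    api = BigQueryApi()
    # Plain row fetch (previous behaviour).
    raw = api.query_data(
        "2024-09-01T00:00:00Z", "2024-09-02T00:00:00Z",
        table=api.raw_measurements_table, tenant=Tenant.AIRQO,
    )
    # Server-side hourly averages; the truncated "hour" column is renamed to "timestamp".
    hourly = api.query_data(
        "2024-09-01T00:00:00Z", "2024-09-02T00:00:00Z",
        table=api.raw_measurements_table, tenant=Tenant.AIRQO,
        dynamic_query=True, time_granularity="HOUR",
    )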

def dynamic_averaging_query(
self,
table: str,
start_date_time: str,
end_date_time: str,
exclude_columns: list = None,
group_by: list = None,
time_granularity: str = "HOUR",
) -> str:
"""
Constructs a dynamic SQL query to select and average numeric columns, allowing exclusions,
custom groupings, and ordering by a specified time granularity (hour, day, week, month).
Args:
table (str): The BigQuery table to query.
start_date_time (str): The start datetime for filtering records.
end_date_time (str): The end datetime for filtering records.
exclude_columns (list): List of columns to exclude from selection and aggregation.
Defaults to excluding `device_number`, `device_id`, `site_id`, `timestamp`.
group_by (list): List of columns to group by in the query. Defaults to
`["device_number", "device_id", "site_id", <time_granularity>]`.
time_granularity (str): Time truncation granularity for ordering, must be one of `HOUR`,
`DAY`, `WEEK`, or `MONTH`. Defaults to `HOUR`.
Returns:
str: A dynamic SQL query string that averages numeric columns and groups data based on
the provided granularity and group-by fields.
Example:
query = dynamic_averaging_query(
table="project.dataset.table",
start_date_time="2024-01-01T00:00:00",
end_date_time="2024-01-04T00:00:00",
exclude_columns=["device_number", "device_id", "site_id", "timestamp"],
group_by=["device_number", "site_id"],
time_granularity="HOUR"
)
"""
valid_granularities = ["HOUR", "DAY", "WEEK", "MONTH"]
if time_granularity.upper() not in valid_granularities:
logger.exception(
f"Invalid time granularity: {time_granularity}. Must be one of {valid_granularities}."
)

# Default for exclude_columns and group_by
exclude_columns = exclude_columns or [
"device_number",
"device_id",
"site_id",
"timestamp",
]
group_by = group_by or ["device_number", "device_id", "site_id"]

numeric_columns = self.get_columns(
table, [ColumnDataType.FLOAT, ColumnDataType.INTEGER]
)

# Construct dynamic AVG statements for numeric columns
avg_columns = ",\n ".join(
[
f"AVG({col}) AS {col}"
for col in numeric_columns
if col not in exclude_columns
]
)

# Include time granularity in both SELECT and GROUP BY
timestamp_trunc = f"TIMESTAMP_TRUNC(timestamp, {time_granularity.upper()}) AS {time_granularity.lower()}"
group_by_clause = ", ".join(group_by + [time_granularity.lower()])

query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""

return query
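
For the invocation shown in the docstring above — group_by=["device_number", "site_id"] and HOUR granularity — and assuming the table's only non-excluded numeric columns were pm2_5 and pm10 (hypothetical names), the composed string would read roughly:

    SELECT device_number, site_id, TIMESTAMP_TRUNC(timestamp, HOUR) AS hour,
        AVG(pm2_5) AS pm2_5, AVG(pm10) AS pm10
    FROM `project.dataset.table`
    WHERE timestamp BETWEEN '2024-01-01T00:00:00' AND '2024-01-04T00:00:00'
    GROUP BY device_number, site_id, hour
    ORDER BY hour;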

def query_devices(self, tenant: Tenant) -> pd.DataFrame:
if tenant == Tenant.ALL:
query = f"""
47 changes: 24 additions & 23 deletions src/workflows/airqo_etl_utils/config.py
@@ -218,29 +218,30 @@ class Config:
"pm1_pi": "pm1",
}

BIGQUERY_SCHEMA_MAPPING = {
"hourly_measurements_table": "measurements.json",
"daily_measurements_table": "measurements.json",
"raw_measurements_table": "raw_measurements.json",
"hourly_weather_table": "weather_data.json",
"raw_weather_table": "weather_data.json",
"latest_measurements_table": "latest_measurements.json",
"consolidated_data_table": "data_warehouse.json",
"airqlouds_table": "airqlouds.json",
"airqlouds_sites_table": "airqlouds_sites.json",
"grids_table": "grids.json",
"cohorts_table": "cohorts.json",
"grids_sites_table": "grids_sites.json",
"cohorts_devices_table": "cohorts_devices.json",
"sites_table": "sites.json",
"sites_meta_data_table": "sites_meta_data.json",
"sensor_positions_table": "sensor_positions.json",
"devices_table": "devices.json",
"clean_mobile_raw_measurements_table": "mobile_measurements.json",
"unclean_mobile_raw_measurements_table": "mobile_measurements.json",
"airqo_mobile_measurements_table": "airqo_mobile_measurements.json",
"bam_measurements_table": "bam_measurements.json",
"raw_bam_measurements_table": "bam_raw_measurements.json",
# Schema files mapping
SCHEMA_FILE_MAPPING = {
BIGQUERY_HOURLY_EVENTS_TABLE: "measurements.json",
BIGQUERY_DAILY_EVENTS_TABLE: "measurements.json",
BIGQUERY_RAW_EVENTS_TABLE: "raw_measurements.json",
BIGQUERY_HOURLY_WEATHER_TABLE: "weather_data.json",
BIGQUERY_RAW_WEATHER_TABLE: "weather_data.json",
BIGQUERY_LATEST_EVENTS_TABLE: "latest_measurements.json",
BIGQUERY_ANALYTICS_TABLE: "data_warehouse.json",
BIGQUERY_AIRQLOUDS_TABLE: "airqlouds.json",
BIGQUERY_AIRQLOUDS_SITES_TABLE: "airqlouds_sites.json",
BIGQUERY_GRIDS_TABLE: "grids.json",
BIGQUERY_COHORTS_TABLE: "cohorts.json",
BIGQUERY_GRIDS_SITES_TABLE: "grids_sites.json",
BIGQUERY_COHORTS_DEVICES_TABLE: "cohorts_devices.json",
BIGQUERY_SITES_TABLE: "sites.json",
BIGQUERY_SITES_META_DATA_TABLE: "sites_meta_data.json",
SENSOR_POSITIONS_TABLE: "sensor_positions.json",
BIGQUERY_DEVICES_TABLE: "devices.json",
BIGQUERY_CLEAN_RAW_MOBILE_EVENTS_TABLE: "mobile_measurements.json",
BIGQUERY_UNCLEAN_RAW_MOBILE_EVENTS_TABLE: "mobile_measurements.json",
BIGQUERY_AIRQO_MOBILE_EVENTS_TABLE: "airqo_mobile_measurements.json",
BIGQUERY_BAM_EVENTS_TABLE: "bam_measurements.json",
BIGQUERY_RAW_BAM_DATA_TABLE: "bam_raw_measurements.json",
"all": None,
}

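Keying the mapping by the resolved table names lets BigQueryApi look schema files up directly from configuration (see self.schema_mapping in __init__ above). A minimal sketch of the lookup, not part of the commit — the import path of the configuration instance is assumed:

    from airqo_etl_utils.config import configuration  # assumed location of the Config instance

    schema_file = configuration.SCHEMA_FILE_MAPPING.get(
        configuration.BIGQUERY_RAW_EVENTS_TABLE, None
    )
    # -> "raw_measurements.json"; unmapped tables return None, which get_columns treats as invalid.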

0 comments on commit b4ef239
