diff --git a/src/workflows/airqo_etl_utils/airqo_api.py b/src/workflows/airqo_etl_utils/airqo_api.py index 4dc91e2587..b0866fc26b 100644 --- a/src/workflows/airqo_etl_utils/airqo_api.py +++ b/src/workflows/airqo_etl_utils/airqo_api.py @@ -135,7 +135,7 @@ def get_devices( device_category: DeviceCategory = DeviceCategory.NONE, ) -> List[Dict[str, Any]]: """ - Retrieve devices given a tenant and device category. + Retrieve devices given a network and device category. Args: - network (str): An Enum that represents site ownership. @@ -198,7 +198,6 @@ def get_devices( "device_category": str( DeviceCategory.from_str(device.pop("category", None)) ), - "network": device.get("network"), "device_manufacturer": device.get("network", "airqo"), **device, } diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index 8524317968..71e519ec80 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -59,7 +59,7 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame: null_cols=["pm2_5_calibrated_value"], start_date_time=start_date_time, end_date_time=end_date_time, - tenant=Tenant.AIRQO, + network=str(Tenant.AIRQO), ) return DataValidationUtils.remove_outliers(hourly_uncalibrated_data) @@ -79,7 +79,7 @@ def extract_data_from_bigquery( table=table, start_date_time=start_date_time, end_date_time=end_date_time, - tenant=Tenant.AIRQO, + network=str(Tenant.AIRQO), ) return DataValidationUtils.remove_outliers(raw_data) @@ -117,7 +117,10 @@ def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame: @staticmethod def extract_aggregated_raw_data( - start_date_time: str, end_date_time: str, dynamic_query: bool = False + start_date_time: str, + end_date_time: str, + network: str = None, + dynamic_query: bool = False, ) -> pd.DataFrame: """ Retrieves raw pm2.5 sensor data from bigquery and computes averages for the numeric columns grouped by device_number, device_id 
and site_id @@ -128,9 +131,7 @@ def extract_aggregated_raw_data( start_date_time=start_date_time, end_date_time=end_date_time, table=bigquery_api.raw_measurements_table, - network=str( - Tenant.AIRQO - ), # TODO Replace tenant implementation with network implementation + network=network, dynamic_query=dynamic_query, ) diff --git a/src/workflows/airqo_etl_utils/bigquery_api.py b/src/workflows/airqo_etl_utils/bigquery_api.py index 1c814fd3b4..f692bb49d1 100644 --- a/src/workflows/airqo_etl_utils/bigquery_api.py +++ b/src/workflows/airqo_etl_utils/bigquery_api.py @@ -509,7 +509,7 @@ def compose_query( table: str, start_date_time: str, end_date_time: str, - network: str, + network: str = "all", where_fields: dict = None, null_cols: list = None, columns: list = None, @@ -536,17 +536,15 @@ Exception: If an invalid column is provided in `where_fields` or `null_cols`, or if the `query_type` is not supported. """ - tenant = "airqo" null_cols = [] if null_cols is None else null_cols where_fields = {} if where_fields is None else where_fields columns = ", ".join(map(str, columns)) if columns else " * " - where_clause = ( - f" timestamp >= '{start_date_time}' and timestamp <= '{end_date_time}' " - ) - if tenant != Tenant.ALL: - where_clause = f" {where_clause} and tenant = '{str(tenant)}' or network = '{str(network)}' " + where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' " + + if network and network != "all": + where_clause += f"AND network = '{network}' " valid_cols = self.get_columns(table=table) @@ -613,7 +611,7 @@ def query_data( start_date_time: str, end_date_time: str, table: str, - network: str, + network: str = None, dynamic_query: bool = False, columns: list = None, where_fields: dict = None, @@ -649,7 +647,11 @@ ) else: query = self.dynamic_averaging_query( - table, start_date_time, end_date_time, time_granularity=time_granularity + table, + start_date_time, + end_date_time, + network=network, + 
time_granularity=time_granularity, ) dataframe = self.client.query(query=query).result().to_dataframe() @@ -669,6 +671,7 @@ def dynamic_averaging_query( end_date_time: str, exclude_columns: list = None, group_by: list = None, + network: str = "all", time_granularity: str = "HOUR", ) -> str: """ @@ -728,11 +731,18 @@ ] ) + where_clause: str = ( + f"timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' " + ) + + if network and network != "all": + where_clause += f"AND network = '{network}' " + # Include time granularity in both SELECT and GROUP BY timestamp_trunc = f"TIMESTAMP_TRUNC(timestamp, {time_granularity.upper()}) AS {time_granularity.lower()}" group_by_clause = ", ".join(group_by + [time_granularity.lower()]) - query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};""" + query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE {where_clause} GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};""" return query diff --git a/src/workflows/airqo_etl_utils/daily_data_utils.py b/src/workflows/airqo_etl_utils/daily_data_utils.py index 63c3f0455a..804382c8e2 100644 --- a/src/workflows/airqo_etl_utils/daily_data_utils.py +++ b/src/workflows/airqo_etl_utils/daily_data_utils.py @@ -44,7 +44,6 @@ def query_hourly_data(start_date_time, end_date_time) -> pd.DataFrame: table=bigquery_api.hourly_measurements_table, start_date_time=start_date_time, end_date_time=end_date_time, - tenant=Tenant.ALL, ) return DataValidationUtils.remove_outliers(raw_data) @@ -57,7 +56,6 @@ def query_daily_data(start_date_time, end_date_time) -> pd.DataFrame: table=bigquery_api.daily_measurements_table, start_date_time=start_date_time, end_date_time=end_date_time, - tenant=Tenant.ALL, ) return DataValidationUtils.remove_outliers(raw_data) diff --git 
a/src/workflows/airqo_etl_utils/data_warehouse_utils.py b/src/workflows/airqo_etl_utils/data_warehouse_utils.py index 7e3da8b2d3..3c2ed5491e 100644 --- a/src/workflows/airqo_etl_utils/data_warehouse_utils.py +++ b/src/workflows/airqo_etl_utils/data_warehouse_utils.py @@ -33,7 +33,6 @@ def extract_hourly_bam_data( start_date_time=start_date_time, end_date_time=end_date_time, table=biq_query_api.bam_measurements_table, - tenant=Tenant.ALL, ) if data.empty: @@ -59,7 +58,6 @@ def extract_data_from_big_query( start_date_time=start_date_time, end_date_time=end_date_time, table=biq_query_api.consolidated_data_table, - tenant=Tenant.ALL, ) @staticmethod @@ -83,7 +81,6 @@ def extract_hourly_low_cost_data( start_date_time=start_date_time, end_date_time=end_date_time, table=biq_query_api.hourly_measurements_table, - tenant=Tenant.ALL, ) if data.empty: diff --git a/src/workflows/airqo_etl_utils/schema/raw_measurements.json b/src/workflows/airqo_etl_utils/schema/raw_measurements.json index bd492d4209..b75ecb5b88 100644 --- a/src/workflows/airqo_etl_utils/schema/raw_measurements.json +++ b/src/workflows/airqo_etl_utils/schema/raw_measurements.json @@ -4,6 +4,11 @@ "type": "STRING", "mode": "NULLABLE" }, + { + "name": "network", + "type": "STRING", + "mode": "NULLABLE" + }, { "name": "timestamp", "type": "TIMESTAMP", diff --git a/src/workflows/airqo_etl_utils/weather_data_utils.py b/src/workflows/airqo_etl_utils/weather_data_utils.py index 725b633e83..d8c24b7e26 100644 --- a/src/workflows/airqo_etl_utils/weather_data_utils.py +++ b/src/workflows/airqo_etl_utils/weather_data_utils.py @@ -24,7 +24,6 @@ def extract_hourly_weather_data(start_date_time, end_date_time) -> pd.DataFrame: start_date_time=start_date_time, end_date_time=end_date_time, table=bigquery_api.hourly_weather_table, - tenant=Tenant.ALL, ) cols = bigquery_api.get_columns(table=bigquery_api.hourly_weather_table) return pd.DataFrame([], cols) if measurements.empty else measurements @@ -79,7 +78,6 @@ def 
extract_raw_data_from_bigquery(start_date_time, end_date_time) -> pd.DataFra start_date_time=start_date_time, end_date_time=end_date_time, table=bigquery_api.raw_weather_table, - tenant=Tenant.ALL, ) return measurements diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index 339d41823a..4804193fea 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -42,6 +42,7 @@ def extract_device_measurements(**kwargs) -> pd.DataFrame: return AirQoDataUtils.extract_aggregated_raw_data( start_date_time=start_date_time, end_date_time=end_date_time, + network="airqo", dynamic_query=True, )