Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/integration iqair devices #3993

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def get_devices(
device_category: DeviceCategory = DeviceCategory.NONE,
) -> List[Dict[str, Any]]:
"""
Retrieve devices given a tenant and device category.
Retrieve devices given a network and device category.

Args:
- network (str): An Enum that represents site ownership.
Expand Down Expand Up @@ -198,7 +198,6 @@ def get_devices(
"device_category": str(
DeviceCategory.from_str(device.pop("category", None))
),
"network": device.get("network"),
"device_manufacturer": device.get("network", "airqo"),
**device,
}
Expand Down
13 changes: 7 additions & 6 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:
null_cols=["pm2_5_calibrated_value"],
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.AIRQO,
network=str(Tenant.AIRQO),
)

return DataValidationUtils.remove_outliers(hourly_uncalibrated_data)
Expand All @@ -79,7 +79,7 @@ def extract_data_from_bigquery(
table=table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.AIRQO,
network=str(Tenant.AIRQO),
)

return DataValidationUtils.remove_outliers(raw_data)
Expand Down Expand Up @@ -117,7 +117,10 @@ def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame:

@staticmethod
def extract_aggregated_raw_data(
start_date_time: str, end_date_time: str, dynamic_query: bool = False
start_date_time: str,
end_date_time: str,
network: str = None,
dynamic_query: bool = False,
) -> pd.DataFrame:
"""
Retrieves raw pm2.5 sensor data from bigquery and computes averages for the numeric columns grouped by device_number, device_id and site_id
Expand All @@ -128,9 +131,7 @@ def extract_aggregated_raw_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=bigquery_api.raw_measurements_table,
network=str(
Tenant.AIRQO
), # TODO Replace tenant implementation with network implementation
network=network,
dynamic_query=dynamic_query,
)

Expand Down
30 changes: 20 additions & 10 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def compose_query(
table: str,
start_date_time: str,
end_date_time: str,
network: str,
network: str = "all",
where_fields: dict = None,
null_cols: list = None,
columns: list = None,
Expand All @@ -536,17 +536,15 @@ def compose_query(
Exception: If an invalid column is provided in `where_fields` or `null_cols`,
or if the `query_type` is not supported.
"""
tenant = "airqo"

null_cols = [] if null_cols is None else null_cols
where_fields = {} if where_fields is None else where_fields

columns = ", ".join(map(str, columns)) if columns else " * "
where_clause = (
f" timestamp >= '{start_date_time}' and timestamp <= '{end_date_time}' "
)
if tenant != Tenant.ALL:
where_clause = f" {where_clause} and tenant = '{str(tenant)}' or network = '{str(network)}' "
where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' "

if network:
where_clause += f"AND network = '{network}' "
Comment on lines +544 to +547
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate SQL Query Construction for network Filter

The network filter appended to the where_clause should be constructed carefully to prevent SQL injection vulnerabilities. Ensure the network value is validated against a known set of networks, or passed as a query parameter, rather than interpolated directly into the SQL string.


valid_cols = self.get_columns(table=table)

Expand Down Expand Up @@ -613,7 +611,7 @@ def query_data(
start_date_time: str,
end_date_time: str,
table: str,
network: str,
network: str = None,
dynamic_query: bool = False,
columns: list = None,
where_fields: dict = None,
Expand Down Expand Up @@ -649,7 +647,11 @@ def query_data(
)
else:
query = self.dynamic_averaging_query(
table, start_date_time, end_date_time, time_granularity=time_granularity
table,
start_date_time,
end_date_time,
network=network,
time_granularity=time_granularity,
)

dataframe = self.client.query(query=query).result().to_dataframe()
Expand All @@ -669,6 +671,7 @@ def dynamic_averaging_query(
end_date_time: str,
exclude_columns: list = None,
group_by: list = None,
network: str = "all",
time_granularity: str = "HOUR",
) -> str:
"""
Expand Down Expand Up @@ -728,11 +731,18 @@ def dynamic_averaging_query(
]
)

where_clause: str = (
f"timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' "
)

if network:
where_clause += f"AND network = '{network}' "

# Include time granularity in both SELECT and GROUP BY
timestamp_trunc = f"TIMESTAMP_TRUNC(timestamp, {time_granularity.upper()}) AS {time_granularity.lower()}"
group_by_clause = ", ".join(group_by + [time_granularity.lower()])

query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""
query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE {where_clause} GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""

return query

Expand Down
2 changes: 0 additions & 2 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def query_hourly_data(start_date_time, end_date_time) -> pd.DataFrame:
table=bigquery_api.hourly_measurements_table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.ALL,
)

return DataValidationUtils.remove_outliers(raw_data)
Expand All @@ -57,7 +56,6 @@ def query_daily_data(start_date_time, end_date_time) -> pd.DataFrame:
table=bigquery_api.daily_measurements_table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.ALL,
)

return DataValidationUtils.remove_outliers(raw_data)
Expand Down
3 changes: 0 additions & 3 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def extract_hourly_bam_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.bam_measurements_table,
tenant=Tenant.ALL,
)

if data.empty:
Expand All @@ -59,7 +58,6 @@ def extract_data_from_big_query(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.consolidated_data_table,
tenant=Tenant.ALL,
)

@staticmethod
Expand All @@ -83,7 +81,6 @@ def extract_hourly_low_cost_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.hourly_measurements_table,
tenant=Tenant.ALL,
)

if data.empty:
Expand Down
5 changes: 5 additions & 0 deletions src/workflows/airqo_etl_utils/schema/raw_measurements.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "network",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "timestamp",
"type": "TIMESTAMP",
Expand Down
2 changes: 0 additions & 2 deletions src/workflows/airqo_etl_utils/weather_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def extract_hourly_weather_data(start_date_time, end_date_time) -> pd.DataFrame:
start_date_time=start_date_time,
end_date_time=end_date_time,
table=bigquery_api.hourly_weather_table,
tenant=Tenant.ALL,
)
cols = bigquery_api.get_columns(table=bigquery_api.hourly_weather_table)
return pd.DataFrame([], cols) if measurements.empty else measurements
Expand Down Expand Up @@ -79,7 +78,6 @@ def extract_raw_data_from_bigquery(start_date_time, end_date_time) -> pd.DataFra
start_date_time=start_date_time,
end_date_time=end_date_time,
table=bigquery_api.raw_weather_table,
tenant=Tenant.ALL,
)

return measurements
Expand Down
1 change: 1 addition & 0 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def extract_device_measurements(**kwargs) -> pd.DataFrame:
return AirQoDataUtils.extract_aggregated_raw_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
network="airqo",
dynamic_query=True,
)

Expand Down
Loading