Skip to content

Commit

Permalink
Merge pull request #3994 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Dec 4, 2024
2 parents 03a25cf + 369cafd commit d9febe2
Show file tree
Hide file tree
Showing 23 changed files with 817 additions and 773 deletions.
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-da0db8f0-1733248410
tag: prod-1ea1da62-1733249735
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-da0db8f0-1733248410
tag: prod-03a25cf1-1733249950
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
tag: prod-1ea1da62-1733249735
tag: prod-03a25cf1-1733249950
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
tag: prod-1ea1da62-1733249735
tag: prod-03a25cf1-1733249950
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-da0db8f0-1733248410
tag: prod-1ea1da62-1733249735
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-da0db8f0-1733248410
tag: prod-1ea1da62-1733249735
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-da0db8f0-1733248410
tag: prod-03a25cf1-1733249950
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis
containers: eu.gcr.io/airqo-250220/airqo-stage-workflows
tag: stage-7f60b036-1733224934
tag: stage-f43cad79-1733305404
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
3 changes: 1 addition & 2 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def get_devices(
device_category: DeviceCategory = DeviceCategory.NONE,
) -> List[Dict[str, Any]]:
"""
Retrieve devices given a tenant and device category.
Retrieve devices given a network and device category.
Args:
- network (str): An Enum that represents site ownership.
Expand Down Expand Up @@ -198,7 +198,6 @@ def get_devices(
"device_category": str(
DeviceCategory.from_str(device.pop("category", None))
),
"network": device.get("network"),
"device_manufacturer": device.get("network", "airqo"),
**device,
}
Expand Down
34 changes: 21 additions & 13 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:
null_cols=["pm2_5_calibrated_value"],
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.AIRQO,
network=str(Tenant.AIRQO),
)

return DataValidationUtils.remove_outliers(hourly_uncalibrated_data)
Expand All @@ -79,7 +79,7 @@ def extract_data_from_bigquery(
table=table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.AIRQO,
network=str(Tenant.AIRQO),
)

return DataValidationUtils.remove_outliers(raw_data)
Expand Down Expand Up @@ -117,7 +117,10 @@ def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame:

@staticmethod
def extract_aggregated_raw_data(
start_date_time: str, end_date_time: str, dynamic_query: bool = False
start_date_time: str,
end_date_time: str,
network: str = None,
dynamic_query: bool = False,
) -> pd.DataFrame:
"""
Retrieves raw pm2.5 sensor data from bigquery and computes averages for the numeric columns grouped by device_number, device_id and site_id
Expand All @@ -128,9 +131,7 @@ def extract_aggregated_raw_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=bigquery_api.raw_measurements_table,
network=str(
Tenant.AIRQO
), # TODO Replace tenant implementation with network implementation
network=network,
dynamic_query=dynamic_query,
)

Expand Down Expand Up @@ -757,19 +758,26 @@ def clean_low_cost_sensor_data(
AirQoGxExpectations.from_pandas().pm2_5_low_cost_sensor_raw_data(
data
)

else:
data["timestamp"] = pd.to_datetime(data["timestamp"])
data.dropna(subset=["timestamp"], inplace=True)
data["timestamp"] = pd.to_datetime(data["timestamp"])

data.drop_duplicates(
subset=["timestamp", "device_id"], keep="first", inplace=True
)
# TODO Find an appropriate place to put this
if device_category == DeviceCategory.LOW_COST:
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
data["pm10"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
is_airqo_network = data["network"] == "airqo"

pm2_5_mean = data.loc[is_airqo_network, ["s1_pm2_5", "s2_pm2_5"]].mean(
axis=1
)
pm10_mean = data.loc[is_airqo_network, ["s1_pm10", "s2_pm10"]].mean(axis=1)

data.loc[is_airqo_network, "pm2_5_raw_value"] = pm2_5_mean
data.loc[is_airqo_network, "pm2_5"] = pm2_5_mean
data.loc[is_airqo_network, "pm10_raw_value"] = pm10_mean
data.loc[is_airqo_network, "pm10"] = pm10_mean
return data

@staticmethod
Expand Down Expand Up @@ -1032,7 +1040,7 @@ def merge_aggregated_weather_data(
@staticmethod
def extract_devices_deployment_logs() -> pd.DataFrame:
airqo_api = AirQoApi()
devices = airqo_api.get_devices(tenant=Tenant.AIRQO)
devices = airqo_api.get_devices(network=str(Tenant.AIRQO))
devices_history = pd.DataFrame()
for device in devices:
try:
Expand Down
30 changes: 20 additions & 10 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def compose_query(
table: str,
start_date_time: str,
end_date_time: str,
network: str,
network: str = "all",
where_fields: dict = None,
null_cols: list = None,
columns: list = None,
Expand All @@ -536,17 +536,15 @@ def compose_query(
Exception: If an invalid column is provided in `where_fields` or `null_cols`,
or if the `query_type` is not supported.
"""
tenant = "airqo"

null_cols = [] if null_cols is None else null_cols
where_fields = {} if where_fields is None else where_fields

columns = ", ".join(map(str, columns)) if columns else " * "
where_clause = (
f" timestamp >= '{start_date_time}' and timestamp <= '{end_date_time}' "
)
if tenant != Tenant.ALL:
where_clause = f" {where_clause} and tenant = '{str(tenant)}' or network = '{str(network)}' "
where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' "

if network:
where_clause += f"AND network = '{network}' "

valid_cols = self.get_columns(table=table)

Expand Down Expand Up @@ -613,7 +611,7 @@ def query_data(
start_date_time: str,
end_date_time: str,
table: str,
network: str,
network: str = None,
dynamic_query: bool = False,
columns: list = None,
where_fields: dict = None,
Expand Down Expand Up @@ -649,7 +647,11 @@ def query_data(
)
else:
query = self.dynamic_averaging_query(
table, start_date_time, end_date_time, time_granularity=time_granularity
table,
start_date_time,
end_date_time,
network=network,
time_granularity=time_granularity,
)

dataframe = self.client.query(query=query).result().to_dataframe()
Expand All @@ -669,6 +671,7 @@ def dynamic_averaging_query(
end_date_time: str,
exclude_columns: list = None,
group_by: list = None,
network: str = "all",
time_granularity: str = "HOUR",
) -> str:
"""
Expand Down Expand Up @@ -728,11 +731,18 @@ def dynamic_averaging_query(
]
)

where_clause: str = (
f"timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' "
)

if network:
where_clause += f"AND network = '{network}' "

# Include time granularity in both SELECT and GROUP BY
timestamp_trunc = f"TIMESTAMP_TRUNC(timestamp, {time_granularity.upper()}) AS {time_granularity.lower()}"
group_by_clause = ", ".join(group_by + [time_granularity.lower()])

query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""
query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE {where_clause} GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""

return query

Expand Down
2 changes: 0 additions & 2 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def query_hourly_data(start_date_time, end_date_time) -> pd.DataFrame:
table=bigquery_api.hourly_measurements_table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.ALL,
)

return DataValidationUtils.remove_outliers(raw_data)
Expand All @@ -57,7 +56,6 @@ def query_daily_data(start_date_time, end_date_time) -> pd.DataFrame:
table=bigquery_api.daily_measurements_table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.ALL,
)

return DataValidationUtils.remove_outliers(raw_data)
Expand Down
6 changes: 2 additions & 4 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ def format_data_types(
data[col] = (
data[col]
.astype(str)
.str.replace(r"[^\w\s\.\-:]", "", regex=True)
.str.replace(r"[^\w\s\.\-+:]", "", regex=True)
.str.replace(r"(?<!\.\d{3})Z$", ".000Z", regex=True)
) # Negative lookbehind to add missing milliseconds if needed
data[col] = pd.to_datetime(data[col], errors="coerce")
data[col] = pd.to_datetime(data[col], errors="coerce", utc=True)

if integers:
for col in integers:
Expand Down Expand Up @@ -142,7 +142,6 @@ def remove_outliers(data: pd.DataFrame) -> pd.DataFrame:
dtype: list(set(columns) & set(data.columns))
for dtype, columns in column_types.items()
}

data = DataValidationUtils.format_data_types(
data=data,
floats=filtered_columns[ColumnDataType.FLOAT],
Expand All @@ -151,7 +150,6 @@ def remove_outliers(data: pd.DataFrame) -> pd.DataFrame:
)

validated_columns = list(chain.from_iterable(filtered_columns.values()))

for col in validated_columns:
is_airqo_network = data["network"] == "airqo"
mapped_name = configuration.AIRQO_DATA_COLUMN_NAME_MAPPING.get(col, None)
Expand Down
3 changes: 0 additions & 3 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def extract_hourly_bam_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.bam_measurements_table,
tenant=Tenant.ALL,
)

if data.empty:
Expand All @@ -59,7 +58,6 @@ def extract_data_from_big_query(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.consolidated_data_table,
tenant=Tenant.ALL,
)

@staticmethod
Expand All @@ -83,7 +81,6 @@ def extract_hourly_low_cost_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.hourly_measurements_table,
tenant=Tenant.ALL,
)

if data.empty:
Expand Down
Loading

0 comments on commit d9febe2

Please sign in to comment.