Merge pull request #3338 from airqo-platform/satellite-data
set up job to retrieve satellite data
Baalmart authored Oct 11, 2024
2 parents 805d76a + 0b03f7e commit f0dc189
Showing 11 changed files with 322 additions and 16 deletions.
4 changes: 3 additions & 1 deletion src/workflows/airqo_etl_utils/airnow_utils.py
@@ -89,7 +89,9 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame:
"""
air_now_data = []

devices = AirQoApi().get_devices(tenant=Tenant.ALL, device_category=DeviceCategory.BAM)
devices = AirQoApi().get_devices(
tenant=Tenant.ALL, device_category=DeviceCategory.BAM
)

# Precompute device mapping for faster lookup
device_mapping = {}
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/bigquery_api.py
@@ -858,4 +858,4 @@ def save_data_to_bigquery(data: pd.DataFrame, table: str):
if_exists="append",
credentials=credentials,
)
print("Hourly data saved to bigquery")
print(" data saved to bigquery")
102 changes: 101 additions & 1 deletion src/workflows/airqo_etl_utils/constants.py
@@ -1,5 +1,5 @@
from enum import Enum
import os
from enum import Enum


class DeviceCategory(Enum):
@@ -319,3 +319,103 @@ class CityModel(Enum):
KAMPALA = "kampala"
MOMBASA = "mombasa"
DEFAULT = "default"


# TODO: May need to remove this list once the number of locations grows.
satellite_cities = [
# NOTE: Syntax is lon, lat for GEE, not the usual lat, lon
{"city": "kampala", "coords": [32.6313083, 0.336219]},
{"city": "nairobi", "coords": [36.886487, -1.243396]},
{"city": "lagos", "coords": [3.39936, 6.53257]},
{"city": "accra", "coords": [-0.205874, 5.614818]},
{"city": "bujumbura", "coords": [29.3599, 3.3614]},
{"city": "yaounde", "coords": [11.5202, 3.8617]},
{"city": "kisumu", "coords": [34.7680, 0.0917]},
]
satellite_collections = {
"COPERNICUS/S5P/OFFL/L3_SO2": [
"SO2_column_number_density",
"SO2_column_number_density_amf",
"SO2_slant_column_number_density",
"absorbing_aerosol_index",
"cloud_fraction",
"sensor_azimuth_angle",
"sensor_zenith_angle",
"solar_azimuth_angle",
"solar_zenith_angle",
"SO2_column_number_density_15km",
],
"COPERNICUS/S5P/OFFL/L3_CO": [
"CO_column_number_density",
"H2O_column_number_density",
"cloud_height",
"sensor_altitude",
"sensor_azimuth_angle",
"sensor_zenith_angle",
"solar_azimuth_angle",
"solar_zenith_angle",
],
"COPERNICUS/S5P/OFFL/L3_NO2": [
"NO2_column_number_density",
"tropospheric_NO2_column_number_density",
"stratospheric_NO2_column_number_density",
"NO2_slant_column_number_density",
"tropopause_pressure",
"absorbing_aerosol_index",
"cloud_fraction",
"sensor_altitude",
"sensor_azimuth_angle",
"sensor_zenith_angle",
"solar_azimuth_angle",
"solar_zenith_angle",
],
"COPERNICUS/S5P/OFFL/L3_HCHO": [
"tropospheric_HCHO_column_number_density",
"tropospheric_HCHO_column_number_density_amf",
"HCHO_slant_column_number_density",
"cloud_fraction",
"solar_zenith_angle",
"solar_azimuth_angle",
"sensor_zenith_angle",
"sensor_azimuth_angle",
],
"COPERNICUS/S5P/OFFL/L3_O3": [
"O3_column_number_density",
"O3_effective_temperature",
"cloud_fraction",
"sensor_azimuth_angle",
"sensor_zenith_angle",
"solar_azimuth_angle",
"solar_zenith_angle",
],
"COPERNICUS/S5P/OFFL/L3_AER_AI": [
"absorbing_aerosol_index",
"sensor_altitude",
"sensor_azimuth_angle",
"sensor_zenith_angle",
"solar_azimuth_angle",
"solar_zenith_angle",
],
"COPERNICUS/S5P/OFFL/L3_CH4": [
"CH4_column_volume_mixing_ratio_dry_air",
"aerosol_height",
"aerosol_optical_depth",
"sensor_zenith_angle",
"sensor_azimuth_angle",
"solar_azimuth_angle",
"solar_zenith_angle",
],
"COPERNICUS/S5P/OFFL/L3_CLOUD": [
"cloud_fraction",
"cloud_top_pressure",
"cloud_top_height",
"cloud_base_pressure",
"cloud_base_height",
"cloud_optical_depth",
"surface_albedo",
"sensor_azimuth_angle",
"sensor_zenith_angle",
"solar_azimuth_angle",
"solar_zenith_angle",
],
}
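
Each band listed above is later prefixed with its collection ID and normalised into a BigQuery-friendly column name by SatelliteUtils (see satellite_utils.py below). A minimal sketch of that mapping, using one entry from satellite_collections:

# Sketch only: mirrors the prefixing in extract_data_for_location and the
# column cleanup in extract_satellite_data further down in this diff.
collection = "COPERNICUS/S5P/OFFL/L3_SO2"
field = "SO2_column_number_density"
column = f"{collection}_{field}".replace("/", "_").replace(" ", "_").lower()
print(column)  # copernicus_s5p_offl_l3_so2_so2_column_number_density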
1 change: 1 addition & 0 deletions src/workflows/airqo_etl_utils/message_broker_utils.py
@@ -12,6 +12,7 @@

logger = logging.getLogger(__name__)


class MessageBrokerUtils:
def __init__(self):
self.__partitions = configuration.TOPIC_PARTITIONS
141 changes: 141 additions & 0 deletions src/workflows/airqo_etl_utils/satellite_utils.py
@@ -0,0 +1,141 @@
from datetime import datetime
from typing import List, Dict, Any

import ee
import numpy as np
import pandas as pd
from google.oauth2 import service_account

from airqo_etl_utils.config import configuration


class SatelliteUtils:
@staticmethod
def initialize_earth_engine():
ee.Initialize(
credentials=service_account.Credentials.from_service_account_file(
configuration.GOOGLE_APPLICATION_CREDENTIALS,
scopes=["https://www.googleapis.com/auth/earthengine"],
),
project=configuration.GOOGLE_CLOUD_PROJECT_ID,
)

@staticmethod
def extract_data_for_image(image: ee.Image, aoi: ee.Geometry.Point) -> ee.Feature:
return ee.Feature(
None,
image.reduceRegion(
reducer=ee.Reducer.mean(),
geometry=aoi,
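                # 1113.2 m is 0.01 arc degrees at the equator, the native
                # grid of GEE's Sentinel-5P L3 rasters.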
scale=1113.2, # TODO: Review this, possibly a need for custom scales.
).set("date", image.date().format("YYYY-MM-dd")),
)

@staticmethod
def get_satellite_data(
aoi: ee.Geometry.Point,
collection: str,
fields: List[str],
start_date: datetime,
end_date: datetime,
) -> ee.FeatureCollection:
return (
ee.ImageCollection(collection)
.filterDate(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
.filterBounds(aoi)
.select(fields)
.map(lambda image: SatelliteUtils.extract_data_for_image(image, aoi))
)

@staticmethod
def process_time_series(
time_series: Dict[str, Any], fields: List[str]
) -> Dict[str, Dict[str, List[float]]]:
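        # Bucket each image's band values by acquisition date so they can be averaged per day.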
daily_data = {}
for feature in time_series["features"]:
date = feature["properties"]["date"]
if date not in daily_data:
daily_data[date] = {field: [] for field in fields}
for field in fields:
if field in feature["properties"]:
daily_data[date][field].append(feature["properties"][field])
return daily_data

@staticmethod
def calculate_daily_means(
daily_data: Dict[str, Dict[str, List[float]]], fields: List[str], city: str
) -> List[Dict[str, Any]]:
results = []
for date, data in daily_data.items():
result = {
"timestamp": datetime.strptime(date, "%Y-%m-%d").replace(
hour=0, minute=0, second=0, microsecond=0
),
"city": city,
}
            for field in fields:
                # Average only the non-null samples; zeros are valid readings,
                # and null entries must not inflate the denominator.
                values = [v for v in data[field] if v is not None]
                result[field] = sum(values) / len(values) if values else None
results.append(result)
return results

@staticmethod
def extract_data_for_location(
location: Dict[str, Any],
collections: Dict[str, List[str]],
start_date: datetime,
end_date: datetime,
) -> List[Dict[str, Any]]:
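        # ee.Geometry.Point expects [lon, lat], matching the coords order in satellite_cities.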
aoi = ee.Geometry.Point(location["coords"])
all_data = []

for collection, fields in collections.items():
prefixed_fields = [f"{collection}_{field}" for field in fields]
satellite_data = SatelliteUtils.get_satellite_data(
aoi, collection, fields, start_date, end_date
)
time_series = satellite_data.getInfo()
daily_data = SatelliteUtils.process_time_series(time_series, fields)
prefixed_daily_data = {
date: {
f"{collection}_{field}": values for field, values in data.items()
}
for date, data in daily_data.items()
}
all_data.extend(
SatelliteUtils.calculate_daily_means(
prefixed_daily_data, prefixed_fields, location["city"]
)
)

return all_data

@staticmethod
def extract_satellite_data(
locations: List[Dict[str, Any]],
start_date: datetime,
end_date: datetime,
satellite_collections: Dict[str, List[str]],
) -> pd.DataFrame:
SatelliteUtils.initialize_earth_engine()
all_data = []
for location in locations:
all_data.extend(
SatelliteUtils.extract_data_for_location(
location, satellite_collections, start_date, end_date
)
)
all_data = pd.DataFrame(all_data)

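        # Collapse duplicate (timestamp, city) rows, keeping the first non-null value per column.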
df_fixed = all_data.groupby(["timestamp", "city"]).agg(
lambda x: x.dropna().iloc[0] if len(x.dropna()) > 0 else np.nan
)

        df_fixed.columns = [
            c.replace("/", "_").replace(" ", "_").lower() for c in df_fixed.columns
        ]

return df_fixed
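
Taken together with the constants added above, a minimal driver for this utility might look like the sketch below. This is illustrative only: it assumes GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT_ID are configured as the rest of the ETL expects, and a real DAG would presumably hand the resulting frame to BigQueryApi.save_data_to_bigquery.

from datetime import datetime, timedelta, timezone

from airqo_etl_utils.constants import satellite_cities, satellite_collections
from airqo_etl_utils.satellite_utils import SatelliteUtils

# Pull the last seven days of Sentinel-5P daily means for all configured cities.
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=7)

data = SatelliteUtils.extract_satellite_data(
    locations=satellite_cities,
    start_date=start_date,
    end_date=end_date,
    satellite_collections=satellite_collections,
)
print(data.head())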
1 change: 1 addition & 0 deletions src/workflows/airqo_etl_utils/setup.py
@@ -34,6 +34,7 @@
"great_expectations===0.18.18",
"airflow-provider-great-expectations==0.2.8",
"sqlalchemy-bigquery==1.11.0",
"earthengine-api",
],
keywords=["python", "airflow", "AirQo"],
license="MIT",
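
One nit: every other dependency in this list is pinned, while earthengine-api floats, so image rebuilds can silently pick up breaking releases. A constrained entry would be safer; the range below is illustrative, not from the commit:

    "earthengine-api>=1.0,<2",  # illustrative constraint; pin to the version actually tested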
38 changes: 28 additions & 10 deletions src/workflows/airqo_etl_utils/tests/test_airqo_utils.py
@@ -56,9 +56,8 @@ def test_map_site_ids_to_historical_data(self):
self.assertEqual(data.iloc[0]["device_number"], 2)
self.assertEqual(date_to_str(data.iloc[0]["timestamp"]), "2022-01-01T10:00:00Z")

@patch('airqo_etl_utils.airqo_utils.BigQueryApi')
@patch("airqo_etl_utils.airqo_utils.BigQueryApi")
def test_extract_aggregated_raw_data(self, MockBigQueryApi):

mock_bigquery_api = MockBigQueryApi.return_value

input_data = {
@@ -104,7 +103,18 @@ def test_extract_aggregated_raw_data(self, MockBigQueryApi):
102.75,
98.78,
],
"s2_pm2_5": [94.98, 84.6, 90.67, 99.8, 96.33, 97.8, 88.62, 95.4, 95.43, 92.05],
"s2_pm2_5": [
94.98,
84.6,
90.67,
99.8,
96.33,
97.8,
88.62,
95.4,
95.43,
92.05,
],
"pm10": [
106.415,
96.675,
@@ -176,8 +186,10 @@ def test_extract_aggregated_raw_data(self, MockBigQueryApi):
}

input_dataframe = pd.DataFrame(input_data)
input_dataframe['timestamp'] = pd.to_datetime(input_dataframe['timestamp'])
input_dataframe['device_number'] = input_dataframe['device_number'].astype('int64')
input_dataframe["timestamp"] = pd.to_datetime(input_dataframe["timestamp"])
input_dataframe["device_number"] = input_dataframe["device_number"].astype(
"int64"
)

mock_bigquery_api.query_data.return_value = input_dataframe

@@ -216,13 +228,19 @@ def test_extract_aggregated_raw_data(self, MockBigQueryApi):
"site_id": ["60d058c8048305120d2d616d"],
}
expected_dataframe = pd.DataFrame(expected_data)
expected_dataframe['timestamp'] = pd.to_datetime(expected_dataframe['timestamp'])
expected_dataframe['device_number'] = expected_dataframe['device_number'].astype('int64')
expected_dataframe["timestamp"] = pd.to_datetime(
expected_dataframe["timestamp"]
)
expected_dataframe["device_number"] = expected_dataframe[
"device_number"
].astype("int64")

start_date_time = '2024-07-10T00:00:00Z'
end_date_time = '2024-07-11T11:59:59Z'
start_date_time = "2024-07-10T00:00:00Z"
end_date_time = "2024-07-11T11:59:59Z"

result = AirQoDataUtils.extract_aggregated_raw_data(start_date_time, end_date_time)
result = AirQoDataUtils.extract_aggregated_raw_data(
start_date_time, end_date_time
)

pd.testing.assert_frame_equal(result, expected_dataframe)

1 change: 0 additions & 1 deletion src/workflows/dags/airnow.py
@@ -3,7 +3,6 @@
from airqo_etl_utils.workflows_custom_utils import AirflowUtils



# Historical Data DAG
@dag(
dag_id="Airnow-Historical-Bam-Data",
1 change: 0 additions & 1 deletion src/workflows/dags/airqo_bam_measurements.py
@@ -20,7 +20,6 @@
start_date=days_ago(1),
)
def airqo_bam_historical_measurements():

@task()
def extract_bam_data(**kwargs) -> pd.DataFrame:
start_date_time, end_date_time = DateUtils.get_dag_date_time_values(