Commit
Add script for India CPCB AirQualityData
vish-cs committed Dec 9, 2024
1 parent c9f7994 commit 8a66489
Showing 10 changed files with 426 additions and 0 deletions.
28 changes: 28 additions & 0 deletions scripts/india_cpcb/README.md
@@ -0,0 +1,28 @@
# India CPCB Air Quality Data

This is an import of the Air Quality dataset from the [India CPCB website](https://airquality.cpcb.gov.in/caaqms/rss_feed), which publishes an hourly RSS feed.

This dataset involves two imports:
- Air Quality sites
- Air Quality data

**Air Quality Sites**
We need to import site information along with the air quality data because the RSS feed may include new sites that have not yet been added to Data Commons, so site data is imported periodically as well. To add a site, place resolution is performed: each site's LatLong coordinates are passed to the DC resolve API to retrieve the corresponding district dcid (WikiDataId). This result is then joined (on WikiDataId) with the India LGD dataset to obtain the LGD code of the district containing the site. The LGD code, station name, and city are used to construct the site ID in the form `cpcbAq/LGDDistrictCode_Station_City`.
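
A rough sketch of this flow is shown below; it mirrors the logic in `process.py`, uses one station from the test data as an example, and assumes a valid `DC_API_KEY` plus network access.

```
import os
import requests

# Resolve a site's coordinates to a district dcid (a WikiData id).
lat, lon = 28.5627763, 77.1180053
resolve_api = ("https://api.datacommons.org/v2/resolve?key=" +
               os.environ["DC_API_KEY"] + "&nodes=" + str(lat) + "%23" + str(lon) +
               "&property=%3C-geoCoordinate-%3Edcid")
candidates = requests.get(resolve_api, timeout=30).json()["entities"][0]["candidates"]
district_dcid = next((c["dcid"] for c in candidates
                      if c["dominantType"] in ("AdministrativeArea2", "City")), None)

# Joining district_dcid with the LGD dataset yields the LGD district code; the site
# id is then built from that code, the cleaned station name, and the city.
lgd_district_code = "84"                 # value obtained from the LGD join
station, city = "IGIAirportT3", "Delhi"  # cleaned-up station and city names
site_id = f"cpcbAq/{lgd_district_code}_{station}_{city}"
# -> cpcbAq/84_IGIAirportT3_Delhi
```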

**Air Quality Data**
Steps involved in importing the air quality data:
- Convert the RSS feed to CSV using an XSLT stylesheet
- Join the result with the sites data (from the sites import above) on site name to attach place information (SiteId) to each observation (see the sketch below)
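
A minimal sketch of these two steps, following `process.py` (the sites CSV name matches the file the script writes; lxml is required for the stylesheet option of `pandas.read_xml`):

```
from io import StringIO

import pandas as pd
import requests

# Step 1: fetch the hourly RSS feed and flatten it with the XSLT stylesheet.
rss_feed = requests.get("https://airquality.cpcb.gov.in/caaqms/rss_feed",
                        timeout=60).text
in_df = pd.read_xml(StringIO(rss_feed), xpath="//Station",
                    stylesheet="india_air_quality.xslt")

# Step 2: join with the sites CSV on SiteName to attach a SiteId to every observation.
site_df = pd.read_csv("india_air_quality_data_sites.csv")
out_df = in_df.merge(site_df[["SiteName", "SiteId"]], on="SiteName", how="inner")
```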


**Known issues**
A few sites have “duplicate” stations that are differentiated only by a suffix attached to the site name. The current site ID ignores the suffix (to stay aligned with historical site data already imported into the KG), so the current implementation drops air quality data associated with such duplicate sites.
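
For illustration, a hypothetical pair of stations that collapse to the same site ID is dropped like this (the station names are made up; `keep=False` removes both rows, mirroring `generate_sites()` in `process.py`):

```
import pandas as pd

site_df = pd.DataFrame({
    "SiteName": ["Example Site, ExampleCity - AgencyA",
                 "Example Site, ExampleCity - AgencyB"],
    "SiteId": ["cpcbAq/999_ExampleSite_ExampleCity",
               "cpcbAq/999_ExampleSite_ExampleCity"],
})
# Both rows share a SiteId once the suffix is ignored, so both are dropped.
site_df = site_df.drop_duplicates(subset=["SiteId"], keep=False)  # empty DataFrame
```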

**Script usage**

```
export DC_API_KEY=<api_key>
python process.py
```

The script can also read the air quality data from a local or GCS file via the `--dataFile` flag. Similarly, `--sitesFile` points to an existing sites CSV, and `--outputBucket` names a GCS bucket to which the output files are uploaded.
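
For example, to process a previously downloaded copy of the feed (the file path is a placeholder):

```
python process.py --dataFile=<path_to_feed.xml>
```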
Empty file added scripts/india_cpcb/__init__.py
Empty file.
29 changes: 29 additions & 0 deletions scripts/india_cpcb/india_air_quality.xslt
@@ -0,0 +1,29 @@
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
<xsl:output indent="yes"/>
<xsl:strip-space elements="*"/>
<xsl:template match="Station">
<Station>
<SiteName><xsl:value-of select="@id"/></SiteName>
<City><xsl:value-of select="../@id"/></City>
<State><xsl:value-of select="../../@id"/></State>
<Country><xsl:value-of select="../../../@id"/></Country>
<LastUpdate><xsl:value-of select="@lastupdate"/></LastUpdate>
<Latitude><xsl:value-of select="@latitude"/></Latitude>
<Longitude><xsl:value-of select="@longitude"/></Longitude>
<xsl:apply-templates select="Pollutant_Index"/>
<AQI><xsl:value-of select="Air_Quality_Index/@Value"/></AQI>
</Station>
</xsl:template>

<xsl:template match="Pollutant_Index">
<xsl:element name="{@id}_Mean">
<xsl:value-of select="@Avg"/>
</xsl:element>
<xsl:element name="{@id}_Min">
<xsl:value-of select="@Min"/>
</xsl:element>
<xsl:element name="{@id}_Max">
<xsl:value-of select="@Max"/>
</xsl:element>
</xsl:template>
</xsl:stylesheet>
6 changes: 6 additions & 0 deletions scripts/india_cpcb/india_air_quality_sites.tmcf
@@ -0,0 +1,6 @@
Node: E:India_AirQuality->E1
typeOf: dcs:AirQualitySite
dcid: C:india_air_quality_sites->SiteId
name: C:india_air_quality_sites->SiteName
location: C:india_air_quality_sites->Coordinates
containedInPlace: C:india_air_quality_sites->DistrictDCID
249 changes: 249 additions & 0 deletions scripts/india_cpcb/process.py
@@ -0,0 +1,249 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from absl import app
from absl import flags
from absl import logging
from datetime import datetime
from google.cloud.storage import Client
from io import StringIO
import os
import pandas as pd
import requests

FLAGS = flags.FLAGS
flags.DEFINE_string("dataFile", "", \
"Location of air quality data file (local/gcs)") #optional
flags.DEFINE_string("sitesFile", "", \
"Location of air quality sites file (local/gcs).") #optional
flags.DEFINE_string("outputBucket", "", \
"Location of output GCS bucket.") #optional

# URL of the RSS feed
RSS_URL = "https://airquality.cpcb.gov.in/caaqms/rss_feed"
POLLUTANTS = {
"PM2.5": "PM2.5",
"PM10": "PM10",
"NO2": "NO2",
"SO2": "SO2",
"CO": "CO",
"Ammonia": "NH3",
"Ozone": "OZONE",
}
STATS = ["Min", "Max", "Mean"]
LGD_FILE = "../india/geo/LocalGovernmentDirectory_Districts.csv"
OUTPUT_FILE = "india_air_quality_data"


class IndiaAirQualityDataLoader:
"""
This class is responsible for processing India CPCB air quality data (RSS
feed).
"""

def __init__(self):
self.xslt = "india_air_quality.xslt" # stylesheet for CSV conversion
self.xpath = "//Station" # Row element in XML
self.resolve_api_url = "https://api.datacommons.org/v2/resolve?key=" + \
os.environ["DC_API_KEY"]

# Downloads a GCS file to local.
def download_gcs_file(self, bucket: str, file_name: str):
gcs_client = Client()
bucket = gcs_client.get_bucket(bucket)
blob = bucket.get_blob(file_name)
blob.download_to_filename(file_name)

    # Uploads a local file to a GCS bucket.
def upload_gcs_file(self, bucket: str, file_name: str):
gcs_client = Client()
bucket = gcs_client.get_bucket(bucket)
blob = bucket.blob(file_name)
blob.upload_from_filename(file_name)

    # Invokes the DC resolve API to get the district WikiData id from the LatLong
    # coordinates of the station.
def __get_place_id(self, row):
resolve_api = self.resolve_api_url + "&nodes=" + \
str(row["Latitude"]) + "%23" + str(row["Longitude"]) + \
"&property=%3C-geoCoordinate-%3Edcid"
response = requests.get(resolve_api, timeout=30)
if response.status_code == 200:
data = response.json()
            for i in data["entities"][0]["candidates"]:
                if i["dominantType"] in ("AdministrativeArea2", "City"):
dcid = i["dcid"]
return dcid
return None

def __cleanup_name(self, name):
return name.replace("-","").replace(".","")\
.replace("(","").replace(")","").replace(" ","")

# Reads RSS feed of air quality data and converts to CSV format using XSLT
# stylesheet. Data is read from a local/gcs file if provided. Else it reads
# the live RSS feed.
def get_feed(self, file_name: str) -> pd.DataFrame:
if not file_name:
response = requests.get(RSS_URL, timeout=60)
if response.status_code == 200:
rss_feed = response.text
                with open(OUTPUT_FILE + ".xml", "w", encoding="utf-8") as xml_f:
                    xml_f.write(rss_feed)
else:
raise IOError("Error in fetching AQI data from the source: " +
RSS_URL)
else:
logging.info("Reading AQI data from file: %s", file_name)
            with open(file_name, "r", encoding="utf-8") as feed:
                rss_feed = feed.read()

in_df = pd.read_xml(StringIO(rss_feed),
xpath=self.xpath,
stylesheet=self.xslt)
return in_df

# Processes air quality data to add formatting and site id info.
def process_data(self, in_df: pd.DataFrame, \
site_df: pd.DataFrame) -> pd.DataFrame:
        # Formats the timestamp and joins site info (SiteId) onto each observation.
in_df["LastUpdate"] = in_df["LastUpdate"].apply(
lambda x: datetime.strptime(x, "%d-%m-%Y %H:%M:%S").isoformat())
place_df = site_df[[
"SiteName", "DistrictDCID", "LGDDistrictCode", "SiteId"
]]
out_df = pd.merge(in_df, place_df, on="SiteName", how="inner")
return out_df

# Generates air quality sites CSV from the feed.
def generate_sites(self, in_df: pd.DataFrame, sites_file: str, \
lgd_file: str) -> pd.DataFrame:
if sites_file:
sites = pd.read_csv(sites_file)
else:
sites = pd.DataFrame(columns=[
"SiteName", "City", "State", "Country", "DistrictDCID",
"LGDDistrictCode", "SiteId", "Coordinates"
])
site_df = in_df[[
"SiteName", "Latitude", "Longitude", "City", "State", "Country"
]].copy()
# Perform place resolution only for new sites.
site_df = site_df[~site_df.SiteName.isin(sites.SiteName)]
if not site_df.empty:
lgd_df = pd.read_csv(lgd_file)
lgd_df = lgd_df[["DistrictDCID", "LGDDistrictCode"]]
logging.info("Performing place resolution for %d entities",
site_df.shape[0])
site_df["WikiDataId"] = site_df.apply(self.__get_place_id, axis=1)
# Merge site data with LGD data to fetch LGD code for each station.
site_df = pd.merge(site_df,
lgd_df,
left_on="WikiDataId",
right_on="DistrictDCID",
how="left")
site_df.dropna(subset=["LGDDistrictCode"], inplace=True)
site_df["LGDDistrictCode"] = site_df["LGDDistrictCode"].astype(
int).astype(str)
site_df = site_df.assign(
Coordinates = lambda x : "[LatLong " + x.Latitude.astype(str) + \
" " + x.Longitude.astype(str) + "]")
site_df = site_df.assign(
Station=site_df["SiteName"].str.split(",").str[0])
site_df["Station"] = site_df["Station"].apply(self.__cleanup_name)
site_df["City"] = site_df["City"].apply(self.__cleanup_name)
site_df = site_df.assign(
SiteId = lambda x : "cpcbAq/" + x.LGDDistrictCode + \
"_" + x.Station + "_" + x.City)
site_df.drop_duplicates(subset=["SiteId"], keep=False, inplace=True)
site_df.drop(
columns=["Latitude", "Longitude", "Station", "WikiDataId"],
inplace=True)
site_df = pd.concat([sites, site_df])
return site_df
return sites

# Generates template MCF corresponding to air quality pollutant data.
def generate_mcf(self, out_file: str):
tmcf_template = ("Node: E:{filename}->E{index}\n"
"typeOf: dcs:StatVarObservation\n"
"variableMeasured: dcs:{statvar}\n"
"observationAbout: C:{filename}->SiteId\n"
"observationDate: C:{filename}->LastUpdate\n"
"observationPeriod: P1H\n"
"value: C:{filename}->{value}\n")

with open(out_file, "w", encoding="utf-8") as tmcf_f:
index = 1
for key, value in POLLUTANTS.items():
for stats in STATS:
format_dict = {
"filename": OUTPUT_FILE,
"statvar": stats + "_Concentration_AirPollutant_" + key,
"value": value + "_" + stats,
"index": index,
}
tmcf_f.write(tmcf_template.format_map(format_dict))
tmcf_f.write("\n")
index += 1

index += 1
format_dict = {
"filename": OUTPUT_FILE,
"statvar": "AirQualityIndex_AirPollutant",
"value": "AQI",
"index": index,
}
tmcf_f.write(tmcf_template.format_map(format_dict))
tmcf_f.write("\n")


def main(_):
"""Runs the code."""
loader = IndiaAirQualityDataLoader()
logging.info("Loading data from source...")
data_file = FLAGS.dataFile
if FLAGS.dataFile and FLAGS.dataFile.startswith("gs://"):
        loader.download_gcs_file(
            FLAGS.dataFile.split("/")[2],
            FLAGS.dataFile.split("/")[3])
        data_file = FLAGS.dataFile.split("/")[3]
in_df = loader.get_feed(data_file)

logging.info("Generting sites data...")
sites_file = FLAGS.sitesFile
if FLAGS.sitesFile and FLAGS.sitesFile.startswith("gs://"):
loader.download_gcs_file(
FLAGS.sitesFile.split("/")[2],
FLAGS.sitesFile.split("/")[3])
        sites_file = FLAGS.sitesFile.split("/")[3]
site_df = loader.generate_sites(in_df, sites_file, LGD_FILE)
logging.info("Processing air quality data...")
result = loader.process_data(in_df, site_df)
result.to_csv(OUTPUT_FILE + ".csv", index=False)
site_df.to_csv(OUTPUT_FILE + "_sites.csv", index=False)

logging.info("Generating tmcf file...")
loader.generate_mcf(OUTPUT_FILE + ".tmcf")

if FLAGS.outputBucket:
logging.info("Uploading results to GCS...")
loader.upload_gcs_file(FLAGS.outputBucket, OUTPUT_FILE + ".csv")
loader.upload_gcs_file(FLAGS.outputBucket, OUTPUT_FILE + "_sites.csv")


if __name__ == "__main__":
app.run(main)
45 changes: 45 additions & 0 deletions scripts/india_cpcb/process_test.py
@@ -0,0 +1,45 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import unittest
import pandas as pd
from pandas.testing import assert_frame_equal
from india_cpcb.process import IndiaAirQualityDataLoader

module_dir = os.path.dirname(__file__)


class TestProcess(unittest.TestCase):
"""
Test Class to compare expected output in test/ directory to the
output generated by india_cpcb.process.IndiaAirQualityDataLoader class
"""

def test_process(self):
os.environ["DC_API_KEY"] = "test_key"
loader = IndiaAirQualityDataLoader()
in_df = loader.get_feed(os.path.join(module_dir, "test", "data.xml"))
sites_file = os.path.join(module_dir, "test", "sites.csv")
lgd_file = os.path.join(module_dir, "test", "lgd.csv")
site_df = loader.generate_sites(in_df, sites_file, lgd_file)
result = loader.process_data(in_df, site_df)

expected_data = pd.read_csv(os.path.join(module_dir, "test",
"data.csv"))
assert_frame_equal(result, expected_data)


if __name__ == "__main__":
unittest.main()
5 changes: 5 additions & 0 deletions scripts/india_cpcb/test/data.csv
@@ -0,0 +1,5 @@
SiteName,City,State,Country,LastUpdate,Latitude,Longitude,PM2.5_Mean,PM2.5_Min,PM2.5_Max,PM10_Mean,PM10_Min,PM10_Max,NO2_Mean,NO2_Min,NO2_Max,CO_Mean,CO_Min,CO_Max,OZONE_Mean,OZONE_Min,OZONE_Max,AQI,NH3_Mean,NH3_Min,NH3_Max,SO2_Mean,SO2_Min,SO2_Max,DistrictDCID,LGDDistrictCode,SiteId
"IGI Airport (T3), Delhi - IMD",Delhi,Delhi,India,2024-10-16T14:00:00,28.5627763,77.1180053,235,77,337,200,115,293,70,32,97,86,54,111,31,2,81,235,,,,,,,wikidataId/Q2379189,84,cpcbAq/84_IGIAirportT3_Delhi
"Jawaharlal Nehru Stadium, Delhi - DPCC",Delhi,Delhi,India,2024-10-16T14:00:00,28.58028,77.233829,158,55,260,185,113,262,102,31,152,23,3,124,38,11,96,185,7.0,5.0,8.0,26.0,8.0,64.0,wikidataId/Q2061938,83,cpcbAq/83_JawaharlalNehruStadium_Delhi
"BTM Layout, Bengaluru - CPCB",Bengaluru,Karnataka,India,2024-10-16T14:00:00,12.9135218,77.5950804,28,25,34,25,19,41,61,42,73,30,28,36,14,11,20,61,9.0,8.0,9.0,12.0,9.0,19.0,wikidataId/Q806463,525,cpcbAq/525_BTMLayout_Bengaluru
"Hebbal 1st Stage, Mysuru - KSPCB",Mysuru,Karnataka,India,2024-10-16T14:00:00,12.21041,76.37376,30,16,43,44,32,59,25,13,43,29,5,61,25,4,45,44,5.0,3.0,9.0,2.0,1.0,3.0,wikidataId/Q591781,545,cpcbAq/545_Hebbal1stStage_Mysuru