From d185ade82de446be7d82dad12f0e6a3613baa29a Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Thu, 24 Oct 2024 09:37:36 +0000 Subject: [PATCH] Add script for India CPCB AirQualityData --- scripts/india_cpcb/README.md | 28 ++ scripts/india_cpcb/__init__.py | 0 scripts/india_cpcb/india_air_quality.xslt | 29 ++ .../india_cpcb/india_air_quality_sites.tmcf | 6 + scripts/india_cpcb/process.py | 249 ++++++++++++++++++ scripts/india_cpcb/process_test.py | 45 ++++ scripts/india_cpcb/test/data.csv | 5 + scripts/india_cpcb/test/data.xml | 54 ++++ scripts/india_cpcb/test/lgd.csv | 5 + scripts/india_cpcb/test/sites.csv | 5 + 10 files changed, 426 insertions(+) create mode 100644 scripts/india_cpcb/README.md create mode 100644 scripts/india_cpcb/__init__.py create mode 100644 scripts/india_cpcb/india_air_quality.xslt create mode 100644 scripts/india_cpcb/india_air_quality_sites.tmcf create mode 100644 scripts/india_cpcb/process.py create mode 100644 scripts/india_cpcb/process_test.py create mode 100644 scripts/india_cpcb/test/data.csv create mode 100644 scripts/india_cpcb/test/data.xml create mode 100644 scripts/india_cpcb/test/lgd.csv create mode 100644 scripts/india_cpcb/test/sites.csv diff --git a/scripts/india_cpcb/README.md b/scripts/india_cpcb/README.md new file mode 100644 index 0000000000..fb6b4ec4bf --- /dev/null +++ b/scripts/india_cpcb/README.md @@ -0,0 +1,28 @@ +# India CPCB Air Quality Data + +This is an import of Air Quality dataset from [India CPCB website](https://airquality.cpcb.gov.in/caaqms/rss\_feed). This is an hourly RSS feed. + +This dataset involves two imports: +- Air Quality sites +- Air Quality data + +**Air Quality Sites** +We need to import site info along with air quality data since the RSS feed may include new sites which have not been added to Data Commons. Thus, we need to periodically import site data as well. In order to add sites, place resolution is performed. Each site includes LatLong information. These coordinates are used with the DC resolve API to retrieve the corresponding district dcid (WikiDataId). Finally, this data is joined (using WikiDataId) with the India LGD dataset to obtain the LGD code for the corresponding district for each site. This LGD code along with site/city is used to construct SiteId in the form cpcbAq/LGDDistrictCode\_Station\_City + +**Air Quality Data** +Steps involved in importing the air quality data: +- Convert the RSS feed to CSV using XSLT stylesheet +- Join the dataset with sites data (obtained in the previous step) using site name to get place information (SiteId) for each observation + + +**Known issues** +Few sites have “duplicate” stations which are differentiated only by a suffix attached to the site name. Current site id ignores the suffix (to align with historical site data already imported into KG). Thus, we drop air quality data associated with such duplicate sites in the current implementation. + +**Script usage** + +``` +export DC_API_KEY= +python process.py +``` + +The script can also read the air quality data from a file for processing using the flag dataFile. diff --git a/scripts/india_cpcb/__init__.py b/scripts/india_cpcb/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/scripts/india_cpcb/india_air_quality.xslt b/scripts/india_cpcb/india_air_quality.xslt new file mode 100644 index 0000000000..4189deeafc --- /dev/null +++ b/scripts/india_cpcb/india_air_quality.xslt @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/scripts/india_cpcb/india_air_quality_sites.tmcf b/scripts/india_cpcb/india_air_quality_sites.tmcf new file mode 100644 index 0000000000..0eb74df084 --- /dev/null +++ b/scripts/india_cpcb/india_air_quality_sites.tmcf @@ -0,0 +1,6 @@ +Node: E:India_AirQuality->E1 +typeOf: dcs:AirQualitySite +dcid: C:india_air_quality_sites->SiteId +name: C:india_air_quality_sites->SiteName +location: C:india_air_quality_sites->Coordinates +containedInPlace: C:india_air_quality_sites->DistrictDCID \ No newline at end of file diff --git a/scripts/india_cpcb/process.py b/scripts/india_cpcb/process.py new file mode 100644 index 0000000000..360818b6cc --- /dev/null +++ b/scripts/india_cpcb/process.py @@ -0,0 +1,249 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from absl import app +from absl import flags +from absl import logging +from datetime import datetime +from google.cloud.storage import Client +from io import StringIO +import os +import pandas as pd +import requests + +FLAGS = flags.FLAGS +flags.DEFINE_string("dataFile", "", \ + "Location of air quality data file (local/gcs)") #optional +flags.DEFINE_string("sitesFile", "", \ + "Location of air quality sites file (local/gcs).") #optional +flags.DEFINE_string("outputBucket", "", \ + "Location of output GCS bucket.") #optional + +# URL of the RSS feed +RSS_URL = "https://airquality.cpcb.gov.in/caaqms/rss_feed" +POLLUTANTS = { + "PM2.5": "PM2.5", + "PM10": "PM10", + "NO2": "NO2", + "SO2": "SO2", + "CO": "CO", + "Ammonia": "NH3", + "Ozone": "OZONE", +} +STATS = ["Min", "Max", "Mean"] +LGD_FILE = "../india/geo/LocalGovernmentDirectory_Districts.csv" +OUTPUT_FILE = "india_air_quality_data" + + +class IndiaAirQualityDataLoader: + """ + This class is responsible for processing India CPCB air quality data (RSS + feed). + """ + + def __init__(self): + self.xslt = "india_air_quality.xslt" # stylesheet for CSV conversion + self.xpath = "//Station" # Row element in XML + self.resolve_api_url = "https://api.datacommons.org/v2/resolve?key=" + \ + os.environ["DC_API_KEY"] + + # Downloads a GCS file to local. + def download_gcs_file(self, bucket: str, file_name: str): + gcs_client = Client() + bucket = gcs_client.get_bucket(bucket) + blob = bucket.get_blob(file_name) + blob.download_to_filename(file_name) + + # Uploads a GCS file from local. + def upload_gcs_file(self, bucket: str, file_name: str): + gcs_client = Client() + bucket = gcs_client.get_bucket(bucket) + blob = bucket.blob(file_name) + blob.upload_from_filename(file_name) + + # Invokes DC resolve API to get district wikidata id from the LatLang + # coordinates of the station. + def __get_place_id(self, row): + resolve_api = self.resolve_api_url + "&nodes=" + \ + str(row["Latitude"]) + "%23" + str(row["Longitude"]) + \ + "&property=%3C-geoCoordinate-%3Edcid" + response = requests.get(resolve_api, timeout=30) + if response.status_code == 200: + data = response.json() + for i in ((data["entities"])[0])["candidates"]: + if(i["dominantType"] == "AdministrativeArea2" \ + or i["dominantType"] == "City"): + dcid = i["dcid"] + return dcid + return None + + def __cleanup_name(self, name): + return name.replace("-","").replace(".","")\ + .replace("(","").replace(")","").replace(" ","") + + # Reads RSS feed of air quality data and converts to CSV format using XSLT + # stylesheet. Data is read from a local/gcs file if provided. Else it reads + # the live RSS feed. + def get_feed(self, file_name: str) -> pd.DataFrame: + if not file_name: + response = requests.get(RSS_URL, timeout=60) + if response.status_code == 200: + rss_feed = response.text + print(rss_feed, file = open(OUTPUT_FILE + ".xml", \ + "w", encoding="utf-8")) + else: + raise IOError("Error in fetching AQI data from the source: " + + RSS_URL) + else: + logging.info("Reading AQI data from file: %s", file_name) + feed = open(file_name, "r", encoding="utf-8") + rss_feed = feed.read() + feed.close() + + in_df = pd.read_xml(StringIO(rss_feed), + xpath=self.xpath, + stylesheet=self.xslt) + return in_df + + # Processes air quality data to add formatting and site id info. + def process_data(self, in_df: pd.DataFrame, \ + site_df: pd.DataFrame) -> pd.DataFrame: + # Generates the Flattened csv data from India CPCB RSS feed. + in_df["LastUpdate"] = in_df["LastUpdate"].apply( + lambda x: datetime.strptime(x, "%d-%m-%Y %H:%M:%S").isoformat()) + place_df = site_df[[ + "SiteName", "DistrictDCID", "LGDDistrictCode", "SiteId" + ]] + out_df = pd.merge(in_df, place_df, on="SiteName", how="inner") + return out_df + + # Generates air quality sites CSV from the feed. + def generate_sites(self, in_df: pd.DataFrame, sites_file: str, \ + lgd_file: str) -> pd.DataFrame: + if sites_file: + sites = pd.read_csv(sites_file) + else: + sites = pd.DataFrame(columns=[ + "SiteName", "City", "State", "Country", "DistrictDCID", + "LGDDistrictCode", "SiteId", "Coordinates" + ]) + site_df = in_df[[ + "SiteName", "Latitude", "Longitude", "City", "State", "Country" + ]].copy() + # Perform place resolution only for new sites. + site_df = site_df[~site_df.SiteName.isin(sites.SiteName)] + if not site_df.empty: + lgd_df = pd.read_csv(lgd_file) + lgd_df = lgd_df[["DistrictDCID", "LGDDistrictCode"]] + logging.info("Performing place resolution for %d entities", + site_df.shape[0]) + site_df["WikiDataId"] = site_df.apply(self.__get_place_id, axis=1) + # Merge site data with LGD data to fetch LGD code for each station. + site_df = pd.merge(site_df, + lgd_df, + left_on="WikiDataId", + right_on="DistrictDCID", + how="left") + site_df.dropna(subset=["LGDDistrictCode"], inplace=True) + site_df["LGDDistrictCode"] = site_df["LGDDistrictCode"].astype( + int).astype(str) + site_df = site_df.assign( + Coordinates = lambda x : "[LatLong " + x.Latitude.astype(str) + \ + " " + x.Longitude.astype(str) + "]") + site_df = site_df.assign( + Station=site_df["SiteName"].str.split(",").str[0]) + site_df["Station"] = site_df["Station"].apply(self.__cleanup_name) + site_df["City"] = site_df["City"].apply(self.__cleanup_name) + site_df = site_df.assign( + SiteId = lambda x : "cpcbAq/" + x.LGDDistrictCode + \ + "_" + x.Station + "_" + x.City) + site_df.drop_duplicates(subset=["SiteId"], keep=False, inplace=True) + site_df.drop( + columns=["Latitude", "Longitude", "Station", "WikiDataId"], + inplace=True) + site_df = pd.concat([sites, site_df]) + return site_df + return sites + + # Generates template MCF corresponding to air quality pollutant data. + def generate_mcf(self, out_file: str): + tmcf_template = ("Node: E:{filename}->E{index}\n" + "typeOf: dcs:StatVarObservation\n" + "variableMeasured: dcs:{statvar}\n" + "observationAbout: C:{filename}->SiteId\n" + "observationDate: C:{filename}->LastUpdate\n" + "observationPeriod: P1H\n" + "value: C:{filename}->{value}\n") + + with open(out_file, "w", encoding="utf-8") as tmcf_f: + index = 1 + for key, value in POLLUTANTS.items(): + for stats in STATS: + format_dict = { + "filename": OUTPUT_FILE, + "statvar": stats + "_Concentration_AirPollutant_" + key, + "value": value + "_" + stats, + "index": index, + } + tmcf_f.write(tmcf_template.format_map(format_dict)) + tmcf_f.write("\n") + index += 1 + + index += 1 + format_dict = { + "filename": OUTPUT_FILE, + "statvar": "AirQualityIndex_AirPollutant", + "value": "AQI", + "index": index, + } + tmcf_f.write(tmcf_template.format_map(format_dict)) + tmcf_f.write("\n") + + +def main(_): + """Runs the code.""" + loader = IndiaAirQualityDataLoader() + logging.info("Loading data from source...") + data_file = FLAGS.dataFile + if FLAGS.dataFile and FLAGS.dataFile.startswith("gs://"): + self.download_gcs_file( + FLAGS.dataFile.split("/")[2], + flags.dataFile.split("/")[3]) + data_file = flags.dataFile.split("/")[3] + in_df = loader.get_feed(data_file) + + logging.info("Generting sites data...") + sites_file = FLAGS.sitesFile + if FLAGS.sitesFile and FLAGS.sitesFile.startswith("gs://"): + loader.download_gcs_file( + FLAGS.sitesFile.split("/")[2], + FLAGS.sitesFile.split("/")[3]) + sites_file = flags.sitesFile.split("/")[3] + site_df = loader.generate_sites(in_df, sites_file, LGD_FILE) + logging.info("Processing air quality data...") + result = loader.process_data(in_df, site_df) + result.to_csv(OUTPUT_FILE + ".csv", index=False) + site_df.to_csv(OUTPUT_FILE + "_sites.csv", index=False) + + logging.info("Generating tmcf file...") + loader.generate_mcf(OUTPUT_FILE + ".tmcf") + + if FLAGS.outputBucket: + logging.info("Uploading results to GCS...") + loader.upload_gcs_file(FLAGS.outputBucket, OUTPUT_FILE + ".csv") + loader.upload_gcs_file(FLAGS.outputBucket, OUTPUT_FILE + "_sites.csv") + + +if __name__ == "__main__": + app.run(main) diff --git a/scripts/india_cpcb/process_test.py b/scripts/india_cpcb/process_test.py new file mode 100644 index 0000000000..641b724545 --- /dev/null +++ b/scripts/india_cpcb/process_test.py @@ -0,0 +1,45 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest +import pandas as pd +from pandas.testing import assert_frame_equal +from india_cpcb.process import IndiaAirQualityDataLoader + +module_dir = os.path.dirname(__file__) + + +class TestProcess(unittest.TestCase): + """ + Test Class to compare expected output in test/ directory to the + output generated by india_cpcb.process.IndiaAirQualityDataLoader class + """ + + def test_process(self): + os.environ["DC_API_KEY"] = "test_key" + loader = IndiaAirQualityDataLoader() + in_df = loader.get_feed(os.path.join(module_dir, "test", "data.xml")) + sites_file = os.path.join(module_dir, "test", "sites.csv") + lgd_file = os.path.join(module_dir, "test", "lgd.csv") + site_df = loader.generate_sites(in_df, sites_file, lgd_file) + result = loader.process_data(in_df, site_df) + + expected_data = pd.read_csv(os.path.join(module_dir, "test", + "data.csv")) + assert_frame_equal(result, expected_data) + + +if __name__ == "__main__": + unittest.main() diff --git a/scripts/india_cpcb/test/data.csv b/scripts/india_cpcb/test/data.csv new file mode 100644 index 0000000000..5140eefcbe --- /dev/null +++ b/scripts/india_cpcb/test/data.csv @@ -0,0 +1,5 @@ +SiteName,City,State,Country,LastUpdate,Latitude,Longitude,PM2.5_Mean,PM2.5_Min,PM2.5_Max,PM10_Mean,PM10_Min,PM10_Max,NO2_Mean,NO2_Min,NO2_Max,CO_Mean,CO_Min,CO_Max,OZONE_Mean,OZONE_Min,OZONE_Max,AQI,NH3_Mean,NH3_Min,NH3_Max,SO2_Mean,SO2_Min,SO2_Max,DistrictDCID,LGDDistrictCode,SiteId +"IGI Airport (T3), Delhi - IMD",Delhi,Delhi,India,2024-10-16T14:00:00,28.5627763,77.1180053,235,77,337,200,115,293,70,32,97,86,54,111,31,2,81,235,,,,,,,wikidataId/Q2379189,84,cpcbAq/84_IGIAirportT3_Delhi +"Jawaharlal Nehru Stadium, Delhi - DPCC",Delhi,Delhi,India,2024-10-16T14:00:00,28.58028,77.233829,158,55,260,185,113,262,102,31,152,23,3,124,38,11,96,185,7.0,5.0,8.0,26.0,8.0,64.0,wikidataId/Q2061938,83,cpcbAq/83_JawaharlalNehruStadium_Delhi +"BTM Layout, Bengaluru - CPCB",Bengaluru,Karnataka,India,2024-10-16T14:00:00,12.9135218,77.5950804,28,25,34,25,19,41,61,42,73,30,28,36,14,11,20,61,9.0,8.0,9.0,12.0,9.0,19.0,wikidataId/Q806463,525,cpcbAq/525_BTMLayout_Bengaluru +"Hebbal 1st Stage, Mysuru - KSPCB",Mysuru,Karnataka,India,2024-10-16T14:00:00,12.21041,76.37376,30,16,43,44,32,59,25,13,43,29,5,61,25,4,45,44,5.0,3.0,9.0,2.0,1.0,3.0,wikidataId/Q591781,545,cpcbAq/545_Hebbal1stStage_Mysuru diff --git a/scripts/india_cpcb/test/data.xml b/scripts/india_cpcb/test/data.xml new file mode 100644 index 0000000000..e47aff4e93 --- /dev/null +++ b/scripts/india_cpcb/test/data.xml @@ -0,0 +1,54 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/scripts/india_cpcb/test/lgd.csv b/scripts/india_cpcb/test/lgd.csv new file mode 100644 index 0000000000..1b1fa92408 --- /dev/null +++ b/scripts/india_cpcb/test/lgd.csv @@ -0,0 +1,5 @@ +LGDDistrictCode,LGDDistrictName,LGDStateCode,LGDStateName,LGDCensus2001Code,LGDCensus2011Code,closestDistrictLabel,district,districtLabel,districtDescription,state,stateLabel,stateDescription,census2011Code,WikiDataId,LGDDistrictNameTitleCase,districtLabelTitleCase,StateDCID,DistrictDCID +083,south,07,delhi,07.09,098,south delhi,http://www.wikidata.org/entity/Q2061938,south delhi,"district of Delhi, India",http://www.wikidata.org/entity/Q1353,delhi,Indian metropolis and union territory that includes New Delhi,098,Q2061938,South,South Delhi,wikidataId/Q1353,wikidataId/Q2061938 +084,south west,07,delhi,07.08,097,south west delhi,http://www.wikidata.org/entity/Q2379189,south west delhi,"district of Delhi, India",http://www.wikidata.org/entity/Q1353,delhi,Indian metropolis and union territory that includes New Delhi,097,Q2379189,South West,South West Delhi,wikidataId/Q1353,wikidataId/Q2379189 +525,bengaluru urban,29,karnataka,29.20,572,bengaluru urban,http://www.wikidata.org/entity/Q806463,bengaluru urban,"district of Karnataka, India",http://www.wikidata.org/entity/Q1185,karnataka,Indian state,572,Q806463,Bengaluru Urban,Bengaluru Urban,wikidataId/Q1185,wikidataId/Q806463 +545,mysuru,29,karnataka,29.26,577,mysore,http://www.wikidata.org/entity/Q591781,mysore,"district in Karnataka, India",http://www.wikidata.org/entity/Q1185,karnataka,Indian state,577,Q591781,Mysuru,Mysore,wikidataId/Q1185,wikidataId/Q591781 \ No newline at end of file diff --git a/scripts/india_cpcb/test/sites.csv b/scripts/india_cpcb/test/sites.csv new file mode 100644 index 0000000000..056f07db5f --- /dev/null +++ b/scripts/india_cpcb/test/sites.csv @@ -0,0 +1,5 @@ +SiteName,City,State,Country,DistrictDCID,LGDDistrictCode,SiteId,Coordinates +"IGI Airport (T3), Delhi - IMD",Delhi,Delhi,India,wikidataId/Q2379189,84,cpcbAq/84_IGIAirportT3_Delhi,[LatLong 28.5627763 77.1180053] +"BTM Layout, Bengaluru - CPCB",Bengaluru,Karnataka,India,wikidataId/Q806463,525,cpcbAq/525_BTMLayout_Bengaluru,[LatLong 12.9135218 77.5950804] +"Jawaharlal Nehru Stadium, Delhi - DPCC",Delhi,Delhi,India,wikidataId/Q2061938,83,cpcbAq/83_JawaharlalNehruStadium_Delhi,[LatLong 28.58028 77.233829] +"Hebbal 1st Stage, Mysuru - KSPCB",Mysuru,Karnataka,India,wikidataId/Q591781,545,cpcbAq/545_Hebbal1stStage_Mysuru,[LatLong 12.21041 76.37376]