diff --git a/tools/differ/README.md b/tools/differ/README.md new file mode 100644 index 0000000000..3478a35f42 --- /dev/null +++ b/tools/differ/README.md @@ -0,0 +1,23 @@ +# Dataset Differ + +This utility generates a diff (point and series analysis) of two data files of the same dataset for import analysis. + +**Usage** +``` +python differ.py --currentData= --previousData= +``` + +Parameter description +currentDataFile: Location of the current MCF data file +previousDataFile: Location of the previous MCF data file +groupbyColumns: Columns to group data for diff analysis in the order var,place,time etc. Default value: “variableMeasured,observationAbout,observationDate” +valueColumns: Columns with statvar value (unit etc.) for diff analysis. Default value: "value,unit" + +Output generated is of the form below showing counts of differences for each variable. +Detailed diff output is written to a file for further analysis. + +variableMeasured added deleted modified same total +0 dcid:var1 1 0 0 0 1 +1 dcid:var2 0 2 1 1 4 +2 dcid:var3 0 0 1 0 1 +3 dcid:var4 0 2 0 0 2 diff --git a/tools/differ/__init__.py b/tools/differ/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/differ/differ.py b/tools/differ/differ.py new file mode 100644 index 0000000000..51a89cbb81 --- /dev/null +++ b/tools/differ/differ.py @@ -0,0 +1,167 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from absl import app +from absl import flags +from absl import logging +import pandas as pd +import helper +import os + +FLAGS = flags.FLAGS +flags.DEFINE_string('current_data', '', 'Path to the current MCF data.') +flags.DEFINE_string('previous_data', '', 'Path to the previous MCF data.') +flags.DEFINE_string('output_location', 'results', 'Path to the output data.') + +flags.DEFINE_string('groupby_columns', 'variableMeasured,observationAbout,observationDate', + 'Columns to group data for diff analysis in the order (var,place,time etc.).') +flags.DEFINE_string('value_columns', 'value,unit', + 'Columns with statvar value (unit etc.) for diff analysis.') + +SAMPLE_COUNT=3 + +class DatasetDiffer: + ''' + This utility generates a diff (point and series analysis) + of two data files of the same dataset for import analysis. + + Usage: + $ python differ.py --current_data= --previous_data= + + Summary output generated is of the form below showing counts of differences for each + variable. Detailed diff output is written to files for further analysis. + +variableMeasured added deleted modified same total +0 dcid:var1 1 0 0 0 1 +1 dcid:var2 0 2 1 1 4 +2 dcid:var3 0 0 1 0 1 +3 dcid:var4 0 2 0 0 2 + + ''' + + def __init__(self, groupby_columns, value_columns): + self.groupby_columns = groupby_columns.split(',') + self.value_columns = value_columns.split(',') + self.variable_column = self.groupby_columns[0] + self.place_column = self.groupby_columns[1] + self.time_column = self.groupby_columns[2] + self.diff_column = '_diff_result' + + def __cleanup_data(self, df: pd.DataFrame): + for column in ['added', 'deleted', 'modified', 'same']: + df[column] = df[column] if column in df.columns else 0 + df[column] = df[column].fillna(0).astype(int) + + # Pro-rocesses two dataset files to identify changes. + def process_data(self, previous_df: pd.DataFrame, current_df: pd.DataFrame) -> pd.DataFrame: + cur_df_columns = current_df.columns.values.tolist() + self.groupby_columns = [i for i in self.groupby_columns if i in cur_df_columns] + self.value_columns = [i for i in self.value_columns if i in cur_df_columns] + df1 = previous_df.loc[:, self.groupby_columns + self.value_columns] + df2 = current_df.loc[:, self.groupby_columns + self.value_columns] + df1['_value_combined'] = df1[self.value_columns]\ + .apply(lambda row: '_'.join(row.values.astype(str)), axis=1) + df2['_value_combined'] = df2[self.value_columns]\ + .apply(lambda row: '_'.join(row.values.astype(str)), axis=1) + df1.drop(columns=self.value_columns, inplace=True) + df2.drop(columns=self.value_columns, inplace=True) + # Perform outer join operation to identify differences. + result = pd.merge(df1, df2, on = self.groupby_columns, how='outer', indicator=self.diff_column) + result[self.diff_column] = result.apply( + lambda row: 'added' if row[self.diff_column] == 'right_only' \ + else 'deleted' if row[self.diff_column] == 'left_only' \ + else 'modified' if row['_value_combined_x'] != row['_value_combined_y'] \ + else 'same', axis=1) + return result + + # Performs point diff analysis to identify data point changes. + def point_analysis(self, in_data: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame): + column_list = [self.variable_column, self.place_column, self.time_column, self.diff_column] + result = in_data.loc[:, column_list] + # summary = summary.groupby([variable,'result'], observed=True, as_index=False).agg(['count']).reset_index() + result = result.groupby([self.variable_column, self.diff_column], observed=True, as_index=False)[[self.place_column, self.time_column]].agg(lambda x: x.tolist()) + result['size'] = result.apply(lambda row:len(row[self.place_column]), axis=1) + result[self.place_column] = result.apply(lambda row: row[self.place_column][0:SAMPLE_COUNT], axis=1) + result[self.time_column] = result.apply(lambda row: row[self.time_column][0:SAMPLE_COUNT], axis=1) + # result = result.groupby( + # [self.variable_column, self.diff_column], observed=True, as_index=False).size() + summary = result.pivot( + index=self.variable_column, columns=self.diff_column, values='size')\ + .reset_index().rename_axis(None, axis=1) + self.__cleanup_data(summary) + summary['total'] = summary.apply( + lambda row: row['added'] + row['deleted'] + row['modified'] + row['same'] , axis=1) + return summary, result + + # Performs series diff analysis to identify time series changes. + def series_analysis(self, in_data: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame): + column_list = [self.variable_column, self.place_column, self.diff_column] + result = in_data.loc[:, column_list] + result = result.groupby(column_list, as_index=False).size() + result = result.pivot( + index=[self.variable_column, self.place_column], columns=self.diff_column, values='size')\ + .reset_index().rename_axis(None, axis=1) + self.__cleanup_data(result) + result[self.diff_column] = result.apply(lambda row: 'added' if row['added'] > 0 \ + and row['deleted'] + row['modified'] + row['same'] == 0 \ + else 'deleted' if row['deleted'] > 0 and row['added'] + row['modified'] + row['same'] == 0 \ + else 'modified' if row['deleted'] > 0 or row['added'] > 0 or row['modified'] > 0 \ + else 'same', axis=1) + result = result[column_list] + result = result.groupby([self.variable_column, self.diff_column], observed=True, as_index=False)[self.place_column].agg(lambda x: x.tolist()) + result['size'] = result.apply(lambda row:len(row[self.place_column]), axis=1) + result[self.place_column] = result.apply(lambda row: row[self.place_column][0:SAMPLE_COUNT], axis=1) + summary = result.pivot( + index=self.variable_column, columns=self.diff_column, values='size')\ + .reset_index().rename_axis(None, axis=1) + self.__cleanup_data(summary) + summary['total'] = summary.apply( + lambda row: row['added'] + row['deleted'] + row['modified'] + row['same'], axis=1) + return summary, result + +def main(_): + '''Runs the code.''' + differ = DatasetDiffer( + FLAGS.groupby_columns, FLAGS.value_columns) + + if not os.path.exists(FLAGS.output_location): + os.makedirs(FLAGS.output_location) + logging.info('Loading data...') + previous_df = helper.load_data(FLAGS.current_data, FLAGS.output_location) + current_df = helper.load_data(FLAGS.previous_data, FLAGS.output_location) + + logging.info('Processing data...') + in_data = differ.process_data(previous_df, current_df) + + logging.info('Point analysis:') + summary, result = differ.point_analysis(in_data) + result.sort_values(by=[differ.diff_column, differ.variable_column], inplace=True) + print(summary.head(10)) + print(result.head(10)) + helper.write_data(summary, FLAGS.output_location, 'point-analysis-summary.csv') + helper.write_data(result, FLAGS.output_location, 'point-analysis-results.csv') + + logging.info('Series analysis:') + summary, result = differ.series_analysis(in_data) + result.sort_values(by=[differ.diff_column, differ.variable_column], inplace=True) + print(summary.head(10)) + print(result.head(10)) + helper.write_data(summary, FLAGS.output_location, 'series-analysis-summary.csv') + helper.write_data(result, FLAGS.output_location, 'series-analysis-results.csv') + + logging.info('Differ output written to %s', FLAGS.output_location) + + +if __name__ == '__main__': + app.run(main) diff --git a/tools/differ/differ_test.py b/tools/differ/differ_test.py new file mode 100644 index 0000000000..d3b25e8e0f --- /dev/null +++ b/tools/differ/differ_test.py @@ -0,0 +1,53 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest +import pandas as pd +from pandas.testing import assert_frame_equal +from differ import DatasetDiffer +import helper + +module_dir = os.path.dirname(__file__) + + +class TestDiffer(unittest.TestCase): + """ + Test Class to compare expected output in test/ directory to the + output generated by DatasetDiffer class + """ + + def test_diff_analysis(self): + groupby_columns = "variableMeasured,observationAbout,observationDate" + value_columns = "value" + + differ = DatasetDiffer(groupby_columns, value_columns) + current = helper.load_mcf_file( + os.path.join(module_dir, "test", "current.mcf")) + previous = helper.load_mcf_file( + os.path.join(module_dir, "test", "previous.mcf")) + + in_data = differ.process_data(previous, current) + summary, result = differ.point_analysis(in_data) + result = pd.read_csv(os.path.join(module_dir, "test", "result1.csv")) + assert_frame_equal(summary, result) + + summary, result = differ.series_analysis(in_data) + result = pd.read_csv(os.path.join(module_dir, "test", "result2.csv")) + assert_frame_equal(summary, result) + + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/differ/helper.py b/tools/differ/helper.py new file mode 100644 index 0000000000..49fadb1efe --- /dev/null +++ b/tools/differ/helper.py @@ -0,0 +1,69 @@ + +import pandas as pd +import re +import glob +import os +from google.cloud.storage import Client + +# Reads an MCF text file as a dataframe. +def load_mcf_file(file: str) -> pd.DataFrame: + mcf_file = open(file, "r", encoding="utf-8") + mcf_contents = mcf_file.read() + mcf_file.close() + # nodes separated by a blank line + mcf_nodes_text = mcf_contents.split("\n\n") + # lines seprated as property: constraint + mcf_line = re.compile(r"^(\w+): (.*)$") + mcf_nodes = [] + for node in mcf_nodes_text: + current_mcf_node = {} + for line in node.split("\n"): + parsed_line = mcf_line.match(line) + if parsed_line is not None: + current_mcf_node[parsed_line.group(1)] = parsed_line.group(2) + if current_mcf_node and current_mcf_node["typeOf"] == "dcid:StatVarObservation": + mcf_nodes.append(current_mcf_node) + df = pd.DataFrame(mcf_nodes) + return df + +def load_mcf_files(path: str) -> pd.DataFrame: + """Load all sharded files in the given directory and combine into a + single dataframe.""" + df = pd.DataFrame() + filenames = glob.glob(path + ".mcf") + for filename in filenames: + df2 = load_mcf_file(filename) + # Merge data frames, expects same headers + df = pd.concat([df, df2]) + return df + +def write_data(df: pd.DataFrame, path: str, file: str): + out_file = open(os.path.join(path, file), + mode="w", encoding="utf-8") + df.to_csv(out_file, index=False, mode="w") + out_file.close() + +def load_data(path: str, tmp_dir: str ) -> pd.DataFrame: + if path.startswith("gs://"): + path = get_gcs_data(path, tmp_dir) + + if path.endswith("*"): + return load_mcf_files(path) + else: + return load_mcf_file(path) + +def get_gcs_data(uri: str, tmp_dir: str) -> str: + client = Client() + bucket = client.get_bucket(uri.split("/")[2]) + if uri.endswith("*"): + blobs = client.list_blobs(bucket) + for blob in blobs: + path = os.path.join(os.getcwd(), tmp_dir, blob.name) + blob.download_to_filename(path) + return os.path.join(os.getcwd(), tmp_dir, "*") + else: + file_name = uri.split("/")[3] + blob = bucket.get_blob(file_name) + path = os.path.join(os.getcwd(), tmp_dir, file_name) + blob.download_to_filename(path) + return path diff --git a/tools/differ/test/current.mcf b/tools/differ/test/current.mcf new file mode 100644 index 0000000000..2e994a7a45 --- /dev/null +++ b/tools/differ/test/current.mcf @@ -0,0 +1,35 @@ +Node: cpcb_air_quality/E17/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Max_Concentration_AirPollutant_Ozone +value: 53.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/bhdp3vy7dee0d" + +Node: cpcb_air_quality/E18/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Mean_Concentration_AirPollutant_Ozone +value: 28.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/8e11gqvkt183b" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 42.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-25T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 40.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" diff --git a/tools/differ/test/previous.mcf b/tools/differ/test/previous.mcf new file mode 100644 index 0000000000..ce9fcb31d1 --- /dev/null +++ b/tools/differ/test/previous.mcf @@ -0,0 +1,62 @@ +Node: cpcb_air_quality/E18/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Mean_Concentration_AirPollutant_Ozone +value: 29.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/8e11gqvkt183b" + +Node: cpcb_air_quality/E16/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Min_Concentration_AirPollutant_Ozone +value: 18.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/z8j7g5sw11klh" + +Node: cpcb_air_quality/E16/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Min_Concentration_AirPollutant_Ozone +value: 18.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/z8j7g5sw11klh" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 41.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-25T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 40.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 41.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-25T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 40.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" diff --git a/tools/differ/test/result1.csv b/tools/differ/test/result1.csv new file mode 100644 index 0000000000..4d344b5639 --- /dev/null +++ b/tools/differ/test/result1.csv @@ -0,0 +1,5 @@ +variableMeasured,added,deleted,modified,same,total +dcid:Max_Concentration_AirPollutant_Ozone,1,0,0,0,1 +dcid:Mean_Concentration_AirPollutant_CO,0,2,1,1,4 +dcid:Mean_Concentration_AirPollutant_Ozone,0,0,1,0,1 +dcid:Min_Concentration_AirPollutant_Ozone,0,2,0,0,2 diff --git a/tools/differ/test/result2.csv b/tools/differ/test/result2.csv new file mode 100644 index 0000000000..4f3b954643 --- /dev/null +++ b/tools/differ/test/result2.csv @@ -0,0 +1,5 @@ +variableMeasured,added,deleted,modified,same,total +dcid:Max_Concentration_AirPollutant_Ozone,1,0,0,0,1 +dcid:Mean_Concentration_AirPollutant_CO,0,1,1,0,2 +dcid:Mean_Concentration_AirPollutant_Ozone,0,0,1,0,1 +dcid:Min_Concentration_AirPollutant_Ozone,0,2,0,0,2