Skip to content

Commit

Permalink
Add dataset import differ utility
Browse files Browse the repository at this point in the history
  • Loading branch information
vish-cs committed Dec 12, 2024
1 parent d8e35d9 commit f440497
Show file tree
Hide file tree
Showing 9 changed files with 419 additions and 0 deletions.
23 changes: 23 additions & 0 deletions tools/differ/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Dataset Differ

This utility generates a diff (point and series analysis) of two data files of the same dataset for import analysis.

**Usage**
```
python differ.py --currentData=<filepath> --previousData=<filepath>
```

Parameter description
currentDataFile: Location of the current MCF data file
previousDataFile: Location of the previous MCF data file
groupbyColumns: Columns to group data for diff analysis in the order var,place,time etc. Default value: “variableMeasured,observationAbout,observationDate”
valueColumns: Columns with statvar value (unit etc.) for diff analysis. Default value: "value,unit"

Output generated is of the form below showing counts of differences for each variable.
Detailed diff output is written to a file for further analysis.

variableMeasured added deleted modified same total
0 dcid:var1 1 0 0 0 1
1 dcid:var2 0 2 1 1 4
2 dcid:var3 0 0 1 0 1
3 dcid:var4 0 2 0 0 2
Empty file added tools/differ/__init__.py
Empty file.
167 changes: 167 additions & 0 deletions tools/differ/differ.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from absl import app
from absl import flags
from absl import logging
import pandas as pd
import helper
import os

FLAGS = flags.FLAGS
flags.DEFINE_string('current_data', '', 'Path to the current MCF data.')
flags.DEFINE_string('previous_data', '', 'Path to the previous MCF data.')
flags.DEFINE_string('output_location', 'results', 'Path to the output data.')

flags.DEFINE_string('groupby_columns', 'variableMeasured,observationAbout,observationDate',
'Columns to group data for diff analysis in the order (var,place,time etc.).')
flags.DEFINE_string('value_columns', 'value,unit',
'Columns with statvar value (unit etc.) for diff analysis.')

SAMPLE_COUNT=3

class DatasetDiffer:
'''
This utility generates a diff (point and series analysis)
of two data files of the same dataset for import analysis.
Usage:
$ python differ.py --current_data=<filepath> --previous_data=<filepath>
Summary output generated is of the form below showing counts of differences for each
variable. Detailed diff output is written to files for further analysis.
variableMeasured added deleted modified same total
0 dcid:var1 1 0 0 0 1
1 dcid:var2 0 2 1 1 4
2 dcid:var3 0 0 1 0 1
3 dcid:var4 0 2 0 0 2
'''

def __init__(self, groupby_columns, value_columns):
self.groupby_columns = groupby_columns.split(',')
self.value_columns = value_columns.split(',')
self.variable_column = self.groupby_columns[0]
self.place_column = self.groupby_columns[1]
self.time_column = self.groupby_columns[2]
self.diff_column = '_diff_result'

def __cleanup_data(self, df: pd.DataFrame):
for column in ['added', 'deleted', 'modified', 'same']:
df[column] = df[column] if column in df.columns else 0
df[column] = df[column].fillna(0).astype(int)

# Pro-rocesses two dataset files to identify changes.
def process_data(self, previous_df: pd.DataFrame, current_df: pd.DataFrame) -> pd.DataFrame:
cur_df_columns = current_df.columns.values.tolist()
self.groupby_columns = [i for i in self.groupby_columns if i in cur_df_columns]
self.value_columns = [i for i in self.value_columns if i in cur_df_columns]
df1 = previous_df.loc[:, self.groupby_columns + self.value_columns]
df2 = current_df.loc[:, self.groupby_columns + self.value_columns]
df1['_value_combined'] = df1[self.value_columns]\
.apply(lambda row: '_'.join(row.values.astype(str)), axis=1)
df2['_value_combined'] = df2[self.value_columns]\
.apply(lambda row: '_'.join(row.values.astype(str)), axis=1)
df1.drop(columns=self.value_columns, inplace=True)
df2.drop(columns=self.value_columns, inplace=True)
# Perform outer join operation to identify differences.
result = pd.merge(df1, df2, on = self.groupby_columns, how='outer', indicator=self.diff_column)
result[self.diff_column] = result.apply(
lambda row: 'added' if row[self.diff_column] == 'right_only' \
else 'deleted' if row[self.diff_column] == 'left_only' \
else 'modified' if row['_value_combined_x'] != row['_value_combined_y'] \
else 'same', axis=1)
return result

# Performs point diff analysis to identify data point changes.
def point_analysis(self, in_data: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame):
column_list = [self.variable_column, self.place_column, self.time_column, self.diff_column]
result = in_data.loc[:, column_list]
# summary = summary.groupby([variable,'result'], observed=True, as_index=False).agg(['count']).reset_index()
result = result.groupby([self.variable_column, self.diff_column], observed=True, as_index=False)[[self.place_column, self.time_column]].agg(lambda x: x.tolist())
result['size'] = result.apply(lambda row:len(row[self.place_column]), axis=1)
result[self.place_column] = result.apply(lambda row: row[self.place_column][0:SAMPLE_COUNT], axis=1)
result[self.time_column] = result.apply(lambda row: row[self.time_column][0:SAMPLE_COUNT], axis=1)
# result = result.groupby(
# [self.variable_column, self.diff_column], observed=True, as_index=False).size()
summary = result.pivot(
index=self.variable_column, columns=self.diff_column, values='size')\
.reset_index().rename_axis(None, axis=1)
self.__cleanup_data(summary)
summary['total'] = summary.apply(
lambda row: row['added'] + row['deleted'] + row['modified'] + row['same'] , axis=1)
return summary, result

# Performs series diff analysis to identify time series changes.
def series_analysis(self, in_data: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame):
column_list = [self.variable_column, self.place_column, self.diff_column]
result = in_data.loc[:, column_list]
result = result.groupby(column_list, as_index=False).size()
result = result.pivot(
index=[self.variable_column, self.place_column], columns=self.diff_column, values='size')\
.reset_index().rename_axis(None, axis=1)
self.__cleanup_data(result)
result[self.diff_column] = result.apply(lambda row: 'added' if row['added'] > 0 \
and row['deleted'] + row['modified'] + row['same'] == 0 \
else 'deleted' if row['deleted'] > 0 and row['added'] + row['modified'] + row['same'] == 0 \
else 'modified' if row['deleted'] > 0 or row['added'] > 0 or row['modified'] > 0 \
else 'same', axis=1)
result = result[column_list]
result = result.groupby([self.variable_column, self.diff_column], observed=True, as_index=False)[self.place_column].agg(lambda x: x.tolist())
result['size'] = result.apply(lambda row:len(row[self.place_column]), axis=1)
result[self.place_column] = result.apply(lambda row: row[self.place_column][0:SAMPLE_COUNT], axis=1)
summary = result.pivot(
index=self.variable_column, columns=self.diff_column, values='size')\
.reset_index().rename_axis(None, axis=1)
self.__cleanup_data(summary)
summary['total'] = summary.apply(
lambda row: row['added'] + row['deleted'] + row['modified'] + row['same'], axis=1)
return summary, result

def main(_):
'''Runs the code.'''
differ = DatasetDiffer(
FLAGS.groupby_columns, FLAGS.value_columns)

if not os.path.exists(FLAGS.output_location):
os.makedirs(FLAGS.output_location)
logging.info('Loading data...')
previous_df = helper.load_data(FLAGS.current_data, FLAGS.output_location)
current_df = helper.load_data(FLAGS.previous_data, FLAGS.output_location)

logging.info('Processing data...')
in_data = differ.process_data(previous_df, current_df)

logging.info('Point analysis:')
summary, result = differ.point_analysis(in_data)
result.sort_values(by=[differ.diff_column, differ.variable_column], inplace=True)
print(summary.head(10))
print(result.head(10))
helper.write_data(summary, FLAGS.output_location, 'point-analysis-summary.csv')
helper.write_data(result, FLAGS.output_location, 'point-analysis-results.csv')

logging.info('Series analysis:')
summary, result = differ.series_analysis(in_data)
result.sort_values(by=[differ.diff_column, differ.variable_column], inplace=True)
print(summary.head(10))
print(result.head(10))
helper.write_data(summary, FLAGS.output_location, 'series-analysis-summary.csv')
helper.write_data(result, FLAGS.output_location, 'series-analysis-results.csv')

logging.info('Differ output written to %s', FLAGS.output_location)


if __name__ == '__main__':
app.run(main)
53 changes: 53 additions & 0 deletions tools/differ/differ_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import unittest
import pandas as pd
from pandas.testing import assert_frame_equal
from differ import DatasetDiffer
import helper

module_dir = os.path.dirname(__file__)


class TestDiffer(unittest.TestCase):
"""
Test Class to compare expected output in test/ directory to the
output generated by DatasetDiffer class
"""

def test_diff_analysis(self):
groupby_columns = "variableMeasured,observationAbout,observationDate"
value_columns = "value"

differ = DatasetDiffer(groupby_columns, value_columns)
current = helper.load_mcf_file(
os.path.join(module_dir, "test", "current.mcf"))
previous = helper.load_mcf_file(
os.path.join(module_dir, "test", "previous.mcf"))

in_data = differ.process_data(previous, current)
summary, result = differ.point_analysis(in_data)
result = pd.read_csv(os.path.join(module_dir, "test", "result1.csv"))
assert_frame_equal(summary, result)

summary, result = differ.series_analysis(in_data)
result = pd.read_csv(os.path.join(module_dir, "test", "result2.csv"))
assert_frame_equal(summary, result)



if __name__ == "__main__":
unittest.main()
69 changes: 69 additions & 0 deletions tools/differ/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@

import pandas as pd
import re
import glob
import os
from google.cloud.storage import Client

# Reads an MCF text file as a dataframe.
def load_mcf_file(file: str) -> pd.DataFrame:
mcf_file = open(file, "r", encoding="utf-8")
mcf_contents = mcf_file.read()
mcf_file.close()
# nodes separated by a blank line
mcf_nodes_text = mcf_contents.split("\n\n")
# lines seprated as property: constraint
mcf_line = re.compile(r"^(\w+): (.*)$")
mcf_nodes = []
for node in mcf_nodes_text:
current_mcf_node = {}
for line in node.split("\n"):
parsed_line = mcf_line.match(line)
if parsed_line is not None:
current_mcf_node[parsed_line.group(1)] = parsed_line.group(2)
if current_mcf_node and current_mcf_node["typeOf"] == "dcid:StatVarObservation":
mcf_nodes.append(current_mcf_node)
df = pd.DataFrame(mcf_nodes)
return df

def load_mcf_files(path: str) -> pd.DataFrame:
"""Load all sharded files in the given directory and combine into a
single dataframe."""
df = pd.DataFrame()
filenames = glob.glob(path + ".mcf")
for filename in filenames:
df2 = load_mcf_file(filename)
# Merge data frames, expects same headers
df = pd.concat([df, df2])
return df

def write_data(df: pd.DataFrame, path: str, file: str):
out_file = open(os.path.join(path, file),
mode="w", encoding="utf-8")
df.to_csv(out_file, index=False, mode="w")
out_file.close()

def load_data(path: str, tmp_dir: str ) -> pd.DataFrame:
if path.startswith("gs://"):
path = get_gcs_data(path, tmp_dir)

if path.endswith("*"):
return load_mcf_files(path)
else:
return load_mcf_file(path)

def get_gcs_data(uri: str, tmp_dir: str) -> str:
client = Client()
bucket = client.get_bucket(uri.split("/")[2])
if uri.endswith("*"):
blobs = client.list_blobs(bucket)
for blob in blobs:
path = os.path.join(os.getcwd(), tmp_dir, blob.name)
blob.download_to_filename(path)
return os.path.join(os.getcwd(), tmp_dir, "*")
else:
file_name = uri.split("/")[3]
blob = bucket.get_blob(file_name)
path = os.path.join(os.getcwd(), tmp_dir, file_name)
blob.download_to_filename(path)
return path
35 changes: 35 additions & 0 deletions tools/differ/test/current.mcf
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
Node: cpcb_air_quality/E17/944d9e6d-ec38-4e61-175a-9bbabfd35f97
observationDate: "2024-09-24T12:00:00"
unit: dcid:MicrogramsPerCubicMeter
observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB
variableMeasured: dcid:Max_Concentration_AirPollutant_Ozone
value: 53.0
typeOf: dcid:StatVarObservation
dcid: "dc/o/bhdp3vy7dee0d"

Node: cpcb_air_quality/E18/944d9e6d-ec38-4e61-175a-9bbabfd35f97
observationDate: "2024-09-24T12:00:00"
unit: dcid:MicrogramsPerCubicMeter
observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB
variableMeasured: dcid:Mean_Concentration_AirPollutant_Ozone
value: 28.0
typeOf: dcid:StatVarObservation
dcid: "dc/o/8e11gqvkt183b"

Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97
observationDate: "2024-09-24T12:00:00"
unit: dcid:MicrogramsPerCubicMeter
observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD
variableMeasured: dcid:Mean_Concentration_AirPollutant_CO
value: 42.0
typeOf: dcid:StatVarObservation
dcid: "dc/o/h1sjhdxycwwmc"

Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97
observationDate: "2024-09-25T12:00:00"
unit: dcid:MicrogramsPerCubicMeter
observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD
variableMeasured: dcid:Mean_Concentration_AirPollutant_CO
value: 40.0
typeOf: dcid:StatVarObservation
dcid: "dc/o/h1sjhdxycwwmc"
Loading

0 comments on commit f440497

Please sign in to comment.