#197 test_merge_sessions.py check dates,satellites
2320sharon committed Dec 19, 2023
1 parent ea75e80 commit 08dd8d6
Showing 1 changed file with 150 additions and 96 deletions.
246 changes: 150 additions & 96 deletions scripts/test_merge_sessions.py
@@ -1,26 +1,138 @@
import os
import pandas as pd
from coastseg import merge_utils, file_utilities
from coastseg.common import (
convert_linestrings_to_multipoints,
stringify_datetime_columns,
get_cross_distance_df,
)
from functools import reduce
import geopandas as gpd
from coastsat import SDS_transects
import numpy as np
from merge_sessions import main, parse_arguments
import pytest
import sys
import argparse
import shutil

TEST_DATA_LOCATION = r"C:\development\doodleverse\coastseg\CoastSeg\test_data"
SAVE_LOCATION = (
    r"C:\development\doodleverse\coastseg\CoastSeg\test_data\merged_sessions"
)
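
# NOTE: the absolute Windows paths above assume a local CoastSeg checkout at
# C:\development\doodleverse\coastseg\CoastSeg. A more portable variant (an
# illustrative sketch, not part of this commit) could anchor them to the
# current working directory instead:
# TEST_DATA_LOCATION = os.path.join(os.getcwd(), "test_data")
# SAVE_LOCATION = os.path.join(TEST_DATA_LOCATION, "merged_sessions")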

# helper functions
# --------------
def get_unique_dates_from_geojson_files(gdfs: list[gpd.GeoDataFrame]) -> set:
    """Return the set of unique values in the 'date' column across all the given GeoDataFrames."""
    unique_dates = set()
    for gdf in gdfs:
        unique_dates.update(gdf.date)
    return unique_dates

def check_geojson_files(gdf1, gdf2, columns: list[str]):
    """
    Check if the specified columns exist in both GeoDataFrame objects and if their values match.
    Args:
        gdf1 (GeoDataFrame): The first GeoDataFrame object.
        gdf2 (GeoDataFrame): The second GeoDataFrame object.
        columns (list[str]): A list of column names to check.
    Raises:
        AssertionError: If any of the specified columns are missing in either GeoDataFrame or if their values do not match.
    """
    if isinstance(columns, str):
        columns = [columns]
    for column in columns:
        assert (column in gdf1.columns) and (column in gdf2.columns), f"{column} column missing in one of the files"
        assert set(gdf1[column]) == set(gdf2[column]), f"{column} values do not match"
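
# A quick illustration (a sketch, not part of this commit) of how
# check_geojson_files behaves: matching columns pass silently, mismatches raise.
# >>> a = gpd.GeoDataFrame({"date": ["2020-01-01"], "satname": ["L8"]})
# >>> b = gpd.GeoDataFrame({"date": ["2020-01-01"], "satname": ["L8"]})
# >>> check_geojson_files(a, b, ["date", "satname"])  # no AssertionError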

def assert_all_files_exist(dest: str):
    """
    Check if all the required files exist in the specified destination directory.
    Parameters:
        dest (str): The destination directory path.
    Raises:
        AssertionError: If any of the required files does not exist.
    """
    assert os.path.exists(dest)
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_dict.json"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_lines.geojson"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_points.geojson"))
    assert os.path.exists(os.path.join(dest, "merged_config.geojson"))
    assert os.path.exists(os.path.join(dest, "transect_time_series.csv"))

def verify_merged_session(dest: str):
    """
    Verify the consistency of a merged session.
    Args:
        dest (str): The destination directory of the merged session.
    Raises:
        AssertionError: If any of the verification checks fail.
    Returns:
        None
    """
    # 1. read in extracted_shorelines_points.geojson from the merged session
    shoreline_points_gdf = merge_utils.read_first_geojson_file(dest, "extracted_shorelines_points.geojson")
    # 2. read in extracted_shorelines_lines.geojson from the merged session
    shoreline_lines_gdf = merge_utils.read_first_geojson_file(dest, "extracted_shorelines_lines.geojson")
    # 3. verify that the 'date' and 'satname' columns are present and consistent in the geojson files
    check_geojson_files(shoreline_points_gdf, shoreline_lines_gdf, ['date', 'satname'])
    # 4. read in extracted_shorelines_dict.json from the merged session
    extracted_shorelines_dict = file_utilities.load_data_from_json(os.path.join(dest, "extracted_shorelines_dict.json"))
    # 5. check that all the dates & satellites in the geojson files are present in the dictionary
    # (the dictionary stores dates under the key 'dates', while the geojson files use a 'date' column)
    columns = ['dates', 'satname']
    for column in columns:
        if column == 'dates':
            assert shoreline_points_gdf['date'].isin(extracted_shorelines_dict.get(column)).all()
        else:
            assert shoreline_points_gdf[column].isin(extracted_shorelines_dict.get(column)).all()
    # 6. read in transect_time_series.csv from the merged session
    transect_time_series = pd.read_csv(os.path.join(dest, "transect_time_series.csv"))
    # 7. check that all the dates in the geojson files are present in the csv file
    assert shoreline_points_gdf['date'].isin(transect_time_series['dates']).all()
    # 8. check that the number of dates in the dictionary matches the number of dates in the csv file
    assert len(extracted_shorelines_dict.get('dates')) == len(transect_time_series['dates'])
    # 9. check that every list in the dictionary has one entry per date in the csv file
    for key in extracted_shorelines_dict.keys():
        assert len(extracted_shorelines_dict.get(key)) == len(transect_time_series['dates'])
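
# For reference, the checks above imply that extracted_shorelines_dict.json
# holds parallel per-date lists, roughly of the form (an illustrative sketch
# inferred from the assertions, not a documented schema):
# {"dates": [...], "satname": [...], <other keys with one entry per date>}
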
def validate_dates(session_locations: list[str], dest: str):
    """
    Validates that the dates before and after merging shoreline geojson files are the same.
    Args:
        session_locations (list[str]): List of file paths to the original sessions.
        dest (str): File path to the new merged session.
    Raises:
        AssertionError: If the dates before and after merging are not the same.
    """
    # get the dates from the shoreline geojson files located in the original sessions
    gdfs = merge_utils.process_geojson_files(
        session_locations,
        ["extracted_shorelines_points.geojson", "extracted_shorelines.geojson"],
        merge_utils.convert_lines_to_multipoints,
        merge_utils.read_first_geojson_file,
    )
    # get all the dates before merging
    unique_dates = get_unique_dates_from_geojson_files(gdfs)
    # apply the same merge-and-average function that merge_sessions.py applies
    merged_shorelines = reduce(merge_utils.merge_and_average, gdfs)
    merged_dates = set(merged_shorelines["date"])
    # check that the dates are the same before and after merging
    assert unique_dates == merged_dates

    # get the dates from the shoreline geojson files located in the new merged session
    merged_gdfs = merge_utils.process_geojson_files(
        dest,
        ["extracted_shorelines_points.geojson", "extracted_shorelines.geojson"],
        merge_utils.convert_lines_to_multipoints,
        merge_utils.read_first_geojson_file,
    )
    # get all the unique dates in the merged session
    result_unique_dates = get_unique_dates_from_geojson_files(merged_gdfs)
    # check that the dates are the same before and after merging
    assert unique_dates == result_unique_dates
    assert merged_dates == result_unique_dates
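
# Note: functools.reduce folds merge_and_average pairwise over the list, i.e.
# reduce(f, [g1, g2, g3]) is f(f(g1, g2), g3), so any number of sessions
# collapses into a single GeoDataFrame. A quick sanity check of the pattern
# with plain numbers (illustrative only):
# >>> from functools import reduce
# >>> reduce(lambda a, b: a + b, [1, 2, 3])  # ((1 + 2) + 3)
# 6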

def clear_directory(directory):
    """
@@ -38,6 +150,7 @@ def clear_directory(directory):
        except Exception as e:
            print(f"Failed to delete {file_path}. Reason: {e}")

# ----------------------------

def test_with_all_arguments(monkeypatch):
    # Test case 1: Required arguments provided
@@ -100,6 +213,7 @@ def test_main_with_overlapping():
        raise Exception("Test data not found. Please download the test data")

    merged_session_name = "merged_session"
    merged_session_name = "merged_session_test_case4"
    dest = os.path.join(SAVE_LOCATION, merged_session_name)
    if os.path.exists(dest):
        clear_directory(dest)
@@ -118,28 +232,12 @@
    )

    main(mock_args)
    assert os.path.exists(dest)
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_dict.json"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_lines.geojson"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_points.geojson"))
    assert os.path.exists(os.path.join(dest, "merged_config.geojson"))
    assert os.path.exists(os.path.join(dest, "transect_time_series.csv"))

    # read all the shoreline geojson files to get the dates
    gdfs = merge_utils.process_geojson_files(
        session_locations,
        ["extracted_shorelines_points.geojson", "extracted_shorelines.geojson"],
        merge_utils.convert_lines_to_multipoints,
        merge_utils.read_first_geojson_file,
    )
    # get all the dates before merging
    total_set = set()
    for gdf in gdfs:
        total_set.update(gdf.date)
    # get the merged shorelines
    merged_shorelines = reduce(merge_utils.merge_and_average, gdfs)
    merged_set = set(merged_shorelines["date"])
    assert total_set == merged_set
    # after the merge_sessions script runs, check that the merged session location exists
    assert_all_files_exist(dest)
    # check that the dates before and after merging are the same
    validate_dates(session_locations, dest)
    # check that the merged session's files are internally consistent
    verify_merged_session(dest)


def test_main_with_same_rois():
@@ -152,6 +250,7 @@ def test_main_with_same_rois():
        raise Exception("Test data not found. Please download the test data")

    merged_session_name = "merged_session"
    merged_session_name = "merged_session_test_case1"
    dest = os.path.join(SAVE_LOCATION, merged_session_name)
    if os.path.exists(dest):
        clear_directory(dest)
@@ -170,28 +269,12 @@
    )

    main(mock_args)
    assert os.path.exists(dest)
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_dict.json"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_lines.geojson"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_points.geojson"))
    assert os.path.exists(os.path.join(dest, "merged_config.geojson"))
    assert os.path.exists(os.path.join(dest, "transect_time_series.csv"))

    # read all the shoreline geojson files to get the dates
    gdfs = merge_utils.process_geojson_files(
        session_locations,
        ["extracted_shorelines_points.geojson", "extracted_shorelines.geojson"],
        merge_utils.convert_lines_to_multipoints,
        merge_utils.read_first_geojson_file,
    )
    # get all the dates before merging
    total_set = set()
    for gdf in gdfs:
        total_set.update(gdf.date)
    # get the merged shorelines
    merged_shorelines = reduce(merge_utils.merge_and_average, gdfs)
    merged_set = set(merged_shorelines["date"])
    assert total_set == merged_set
    # after the merge_sessions script runs, check that the merged session location exists
    assert_all_files_exist(dest)
    # check that the dates before and after merging are the same
    validate_dates(session_locations, dest)
    # check that the merged session's files are internally consistent
    verify_merged_session(dest)


def test_main_with_different_rois():
@@ -204,6 +287,7 @@ def test_main_with_different_rois():
        raise Exception("Test data not found. Please download the test data")

    merged_session_name = "merged_session"
    merged_session_name = "merged_session_test_case2"
    dest = os.path.join(SAVE_LOCATION, merged_session_name)
    if os.path.exists(dest):
        clear_directory(dest)
@@ -222,28 +306,13 @@
    )

    main(mock_args)
    assert os.path.exists(dest)
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_dict.json"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_lines.geojson"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_points.geojson"))
    assert os.path.exists(os.path.join(dest, "merged_config.geojson"))
    assert os.path.exists(os.path.join(dest, "transect_time_series.csv"))
    # after the merge_sessions script runs, check that the merged session location exists
    assert_all_files_exist(dest)
    # check that the dates before and after merging are the same
    validate_dates(session_locations, dest)
    # check that the merged session's files are internally consistent
    verify_merged_session(dest)

    # read all the shoreline geojson files to get the dates
    gdfs = merge_utils.process_geojson_files(
        session_locations,
        ["extracted_shorelines_points.geojson", "extracted_shorelines.geojson"],
        merge_utils.convert_lines_to_multipoints,
        merge_utils.read_first_geojson_file,
    )
    # get all the dates before merging
    total_set = set()
    for gdf in gdfs:
        total_set.update(gdf.date)
    # get the merged shorelines
    merged_shorelines = reduce(merge_utils.merge_and_average, gdfs)
    merged_set = set(merged_shorelines["date"])
    assert total_set == merged_set


def test_main_with_overlapping_dates():
@@ -256,6 +325,7 @@ def test_main_with_overlapping_dates():
        raise Exception("Test data not found. Please download the test data")

    merged_session_name = "merged_session"
    merged_session_name = "merged_session_test_case3"
    dest = os.path.join(SAVE_LOCATION, merged_session_name)
    if os.path.exists(dest):
        clear_directory(dest)
@@ -272,27 +342,11 @@
        multiple_inter="auto",
        prc_multiple=0.1,
    )

    # call the main function of the merge_sessions.py script
    main(mock_args)
    assert os.path.exists(dest)
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_dict.json"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_lines.geojson"))
    assert os.path.exists(os.path.join(dest, "extracted_shorelines_points.geojson"))
    assert os.path.exists(os.path.join(dest, "merged_config.geojson"))
    assert os.path.exists(os.path.join(dest, "transect_time_series.csv"))

    # read all the shoreline geojson files to get the dates
    gdfs = merge_utils.process_geojson_files(
        session_locations,
        ["extracted_shorelines_points.geojson", "extracted_shorelines.geojson"],
        merge_utils.convert_lines_to_multipoints,
        merge_utils.read_first_geojson_file,
    )
    # get all the dates before merging
    total_set = set()
    for gdf in gdfs:
        total_set.update(gdf.date)
    # get the merged shorelines
    merged_shorelines = reduce(merge_utils.merge_and_average, gdfs)
    merged_set = set(merged_shorelines["date"])
    assert total_set == merged_set
    # after the merge_sessions script runs, check that the merged session location exists
    assert_all_files_exist(dest)
    # check that the dates before and after merging are the same
    validate_dates(session_locations, dest)
    # check that the merged session's files are internally consistent
    verify_merged_session(dest)
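
# To run these tests locally (assumes the CoastSeg test data has already been
# downloaded to TEST_DATA_LOCATION; otherwise each test raises early):
#   python -m pytest scripts/test_merge_sessions.py -v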
