#198 mvp for removing shorelines from files
2320sharon committed Oct 25, 2023
1 parent 81aae9a commit 16563cd
Showing 3 changed files with 311 additions and 5 deletions.
81 changes: 79 additions & 2 deletions src/coastseg/coastseg_map.py
@@ -85,6 +85,74 @@ def link_roi_list(self, widget):
traitlets.dlink((self, "roi_ids_list"), (widget, "options"))


def find_shorelines_directory(path, roi_id):
    """Return the directory containing the extracted-shorelines geojson for the given ROI, or None if absent."""
    # List the contents of the specified path
    contents = os.listdir(path)
    print("Contents of the path:", contents)

# Check for extracted shorelines geojson file in the specified path
extracted_shorelines_file = [
file
for file in contents
if "extracted_shorelines" in file and file.endswith(".geojson")
]
if extracted_shorelines_file:
return path

# If the file is not found, check for a directory with the ROI ID
roi_directory = [
directory
for directory in contents
if os.path.isdir(os.path.join(path, directory)) and roi_id in directory
]
if roi_directory:
roi_path = os.path.join(path, roi_directory[0])
roi_contents = os.listdir(roi_path)
extracted_shorelines_file = [
file
for file in roi_contents
if "extracted_shorelines" in file and file.endswith(".geojson")
]
if extracted_shorelines_file:
return roi_path

return None
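
A minimal usage sketch (not part of the commit; the session path and ROI ID below are hypothetical):

session_path = find_shorelines_directory("sessions/sample_session", "zih2")
if session_path is None:
    print("No extracted_shorelines geojson found for this ROI")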


def delete_extracted_shorelines_files(session_path, selected_items):
    """Remove the selected shorelines from every session file that references them."""
    # delete the extracted shorelines from both geojson files
geodata_processing.edit_geojson_files(
session_path,
[
"extracted_shorelines_lines.geojson",
"extracted_shorelines_points.geojson",
],
selected_items,
common.remove_rows,
)
# delete the extracted shorelines from the extracted_shorelines_dict.json file
filename = "extracted_shorelines_dict.json"
json_file = os.path.join(session_path, filename)
data = file_utilities.load_data_from_json(json_file)
new_dict = common.update_selected_shorelines_dict(data, selected_items)
file_utilities.to_file(new_dict, json_file)
# delete the extracted shorelines from the transect_time_series.csv files
filenames = [
"transect_time_series.csv",
"transect_time_series_tidally_corrected.csv",
]
filepaths = [os.path.join(session_path, filename) for filename in filenames]
dates_list, sat_list = common.extract_dates_and_sats(selected_items)
common.update_transect_time_series(filepaths, dates_list)
# delete the selected shorelines from all the individual csv files
file_patterns = ["_timeseries_tidally_corrected", "_timeseries_raw.csv"]
for file_pattern in file_patterns:
common.drop_rows_from_csv(file_pattern, session_path, dates_list)
    # delete the extracted shorelines from the jpg detection files
jpg_path = os.path.join(session_path, "jpg_files", "detection")
common.delete_jpg_files(dates_list, sat_list, jpg_path)
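
Each selected item is a "satname_date" string with the date in "%Y-%m-%d %H:%M:%S" form, the format that common.extract_dates_and_sats parses. A sketch with hypothetical values:

selected_items = ["L8_2018-12-01 19:03:46", "S2_2019-03-05 10:15:22"]
delete_extracted_shorelines_files("sessions/sample_session", selected_items)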


class CoastSeg_Map:
def __init__(self, **kwargs):
# Basic settings and configurations
@@ -166,7 +234,7 @@ def load_extracted_shoreline_layer(self, gdf, layer_name, colormap):
min_date = projected_gdf["date"].min()
max_date = projected_gdf["date"].max()
if min_date == max_date:
            # If there's only one date, set delta to 0.25
delta = np.array([0.25])
else:
delta = (projected_gdf["date"] - min_date) / (max_date - min_date)
@@ -219,7 +287,16 @@ def delete_selected_shorelines(
self, layer_name: str, selected_id: str, selected_shorelines: List = None
) -> None:
if selected_shorelines and selected_id:
session_name = self.get_session_name()
session_path = file_utilities.get_session_location(
session_name=session_name, raise_error=True
)
# get the path to the session directory that contains the extracted shoreline files
            session_path = find_shorelines_directory(session_path, selected_id)
            # find_shorelines_directory returns None when no shorelines geojson is found
            if session_path and os.path.isdir(session_path):
                delete_extracted_shorelines_files(
                    session_path, list(selected_shorelines)
                )
            # the selected shorelines have now been removed from the session's files
            print(f"Deleting {selected_shorelines}")
222 changes: 219 additions & 3 deletions src/coastseg/common.py
@@ -8,8 +8,7 @@
import logging
import random
import string
from datetime import datetime, timezone

# Third-party imports
import requests
@@ -27,7 +26,7 @@
from shapely.geometry import MultiPoint, LineString

# Specific classes/functions from modules
from typing import Callable, List, Optional, Union, Dict, Set, Any, Tuple

# Internal dependencies imports
from coastseg import exceptions
@@ -41,6 +40,171 @@
logger = logging.getLogger(__name__)


def remove_rows(selected_items, gdf):
    """Drop the rows matching each selected "satname_date" item from the GeoDataFrame."""
    # Nothing to filter if the expected columns are missing
    if "date" not in gdf.columns or "satname" not in gdf.columns:
        return gdf
    # Loop through each selected "satname_date" item
    for criteria in list(selected_items):
        satname, dates = criteria.split("_")
        # Convert the date string to a datetime object
        dates_obj = datetime.strptime(dates, "%Y-%m-%d %H:%M:%S")
        # Use boolean indexing to select the rows that match the criteria
        mask = (gdf["date"] == dates_obj) & (gdf["satname"] == satname)
        # Drop the rows that match the criteria
        gdf = gdf.drop(gdf[mask].index)
    # Serialize the dates back to strings before the GeoDataFrame is written out
    gdf["date"] = gdf["date"].dt.strftime("%Y-%m-%d %H:%M:%S")
    return gdf
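
A sketch of the filtering (the columns and values are assumed; the "date" column must hold datetimes):

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

gdf = gpd.GeoDataFrame(
    {
        "date": pd.to_datetime(["2018-12-01 19:03:46", "2019-03-05 10:15:22"]),
        "satname": ["L8", "S2"],
    },
    geometry=[Point(0, 0), Point(1, 1)],
)
filtered = remove_rows(["L8_2018-12-01 19:03:46"], gdf)
# filtered keeps only the S2 row, with "date" re-serialized as strings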


def get_selected_indexes(
data_dict: Dict[str, Union[List[Any], pd.Series]],
dates_list: List[Union[str, pd.Timestamp]],
sat_list: List[str],
) -> List[int]:
"""
Retrieve indexes of rows in a dictionary that match specified dates and satellite names.
This function accepts a dictionary containing at least two keys: 'dates' and 'satname'.
It then returns a list of indexes where the dates and satellite names match those provided in
the dates_list and sat_list respectively.
Parameters:
- data_dict (Dict[str, Union[List[Any], pd.Series]]): The dictionary containing data arrays.
Expected keys are 'dates' and 'satname'.
If the keys are absent, they will be set with empty lists.
- dates_list (List[Union[str, pd.Timestamp]]): A list containing dates to match against.
- sat_list (List[str]): A list containing satellite names to match against.
Returns:
- List[int]: A list of integer indexes where the 'dates' and 'satname' in the data_dict
match the provided lists. Returns an empty list if no matches are found or if the data_dict is empty.
Examples:
>>> data = {'dates': ['2021-01-01', '2021-01-02'], 'satname': ['sat1', 'sat2']}
>>> get_selected_indexes(data, ['2021-01-01'], ['sat1'])
[0]
"""
if not data_dict:
return []
data_dict.setdefault("dates", [])
data_dict.setdefault("satname", [])
# Convert dictionary to DataFrame
df = pd.DataFrame(data_dict)

# Initialize an empty list to store selected indexes
selected_indexes = []

# Iterate over dates and satellite names, and get the index of the first matching row
for date, sat in zip(dates_list, sat_list):
match = df[(df["dates"] == date) & (df["satname"] == sat)]
if not match.empty:
selected_indexes.append(match.index[0])

return selected_indexes


def update_transect_time_series(
    filepaths: List[str], dates_list: List[datetime]
) -> None:
    """Remove the rows matching the given dates from each transect time-series CSV file."""
    for filepath in filepaths:
        # skip files that don't exist (e.g. no tidally corrected CSV was created)
        if not os.path.exists(filepath):
            continue
# Read the CSV file into a DataFrame
df = pd.read_csv(filepath)

# Format the dates to match the format in the CSV file
formatted_dates = [date.strftime('%Y-%m-%d %H:%M:%S+00:00') for date in dates_list]
# Keep only the rows where the 'dates' column isn't in the list of formatted dates
df = df[~df['dates'].isin(formatted_dates)]
# Write the updated DataFrame to the same CSV file
df.to_csv(filepath, index=False)

def extract_dates_and_sats(
selected_items: List[str],
) -> Tuple[List[datetime], List[str]]:
"""
Extract the dates and satellite names from a list of selected items.
Args:
selected_items: A list of strings, where each string is in the format "satname_dates".
Returns:
A tuple of two lists: the first list contains datetime objects corresponding to the dates in the selected items,
and the second list contains the satellite names in the selected items.
"""
dates_list = []
sat_list = []
for criteria in selected_items:
satname, dates = criteria.split("_")
sat_list.append(satname)
dates_list.append(
datetime.strptime(dates, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
)
return dates_list, sat_list
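
For example:

dates_list, sat_list = extract_dates_and_sats(["L8_2018-12-01 19:03:46"])
# dates_list -> [datetime(2018, 12, 1, 19, 3, 46, tzinfo=timezone.utc)]
# sat_list   -> ["L8"]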


def transform_data_to_nested_arrays(
data_dict: Dict[str, Union[List[Union[int, float, np.ndarray]], np.ndarray]]
) -> Dict[str, np.ndarray]:
"""
Convert a dictionary of data to a new dictionary with nested NumPy arrays.
Args:
data_dict: A dictionary of data, where each value is either a list of integers, floats, or NumPy arrays, or a NumPy array.
Returns:
A new dictionary with the same keys as `data_dict`, where each value is a NumPy array or a nested NumPy array.
Raises:
TypeError: If `data_dict` is not a dictionary, or if any value in `data_dict` is not a list or NumPy array.
"""
transformed_dict = {}
for key, items in data_dict.items():
if any(isinstance(element, np.ndarray) for element in items):
nested_array = np.empty(len(items), dtype=object)
for index, array_element in enumerate(items):
nested_array[index] = array_element
transformed_dict[key] = nested_array
else:
transformed_dict[key] = np.array(items)
return transformed_dict
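
The object-dtype arrays matter because shoreline entries are ragged (each detected contour has a different number of points), so a plain np.array could not stack them; nesting keeps each contour whole while still allowing np.delete by index. A sketch with made-up shapes:

import numpy as np

data = {
    "satname": ["L8", "S2"],
    "shorelines": [np.zeros((3, 2)), np.zeros((5, 2))],  # ragged contour arrays
}
arrays = transform_data_to_nested_arrays(data)
# arrays["satname"] is a regular array; arrays["shorelines"] has dtype=object,
# so np.delete(arrays["shorelines"], [0]) removes a whole contour by index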


def update_selected_shorelines_dict(
    data_input: Union[Dict[str, Any], str],
    selected_items: List[str],
    session_path: str = "",
    filename: str = "",
) -> Dict[str, np.ndarray]:
    """Remove the selected "satname_date" items from the extracted-shorelines data and return the updated dict."""
# Determine if data_input is a dictionary or a file path
if isinstance(data_input, dict):
data = data_input
elif isinstance(data_input, str):
if not session_path or not filename:
raise ValueError(
"If data_input is a file path, session_path and filename must be provided."
)
# Define the full path to the JSON file
json_file = os.path.join(session_path, filename)
# Load data from the JSON file
data = file_utilities.load_data_from_json(json_file)
else:
raise TypeError("data_input must be either a dictionary or a string file path.")

# Transform data to nested arrays
new_dict = transform_data_to_nested_arrays(data)

# Extract dates and satellite names from the selected items
dates_list, sat_list = extract_dates_and_sats(selected_items)

# Get the indexes of the selected items in the new_dict
selected_indexes = get_selected_indexes(new_dict, dates_list, sat_list)

# Delete the selected indexes from the new_dict
if selected_indexes:
for key in new_dict.keys():
new_dict[key] = np.delete(new_dict[key], selected_indexes)

return new_dict
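
Usage mirrors delete_extracted_shorelines_files in coastseg_map.py (the session path here is hypothetical):

json_file = os.path.join("sessions/sample_session", "extracted_shorelines_dict.json")
data = file_utilities.load_data_from_json(json_file)
new_dict = update_selected_shorelines_dict(data, ["L8_2018-12-01 19:03:46"])
file_utilities.to_file(new_dict, json_file)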


def create_new_config(roi_ids: list, settings: dict, roi_settings: dict) -> dict:
"""
Creates a new configuration dictionary by combining the given settings and ROI settings.
@@ -162,6 +326,58 @@ def filter_images_by_roi(roi_settings: list[dict]):
logger.info(f"Partial images filtered out: {bad_images}")


def drop_rows_from_csv(file_pattern, session_path, dates_list):
    """
    Drop the rows containing the given dates from CSV files that match a filename pattern in a directory.
    Args:
        file_pattern: A string with the filename pattern to match, e.g. '_timeseries_raw.csv'.
        session_path: A string representing the directory path where the CSV files are located.
        dates_list: A list of datetime objects whose rows should be dropped,
            e.g. [datetime(2018, 12, 1, 19, 3, 46, tzinfo=timezone.utc)].
    """
    # Get a list of the files in the directory whose names contain the pattern
    matched_files = glob.glob(session_path + f"/*{file_pattern}*")
    # Format the dates to match the UTC date strings stored in the CSV files
    formatted_dates = [date.strftime("%Y-%m-%d %H:%M:%S+00:00") for date in dates_list]
# Loop through each file in the matched files list
for file in matched_files:
# Read the CSV file into a DataFrame
df = pd.read_csv(file)

        # Convert the 'dates' column to datetime objects
        df["dates"] = pd.to_datetime(df["dates"])

        # Drop rows where the 'dates' column matches one of the formatted dates
        df = df[~df["dates"].isin(pd.to_datetime(formatted_dates))]

# Write the updated DataFrame to the same CSV file
df.to_csv(file, index=False)
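
A sketch, assuming the CSVs store their 'dates' column as "%Y-%m-%d %H:%M:%S+00:00" strings (the format this function serializes to); the session path is hypothetical:

from datetime import datetime, timezone

dates = [datetime(2018, 12, 1, 19, 3, 46, tzinfo=timezone.utc)]
# drops the matching rows from every "*_timeseries_raw.csv*" file in the session
drop_rows_from_csv("_timeseries_raw.csv", "sessions/sample_session", dates)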


def delete_jpg_files(dates_list, sat_list, jpg_path):
    """Delete the detection jpgs named "<date>_<satname>.jpg" for each date and satellite pair."""
    # Format the dates in dates_list as strings
    formatted_dates = [date.strftime("%Y-%m-%d-%H-%M-%S") for date in dates_list]

# Get a list of all JPEG files in jpg_path
jpg_files = set(os.listdir(jpg_path))

# Create a list of filenames to delete
delete_list = [
date + "_" + sat + ".jpg"
for date, sat in zip(formatted_dates, sat_list)
if (date + "_" + sat + ".jpg") in jpg_files
]

# Loop through each filename in the delete list
for filename in delete_list:
# Construct the full file path by joining the directory path with the filename
file_path = os.path.join(jpg_path, filename)
if os.path.exists(file_path):
# Use the os.remove function to delete the file
os.remove(file_path)
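
A sketch with hypothetical values; given the naming scheme above, this would remove "2018-12-01-19-03-46_L8.jpg" from the detection folder if present:

from datetime import datetime, timezone

delete_jpg_files(
    [datetime(2018, 12, 1, 19, 3, 46, tzinfo=timezone.utc)],
    ["L8"],
    os.path.join("sessions/sample_session", "jpg_files", "detection"),
)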


def filter_partial_images(
roi_gdf: gpd.geodataframe,
directory: str,
13 changes: 13 additions & 0 deletions src/coastseg/geodata_processing.py
@@ -13,6 +13,19 @@
logger = logging.getLogger(__name__)


def edit_geojson_files(location, filenames, selected_items, filter_function):
    """Apply the filter function to each of the geojson files in `location` that exists."""
    for filename in filenames:
        filepath = os.path.join(location, filename)
        if os.path.exists(filepath):
            edit_gdf_file(filepath, selected_items, filter_function)


def edit_gdf_file(filepath, selected_items, filter_function):
    """Read a geojson file, filter out the selected items, and write the result back in place."""
    gdf = read_gpd_file(filepath)
    new_gdf = filter_function(selected_items, gdf)
    new_gdf.to_file(filepath, driver="GeoJSON")
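
These helpers combine with common.remove_rows the same way delete_extracted_shorelines_files in coastseg_map.py uses them (the session path and item below are hypothetical):

from coastseg import common

edit_geojson_files(
    "sessions/sample_session",
    ["extracted_shorelines_lines.geojson", "extracted_shorelines_points.geojson"],
    ["L8_2018-12-01 19:03:46"],
    common.remove_rows,
)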


def create_geofeature_geodataframe(
geofeature_path: str, roi_gdf: gpd.GeoDataFrame, epsg_code: str, feature_type: str
) -> gpd.GeoDataFrame:
