diff --git a/src/actinia_core/core/utils.py b/src/actinia_core/core/utils.py index 9d9f80a8a..a62ae660e 100644 --- a/src/actinia_core/core/utils.py +++ b/src/actinia_core/core/utils.py @@ -28,6 +28,7 @@ import os from actinia_core.core.common.process_object import Process from actinia_core.core.common.exceptions import SecurityError +from actinia_core.core.common.config import global_config __license__ = "GPLv3" __author__ = "Sören Gebbert, Anika Weinmann" @@ -37,24 +38,90 @@ __maintainer__ = "mundialis" -def os_path_normpath(path_parts): - """The function returns a joined norm path or raises an error if it is not +def ensure_valid_path(path_parts, intent = "r"): + """ + The function returns a joined and normalized path or raises an error if it is not as expected. Args: - path_parts (list of strings): The parts of the path in a list + - path_parts (list of strings): The parts of the path in a list + - intent (string): for what you want the path to be ensured, can be "r" (read), + "w" (write) or "rw" (read and write). Default ist to check for read access. Returns: path: joined normalized path Raise: - raises a SecurityError if the path is not as expected + - raises a TypeError if the intent is not 'r', 'w' or 'rw' + - raises a SecurityError if the path is not valid for the given intent """ + allowed_intents = ["r", "w", "rw"] + if intent not in allowed_intents: + raise TypeError(f"Intent '{intent}' not allowed, allowed are 'r', 'w' and 'rw'") + path = os.path.normpath(os.path.join(*path_parts)) + error_message = f"Path '{path}' with intent '{intent}' is insecure." + if not path.startswith(path_parts[0]): - raise SecurityError(f"Used path '{path}' is insecure") + raise SecurityError(f"{error_message} Too deep path traversal?") + if intent == "r" and not path_is_in_allowed_read_paths(path): + raise SecurityError(error_message) + if intent == "w" and not path_is_in_allowed_write_paths(path): + raise SecurityError(error_message) + if intent == "rw" and not path_is_in_allowed_read_and_write_paths(path): + raise SecurityError(error_message) + return path +# should these two variables be moved to global_config? +allowed_read_starts = [ + global_config.GRASS_USER_DATABASE, + global_config.GRASS_DATABASE, + global_config.GRASS_RESOURCE_DIR +] +allowed_write_starts = [ + global_config.GRASS_USER_DATABASE, + global_config.GRASS_RESOURCE_DIR +] + +def path_is_in_allowed_read_and_write_paths(path): + """Checks whether the passed path is a valid read and write path, uses the + global_config variables `GRASS_USER_DATABASE` and `GRASS_RESOURCE_DIR` + for the check. + + Args: + path (string): a normalized path + + Returns: + (bool) Whether the passed path is a valid write path + """ + return path_is_in_allowed_read_paths(path) and path_is_in_allowed_write_paths(path) + +def path_is_in_allowed_read_paths(path): + """Checks whether the passed path is a valid read path, uses the global_config + variables `GRASS_USER_DATABASE`, `GRASS_DATABASE` and `GRASS_RESOURCE_DIR` + for the check. + + Args: + path (string): a normalized path + + Returns: + (bool) Whether the passed path is a valid write path + """ + return any(path.startswith(allowed) for allowed in allowed_read_starts) + +def path_is_in_allowed_write_paths(path): + """Checks whether the passed path is a valid write path, uses the global_config + variables `GRASS_USER_DATABASE` and `GRASS_RESOURCE_DIR` for the check. + + Args: + path (string): a normalized path + + Returns: + (bool) Whether the passed path is a valid write path + """ + return any(path.startswith(allowed) for allowed in allowed_write_starts) + def get_wget_process(source, url): """The function returns a wget Process for the given source and url diff --git a/src/actinia_core/rest/location_management.py b/src/actinia_core/rest/location_management.py index 962799480..56bd9c633 100644 --- a/src/actinia_core/rest/location_management.py +++ b/src/actinia_core/rest/location_management.py @@ -48,7 +48,7 @@ from actinia_core.models.response_models import SimpleResponseModel from actinia_core.rest.base.resource_base import ResourceBase from actinia_core.core.common.redis_interface import enqueue_job -from actinia_core.core.utils import os_path_normpath +from actinia_core.core.utils import ensure_valid_path from actinia_core.processing.common.location_management import ( read_current_region, create_location, @@ -199,11 +199,11 @@ def delete(self, location_name): database. """ # Delete only locations from the user database - location = os_path_normpath( + location = ensure_valid_path( [self.grass_user_data_base, self.user_group, location_name] ) - permanent_mapset = os_path_normpath([location, "PERMANENT"]) - wind_file = os_path_normpath([permanent_mapset, "WIND"]) + permanent_mapset = ensure_valid_path([location, "PERMANENT"]) + wind_file = ensure_valid_path([permanent_mapset, "WIND"]) # Check the location path, only "valid" locations can be deleted if os.path.isdir(location): if os.path.isdir(permanent_mapset) and os.path.isfile(wind_file): @@ -246,7 +246,7 @@ def post(self, location_name): """Create a new location based on EPSG code in the user database.""" # Create only new locations if they did not exist in the global # database - location = os_path_normpath([self.grass_data_base, location_name]) + location = ensure_valid_path([self.grass_data_base, location_name]) # Check the location path if os.path.isdir(location): @@ -256,7 +256,7 @@ def post(self, location_name): ) # Check also for the user database - location = os_path_normpath( + location = ensure_valid_path( [self.grass_user_data_base, self.user_group, location_name] ) # Check the location path diff --git a/src/actinia_core/rest/resource_streamer.py b/src/actinia_core/rest/resource_streamer.py index d65bb8689..32910e63d 100644 --- a/src/actinia_core/rest/resource_streamer.py +++ b/src/actinia_core/rest/resource_streamer.py @@ -31,7 +31,7 @@ from actinia_core.core.common.config import global_config from actinia_core.core.common.app import auth from actinia_core.core.common.api_logger import log_api_call -from actinia_core.core.utils import os_path_normpath +from actinia_core.core.utils import ensure_valid_path __license__ = "GPLv3" @@ -87,12 +87,12 @@ def get(self, user_id, resource_id, file_name): """ resource_dir = global_config.GRASS_RESOURCE_DIR - user_export_path = os_path_normpath([resource_dir, user_id]) - resource_export_path = os_path_normpath( - [user_export_path, resource_id] + user_export_path = ensure_valid_path([resource_dir, user_id], "w") + resource_export_path = ensure_valid_path( + [user_export_path, resource_id], "w" ) - resource_export_file_path = os_path_normpath( - [resource_export_path, file_name] + resource_export_file_path = ensure_valid_path( + [resource_export_path, file_name], "w" ) if (