Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve interim results by include additional mapsets #399

Merged
merged 3 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions job_resumption.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ The interim results will be deleted automatically if a job resource is successfu
- Startup actinia with above config in preferred way, e.g.
`cd ~/repos/actinia` + press F5

## Additional mapsets
For parallelization over different regions, some GRASS GIS processes might
create additional mapsets and use the data from these mapsets in further
calculations without copying it to the temporary mapset. To make it possible
to also resume jobs where such additional mapsets were created in a previous
step, you can configure additional mapsets that should be included in the
interim results by setting a pattern for the mapset name, e.g.:
```
[MISC]
save_interim_results = onError
save_interim_results_endpoints_cfg = /etc/default/actinia_interim_endpoints.csv
include_additional_mapset_pattern = test_tmp_*
```


## Job resumption examples
```
Expand Down
12 changes: 12 additions & 0 deletions src/actinia_core/core/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def __init__(self):
"AsyncEphemeralExportResource".lower(): "AsyncEphemeralExportResource",
"AsyncPersistentResource".lower(): "AsyncPersistentResource",
}
self.INCLUDE_ADDITIONAL_MAPSET_PATTERN = None

"""
LOGGING
Expand Down Expand Up @@ -649,6 +650,11 @@ def write(self, path=DEFAULT_CONFIG_PATH):
"INTERIM_SAVING_ENDPOINTS",
str(self.INTERIM_SAVING_ENDPOINTS),
)
config.set(
"MISC",
"INCLUDE_ADDITIONAL_MAPSET_PATTERN",
str(self.INCLUDE_ADDITIONAL_MAPSET_PATTERN),
)

config.add_section("LOGGING")
config.set("LOGGING", "LOG_INTERFACE", self.LOG_INTERFACE)
Expand Down Expand Up @@ -913,6 +919,12 @@ def read(self, path=DEFAULT_CONFIG_PATH):
self.INTERIM_SAVING_ENDPOINTS.update(
endpoints_dict
)
if config.has_option(
"MISC", "INCLUDE_ADDITIONAL_MAPSET_PATTERN"
):
self.INCLUDE_ADDITIONAL_MAPSET_PATTERN = config.get(
"MISC", "INCLUDE_ADDITIONAL_MAPSET_PATTERN"
)

if config.has_section("LOGGING"):
if config.has_option("LOGGING", "LOG_INTERFACE"):
Expand Down
60 changes: 60 additions & 0 deletions src/actinia_core/core/interim_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import os
import subprocess
import shutil
from fnmatch import filter
from .messages_logger import MessageLogger
from actinia_core.core.common.config import global_config, DEFAULT_CONFIG_PATH
from actinia_core.core.common.exceptions import RsyncError
Expand Down Expand Up @@ -83,6 +84,9 @@ def __init__(self, user_id, resource_id, iteration, endpoint):
self.iteration = iteration if iteration is not None else 1
self.old_pc_step = None
self.endpoint = endpoint
self.include_additional_mapset_pattern = (
global_config.INCLUDE_ADDITIONAL_MAPSET_PATTERN
)

def set_old_pc_step(self, old_pc_step):
"""Set method for the number of the successfully finished steps of
Expand Down Expand Up @@ -197,6 +201,29 @@ def _compare_sha512sums_of_folders(self, folder1, folder2):
else:
return False

def rsync_additional_mapsets(self, dest_path):
    """Restore the additional mapsets of the interim results via rsync.

    Every entry found in the ``<interim mapset path>_add_mapsets`` folder
    belonging to ``self.old_pc_step`` is synced into ``dest_path`` under
    the same name. If that folder does not exist, nothing is done.

    Args:
        dest_path (str): Path to destination folder where the additional
                         mapsets should be saved
    """
    interim_mapset = self._get_interim_mapset_path(self.old_pc_step)
    add_mapsets_dir = f"{interim_mapset}_add_mapsets"
    if not os.path.isdir(add_mapsets_dir):
        return

    for mapset in os.listdir(add_mapsets_dir):
        status = self.rsync_mapsets(
            os.path.join(add_mapsets_dir, mapset),
            os.path.join(dest_path, mapset),
        )
        if status == "success":
            continue
        # A failed sync is only logged; the remaining mapsets are still
        # processed.
        self.logger.info(
            f"Syncing additional mapset <{mapset}> failed."
        )

def rsync_mapsets(self, src, dest):
"""Using rsync to update the mapset folder.
Args:
Expand Down Expand Up @@ -278,6 +305,25 @@ def _get_interim_path(self):
self.resource_id,
)

def _get_included_additional_mapset_paths(
    self, temp_mapset_path, progress_step
):
    """Return source and destination paths of the additional mapsets.

    The directory containing ``temp_mapset_path`` is searched for entries
    whose names match ``self.include_additional_mapset_pattern``
    (fnmatch-style pattern). For every match the source path and the
    corresponding destination path below
    ``<interim mapset path of progress_step>_add_mapsets`` are returned.

    Args:
        temp_mapset_path (str): Path to the temporary mapset; its parent
                                directory is searched. May be None.
        progress_step (int): Step number passed to
                             ``_get_interim_mapset_path`` to build the
                             destination folder

    Returns:
        (list, list): Source paths and destination paths, in matching
        order. Both lists are empty if no pattern is configured or if
        ``temp_mapset_path`` is None.
    """
    pattern = self.include_additional_mapset_pattern
    # This helper is called before the caller's own check for a missing
    # temporary mapset path, so a None path must not reach
    # os.path.dirname (it would raise a TypeError).
    if not pattern or temp_mapset_path is None:
        return [], []

    tmp_path = os.path.dirname(temp_mapset_path)
    dest_path = (
        f"{self._get_interim_mapset_path(progress_step)}_add_mapsets"
    )
    # NOTE: `filter` is fnmatch.filter (file-level import), not the
    # builtin; it returns a list, so it can be iterated twice below.
    mapsets = filter(os.listdir(tmp_path), pattern)
    srcs = [os.path.join(tmp_path, mapset) for mapset in mapsets]
    dests = [os.path.join(dest_path, mapset) for mapset in mapsets]
    return srcs, dests

def _get_interim_mapset_path(self, progress_step):
"""Returns path where the interim mapset is saved"""
return os.path.join(
Expand Down Expand Up @@ -309,6 +355,12 @@ def save_interim_results(
self.logger.info("Saving interim results of step %d" % progress_step)
dest_mapset = self._get_interim_mapset_path(progress_step)
dest_tmpdir = self._get_interim_tmpdir_path(progress_step)
addm_src, addm_dest = self._get_included_additional_mapset_paths(
temp_mapset_path, progress_step
)

if temp_mapset_path is None:
return

if progress_step == 1 or force_copy is True:
# copy temp mapset for first step
Expand All @@ -318,6 +370,8 @@ def save_interim_results(
"Maspset %s and temp_file_path %s are copied"
% (temp_mapset_path, temp_file_path)
)
for m_src, m_dest in zip(addm_src, addm_dest):
shutil.copytree(m_src, m_dest)
else:
old_dest_mapset = self._get_interim_mapset_path(progress_step - 1)
old_dest_tmpdir = self._get_interim_tmpdir_path(progress_step - 1)
Expand All @@ -330,3 +384,9 @@ def save_interim_results(
self._saving_folder(
temp_file_path, dest_tmpdir, old_dest_tmpdir, progress_step
)
# saving additional mapsets
_, old_dests = self._get_included_additional_mapset_paths(
temp_mapset_path, progress_step - 1
)
for m_src, m_dest, old_dest in zip(addm_src, addm_dest, old_dests):
self._saving_folder(m_src, m_dest, old_dest, progress_step)
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,9 @@ def _create_temporary_mapset(
"Error while rsyncing of interim results to new temporare "
"mapset"
)
self.interim_result.rsync_additional_mapsets(
os.path.dirname(self.temp_mapset_path)
)
if interim_result_file_path:
self.message_logger.info(
"Rsync interim result file path to temporary GRASS DB"
Expand Down