Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stdin for parameters #501

Merged
merged 7 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 88 additions & 11 deletions src/actinia_core/core/common/process_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# performance processing of geographical data that uses GRASS GIS for
# computational tasks. For details, see https://actinia.mundialis.de/
#
# Copyright (c) 2016-2021 Sören Gebbert and mundialis GmbH & Co. KG
# Copyright (c) 2016-2024 Sören Gebbert and mundialis GmbH & Co. KG
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -56,11 +56,18 @@
" Anika Weinmann"
)
__copyright__ = (
"Copyright 2016-2022, Sören Gebbert and mundialis GmbH & Co. KG"
"Copyright 2016-2024, Sören Gebbert and mundialis GmbH & Co. KG"
)
__maintainer__ = "mundialis"


def get_param_stdin_part(text):
"""Function to get method and filter from parameter value"""
for delimiter in ["::", " ", "+", "-", "*", ":", "(", ")"]:
text = text.split(delimiter, 1)[0]
return text


class ProcessChainConverter(object):
"""
Convert the process chain description into a process list that can be
Expand Down Expand Up @@ -143,6 +150,7 @@ def __init__(
self.webhook_finished = None
self.webhook_update = None
self.webhook_auth = None
self.stdin_num = 0

def process_chain_to_process_list(self, process_chain):
if not process_chain:
Expand Down Expand Up @@ -614,6 +622,7 @@ def _create_module_process(self, module_descr):
if self.message_logger:
self.message_logger.info(str(module_descr))
params = []
param_stdin_funcs = {}

if "id" not in module_descr:
raise AsyncProcessError(
Expand All @@ -640,12 +649,12 @@ def _create_module_process(self, module_descr):

if "inputs" in module_descr:
self._add_grass_module_input_parameter_to_list(
module_descr, params, id
module_descr, params, param_stdin_funcs, id
)

if "outputs" in module_descr:
self._add_grass_module_output_parameter_to_list(
module_descr, params, id, module_name
module_descr, params, param_stdin_funcs, id, module_name
)

if "flags" in module_descr:
Expand Down Expand Up @@ -688,6 +697,7 @@ def _create_module_process(self, module_descr):
executable=module_name,
executable_params=params,
stdin_source=stdin_func,
param_stdin_sources=param_stdin_funcs,
id=id,
)

Expand All @@ -714,16 +724,63 @@ def _create_stdin_process(self, module_descr, id):
"The stdin option in id %s misses the ::" % str(id)
)
object_id, method = module_descr["stdin"].split("::")
if "stdout" == method is True:
stdin_func = self.process_dict[object_id].stdout
elif "stderr" == method is True:
stdin_func = self.process_dict[object_id].stderr
if "stdout" == method:
stdin_func = self.process_dict[object_id].get_stdout
elif "stderr" == method:
stdin_func = self.process_dict[object_id].get_stderr
else:
raise AsyncProcessError(
"The stdout or stderr flag in id %s is missing" % str(id)
)
return stdin_func

def _create_param_stdin_process(self, param_stdin_funcs, param_val, param):
"""Helper methods to create parameter stdin process.

Args:
module_descr (dict): The module description
param_stdin_funcs(dict): The dictionary with the stdout/stderr
functions
param_val(str): The value of parameter of the module
param(str): The parameter name of the module

Returns:
stdin_func(Process): An object of type Process that
contains the module name, the
parameter list and stdin definitions
filter(str): A string to filter stdout e.g. "max" for r.univar
"""
for mod in self.process_dict:
p_splitted = param_val.split(f"{mod}::")
p_len = len(p_splitted)
for i in range(1, p_len):
object_id = mod
method = get_param_stdin_part(p_splitted[i])
rest_str = p_splitted[i].replace(method, "")
filter = None
if rest_str.startswith("::"):
filter = get_param_stdin_part(rest_str)

if "stdout" == method:
stdin_func = self.process_dict[object_id].get_stdout
elif "stderr" == method:
stdin_func = self.process_dict[object_id].get_stderr
else:
raise AsyncProcessError(
f"The stdout or stderr flag in id {id} is missing"
)
func_str = f"{object_id}::{method}"
func_name = f"PARAM_STDIN_FUNC_{self.stdin_num}"
if filter:
func_str += f"::{filter}"
param += f"::{filter}"

param_stdin_funcs[self.stdin_num] = stdin_func
param_val = param_val.replace(func_str, func_name)
self.stdin_num += 1

return param_val

def _create_exec_process(self, module_descr):
"""Analyse a grass process description dict and create a Process
that is used to execute a common Linux binary.
Expand Down Expand Up @@ -770,7 +827,6 @@ def _create_exec_process(self, module_descr):
executable = module_descr["exe"]

params = []

if "params" in module_descr:
for search_string in module_descr["params"]:
# Search for file identifiers and generate the temporary file
Expand Down Expand Up @@ -1166,7 +1222,7 @@ def _create_exec_process_legacy(self, id, module_descr):
return p

def _add_grass_module_input_parameter_to_list(
self, module_descr, params, id
self, module_descr, params, param_stdin_funcs, id
):
"""Helper method to set the input parameters of a grass module and add
them to the params list.
Expand All @@ -1176,6 +1232,8 @@ def _add_grass_module_input_parameter_to_list(
params (list): The list of the grass module inputs parameters with
param=value entries (here the input parameter are
added)
param_stdin_funcs (dict): The dictonary with the stdin parameter
functions
id (str): The id of this process in the process chain
"""
if isinstance(module_descr["inputs"], list) is False:
Expand Down Expand Up @@ -1210,6 +1268,14 @@ def _add_grass_module_input_parameter_to_list(
file_id
] = self.generate_temp_file_path()
param = "%s=%s" % (param, self.temporary_pc_files[file_id])
elif "::" in value and value.split("::")[1] in [
"stdout",
"stderr",
]:
param_val = self._create_param_stdin_process(
param_stdin_funcs, value, param
)
param = f"{param}={param_val}"
else:
param = "%s=%s" % (param, value)
# Check for mapset in input name and append it,
Expand Down Expand Up @@ -1255,7 +1321,7 @@ def _add_grass_module_input_parameter_to_list(
params.append(param)

def _add_grass_module_output_parameter_to_list(
self, module_descr, params, id, module_name
self, module_descr, params, param_stdin_funcs, id, module_name
):
"""Helper method to set the output parameters of a grass module and add
them to the params list. If export is in the output parameter the
Expand All @@ -1266,6 +1332,8 @@ def _add_grass_module_output_parameter_to_list(
params (list): The list of the grass module parameters with
param=value entries (here the output parameter are
added)
param_stdin_funcs (dict): The dictonary with the stdin parameter
functions
id (str): The id of this process in the process chain
module_name (str): The name of the grass module

Expand Down Expand Up @@ -1341,6 +1409,15 @@ def _add_grass_module_output_parameter_to_list(
file_id,
output["export"]["format"].lower(),
)
elif "::" in value and value.split("::")[1] in [
"stdout",
"stderr",
]:
id = module_descr["id"]
param_val = self._create_param_stdin_process(
param_stdin_funcs, value, param
)
param = f"{param}={param_val}"
else:
param = "%s=%s" % (param, value)
params.append(param)
Expand Down
4 changes: 4 additions & 0 deletions src/actinia_core/core/common/process_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(
executable,
executable_params,
stdin_source=None,
param_stdin_sources=None,
skip_permission_check=False,
id=None,
):
Expand All @@ -58,6 +59,8 @@ def __init__(
for the executable
stdin_source (str): The get_stdout or get_stderr method of a
Process
param_stdin_sources (dict): The get_stdout or get_stderr methods of
a process parameter
skip_permission_check(boolean): Skip permission check for the
module or executable, this is
meaningful for internal process
Expand All @@ -75,6 +78,7 @@ def __init__(

self.executable_params = executable_params
self.stdin_source = stdin_source
self.param_stdin_sources = param_stdin_sources
self.stdout = None
self.stderr = None
self.skip_permission_check = skip_permission_check
Expand Down
6 changes: 5 additions & 1 deletion src/actinia_core/models/process_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,11 @@ class Executable(Schema):
"params": {
"type": "array",
"items": {"type": "string"},
"description": "A list of input parameters of a GRASS GIS module.",
"description": "A list of input parameters of a GRASS GIS module."
"By setting module_id::stdout(::filter) the stdout of another "
"module can be used as input for the current module. E.g. "
"'r_univar_module_id::stdout::max' can be used to do a rescaling "
"of a raster.",
},
"stdin": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# performance processing of geographical data that uses GRASS GIS for
# computational tasks. For details, see https://actinia.mundialis.de/
#
# Copyright (c) 2016-2023 Sören Gebbert and mundialis GmbH & Co. KG
# Copyright (c) 2016-2024 Sören Gebbert and mundialis GmbH & Co. KG
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -47,7 +47,10 @@
from actinia_core.core.redis_lock import RedisLockingInterface
from actinia_core.core.resources_logger import ResourceLogger
from actinia_core.core.mapset_merge_utils import change_mapsetname
from actinia_core.core.common.process_chain import ProcessChainConverter
from actinia_core.core.common.process_chain import (
get_param_stdin_part,
ProcessChainConverter,
)
from actinia_core.core.common.exceptions import (
AsyncProcessError,
AsyncProcessTermination,
Expand All @@ -71,7 +74,7 @@
__license__ = "GPLv3"
__author__ = "Sören Gebbert, Anika Weinmann, Lina Krisztian"
__copyright__ = (
"Copyright 2016-2023, Sören Gebbert and mundialis GmbH & Co. KG"
"Copyright 2016-2024, Sören Gebbert and mundialis GmbH & Co. KG"
)
__maintainer__ = "mundialis GmbH & Co. KG"

Expand Down Expand Up @@ -1685,6 +1688,42 @@ def _run_executable(self, process, poll_time=0.005):
)
stdin_file = None

if process.param_stdin_sources:
for num, func in process.param_stdin_sources.items():
func_name = f"PARAM_STDIN_FUNC_{num}"
for i in range(len(process.executable_params)):
param = process.executable_params[i]
if func_name in param:
par, val = param.split("=", 1)
par_val = func().strip()
val_splitted = val.split(func_name)
for j in range(1, len(val_splitted)):
filtered_par_value = par_val
filtered_func_name = func_name
# filter stdout/stderr
if "::" in val_splitted[j]:
filter = get_param_stdin_part(
val_splitted[j][2:]
)
if "=" not in par_val:
raise AsyncProcessError(
"Error while running executable "
f"<{process.executable}>: <{filter}> "
"cannot be selected. Maybe you have to "
"set the '-g' flag for the stdout/stderr "
"module."
)
filtered_par_value = {
x.split("=")[0]: x.split("=")[1]
for x in par_val.split()
}[filter]
filtered_func_name += f"::{filter}"
process.executable_params[
i
] = process.executable_params[i].replace(
filtered_func_name, filtered_par_value
)

if process.stdin_source is not None:
tmp_file = self.proc_chain_converter.generate_temp_file_path()
stdin_file = open(tmp_file, "w")
Expand Down
Loading