diff --git a/PILOTVERSION b/PILOTVERSION
index e88f7664..bbafcf0b 100644
--- a/PILOTVERSION
+++ b/PILOTVERSION
@@ -1 +1 @@
-3.8.2.8
\ No newline at end of file
+3.9.0.17
\ No newline at end of file
diff --git a/pilot.py b/pilot.py
index bb1bd797..5405cdb9 100755
--- a/pilot.py
+++ b/pilot.py
@@ -118,9 +118,6 @@ def main() -> int:
     https_setup(args, get_pilot_version())
     args.amq = None
 
-    # update the OIDC token if necessary
-    update_local_oidc_token_info(args.url, args.port)
-
     # let the server know that the worker has started
     if args.update_server and args.workerpilotstatusupdate:
         send_worker_status(
@@ -149,6 +146,12 @@ def main() -> int:
         logger.fatal(error)
         return error.get_error_code()
 
+    # update the OIDC token if necessary (after queuedata has been downloaded, since PQ.catchall can contain an instruction to prevent token renewal)
+    if 'no_token_renewal' in infosys.queuedata.catchall:
+        logger.info("OIDC token will not be renewed by the pilot")
+    else:
+        update_local_oidc_token_info(args.url, args.port)
+
     # handle special CRIC variables via params
     # internet protocol versions 'IPv4' or 'IPv6' can be set via CRIC PQ.params.internet_protocol_version
     # (must be defined per PQ if wanted). The pilot default is IPv6
diff --git a/pilot/control/data.py b/pilot/control/data.py
index 28db36a3..c3d1af73 100644
--- a/pilot/control/data.py
+++ b/pilot/control/data.py
@@ -938,6 +938,8 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
                     entry.is_altstaged = True
 
                 logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files))
+                f = [entry.lfn for entry in remain_files]
+                job.piloterrordiags.append(f'Alternative stage-out for {f}')
 
             client.transfer(xdata, activity, **kwargs)
         except PilotException as error:
@@ -1087,7 +1089,7 @@ def generate_fileinfo(job: JobData) -> dict:
             'surl': entry.turl
         }
         if entry.is_altstaged:
-            dat['ddmendpoint'] = entry.ddmendpoint
+            dat['endpoint'] = entry.ddmendpoint
 
         fileinfo[entry.lfn] = dat
 
diff --git a/pilot/control/job.py b/pilot/control/job.py
index f9b80c25..d9b4d1fa 100644
--- a/pilot/control/job.py
+++ b/pilot/control/job.py
@@ -3226,7 +3226,8 @@ def send_heartbeat_if_time(job: Any, args: Any, update_time: float) -> int:
         # job.completed will anyway be checked in https::send_update()
         if job.serverstate not in {'finished', 'failed'}:
             logger.info(f'will send heartbeat for job in \'{job.state}\' state')
-            send_state(job, args, job.state)
+            logger.info("note: will only send \'running\' state to server to prevent sending any final state too early")
+            send_state(job, args, "running")
             update_time = time.time()
         else:
             logger.debug('will not send any job update')
diff --git a/pilot/control/monitor.py b/pilot/control/monitor.py
index 24e056de..f2110c18 100644
--- a/pilot/control/monitor.py
+++ b/pilot/control/monitor.py
@@ -106,7 +106,10 @@ def control(queues: namedtuple, traces: Any, args: object):  # noqa: C901
            if tokendownloadchecktime:
                if int(time.time() - last_token_check) > tokendownloadchecktime:
                    last_token_check = time.time()
-                   update_local_oidc_token_info(args.url, args.port)
+                   if 'no_token_renewal' in queuedata.catchall:
+                       logger.info("OIDC token will not be renewed by the pilot")
+                   else:
+                       update_local_oidc_token_info(args.url, args.port)
 
            # abort if kill signal arrived too long time ago, ie loop is stuck
            if args.kill_time and int(time.time()) - args.kill_time > MAX_KILL_WAIT_TIME:
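Both pilot.py and monitor.py above now gate OIDC token renewal on the same 'no_token_renewal' flag in the queue's catchall field. A minimal sketch of that shared check, assuming a string-typed catchall as in the diff; the helper name is hypothetical and not part of the patch:

def token_renewal_allowed(catchall: str) -> bool:
    """Return False when the PQ catchall field carries the no_token_renewal instruction (hypothetical helper)."""
    return 'no_token_renewal' not in (catchall or '')

# usage mirroring the two call sites above (illustrative only)
# if token_renewal_allowed(infosys.queuedata.catchall):
#     update_local_oidc_token_info(args.url, args.port)
# else:
#     logger.info("OIDC token will not be renewed by the pilot")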
diff --git a/pilot/util/constants.py b/pilot/util/constants.py
index e497600c..98d66af2 100644
--- a/pilot/util/constants.py
+++ b/pilot/util/constants.py
@@ -26,9 +26,9 @@
 
 # Pilot version
 RELEASE = '3'  # released number should be fixed at 3 for Pilot 3
-VERSION = '8'  # version number is '1' for first release, '0' until then, increased for bigger updates
-REVISION = '2'  # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '8'  # build number should be reset to '1' for every new development cycle
+VERSION = '9'  # version number is '1' for first release, '0' until then, increased for bigger updates
+REVISION = '0'  # revision number should be reset to '0' for every new version release, increased for small updates
+BUILD = '17'  # build number should be reset to '1' for every new development cycle
 
 SUCCESS = 0
 FAILURE = 1
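The four constants above evidently combine into the 3.9.0.17 string written to PILOTVERSION. A sketch of that composition, assuming a simple dotted join; the pilot_version() helper name is hypothetical (pilot.py obtains the real value via get_pilot_version()):

RELEASE, VERSION, REVISION, BUILD = '3', '9', '0', '17'


def pilot_version() -> str:
    """Compose the dotted pilot version string, e.g. '3.9.0.17' (illustrative helper)."""
    return '.'.join((RELEASE, VERSION, REVISION, BUILD))


assert pilot_version() == '3.9.0.17'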
diff --git a/pilot/util/container.py b/pilot/util/container.py
index 5243eca6..74c109b9 100644
--- a/pilot/util/container.py
+++ b/pilot/util/container.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 # Authors:
-# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23
+# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24
 
 """Functions for executing commands."""
 
@@ -31,6 +31,7 @@
 import threading
 
 from os import environ, getcwd, getpgid, kill  #, setpgrp, getpgid #setsid
+from queue import Queue
 from signal import SIGTERM, SIGKILL
 from time import sleep
 from typing import Any, TextIO
@@ -46,7 +47,223 @@
 
 execute_lock = threading.Lock()
 
 
-def execute(executable: Any, **kwargs: dict) -> Any:
+def execute(executable: Any, **kwargs: dict) -> Any:  # noqa: C901
+    """
+    Execute the command with its options in the provided executable list using a subprocess time-out handler.
+
+    The function also determines whether the command should be executed within a container.
+
+    :param executable: command to be executed (str or list)
+    :param kwargs: kwargs (dict)
+    :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
+    """
+    usecontainer = kwargs.get('usecontainer', False)
+    job = kwargs.get('job')
+    #shell = kwargs.get("shell", False)
+    obscure = kwargs.get('obscure', '')  # if this string is set, hide it in the log message
+
+    # convert executable to string if it is a list
+    if isinstance(executable, list):
+        executable = ' '.join(executable)
+
+    # switch off pilot controlled containers for user defined containers
+    if job and job.imagename != "" and "runcontainer" in executable:
+        usecontainer = False
+        job.usecontainer = usecontainer
+
+    # Import user specific code if necessary (in case the command should be executed in a container)
+    # Note: the container.wrapper() function must at least be declared
+    if usecontainer:
+        executable, diagnostics = containerise_executable(executable, **kwargs)
+        if not executable:
+            return None if kwargs.get('returnproc', False) else -1, "", diagnostics
+
+    if not kwargs.get('mute', False):
+        print_executable(executable, obscure=obscure)
+
+    # always use a timeout to prevent stdout buffer problem in nodes with lots of cores
+    timeout = get_timeout(kwargs.get('timeout', None))
+
+    exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]
+
+    # try: intercept exception such as OSError -> report e.g. error.RESOURCEUNAVAILABLE: "Resource temporarily unavailable"
+    exit_code = 0
+    stdout = ''
+    stderr = ''
+
+    # Acquire the lock before creating the subprocess
+    process = None
+    with execute_lock:
+        process = subprocess.Popen(exe,
+                                   bufsize=-1,
+                                   stdout=kwargs.get('stdout', subprocess.PIPE),
+                                   stderr=kwargs.get('stderr', subprocess.PIPE),
+                                   cwd=kwargs.get('cwd', getcwd()),
+                                   preexec_fn=os.setsid,  # setpgrp
+                                   encoding='utf-8',
+                                   errors='replace')
+        if kwargs.get('returnproc', False):
+            return process
+
+    # Create threads to read stdout and stderr asynchronously
+    stdout_queue = Queue()
+    stderr_queue = Queue()
+
+    def read_output(stream, queue):
+        while True:
+            try:
+                line = stream.readline()
+            except AttributeError:
+                # Handle the case where stream is None
+                break
+
+            if not line:
+                break
+
+            queue.put(line)
+
+    stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
+    stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))
+
+    stdout_thread.start()
+    stderr_thread.start()
+
+    try:
+        logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
+        stdout, stderr = process.communicate(timeout=timeout)
+    except subprocess.TimeoutExpired as exc:
+        # make sure that stdout buffer gets flushed - in case of time-out exceptions
+        # flush_handler(name="stream_handler")
+        stderr += f'subprocess communicate sent TimeoutExpired: {exc}'
+        logger.warning(stderr)
+        exit_code = errors.COMMANDTIMEDOUT
+        stderr = kill_all(process, stderr)
+    except Exception as exc:
+        logger.warning(f'exception caught when executing command: {executable}: {exc}')
+        exit_code = errors.UNKNOWNEXCEPTION
+        stderr = kill_all(process, str(exc))
+    else:
+        exit_code = process.poll()
+
+    # Wait for the threads to finish reading
+    stdout_thread.join()
+    stderr_thread.join()
+
+    # Read the remaining output from the queues
+    while not stdout_queue.empty():
+        stdout += stdout_queue.get()
+    while not stderr_queue.empty():
+        stderr += stderr_queue.get()
+
+    # wait for the process to finish
+    # (not strictly necessary when process.communicate() is used)
+    try:
+        # wait for the process to complete with a timeout of 60 seconds
+        if process:
+            process.wait(timeout=60)
+    except subprocess.TimeoutExpired:
+        # Handle the case where the process did not complete within the timeout
+        if process:
+            logger.warning("process did not complete within the timeout of 60s - terminating")
+            process.terminate()
+
+    # remove any added \n
+    if stdout and stdout.endswith('\n'):
+        stdout = stdout[:-1]
+
+    return exit_code, stdout, stderr
+
+
+def execute_old2(executable: Any, **kwargs: dict) -> Any:  # noqa: C901
+    usecontainer = kwargs.get('usecontainer', False)
+    job = kwargs.get('job')
+    obscure = kwargs.get('obscure', '')
+
+    if isinstance(executable, list):
+        executable = ' '.join(executable)
+
+    if job and job.imagename != "" and "runcontainer" in executable:
+        usecontainer = False
+        job.usecontainer = usecontainer
+
+    if usecontainer:
+        executable, diagnostics = containerise_executable(executable, **kwargs)
+        if not executable:
+            return None if kwargs.get('returnproc', False) else -1, "", diagnostics
+
+    if not kwargs.get('mute', False):
+        print_executable(executable, obscure=obscure)
+
+    timeout = get_timeout(kwargs.get('timeout', None))
+    exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]
+
+    exit_code = 0
+    stdout = ''
+    stderr = ''
+
+    def read_output(pipe, output_list):
+        for line in iter(pipe.readline, ''):
+            output_list.append(line)
+        pipe.close()
+
+    process = None
+    with execute_lock:
+        process = subprocess.Popen(exe,
+                                   bufsize=-1,
+                                   stdout=subprocess.PIPE,
+                                   stderr=subprocess.PIPE,
+                                   cwd=kwargs.get('cwd', getcwd()),
+                                   preexec_fn=os.setsid,
+                                   encoding='utf-8',
+                                   errors='replace')
+        if kwargs.get('returnproc', False):
+            return process
+
+    stdout_lines = []
+    stderr_lines = []
+
+    stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_lines))
+    stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_lines))
+
+    stdout_thread.start()
+    stderr_thread.start()
+
+    try:
+        logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
+        process.wait(timeout=timeout)
+    except subprocess.TimeoutExpired as exc:
+        stderr += f'subprocess communicate sent TimeoutExpired: {exc}'
+        logger.warning(stderr)
+        exit_code = errors.COMMANDTIMEDOUT
+        stderr = kill_all(process, stderr)
+    except Exception as exc:
+        logger.warning(f'exception caught when executing command: {executable}: {exc}')
+        exit_code = errors.UNKNOWNEXCEPTION
+        stderr = kill_all(process, str(exc))
+    else:
+        exit_code = process.poll()
+
+    stdout_thread.join()
+    stderr_thread.join()
+
+    stdout = ''.join(stdout_lines)
+    stderr = ''.join(stderr_lines)
+
+    try:
+        if process:
+            process.wait(timeout=60)
+    except subprocess.TimeoutExpired:
+        if process:
+            logger.warning("process did not complete within the timeout of 60s - terminating")
+            process.terminate()
+
+    if stdout and stdout.endswith('\n'):
+        stdout = stdout[:-1]
+
+    return exit_code, stdout, stderr
+
+
+def execute_old(executable: Any, **kwargs: dict) -> Any:
     """
     Execute the command with its options in the provided executable list using subprocess time-out handler.
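A usage sketch for the reworked execute(); the command and timeout values are arbitrary examples, not taken from the patch:

# blocking call: returns (exit_code, stdout, stderr); a timeout is always applied
exit_code, stdout, stderr = execute('hostname', timeout=60)
if exit_code == 0:
    print(stdout)

# with returnproc=True the Popen object is returned instead, leaving
# communicate()/wait()/kill handling entirely to the caller
process = execute('sleep 30', returnproc=True, timeout=60)

The reader threads feed per-line output into Queue objects so that the pipes keep draining even for very chatty payloads, which appears to be the motivation behind the "always use a timeout to prevent stdout buffer problem in nodes with lots of cores" comment.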
diff --git a/pilot/util/https.py b/pilot/util/https.py
index a26c75ce..70bcacc8 100644
--- a/pilot/util/https.py
+++ b/pilot/util/https.py
@@ -858,7 +858,7 @@ def request2(url: str = "",
     # Send the request securely
     try:
         logger.debug('sending data to server')
-        with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response:
+        with urllib.request.urlopen(req, context=ssl_context, timeout=config.Pilot.http_maxtime) as response:
             # Handle the response here
             logger.debug(f"response.status={response.status}, response.reason={response.reason}")
             ret = response.read().decode('utf-8')
@@ -870,13 +870,14 @@ def request2(url: str = "",
         ret = ""
     else:
         if secure and isinstance(ret, str):
-            if ret.startswith('{') and ret.endswith('}'):
+            if ret == 'Succeeded':  # this happens for sending modeOn (debug mode)
+                ret = {'StatusCode': '0'}
+            elif ret.startswith('{') and ret.endswith('}'):
                 try:
                     ret = json.loads(ret)
                 except json.JSONDecodeError as e:
                     logger.warning(f'failed to parse response: {e}')
-            else:
-                # For panda server interactions, the response should be in dictionary format
+            else:  # response="StatusCode=_some number_"
                 # Parse the query string into a dictionary
                 query_dict = parse_qs(ret)
 
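The response handling above distinguishes three server reply shapes: the literal string 'Succeeded', a JSON object, and a query string such as StatusCode=0. A standalone sketch of the same branching, using only the standard library and illustrative payloads:

import json
from urllib.parse import parse_qs


def parse_server_response(ret: str):
    """Mirror the branching in request2(): 'Succeeded', JSON, or a query string (illustrative helper)."""
    if ret == 'Succeeded':
        return {'StatusCode': '0'}
    if ret.startswith('{') and ret.endswith('}'):
        return json.loads(ret)
    return parse_qs(ret)  # e.g. 'StatusCode=0' -> {'StatusCode': ['0']}


assert parse_server_response('Succeeded') == {'StatusCode': '0'}
assert parse_server_response('{"StatusCode": "0"}') == {'StatusCode': '0'}
assert parse_server_response('StatusCode=0') == {'StatusCode': ['0']}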
+ """ + visited = set() + stack = [pid] + + while stack: + current_pid = stack.pop() + if current_pid in visited: + continue + visited.add(current_pid) + cpids.append(current_pid) + lines = grep_str([str(current_pid)], ps_cache) + + if lines and lines != ['']: + for line in lines: + try: + thispid, thisppid = [int(x) for x in line.split()[:2]] + except Exception as error: + logger.warning(f'exception caught: {error}') + else: + if thisppid == current_pid: + stack.append(thispid) + + +def find_processes_in_group_old(cpids: list, pid: int, ps_cache: str = ""): + """ + Find all processes that belong to the same group using the given ps command output. + + Recursively search for the children processes belonging to pid and return their pid's. + pid is the parent pid and cpids is a list that has to be initialized before calling this function and it contains + the pids of the children AND the parent. + + ps_cache is expected to be the output from the command "ps -eo pid,ppid -m". + + The cpids input parameter list gets updated in the function. + :param cpids: list of pid's for all child processes to the parent pid, as well as the parent pid itself (int) :param pid: parent process id (int) :param ps_cache: ps command output (str). @@ -616,7 +654,7 @@ def cleanup(job: JobData, args: object): logger.info("collected zombie processes") if job.pid: - logger.info(f"will attempt to kill all subprocesses of pid={job.pid}") + logger.info(f"will attempt to kill all subprocesses of pid={job.pid}") kill_processes(job.pid) else: logger.warning('cannot kill any subprocesses since job.pid is not set')