Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.9.0.17 #149

Merged
merged 17 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.8.2.8
3.9.0.17
9 changes: 6 additions & 3 deletions pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ def main() -> int:
https_setup(args, get_pilot_version())
args.amq = None

# update the OIDC token if necessary
update_local_oidc_token_info(args.url, args.port)

# let the server know that the worker has started
if args.update_server and args.workerpilotstatusupdate:
send_worker_status(
Expand Down Expand Up @@ -149,6 +146,12 @@ def main() -> int:
logger.fatal(error)
return error.get_error_code()

# update the OIDC token if necessary (done after queuedata has been downloaded, since PQ.catchall can contain an instruction to prevent token renewal)
if 'no_token_renewal' in infosys.queuedata.catchall:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)

# handle special CRIC variables via params
# internet protocol versions 'IPv4' or 'IPv6' can be set via CRIC PQ.params.internet_protocol_version
# (must be defined per PQ if wanted). The pilot default is IPv6
Expand Down
4 changes: 3 additions & 1 deletion pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,8 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
entry.is_altstaged = True

logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files))
f = [entry.lfn for entry in remain_files]
job.piloterrordiags.append(f'Alternative stage-out for {f}')
client.transfer(xdata, activity, **kwargs)

except PilotException as error:
Expand Down Expand Up @@ -1087,7 +1089,7 @@ def generate_fileinfo(job: JobData) -> dict:
'surl': entry.turl
}
if entry.is_altstaged:
dat['ddmendpoint'] = entry.ddmendpoint
dat['endpoint'] = entry.ddmendpoint

fileinfo[entry.lfn] = dat

Expand Down
3 changes: 2 additions & 1 deletion pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3226,7 +3226,8 @@ def send_heartbeat_if_time(job: Any, args: Any, update_time: float) -> int:
# job.completed will anyway be checked in https::send_update()
if job.serverstate not in {'finished', 'failed'}:
logger.info(f'will send heartbeat for job in \'{job.state}\' state')
send_state(job, args, job.state)
logger.info("note: will only send \'running\' state to server to prevent sending any final state too early")
send_state(job, args, "running")
update_time = time.time()
else:
logger.debug('will not send any job update')
Expand Down
5 changes: 4 additions & 1 deletion pilot/control/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
if tokendownloadchecktime:
if int(time.time() - last_token_check) > tokendownloadchecktime:
last_token_check = time.time()
update_local_oidc_token_info(args.url, args.port)
if 'no_token_renewal' in queuedata.catchall:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)

# abort if the kill signal arrived too long ago, i.e. the loop is stuck
if args.kill_time and int(time.time()) - args.kill_time > MAX_KILL_WAIT_TIME:
Expand Down
6 changes: 3 additions & 3 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@

# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '8' # build number should be reset to '1' for every new development cycle
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '17' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
221 changes: 219 additions & 2 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#
# Authors:
# - Paul Nilsson, [email protected], 2018-23
# - Paul Nilsson, [email protected], 2018-24

"""Functions for executing commands."""

Expand All @@ -31,6 +31,7 @@
import threading

from os import environ, getcwd, getpgid, kill #, setpgrp, getpgid #setsid
from queue import Queue
from signal import SIGTERM, SIGKILL
from time import sleep
from typing import Any, TextIO
Expand All @@ -46,7 +47,223 @@
execute_lock = threading.Lock()


def execute(executable: Any, **kwargs: dict) -> Any:
def execute(executable: Any, **kwargs: dict) -> Any:  # noqa: C901
    """
    Execute the command with its options in the provided executable list using subprocess time-out handler.

    The function also determines whether the command should be executed within a container.

    The output of the subprocess is collected by a single reader, process.communicate(). No additional
    reader threads are attached to the pipes: two readers on the same pipe would split the output between
    them in an unpredictable order, and communicate() closes the pipes when it is done, which would make a
    concurrent readline() fail.

    Recognised kwargs (all optional): 'usecontainer' (bool), 'job' (job object), 'obscure' (str to hide in the
    log message), 'mute' (bool, do not print the command), 'timeout' (passed through get_timeout()),
    'mode' ('bash' (default) or 'python'), 'stdout' / 'stderr' (stream targets, default subprocess.PIPE),
    'cwd' (working directory, default current directory) and 'returnproc' (bool, return the Popen object
    right after the process has been created).

    :param executable: command to be executed (str or list)
    :param kwargs: kwargs (dict)
    :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
    """
    usecontainer = kwargs.get('usecontainer', False)
    job = kwargs.get('job')
    obscure = kwargs.get('obscure', '')  # if this string is set, hide it in the log message

    # convert executable to string if it is a list
    if isinstance(executable, list):
        executable = ' '.join(executable)

    # switch off pilot controlled containers for user defined containers
    if job and job.imagename != "" and "runcontainer" in executable:
        usecontainer = False
        job.usecontainer = usecontainer

    # Import user specific code if necessary (in case the command should be executed in a container)
    # Note: the container.wrapper() function must at least be declared
    if usecontainer:
        executable, diagnostics = containerise_executable(executable, **kwargs)
        if not executable:
            # NOTE(review): a 3-tuple is returned also when returnproc is set (first element is then None);
            # the parentheses only make the existing operator precedence explicit - confirm callers expect this
            return (None if kwargs.get('returnproc', False) else -1), "", diagnostics

    if not kwargs.get('mute', False):
        print_executable(executable, obscure=obscure)

    # always use a timeout to prevent stdout buffer problem in nodes with lots of cores
    timeout = get_timeout(kwargs.get('timeout', None))

    exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

    exit_code = 0
    stdout = ''
    stderr = ''

    # Acquire the lock before creating the subprocess
    with execute_lock:
        process = subprocess.Popen(exe,
                                   bufsize=-1,
                                   stdout=kwargs.get('stdout', subprocess.PIPE),
                                   stderr=kwargs.get('stderr', subprocess.PIPE),
                                   cwd=kwargs.get('cwd', getcwd()),
                                   preexec_fn=os.setsid,  # setpgrp
                                   encoding='utf-8',
                                   errors='replace')
        if kwargs.get('returnproc', False):
            return process

    try:
        logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
        # communicate() drains stdout and stderr concurrently, so a full pipe buffer cannot block the child
        stdout, stderr = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        # stdout/stderr still hold their initial empty strings here since the assignment above never happened
        stderr += f'subprocess communicate sent TimeoutExpired: {exc}'
        logger.warning(stderr)
        exit_code = errors.COMMANDTIMEDOUT
        stderr = kill_all(process, stderr)
        stdout, stderr = _collect_remaining_output(process, stdout, stderr)
    except Exception as exc:
        logger.warning(f'exception caused when executing command: {executable}: {exc}')
        exit_code = errors.UNKNOWNEXCEPTION
        stderr = kill_all(process, str(exc))
        stdout, stderr = _collect_remaining_output(process, stdout, stderr)
    else:
        exit_code = process.poll()

    # wait for the process to finish
    # (not strictly necessary when process.communicate() is used)
    try:
        # wait for the process to complete with a timeout of 60 seconds
        process.wait(timeout=60)
    except subprocess.TimeoutExpired:
        # Handle the case where the process did not complete within the timeout
        logger.warning("process did not complete within the timeout of 60s - terminating")
        process.terminate()

    # remove any added \n
    if stdout and stdout.endswith('\n'):
        stdout = stdout[:-1]

    return exit_code, stdout, stderr


def _collect_remaining_output(process: Any, stdout: str, stderr: str, timeout: int = 10) -> tuple:
    """
    Append the output that the subprocess produced before it was killed (best effort).

    Retrying communicate() after a time-out does not lose any output already read from the pipes, so this
    recovers the partial stdout/stderr of the killed command. The call is bounded by a time-out of its own
    since descendants of the command may still hold the pipes open.

    :param process: subprocess.Popen object whose process has been killed
    :param stdout: stdout collected so far (str)
    :param stderr: stderr/diagnostics collected so far (str)
    :param timeout: maximum time to wait for the remaining output in seconds (int)
    :return: updated stdout (str), updated stderr (str) (tuple).
    """
    try:
        out, err = process.communicate(timeout=timeout)
    except Exception as exc:  # best effort only - never let output recovery mask the original failure
        logger.warning(f'failed to collect remaining output from subprocess: {exc}')
        return stdout, stderr

    # communicate() returns None for a stream that was not piped
    return (stdout or '') + (out or ''), (stderr or '') + (err or '')


def execute_old2(executable: Any, **kwargs: dict) -> Any:  # noqa: C901
    """
    Execute the command using process.wait() and two reader threads that drain stdout and stderr.

    Presumably a superseded variant of execute(), judging by the name. Unlike execute(), the 'stdout' and
    'stderr' kwargs are not used: both streams are always piped and read line by line by dedicated threads,
    while the main thread only waits for the process to exit.

    If the command times out or fails, the diagnostics from the time-out handling and kill_all() are kept at
    the beginning of the returned stderr, followed by whatever the command itself wrote to stderr.

    :param executable: command to be executed (str or list)
    :param kwargs: kwargs (dict)
    :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
    """
    usecontainer = kwargs.get('usecontainer', False)
    job = kwargs.get('job')
    obscure = kwargs.get('obscure', '')  # if this string is set, hide it in the log message

    # convert executable to string if it is a list
    if isinstance(executable, list):
        executable = ' '.join(executable)

    # switch off pilot controlled containers for user defined containers
    if job and job.imagename != "" and "runcontainer" in executable:
        usecontainer = False
        job.usecontainer = usecontainer

    if usecontainer:
        executable, diagnostics = containerise_executable(executable, **kwargs)
        if not executable:
            # NOTE(review): a 3-tuple is returned also when returnproc is set (first element is then None);
            # the parentheses only make the existing operator precedence explicit - confirm callers expect this
            return (None if kwargs.get('returnproc', False) else -1), "", diagnostics

    if not kwargs.get('mute', False):
        print_executable(executable, obscure=obscure)

    timeout = get_timeout(kwargs.get('timeout', None))
    exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

    exit_code = 0
    stderr = ''

    def read_output(pipe, output_list):
        # readline() returns '' at end-of-file, which ends the iteration
        for line in iter(pipe.readline, ''):
            output_list.append(line)
        pipe.close()

    # Acquire the lock before creating the subprocess
    with execute_lock:
        process = subprocess.Popen(exe,
                                   bufsize=-1,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   cwd=kwargs.get('cwd', getcwd()),
                                   preexec_fn=os.setsid,
                                   encoding='utf-8',
                                   errors='replace')
        if kwargs.get('returnproc', False):
            return process

    stdout_lines = []
    stderr_lines = []

    stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_lines))
    stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_lines))

    stdout_thread.start()
    stderr_thread.start()

    try:
        logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
        # only wait here; the reader threads are the sole consumers of the pipes
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        stderr += f'subprocess communicate sent TimeoutExpired: {exc}'
        logger.warning(stderr)
        exit_code = errors.COMMANDTIMEDOUT
        stderr = kill_all(process, stderr)
    except Exception as exc:
        logger.warning(f'exception caused when executing command: {executable}: {exc}')
        exit_code = errors.UNKNOWNEXCEPTION
        stderr = kill_all(process, str(exc))
    else:
        exit_code = process.poll()

    # the threads finish once the pipes reach end-of-file
    stdout_thread.join()
    stderr_thread.join()

    stdout = ''.join(stdout_lines)
    # append the captured stream to any time-out/kill diagnostics instead of overwriting them
    stderr = (stderr or '') + ''.join(stderr_lines)

    try:
        # wait for the process to complete with a timeout of 60 seconds
        process.wait(timeout=60)
    except subprocess.TimeoutExpired:
        logger.warning("process did not complete within the timeout of 60s - terminating")
        process.terminate()

    # remove any added \n
    if stdout and stdout.endswith('\n'):
        stdout = stdout[:-1]

    return exit_code, stdout, stderr


def execute_old(executable: Any, **kwargs: dict) -> Any:
"""
Execute the command with its options in the provided executable list using subprocess time-out handler.

Expand Down
9 changes: 5 additions & 4 deletions pilot/util/https.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ def request2(url: str = "",
# Send the request securely
try:
logger.debug('sending data to server')
with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response:
with urllib.request.urlopen(req, context=ssl_context, timeout=config.Pilot.http_maxtime) as response:
# Handle the response here
logger.debug(f"response.status={response.status}, response.reason={response.reason}")
ret = response.read().decode('utf-8')
Expand All @@ -870,13 +870,14 @@ def request2(url: str = "",
ret = ""
else:
if secure and isinstance(ret, str):
if ret.startswith('{') and ret.endswith('}'):
if ret == 'Succeeded': # this happens for sending modeOn (debug mode)
ret = {'StatusCode': '0'}
elif ret.startswith('{') and ret.endswith('}'):
try:
ret = json.loads(ret)
except json.JSONDecodeError as e:
logger.warning(f'failed to parse response: {e}')
else:
# For panda server interactions, the response should be in dictionary format
else: # response="StatusCode=_some number_"
# Parse the query string into a dictionary
query_dict = parse_qs(ret)

Expand Down
Loading
Loading