Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTCONDOR-2688 python-htcondor2 #619

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions contrib/bdii/htcondor-ce-provider
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ from datetime import datetime
import subprocess
from collections import defaultdict, namedtuple
import signal
import htcondor
import classad as ca
import htcondor2 as htcondor
import classad2 as ca

SERVICE_LDIF = """dn: GLUE2ServiceID={central_manager},GLUE2GroupID=resource,o=glue
GLUE2ServiceID: {central_manager}
Expand Down
2 changes: 1 addition & 1 deletion contrib/bosco/bosco-cluster-remote-hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys

try:
import classad
import classad2 as classad
except ImportError:
sys.exit("ERROR: Could not load HTCondor Python bindings. "
"Ensure the 'htcondor' and 'classad' are in PYTHONPATH")
Expand Down
2 changes: 1 addition & 1 deletion src/collector_to_agis
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import optparse

if 'CONDOR_CONFIG' not in os.environ:
os.environ['CONDOR_CONFIG'] = '/etc/condor-ce/condor_config'
import htcondor
import htcondor2 as htcondor

import htcondorce.agis_compat

Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_host_network_check
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import fnmatch
import optparse

os.environ.setdefault('CONDOR_CONFIG', '/etc/condor-ce/condor_config')
import htcondor
import htcondor2 as htcondor
from htcondorce.tools import to_str

from socket import AF_INET, AF_INET6, inet_ntop
Expand Down
4 changes: 2 additions & 2 deletions src/condor_ce_info_status
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import optparse

os.environ.setdefault("CONDOR_CONFIG", "/etc/condor-ce/condor_config")

import classad
import htcondor
import classad2 as classad
import htcondor2 as htcondor

import htcondorce.info_query

Expand Down
6 changes: 3 additions & 3 deletions src/condor_ce_jobmetrics
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def _newstat():
return {"Running": 0, "Idle": 0, "Held": 0, "Jobs": 0}

def process_one_schedd(ad):
import htcondor
import classad
import htcondor2 as htcondor
import classad2 as classad
ad = classad.ClassAd(ad)
schedd = htcondor.Schedd(ad)
status_map = {1: "Idle", 2: "Running", 5: "Held"}
Expand All @@ -52,7 +52,7 @@ def process_one_schedd(ad):
gpu_job_count = collections.defaultdict(_newstat)
print(f"Processing CE {ad.get('Name', 'Unknown')}.", file=sys.stderr)
try:
for job in schedd.xquery("RoutedJob is UNDEFINED", ["JobStatus", 'x509UserProxyVOName',
for job in schedd.query("RoutedJob is UNDEFINED", ["JobStatus", 'x509UserProxyVOName',
'x509UserProxyFirstFQAN', 'x509userproxysubject', 'RequestGPUs']):
dn = job.get("x509userproxysubject") or 'Unknown'
vo = job.get('x509UserProxyVOName') or 'Unknown'
Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_register
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import socket
import subprocess
import sys

import htcondor
import htcondor2 as htcondor

DEFAULT_COLLECTOR_PORT="9619"

Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_router_defaults
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import re
import pwd
import classad
import classad2 as classad
from collections import OrderedDict

JOB_ROUTER_CONFIG = r"""JOB_ROUTER_TRANSFORM_Env @=jrt
Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_scitoken_exchange
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import time

os.environ.setdefault("CONDOR_CONFIG", "/etc/condor-ce/condor_config")

import htcondor
import htcondor2 as htcondor
import htcondorce.tools as ce


Expand Down
143 changes: 34 additions & 109 deletions src/condor_ce_trace
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import traceback

os.environ.setdefault("CONDOR_CONFIG", "/etc/condor-ce/condor_config")

import classad
import htcondor
import classad2 as classad
import htcondor2 as htcondor
import htcondorce.tools as ce

G_DEBUG = False
Expand All @@ -39,14 +39,10 @@ def verify_matching_condor_versions():
'Please ensure that you only have one version of HTCondor installed.')


def run_ping(address, daemon='SCHEDD'):
def run_ping(collector_name, schedd_name):
print("Testing HTCondor-CE authorization...")

if not address.startswith("<"):
address = "<%s>" % address
if daemon == "SCHEDD":
args = ["condor_ce_ping", "-addr", str(address), "-verbose", "-type", daemon, "-debug", "WRITE"]
else:
args = ["condor_ce_ping", "-addr", str(address), "-verbose", "-type", daemon, "-debug", "READ"]
args = ["condor_ce_ping", "-pool", collector_name, "-name", schedd_name, "-verbose", "-debug", "WRITE"]

try:
rc, stdout, _ = ce.run_command(args)
Expand All @@ -60,13 +56,13 @@ def run_ping(address, daemon='SCHEDD'):

if rc < 0:
raise ce.CondorRunException("Failed to ping %s; condor_ping terminated with signal %d." \
% (address, -rc))
% (schedd_name, -rc))
elif rc > 0:
if re.search('Failed to connect', stdout):
raise ce.CondorRunException("Failed to ping %s: Please contact the site's system adminstrator to " \
"ensure that the CE you're trying to contact is functional." % address)
"ensure that the CE you're trying to contact is functional." % schedd_name)
else:
message = "Failed to ping %s; authorization check exited with code %d." % (address, rc)
message = "Failed to ping %s; authorization check exited with code %d." % (schedd_name, rc)
if not G_DEBUG:
message = message + " Re-run the command with '-d' for more verbose output."
raise ce.CondorRunException(message)
Expand All @@ -76,7 +72,9 @@ def run_ping(address, daemon='SCHEDD'):
raise ce.CondorUserException("User %s does not have permissions for %s. Please contact the CE's " \
"system administrator to ensure that your user is mapped properly " \
"in the site's authentication system."
% (unauthorized_user.group(1), address))
% (unauthorized_user.group(1), schedd_name))

print(f"Verified WRITE access for scheduler daemon {schedd_name}")

def parse_opts():

Expand Down Expand Up @@ -104,91 +102,18 @@ def parse_opts():
return opts, args


def check_authz(collector_ad, schedd_ad):
print("Testing HTCondor-CE authorization...")
ping_args = [(collector_ad, 'READ', 'collector'),
(schedd_ad, 'WRITE', 'scheduler')]

for daemon_ad, cmd, dtype in ping_args:
addr = daemon_ad['MyAddress']
try:
ping_ad = htcondor.SecMan().ping(daemon_ad, cmd)
except RuntimeError:
# if python binding ping fails to authz, it raises an exception and doesn't tell us anything useful
# actually run condor_ping for troubleshooting information
msg = 'ERROR: %s access failed for %s daemon at %s.' % (cmd, dtype, addr)
if not G_DEBUG:
msg += " Re-run with '--debug' for more information."
else:
cmd = ['condor_ping', '-addr', addr, '-verbose', cmd]
_, stdout, _ = ce.run_command(cmd)
msg += '\n' + stdout
raise ce.CondorRunException(msg)
else:
print(f"Verified {cmd} access for {dtype} daemon at {addr}")
if G_DEBUG:
print("*"*5, "Ping response ClassAd", "*"*5, ping_ad)
print("*"*20)


def job_proxy_info(proxy_path):
"""Given the path to a proxy, return a dict of x509 ClassAd attrs describing the proxy
"""
results = {}
subj_rc, subj_stdout, _ = ce.run_command(["voms-proxy-info", "-file", proxy_path, "-subject"])
if subj_rc != 0:
raise ce.CondorUserException("Cannot parse proxy at %s." % proxy_path)
subj_stdout = subj_stdout.strip()
results['x509userproxysubject'] = subj_stdout
results['x509UserProxyFQAN'] = subj_stdout

time_rc, time_stdout, _ = ce.run_command(["voms-proxy-info", "-file", proxy_path, "-timeleft"])
if time_rc != 0:
raise ce.CondorUserException("Cannot parse proxy at %s." % proxy_path)
time_stdout = int(time_stdout)
if time_stdout <= 0:
print("WARNING: available proxy appears to be expired")
results['x509UserProxyExpiration'] = int(time.time()) + int(time_stdout)

vo_rc, vo_stdout, _ = ce.run_command(["voms-proxy-info", "-file", proxy_path, "-vo"])
if vo_rc == 0:
results['x509UserProxyVOName'] = vo_stdout.strip()

fqan_rc, fqan_stdout, _ = ce.run_command(["voms-proxy-info", "-file", proxy_path, "-fqan"])
if fqan_rc == 0:
fqan_lines = [i.strip() for i in fqan_stdout.strip().split('\n')]
results['x509UserProxyFirstFQAN'] = fqan_lines[0]
results['x509UserProxyFQAN'] += "," + ",".join(fqan_lines)

return results


def set_classad_value_type(value):
if value.lower() == 'true':
return True
elif value.lower() == 'false':
return False
elif re.match(r'\d+\.\d+$', value):
return float(value)
elif re.match(r'\d+$', value):
return int(value)

return value


def setup_user_creds():
"""Return a dict of token/X.509 attributes that are necessary for remote submission to a schedd
"""
results = {}
try:
results['SciTokensFile'] = ce.bearer_token_path()
results['scitokens_file'] = ce.bearer_token_path()
except OSError:
pass

try:
proxy = ce.x509_user_proxy_path()
results['x509userproxy'] = proxy
results.update(job_proxy_info(proxy))
except OSError:
pass

Expand All @@ -200,24 +125,24 @@ def setup_user_creds():

def check_job_submit(job_info, schedd_ad, setup_creds=True):

job_ad = classad.ClassAd()
job_ad["Cmd"] = '/usr/bin/env'
job_ad["Args"] = ''
job_ad["TransferExecutable"] = False
job_ad['Out'] = job_info['stdout_file']
job_ad['Err'] = job_info['stderr_file']
job_ad['Log'] = job_info['log_file']
job_ad['LeaveJobInQueue'] = classad.ExprTree("( StageOutFinish > 0 ) =!= true")
sub = {
"universe": "vanilla",
"executable": "/usr/bin/env",
"transfer_executable": "false",
"output": job_info['stdout_file'],
"error": job_info['stderr_file'],
"log": job_info['log_file'],
"leave_in_queue": "( StageOutFinish > 0 ) =!= true",
}
for attr in job_info['attribute']:
key, value = attr.split('=', 1)
# Accept submit format '+AttributeName'
job_ad[key.lstrip('+').strip()] = set_classad_value_type(value.strip())
sub.update({key, value})

if setup_creds:
job_ad.update(setup_user_creds())
sub.update(setup_user_creds())

if G_DEBUG:
print(f"Job ad, pre-submit:\n{job_ad}")
print(f"Job submit description:\n{sub}")

try:
schedd = htcondor.Schedd(schedd_ad)
Expand All @@ -226,18 +151,20 @@ def check_job_submit(job_info, schedd_ad, setup_creds=True):
print(f"Submitting job to schedd {schedd_ad['MyAddress']}")
ad_results = []
try:
cluster = schedd.submit(job_ad, 1, True, ad_results)
result = schedd.submit(htcondor.Submit(sub), 1, True)
except RuntimeError as exc:
raise ce.CondorRunException("- Failed to submit job to %s due to the following error:\n%s" \
% (schedd_ad['Machine'], exc))

cluster = result.cluster()
print(f"- Successful submission; cluster ID {cluster}")

print(f"Resulting job ad: {ad_results[0]}")
if G_DEBUG:
print(f"Resulting job ad: {result.clusterad()}")

print(f"Spooling cluster {cluster} files to schedd {schedd_ad['MyAddress']}")
try:
schedd.spool(ad_results)
schedd.spool(result)
except RuntimeError as exc:
raise ce.CondorRunException(f"- Failed to spool files to {schedd_ad['Machine']} due to the following error:\n{exc}")

Expand Down Expand Up @@ -321,19 +248,17 @@ def main():

try:
coll = htcondor.Collector(collector_hostname)
coll_ad = coll.locate(htcondor.DaemonTypes.Collector)
except IOError:
schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd, job_info['schedd_name'])
except Exception:
raise ce.CondorRunException("ERROR: Could not contact CE collector at '%s'. " % collector_hostname + \
"Verify that the Collector daemon is up with `condor_ce_status -any`.")

try:
schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd, job_info['schedd_name'])
except ValueError:
raise ce.CondorRunException('ERROR: Could not find CE schedd at %s.\n' % job_info['schedd_name'] + \
if schedd_ad is None:
raise ce.CondorRunException('ERROR: Could not find CE schedd %s.\n' % job_info['schedd_name'] + \
'Verify that the Scheduler daemon is up with `condor_ce_status -any`.')

os.environ.setdefault('_condor_SEC_CLIENT_AUTHENTICATION_METHODS', 'SCITOKENS,SSL,FS')
check_authz(coll_ad, schedd_ad)
run_ping(collector_hostname, job_info['schedd_name'])
try:
job_info.update(ce.generate_job_files())
check_job_submit(job_info, schedd_ad, setup_creds=not opts.skip_scitokens)
Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ os.environ.setdefault('CONDOR_CONFIG', '/etc/condor-ce/condor_config')
import gunicorn.app.base
import gunicorn.glogging

import htcondor
import htcondor2 as htcondor
import htcondorce.web

ALIVE_HEARTBEAT = 60
Expand Down
2 changes: 1 addition & 1 deletion src/gratia_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

os.environ['CONDOR_CONFIG'] = '/etc/condor-ce/condor_config'

import htcondor
import htcondor2 as htcondor

GRATIA_DIR = htcondor.param['PER_JOB_HISTORY_DIR']
HISTORY_FILES = [os.path.join(GRATIA_DIR, x) for x in os.listdir(GRATIA_DIR)]
Expand Down
4 changes: 2 additions & 2 deletions src/htcondorce/info_query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import classad
import htcondor
import classad2 as classad
import htcondor2 as htcondor

import logging
_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion src/htcondorce/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def check_token_path(path, suffix=''):
# Punt on this one until we address HTCONDOR-634

try:
# 2. BEARER_TOKEN_PATH containing the path to the token
# 2. BEARER_TOKEN_FILE containing the path to the token
path = check_token_path(os.environ['BEARER_TOKEN_FILE'])
except (KeyError, FileNotFoundError):
try:
Expand Down
2 changes: 1 addition & 1 deletion src/htcondorce/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from jinja2 import Environment, FileSystemLoader, select_autoescape

import classad
import classad2 as classad
htcondor = None

import htcondorce.rrd
Expand Down
2 changes: 1 addition & 1 deletion src/htcondorce/web_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import socket
import traceback

import classad
import classad2 as classad
htcondor = None

def check_htcondor():
Expand Down
4 changes: 2 additions & 2 deletions src/verify_ce_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def find_malformed_entries(entries_config):

# Verify that the HTCondor Python bindings are in the PYTHONPATH
try:
import classad
import htcondor
import classad2 as classad
import htcondor2 as htcondor
except ImportError:
error("Could not load HTCondor Python bindings. "
+ "Please ensure that the 'htcondor' and 'classad' are in your PYTHONPATH")
Expand Down
Loading