Commit

parent 23707a1 (#6818)
Initial changes for python3. Make it possible to run with python3 on sched.
belforte authored Oct 26, 2021
1 parent 23707a1 commit 16a275f
Showing 30 changed files with 182 additions and 161 deletions.
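
Most of the 30 files follow a handful of py2-to-py3 idioms: dict.iteritems() becomes dict.items(), urllib.urlencode moves to urllib.parse.urlencode, httplib becomes http.client, and pickle files are opened in binary mode with protocol 2 so a py2 reader can still load them. A minimal illustrative sketch of those idioms, not taken from the CRAB sources:

# Illustrative py3 idioms used throughout this commit; not actual CRAB code.
import pickle
from urllib.parse import urlencode           # py2: urllib.urlencode
from http.client import HTTPException        # py2: httplib.HTTPException

data = {'subresource': 'addwebdir', 'workflow': 'example_task'}   # invented payload
payload = urlencode(data)                    # query string for a REST upload
print(payload)                               # subresource=addwebdir&workflow=example_task

info = {'bootstrapTime': {'fromEpoch': 0}}
with open('status_cache.pkl', 'wb') as fp:   # py3 pickle needs a binary file object
    pickle.dump(info, fp, protocol=2)        # protocol=2 keeps the file readable from py2
with open('status_cache.pkl', 'rb') as fp:
    restored = pickle.load(fp)

for key, value in restored.items():          # py2: restored.iteritems()
    print(key, value)
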
41 changes: 21 additions & 20 deletions scripts/AdjustSites.py
@@ -14,10 +14,10 @@
import time
import glob
import shutil
import urllib
from urllib.parse import urlencode
import traceback
from datetime import datetime
from httplib import HTTPException
from http.client import HTTPException

import classad
import htcondor
@@ -238,23 +238,6 @@ def makeWebDir(ad):
os.symlink(os.path.abspath(os.path.join(".", ".job.ad")), os.path.join(path, "job_ad.txt"))
os.symlink(os.path.abspath(os.path.join(".", "task_process/status_cache.txt")), os.path.join(path, "status_cache"))
os.symlink(os.path.abspath(os.path.join(".", "task_process/status_cache.pkl")), os.path.join(path, "status_cache.pkl"))
# prepare a startup cache_info file with time info for client to have something useful to print
# in crab status while waiting for task_process to fill with actual jobs info. Do it in two ways
# new way: a pickle file for python3 compatibility
startInfo = {'bootstrapTime': {}}
startInfo['bootstrapTime']['date'] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
startInfo['bootstrapTime']['fromEpoch'] = int(time.time())
with open(os.path.abspath(os.path.join(".", "task_process/status_cache.pkl")), 'w') as fp:
pickle.dump(startInfo, fp)
# old way: a file with multiple lines and print-like output
startInfo = "# Task bootstrapped at " + datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC") + "\n"
startInfo += "%d\n" % (int(time.time())) # machines will like seconds from Epoch more
# prepare fake status_cache info to please current (v3.210127) CRAB Client
fakeInfo = startInfo + "{"
fakeInfo += "'DagStatus': {'SubDagStatus': {}, 'Timestamp': 0L, 'NodesTotal': 1L, 'SubDags': {}, 'DagStatus': 1L}"
fakeInfo += "}\n{}\n"
with open(os.path.abspath(os.path.join(".", "task_process/status_cache.txt")), 'w') as fd:
fd.write(fakeInfo)
os.symlink(os.path.abspath(os.path.join(".", "prejob_logs/predag.0.txt")), os.path.join(path, "AutomaticSplitting_Log0.txt"))
os.symlink(os.path.abspath(os.path.join(".", "prejob_logs/predag.0.txt")), os.path.join(path, "AutomaticSplitting/DagLog0.txt"))
os.symlink(os.path.abspath(os.path.join(".", "prejob_logs/predag.1.txt")), os.path.join(path, "AutomaticSplitting/DagLog1.txt"))
@@ -266,6 +249,24 @@ def makeWebDir(ad):
except Exception as ex: #pylint: disable=broad-except
#Should we just catch OSError and IOError? Is that enough?
printLog("Failed to copy/symlink files in the user web directory: %s" % str(ex))

# prepare a startup cache_info file with time info for client to have something useful to print
# in crab status while waiting for task_process to fill with actual jobs info. Do it in two ways
# new way: a pickle file for python3 compatibility
startInfo = {'bootstrapTime': {}}
startInfo['bootstrapTime']['date'] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
startInfo['bootstrapTime']['fromEpoch'] = int(time.time())
with open(os.path.abspath(os.path.join(".", "task_process/status_cache.pkl")), 'wb') as fp:
pickle.dump(startInfo, fp)
# old way: a file with multiple lines and print-like output
startInfo = "# Task bootstrapped at " + datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC") + "\n"
startInfo += "%d\n" % (int(time.time())) # machines will like seconds from Epoch more
# prepare fake status_cache info to please current (v3.210127) CRAB Client
fakeInfo = startInfo + "{"
fakeInfo += "'DagStatus': {'SubDagStatus': {}, 'Timestamp': 0L, 'NodesTotal': 1L, 'SubDags': {}, 'DagStatus': 1L}"
fakeInfo += "}\n{}\n"
with open(os.path.abspath(os.path.join(".", "task_process/status_cache.txt")), 'w') as fd:
fd.write(fakeInfo)
printLog("WEB_DIR created, sym links in place and status_cache initialized")

try:
Expand All @@ -287,7 +288,7 @@ def uploadWebDir(crabserver, ad):

try:
printLog("Uploading webdir %s to the REST" % data['webdirurl'])
crabserver.post(api='task', data=urllib.urlencode(data))
crabserver.post(api='task', data=urlencode(data))
return 0
except HTTPException as hte:
printLog(traceback.format_exc())
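
The makeWebDir() hunk above now bootstraps the status cache twice: a pickle file for the py3 task_process and a legacy text file for the v3.210127 client. A hedged sketch of how the pickle side can be read back, keeping the file name and keys from the diff and inventing the rest:

# Sketch: reading back the two bootstrap cache files written by makeWebDir() above.
# Paths and key names follow the diff; everything else is illustrative.
import pickle

with open("task_process/status_cache.pkl", "rb") as fp:   # binary mode, matching the 'wb' write
    startInfo = pickle.load(fp)
print(startInfo['bootstrapTime']['date'])        # e.g. "2021-10-26 12:00:00 UTC"
print(startInfo['bootstrapTime']['fromEpoch'])   # seconds since the Epoch, as an int

# The text file keeps py2-style long literals (0L) purely as text for the old client;
# a py3 reader should treat those lines as opaque strings (literal_eval would reject 0L).
with open("task_process/status_cache.txt") as fd:
    firstLine = fd.readline().rstrip()
print(firstLine)   # "# Task bootstrapped at <date> UTC"
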
2 changes: 1 addition & 1 deletion scripts/CMSRunAnalysis.py
@@ -761,7 +761,7 @@ def StripReport(report):
try:
oldName = 'UNKNOWN'
newName = 'UNKNOWN'
for oldName, newName in literal_eval(options.outFiles).iteritems():
for oldName, newName in literal_eval(options.outFiles).items():
os.rename(oldName, newName)
except Exception as ex:
handleException("FAILED", EC_MoveOutErr, "Exception while moving file %s to %s." %(oldName, newName))
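
options.outFiles holds the text form of an old-name to new-name mapping, so only the iteritems() call needed to change here. A tiny illustration with made-up file names:

# Illustrative only: what the rename loop above does, with made-up file names.
from ast import literal_eval

outFiles = "{'out.root': 'out_1.root', 'hist.root': 'hist_1.root'}"
for oldName, newName in literal_eval(outFiles).items():   # .iteritems() is gone in py3
    print("would rename", oldName, "->", newName)          # the real code calls os.rename()
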
20 changes: 8 additions & 12 deletions scripts/dag_bootstrap.sh
@@ -13,31 +13,28 @@ set -o pipefail
set -x
echo "Beginning dag_bootstrap.sh (stdout)"
echo "Beginning dag_bootstrap.sh (stderr)" 1>&2
export PYTHONPATH=$PYTHONPATH:/cvmfs/cms.cern.ch/rucio/current/lib/python2.7/site-packages
export PYTHONPATH=$PYTHONPATH:/cvmfs/cms.cern.ch/rucio/x86_64/slc7/py3/current/lib/python3.6/site-packages/
export PYTHONPATH=$PYTHONPATH:/data/srv/pycurl3/7.44.1
if [ "X$TASKWORKER_ENV" = "X" -a ! -e CRAB3.zip ]
then

command -v python > /dev/null
command -v python3 > /dev/null
rc=$?
if [[ $rc != 0 ]]
then
echo "Error: Python isn't available on `hostname`." >&2
echo "Error: bootstrap execution requires python" >&2
echo "Error: Python3 isn't available on `hostname`." >&2
echo "Error: bootstrap execution requires python3" >&2
exit 1
else
echo "I found python at.."
echo `which python`
echo "I found python3 at.."
echo `which python3`
fi

if [ "x$CRAB3_VERSION" = "x" ]; then
TARBALL_NAME=TaskManagerRun.tar.gz
else
TARBALL_NAME=TaskManagerRun-$CRAB3_VERSION.tar.gz
fi

if [[ "X$CRAB_TASKMANAGER_TARBALL" == "X" ]]; then
CRAB_TASKMANAGER_TARBALL="http://hcc-briantest.unl.edu/$TARBALL_NAME"
fi

if [[ "X$CRAB_TASKMANAGER_TARBALL" != "Xlocal" ]]; then
# pass, we'll just use that value
@@ -96,7 +93,6 @@ export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH

os_ver=$(source /etc/os-release;echo $VERSION_ID)
curl_path="/cvmfs/cms.cern.ch/slc${os_ver}_amd64_gcc700/external/curl/7.59.0"
libcurl_path="${curl_path}/lib"
source ${curl_path}/etc/profile.d/init.sh

export PYTHONUNBUFFERED=1
@@ -111,5 +107,5 @@ if [ "X$_CONDOR_JOB_AD" != "X" ]; then
cat $_CONDOR_JOB_AD
fi
echo "Now running the job in `pwd`..."
exec nice -n 19 python -m TaskWorker.TaskManagerBootstrap "$@"
exec nice -n 19 python3 -m TaskWorker.TaskManagerBootstrap "$@"
} 2>&1 | tee dag_bootstrap.out
23 changes: 13 additions & 10 deletions scripts/dag_bootstrap_startup.sh
@@ -14,10 +14,13 @@ done
export PATH="/usr/local/bin:/bin:/usr/bin:/usr/bin:$PATH"
export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH

os_ver=$(source /etc/os-release;echo $VERSION_ID)
curl_path="/cvmfs/cms.cern.ch/slc${os_ver}_amd64_gcc700/external/curl/7.59.0"
libcurl_path="${curl_path}/lib"
source ${curl_path}/etc/profile.d/init.sh
export PYTHONPATH=$PYTHONPATH:/data/srv/pycurl3/7.44.1
#os_ver=$(source /etc/os-release;echo $VERSION_ID)
#curl_path="/cvmfs/cms.cern.ch/slc${os_ver}_amd64_gcc700/external/curl/7.59.0"
#libcurl_path="${curl_path}/lib"
#source ${curl_path}/etc/profile.d/init.sh
source /cvmfs/cms.cern.ch/slc7_amd64_gcc900/external/curl/7.59.0/etc/profile.d/init.sh


srcname=$0
env > ${srcname%.sh}.env
@@ -85,16 +88,16 @@ fi
# Bootstrap the runtime - we want to do this before DAG is submitted
# so all the children don't try this at once.
if [ "X$TASKWORKER_ENV" = "X" -a ! -e CRAB3.zip ]; then
command -v python > /dev/null
command -v python3 > /dev/null
rc=$?
if [[ $rc != 0 ]]; then
echo "Error: Python isn't available on `hostname`." >&2
echo "Error: Bootstrap execution requires python" >&2
condor_qedit $CONDOR_ID DagmanHoldReason "'Error: Bootstrap execution requires python.'"
echo "Error: Bootstrap execution requires python3" >&2
condor_qedit $CONDOR_ID DagmanHoldReason "'Error: Bootstrap execution requires python3.'"
exit 1
else
echo "I found python at.."
echo `which python`
echo "I found python3 at.."
echo `which python3`
fi

if [[ "X$CRAB_TASKMANAGER_TARBALL" == "X" ]]; then
@@ -138,7 +141,7 @@ cp $_CONDOR_JOB_AD ./_CONDOR_JOB_AD
if [ -e AdjustSites.py ]; then
export schedd_name=`condor_config_val schedd_name`
echo "Execute AdjustSites.py ..."
python AdjustSites.py
python3 AdjustSites.py
ret=$?
if [ $ret -eq 1 ]; then
echo "Error: AdjustSites.py failed to update the webdir." >&2
2 changes: 1 addition & 1 deletion scripts/task_process/FTS_Transfers.py
@@ -10,7 +10,7 @@
import os
import subprocess
from datetime import timedelta
from httplib import HTTPException
from http.client import HTTPException

import fts3.rest.client.easy as fts3

2 changes: 1 addition & 1 deletion scripts/task_process/cache_status.py
@@ -196,7 +196,7 @@ def parseJobLog(fp, nodes, nodeMap):

def parseErrorReport(data, nodes):
#iterate over the jobs and set the error dict for those which are failed
for jobid, statedict in nodes.iteritems():
for jobid, statedict in nodes.items():
if 'State' in statedict and statedict['State'] == 'failed' and jobid in data:
# data[jobid] is a dictionary with the retry number as a key and error summary information as a value.
# Here we want to get the error summary information, and since values() returns a list
46 changes: 28 additions & 18 deletions scripts/task_process/cache_status_jel.py
@@ -13,6 +13,7 @@
import copy
from shutil import move
import pickle
import json
import htcondor
import classad

@@ -211,16 +212,18 @@ def parseJobLog(jel, nodes, nodeMap):
def parseErrorReport(data, nodes):
"""
iterate over the jobs and set the error dict for those which are failed
:param data:
:param nodes:
:return:
:param data: a dictionary as returned by summarizeFjrParseResults() : {jobid:errdict}
errdict is {crab_retry:error_summary} from PostJob/prepareErrorSummary
which writes one line per PostJob run: {job_id : {crab_retry : error_summary}}
where crab_retry is a string and error_summary a list [exitcode, errorMsg, {}]
:param nodes: a dictionary with format {jobid:statedict}
:return: nothing, modifies nodes in place
"""
for jobid, statedict in nodes.iteritems():
for jobid, statedict in nodes.items():
if 'State' in statedict and statedict['State'] == 'failed' and jobid in data:
# data[jobid] is a dictionary with the retry number as a key and error summary information as a value.
# Here we want to get the error summary information, and since values() returns a list
# (even if there's only a single value) it has to be indexed to zero.
statedict['Error'] = data[jobid].values()[0] #data[jobid] contains all retries. take the last one
# pick error info from last retry (SB: AFAICT only last retry is listed anyhow)
for key in data[jobid]:
statedict['Error'] = data[jobid][key]

def parseNodeStateV2(fp, nodes, level):
"""
@@ -323,7 +326,7 @@ def storeNodesInfoInFile():

if jobLogCheckpoint:
# resume log parsing where we left
with open((LOG_PARSING_POINTERS_DIR+jobLogCheckpoint), 'r') as f:
with open((LOG_PARSING_POINTERS_DIR+jobLogCheckpoint), 'rb') as f:
jel = pickle.load(f)
else:
# parse log from beginning
@@ -336,7 +339,7 @@ def storeNodesInfoInFile():
newJelPickleName = 'jel-%d.pkl' % int(time.time())
if not os.path.exists(LOG_PARSING_POINTERS_DIR):
os.mkdir(LOG_PARSING_POINTERS_DIR)
with open((LOG_PARSING_POINTERS_DIR+newJelPickleName), 'w') as f:
with open((LOG_PARSING_POINTERS_DIR+newJelPickleName), 'wb') as f:
pickle.dump(jel, f)
newJobLogCheckpoint = newJelPickleName

@@ -382,7 +385,7 @@ def readOldStatusCacheFile():
if os.path.exists(PKL_STATUS_CACHE_FILE) and os.stat(PKL_STATUS_CACHE_FILE).st_size > 0:
logging.debug("cache file found, opening")
try:
with open(PKL_STATUS_CACHE_FILE, "r") as fp:
with open(PKL_STATUS_CACHE_FILE, "rb") as fp:
cacheDoc = pickle.load(fp)
# protect against fake file with just bootstrapTime created by AdjustSites.py
jobLogCheckpoint = getattr(cacheDoc, 'jobLogCheckpoint', None)
@@ -421,7 +424,7 @@ def parseCondorLog(cacheDoc):
nodeMap = cacheDoc['nodeMap']
if jobLogCheckpoint:
# resume log parsing where we left
with open((LOG_PARSING_POINTERS_DIR+jobLogCheckpoint), 'r') as f:
with open((LOG_PARSING_POINTERS_DIR+jobLogCheckpoint), 'rb') as f:
jel = pickle.load(f)
else:
# parse log from beginning
@@ -432,7 +435,7 @@ def parseCondorLog(cacheDoc):
newJelPickleName = 'jel-%d.pkl' % int(time.time())
if not os.path.exists(LOG_PARSING_POINTERS_DIR):
os.mkdir(LOG_PARSING_POINTERS_DIR)
with open((LOG_PARSING_POINTERS_DIR+newJelPickleName), 'w') as f:
with open((LOG_PARSING_POINTERS_DIR+newJelPickleName), 'wb') as f:
pickle.dump(jel, f)
newJobLogCheckpoint = newJelPickleName

@@ -466,7 +469,7 @@ def storeNodesInfoInPklFile(cacheDoc):
tempFilename = (PKL_STATUS_CACHE_FILE + ".%s") % os.getpid()

# persist cache info in py2-compatible pickle format
with open(tempFilename, "w") as fp:
with open(tempFilename, "wb") as fp:
pickle.dump(cacheDoc, fp, protocol=2)
move(tempFilename, PKL_STATUS_CACHE_FILE)

@@ -501,19 +504,26 @@ def summarizeFjrParseResults(checkpoint):
Return the updated error dictionary and also the location until which the
fjr_parse_results file was read so that we can store it and
don't have to re-read the same information the next time cache_status.py runs.
SB: what this does is convert a JSON file with a list of dictionaries [{job:msg},...], which
may repeat the same jobId as key, into a single dictionary where each jobId key contains only the
last value of msg:
for d in content:
for k,v in d.items():
errDict[k] = v
'''

if os.path.exists(FJR_PARSE_RES_FILE):
with open(FJR_PARSE_RES_FILE, "r") as f:
f.seek(checkpoint)
content = f.readlines()
newCheckpoint = f.tell()

errDict = {}
for line in content:
fjrResult = ast.literal_eval(line)
jobId = fjrResult.keys()[0]
errDict[jobId] = fjrResult[jobId]
fjrResult = json.loads(line)
for jobId,msg in fjrResult.items():
errDict[jobId] = msg
return errDict, newCheckpoint
else:
return None, 0
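
Two py3 details recur in cache_status_jel.py: the job-event-log checkpoint and the status cache go through binary file objects with pickle protocol 2 (so a py2 reader can still open the cache), and summarizeFjrParseResults() now parses fjr_parse_results as one JSON dictionary per line, letting later lines overwrite earlier ones for the same jobId. A stand-alone hedged sketch of both, with invented names and payloads:

# Hedged sketch of the two idioms above; file names and payloads are invented.
import json
import pickle

# 1) checkpoint / cache pickling: binary file objects on py3, protocol=2 for py2 readers
cacheDoc = {'jobLogCheckpoint': 'jel-1635244800.pkl', 'nodes': {}, 'nodeMap': {}}
with open('status_cache.pkl.12345', 'wb') as fp:
    pickle.dump(cacheDoc, fp, protocol=2)
with open('status_cache.pkl.12345', 'rb') as fp:
    assert pickle.load(fp)['jobLogCheckpoint'] == 'jel-1635244800.pkl'

# 2) fjr_parse_results as one JSON dictionary per line; later lines win for a repeated jobId
lines = ['{"1": {"0": [50660, "RAM excess", {}]}}',
         '{"1": {"1": [0, "", {}]}}']
errDict = {}
for line in lines:
    for jobId, msg in json.loads(line).items():
        errDict[jobId] = msg
print(errDict['1'])   # {'1': [0, '', {}]} -- only the last line for jobId "1" survives
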
6 changes: 4 additions & 2 deletions scripts/task_process/task_proc_wrapper.sh
@@ -11,7 +11,7 @@ function cache_status {

function cache_status_jel {
log "Running cache_status_jel.py"
python task_process/cache_status_jel.py
python3 task_process/cache_status_jel.py
}

function manage_transfers {
@@ -101,9 +101,11 @@ TIME_OF_LAST_QUERY=$(date +"%s")
# submission is most likely pointless and relatively expensive, the script will run normally and perform the query later.
DAG_INFO="init"

export PYTHONPATH=$PYTHONPATH:/data/srv/pycurl3/7.44.1

export PYTHONPATH=`pwd`/task_process:`pwd`/CRAB3.zip:`pwd`/WMCore.zip:$PYTHONPATH

export PYTHONPATH=$PYTHONPATH:/cvmfs/cms.cern.ch/rucio/current/lib/python2.7/site-packages
export PYTHONPATH=$PYTHONPATH:/cvmfs/cms.cern.ch/rucio/x86_64/slc7/py3/current/lib/python3.6/site-packages/

log "Starting task daemon wrapper"
while true
2 changes: 1 addition & 1 deletion src/python/CRABInterface/Attrib.py
@@ -11,7 +11,7 @@ def attr(*args, **kwargs):
def wrap_ob(ob):
for name in args:
setattr(ob, name, True)
for name, value in kwargs.iteritems():
for name, value in kwargs.items():
setattr(ob, name, value)
return ob
return wrap_ob
2 changes: 1 addition & 1 deletion src/python/CRABInterface/DataFileMetadata.py
@@ -118,7 +118,7 @@ def changeState(self, **kwargs): #kwargs are (taskname, outlfn, filestate)
"""
self.logger.debug("Changing state of file %(outlfn)s in task %(taskname)s to %(filestate)s" % kwargs)

self.api.modify(self.FileMetaData.ChangeFileState_sql, **dict((k, [v]) for k, v in kwargs.iteritems()))
self.api.modify(self.FileMetaData.ChangeFileState_sql, **dict((k, [v]) for k, v in kwargs.items()))

def delete(self, taskname, hours):
""" UNUSED method that deletes record from the FILEMETADATA table
4 changes: 2 additions & 2 deletions src/python/CRABInterface/HTCondorDataWorkflow.py
@@ -586,7 +586,7 @@ def parseASOState(self, fp, nodes, statusResult):
"""
transfers = {}
data = json.load(fp)
for docid, result in data['results'].iteritems():
for docid, result in data['results'].items():
#Oracle has an improved structure in aso_status
if isCouchDBURL(self.asoDBURL):
result = result['value']
@@ -618,7 +618,7 @@ def last(joberrors):
fp.seek(0)
data = json.load(fp)
#iterate over the jobs and set the error dict for those which are failed
for jobid, statedict in nodes.iteritems():
for jobid, statedict in nodes.items():
if 'State' in statedict and statedict['State'] == 'failed' and jobid in data:
statedict['Error'] = last(data[jobid]) #data[jobid] contains all retries. take the last one

2 changes: 1 addition & 1 deletion src/python/HTCondorUtils.py
@@ -35,7 +35,7 @@ def __init__(self, outputMessage, outputObj):
self.outputMessage = outputMessage
self.outputObj = outputObj
self.environmentStr = ""
for key, val in os.environ.iteritems():
for key, val in os.environ.items():
self.environmentStr += "%s=%s\n" % (key, val)


Loading
