Skip to content

Commit

Permalink
abort dag_bootstrap if task runs on different schedd (fix #6151)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddaina committed Jan 6, 2021
1 parent 0fd1a99 commit a66ecd8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
40 changes: 39 additions & 1 deletion scripts/AdjustSites.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from httplib import HTTPException

from RESTInteractions import HTTPRequests
from ServerUtilities import getProxiedWebDir
from ServerUtilities import getProxiedWebDir, getColumn


def printLog(msg):
Expand Down Expand Up @@ -343,6 +343,43 @@ def setupLog():
os.close(logfd)


def checkTaskInfo(ad):
"""
Function checks that given task is registered in the database with status SUBMITTED and with the
same clusterId and schedd name in the database as in the condor ads where it is currently running.
In case above condition is not met, script immediately terminates
"""

task = ad['CRAB_ReqName']
host = ad['CRAB_RestHost']
uri = ad['CRAB_RestURInoAPI'] + '/task'
cert = ad['X509UserProxy']
clusterIdOnSchedd = ad['ClusterId']
data = {'subresource': 'search', 'workflow': task}

try:
server = HTTPRequests(host, cert, cert, retry=3)
dictresult, _, _ = server.get(uri, data=data)
except HTTPException as hte:
printLog(traceback.format_exc())
printLog(hte.headers)
printLog(hte.result)
sys.exit(2)

taskStatusOnDB = getColumn(dictresult, 'tm_task_status')
clusteridOnDB = getColumn(dictresult, 'clusterid')
scheddOnDB = getColumn(dictresult, 'tm_schedd')

scheddName = os.environ['schedd_name']

printLog("Task status on DB: %s, clusterID on DB: %s, scheddy name on DB: %s; \nclusterID on condor ads: %s, scheddy name on condor ads: %s "
% (taskStatusOnDB, clusteridOnDB, scheddOnDB, clusterIdOnSchedd, scheddName))

if not (taskStatusOnDB == 'SUBMITTED' and scheddOnDB == scheddName and clusteridOnDB == str(clusterIdOnSchedd)):
printLog('Exiting AdjustSites because task already runs or is not submitted.')
sys.exit(3)


def main():
"""
Need a doc string here.
Expand All @@ -359,6 +396,7 @@ def main():
ad = classad.parseOne(fd)
printLog("Parsed ad: %s" % ad)

checkTaskInfo(ad)
makeWebDir(ad)

printLog("Webdir has been set up. Uploading the webdir URL to the REST")
Expand Down
9 changes: 9 additions & 0 deletions scripts/dag_bootstrap_startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,22 @@ fi

# Recalculate the black / whitelist
if [ -e AdjustSites.py ]; then
export schedd_name=`condor_config_val schedd_name`
echo "Execute AdjustSites.py ..."
python AdjustSites.py
ret=$?
if [ $ret -eq 1 ]; then
echo "Error: AdjustSites.py failed to update the webdir." >&2
condor_qedit $CONDOR_ID DagmanHoldReason "'AdjustSites.py failed to update the webdir.'"
exit 1
elif [ $ret -eq 2 ]; then
echo "Error: Cannot get data from REST Interface" >&2
condor_qedit $CONDOR_ID DagmanHoldReason "'Cannot get data from REST Interface.'"
exit 1
elif [ $ret -eq 3 ]; then
echo "Error: task already runs or is not submitted yet." >&2
condor_qedit $CONDOR_ID DagmanHoldReason "'Task already runs or is not submitted yet.'"
exit 1
fi
else
echo "Error: AdjustSites.py does not exist." >&2
Expand Down

0 comments on commit a66ecd8

Please sign in to comment.