Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get sandbox in schedd #8645

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
90570d7
test14
novicecpp Aug 9, 2024
03fa08c
test14 (client)
novicecpp Aug 9, 2024
1044f49
WIP: purge dry run
novicecpp Aug 9, 2024
f52f2f4
sandbox at schedd instead
novicecpp Aug 9, 2024
3f3c9a4
Revert "WIP: purge dry run"
novicecpp Aug 9, 2024
ea75f6b
fix logger
novicecpp Aug 9, 2024
3a2490e
move logger function inside getSandbox
novicecpp Aug 9, 2024
419366e
fix getSandBox function
novicecpp Aug 9, 2024
7df7f6e
make downloadFromS3 to be sure that file is not broken
novicecpp Aug 9, 2024
039bb0d
merge into single rest call for task info
novicecpp Aug 9, 2024
c830e31
more print and beautify
novicecpp Aug 9, 2024
3d7039d
unbuffer?
novicecpp Aug 9, 2024
4d5ebd0
some comment
novicecpp Aug 9, 2024
ea48412
revert downloadFromS3 back to master (/dev/null => /dev/null_tmp)
novicecpp Aug 9, 2024
d86d45b
make donwload sandbox robust at adjustsites instead
novicecpp Aug 9, 2024
7bc3d0f
disable submit --dryrun
novicecpp Aug 9, 2024
1d5a356
fix wording
novicecpp Aug 9, 2024
00d950f
fix wording2
novicecpp Aug 9, 2024
82756cc
heck if sandbox exist
novicecpp Aug 9, 2024
99c7496
debug
novicecpp Aug 9, 2024
8ca7082
S3HeadObeject
novicecpp Aug 9, 2024
47eb7b4
fix sandboxName
novicecpp Aug 9, 2024
93aaf61
test header range with wget
novicecpp Aug 9, 2024
b9ccf4f
rename
novicecpp Aug 9, 2024
1dd5824
break
novicecpp Aug 9, 2024
1f40a30
check s3 object
novicecpp Aug 9, 2024
963a904
try to run inside bash
novicecpp Aug 9, 2024
02fb98c
Revert "break"
novicecpp Aug 9, 2024
325d8b0
update comment
novicecpp Aug 9, 2024
34894d3
comment a bit
novicecpp Aug 9, 2024
6345a6e
can get presigned with head_object api
novicecpp Aug 9, 2024
57840ae
explicit transfer sandbox.tar.gz
novicecpp Aug 9, 2024
954b0d3
remove pdb
novicecpp Aug 9, 2024
e51ee37
sandbox
novicecpp Aug 12, 2024
990baab
Revert "explicit transfer sandbox.tar.gz"
novicecpp Aug 12, 2024
001727e
also upload inputFiles.tar.gz to s3
novicecpp Aug 14, 2024
73bd9ec
stop dryrun
novicecpp Aug 15, 2024
340178d
add comment
novicecpp Aug 15, 2024
1c8ad91
dryrun action (only set status to UPLOADED)
novicecpp Aug 15, 2024
42a4b91
fix import
novicecpp Aug 15, 2024
c7a22d9
enable dryrun bacck
novicecpp Aug 15, 2024
2d44321
fix logic
novicecpp Aug 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cicd/gitlab/env/test14
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
KUBECONTEXT=cmsweb-test14
Environment=crab-dev-tw07
REST_Instance=test14
CRABClient_version=GH
2 changes: 1 addition & 1 deletion cicd/gitlab/parseEnv.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
TAG="${1}"

# validate tag
REGEX_DEV_TAG='^pypi-(preprod|test2|test11|test12|test1)-.*'
REGEX_DEV_TAG='^pypi-(preprod|test2|test11|test12|test1|test14)-.*'
REGEX_RELEASE_TAG='^v3\.[0-9]{6}.*'
if [[ $TAG =~ $REGEX_DEV_TAG ]]; then # Do not quote regexp variable here
IFS='-' read -ra TMPSTR <<< "${TAG}"
Expand Down
114 changes: 90 additions & 24 deletions scripts/AdjustSites.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
""" This script is called by dag_bootstrap_startup.sh when the job is (re)submitted and:
"""
This script is called by dag_bootstrap_startup.sh when the job is (re)submitted.
It is not only do adjusting sites (blacklist/whitelist) but also:
- It downloads sandbox from S3 (if not exist).
- It creates the webdir if necessary
- It updates both the webdir ant the proxied version of it on the REST task db
- For resubmission: adjust the exit codes of the PJ in the RunJobs.dag.nodes.log files and
Expand All @@ -14,6 +17,7 @@
import time
import glob
import shutil
import logging
from urllib.parse import urlencode
import traceback
from datetime import datetime
Expand All @@ -23,15 +27,29 @@
import htcondor

from RESTInteractions import CRABRest
from ServerUtilities import getProxiedWebDir, getColumn

from ServerUtilities import getProxiedWebDir, getColumn, downloadFromS3

def printLog(msg):
""" Utility function to print the timestamp in the log. Can be replaced
with anything (e.g.: logging.info if we decided to set up a logger here)
"""
print("%s: %s" % (datetime.utcnow(), msg))

def setupStreamLogger():
"""
Setup logger object with stream handler. Needed by `downloadFromS3()`.

:returns: stream logger object
:rtype: logging.Logger
"""
logHandler = logging.StreamHandler()
logFormatter = logging.Formatter(
"%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s")
logHandler.setFormatter(logFormatter)
logger = logging.getLogger('AdjustSites') # hardcode
logger.addHandler(logHandler)
logger.setLevel(logging.DEBUG)
return logger

def adjustPostScriptExitStatus(resubmitJobIds, filename):
"""
Expand Down Expand Up @@ -362,28 +380,24 @@ def setupLog():
os.close(logfd)


def checkTaskInfo(crabserver, ad):
def checkTaskInfo(taskDict, ad):
"""
Function checks that given task is registered in the database with status SUBMITTED and with the
same clusterId and schedd name in the database as in the condor ads where it is currently running.
In case above condition is not met, script immediately terminates
Function checks that given task is registered in the database with status
SUBMITTED and with the same clusterId and schedd name in the database as in
the condor ads where it is currently running.
In case above condition is not met, script immediately terminates.

:param taskDict: task info return from REST.
:type taskDict: dict
:param ad: kv classad
:type ad: dict
belforte marked this conversation as resolved.
Show resolved Hide resolved
"""

task = ad['CRAB_ReqName']
clusterIdOnSchedd = ad['ClusterId']
data = {'subresource': 'search', 'workflow': task}

try:
dictresult, _, _ = crabserver.get(api='task', data=data)
except HTTPException as hte:
printLog(traceback.format_exc())
printLog(hte.headers)
printLog(hte.result)
sys.exit(2)

taskStatusOnDB = getColumn(dictresult, 'tm_task_status')
clusteridOnDB = getColumn(dictresult, 'clusterid')
scheddOnDB = getColumn(dictresult, 'tm_schedd')
taskStatusOnDB = getColumn(taskDict, 'tm_task_status')
clusteridOnDB = getColumn(taskDict, 'clusterid')
scheddOnDB = getColumn(taskDict, 'tm_schedd')

scheddName = os.environ['schedd_name']

Expand All @@ -395,6 +409,44 @@ def checkTaskInfo(crabserver, ad):
sys.exit(3)


def getSandbox(taskDict, crabserver):
"""
Getting user sandbox (sandbox.tar.gz) from S3. It will not redownload
sandbox if file exists.

This function contains side effect where sandbox.tar.gz(_tmp) are created in
current directory.

:param taskDict: task info return from REST.
:type taskDict: dict
:param crabserver: CRABRest object to talk with RESTCache API
:type crabserver: RESTInteractions.CRABRest
"""
sandboxTarBall = 'sandbox.tar.gz'
sandboxTarBallTmp = sandboxTarBall + '_tmp'
if os.path.exists(sandboxTarBall):
printLog('sandbox.tar.gz already exist. Do nothing.')
return

# init logger require by downloadFromS3
logger = setupStreamLogger()

# get info
username = getColumn(taskDict, 'tm_username')
sandboxName = getColumn(taskDict, 'tm_user_sandbox')

# download
try:
downloadFromS3(crabserver=crabserver, objecttype='sandbox', username=username,
tarballname=sandboxName, filepath=sandboxTarBallTmp, logger=logger)
shutil.move(sandboxTarBallTmp, sandboxTarBall)
except Exception as ex:
logger.exception("The CRAB server backend could not download the input sandbox with your code " + \
"from S3.\nThis could be a temporary glitch; please try to submit a new task later " + \
"(resubmit will not work) and contact the experts if the error persists.\nError reason: %s" % str(ex))
sys.exit(4)


def main():
"""
Need a doc string here.
Expand All @@ -409,8 +461,7 @@ def main():

with open(os.environ['_CONDOR_JOB_AD']) as fd:
ad = classad.parseOne(fd)
printLog("Parsed ad: %s" % ad)

printLog("Parsed ad: %s\n" % ad)

# instantiate a server object to talk with crabserver
host = ad['CRAB_RestHost']
Expand All @@ -419,8 +470,24 @@ def main():
crabserver = CRABRest(host, cert, cert, retry=3, userAgent='CRABSchedd')
crabserver.setDbInstance(dbInstance)

printLog("Sleeping 60 seconds to give TW time to update taskDB")
time.sleep(60) # give TW time to update taskDB #8411
checkTaskInfo(crabserver, ad)

# get task info
task = ad['CRAB_ReqName']
data = {'subresource': 'search', 'workflow': task}
try:
dictresult, _, _ = crabserver.get(api='task', data=data)
except HTTPException as hte:
printLog(traceback.format_exc())
printLog(hte.headers)
printLog(hte.result)
sys.exit(2)

# check task status
checkTaskInfo(taskDict=dictresult, ad=ad)
# get sandbox
getSandbox(taskDict=dictresult, crabserver=crabserver)

# is this the first time this script runs for this task ? (it runs at each resubmit as well !)
if not os.path.exists('WEB_DIR'):
Expand Down Expand Up @@ -503,4 +570,3 @@ def main():

if __name__ == '__main__':
main()

9 changes: 7 additions & 2 deletions scripts/dag_bootstrap_startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ cp $_CONDOR_JOB_AD ./_CONDOR_JOB_AD
if [ -e AdjustSites.py ]; then
export schedd_name=`condor_config_val schedd_name`
echo "Execute AdjustSites.py ..."
python3 AdjustSites.py
# need unbffered otherwise it will got weird looking log.
PYTHONUNBUFFERED=1 python3 AdjustSites.py
ret=$?
if [ $ret -eq 1 ]; then
echo "Error: AdjustSites.py failed to update the webdir." >&2
Expand All @@ -158,11 +159,15 @@ if [ -e AdjustSites.py ]; then
elif [ $ret -eq 2 ]; then
echo "Error: Cannot get data from REST Interface" >&2
condor_qedit $CONDOR_ID DagmanHoldReason "'Cannot get data from REST Interface.'"
exit 1
exit 1
elif [ $ret -eq 3 ]; then
echo "Error: this dagman does not match task information in TASKS DB" >&2
condor_qedit $CONDOR_ID DagmanHoldReason "'This dagman does not match task information in TASKS DB'"
exit 1
elif [ $ret -eq 4 ]; then
echo "Error: Failed to get user sandbox from S3." >&2
condor_qedit $CONDOR_ID DagmanHoldReason "'Failed to get user sandbox from S3.'"
exit 1
fi
else
echo "Error: AdjustSites.py does not exist." >&2
Expand Down
10 changes: 7 additions & 3 deletions src/python/CRABInterface/RESTCache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

# CRABServer dependecies here
from CRABInterface.RESTExtensions import authz_login_valid, authz_operator
from CRABInterface.Regexps import RX_SUBRES_CACHE, RX_CACHE_OBJECTTYPE, RX_TASKNAME, RX_USERNAME, RX_TARBALLNAME
from CRABInterface.Regexps import (RX_SUBRES_CACHE, RX_CACHE_OBJECTTYPE, RX_TASKNAME,
RX_USERNAME, RX_TARBALLNAME, RX_PRESIGNED_CLIENT_METHOD)
from ServerUtilities import getUsernameFromTaskname, MeasureTime


Expand Down Expand Up @@ -99,9 +100,10 @@ def validate(self, apiobj, method, api, param, safe):
validate_str('taskname', param, safe, RX_TASKNAME, optional=True)
validate_str('username', param, safe, RX_USERNAME, optional=True)
validate_str('tarballname', param, safe, RX_TARBALLNAME, optional=True)
validate_str('clientmethod', param, safe, RX_PRESIGNED_CLIENT_METHOD, optional=True)

@restcall
def get(self, subresource, objecttype, taskname, username, tarballname): # pylint: disable=redefined-builtin
def get(self, subresource, objecttype, taskname, username, tarballname, clientmethod): # pylint: disable=redefined-builtin
"""
:arg str subresource: the specific information to be accessed;
"""
Expand Down Expand Up @@ -165,14 +167,16 @@ def get(self, subresource, objecttype, taskname, username, tarballname): # pyli
authz_operator(username=ownerName, group='crab3', role='operator')
if subresource == 'sandbox' and not username:
raise MissingParameter("username is missing")
if not clientmethod:
clientmethod = 'get_object'
# returns a PreSignedUrl to download the file within the expiration time
expiration = 60 * 60 # 1 hour default is good for retries and debugging
if objecttype in ['debugfiles', 'clientlog', 'twlog']:
expiration = 60*60 * 24 * 30 # for logs make url valid as long as we keep files (1 month)
try:
with MeasureTime(self.logger, modulename=__name__, label="get.download.generate_presigned_post") as _:
response = self.s3_client.generate_presigned_url(
'get_object', Params={'Bucket': self.s3_bucket, 'Key': s3_objectKey},
clientmethod, Params={'Bucket': self.s3_bucket, 'Key': s3_objectKey},
ExpiresIn=expiration)
preSignedUrl = response
except ClientError as e:
Expand Down
1 change: 1 addition & 0 deletions src/python/CRABInterface/Regexps.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
#subresources of Cache resource
RX_SUBRES_CACHE = re.compile(r"^(upload|download|retrieve|list|used)$")
RX_CACHE_OBJECTTYPE = re.compile(r"^(clientlog|twlog|sandbox|debugfiles|runtimefiles)$")
RX_PRESIGNED_CLIENT_METHOD = re.compile(r"^(get_object|head_object)$")

#worker workflow
RX_WORKER_NAME = re.compile(r"^[A-Za-z0-9\-\._%]{1,100}$")
Expand Down
45 changes: 44 additions & 1 deletion src/python/ServerUtilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
'test6': {'restHost': 'cmsweb-test6.cern.ch', 'dbInstance': 'dev'},
'test11': {'restHost': 'cmsweb-test11.cern.ch', 'dbInstance': 'devtwo'},
'test12': {'restHost': 'cmsweb-test12.cern.ch', 'dbInstance': 'devthree'},
'test14': {'restHost': 'cmsweb-test14.cern.ch', 'dbInstance': 'tseethon'},
'stefanovm': {'restHost': 'stefanovm.cern.ch', 'dbInstance': 'dev'},
'stefanovm2': {'restHost': 'stefanovm2.cern.ch', 'dbInstance': 'dev'},
'other': {'restHost': None, 'dbInstance': None},
Expand Down Expand Up @@ -750,6 +751,45 @@ def downloadFromS3(crabserver=None, filepath=None, objecttype=None, taskname=Non
tarballname=tarballname, logger=logger)
downloadFromS3ViaPSU(filepath=filepath, preSignedUrl=preSignedUrl, logger=logger)

def checkS3Object(crabserver=None, objecttype=None, username=None, tarballname=None,
logger=None):
"""
Check if file exist in S3. Raise exception if wget is exit with non-zero.
Usually, you will see stderr with http response `404 Not Found` if file does not exists.
Note that presigned url from GetObject API could not use by HeadObject API.
Use 'head -c1000' to fetch few bytes instead, and need to wrap inside bash
belforte marked this conversation as resolved.
Show resolved Hide resolved
with `set -o pipefail` to make it early exit, so exit code from wget can
propagate back to Popen properly.

:param crabserver: CRABRest object, points to CRAB Server to use
:type crabserver: RESTInteractions.CRABRest
:param objecttype: the kind of object to dowbload: clientlog|twlog|sandbox|debugfiles|runtimefiles
:type objecttype: str
:param username: the username this sandbox belongs to, in case objecttype=sandbox
:type username: str
:param tarballname: for sandbox, taskname is not used but tarballname is needed
:type tarballname: str

:return: None, but raise exception if wget is exit with non-zero.
"""
preSignedUrl = getDownloadUrlFromS3(crabserver=crabserver, objecttype=objecttype,
username=username, tarballname=tarballname,
clientmethod='head_object',
logger=logger)
downloadCommand = ''
if os.getenv('CRAB_useGoCurl'):
raise NotImplementedError('HEAD with gocurl is not implemented')
downloadCommand += ' wget -Sq -O /dev/null --method=HEAD'
downloadCommand += ' "%s"' % preSignedUrl

with subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as downloadProcess:
logger.debug("Will execute:\n%s", downloadCommand)
stdout, stderr = downloadProcess.communicate()
exitcode = downloadProcess.returncode
logger.debug('exitcode: %s\nstdout: %s', exitcode, stdout)

if exitcode != 0:
raise Exception('Download command %s failed. stderr is:\n%s' % (downloadCommand, stderr))

def retrieveFromS3(crabserver=None, objecttype=None, taskname=None,
username=None, tarballname=None, logger=None):
Expand Down Expand Up @@ -827,7 +867,8 @@ def uploadToS3(crabserver=None, filepath=None, objecttype=None, taskname=None,


def getDownloadUrlFromS3(crabserver=None, objecttype=None, taskname=None,
username=None, tarballname=None, logger=None):
username=None, tarballname=None, clientmethod=None,
logger=None):
"""
obtains a PreSigned URL to access an existing object in S3
:param crabserver: a RESTInteraction/CRABRest object : points to CRAB Server to use
Expand All @@ -845,6 +886,8 @@ def getDownloadUrlFromS3(crabserver=None, objecttype=None, taskname=None,
dataDict['username'] = username
if tarballname:
dataDict['tarballname'] = tarballname
if clientmethod:
dataDict['clientmethod'] = clientmethod
data = encodeRequest(dataDict)
try:
# calls to restServer alway return a 3-ple ({'result':a-list}, HTTPcode, HTTPreason)
Expand Down
Loading