Turn on more CWL tests #3628

Closed
wants to merge 86 commits into from
Changes from 45 commits
Commits
86 commits
d74e71b
Turn on 3 more CWL tests I can't get to reproducibly fail
adamnovak May 20, 2021
9fe5dee
Merge remote-tracking branch 'upstream/master' into issues/3538-turn-…
adamnovak May 20, 2021
7285373
Make promises say when they are being made and fulfilled
adamnovak May 21, 2021
e273be9
Fix promise ID reporting and add job generation reporting so we can c…
adamnovak May 21, 2021
00d2d75
Fix multiline f-string
adamnovak May 21, 2021
1041d02
Stop thinking the 0th local job also needs to run on Kubernetes
adamnovak May 21, 2021
f37ac53
Fix log levels
adamnovak May 21, 2021
ffec068
Merge remote-tracking branch 'upstream/master' into issues/3538-turn-…
adamnovak May 26, 2021
e89d056
Catch in advance if the worker is trying to use a file we never imported
adamnovak May 26, 2021
5f3d7b0
Get the workflow to run through by processing the embedded tool's tool
adamnovak May 26, 2021
9300d65
Move all the imports back to the initial setup where they belong
adamnovak May 26, 2021
9ede711
Quiet debugging
adamnovak May 26, 2021
53b2587
Drop extra whitespace
adamnovak May 26, 2021
671ee57
Make error for an escaped import more decisive
adamnovak May 26, 2021
14756e0
Apply the longer conformance test timeout
adamnovak May 26, 2021
bc9d677
Catch the new Kubernetes missing config exception in decorator
adamnovak Jun 2, 2021
0be75a0
Say we have handled all CWL Process objects
adamnovak Jun 2, 2021
c04ad88
Simplify dispatch
adamnovak Jun 2, 2021
6606f8b
Recursively list directories in the input and in tools when setting u…
adamnovak Jun 2, 2021
3a1235a
Make sure to map everything in the listing and the shadow listing exa…
adamnovak Jun 2, 2021
357aad1
Add a bunch of code to try and rewrite listings to stop CWL from look…
adamnovak Jun 2, 2021
73e972d
Come up with a method to send directory contents around that at least…
adamnovak Jun 2, 2021
212aa60
Drop whitespace
adamnovak Jun 2, 2021
126d9c1
Widen typing to better match what's actually passed
adamnovak Jun 7, 2021
32c61f4
Remove some no longer useful debug prints
adamnovak Jun 7, 2021
143d712
Just use the base Process class
adamnovak Jun 7, 2021
75904fd
Don't clear listings at the ends of jobs
adamnovak Jun 8, 2021
f3a148d
Make the ToilPathMapper make directories
adamnovak Jun 8, 2021
b6eb6c1
Document what visit() is supposed to do
adamnovak Jun 8, 2021
170ccfc
Don't stage children when we stage parents, and go back to clearing l…
adamnovak Jun 8, 2021
536fd3a
Steal cwltool's work of making the _: directories
adamnovak Jun 9, 2021
856a67d
Make stage_listing actually work
adamnovak Jun 9, 2021
beb9753
Remind the output file stager that it doesn't actually have the file …
adamnovak Jun 9, 2021
216be15
Fix CWL test 87 by making sure Directory listings are in final output
adamnovak Jun 9, 2021
a1fd2bd
Reorder job execution so we can rebuild the listing earlier
adamnovak Jun 9, 2021
d0102e5
Revert "Reorder job execution so we can rebuild the listing earlier"
adamnovak Jun 9, 2021
b8f1c00
Use a single complex pass everywhere to encode directory contents wit…
adamnovak Jun 9, 2021
9c68963
Make sure listings' File and Directory objects get visited if they ex…
adamnovak Jun 9, 2021
c0dd699
Remove whitespace
adamnovak Jun 9, 2021
a94e8de
Hackily propagate CWL unsupported feature detection back to cwltoil
adamnovak Jun 10, 2021
805352c
fail_exit_code → failure_exit_code
mr-c Jun 10, 2021
30bbaf6
Merge remote-tracking branch 'upstream/master' into issues/3538-turn-…
adamnovak Jun 15, 2021
9e71644
Skip missing optional secondary files in workflow input
adamnovak Jun 15, 2021
63e2cc9
Accept optional secondary files in tool definitions
adamnovak Jun 15, 2021
8a1cd8f
Merge branch 'master' into issues/3538-turn-on-tests
adamnovak Jun 16, 2021
21d1728
Allow bypassign the file store for in place update support
adamnovak Jun 16, 2021
3c21664
Reenable file staging postprocessing even when not using the FileStor…
adamnovak Jun 16, 2021
fe4b221
Test everything with file store bypass
adamnovak Jun 16, 2021
9bbd57e
Stage output files into place from per-job output temp directories
adamnovak Jun 17, 2021
23140ba
bump cwltool version
mr-c Jun 17, 2021
215b919
Filter secondary files always but tolerate file:
adamnovak Jun 18, 2021
ca7b35e
Try adding a toil imported flag for hiding illegitimate secondary files
adamnovak Jun 18, 2021
f173ae4
Stop running all jobs as top level
adamnovak Jun 18, 2021
29e3c1b
Stop tagging files as imported
adamnovak Jun 18, 2021
f740116
Merge branch 'master' into issues/3651-allow-filestore-bypass
adamnovak Jun 18, 2021
fc47153
Document --bypass-file-store
adamnovak Jun 18, 2021
af5cd74
Merge branch 'master' into issues/3651-allow-filestore-bypass
adamnovak Jun 23, 2021
0339197
Merge branch 'master' into issues/3651-allow-filestore-bypass
mr-c Jun 23, 2021
937f461
Merge remote-tracking branch 'upstream/master' into issues/3651-allow…
adamnovak Jun 24, 2021
9155270
Use the right name for the function
adamnovak Jun 24, 2021
4dec732
Fix check sense and assert variable names
adamnovak Jun 24, 2021
f6ab2b3
Give a better message when using unsupported features.
adamnovak Jun 25, 2021
6013aa9
Merge remote-tracking branch 'upstream/master' into issues/3651-allow…
adamnovak Jun 25, 2021
6313d2d
Spell type constraint
adamnovak Jun 25, 2021
37711e6
Use toilfile: instead of toilfs:
adamnovak Jun 25, 2021
5cd5282
Call job generations versions instead
adamnovak Jun 25, 2021
6af2d1c
Improve comments
adamnovak Jun 25, 2021
1feae24
Use the One True path_to_loc
adamnovak Jun 25, 2021
21ebb0f
Use more succinct human readable name finding
adamnovak Jun 25, 2021
25918b2
Link to CWL issue about the listing specs
adamnovak Jun 25, 2021
11ab044
Fix comment I stopped writing because it was wrong
adamnovak Jun 25, 2021
052cdd3
Drop TODO that doesn't seem to break any conformance tests
adamnovak Jun 25, 2021
2f867c0
Implement toildir: for output staging
adamnovak Jun 25, 2021
513797b
Note when we think later on is
adamnovak Jun 25, 2021
cf8ba34
Break out CWL utilities and add tests for them
adamnovak Jun 26, 2021
ff2967c
Get new unit tests to pass
adamnovak Jun 26, 2021
92834e3
Revert "Use the One True path_to_loc"
adamnovak Jun 26, 2021
00e70c1
Merge remote-tracking branch 'upstream/issues/3538-turn-on-tests' int…
adamnovak Jun 26, 2021
1bc8f7c
Fix lingering toilfs scheme name
adamnovak Jul 14, 2021
00f87a4
Merge remote-tracking branch 'upstream/master' into issues/3651-allow…
adamnovak Jul 14, 2021
e15bb24
Add download_structure test
adamnovak Jul 14, 2021
1e0eeb9
Fix typo
adamnovak Jul 14, 2021
0c6e493
Pass MyPy type checking
adamnovak Jul 14, 2021
f933c49
Drop trailing whitespace
adamnovak Jul 14, 2021
a8c4e35
Import moved function
adamnovak Jul 15, 2021
2cd58c4
Merge branch 'master' into issues/3538-turn-on-tests
mr-c Jul 17, 2021
2 changes: 1 addition & 1 deletion src/toil/batchSystems/kubernetes.py
@@ -463,7 +463,7 @@ def issueBatchJob(self, jobDesc):

# Try the job as local
localID = self.handleLocalJob(jobDesc)
if localID:
if localID is not None:
# It is a local job
return localID
else:
1,100 changes: 910 additions & 190 deletions src/toil/cwl/cwltoil.py

Large diffs are not rendered by default.

45 changes: 34 additions & 11 deletions src/toil/job.py
@@ -472,6 +472,10 @@ def makeString(x):
# the logging has been captured to be reported on the leader.
self.logJobStoreFileID = None

# Every time we update a job description in place in the job store, we
# increment this.
Member:

Is this only for debugging? The purpose of this isn't clear to me.

Member Author:

I added this for debugging because I thought we might have been forgetting to save an update or running the wrong version of a job somewhere; it turned out that wasn't the case and we were giving the same version of the job to two batch systems, but I left this in because I want to use it later when poking around the leader looking for race conditions with its weird job update system.

self._generation = 0
Member:

Would _job_version be clearer? I'm particularly noticing it's printed as v{_generation} currently.

Member Author:

I'll rename this.


def serviceHostIDsInBatches(self):
"""
Get an iterator over all batches of service host job IDs that can be
@@ -635,6 +639,8 @@ def replace(self, other):
self.filesToDelete += other.filesToDelete
self.jobsToDelete += other.jobsToDelete

self._generation = other._generation

def addChild(self, childID):
"""
Make the job with the given ID a child of the described job.
@@ -814,6 +820,8 @@ def __str__(self):
if self.jobStoreID is not None:
printedName += ' ' + str(self.jobStoreID)

printedName += ' v' + str(self._generation)

return printedName

# Not usable as a key (not hashable) and doesn't have any value-equality.
@@ -823,6 +831,16 @@ def __str__(self):
def __repr__(self):
return '%s( **%r )' % (self.__class__.__name__, self.__dict__)

def pre_update_hook(self) -> None:
"""
Called by the job store before pickling and saving a created or updated
version of a job.
"""

self._generation += 1
logger.debug("New generation: %s", self)
Member:

Would Job updated: or New Job Generation/Version be clearer? New Generation might benefit from being more explicit since the jobStoreID it prints is just a garbled string most of the time.

Member Author:

Let's go with "New job version".
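The thread above settles on a per-save version counter that a pre-update hook bumps just before the job store pickles the description. A minimal sketch of that scheme, with an illustrative in-memory store standing in for Toil's real job store API:

```python
import pickle

class JobDescription:
    """Trimmed-down stand-in for Toil's JobDescription (illustrative only)."""
    def __init__(self, name: str) -> None:
        self.name = name
        self._job_version = 0  # bumped on every save, per the thread above

    def pre_update_hook(self) -> None:
        """Called by the job store just before pickling a new or updated job."""
        self._job_version += 1

    def __str__(self) -> str:
        return f'{self.name} v{self._job_version}'

class InMemoryJobStore:
    """Hypothetical store showing where the hook fires."""
    def __init__(self) -> None:
        self._blobs = {}

    def update(self, job: JobDescription) -> None:
        job.pre_update_hook()  # every persisted copy gets a fresh version
        self._blobs[job.name] = pickle.dumps(job, pickle.HIGHEST_PROTOCOL)

store = InMemoryJobStore()
job = JobDescription('example')
store.update(job)
store.update(job)
print(job)  # example v2
```

Because the hook runs on every persist path (create, update, batch), any two copies of the same job with the same version are byte-identical, which is what makes the counter useful for spotting the leader handing one version to two batch systems.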




class ServiceJobDescription(JobDescription):
"""
@@ -1448,7 +1466,8 @@ def registerPromise(self, path):
raise JobPromiseConstraintError(self)
# TODO: can we guarantee self.jobStoreID is populated and so pass that here?
with self._promiseJobStore.writeFileStream() as (fileHandle, jobStoreFileID):
promise = UnfulfilledPromiseSentinel(str(self.description), False)
promise = UnfulfilledPromiseSentinel(str(self.description), jobStoreFileID, False)
logger.debug('Issuing promise %s for result of %s', jobStoreFileID, self.description)
pickle.dump(promise, fileHandle, pickle.HIGHEST_PROTOCOL)
self._rvs[path].append(jobStoreFileID)
return self._promiseJobStore.config.jobStore, jobStoreFileID
@@ -1874,7 +1893,7 @@ def _fulfillPromises(self, returnValues, jobStore):
# either case, we just pass it on.
promisedValue = returnValues
else:
# If there is an path ...
# If there is a path ...
if isinstance(returnValues, Promise):
# ... and the value itself is a Promise, we need to created a new, narrower
# promise and pass it on.
@@ -1888,8 +1907,11 @@ def _fulfillPromises(self, returnValues, jobStore):
# File may be gone if the job is a service being re-run and the accessing job is
# already complete.
if jobStore.fileExists(promiseFileStoreID):
logger.debug("Resolve promise %s from %s with a %s", promiseFileStoreID, self, type(promisedValue))
with jobStore.updateFileStream(promiseFileStoreID) as fileHandle:
pickle.dump(promisedValue, fileHandle, pickle.HIGHEST_PROTOCOL)
else:
logger.debug("Do not resolve promise %s from %s because it is no longer needed", promiseFileStoreID, self)

# Functions associated with Job.checkJobGraphAcyclic to establish that the job graph does not
# contain any cycles of dependencies:
@@ -2381,7 +2403,6 @@ def _jobName(self):
"""
return self._description.displayName


class JobException(Exception):
"""
General job exception.
@@ -2577,7 +2598,7 @@ class EncapsulatedJob(Job):
predecessors automatically. Care should be exercised to ensure the encapsulated job has the
proper set of predecessors.

The return value of an encapsulatd job (as accessed by the :func:`toil.job.Job.rv` function)
The return value of an encapsulated job (as accessed by the :func:`toil.job.Job.rv` function)
is the return value of the root job, e.g. A().encapsulate().rv() and A().rv() will resolve to
the same value after A or A.encapsulate() has been run.
"""
@@ -2930,17 +2951,19 @@ def convertPromises(kwargs):
class UnfulfilledPromiseSentinel:
"""This should be overwritten by a proper promised value. Throws an
exception when unpickled."""
def __init__(self, fulfillingJobName, unpickled):
def __init__(self, fulfillingJobName, file_id: str, unpickled):
self.fulfillingJobName = fulfillingJobName
self.file_id = file_id

@staticmethod
def __setstate__(stateDict):
"""Only called when unpickling. This won't be unpickled unless the
promise wasn't resolved, so we throw an exception."""
jobName = stateDict['fulfillingJobName']
raise RuntimeError("This job was passed a promise that wasn't yet resolved when it "
"ran. The job {jobName} that fulfills this promise hasn't yet "
"finished. This means that there aren't enough constraints to "
"ensure the current job always runs after {jobName}. Consider adding a "
"follow-on indirection between this job and its parent, or adding "
"this job as a child/follow-on of {jobName}.".format(jobName=jobName))
file_id = stateDict['file_id']
raise RuntimeError(f"This job was passed promise {file_id} that wasn't yet resolved when it "
f"ran. The job {jobName} that fulfills this promise hasn't yet "
f"finished. This means that there aren't enough constraints to "
f"ensure the current job always runs after {jobName}. Consider adding a "
f"follow-on indirection between this job and its parent, or adding "
f"this job as a child/follow-on of {jobName}.")
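The promise machinery in this diff works by writing a placeholder that self-destructs on premature unpickling, then overwriting the same file with the real value once the fulfilling job finishes. A stripped-down sketch of the pattern, with in-memory buffers standing in for job store files:

```python
import io
import pickle

class UnfulfilledPromiseSentinel:
    """Stand-in for Toil's sentinel: raises if unpickled before fulfillment."""
    def __init__(self, fulfilling_job: str, file_id: str) -> None:
        self.fulfillingJobName = fulfilling_job
        self.file_id = file_id

    def __setstate__(self, state):
        # Only reached on unpickling, i.e. when a consumer reads the
        # promise file before _fulfillPromises overwrote it.
        raise RuntimeError(
            f"Promise {state['file_id']} from {state['fulfillingJobName']} "
            f"was read before it was fulfilled.")

# The promising job writes a sentinel into the (here: in-memory) file.
promise_file = io.BytesIO()
pickle.dump(UnfulfilledPromiseSentinel('jobA', 'file-0'), promise_file)

# Reading too early raises:
promise_file.seek(0)
try:
    pickle.load(promise_file)
except RuntimeError as e:
    print('caught:', e)

# Fulfilling overwrites the same file with the real value:
promise_file = io.BytesIO()
pickle.dump(42, promise_file)
promise_file.seek(0)
print(pickle.load(promise_file))  # 42
```

The PR's change is to store the promise's file ID inside the sentinel too, so the error message can name exactly which promise file was read too early.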
4 changes: 4 additions & 0 deletions src/toil/jobStores/abstractJobStore.py
@@ -744,6 +744,8 @@ def create(self, jobDescription):
"""
Writes the given JobDescription to the job store. The job must have an ID assigned already.

Must call jobDescription.pre_update_hook()

:return: The JobDescription passed.
:rtype: toil.job.JobDescription
"""
@@ -817,6 +819,8 @@ def update(self, jobDescription):
"""
Persists changes to the state of the given JobDescription in this store atomically.

Must call jobDescription.pre_update_hook()

:param toil.job.JobDescription job: the job to write to this job store
"""
raise NotImplementedError()
3 changes: 3 additions & 0 deletions src/toil/jobStores/aws/jobStore.py
@@ -306,6 +306,8 @@ def batch(self):
range(0, len(self._batchedUpdates), self.jobsPerBatchInsert)]

for batch in batches:
for jobDescription in batch:
jobDescription.pre_update_hook()
items = {compat_bytes(jobDescription.jobStoreID): self._awsJobToItem(jobDescription) for jobDescription in batch}
for attempt in retry_sdb():
with attempt:
@@ -359,6 +361,7 @@ def load(self, jobStoreID):

def update(self, jobDescription):
logger.debug("Updating job %s", jobDescription.jobStoreID)
jobDescription.pre_update_hook()
item = self._awsJobToItem(jobDescription)
for attempt in retry_sdb():
with attempt:
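The batch() change above runs pre_update_hook() on every description before inserting them jobsPerBatchInsert at a time. The chunking itself is plain fixed-size slicing; a quick sketch of that shape:

```python
def chunk(items: list, size: int) -> list:
    """Split items into fixed-size batches, as batch() does with jobsPerBatchInsert."""
    return [items[i:i + size] for i in range(0, len(items), size)]

# Seven queued updates, three per SimpleDB batch insert:
print(chunk(list(range(7)), 3))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Running the hook per item before building the batch items dict keeps the version counter consistent between the batched and single-update code paths.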
2 changes: 2 additions & 0 deletions src/toil/jobStores/fileJobStore.py
@@ -224,6 +224,8 @@ def update(self, job):
assert job.jobStoreID is not None, f"Tried to update job {job} without an ID"
assert not isinstance(job.jobStoreID, TemporaryID), f"Tried to update job {job} without an assigned ID"

job.pre_update_hook()

# The job is serialised to a file suffixed by ".new"
# We insist on creating the file; an existing .new file indicates
# multiple simultaneous attempts to update the job, which will lose
2 changes: 2 additions & 0 deletions src/toil/jobStores/googleJobStore.py
@@ -181,6 +181,7 @@ def assignID(self, jobDescription):

def create(self, jobDescription):
# TODO: we don't implement batching, but we probably should.
jobDescription.pre_update_hook()
self._writeBytes(jobDescription.jobStoreID, pickle.dumps(jobDescription, protocol=pickle.HIGHEST_PROTOCOL))
return jobDescription

@@ -212,6 +213,7 @@ def load(self, jobStoreID):
return job

def update(self, job):
job.pre_update_hook()
self._writeBytes(job.jobStoreID, pickle.dumps(job, protocol=pickle.HIGHEST_PROTOCOL), update=True)

@googleRetry
35 changes: 30 additions & 5 deletions src/toil/leader.py
@@ -23,15 +23,16 @@
import pickle
import sys
import time
from typing import List

import enlighten

from toil import resolveEntryPoint
from toil.batchSystems import DeadlockException
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason
from toil.common import Toil, ToilMetrics
from toil.job import CheckpointJobDescription, ServiceJobDescription
from toil.jobStores.abstractJobStore import NoSuchJobException
from toil.job import JobDescription, CheckpointJobDescription, ServiceJobDescription
from toil.jobStores.abstractJobStore import AbstractJobStore, NoSuchJobException
from toil.lib.conversions import bytes2human
from toil.lib.throttle import LocalThrottle
from toil.provisioners.abstractProvisioner import AbstractProvisioner
@@ -41,10 +42,11 @@
from toil.toilState import ToilState

try:
from toil.cwl.cwltoil import CWL_INTERNAL_JOBS
from toil.cwl.cwltoil import CWL_INTERNAL_JOBS, CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
except ImportError:
# CWL extra not installed
CWL_INTERNAL_JOBS = ()
CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE = 0


logger = logging.getLogger( __name__ )
@@ -66,8 +68,18 @@


class FailedJobsException(Exception):
def __init__(self, jobStoreLocator, failedJobs, jobStore):
def __init__(self, jobStoreLocator: str, failedJobs: List[JobDescription], jobStore: AbstractJobStore, exit_code: int = 1):
"""
Make an exception to report failed jobs.

:param jobStoreLocator: The job store locator that says what job store has the failed jobs.
:param failedJobs: All the failed jobs
:param jobStore: The actual open job store with the failed jobs in it.
:param exit_code: Recommended process exit code.

"""
self.msg = "The job store '%s' contains %i failed jobs" % (jobStoreLocator, len(failedJobs))
self.exit_code = exit_code
try:
self.msg += ": %s" % ", ".join((str(failedJob) for failedJob in failedJobs))
for jobDesc in failedJobs:
@@ -191,6 +203,11 @@ def __init__(self, config, batchSystem, provisioner: AbstractProvisioner, jobSto
self.PROGRESS_BAR_FORMAT = ('{desc}{desc_pad}{percentage:3.0f}%|{bar}| {count:{len_total}d}/{total:d} '
'({count_1:d} failures) [{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]')

# What exit code should the process use if the workflow failed?
# Needed in case a worker detects a CWL issue that a CWL runner must
# report to its caller.
self.recommended_fail_exit_code = 1

# TODO: No way to set background color on the terminal for the bar.

def run(self):
@@ -258,7 +275,8 @@ def run(self):

if len(self.toilState.totalFailedJobs):
logger.info("Failed jobs at end of the run: %s", ' '.join(str(job) for job in self.toilState.totalFailedJobs))
raise FailedJobsException(self.config.jobStore, self.toilState.totalFailedJobs, self.jobStore)
raise FailedJobsException(self.config.jobStore, self.toilState.totalFailedJobs,
self.jobStore, exit_code=self.recommended_fail_exit_code)

return self.jobStore.getRootJobReturnValue()

@@ -568,6 +586,13 @@ def _gatherUpdatedJobs(self, updatedJobTuple):
else:
logger.warning(f'Job failed with exit value {exitStatus}: {updatedJob}\n'
f'Exit reason: {exitReason}')
if exitStatus == CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE:
# This is a CWL job informing us that the workflow is
# asking things of us that Toil can't do. When we raise an
# exception because of this, make sure to forward along
# this exit code.
logger.warning("This indicates an unsupported CWL requirement!")
self.recommended_fail_exit_code = CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
if self.toilMetrics:
self.toilMetrics.logCompletedJob(updatedJob)
self.processFinishedJob(jobID, exitStatus, wallTime=wallTime, exitReason=exitReason)
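With the exit_code field added to FailedJobsException, a runner entry point can forward the leader's recommendation to the operating system. A hedged sketch of that flow, in which run_workflow and the code 33 are placeholders (the real value of CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE is not shown in this diff):

```python
class FailedJobsException(Exception):
    """Simplified stand-in carrying a recommended process exit code."""
    def __init__(self, msg: str, exit_code: int = 1) -> None:
        super().__init__(msg)
        self.exit_code = exit_code

def run_workflow() -> None:
    # Pretend a worker reported an unsupported CWL requirement; the leader
    # then raises with its recommended code (33 here is illustrative).
    raise FailedJobsException("The job store contains 1 failed jobs", exit_code=33)

def main() -> int:
    try:
        run_workflow()
    except FailedJobsException as e:
        return e.exit_code  # forward the leader's recommendation to the OS
    return 0

print(main())  # 33
```

This is what lets a CWL conformance harness distinguish "workflow used an unsupported feature" from an ordinary job failure, which exits 1 as before.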
11 changes: 8 additions & 3 deletions src/toil/test/__init__.py
@@ -340,11 +340,16 @@ def needs_kubernetes(test_item):
test_item = _mark_test('kubernetes', test_item)
try:
import kubernetes
kubernetes.config.load_kube_config()
try:
kubernetes.config.load_kube_config()
except kubernetes.config.ConfigException:
try:
kubernetes.config.load_incluster_config()
except kubernetes.config.ConfigException:
return unittest.skip("Configure Kubernetes (~/.kube/config, $KUBECONFIG, "
"or current pod) to include this test.")(test_item)
except ImportError:
return unittest.skip("Install Toil with the 'kubernetes' extra to include this test.")(test_item)
except TypeError:
return unittest.skip("Configure Kubernetes (~/.kube/config) to include this test.")(test_item)
return test_item


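The needs_kubernetes change above tries the local kubeconfig first and falls back to in-cluster credentials, skipping the test only when both fail. The same try-each-loader shape, with hypothetical loaders standing in for the kubernetes client:

```python
from typing import Callable, List, Optional, Tuple

class ConfigException(Exception):
    """Stand-in for kubernetes.config.ConfigException."""

def load_first(loaders: List[Tuple[str, Callable[[], None]]]) -> Optional[str]:
    """Return the name of the first loader that succeeds, or None to skip the test."""
    for name, loader in loaders:
        try:
            loader()
            return name
        except ConfigException:
            continue
    return None

def from_kubeconfig() -> None:
    # e.g. no ~/.kube/config and no $KUBECONFIG on this machine
    raise ConfigException('no kubeconfig found')

def from_incluster() -> None:
    pass  # pretend the pod's service account credentials worked

print(load_first([('kubeconfig', from_kubeconfig), ('in-cluster', from_incluster)]))
# in-cluster
```

Catching ConfigException specifically (rather than the old TypeError) is what lets the decorator tell "no config anywhere, skip" apart from a genuinely broken client install.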
5 changes: 3 additions & 2 deletions src/toil/test/cwl/cwlTest.py
@@ -504,9 +504,10 @@ def run_kubernetes_cwl_conformance(self, **kwargs):
**kwargs)
@slow
@needs_kubernetes
def test_kubernetes_cwl_20(self):
@pytest.mark.timeout(CONFORMANCE_TEST_TIMEOUT)
def test_kubernetes_cwl_group(self):
for caching in [True, False]:
self.run_kubernetes_cwl_conformance(selected_tests="20", caching=caching)
self.run_kubernetes_cwl_conformance(selected_tests="20,35,39,42,56", caching=caching)

@needs_cwl
class CWLSmallTests(ToilTest):
29 changes: 25 additions & 4 deletions src/toil/worker.py
@@ -22,9 +22,11 @@
import shutil
import signal
import socket
import stat
import sys
import time
import traceback
from typing import Callable, Any
from contextlib import contextmanager

from toil import logProcessContext
@@ -39,10 +41,14 @@
from toil.statsAndLogging import configure_root_logger, set_log_level

try:
from toil.cwl.cwltoil import CWL_INTERNAL_JOBS
from toil.cwl.cwltoil import (CWL_INTERNAL_JOBS,
CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE,
CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION)
except ImportError:
# CWL extra not installed
CWL_INTERNAL_JOBS = ()
CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE = None
CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION = type(None)

logger = logging.getLogger(__name__)

@@ -275,6 +281,7 @@ def workerScript(jobStore, config, jobName, jobStoreID, redirectOutputToLogFile=
##########################################

jobAttemptFailed = False
failure_exit_code = 1
statsDict = MagicExpando()
statsDict.jobs = []
statsDict.workers.logsToMaster = []
@@ -492,9 +499,13 @@ def workerScript(jobStore, config, jobName, jobStoreID, redirectOutputToLogFile=
##########################################
#Trapping where worker goes wrong
##########################################
except: #Case that something goes wrong in worker
except Exception as e: #Case that something goes wrong in worker
traceback.print_exc()
logger.error("Exiting the worker because of a failed job on host %s", socket.gethostname())
if isinstance(e, CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION):
# We need to inform the leader that this is a CWL workflow problem
# and it needs to inform its caller.
failure_exit_code = CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
AbstractFileStore._terminateEvent.set()

##########################################
Expand Down Expand Up @@ -587,7 +598,17 @@ def workerScript(jobStore, config, jobName, jobStoreID, redirectOutputToLogFile=
#Remove the temp dir
cleanUp = config.cleanWorkDir
if cleanUp == 'always' or (cleanUp == 'onSuccess' and not jobAttemptFailed) or (cleanUp == 'onError' and jobAttemptFailed):
shutil.rmtree(localWorkerTempDir)
def make_parent_writable(func: Callable[[str], Any], path: str, _) -> None:
"""
When encountering an error removing a file or directory, make sure
the parent directory is writable.

cwltool likes to lock down directory permissions, and doesn't clean
up after itself.
"""
# Just chmod it for rwx for user. This can't work anyway if it isn't ours.
os.chmod(os.path.dirname(path), stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
shutil.rmtree(localWorkerTempDir, onerror=make_parent_writable)
Member:

Oh, this is so useful, I didn't know rmtree allowed for this.


#This must happen after the log file is done with, else there is no place to put the log
if (not jobAttemptFailed) and jobDesc.command == None and next(jobDesc.successorsAndServiceHosts(), None) is None:
@@ -597,7 +618,7 @@ def workerScript(jobStore, config, jobName, jobStoreID, redirectOutputToLogFile=
jobStore.delete(jobDesc.jobStoreID)

if jobAttemptFailed:
return 1
return failure_exit_code
else:
return 0

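The make_parent_writable handler in the worker.py diff exists because cwltool can leave read-only directories in the worker temp tree, which makes a plain shutil.rmtree fail. A self-contained demonstration of the onerror hook; note this variant also retries the failed call via func(path), an addition for the demo that the handler in the diff leaves to rmtree's traversal:

```python
import os
import shutil
import stat
import tempfile
from typing import Any, Callable

def make_parent_writable(func: Callable[[str], Any], path: str, _) -> None:
    """On a removal error, give the owner rwx on the parent and retry."""
    # Just chmod rwx for the user; this can't help if the tree isn't ours.
    os.chmod(os.path.dirname(path), stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
    func(path)  # retry the os.unlink/os.rmdir call that failed

root = tempfile.mkdtemp()
locked = os.path.join(root, 'locked')
os.mkdir(locked)
open(os.path.join(locked, 'f'), 'w').close()
# Lock the directory down the way cwltool tends to leave it:
os.chmod(locked, stat.S_IRUSR | stat.S_IXUSR)

shutil.rmtree(root, onerror=make_parent_writable)
print(os.path.exists(root))  # False
```

Without the onerror hook, rmtree would raise PermissionError on the file inside the read-only directory and leave the worker temp dir behind.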