storeDir is not working #1359

Open
lucacozzuto opened this issue Nov 5, 2019 · 16 comments


@lucacozzuto

Bug report

Using storeDir makes the pipeline crash complaining that the required output file is not there any more.

Expected behavior and actual behavior

In the past, storeDir copied the files to another folder; now it moves them, and this breaks the workflow execution.

  • Nextflow version: 19.10.0
Error executing process > 'buildIndex (Cowc_gDNA_mtDNA.fasta)'

Caused by:
  Missing output file(s) `STARgenome` expected by process `buildIndex (Cowc_gDNA_mtDNA.fasta)`

Command executed:

  mkdir STARgenome
  if [ `echo Cowc_gDNA_mtDNA.fasta | grep ".gz"` ]; then 
      zcat Cowc_gDNA_mtDNA.fasta > `basename Cowc_gDNA_mtDNA.fasta .gz`
      STAR --runMode genomeGenerate --genomeDir STARgenome --runThreadN 8             --genomeFastaFiles `basename Cowc_gDNA_mtDNA.fasta .gz` --sjdbGTFfile Cowc_long.annot.exon.gtf             --sjdbOverhang 49 --outFileNamePrefix STARgenome             ;
      rm `basename Cowc_gDNA_mtDNA.fasta .gz`
  else 
      STAR --runMode genomeGenerate --genomeDir STARgenome --runThreadN 8             --genomeFastaFiles Cowc_gDNA_mtDNA.fasta --sjdbGTFfile Cowc_long.annot.exon.gtf             --sjdbOverhang 49 --outFileNamePrefix STARgenome             
  fi

Command exit status:
  0

Command output:
  Nov 05 10:29:40 ..... started STAR run
  Nov 05 10:29:40 ... starting to generate Genome files
  Nov 05 10:29:40 ... starting to sort Suffix Array. This may take a long time...
  Nov 05 10:29:41 ... sorting Suffix Array chunks and saving them to disk...
  Nov 05 10:29:45 ... loading chunks from disk, packing SA...
  Nov 05 10:29:46 ... finished generating suffix array
  Nov 05 10:29:46 ... generating Suffix Array index
  Nov 05 10:29:52 ... completed Suffix Array index
  Nov 05 10:29:52 ..... processing annotations GTF
  Nov 05 10:29:52 ..... inserting junctions into the genome indices
  Nov 05 10:30:03 ... writing Genome to disk ...
  Nov 05 10:30:03 ... writing Suffix Array to disk ...
  Nov 05 10:30:03 ... writing SAindex to disk
  Nov 05 10:30:09 ..... finished successfully

@lucacozzuto
Author

lucacozzuto commented Jan 21, 2020

It was a problem with our storage... I'm wondering if this kind of asynchronous copy can cause problems for pipelines.

@lucacozzuto
Author

I'll add a couple of suggestions: it looks like this problem is related to the fact that big files take time to copy to the final directory indicated in storeDir. So you might want to add a sleep either after the execution of the command or in an afterScript definition. Another solution can be to add a retry with an incremental sleep in the nextflow.config file.
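
A minimal sketch of the "retry with an incremental sleep" idea, written in plain Python rather than as Nextflow configuration. The function name `wait_for_path` and its parameters are hypothetical and not part of Nextflow; the sketch only illustrates polling for an output that may become visible late on a slow filesystem.

```python
import os
import time


def wait_for_path(path, attempts=5, base_delay=1.0):
    """Poll for `path`, sleeping base_delay * attempt between checks.

    Returns True as soon as the path is visible, False if it never appears.
    `attempts` and `base_delay` are illustrative defaults, not tuned values.
    """
    for attempt in range(1, attempts + 1):
        if os.path.exists(path):
            return True
        if attempt < attempts:
            # Incremental back-off: 1x, 2x, 3x ... the base delay.
            time.sleep(base_delay * attempt)
    return False
```

Called as `wait_for_path("/path/to/storeDir/STARgenome")`, this waits progressively longer between checks instead of failing on the first miss. It only hides the latency; it does not explain why the file is not visible right away.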

@lucacozzuto lucacozzuto reopened this Apr 26, 2024
@lucacozzuto
Author

From time to time I still see this problem with storeDir... I don't know how many people see it. Maybe @JoseEspinosa also experiences it.

@lucacozzuto
Author

it would be nice to have a function that allows some waiting time before the file is copied to the place indicated by storeDir

@bentsherman
Member

Are you trying to use the storeDir output in a downstream task within the same pipeline? I think storeDir is designed only to work across pipeline runs

@pditommaso it looks like the task outputs are actually moved into the storeDir, which would prevent downstream tasks from being able to also use the output. Maybe instead we should keep a link in the task directory to the storeDir so that it can still be used as a regular output

@lucacozzuto
Author

Hi @bentsherman, I'm trying to run the pipeline just once and it fails, likely because the process looks for the output file at its final location while it is still being copied.

@lucacozzuto
Author

I think the presence of a soft link can be a patch (but I'm not so sure it will work in AWS Batch).

@bentsherman
Member

Recently I was reading about software-defined assets in Dagster, which looks like this:

@asset
def logins(website_events: DataFrame) -> DataFrame:
   return website_events[website_events["type"] == "login"]

This flips the script: instead of saying "this is a task, here are its outputs", it says "this is a data asset, here is how to compute it". This is a very intuitive way to think about the pipeline, as it focuses on the thing we care about most, the results.

I wonder if we could do something like this in Nextflow. Ideally keeping the same syntax for processes, but making the workflow definition more output-centric.

I think we need to combine storeDir and publishDir into a single concept and make it more prominent in the workflow definition. I should be able to say "this asset should exist in (storeDir/publishDir), here's how to re-compute it if needed (process definition)":

Using rnaseq-nf as an example:

process INDEX {
  tag "$transcriptome.simpleName"
  conda 'salmon=1.10.2'
  input:
    path transcriptome
  output:
    path 'index'
  script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i index
    """
}

workflow {
  index = asset('/data/index') {
    INDEX(params.transcriptome)
  }
}

The main idea is to make the store/publish dir external to the process invocation. The workflow should request an asset from some local path and show how to re-compute it; Nextflow can then figure out whether the requested asset is up to date and decide whether to re-compute it.

Sorry for the rant @lucacozzuto , I know you're just trying to fix the bug right in front of you, I just got inspired to think about the long-term solution. If you could provide a minimal test case, we should be able to get to the bottom of it

@lucacozzuto
Author

Don't worry, I love this way of thinking in broader terms... I just think that the output-centric way can be a bit reductive... A nice thing about Nextflow is that you can forget about naming inputs and outputs... So I would think twice about changing this aspect of the language. As for storeDir, I'll work on a test case.

@bentsherman
Member

I agree with your point about naming things. Dagster seems to work well with the output-centric model because it's all just variable names and in-memory dataframes, so you don't have to worry about organizing files in a filesystem. Nextflow also saves you from having to name things all the time by having the hidden work directory and then you specify which outputs to publish and where. An output-centric model in Nextflow should continue in that pattern.

Maybe we can incorporate storeDir into the upcoming workflow publish definition #4784 :

workflow {
  main:
  index = INDEX(params.transcriptome)

  publish:
  index >> '/data/index'
}

One thing we have added with this feature is an option to not re-publish a file if the publish destination is up to date, similar to the cache directive. I guess the natural next step would be, if all outputs published by a process are already up to date in the publish destination, don't re-execute the task.

@lucacozzuto
Author

Agreed. However, I cannot always reproduce the error, so it is likely that it fails when the (big) file has not been copied completely. storeDir is useful when you have a big task (like indexing) that you don't want to re-execute each time, and you want to use the same file across different pipeline runs... but we really need some "waiting time" between the copy into the "store" folder and the check for the existence of the output file.

@bentsherman
Member

Looking at the code more closely, the way it works is:

  • the job .command.run moves the output files to the storeDir using mv
  • after the job is complete (including the mv command), Nextflow checks the storeDir for the output files as declared in the output: section

Because the mv command is part of the job, I would expect the storeDir to be up to date once the job is finished

@lucacozzuto
Author

The problem happens on an HPC... so I think it is a problem with some asynchronous process.

@bentsherman
Member

The procedure I described is not asynchronous. Nextflow does not check the storeDir until after the job and the file transfer have completed.

I think a way to test your theory would be to copy a large file to the storeDir and then try to access the file from the storeDir
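
One way to run that experiment outside Nextflow is sketched below. The helper `move_and_probe`, the file name `big_output.bin`, and the scratch directory standing in for the task work directory are all hypothetical; the sketch moves a file of a chosen size into a store directory and measures how long it takes until the destination is visible with the expected size.

```python
import os
import shutil
import tempfile
import time


def move_and_probe(size_bytes, store_dir, timeout=30.0, interval=0.5):
    """Move a file of `size_bytes` into `store_dir`, then poll for it.

    Returns the seconds elapsed until the destination is visible with the
    expected size, or None if `timeout` expires first.
    """
    os.makedirs(store_dir, exist_ok=True)
    # Scratch directory: a hypothetical stand-in for the task work directory.
    scratch = tempfile.mkdtemp()
    src = os.path.join(scratch, "big_output.bin")
    chunk = b"\0" * (1024 * 1024)
    with open(src, "wb") as fh:
        remaining = size_bytes
        while remaining > 0:
            n = min(remaining, len(chunk))
            fh.write(chunk[:n])
            remaining -= n
    dest = os.path.join(store_dir, "big_output.bin")
    shutil.move(src, dest)
    os.rmdir(scratch)  # empty once the file has been moved out
    start = time.monotonic()
    while True:
        if os.path.exists(dest) and os.path.getsize(dest) == size_bytes:
            return time.monotonic() - start
        if time.monotonic() - start >= timeout:
            return None
        time.sleep(interval)
```

When the move and the check run on the same node, the file should be visible almost immediately. To reproduce a delay caused by a shared filesystem, the polling half would have to run on a different node from the one that performs the move.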

@mbeckste

mbeckste commented Jul 4, 2024

We noticed exactly the same problem in one of the epi2me ONT workflows today, and I agree with @lucacozzuto that it is a problem with asynchrony in an HPC environment. In our scenario the Nextflow head job and the job using storeDir were scheduled on two different nodes in our cluster, and both the work directory (NXF_WORK) and the storeDir are on a shared filesystem with some latency. The job with the storeDir process finished successfully (RC=0), i.e. nxf_unstage() in .command.run ran and nxf_fs_move() subsequently moved all the files to storeDir. Then Nextflow (on a different node) checked the output, did not find the file(s) because of the slow NFS, and finally failed with a Missing output file(s) error.

@bentsherman
Copy link
Member

I remember that the grid executors have this hack to deal with NFS delays when checking the exitcode file for a job:

String workDirList = null
if( exitTimestampMillis1 && FileHelper.workDirIsNFS ) {
    /*
     * When the file is in a NFS folder in order to avoid false negative
     * list the content of the parent path to force refresh of NFS metadata
     * http://stackoverflow.com/questions/3833127/alternative-to-file-exists-in-java
     * http://superuser.com/questions/422061/how-to-determine-whether-a-directory-is-on-an-nfs-mounted-drive
     */
    workDirList = FileHelper.listDirectory(task.workDir)
}

Apparently if you list a directory it will force-update the NFS metadata. Might be able to do the same thing for storeDir, if someone would like to give it a try:

/**
 * Check if exists a *storeDir* for the specified task. When if exists
 * and contains the expected result files, the process execution is skipped.
 *
 * @param task The task for which check the stored output
 * @return {@code true} when the folder exists and it contains the expected outputs,
 *      {@code false} otherwise
 */
final boolean checkStoredOutput( TaskRun task ) {

I think it would be enough to list the storeDir before calling collectOutputs()
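
The "list the directory first, then check" pattern can be sketched in plain Python as follows. This is not Nextflow code: `exists_after_listing` is a hypothetical helper, and whether a directory listing actually refreshes cached NFS metadata depends on the client and its mount options, so treat this as an illustration of the idea rather than a guaranteed fix.

```python
import os


def exists_after_listing(path):
    """Check for `path` after listing its parent directory.

    The listing result is discarded; it is done only for its side effect,
    which on an NFS mount may prompt the client to refresh cached metadata.
    """
    parent = os.path.dirname(os.path.abspath(path))
    try:
        os.listdir(parent)
    except OSError:
        # Parent directory is missing or unreadable, so the path cannot exist.
        return False
    return os.path.exists(path)
```

In the storeDir case, the equivalent step would be to list the storeDir itself before looking for the declared outputs inside it.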

3 participants