-
Notifications
You must be signed in to change notification settings - Fork 15
/
phage.nf
executable file
·373 lines (327 loc) · 16.2 KB
/
phage.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
/*
* Nextflow -- What the Phage
* Author: [email protected]
*/
/*
Nextflow version check
Format is this: XX.YY.ZZ (e.g. 20.07.1)
change below
*/
// Minimum supported Nextflow release, split into major (XX), minor (YY) and patch (ZZ).
XX = "21"
YY = "04"
ZZ = "0"

// Parse the running version once. Only the major and minor components are compared;
// ZZ is used for the message only.
def nf_version_tokens = nextflow.version.toString().tokenize('.')
def nf_major = nf_version_tokens[0].toInteger()
def nf_minor = nf_version_tokens[1].toInteger()

// Too old when the major is lower, or when the major matches and the minor is lower.
// (The former check compared the minor token against XX, so the minor test never fired.)
if ( nf_major < XX.toInteger() || ( nf_major == XX.toInteger() && nf_minor < YY.toInteger() ) ) {
    println "\033[0;33mWtP requires at least Nextflow version " + XX + "." + YY + "." + ZZ + " -- You are using version $nextflow.version\u001B[0m"
    exit 1
}
// ASCII-art banner, printed on every start (also ahead of the --help text)
def banner_lines = [
    "_____ _____ ____ ____ ___ ___ __ __ _ _ ",
    " __ _______________________ ",
    " / \\ / \\__ ___/\\______ \\",
    " \\ \\/\\/ / | | | ___/",
    " \\ / | | | | ",
    " \\__/\\ / |____| |____| ",
    " \\/ ",
    "_____ _____ ____ ____ ___ ___ __ __ _ _ "
]
banner_lines.each { banner_line -> println banner_line }

// --help: emit the usage text and stop before any run information is shown
if ( params.help ) {
    exit 0, helpMSG()
}

// Run summary: profile, user, versions and the relevant locations
def run_info_lines = [
    " ",
    "\u001B[32mProfile: $workflow.profile\033[0m",
    " ",
    "\033[2mCurrent User: $workflow.userName",
    "Nextflow-version: $nextflow.version",
    "Starting time: $nextflow.timestamp",
    "Workdir location [--workdir]:",
    " $workflow.workDir",
    "Output location [--output]:",
    " $params.output",
    "\033[2mDatabase location [--databases]:",
    " $params.databases\u001B[0m"
]
run_info_lines.each { info_line -> println info_line }

// Extra hints that only apply when a singularity profile is part of the run
if ( workflow.profile.contains('singularity') ) {
    def singularity_lines = [
        "\033[2mSingularity cache location [--cachedir]:",
        " $params.cachedir",
        " ",
        "\u001B[33m WARNING: Singularity image building sometimes fails!",
        " Please download all images first via --setup --cachedir IMAGE-LOCATION",
        " Manually remove faulty images in $params.cachedir for a rebuild\u001B[0m"
    ]
    singularity_lines.each { singularity_line -> println singularity_line }
}

// Tell the user which half of the pipeline is skipped
if ( params.annotate ) {
    println "\u001B[33mSkipping phage identification for fasta files\u001B[0m"
}
if ( params.identify ) {
    println "\u001B[33mSkipping phage annotation\u001B[0m"
}

// CPU settings
def cpu_info_lines = [
    " ",
    "\033[2mCPUs per process: $params.cores, maximal CPUs to use: $params.max_cores\033[0m",
    " "
]
cpu_info_lines.each { cpu_line -> println cpu_line }
/*************
* ERROR HANDLING
*************/
// profiles: the default 'standard' profile is never a valid choice
if ( workflow.profile == 'standard' ) {
    exit 1, "NO VALID EXECUTION PROFILE SELECTED, use e.g. [-profile local,docker]"
}

// the profile string has to mention one of the known engines ...
def engine_profiles = ['singularity', 'ukj_cloud', 'stub', 'docker']
if ( !engine_profiles.any { engine -> workflow.profile.contains(engine) } ) {
    exit 1, "No engine selected: -profile EXECUTER,ENGINE"
}

// ... and one of the known executers
def executer_profiles = ['local', 'test', 'smalltest', 'ebi', 'slurm', 'lsf', 'ukj_cloud', 'stub', 'git_action']
if ( !executer_profiles.any { executer -> workflow.profile.contains(executer) } ) {
    exit 1, "No executer selected: -profile EXECUTER,ENGINE"
}

// params tests: skipped for --setup and for the test / smalltest profiles
def is_test_profile = workflow.profile.contains('test') || workflow.profile.contains('smalltest')
if ( !params.setup && !is_test_profile ) {

    // some input has to be given
    if ( !(params.fasta || params.fastq) ) {
        exit 1, "input missing, use [--fasta] or [--fastq]"
    }

    // every single tool switched off leaves nothing to run
    def every_tool_deactivated = params.ma && params.mp && params.vf && params.vs && params.pp && params.dv && params.sm && params.vn && params.vb && params.ph && params.vs2 && params.sk
    if ( every_tool_deactivated ) {
        exit 0, "What the... you deactivated all the tools"
    }
}
/*************
* INPUT HANDLING
*************/
// fasta input, either directly or via a csv file; not created here when a test profile is chosen
if ( params.fasta && !workflow.profile.contains('test') ) {
    if ( params.list ) {
        // --list: params.fasta is a csv whose rows are mapped to [ first column, file(second column) ]
        fasta_input_ch = Channel
            .fromPath( params.fasta, checkIfExists: true )
            .splitCsv()
            .map { row -> ["${row[0]}", file("${row[1]}", checkIfExists: true)] }
    }
    else {
        // plain fasta file(s): each one is paired with its base name
        fasta_input_ch = Channel
            .fromPath( params.fasta, checkIfExists: true)
            .map { fasta_file -> tuple(fasta_file.baseName, fasta_file) }
    }
}

// citation file shipped with the project, stored under the "literature" folder of the output
def citation_source = workflow.projectDir + "/docs/Citations.bib"
def citation_target = params.output + "/literature"
citation = Channel.fromPath(citation_source)
    .collectFile(storeDir: citation_target)
/**************************
* Workflows to call
**************************/
include { input_validation_wf } from './workflows/input_validation_wf'
include { deepvirfinder_wf } from './workflows/deepvirfinder_wf.nf'
include { phigaro_wf } from './workflows/phigaro_wf'
include { seeker_wf } from './workflows/seeker_wf'
include { virfinder_wf } from './workflows/virfinder_wf'
include { virnet_wf } from './workflows/virnet_wf'
include { pprmeta_wf } from './workflows/pprmeta_wf'
include { metaphinder_wf } from './workflows/metaphinder_wf'
include { metaphinder_own_DB_wf } from './workflows/metaphinder_own_DB_wf'
include { vibrant_wf } from './workflows/vibrant_wf'
include { vibrant_virome_wf } from './workflows/vibrant_virome_wf'
include { virsorter_wf } from './workflows/virsorter_wf'
include { virsorter_virome_wf } from './workflows/virsorter_virome_wf'
include { virsorter2_wf } from './workflows/virsorter2_wf'
include { sourmash_wf } from './workflows/sourmash_wf'
include { prepare_results_wf } from './workflows/prepare_results_wf'
include { phage_annotation_wf } from './workflows/phage_annotation_wf'
include { checkV_wf } from './workflows/checkV_wf'
include { phage_tax_classification_wf } from './workflows/phage_tax_classification_wf'
include { setup_wf } from './workflows/setup_wf'
include { get_test_data_wf } from './workflows/get_test_data_wf'
include { markdown_report_wf } from './workflows/markdown_report_wf'
/**************************
* WtP Workflow
**************************/
// Main entry workflow.
// With --setup only setup_wf() is run. Otherwise the run mode is derived from
// --identify / --annotate (neither flag = full run): fasta input is validated,
// the prediction tools are run (benchmarked subset, or every tool with --all_tools),
// the annotation workflows are run, and a markdown report is built from whichever
// parts were executed.
workflow {
/**************************
* WtP setup
**************************/
// --setup runs setup_wf() only; otherwise the test profiles provide their own fasta input channel
if ( params.setup ) { setup_wf() }
else {
// 'test' (but not 'smalltest'): input comes from get_test_data_wf()
if (workflow.profile.contains('test') && !workflow.profile.contains('smalltest')) { fasta_input_ch = get_test_data_wf() }
// 'smalltest': a single fasta from the project's test-data folder, paired with its simple name
if (workflow.profile.contains('smalltest') )
{ fasta_input_ch = Channel.fromPath(workflow.projectDir + "/test-data/all_pos_phage.fa", checkIfExists: true).map { file -> tuple(file.simpleName, file) } }
}
/**************************
* workflow flow control
**************************/
// the validated input is routed into one of two channels, depending on the run mode
// NOTE(review): every mode below requires params.fasta to be set; presumably the
// test profiles define it in their config -- confirm
// Annotation only: validated input goes straight to the annotation channel
if ( params.fasta && params.annotate && !params.identify && !params.setup) { annotation_channel = input_validation_wf(fasta_input_ch) }
// Identify only: validated input goes to the prediction channel
else if (params.fasta && params.identify && !params.annotate && !params.setup ) { prediction_channel = input_validation_wf(fasta_input_ch) }
// Full run: validated input goes to the prediction channel
else if (params.fasta && !params.identify && !params.annotate && !params.setup ) { prediction_channel = input_validation_wf(fasta_input_ch) }
/**************************
* Prediction via benchmarked tools only
**************************/
// run prediction if identify flag or no flag at all, and --all_tools is not set
if (params.fasta && params.identify && !params.annotate && !params.setup && !params.all_tools || params.fasta && !params.identify && !params.annotate && !params.setup && !params.all_tools ) {
// actual tools: the outputs of all tool workflows are concatenated into one channel
results = deepvirfinder_wf( prediction_channel)
.concat( seeker_wf(prediction_channel))
.concat( virfinder_wf(prediction_channel))
.concat( pprmeta_wf(prediction_channel))
.concat( metaphinder_wf(prediction_channel))
.concat( vibrant_wf(prediction_channel))
.concat( vibrant_virome_wf(prediction_channel))
.concat( virsorter_wf(prediction_channel))
.concat( virsorter_virome_wf(prediction_channel))
.concat( virsorter2_wf(prediction_channel))
.filter { it != 'deactivated' } // removes deactivated tool channels
.groupTuple()
prepare_results_wf(results, prediction_channel)
// map identify output for input of annotation tools
annotation_channel = input_validation_wf.out.join(results)
}
/**************************
* Prediction via all tools
**************************/
// run prediction if identify flag or no flag at all, and --all_tools is set
if (params.fasta && params.identify && !params.annotate && !params.setup && params.all_tools|| params.fasta && params.all_tools && !params.identify && !params.annotate && !params.setup ) {
// same tools as in the section above, plus phigaro, virnet, metaphinder_own_DB and sourmash
results = deepvirfinder_wf( prediction_channel)
.concat( phigaro_wf(prediction_channel))
.concat( seeker_wf(prediction_channel))
.concat( virfinder_wf(prediction_channel))
.concat( virnet_wf(prediction_channel))
.concat( pprmeta_wf(prediction_channel))
.concat( metaphinder_wf(prediction_channel))
.concat( metaphinder_own_DB_wf(prediction_channel))
.concat( vibrant_wf(prediction_channel))
.concat( vibrant_virome_wf(prediction_channel))
.concat( virsorter_wf(prediction_channel))
.concat( virsorter_virome_wf(prediction_channel))
.concat( virsorter2_wf(prediction_channel))
.concat( sourmash_wf(prediction_channel))
.filter { it != 'deactivated' } // removes deactivated tool channels
.groupTuple()
prepare_results_wf(results, prediction_channel)
// map identify output for input of annotation tools
annotation_channel = input_validation_wf.out.join(results)
}
/**************************
* Annotation
**************************/
// run annotation if annotate flag or no flag at all
if ( params.fasta && params.annotate && !params.identify && !params.setup || params.fasta && !params.identify && !params.annotate && !params.setup ) {
// actual tools: phage_annotation_wf additionally receives the checkV_wf output
checkV_wf(annotation_channel)
phage_annotation_wf(annotation_channel, checkV_wf.out)
phage_tax_classification_wf(annotation_channel)
// markdown report input
// NOTE(review): the bare expression below is not assigned or passed anywhere; it looks like it has no effect
phage_annotation_wf.out
}
/**************************
* Result Report
**************************/
// NO FLAG: the report receives both the prediction and the annotation outputs
if (params.fasta && !params.identify && !params.annotate && !params.setup ) {
markdown_report_wf( prepare_results_wf.out.upsetr_plot_markdown_input,
prepare_results_wf.out.heatmap_table_markdown_input,
phage_annotation_wf.out.annotationtable_markdown_input,
checkV_wf.out,
phage_tax_classification_wf.out
)
}
// --IDENTIFY
// dummy channels built from 'deactivated' values stand in for the three --annotate outputs
if (params.fasta && params.identify && !params.annotate && !params.setup ) {
markdown_report_wf( prepare_results_wf.out.upsetr_plot_markdown_input,
prepare_results_wf.out.heatmap_table_markdown_input,
dummy_A = Channel.from( [ 'deactivated', 'deactivated']),
dummy_B = Channel.from( [ 'deactivated', 'deactivated']),
dummy_C = Channel.from( [ 'deactivated', 'deactivated']))
}
// --ANNOTATE
// dummy channels built from 'deactivated' values stand in for the two --identify outputs
if (params.fasta && !params.identify && params.annotate && !params.setup ) {
markdown_report_wf( dummy_A = Channel.from( [ 'deactivated', 'deactivated']),
dummy_B = Channel.from( [ 'deactivated', 'deactivated']),
phage_annotation_wf.out.annotationtable_markdown_input,
checkV_wf.out,
phage_tax_classification_wf.out )
}
}
/*************
* --help
*************/
// Writes the usage / help text to the log via log.info.
// Current defaults are interpolated from params (filter, cores, max_cores, output,
// databases, workdir, cachedir). Takes no arguments.
def helpMSG() {
    // ANSI escape sequences used in the help text; declared with def so they stay
    // local to this function instead of being written into the script binding
    def c_green = "\033[0;32m"
    def c_reset = "\033[0m"
    def c_yellow = "\033[0;33m"
    def c_blue = "\033[0;34m"
    def c_purple = "\033[0;35m"
    def c_dim = "\033[2m"
    // the text lines below belong to the string literal and are therefore kept unindented
    log.info """
.
${c_purple}Usage examples:${c_reset}
nextflow run replikation/What_the_Phage --fasta '*/*.fasta' --cores 20 --max_cores 40 \\
--output results -profile local,docker
nextflow run phage.nf --fasta '*/*.fasta' --cores 20 \\
--output results -profile lsf,singularity \\
--cachedir /images/singularity_images \\
--databases /databases/WtP_databases/
${c_purple}Input:${c_reset}
--fasta '*.fasta' -> assembly file(s)
--fastq '*.fastq' -> long read file(s)
${c_dim} ..change above input to csv via --list ${c_reset}
${c_dim} e.g. --fasta inputs.csv --list
the .csv contains per line: name,/path/to/file${c_reset}
--setup skips analysis and just downloads databases and containers
${c_purple}Execution/Engine profiles:${c_reset}
WtP supports profiles to run via different ${c_green}Executers${c_reset} and ${c_blue}Engines${c_reset} e.g.:
-profile ${c_green}local${c_reset},${c_blue}docker${c_reset}
${c_green}Executer${c_reset} (choose one):
slurm
local
lsf
ebi
${c_blue}Engines${c_reset} (choose one):
docker
singularity
For a test run (~ 1h), add "smalltest" to the profile, e.g. -profile smalltest,local,singularity
${c_purple}Options:${c_reset}
--filter min contig size [bp] to analyse [default: $params.filter]
--cores max cores per process for local use [default: $params.cores]
--max_cores max cores used on the machine for local use [default: $params.max_cores]
--output name of the result folder [default: $params.output]
${c_purple}Tool control:${c_reset}
Deploy all integrated phage prediction tools
--all_tools activate all phage prediction tools
Deactivate tools individually by adding one or more of these flags
--dv deactivates deepvirfinder
--mp deactivates metaphinder
--pp deactivates PPRmeta
--sm deactivates sourmash
--vb deactivates vibrant
--vf deactivates virfinder
--vn deactivates virnet
--vs deactivates virsorter
--ph deactivates phigaro
--vs2 deactivates virsorter2
--sk deactivates seeker
${c_purple}Custom phage annotation Database:${c_reset}
--annotation_db /path/to/your/custom_phage_annotation_db.tar.gz
Please provide a custom_phage_annotation_db.tar.gz archive that contains the following file formats:
*.hmm *.hmm.h3f *.hmm.h3i *.hmm.h3m *.hmm.h3p
${c_yellow}Workflow control:${c_reset}
--identify only phage identification, skips analysis
--annotate only annotation, skips phage identification
--plot_completeness pharokka (annotation) will plot Phage-contigs with CheckV-completeness > 75.00 (or you provide your cutoff value, e.g. 80.00)
${c_yellow}Databases, file, container behaviour:${c_reset}
--databases specifiy download location of databases
[default: ${params.databases}]
${c_dim}WtP downloads DBs if not present at this path${c_reset}
--workdir defines the path where nextflow writes temporary files
[default: $params.workdir]
--cachedir defines the path where singularity images are cached
[default: $params.cachedir]
""".stripIndent()
}
// Closing message after the run; not registered for --setup runs
if ( !params.setup ) {
    workflow.onComplete {
        def closing_message
        if ( workflow.success ) {
            // the literal continues on the next line via the trailing backslash, so that line stays unindented
            closing_message = "\nDone! Results are stored here --> $params.output \nThank you for using What the Phage\n \nPlease cite us: https://doi.org/10.1101/2020.07.24.219899 \
\n\nPlease also cite the other tools we use in our workflow --> $params.output/literature \n"
        }
        else {
            closing_message = "Oops .. something went wrong"
        }
        log.info ( closing_message )
    }
}