-
Notifications
You must be signed in to change notification settings - Fork 0
/
vcf_filter.py
executable file
·189 lines (137 loc) · 5.62 KB
/
vcf_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python
# -*- coding: utf-8 -*-
## CPCantalapiedra - EEAD - CSIC - 2016
## TODO:
# Make use of cyvcf https://github.com/arq5x/cyvcf based on https://github.com/jamescasbon/PyVCF
# Tests
import sys, traceback
from optparse import OptionParser
from src.output import *
GENES = 0
ISOFS = 1
def _print_parameters(options):
options_dict = options.__dict__
sys.stderr.write("Options:\n")
for option in options_dict:
if options_dict[option]:
sys.stderr.write("\t"+option+"="+str(options_dict[option])+"\n")
return
## Argument parsing
__usage = "usage: vcf_contigs_variants.py [VCF_FILE] [OPTIONS]"
optParser = OptionParser(__usage)
#optParser.add_option('-H', '--header', action='store', dest='vcf_header', type='string', \
# help='VCF header if VCF_FILE does not include header.')
optParser.add_option('-c', '--contigs_list', action='store', dest='contigs_list', type='string', \
help='')
optParser.add_option('-v', '--variants_list', action='store', dest='variants_list', type='string', \
help='')
optParser.add_option('-g', '--genes_list', action='store', dest='genes_list', type='string', \
help='')
optParser.add_option('-i', '--isof_list', action='store', dest='isof_list', type='string', \
help='')
optParser.add_option('-s', '--samples', action='store', dest='samples_filename', type='string', \
help='')
optParser.add_option('-t', '--samples_translation', action='store', dest='samples_translation', type='string', \
help='')
(options, arguments) = optParser.parse_args()
if not arguments or len(arguments)==0:
optParser.exit(0, "You may wish to run '-help' option.\n")
vcf_filename = arguments[0] # This is mandatory
#if options.vcf_header: vcf_header = options.vcf_header
#else: vcf_header = ""
if options.contigs_list:
query_file = options.contigs_list
else: query_file = ""
if options.variants_list:
variants_file = options.variants_list
else: variants_file = ""
if options.genes_list:
if options.isof_list:
raise Exception("Only a list of genes or a list of isoforms a list of contigs should be provided, not both.")
genes_file = options.genes_list
query_type = GENES
elif options.isof_list:
genes_file = options.isof_list
query_type = ISOFS
else: genes_file = ""
if options.samples_filename: samples_filename = options.samples_filename
else: samples_filename = ""
if query_file == "" and variants_file == "" and samples_filename == "" and genes_file == "":
raise Exception("Either a list of contigs or of variants is required.")
if options.samples_translation: samples_translation = options.samples_translation
else: samples_translation = ""
###############
###############
_print_parameters(options)
try:
genotypes_dict = {}
variants_dict = {}
sys.stderr.write("Parsing filter files...\n")
#### Parse queries file
####
query_list = parse_queries_file(query_file)
#### Variants to show
####
variants_list = parse_queries_file(variants_file, keys=(1,2))
#### Genes or isofoms
####
genes_list = parse_queries_file(genes_file)
#### Parse samples list
####
samples_list = parse_samples_list(samples_filename)
#### Parse samples translation
####
samples_trans_dict = parse_samples_translation(samples_translation)
#### Parse headers file
####
#header_found = parse_vcf_header_file(vcf_header, genotypes_dict, \
# samples_filename, samples_list, samples_translation, samples_trans_dict)
#### Parse VCF file
####
total_variants = 0
total_output = 0
vcf_file = open(vcf_filename, 'r')
sys.stderr.write("Parsing VCF file...\n")
for line in vcf_file:
if line.startswith("##"):
sys.stdout.write(line.strip()+"\n")
continue
line_data = line.strip().split()
# Header
if line.startswith("#"):# and vcf_header == "":
header_found = True
if len(samples_list)>0:
samples_fields = filter_header(line_data, samples_list, samples_translation, samples_trans_dict)
print_filtered_samples(line_data, samples_fields)
else:
sys.stdout.write(line.strip()+"\n")
continue
if not header_found:
raise Exception("No header found in VCF file.") # A header is needed
total_variants += 1
contig = line_data[VCF_CONTIG_COL]
if query_file != "" and not contig in query_list: continue
pos = line_data[VCF_POS_COL]
if variants_file != "" and not [contig, pos] in variants_list: continue
if genes_file != "":
variant_genes_isof = get_variant_genes_isof(line_data, query_type)
one_gene_isof_found = False
for gene_isof in variant_genes_isof:
if gene_isof in genes_list:
one_gene_isof_found = True
break
if not one_gene_isof_found: continue
if len(samples_list)>0:
print_filtered_samples(line_data, samples_fields)
else:
sys.stdout.write(line.strip()+"\n")
total_output += 1
sys.stderr.write("Total variants read: "+str(total_variants)+"\n")
sys.stderr.write("Total variants retained: "+str(total_output)+"\n")
sys.stderr.write("Finished.\n")
except Exception as e:
print e
traceback.print_exc()
finally:
vcf_file.close()
## END