-
Notifications
You must be signed in to change notification settings - Fork 0
/
VG_iterative.py
executable file
·197 lines (153 loc) · 8.34 KB
/
VG_iterative.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import os
import sys
import argparse
import tempfile
from datetime import datetime
import random
import re
import subprocess
import glob
import json
import shutil
from pathlib import Path
print(datetime.now())
print(Path.cwd().resolve())
def setup_temporary_directory(prefix='vg-'):
"""Create and return a temporary directory."""
tmp_dir = tempfile.mkdtemp(prefix=prefix)
print(f"Temporary directory: {tmp_dir}")
return Path(tmp_dir)
def prepare_output_directory(base_name, specified_output_dir=None):
"""Prepare the output directory based on the base name and optional specified directory."""
output_dir = Path(specified_output_dir if specified_output_dir else f"{base_name}_output").resolve()
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir
def move_and_rename_vg_file(src_path, dest_dir, new_filename):
"""
Move and rename a VG file from src_path to dest_dir with a new filename.
:param src_path: Path of the source VG file.
:param dest_dir: Destination directory Path.
:param new_filename: New filename for the VG file.
"""
try:
dest_path = dest_dir.joinpath(new_filename)
shutil.move(src_path, dest_path)
print(f"Moved and renamed VG file to {dest_path}")
except FileNotFoundError as e:
print(f"Error: Source file not found {src_path}")
except Exception as e:
print(f"Error moving {src_path} to {dest_dir}/{new_filename}: {e}")
# Set up argument parser
parser = argparse.ArgumentParser(description='Script to process FASTA files and generate output graphs.')
parser.add_argument('fasta_filename', help='The FASTA file to process.')
parser.add_argument('-gf', '--graph_fasta_file', help='Optional: The FASTA file to use as base for the graph. If not specified, a random file is chosen.', default=None)
parser.add_argument('-o', '--output_graph_name', help='The name of the output graph file. Defaults to the base name of the input file.', default=None)
parser.add_argument('-d', '--output_dir_name', help='The name of the output directory. Defaults to a directory named after the input file.', default=None)
# Parse arguments using pathlib for path arguments
args = parser.parse_args()
fasta_filename = Path(args.fasta_filename)
base_name = fasta_filename.stem
output_graph_name = args.output_graph_name if args.output_graph_name else base_name
output_dir = Path(args.output_dir_name if args.output_dir_name else f"{base_name}_output").resolve()
# Assign arguments to variables
fasta_filename = args.fasta_filename
base_name = os.path.splitext(os.path.basename(fasta_filename))[0]
output_graph_name = args.output_graph_name if args.output_graph_name else base_name
output_dir_name = args.output_dir_name if args.output_dir_name else os.path.join(os.path.realpath(os.getcwd()), base_name + "_output")
output_dir.mkdir(parents=True, exist_ok=True)
# Make temporary directory
tmp_dir = setup_temporary_directory()
output_dir = prepare_output_directory(base_name, args.output_dir_name)
os.chdir(tmp_dir)
Path('fasta_files').mkdir(exist_ok=True)
Path('alignments').mkdir(exist_ok=True)
# Split fasta file into individual sequences and save to the fasta_files/ dir
# Count number of sequences in fasta file
if fasta_filename.endswith(".gz"):
nr = subprocess.check_output(f"zcat {fasta_filename} | grep '>' -c", shell=True, text=True)
subprocess.call(f"zcat {fasta_filename} | awk 'BEGIN {{n_seq=0;}} /^>/ {{if(n_seq%1==0){{file=sprintf(\"fasta_files/seq_%d.fa\",n_seq);}} print >> file; n_seq++; next;}} {{ print >> file; }}'", shell=True)
else:
nr = subprocess.check_output(f"grep '>' {fasta_filename} -c", shell=True, text=True)
subprocess.call(f"cat {fasta_filename} | awk 'BEGIN {{n_seq=0;}} /^>/ {{if(n_seq%1==0){{file=sprintf(\"fasta_files/seq_%d.fa\",n_seq);}} print >> file; n_seq++; next;}} {{ print >> file; }}'", shell=True)
nr = int(nr.strip())
print("Number of sequences in total: ", nr)
os.chdir("fasta_files")
if args.graph_fasta_file:
graph_fasta = args.graph_fasta_file
else:
graph_fasta = random.choice(os.listdir())
#print(f"Randomly selected FASTA file: {graph_fasta}")
random_file = graph_fasta # Set this for output naming
## Choose a random file/sequence
#random_file = random.choice(os.listdir())
graph_name = re.search(r'^>[a-zA-Z0-9_.]+', open(graph_fasta).read()).group(0)[1:]
print("Choosing file {} ({}) to build graph from".format(graph_fasta, graph_name))
os.chdir("..")
# Construct vg graph from the chosen file and circularize it
subprocess.run(f"vg construct -M fasta_files/{random_file} > graph.vg", shell=True)
subprocess.run(f"vg circularize -p {graph_name} graph.vg > graph_circ.vg", shell=True)
subprocess.run(f"vg stats -z graph_circ.vg", shell=True)
#os.remove("graph.vg")
#os.remove(f"fasta_files/{random_file}")
Path("graph.vg").unlink(missing_ok=True)
Path(f"fasta_files/{random_file}").unlink(missing_ok=True)
os.chdir("fasta_files")
for i in range(1, nr):
identity = 0
# Index graph
subprocess.run(f"vg index -x {tmp_dir}/graph_circ.xg {tmp_dir}/graph_circ.vg", shell=True) # XG index
subprocess.run(f"vg prune -k 48 {tmp_dir}/graph_circ.vg > {tmp_dir}/graph_circ_pruned.vg", shell=True)
subprocess.run(f"vg index -g {tmp_dir}/graph_circ.gcsa -Z 400 {tmp_dir}/graph_circ_pruned.vg", shell=True) # GCSA index
os.remove(f"{tmp_dir}/graph_circ_pruned.vg")
for file in glob.glob("*.fa"):
# Extract path name
name = re.search(r'^>[a-zA-Z0-9_.]+', open(file).read()).group(0)[1:]
# convert mitogenome to a string and align to graph
with open(file) as f:
next(f)
mitogenome_str = f.read().replace('\n', '')
subprocess.run(f"vg map -s {mitogenome_str} -V {name} -g {tmp_dir}/graph_circ.gcsa -x {tmp_dir}/graph_circ.xg > {tmp_dir}/alignments/{file}.gam", shell=True)
# Run vg view command and capture output
command = f"vg view -a {tmp_dir}/alignments/{file}.gam"
output = subprocess.check_output(command, shell=True)
# Extract identity using jq
data = json.loads(output)
try:
identity_new = data["identity"] # when encountering a file with no identity it crashes
# Set identity to 0.0 if it is "null"
except:
identity_new = 0.0
# Save the file with the highest %identity
if float(identity_new) >= identity:
identity = float(identity_new)
best_file = file
best_name = name
print(f"{file} ({name}) aligned with identity score {identity_new}")
print(f"{best_file} has the highest %identity {identity}")
# Augment graph with that sequence
subprocess.run(f"vg augment -i -S {tmp_dir}/graph_circ.vg {tmp_dir}/alignments/{best_file}.gam > {tmp_dir}/graph_aug.vg", shell=True)
#subprocess.run(f"rm {tmp_dir}/fasta_files/{best_file}", shell=True)
os.remove(os.path.join(tmp_dir, "fasta_files", best_file))
subprocess.run(f"mv {tmp_dir}/graph_aug.vg {tmp_dir}/graph_circ.vg", shell=True)
subprocess.run(f"vg stats -z {tmp_dir}/graph_circ.vg", shell=True)
# Clear the alignments dir for the next round
#could do rmtree followed by mkdir instead
#for file in os.listdir(os.path.join(tmp_dir, "alignments")):
#os.remove(os.path.join(tmp_dir, "alignments", file))
for file in Path(tmp_dir).joinpath("alignments").iterdir():
file.unlink(missing_ok=True)
# Move the graph file to the output directory
#shutil.move(f"{tmp_dir}/graph_circ.vg", f"{output_dir_name}/{output_graph_name}.vg")
#shutil.move(os.path.join(tmp_dir, "graph_circ.vg"), os.path.join(output_dir_name, f"{output_graph_name}.vg"))
#shutil.move(Path(tmp_dir).joinpath("graph_circ.vg"), output_dir.joinpath(f"{output_graph_name}.vg"))
out_file = f"{output_graph_name}_{graph_name}"
move_and_rename_vg_file(Path(tmp_dir).joinpath("graph_circ.vg"), output_dir, out_file + ".vg")
# Change to the new directory
os.chdir(output_dir_name)
# Convert the file to ODGI and GFA formats
subprocess.run(f"vg_1.44.0 convert -o {out_file}.vg > {out_file}.odgi", shell=True)
subprocess.run(f"vg convert -f {out_file}.vg > {out_file}.gfa", shell=True)
# Remove the temporary directory
os.system(f"rm -rf {tmp_dir}")
# Print the current date and time
print(datetime.now())