-
Notifications
You must be signed in to change notification settings - Fork 1
/
split_fasta
executable file
·94 lines (82 loc) · 2.9 KB
/
split_fasta
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/python
# coding=utf-8
'''
Created on Feb 24, 2015
@author: Allis Tauri <[email protected]>
'''
import os
import sys
import argparse
import signal
from Bio import SeqIO
#format name passed to SeqIO.parse/SeqIO.write; also shown in the command-line help
FMT = 'fasta'
#extension of the output files; replaced by the --output-extension value at startup
EXT = 'fasta'
#set to True by sig_handler; the main loop checks it before each write and exits
abort = False
#output file currently being written by the main loop
#(None until the first slice of an input file is opened)
cur_file = None
def sig_handler(signal, frame):
    '''
    Request a graceful abort when a termination signal arrives.

    Sets the module-level ``abort`` flag and prints a notice. The main loop
    polls the flag before every write and exits when it is set.

    The handler deliberately does NOT close ``cur_file``. Python runs signal
    handlers between bytecode instructions of the main thread, so the signal
    may arrive while a record is half-written; closing the file here would
    make the interrupted write fail with "I/O operation on closed file".
    The file is left to the normal program flow (or to interpreter shutdown)
    to be flushed and closed.

    signal -- number of the received signal (unused; inside this function
              the name shadows the ``signal`` module, which is not needed)
    frame  -- interrupted stack frame (unused)
    '''
    global abort
    abort = True
    #single-argument form prints the same text under Python 2 and Python 3
    print('Aborted.')
#end def
def strip_ext(name):
    '''
    Return *name* with its last filename extension removed.

    Only a dot in the final path component counts as an extension
    separator, and a name without any extension is returned unchanged.
    (Slicing at ``name.rfind('.')`` would chop the last character off an
    extension-less name and would cut the path at a dot found in a
    directory component.)

    name -- file name or path

    Returns the path without the trailing ``.ext`` part.
    '''
    return os.path.splitext(name)[0]
#end def
def out_file(name, num):
    '''
    Open the output file for slice number *num* of the input file *name*.

    The file is created in the current working directory and is named
    ``<stem>-<NNN>.<EXT>``, where <stem> is the base name of *name* with
    its extension stripped, <NNN> is *num* zero-padded to three digits
    and <EXT> is the module-level output extension.

    name -- path of the input sequence file
    num  -- ordinal number of the slice

    Returns the file object, opened for binary writing.
    '''
    stem = os.path.basename(strip_ext(name))
    slice_name = '%s-%03d.%s' % (stem, num, EXT)
    return open(slice_name, 'wb')
#end def
if __name__ == '__main__':
    #Command-line driver: for every input file create a working directory
    #named after the file (extension stripped) and write the records into
    #it in slices of at most --sequences-per-file records each.
    parser = argparse.ArgumentParser(description='Split huge %s files with many records into a set of smaller files.' % FMT)
    parser.add_argument('sequence_files', metavar='file.%s' % FMT,
                        type=str, nargs='+',
                        help='File(s) with sequences.')
    parser.add_argument('-s', '--sequences-per-file', dest='max_seqs', metavar='number',
                        type=int, default=10000,
                        help='Maximum number of sequences per output file')
    parser.add_argument('-e', '--output-extension', dest='ext', metavar='.ext',
                        type=str, default='fasta',
                        help='Extension of output files')
    #nargs='+' makes argparse itself reject an empty file list,
    #so no separate "no files given" check is needed here
    args = parser.parse_args()
    #the help shows the extension as ".ext"; drop leading dots so that
    #both "fa" and ".fa" give "name-001.fa" instead of "name-001..fa"
    EXT = args.ext.lstrip('.')
    #setup signal handler; SIGQUIT does not exist on every platform
    for signame in ('SIGINT', 'SIGTERM', 'SIGQUIT'):
        signum = getattr(signal, signame, None)
        if signum is not None:
            signal.signal(signum, sig_handler)
    #remember the absolute start directory: os.curdir is just '.', and
    #chdir('.') would leave us inside the previous file's working directory
    topd = os.getcwd()
    for sequence_file in args.sequence_files:
        os.chdir(topd)
        #check file existence
        if not os.path.isfile(sequence_file):
            print('No such file: %s' % sequence_file)
            continue
        print('Splitting %s into slices of %d sequences max' % (sequence_file, args.max_seqs))
        #resolve the input path before changing directory, otherwise
        #a relative path would be looked up inside the working directory
        source_path = os.path.abspath(sequence_file)
        #make working directory
        wd = strip_ext(sequence_file)
        if not os.path.isdir(wd):
            try:
                os.mkdir(wd)
            except OSError as err:
                #e.g. the name is empty or is taken by a regular file;
                #skip this input instead of crashing the whole run
                print('Cannot create working directory %s: %s' % (wd, err))
                continue
        os.chdir(wd)
        print('Working directory: %s' % wd)
        #initialize slicing
        seqs = 0
        files = 1
        #module-level name: the signal handler may look at it
        cur_file = None
        try:
            for seq in SeqIO.parse(source_path, FMT):
                #start a new slice on the first record and whenever
                #the current slice is full
                if cur_file is None or seqs >= args.max_seqs:
                    if cur_file is not None:
                        print('Wrote %d sequences to %s' % (seqs, cur_file.name))
                        cur_file.close()
                    cur_file = out_file(sequence_file, files)
                    files += 1
                    seqs = 0
                #stop if a signal requested an abort (or closed the file)
                if abort or cur_file.closed: sys.exit()
                SeqIO.write(seq, cur_file, FMT)
                seqs += 1
            if cur_file is not None:
                print('Wrote %d sequences to %s' % (seqs, cur_file.name))
        finally:
            #always release the current slice, also on abort or on error;
            #closing an already closed file is a no-op
            if cur_file is not None:
                cur_file.close()
        #NOTE(review): indentation was lost in the reviewed copy; the banner
        #is assumed to be printed once per processed file -- confirm
        print('-'*30 + 'Done' + '-'*30)
#end main