forked from broadinstitute/pyro-cov
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_nextclade.py
executable file
·68 lines (55 loc) · 2.23 KB
/
run_nextclade.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# Copyright Contributors to the Pyro-Cov project.
# SPDX-License-Identifier: Apache-2.0
import argparse
import json
import logging
import os
import pickle
from collections import Counter
from pyrocov.fasta import NextcladeDB
logger = logging.getLogger(__name__)
logging.basicConfig(format="%(relativeCreated) 9d %(message)s", level=logging.INFO)
def count_mutations(mutation_counts, row):
# Check whether row is valid
if row["qc.overallStatus"] != "good":
return
for col in ["aaSubstitutions", "aaDeletions"]:
ms = row[col]
if ms:
mutation_counts.update(ms.split(","))
def main(args):
logger.info(f"Filtering {args.gisaid_file_in}")
if not os.path.exists(args.gisaid_file_in):
raise OSError(
"Each user must independently request a data feed from gisaid.org"
)
os.makedirs("results", exist_ok=True)
db = NextcladeDB()
schedule = db.maybe_schedule if args.no_new else db.schedule
mutation_counts = Counter()
with open(args.gisaid_file_in, "rt") as f:
for i, line in enumerate(f):
seq = json.loads(line)["sequence"]
# Filter by length.
nchars = sum(seq.count(b) for b in "ACGT")
if args.min_nchars <= nchars <= args.max_nchars:
seq = seq.replace("\n", "")
schedule(seq, count_mutations, mutation_counts)
if i % args.log_every == 0:
print(".", end="", flush=True)
db.wait(log_every=args.log_every)
logger.info(f"saving {args.counts_file_out}")
with open(args.counts_file_out, "wb") as f:
pickle.dump(dict(mutation_counts), f)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run NextClade on all sequences")
parser.add_argument(
"--gisaid-file-in", default=os.path.expanduser("results/gisaid.json")
)
parser.add_argument("--counts-file-out", default="results/nextclade.counts.pkl")
parser.add_argument("--min-nchars", default=29000, type=int)
parser.add_argument("--max-nchars", default=31000, type=int)
parser.add_argument("--no-new", action="store_true")
parser.add_argument("-l", "--log-every", default=1000, type=int)
args = parser.parse_args()
main(args)