Write parquet files in place of temporary JSON files. (#6)
* Replace string dtypes with object dtypes.

* Add an argument to the `write_output` function to indicate whether parquet will be written.

* Write parquet files directly in place of temporary JSON files (see the sketch after this list).

* Add flag to ignore datatype conversion errors when casting integer columns with NaNs, and change output file name.

* Edit `concat_outputs` to simply move parquet files generated from JSON output instead of concatenating them.

* Build file paths with `os.path.join` instead of string concatenation.

* Minor edit to `os.path.join`.

* Add an if statement to check whether a file exists before attempting to delete the temporary file.

* Add `.snappy` file extension to parquet files.

* Simplify file naming by moving files into the output directory instead.

* Simplify specifying columns by changing `schema.names` to `dtypes`.

* Parse dictionary strings with `json.loads` before loading them into the dataframe.

* Read in the temporary parquet files, repartition, and write them back out as parquet.

* Remove explicitly setting the parquet metadata-file option to False, since it is already the default.

* Remove unused imports.

* Remove the if condition that checks for temp files before deleting them.
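
Taken together, the per-job path now parses each JSON line into a dict, builds an object-dtyped pandas DataFrame, and writes a snappy-compressed parquet file in place of the temporary JSON file. A minimal sketch of that step, using toy records and an illustrative temp directory rather than the repository's real inputs:

```python
import json
import os

import pandas as pd

# Toy stand-ins for abstar's per-sequence JSON output lines.
json_lines = [
    '{"seq_id": "seq1", "chain": "heavy", "isotype_score": 42}',
    '{"seq_id": "seq2", "chain": "kappa", "isotype_score": 17}',
]

# Parse each JSON string into a dict before building the dataframe.
records = [json.loads(line) for line in json_lines]
df = pd.DataFrame.from_records(records).astype(object)

# Write one snappy-compressed parquet file for this job instead of a temp JSON file.
os.makedirs("temp_parquet", exist_ok=True)
df.to_parquet(
    os.path.join("temp_parquet", "job_0001.snappy.parquet"),
    engine="pyarrow",
    compression="snappy",
)
```

The matching consolidation step, which replaces JSON concatenation, is sketched after the `abstar/core/abstar.py` diff below.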
srgk26 authored Feb 3, 2023
1 parent c582ed8 commit 1da768b
Showing 3 changed files with 83 additions and 76 deletions.
19 changes: 7 additions & 12 deletions abstar/core/abstar.py
@@ -48,8 +48,6 @@
from Bio import SeqIO

import dask.dataframe as dd
import dask.bag as db
import pandas as pd

from abutils.core.sequence import Sequence, read_json, read_csv
from abutils.utils import log
@@ -444,35 +442,32 @@ def concat_outputs(input_file, temp_output_file_dicts, output_dir, args):
ohandle = gzip.open(ofile + ".gz", 'wb')
else:
ohandle = open(ofile, 'wb')

with ohandle as out_file:
# JSON-formatted files don't have headers, so we don't worry about it
if output_type == 'json':
if output_type == 'json' and not args.parquet:
for temp_file in temp_files:
with open(temp_file, "rb") as f:
shutil.copyfileobj(f, out_file, length=16 * 1024**2) # Increasing buffer size to 16MB for faster transfer
# For file formats with headers, only keep headers from the first file
if output_type in ['imgt', 'tabular', 'airr']:
elif output_type in ['imgt', 'tabular', 'airr']:
for i, temp_file in enumerate(temp_files):
with open(temp_file, "rb") as f:
for j, line in enumerate(f):
if i == 0:
out_file.write(line)
elif j >= 1:
out_file.write(line)

if args.parquet:
logger.info("Converting concatenated output to parquet format")
pname = f"{oprefix}_from_{output_type}" # Note which output format the parquet file is written from
pfile = os.path.join(output_subdir, pname)
dtypes = get_parquet_dtypes(output_type)

if output_type == "json":
meta = pd.DataFrame(columns=dtypes).astype(dtypes)
df = (
db.read_text(ofile, blocksize=2**28)
.map(json.loads)
.to_dataframe(meta=meta)
)
df.to_parquet(
df = dd.read_parquet(os.path.dirname(temp_files[0])) # Read in all parquet files in temp dir
df.repartition(partition_size="100MB").to_parquet(
pfile,
engine="pyarrow",
compression="snappy",
@@ -724,7 +719,7 @@ def run_abstar(seq_file, output_dir, log_dir, file_format, arg_dict):
failed_loghandle.write(ab.format_log())
# outputs = [outputs_dict[ot] for ot in sorted(args.output_type)]
# write_output(outputs, output_files)
output_file_dict = write_output(outputs_dict, output_dir, output_filename)
output_file_dict = write_output(outputs_dict, output_dir, output_filename, args.parquet)
# capture the log for all unsuccessful sequences
for vdj in assigner.unassigned:
unassigned_loghandle.write(vdj.format_log())
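
For the consolidation step above, a minimal self-contained sketch; the directory names are illustrative, and the real code derives them from the temp files and the output prefix:

```python
import dask.dataframe as dd

temp_dir = "temp_parquet"                    # per-job .snappy.parquet files from write_output
final_dir = "output/json/sample_from_json"   # hypothetical final dataset directory

# Read every parquet file in the temp directory as a single dask dataframe,
# rebalance it into ~100MB partitions, and write the final parquet dataset.
ddf = dd.read_parquet(temp_dir)
ddf.repartition(partition_size="100MB").to_parquet(
    final_dir,
    engine="pyarrow",
    compression="snappy",
)
```

Repartitioning here keeps the final dataset from inheriting one small file per job.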
2 changes: 1 addition & 1 deletion abstar/core/jobs.py
@@ -68,7 +68,7 @@ def run_abstar(sequence_file, output_directory, args):
if args.debug:
assigned_log += ab.format_log()
results = get_abstar_results(assigned, pretty=args.pretty, padding=args.padding, raw=args.raw)
write_output(results, output_file, args.output_type)
write_output(results, output_file, args.output_type, args.parquet)
# capture the log for all unsuccessful sequences
for vdj in unassigned:
unassigned_log += vdj.format_log()
138 changes: 75 additions & 63 deletions abstar/utils/output.py
@@ -29,8 +29,10 @@
import os
import traceback
import uuid
import pandas as pd

from abutils.utils import log
from abstar.utils.parquet_schema import schema

from .cigar import make_cigar

@@ -546,67 +548,67 @@ def get_parquet_dtypes(output_format):
'var_muts_nt': 'object', 'var_muts_aa': 'object'}
elif output_format == "json":
dtypes = {
"seq_id": str,
"chain": str,
"v_gene": str,
"d_gene": str,
"j_gene": str,
"assigner_scores": str,
"vdj_assigner": str,
"isotype": str,
"seq_id": object,
"chain": object,
"v_gene": object,
"d_gene": object,
"j_gene": object,
"assigner_scores": object,
"vdj_assigner": object,
"isotype": object,
"isotype_score": int,
"isotype_alignment": str,
"nt_identity": str,
"aa_identity": str,
"isotype_alignment": object,
"nt_identity": object,
"aa_identity": object,
"junc_len": int,
"cdr3_len": int,
"vdj_nt": str,
"gapped_vdj_nt": str,
"fr1_nt": str,
"cdr1_nt": str,
"fr2_nt": str,
"cdr2_nt": str,
"fr3_nt": str,
"cdr3_nt": str,
"fr4_nt": str,
"vdj_germ_nt": str,
"gapped_vdj_germ_nt": str,
"junc_nt": str,
"region_len_nt": str,
"var_muts_nt": str,
"join_muts_nt": str,
"vdj_nt": object,
"gapped_vdj_nt": object,
"fr1_nt": object,
"cdr1_nt": object,
"fr2_nt": object,
"cdr2_nt": object,
"fr3_nt": object,
"cdr3_nt": object,
"fr4_nt": object,
"vdj_germ_nt": object,
"gapped_vdj_germ_nt": object,
"junc_nt": object,
"region_len_nt": object,
"var_muts_nt": object,
"join_muts_nt": object,
"mut_count_nt": int,
"vdj_aa": str,
"fr1_aa": str,
"cdr1_aa": str,
"fr2_aa": str,
"cdr2_aa": str,
"fr3_aa": str,
"cdr3_aa": str,
"fr4_aa": str,
"vdj_germ_aa": str,
"junc_aa": str,
"region_len_aa": str,
"var_muts_aa": str,
"join_muts_aa": str,
"region_muts_nt": str,
"region_muts_aa": str,
"prod": str,
"productivity_issues": str,
"junction_in_frame": str,
"raw_input": str,
"oriented_input": str,
"strand": str,
"germ_alignments_nt": str,
"exo_trimming": str,
"junc_nt_breakdown": str,
"germline_database": str,
"species": str,
"align_info": str,
"j_del": str,
"v_del": str,
"j_ins": str,
"v_ins": str,
"vdj_aa": object,
"fr1_aa": object,
"cdr1_aa": object,
"fr2_aa": object,
"cdr2_aa": object,
"fr3_aa": object,
"cdr3_aa": object,
"fr4_aa": object,
"vdj_germ_aa": object,
"junc_aa": object,
"region_len_aa": object,
"var_muts_aa": object,
"join_muts_aa": object,
"region_muts_nt": object,
"region_muts_aa": object,
"prod": object,
"productivity_issues": object,
"junction_in_frame": object,
"raw_input": object,
"oriented_input": object,
"strand": object,
"germ_alignments_nt": object,
"exo_trimming": object,
"junc_nt_breakdown": object,
"germline_database": object,
"species": object,
"align_info": object,
"j_del": object,
"v_del": object,
"j_ins": object,
"v_ins": object,
}
else:
dtypes = {}
@@ -642,15 +644,25 @@ def get_output(result, output_type):
# with open(outfile, 'w') as f:
# f.write('\n'.join(_outputs))

def write_output(output_dict, output_dir, output_prefix):
def write_output(output_dict, output_dir, output_prefix, write_parquet: bool):
output_file_dict = {}
for fmt in output_dict.keys():
subdir = os.path.join(output_dir, fmt)
output_name = output_prefix + get_output_suffix(fmt)
output_file = os.path.join(subdir, output_name)
with open(output_file, 'w') as f:
f.write('\n'.join(output_dict[fmt]))
f.write("\n")

if fmt == "json" and write_parquet:
output_name = output_prefix + ".snappy.parquet"
output_file = os.path.join(subdir, output_name)
dtypes = get_parquet_dtypes(fmt)
df = pd.DataFrame.from_records([json.loads(line) for line in output_dict[fmt]])
df = df.reindex(columns=dtypes).astype(dtypes)
df.to_parquet(output_file, engine="pyarrow", compression="snappy", schema=schema)
else:
output_name = output_prefix + get_output_suffix(fmt)
output_file = os.path.join(subdir, output_name)
with open(output_file, 'w') as f:
f.write('\n'.join(output_dict[fmt]))
f.write("\n")

output_file_dict[fmt] = output_file
return output_file_dict

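
Two details in `write_output` are easy to miss: the dtype map uses `object` rather than `str` because `astype(str)` would turn missing values into the literal string "nan", and an explicit pyarrow schema keeps all-null columns from being written with a null type. A minimal sketch of both, with a deliberately tiny schema standing in for the full one in `abstar/utils/parquet_schema.py`:

```python
import pandas as pd
import pyarrow as pa

# Tiny stand-in for the real schema in abstar/utils/parquet_schema.py.
schema = pa.schema([
    ("seq_id", pa.string()),
    ("d_gene", pa.string()),
    ("isotype_score", pa.int64()),
])

df = pd.DataFrame.from_records([{"seq_id": "seq1", "isotype_score": 42}])
# Reindex to the full column set; the missing d_gene column is filled with NaN.
df = df.reindex(columns=["seq_id", "d_gene", "isotype_score"]).astype(object)

# astype(str) here would have turned the missing d_gene into the string "nan";
# keeping object dtype leaves it as a null that parquet represents natively, and
# passing schema= through pandas' pyarrow engine pins each column's parquet type.
df.to_parquet(
    "record.snappy.parquet",
    engine="pyarrow",
    compression="snappy",
    schema=schema,
)
```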

