Use SAMFileMerger from Hadoop-BAM.
tomwhite committed Sep 9, 2016
1 parent: e0fe8ad · commit: e98fe09
Showing 2 changed files with 6 additions and 117 deletions.
ReadsSparkSink.java
@@ -1,24 +1,15 @@
package org.broadinstitute.hellbender.engine.spark.datasources;

import com.google.common.annotations.VisibleForTesting;
import htsjdk.samtools.BamFileIoUtils;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.cram.build.CramIO;
import htsjdk.samtools.cram.common.CramVersions;
import htsjdk.samtools.util.BlockCompressedStreamConstants;
import org.apache.commons.collections4.iterators.IteratorIterable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.*;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -27,29 +18,20 @@
import org.bdgenomics.adam.models.RecordGroupDictionary;
import org.bdgenomics.adam.models.SequenceDictionary;
import org.bdgenomics.formats.avro.AlignmentRecord;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.GATKReadToBDGAlignmentRecordConverter;
import org.broadinstitute.hellbender.utils.read.HeaderlessSAMRecordCoordinateComparator;
import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;
import org.seqdoop.hadoop_bam.CRAMInputFormat;
import org.seqdoop.hadoop_bam.KeyIgnoringBAMOutputFormat;
import org.seqdoop.hadoop_bam.KeyIgnoringCRAMOutputFormat;
import org.seqdoop.hadoop_bam.SAMFormat;
import org.seqdoop.hadoop_bam.SAMRecordWritable;
import org.seqdoop.hadoop_bam.util.SAMOutputPreparer;
import org.seqdoop.hadoop_bam.*;
import org.seqdoop.hadoop_bam.util.SAMFileMerger;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.UUID;

/**
* ReadsSparkSink writes GATKReads to a file. This code lifts from the HadoopGenomics/Hadoop-BAM
@@ -244,8 +226,9 @@ private static void writeReadsSingle(
final SAMFileHeader header, final int numReducers) throws IOException {

final JavaRDD<SAMRecord> sortedReads = sortReads(reads, header, numReducers);
saveAsShardedHadoopFiles(ctx, outputFile, referenceFile, samOutputFormat, sortedReads, header, false);
mergeHeaderlessBamShards(ctx, outputFile, samOutputFormat, header);
final String outputPartsDirectory = outputFile + ".parts";
saveAsShardedHadoopFiles(ctx, outputPartsDirectory, referenceFile, samOutputFormat, sortedReads, header, false);
SAMFileMerger.mergeParts(outputPartsDirectory, outputFile, samOutputFormat, header);
}

private static JavaRDD<SAMRecord> sortReads(final JavaRDD<SAMRecord> reads, final SAMFileHeader header, final int numReducers) {
@@ -301,81 +284,6 @@ private static void deleteHadoopFile(String fileToObliterate, Configuration conf
pathToDelete.getFileSystem(conf).delete(pathToDelete, true);
}

private static void mergeHeaderlessBamShards(final JavaSparkContext ctx, final String outputFile, final SAMFormat samOutputFormat, final SAMFileHeader header) throws IOException {
// At this point, the part files (part-r-00000, part-r-00001, etc) are in a directory named outputFile.
// Each part file is a BAM file with no header or terminating end-of-file marker (Hadoop-BAM does not add
// end-of-file markers), so to merge into a single BAM we concatenate the header with the part files and a
// terminating end-of-file marker. Note that this has the side effect of being ever-so-slightly less efficient
// than writing a BAM in one go because the last block of each file isn't completely full.

final String outputParentDir = outputFile.substring(0, outputFile.lastIndexOf('/') + 1);
// First, check for the _SUCCESS file.
final String successFile = outputFile + "/_SUCCESS";
final Path successPath = new Path(successFile);
final Configuration conf = ctx.hadoopConfiguration();

//it's important that we get the appropriate file system by requesting it from the path
final FileSystem fs = successPath.getFileSystem(conf);
if (!fs.exists(successPath)) {
throw new GATKException("unable to find " + successFile + " file");
}
final Path outputPath = new Path(outputFile);
final Path tmpPath = new Path(outputParentDir + "tmp" + UUID.randomUUID());

fs.rename(outputPath, tmpPath);
fs.delete(outputPath, true);

try (final OutputStream out = fs.create(outputPath)) {
new SAMOutputPreparer().prepareForRecords(out, samOutputFormat, header); // write the header
mergeInto(out, tmpPath, conf);
writeTerminatorBlock(out, samOutputFormat);
}

fs.delete(tmpPath, true);
}

//Terminate the aggregated output stream with an appropriate SAMOutputFormat-dependent terminator block
private static void writeTerminatorBlock(final OutputStream out, final SAMFormat samOutputFormat) throws IOException {
if (SAMFormat.CRAM == samOutputFormat) {
CramIO.issueEOF(CramVersions.DEFAULT_CRAM_VERSION, out); // terminate with CRAM EOF container
}
else {
out.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK); // add the BGZF terminator
}
}

@VisibleForTesting
static void mergeInto(OutputStream out, Path directory, Configuration conf) throws IOException {
final FileSystem fs = directory.getFileSystem(conf);
final FileStatus[] parts = getBamFragments(directory, fs);

if( parts.length == 0){
throw new GATKException("Could not write bam file because no part files were found.");
}

for (final FileStatus part : parts) {
try (final InputStream in = fs.open(part.getPath())) {
org.apache.hadoop.io.IOUtils.copyBytes(in, out, conf, false);
}
}
for (final FileStatus part : parts) {
fs.delete(part.getPath(), false);
}
}

@VisibleForTesting
static FileStatus[] getBamFragments( final Path directory, final FileSystem fs ) throws IOException {
final FileStatus[] parts = fs.globStatus(new Path(directory, "part-r-[0-9][0-9][0-9][0-9][0-9]*"));

// FileSystem.globStatus() has a known bug that causes it to not sort the array returned by
// name (despite claiming to): https://issues.apache.org/jira/browse/HADOOP-10798
// Because of this bug, we do an explicit sort here to avoid assembling the bam fragments
// in the wrong order.
Arrays.sort(parts);

return parts;
}

private static String makeFilePathAbsolute(String path){
if(BucketUtils.isCloudStorageUrl(path) || BucketUtils.isHadoopUrl(path) || BucketUtils.isFileUrl(path)){
return path;
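In place of the merging code deleted above, the new writeReadsSingle flow writes its shards to a temporary "<output>.parts" directory and hands the merge to Hadoop-BAM. A minimal sketch of that flow follows, for illustration only: SAMFileMerger.mergeParts is the Hadoop-BAM call this commit actually introduces, while writeShards merely stands in for GATK's private saveAsShardedHadoopFiles and is not a real method.

import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.seqdoop.hadoop_bam.SAMFormat;
import org.seqdoop.hadoop_bam.util.SAMFileMerger;

import java.io.IOException;

final class SingleFileWriteSketch {

    // Sketch of the post-commit single-file write path: write shards, then merge.
    static void writeSingle(final JavaSparkContext ctx,
                            final JavaRDD<SAMRecord> sortedReads,
                            final SAMFileHeader header,
                            final String outputFile,
                            final SAMFormat samOutputFormat) throws IOException {
        final String outputPartsDirectory = outputFile + ".parts";

        // Write headerless part files (part-r-00000, part-r-00001, ...) into the parts directory.
        writeShards(ctx, outputPartsDirectory, sortedReads, header, samOutputFormat);

        // SAMFileMerger writes the header, concatenates the parts, and appends the
        // format-appropriate terminator, work that the deleted mergeHeaderlessBamShards,
        // mergeInto and writeTerminatorBlock methods used to do by hand.
        SAMFileMerger.mergeParts(outputPartsDirectory, outputFile, samOutputFormat, header);
    }

    // Placeholder for GATK's private saveAsShardedHadoopFiles; the sharded write itself
    // is unchanged by this commit, so it is elided here.
    private static void writeShards(final JavaSparkContext ctx, final String partsDirectory,
                                    final JavaRDD<SAMRecord> reads, final SAMFileHeader header,
                                    final SAMFormat samOutputFormat) {
        throw new UnsupportedOperationException("sharded Hadoop write elided in this sketch");
    }
}

The practical gain is that the part-file ordering workaround for HADOOP-10798 and the BGZF/CRAM terminator handling no longer have to be maintained inside GATK; they live in Hadoop-BAM alongside the rest of the merge logic.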
ReadsSparkSinkUnitTest.java
@@ -203,23 +203,4 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw
Assert.assertEquals(observed.getCigar(), expected.getCigar(), "getCigar");
}
}

@Test(groups = "spark")
public void testGetBamFragments() throws IOException {
final Path fragmentDir = new Path(getToolTestDataDir(), "fragments_test");
final FileSystem fs = fragmentDir.getFileSystem(new Configuration());

final FileStatus[] bamFragments = ReadsSparkSink.getBamFragments(fragmentDir, fs);
final List<String> expectedFragmentNames = Arrays.asList("part-r-00000", "part-r-00001", "part-r-00002", "part-r-00003");

Assert.assertEquals(bamFragments.length, expectedFragmentNames.size(), "Wrong number of fragments returned by ReadsSparkSink.getBamFragments()");
for ( int i = 0; i < bamFragments.length; ++i ) {
Assert.assertEquals(bamFragments[i].getPath().getName(), expectedFragmentNames.get(i), "Fragments are not in correct order");
}
}

@Test( expectedExceptions = GATKException.class, groups = "spark" )
public void testMergeIntoThrowsIfNoPartFiles() throws IOException {
ReadsSparkSink.mergeInto(null, new Path(getToolTestDataDir() + "directoryWithNoPartFiles"), new Configuration());
}
}
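The two deleted tests exercised helpers (getBamFragments ordering and mergeInto's empty-directory check) that no longer exist in GATK. Equivalent GATK-side coverage would have to validate the merged output itself rather than the merge internals. The check below is a rough, hypothetical example using htsjdk, not a test from this commit; the class and method names are invented for illustration.

import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
import htsjdk.samtools.ValidationStringency;
import org.testng.Assert;

import java.io.File;
import java.io.IOException;

public class MergedOutputCheckSketch {

    // Hypothetical post-merge check: the single output BAM should open cleanly
    // (header present, terminator intact) and contain at least one read.
    public static void assertMergedBamIsReadable(final File mergedBam) {
        try (final SamReader reader = SamReaderFactory.makeDefault()
                .validationStringency(ValidationStringency.SILENT)
                .open(mergedBam)) {
            Assert.assertNotNull(reader.getFileHeader(), "merged BAM is missing a header");
            Assert.assertTrue(reader.iterator().hasNext(), "merged BAM contains no reads");
        } catch (final IOException e) {
            Assert.fail("could not read merged BAM: " + e.getMessage());
        }
    }
}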
