diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java index 5ede1a33165..ad4de74e139 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java @@ -1,24 +1,15 @@ package org.broadinstitute.hellbender.engine.spark.datasources; -import com.google.common.annotations.VisibleForTesting; import htsjdk.samtools.BamFileIoUtils; import htsjdk.samtools.SAMFileHeader; import htsjdk.samtools.SAMRecord; import htsjdk.samtools.cram.build.CramIO; -import htsjdk.samtools.cram.common.CramVersions; -import htsjdk.samtools.util.BlockCompressedStreamConstants; import org.apache.commons.collections4.iterators.IteratorIterable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.*; import org.apache.parquet.avro.AvroParquetOutputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -27,7 +18,6 @@ import org.bdgenomics.adam.models.RecordGroupDictionary; import org.bdgenomics.adam.models.SequenceDictionary; import org.bdgenomics.formats.avro.AlignmentRecord; -import org.broadinstitute.hellbender.exceptions.GATKException; import org.broadinstitute.hellbender.exceptions.UserException; import org.broadinstitute.hellbender.utils.gcs.BucketUtils; import org.broadinstitute.hellbender.utils.io.IOUtils; @@ -35,21 +25,13 @@ import org.broadinstitute.hellbender.utils.read.GATKReadToBDGAlignmentRecordConverter; import org.broadinstitute.hellbender.utils.read.HeaderlessSAMRecordCoordinateComparator; import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat; -import org.seqdoop.hadoop_bam.CRAMInputFormat; -import org.seqdoop.hadoop_bam.KeyIgnoringBAMOutputFormat; -import org.seqdoop.hadoop_bam.KeyIgnoringCRAMOutputFormat; -import org.seqdoop.hadoop_bam.SAMFormat; -import org.seqdoop.hadoop_bam.SAMRecordWritable; -import org.seqdoop.hadoop_bam.util.SAMOutputPreparer; +import org.seqdoop.hadoop_bam.*; +import org.seqdoop.hadoop_bam.util.SAMFileMerger; import scala.Tuple2; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; import java.util.Comparator; -import java.util.UUID; /** * ReadsSparkSink writes GATKReads to a file. This code lifts from the HadoopGenomics/Hadoop-BAM @@ -244,8 +226,9 @@ private static void writeReadsSingle( final SAMFileHeader header, final int numReducers) throws IOException { final JavaRDD sortedReads = sortReads(reads, header, numReducers); - saveAsShardedHadoopFiles(ctx, outputFile, referenceFile, samOutputFormat, sortedReads, header, false); - mergeHeaderlessBamShards(ctx, outputFile, samOutputFormat, header); + final String outputPartsDirectory = outputFile + ".parts"; + saveAsShardedHadoopFiles(ctx, outputPartsDirectory, referenceFile, samOutputFormat, sortedReads, header, false); + SAMFileMerger.mergeParts(outputPartsDirectory, outputFile, samOutputFormat, header); } private static JavaRDD sortReads(final JavaRDD reads, final SAMFileHeader header, final int numReducers) { @@ -301,81 +284,6 @@ private static void deleteHadoopFile(String fileToObliterate, Configuration conf pathToDelete.getFileSystem(conf).delete(pathToDelete, true); } - private static void mergeHeaderlessBamShards(final JavaSparkContext ctx, final String outputFile, final SAMFormat samOutputFormat, final SAMFileHeader header) throws IOException { - // At this point, the part files (part-r-00000, part-r-00001, etc) are in a directory named outputFile. - // Each part file is a BAM file with no header or terminating end-of-file marker (Hadoop-BAM does not add - // end-of-file markers), so to merge into a single BAM we concatenate the header with the part files and a - // terminating end-of-file marker. Note that this has the side effect of being ever-so-slightly less efficient - // than writing a BAM in one go because the last block of each file isn't completely full. - - final String outputParentDir = outputFile.substring(0, outputFile.lastIndexOf('/') + 1); - // First, check for the _SUCCESS file. - final String successFile = outputFile + "/_SUCCESS"; - final Path successPath = new Path(successFile); - final Configuration conf = ctx.hadoopConfiguration(); - - //it's important that we get the appropriate file system by requesting it from the path - final FileSystem fs = successPath.getFileSystem(conf); - if (!fs.exists(successPath)) { - throw new GATKException("unable to find " + successFile + " file"); - } - final Path outputPath = new Path(outputFile); - final Path tmpPath = new Path(outputParentDir + "tmp" + UUID.randomUUID()); - - fs.rename(outputPath, tmpPath); - fs.delete(outputPath, true); - - try (final OutputStream out = fs.create(outputPath)) { - new SAMOutputPreparer().prepareForRecords(out, samOutputFormat, header); // write the header - mergeInto(out, tmpPath, conf); - writeTerminatorBlock(out, samOutputFormat); - } - - fs.delete(tmpPath, true); - } - - //Terminate the aggregated output stream with an appropriate SAMOutputFormat-dependent terminator block - private static void writeTerminatorBlock(final OutputStream out, final SAMFormat samOutputFormat) throws IOException { - if (SAMFormat.CRAM == samOutputFormat) { - CramIO.issueEOF(CramVersions.DEFAULT_CRAM_VERSION, out); // terminate with CRAM EOF container - } - else { - out.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK); // add the BGZF terminator - } - } - - @VisibleForTesting - static void mergeInto(OutputStream out, Path directory, Configuration conf) throws IOException { - final FileSystem fs = directory.getFileSystem(conf); - final FileStatus[] parts = getBamFragments(directory, fs); - - if( parts.length == 0){ - throw new GATKException("Could not write bam file because no part files were found."); - } - - for (final FileStatus part : parts) { - try (final InputStream in = fs.open(part.getPath())) { - org.apache.hadoop.io.IOUtils.copyBytes(in, out, conf, false); - } - } - for (final FileStatus part : parts) { - fs.delete(part.getPath(), false); - } - } - - @VisibleForTesting - static FileStatus[] getBamFragments( final Path directory, final FileSystem fs ) throws IOException { - final FileStatus[] parts = fs.globStatus(new Path(directory, "part-r-[0-9][0-9][0-9][0-9][0-9]*")); - - // FileSystem.globStatus() has a known bug that causes it to not sort the array returned by - // name (despite claiming to): https://issues.apache.org/jira/browse/HADOOP-10798 - // Because of this bug, we do an explicit sort here to avoid assembling the bam fragments - // in the wrong order. - Arrays.sort(parts); - - return parts; - } - private static String makeFilePathAbsolute(String path){ if(BucketUtils.isCloudStorageUrl(path) || BucketUtils.isHadoopUrl(path) || BucketUtils.isFileUrl(path)){ return path; diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java index 7832dfa3d03..b0f853d62c6 100644 --- a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java @@ -203,23 +203,4 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw Assert.assertEquals(observed.getCigar(), expected.getCigar(), "getCigar"); } } - - @Test(groups = "spark") - public void testGetBamFragments() throws IOException { - final Path fragmentDir = new Path(getToolTestDataDir(), "fragments_test"); - final FileSystem fs = fragmentDir.getFileSystem(new Configuration()); - - final FileStatus[] bamFragments = ReadsSparkSink.getBamFragments(fragmentDir, fs); - final List expectedFragmentNames = Arrays.asList("part-r-00000", "part-r-00001", "part-r-00002", "part-r-00003"); - - Assert.assertEquals(bamFragments.length, expectedFragmentNames.size(), "Wrong number of fragments returned by ReadsSparkSink.getBamFragments()"); - for ( int i = 0; i < bamFragments.length; ++i ) { - Assert.assertEquals(bamFragments[i].getPath().getName(), expectedFragmentNames.get(i), "Fragments are not in correct order"); - } - } - - @Test( expectedExceptions = GATKException.class, groups = "spark" ) - public void testMergeIntoThrowsIfNoPartFiles() throws IOException { - ReadsSparkSink.mergeInto(null, new Path(getToolTestDataDir() + "directoryWithNoPartFiles"), new Configuration()); - } }