diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java
index d71a47404da..4c3b3def0e1 100644
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java
+++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java
@@ -250,7 +250,7 @@ public void writeReads(final JavaSparkContext ctx, final String outputFile, Java
                     reads, readsHeader, shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
                     getRecommendedNumReducers());
         } catch (IOException e) {
-            throw new GATKException("unable to write bam: " + e);
+            throw new GATKException("unable to write bam: " + outputFile, e);
         }
     }
diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java
index c22cca2c890..b2f8e28b788 100644
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java
+++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java
@@ -1,14 +1,21 @@
 package org.broadinstitute.hellbender.engine.spark.datasources;
 
+import com.google.common.annotations.VisibleForTesting;
 import htsjdk.samtools.BamFileIoUtils;
 import htsjdk.samtools.SAMFileHeader;
 import htsjdk.samtools.SAMRecord;
 import htsjdk.samtools.cram.build.CramIO;
+import htsjdk.samtools.cram.common.CramVersions;
+import htsjdk.samtools.util.BlockCompressedStreamConstants;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.parquet.avro.AvroParquetOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -17,6 +24,7 @@
 import org.bdgenomics.adam.models.RecordGroupDictionary;
 import org.bdgenomics.adam.models.SequenceDictionary;
 import org.bdgenomics.formats.avro.AlignmentRecord;
+import org.broadinstitute.hellbender.exceptions.GATKException;
 import org.broadinstitute.hellbender.exceptions.UserException;
 import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
 import org.broadinstitute.hellbender.utils.io.IOUtils;
@@ -25,18 +33,23 @@
 import org.broadinstitute.hellbender.utils.read.HeaderlessSAMRecordCoordinateComparator;
 import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;
 import org.seqdoop.hadoop_bam.*;
-import org.seqdoop.hadoop_bam.util.SAMFileMerger;
+import org.seqdoop.hadoop_bam.util.SAMOutputPreparer;
 import scala.Tuple2;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.Comparator;
+import java.util.UUID;
 
 /**
  * ReadsSparkSink writes GATKReads to a file. This code lifts from the HadoopGenomics/Hadoop-BAM
  * read writing code as well as from bigdatagenomics/adam.
  */
 public final class ReadsSparkSink {
+    private static Logger logger = LogManager.getLogger(ReadsSparkSink.class);
 
     // Output format class for writing BAM files through saveAsNewAPIHadoopFile. Must be public.
 public static class SparkBAMOutputFormat extends KeyIgnoringBAMOutputFormat<NullWritable> {
@@ -140,7 +153,7 @@ public static void writeReads(
         String absoluteReferenceFile = referenceFile != null ?
                 BucketUtils.makeFilePathAbsolute(referenceFile) :
                 referenceFile;
-        setHadoopBAMConfigurationProperties(ctx, absoluteOutputFile, absoluteReferenceFile, format);
+        setHadoopBAMConfigurationProperties(ctx, absoluteOutputFile, absoluteReferenceFile);
 
         // The underlying reads are required to be in SAMRecord format in order to be
         // written out, so we convert them to SAMRecord explicitly here. If they're already
@@ -225,9 +238,13 @@ private static void writeReadsSingle(
             final SAMFileHeader header, final int numReducers) throws IOException {
         final JavaRDD<SAMRecord> sortedReads = sortReads(reads, header, numReducers);
-        final String outputPartsDirectory = outputFile + ".parts";
-        saveAsShardedHadoopFiles(ctx, outputPartsDirectory, referenceFile, samOutputFormat, sortedReads, header, false);
-        SAMFileMerger.mergeParts(outputPartsDirectory, outputFile, samOutputFormat, header);
+        final String tempDirectory = getTempDirectory(outputFile);
+        logger.info("Saving bam file as shards");
+        saveAsShardedHadoopFiles(ctx, tempDirectory, referenceFile, samOutputFormat, sortedReads, header, false);
+        logger.info("Finished saving shards, beginning to combine shards into a single output file.");
+        mergeHeaderlessBamShards(ctx, tempDirectory, outputFile, header, samOutputFormat);
+        logger.info("Finished merging shards, deleting temporary files.");
+        deleteHadoopFile(tempDirectory, ctx.hadoopConfiguration());
     }
 
     private static JavaRDD<SAMRecord> sortReads(final JavaRDD<SAMRecord> reads, final SAMFileHeader header, final int numReducers) {
@@ -283,6 +300,88 @@ private static void deleteHadoopFile(String fileToObliterate, Configuration conf
         pathToDelete.getFileSystem(conf).delete(pathToDelete, true);
     }
 
+    private static void mergeHeaderlessBamShards(final JavaSparkContext ctx, String inputDirectory, final String outputFile, final SAMFileHeader header, final SAMFormat samOutputFormat) throws IOException {
+        // At this point, the part files (part-r-00000, part-r-00001, etc) are in the temporary directory (inputDirectory).
+        // Each part file is a BAM file with no header or terminating end-of-file marker (Hadoop-BAM does not add
+        // end-of-file markers), so to merge into a single BAM we concatenate the header with the part files and a
+        // terminating end-of-file marker. Note that this has the side effect of being ever-so-slightly less efficient
+        // than writing a BAM in one go because the last block of each file isn't completely full.
+
+        assertSuccessFileExists(ctx, inputDirectory);
+
+        final Path tmpPath = new Path(inputDirectory);
+        final Path outputPath = new Path(outputFile);
+        FileSystem fs = outputPath.getFileSystem(ctx.hadoopConfiguration());
+
+        try (final OutputStream out = fs.create(outputPath)) {
+            new SAMOutputPreparer().prepareForRecords(out, samOutputFormat, header); // write the header
+            mergeInto(out, tmpPath, ctx.hadoopConfiguration());
+            writeTerminatorBlock(out, samOutputFormat);
+        }
+    }
+
+    private static String getTempDirectory(String outputFile) {
+        final String outputParentDir = outputFile.substring(0, outputFile.lastIndexOf('/') + 1);
+        return outputParentDir + "tmp" + UUID.randomUUID();
+    }
+
+    private static void assertSuccessFileExists(JavaSparkContext ctx, String outputFile) throws IOException {
+        // First, check for the _SUCCESS file.
+        final String successFile = outputFile + "/_SUCCESS";
+        final Path successPath = new Path(successFile);
+
+        //it's important that we get the appropriate file system by requesting it from the path
+        final FileSystem fs = successPath.getFileSystem(ctx.hadoopConfiguration());
+        if (!fs.exists(successPath)) {
+            throw new GATKException("unable to find " + successFile + " file");
+        }
+    }
+
+    //Terminate the aggregated output stream with an appropriate SAMOutputFormat-dependent terminator block
+    private static void writeTerminatorBlock(final OutputStream out, final SAMFormat samOutputFormat) throws IOException {
+        if (SAMFormat.CRAM == samOutputFormat) {
+            CramIO.issueEOF(CramVersions.DEFAULT_CRAM_VERSION, out); // terminate with CRAM EOF container
+        }
+        else {
+            out.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK); // add the BGZF terminator
+        }
+    }
+
+    @VisibleForTesting
+    static void mergeInto(OutputStream out, Path directory, Configuration conf) throws IOException {
+        final FileSystem fs = directory.getFileSystem(conf);
+        final FileStatus[] parts = getBamFragments(directory, fs);
+
+        if( parts.length == 0){
+            throw new GATKException("Could not write bam file because no part files were found.");
+        }
+
+        logger.info("Found " + parts.length + " parts.");
+        int counter = 0;
+        for (final FileStatus part : parts) {
+            try (final InputStream in = fs.open(part.getPath())) {
+                org.apache.hadoop.io.IOUtils.copyBytes(in, out, conf, false);
+            }
+            counter++;
+            if( counter % 10 == 0 ) {
+                logger.info("Merged " + counter + "/" + parts.length + " parts.");
+            }
+        }
+    }
+
+    @VisibleForTesting
+    static FileStatus[] getBamFragments( final Path directory, final FileSystem fs ) throws IOException {
+        final FileStatus[] parts = fs.globStatus(new Path(directory, "part-r-[0-9][0-9][0-9][0-9][0-9]*"));
+
+        // FileSystem.globStatus() has a known bug that causes it to not sort the array returned by
+        // name (despite claiming to): https://issues.apache.org/jira/browse/HADOOP-10798
+        // Because of this bug, we do an explicit sort here to avoid assembling the bam fragments
+        // in the wrong order.
+        Arrays.sort(parts);
+
+        return parts;
+    }
+
     /**
      * Propagate any values that need to be passed to Hadoop-BAM through configuration properties:
      *
@@ -292,17 +391,11 @@ private static void deleteHadoopFile(String fileToObliterate, Configuration conf
      * from passing a stale value through to htsjdk when multiple calls are made serially
      * with different outputs but the same Spark context
      */
-    private static void setHadoopBAMConfigurationProperties(final JavaSparkContext ctx, final String outputName,
-                                                            final String referenceName, final ReadsWriteFormat format) {
+    private static void setHadoopBAMConfigurationProperties(final JavaSparkContext ctx, final String outputName, final String referenceName) {
         final Configuration conf = ctx.hadoopConfiguration();
         if (!IOUtils.isCramFileName(outputName)) {
             // only set the reference for CRAM output
             conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY);
-            if (format == ReadsWriteFormat.SINGLE && IOUtils.isBamFileName(outputName)) {
-                conf.setBoolean(BAMOutputFormat.WRITE_SPLITTING_BAI, true);
-            } else {
-                conf.setBoolean(BAMOutputFormat.WRITE_SPLITTING_BAI, false);
-            }
         }
         else {
             if (null == referenceName) {
diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java
index 63fed7e9b3e..a128f12e055 100644
--- a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java
+++ b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java
@@ -14,13 +14,11 @@
 import org.broadinstitute.hellbender.engine.spark.SparkContextFactory;
 import org.broadinstitute.hellbender.exceptions.GATKException;
 import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
-import org.broadinstitute.hellbender.utils.io.IOUtils;
 import org.broadinstitute.hellbender.utils.read.GATKRead;
 import org.broadinstitute.hellbender.utils.read.ReadCoordinateComparator;
 import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;
 import org.broadinstitute.hellbender.utils.test.BaseTest;
 import org.broadinstitute.hellbender.utils.test.MiniClusterUtils;
-import org.seqdoop.hadoop_bam.SplittingBAMIndexer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -29,7 +27,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -125,11 +122,6 @@ private void assertSingleShardedWritingWorks(String inputBam, String referenceFi
         ReadsSparkSink.writeReads(ctx, outputPath, referenceFile, rddParallelReads, header, ReadsWriteFormat.SINGLE);
 
-        // check that a splitting bai file is created
-        if (IOUtils.isBamFileName(outputPath)) {
-            Assert.assertTrue(Files.exists(IOUtils.getPath(outputPath + SplittingBAMIndexer.OUTPUT_FILE_EXTENSION)));
-        }
-
         JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputPath, referenceFile);
         final List<GATKRead> writtenReads = rddParallelReads2.collect();
@@ -211,4 +203,23 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw
             Assert.assertEquals(observed.getCigar(), expected.getCigar(), "getCigar");
         }
     }
+
+    @Test(groups = "spark")
+    public void testGetBamFragments() throws IOException {
+        final Path fragmentDir = new Path(getToolTestDataDir(), "fragments_test");
+        final FileSystem fs = fragmentDir.getFileSystem(new Configuration());
+
+        final FileStatus[] bamFragments = ReadsSparkSink.getBamFragments(fragmentDir, fs);
+        final List<String> expectedFragmentNames = Arrays.asList("part-r-00000", "part-r-00001", "part-r-00002", "part-r-00003");
+
+        Assert.assertEquals(bamFragments.length, expectedFragmentNames.size(), "Wrong number of fragments returned by ReadsSparkSink.getBamFragments()");
+        for ( int i = 0; i < bamFragments.length; ++i ) {
+            Assert.assertEquals(bamFragments[i].getPath().getName(), expectedFragmentNames.get(i), "Fragments are not in correct order");
+        }
+    }
+
+    @Test( expectedExceptions = GATKException.class, groups = "spark" )
+    public void testMergeIntoThrowsIfNoPartFiles() throws IOException {
+        ReadsSparkSink.mergeInto(null, new Path(getToolTestDataDir() + "directoryWithNoPartFiles"), new Configuration());
+    }
 }
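
Note (not part of the patch): the single-file output path above depends on three things working together: the shards written by saveAsShardedHadoopFiles are headerless BGZF fragments, SAMOutputPreparer writes the header once at the front, and writeTerminatorBlock appends the empty BGZF block (or CRAM EOF container) at the end. Below is a minimal, hypothetical sketch of how a merged BAM produced this way could be sanity-checked with htsjdk; the class name MergedBamSanityCheck and the "output.bam" path are placeholders, and the terminator assertion assumes the file was assembled by the concatenation logic in this diff (BAM output only, not CRAM).

import java.io.File;
import java.io.IOException;

import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
import htsjdk.samtools.util.BlockCompressedInputStream;

// Hypothetical helper, not part of this PR: sanity-checks a BAM assembled by
// concatenating a header, headerless shards, and the BGZF terminator.
public final class MergedBamSanityCheck {
    public static void main(final String[] args) throws IOException {
        final File mergedBam = new File("output.bam"); // placeholder path

        // writeTerminatorBlock() appends the empty BGZF block, so htsjdk should report
        // HAS_TERMINATOR_BLOCK; DEFECTIVE would indicate a truncated or mis-assembled file.
        final BlockCompressedInputStream.FileTermination termination =
                BlockCompressedInputStream.checkTermination(mergedBam);
        if (termination != BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK) {
            throw new IllegalStateException("unexpected BGZF termination state: " + termination);
        }

        // The header written by SAMOutputPreparer plus the concatenated shards should decode
        // as one continuous record stream.
        long count = 0;
        try (final SamReader reader = SamReaderFactory.makeDefault().open(mergedBam)) {
            for (final SAMRecord record : reader) {
                count++;
            }
        }
        System.out.println("read " + count + " records from " + mergedBam);
    }
}

A check along these lines exercises the same failure mode the deleted splitting-bai assertions used to catch indirectly: a shard that is merged out of order or left unterminated shows up either as a DEFECTIVE termination state or as a decode error while iterating.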