From 0ebd4e2df8d658843ae6ab1baf09cb8e471e6075 Mon Sep 17 00:00:00 2001 From: Tom White Date: Fri, 9 Dec 2016 10:08:11 +0000 Subject: [PATCH 1/3] Revert "Use SAMFileMerger from Hadoop-BAM." This reverts commit e98fe09cc9822b2d57b3d2c55a5160fab87a59fc. --- .../spark/datasources/ReadsSparkSink.java | 98 ++++++++++++++++++- .../datasources/ReadsSparkSinkUnitTest.java | 19 ++++ 2 files changed, 112 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java index c22cca2c890..a8c8222cdbf 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java @@ -1,14 +1,23 @@ package org.broadinstitute.hellbender.engine.spark.datasources; +import com.google.common.annotations.VisibleForTesting; import htsjdk.samtools.BamFileIoUtils; import htsjdk.samtools.SAMFileHeader; import htsjdk.samtools.SAMRecord; import htsjdk.samtools.cram.build.CramIO; +import htsjdk.samtools.cram.common.CramVersions; +import htsjdk.samtools.util.BlockCompressedStreamConstants; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.avro.AvroParquetOutputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -17,6 +26,7 @@ import org.bdgenomics.adam.models.RecordGroupDictionary; import org.bdgenomics.adam.models.SequenceDictionary; import org.bdgenomics.formats.avro.AlignmentRecord; +import org.broadinstitute.hellbender.exceptions.GATKException; import org.broadinstitute.hellbender.exceptions.UserException; import org.broadinstitute.hellbender.utils.gcs.BucketUtils; import org.broadinstitute.hellbender.utils.io.IOUtils; @@ -25,12 +35,16 @@ import org.broadinstitute.hellbender.utils.read.HeaderlessSAMRecordCoordinateComparator; import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat; import org.seqdoop.hadoop_bam.*; -import org.seqdoop.hadoop_bam.util.SAMFileMerger; +import org.seqdoop.hadoop_bam.util.SAMOutputPreparer; import scala.Tuple2; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; import java.util.Comparator; +import java.util.UUID; /** * ReadsSparkSink writes GATKReads to a file. 
This code lifts from the HadoopGenomics/Hadoop-BAM @@ -225,9 +239,8 @@ private static void writeReadsSingle( final SAMFileHeader header, final int numReducers) throws IOException { final JavaRDD sortedReads = sortReads(reads, header, numReducers); - final String outputPartsDirectory = outputFile + ".parts"; - saveAsShardedHadoopFiles(ctx, outputPartsDirectory, referenceFile, samOutputFormat, sortedReads, header, false); - SAMFileMerger.mergeParts(outputPartsDirectory, outputFile, samOutputFormat, header); + saveAsShardedHadoopFiles(ctx, outputFile, referenceFile, samOutputFormat, sortedReads, header, false); + mergeHeaderlessBamShards(ctx, outputFile, samOutputFormat, header); } private static JavaRDD sortReads(final JavaRDD reads, final SAMFileHeader header, final int numReducers) { @@ -283,6 +296,81 @@ private static void deleteHadoopFile(String fileToObliterate, Configuration conf pathToDelete.getFileSystem(conf).delete(pathToDelete, true); } + private static void mergeHeaderlessBamShards(final JavaSparkContext ctx, final String outputFile, final SAMFormat samOutputFormat, final SAMFileHeader header) throws IOException { + // At this point, the part files (part-r-00000, part-r-00001, etc) are in a directory named outputFile. + // Each part file is a BAM file with no header or terminating end-of-file marker (Hadoop-BAM does not add + // end-of-file markers), so to merge into a single BAM we concatenate the header with the part files and a + // terminating end-of-file marker. Note that this has the side effect of being ever-so-slightly less efficient + // than writing a BAM in one go because the last block of each file isn't completely full. + + final String outputParentDir = outputFile.substring(0, outputFile.lastIndexOf('/') + 1); + // First, check for the _SUCCESS file. 
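The comment above is the heart of the single-file write path: each reducer shard is a headerless run of BGZF blocks, so a valid BAM can be produced by writing a compressed header, byte-concatenating the shards in order, and appending the BGZF terminator. A minimal, self-contained sketch of that idea (class, method, and variable names are illustrative and not part of this patch; it assumes htsjdk and Hadoop-BAM on the classpath):

    import htsjdk.samtools.SAMFileHeader;
    import htsjdk.samtools.util.BlockCompressedStreamConstants;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.seqdoop.hadoop_bam.SAMFormat;
    import org.seqdoop.hadoop_bam.util.SAMOutputPreparer;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.Arrays;

    // Illustrative sketch only: build a single BAM from headerless BGZF shards.
    final class BamShardConcatenationSketch {
        static void concatenate(final Path shardDir, final Path target, final SAMFileHeader header,
                                final Configuration conf) throws IOException {
            final FileSystem fs = target.getFileSystem(conf);
            final FileStatus[] shards = fs.globStatus(new Path(shardDir, "part-r-*"));
            if (shards == null || shards.length == 0) {
                throw new IOException("no part files found under " + shardDir);
            }
            Arrays.sort(shards); // FileStatus orders by path, so zero-padded names come back in shard order
            try (final OutputStream out = fs.create(target)) {
                // Write the BGZF-compressed header block first...
                new SAMOutputPreparer().prepareForRecords(out, SAMFormat.BAM, header);
                // ...then append each headerless shard. BGZF blocks are self-contained gzip members,
                // so plain byte concatenation yields a valid stream (just slightly less compact).
                for (final FileStatus shard : shards) {
                    try (final InputStream in = fs.open(shard.getPath())) {
                        org.apache.hadoop.io.IOUtils.copyBytes(in, out, conf, false);
                    }
                }
                // ...and finish with the 28-byte BGZF terminator so readers see a healthy EOF.
                out.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
            }
        }
    }

The mergeHeaderlessBamShards/mergeInto code in this patch does the same work, with the _SUCCESS check and temporary-directory handling shown next.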
+ final String successFile = outputFile + "/_SUCCESS"; + final Path successPath = new Path(successFile); + final Configuration conf = ctx.hadoopConfiguration(); + + //it's important that we get the appropriate file system by requesting it from the path + final FileSystem fs = successPath.getFileSystem(conf); + if (!fs.exists(successPath)) { + throw new GATKException("unable to find " + successFile + " file"); + } + final Path outputPath = new Path(outputFile); + final Path tmpPath = new Path(outputParentDir + "tmp" + UUID.randomUUID()); + + fs.rename(outputPath, tmpPath); + fs.delete(outputPath, true); + + try (final OutputStream out = fs.create(outputPath)) { + new SAMOutputPreparer().prepareForRecords(out, samOutputFormat, header); // write the header + mergeInto(out, tmpPath, conf); + writeTerminatorBlock(out, samOutputFormat); + } + + fs.delete(tmpPath, true); + } + + //Terminate the aggregated output stream with an appropriate SAMOutputFormat-dependent terminator block + private static void writeTerminatorBlock(final OutputStream out, final SAMFormat samOutputFormat) throws IOException { + if (SAMFormat.CRAM == samOutputFormat) { + CramIO.issueEOF(CramVersions.DEFAULT_CRAM_VERSION, out); // terminate with CRAM EOF container + } + else { + out.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK); // add the BGZF terminator + } + } + + @VisibleForTesting + static void mergeInto(OutputStream out, Path directory, Configuration conf) throws IOException { + final FileSystem fs = directory.getFileSystem(conf); + final FileStatus[] parts = getBamFragments(directory, fs); + + if( parts.length == 0){ + throw new GATKException("Could not write bam file because no part files were found."); + } + + for (final FileStatus part : parts) { + try (final InputStream in = fs.open(part.getPath())) { + org.apache.hadoop.io.IOUtils.copyBytes(in, out, conf, false); + } + } + for (final FileStatus part : parts) { + fs.delete(part.getPath(), false); + } + } + + @VisibleForTesting + static FileStatus[] getBamFragments( final Path directory, final FileSystem fs ) throws IOException { + final FileStatus[] parts = fs.globStatus(new Path(directory, "part-r-[0-9][0-9][0-9][0-9][0-9]*")); + + // FileSystem.globStatus() has a known bug that causes it to not sort the array returned by + // name (despite claiming to): https://issues.apache.org/jira/browse/HADOOP-10798 + // Because of this bug, we do an explicit sort here to avoid assembling the bam fragments + // in the wrong order. 
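The Arrays.sort(parts) call that follows relies on FileStatus's natural ordering, which compares paths; because the shard names are zero-padded (part-r-00000, part-r-00001, ...), lexicographic order is also numeric order. A hypothetical, more explicit variant (illustrative only, not part of this patch) would sort by file name:

    import java.util.Arrays;
    import java.util.Comparator;
    import org.apache.hadoop.fs.FileStatus;

    final class ShardOrderSketch {
        // Sort Hadoop part files explicitly by name; zero-padded names such as part-r-00000,
        // part-r-00001, ... then come back in numeric order regardless of globStatus() quirks.
        static FileStatus[] sortedByName(final FileStatus[] parts) {
            final FileStatus[] sorted = parts.clone();
            Arrays.sort(sorted, Comparator.comparing((FileStatus part) -> part.getPath().getName()));
            return sorted;
        }
    }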
+ Arrays.sort(parts); + + return parts; + } + /** * Propagate any values that need to be passed to Hadoop-BAM through configuration properties: * diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java index 63fed7e9b3e..206013d550d 100644 --- a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java @@ -211,4 +211,23 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw Assert.assertEquals(observed.getCigar(), expected.getCigar(), "getCigar"); } } + + @Test(groups = "spark") + public void testGetBamFragments() throws IOException { + final Path fragmentDir = new Path(getToolTestDataDir(), "fragments_test"); + final FileSystem fs = fragmentDir.getFileSystem(new Configuration()); + + final FileStatus[] bamFragments = ReadsSparkSink.getBamFragments(fragmentDir, fs); + final List expectedFragmentNames = Arrays.asList("part-r-00000", "part-r-00001", "part-r-00002", "part-r-00003"); + + Assert.assertEquals(bamFragments.length, expectedFragmentNames.size(), "Wrong number of fragments returned by ReadsSparkSink.getBamFragments()"); + for ( int i = 0; i < bamFragments.length; ++i ) { + Assert.assertEquals(bamFragments[i].getPath().getName(), expectedFragmentNames.get(i), "Fragments are not in correct order"); + } + } + + @Test( expectedExceptions = GATKException.class, groups = "spark" ) + public void testMergeIntoThrowsIfNoPartFiles() throws IOException { + ReadsSparkSink.mergeInto(null, new Path(getToolTestDataDir() + "directoryWithNoPartFiles"), new Configuration()); + } } From 40d3684813375b3008b799473629832837087b28 Mon Sep 17 00:00:00 2001 From: Tom White Date: Fri, 9 Dec 2016 10:36:51 +0000 Subject: [PATCH 2/3] Revert "Write splitting-bai files when writing (non-sharded) BAM files from Spark. (#2169)" This reverts commit a30af5ab06c8b1a888f1985b7d85635314af0199. --- .../engine/spark/datasources/ReadsSparkSink.java | 10 ++-------- .../spark/datasources/ReadsSparkSinkUnitTest.java | 8 -------- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java index a8c8222cdbf..e0d2a6134b5 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java @@ -154,7 +154,7 @@ public static void writeReads( String absoluteReferenceFile = referenceFile != null ? BucketUtils.makeFilePathAbsolute(referenceFile) : referenceFile; - setHadoopBAMConfigurationProperties(ctx, absoluteOutputFile, absoluteReferenceFile, format); + setHadoopBAMConfigurationProperties(ctx, absoluteOutputFile, absoluteReferenceFile); // The underlying reads are required to be in SAMRecord format in order to be // written out, so we convert them to SAMRecord explicitly here. 
If they're already @@ -380,17 +380,11 @@ static FileStatus[] getBamFragments( final Path directory, final FileSystem fs ) * from passing a stale value through to htsjdk when multiple calls are made serially * with different outputs but the same Spark context */ - private static void setHadoopBAMConfigurationProperties(final JavaSparkContext ctx, final String outputName, - final String referenceName, final ReadsWriteFormat format) { + private static void setHadoopBAMConfigurationProperties(final JavaSparkContext ctx, final String outputName, final String referenceName) { final Configuration conf = ctx.hadoopConfiguration(); if (!IOUtils.isCramFileName(outputName)) { // only set the reference for CRAM output conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY); - if (format == ReadsWriteFormat.SINGLE && IOUtils.isBamFileName(outputName)) { - conf.setBoolean(BAMOutputFormat.WRITE_SPLITTING_BAI, true); - } else { - conf.setBoolean(BAMOutputFormat.WRITE_SPLITTING_BAI, false); - } } else { if (null == referenceName) { diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java index 206013d550d..a128f12e055 100644 --- a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java @@ -14,13 +14,11 @@ import org.broadinstitute.hellbender.engine.spark.SparkContextFactory; import org.broadinstitute.hellbender.exceptions.GATKException; import org.broadinstitute.hellbender.utils.gcs.BucketUtils; -import org.broadinstitute.hellbender.utils.io.IOUtils; import org.broadinstitute.hellbender.utils.read.GATKRead; import org.broadinstitute.hellbender.utils.read.ReadCoordinateComparator; import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat; import org.broadinstitute.hellbender.utils.test.BaseTest; import org.broadinstitute.hellbender.utils.test.MiniClusterUtils; -import org.seqdoop.hadoop_bam.SplittingBAMIndexer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -29,7 +27,6 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -125,11 +122,6 @@ private void assertSingleShardedWritingWorks(String inputBam, String referenceFi ReadsSparkSink.writeReads(ctx, outputPath, referenceFile, rddParallelReads, header, ReadsWriteFormat.SINGLE); - // check that a splitting bai file is created - if (IOUtils.isBamFileName(outputPath)) { - Assert.assertTrue(Files.exists(IOUtils.getPath(outputPath + SplittingBAMIndexer.OUTPUT_FILE_EXTENSION))); - } - JavaRDD rddParallelReads2 = readSource.getParallelReads(outputPath, referenceFile); final List writtenReads = rddParallelReads2.collect(); From e78330b3b05afefaf2df2ff20d172009c51a3aaa Mon Sep 17 00:00:00 2001 From: Louis Bergelson Date: Tue, 10 Jan 2017 11:39:07 -0500 Subject: [PATCH 3/3] Change the way writing a single bam file works. The issues seem related to some sort of race condition between caching and deleting/renaming files in the GCS HDFS adaptor. The behavior is changed so that temporary files are first written to a temporary directory and then concatenated into the final file; previously they were written to a directory with the same name as the final file, which was renamed, and its contents were then copied into the final file. Also improves the exception message when failing to write a bam on Spark.
--- .../engine/spark/GATKSparkTool.java | 2 +- .../spark/datasources/ReadsSparkSink.java | 63 +++++++++++-------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java index d71a47404da..4c3b3def0e1 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java @@ -250,7 +250,7 @@ public void writeReads(final JavaSparkContext ctx, final String outputFile, Java reads, readsHeader, shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, getRecommendedNumReducers()); } catch (IOException e) { - throw new GATKException("unable to write bam: " + e); + throw new GATKException("unable to write bam: " + outputFile, e); } } diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java index e0d2a6134b5..b2f8e28b788 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSink.java @@ -13,11 +13,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.*; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.parquet.avro.AvroParquetOutputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -51,6 +49,7 @@ * read writing code as well as from bigdatagenomics/adam. */ public final class ReadsSparkSink { + private static Logger logger = LogManager.getLogger(ReadsSparkSink.class); // Output format class for writing BAM files through saveAsNewAPIHadoopFile. Must be public. 
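For context on the output-format wrappers defined below: Hadoop-BAM writes records through the new-API Hadoop OutputFormat machinery, so the reads end up paired with a placeholder key and handed to saveAsNewAPIHadoopFile. The following is a rough sketch of that call shape, not the exact GATK wiring; names are illustrative, and supplying the SAMFileHeader to the format (which the wrapper classes below take care of in the real sink) is omitted:

    import htsjdk.samtools.SAMRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.seqdoop.hadoop_bam.KeyIgnoringBAMOutputFormat;
    import org.seqdoop.hadoop_bam.SAMRecordWritable;
    import scala.Tuple2;

    // Illustrative sketch only: funnel an RDD of SAMRecords into a key-ignoring BAM output format.
    final class SparkBamSaveSketch {
        static void save(final JavaRDD<SAMRecord> records, final String outputPath, final Configuration conf) {
            final JavaPairRDD<NullWritable, SAMRecordWritable> pairs = records.mapToPair(record -> {
                final SAMRecordWritable writable = new SAMRecordWritable();
                writable.set(record);
                return new Tuple2<>(NullWritable.get(), writable);
            });
            // The output format class decides how the pairs become BGZF blocks on disk; the header
            // still has to be made available to it before any records are written.
            pairs.saveAsNewAPIHadoopFile(outputPath, NullWritable.class, SAMRecordWritable.class,
                    KeyIgnoringBAMOutputFormat.class, conf);
        }
    }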
public static class SparkBAMOutputFormat extends KeyIgnoringBAMOutputFormat { @@ -239,8 +238,13 @@ private static void writeReadsSingle( final SAMFileHeader header, final int numReducers) throws IOException { final JavaRDD sortedReads = sortReads(reads, header, numReducers); - saveAsShardedHadoopFiles(ctx, outputFile, referenceFile, samOutputFormat, sortedReads, header, false); - mergeHeaderlessBamShards(ctx, outputFile, samOutputFormat, header); + final String tempDirectory = getTempDirectory(outputFile); + logger.info("Saving bam file as shards"); + saveAsShardedHadoopFiles(ctx, tempDirectory, referenceFile, samOutputFormat, sortedReads, header, false); + logger.info("Finished saving shards, beginning to combine shards into a single output file."); + mergeHeaderlessBamShards(ctx, tempDirectory, outputFile, header, samOutputFormat); + logger.info("Finished merging shards, deleting temporary files."); + deleteHadoopFile(tempDirectory, ctx.hadoopConfiguration()); } private static JavaRDD sortReads(final JavaRDD reads, final SAMFileHeader header, final int numReducers) { @@ -296,37 +300,41 @@ private static void deleteHadoopFile(String fileToObliterate, Configuration conf pathToDelete.getFileSystem(conf).delete(pathToDelete, true); } - private static void mergeHeaderlessBamShards(final JavaSparkContext ctx, final String outputFile, final SAMFormat samOutputFormat, final SAMFileHeader header) throws IOException { + private static void mergeHeaderlessBamShards(final JavaSparkContext ctx, String inputDirectory, final String outputFile, final SAMFileHeader header, final SAMFormat samOutputFormat) throws IOException { // At this point, the part files (part-r-00000, part-r-00001, etc) are in a directory named outputFile. // Each part file is a BAM file with no header or terminating end-of-file marker (Hadoop-BAM does not add // end-of-file markers), so to merge into a single BAM we concatenate the header with the part files and a // terminating end-of-file marker. Note that this has the side effect of being ever-so-slightly less efficient // than writing a BAM in one go because the last block of each file isn't completely full. + assertSuccessFileExists(ctx, inputDirectory); + + final Path tmpPath = new Path(inputDirectory); + final Path outputPath = new Path(outputFile); + FileSystem fs = outputPath.getFileSystem(ctx.hadoopConfiguration()); + + try (final OutputStream out = fs.create(outputPath)) { + new SAMOutputPreparer().prepareForRecords(out, samOutputFormat, header); // write the header + mergeInto(out, tmpPath, ctx.hadoopConfiguration()); + writeTerminatorBlock(out, samOutputFormat); + } + } + + private static String getTempDirectory(String outputFile) { final String outputParentDir = outputFile.substring(0, outputFile.lastIndexOf('/') + 1); + return outputParentDir + "tmp" + UUID.randomUUID(); + } + + private static void assertSuccessFileExists(JavaSparkContext ctx, String outputFile) throws IOException { // First, check for the _SUCCESS file. 
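The _SUCCESS marker checked here is the empty file that Hadoop's FileOutputCommitter drops into the output directory when a job commits cleanly; clusters can disable it via mapreduce.fileoutputcommitter.marksuccessfuljobs, in which case a check like this one would always fail. A small illustrative variant (not part of this patch) that uses the committer's own constant rather than a hand-built path string:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

    import java.io.IOException;

    // Illustrative sketch only: test for the job-level success marker ("_SUCCESS").
    final class SuccessMarkerSketch {
        static boolean jobCommitted(final String outputDirectory, final Configuration conf) throws IOException {
            final Path marker = new Path(outputDirectory, FileOutputCommitter.SUCCEEDED_FILE_NAME);
            final FileSystem fs = marker.getFileSystem(conf);
            return fs.exists(marker);
        }
    }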
final String successFile = outputFile + "/_SUCCESS"; final Path successPath = new Path(successFile); - final Configuration conf = ctx.hadoopConfiguration(); //it's important that we get the appropriate file system by requesting it from the path - final FileSystem fs = successPath.getFileSystem(conf); + final FileSystem fs = successPath.getFileSystem(ctx.hadoopConfiguration()); if (!fs.exists(successPath)) { throw new GATKException("unable to find " + successFile + " file"); } - final Path outputPath = new Path(outputFile); - final Path tmpPath = new Path(outputParentDir + "tmp" + UUID.randomUUID()); - - fs.rename(outputPath, tmpPath); - fs.delete(outputPath, true); - - try (final OutputStream out = fs.create(outputPath)) { - new SAMOutputPreparer().prepareForRecords(out, samOutputFormat, header); // write the header - mergeInto(out, tmpPath, conf); - writeTerminatorBlock(out, samOutputFormat); - } - - fs.delete(tmpPath, true); } //Terminate the aggregated output stream with an appropriate SAMOutputFormat-dependent terminator block @@ -348,13 +356,16 @@ static void mergeInto(OutputStream out, Path directory, Configuration conf) thro throw new GATKException("Could not write bam file because no part files were found."); } + logger.info("Found " + parts.length + " parts."); + int counter = 0; for (final FileStatus part : parts) { try (final InputStream in = fs.open(part.getPath())) { org.apache.hadoop.io.IOUtils.copyBytes(in, out, conf, false); } - } - for (final FileStatus part : parts) { - fs.delete(part.getPath(), false); + counter++; + if( counter % 10 == 0 ) { + logger.info("Merged " + counter + "/" + parts.length + " parts."); + } } }
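A final note on the terminator handling above: htsjdk treats a BAM that does not end with the 28-byte BGZF EOF block as truncated, which is why the merge appends BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK (or a CRAM EOF container for CRAM output). A small sketch of how a test could confirm that a merged BAM was terminated correctly, illustrative only and using htsjdk's built-in termination check:

    import htsjdk.samtools.util.BlockCompressedInputStream;

    import java.io.File;
    import java.io.IOException;

    // Illustrative sketch only: verify that a merged BAM ends with the BGZF terminator block.
    final class TerminatorCheckSketch {
        static boolean hasTerminator(final File mergedBam) throws IOException {
            return BlockCompressedInputStream.checkTermination(mergedBam)
                    == BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK;
        }
    }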