revert nio sam file merger #2333

Closed
wants to merge 3 commits into from
@@ -250,7 +250,7 @@ public void writeReads(final JavaSparkContext ctx, final String outputFile, Java
reads, readsHeader, shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
getRecommendedNumReducers());
} catch (IOException e) {
throw new GATKException("unable to write bam: " + e);
throw new GATKException("unable to write bam: " + outputFile, e);
}
}

@@ -1,14 +1,21 @@
package org.broadinstitute.hellbender.engine.spark.datasources;

import com.google.common.annotations.VisibleForTesting;
import htsjdk.samtools.BamFileIoUtils;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.cram.build.CramIO;
import htsjdk.samtools.cram.common.CramVersions;
import htsjdk.samtools.util.BlockCompressedStreamConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -17,6 +24,7 @@
import org.bdgenomics.adam.models.RecordGroupDictionary;
import org.bdgenomics.adam.models.SequenceDictionary;
import org.bdgenomics.formats.avro.AlignmentRecord;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
@@ -25,18 +33,23 @@
import org.broadinstitute.hellbender.utils.read.HeaderlessSAMRecordCoordinateComparator;
import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;
import org.seqdoop.hadoop_bam.*;
import org.seqdoop.hadoop_bam.util.SAMFileMerger;
import org.seqdoop.hadoop_bam.util.SAMOutputPreparer;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.UUID;

/**
* ReadsSparkSink writes GATKReads to a file. This code lifts from the HadoopGenomics/Hadoop-BAM
* read writing code as well as from bigdatagenomics/adam.
*/
public final class ReadsSparkSink {
private static Logger logger = LogManager.getLogger(ReadsSparkSink.class);

// Output format class for writing BAM files through saveAsNewAPIHadoopFile. Must be public.
public static class SparkBAMOutputFormat extends KeyIgnoringBAMOutputFormat<NullWritable> {
@@ -140,7 +153,7 @@ public static void writeReads(
String absoluteReferenceFile = referenceFile != null ?
BucketUtils.makeFilePathAbsolute(referenceFile) :
referenceFile;
setHadoopBAMConfigurationProperties(ctx, absoluteOutputFile, absoluteReferenceFile, format);
setHadoopBAMConfigurationProperties(ctx, absoluteOutputFile, absoluteReferenceFile);

// The underlying reads are required to be in SAMRecord format in order to be
// written out, so we convert them to SAMRecord explicitly here. If they're already
@@ -225,9 +238,13 @@ private static void writeReadsSingle(
final SAMFileHeader header, final int numReducers) throws IOException {

final JavaRDD<SAMRecord> sortedReads = sortReads(reads, header, numReducers);
final String outputPartsDirectory = outputFile + ".parts";
saveAsShardedHadoopFiles(ctx, outputPartsDirectory, referenceFile, samOutputFormat, sortedReads, header, false);
SAMFileMerger.mergeParts(outputPartsDirectory, outputFile, samOutputFormat, header);
final String tempDirectory = getTempDirectory(outputFile);
logger.info("Saving bam file as shards");
saveAsShardedHadoopFiles(ctx, tempDirectory, referenceFile, samOutputFormat, sortedReads, header, false);
logger.info("Finished saving shards, beginning to combine shards into a single output file.");
mergeHeaderlessBamShards(ctx, tempDirectory, outputFile, header, samOutputFormat);
logger.info("Finished merging shards, deleting temporary files.");
deleteHadoopFile(tempDirectory, ctx.hadoopConfiguration());
}

private static JavaRDD<SAMRecord> sortReads(final JavaRDD<SAMRecord> reads, final SAMFileHeader header, final int numReducers) {
@@ -283,6 +300,88 @@ private static void deleteHadoopFile(String fileToObliterate, Configuration conf
pathToDelete.getFileSystem(conf).delete(pathToDelete, true);
}

private static void mergeHeaderlessBamShards(final JavaSparkContext ctx, String inputDirectory, final String outputFile, final SAMFileHeader header, final SAMFormat samOutputFormat) throws IOException {
// At this point, the part files (part-r-00000, part-r-00001, etc) are in a directory named outputFile.
// Each part file is a BAM file with no header or terminating end-of-file marker (Hadoop-BAM does not add
// end-of-file markers), so to merge into a single BAM we concatenate the header with the part files and a
// terminating end-of-file marker. Note that this has the side effect of being ever-so-slightly less efficient
// than writing a BAM in one go because the last block of each file isn't completely full.

assertSuccessFileExists(ctx, inputDirectory);

final Path tmpPath = new Path(inputDirectory);
final Path outputPath = new Path(outputFile);
FileSystem fs = outputPath.getFileSystem(ctx.hadoopConfiguration());

try (final OutputStream out = fs.create(outputPath)) {
new SAMOutputPreparer().prepareForRecords(out, samOutputFormat, header); // write the header
mergeInto(out, tmpPath, ctx.hadoopConfiguration());
writeTerminatorBlock(out, samOutputFormat);
}
}

private static String getTempDirectory(String outputFile) {
final String outputParentDir = outputFile.substring(0, outputFile.lastIndexOf('/') + 1);
return outputParentDir + "tmp" + UUID.randomUUID();
}

private static void assertSuccessFileExists(JavaSparkContext ctx, String outputFile) throws IOException {
// First, check for the _SUCCESS file.
final String successFile = outputFile + "/_SUCCESS";
final Path successPath = new Path(successFile);

//it's important that we get the appropriate file system by requesting it from the path
final FileSystem fs = successPath.getFileSystem(ctx.hadoopConfiguration());
if (!fs.exists(successPath)) {
throw new GATKException("unable to find " + successFile + " file");
}
}

//Terminate the aggregated output stream with an appropriate SAMOutputFormat-dependent terminator block
private static void writeTerminatorBlock(final OutputStream out, final SAMFormat samOutputFormat) throws IOException {
if (SAMFormat.CRAM == samOutputFormat) {
CramIO.issueEOF(CramVersions.DEFAULT_CRAM_VERSION, out); // terminate with CRAM EOF container
}
else {
out.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK); // add the BGZF terminator
}
}

@VisibleForTesting
static void mergeInto(OutputStream out, Path directory, Configuration conf) throws IOException {
final FileSystem fs = directory.getFileSystem(conf);
final FileStatus[] parts = getBamFragments(directory, fs);

if( parts.length == 0){
throw new GATKException("Could not write bam file because no part files were found.");
}

logger.info("Found " + parts.length + " parts.");
int counter = 0;
for (final FileStatus part : parts) {
try (final InputStream in = fs.open(part.getPath())) {
org.apache.hadoop.io.IOUtils.copyBytes(in, out, conf, false);
}
counter++;
if( counter % 10 == 0 ) {
logger.info("Merged " + counter + "/" + parts.length + " parts.");
}
}
}

@VisibleForTesting
static FileStatus[] getBamFragments( final Path directory, final FileSystem fs ) throws IOException {
final FileStatus[] parts = fs.globStatus(new Path(directory, "part-r-[0-9][0-9][0-9][0-9][0-9]*"));

// FileSystem.globStatus() has a known bug that causes it to not sort the array returned by
// name (despite claiming to): https://issues.apache.org/jira/browse/HADOOP-10798
// Because of this bug, we do an explicit sort here to avoid assembling the bam fragments
// in the wrong order.
Arrays.sort(parts);

return parts;
}

/**
* Propagate any values that need to be passed to Hadoop-BAM through configuration properties:
*
@@ -292,17 +391,11 @@ private static void deleteHadoopFile(String fileToObliterate, Configuration conf
* from passing a stale value through to htsjdk when multiple calls are made serially
* with different outputs but the same Spark context
*/
private static void setHadoopBAMConfigurationProperties(final JavaSparkContext ctx, final String outputName,
final String referenceName, final ReadsWriteFormat format) {
private static void setHadoopBAMConfigurationProperties(final JavaSparkContext ctx, final String outputName, final String referenceName) {
final Configuration conf = ctx.hadoopConfiguration();

if (!IOUtils.isCramFileName(outputName)) { // only set the reference for CRAM output
conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY);
if (format == ReadsWriteFormat.SINGLE && IOUtils.isBamFileName(outputName)) {
conf.setBoolean(BAMOutputFormat.WRITE_SPLITTING_BAI, true);
} else {
conf.setBoolean(BAMOutputFormat.WRITE_SPLITTING_BAI, false);
}
}
else {
if (null == referenceName) {
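For readers skimming the diff, the approach restored above boils down to: write the SAM/BAM header once, append each headerless part file byte-for-byte, then append the end-of-file terminator. Below is a minimal standalone sketch of that flow for a BAM output, reusing only the htsjdk and Hadoop calls that appear in the diff; the class name and the /tmp paths are hypothetical, and a CRAM output would swap the BGZF terminator for CramIO.issueEOF as writeTerminatorBlock does.

// Sketch only: merge headerless BAM part files written by Hadoop-BAM into a
// single BAM. The paths ("/tmp/parts", "/tmp/merged.bam") are hypothetical.
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.util.BlockCompressedStreamConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.seqdoop.hadoop_bam.SAMFormat;
import org.seqdoop.hadoop_bam.util.SAMOutputPreparer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

public final class ShardMergeSketch {
    public static void merge(final SAMFileHeader header, final Configuration conf) throws IOException {
        final Path partsDir = new Path("/tmp/parts");        // hypothetical shard directory
        final Path mergedBam = new Path("/tmp/merged.bam");  // hypothetical output file
        final FileSystem fs = mergedBam.getFileSystem(conf);

        // Sort explicitly: globStatus() does not guarantee name order (HADOOP-10798).
        final FileStatus[] parts = fs.globStatus(new Path(partsDir, "part-r-*"));
        if (parts == null || parts.length == 0) {
            throw new IOException("no part files found under " + partsDir);
        }
        Arrays.sort(parts);

        try (final OutputStream out = fs.create(mergedBam)) {
            // Write the header once, then concatenate the headerless parts.
            new SAMOutputPreparer().prepareForRecords(out, SAMFormat.BAM, header);
            for (final FileStatus part : parts) {
                try (final InputStream in = fs.open(part.getPath())) {
                    org.apache.hadoop.io.IOUtils.copyBytes(in, out, conf, false);
                }
            }
            // BGZF end-of-file marker; CRAM would use CramIO.issueEOF instead.
            out.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
        }
    }
}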
@@ -14,13 +14,11 @@
import org.broadinstitute.hellbender.engine.spark.SparkContextFactory;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadCoordinateComparator;
import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;
import org.broadinstitute.hellbender.utils.test.BaseTest;
import org.broadinstitute.hellbender.utils.test.MiniClusterUtils;
import org.seqdoop.hadoop_bam.SplittingBAMIndexer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -29,7 +27,6 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -125,11 +122,6 @@ private void assertSingleShardedWritingWorks(String inputBam, String referenceFi

ReadsSparkSink.writeReads(ctx, outputPath, referenceFile, rddParallelReads, header, ReadsWriteFormat.SINGLE);

// check that a splitting bai file is created
if (IOUtils.isBamFileName(outputPath)) {
Assert.assertTrue(Files.exists(IOUtils.getPath(outputPath + SplittingBAMIndexer.OUTPUT_FILE_EXTENSION)));
}

JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputPath, referenceFile);
final List<GATKRead> writtenReads = rddParallelReads2.collect();

@@ -211,4 +203,23 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw
Assert.assertEquals(observed.getCigar(), expected.getCigar(), "getCigar");
}
}

@Test(groups = "spark")
public void testGetBamFragments() throws IOException {
final Path fragmentDir = new Path(getToolTestDataDir(), "fragments_test");
final FileSystem fs = fragmentDir.getFileSystem(new Configuration());

final FileStatus[] bamFragments = ReadsSparkSink.getBamFragments(fragmentDir, fs);
final List<String> expectedFragmentNames = Arrays.asList("part-r-00000", "part-r-00001", "part-r-00002", "part-r-00003");

Assert.assertEquals(bamFragments.length, expectedFragmentNames.size(), "Wrong number of fragments returned by ReadsSparkSink.getBamFragments()");
for ( int i = 0; i < bamFragments.length; ++i ) {
Assert.assertEquals(bamFragments[i].getPath().getName(), expectedFragmentNames.get(i), "Fragments are not in correct order");
}
}

@Test( expectedExceptions = GATKException.class, groups = "spark" )
public void testMergeIntoThrowsIfNoPartFiles() throws IOException {
ReadsSparkSink.mergeInto(null, new Path(getToolTestDataDir() + "directoryWithNoPartFiles"), new Configuration());
}
}
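
As a usage note, the pattern the tests lean on (see assertSingleShardedWritingWorks in the diff above) is a simple write-then-read round trip. The sketch below is an illustration under assumptions: only the writeReads and getParallelReads calls are taken from the diff; the helper class, the readSource parameter wiring, and the output path are hypothetical.

// Sketch of a single-file write followed by a read-back count check.
// 'readSource' and the output path are assumptions; the writeReads and
// getParallelReads signatures are the ones used in the test diff above.
import htsjdk.samtools.SAMFileHeader;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink;
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;

import java.io.IOException;

final class RoundTripSketch {
    static void roundTrip(final JavaSparkContext ctx, final ReadsSparkSource readSource,
                          final JavaRDD<GATKRead> reads, final SAMFileHeader header,
                          final String referenceFile) throws IOException {
        final String outputPath = "/tmp/roundtrip.bam"; // hypothetical output location

        // Write all shards and merge them into a single BAM.
        ReadsSparkSink.writeReads(ctx, outputPath, referenceFile, reads, header, ReadsWriteFormat.SINGLE);

        // Read the merged file back and compare read counts.
        final JavaRDD<GATKRead> roundTripped = readSource.getParallelReads(outputPath, referenceFile);
        if (roundTripped.count() != reads.count()) {
            throw new IllegalStateException("round-trip read count mismatch");
        }
    }
}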