Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarified behavior of MarkDuplicatesSpark when given multiple input bams #5901

Merged
merged 2 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ protected List<String> getReadSourceName(){
/**
* Returns a map of read input to header.
*/
protected LinkedHashMap<String, SAMFileHeader> getReadSouceHeaderMap(){
protected LinkedHashMap<String, SAMFileHeader> getReadSourceHeaderMap(){
return readInputs;
}

Expand Down Expand Up @@ -568,16 +568,13 @@ private void initializeReads(final JavaSparkContext sparkContext) {
private SamFileHeaderMerger createHeaderMerger() {
return new SamFileHeaderMerger(identifySortOrder(readInputs.values()), readInputs.values(), true);
}
@VisibleForTesting
// If multiple bams have had their contents merged make no assumption about the underlying sort order
static SAMFileHeader.SortOrder identifySortOrder(final Collection<SAMFileHeader> headers){
final Set<SAMFileHeader.SortOrder> sortOrders = headers.stream().map(SAMFileHeader::getSortOrder).collect(Collectors.toSet());
final SAMFileHeader.SortOrder order;
if (sortOrders.size() == 1) {
order = sortOrders.iterator().next();
if (headers.size() > 1){
return SAMFileHeader.SortOrder.unsorted;
} else {
order = SAMFileHeader.SortOrder.unsorted;
return headers.iterator().next().getSortOrder();
}
return order;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.ArgumentCollection;
import org.broadinstitute.barclay.argparser.BetaFeature;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.barclay.help.DocumentedFeature;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
Expand All @@ -17,13 +16,11 @@
import org.broadinstitute.hellbender.engine.filters.ReadFilter;
import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary;
import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadUtils;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;
import org.broadinstitute.hellbender.utils.read.markduplicates.GATKDuplicationMetrics;
import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
import org.broadinstitute.hellbender.utils.spark.SparkUtils;
Expand All @@ -43,10 +40,11 @@
*
* <ul>
* <li>MarkDuplicatesSpark processing can replace both the MarkDuplicates and SortSam steps of the Best Practices <a href="https://software.broadinstitute.org/gatk/documentation/article?id=7899#2">single sample pipeline</a>. After flagging duplicate sets, the tool automatically coordinate-sorts the records. It is still necessary to subsequently run SetNmMdAndUqTags before running BQSR. </li>
* <li>The tool is optimized to run on queryname-grouped alignments. If provided coordinate-sorted alignments, the tool will spend additional time first queryname sorting the reads internally. This can result in the tool being up to 2x slower processing under some circumstances.</li>
* <li>The tool is optimized to run on queryname-grouped alignments (that is, all reads with the same queryname are together in the input file). If provided coordinate-sorted alignments, the tool will spend additional time first queryname sorting the reads internally. This can result in the tool being up to 2x slower processing under some circumstances.</li>
* <li>Due to MarkDuplicatesSpark queryname-sorting coordinate-sorted inputs internally at the start, the tool produces identical results regardless of the input sort-order. That is, it will flag duplicates sets that include secondary, and supplementary and unmapped mate records no matter the sort-order of the input. This differs from how Picard MarkDuplicates behaves given the differently sorted inputs. </li>
* <li>Collecting duplicate metrics slows down performance and thus the metrics collection is optional and must be specified for the Spark version of the tool with '-M'. It is possible to collect the metrics with the standalone Picard tool <a href='https://software.broadinstitute.org/gatk/documentation/tooldocs/current/picard_sam_markduplicates_EstimateLibraryComplexity.php'>EstimateLibraryComplexity</a>.</li>
* <li>MarkDuplicatesSpark is optimized to run locally on a single machine by leveraging core parallelism that MarkDuplicates and SortSam cannot. It will typically run faster than MarkDuplicates and SortSam by a factor of 15% over the same data at 2 cores and will scale linearly to upwards of 16 cores. This means MarkDuplicatesSpark, even without access to a Spark cluster, is faster than MarkDuplicates.</li>
* <li>MarkDuplicatesSpark can be run with multiple input bams. If this is the case all of the inputs must be a mix queryname-grouped or queryname sorted.</li>
* </ul>
*
* <p>For a typical 30x coverage WGS BAM, we recommend running on a machine with at least 16 GB. Memory usage scales with library complexity and the tool will need more memory for larger or more complex data. If the tool is running slowly it is possible Spark is running out of memory and is spilling data to disk excessively. If this is the case then increasing the memory available to the tool should yield speedup to a threshold; otherwise, increasing memory should have no effect beyond that threshold. </p>
Expand Down Expand Up @@ -288,12 +286,15 @@ public int getPartition(Object key) {

@Override
protected void runTool(final JavaSparkContext ctx) {
// Check if we are using multiple inputs that the headers are all in the correct querygrouped ordering
Map<String, SAMFileHeader> headerMap = getReadSouceHeaderMap();
final SAMFileHeader mergedHeader = getHeaderForReads();

// Check if we are using multiple inputs that the headers are all in the correct querygrouped ordering, if so set the aggregate header to reflect this
Map<String, SAMFileHeader> headerMap = getReadSourceHeaderMap();
if (headerMap.size() > 1) {
headerMap.entrySet().stream().forEach(h -> {if(!ReadUtils.isReadNameGroupedBam(h.getValue())) {
throw new UserException("Multiple inputs to MarkDuplicatesSpark detected but input "+h.getKey()+" was sorted in "+h.getValue().getSortOrder()+" order");
throw new UserException("Multiple inputs to MarkDuplicatesSpark detected. MarkDuplicatesSpark requires all inputs to be queryname sorted or querygroup-sorted for multi-input processing but input "+h.getKey()+" was sorted in "+h.getValue().getSortOrder()+" order");
}});
mergedHeader.setGroupOrder(SAMFileHeader.GroupOrder.query);
}

JavaRDD<GATKRead> reads = getReads();
Expand All @@ -304,14 +305,13 @@ protected void runTool(final JavaSparkContext ctx) {
markDuplicatesSparkArgumentCollection.taggingPolicy = MarkDuplicates.DuplicateTaggingPolicy.OpticalOnly;
}

final SAMFileHeader header = getHeaderForReads();
final JavaRDD<GATKRead> finalReadsForMetrics = mark(reads, header, finder, markDuplicatesSparkArgumentCollection, getRecommendedNumReducers());
final JavaRDD<GATKRead> finalReadsForMetrics = mark(reads, mergedHeader, finder, markDuplicatesSparkArgumentCollection, getRecommendedNumReducers());

if (metricsFile != null) {
final JavaPairRDD<String, GATKDuplicationMetrics> metricsByLibrary = MarkDuplicatesSparkUtils.generateMetrics(
header, finalReadsForMetrics);
mergedHeader, finalReadsForMetrics);
final MetricsFile<GATKDuplicationMetrics, Double> resultMetrics = getMetricsFile();
MarkDuplicatesSparkUtils.saveMetricsRDD(resultMetrics, header, metricsByLibrary, metricsFile);
MarkDuplicatesSparkUtils.saveMetricsRDD(resultMetrics, mergedHeader, metricsByLibrary, metricsFile);
}
JavaRDD<GATKRead> readsForWriting = finalReadsForMetrics;
// Filter out the duplicates if instructed to do so
Expand All @@ -321,8 +321,8 @@ protected void runTool(final JavaSparkContext ctx) {
readsForWriting = readsForWriting.filter(r -> !MarkDuplicates.DUPLICATE_TYPE_SEQUENCING.equals(r.getAttributeAsString(MarkDuplicates.DUPLICATE_TYPE_TAG)));
}

header.setSortOrder(SAMFileHeader.SortOrder.coordinate);
writeReads(ctx, output, readsForWriting, header, true);
mergedHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
writeReads(ctx, output, readsForWriting, mergedHeader, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ public Object[][] md(){
.put("Solexa-16404", ImmutableList.of(3L, 9L, 3L, 0L, 2L, 0L, 0.190476, 17L))
.put("Solexa-16406", ImmutableList.of(1L, 10L, 1L, 0L, 0L, 0L, 0.0, 0L))
.put("Solexa-16412", ImmutableList.of(3L, 6L, 3L, 0L, 1L, 0L, 0.133333, 15L)).build()},
// Testing that that multiple input file behavior still functions when both files are mixed between queryname and querygroup sorting
{new File[]{new File(TEST_DATA_DIR, "optical_dupes.queryname.bam"), new File(TEST_DATA_DIR, "example.chr1.1-1K.markedDups.querygrouped.bam")}, 94, 8,
ImmutableMap.builder().put("mylib", ImmutableList.of(0L, 2L, 0L, 0L, 1L, 1L, 0.5, 0L))
.put("Solexa-16419", ImmutableList.of(4L, 4L, 4L, 0L, 0L, 0L, 0.0, 0L))
.put("Solexa-16416", ImmutableList.of(2L, 2L, 2L, 0L, 0L, 0L, 0.0, 0L))
.put("Solexa-16404", ImmutableList.of(3L, 9L, 3L, 0L, 2L, 0L, 0.190476, 17L))
.put("Solexa-16406", ImmutableList.of(1L, 10L, 1L, 0L, 0L, 0L, 0.0, 0L))
.put("Solexa-16412", ImmutableList.of(3L, 6L, 3L, 0L, 1L, 0L, 0.133333, 15L)).build()},
};
}

Expand Down
Binary file not shown.