diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java
index 136b5228af5..cdb702506fa 100644
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java
+++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java
@@ -503,7 +503,7 @@ protected List<String> getReadSourceName(){
     /**
      * Returns a map of read input to header.
      */
-    protected LinkedHashMap<String, SAMFileHeader> getReadSouceHeaderMap(){
+    protected LinkedHashMap<String, SAMFileHeader> getReadSourceHeaderMap(){
         return readInputs;
     }
 
@@ -568,16 +568,13 @@ private void initializeReads(final JavaSparkContext sparkContext) {
     private SamFileHeaderMerger createHeaderMerger() {
         return new SamFileHeaderMerger(identifySortOrder(readInputs.values()), readInputs.values(), true);
     }
-    @VisibleForTesting
+    // If multiple bams have had their contents merged, make no assumption about the underlying sort order
     static SAMFileHeader.SortOrder identifySortOrder(final Collection<SAMFileHeader> headers){
-        final Set<SAMFileHeader.SortOrder> sortOrders = headers.stream().map(SAMFileHeader::getSortOrder).collect(Collectors.toSet());
-        final SAMFileHeader.SortOrder order;
-        if (sortOrders.size() == 1) {
-            order = sortOrders.iterator().next();
+        if (headers.size() > 1){
+            return SAMFileHeader.SortOrder.unsorted;
         } else {
-            order = SAMFileHeader.SortOrder.unsorted;
+            return headers.iterator().next().getSortOrder();
        }
-        return order;
     }
 
     /**
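A quick sketch of the contract the rewritten `identifySortOrder()` enforces: a merge of more than one header is always reported as unsorted, while a single header keeps its own order. This is illustrative only; the demo class is hypothetical (placed in the same package because the method is package-private), and only `GATKSparkTool.identifySortOrder` and htsjdk's `SAMFileHeader` are real APIs from this diff.

```java
package org.broadinstitute.hellbender.engine.spark;

import java.util.Arrays;
import java.util.Collections;

import htsjdk.samtools.SAMFileHeader;

// Hypothetical demo class, placed in this package because identifySortOrder is package-private.
final class IdentifySortOrderSketch {
    public static void main(final String[] args) {
        final SAMFileHeader coordinate = new SAMFileHeader();
        coordinate.setSortOrder(SAMFileHeader.SortOrder.coordinate);

        final SAMFileHeader queryname = new SAMFileHeader();
        queryname.setSortOrder(SAMFileHeader.SortOrder.queryname);

        // A single input keeps its own sort order.
        System.out.println(GATKSparkTool.identifySortOrder(Collections.singletonList(queryname)));  // queryname

        // More than one input: no assumption is made, even when the orders happen to agree.
        System.out.println(GATKSparkTool.identifySortOrder(Arrays.asList(queryname, queryname)));   // unsorted
        System.out.println(GATKSparkTool.identifySortOrder(Arrays.asList(coordinate, queryname)));  // unsorted
    }
}
```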
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java
index 676456d0d05..4b306d87da5 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java
+++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java
@@ -8,7 +8,6 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.broadinstitute.barclay.argparser.Argument;
 import org.broadinstitute.barclay.argparser.ArgumentCollection;
-import org.broadinstitute.barclay.argparser.BetaFeature;
 import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
 import org.broadinstitute.barclay.help.DocumentedFeature;
 import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
@@ -17,13 +16,11 @@
 import org.broadinstitute.hellbender.engine.filters.ReadFilter;
 import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary;
 import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;
 import org.broadinstitute.hellbender.exceptions.GATKException;
 import org.broadinstitute.hellbender.exceptions.UserException;
 import org.broadinstitute.hellbender.utils.Utils;
 import org.broadinstitute.hellbender.utils.read.GATKRead;
 import org.broadinstitute.hellbender.utils.read.ReadUtils;
-import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;
 import org.broadinstitute.hellbender.utils.read.markduplicates.GATKDuplicationMetrics;
 import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
 import org.broadinstitute.hellbender.utils.spark.SparkUtils;
@@ -43,10 +40,11 @@
  *
  * <ul>
  *     <li>MarkDuplicatesSpark processing can replace both the MarkDuplicates and SortSam steps of the Best Practices single sample pipeline. After flagging duplicate sets, the tool automatically coordinate-sorts the records. It is still necessary to subsequently run SetNmMdAndUqTags before running BQSR.</li>
- *     <li>The tool is optimized to run on queryname-grouped alignments. If provided coordinate-sorted alignments, the tool will spend additional time first queryname sorting the reads internally. This can result in the tool being up to 2x slower processing under some circumstances.</li>
+ *     <li>The tool is optimized to run on queryname-grouped alignments (that is, all reads with the same queryname are together in the input file). If provided coordinate-sorted alignments, the tool will spend additional time first queryname sorting the reads internally. This can make the tool up to 2x slower under some circumstances.</li>
  *     <li>Due to MarkDuplicatesSpark queryname-sorting coordinate-sorted inputs internally at the start, the tool produces identical results regardless of the input sort-order. That is, it will flag duplicate sets that include secondary, supplementary, and unmapped mate records no matter the sort-order of the input. This differs from how Picard MarkDuplicates behaves given differently sorted inputs.</li>
  *     <li>Collecting duplicate metrics slows down performance and thus the metrics collection is optional and must be specified for the Spark version of the tool with '-M'. It is possible to collect the metrics with the standalone Picard tool EstimateLibraryComplexity.</li>
  *     <li>MarkDuplicatesSpark is optimized to run locally on a single machine by leveraging core parallelism that MarkDuplicates and SortSam cannot. It will typically run about 15% faster than MarkDuplicates followed by SortSam over the same data at 2 cores, and it scales linearly to upwards of 16 cores. This means MarkDuplicatesSpark, even without access to a Spark cluster, is faster than MarkDuplicates.</li>
+ *     <li>MarkDuplicatesSpark can be run with multiple input bams. If this is the case, all of the inputs must be queryname-grouped or queryname-sorted.</li>
  * </ul>

 * For a typical 30x coverage WGS BAM, we recommend running on a machine with at least 16 GB. Memory usage scales with library complexity, and the tool will need more memory for larger or more complex data. If the tool is running slowly, it is possible Spark is running out of memory and is spilling data to disk excessively. If this is the case, increasing the memory available to the tool should yield a speedup, up to a threshold beyond which additional memory has no effect.

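The multi-input requirement in the list above hinges on what counts as a queryname-grouped header. A minimal sketch of that distinction, assuming (as this PR's error message suggests) that `ReadUtils.isReadNameGroupedBam` accepts a header carrying either SO:queryname or GO:query; the demo class itself is hypothetical:

```java
import htsjdk.samtools.SAMFileHeader;
import org.broadinstitute.hellbender.utils.read.ReadUtils;

// Hypothetical demo class illustrating which headers pass the multi-input check.
final class NameGroupedHeaderDemo {
    public static void main(final String[] args) {
        final SAMFileHeader querynameSorted = new SAMFileHeader();
        querynameSorted.setSortOrder(SAMFileHeader.SortOrder.queryname);

        final SAMFileHeader queryGrouped = new SAMFileHeader();
        queryGrouped.setGroupOrder(SAMFileHeader.GroupOrder.query);

        final SAMFileHeader coordinateSorted = new SAMFileHeader();
        coordinateSorted.setSortOrder(SAMFileHeader.SortOrder.coordinate);

        // Accepted in a multi-input run: queryname-sorted or query-grouped headers.
        System.out.println(ReadUtils.isReadNameGroupedBam(querynameSorted)); // expected: true
        System.out.println(ReadUtils.isReadNameGroupedBam(queryGrouped));    // expected: true
        // Rejected: a coordinate-sorted input triggers the UserException added below.
        System.out.println(ReadUtils.isReadNameGroupedBam(coordinateSorted)); // expected: false
    }
}
```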
@@ -288,12 +286,15 @@ public int getPartition(Object key) {
 
     @Override
     protected void runTool(final JavaSparkContext ctx) {
-        // Check if we are using multiple inputs that the headers are all in the correct querygrouped ordering
-        Map<String, SAMFileHeader> headerMap = getReadSouceHeaderMap();
+        final SAMFileHeader mergedHeader = getHeaderForReads();
+
+        // When there are multiple inputs, check that every header is in the correct querygrouped ordering; if so, set the aggregate header to reflect this
+        Map<String, SAMFileHeader> headerMap = getReadSourceHeaderMap();
         if (headerMap.size() > 1) {
             headerMap.entrySet().stream().forEach(h -> {if(!ReadUtils.isReadNameGroupedBam(h.getValue())) {
-                    throw new UserException("Multiple inputs to MarkDuplicatesSpark detected but input "+h.getKey()+" was sorted in "+h.getValue().getSortOrder()+" order");
+                    throw new UserException("Multiple inputs to MarkDuplicatesSpark detected. MarkDuplicatesSpark requires all inputs to be queryname-sorted or querygroup-sorted for multi-input processing, but input "+h.getKey()+" was sorted in "+h.getValue().getSortOrder()+" order");
                 }});
+            mergedHeader.setGroupOrder(SAMFileHeader.GroupOrder.query);
         }
 
         JavaRDD<GATKRead> reads = getReads();
@@ -304,14 +305,13 @@ protected void runTool(final JavaSparkContext ctx) {
             markDuplicatesSparkArgumentCollection.taggingPolicy = MarkDuplicates.DuplicateTaggingPolicy.OpticalOnly;
         }
 
-        final SAMFileHeader header = getHeaderForReads();
-        final JavaRDD<GATKRead> finalReadsForMetrics = mark(reads, header, finder, markDuplicatesSparkArgumentCollection, getRecommendedNumReducers());
+        final JavaRDD<GATKRead> finalReadsForMetrics = mark(reads, mergedHeader, finder, markDuplicatesSparkArgumentCollection, getRecommendedNumReducers());
 
         if (metricsFile != null) {
             final JavaPairRDD<String, GATKDuplicationMetrics> metricsByLibrary = MarkDuplicatesSparkUtils.generateMetrics(
-                    header, finalReadsForMetrics);
+                    mergedHeader, finalReadsForMetrics);
             final MetricsFile<GATKDuplicationMetrics, Double> resultMetrics = getMetricsFile();
-            MarkDuplicatesSparkUtils.saveMetricsRDD(resultMetrics, header, metricsByLibrary, metricsFile);
+            MarkDuplicatesSparkUtils.saveMetricsRDD(resultMetrics, mergedHeader, metricsByLibrary, metricsFile);
         }
         JavaRDD<GATKRead> readsForWriting = finalReadsForMetrics;
         // Filter out the duplicates if instructed to do so
@@ -321,8 +321,8 @@ protected void runTool(final JavaSparkContext ctx) {
             readsForWriting = readsForWriting.filter(r -> !MarkDuplicates.DUPLICATE_TYPE_SEQUENCING.equals(r.getAttributeAsString(MarkDuplicates.DUPLICATE_TYPE_TAG)));
         }
 
-        header.setSortOrder(SAMFileHeader.SortOrder.coordinate);
-        writeReads(ctx, output, readsForWriting, header, true);
+        mergedHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
+        writeReads(ctx, output, readsForWriting, mergedHeader, true);
     }
 }
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/MarkDuplicatesSparkIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/MarkDuplicatesSparkIntegrationTest.java
index 7d47536bc4e..a2e12cc37ff 100644
--- a/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/MarkDuplicatesSparkIntegrationTest.java
+++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/MarkDuplicatesSparkIntegrationTest.java
@@ -91,6 +91,14 @@ public Object[][] md(){
                         .put("Solexa-16404", ImmutableList.of(3L, 9L, 3L, 0L, 2L, 0L, 0.190476, 17L))
                         .put("Solexa-16406", ImmutableList.of(1L, 10L, 1L, 0L, 0L, 0L, 0.0, 0L))
                         .put("Solexa-16412", ImmutableList.of(3L, 6L, 3L, 0L, 1L, 0L, 0.133333, 15L)).build()},
+                // Testing that multiple-input behavior still functions when the input files are mixed between queryname and querygroup sorting
+                {new File[]{new File(TEST_DATA_DIR, "optical_dupes.queryname.bam"), new File(TEST_DATA_DIR, "example.chr1.1-1K.markedDups.querygrouped.bam")}, 94, 8,
+                        ImmutableMap.builder().put("mylib", ImmutableList.of(0L, 2L, 0L, 0L, 1L, 1L, 0.5, 0L))
+                                .put("Solexa-16419", ImmutableList.of(4L, 4L, 4L, 0L, 0L, 0L, 0.0, 0L))
+                                .put("Solexa-16416", ImmutableList.of(2L, 2L, 2L, 0L, 0L, 0L, 0.0, 0L))
+                                .put("Solexa-16404", ImmutableList.of(3L, 9L, 3L, 0L, 2L, 0L, 0.190476, 17L))
+                                .put("Solexa-16406", ImmutableList.of(1L, 10L, 1L, 0L, 0L, 0L, 0.0, 0L))
+                                .put("Solexa-16412", ImmutableList.of(3L, 6L, 3L, 0L, 1L, 0L, 0.133333, 15L)).build()},
         };
     }
diff --git a/src/test/resources/org/broadinstitute/hellbender/tools/walkers/MarkDuplicatesGATK/example.chr1.1-1K.markedDups.querygrouped.bam b/src/test/resources/org/broadinstitute/hellbender/tools/walkers/MarkDuplicatesGATK/example.chr1.1-1K.markedDups.querygrouped.bam
new file mode 100644
index 00000000000..2f6f6c3581f
Binary files /dev/null and b/src/test/resources/org/broadinstitute/hellbender/tools/walkers/MarkDuplicatesGATK/example.chr1.1-1K.markedDups.querygrouped.bam differ
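The new test case mixes a queryname-sorted bam with a querygroup-sorted one, which the tool now accepts. As a usage aside, a hypothetical pre-flight probe (not part of this PR) can report each input's SO/GO header fields with htsjdk before attempting a multi-input run:

```java
import java.io.File;

import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;

// Hypothetical helper: print the sort order (SO) and group order (GO) of each bam given
// on the command line, so inputs can be checked against the multi-input requirement.
public final class InputOrderProbe {
    public static void main(final String[] args) throws Exception {
        for (final String path : args) {
            try (final SamReader reader = SamReaderFactory.makeDefault().open(new File(path))) {
                final SAMFileHeader header = reader.getFileHeader();
                // SO:queryname or GO:query both count as queryname-grouped for MarkDuplicatesSpark.
                System.out.printf("%s: SO=%s GO=%s%n", path,
                        header.getSortOrder(), header.getGroupOrder());
            }
        }
    }
}
```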