
Updated MarkDuplicates to use Picard metrics code #4779

Merged 8 commits on May 22, 2018
@@ -1,5 +1,6 @@
package org.broadinstitute.hellbender.tools.spark.transforms.markduplicates;

import com.google.common.collect.Iterators;
Contributor: This import appears to be unused.

Contributor: I realize it isn't your code, but can you remove the other two rogue imports?

Line 21:
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;

and
import picard.sam.markduplicates.util.OpticalDuplicateFinder;

Collaborator (Author): Done.

import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.metrics.MetricsFile;
import org.apache.spark.Partitioner;
@@ -23,7 +24,7 @@
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadUtils;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;
import org.broadinstitute.hellbender.utils.read.markduplicates.DuplicationMetrics;
import org.broadinstitute.hellbender.utils.read.markduplicates.GATKDuplicationMetrics;
import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
import org.broadinstitute.hellbender.utils.read.markduplicates.SerializableOpticalDuplicatesFinder;
import picard.sam.markduplicates.util.OpticalDuplicateFinder;
@@ -91,6 +92,7 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
final MarkDuplicatesScoringStrategy scoringStrategy,
final OpticalDuplicateFinder opticalDuplicateFinder,
final int numReducers, final boolean dontMarkUnmappedMates) {
final boolean markUnmappedMates = !dontMarkUnmappedMates;
SAMFileHeader headerForTool = header.clone();

// If the input isn't queryname sorted, sort it before duplicate marking
@@ -107,11 +109,13 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
// Here we combine the original bam with the repartitioned unmarked readnames to produce our marked reads
return sortedReadsForMarking.zipPartitions(repartitionedReadNames, (readsIter, readNamesIter) -> {
final Map<String,Integer> namesOfNonDuplicateReadsAndOpticalCounts = Utils.stream(readNamesIter).collect(Collectors.toMap(Tuple2::_1,Tuple2::_2, (t1,t2) -> {throw new GATKException("Detected multiple mark duplicate records objects corresponding to read with name, this could be the result of readnames spanning more than one partition");}));
return Utils.stream(readsIter).peek(read -> {
return Utils.stream(readsIter)
.peek(read -> read.setIsDuplicate(false))
.peek(read -> {
// Handle reads that have been marked as non-duplicates (which also get tagged with optical duplicate summary statistics)
if( namesOfNonDuplicateReadsAndOpticalCounts.containsKey(read.getName())) {
Contributor: The spacing around this if looks incorrect.

read.setIsDuplicate(false);
if (!(dontMarkUnmappedMates && read.isUnmapped())) {
if ( markUnmappedMates || !read.isUnmapped()) {
int dupCount = namesOfNonDuplicateReadsAndOpticalCounts.replace(read.getName(), -1);
if (dupCount > -1) {
((SAMRecordToGATKReadAdapter) read).setTransientAttribute(MarkDuplicatesSparkUtils.OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME, dupCount);
Expand All @@ -122,8 +126,10 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
read.setIsDuplicate(false);
// Everything else is a duplicate
} else{
Contributor: Missing space after the else.

if (!(dontMarkUnmappedMates && read.isUnmapped())) {
if ( markUnmappedMates || !read.isUnmapped()) {
read.setIsDuplicate(true);
} else {
read.setIsDuplicate(false);
}
}
}).iterator();
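
The new markUnmappedMates flag replaces the double-negative guard !(dontMarkUnmappedMates && read.isUnmapped()) with the equivalent markUnmappedMates || !read.isUnmapped(). A minimal standalone sketch checking that equivalence via De Morgan's law (a hypothetical harness, not GATK code; the unmapped boolean stands in for read.isUnmapped()):

```java
// Exhaustively check that the old and new guards in mark() agree.
public class GuardEquivalence {
    public static void main(String[] args) {
        for (final boolean dontMarkUnmappedMates : new boolean[]{false, true}) {
            final boolean markUnmappedMates = !dontMarkUnmappedMates;
            for (final boolean unmapped : new boolean[]{false, true}) {
                final boolean oldGuard = !(dontMarkUnmappedMates && unmapped);
                final boolean newGuard = markUnmappedMates || !unmapped;
                if (oldGuard != newGuard) {
                    throw new AssertionError("guards disagree for "
                            + dontMarkUnmappedMates + ", " + unmapped);
                }
            }
        }
        System.out.println("old and new guards agree on all four inputs");
    }
}
```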
@@ -182,9 +188,9 @@ protected void runTool(final JavaSparkContext ctx) {
final JavaRDD<GATKRead> finalReadsForMetrics = mark(reads, header, markDuplicatesSparkArgumentCollection.duplicatesScoringStrategy, finder, getRecommendedNumReducers(), markDuplicatesSparkArgumentCollection.dontMarkUnmappedMates);

if (metricsFile != null) {
final JavaPairRDD<String, DuplicationMetrics> metricsByLibrary = MarkDuplicatesSparkUtils.generateMetrics(
final JavaPairRDD<String, GATKDuplicationMetrics> metricsByLibrary = MarkDuplicatesSparkUtils.generateMetrics(
header, finalReadsForMetrics);
final MetricsFile<DuplicationMetrics, Double> resultMetrics = getMetricsFile();
final MetricsFile<GATKDuplicationMetrics, Double> resultMetrics = getMetricsFile();
MarkDuplicatesSparkUtils.saveMetricsRDD(resultMetrics, header, metricsByLibrary, metricsFile);
}
header.setSortOrder(SAMFileHeader.SortOrder.coordinate);
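
For context on the metrics plumbing above: getMetricsFile() returns an htsjdk MetricsFile, which serializes the public fields of metric beans into a tab-delimited report. A minimal sketch of that workflow, assuming htsjdk on the classpath (SimpleMetric is a hypothetical bean; this tool actually writes GATKDuplicationMetrics):

```java
import htsjdk.samtools.metrics.MetricBase;
import htsjdk.samtools.metrics.MetricsFile;
import java.io.File;

// Minimal htsjdk MetricsFile workflow with a hypothetical bean.
public class MetricsFileSketch {
    public static class SimpleMetric extends MetricBase {
        public String LIBRARY;           // columns are public fields, by convention
        public long READ_PAIRS_EXAMINED;
    }

    public static void main(String[] args) {
        final MetricsFile<SimpleMetric, Double> file = new MetricsFile<>();
        final SimpleMetric m = new SimpleMetric();
        m.LIBRARY = "libA";
        m.READ_PAIRS_EXAMINED = 42;
        file.addMetric(m);
        file.write(new File("example.duplicate_metrics")); // tab-delimited output
    }
}
```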
@@ -408,27 +408,12 @@ private static Tuple2<IndexPair<String>, Integer> handleFragments(List<MarkDupli
.orElse(null);
}

static JavaPairRDD<String, DuplicationMetrics> generateMetrics(final SAMFileHeader header, final JavaRDD<GATKRead> reads) {
return reads.filter(read -> !read.isSecondaryAlignment() && !read.isSupplementaryAlignment())
.mapToPair(read -> {
static JavaPairRDD<String, GATKDuplicationMetrics> generateMetrics(final SAMFileHeader header, final JavaRDD<GATKRead> reads) {
return reads.mapToPair(read -> {
final String library = LibraryIdGenerator.getLibraryName(header, read.getReadGroup());
DuplicationMetrics metrics = new DuplicationMetrics();
GATKDuplicationMetrics metrics = new GATKDuplicationMetrics();
metrics.LIBRARY = library;
if (read.isUnmapped()) {
++metrics.UNMAPPED_READS;
} else if (!read.isPaired() || read.mateIsUnmapped()) {
++metrics.UNPAIRED_READS_EXAMINED;
} else {
++metrics.READ_PAIRS_EXAMINED;
}

if (read.isDuplicate()) {
if (!read.isPaired() || read.mateIsUnmapped()) {
++metrics.UNPAIRED_READ_DUPLICATES;
} else {
++metrics.READ_PAIR_DUPLICATES;
}
}
metrics.updateMetrics(read);
// NOTE: we use the SAMRecord transientAttribute field here specifically to prevent the already
// serialized read from being parsed again here for performance reasons.
if (((SAMRecordToGATKReadAdapter) read).getTransientAttribute(OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME)!=null) {
@@ -438,31 +423,22 @@ static JavaPairRDD<String, DuplicationMetrics> generateMetrics(final SAMFileHead
}
return new Tuple2<>(library, metrics);
})
.foldByKey(new DuplicationMetrics(), (metricsSum, m) -> {
if (metricsSum.LIBRARY == null) {
metricsSum.LIBRARY = m.LIBRARY;
}
// This should never happen, as we grouped by key using library as the key.
.foldByKey(new GATKDuplicationMetrics(), (metricsSum, m) -> {
metricsSum.merge(m);
if (!metricsSum.LIBRARY.equals(m.LIBRARY)) {
throw new GATKException("Two different libraries encountered while summing metrics: " + metricsSum.LIBRARY
+ " and " + m.LIBRARY);
}
metricsSum.UNMAPPED_READS += m.UNMAPPED_READS;
metricsSum.UNPAIRED_READS_EXAMINED += m.UNPAIRED_READS_EXAMINED;
metricsSum.READ_PAIRS_EXAMINED += m.READ_PAIRS_EXAMINED;
metricsSum.UNPAIRED_READ_DUPLICATES += m.UNPAIRED_READ_DUPLICATES;
metricsSum.READ_PAIR_DUPLICATES += m.READ_PAIR_DUPLICATES;
metricsSum.READ_PAIR_OPTICAL_DUPLICATES += m.READ_PAIR_OPTICAL_DUPLICATES;
return metricsSum;
})
.mapValues(metrics -> {
DuplicationMetrics copy = metrics.copy();
final GATKDuplicationMetrics copy = metrics.copy();
// Divide these by 2 because they are counted for each read
// when they should be counted by pair.
copy.READ_PAIRS_EXAMINED = metrics.READ_PAIRS_EXAMINED / 2;
copy.READ_PAIR_DUPLICATES = metrics.READ_PAIR_DUPLICATES / 2;

copy.calculateDerivedMetrics();
copy.calculateDerivedFields();
if (copy.ESTIMATED_LIBRARY_SIZE == null) {
copy.ESTIMATED_LIBRARY_SIZE = 0L;
}
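
Two details of generateMetrics() are worth spelling out: foldByKey merges the per-read metrics objects library by library, and pair-level counters are halved afterwards because each pair is counted once per read. A self-contained sketch of that accumulate-merge-halve pattern (LibMetrics is a hypothetical stand-in for GATKDuplicationMetrics):

```java
// Hypothetical stand-in for GATKDuplicationMetrics, showing the
// accumulate-then-merge-then-halve pattern from generateMetrics().
public class LibMetrics {
    String library;
    long readPairsExamined; // incremented once per read of a mapped pair

    void merge(final LibMetrics other) {
        if (library == null) {
            library = other.library;
        } else if (!library.equals(other.library)) {
            // Mirrors the GATKException thrown when keys get crossed.
            throw new IllegalStateException("Two different libraries while summing: "
                    + library + " and " + other.library);
        }
        readPairsExamined += other.readPairsExamined;
    }

    public static void main(String[] args) {
        // Four reads from two pairs, all in the same library.
        final LibMetrics sum = new LibMetrics();
        for (int i = 0; i < 4; i++) {
            final LibMetrics perRead = new LibMetrics();
            perRead.library = "libA";
            perRead.readPairsExamined = 1;
            sum.merge(perRead);
        }
        // Reported per pair, so halve the per-read tally: prints 2.
        System.out.println(sum.library + " pairs examined: " + sum.readPairsExamined / 2);
    }
}
```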
@@ -475,19 +451,19 @@ static JavaPairRDD<String, DuplicationMetrics> generateMetrics(final SAMFileHead
* Note: the SamFileHeader is needed in order to include libraries that didn't have any duplicates.
* @param result metrics object, potentially pre-initialized with headers,
*/
public static void saveMetricsRDD(final MetricsFile<DuplicationMetrics, Double> result, final SAMFileHeader header, final JavaPairRDD<String, DuplicationMetrics> metricsRDD, final String metricsOutputPath) {
public static void saveMetricsRDD(final MetricsFile<GATKDuplicationMetrics, Double> result, final SAMFileHeader header, final JavaPairRDD<String, GATKDuplicationMetrics> metricsRDD, final String metricsOutputPath) {
final LibraryIdGenerator libraryIdGenerator = new LibraryIdGenerator(header);

final Map<String, DuplicationMetrics> nonEmptyMetricsByLibrary = metricsRDD.collectAsMap(); //Unknown Library
final Map<String, DuplicationMetrics> emptyMapByLibrary = libraryIdGenerator.getMetricsByLibraryMap();//with null
final Map<String, GATKDuplicationMetrics> nonEmptyMetricsByLibrary = metricsRDD.collectAsMap(); //Unknown Library
final Map<String, GATKDuplicationMetrics> emptyMapByLibrary = libraryIdGenerator.getMetricsByLibraryMap();//with null

final List<String> sortedListOfLibraryNames = new ArrayList<>(Sets.union(emptyMapByLibrary.keySet(), nonEmptyMetricsByLibrary.keySet()));
sortedListOfLibraryNames.sort(Utils.COMPARE_STRINGS_NULLS_FIRST);
for (final String library : sortedListOfLibraryNames) {
//if a non-empty exists, take it, otherwise take from the empties. This is done to include libraries with zero data in them.
//But not all libraries are listed in the header (esp in testing data) so we union empty and non-empty
final DuplicationMetrics metricsToAdd = nonEmptyMetricsByLibrary.containsKey(library) ? nonEmptyMetricsByLibrary.get(library) : emptyMapByLibrary.get(library);
metricsToAdd.calculateDerivedMetrics();
final GATKDuplicationMetrics metricsToAdd = nonEmptyMetricsByLibrary.containsKey(library) ? nonEmptyMetricsByLibrary.get(library) : emptyMapByLibrary.get(library);
metricsToAdd.calculateDerivedFields();
result.addMetric(metricsToAdd);
}
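
The Sets.union above is what lets libraries with zero duplicate data still appear in the report: keys from the header-derived (empty) map are merged with keys actually observed in the data. A minimal sketch of that trick, assuming Guava (already a dependency of this codebase) on the classpath; the map contents are illustrative:

```java
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Union of header-declared libraries and libraries seen in the data,
// so zero-duplicate libraries still get a (zeroed) metrics row.
public class LibraryUnionSketch {
    public static void main(String[] args) {
        final Map<String, Long> nonEmpty = new HashMap<>();
        nonEmpty.put("libA", 10L);
        final Map<String, Long> emptyFromHeader = new HashMap<>();
        emptyFromHeader.put("libA", 0L);
        emptyFromHeader.put("libB", 0L); // declared in the header, no duplicates seen

        final List<String> libraries =
                new ArrayList<>(Sets.union(emptyFromHeader.keySet(), nonEmpty.keySet()));
        Collections.sort(libraries);
        for (final String lib : libraries) {
            final Long metric = nonEmpty.containsKey(lib) ? nonEmpty.get(lib)
                                                          : emptyFromHeader.get(lib);
            System.out.println(lib + "\t" + metric); // libB prints 0
        }
    }
}
```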

@@ -20,7 +20,7 @@
import picard.cmdline.programgroups.DiagnosticsAndQCProgramGroup;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.read.markduplicates.AbstractOpticalDuplicateFinderCommandLineProgram;
import org.broadinstitute.hellbender.utils.read.markduplicates.DuplicationMetrics;
import org.broadinstitute.hellbender.utils.read.markduplicates.GATKDuplicationMetrics;
import picard.sam.markduplicates.util.OpticalDuplicateFinder;
import org.broadinstitute.hellbender.utils.runtime.ProgressLogger;
import picard.sam.util.PhysicalLocation;
@@ -437,11 +437,11 @@ protected Object doWork() {
}
sorter.cleanup();

final MetricsFile<DuplicationMetrics, Integer> file = getMetricsFile();
final MetricsFile<GATKDuplicationMetrics, Integer> file = getMetricsFile();
for (final String library : duplicationHistosByLibrary.keySet()) {
final Histogram<Integer> duplicationHisto = duplicationHistosByLibrary.get(library);
final Histogram<Integer> opticalHisto = opticalHistosByLibrary.get(library);
final DuplicationMetrics metrics = new DuplicationMetrics();
final GATKDuplicationMetrics metrics = new GATKDuplicationMetrics();
metrics.LIBRARY = library;

// Filter out any bins that have only a single entry in them and calcu
@@ -456,7 +456,7 @@ protected Object doWork() {
}
}

metrics.calculateDerivedMetrics();
metrics.calculateDerivedFields();
file.addMetric(metrics);
file.addHistogram(duplicationHisto);
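
For context on the histogram bookkeeping above: EstimateLibraryComplexity tallies duplicate-set sizes per library into a histogram, prunes sparse bins, then derives the metrics. A pure-Java sketch of that pattern (a TreeMap stands in for htsjdk's Histogram<Integer>; the pruning rule is illustrative, since the original comment is truncated in the diff):

```java
import java.util.Map;
import java.util.TreeMap;

// Per-library duplicate-set-size histogram, sketched with a plain TreeMap.
public class DuplicationHistogramSketch {
    public static void main(String[] args) {
        // Key: duplicate-set size; value: how many sets had that size.
        final Map<Integer, Long> duplicationHisto = new TreeMap<>();
        for (final int setSize : new int[]{1, 1, 2, 3, 1, 2}) {
            duplicationHisto.merge(setSize, 1L, Long::sum);
        }
        // Drop bins with only a single observation (illustrative rule).
        duplicationHisto.values().removeIf(count -> count <= 1);
        System.out.println(duplicationHisto); // {1=3, 2=2}
    }
}
```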

@@ -118,46 +118,29 @@ protected Object doWork() {
try (final CloseableIterator<SAMRecord> iterator = headerAndIterator.iterator) {
while (iterator.hasNext()) {
final SAMRecord rec = iterator.next();
if (!rec.isSecondaryOrSupplementary()) {
final String library = LibraryIdGenerator.getLibraryName(header, rec);
DuplicationMetrics metrics = libraryIdGenerator.getMetricsByLibrary(library);
if (metrics == null) {
metrics = new DuplicationMetrics();
metrics.LIBRARY = library;
libraryIdGenerator.addMetricsByLibrary(library, metrics);
}
final String library = LibraryIdGenerator.getLibraryName(header, rec);
GATKDuplicationMetrics metrics = libraryIdGenerator.getMetricsByLibrary(library);
if (metrics == null) {
metrics = new GATKDuplicationMetrics();
metrics.LIBRARY = library;
libraryIdGenerator.addMetricsByLibrary(library, metrics);
}

// First bring the simple metrics up to date
if (rec.getReadUnmappedFlag()) {
++metrics.UNMAPPED_READS;
} else if (!rec.getReadPairedFlag() || rec.getMateUnmappedFlag()) {
++metrics.UNPAIRED_READS_EXAMINED;
} else {
++metrics.READ_PAIRS_EXAMINED; // will need to be divided by 2 at the end
}

if (recordInFileIndex == nextDuplicateIndex) {
rec.setDuplicateReadFlag(true);

// Update the duplication metrics
if (!rec.getReadPairedFlag() || rec.getMateUnmappedFlag()) {
++metrics.UNPAIRED_READ_DUPLICATES;
} else {
++metrics.READ_PAIR_DUPLICATES;// will need to be divided by 2 at the end
}

// Now try and figure out the next duplicate index
if (this.duplicateIndexes.hasNext()) {
nextDuplicateIndex = this.duplicateIndexes.next();
} else {
// Only happens once we've marked all the duplicates
nextDuplicateIndex = -1;
}
} else {
rec.setDuplicateReadFlag(false);
}

if (recordInFileIndex == nextDuplicateIndex) {
rec.setDuplicateReadFlag(true);
// Now try and figure out the next duplicate index
if (this.duplicateIndexes.hasNext()) {
nextDuplicateIndex = this.duplicateIndexes.next();
} else {
// Only happens once we've marked all the duplicates
nextDuplicateIndex = -1;
}
} else {
rec.setDuplicateReadFlag(false);
}

metrics.updateMetrics(rec);
}
recordInFileIndex++;

if (!this.REMOVE_DUPLICATES || !rec.getDuplicateReadFlag()) {
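
The marking loop above walks records in file order while consuming a sorted stream of duplicate indexes in lockstep; metrics.updateMetrics(rec) then observes every record exactly once, whether or not it was flagged. A self-contained sketch of the lockstep pattern (strings stand in for SAMRecords):

```java
import java.util.Iterator;
import java.util.List;

// Lockstep consumption of a sorted duplicate-index stream while
// iterating records in file order, as in the doWork() loop above.
public class DuplicateIndexSketch {
    public static void main(String[] args) {
        final List<String> records = List.of("r0", "r1", "r2", "r3", "r4");
        final Iterator<Long> duplicateIndexes = List.of(1L, 3L).iterator();

        long nextDuplicateIndex = duplicateIndexes.hasNext() ? duplicateIndexes.next() : -1;
        long recordInFileIndex = 0;
        for (final String rec : records) {
            final boolean isDuplicate = recordInFileIndex == nextDuplicateIndex;
            if (isDuplicate) {
                // Advance to the next duplicate index, if any remain.
                nextDuplicateIndex = duplicateIndexes.hasNext() ? duplicateIndexes.next() : -1;
            }
            System.out.println(rec + (isDuplicate ? " DUPLICATE" : ""));
            recordInFileIndex++;
        }
    }
}
```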
@@ -323,6 +323,14 @@ public static boolean readHasMappedMate( final GATKRead read ) {
return read.isPaired() && ! read.mateIsUnmapped();
}

/**
* @param read read to check
Contributor: This sounds confusing, could you rephrase?
* @return true if the read is paired and has a mapped mate, otherwise false
*/
public static boolean readHasMappedMate( final SAMRecord read ) {
Contributor: Do spaces go around the argument?

Collaborator (Author): That's a good question. Looking at this file alone, the spacing is wholly inconsistent one way or the other. I'm not going to bother changing it, because fixing it properly would mean unifying ALL the formatting in at least this file, which I don't want to do in this PR.

return read.getReadPairedFlag() && ! read.getMateUnmappedFlag();
}
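
A usage sketch for the new SAMRecord overload. The short-circuit in the return matters: htsjdk's getMateUnmappedFlag() throws for unpaired records, so the paired check must come first. (The record setup below is illustrative.)

```java
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;

// Illustrative use of the SAMRecord overload added above.
public class MappedMateCheck {
    static boolean readHasMappedMate(final SAMRecord read) {
        return read.getReadPairedFlag() && !read.getMateUnmappedFlag();
    }

    public static void main(String[] args) {
        final SAMRecord rec = new SAMRecord(new SAMFileHeader());
        rec.setReadPairedFlag(true);
        rec.setMateUnmappedFlag(false);
        System.out.println(readHasMappedMate(rec)); // true

        rec.setMateUnmappedFlag(true);
        System.out.println(readHasMappedMate(rec)); // false
    }
}
```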

/**
* Check whether the given String represents a legal attribute name according to the SAM spec,
* and throw an exception if it doesn't.
@@ -117,17 +117,17 @@ protected Map<String, String> getChainedPgIds(final SAMFileHeader outputHeader)
*/
protected void finalizeAndWriteMetrics(final LibraryIdGenerator libraryIdGenerator) {
//We want to sort libraries by name
final SortedMap<String, DuplicationMetrics> metricsByLibrary = new TreeMap<>(Utils.COMPARE_STRINGS_NULLS_FIRST);
final SortedMap<String, GATKDuplicationMetrics> metricsByLibrary = new TreeMap<>(Utils.COMPARE_STRINGS_NULLS_FIRST);
metricsByLibrary.putAll(libraryIdGenerator.getMetricsByLibraryMap());

final Histogram<Short> opticalDuplicatesByLibraryId = libraryIdGenerator.getOpticalDuplicatesByLibraryIdMap();
final Map<String, Short> libraryIds = libraryIdGenerator.getLibraryIdsMap();

// Write out the metrics
final MetricsFile<DuplicationMetrics, Double> file = getMetricsFile();
for (final Map.Entry<String, DuplicationMetrics> entry : metricsByLibrary.entrySet()) {
final MetricsFile<GATKDuplicationMetrics, Double> file = getMetricsFile();
for (final Map.Entry<String, GATKDuplicationMetrics> entry : metricsByLibrary.entrySet()) {
final String libraryName = entry.getKey();
final DuplicationMetrics metrics = entry.getValue();
final GATKDuplicationMetrics metrics = entry.getValue();

metrics.READ_PAIRS_EXAMINED = metrics.READ_PAIRS_EXAMINED / 2;
metrics.READ_PAIR_DUPLICATES = metrics.READ_PAIR_DUPLICATES / 2;
@@ -141,7 +141,7 @@ protected void finalizeAndWriteMetrics(final LibraryIdGenerator libraryIdGenerat
metrics.READ_PAIR_OPTICAL_DUPLICATES = (long) bin.getValue();
}
}
metrics.calculateDerivedMetrics();
metrics.calculateDerivedFields();
file.addMetric(metrics);
}
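
The loop above copies each library's optical-duplicate count out of a histogram keyed by library id before calculateDerivedFields() runs. A simplified sketch of that lookup (plain maps stand in for htsjdk's Histogram<Short>; names and values are illustrative):

```java
import java.util.Map;

// Per-library optical-duplicate lookup, simplified to plain maps.
public class OpticalCountLookupSketch {
    public static void main(String[] args) {
        final Map<String, Short> libraryIds = Map.of("libA", (short) 1, "libB", (short) 2);
        final Map<Short, Double> opticalDuplicatesByLibraryId = Map.of((short) 1, 7.0);

        for (final Map.Entry<String, Short> entry : libraryIds.entrySet()) {
            final Double bin = opticalDuplicatesByLibraryId.get(entry.getValue());
            final long readPairOpticalDuplicates = bin == null ? 0L : bin.longValue();
            System.out.println(entry.getKey() + ": " + readPairOpticalDuplicates);
        }
    }
}
```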
