Skip to content

Commit

Permalink
Updated MarkDuplicates to use Picard metrics code (#4779)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesemery authored May 22, 2018
1 parent e078a57 commit 2cc7abd
Show file tree
Hide file tree
Showing 19 changed files with 401 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
import org.broadinstitute.hellbender.engine.filters.ReadFilter;
import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary;
import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadUtils;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;
import org.broadinstitute.hellbender.utils.read.markduplicates.DuplicationMetrics;
import org.broadinstitute.hellbender.utils.read.markduplicates.GATKDuplicationMetrics;
import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
import org.broadinstitute.hellbender.utils.read.markduplicates.SerializableOpticalDuplicatesFinder;
import picard.sam.markduplicates.util.OpticalDuplicateFinder;
import org.broadinstitute.hellbender.utils.spark.SparkUtils;
import picard.sam.markduplicates.util.OpticalDuplicateFinder;
import picard.cmdline.programgroups.ReadDataManipulationProgramGroup;
import picard.sam.markduplicates.util.OpticalDuplicateFinder;
import scala.Tuple2;

import java.util.Collections;
Expand Down Expand Up @@ -91,6 +89,7 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
final MarkDuplicatesScoringStrategy scoringStrategy,
final OpticalDuplicateFinder opticalDuplicateFinder,
final int numReducers, final boolean dontMarkUnmappedMates) {
final boolean markUnmappedMates = !dontMarkUnmappedMates;
SAMFileHeader headerForTool = header.clone();

// If the input isn't queryname sorted, sort it before duplicate marking
Expand All @@ -107,11 +106,13 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
// Here we combine the original bam with the repartitioned unmarked readnames to produce our marked reads
return sortedReadsForMarking.zipPartitions(repartitionedReadNames, (readsIter, readNamesIter) -> {
final Map<String,Integer> namesOfNonDuplicateReadsAndOpticalCounts = Utils.stream(readNamesIter).collect(Collectors.toMap(Tuple2::_1,Tuple2::_2, (t1,t2) -> {throw new GATKException("Detected multiple mark duplicate records objects corresponding to read with name, this could be the result of readnames spanning more than one partition");}));
return Utils.stream(readsIter).peek(read -> {
return Utils.stream(readsIter)
.peek(read -> read.setIsDuplicate(false))
.peek(read -> {
// Handle reads that have been marked as non-duplicates (which also get tagged with optical duplicate summary statistics)
if( namesOfNonDuplicateReadsAndOpticalCounts.containsKey(read.getName())) {
if (namesOfNonDuplicateReadsAndOpticalCounts.containsKey(read.getName())) {
read.setIsDuplicate(false);
if (!(dontMarkUnmappedMates && read.isUnmapped())) {
if (markUnmappedMates || !read.isUnmapped()) {
int dupCount = namesOfNonDuplicateReadsAndOpticalCounts.replace(read.getName(), -1);
if (dupCount > -1) {
((SAMRecordToGATKReadAdapter) read).setTransientAttribute(MarkDuplicatesSparkUtils.OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME, dupCount);
Expand All @@ -121,9 +122,11 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
} else if (ReadUtils.readAndMateAreUnmapped(read)) {
read.setIsDuplicate(false);
// Everything else is a duplicate
} else{
if (!(dontMarkUnmappedMates && read.isUnmapped())) {
} else {
if (markUnmappedMates || !read.isUnmapped()) {
read.setIsDuplicate(true);
} else {
read.setIsDuplicate(false);
}
}
}).iterator();
Expand Down Expand Up @@ -182,9 +185,9 @@ protected void runTool(final JavaSparkContext ctx) {
final JavaRDD<GATKRead> finalReadsForMetrics = mark(reads, header, markDuplicatesSparkArgumentCollection.duplicatesScoringStrategy, finder, getRecommendedNumReducers(), markDuplicatesSparkArgumentCollection.dontMarkUnmappedMates);

if (metricsFile != null) {
final JavaPairRDD<String, DuplicationMetrics> metricsByLibrary = MarkDuplicatesSparkUtils.generateMetrics(
final JavaPairRDD<String, GATKDuplicationMetrics> metricsByLibrary = MarkDuplicatesSparkUtils.generateMetrics(
header, finalReadsForMetrics);
final MetricsFile<DuplicationMetrics, Double> resultMetrics = getMetricsFile();
final MetricsFile<GATKDuplicationMetrics, Double> resultMetrics = getMetricsFile();
MarkDuplicatesSparkUtils.saveMetricsRDD(resultMetrics, header, metricsByLibrary, metricsFile);
}
header.setSortOrder(SAMFileHeader.SortOrder.coordinate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,27 +408,12 @@ private static Tuple2<IndexPair<String>, Integer> handleFragments(List<MarkDupli
.orElse(null);
}

static JavaPairRDD<String, DuplicationMetrics> generateMetrics(final SAMFileHeader header, final JavaRDD<GATKRead> reads) {
return reads.filter(read -> !read.isSecondaryAlignment() && !read.isSupplementaryAlignment())
.mapToPair(read -> {
static JavaPairRDD<String, GATKDuplicationMetrics> generateMetrics(final SAMFileHeader header, final JavaRDD<GATKRead> reads) {
return reads.mapToPair(read -> {
final String library = LibraryIdGenerator.getLibraryName(header, read.getReadGroup());
DuplicationMetrics metrics = new DuplicationMetrics();
GATKDuplicationMetrics metrics = new GATKDuplicationMetrics();
metrics.LIBRARY = library;
if (read.isUnmapped()) {
++metrics.UNMAPPED_READS;
} else if (!read.isPaired() || read.mateIsUnmapped()) {
++metrics.UNPAIRED_READS_EXAMINED;
} else {
++metrics.READ_PAIRS_EXAMINED;
}

if (read.isDuplicate()) {
if (!read.isPaired() || read.mateIsUnmapped()) {
++metrics.UNPAIRED_READ_DUPLICATES;
} else {
++metrics.READ_PAIR_DUPLICATES;
}
}
metrics.updateMetrics(read);
// NOTE: we use the SAMRecord transientAttribute field here specifically to prevent the already
// serialized read from being parsed again here for performance reasons.
if (((SAMRecordToGATKReadAdapter) read).getTransientAttribute(OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME)!=null) {
Expand All @@ -438,31 +423,22 @@ static JavaPairRDD<String, DuplicationMetrics> generateMetrics(final SAMFileHead
}
return new Tuple2<>(library, metrics);
})
.foldByKey(new DuplicationMetrics(), (metricsSum, m) -> {
if (metricsSum.LIBRARY == null) {
metricsSum.LIBRARY = m.LIBRARY;
}
// This should never happen, as we grouped by key using library as the key.
.foldByKey(new GATKDuplicationMetrics(), (metricsSum, m) -> {
metricsSum.merge(m);
if (!metricsSum.LIBRARY.equals(m.LIBRARY)) {
throw new GATKException("Two different libraries encountered while summing metrics: " + metricsSum.LIBRARY
+ " and " + m.LIBRARY);
}
metricsSum.UNMAPPED_READS += m.UNMAPPED_READS;
metricsSum.UNPAIRED_READS_EXAMINED += m.UNPAIRED_READS_EXAMINED;
metricsSum.READ_PAIRS_EXAMINED += m.READ_PAIRS_EXAMINED;
metricsSum.UNPAIRED_READ_DUPLICATES += m.UNPAIRED_READ_DUPLICATES;
metricsSum.READ_PAIR_DUPLICATES += m.READ_PAIR_DUPLICATES;
metricsSum.READ_PAIR_OPTICAL_DUPLICATES += m.READ_PAIR_OPTICAL_DUPLICATES;
return metricsSum;
})
.mapValues(metrics -> {
DuplicationMetrics copy = metrics.copy();
final GATKDuplicationMetrics copy = metrics.copy();
// Divide these by 2 because they are counted for each read
// when they should be counted by pair.
copy.READ_PAIRS_EXAMINED = metrics.READ_PAIRS_EXAMINED / 2;
copy.READ_PAIR_DUPLICATES = metrics.READ_PAIR_DUPLICATES / 2;

copy.calculateDerivedMetrics();
copy.calculateDerivedFields();
if (copy.ESTIMATED_LIBRARY_SIZE == null) {
copy.ESTIMATED_LIBRARY_SIZE = 0L;
}
Expand All @@ -475,19 +451,19 @@ static JavaPairRDD<String, DuplicationMetrics> generateMetrics(final SAMFileHead
* Note: the SamFileHeader is needed in order to include libraries that didn't have any duplicates.
* @param result metrics object, potentially pre-initialized with headers,
*/
public static void saveMetricsRDD(final MetricsFile<DuplicationMetrics, Double> result, final SAMFileHeader header, final JavaPairRDD<String, DuplicationMetrics> metricsRDD, final String metricsOutputPath) {
public static void saveMetricsRDD(final MetricsFile<GATKDuplicationMetrics, Double> result, final SAMFileHeader header, final JavaPairRDD<String, GATKDuplicationMetrics> metricsRDD, final String metricsOutputPath) {
final LibraryIdGenerator libraryIdGenerator = new LibraryIdGenerator(header);

final Map<String, DuplicationMetrics> nonEmptyMetricsByLibrary = metricsRDD.collectAsMap(); //Unknown Library
final Map<String, DuplicationMetrics> emptyMapByLibrary = libraryIdGenerator.getMetricsByLibraryMap();//with null
final Map<String, GATKDuplicationMetrics> nonEmptyMetricsByLibrary = metricsRDD.collectAsMap(); //Unknown Library
final Map<String, GATKDuplicationMetrics> emptyMapByLibrary = libraryIdGenerator.getMetricsByLibraryMap();//with null

final List<String> sortedListOfLibraryNames = new ArrayList<>(Sets.union(emptyMapByLibrary.keySet(), nonEmptyMetricsByLibrary.keySet()));
sortedListOfLibraryNames.sort(Utils.COMPARE_STRINGS_NULLS_FIRST);
for (final String library : sortedListOfLibraryNames) {
//if a non-empty exists, take it, otherwise take from the the empties. This is done to include libraries with zero data in them.
//But not all libraries are listed in the header (esp in testing data) so we union empty and non-empty
final DuplicationMetrics metricsToAdd = nonEmptyMetricsByLibrary.containsKey(library) ? nonEmptyMetricsByLibrary.get(library) : emptyMapByLibrary.get(library);
metricsToAdd.calculateDerivedMetrics();
final GATKDuplicationMetrics metricsToAdd = nonEmptyMetricsByLibrary.containsKey(library) ? nonEmptyMetricsByLibrary.get(library) : emptyMapByLibrary.get(library);
metricsToAdd.calculateDerivedFields();
result.addMetric(metricsToAdd);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import picard.cmdline.programgroups.DiagnosticsAndQCProgramGroup;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.read.markduplicates.AbstractOpticalDuplicateFinderCommandLineProgram;
import org.broadinstitute.hellbender.utils.read.markduplicates.DuplicationMetrics;
import org.broadinstitute.hellbender.utils.read.markduplicates.GATKDuplicationMetrics;
import picard.sam.markduplicates.util.OpticalDuplicateFinder;
import org.broadinstitute.hellbender.utils.runtime.ProgressLogger;
import picard.sam.util.PhysicalLocation;
Expand Down Expand Up @@ -437,11 +437,11 @@ protected Object doWork() {
}
sorter.cleanup();

final MetricsFile<DuplicationMetrics, Integer> file = getMetricsFile();
final MetricsFile<GATKDuplicationMetrics, Integer> file = getMetricsFile();
for (final String library : duplicationHistosByLibrary.keySet()) {
final Histogram<Integer> duplicationHisto = duplicationHistosByLibrary.get(library);
final Histogram<Integer> opticalHisto = opticalHistosByLibrary.get(library);
final DuplicationMetrics metrics = new DuplicationMetrics();
final GATKDuplicationMetrics metrics = new GATKDuplicationMetrics();
metrics.LIBRARY = library;

// Filter out any bins that have only a single entry in them and calcu
Expand All @@ -456,7 +456,7 @@ protected Object doWork() {
}
}

metrics.calculateDerivedMetrics();
metrics.calculateDerivedFields();
file.addMetric(metrics);
file.addHistogram(duplicationHisto);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,46 +118,29 @@ protected Object doWork() {
try (final CloseableIterator<SAMRecord> iterator = headerAndIterator.iterator) {
while (iterator.hasNext()) {
final SAMRecord rec = iterator.next();
if (!rec.isSecondaryOrSupplementary()) {
final String library = LibraryIdGenerator.getLibraryName(header, rec);
DuplicationMetrics metrics = libraryIdGenerator.getMetricsByLibrary(library);
if (metrics == null) {
metrics = new DuplicationMetrics();
metrics.LIBRARY = library;
libraryIdGenerator.addMetricsByLibrary(library, metrics);
}
final String library = LibraryIdGenerator.getLibraryName(header, rec);
GATKDuplicationMetrics metrics = libraryIdGenerator.getMetricsByLibrary(library);
if (metrics == null) {
metrics = new GATKDuplicationMetrics();
metrics.LIBRARY = library;
libraryIdGenerator.addMetricsByLibrary(library, metrics);
}

// First bring the simple metrics up to date
if (rec.getReadUnmappedFlag()) {
++metrics.UNMAPPED_READS;
} else if (!rec.getReadPairedFlag() || rec.getMateUnmappedFlag()) {
++metrics.UNPAIRED_READS_EXAMINED;
if (recordInFileIndex == nextDuplicateIndex) {
rec.setDuplicateReadFlag(true);
// Now try and figure out the next duplicate index
if (this.duplicateIndexes.hasNext()) {
nextDuplicateIndex = this.duplicateIndexes.next();
} else {
++metrics.READ_PAIRS_EXAMINED; // will need to be divided by 2 at the end
// Only happens once we've marked all the duplicates
nextDuplicateIndex = -1;
}
} else {
rec.setDuplicateReadFlag(false);
}

metrics.updateMetrics(rec);

if (recordInFileIndex == nextDuplicateIndex) {
rec.setDuplicateReadFlag(true);

// Update the duplication metrics
if (!rec.getReadPairedFlag() || rec.getMateUnmappedFlag()) {
++metrics.UNPAIRED_READ_DUPLICATES;
} else {
++metrics.READ_PAIR_DUPLICATES;// will need to be divided by 2 at the end
}

// Now try and figure out the next duplicate index
if (this.duplicateIndexes.hasNext()) {
nextDuplicateIndex = this.duplicateIndexes.next();
} else {
// Only happens once we've marked all the duplicates
nextDuplicateIndex = -1;
}
} else {
rec.setDuplicateReadFlag(false);
}
}
recordInFileIndex++;

if (!this.REMOVE_DUPLICATES || !rec.getDuplicateReadFlag()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,21 @@ public static String prettyPrintSequenceRecords( final SAMSequenceDictionary seq
}

/**
* @param read read to check
* @return true if the read is paired and has a mapped mate, otherwise false
* @param read read to query
* @return true if the read has a mate and that mate is mapped, otherwise false
*/
public static boolean readHasMappedMate( final GATKRead read ) {
return read.isPaired() && ! read.mateIsUnmapped();
}

/**
* @param read read to query
* @return true if the read has a mate and that mate is mapped, otherwise false
*/
public static boolean readHasMappedMate( final SAMRecord read ) {
return read.getReadPairedFlag() && ! read.getMateUnmappedFlag();
}

/**
* Check whether the given String represents a legal attribute name according to the SAM spec,
* and throw an exception if it doesn't.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,17 @@ protected Map<String, String> getChainedPgIds(final SAMFileHeader outputHeader)
*/
protected void finalizeAndWriteMetrics(final LibraryIdGenerator libraryIdGenerator) {
//We want to sort libraries by name
final SortedMap<String, DuplicationMetrics> metricsByLibrary = new TreeMap<>(Utils.COMPARE_STRINGS_NULLS_FIRST);
final SortedMap<String, GATKDuplicationMetrics> metricsByLibrary = new TreeMap<>(Utils.COMPARE_STRINGS_NULLS_FIRST);
metricsByLibrary.putAll(libraryIdGenerator.getMetricsByLibraryMap());

final Histogram<Short> opticalDuplicatesByLibraryId = libraryIdGenerator.getOpticalDuplicatesByLibraryIdMap();
final Map<String, Short> libraryIds = libraryIdGenerator.getLibraryIdsMap();

// Write out the metrics
final MetricsFile<DuplicationMetrics, Double> file = getMetricsFile();
for (final Map.Entry<String, DuplicationMetrics> entry : metricsByLibrary.entrySet()) {
final MetricsFile<GATKDuplicationMetrics, Double> file = getMetricsFile();
for (final Map.Entry<String, GATKDuplicationMetrics> entry : metricsByLibrary.entrySet()) {
final String libraryName = entry.getKey();
final DuplicationMetrics metrics = entry.getValue();
final GATKDuplicationMetrics metrics = entry.getValue();

metrics.READ_PAIRS_EXAMINED = metrics.READ_PAIRS_EXAMINED / 2;
metrics.READ_PAIR_DUPLICATES = metrics.READ_PAIR_DUPLICATES / 2;
Expand All @@ -141,7 +141,7 @@ protected void finalizeAndWriteMetrics(final LibraryIdGenerator libraryIdGenerat
metrics.READ_PAIR_OPTICAL_DUPLICATES = (long) bin.getValue();
}
}
metrics.calculateDerivedMetrics();
metrics.calculateDerivedFields();
file.addMetric(metrics);
}

Expand Down
Loading

0 comments on commit 2cc7abd

Please sign in to comment.