Port of RevertSam tool into Spark #5395

Merged · 16 commits · Jan 4, 2019

Changes from all commits
@@ -40,6 +40,8 @@ private StandardArgumentDefinitions(){}
public static final String PEDIGREE_FILE_LONG_NAME = "pedigree";
public static final String SITES_ONLY_LONG_NAME = "sites-only-vcf-output";
public static final String INVALIDATE_PREVIOUS_FILTERS_LONG_NAME = "invalidate-previous-filters";
public static final String SORT_ORDER_LONG_NAME = "sort-order";


public static final String INPUT_SHORT_NAME = "I";
public static final String OUTPUT_SHORT_NAME = "O";
@@ -67,6 +69,7 @@ private StandardArgumentDefinitions(){}
public static final String ANNOTATIONS_TO_EXCLUDE_SHORT_NAME = "AX";
public static final String SAMPLE_NAME_SHORT_NAME = "sn";
public static final String PEDIGREE_FILE_SHORT_NAME = "ped";
public static final String SORT_ORDER_SHORT_NAME = "SO";

public static final String SPARK_PROPERTY_NAME = "conf";


Large diffs are not rendered by default.

@@ -26,15 +26,13 @@
public final class SortSamSpark extends GATKSparkTool {
private static final long serialVersionUID = 1L;

public static final String SORT_ORDER_LONG_NAME = "sort-order";

@Override
public boolean requiresReads() { return true; }

@Argument(doc="the output file path", shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME, fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME, optional = false)
private String outputFile;

@Argument(doc="sort order of the output file", fullName = SORT_ORDER_LONG_NAME, optional = true)
@Argument(doc="sort order of the output file", shortName = StandardArgumentDefinitions.SORT_ORDER_SHORT_NAME, fullName = StandardArgumentDefinitions.SORT_ORDER_LONG_NAME, optional = true)
private SparkSortOrder sortOrder = SparkSortOrder.coordinate;

/**
@@ -97,7 +97,7 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
SAMFileHeader headerForTool = header.clone();

// If the input isn't queryname sorted, sort it before duplicate marking
final JavaRDD<GATKRead> sortedReadsForMarking = querynameSortReadsIfNecessary(reads, numReducers, headerForTool);
final JavaRDD<GATKRead> sortedReadsForMarking = SparkUtils.querynameSortReadsIfNecessary(reads, numReducers, headerForTool);

// If we need to remove optical duplicates or tag them, then make sure we are keeping track
final boolean markOpticalDups = (taggingPolicy != MarkDuplicates.DuplicateTaggingPolicy.DontTag);
@@ -171,20 +171,6 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
}


/**
* Sort reads into queryname order if they are not already sorted
*/
private static JavaRDD<GATKRead> querynameSortReadsIfNecessary(JavaRDD<GATKRead> reads, int numReducers, SAMFileHeader headerForTool) {
JavaRDD<GATKRead> sortedReadsForMarking;
if (ReadUtils.isReadNameGroupedBam(headerForTool)) {
sortedReadsForMarking = reads;
} else {
headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
sortedReadsForMarking = SparkUtils.sortReadsAccordingToHeader(reads, headerForTool, numReducers);
}
return sortedReadsForMarking;
}

/**
* A custom partitioner designed to cut down on spark shuffle costs.
* This is designed such that getPartition(key) is called on a key which corresponds to the already known target partition
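The partitioner javadoc above is truncated by the collapsed diff. As a hypothetical illustration of the "getPartition(key) is called on the already known target partition" idea it describes — a sketch under that assumption, not the PR's actual class, whose body is not shown here:

import org.apache.spark.Partitioner;

// Sketch: every key is the precomputed index of its destination partition,
// so getPartition is a constant-time unboxing rather than a hash.
public final class KnownIndexPartitioner extends Partitioner {
    private final int numPartitions;

    public KnownIndexPartitioner(final int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(final Object key) {
        return (Integer) key; // the key already names its target partition
    }
}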
@@ -21,6 +21,7 @@
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;
import org.broadinstitute.hellbender.utils.read.markduplicates.*;
import org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords.*;
import org.broadinstitute.hellbender.utils.spark.SparkUtils;
import picard.sam.markduplicates.util.OpticalDuplicateFinder;
import picard.sam.markduplicates.util.ReadEnds;
import scala.Tuple2;
@@ -259,7 +260,7 @@ private static JavaPairRDD<String, Iterable<IndexPair<GATKRead>>> getReadsGroupe
*/
private static JavaPairRDD<String, Iterable<IndexPair<GATKRead>>> spanReadsByKey(final JavaRDD<IndexPair<GATKRead>> reads) {
JavaPairRDD<String, IndexPair<GATKRead>> nameReadPairs = reads.mapToPair(read -> new Tuple2<>(read.getValue().getName(), read));
return spanByKey(nameReadPairs).flatMapToPair(namedRead -> {
return SparkUtils.spanByKey(nameReadPairs).flatMapToPair(namedRead -> {
// for each name, separate reads by key (group name)
List<Tuple2<String, Iterable<IndexPair<GATKRead>>>> out = Lists.newArrayList();
ListMultimap<String, IndexPair<GATKRead>> multi = LinkedListMultimap.create();
@@ -274,54 +275,6 @@ private static JavaPairRDD<String, Iterable<IndexPair<GATKRead>>> spanReadsByKey
});
}

/**
* Like <code>groupByKey</code>, but assumes that values are already sorted by key, so no shuffle is needed,
* which is much faster.
* @param rdd the input RDD
* @param <K> type of keys
* @param <V> type of values
* @return an RDD where the values for each key are grouped into an iterable collection
*/
private static <K, V> JavaPairRDD<K, Iterable<V>> spanByKey(JavaPairRDD<K, V> rdd) {
return rdd.mapPartitionsToPair(MarkDuplicatesSparkUtils::spanningIterator);
}

/**
* An iterator that groups values having the same key into iterable collections.
* @param iterator an iterator over key-value pairs
* @param <K> type of keys
* @param <V> type of values
* @return an iterator over pairs of keys and grouped values
*/
static <K, V> Iterator<Tuple2<K, Iterable<V>>> spanningIterator(Iterator<Tuple2<K, V>> iterator) {
final PeekingIterator<Tuple2<K, V>> iter = Iterators.peekingIterator(iterator);
return new AbstractIterator<Tuple2<K, Iterable<V>>>() {
@Override
protected Tuple2<K, Iterable<V>> computeNext() {
K key = null;
List<V> group = Lists.newArrayList();
while (iter.hasNext()) {
if (key == null) {
Tuple2<K, V> next = iter.next();
key = next._1();
V value = next._2();
group.add(value);
continue;
}
K nextKey = iter.peek()._1(); // don't advance...
if (nextKey.equals(key)) {
group.add(iter.next()._2()); // .. unless the keys match
} else {
return new Tuple2<>(key, group);
}
}
if (key != null) {
return new Tuple2<>(key, group);
}
return endOfData();
}
};
}

/**
* Primary landing point for MarkDuplicateSparkRecords:
@@ -2,8 +2,10 @@

import htsjdk.tribble.AsciiFeatureCodec;
import htsjdk.tribble.readers.LineIterator;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;

import java.util.ArrayList;
import java.util.Arrays;
@@ -17,6 +19,8 @@
* <ul>
* <li>Header: must begin with line HEADER or track (for IGV), followed by any number of column names,
* separated by whitespace.</li>
* <li>Header: custom header delimiters can be provided; a null header delimiter is interpreted as a non-delimited
* header consisting of a single line.</li>
* <li>Comment lines starting with # are ignored</li>
* <li>Each non-header and non-comment line is split into parts by whitespace,
* and these parts are assigned as a map to their corresponding column name in the header.
@@ -28,30 +32,62 @@
*
* </p>
*
* <h2>File format example</h2>
* <h2>File format example 1</h2>
* <pre>
* HEADER a b c
* 1:1 1 2 3
* 1:2 4 5 6
* 1:3 7 8 9
* </pre>
*
* <h2>File format example 2</h2>
* <pre>
* a b c
* 1:1 1 2 3
* 1:2 4 5 6
* 1:3 7 8 9
* </pre>
*/
public final class TableCodec extends AsciiFeatureCodec<TableFeature> {
Member: I think you exposed some potential error states in this class when you allowed arbitrary header and comment lines. An alternative to making these changes would be to use some other table reader like TableReader.

Collaborator (Author): I really wanted to avoid the table reader like the plague. It is absurdly heavy-duty in its scope for handling short TSVs like this. It seems wholly unnecessary to build a feature out of a simple TSV file that could just as easily be handled with a scanner.
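To make the discussion concrete, here is a minimal sketch of how the new constructor (added below) is driven; file contents and variable names are illustrative only, and imports are omitted because the diff does not show the package declaration:

// Default: the header line must start with "HEADER" (file format example 1).
final TableCodec defaultCodec = new TableCodec();

// Custom delimiter (hypothetical value): header lines start with "MYHEADER".
final TableCodec customCodec = new TableCodec("MYHEADER");

// Null delimiter: the first non-comment line is consumed as a one-line,
// non-delimited header such as "a b c" (file format example 2).
final TableCodec bareHeaderCodec = new TableCodec(null);

// An empty string is rejected (see the constructor below), since the
// header-prefix test would otherwise match every line.
// new TableCodec(""); // throws GATKException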

protected static final String HEADER_DELIMITER = "HEADER";
protected static final String DEFAULT_HEADER_DELIMITER = "HEADER";
protected static final String IGV_HEADER_DELIMITER = "track";
protected static final String COMMENT_DELIMITER = "#";
Member: Shouldn't this also change?

Suggested change:
- protected static final String COMMENT_DELIMITER = "#";
+ protected static final String DEFAULT_COMMENT_DELIMITER = "#";


private final String headerDelimiter;

protected String delimiter_regex = "\\s+";

protected List<String> header = new ArrayList<>();

public TableCodec() {
private boolean havePassedHeader = false;

/**
* Create a TableCodec with a configured header line delimiter.
*
* @param headerLineDelimiter the delimiter for header lines, or null if the header is a single non-delimited line
*/
public TableCodec(final String headerLineDelimiter) {
super(TableFeature.class);
if ( "".equals(headerLineDelimiter) ) {
Member: Isn't this just Utils.nonEmpty()?

// Note: it is valid for headerLineDelimiter to be null, just not empty, as the regex breaks in that case.
throw new GATKException("HeaderLineDelimiter must either be a valid delimiter or null");
}
headerDelimiter = headerLineDelimiter;
}

/**
* Create a TableCodec with the default header line delimiter.
*/
public TableCodec() {
this(DEFAULT_HEADER_DELIMITER);
}

@Override
public TableFeature decode(final String line) {
if (line.startsWith(HEADER_DELIMITER) || line.startsWith(COMMENT_DELIMITER) || line.startsWith(IGV_HEADER_DELIMITER)) {
if ((headerDelimiter != null && line.startsWith(headerDelimiter)) ||
(headerDelimiter == null && !havePassedHeader) ||
line.startsWith(COMMENT_DELIMITER) || line.startsWith(IGV_HEADER_DELIMITER)) {
havePassedHeader = true;
return null;
}
final String[] split = line.split(delimiter_regex);
@@ -66,11 +102,11 @@ public List<String> readActualHeader(final LineIterator reader) {
boolean isFirst = true;
while (reader.hasNext()) {
final String line = reader.peek(); // Peek to avoid reading non-header data
if ( isFirst && ! line.startsWith(HEADER_DELIMITER) && ! line.startsWith(COMMENT_DELIMITER)) {
if ( isFirst && ! line.startsWith(COMMENT_DELIMITER) && headerDelimiter != null && ! line.startsWith(headerDelimiter) ) {
throw new UserException.MalformedFile("TableCodec file does not have a header");
}
isFirst &= line.startsWith(COMMENT_DELIMITER);
if (line.startsWith(HEADER_DELIMITER)) {
if (headerDelimiter == null || line.startsWith(headerDelimiter)) {
reader.next(); // "Commit" the peek
if (!header.isEmpty()) {
throw new UserException.MalformedFile("Input table file seems to have two header lines. The second is = " + line);
@@ -1,9 +1,6 @@
package org.broadinstitute.hellbender.utils.read;

import htsjdk.samtools.Cigar;
import htsjdk.samtools.CigarElement;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.*;
import htsjdk.samtools.util.Locatable;
import htsjdk.samtools.util.StringUtil;
import org.broadinstitute.hellbender.exceptions.GATKException;
@@ -482,11 +482,11 @@ public void setMateIsUnplaced() {
setIsPaired(true);

samRecord.setMateUnmappedFlag(true);
samRecord.setMateNegativeStrandFlag(false);
samRecord.setMateReferenceIndex(SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX);
samRecord.setMateAlignmentStart(SAMRecord.NO_ALIGNMENT_START);
}


@Override
public boolean isReverseStrand() {
return samRecord.getReadNegativeStrandFlag();
@@ -1,7 +1,6 @@
package org.broadinstitute.hellbender.utils.spark;

import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.*;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMSequenceRecord;
import htsjdk.samtools.SAMTextHeaderCodec;
@@ -231,4 +230,67 @@ public static JavaRDD<GATKRead> putReadsWithTheSameNameInTheSamePartition(final
return current;
});
}

/**
* Like <code>groupByKey</code>, but assumes that values are already sorted by key, so no shuffle is needed,
* which is much faster.
* @param rdd the input RDD
* @param <K> type of keys
* @param <V> type of values
* @return an RDD where the values for each key are grouped into an iterable collection
*/
public static <K, V> JavaPairRDD<K, Iterable<V>> spanByKey(JavaPairRDD<K, V> rdd) {
return rdd.mapPartitionsToPair(SparkUtils::getSpanningIterator);
}

/**
* An iterator that groups values having the same key into iterable collections.
* @param iterator an iterator over key-value pairs
* @param <K> type of keys
* @param <V> type of values
* @return an iterator over pairs of keys and grouped values
*/
public static <K, V> Iterator<Tuple2<K, Iterable<V>>> getSpanningIterator(Iterator<Tuple2<K, V>> iterator) {
final PeekingIterator<Tuple2<K, V>> iter = Iterators.peekingIterator(iterator);
return new AbstractIterator<Tuple2<K, Iterable<V>>>() {
@Override
protected Tuple2<K, Iterable<V>> computeNext() {
K key = null;
List<V> group = Lists.newArrayList();
while (iter.hasNext()) {
if (key == null) {
Tuple2<K, V> next = iter.next();
key = next._1();
V value = next._2();
group.add(value);
continue;
}
K nextKey = iter.peek()._1(); // don't advance...
if (nextKey.equals(key)) {
group.add(iter.next()._2()); // .. unless the keys match
} else {
return new Tuple2<>(key, group);
}
}
if (key != null) {
return new Tuple2<>(key, group);
}
return endOfData();
}
};
}

/**
* Sort reads into queryname order if they are not already sorted
*/
public static JavaRDD<GATKRead> querynameSortReadsIfNecessary(JavaRDD<GATKRead> reads, int numReducers, SAMFileHeader header) {
JavaRDD<GATKRead> sortedReadsForMarking;
if (ReadUtils.isReadNameGroupedBam(header)) {
sortedReadsForMarking = reads;
} else {
header.setSortOrder(SAMFileHeader.SortOrder.queryname);
sortedReadsForMarking = sortReadsAccordingToHeader(reads, header, numReducers);
}
return sortedReadsForMarking;
}
}
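As a worked example of what the relocated spanByKey/getSpanningIterator pair computes, a hypothetical snippet (assumes an existing JavaSparkContext named sc; not part of the PR):

// Equal keys are already adjacent, e.g. after a sort:
// ("a",1) ("a",2) ("b",3) ("b",4)
final JavaPairRDD<String, Integer> sorted = sc.parallelizePairs(
        Arrays.asList(
                new Tuple2<>("a", 1), new Tuple2<>("a", 2),
                new Tuple2<>("b", 3), new Tuple2<>("b", 4)),
        1); // one partition: a key's run must not straddle partition boundaries

// Walks each partition once and yields ("a", [1, 2]) and ("b", [3, 4]),
// without the shuffle that groupByKey would trigger.
final JavaPairRDD<String, Iterable<Integer>> grouped = SparkUtils.spanByKey(sorted);

This per-partition behavior is also why callers such as the mark-duplicates path first invoke putReadsWithTheSameNameInTheSamePartition: the grouping is only complete when all records for a key live in the same partition.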