Skip to content

Commit

Permalink
adding --sort-order option to SortSamSpark
Browse files Browse the repository at this point in the history
adding a --sort-order option to SortSamSpark to let users specify what order to sort in
enabling disabled tests
fixing the tests which weren't actually asserting anything

closes #1260

work in progress

in progress

refactoring

using new SparkUtils method
  • Loading branch information
lbergelson committed May 25, 2018
1 parent 3189612 commit d37cede
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private static void writeReadsADAM(
private static void saveAsShardedHadoopFiles(
final JavaSparkContext ctx, final String outputFile, final String referenceFile,
final SAMFormat samOutputFormat, final JavaRDD<SAMRecord> reads, final SAMFileHeader header,
final boolean writeHeader) throws IOException {
final boolean writeHeader) {
// Set the static header on the driver thread.
if (samOutputFormat == SAMFormat.CRAM) {
SparkCRAMOutputFormat.setHeader(header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.barclay.help.DocumentedFeature;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.utils.spark.SparkUtils;
import picard.cmdline.programgroups.ReadDataManipulationProgramGroup;
import org.broadinstitute.hellbender.engine.filters.ReadFilter;
import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary;
import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadCoordinateComparator;
import scala.Tuple2;

import java.util.Collections;
import java.util.List;
Expand All @@ -27,35 +26,61 @@
public final class SortSamSpark extends GATKSparkTool {
private static final long serialVersionUID = 1L;

public static final String SORT_ORDER_LONG_NAME = "sort-order";

@Override
public boolean requiresReads() { return true; }

@Argument(doc="the output file path", shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME, fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME, optional = false)
protected String outputFile;
private String outputFile;

@Argument(doc="sort order of the output file", fullName = SORT_ORDER_LONG_NAME, optional = true)
private SparkSortOrder sortOrder = SparkSortOrder.coordinate;

/**
* SortOrders that have corresponding implementations for spark.
* These correspond to a subset of {@link SAMFileHeader.SortOrder}.
*/
private enum SparkSortOrder {
coordinate(SAMFileHeader.SortOrder.coordinate),
queryname(SAMFileHeader.SortOrder.queryname);

private final SAMFileHeader.SortOrder order;

SparkSortOrder(SAMFileHeader.SortOrder order) {
this.order = order;
}

public SAMFileHeader.SortOrder getSamOrder() {
return order;
}
}

@Override
public List<ReadFilter> getDefaultReadFilters() {
return Collections.singletonList(ReadFilterLibrary.ALLOW_ALL_READS);
}

@Override
protected void onStartup() {
super.onStartup();
}

@Override
protected void runTool(final JavaSparkContext ctx) {
JavaRDD<GATKRead> reads = getReads();
int numReducers = getRecommendedNumReducers();
logger.info("Using %s reducers", numReducers);
final JavaRDD<GATKRead> reads = getReads();
final int numReducers = getRecommendedNumReducers();
logger.info("Using %d reducers", numReducers);

final SAMFileHeader header = getHeaderForReads();
header.setSortOrder(sortOrder.getSamOrder());

final SAMFileHeader readsHeader = getHeaderForReads();
ReadCoordinateComparator comparator = new ReadCoordinateComparator(readsHeader);
JavaRDD<GATKRead> sortedReads;
final JavaRDD<GATKRead> readsToWrite;
if (shardedOutput) {
sortedReads = reads
.mapToPair(read -> new Tuple2<>(read, null))
.sortByKey(comparator, true, numReducers)
.keys();
readsToWrite = SparkUtils.sortReadsAccordingToHeader(reads, header, numReducers);
} else {
sortedReads = reads; // sorting is done by writeReads below
readsToWrite = reads;
}
readsHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
writeReads(ctx, outputFile, sortedReads);
writeReads(ctx, outputFile, readsToWrite, header);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.broadinstitute.hellbender.tools.spark.pipelines;

import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.ValidationStringency;
import org.broadinstitute.barclay.argparser.CommandLineException;
import org.broadinstitute.hellbender.CommandLineProgramTest;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.test.ArgumentsBuilder;
import org.broadinstitute.hellbender.utils.test.SamAssertionUtils;
import org.testng.annotations.DataProvider;
Expand All @@ -20,9 +24,8 @@ public Object[][] sortBAMData() {
{"count_reads.cram", "count_reads_sorted.cram", "count_reads.fasta", ".cram", "coordinate"},
{"count_reads.bam", "count_reads_sorted.bam", "count_reads.fasta", ".cram", "coordinate"},

//SortBamSpark is missing SORT_ORDER parameter https://github.com/broadinstitute/gatk/issues/1260
// {"count_reads.bam", "count_reads.bam", null, ".bam", "queryname"},
// {"count_reads.cram", "count_reads.cram", "count_reads.fasta", ".cram", "queryname"},
{"count_reads.bam", "count_reads.bam", null, ".bam", "queryname"},
{"count_reads.cram", "count_reads.cram", "count_reads.fasta", ".cram", "queryname"},
};
}

Expand All @@ -38,21 +41,17 @@ public void testSortBAMs(
final File actualOutputFile = createTempFile("sort_sam", outputExtension);
File referenceFile = null == referenceFileName ? null : new File(getTestDataDir(), referenceFileName);
ArgumentsBuilder args = new ArgumentsBuilder();
args.add("--input"); args.add(inputFile.getCanonicalPath());
args.add("--output"); args.add(actualOutputFile.getCanonicalPath());
args.addInput(inputFile);
args.addOutput(actualOutputFile);
if (null != referenceFile) {
args.add("--R");
args.add(referenceFile.getAbsolutePath());
args.addReference(referenceFile);
}
args.add("--num-reducers"); args.add("1");
args.addArgument(GATKSparkTool.NUM_REDUCERS_LONG_NAME, "1");
args.addArgument(SortSamSpark.SORT_ORDER_LONG_NAME, sortOrderName);

//https://github.com/broadinstitute/gatk/issues/1260
// args.add("--SORT_ORDER");
// args.add(sortOrderName);
this.runCommandLine(args);

this.runCommandLine(args.getArgsArray());

SamAssertionUtils.samsEqualStringent(actualOutputFile, expectedOutputFile, ValidationStringency.DEFAULT_STRINGENCY, referenceFile);
SamAssertionUtils.assertSamsEqual(actualOutputFile, expectedOutputFile, ValidationStringency.DEFAULT_STRINGENCY, referenceFile);
}

@Test(groups = "spark")
Expand All @@ -61,13 +60,33 @@ public void test() throws Exception {
final File sortedBam = new File(getTestDataDir(), "count_reads_sorted.bam");
final File outputBam = createTempFile("sort_bam_spark", ".bam");
ArgumentsBuilder args = new ArgumentsBuilder();
args.add("--"+ StandardArgumentDefinitions.INPUT_LONG_NAME); args.add(unsortedBam.getCanonicalPath());
args.add("--"+StandardArgumentDefinitions.OUTPUT_LONG_NAME); args.add(outputBam.getCanonicalPath());
args.add("--num-reducers"); args.add("1");
args.addInput(unsortedBam);
args.addOutput(outputBam);
args.addArgument(GATKSparkTool.NUM_REDUCERS_LONG_NAME, "1");

this.runCommandLine(args.getArgsArray());
this.runCommandLine(args);

SamAssertionUtils.assertSamsEqual(outputBam, sortedBam);
}


@DataProvider
public Object[][] getInvalidSortOrders(){
return new Object[][]{
{SAMFileHeader.SortOrder.unknown},
{SAMFileHeader.SortOrder.unsorted},
{SAMFileHeader.SortOrder.duplicate}
};
}

@Test(expectedExceptions = CommandLineException.BadArgumentValue.class, dataProvider = "getInvalidSortOrders")
public void testBadSortOrders(SAMFileHeader.SortOrder badOrder){
final File unsortedBam = new File(getTestDataDir(), "count_reads.bam");
ArgumentsBuilder args = new ArgumentsBuilder();
args.addInput(unsortedBam);
args.addOutput(createTempFile("sort_bam_spark", ".bam"));
args.addArgument(SortSamSpark.SORT_ORDER_LONG_NAME, badOrder.toString());

this.runCommandLine(args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
@SQ SN:chr6 LN:101
@SQ SN:chr7 LN:404
@SQ SN:chr8 LN:202
@RG ID:0 SM:Hi,Mom!
@RG ID:0 SM:Hi,Mom! PL:ILLUMINA
@PG ID:1 PN:Hey! VN:2.0
both_reads_align_clip_marked 83 chr7 1 255 101M = 302 201 CAACAGAAGCNGGNATCTGTGTTTGTGTTTCGGATTTCCTGCTGAANNGNTTNTCGNNTCNNNNNNNNATCCCGATTTCNTTCCGCAGCTNACCTCCCAAN )'.*.+2,))&&'&*/)-&*-)&.-)&)&),/-&&..)./.,.).*&&,&.&&-)&&&0*&&&&&&&&/32/,01460&&/6/*0*/2/283//36868/& RG:Z:0
both_reads_present_only_first_aligns 89 chr7 1 255 101M * 0 0 CAACAGAAGCNGGNATCTGTGTTTGTGTTTCGGATTTCCTGCTGAANNGNTTNTCGNNTCNNNNNNNNATCCCGATTTCNTTCCGCAGCTNACCTCCCAAN )'.*.+2,))&&'&*/)-&*-)&.-)&)&),/-&&..)./.,.).*&&,&.&&-)&&&0*&&&&&&&&/32/,01460&&/6/*0*/2/283//36868/& RG:Z:0
Expand Down

0 comments on commit d37cede

Please sign in to comment.