Date: Tue, 27 Nov 2018 16:44:01 -0500
Subject: [PATCH 02/15] responding to the first round of comments
.../tools/spark/ | 393 +++++++++---------
.../markduplicates/ | 16 +-
.../hellbender/utils/spark/ | 13 +
.../spark/ | 102 +++--
4 files changed, 261 insertions(+), 263 deletions(-)
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/ b/src/main/java/org/broadinstitute/hellbender/tools/spark/
index 0811f38948f..c38e8603977 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/spark/
+++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/
@@ -5,6 +5,7 @@
import htsjdk.samtools.*;
import htsjdk.samtools.util.*;
import htsjdk.tribble.AbstractFeatureReader;
import htsjdk.tribble.FeatureReader;
@@ -22,16 +23,16 @@
import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary;
import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
import org.broadinstitute.hellbender.exceptions.GATKException;
+import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.codecs.table.TableCodec;
import org.broadinstitute.hellbender.utils.codecs.table.TableFeature;
import org.broadinstitute.hellbender.utils.spark.SparkUtils;
import picard.cmdline.programgroups.ReadDataManipulationProgramGroup;
import scala.Tuple2;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -52,14 +53,14 @@
* Usage Examples
* Output to a single file
- * java -jar picard.jar RevertSam \\
+ * gatk RevertSamSpark \\
* -I input.bam \\
* -O reverted.bam
Output by read group into multiple files with sample map
- * java -jar picard.jar RevertSam \\
+ * gatk RevertSamSpark \\
* -I input.bam \\
* --output-by-readgroup \\
* --output-map reverted_bam_paths.tsv
@@ -67,7 +68,7 @@
Output by read group with no output map
- * java -jar picard.jar RevertSam \\
+ * gatk RevertSamSpark \\
* -I input.bam \\
* --output-by-readgroup \\
* -O /write/reverted/read/group/bams/in/this/dir
@@ -86,27 +87,27 @@
programGroup = ReadDataManipulationProgramGroup.class)
public class RevertSamSpark extends GATKSparkTool {
- static final String USAGE_SUMMARY = "Reverts SAM or BAM files to a previous state. ";
+ static final String USAGE_SUMMARY = "Reverts SAM or BAM files to a previous state.";
static final String USAGE_DETAILS = "This tool removes or restores certain properties of the SAM records, including alignment " +
"information, which can be used to produce an unmapped BAM (uBAM) from a previously aligned BAM. It is also capable of " +
"restoring the original quality scores of a BAM file that has already undergone base quality score recalibration (BQSR) if the" +
"original qualities were retained.\n" +
\n" +
"Example with single output:
\n" +
- "java -jar picard.jar RevertSam \\\n" +
+ "gatk RevertSamSpark \\\n" +
" -I input.bam \\\n" +
" -O reverted.bam\n" +
"\n" +
"Example outputting by read group with output map:
\n" +
- "java -jar picard.jar RevertSam \\\n" +
+ "gatk RevertSamSpark \\\n" +
" -I input.bam \\\n" +
" --output-by-readgroup \\\n" +
" --output-map reverted_bam_paths.tsv\n" +
"\n" +
"Will output a BAM/SAM file per read group.\n" +
"Example outputting by read group without output map:
\n" +
- "java -jar picard.jar RevertSam \\\n" +
- " I=input.bam \\\n" +
+ "gatk RevertSamSpark \\\n" +
+ " -I input.bam \\\n" +
" --output-by-readgroup \\\n" +
" -O /write/reverted/read/group/bams/in/this/dir\n" +
"\n" +
@@ -116,89 +117,68 @@ public class RevertSamSpark extends GATKSparkTool {
"LENIENT or SILENT if the failures are expected to be obviated by the reversion process " +
"(e.g. invalid alignment information will be obviated when the dontRemoveAlignmentInformation option is used).\n" +
+ public static final String OUTPUT_MAP_READ_GROUP_FIELD_NAME = "READ_GROUP_ID";
+ public static final String OUTPUT_MAP_OUTPUT_FILE_FIELD_NAME = "OUTPUT";
public boolean requiresReads() { return true; }
- @Argument(mutex = {OUTPUT_MAP_ARG}, shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME, fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME, doc = "The output SAM/BAM file to create, or an output directory if '--output-by-readgroup' is set.")
- public File output;
+ @Argument(mutex = {OUTPUT_MAP_LONG_NAME}, shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME,
+ fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME,
+ doc = "The output SAM/BAM file to create, or an output directory if '--output-by-readgroup' is set.")
+ public String output;
- public static final String OUTPUT_MAP_ARG = "output-map";
- @Argument(mutex = {StandardArgumentDefinitions.OUTPUT_LONG_NAME}, fullName = OUTPUT_MAP_ARG, doc = "Tab separated file with two columns, READ_GROUP_ID and OUTPUT, providing file mapping only used if '--output-by-readgroup' is set.")
- public File outputMap;
+ public static final String OUTPUT_MAP_LONG_NAME = "output-map";
+ @Argument(mutex = {StandardArgumentDefinitions.OUTPUT_LONG_NAME},
+ doc = "Tab separated file with two columns, OUTPUT_MAP_READ_GROUP_FIELD_NAME and OUTPUT_MAP_OUTPUT_FILE_FIELD_NAME, providing file mapping only used if '--output-by-readgroup' is set.")
+ public String outputMap;
- public static final String OUTPUT_BY_READGROUP_ARG = "output-by-readgroup";
- @Argument(fullName = OUTPUT_BY_READGROUP_ARG, doc = "When true, outputs each read group in a separate file.")
+ public static final String OUTPUT_BY_READGROUP_LONG_NAME = "output-by-readgroup";
+ @Argument(fullName = OUTPUT_BY_READGROUP_LONG_NAME, doc = "When true, outputs each read group in a separate file.")
public boolean outputByReadGroup = false;
- public static enum FileType implements CommandLineParser.ClpEnum {
- sam("Generate SAM files."),
- bam("Generate BAM files."),
- cram("Generate CRAM files."),
- dynamic("Generate files based on the extention of input.");
- final String description;
- FileType(String descrition) {
- this.description = descrition;
- }
- @Override
- public String getHelpDoc() {
- return description;
- }
- }
@Argument(doc = "WARNING: This option is potentially destructive. If enabled will discard reads in order to produce " +
"a consistent output BAM. Reads discarded include (but are not limited to) paired reads with missing " +
"mates, duplicated records, records with mismatches in length of bases and qualities. This option should " +
"only be enabled if the output sort order is queryname and will always cause sorting to occur.")
public boolean sanitize = false;
+ public static final String KEEP_FIRST_DUPLICATE_LONG_NAME = "keep-first-duplicate";
@Argument(doc = "If 'sanitize' only one record when we find more than one record with the same name for R1/R2/unpaired reads respectively. " +
"For paired end reads, keeps only the first R1 and R2 found respectively, and discards all unpaired reads. " +
"Duplicates do not refer to the duplicate flag in the FLAG field, but instead reads with the same name.",
- fullName = "keep-first-duplicate")
public boolean keepFirstDuplicate = false;
- public static final String OUTPUT_BY_READGROUP_FILE_FORMAT_ARG = "output-by-readgroup-file-format";
- @Argument(fullName = OUTPUT_BY_READGROUP_FILE_FORMAT_ARG, doc = "When using outputByReadGroup, the output file format can be set to a certain format.")
+ public static final String OUTPUT_BY_READGROUP_FILE_FORMAT_LONG_NAME = "output-by-readgroup-file-format";
+ @Argument(fullName = OUTPUT_BY_READGROUP_FILE_FORMAT_LONG_NAME, doc = "When using --output-by-readgroup, the output file format can be set to a certain format.")
public FileType outputByReadgroupFileFormat = FileType.dynamic;
@Argument(shortName = StandardArgumentDefinitions.SORT_ORDER_SHORT_NAME, fullName = StandardArgumentDefinitions.SORT_ORDER_LONG_NAME, doc = "The sort order to create the reverted output file with, defaults to whatever is specified in the current file", optional = true)
public SAMFileHeader.SortOrder sortOrder = SAMFileHeader.SortOrder.queryname;
- public static final String DONT_RESTORE_ORIGINAL_QUALITIES_ARG = "dont-restore-original-qualities";
- @Argument( fullName = DONT_RESTORE_ORIGINAL_QUALITIES_ARG, doc = "Set to prevent the tool from setting the OQ field to the QUAL where avalible.", optional = true)
+ public static final String DONT_RESTORE_ORIGINAL_QUALITIES_LONG_NAME = "dont-restore-original-qualities";
+ @Argument( fullName = DONT_RESTORE_ORIGINAL_QUALITIES_LONG_NAME, doc = "Set to prevent the tool from setting the OQ field to the QUAL where available.", optional = true)
public boolean dontRestoreOriginalQualities = false;
- public static final String DONT_REMOVE_DUPLICATE_INFORMATION_ARG = "remove-duplicate-information";
- @Argument(fullName = DONT_REMOVE_DUPLICATE_INFORMATION_ARG, doc = "By default we remove duplicate read flags from all reads. Note that if this is true, " +
+ public static final String DONT_REMOVE_DUPLICATE_INFORMATION_LONG_NAME = "remove-duplicate-information";
+ @Argument(fullName = DONT_REMOVE_DUPLICATE_INFORMATION_LONG_NAME, doc = "By default we remove duplicate read flags from all reads. Note that if this is true, " +
" the output may have the unusual but sometimes desirable trait of having unmapped reads that are marked as duplicates.")
- public boolean dontRemoveDuplicateInformation = false; //TODO flip this value
+ public boolean dontRemoveDuplicateInformation = false;
- public static final String DONT_REMOVE_ALIGNMENT_INFORMATION_ARG = "remove-alignment-information";
- @Argument(fullName = DONT_REMOVE_ALIGNMENT_INFORMATION_ARG, doc = "Remove all alignment information from the file.")
+ public static final String DONT_REMOVE_ALIGNMENT_INFORMATION_LONG_NAME = "remove-alignment-information";
+ @Argument(fullName = DONT_REMOVE_ALIGNMENT_INFORMATION_LONG_NAME, doc = "Remove all alignment information from the file.")
public boolean dontRemoveAlignmentInformation = false;
- public static final String ATTRIBUTE_TO_CLEAR_ARG = "attributes-to-clear";
- @Argument(fullName = ATTRIBUTE_TO_CLEAR_ARG,doc = "When removing alignment information, the set of optional tags to remove.", optional = true)
- public List attributesToClear = new ArrayList();
+ public static final String ATTRIBUTE_TO_CLEAR_LONG_NAME = "attributes-to-clear";
+ @Argument(fullName = ATTRIBUTE_TO_CLEAR_LONG_NAME, doc = "When removing alignment information, the set of optional tags to remove.", optional = true)
+ public Set attributesToClear = new HashSet();
- public static final String REMOVE_DEFAULT_ATTRIBUTE_TO_CLEAR_ARG = "remove-default-attributes-to-clear";
- @Argument(fullName = REMOVE_DEFAULT_ATTRIBUTE_TO_CLEAR_ARG,doc = "When removing alignment information, the set of optional tags to remove.", optional = true)
+ public static final String REMOVE_DEFAULT_ATTRIBUTE_TO_CLEAR_LONG_NAME = "remove-default-attributes-to-clear";
+ @Argument(fullName = REMOVE_DEFAULT_ATTRIBUTE_TO_CLEAR_LONG_NAME, doc = "When removing alignment information, the set of optional tags to remove.", optional = true)
public boolean removeDefaults = false;
- public static List DEFAULT_ATTRIBUTES_TO_CLEAR = new ArrayList() {{
- add(;
- add(;
- add(;
- add(;
- add(;
- add(; // Supplementary alignment metadata
- add(; // Mate Cigar
- add(;
- }};
public static final String SAMPLE_ALIAS_ARG = "sample-alias";
@Argument(fullName = SAMPLE_ALIAS_ARG, doc = "The sample alias to use in the reverted output file. This will override the existing " +
"sample alias in the file and is used only if all the read groups in the input file have the " +
@@ -216,13 +196,43 @@ public List getDefaultReadFilters() {
return Collections.singletonList(ReadFilterLibrary.ALLOW_ALL_READS);
+ public static List DEFAULT_ATTRIBUTES_TO_CLEAR = new ArrayList() {{
+ add(;
+ add(;
+ add(;
+ add(;
+ add(;
+ add(; // Supplementary alignment metadata
+ add(; // Mate Cigar
+ add(;
+ }};
+ public enum FileType implements CommandLineParser.ClpEnum {
+ sam("Generate SAM files."),
+ bam("Generate BAM files."),
+ cram("Generate CRAM files."),
+ dynamic("Generate files based on the extention of input.");
+ final String description;
+ FileType(String description) {
+ this.description = description;
+ }
+ @Override
+ public String getHelpDoc() {
+ return description;
+ }
+ }
* Enforce that output ordering is queryname when sanitization is turned on since it requires a queryname sort.
+ * Also checks to ensure that the user has chosen a valid subset of arguments pertaining to output and sanitization.
protected String[] customCommandLineValidation() {
final List errors = new ArrayList<>();
- RevertSamSpark.ValidationUtil.validateOutputParams(outputByReadGroup, output, outputMap, errors);
+ validateOutputParams(outputByReadGroup, output, outputMap);
if (!sanitize && keepFirstDuplicate) errors.add("'keepFirstDuplicate' cannot be used without 'sanitize'");
@@ -234,37 +244,37 @@ protected String[] customCommandLineValidation() {
protected void runTool(JavaSparkContext ctx) {
- Broadcast headerb = ctx.broadcast(getHeaderForReads());
+ Broadcast headerBroadcast = ctx.broadcast(getHeaderForReads());
JavaRDD reads = getReads();
// Grab the input header and remap values where appropriate
- SAMFileHeader inHeader = getHeaderForReads();
- ValidationUtil.validateHeaderOverrides(inHeader, sampleAlias, libraryName);
+ SAMFileHeader localHeader = headerBroadcast.getValue();
+ validateHeaderOverrides(localHeader, sampleAlias, libraryName);
if (sampleAlias != null) {
- inHeader.getReadGroups().forEach(rg -> rg.setSample(sampleAlias));
+ localHeader.getReadGroups().forEach(rg -> rg.setSample(sampleAlias));
if (libraryName != null) {
- inHeader.getReadGroups().forEach(rg -> rg.setLibrary(libraryName));
+ localHeader.getReadGroups().forEach(rg -> rg.setLibrary(libraryName));
// Map the readgroups in the header to appropriate
- Map writerMap = getOutputMap(outputMap,
+ Map writerMap = getOutputMap(outputMap,
getDefaultExtension(readArguments.getReadFiles().get(0).toString(), outputByReadgroupFileFormat),
- inHeader.getReadGroups(),
+ localHeader.getReadGroups(),
// Construct appropriate headers for the output files
- final Map headerMap = getReadgroupHeaderMap(inHeader, writerMap);
+ final Map headerMap = getReadGroupHeaderMap(localHeader, writerMap);
// Revert the reads based on the given attributes
- List attributesToRevert = removeDefaults? DEFAULT_ATTRIBUTES_TO_CLEAR: new ArrayList<>();
+ List attributesToRevert = removeDefaults ? DEFAULT_ATTRIBUTES_TO_CLEAR : new ArrayList<>();
JavaRDD readsReverted = revertReads(reads, attributesToRevert);
@@ -272,18 +282,18 @@ protected void runTool(JavaSparkContext ctx) {
// Sanitize the reads, sorting them into appropriate order if necessary
if (sanitize) {
- Map readGroupFormatMap = createReadGroupFormatMap(readsReverted, headerb, !dontRestoreOriginalQualities);
+ Map readGroupFormatMap = createReadGroupFormatMap(readsReverted, headerBroadcast, !dontRestoreOriginalQualities);
- readsReverted = sanitize(readGroupFormatMap, readsReverted, inHeader, keepFirstDuplicate);
+ readsReverted = sanitize(readGroupFormatMap, readsReverted, localHeader, keepFirstDuplicate);
// Write the one or many read output files
- for (Map.Entry rmap: writerMap.entrySet()) {
+ for (Map.Entry rmap: writerMap.entrySet()) {
//TODO what to do if the readgroup isn't present
final String key = rmap.getKey();
JavaRDD filteredreads = rmap.getKey()==null? readsReverted :
readsReverted.filter(r -> r.getReadGroup().equals(key));
- writeReads(ctx, rmap.getValue().getPath(), filteredreads, headerMap.get(rmap.getKey())); //TODO proper header map
+ writeReads(ctx, rmap.getValue().toString(), filteredreads, headerMap.get(rmap.getKey())); //TODO proper header map
@@ -294,7 +304,7 @@ protected void runTool(JavaSparkContext ctx) {
* @param reads Reads RDD over which to iterate and detect readgroups
* @param inHeader Header describing the readgroups present in the bam
* @param restoreOriginalQualities Whether to use the OQ tag for determining the map
- * @return
+ * @return the best guess at the quality encoding format present for each readgroup based on the first {@link QualityEncodingDetector#DEFAULT_MAX_RECORDS_TO_ITERATE} reads in each readgroup.
private Map createReadGroupFormatMap( final JavaRDD reads,
final Broadcast inHeader,
@@ -348,7 +358,7 @@ public SAMRecord next() {
* and one read labled as second in pair to treat as the representative reads, throwing away the rest.
private JavaRDD sanitize(final Map readGroupToFormat, final JavaRDD reads, final SAMFileHeader header, final boolean keepFirstDuplicate) {
- JavaRDD sortedReads = querynameSortReadsIfNecessary(reads.filter(r -> r.getBases().length == r.getBaseQualities().length), getRecommendedNumReducers(), header);
+ JavaRDD sortedReads = SparkUtils.querynameSortReadsIfNecessary(reads.filter(r -> r.getBases().length == r.getBaseQualities().length), getRecommendedNumReducers(), header);
JavaPairRDD> readsByGroup = spanReadsByKey(sortedReads);
return readsByGroup.flatMap(group -> {
@@ -442,26 +452,15 @@ private static JavaPairRDD> spanReadsByKey(final Java
- private static JavaRDD querynameSortReadsIfNecessary(JavaRDD reads, int numReducers, SAMFileHeader headerForTool) {
- JavaRDD sortedReadsForMarking;
- if (ReadUtils.isReadNameGroupedBam(headerForTool)) {
- sortedReadsForMarking = reads;
- } else {
- headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
- sortedReadsForMarking = SparkUtils.sortReadsAccordingToHeader(reads, headerForTool, numReducers);
- }
- return sortedReadsForMarking;
- }
- private Map getReadgroupHeaderMap(SAMFileHeader inHeader, Map writerMap) {
+ private Map getReadGroupHeaderMap(SAMFileHeader inHeader, Map writerMap) {
final Map headerMap;
if (outputByReadGroup) {
if (inHeader.getReadGroups().isEmpty()) {
- throw new GATKException("The header is missing its read group map");
+ throw new UserException("The header is missing its read group map");
- ValidationUtil.assertAllReadGroupsMapped(writerMap, inHeader.getReadGroups());
+ assertAllReadGroupsMapped(writerMap, inHeader.getReadGroups());
headerMap = new HashMap<>();
for (final SAMReadGroupRecord readGroup : inHeader.getReadGroups()) {
final SAMFileHeader header = createOutHeader(inHeader, sortOrder, !dontRemoveAlignmentInformation);
@@ -471,12 +470,12 @@ private Map getReadgroupHeaderMap(SAMFileHeader inHeader,
} else {
final SAMFileHeader singleOutHeader = createOutHeader(inHeader, sortOrder, !dontRemoveAlignmentInformation);
- headerMap = Collections.singletonMap(null,singleOutHeader);
+ headerMap = Collections.singletonMap(null, singleOutHeader);
return headerMap;
- private SAMFileHeader createOutHeader(
+ private static SAMFileHeader createOutHeader(
final SAMFileHeader inHeader,
final SAMFileHeader.SortOrder sortOrder,
final boolean removeAlignmentInformation) {
@@ -492,13 +491,13 @@ private SAMFileHeader createOutHeader(
static String getDefaultExtension(final String input, final FileType setting) {
if (setting == FileType.dynamic) {
- if (input.endsWith(".sam")) {
- return ".sam";
+ if (input.endsWith(IOUtil.SAM_FILE_EXTENSION)) {
- if (input.endsWith(".cram")) {
- throw new GATKException("Input file is a cram. This is currently unsupported for this tool");//TODO unsupported feature
+ if (input.endsWith(CramIO.CRAM_FILE_EXTENSION)) {
+ throw new UserException.UnimplementedFeature("Input file is a cram. This is currently unsupported for this tool");
- return ".bam";
+ return BamFileIoUtils.BAM_FILE_EXTENSION;
} else {
return "." + setting.toString();
@@ -554,153 +553,159 @@ public JavaRDD revertReads(JavaRDD reads, List attri
- static Map getOutputMap(
- final File outputMapFile,
- final File outputDir,
+ static Map getOutputMap(
+ final String outputMapFile,
+ final String outputDir,
final String defaultExtension,
final List readGroups,
final boolean outputByReadgroup) {
if (outputByReadgroup) {
- final Map outputMap;
+ final Map outputMap;
if (outputMapFile != null) {
try {
outputMap = createOutputMapFromFile(outputMapFile);
} catch (IOException e) {
- throw new GATKException("Encountered an error reading output map file", e);
+ throw new UserException("Encountered an error reading output map file", e);
} else {
outputMap = createOutputMapFromHeader(readGroups, outputDir, defaultExtension);
return outputMap;
} else {
- return Collections.singletonMap(null, outputDir);
+ return Collections.singletonMap(null, IOUtils.getPath(outputDir));
// Names the files based on the locations laid out in the readgroup map
- private static Map createOutputMapFromFile(final File outputMapFile) throws IOException {
- final Map outputMap = new HashMap<>();
- final FeatureReader parser = AbstractFeatureReader.getFeatureReader(outputMapFile.getAbsolutePath(), new TableCodec(null), false);
- for (final TableFeature row : parser.iterator()) {
- final String id = row.get("READ_GROUP_ID");
- final String output = row.get("OUTPUT");
- final File outputPath = new File(output);
- outputMap.put(id, outputPath);
+ private static Map createOutputMapFromFile(final String outputMapFile) throws IOException {
+ final Map outputMap = new HashMap<>();
+ try (final FeatureReader parser = AbstractFeatureReader.getFeatureReader(outputMapFile, new TableCodec(null), false);) {
+ for (final TableFeature row : parser.iterator()) {
+ final String id = row.get(OUTPUT_MAP_READ_GROUP_FIELD_NAME);
+ final String output = row.get(OUTPUT_MAP_OUTPUT_FILE_FIELD_NAME);
+ final Path outputPath = IOUtils.getPath(output);
+ outputMap.put(id, outputPath);
+ }
- CloserUtil.close(parser);
return outputMap;
// Names the files based on the readgroups individually presented in the header
- private static Map createOutputMapFromHeader(final List readGroups, final File outputDir, final String extension) {
- final Map outputMap = new HashMap<>();
+ private static Map createOutputMapFromHeader(final List readGroups, final String outputDir, final String extension) {
+ final Map outputMap = new HashMap<>();
for (final SAMReadGroupRecord readGroup : readGroups) {
final String id = readGroup.getId();
final String fileName = id + extension;
- final Path outputPath = Paths.get(outputDir.toString(), fileName);
- outputMap.put(id, outputPath.toFile());
+ final Path outputPath = Paths.get(outputDir, fileName);
+ outputMap.put(id, outputPath);
return outputMap;
-// ________________________________________________________________________________________________________________________
-// sum garbage
-// ________________________________________________________________________________________________________________________
* Methods used for validating parameters to RevertSam.
- static class ValidationUtil {
- static void validateOutputParams(final boolean outputByReadGroup, final File output, final File outputMap, final List errors) {
+ static List validateOutputParams(final boolean outputByReadGroup, final String output, final String outputMap) {
+ final List errors = new ArrayList<>();
+ try {
if (outputByReadGroup) {
- validateOutputParamsByReadGroup(output, outputMap, errors);
+ errors.addAll(validateOutputParamsByReadGroup(output, outputMap));
} else {
- validateOutputParamsNotByReadGroup(output, outputMap, errors);
+ errors.addAll(validateOutputParamsNotByReadGroup(output, outputMap));
+ } catch (IOException e) {
+ throw new UserException.BadInput("Error while validating input file", e);
+ return errors;
+ }
- static void validateOutputParamsByReadGroup(final File output, final File outputMap, final List errors) {
- if (output != null) {
- if (!Files.isDirectory(output.toPath())) {
- errors.add("When '--output-by-readgroup' is set and output is provided, it must be a directory: " + output);
- }
- return;
- }
- // output is null if we reached here
- if (outputMap == null) {
- errors.add("Must provide either output or outputMap when '--output-by-readgroup' is set.");
- return;
- }
- if (!Files.isReadable(outputMap.toPath())) {
- errors.add("Cannot read outputMap " + outputMap);
- return;
- }
- final FeatureReader parser = AbstractFeatureReader.getFeatureReader(outputMap.getAbsolutePath(), new TableCodec(null),false);
- if (!RevertSamSpark.ValidationUtil.isOutputMapHeaderValid((List)parser.getHeader())) {
- errors.add("Invalid header: " + outputMap + ". Must be a tab-separated file with READ_GROUP_ID as first column and output as second column.");
+ @SuppressWarnings("unchecked")
+ static List validateOutputParamsByReadGroup(final String output, final String outputMap) throws IOException {
+ final List errors = new ArrayList<>();
+ if (output != null) {
+ if (!Files.isDirectory(IOUtil.getPath(output))) {
+ errors.add("When '--output-by-readgroup' is set and output is provided, it must be a directory: " + output);
+ return errors;
+ }
+ // output is null if we reached here
+ if (outputMap == null) {
+ errors.add("Must provide either output or outputMap when '--output-by-readgroup' is set.");
+ return errors;
+ if (!Files.isReadable(IOUtil.getPath(outputMap))) {
+ errors.add("Cannot read outputMap " + outputMap);
+ return errors;
+ }
+ final FeatureReader parser = AbstractFeatureReader.getFeatureReader(outputMap, new TableCodec(null),false);
+ if (!isOutputMapHeaderValid((List)parser.getHeader())) {
+ errors.add("Invalid header: " + outputMap + ". Must be a tab-separated file with OUTPUT_MAP_READ_GROUP_FIELD_NAME as first column and output as second column.");
+ }
+ return errors;
+ }
- static void validateOutputParamsNotByReadGroup(final File output, final File outputMap, final List errors) {
- if (outputMap != null) {
- errors.add("Cannot provide outputMap when '--output-by-read' isn't set. Provide output instead.");
- }
- if (output == null) {
- errors.add("output is required when '--output-by-read'");
- return;
- }
- if (Files.isDirectory(output.toPath())) {
- errors.add("output " + output + " should not be a directory when '--output-by-read'");
- }
+ static List validateOutputParamsNotByReadGroup(final String output, final String outputMap) throws IOException {
+ final List errors = new ArrayList<>();
+ if (outputMap != null) {
+ errors.add("Cannot provide outputMap when '--output-by-read' isn't set. Provide output instead.");
+ }
+ if (output == null) {
+ errors.add("output is required when '--output-by-read'");
+ return errors;
+ if (Files.isDirectory(IOUtil.getPath(output))) {
+ errors.add("output " + output + " should not be a directory when '--output-by-read'");
+ }
+ return errors;
+ }
- /**
- * If we are going to override sampleAlias or libraryName, make sure all the read
- * groups have the same values.
- */
- static void validateHeaderOverrides(
- final SAMFileHeader inHeader,
- final String sampleAlias,
- final String libraryName) {
- final List rgs = inHeader.getReadGroups();
- if (sampleAlias != null || libraryName != null) {
- boolean allSampleAliasesIdentical = true;
- boolean allLibraryNamesIdentical = true;
- for (int i = 1; i < rgs.size(); i++) {
- if (!rgs.get(0).getSample().equals(rgs.get(i).getSample())) {
- allSampleAliasesIdentical = false;
- }
- if (!rgs.get(0).getLibrary().equals(rgs.get(i).getLibrary())) {
- allLibraryNamesIdentical = false;
- }
- }
- if (sampleAlias != null && !allSampleAliasesIdentical) {
- throw new GATKException("Read groups have multiple values for sample. " +
- "A value for sampleAlias cannot be supplied.");
+ /**
+ * If we are going to override sampleAlias or libraryName, make sure all the read
+ * groups have the same values.
+ */
+ static void validateHeaderOverrides(
+ final SAMFileHeader inHeader,
+ final String sampleAlias,
+ final String libraryName) {
+ final List rgs = inHeader.getReadGroups();
+ if (sampleAlias != null || libraryName != null) {
+ boolean allSampleAliasesIdentical = true;
+ boolean allLibraryNamesIdentical = true;
+ for (int i = 1; i < rgs.size(); i++) {
+ if (!rgs.get(0).getSample().equals(rgs.get(i).getSample())) {
+ allSampleAliasesIdentical = false;
- if (libraryName != null && !allLibraryNamesIdentical) {
- throw new GATKException("Read groups have multiple values for library name. " +
- "A value for library name cannot be supplied.");
+ if (!rgs.get(0).getLibrary().equals(rgs.get(i).getLibrary())) {
+ allLibraryNamesIdentical = false;
+ if (sampleAlias != null && !allSampleAliasesIdentical) {
+ throw new UserException("Read groups have multiple values for sample. " +
+ "A value for sampleAlias cannot be supplied.");
+ }
+ if (libraryName != null && !allLibraryNamesIdentical) {
+ throw new UserException("Read groups have multiple values for library name. " +
+ "A value for library name cannot be supplied.");
+ }
+ }
- static void assertAllReadGroupsMapped(final Map outputMap, final List readGroups) {
- for (final SAMReadGroupRecord readGroup : readGroups) {
- final String id = readGroup.getId();
- final File output = outputMap.get(id);
- if (output == null) {
- throw new GATKException("Read group id " + id + " not found in outputMap " + outputMap);
- }
+ static void assertAllReadGroupsMapped(final Map outputMap, final List readGroups) {
+ for (final SAMReadGroupRecord readGroup : readGroups) {
+ final String id = readGroup.getId();
+ final Path output = outputMap.get(id);
+ if (output == null) {
+ throw new GATKException("Read group id " + id + " not found in outputMap " + outputMap);
+ }
- static boolean isOutputMapHeaderValid(final List columnLabels) {
- return columnLabels.size() >= 2 &&
- "READ_GROUP_ID".equals(columnLabels.get(0)) &&
- "OUTPUT".equals(columnLabels.get(1));
- }
+ static boolean isOutputMapHeaderValid(final List columnLabels) {
+ return columnLabels.size() >= 2 &&
+ OUTPUT_MAP_READ_GROUP_FIELD_NAME.equals(columnLabels.get(0)) &&
+ OUTPUT_MAP_OUTPUT_FILE_FIELD_NAME.equals(columnLabels.get(1));
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/ b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/
index 98051ad0e96..d25fb877bd1 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/
+++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/
@@ -97,7 +97,7 @@ public static JavaRDD mark(final JavaRDD reads, final SAMFil
SAMFileHeader headerForTool = header.clone();
// If the input isn't queryname sorted, sort it before duplicate marking
- final JavaRDD sortedReadsForMarking = querynameSortReadsIfNecessary(reads, numReducers, headerForTool);
+ final JavaRDD sortedReadsForMarking = SparkUtils.querynameSortReadsIfNecessary(reads, numReducers, headerForTool);
// If we need to remove optical duplicates or tag them, then make sure we are keeping track
final boolean markOpticalDups = (taggingPolicy != MarkDuplicates.DuplicateTaggingPolicy.DontTag);
@@ -171,20 +171,6 @@ public static JavaRDD mark(final JavaRDD reads, final SAMFil
- /**
- * Sort reads into queryname order if they are not already sorted
- */
- private static JavaRDD querynameSortReadsIfNecessary(JavaRDD reads, int numReducers, SAMFileHeader headerForTool) {
- JavaRDD sortedReadsForMarking;
- if (ReadUtils.isReadNameGroupedBam(headerForTool)) {
- sortedReadsForMarking = reads;
- } else {
- headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
- sortedReadsForMarking = SparkUtils.sortReadsAccordingToHeader(reads, headerForTool, numReducers);
- }
- return sortedReadsForMarking;
- }
* A custom partitioner designed to cut down on spark shuffle costs.
* This is designed such that getPartition(key) is called on a key which corresponds to the already known target partition
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/spark/ b/src/main/java/org/broadinstitute/hellbender/utils/spark/
index e0fc3ef3f49..a914caea5ad 100644
--- a/src/main/java/org/broadinstitute/hellbender/utils/spark/
+++ b/src/main/java/org/broadinstitute/hellbender/utils/spark/
@@ -282,4 +282,17 @@ protected Tuple2> computeNext() {
+ /**
+ * Sort reads into queryname order if they are not already sorted
+ */
+ public static JavaRDD querynameSortReadsIfNecessary(JavaRDD reads, int numReducers, SAMFileHeader headerForTool) {
+ JavaRDD sortedReadsForMarking;
+ if (ReadUtils.isReadNameGroupedBam(headerForTool)) {
+ sortedReadsForMarking = reads;
+ } else {
+ headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
+ sortedReadsForMarking = sortReadsAccordingToHeader(reads, headerForTool, numReducers);
+ }
+ return sortedReadsForMarking;
+ }
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/ b/src/test/java/org/broadinstitute/hellbender/tools/spark/
index fb433ffdd0d..9b987869a69 100644
--- a/src/test/java/org/broadinstitute/hellbender/tools/spark/
+++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/
@@ -4,15 +4,17 @@
import htsjdk.samtools.util.CloserUtil;
import org.broadinstitute.hellbender.CommandLineProgramTest;
import org.broadinstitute.hellbender.exceptions.GATKException;
-import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.testutils.ArgumentsBuilder;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.*;
@Test(groups = "Spark")
@@ -78,13 +80,13 @@ public void basicPositiveTests(final SAMFileHeader.SortOrder so, final boolean r
if (outputByReadGroup) {
- args.addPositionalArgument("--"+RevertSamSpark.OUTPUT_BY_READGROUP_ARG);
+ args.addPositionalArgument("--"+RevertSamSpark.OUTPUT_BY_READGROUP_LONG_NAME);
if (so != null) {
args.addArgument("sort-order",; //TODO decide on sort order outputing
if (!removeAlignmentInfo) {
- args.addPositionalArgument("--"+RevertSamSpark.DONT_REMOVE_ALIGNMENT_INFORMATION_ARG);
+ args.addPositionalArgument("--"+RevertSamSpark.DONT_REMOVE_ALIGNMENT_INFORMATION_LONG_NAME);
if (sample != null) {
@@ -119,7 +121,7 @@ public void testOutputByReadGroupWithOutputMap() throws Exception {
final String outputPath1 = outputDir + "/rg1.sam";
final String outputPath2 = outputDir + "/my_rg2.bam";
final String outputPath3 = outputDir + "/my_rg3.sam";//TODO not used?
- mapWriter.println("READ_GROUP_ID\tOUTPUT");
mapWriter.println("0\t" + outputPath0);
mapWriter.println("2\t" + outputPath2);
mapWriter.println("1\t" + outputPath1);
@@ -141,7 +143,7 @@ public void testOutputByReadGroupWithOutputMap() throws Exception {
- "--"+RevertSamSpark.ATTRIBUTE_TO_CLEAR_ARG,
@@ -154,11 +156,12 @@ public void testOutputByReadGroupWithOutputMap() throws Exception {
verifyPositiveResults(output2, reverter, true, true, true, true, "2", 2, "test_sample_1", "test_library_1");
- @Test (expectedExceptions = UserException.class)
+ @Test
+ // TODO the purpose of this test is unclear to me
public void testSingleEndSanitize() throws Exception {
final File output = File.createTempFile("single_end_reverted", ".sam");
- final String args[] = { "-I " + singleEndSamToRevert, "-O " + output.getAbsolutePath(), "--sanitize"};
+ final String args[] = { "-I", singleEndSamToRevert.getAbsolutePath(), "-O", output.getAbsolutePath(), "--sanitize"};
@@ -255,7 +258,7 @@ public void testSanitizeAndDeduplicateRecords() throws Exception {
"--input", input.getAbsolutePath(),
"-O", output.getAbsolutePath()
@@ -287,71 +290,62 @@ public Object[][] getNegativeTestData() {
- public void testValidateOutputParamsByReadGroupMapValid() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsByReadGroup(null, validOutputMap, errors);
+ public void testValidateOutputParamsByReadGroupMapValid() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsByReadGroup(null, validOutputMap.getAbsolutePath());
Assert.assertEquals(errors.size(), 0);
- public void testValidateOutputParamsByReadGroupMissingMap() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsByReadGroup(null, nonExistentOutputMap, errors);
+ public void testValidateOutputParamsByReadGroupMissingMap() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsByReadGroup(null, nonExistentOutputMap.getAbsolutePath());
Assert.assertEquals(errors.size(), 1);
Assert.assertEquals(errors.get(0).contains("Cannot read"), true);
- public void testValidateOutputParamsByReadGroupBadHeaderMap() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsByReadGroup(null, badHeaderOutputMap, errors);
+ public void testValidateOutputParamsByReadGroupBadHeaderMap() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsByReadGroup(null, badHeaderOutputMap.getAbsolutePath());
Assert.assertEquals(errors.size(), 1);
Assert.assertEquals(errors.get(0).contains("Invalid header"), true);
- public void testValidateOutputParamsByReadGroupNoMapOrDir() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsByReadGroup(null, null, errors);
+ public void testValidateOutputParamsByReadGroupNoMapOrDir() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsByReadGroup(null, null);
Assert.assertEquals(errors.size(), 1);
Assert.assertEquals(errors.get(0).contains("Must provide either"), true);
- public void testValidateOutputParamsByReadGroupDirValid() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsByReadGroup(createTempDir("testValidateOutputParamsNotByReadGroupValid"), null, errors);
+ public void testValidateOutputParamsByReadGroupDirValid() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsByReadGroup(createTempDir("testValidateOutputParamsNotByReadGroupValid").getAbsolutePath(), null);
Assert.assertEquals(errors.size(), 0);
- public void testValidateOutputParamsNotByReadGroupValid() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsNotByReadGroup(createTempFile("testValidateOutputParamsNotByReadGroupValid",""), null, errors);
+ public void testValidateOutputParamsNotByReadGroupValid() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsNotByReadGroup(createTempFile("testValidateOutputParamsNotByReadGroupValid","").getAbsolutePath(), null);
Assert.assertEquals(errors.size(), 0);
- public void testValidateOutputParamsNotByReadGroupNoOutput() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsNotByReadGroup(null, null, errors);
+ public void testValidateOutputParamsNotByReadGroupNoOutput() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsNotByReadGroup(null, null);
Assert.assertEquals(errors.size(), 1);
Assert.assertEquals(errors.get(0).contains("output is required"), true);
- public void testValidateOutputParamsNotByReadGroupMap() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsNotByReadGroup(null, validOutputMap, errors);
+ public void testValidateOutputParamsNotByReadGroupMap() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsNotByReadGroup(null, validOutputMap.getAbsolutePath());
Assert.assertEquals(errors.size(), 2);
Assert.assertEquals(errors.get(0).contains("Cannot provide outputMap"), true);
Assert.assertEquals(errors.get(1).contains("output is required"), true);
- public void testValidateOutputParamsNotByReadGroupDir() {
- final List errors = new ArrayList();
- RevertSamSpark.ValidationUtil.validateOutputParamsNotByReadGroup(createTempDir("testValidateOutputParamsNotByReadGroupDir"), null, errors);
+ public void testValidateOutputParamsNotByReadGroupDir() throws IOException {
+ final List errors = RevertSamSpark.validateOutputParamsNotByReadGroup(createTempDir("testValidateOutputParamsNotByReadGroupDir").getAbsolutePath(), null);
Assert.assertEquals(errors.size(), 1);
Assert.assertEquals(errors.get(0).contains("should not be a directory"), true);
@@ -361,12 +355,12 @@ public void testAssertAllReadGroupsMappedSuccess() {
final SAMReadGroupRecord rg1 = new SAMReadGroupRecord("rg1");
final SAMReadGroupRecord rg2 = new SAMReadGroupRecord("rg2");
- final Map outputMap = new HashMap();
- outputMap.put("rg1", new File("/foo/bar/rg1.bam"));
- outputMap.put("rg2", new File("/foo/bar/rg2.bam"));
- RevertSamSpark.ValidationUtil.assertAllReadGroupsMapped(outputMap, Arrays.asList(rg1, rg2));
- RevertSamSpark.ValidationUtil.assertAllReadGroupsMapped(outputMap, Arrays.asList(rg1));
- RevertSamSpark.ValidationUtil.assertAllReadGroupsMapped(outputMap, Arrays.asList(rg2));
+ final Map outputMap = new HashMap<>();
+ outputMap.put("rg1", IOUtils.getPath(new File("/foo/bar/rg1.bam").getAbsolutePath()));
+ outputMap.put("rg2", IOUtils.getPath(new File("/foo/bar/rg2.bam").getAbsolutePath()));
+ RevertSamSpark.assertAllReadGroupsMapped(outputMap, Arrays.asList(rg1, rg2));
+ RevertSamSpark.assertAllReadGroupsMapped(outputMap, Arrays.asList(rg1));
+ RevertSamSpark.assertAllReadGroupsMapped(outputMap, Arrays.asList(rg2));
@Test(expectedExceptions = {GATKException.class})
@@ -375,21 +369,21 @@ public void testAssertAllReadGroupsMappedFailure() {
final SAMReadGroupRecord rg2 = new SAMReadGroupRecord("rg2");
final SAMReadGroupRecord rg3 = new SAMReadGroupRecord("rg3");
- final Map outputMap = new HashMap();
- outputMap.put("rg1", new File("/foo/bar/rg1.bam"));
- outputMap.put("rg2", new File("/foo/bar/rg2.bam"));
- RevertSamSpark.ValidationUtil.assertAllReadGroupsMapped(outputMap, Arrays.asList(rg1, rg2, rg3));
+ final Map outputMap = new HashMap<>();
+ outputMap.put("rg1", IOUtils.getPath(new File("/foo/bar/rg1.bam").getAbsolutePath()));
+ outputMap.put("rg2", IOUtils.getPath(new File("/foo/bar/rg2.bam").getAbsolutePath()));
+ RevertSamSpark.assertAllReadGroupsMapped(outputMap, Arrays.asList(rg1, rg2, rg3));
public void testIsOutputMapHeaderValid() {
- boolean isValid = RevertSamSpark.ValidationUtil.isOutputMapHeaderValid(Arrays.asList("READ_GROUP_ID", "OUTPUT"));
+ boolean isValid = RevertSamSpark.isOutputMapHeaderValid(Arrays.asList("OUTPUT_MAP_READ_GROUP_FIELD_NAME", "OUTPUT_MAP_OUTPUT_FILE_FIELD_NAME"));
Assert.assertEquals(isValid, true);
- isValid = RevertSamSpark.ValidationUtil.isOutputMapHeaderValid(Arrays.asList("OUTPUT"));
+ isValid = RevertSamSpark.isOutputMapHeaderValid(Arrays.asList("OUTPUT_MAP_OUTPUT_FILE_FIELD_NAME"));
Assert.assertEquals(isValid, false);
- isValid = RevertSamSpark.ValidationUtil.isOutputMapHeaderValid(Collections.EMPTY_LIST);
+ isValid = RevertSamSpark.isOutputMapHeaderValid(Collections.EMPTY_LIST);
Assert.assertEquals(isValid, false);
@@ -398,16 +392,16 @@ public void testFilePathsWithoutMapFile() {
final SAMReadGroupRecord rg1 = new SAMReadGroupRecord("rg1");
final SAMReadGroupRecord rg2 = new SAMReadGroupRecord("rg2");
- final Map outputMap = RevertSamSpark.getOutputMap(null, new File("/foo/bar"), ".bam", Arrays.asList(rg1, rg2), true);
- Assert.assertEquals(outputMap.get("rg1"), new File("/foo/bar/rg1.bam"));
- Assert.assertEquals(outputMap.get("rg2"), new File("/foo/bar/rg2.bam"));
+ final Map outputMap = RevertSamSpark.getOutputMap(null, new File("/foo/bar").getAbsolutePath(), ".bam", Arrays.asList(rg1, rg2), true);
+ Assert.assertEquals(outputMap.get("rg1"), IOUtils.getPath(new File("/foo/bar/rg1.bam").getAbsolutePath()));
+ Assert.assertEquals(outputMap.get("rg2"), IOUtils.getPath(new File("/foo/bar/rg2.bam").getAbsolutePath()));
public void testFilePathsWithMapFile() {
- final Map outputMap = RevertSamSpark.getOutputMap(validOutputMap, null, ".bam", Collections.emptyList(), true);
- Assert.assertEquals(outputMap.get("rg1"), new File("/path/to/my_rg_1.ubam"));
- Assert.assertEquals(outputMap.get("rg2"), new File("/path/to/my_rg_2.ubam"));
+ final Map outputMap = RevertSamSpark.getOutputMap(validOutputMap.getAbsolutePath(), null, ".bam", Collections.emptyList(), true);
+ Assert.assertEquals(outputMap.get("rg1"), IOUtils.getPath(new File("/path/to/my_rg_1.ubam").getAbsolutePath()));
+ Assert.assertEquals(outputMap.get("rg2"), IOUtils.getPath(new File("/path/to/my_rg_2.ubam").getAbsolutePath()));
From e7327390f6b292b791a8021d9eeac990a968d64b Mon Sep 17 00:00:00 2001
From: James
Date: Thu, 6 Dec 2018 16:16:58 -0500
Subject: [PATCH 03/15] responding to another round of comments with nothing
more than rowdy backtalk and refusal to respond
.../tools/spark/ | 44 ++--
.../utils/codecs/table/ | 28 ++-
.../runtime/ | 2 +-
.../hellbender/utils/spark/ | 14 +- | 2 +-
.../spark/ | 211 +++++-------------
.../tools/spark/ | 144 ++++++------
.../ | 2 +-
8 files changed, 191 insertions(+), 256 deletions(-)
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/ b/src/main/java/org/broadinstitute/hellbender/tools/spark/
index c38e8603977..db79dd066a4 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/spark/
+++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/
@@ -77,16 +77,18 @@
* Note: If the program fails due to a SAM validation error, consider setting the VALIDATION_STRINGENCY option to
* LENIENT or SILENT if the failures are expected to be obviated by the reversion process
- * (e.g. invalid alignment information will be obviated when the dontRemoveAlignmentInformation option is used).
+ * (e.g. invalid alignment information will be obviated when the keepAlignmentInformation option is used).
- summary =RevertSamSpark.USAGE_DETAILS,
- oneLineSummary =RevertSamSpark.USAGE_SUMMARY,
+ summary = RevertSamSpark.USAGE_DETAILS,
+ oneLineSummary = RevertSamSpark.USAGE_SUMMARY,
programGroup = ReadDataManipulationProgramGroup.class)
public class RevertSamSpark extends GATKSparkTool {
+ private static final long serialVersionUID = 1L;
static final String USAGE_SUMMARY = "Reverts SAM or BAM files to a previous state.";
static final String USAGE_DETAILS = "This tool removes or restores certain properties of the SAM records, including alignment " +
"information, which can be used to produce an unmapped BAM (uBAM) from a previously aligned BAM. It is also capable of " +
@@ -115,7 +117,7 @@ public class RevertSamSpark extends GATKSparkTool {
" Output format can be overridden with the outputByReadgroupFileFormat option.\n" +
"Note: If the program fails due to a SAM validation error, consider setting the VALIDATION_STRINGENCY option to " +
"LENIENT or SILENT if the failures are expected to be obviated by the reversion process " +
- "(e.g. invalid alignment information will be obviated when the dontRemoveAlignmentInformation option is used).\n" +
+ "(e.g. invalid alignment information will be obviated when the keepAlignmentInformation option is used).\n" +
public static final String OUTPUT_MAP_READ_GROUP_FIELD_NAME = "READ_GROUP_ID";
public static final String OUTPUT_MAP_OUTPUT_FILE_FIELD_NAME = "OUTPUT";
@@ -167,9 +169,9 @@ public class RevertSamSpark extends GATKSparkTool {
" the output may have the unusual but sometimes desirable trait of having unmapped reads that are marked as duplicates.")
public boolean dontRemoveDuplicateInformation = false;
- public static final String DONT_REMOVE_ALIGNMENT_INFORMATION_LONG_NAME = "remove-alignment-information";
- @Argument(fullName = DONT_REMOVE_ALIGNMENT_INFORMATION_LONG_NAME, doc = "Remove all alignment information from the file.")
- public boolean dontRemoveAlignmentInformation = false;
+ public static final String KEEP_ALIGNMENT_INFORMATION = "keep-alignment-information";
+ @Argument(fullName = KEEP_ALIGNMENT_INFORMATION, doc = "Remove all alignment information from the file.")
+ public boolean keepAlignmentInformation = false;
public static final String ATTRIBUTE_TO_CLEAR_LONG_NAME = "attributes-to-clear";
@Argument(fullName = ATTRIBUTE_TO_CLEAR_LONG_NAME, doc = "When removing alignment information, the set of optional tags to remove.", optional = true)
@@ -196,7 +198,7 @@ public List getDefaultReadFilters() {
return Collections.singletonList(ReadFilterLibrary.ALLOW_ALL_READS);
- public static List DEFAULT_ATTRIBUTES_TO_CLEAR = new ArrayList() {{
+ public static List DEFAULT_ATTRIBUTES_TO_CLEAR = Collections.unmodifiableList(new ArrayList(){{
@@ -205,7 +207,7 @@ public List getDefaultReadFilters() {
add(; // Supplementary alignment metadata
add(; // Mate Cigar
- }};
+ }});
public enum FileType implements CommandLineParser.ClpEnum {
sam("Generate SAM files."),
@@ -234,7 +236,9 @@ protected String[] customCommandLineValidation() {
final List errors = new ArrayList<>();
validateOutputParams(outputByReadGroup, output, outputMap);
- if (!sanitize && keepFirstDuplicate) errors.add("'keepFirstDuplicate' cannot be used without 'sanitize'");
+ if (!sanitize && keepFirstDuplicate) {
+ errors.add("'keepFirstDuplicate' cannot be used without 'sanitize'");
+ }
if (!errors.isEmpty()) {
return errors.toArray(new String[errors.size()]);
@@ -311,11 +315,13 @@ private Map createReadGroupFormatMap( final JavaRDD<
final boolean restoreOriginalQualities) {
final Map output = new HashMap<>();
- inHeader.getValue().getReadGroups().stream().parallel().forEach(rg -> {
+ inHeader.getValue().getReadGroups().stream().forEach(rg -> {
// For each readgroup filter down to just the reads in that group
final String key = rg.getId();
JavaRDD filtered = reads.filter(r -> r.getReadGroup().equals(key));
+ // NOTE: this method has the potential to be expensive as it may end up pulling on the first partition many times, and potentially
+ // end up iterating over the entire genome in the case where there are readgroups missing from the bam
if (!filtered.isEmpty()) {
// take the number of reads required by QualityEncodingDetector to determine quality score map
@@ -352,13 +358,15 @@ public SAMRecord next() {
- * If this is run, we want to be careful to remove duplicated reads from the bam.
+ * If this is run, we want to be careful to remove copied reads from the bam.
* In order to do this we group each read by its readname and randomly select one read labeled as first in pair
- * and one read labled as second in pair to treat as the representative reads, throwing away the rest.
+ * and one read labeled as second in pair to treat as the representative reads, throwing away the rest.
private JavaRDD sanitize(final Map readGroupToFormat, final JavaRDD reads, final SAMFileHeader header, final boolean keepFirstDuplicate) {
- JavaRDD sortedReads = SparkUtils.querynameSortReadsIfNecessary(reads.filter(r -> r.getBases().length == r.getBaseQualities().length), getRecommendedNumReducers(), header);
+ JavaRDD sortedReads = SparkUtils.querynameSortReadsIfNecessary(
+ reads.filter(r -> r.getLength() == r.getBaseQualityCount()),
+ getRecommendedNumReducers(), header);
JavaPairRDD> readsByGroup = spanReadsByKey(sortedReads);
return readsByGroup.flatMap(group -> {
@@ -463,12 +471,12 @@ private Map getReadGroupHeaderMap(SAMFileHeader inHeader,
assertAllReadGroupsMapped(writerMap, inHeader.getReadGroups());
headerMap = new HashMap<>();
for (final SAMReadGroupRecord readGroup : inHeader.getReadGroups()) {
- final SAMFileHeader header = createOutHeader(inHeader, sortOrder, !dontRemoveAlignmentInformation);
+ final SAMFileHeader header = createOutHeader(inHeader, sortOrder, !keepAlignmentInformation);
headerMap.put(readGroup.getId(), header);
} else {
- final SAMFileHeader singleOutHeader = createOutHeader(inHeader, sortOrder, !dontRemoveAlignmentInformation);
+ final SAMFileHeader singleOutHeader = createOutHeader(inHeader, sortOrder, !keepAlignmentInformation);
headerMap = Collections.singletonMap(null, singleOutHeader);
@@ -525,7 +533,7 @@ public JavaRDD revertReads(JavaRDD reads, List attri
reads = -> {r.setIsDuplicate(false); return r;});
- if (!dontRemoveAlignmentInformation) {
+ if (!keepAlignmentInformation) {
reads = -> {
if (rec.isReverseStrand()) {
@@ -640,7 +648,7 @@ static List validateOutputParamsByReadGroup(final String output, final
final FeatureReader parser = AbstractFeatureReader.getFeatureReader(outputMap, new TableCodec(null),false);
if (!isOutputMapHeaderValid((List)parser.getHeader())) {
- errors.add("Invalid header: " + outputMap + ". Must be a tab-separated file with OUTPUT_MAP_READ_GROUP_FIELD_NAME as first column and output as second column.");
+ errors.add("Invalid header: " + outputMap + ". Must be a tab-separated file with +"+OUTPUT_MAP_READ_GROUP_FIELD_NAME+"+ as first column and output as second column.");
return errors;
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/codecs/table/ b/src/main/java/org/broadinstitute/hellbender/utils/codecs/table/
index e896b1a4b15..142cc1a09d9 100644
--- a/src/main/java/org/broadinstitute/hellbender/utils/codecs/table/
+++ b/src/main/java/org/broadinstitute/hellbender/utils/codecs/table/
@@ -2,6 +2,7 @@
import htsjdk.tribble.AsciiFeatureCodec;
import htsjdk.tribble.readers.LineIterator;
+import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.SimpleInterval;
@@ -52,7 +53,6 @@ public final class TableCodec extends AsciiFeatureCodec {
protected static final String COMMENT_DELIMITER = "#";
private final String headerDelimiter;
- private final String commentDelimiter;
protected String delimiter_regex = "\\s+";
@@ -60,16 +60,22 @@ public final class TableCodec extends AsciiFeatureCodec {
private boolean havePassedHeader = false;
- public TableCodec(final String headerLineDelimiter, final String commentLineDelimiter) {
+ /**
+ * Create a TableCodec with a configured header line delimiter
+ *
+ * @param headerLineDelimiter the delimeter for comment header lines, or null if the header is a single commented line-
+ */
+ public TableCodec(final String headerLineDelimiter) {
+ if ( "".equals(headerLineDelimiter) ) {
+ throw new GATKException("HeaderLineDelimiter must either be a valid delimiter or null");
+ }
headerDelimiter = headerLineDelimiter;
- commentDelimiter = commentLineDelimiter;
- }
- public TableCodec(final String headerLineDelimiter) {
- this(headerLineDelimiter, COMMENT_DELIMITER);
+ /**
+ * Create a TableCodec for IGV track data.
+ */
public TableCodec() {
@@ -78,7 +84,7 @@ public TableCodec() {
public TableFeature decode(final String line) {
if ((headerDelimiter != null && ! line.startsWith(headerDelimiter)) ||
(headerDelimiter == null && !havePassedHeader) ||
- line.startsWith(commentDelimiter) || line.startsWith(IGV_HEADER_DELIMITER)) {
+ line.startsWith(COMMENT_DELIMITER) || line.startsWith(IGV_HEADER_DELIMITER)) {
havePassedHeader = true;
return null;
@@ -94,10 +100,10 @@ public List readActualHeader(final LineIterator reader) {
boolean isFirst = true;
while (reader.hasNext()) {
final String line = reader.peek(); // Peek to avoid reading non-header data
- if ( isFirst && ! line.startsWith(commentDelimiter) && headerDelimiter != null && ! line.startsWith(headerDelimiter) ) {
+ if ( isFirst && ! line.startsWith(COMMENT_DELIMITER) && headerDelimiter != null && ! line.startsWith(headerDelimiter) ) {
throw new UserException.MalformedFile("TableCodec file does not have a header");
- isFirst &= line.startsWith(commentDelimiter);
+ isFirst &= line.startsWith(COMMENT_DELIMITER);
if (headerDelimiter == null || line.startsWith(headerDelimiter)) {; // "Commit" the peek
if (!header.isEmpty()) {
@@ -106,7 +112,7 @@ public List readActualHeader(final LineIterator reader) {
final String[] spl = line.split(delimiter_regex);
Collections.addAll(header, spl);
return header;
- } else if (line.startsWith(commentDelimiter)) {
+ } else if (line.startsWith(COMMENT_DELIMITER)) {; // "Commit" the peek
} else {
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/runtime/ b/src/main/java/org/broadinstitute/hellbender/utils/runtime/
index 25ad6ce5ecc..59e89775135 100644
--- a/src/main/java/org/broadinstitute/hellbender/utils/runtime/
+++ b/src/main/java/org/broadinstitute/hellbender/utils/runtime/
@@ -363,7 +363,7 @@ protected void tryCleanShutdown() {
if (process != null) {
- // terminate the app by closing the process' input stream
+ // terminate the app by closing the process' INPUT stream
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/spark/ b/src/main/java/org/broadinstitute/hellbender/utils/spark/
index a914caea5ad..2f527732f32 100644
--- a/src/main/java/org/broadinstitute/hellbender/utils/spark/
+++ b/src/main/java/org/broadinstitute/hellbender/utils/spark/
@@ -20,10 +20,8 @@
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.Utils;
import scala.Tuple2;
@@ -242,7 +240,7 @@ public static JavaRDD putReadsWithTheSameNameInTheSamePartition(final
* @return an RDD where each the values for each key are grouped into an iterable collection
public static JavaPairRDD> spanByKey(JavaPairRDD rdd) {
- return rdd.mapPartitionsToPair(SparkUtils::spanningIterator);
+ return rdd.mapPartitionsToPair(SparkUtils::getSpanningIterator);
@@ -252,7 +250,7 @@ public static JavaPairRDD> spanByKey(JavaPairRDD rdd
* @param type of values
* @return an iterator over pairs of keys and grouped values
- public static Iterator>> spanningIterator(Iterator> iterator) {
+ public static Iterator>> getSpanningIterator(Iterator> iterator) {
final PeekingIterator> iter = Iterators.peekingIterator(iterator);
return new AbstractIterator>>() {
@@ -285,13 +283,13 @@ protected Tuple2> computeNext() {
* Sort reads into queryname order if they are not already sorted
- public static JavaRDD querynameSortReadsIfNecessary(JavaRDD reads, int numReducers, SAMFileHeader headerForTool) {
+ public static JavaRDD querynameSortReadsIfNecessary(JavaRDD reads, int numReducers, SAMFileHeader header) {
JavaRDD sortedReadsForMarking;
- if (ReadUtils.isReadNameGroupedBam(headerForTool)) {
+ if (ReadUtils.isReadNameGroupedBam(header)) {
sortedReadsForMarking = reads;
} else {
- headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
- sortedReadsForMarking = sortReadsAccordingToHeader(reads, headerForTool, numReducers);
+ header.setSortOrder(SAMFileHeader.SortOrder.queryname);
+ sortedReadsForMarking = sortReadsAccordingToHeader(reads, header, numReducers);
return sortedReadsForMarking;
diff --git a/src/test/java/org/broadinstitute/hellbender/cmdline/ b/src/test/java/org/broadinstitute/hellbender/cmdline/
index 822d8cf400d..548b9385e7d 100644
--- a/src/test/java/org/broadinstitute/hellbender/cmdline/
+++ b/src/test/java/org/broadinstitute/hellbender/cmdline/
@@ -52,7 +52,7 @@ public void testPicardNormalizeFastaWithBadArgs() throws IOException {
final File outfile = createTempFile("normalized", ".fasta");
// Use GATK-style lower case argument names, which are rejected by Picard
- // because it uses upper cased argument names (--input/--output)
+ // because it uses upper cased argument names (--INPUT/--OUTPUT)
final String[] args = {
"--input", input.getAbsolutePath(),
"--output", outfile.getAbsolutePath(),
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/ b/src/test/java/org/broadinstitute/hellbender/tools/spark/
index 9b987869a69..dbd6e93a81d 100644
--- a/src/test/java/org/broadinstitute/hellbender/tools/spark/
+++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/
@@ -4,7 +4,9 @@
import htsjdk.samtools.util.CloserUtil;
import org.broadinstitute.hellbender.CommandLineProgramTest;
import org.broadinstitute.hellbender.exceptions.GATKException;
+import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.testutils.ArgumentsBuilder;
+import org.broadinstitute.hellbender.testutils.BaseTest;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@@ -20,20 +22,9 @@
@Test(groups = "Spark")
public class RevertSamSparkIntegrationTest extends CommandLineProgramTest {
- @Override
- public String getToolTestDataDir() {
- return "src/test/resources/org/broadinstitute/hellbender/tools/spark/RevertSamSpark";
- }
public static List defaultAttributesToClearPlusXT = new ArrayList