Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Genotyping code for the Gnarly Pipeline (gnomAD v3) #4947

Merged
merged 13 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.IndexFeatureFile;
import org.broadinstitute.hellbender.tools.genomicsdb.GenomicsDBConstants;
import org.broadinstitute.hellbender.tools.genomicsdb.GenomicsDBOptions;
import org.broadinstitute.hellbender.utils.IndexUtils;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
Expand All @@ -26,6 +27,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -228,7 +230,7 @@ public FeatureDataSource(final String featurePath, final String name, final int
public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLookaheadBases, final Class<? extends Feature> targetFeatureType,
final int cloudPrefetchBuffer, final int cloudIndexPrefetchBuffer) {
this(featureInput, queryLookaheadBases, targetFeatureType, cloudPrefetchBuffer, cloudIndexPrefetchBuffer,
null);
new GenomicsDBOptions());
}

/**
Expand All @@ -241,19 +243,40 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook
* that produce this type of Feature. May be null, which results in an unrestricted search.
* @param cloudPrefetchBuffer MB size of caching/prefetching wrapper for the data, if on Google Cloud (0 to disable).
* @param cloudIndexPrefetchBuffer MB size of caching/prefetching wrapper for the index, if on Google Cloud (0 to disable).
* @param reference Path to a reference. May be null. Needed only for reading from GenomicsDB.
* @param reference the reference genome corresponding to the data to be read
*/
public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLookaheadBases, final Class<? extends Feature> targetFeatureType,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc

final int cloudPrefetchBuffer, final int cloudIndexPrefetchBuffer, final Path reference) {
this(featureInput, queryLookaheadBases, targetFeatureType, cloudPrefetchBuffer, cloudIndexPrefetchBuffer,
new GenomicsDBOptions(reference));
}

/**
* Creates a FeatureDataSource backed by the provided FeatureInput. We will look ahead the specified number of bases
* during queries that produce cache misses.
*
* @param featureInput a FeatureInput specifying a source of Features
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
* that produce this type of Feature. May be null, which results in an unrestricted search.
* @param cloudPrefetchBuffer MB size of caching/prefetching wrapper for the data, if on Google Cloud (0 to disable).
* @param cloudIndexPrefetchBuffer MB size of caching/prefetching wrapper for the index, if on Google Cloud (0 to disable).
* @param genomicsDBOptions options and info for reading from a GenomicsDB; may be null
*/
public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLookaheadBases, final Class<? extends Feature> targetFeatureType,
final int cloudPrefetchBuffer, final int cloudIndexPrefetchBuffer, final GenomicsDBOptions genomicsDBOptions) {
Utils.validateArg(queryLookaheadBases >= 0, "Query lookahead bases must be >= 0");
this.featureInput = Utils.nonNull(featureInput, "featureInput must not be null");
if (IOUtils.isGenomicsDBPath(featureInput)) {
Utils.nonNull(genomicsDBOptions, "GenomicsDBOptions must not be null. Calling tool may not read from a GenomicsDB data source.");
}

final Function<SeekableByteChannel, SeekableByteChannel> cloudWrapper = (cloudPrefetchBuffer > 0 ? is -> SeekableByteChannelPrefetcher.addPrefetcher(cloudPrefetchBuffer, is) : Function.identity());
final Function<SeekableByteChannel, SeekableByteChannel> cloudIndexWrapper = (cloudIndexPrefetchBuffer > 0 ? is -> SeekableByteChannelPrefetcher.addPrefetcher(cloudIndexPrefetchBuffer, is) : Function.identity());

// Create a feature reader without requiring an index. We will require one ourselves as soon as
// a query by interval is attempted.
this.featureReader = getFeatureReader(featureInput, targetFeatureType, cloudWrapper, cloudIndexWrapper, reference);
this.featureReader = getFeatureReader(featureInput, targetFeatureType, cloudWrapper, cloudIndexWrapper, genomicsDBOptions);

if (IOUtils.isGenomicsDBPath(featureInput)) {
//genomics db uri's have no associated index file to read from, but they do support random access
Expand Down Expand Up @@ -285,16 +308,17 @@ final void printCacheStats() {
private static <T extends Feature> FeatureReader<T> getFeatureReader(final FeatureInput<T> featureInput, final Class<? extends Feature> targetFeatureType,
final Function<SeekableByteChannel, SeekableByteChannel> cloudWrapper,
final Function<SeekableByteChannel, SeekableByteChannel> cloudIndexWrapper,
final Path reference) {
if (IOUtils.isGenomicsDBPath(featureInput)) {
final GenomicsDBOptions genomicsDBOptions) {
if (IOUtils.isGenomicsDBPath(featureInput.getFeaturePath())) {
Utils.nonNull(genomicsDBOptions);
try {
if (reference == null) {
if (genomicsDBOptions.getReference() == null) {
throw new UserException.MissingReference("You must provide a reference if you want to load from GenomicsDB");
}
try {
final File referenceAsFile = reference.toFile();
return (FeatureReader<T>) getGenomicsDBFeatureReader(featureInput, referenceAsFile);
} catch (final UnsupportedOperationException e) {
final File referenceAsFile = genomicsDBOptions.getReference().toFile();
return (FeatureReader<T>)getGenomicsDBFeatureReader(featureInput, referenceAsFile, genomicsDBOptions);
} catch (final UnsupportedOperationException e){
throw new UserException.BadInput("GenomicsDB requires that the reference be a local file.", e);
}
} catch (final ClassCastException e) {
Expand Down Expand Up @@ -354,7 +378,7 @@ private static <T extends Feature> FeatureReader<T> getFeatureReader(final Featu
}
}

protected static FeatureReader<VariantContext> getGenomicsDBFeatureReader(final GATKPathSpecifier path, final File reference) {
protected static FeatureReader<VariantContext> getGenomicsDBFeatureReader(final GATKPathSpecifier path, final File reference, final GenomicsDBOptions genomicsDBOptions) {
final String workspace = IOUtils.getGenomicsDBAbsolutePath(path) ;
if (workspace == null) {
throw new IllegalArgumentException("Trying to create a GenomicsDBReader from non-GenomicsDB inputpath " + path);
Expand All @@ -370,7 +394,7 @@ protected static FeatureReader<VariantContext> getGenomicsDBFeatureReader(final

try {
final GenomicsDBExportConfiguration.ExportConfiguration exportConfigurationBuilder =
createExportConfiguration(reference, workspace, callsetJson, vidmapJson, vcfHeader);
createExportConfiguration(reference, workspace, callsetJson, vidmapJson, vcfHeader, genomicsDBOptions);
return new GenomicsDBFeatureReader<>(exportConfigurationBuilder, new BCF2Codec(), Optional.empty());
} catch (final IOException e) {
throw new UserException("Couldn't create GenomicsDBFeatureReader", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import org.broadinstitute.hellbender.cmdline.CommandLineProgram;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.genomicsdb.GenomicsDBOptions;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.config.ConfigFactory;
import org.broadinstitute.hellbender.utils.config.GATKConfig;
import org.broadinstitute.hellbender.utils.variant.GATKVCFConstants;

import java.io.File;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -205,7 +207,8 @@ private void initializeFeatureSources( final int featureQueryLookahead, final Co
// Only create a data source for Feature arguments that were actually specified
if ( featureInput != null ) {
final Class<? extends Feature> featureType = getFeatureTypeForFeatureInputField(featureArgument.getKey());
addToFeatureSources(featureQueryLookahead, featureInput, featureType, cloudPrefetchBuffer, cloudIndexPrefetchBuffer, reference);
addToFeatureSources(featureQueryLookahead, featureInput, featureType, cloudPrefetchBuffer, cloudIndexPrefetchBuffer,
toolInstance instanceof VariantWalker ? ((VariantWalker) toolInstance).getGenomicsDBOptions() : null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably avoid this ugly instanceof check by making GATKTool.getGenomicsDBOptions() return null (or new GenomicsDBOptions(referenceArguments.getReferencePath())) instead of throwing an exception, and then having GATKTool.initializeFeatures() (and its overrides) pass the GenomicsDB options in to the FeatureManager constructor, which can then propagate them down here.

Could be done in a separate PR if you don't want to do it here.

}
}
}
Expand All @@ -217,6 +220,13 @@ public void dumpAllFeatureCacheStats() {
}
}

void addToFeatureSources(final int featureQueryLookahead, final FeatureInput<? extends Feature> featureInput,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add javadoc for this overload (can copy from below)

final Class<? extends Feature> featureType, final int cloudPrefetchBuffer,
final int cloudIndexPrefetchBuffer, final Path reference) {
// Create a new FeatureDataSource for this file, and add it to our query pool
featureSources.put(featureInput, new FeatureDataSource<>(featureInput, featureQueryLookahead, featureType, cloudPrefetchBuffer, cloudIndexPrefetchBuffer, new GenomicsDBOptions(reference)));
}

/**
* Add the feature data source to the given feature input.
*
Expand All @@ -225,13 +235,16 @@ public void dumpAllFeatureCacheStats() {
* @param featureType class of features
* @param cloudPrefetchBuffer MB size of caching/prefetching wrapper for the data, if on Google Cloud (0 to disable).
* @param cloudIndexPrefetchBuffer MB size of caching/prefetching wrapper for the index, if on Google Cloud (0 to disable).
* @param genomicsDBOptions options and info for reading from a GenomicsDB
*
* Note: package-visible to enable access from the core walker classes
* (but not actual tools, so it's not protected).
*/
void addToFeatureSources(final int featureQueryLookahead, final FeatureInput<? extends Feature> featureInput, final Class<? extends Feature> featureType, final int cloudPrefetchBuffer, final int cloudIndexPrefetchBuffer, final Path reference) {
void addToFeatureSources(final int featureQueryLookahead, final FeatureInput<? extends Feature> featureInput,
final Class<? extends Feature> featureType, final int cloudPrefetchBuffer,
final int cloudIndexPrefetchBuffer, final GenomicsDBOptions genomicsDBOptions) {
// Create a new FeatureDataSource for this file, and add it to our query pool
featureSources.put(featureInput, new FeatureDataSource<>(featureInput, featureQueryLookahead, featureType, cloudPrefetchBuffer, cloudIndexPrefetchBuffer, reference));
featureSources.put(featureInput, new FeatureDataSource<>(featureInput, featureQueryLookahead, featureType, cloudPrefetchBuffer, cloudIndexPrefetchBuffer, genomicsDBOptions));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.broadinstitute.hellbender.engine.filters.WellformedReadFilter;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.genomicsdb.GenomicsDBOptions;
import org.broadinstitute.hellbender.tools.walkers.annotator.Annotation;
import org.broadinstitute.hellbender.transformers.ReadTransformer;
import org.broadinstitute.hellbender.utils.IntervalUtils;
Expand Down Expand Up @@ -413,6 +414,14 @@ protected List<SimpleInterval> transformTraversalIntervals(final List<SimpleInte
return getIntervals;
}

/**
*
* @return By default, not every GATK tool can read from a GenomicsDB -- child classes can override
*/
protected GenomicsDBOptions getGenomicsDBOptions() {
throw new IllegalArgumentException("This tool does not take a GenomicsDB as a feature input.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have this return either null or a new GenomicsDBOptions(referenceArguments.getReferencePath()) instead of throwing an exception, you could likely get rid of the VariantWalker special-casing in FeatureManager.

}

/**
* Initialize our source of reference data (or set it to null if no reference argument was provided).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,16 @@ protected final void onStartup() {
protected void initializeDrivingVariants() {
drivingVariantsFeatureInput = new FeatureInput<>(drivingVariantFile, "drivingVariantFile");

//This is the data source for the driving source of variants, which uses a cache lookahead of FEATURE_CACHE_LOOKAHEAD
ldgauthier marked this conversation as resolved.
Show resolved Hide resolved
// Create a FeatureDataSource for the driving variants FeatureInput, using the
// cache lookahead value from getDrivingVariantCacheLookAheadBases()
drivingVariants = new FeatureDataSource<>(drivingVariantsFeatureInput, getDrivingVariantCacheLookAheadBases(), VariantContext.class, cloudPrefetchBuffer, cloudIndexPrefetchBuffer,
referenceArguments.getReferencePath());
getGenomicsDBOptions());

// Also add the driving variants FeatureInput to FeatureManager as well so that it can be queried,
// but use a lookahead value of 0 to avoid caching because of windowed queries that need to "look behind" as well.
features.addToFeatureSources(0, drivingVariantsFeatureInput, VariantContext.class, cloudPrefetchBuffer, cloudIndexPrefetchBuffer,
referenceArguments.getReferencePath());
getGenomicsDBOptions());

// Note: the intervals for the driving variants are set in onStartup()
}
Expand Down Expand Up @@ -88,8 +89,6 @@ public final VCFHeader getHeaderForVariants() {
}

/**
* {@inheritDoc}
*
* Implementation of variant-based traversal.
*
* NOTE: You should only override {@link #traverse()} if you are writing a new walker base class in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.broadinstitute.hellbender.engine.filters.CountingVariantFilter;
import org.broadinstitute.hellbender.engine.filters.VariantFilter;
import org.broadinstitute.hellbender.engine.filters.VariantFilterLibrary;
import org.broadinstitute.hellbender.tools.genomicsdb.GenomicsDBOptions;
import org.broadinstitute.hellbender.transformers.VariantTransformer;
import org.broadinstitute.hellbender.utils.IndexUtils;

Expand Down Expand Up @@ -33,12 +34,23 @@ public abstract class VariantWalkerBase extends WalkerBase {
*/
public static final int DEFAULT_DRIVING_VARIANTS_LOOKAHEAD_BASES = 100_000;

//Various options for reading from a GenomicsDB
protected GenomicsDBOptions genomicsDBOptions;

@Override
public boolean requiresFeatures() { return true; }

@Override
public String getProgressMeterRecordLabel() { return "variants"; }

@Override
protected GenomicsDBOptions getGenomicsDBOptions() {
if (genomicsDBOptions == null) {
genomicsDBOptions = new GenomicsDBOptions(referenceArguments.getReferencePath());
}
return genomicsDBOptions;
}

@Override
void initializeFeatures() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private static void writeVariantsSingle(
final JavaRDD<VariantContext> sortedVariants = sortVariantsToHeader ? sortVariants(variants, header, numReducers) : variants;
final JavaRDD<VariantContext> variantsToSave;
if (writeGvcf) {
GVCFBlockCombiner gvcfBlockCombiner = new GVCFBlockCombiner(gqPartitions, defaultPloidy);
GVCFBlockCombiner gvcfBlockCombiner = new GVCFBlockCombiner(gqPartitions, defaultPloidy, false);
gvcfBlockCombiner.addRangesToHeader(header);
variantsToSave = sortedVariants.mapPartitions((FlatMapFunction<Iterator<VariantContext>, VariantContext>) v -> new GVCFBlockCombiningIterator(v, gqPartitions, defaultPloidy));
} else {
Expand Down
Loading