Add DownsampleVcf tool #974

Open · wants to merge 11 commits into base: main (changes shown from 1 commit)
281 changes: 281 additions & 0 deletions src/main/scala/com/fulcrumgenomics/vcf/DownsampleVcf.scala
@@ -0,0 +1,281 @@
package com.fulcrumgenomics.vcf

import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.commons.io.Io
import com.fulcrumgenomics.commons.util.LazyLogging
import com.fulcrumgenomics.fasta.SequenceDictionary
import com.fulcrumgenomics.sopt.{arg, clp}
import com.fulcrumgenomics.util.{Metric, ProgressLogger}
import com.fulcrumgenomics.vcf.api.Allele.NoCallAllele
import com.fulcrumgenomics.vcf.api.{Allele, Genotype, Variant, VcfCount, VcfFieldType, VcfFormatHeader, VcfHeader, VcfSource, VcfWriter}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.vcf.DownsampleVcf.{downsampleAndRegenotype, winnowVariants}

import scala.math.log10
import scala.util.Random

object DownsampleVcf extends LazyLogging {
/** Removes variants that are within a specified distance from a previous variant
* The end position of the current variant is compared with the start position of the following variant
Suggested change:
/** Removes variants that are within a specified distance from a previous variant.
* The end position of the current variant is compared with the start position of the following variant.

*
* @param variants an iterator of the variants to process
* @param windowSize the interval (exclusive) in which to check for additional variants.
* windowSize considers the distance between the end position of a variant
* with the start position of the following variant
* @param dict a sequencing dictionary to get contig ordering
* @return a new iterator of variants with just the variant entries we want to keep
*/
def winnowVariants(variants: Iterator[Variant], windowSize: Int, dict: SequenceDictionary): Iterator[Variant] = {
require(windowSize >= 0, f"the windowSize ($windowSize) is negative")
new Iterator[Variant] {
private val iter = variants.bufferBetter

def hasNext: Boolean = iter.hasNext

def isInOrder(current: Variant, next: Variant, currentIndex: Int, nextIndex: Int): Boolean = {
Suggested change:
private def isInOrder(current: Variant, next: Variant, currentIndex: Int, nextIndex: Int): Boolean = {

(currentIndex < nextIndex) || (currentIndex == nextIndex && current.end <= next.pos)
}

def next(): Variant = {
val current = iter.next()
val currentIndex = dict(current.chrom).index
iter.dropWhile { next: Variant =>
val nextIndex = dict(next.chrom).index
require(
isInOrder(current, next, currentIndex, nextIndex),
f"variants out of order; ${current.chrom}:${current.pos} > ${next.chrom}:${next.pos}")

currentIndex == nextIndex && next.pos - current.end < windowSize
}
current
}
}
}
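For illustration, a hypothetical usage of `winnowVariants` (the input path is made up; `VcfSource` and `header.dict` are used the same way in `execute()` further down):

import java.nio.file.Paths
// Keep a variant only if it starts at least `windowSize` bases past the end of the previously kept variant.
val vcf  = VcfSource(Paths.get("in.vcf.gz"))  // hypothetical input
val kept: Iterator[Variant] = winnowVariants(vcf.iterator, windowSize = 150, dict = vcf.header.dict)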

/** Downsamples variants using Allele Depths
Suggested change:
/** Downsamples variants by randomly sampling the total allele depths at the given proportion.

*
* @param oldAds an indexed seq of the original allele depths
* @param proportion the proportion to use for downsampling,
* calculated using total base count from the index and a target base count
* @return a new IndexedSeq of allele depths of the same length as `oldAds`
*/
def downsampleADs(oldAds: IndexedSeq[Int], proportion: Double, random: Random): IndexedSeq[Int] = {
Review comment: nit: Function parameters should be the most general type possible. You could change this to IterableOnce[Int] and then below add .toIndexedSeq. (A hypothetical sketch of this appears after the function below.)

require(proportion <= 1, f"proportion must be less than 1: proportion = ${proportion}")
oldAds.map(s => Range(0, s).iterator.map(_ => random.nextDouble()).count(_ < proportion))
}
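A hypothetical sketch of the reviewer's IterableOnce suggestion (the name is made up to avoid clashing with the existing method; not part of this commit):

// Hypothetical: accept the more general IterableOnce[Int] and materialize it internally.
private def downsampleADsGeneral(oldAds: IterableOnce[Int], proportion: Double, random: Random): IndexedSeq[Int] = {
  require(proportion <= 1, s"proportion must be less than 1: proportion = $proportion")
  // Binomially thin each depth: each supporting observation is kept with probability `proportion`.
  oldAds.iterator.toIndexedSeq.map(s => Range(0, s).iterator.map(_ => random.nextDouble()).count(_ < proportion))
}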

/**
* Does the downsampling on a Variant
Suggested change:
* Re-genotypes a variant for each sample after downsampling the allele counts based on the given per-sample proportions.

* @param variant the variant with the genotype to downsample
Suggested change:
* @param variant the variant to downsample and re-genotype

* @param proportions a map of downsampling target proportions for each sample
Suggested change:
* @param proportions proportion to downsample the allele counts for each sample prior to re-genotyping

* @param random random number generator for downsampling
* @param epsilon the error rate for genotyping
Suggested change:
* @param epsilon the sequencing error rate for genotyping

* @return a new variant with updated genotypes
*/
// Returns a new variant that has downsampled ADs, recomputed PLs and updated genotypes
def downsampleAndRegenotype(variant: Variant, proportions: Map[String, Double], random: Random, epsilon: Double = 0.01): Variant = {
try {
variant.copy(genotypes = variant.genotypes.map { case (sample, gt) =>
val proportion = proportions(sample)
sample -> downsampleAndRegenotype(gt = gt, proportion = proportion, random = random, epsilon = epsilon)
})
} catch {
case e: MatchError => throw new Exception(

Codecov / codecov/patch warning: added line #L83 was not covered by tests.
"processing " + variant.id + " at " + variant.chrom + ":" + variant.pos + "-" + variant.end, e
)
}
}

/**
* Does the downsampling on a Genotype
* @param gt the genotype to downsample
* @param proportion the proportion to use for downsampling allele depths
* @param random random number generator for downsampling
* @param epsilon the error rate for genotyping
* @return a new Genotype with updated allele depths, PLs and genotype
Review comment: update the params based on the suggestions for other functions.

*/
def downsampleAndRegenotype(gt: Genotype, proportion: Double, random: Random, epsilon: Double): Genotype = {
val oldAds = gt[IndexedSeq[Int]]("AD")
Review comment: consider a getOrElse[IndexedSeq[Int]]("AD", throw new Exception("AD not found for sample...")) (this is pseudocode). (A hypothetical sketch appears after this function.)

val newAds = downsampleADs(oldAds, proportion, random)
val Seq(aa, ab, bb) = computePls(newAds)
val Seq(alleleA, alleleB) = gt.alleles.toSeq

val calls = {
if (aa == 0 && ab == 0 && bb == 0) IndexedSeq(NoCallAllele, NoCallAllele)
else if (aa < ab && aa < bb) IndexedSeq(alleleA, alleleA)
else if (bb < ab && bb < aa) IndexedSeq(alleleB, alleleB)
else IndexedSeq(alleleA, alleleB)
}
gt.copy(attrs = Map("PL" -> IndexedSeq(aa, ab, bb), "AD" -> newAds, "DP" -> newAds.sum), calls = calls)
}
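One way the reviewer's pseudocode above might be realized (hypothetical; it goes through `attrs` directly and assumes `Genotype` exposes the sample name as `sample`):

// Hypothetical: fail with a clearer message when the AD field is missing for a sample.
val oldAds = gt.attrs.get("AD") match {
  case Some(ad: IndexedSeq[Int @unchecked]) => ad
  case _ => throw new IllegalStateException(s"AD not found for sample ${gt.sample}")
}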

/**
* Compute the genotype likelihoods given the allele depths.
* @param ads The allele depths to generate likelihoods from
* @return a list of three likelihoods
Suggested change:
* Compute the genotype likelihoods given the allele depths, assuming a diploid genotype (i.e. two allele depths)
* @param ads The input depths for the two alleles A and B.
* @return a list of three likelihoods for the alleles AA, AB, BB.

*/
def computePls(ads: IndexedSeq[Int]): IndexedSeq[Int] = {
Review comment: require ads has length two. (A hypothetical one-liner appears after this function, together with a worked example.)

val likelihoods = Likelihoods(ads(0), ads(1))
IndexedSeq(likelihoods.aa.round.toInt, likelihoods.ab.round.toInt, likelihoods.bb.round.toInt)
}
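The length check the reviewer asks for could be a one-line guard, and a worked example makes the output concrete (values derived from the formulas in `Likelihoods.apply` below with the default epsilon of 0.01; treat them as approximate):

// Hypothetical guard per the review comment above:
require(ads.length == 2, s"expected exactly two allele depths, found ${ads.length}")

// Worked example: AD = (10, 2) gives raw GLs of roughly 40.4 (AA), 36.6 (AB) and 200.1 (BB);
// after subtracting the minimum and rounding, the PLs come out to approximately 4, 0 and 163.
val pls = computePls(IndexedSeq(10, 2))  // ≈ IndexedSeq(4, 0, 163)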


object Likelihoods {
/** Computes the likelihoods for each possible genotype.
*
* @param alleleDepthA the reference allele depth
* @param alleleDepthB the alternate allele depth
* @param epsilon the error rate for genotyping
* @return a new `Likelihood` that has the likelihoods of AA, AB, and BB
*/
def apply(alleleDepthA: Int, alleleDepthB: Int, epsilon: Double = 0.01): Likelihoods = {
val aGivenAA = log10(1 - epsilon)
val aGivenBB = log10(epsilon)
val aGivenAB = log10((1 - epsilon) / 2)

val rawGlAA = ((alleleDepthA * aGivenAA) + (alleleDepthB * aGivenBB)) * -10
val rawGlBB = ((alleleDepthA * aGivenBB) + (alleleDepthB * aGivenAA)) * -10
val rawGlAB = ((alleleDepthA + alleleDepthB) * aGivenAB) * -10

val minGL = math.min(math.min(rawGlAA, rawGlAB), rawGlBB)

Likelihoods(
aa = rawGlAA - minGL,
ab = rawGlAB - minGL,
bb = rawGlBB - minGL
)
}
}

/** Stores the log10(likelihoods) for all possible bi-allelic genotypes.
*
* @param aa likelihood of AA
* @param ab likelihood of AB
* @param bb likelihood of BB
*/
case class Likelihoods(aa: Double, ab: Double, bb: Double) {
def pls = IndexedSeq(aa.round.toInt, ab.round.toInt, bb.round.toInt)
Review comment: scaladoc please. (A hypothetical one-liner follows.)
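A hypothetical one-line scaladoc for `pls`, per the request above:

/** The genotype likelihoods rounded to integers, in the order AA, AB, BB (suitable for the VCF PL field). */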

}
}

@clp(group = ClpGroups.VcfOrBcf, description =
"""
|DownsampleVcf takes a vcf file and metadata with sequencing info and
|1. winnows the vcf to remove variants within a specified distance to each other,
|2. downsamples the variants using the provided allele depths and target base count by
| re-computing/downsampling the allele depths for the new target base count
| and re-computing the genotypes based on the new allele depths
|and writes a new downsampled vcf file.
|For single-sample VCFs, the metadata file can be omitted, and instead you can specify originalBases.
Suggested change:
|Re-genotypes a VCF after downsampling the allele counts.
|
|The input VCF must have one or more samples.
|
|If the input VCF contains a single sample, `--proportion` may be given a value from 0 to 1 used to downsample
|the allele depths at each variant site. For multi-sample VCFs, provide the _target number of sequenced bases_ to
|downsample to as well as a metadata file (described below) that supplies the _original number of sequenced bases_.
|The original number of sequenced bases is not the mean coverage, but the total count of bases observed for the
|sample. The metadata file contains a header and the following columns per sample:
|... FIXME
|
|The tool first winnows the VCF to remove variants within a specified distance of each other (if `--window-size`
|is greater than zero). Next, each variant is examined independently, using the per-sample genotypes. The allele
|depths per genotype are randomly downsampled given the proportion, either from `--proportion` for single-sample
|VCFs, or from the _target number of sequenced bases_ divided by the _original number of sequenced bases_. The
|downsampled allele depths are then used to re-compute allele likelihoods and produce a new genotype.
|TODO: describe the VCF outputs. I.e. which FORMAT/INFO tags are produced, and which are retained?

""")
class DownsampleVcf
(@arg(flag = 'i', doc = "The vcf to downsample.") val input: PathToVcf,
@arg(flag = 'm', doc = "Index file with bases per sample.") val metadata: Option[FilePath] = None,
@arg(flag = 'b', doc = "Original number of bases (for single-sample VCF)") val originalBases: Option[Double] = None,
Review comment: I'd add a --proportion argument here instead, then make downsampleToBases optional for single-sample VCFs. Thoughts? (A hypothetical sketch follows.)
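A hypothetical sketch of that suggestion (flag and wording made up; not part of this commit):

@arg(flag = 'p', doc = "Proportion of bases to retain (0, 1]; only valid for single-sample VCFs.") val proportion: Option[Double] = None,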

@arg(flag = 'n', doc = "Target number of bases to downsample to.") val downsampleToBases: Double,
@arg(flag = 'o', doc = "Output file name.") val output: PathToVcf,
@arg(flag = 'w', doc = "Winnowing window size.") val windowSize: Int = 150,
Review comment: What about a default of zero here for end-users not to have a footgun?

@arg(flag = 'e', doc = "Error rate for genotyping.") val epsilon: Double = 0.01,
Suggested change:
@arg(flag = 'e', doc = "Sequencing error rate for genotyping.") val epsilon: Double = 0.01,

@arg(flag = 'c', doc = "True to write out no-calls.") val writeNoCall: Boolean = false)
extends FgBioTool {
Io.assertReadable(input)
Io.assertReadable(metadata)
Review comment: This is optional right? Does this work if it's optional?

Io.assertCanWriteFile(output)
require(downsampleToBases > 0, "target base count must be greater than zero")
require(windowSize >= 0, "window size must be greater than or equal to zero")
require(0 <= epsilon && epsilon <= 1, "epsilon/error rate must be between 0 and 1")
originalBases match {
case Some(x) =>
require(x > 0, "originalBases must be greater than zero")
require(metadata.isEmpty, "Must pass either originalBases (for single-sample VCF) or metadata, not both")
case None =>
require(metadata.isDefined, "Must pass either originalBases (for single-sample VCF) or metadata, not both")
}

override def execute(): Unit = {
val vcf = VcfSource(input)
val progress = ProgressLogger(logger, noun = "variants")
val proportions = (
originalBases match {
case Some(x) =>
require(vcf.header.samples.length == 1, "--original-bases requires a single-sample VCF")
LazyList(vcf.header.samples.head -> math.min(downsampleToBases / x, 1.0))
case _ =>
Sample.read(metadata.getOrElse(throw new RuntimeException))
.filter(s => vcf.header.samples.contains(s.SAMPLE_NAME))
.map(sample => sample.SAMPLE_NAME -> math.min(downsampleToBases / sample.BASE_COUNT.toDouble, 1.0))
}
).toMap
proportions.foreach { case (s, p) => logger.info(f"Downsampling $s with proportion ${p}%.4f") }

val winnowed = if (windowSize > 0) winnowVariants(vcf.iterator, windowSize = windowSize, dict = vcf.header.dict) else vcf.iterator
val outputVcf = VcfWriter(path = output, header = buildOutputHeader(vcf.header))

val random = new Random(42)
Review comment: can you add the seed to the arg-list and have a default of 42? (A hypothetical sketch follows.)
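A hypothetical sketch of such an argument (flag made up; not part of this commit), which would then be passed as `new Random(seed)` here:

@arg(flag = 's', doc = "Seed for the random number generator.") val seed: Int = 42,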

winnowed.foreach { v =>
val ds = downsampleAndRegenotype(v, proportions = proportions, random = random, epsilon = epsilon)
Review comment: no spaces between equals, to be consistent with the rest of the codebase.

if (writeNoCall) {
outputVcf += ds
progress.record(ds)
}
else if (!ds.gts.forall(g => g.isNoCall)) {
outputVcf += ds
progress.record(ds)
}
}
Suggested change:
if (writeNoCall || !ds.gts.forall(g => g.isNoCall)) {
outputVcf += ds
progress.record(ds)
}
}


progress.logLast()
Review comment: probably nice to log how many variants were read, how many remained after winnowing, and how many were written. Consider three progress loggers?

vcf.safelyClose()
outputVcf.close()
}

def buildOutputHeader(in: VcfHeader): VcfHeader = {
Review comment: private?

val fmts = Seq.newBuilder[VcfFormatHeader]
fmts ++= in.formats

if (!in.format.contains("AD")) {
fmts += VcfFormatHeader(id="AD", count=VcfCount.OnePerAllele, kind=VcfFieldType.Integer, description="Per allele depths.")

Codecov / codecov/patch warning: added line #L237 was not covered by tests.
}

if (!in.format.contains("DP")) {
fmts += VcfFormatHeader(id="DP", count=VcfCount.Fixed(1), kind=VcfFieldType.Integer, description="Total depth across alleles.")
}

if (!in.format.contains("PL")) {
fmts += VcfFormatHeader(id="PL", count=VcfCount.OnePerGenotype, kind=VcfFieldType.Integer, description="Per genotype phred scaled likelihoods.")

Codecov / codecov/patch warning: added line #L245 was not covered by tests.
}

in.copy(formats = fmts.result())
}
}
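For illustration, a hypothetical programmatic invocation in the style fgbio tools are typically exercised in tests (the paths and target base count are made up):

import java.nio.file.Paths
new DownsampleVcf(
  input             = Paths.get("in.vcf.gz"),          // hypothetical input VCF
  metadata          = Some(Paths.get("metadata.tsv")), // hypothetical 1KG-style metadata file
  downsampleToBases = 3e9,
  output            = Paths.get("out.vcf.gz")
).execute()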

object Sample {
/** Load a set of samples from the 1KG metadata file. */
def read(path: FilePath): Seq[Sample] = {
val lines = Io.readLines(path).dropWhile(_.startsWith("##")).map(line => line.dropWhile(_ == '#'))
Review comment: This behavior is not described in the public API. Consider making this object and function private.

Metric.read[Sample](lines=lines)
}
}
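For illustration, a hypothetical sketch of how these records feed the per-sample proportions (it mirrors the logic in `execute()` above; the path and the target base count are made up):

import java.nio.file.Paths
val samples     = Sample.read(Paths.get("1kg.sequence.index"))  // hypothetical metadata path
val proportions = samples.map(s => s.SAMPLE_NAME -> math.min(3e9 / s.BASE_COUNT.toDouble, 1.0)).toMap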

case class Sample(ENA_FILE_PATH: String = ".",
MD5SUM: String = ".",
RUN_ID: String = ".",
STUDY_ID: String = ".",
STUDY_NAME: String = ".",
CENTER_NAME: String = ".",
SUBMISSION_ID: String = ".",
SUBMISSION_DATE: String = ".",
SAMPLE_ID: String = ".",
SAMPLE_NAME: String,
POPULATION: String = ".",
EXPERIMENT_ID: String = ".",
INSTRUMENT_PLATFORM: String = ".",
INSTRUMENT_MODEL: String = ".",
LIBRARY_NAME: String = ".",
RUN_NAME: String = ".",
INSERT_SIZE: String = ".",
LIBRARY_LAYOUT: String = ".",
PAIRED_FASTQ: String = ".",
READ_COUNT: String = ".",
BASE_COUNT: Long,
ANALYSIS_GROUP: String = ".") extends Metric