Skip to content

Commit

Permalink
Refactor GVCFWriter to allow push/pull iteration.
Browse files Browse the repository at this point in the history
  • Loading branch information
lbergelson authored and tomwhite committed Nov 2, 2018
1 parent 39206f8 commit 1489950
Show file tree
Hide file tree
Showing 10 changed files with 469 additions and 255 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package org.broadinstitute.hellbender.utils.downsampling;

import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.iterators.PushPullTransformer;

import java.util.Collection;
import java.util.List;

/**
* The basic downsampler API, with no reads-specific operations.
*/
public abstract class Downsampler<T> {
public abstract class Downsampler<T> implements PushPullTransformer<T> {

/**
* Number of items discarded by this downsampler since the last call to resetStats()
Expand All @@ -22,6 +23,7 @@ public abstract class Downsampler<T> {
*
* @param item the individual item to submit to the downsampler for consideration
*/
@Override
public abstract void submit( final T item );

/**
Expand All @@ -30,7 +32,8 @@ public abstract class Downsampler<T> {
*
* @param items the collection of items to submit to the downsampler for consideration
*/
public void submit( final Collection<T> items ) {
@Override
public void submit(final Collection<T> items) {
Utils.nonNull(items, "submitted items must not be null");

for ( final T item : items ) {
Expand All @@ -43,13 +46,15 @@ public void submit( final Collection<T> items ) {
*
* @return true if this downsampler has > 0 finalized items, otherwise false
*/
@Override
public abstract boolean hasFinalizedItems();

/**
* Return (and *remove*) all items that have survived downsampling and are waiting to be retrieved.
*
* @return a list of all finalized items this downsampler contains, or an empty list if there are none
*/
@Override
public abstract List<T> consumeFinalizedItems();

/**
Expand Down Expand Up @@ -112,6 +117,7 @@ protected void incrementNumberOfDiscardedItems( final int newlyDiscardedItems )
* Used to tell the downsampler that no more items will be submitted to it, and that it should
* finalize any pending items.
*/
@Override
public abstract void signalEndOfInput();

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,91 +1,23 @@
package org.broadinstitute.hellbender.utils.downsampling;

import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.iterators.PushToPullIterator;
import org.broadinstitute.hellbender.utils.read.GATKRead;

import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* Iterator wrapper around our generic {@link ReadsDownsampler)} interface. Wraps an iterator of reads,
* and downsamples the reads from that iterator using the provided downsampler.
*
* Converts the push-style {@link ReadsDownsampler)} interface to a pull model.
*/
public final class ReadsDownsamplingIterator implements Iterator<GATKRead>, Iterable<GATKRead> {

private final Iterator<GATKRead> nestedReadIterator;
private final ReadsDownsampler downsampler;
private Iterator<GATKRead> cachedDownsampledReads = null;
private GATKRead nextRead = null;
public final class ReadsDownsamplingIterator extends PushToPullIterator<GATKRead> {

/**
* @param iter wrapped iterator from which this iterator will pull reads to be downsampled
* @param iter wrapped iterator from which this iterator will pull reads to be downsampled
* @param downsampler downsampler through which the reads from the wrapped iterator will be fed
*/
public ReadsDownsamplingIterator( Iterator<GATKRead> iter, ReadsDownsampler downsampler ) {
Utils.nonNull(iter, "iterator must not be null");
Utils.nonNull(downsampler, "downsampler must not be null");

this.nestedReadIterator = iter;
this.downsampler = downsampler;

advanceToNextRead();
}

@Override
public boolean hasNext() {
return nextRead != null;
}

@Override
public GATKRead next() {
if ( nextRead == null ) {
throw new NoSuchElementException("next() called when there are no more items");
}

final GATKRead toReturn = nextRead;
advanceToNextRead();

return toReturn;
}

private void advanceToNextRead() {
if ( readyToReleaseReads() || fillDownsampledReadsCache() ) {
nextRead = cachedDownsampledReads.next();
}
else {
nextRead = null;
}
}

private boolean readyToReleaseReads() {
return cachedDownsampledReads != null && cachedDownsampledReads.hasNext();
}

private boolean fillDownsampledReadsCache() {
while ( nestedReadIterator.hasNext() && ! downsampler.hasFinalizedItems() ) {
downsampler.submit(nestedReadIterator.next());
}

if ( ! nestedReadIterator.hasNext() ) {
downsampler.signalEndOfInput();
}

final Collection<GATKRead> downsampledReads = downsampler.consumeFinalizedItems();
cachedDownsampledReads = downsampledReads.iterator();

return cachedDownsampledReads.hasNext();
}

@Override
public void remove() {
throw new UnsupportedOperationException("Cannot remove records via a ReadsDownsamplingIterator");
}

@Override
public Iterator<GATKRead> iterator() {
return this;
public ReadsDownsamplingIterator(Iterator<GATKRead> iter, ReadsDownsampler downsampler) {
super(iter, downsampler);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.broadinstitute.hellbender.utils.iterators;

import org.broadinstitute.hellbender.utils.Utils;

import java.util.Collection;
import java.util.List;

/**
* A class that receives a stream of elements and transforms or filters them in some way, such as by downsampling with
* a {@link org.broadinstitute.hellbender.utils.downsampling.Downsampler}. Elements are submitted in a push-style model,
* in contrast to Java's pull-style {@link java.util.Iterator}. A transformer may be used to transform an iterator of
* elements using {@link PushToPullIterator}.
*
* @param <T> type of items to be submitted
* @see PushToPullIterator
* @see org.broadinstitute.hellbender.utils.downsampling.Downsampler
*/
public interface PushPullTransformer<T> {
/**
* Submit one item to the transformer for consideration. Some transformers will be able to determine
* immediately whether the item survives the transformation process, while others will need to see
* more items before making that determination.
*
* @param item the individual item to submit to the transformer for consideration
*/
void submit(T item);

/**
* Are there items that have survived the transformation process waiting to be retrieved?
*
* @return true if this transformer has > 0 finalized items, otherwise false
*/
boolean hasFinalizedItems();

/**
* Return (and *remove*) all items that have survived transformation and are waiting to be retrieved.
*
* @return a list of all finalized items this transformer contains, or an empty list if there are none
*/
List<T> consumeFinalizedItems();

/**
* Used to tell the transformer that no more items will be submitted to it, and that it should
* finalize any pending items.
*/
void signalEndOfInput();

default void submit(final Collection<T> items) {
Utils.nonNull(items, "submitted items must not be null");
items.forEach(this::submit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.broadinstitute.hellbender.utils.iterators;

import org.broadinstitute.hellbender.utils.Utils;

import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* Iterator wrapper around our generic {@link PushPullTransformer)} interface. Wraps an iterator of input elements,
* and transforms the reads from that iterator using the provided transformer.
*
* Converts the push-style {@link PushPullTransformer)} interface to a pull model.
*
* @param <T> type of items to be iterated over
*/
public class PushToPullIterator<T> implements Iterator<T>, Iterable<T> {

private final Iterator<T> inputElements;
private final PushPullTransformer<T> transformer;
private Iterator<T> cachedElements = null;
private T nextElement = null;

/**
* @param inputElements wrapped iterator from which this iterator will pull elements
* @param transformer transformer through which the reads from the wrapped iterator will be fed
*/
public PushToPullIterator(Iterator<T> inputElements, PushPullTransformer<T> transformer ) {
Utils.nonNull(inputElements, "iterator must not be null");
Utils.nonNull(transformer, "transformer must not be null");

this.inputElements = inputElements;
this.transformer = transformer;

advanceToNextElement();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public T next() {
if ( nextElement == null ) {
throw new NoSuchElementException("next() called when there are no more items");
}

final T toReturn = nextElement;
advanceToNextElement();

return toReturn;
}

private void advanceToNextElement() {
if ( readyToReleaseReads() || fillCache() ) {
nextElement = cachedElements.next();
}
else {
nextElement = null;
}
}

private boolean readyToReleaseReads() {
return cachedElements != null && cachedElements.hasNext();
}

private boolean fillCache() {
while ( inputElements.hasNext() && ! transformer.hasFinalizedItems() ) {
transformer.submit(inputElements.next());
}

if ( ! inputElements.hasNext() ) {
transformer.signalEndOfInput();
}

final Collection<T> transformedElements = transformer.consumeFinalizedItems();
cachedElements = transformedElements.iterator();

return cachedElements.hasNext();
}

@Override
public void remove() {
throw new UnsupportedOperationException("Cannot remove records via a Push");
}

@Override
public Iterator<T> iterator() {
return this;
}
}
Loading

0 comments on commit 1489950

Please sign in to comment.