Commit

Exposed maximum chunk size in CNV panel of normals. (broadinstitute#4528)
samuelklee authored and cwhelan committed May 25, 2018
1 parent 5834b89 commit b0ad5a0
Showing 5 changed files with 55 additions and 26 deletions.
4 changes: 4 additions & 0 deletions scripts/cnv_wdl/somatic/cnv_somatic_panel_workflow.wdl
@@ -68,6 +68,7 @@ workflow CNVSomaticPanelWorkflow {
Boolean? do_impute_zeros
Float? extreme_outlier_truncation_percentile
Int? number_of_eigensamples
Int? maximum_chunk_size
Int? mem_gb_for_create_read_count_pon

Array[Pair[String, String]] normal_bams_and_bais = zip(normal_bams, normal_bais)
@@ -128,6 +129,7 @@
do_impute_zeros = do_impute_zeros,
extreme_outlier_truncation_percentile = extreme_outlier_truncation_percentile,
number_of_eigensamples = number_of_eigensamples,
maximum_chunk_size = maximum_chunk_size,
annotated_intervals = AnnotateIntervals.annotated_intervals,
gatk4_jar_override = gatk4_jar_override,
gatk_docker = gatk_docker,
@@ -153,6 +155,7 @@ task CreateReadCountPanelOfNormals {
Boolean? do_impute_zeros
Float? extreme_outlier_truncation_percentile
Int? number_of_eigensamples
Int? maximum_chunk_size
File? annotated_intervals #do not perform explicit GC correction by default
File? gatk4_jar_override

@@ -180,6 +183,7 @@ task CreateReadCountPanelOfNormals {
--do-impute-zeros ${default="true" do_impute_zeros} \
--extreme-outlier-truncation-percentile ${default="0.1" extreme_outlier_truncation_percentile} \
--number-of-eigensamples ${default="20" number_of_eigensamples} \
--maximum-chunk-size ${default="16777216" maximum_chunk_size} \
${"--annotated-intervals " + annotated_intervals} \
--output ${pon_entity_id}.pon.hdf5
>>>

CreateReadCountPanelOfNormals.java
@@ -5,6 +5,7 @@
import org.apache.commons.math3.linear.RealMatrix;
import org.apache.logging.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.broadinstitute.barclay.argparser.Advanced;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.BetaFeature;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
@@ -20,6 +21,7 @@
import org.broadinstitute.hellbender.tools.copynumber.denoising.HDF5SVDReadCountPanelOfNormals;
import org.broadinstitute.hellbender.tools.copynumber.formats.collections.AnnotatedIntervalCollection;
import org.broadinstitute.hellbender.tools.copynumber.formats.collections.SimpleCountCollection;
import org.broadinstitute.hellbender.tools.copynumber.utils.HDF5Utils;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
@@ -126,6 +128,7 @@ public final class CreateReadCountPanelOfNormals extends SparkCommandLineProgram
public static final String EXTREME_SAMPLE_MEDIAN_PERCENTILE_LONG_NAME = "extreme-sample-median-percentile";
public static final String IMPUTE_ZEROS_LONG_NAME = "do-impute-zeros";
public static final String EXTREME_OUTLIER_TRUNCATION_PERCENTILE_LONG_NAME = "extreme-outlier-truncation-percentile";
public static final String MAXIMUM_CHUNK_SIZE = "maximum-chunk-size";

//default values for filtering
private static final double DEFAULT_MINIMUM_INTERVAL_MEDIAN_PERCENTILE = 10.0;
@@ -136,6 +139,8 @@ public final class CreateReadCountPanelOfNormals extends SparkCommandLineProgram
private static final double DEFAULT_EXTREME_OUTLIER_TRUNCATION_PERCENTILE = 0.1;

private static final int DEFAULT_NUMBER_OF_EIGENSAMPLES = 20;
private static final int DEFAULT_CHUNK_DIVISOR = 16;
private static final int DEFAULT_MAXIMUM_CHUNK_SIZE = HDF5Utils.MAX_NUMBER_OF_VALUES_PER_HDF5_MATRIX / DEFAULT_CHUNK_DIVISOR;

@Argument(
doc = "Input TSV or HDF5 files containing integer read counts in genomic intervals for all samples in the panel of normals (output of CollectReadCounts). " +
@@ -232,6 +237,20 @@ public final class CreateReadCountPanelOfNormals extends SparkCommandLineProgram
)
private int numEigensamplesRequested = DEFAULT_NUMBER_OF_EIGENSAMPLES;

@Advanced
@Argument(
doc = "Maximum HDF5 matrix chunk size. Large matrices written to HDF5 are chunked into equally sized " +
"subsets of rows (plus a subset containing the remainder, if necessary) to avoid a hard limit in " +
"Java HDF5 on the number of elements in a matrix. However, since a single row is not allowed to " +
"be split across multiple chunks, the number of columns must be less than the maximum number of " +
"values in each chunk. Decreasing this number will reduce heap usage when writing chunks.",
fullName = MAXIMUM_CHUNK_SIZE,
minValue = 1,
maxValue = HDF5Utils.MAX_NUMBER_OF_VALUES_PER_HDF5_MATRIX,
optional = true
)
private int maximumChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;

@Override
protected void runPipeline(final JavaSparkContext ctx) {
if (!new HDF5Library().load(null)) { //Note: passing null means using the default temp dir.
@@ -252,6 +271,9 @@ protected void runPipeline(final JavaSparkContext ctx) {
final SimpleCountCollection firstReadCounts = SimpleCountCollection.read(firstReadCountFile);
final SAMSequenceDictionary sequenceDictionary = firstReadCounts.getMetadata().getSequenceDictionary();
final List<SimpleInterval> intervals = firstReadCounts.getIntervals();
Utils.validateArg(firstReadCounts.size() <= maximumChunkSize,
String.format("The number of intervals (%d) in each read-counts file cannot exceed the maximum chunk size (%d).",
firstReadCounts.size(), maximumChunkSize));

//get GC content (null if not provided)
final AnnotatedIntervalCollection annotatedIntervals = CopyNumberArgumentValidationUtils.validateAnnotatedIntervals(
@@ -269,7 +291,8 @@ protected void runPipeline(final JavaSparkContext ctx) {
HDF5SVDReadCountPanelOfNormals.create(outputPanelOfNormalsFile, getCommandLine(),
sequenceDictionary, readCountMatrix, sampleFilenames, intervals, intervalGCContent,
minimumIntervalMedianPercentile, maximumZerosInSamplePercentage, maximumZerosInIntervalPercentage,
extremeSampleMedianPercentile, doImputeZeros, extremeOutlierTruncationPercentile, numEigensamplesRequested, ctx);
extremeSampleMedianPercentile, doImputeZeros, extremeOutlierTruncationPercentile, numEigensamplesRequested,
maximumChunkSize, ctx);

logger.info("Panel of normals successfully created.");
}
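
For a sense of how the new --maximum-chunk-size argument constrains inputs, the following standalone sketch (not part of this commit; the class name, example counts, and print statements are purely illustrative) walks through the two facts stated in the argument documentation above: a row is never split across chunks, so the number of intervals (the columns of the samples-by-intervals read-count matrix) must not exceed the chunk size, and each chunk is buffered as doubles before writing, so per-chunk heap usage scales with the chunk size.

// Illustrative sketch only; not GATK code. Assumes a samples-by-intervals read-count
// matrix and the WDL default chunk size of 16777216 shown earlier in this diff.
public final class MaximumChunkSizeSketch {
    public static void main(final String[] args) {
        final long numSamples = 40L;           // rows
        final long numIntervals = 2_000_000L;  // columns, one per genomic interval
        final long maximumChunkSize = 16_777_216L;

        // A single row may not be split across chunks, so every column must fit in one chunk.
        if (numIntervals > maximumChunkSize) {
            throw new IllegalArgumentException("Number of intervals exceeds the maximum chunk size.");
        }

        // Each chunk is copied into a temporary array of doubles (8 bytes per value)
        // before it is written, so the transient heap cost per chunk is bounded by the chunk size.
        final long valuesPerChunk = Math.min(numSamples * numIntervals, maximumChunkSize);
        System.out.printf("Approximate transient heap per chunk: %.1f MB%n",
                8.0 * valuesPerChunk / (1024.0 * 1024.0));
    }
}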

HDF5SVDReadCountPanelOfNormals.java
@@ -21,6 +21,7 @@
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.utils.param.ParamUtils;
import org.broadinstitute.hellbender.utils.spark.SparkConverter;

import java.io.File;
@@ -237,6 +238,7 @@ public static void create(final File outFile,
final boolean doImputeZeros,
final double extremeOutlierTruncationPercentile,
final int numEigensamplesRequested,
final int maximumChunkSize,
final JavaSparkContext ctx) {
try (final HDF5File file = new HDF5File(outFile, HDF5File.OpenMode.CREATE)) {
logger.info("Creating " + outFile.getAbsolutePath() + "...");
@@ -253,7 +255,7 @@

logger.info(String.format("Writing original read counts (%d x %d)...",
originalReadCounts.getColumnDimension(), originalReadCounts.getRowDimension()));
pon.writeOriginalReadCountsPath(originalReadCounts);
pon.writeOriginalReadCountsPath(originalReadCounts, maximumChunkSize);

logger.info(String.format("Writing original sample filenames (%d)...", originalSampleFilenames.size()));
pon.writeOriginalSampleFilenames(originalSampleFilenames);
@@ -332,7 +334,7 @@ public static void create(final File outFile,
pon.writeSingularValues(singularValues);

logger.info(String.format("Writing eigensample vectors (transposed to %d x %d)...", eigensampleVectors[0].length, eigensampleVectors.length));
pon.writeEigensampleVectors(eigensampleVectors);
pon.writeEigensampleVectors(eigensampleVectors, maximumChunkSize);
} catch (final RuntimeException exception) {
//if any exceptions encountered, delete partial output and rethrow
logger.warn(String.format("Exception encountered during creation of panel of normals (%s). Attempting to delete partial output in %s...",
@@ -361,8 +363,9 @@ private void writeSequenceDictionary(final SAMSequenceDictionary sequenceDiction
file.makeStringArray(SEQUENCE_DICTIONARY_PATH, stringWriter.toString());
}

private void writeOriginalReadCountsPath(final RealMatrix originalReadCounts) {
HDF5Utils.writeChunkedDoubleMatrix(file, ORIGINAL_READ_COUNTS_PATH, originalReadCounts.getData(), CHUNK_DIVISOR);
private void writeOriginalReadCountsPath(final RealMatrix originalReadCounts,
final int maximumChunkSize) {
HDF5Utils.writeChunkedDoubleMatrix(file, ORIGINAL_READ_COUNTS_PATH, originalReadCounts.getData(), maximumChunkSize);
}

private void writeOriginalSampleFilenames(final List<String> originalSampleFilenames) {
@@ -393,9 +396,9 @@ private void writeSingularValues(final double[] singularValues) {
file.makeDoubleArray(PANEL_SINGULAR_VALUES_PATH, singularValues);
}

private void writeEigensampleVectors(final double[][] eigensampleVectors) {
private void writeEigensampleVectors(final double[][] eigensampleVectors,
final int maximumChunkSize) {
HDF5Utils.writeChunkedDoubleMatrix(file, PANEL_EIGENSAMPLE_VECTORS_PATH,
new Array2DRowRealMatrix(eigensampleVectors, false).transpose().getData(),
CHUNK_DIVISOR);
new Array2DRowRealMatrix(eigensampleVectors, false).transpose().getData(), maximumChunkSize);
}
}

HDF5Utils.java
@@ -7,6 +7,7 @@
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.utils.param.ParamUtils;

import java.util.LinkedHashMap;
import java.util.List;
@@ -134,41 +135,38 @@ public static double[][] readChunkedDoubleMatrix(final HDF5File file,
* Given a large matrix, chunks the matrix into equally sized subsets of rows
* (plus a subset containing the remainder, if necessary) and writes these submatrices to indexed sub-paths
* to avoid a hard limit in Java HDF5 on the number of elements in a matrix given by
* {@code MAX_NUM_VALUES_PER_HDF5_MATRIX}. The number of chunks is determined by {@code chunkDivisor},
* {@code MAX_NUM_VALUES_PER_HDF5_MATRIX}. The number of chunks is determined by {@code maxChunkSize},
* which should be set appropriately for the desired number of columns.
*
* @param chunkDivisor The maximum number of values in each chunk
* is given by {@code MAX_NUM_VALUES_PER_HDF5_MATRIX} / {@code chunkDivisor},
* so increasing this number will reduce heap usage when writing chunks,
* which requires subarrays to be copied. However, since a single row is not allowed
* to be split across multiple chunks, the number of columns must be less
* than the maximum number of values in each chunk. For example,
* {@code chunkDivisor} = 16 allows for 16777215 columns.
* @param maxChunkSize The maximum number of values in each chunk. Decreasing this number will reduce
* heap usage when writing chunks, which requires subarrays to be copied.
* However, since a single row is not allowed to be split across multiple chunks,
* the number of columns must be less than the maximum number of values in each chunk.
*/
public static void writeChunkedDoubleMatrix(final HDF5File file,
final String path,
final double[][] matrix,
final int chunkDivisor) {
final int maxChunkSize) {
Utils.nonNull(file);
IOUtils.canReadFile(file.getFile());
Utils.nonNull(path);
Utils.nonNull(matrix);
Utils.validateArg(chunkDivisor > 0, "Chunk divisor must be positive.");
final int maxNumValuesPerChunk = MAX_NUMBER_OF_VALUES_PER_HDF5_MATRIX / chunkDivisor;
ParamUtils.inRange(maxChunkSize, 1 , MAX_NUMBER_OF_VALUES_PER_HDF5_MATRIX,
String.format("Maximum chunk size must be in [1, %d].", MAX_NUMBER_OF_VALUES_PER_HDF5_MATRIX));
final long numRows = matrix.length;
Utils.validateArg(numRows > 0, "Matrix must contain at least one row.");
final long numColumns = matrix[0].length;
Utils.validateArg(numColumns > 0, "Matrix must contain at least one column.");
Utils.validateArg(numColumns <= maxNumValuesPerChunk,
Utils.validateArg(numColumns <= maxChunkSize,
String.format("Number of columns (%d) exceeds the maximum number of values allowed per chunk (%d).",
numColumns, maxNumValuesPerChunk));
numColumns, maxChunkSize));

final int numRowsPerFilledChunk = (int) (maxNumValuesPerChunk / numColumns);
final int numRowsPerFilledChunk = (int) (maxChunkSize / numColumns);
final int numFilledChunks = numRowsPerFilledChunk == 0 ? 0 : (int) numRows / numRowsPerFilledChunk;
final boolean needPartialChunk = numFilledChunks == 0 || numRows % numRowsPerFilledChunk != 0;

logger.debug("Number of values in matrix / maximum number allowed for HDF5 matrix: " + (double) numRows * numColumns / MAX_NUMBER_OF_VALUES_PER_HDF5_MATRIX);
logger.debug("Maximum number of values per chunk: " + maxNumValuesPerChunk);
logger.debug("Maximum number of values per chunk: " + maxChunkSize);
logger.debug("Number of filled chunks: " + numFilledChunks);
logger.debug("Number of rows per filled chunk: " + numRowsPerFilledChunk);
logger.debug("Partial chunk needed: " + needPartialChunk);

HDF5UtilsUnitTest.java
@@ -19,13 +19,14 @@
public final class HDF5UtilsUnitTest extends GATKBaseTest {
private static final double DOUBLE_MATRIX_TOLERANCE = 1E-4;
private static final int CHUNK_DIVISOR = 256;
private static final int MAX_CHUNK_SIZE = HDF5Utils.MAX_NUMBER_OF_VALUES_PER_HDF5_MATRIX / CHUNK_DIVISOR;

@DataProvider(name = "testCreateLargeMatrixData")
public Object[][] dataProvider() {
return new Object[][] {
new Object[] {100, 100000},
new Object[] {CHUNK_DIVISOR + 16, (Integer.MAX_VALUE / Byte.SIZE) / CHUNK_DIVISOR},
new Object[] {(Integer.MAX_VALUE / Byte.SIZE) / CHUNK_DIVISOR, CHUNK_DIVISOR + 16}
new Object[] {CHUNK_DIVISOR + 16, MAX_CHUNK_SIZE},
new Object[] {MAX_CHUNK_SIZE, CHUNK_DIVISOR + 16}
};
}

@@ -40,7 +41,7 @@ public void testCreateLargeMatrix(final int numRows,
final RealMatrix largeMatrix = createMatrixOfGaussianValues(numRows, numColumns, mean, sigma);
final File tempOutputHD5 = IOUtils.createTempFile("large-matrix-", ".hd5");
try (final HDF5File hdf5File = new HDF5File(tempOutputHD5, HDF5File.OpenMode.CREATE)) {
HDF5Utils.writeChunkedDoubleMatrix(hdf5File, matrixPath, largeMatrix.getData(), CHUNK_DIVISOR);
HDF5Utils.writeChunkedDoubleMatrix(hdf5File, matrixPath, largeMatrix.getData(), MAX_CHUNK_SIZE);
}

try (final HDF5File hdf5FileForReading = new HDF5File(tempOutputHD5, HDF5File.OpenMode.READ_ONLY)) {
