From 73586246ff1e5bc16e41fc0b04de396c13b59115 Mon Sep 17 00:00:00 2001
From: Trevor Martin <60452953+trevormartinj7@users.noreply.github.com>
Date: Mon, 26 Aug 2024 20:41:21 -0600
Subject: [PATCH] Read Alignment Parallelization (#98) (#480)

* Initial parallelization work; fleshed out process function, added tracking for manager dictionary
* Lots of debugging of the process function; parallelization achieved
* Improved boundary function; fixed boundary error
* Removed prints and old code
* Failed cache generator function; created single-thread seq_cache generator
* Initial functioning parallelization
* Added data to variantCache and N_ constants; replaced old aln stats; fixed return values
* Changed output file; added more tracking of timings
* Optimized stat tracking considerably; removed imports and unnecessary checks; more logging
* Unbalanced the processes to allow for better locking interactions
* Added amplicon name to plotting; fixed stat tracking
* Pinned numpy; pinned versions of numpy and matplotlib in CI environment (#84)
* D3-Enhancements (#78), which merged:
  * Sam/try plots (#71) and Sam/fix plots (#72): fixed Batch mode pandas FutureWarnings (#70) by calling methods on the DataFrame rather than a Series; fixed the pandas future warning in CRISPRessoWGS; try/catch all futures; fixed test-fail plots; fixed d3 not showing and plotly mixing with matplotlib; used logger for warnings and debug statements
  * Cole/fix status file name (#69): clearer config file logging messages (no duplicate exception printout, a message when no config file is provided, and `json` renamed to `config`); fixed divide-by-zero when no amplicons are present in Batch mode; don't append file_prefix to the status file name; place status files in output directories
  * Loaded D3 and plotly figures with pro with multiple amplicons; updated Batch
  * Fixed a CRISPRessoCompare bug pointing to report data with file_prefix (previously the second compared run was not displayed as a data in the first figure of the report)
  * Imported CRISPRessoPro instead of importing the version (when installed via conda, the version is not available)
  * Removed the unused `get_amplicon_output` function and argparse import from CRISPRessoCompare
  * Implemented `get_matching_allele_files` in CRISPRessoCompare with accompanying unit tests; allowed matching of multiple guides in the same amplicon
  * Removed token from integration tests file
  * Provided sgRNA_sequences to plot_nucleotide_quilt plots
  * Refactored the check for determining when to use CRISPRessoPro or matplotlib for Batch plots; fixed wrong check for large Batch plots; added max-height to Batch report samples
  * Fixed typo and moved flexiguide output to debug level (#77); fixed typo in fastp merged output file name
  * Added id tags for d3 script enhancements
  * Added amplicon_name parameter to allele heatmap and line plots
  * Added function to extract quantification window regions from include_idxs; scaled the quantification window according to the coordinates of the sgRNA plot; corrected the quantification window indexes for multiple guides
  * Added c2pro check; added space in args.json
  * Fixed name of nucleotide conversion plot when guides are not the same
  * Fixed jinja variables that aren't found
  * Fixed multiple-guide errors where the wrong sgRNA sequence was associated in the d3 plot
* Reverted to old flow if one process
* Enriched aln_stats with aligned read_length; fixed declaration bug
* Fixed stats tracking for irregular reads by multiplying variant count
* Added function explanations, cleaned up code, optimized lock updating (later reverted in commits e7cfe3d239f6fdfedafa26913363ddcc98c81721 and 96deee25b1af7e29c40ac360295c30509ede83ae)
* Replaced test branch, reverted to working version, added .update() inside lock
* Checked old cython branch; reverted breaking changes; unreverted non-breaking changes; timing log change
* Memory tracking; replaced seq_cache with variantCache for improved memory
* Removed one extra loop to improve processing time
* Optimized info, removed old functions and print statements
* Removed empty-string handling; reworded comments; removed unnecessary files
* Literally minding my p's and q's
* Reset .c files
* Edited function comments, renamed managerCache, removed debug print statement
* Created a temp_variant file for memory optimization; read and write to tsv to save on memory; merged in memory optimization branch
* Updated pytests for equal boundary changes
* Refactored write-out fastq, improved tests
* Replaced zcat with `gunzip -c` in `get_most_frequent_reads` (#94)
* Limited plotting processes
* Fix CRISPRessoAggregate bug and other improvements (#95) (#470), which merged:
  * Added amplicon_name, sgRNA sequences, and custom_colors to Aggregate plot functions and nucleotide quilt parameters
  * Updated Aggregate and make_aggregate_report to have logger and tool; wrote command_used to the Aggregate .json info file
  * Made the order of Aggregate runs explicit; sorted all instances of crispresso2_folder_info and the df_summary_quantification df in Aggregate
* Added BAM input, BAM output, BAM genome output, and Aggregate test cases to Github Actions integration tests; added "Run" to step names
* Parallelized the process_single_fastq_write_bam_out and process_bam functions; fixed write-out bug; functional bam input parallelized function
* Cleaned up code, removed old versions of functions
* Removed duplicate and commented-out copies of c2_tests
* Updated file access to close on error, removed unnecessary bam appendings
* Add `percent_complete` to info statements (#102): added percent_complete to subprocess alignment and more info blocks; removed extraneous spaces
* Display percentages in the CLI output (#88) (#473)
* No pool (#79) (#474): don't use a thread pool with 1 process; restrict creation of the multiprocessing pool to when n_processes > 1; fixed logger issue; passed div id for plotly; removed extra print statements; fixed variable name error
* Passing sgRNA sequences to regular and Batch D3 plots (#73)
* Push new releases to ECR (#74): created and iterated on aws_ecr.yml; used us-east-1
* Updated functions with newly merged progress percent from Cole
* Minor code cleanup, removal of comments and print statements
* Pointed test branch back at CRISPResso2_tests master branch
* Added checking for failed processes causing not all reads to be counted
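The heart of the change is a two-pass design: the fastq (or bam) is first scanned only to count copies of each unique sequence, and the expensive alignment is then computed once per unique sequence. A minimal sketch of the counting pass, simplified from the patch below (the helper name is illustrative, not part of the patch):

    import gzip

    def count_unique_reads(fastq_path):
        """First pass: map each unique read sequence to its copy count."""
        opener = (lambda p: gzip.open(p, 'rt')) if fastq_path.endswith('.gz') else open
        counts = {}
        with opener(fastq_path) as fh:
            while fh.readline():              # @read-id line; empty string at EOF
                seq = fh.readline().strip()   # sequence line
                fh.readline()                 # '+' separator line
                fh.readline()                 # quality line
                counts[seq] = counts.get(seq, 0) + 1
        return counts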
---------

Co-authored-by: mbowcut2 <55161542+mbowcut2@users.noreply.github.com>
Co-authored-by: Cole Lyman
Co-authored-by: Samuel Nichols
---
 .github/workflows/integration_tests.yml |  18 +-
 CRISPResso2/CRISPRessoCORE.py           | 937 ++++++++++++++----------
 tests/unit_tests/test_CRISPRessoCORE.py |  74 ++
 3 files changed, 629 insertions(+), 400 deletions(-)

diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml
index ff91670b..64f8f516 100644
--- a/.github/workflows/integration_tests.yml
+++ b/.github/workflows/integration_tests.yml
@@ -41,8 +41,7 @@ jobs:
           CACHE_NUMBER: 0
         with:
           path: /usr/share/miniconda/envs/test_env
-          key:
-            conda-${{ runner.os }}--${{ runner.arch }}--${{ steps.get-date.outputs.today }}-${{ env.CACHE_NUMBER }}-${{ hashFiles('.github/envs/test_env.yml') }}
+          key: conda-${{ runner.os }}--${{ runner.arch }}--${{ steps.get-date.outputs.today }}-${{ env.CACHE_NUMBER }}-${{ hashFiles('.github/envs/test_env.yml') }}
 
       - name: Update Conda Env
         run: |
@@ -80,6 +79,21 @@ jobs:
         run: |
           make prime-editor test print
 
+      - name: Run BAM Input
+        if: success() || failure()
+        run: |
+          make bam test print
+
+      - name: Run BAM Output
+        if: success() || failure()
+        run: |
+          make bam-out test print
+
+      - name: Run BAM Genome Output
+        if: success() || failure()
+        run: |
+          make bam-out-genome test print
+
       - name: Run Batch
         if: success() || failure()
         run: |
diff --git a/CRISPResso2/CRISPRessoCORE.py b/CRISPResso2/CRISPRessoCORE.py
index 0699c109..e4bc0458 100644
--- a/CRISPResso2/CRISPRessoCORE.py
+++ b/CRISPResso2/CRISPRessoCORE.py
@@ -25,6 +25,7 @@
 import re
 import subprocess as sb
 import traceback
+from multiprocessing import Process
 
 from CRISPResso2 import CRISPRessoCOREResources
@@ -441,9 +442,84 @@ def get_new_variant_object(args, fastq_seq, refs, ref_names, aln_matrix, pe_scaf
     return new_variant
 
-def process_fastq(fastq_filename, variantCache, ref_names, refs, args):
+
+
+def get_variant_cache_equal_boundaries(num_unique_sequences, n_processes):
+    """Determines the boundaries for the number of unique sequences to be processed by each process
+
+    Parameters
+    ----------
+    num_unique_sequences: the number of unique sequences to be processed
+    n_processes: the number of processes to be used
+
+    Returns
+    ----------
+    boundaries: a list of n+1 integer indexes, where n is the number of processes
+    """
+    if num_unique_sequences < n_processes:
+        raise Exception("The number of unique sequences is less than the number of processes. Please reduce the number of processes.")
+
+    boundaries = [0]
+    # Determine roughly equal segment size for each process
+    segment_size = num_unique_sequences // n_processes
+
+    for i in range(n_processes - 1):
+        boundaries.append(boundaries[-1] + segment_size)
+    boundaries.append(num_unique_sequences)
+    return boundaries
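For a quick sense of the arithmetic above, a hypothetical call with 10 unique sequences split across 3 processes: segment_size is 10 // 3 = 3, so the function returns [0, 3, 6, 10]. Worker 0 takes keys[0:3], worker 1 takes keys[3:6], and the last worker absorbs the remainder with keys[6:10]:

    boundaries = get_variant_cache_equal_boundaries(10, 3)
    assert boundaries == [0, 3, 6, 10]  # final slice keys[6:10] holds 4 sequences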
+
+
+def variant_file_generator_process(seq_list, get_new_variant_object, args, refs, ref_names, aln_matrix, pe_scaffold_dna_info, process_id, variants_dir):
+    """The target of the multiprocessing.Process object; generates the new variants for a subset of the reads in the fastq file and stores them in tsv files
+
+    Parameters
+    ----------
+    seq_list: list of reads to process
+    get_new_variant_object: function to generate the new variant object
+    args: CRISPResso2 args
+    refs: dict with info for all refs
+    ref_names: list of ref names
+    aln_matrix: alignment matrix for needleman wunsch
+    pe_scaffold_dna_info: tuple of (
+        index of location in ref to find scaffold seq if it exists,
+        shortest dna sequence to identify scaffold sequence
+    )
+    process_id: the id of the process, used to print out debug information
+    variants_dir: the directory in which to store the tsv files
+
+    Returns
+    ----------
+    Nothing
+    """
+    def numpy_encoder(obj):
+        """Custom encoding for numpy arrays and other non-serializable types"""
+        if isinstance(obj, np.ndarray):
+            return obj.tolist()  # Convert numpy arrays to lists
+        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
+
+    variant_file_path = os.path.join(variants_dir, f"variants_{process_id}.tsv")
+
+    variant_lines = ""
+    with open(variant_file_path, 'w') as file:
+        file.truncate()  # Ensures the tsv file is empty before writing to it
+        for index, fastq_seq in enumerate(seq_list):
+            new_variant = get_new_variant_object(args, fastq_seq, refs, ref_names, aln_matrix, pe_scaffold_dna_info)
+            # Convert the complex object to a JSON string
+            json_string = json.dumps(new_variant, default=numpy_encoder)
+            variant_lines += f"{fastq_seq}\t{json_string}\n"
+            if index % 10000 == 0 and index != 0:
+                info(f"Process {process_id + 1} has processed {index} unique reads", {'percent_complete': 10})
+                file.write(variant_lines)
+                variant_lines = ""
+        file.write(variant_lines)
+
+    info(f"Process {process_id + 1} has finished processing {index} unique reads", {'percent_complete': 10})
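Each worker serializes one variant per line as sequence<TAB>JSON, so the parent can later stream the files back instead of holding every variant object in memory at once. A small round-trip sketch of that encoding (the toy record below is hypothetical; real records come from get_new_variant_object):

    import json
    import numpy as np

    def numpy_encoder(obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()  # numpy arrays are not JSON serializable by default
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

    variant = {'best_match_score': 42, 'aln_details': np.array([1, 0, 2])}  # toy record
    line = f"ACGTACGT\t{json.dumps(variant, default=numpy_encoder)}\n"
    seq, payload = line.rstrip('\n').split('\t')
    restored = json.loads(payload)  # the ndarray comes back as a plain list: [1, 0, 2]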
+
+
+def process_fastq(fastq_filename, variantCache, ref_names, refs, args, files_to_remove, output_directory, fastq_write_out=False):
     """process_fastq processes each of the reads contained in a fastq file, given a cache of pre-computed variants
-    fastqIn: name of fastq (e.g. output of fastp)
+
+    Parameters
+    ----------
+    fastq_filename: name of fastq (e.g. output of fastp)
         This file can be gzipped or plain text
 
     variantCache: dict with keys: sequence
@@ -514,101 +590,206 @@ def process_fastq(fastq_filename, variantCache, ref_names, refs, args):
         -allelic varaints if two variants are known to exist
 
     """
-    N_TOT_READS = 0
-    N_CACHED_ALN = 0 # read was found in cache
-    N_CACHED_NOTALN = 0 #read was found in 'not aligned' cache
-    N_COMPUTED_ALN = 0 # not in cache, aligned to at least 1 sequence with min cutoff
-    N_COMPUTED_NOTALN = 0 #not in cache, not aligned to any sequence with min cutoff
-
-    N_GLOBAL_SUBS = 0 #number of substitutions across all reads - indicator of sequencing quality
-    N_SUBS_OUTSIDE_WINDOW = 0
-    N_MODS_IN_WINDOW = 0 #number of modifications found inside the quantification window
-    N_MODS_OUTSIDE_WINDOW = 0 #number of modifications found outside the quantification window
-    N_READS_IRREGULAR_ENDS = 0 #number of reads with modifications at the 0 or -1 position
-    READ_LENGTH = 0
-
     aln_matrix_loc = os.path.join(_ROOT, args.needleman_wunsch_aln_matrix_loc)
     CRISPRessoShared.check_file(aln_matrix_loc)
     aln_matrix = CRISPResso2Align.read_matrix(aln_matrix_loc)
 
     pe_scaffold_dna_info = (0, None) #scaffold start loc, scaffold seq to search
     if args.prime_editing_pegRNA_scaffold_seq != "" and args.prime_editing_pegRNA_extension_seq != "":
         pe_scaffold_dna_info = get_pe_scaffold_search(refs['Prime-edited']['sequence'], args.prime_editing_pegRNA_extension_seq, args.prime_editing_pegRNA_scaffold_seq, args.prime_editing_pegRNA_scaffold_min_match_length)
 
-    not_aln = {} #cache for reads that don't align
+    if fastq_write_out:
+        not_aligned_variants = {}
+
     if fastq_filename.endswith('.gz'):
-        fastq_handle = gzip.open(fastq_filename, 'rt')
+        fastq_input_opener = lambda x: gzip.open(x, 'rt')
     else:
-        fastq_handle=open(fastq_filename)
-
-    while(fastq_handle.readline()):
-        #read through fastq in sets of 4
-        fastq_seq = fastq_handle.readline().strip()
-        fastq_plus = fastq_handle.readline()
-        fastq_qual = fastq_handle.readline()
-
-        if (N_TOT_READS % 10000 == 0):
-            info("Processing reads; N_TOT_READS: %d N_COMPUTED_ALN: %d N_CACHED_ALN: %d N_COMPUTED_NOTALN: %d N_CACHED_NOTALN: %d"%(N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_COMPUTED_NOTALN, N_CACHED_NOTALN))
-
-        N_TOT_READS+=1
-        #if the sequence has been seen and can't be aligned, skip it
-        if (fastq_seq in not_aln):
-            N_CACHED_NOTALN += 1
-            continue
-        #if the sequence is already associated with a variant in the variant cache, pull it out
-        if (fastq_seq in variantCache):
-            N_CACHED_ALN+=1
-            variantCache[fastq_seq]['count'] += 1
-            match_name = "variant_" + variantCache[fastq_seq]['best_match_name']
-            N_GLOBAL_SUBS += variantCache[fastq_seq][match_name]['substitution_n'] + variantCache[fastq_seq][match_name]['substitutions_outside_window']
-            N_SUBS_OUTSIDE_WINDOW += variantCache[fastq_seq][match_name]['substitutions_outside_window']
-            N_MODS_IN_WINDOW += variantCache[fastq_seq][match_name]['mods_in_window']
-            N_MODS_OUTSIDE_WINDOW += variantCache[fastq_seq][match_name]['mods_outside_window']
-            if variantCache[fastq_seq][match_name]['irregular_ends']:
-                N_READS_IRREGULAR_ENDS += 1
-
-        #otherwise, create a new variant object, and put it in the cache
-        else:
-            new_variant = get_new_variant_object(args, fastq_seq, refs, ref_names, aln_matrix, pe_scaffold_dna_info)
-            if new_variant['best_match_score'] <= 0:
-                N_COMPUTED_NOTALN+=1
-                not_aln[fastq_seq] = 1
-            else:
-                N_COMPUTED_ALN+=1
-                variantCache[fastq_seq] = new_variant
-                match_name = "variant_" + new_variant['best_match_name']
-                if READ_LENGTH == 0:
-                    READ_LENGTH = len(new_variant[match_name]['aln_seq'])
-                N_GLOBAL_SUBS += new_variant[match_name]['substitution_n'] + new_variant[match_name]['substitutions_outside_window']
-                N_SUBS_OUTSIDE_WINDOW += new_variant[match_name]['substitutions_outside_window']
-                N_MODS_IN_WINDOW += new_variant[match_name]['mods_in_window']
-                N_MODS_OUTSIDE_WINDOW += new_variant[match_name]['mods_outside_window']
-                if new_variant[match_name]['irregular_ends']:
-                    N_READS_IRREGULAR_ENDS += 1
+        fastq_input_opener = open
+
+    with fastq_input_opener(fastq_filename) as fastq_handle:
+
+        # Reading through the fastq file and enriching variantCache as a dictionary with the following:
+        #   Key: the unique DNA sequence from the fastq file
+        #   Value: an integer that represents how many times we've seen this specific read
+        num_reads = 0
+        fastq_id = fastq_handle.readline()
+        while(fastq_id):
+            if num_reads % 50000 == 0 and num_reads != 0:
+                info("Iterating over fastq file to identify reads; %d reads identified."%(num_reads))
+            #read through fastq in sets of 4
+            fastq_seq = fastq_handle.readline().strip()
+            fastq_plus = fastq_handle.readline().strip()
+            fastq_qual = fastq_handle.readline()
+            if fastq_seq in variantCache:
+                # if the read has already been seen, we increment its value by 1 to track number of copies
+                variantCache[fastq_seq] += 1
+            # If the sequence is not in the cache, we create it and set its value to 1
+            elif fastq_seq not in variantCache:
+                variantCache[fastq_seq] = 1
+            fastq_id = fastq_handle.readline()
+            num_reads += 1
+
+    num_unique_reads = len(variantCache.keys())
+    info("Finished reading fastq file; %d unique reads found of %d total reads found"%(num_unique_reads, num_reads))
+
+    n_processes = 1
+    if args.n_processes == "max":
+        n_processes = CRISPRessoMultiProcessing.get_max_processes()
+    elif args.n_processes.isdigit():
+        n_processes = int(args.n_processes)
+
+    N_TOT_READS = 0
+    N_CACHED_ALN = 0  # number of copies of all aligned reads
+    N_CACHED_NOTALN = 0  # number of copies of all non-aligned reads
+    N_COMPUTED_ALN = 0  # number of unique reads aligned to at least 1 sequence with min cutoff
+    N_COMPUTED_NOTALN = 0  # number of unique reads not aligned to any sequence with min cutoff
+    N_GLOBAL_SUBS = 0  # number of substitutions across all reads - indicator of sequencing quality
+    N_SUBS_OUTSIDE_WINDOW = 0
+    N_MODS_IN_WINDOW = 0  # number of modifications found inside the quantification window
+    N_MODS_OUTSIDE_WINDOW = 0  # number of modifications found outside the quantification window
+    N_READS_IRREGULAR_ENDS = 0  # number of reads with modifications at the 0 or -1 position
+    READ_LENGTH = 0
+    unaligned_reads = []
+
+    if n_processes > 1 and num_unique_reads > n_processes:
+        boundaries = get_variant_cache_equal_boundaries(num_unique_reads, n_processes)
+
+        processes = []  # list to hold the processes so we can wait for them to complete with join()
+
+        info("Spinning up %d parallel processes to analyze unique reads..."%(n_processes))
+        # We create n_processes, sending each a weighted sublist of the sequences for variant generation
+        variants_dir = output_directory
+        for i in range(n_processes):
+            left_sublist_index = boundaries[i]
+            right_sublist_index = boundaries[i+1]
+            process = Process(
+                target=variant_file_generator_process,
+                args=(
+                    (list(variantCache.keys())[left_sublist_index:right_sublist_index]),
+                    get_new_variant_object,
+                    args,
+                    refs,
+                    ref_names,
+                    aln_matrix,
+                    pe_scaffold_dna_info,
+                    i,
+                    variants_dir,
+                ),
+            )
+            process.start()
+            processes.append(process)
+        for p in processes:
+            p.join()  # pauses the main thread until the processes are finished
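The fan-out/join above uses plain multiprocessing.Process rather than a Pool, so each worker can stream its results straight to its own file. A self-contained sketch of the same pattern (the worker body and file names are stand-ins for variant_file_generator_process and its tsv outputs):

    from multiprocessing import Process

    def worker(chunk, out_path):
        # stand-in for variant_file_generator_process
        with open(out_path, 'w') as fh:
            for seq in chunk:
                fh.write(seq + '\n')

    if __name__ == '__main__':
        seqs = ['ACGT', 'TTTT', 'GGCC', 'AACC', 'TGCA']  # toy unique reads
        boundaries = [0, 2, 5]  # get_variant_cache_equal_boundaries(5, 2)
        procs = []
        for i in range(2):
            p = Process(target=worker, args=(seqs[boundaries[i]:boundaries[i + 1]], f"variants_{i}.tsv"))
            p.start()
            procs.append(p)
        for p in procs:
            p.join()  # block until every worker has flushed its file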
+
+        info("Finished processing unique reads, now generating statistics...", {'percent_complete': 15})
+        if os.path.exists(variants_dir):
+            variant_file_list = []
+            for i in range(n_processes):
+                variant_file_list.append(os.path.join(variants_dir, f"variants_{i}.tsv"))
+            # Process each per-worker variants file
+            for file_path in variant_file_list:
+                # Ensure the file is a .tsv before processing
+                if file_path.endswith(".tsv"):
+                    try:
+                        with open(file_path, 'r') as file:
+                            for index, line in enumerate(file):
+                                # Each line contains a sequence followed by a JSON string
+                                parts = line.strip().split('\t')
+                                if len(parts) == 2:
+                                    seq = parts[0]
+                                    json_data = parts[1]
+                                    variant_dict = json.loads(json_data)
+                                else:
+                                    error(f"Error splitting line: {line}")
+                                variant_count = variantCache[seq]
+                                N_TOT_READS += variant_count
+                                variant = variant_dict
+                                variant['count'] = variant_count
+                                if variant['best_match_score'] <= 0:
+                                    N_COMPUTED_NOTALN += 1
+                                    N_CACHED_NOTALN += (variant_count - 1)
+                                    if fastq_write_out:
+                                        not_aligned_variants[seq] = variant
+                                    # remove the unaligned reads from the cache
+                                    unaligned_reads.append(seq)
+                                elif variant['best_match_score'] > 0:
+                                    variantCache[seq] = variant
+                                    N_COMPUTED_ALN += 1
+                                    N_CACHED_ALN += (variant_count - 1)
+                                    match_name = "variant_" + variant['best_match_name']
+                                    if READ_LENGTH == 0:
+                                        READ_LENGTH = len(variant[match_name]['aln_seq'])
+                                    N_GLOBAL_SUBS += (variant[match_name]['substitution_n'] + variant[match_name]['substitutions_outside_window']) * variant_count
+                                    N_SUBS_OUTSIDE_WINDOW += variant[match_name]['substitutions_outside_window'] * variant_count
+                                    N_MODS_IN_WINDOW += variant[match_name]['mods_in_window'] * variant_count
+                                    N_MODS_OUTSIDE_WINDOW += variant[match_name]['mods_outside_window'] * variant_count
+                                    if variant[match_name]['irregular_ends']:
+                                        N_READS_IRREGULAR_ENDS += variant_count
+                                if (index % 50000 == 0 and index > 0):
+                                    info("Calculating statistics; %d completed out of %d unique reads"%(index, num_unique_reads))
+                    except FileNotFoundError:
+                        raise CRISPRessoShared.OutputFolderIncompleteException("Could not find generated variants file; try deleting the output folder, checking input files, and rerunning CRISPResso")
+                    files_to_remove.append(file_path)
+        else:
+            raise CRISPRessoShared.OutputFolderIncompleteException("Could not find output folder; try deleting the output folder and rerunning CRISPResso")
+
+        if N_COMPUTED_ALN + N_COMPUTED_NOTALN != num_unique_reads:
+            raise CRISPRessoShared.OutputFolderIncompleteException("Number of unique reads processed by parallel processes does not match the number of unique reads found in the fastq file. Try rerunning CRISPResso.")
+    else:
+        for index, fastq_seq in enumerate(variantCache.keys()):
+            variant_count = variantCache[fastq_seq]
+            N_TOT_READS += variant_count
+            variant = get_new_variant_object(args, fastq_seq, refs, ref_names, aln_matrix, pe_scaffold_dna_info)
+            variant['count'] = variant_count
+            if variant['best_match_score'] <= 0:
+                N_COMPUTED_NOTALN += 1
+                N_CACHED_NOTALN += (variant_count - 1)
+                unaligned_reads.append(fastq_seq)
+                if fastq_write_out:
+                    not_aligned_variants[fastq_seq] = variant
+            elif variant['best_match_score'] > 0:
+                variantCache[fastq_seq] = variant
+                N_COMPUTED_ALN += 1
+                N_CACHED_ALN += (variant_count - 1)
+                match_name = "variant_" + variant['best_match_name']
+                if READ_LENGTH == 0:
+                    READ_LENGTH = len(variant[match_name]['aln_seq'])
+                N_GLOBAL_SUBS += (variant[match_name]['substitution_n'] + variant[match_name]['substitutions_outside_window']) * variant_count
+                N_SUBS_OUTSIDE_WINDOW += variant[match_name]['substitutions_outside_window'] * variant_count
+                N_MODS_IN_WINDOW += variant[match_name]['mods_in_window'] * variant_count
+                N_MODS_OUTSIDE_WINDOW += variant[match_name]['mods_outside_window'] * variant_count
+                if variant[match_name]['irregular_ends']:
+                    N_READS_IRREGULAR_ENDS += variant_count
+            if (index % 10000 == 0):
+                info("Processing Reads; %d Completed out of %d Unique Reads"%(index, num_unique_reads))
+
+    # This deletes non-aligned reads from variantCache
+    for seq in unaligned_reads:
+        del variantCache[seq]
 
     info("Finished reads; N_TOT_READS: %d N_COMPUTED_ALN: %d N_CACHED_ALN: %d N_COMPUTED_NOTALN: %d N_CACHED_NOTALN: %d"%(N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_COMPUTED_NOTALN, N_CACHED_NOTALN))
     aln_stats = {"N_TOT_READS" : N_TOT_READS,
-             "N_CACHED_ALN" : N_CACHED_ALN,
-             "N_CACHED_NOTALN" : N_CACHED_NOTALN,
-             "N_COMPUTED_ALN" : N_COMPUTED_ALN,
-             "N_COMPUTED_NOTALN" : N_COMPUTED_NOTALN,
-             "N_GLOBAL_SUBS": N_GLOBAL_SUBS,
-             "N_SUBS_OUTSIDE_WINDOW": N_SUBS_OUTSIDE_WINDOW,
-             "N_MODS_IN_WINDOW": N_MODS_IN_WINDOW,
-             "N_MODS_OUTSIDE_WINDOW": N_MODS_OUTSIDE_WINDOW,
-             "N_READS_IRREGULAR_ENDS": N_READS_IRREGULAR_ENDS,
-             "READ_LENGTH": READ_LENGTH
-             }
-    return(aln_stats)
+                 "N_CACHED_ALN" : N_CACHED_ALN,
+                 "N_CACHED_NOTALN" : N_CACHED_NOTALN,
+                 "N_COMPUTED_ALN" : N_COMPUTED_ALN,
+                 "N_COMPUTED_NOTALN" : N_COMPUTED_NOTALN,
+                 "N_GLOBAL_SUBS": N_GLOBAL_SUBS,
+                 "N_SUBS_OUTSIDE_WINDOW": N_SUBS_OUTSIDE_WINDOW,
+                 "N_MODS_IN_WINDOW": N_MODS_IN_WINDOW,
+                 "N_MODS_OUTSIDE_WINDOW": N_MODS_OUTSIDE_WINDOW,
+                 "N_READS_IRREGULAR_ENDS": N_READS_IRREGULAR_ENDS,
+                 "READ_LENGTH": READ_LENGTH
+                 }
+    if fastq_write_out:
+        return(aln_stats, not_aligned_variants)
+    else:
+        return(aln_stats)
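Because alignment is now computed once per unique sequence, per-read statistics must be scaled by each sequence's copy count when results are merged; this is the "multiplying variant count" fix noted in the commit history. A reduced sketch of that bookkeeping, assuming variant dicts shaped like the records above:

    def tally(variants_with_counts):
        """variants_with_counts: iterable of (variant_dict, copy_count) pairs."""
        n_tot = n_computed_aln = n_cached_aln = n_global_subs = 0
        for variant, count in variants_with_counts:
            n_tot += count
            if variant['best_match_score'] > 0:
                n_computed_aln += 1        # the unique read, aligned exactly once
                n_cached_aln += count - 1  # its remaining copies count as cache hits
                match = 'variant_' + variant['best_match_name']
                n_global_subs += (variant[match]['substitution_n']
                                  + variant[match]['substitutions_outside_window']) * count
        return n_tot, n_computed_aln, n_cached_aln, n_global_subs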
output of bowtie2) bam_chr_loc: positions in the input bam to read from output_bam: name out bam to write to (includes crispresso alignment info) @@ -617,21 +798,9 @@ def process_bam(bam_filename, bam_chr_loc, output_bam, variantCache, ref_names, ref_names: list of reference names refs: dictionary of sequences name>ref object args: crispresso2 args + files_to_remove: list of files to remove + output_directory: directory to store tsv files """ - - N_TOT_READS = 0 - N_CACHED_ALN = 0 # read was found in cache - N_CACHED_NOTALN = 0 #read was found in 'not aligned' cache - N_COMPUTED_ALN = 0 # not in cache, aligned to at least 1 sequence with min cutoff - N_COMPUTED_NOTALN = 0 #not in cache, not aligned to any sequence with min cutoff - - N_GLOBAL_SUBS = 0 #number of substitutions across all reads - indicator of sequencing quality - N_SUBS_OUTSIDE_WINDOW = 0 - N_MODS_IN_WINDOW = 0 #number of modifications found inside the quantification window - N_MODS_OUTSIDE_WINDOW = 0 #number of modifications found outside the quantification window - N_READS_IRREGULAR_ENDS = 0 #number of reads with modifications at the 0 or -1 position - READ_LENGTH = 0 - aln_matrix_loc = os.path.join(_ROOT, args.needleman_wunsch_aln_matrix_loc) CRISPRessoShared.check_file(aln_matrix_loc) aln_matrix = CRISPResso2Align.read_matrix(aln_matrix_loc) @@ -640,8 +809,6 @@ def process_bam(bam_filename, bam_chr_loc, output_bam, variantCache, ref_names, if args.prime_editing_pegRNA_scaffold_seq != "" and args.prime_editing_pegRNA_extension_seq != "": pe_scaffold_dna_info = get_pe_scaffold_search(refs['Prime-edited']['sequence'], args.prime_editing_pegRNA_extension_seq, args.prime_editing_pegRNA_scaffold_seq, args.prime_editing_pegRNA_scaffold_min_match_length) - not_aln = {} #cache for reads that don't align - output_sam = output_bam+".sam" with open(output_sam, "w") as sam_out: #first, write header to sam @@ -651,62 +818,182 @@ def process_bam(bam_filename, bam_chr_loc, output_bam, variantCache, ref_names, crispresso_cmd_to_write = ' '.join(sys.argv) sam_out.write('@PG\tID:crispresso2\tPN:crispresso2\tVN:'+CRISPRessoShared.__version__+'\tCL:"'+crispresso_cmd_to_write+'"\n') - if bam_chr_loc != "": proc = sb.Popen(['samtools', 'view', bam_filename, bam_chr_loc], stdout=sb.PIPE, encoding='utf-8') else: proc = sb.Popen(['samtools', 'view', bam_filename], stdout=sb.PIPE, encoding='utf-8') + num_reads = 0 + + # Reading through the bam file and enriching variantCache as a dictionary with the following: + # Key: the unique DNA sequence from the fastq file + # Value: an integer that represents how many times we've seen this specific read for sam_line in proc.stdout: + if num_reads % 50000 == 0 and num_reads != 0: + info("Iterating over sam file to identify reads; %d reads identified."%(num_reads)) sam_line_els = sam_line.rstrip().split("\t") fastq_seq = sam_line_els[9] - - if (N_TOT_READS % 10000 == 0): - info("Processing reads; N_TOT_READS: %d N_COMPUTED_ALN: %d N_CACHED_ALN: %d N_COMPUTED_NOTALN: %d N_CACHED_NOTALN: %d"%(N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_COMPUTED_NOTALN, N_CACHED_NOTALN)) - - N_TOT_READS+=1 - - #if the sequence has been seen and can't be aligned, skip it - if (fastq_seq in not_aln): - N_CACHED_NOTALN += 1 - sam_line_els.append(not_aln[fastq_seq]) #Crispresso2 alignment: NA - sam_out.write("\t".join(sam_line_els)+"\n") - continue - #if the sequence is already associated with a variant in the variant cache, pull it out - if (fastq_seq in variantCache): - N_CACHED_ALN+=1 - variantCache[fastq_seq]['count'] += 1 - 
match_name = "variant_" + variantCache[fastq_seq]['best_match_name'] - N_GLOBAL_SUBS += variantCache[fastq_seq][match_name]['substitution_n'] + variantCache[fastq_seq][match_name]['substitutions_outside_window'] - N_SUBS_OUTSIDE_WINDOW += variantCache[fastq_seq][match_name]['substitutions_outside_window'] - N_MODS_IN_WINDOW += variantCache[fastq_seq][match_name]['mods_in_window'] - N_MODS_OUTSIDE_WINDOW += variantCache[fastq_seq][match_name]['mods_outside_window'] - if variantCache[fastq_seq][match_name]['irregular_ends']: - N_READS_IRREGULAR_ENDS += 1 - #sam_line_els[5] = variantCache[fastq_seq]['sam_cigar'] - sam_line_els.append(variantCache[fastq_seq]['crispresso2_annotation']) - sam_out.write("\t".join(sam_line_els)+"\n") - - #otherwise, create a new variant object, and put it in the cache + if fastq_seq in variantCache: + # if the read has already been seen, we increment its value by 1 to track number of copies + variantCache[fastq_seq] += 1 + # If the sequence is not in the cache, we create it and set its value to 1 else: + variantCache[fastq_seq] = 1 + num_reads += 1 + num_unique_reads = len(variantCache.keys()) + info("Finished reading bam file; %d unique reads found of %d total reads found "%(num_unique_reads, num_reads)) + n_processes = 1 + if args.n_processes == "max": + n_processes = CRISPRessoMultiProcessing.get_max_processes() + elif args.n_processes.isdigit(): + n_processes = int(args.n_processes) + + N_TOT_READS = 0 + N_CACHED_ALN = 0 # number of copies of all aligned reads + N_CACHED_NOTALN = 0 # number of copies of all non-aligned reads + N_COMPUTED_ALN = 0 # number of unique reads aligned to at least 1 sequence with min cutoff + N_COMPUTED_NOTALN = 0 # number of unique reads not aligned to any sequence with min cutoff + N_GLOBAL_SUBS = 0 # number of substitutions across all reads - indicator of sequencing quality + N_SUBS_OUTSIDE_WINDOW = 0 + N_MODS_IN_WINDOW = 0 # number of modifications found inside the quantification window + N_MODS_OUTSIDE_WINDOW = 0 # number of modifications found outside the quantification window + N_READS_IRREGULAR_ENDS = 0 # number of reads with modifications at the 0 or -1 position + READ_LENGTH = 0 + not_aln = {} + + if n_processes > 1 and num_unique_reads > n_processes: + boundaries = get_variant_cache_equal_boundaries(num_unique_reads, n_processes) + processes = [] # list to hold the processes so we can wait for them to complete with join() + variants_dir = output_directory + + info("Spinning up %d parallel processes to analyze unique reads..."%(n_processes)) + for i in range(n_processes): + left_sublist_index = boundaries[i] + right_sublist_index = boundaries[i+1] + process = Process( + target=variant_file_generator_process, + args=( + (list(variantCache.keys())[left_sublist_index:right_sublist_index]), + get_new_variant_object, + args, + refs, + ref_names, + aln_matrix, + pe_scaffold_dna_info, + i, + variants_dir + ) + ) + process.start() + processes.append(process) + for p in processes: + p.join() # pauses the main thread until the processes are finished + + info("Finished processing unique reads, now generating statistics...", {'percent_complete': 15}) + variant_file_list = [] + if os.path.exists(variants_dir): + for n_processes in range(n_processes): + variant_file_list.append(os.path.join(variants_dir, f"variants_{n_processes}.tsv")) + for file_path in variant_file_list: + # Ensure the file is a .tsv before processing + if file_path.endswith(".tsv"): + try: + with open(file_path, 'r') as file: + for index, line in enumerate(file): + # 
Each line contains a sequence followed by a JSON string
+                                parts = line.strip().split('\t')
+                                if len(parts) == 2:
+                                    seq = parts[0]
+                                    json_data = parts[1]
+                                    new_variant = json.loads(json_data)
+                                else:
+                                    error(f"Error splitting line: {line}")
+                                    continue
+                                variant_count = variantCache[seq]
+                                new_variant['count'] = variant_count
+                                N_TOT_READS += variant_count
+                                if new_variant['best_match_score'] <= 0:
+                                    N_COMPUTED_NOTALN+=1
+                                    N_CACHED_NOTALN += (variant_count - 1)
+                                    crispresso_sam_optional_fields = "c2:Z:ALN=NA" +\
+                                        " ALN_SCORES=" + ('&'.join([str(x) for x in new_variant['aln_scores']])) +\
+                                        " ALN_DETAILS=" + ('&'.join([','.join([str(y) for y in x]) for x in new_variant['ref_aln_details']]))
+                                    not_aln[seq] = crispresso_sam_optional_fields
+                                else:
+                                    class_names = []
+                                    ins_inds = []
+                                    del_inds = []
+                                    sub_inds = []
+                                    edit_strings = []
+                                    for idx, best_match_name in enumerate(new_variant['aln_ref_names']):
+                                        payload=new_variant['variant_'+best_match_name]
+
+                                        del_inds.append([str(x[0][0])+"("+str(x[1])+")" for x in zip(payload['deletion_coordinates'], payload['deletion_sizes'])])
+
+                                        ins_vals = []
+                                        for ins_coord,ins_size in zip(payload['insertion_coordinates'],payload['insertion_sizes']):
+                                            ins_start = payload['ref_positions'].index(ins_coord[0])
+                                            ins_vals.append(payload['aln_seq'][ins_start+1:ins_start+1+ins_size])
+                                        ins_inds.append([str(x[0][0])+"("+str(x[1])+"+"+x[2]+")" for x in zip(payload['insertion_coordinates'], payload['insertion_sizes'], ins_vals)])
+
+                                        sub_inds.append(payload['substitution_positions'])
+                                        edit_strings.append('D'+str(int(payload['deletion_n']))+';I'+str(int(payload['insertion_n']))+';S'+str(int(payload['substitution_n'])))
+
+                                    crispresso_sam_optional_fields = "c2:Z:ALN="+("&".join(new_variant['aln_ref_names'])) +\
+                                        " CLASS="+new_variant['class_name']+\
+                                        " MODS="+("&".join(edit_strings))+\
+                                        " DEL="+("&".join([';'.join(x) for x in del_inds])) +\
+                                        " INS="+("&".join([';'.join(x) for x in ins_inds])) +\
+                                        " SUB=" + ("&".join([';'.join([str(y) for y in x]) for x in sub_inds])) +\
+                                        " ALN_REF=" + ('&'.join([new_variant['variant_'+name]['aln_ref'] for name in new_variant['aln_ref_names']])) +\
+                                        " ALN_SEQ=" + ('&'.join([new_variant['variant_'+name]['aln_seq'] for name in new_variant['aln_ref_names']]))
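# A minimal sketch (not from the patch) of the worker/merge hand-off used above:
# each worker aligns its slice of unique reads and writes one "sequence<TAB>JSON"
# line per read to variants_<i>.tsv, which the parent then merges back together
# with the copy counts. align_read is an invented stand-in for
# get_new_variant_object, and the counts are hard-coded for illustration.
import json
import os
from multiprocessing import Process

def align_read(seq):
    return {'best_match_score': len(seq)}  # placeholder "alignment" result

def worker(seqs, process_id, out_dir):
    # one TSV per process, named like the files collected above
    with open(os.path.join(out_dir, f"variants_{process_id}.tsv"), 'w') as out:
        for seq in seqs:
            out.write(seq + '\t' + json.dumps(align_read(seq)) + '\n')

if __name__ == '__main__':
    counts = {'ACGT': 3, 'TTTT': 1}  # seq -> copies seen in the input
    p = Process(target=worker, args=(list(counts), 0, '.'))
    p.start()
    p.join()  # wait for the worker, as the join() loop above does
    with open('variants_0.tsv') as fh:
        for line in fh:
            seq, payload = line.rstrip('\n').split('\t')
            variant = json.loads(payload)
            variant['count'] = counts[seq]  # re-attach the copy count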
+                                    #cigar strings are in reference to the given amplicon, not to the genomic sequence to which this read is aligned..
+                                    #first_variant = new_variant['variant_'+new_variant['aln_ref_names'][0]]
+                                    #sam_cigar = ''.join(CRISPRessoShared.unexplode_cigar(''.join([CRISPRessoShared.CIGAR_LOOKUP[x] for x in zip(first_variant['aln_seq'],first_variant['aln_ref'])])))
+                                    #sam_line_els[5] = sam_cigar
+                                    #new_variant['sam_cigar'] = sam_cigar
+                                    new_variant['crispresso2_annotation'] = crispresso_sam_optional_fields
+                                    variantCache[seq] = new_variant
+                                    N_COMPUTED_ALN+=1
+                                    N_CACHED_ALN += (variant_count - 1)
+                                    match_name = 'variant_' + new_variant['best_match_name']
+                                    if READ_LENGTH == 0:
+                                        READ_LENGTH = len(new_variant[match_name]['aln_seq'])
+                                    N_GLOBAL_SUBS += (new_variant[match_name]['substitution_n'] + new_variant[match_name]['substitutions_outside_window']) * variant_count
+                                    N_SUBS_OUTSIDE_WINDOW += new_variant[match_name]['substitutions_outside_window'] * variant_count
+                                    N_MODS_IN_WINDOW += new_variant[match_name]['mods_in_window'] * variant_count
+                                    N_MODS_OUTSIDE_WINDOW += new_variant[match_name]['mods_outside_window'] * variant_count
+                                    if new_variant[match_name]['irregular_ends']:
+                                        N_READS_IRREGULAR_ENDS += variant_count
+
+                    except FileNotFoundError:
+                        raise CRISPRessoShared.OutputFolderIncompleteException(f"Could not find generated variants file, try deleting output folder, checking input files, and rerunning CRISPResso")
+                    files_to_remove.append(file_path)
+            if N_COMPUTED_ALN + N_COMPUTED_NOTALN != num_unique_reads:
+                raise CRISPRessoShared.OutputFolderIncompleteException(f"Number of unique reads processed by parallel processes does not match the number of unique reads found in the bam file. Try rerunning CRISPResso.")
+        else:
+            # Single process mode
+            for idx, fastq_seq in enumerate(variantCache.keys()):
+                variant_count = variantCache[fastq_seq]
+                N_TOT_READS += variant_count
                 new_variant = get_new_variant_object(args, fastq_seq, refs, ref_names, aln_matrix, pe_scaffold_dna_info)
+                new_variant['count'] = variant_count
                 if new_variant['best_match_score'] <= 0:
+                    N_CACHED_NOTALN += (variant_count - 1)
                     N_COMPUTED_NOTALN+=1
                     crispresso_sam_optional_fields = "c2:Z:ALN=NA" +\
                         " ALN_SCORES=" + ('&'.join([str(x) for x in new_variant['aln_scores']])) +\
                         " ALN_DETAILS=" + ('&'.join([','.join([str(y) for y in x]) for x in new_variant['ref_aln_details']]))
-                    sam_line_els.append(crispresso_sam_optional_fields)
                     not_aln[fastq_seq] = crispresso_sam_optional_fields
-                    sam_out.write("\t".join(sam_line_els)+"\n")
                 else:
                     N_COMPUTED_ALN+=1
-
+                    N_CACHED_ALN += (variant_count - 1)
                     class_names = []
                     ins_inds = []
                     del_inds = []
                     sub_inds = []
                     edit_strings = []
-                    # for idx, best_match_name in enumerate(best_match_names):
                     for idx, best_match_name in enumerate(new_variant['aln_ref_names']):
                         payload=new_variant['variant_'+best_match_name]
@@ -730,202 +1017,126 @@ def process_bam(bam_filename, bam_chr_loc, output_bam, variantCache, ref_names,
                         " SUB=" + ("&".join([';'.join([str(y) for y in x]) for x in sub_inds])) +\
                         " ALN_REF=" + ('&'.join([new_variant['variant_'+name]['aln_ref'] for name in new_variant['aln_ref_names']])) +\
                         " ALN_SEQ=" + ('&'.join([new_variant['variant_'+name]['aln_seq'] for name in new_variant['aln_ref_names']]))
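# A toy illustration (invented counts) of the bookkeeping convention used in both
# branches above: each unique sequence is aligned exactly once (COMPUTED), its
# remaining copies are booked as cache hits (CACHED), and per-read statistics are
# scaled by the copy count instead of being re-accumulated copy by copy.
variant_cache = {'ACGT': 3, 'TTTT': 1}  # seq -> number of copies seen
mods_in_window = {'ACGT': 2, 'TTTT': 0}  # per-unique-read modifications
N_TOT_READS = N_COMPUTED_ALN = N_CACHED_ALN = N_MODS_IN_WINDOW = 0
for seq, count in variant_cache.items():
    N_TOT_READS += count
    N_COMPUTED_ALN += 1  # one alignment per unique read
    N_CACHED_ALN += count - 1  # duplicates reuse the cached result
    N_MODS_IN_WINDOW += mods_in_window[seq] * count  # weight stats by copies
assert (N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_MODS_IN_WINDOW) == (4, 2, 2, 6)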
-                    sam_line_els.append(crispresso_sam_optional_fields)
-
-                    #cigar strings are in reference to the given amplicon, not to the genomic sequence to which this read is aligned..
-                    #first_variant = new_variant['variant_'+new_variant['aln_ref_names'][0]]
-                    #sam_cigar = ''.join(CRISPRessoShared.unexplode_cigar(''.join([CRISPRessoShared.CIGAR_LOOKUP[x] for x in zip(first_variant['aln_seq'],first_variant['aln_ref'])])))
-                    #sam_line_els[5] = sam_cigar
-                    #new_variant['sam_cigar'] = sam_cigar
                     new_variant['crispresso2_annotation'] = crispresso_sam_optional_fields
-
-                    sam_out.write("\t".join(sam_line_els)+"\n")
-
                     variantCache[fastq_seq] = new_variant
                     match_name = 'variant_' + new_variant['best_match_name']
                     if READ_LENGTH == 0:
                         READ_LENGTH = len(new_variant[match_name]['aln_seq'])
-                    N_GLOBAL_SUBS += new_variant[match_name]['substitution_n'] + new_variant[match_name]['substitutions_outside_window']
-                    N_SUBS_OUTSIDE_WINDOW += new_variant[match_name]['substitutions_outside_window']
-                    N_MODS_IN_WINDOW += new_variant[match_name]['mods_in_window']
-                    N_MODS_OUTSIDE_WINDOW += new_variant[match_name]['mods_outside_window']
+                    N_GLOBAL_SUBS += (new_variant[match_name]['substitution_n'] + new_variant[match_name]['substitutions_outside_window']) * variant_count
+                    N_SUBS_OUTSIDE_WINDOW += new_variant[match_name]['substitutions_outside_window'] * variant_count
+                    N_MODS_IN_WINDOW += new_variant[match_name]['mods_in_window'] * variant_count
+                    N_MODS_OUTSIDE_WINDOW += new_variant[match_name]['mods_outside_window'] * variant_count
                     if new_variant[match_name]['irregular_ends']:
-                        N_READS_IRREGULAR_ENDS += 1
+                        N_READS_IRREGULAR_ENDS += variant_count
+                if (idx % 10000 == 0 and idx > 0):
+                    info("Processing Reads; %d Completed out of %d Unique Reads"%(idx, num_unique_reads))

-        output_sam = output_bam+".sam"
-        cmd = 'samtools view -Sb '+output_sam + '>'+output_bam + ' && samtools index ' + output_bam
-        bam_status=sb.call(cmd, shell=True)
-        if bam_status:
-            raise CRISPRessoShared.BadParameterException(
-                'Bam creation failed.
Command used: {0}'.format(cmd), - ) + # Now that we've enriched variantCache with the unique reads, we read through the bam file again to write variants to the output file + if bam_chr_loc != "": + new_proc = sb.Popen(['samtools', 'view', bam_filename, bam_chr_loc], stdout=sb.PIPE, encoding='utf-8') + else: + new_proc = sb.Popen(['samtools', 'view', bam_filename], stdout=sb.PIPE, encoding='utf-8') + + for sam_line in new_proc.stdout: + sam_line_els = sam_line.rstrip().split("\t") + fastq_seq = sam_line_els[9] + if fastq_seq in not_aln: + sam_line_els.append(not_aln[fastq_seq]) #Crispresso2 alignment: NA + sam_out.write("\t".join(sam_line_els)+"\n") + elif fastq_seq in variantCache: + sam_line_els.append(variantCache[fastq_seq]['crispresso2_annotation']) + sam_out.write("\t".join(sam_line_els)+"\n") + + for fastq_seq in not_aln.keys(): + del variantCache[fastq_seq] info("Finished reads; N_TOT_READS: %d N_COMPUTED_ALN: %d N_CACHED_ALN: %d N_COMPUTED_NOTALN: %d N_CACHED_NOTALN: %d"%(N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_COMPUTED_NOTALN, N_CACHED_NOTALN)) aln_stats = {"N_TOT_READS" : N_TOT_READS, - "N_CACHED_ALN" : N_CACHED_ALN, - "N_CACHED_NOTALN" : N_CACHED_NOTALN, - "N_COMPUTED_ALN" : N_COMPUTED_ALN, - "N_COMPUTED_NOTALN" : N_COMPUTED_NOTALN, - "N_GLOBAL_SUBS": N_GLOBAL_SUBS, - "N_SUBS_OUTSIDE_WINDOW": N_SUBS_OUTSIDE_WINDOW, - "N_MODS_IN_WINDOW": N_MODS_IN_WINDOW, - "N_MODS_OUTSIDE_WINDOW": N_MODS_OUTSIDE_WINDOW, - "N_READS_IRREGULAR_ENDS": N_READS_IRREGULAR_ENDS, - "READ_LENGTH": READ_LENGTH - } + "N_CACHED_ALN" : N_CACHED_ALN, + "N_CACHED_NOTALN" : N_CACHED_NOTALN, + "N_COMPUTED_ALN" : N_COMPUTED_ALN, + "N_COMPUTED_NOTALN" : N_COMPUTED_NOTALN, + "N_GLOBAL_SUBS": N_GLOBAL_SUBS, + "N_SUBS_OUTSIDE_WINDOW": N_SUBS_OUTSIDE_WINDOW, + "N_MODS_IN_WINDOW": N_MODS_IN_WINDOW, + "N_MODS_OUTSIDE_WINDOW": N_MODS_OUTSIDE_WINDOW, + "N_READS_IRREGULAR_ENDS": N_READS_IRREGULAR_ENDS, + "READ_LENGTH": READ_LENGTH + } return(aln_stats) +def process_fastq_write_out(fastq_input, fastq_output, variantCache, ref_names, refs, args, files_to_remove, output_directory): -def process_fastq_write_out(fastq_input, fastq_output, variantCache, ref_names, refs, args): - """ - process_fastq_write_out processes each of the reads contained in a fastq input file, given a cache of pre-computed variants. 
All reads are read in, analyzed, and written to output with annotation - fastq_input: input fastq - fastq_output: fastq to write out - variantCache: dict with keys: sequence dict with keys (described in process_fastq) - - ref_names: list of reference names - refs: dictionary of sequences name>ref object - args: crispresso2 args - """ - - N_TOT_READS = 0 - N_CACHED_ALN = 0 # read was found in cache - N_CACHED_NOTALN = 0 #read was found in 'not aligned' cache - N_COMPUTED_ALN = 0 # not in cache, aligned to at least 1 sequence with min cutoff - N_COMPUTED_NOTALN = 0 #not in cache, not aligned to any sequence with min cutoff - - N_GLOBAL_SUBS = 0 #number of substitutions across all reads - indicator of sequencing quality - N_SUBS_OUTSIDE_WINDOW = 0 - N_MODS_IN_WINDOW = 0 #number of modifications found inside the quantification window - N_MODS_OUTSIDE_WINDOW = 0 #number of modifications found outside the quantification window - N_READS_IRREGULAR_ENDS = 0 #number of reads with modifications at the 0 or -1 position - READ_LENGTH = 0 + aln_stats, not_aln = process_fastq(fastq_input, variantCache, ref_names, refs, args, files_to_remove, output_directory, True) + info("Reads processed, now annotating fastq_output file: %s"%(fastq_output)) - aln_matrix_loc = os.path.join(_ROOT, args.needleman_wunsch_aln_matrix_loc) - CRISPRessoShared.check_file(aln_matrix_loc) - aln_matrix = CRISPResso2Align.read_matrix(aln_matrix_loc) - pe_scaffold_dna_info = (0, None) #scaffold start loc, scaffold sequence - if args.prime_editing_pegRNA_scaffold_seq != "" and args.prime_editing_pegRNA_extension_seq != "": - pe_scaffold_dna_info = get_pe_scaffold_search(refs['Prime-edited']['sequence'], args.prime_editing_pegRNA_extension_seq, args.prime_editing_pegRNA_scaffold_seq, args.prime_editing_pegRNA_scaffold_min_match_length) - not_aln = {} #cache for reads that don't align - not_aln[''] = "" #add empty sequence to the not_aln in case the fastq has an extra newline at the end if fastq_input.endswith('.gz'): - fastq_input_handle=gzip.open(fastq_input, 'rt') + fastq_input_opener = lambda x: gzip.open(x, 'rt') else: - fastq_input_handle=open(fastq_input) - - fastq_out_handle = gzip.open(fastq_output, 'wt') - - fastq_id = fastq_input_handle.readline() - while(fastq_id): - #read through fastq in sets of 4 - fastq_seq = fastq_input_handle.readline().strip() - fastq_plus = fastq_input_handle.readline().strip() - fastq_qual = fastq_input_handle.readline() - - if (N_TOT_READS % 10000 == 0): - info("Processing reads; N_TOT_READS: %d N_COMPUTED_ALN: %d N_CACHED_ALN: %d N_COMPUTED_NOTALN: %d N_CACHED_NOTALN: %d"%(N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_COMPUTED_NOTALN, N_CACHED_NOTALN)) - - N_TOT_READS+=1 - - #if the sequence has been seen and can't be aligned, skip it - if fastq_seq in not_aln: - N_CACHED_NOTALN += 1 - fastq_out_handle.write(fastq_id+fastq_seq+"\n"+fastq_plus+not_aln[fastq_seq]+"\n"+fastq_qual) #not_aln[fastq_seq] is alignment: NA - elif fastq_seq in variantCache: #if the sequence is already associated with a variant in the variant cache, pull it out - N_CACHED_ALN+=1 - variantCache[fastq_seq]['count'] += 1 - match_name = "variant_" + variantCache[fastq_seq]['best_match_name'] - N_GLOBAL_SUBS += variantCache[fastq_seq][match_name]['substitution_n'] + variantCache[fastq_seq][match_name]['substitutions_outside_window'] - N_SUBS_OUTSIDE_WINDOW += variantCache[fastq_seq][match_name]['substitutions_outside_window'] - N_MODS_IN_WINDOW += variantCache[fastq_seq][match_name]['mods_in_window'] - N_MODS_OUTSIDE_WINDOW += 
variantCache[fastq_seq][match_name]['mods_outside_window'] - if variantCache[fastq_seq][match_name]['irregular_ends']: - N_READS_IRREGULAR_ENDS += 1 - fastq_out_handle.write(fastq_id+fastq_seq+"\n"+fastq_plus+variantCache[fastq_seq]['crispresso2_annotation']+"\n"+fastq_qual) - - #otherwise, create a new variant object, and put it in the cache - else: - new_variant = get_new_variant_object(args, fastq_seq, refs, ref_names, aln_matrix, pe_scaffold_dna_info) - if new_variant['best_match_score'] <= 0: - N_COMPUTED_NOTALN+=1 + fastq_input_opener = open + + with gzip.open(fastq_output, 'wt') as fastq_out_handle, fastq_input_opener(fastq_input) as fastq_input_handle: + + fastq_id = fastq_input_handle.readline() + while(fastq_id): + #read through fastq in sets of 4 + fastq_seq = fastq_input_handle.readline().strip() + fastq_plus = fastq_input_handle.readline().strip() + fastq_qual = fastq_input_handle.readline() + + if fastq_seq in not_aln: + new_variant = not_aln[fastq_seq] crispresso2_annotation = " ALN=NA" +\ - " ALN_SCORES=" + ('&'.join([str(x) for x in new_variant['aln_scores']])) +\ - " ALN_DETAILS=" + ('&'.join([','.join([str(y) for y in x]) for x in new_variant['ref_aln_details']])) - not_aln[fastq_seq] = crispresso2_annotation + " ALN_SCORES=" + ('&'.join([str(x) for x in new_variant['aln_scores']])) +\ + " ALN_DETAILS=" + ('&'.join([','.join([str(y) for y in x]) for x in new_variant['ref_aln_details']])) fastq_out_handle.write(fastq_id+fastq_seq+"\n"+fastq_plus+crispresso2_annotation+"\n"+fastq_qual) - else: - N_COMPUTED_ALN+=1 - ins_inds = [] - del_inds = [] - sub_inds = [] - edit_strings = [] + fastq_id = fastq_input_handle.readline() + continue -# for idx, best_match_name in enumerate(best_match_names): - for idx, best_match_name in enumerate(new_variant['aln_ref_names']): - payload=new_variant['variant_'+best_match_name] + if fastq_seq in variantCache: + new_variant = variantCache[fastq_seq] - del_inds.append([str(x[0][0])+"("+str(x[1])+")" for x in zip(payload['deletion_coordinates'], payload['deletion_sizes'])]) + ins_inds = [] + del_inds = [] + sub_inds = [] + edit_strings = [] - ins_vals = [] - for ins_coord,ins_size in zip(payload['insertion_coordinates'],payload['insertion_sizes']): - ins_start = payload['ref_positions'].index(ins_coord[0]) - ins_vals.append(payload['aln_seq'][ins_start+1:ins_start+1+ins_size]) - ins_inds.append([str(x[0][0])+"("+str(x[1])+"+"+x[2]+")" for x in zip(payload['insertion_coordinates'], payload['insertion_sizes'], ins_vals)]) + for idx, best_match_name in enumerate(new_variant['aln_ref_names']): + payload=new_variant['variant_'+best_match_name] - sub_inds.append(payload['substitution_positions']) - edit_strings.append('D'+str(int(payload['deletion_n']))+';I'+str(int(payload['insertion_n']))+';S'+str(int(payload['substitution_n']))) + del_inds.append([str(x[0][0])+"("+str(x[1])+")" for x in zip(payload['deletion_coordinates'], payload['deletion_sizes'])]) - crispresso2_annotation = " ALN="+("&".join(new_variant['aln_ref_names'])) +\ - " ALN_SCORES=" + ('&'.join([str(x) for x in new_variant['aln_scores']])) +\ - " ALN_DETAILS=" + ('&'.join([','.join([str(y) for y in x]) for x in new_variant['ref_aln_details']])) +\ - " CLASS="+new_variant['class_name']+\ - " MODS="+("&".join(edit_strings))+\ - " DEL="+("&".join([';'.join(x) for x in del_inds])) +\ - " INS="+("&".join([';'.join(x) for x in ins_inds])) +\ - " SUB=" + ("&".join([';'.join([str(y) for y in x]) for x in sub_inds])) +\ - " ALN_REF=" + ('&'.join([new_variant['variant_'+name]['aln_ref'] for 
name in new_variant['aln_ref_names']])) +\ - " ALN_SEQ=" + ('&'.join([new_variant['variant_'+name]['aln_seq'] for name in new_variant['aln_ref_names']])) + ins_vals = [] + for ins_coord,ins_size in zip(payload['insertion_coordinates'],payload['insertion_sizes']): + ins_start = payload['ref_positions'].index(ins_coord[0]) + ins_vals.append(payload['aln_seq'][ins_start+1:ins_start+1+ins_size]) + ins_inds.append([str(x[0][0])+"("+str(x[1])+"+"+x[2]+")" for x in zip(payload['insertion_coordinates'], payload['insertion_sizes'], ins_vals)]) - new_variant['crispresso2_annotation'] = crispresso2_annotation + sub_inds.append(payload['substitution_positions']) + edit_strings.append('D'+str(int(payload['deletion_n']))+';I'+str(int(payload['insertion_n']))+';S'+str(int(payload['substitution_n']))) - fastq_out_handle.write(fastq_id+fastq_seq+"\n"+fastq_plus+crispresso2_annotation+"\n"+fastq_qual) + crispresso2_annotation = " ALN="+("&".join(new_variant['aln_ref_names'])) +\ + " ALN_SCORES=" + ('&'.join([str(x) for x in new_variant['aln_scores']])) +\ + " ALN_DETAILS=" + ('&'.join([','.join([str(y) for y in x]) for x in new_variant['ref_aln_details']])) +\ + " CLASS="+new_variant['class_name']+\ + " MODS="+("&".join(edit_strings))+\ + " DEL="+("&".join([';'.join(x) for x in del_inds])) +\ + " INS="+("&".join([';'.join(x) for x in ins_inds])) +\ + " SUB=" + ("&".join([';'.join([str(y) for y in x]) for x in sub_inds])) +\ + " ALN_REF=" + ('&'.join([new_variant['variant_'+name]['aln_ref'] for name in new_variant['aln_ref_names']])) +\ + " ALN_SEQ=" + ('&'.join([new_variant['variant_'+name]['aln_seq'] for name in new_variant['aln_ref_names']])) + new_variant['crispresso2_annotation'] = crispresso2_annotation + fastq_out_handle.write(fastq_id+fastq_seq+"\n"+fastq_plus+crispresso2_annotation+"\n"+fastq_qual) + #last step of loop = read next line + fastq_id = fastq_input_handle.readline() - variantCache[fastq_seq] = new_variant - match_name = 'variant_' + new_variant['best_match_name'] - if READ_LENGTH == 0: - READ_LENGTH = len(new_variant[match_name]['aln_seq']) - N_GLOBAL_SUBS += new_variant[match_name]['substitution_n'] + new_variant[match_name]['substitutions_outside_window'] - N_SUBS_OUTSIDE_WINDOW += new_variant[match_name]['substitutions_outside_window'] - N_MODS_IN_WINDOW += new_variant[match_name]['mods_in_window'] - N_MODS_OUTSIDE_WINDOW += new_variant[match_name]['mods_outside_window'] - if new_variant[match_name]['irregular_ends']: - N_READS_IRREGULAR_ENDS += 1 - - #last step of loop = read next line - fastq_id = fastq_input_handle.readline() - fastq_input_handle.close() - fastq_out_handle.close() - info("Finished reads; N_TOT_READS: %d N_COMPUTED_ALN: %d N_CACHED_ALN: %d N_COMPUTED_NOTALN: %d N_CACHED_NOTALN: %d"%(N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_COMPUTED_NOTALN, N_CACHED_NOTALN)) - aln_stats = {"N_TOT_READS" : N_TOT_READS, - "N_CACHED_ALN" : N_CACHED_ALN, - "N_CACHED_NOTALN" : N_CACHED_NOTALN, - "N_COMPUTED_ALN" : N_COMPUTED_ALN, - "N_COMPUTED_NOTALN" : N_COMPUTED_NOTALN, - "N_GLOBAL_SUBS": N_GLOBAL_SUBS, - "N_SUBS_OUTSIDE_WINDOW": N_SUBS_OUTSIDE_WINDOW, - "N_MODS_IN_WINDOW": N_MODS_IN_WINDOW, - "N_MODS_OUTSIDE_WINDOW": N_MODS_OUTSIDE_WINDOW, - "N_READS_IRREGULAR_ENDS": N_READS_IRREGULAR_ENDS, - "READ_LENGTH": READ_LENGTH - } - return(aln_stats) + return aln_stats -def process_single_fastq_write_bam_out(fastq_input, bam_output, bam_header, variantCache, ref_names, refs, args): +def process_single_fastq_write_bam_out(fastq_input, bam_output, bam_header, variantCache, ref_names, refs, 
args, files_to_remove, output_directory): """ process_fastq_write_out processes each of the reads contained in a fastq input file, given a cache of pre-computed variants. All reads are read in, analyzed, and written to output with annotation @@ -945,81 +1156,37 @@ def process_single_fastq_write_bam_out(fastq_input, bam_output, bam_header, vari ref_names: list of reference names refs: dictionary of sequences name>ref object args: crispresso2 args + files_to_remove: list of files to remove + output_directory: directory to write output tsv files to """ + aln_stats, not_aln = process_fastq(fastq_input, variantCache, ref_names, refs, args, files_to_remove, output_directory, True) + info("Reads processed, now annotating fastq_output file: %s"%(bam_output)) - N_TOT_READS = 0 - N_CACHED_ALN = 0 # read was found in cache - N_CACHED_NOTALN = 0 # read was found in 'not aligned' cache - N_COMPUTED_ALN = 0 # not in cache, aligned to at least 1 sequence with min cutoff - N_COMPUTED_NOTALN = 0 # not in cache, not aligned to any sequence with min cutoff - - N_GLOBAL_SUBS = 0 #number of substitutions across all reads - indicator of sequencing quality - N_SUBS_OUTSIDE_WINDOW = 0 - N_MODS_IN_WINDOW = 0 #number of modifications found inside the quantification window - N_MODS_OUTSIDE_WINDOW = 0 #number of modifications found outside the quantification window - N_READS_IRREGULAR_ENDS = 0 #number of reads with modifications at the 0 or -1 position - READ_LENGTH = 0 aln_matrix_loc = os.path.join(_ROOT, args.needleman_wunsch_aln_matrix_loc) CRISPRessoShared.check_file(aln_matrix_loc) aln_matrix = CRISPResso2Align.read_matrix(aln_matrix_loc) - pe_scaffold_dna_info = (0, None) # scaffold start loc, scaffold sequence - if args.prime_editing_pegRNA_scaffold_seq != "" and args.prime_editing_pegRNA_extension_seq != "": - pe_scaffold_dna_info = get_pe_scaffold_search(refs['Prime-edited']['sequence'], args.prime_editing_pegRNA_extension_seq, args.prime_editing_pegRNA_scaffold_seq, args.prime_editing_pegRNA_scaffold_min_match_length) - not_aln = {} # cache for reads that don't align - not_aln[''] = "" # add empty sequence to the not_aln in case the fastq has an extra newline at the end - if fastq_input.endswith('.gz'): - fastq_input_handle = gzip.open(fastq_input, 'rt') + fastq_input_opener = lambda x: gzip.open(x, 'rt') else: - fastq_input_handle = open(fastq_input) + fastq_input_opener = open sam_out = bam_output+".sam" - sam_out_handle = open(sam_out, 'wt') - - # write sam output header - sam_out_handle.write(bam_header) - - fastq_id = fastq_input_handle.readline().strip()[1:] - while(fastq_id): - # read through fastq in sets of 4 - fastq_seq = fastq_input_handle.readline().strip() - fastq_plus = fastq_input_handle.readline().strip() - fastq_qual = fastq_input_handle.readline().strip() - - if (N_TOT_READS % 10000 == 0): - info("Processing reads; N_TOT_READS: %d N_COMPUTED_ALN: %d N_CACHED_ALN: %d N_COMPUTED_NOTALN: %d N_CACHED_NOTALN: %d"%(N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_COMPUTED_NOTALN, N_CACHED_NOTALN)) - - N_TOT_READS += 1 - - # if the sequence has been seen and can't be aligned, skip it - if fastq_seq in not_aln: - N_CACHED_NOTALN += 1 - new_sam_entry = not_aln[fastq_seq][:] - new_sam_entry[0] = fastq_id - new_sam_entry[10] = fastq_qual - sam_out_handle.write("\t".join(new_sam_entry)+"\n") # not_aln[fastq_seq] is alignment: NA - elif fastq_seq in variantCache: # if the sequence is already associated with a variant in the variant cache, pull it out - N_CACHED_ALN += 1 - variantCache[fastq_seq]['count'] 
+= 1 - match_name = "variant_" + variantCache[fastq_seq]['best_match_name'] - N_GLOBAL_SUBS += variantCache[fastq_seq][match_name]['substitution_n'] + variantCache[fastq_seq][match_name]['substitutions_outside_window'] - N_SUBS_OUTSIDE_WINDOW += variantCache[fastq_seq][match_name]['substitutions_outside_window'] - N_MODS_IN_WINDOW += variantCache[fastq_seq][match_name]['mods_in_window'] - N_MODS_OUTSIDE_WINDOW += variantCache[fastq_seq][match_name]['mods_outside_window'] - if variantCache[fastq_seq][match_name]['irregular_ends']: - N_READS_IRREGULAR_ENDS += 1 - new_sam_entry = variantCache[fastq_seq]['sam_entry'][:] - new_sam_entry[0] = fastq_id - new_sam_entry[10] = fastq_qual - sam_out_handle.write("\t".join(new_sam_entry)+"\n") # write cached alignment with modified read id and qual - - # otherwise, create a new variant object, and put it in the cache - else: - new_variant = get_new_variant_object(args, fastq_seq, refs, ref_names, aln_matrix, pe_scaffold_dna_info) - if new_variant['best_match_score'] <= 0: - N_COMPUTED_NOTALN += 1 + + with open(sam_out, 'wt') as sam_out_handle, fastq_input_opener(fastq_input) as fastq_input_handle: + # write sam output header + sam_out_handle.write(bam_header) + + fastq_id = fastq_input_handle.readline().strip()[1:] + while(fastq_id): + # read through fastq in sets of 4 + fastq_seq = fastq_input_handle.readline().strip() + fastq_plus = fastq_input_handle.readline().strip() + fastq_qual = fastq_input_handle.readline().strip() + + # if the sequence has been seen and can't be aligned, skip it + if fastq_seq in not_aln: new_sam_entry = [ fastq_id, # read id '4', # flag = unmapped 0x4 @@ -1039,14 +1206,15 @@ def process_single_fastq_write_bam_out(fastq_input, bam_output, bam_header, vari new_sam_entry.append(crispresso_sam_optional_fields) not_aln[fastq_seq] = new_sam_entry sam_out_handle.write("\t".join(new_sam_entry)+"\n") # write cached alignment with modified read id and qual - else: - N_COMPUTED_ALN += 1 + continue + + if fastq_seq in variantCache: + new_variant = variantCache[fastq_seq] ins_inds = [] del_inds = [] sub_inds = [] edit_strings = [] -# for idx, best_match_name in enumerate(best_match_names): for idx, best_match_name in enumerate(new_variant['aln_ref_names']): payload = new_variant['variant_'+best_match_name] @@ -1121,21 +1289,8 @@ def process_single_fastq_write_bam_out(fastq_input, bam_output, bam_header, vari sam_out_handle.write("\t".join(new_sam_entry)+"\n") # write cached alignment with modified read id and qual - variantCache[fastq_seq] = new_variant - match_name = 'variant_' + new_variant['best_match_name'] - if READ_LENGTH == 0: - READ_LENGTH = len(new_variant[match_name]['aln_seq']) - N_GLOBAL_SUBS += new_variant[match_name]['substitution_n'] + new_variant[match_name]['substitutions_outside_window'] - N_SUBS_OUTSIDE_WINDOW += new_variant[match_name]['substitutions_outside_window'] - N_MODS_IN_WINDOW += new_variant[match_name]['mods_in_window'] - N_MODS_OUTSIDE_WINDOW += new_variant[match_name]['mods_outside_window'] - if new_variant[match_name]['irregular_ends']: - N_READS_IRREGULAR_ENDS += 1 - - #last step of loop = read next line - fastq_id = fastq_input_handle.readline().strip()[1:] - fastq_input_handle.close() - sam_out_handle.close() + #last step of loop = read next line + fastq_id = fastq_input_handle.readline().strip()[1:] sort_and_index_cmd = 'samtools sort ' + sam_out + ' -o ' + bam_output + ' && samtools index ' + bam_output sort_bam_status = sb.call(sort_and_index_cmd, shell=True) if sort_bam_status: @@ -1146,22 +1301,11 
@@ def process_single_fastq_write_bam_out(fastq_input, bam_output, bam_header, vari if not args.debug: os.remove(sam_out) - info("Finished reads; N_TOT_READS: %d N_COMPUTED_ALN: %d N_CACHED_ALN: %d N_COMPUTED_NOTALN: %d N_CACHED_NOTALN: %d"%(N_TOT_READS, N_COMPUTED_ALN, N_CACHED_ALN, N_COMPUTED_NOTALN, N_CACHED_NOTALN)) - aln_stats = {"N_TOT_READS" : N_TOT_READS, - "N_CACHED_ALN" : N_CACHED_ALN, - "N_CACHED_NOTALN" : N_CACHED_NOTALN, - "N_COMPUTED_ALN" : N_COMPUTED_ALN, - "N_COMPUTED_NOTALN" : N_COMPUTED_NOTALN, - "N_GLOBAL_SUBS": N_GLOBAL_SUBS, - "N_SUBS_OUTSIDE_WINDOW": N_SUBS_OUTSIDE_WINDOW, - "N_MODS_IN_WINDOW": N_MODS_IN_WINDOW, - "N_MODS_OUTSIDE_WINDOW": N_MODS_OUTSIDE_WINDOW, - "N_READS_IRREGULAR_ENDS": N_READS_IRREGULAR_ENDS, - "READ_LENGTH": READ_LENGTH - } + info("Finished writing out to bam file: %s"%(bam_output)) return(aln_stats) + def normalize_name(name, fastq_r1, fastq_r2, bam_input): """Normalize the name according to the inputs and clean it. @@ -2486,21 +2630,22 @@ def get_prime_editing_guides(this_amp_seq, this_amp_name, ref0_seq, prime_edited ####INITIALIZE CACHE#### variantCache = {} - #put empty sequence into cache - cache_fastq_seq = '' - variantCache[cache_fastq_seq] = {} - variantCache[cache_fastq_seq]['count'] = 0 #operates on variantCache if args.bam_input: - aln_stats = process_bam(args.bam_input, args.bam_chr_loc, crispresso2_info['bam_output'], variantCache, ref_names, refs, args) + aln_stats = process_bam(args.bam_input, args.bam_chr_loc, crispresso2_info['bam_output'], variantCache, ref_names, refs, args, files_to_remove, OUTPUT_DIRECTORY) elif args.fastq_output: - aln_stats = process_fastq_write_out(processed_output_filename, crispresso2_info['fastq_output'], variantCache, ref_names, refs, args) + aln_stats = process_fastq_write_out(processed_output_filename, crispresso2_info['fastq_output'], variantCache, ref_names, refs, args, files_to_remove, OUTPUT_DIRECTORY) elif args.bam_output: bam_header += '@PG\tID:crispresso2\tPN:crispresso2\tVN:'+CRISPRessoShared.__version__+'\tCL:"'+crispresso_cmd_to_write+'"\n' - aln_stats = process_single_fastq_write_bam_out(processed_output_filename, crispresso2_info['bam_output'], bam_header, variantCache, ref_names, refs, args) + aln_stats = process_single_fastq_write_bam_out(processed_output_filename, crispresso2_info['bam_output'], bam_header, variantCache, ref_names, refs, args, files_to_remove, OUTPUT_DIRECTORY) else: - aln_stats = process_fastq(processed_output_filename, variantCache, ref_names, refs, args) + aln_stats = process_fastq(processed_output_filename, variantCache, ref_names, refs, args, files_to_remove, OUTPUT_DIRECTORY) + + #put empty sequence into cache + cache_fastq_seq = '' + variantCache[cache_fastq_seq] = {} + variantCache[cache_fastq_seq]['count'] = 0 info('Done!', {'percent_complete': 20}) @@ -2712,10 +2857,6 @@ def get_allele_row(reference_name, variant_count, aln_ref_names_str, aln_ref_sco #end get_allele_row() definition - #take care of empty seqs - cache_fastq_seq = '' - variantCache[cache_fastq_seq]['count'] = 0 - ###iterate through variants for variant in variantCache: #skip variant if there were none observed diff --git a/tests/unit_tests/test_CRISPRessoCORE.py b/tests/unit_tests/test_CRISPRessoCORE.py index de5b8ff8..cd54d40a 100644 --- a/tests/unit_tests/test_CRISPRessoCORE.py +++ b/tests/unit_tests/test_CRISPRessoCORE.py @@ -192,6 +192,80 @@ def test_get_cloned_include_idxs_from_quant_window_coordinates_include_zero(): quant_window_coordinates = '0-5' s1inds = [0, 1, 2, 3, 4, 5] assert 
CRISPRessoCORE.get_cloned_include_idxs_from_quant_window_coordinates(quant_window_coordinates, s1inds) == [0, 1, 2, 3, 4, 5]
+
+
+# Testing parallelization functions
+def test_regular_input():
+    # Evenly divisible input: four equal chunks
+    assert CRISPRessoCORE.get_variant_cache_equal_boundaries(100, 4) == [0, 25, 50, 75, 100]
+
+def test_remainder_input():
+    # Input with a remainder: the last chunk absorbs the extra read
+    assert CRISPRessoCORE.get_variant_cache_equal_boundaries(101, 4) == [0, 25, 50, 75, 101]
+
+def test_similar_num_reads_input():
+    # Nearly as many processes as unique reads
+    assert CRISPRessoCORE.get_variant_cache_equal_boundaries(11, 10) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11]
+
+def test_large_similar_num_reads_input():
+    # Larger case where reads barely outnumber processes
+    assert CRISPRessoCORE.get_variant_cache_equal_boundaries(101, 100) == list(range(0, 100)) + [101]
+
+def test_more_processes_than_reads():
+    # More processes than unique reads should raise an exception
+    with pytest.raises(Exception):
+        CRISPRessoCORE.get_variant_cache_equal_boundaries(3, 5)
+
+def test_single_process():
+    # Test with a single process
+    assert CRISPRessoCORE.get_variant_cache_equal_boundaries(50, 1) == [0, 50]
+
+def test_zero_sequences():
+    # Test with zero unique sequences
+    with pytest.raises(Exception):
+        CRISPRessoCORE.get_variant_cache_equal_boundaries(0, 3)
+
+def test_large_numbers():
+    # Test with large number of processes and sequences
+    boundaries = CRISPRessoCORE.get_variant_cache_equal_boundaries(10000, 10)
+    assert len(boundaries) == 11  # Check that there are 11 boundaries
+
+def test_sublist_generation():
+    n_processes = 4
+    unique_reads = 100
+    mock_variant_cache = [i for i in range(unique_reads)]
+    assert len(mock_variant_cache) == 100
+    boundaries = CRISPRessoCORE.get_variant_cache_equal_boundaries(unique_reads, n_processes)
+    assert boundaries == [0, 25, 50, 75, 100]
+    sublists = []
+    for i in range(n_processes):
+        left_sublist_index = boundaries[i]
+        right_sublist_index = boundaries[i+1]
+        sublist = mock_variant_cache[left_sublist_index:right_sublist_index]
+        sublists.append(sublist)
+    assert [len(sublist) for sublist in sublists] == [25, 25, 25, 25]
+    assert [s for sublist in sublists for s in sublist] == mock_variant_cache
+
+def test_irregular_sublist_generation():
+    n_processes = 4
+    unique_reads = 113
+    mock_variant_cache = [i for i in range(unique_reads)]
+    assert len(mock_variant_cache) == 113
+    boundaries = CRISPRessoCORE.get_variant_cache_equal_boundaries(unique_reads, n_processes)
+    sublists = []
+    for i in range(n_processes):
+        left_sublist_index = boundaries[i]
+        right_sublist_index = boundaries[i+1]
+        sublist = mock_variant_cache[left_sublist_index:right_sublist_index]
+        sublists.append(sublist)
+    assert [len(sublist) for sublist in sublists] == [28, 28, 28, 29]
+    assert [s for sublist in sublists for s in sublist] == mock_variant_cache
+
+
 if __name__ == "__main__":
     # execute only if run as a script
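# get_variant_cache_equal_boundaries itself does not appear in this diff; the
# sketch below is one implementation consistent with the tests above (boundary i
# sits at i*num//n, so chunks are near-equal and any remainder lands in the last
# chunk, and an error is raised when the reads cannot cover the processes). It is
# an assumption for illustration, not necessarily the committed code.
def get_variant_cache_equal_boundaries(num_unique_sequences, n_processes):
    if num_unique_sequences == 0 or num_unique_sequences < n_processes:
        raise Exception("Cannot split %d unique reads across %d processes" % (num_unique_sequences, n_processes))
    boundaries = [(i * num_unique_sequences) // n_processes for i in range(n_processes)]
    return boundaries + [num_unique_sequences]

assert get_variant_cache_equal_boundaries(100, 4) == [0, 25, 50, 75, 100]
assert get_variant_cache_equal_boundaries(113, 4) == [0, 28, 56, 84, 113]  # chunk sizes 28, 28, 28, 29
assert get_variant_cache_equal_boundaries(11, 10) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11]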