diff --git a/Makefile b/Makefile index 87cdcee9c2c..eacb40aab6a 100644 --- a/Makefile +++ b/Makefile @@ -830,7 +830,6 @@ clean: clean-rocksdb clean-vcflib $(RM) -r share/ cd $(DEP_DIR) && cd sonLib && $(MAKE) clean cd $(DEP_DIR) && cd sparsehash && $(MAKE) clean - cd $(DEP_DIR) && cd htslib && $(MAKE) clean cd $(DEP_DIR) && cd fastahack && $(MAKE) clean cd $(DEP_DIR) && cd gcsa2 && $(MAKE) clean cd $(DEP_DIR) && cd gbwt && $(MAKE) clean diff --git a/deps/gbwt b/deps/gbwt index a9fed2c8d8a..70a97edd60c 160000 --- a/deps/gbwt +++ b/deps/gbwt @@ -1 +1 @@ -Subproject commit a9fed2c8d8aafba17d5dabb18114ae07c366d915 +Subproject commit 70a97edd60cae8768520bac74100964af0105bbf diff --git a/src/gbwt_helper.cpp b/src/gbwt_helper.cpp index 009ec55fc33..77fc5c48c82 100644 --- a/src/gbwt_helper.cpp +++ b/src/gbwt_helper.cpp @@ -1,6 +1,8 @@ #include "gbwt_helper.hpp" #include "utility.hpp" +#include + #include namespace vg { @@ -105,6 +107,101 @@ void finish_gbwt_constuction(gbwt::GBWTBuilder& builder, //------------------------------------------------------------------------------ +void load_gbwt(const std::string& filename, gbwt::GBWT& index, bool show_progress) { + if (show_progress) { + std::cerr << "Loading compressed GBWT from " << filename << std::endl; + } + std::unique_ptr loaded = vg::io::VPKG::load_one(filename); + if (loaded.get() == nullptr) { + std::cerr << "error: [load_gbwt()] could not load compressed GBWT " << filename << std::endl; + std::exit(EXIT_FAILURE); + } + index = std::move(*loaded); +} + +void load_gbwt(const std::string& filename, gbwt::DynamicGBWT& index, bool show_progress) { + if (show_progress) { + std::cerr << "Loading dynamic GBWT from " << filename << std::endl; + } + std::unique_ptr loaded = vg::io::VPKG::load_one(filename); + if (loaded.get() == nullptr) { + std::cerr << "error: [load_gbwt()] could not load dynamic GBWT " << filename << std::endl; + std::exit(EXIT_FAILURE); + } + index = std::move(*loaded); +} + +void GBWTHandler::use_compressed() { + if (this->in_use == index_compressed) { + return; + } else if (this->in_use == index_dynamic) { + if (this->show_progress) { + std::cerr << "Converting dynamic GBWT into compressed GBWT" << std::endl; + } + this->compressed = gbwt::GBWT(this->dynamic); + this->dynamic = gbwt::DynamicGBWT(); + this->in_use = index_compressed; + } else { + load_gbwt(this->filename, this->compressed, this->show_progress); + this->in_use = index_compressed; + } +} + +void GBWTHandler::use_dynamic() { + if (this->in_use == index_dynamic) { + return; + } else if (this->in_use == index_compressed) { + if (this->show_progress) { + std::cerr << "Converting compressed GBWT into dynamic GBWT" << std::endl; + } + this->dynamic = gbwt::DynamicGBWT(this->compressed); + this->compressed = gbwt::GBWT(); + this->in_use = index_dynamic; + } else { + load_gbwt(this->filename, this->dynamic, this->show_progress); + this->in_use = index_dynamic; + } +} + +void GBWTHandler::use(gbwt::GBWT& new_index) { + this->clear(); + this->compressed.swap(new_index); + this->in_use = index_compressed; +} + +void GBWTHandler::use(gbwt::DynamicGBWT& new_index) { + this->clear(); + this->dynamic.swap(new_index); + this->in_use = index_dynamic; +} + +void GBWTHandler::unbacked() { + this->filename = std::string(); +} + +void GBWTHandler::serialize(const std::string& new_filename) { + this->filename = new_filename; + if (this->show_progress) { + std::cerr << "Serializing the GBWT to " << this->filename << std::endl; + } + if (this->in_use == index_none) { + std::cerr << "warning: [GBWTHandler] no GBWT to serialize" << std::endl; + return; + } else if (this->in_use == index_compressed) { + vg::io::VPKG::save(this->compressed, this->filename); + } else { + vg::io::VPKG::save(this->dynamic, this->filename); + } +} + +void GBWTHandler::clear() { + this->compressed = gbwt::GBWT(); + this->dynamic = gbwt::DynamicGBWT(); + this->in_use = index_none; +} + +//------------------------------------------------------------------------------ + std::string insert_gbwt_path(MutablePathHandleGraph& graph, const gbwt::GBWT& gbwt_index, gbwt::size_type id) { gbwt::size_type sequence_id = gbwt::Path::encode(id, false); diff --git a/src/gbwt_helper.hpp b/src/gbwt_helper.hpp index b73458f6999..7a8dc6d8db7 100644 --- a/src/gbwt_helper.hpp +++ b/src/gbwt_helper.hpp @@ -74,6 +74,60 @@ void finish_gbwt_constuction(gbwt::GBWTBuilder& builder, //------------------------------------------------------------------------------ +/// Load a compressed GBWT from the file. +void load_gbwt(const std::string& filename, gbwt::GBWT& index, bool show_progress = false); + +/// Load a dynamic GBWT from the file. +void load_gbwt(const std::string& filename, gbwt::DynamicGBWT& index, bool show_progress = false); + +/** + * Helper class that stores either a GBWT or a DynamicGBWT and loads them from a file + * or converts between them when necessary. + */ +struct GBWTHandler { + enum index_type { index_none, index_compressed, index_dynamic }; + + /// Compressed GBWT. + gbwt::GBWT compressed; + + /// Dynamic GBWT. + gbwt::DynamicGBWT dynamic; + + /// Which index is in use. + index_type in_use = index_none; + + /// The in-memory indexes are backed by this file. + std::string filename; + + /// Print progress information to stderr when loading/converting indexes. + bool show_progress = false; + + /// Switch to a compressed GBWT, converting it from the dynamic GBWT or reading it + /// from a file if necessary. + void use_compressed(); + + /// Switch to a dynamic GBWT, converting it from the compressed GBWT or reading it + /// from a file if necessary. + void use_dynamic(); + + /// Start using this compressed GBWT. Clears the index used as the argument. + void use(gbwt::GBWT& new_index); + + /// Start using this dynamic GBWT. Clears the index used as the argument. + void use(gbwt::DynamicGBWT& new_index); + + /// The GBWT is no longer backed by a file. + void unbacked(); + + /// Serialize the in-memory index to this file and start using it as the backing file. + void serialize(const std::string& new_filename); + + /// Clear the in-memory index. + void clear(); +}; + +//------------------------------------------------------------------------------ + /// Insert a GBWT thread into the graph and return its name. Returns an empty string on failure. /// NOTE: id is a gbwt path id, not a gbwt sequence id. std::string insert_gbwt_path(MutablePathHandleGraph& graph, const gbwt::GBWT& gbwt_index, gbwt::size_type id); diff --git a/src/haplotype_indexer.cpp b/src/haplotype_indexer.cpp index 8cb9e13226a..e673dfb69b5 100644 --- a/src/haplotype_indexer.cpp +++ b/src/haplotype_indexer.cpp @@ -336,12 +336,7 @@ std::unique_ptr HaplotypeIndexer::build_gbwt(const std::vecto return (this->excluded_samples.find(sample_names[sample_id]) == this->excluded_samples.end()); }, [&](const gbwt::Haplotype& haplotype) { builder.insert(haplotype.path, true); // Insert in both orientations. - builder.index.metadata.addPath({ - static_cast(haplotype.sample), - static_cast(contig_names.size() - 1), - static_cast(haplotype.phase), - static_cast(haplotype.count) - }); + builder.index.metadata.addPath(haplotype.sample, contig_names.size() - 1, haplotype.phase, haplotype.count); haplotypes.insert(gbwt::range_type(haplotype.sample, haplotype.phase)); }, [&](gbwt::size_type, gbwt::size_type) -> bool { // For each overlap, discard it if our global flag is set. @@ -399,21 +394,11 @@ std::unique_ptr HaplotypeIndexer::build_gbwt(const PathHandle } builder.insert(buffer, true); // Insert in both orientations. if (this->paths_as_samples) { - builder.index.metadata.addPath({ - static_cast(sample_names.size()), - static_cast(0), - static_cast(0), - static_cast(0) - }); + builder.index.metadata.addPath(sample_names.size(), 0, 0, 0); sample_names.push_back(path_name); haplotype_count++; } else { - builder.index.metadata.addPath({ - static_cast(0), - static_cast(contig_names.size()), - static_cast(0), - static_cast(0) - }); + builder.index.metadata.addPath(0, contig_names.size(), 0, 0); contig_names.push_back(path_name); } }); @@ -463,12 +448,7 @@ std::unique_ptr HaplotypeIndexer::build_gbwt(const PathHandle sample_count = iter->second.second; iter->second.second++; } - builder.index.metadata.addPath({ - static_cast(sample_id), - static_cast(0), - static_cast(0), - static_cast(sample_count) - }); + builder.index.metadata.addPath(sample_id, 0, 0, sample_count); }; for (auto& file_name : aln_filenames) { if (aln_format == "GAM") { diff --git a/src/subcommand/gbwt_main.cpp b/src/subcommand/gbwt_main.cpp index 53b02917583..ffb8e2763da 100644 --- a/src/subcommand/gbwt_main.cpp +++ b/src/subcommand/gbwt_main.cpp @@ -1,7 +1,8 @@ /** \file gbwt_main.cpp * - * Defines the "vg gbwt" subcommand, which wraps up access for commands we'd otherwise find - * in the gbwt submodule. */ + * Defines the "vg gbwt" subcommand for building, merging, and manipulating GBWT indexes + * and GBWTGraphs. + */ #include #include @@ -16,32 +17,20 @@ #include "../region.hpp" #include -#include #include #include #include -using namespace std; using namespace vg; -using namespace vg::subcommand; - -#include enum build_mode { build_none, build_vcf, build_paths, build_alignments }; -enum merge_mode { merge_none, merge_insert, merge_fast }; +enum merge_mode { merge_none, merge_insert, merge_fast, merge_parallel }; enum path_cover_mode { path_cover_none, path_cover_augment, path_cover_local, path_cover_greedy }; -enum index_type { index_none, index_compressed, index_dynamic }; - -void load_gbwt(const std::string& filename, gbwt::GBWT& index, bool show_progress); -void load_gbwt(const std::string& filename, gbwt::DynamicGBWT& index, bool show_progress); -void use_or_save(std::unique_ptr& index, gbwt::DynamicGBWT& dynamic_index, index_type& in_use, std::vector& filenames, size_t i, bool show_progress); +void use_or_save(std::unique_ptr& index, GBWTHandler& gbwts, std::vector& filenames, size_t i, bool show_progress); -void get_compressed(gbwt::GBWT& compressed_index, gbwt::DynamicGBWT& dynamic_index, index_type& in_use, const std::string& filename, bool show_progress); -void get_dynamic(gbwt::GBWT& compressed_index, gbwt::DynamicGBWT& dynamic_index, index_type& in_use, const std::string& filename, bool show_progress); - -void print_metadata(std::ostream& out, const gbwt::GBWT& compressed_index, const gbwt::DynamicGBWT& dynamic_index, index_type in_use); +void print_metadata(std::ostream& out, const GBWTHandler& gbwts); void get_graph(std::unique_ptr& graph, bool& in_use, const std::string& filename, bool show_progress); void clear_graph(std::unique_ptr& graph, bool& in_use); @@ -70,6 +59,10 @@ size_t default_build_jobs() { return std::max(static_cast(1), static_cast(omp_get_max_threads() / 2)); } +size_t default_merge_jobs() { + return std::min(static_cast(gbwt::MergeParameters::MERGE_JOBS), std::max(static_cast(1), static_cast(omp_get_max_threads() / 2))); +} + void use_preset(HaplotypeIndexer& haplotype_indexer, std::string preset_name); void help_gbwt(char** argv) { @@ -84,8 +77,11 @@ void help_gbwt(char** argv) { std::cerr << " -p, --progress show progress and statistics" << std::endl; std::cerr << std::endl; std::cerr << "GBWT construction parameters (for steps 1 and 4):" << std::endl; - std::cerr << " -b, --buffer-size N GBWT construction buffer size in millions of nodes (default " << (gbwt::DynamicGBWT::INSERT_BATCH_SIZE / gbwt::MILLION) << ")" << std::endl; - std::cerr << " -i, --id-interval N store path ids at one out of N positions (default " << gbwt::DynamicGBWT::SAMPLE_INTERVAL << ")" << std::endl; + std::cerr << " --buffer-size N GBWT construction buffer size in millions of nodes (default " << (gbwt::DynamicGBWT::INSERT_BATCH_SIZE / gbwt::MILLION) << ")" << std::endl; + std::cerr << " --id-interval N store path ids at one out of N positions (default " << gbwt::DynamicGBWT::SAMPLE_INTERVAL << ")" << std::endl; + std::cerr << std::endl; + std::cerr << "Search parameters (for -b and -r):" << std::endl; + std::cerr << " --num-threads N use N parallel search threads (default " << omp_get_max_threads() << ")" << std::endl; std::cerr << std::endl; std::cerr << "Step 1: GBWT construction (requires -o, -x, and one of { -v, -E, A }):" << std::endl; std::cerr << " -v, --vcf-input index the haplotypes in the VCF files specified in input args in parallel" << std::endl; @@ -111,9 +107,15 @@ void help_gbwt(char** argv) { std::cerr << " -A, --alignment-input index the alignments in the GAF files specified in input args" << std::endl; std::cerr << " --gam-format the input files are in GAM format instead of GAF format" << std::endl; std::cerr << std::endl; - std::cerr << "Step 2: Merge multiple input GBWTs (requires -o; use deps/gbwt/merge_gbwt for more options):" << std::endl; + std::cerr << "Step 2: Merge multiple input GBWTs (requires -o):" << std::endl; std::cerr << " -m, --merge use the insertion algorithm" << std::endl; std::cerr << " -f, --fast fast merging algorithm (node ids must not overlap)" << std::endl; + std::cerr << " -b, --parallel use the parallel algorithm" << std::endl; + std::cerr << " --chunk-size N search in chunks of N sequences (default " << gbwt::MergeParameters::CHUNK_SIZE << ")" << std::endl; + std::cerr << " --pos-buffer N use N MiB position buffers for each search thread (default " << gbwt::MergeParameters::POS_BUFFER_SIZE << ")" << std::endl; + std::cerr << " --thread-buffer N use N MiB thread buffers for each search thread (default " << gbwt::MergeParameters::THREAD_BUFFER_SIZE << ")" << std::endl; + std::cerr << " --merge-buffers N merge 2^N thread buffers into one file per merge job (default " << gbwt::MergeParameters::MERGE_BUFFERS << ")" << std::endl; + std::cerr << " --merge-jobs N run N parallel merge jobs (default " << default_merge_jobs() << ")" << std::endl; std::cerr << std::endl; std::cerr << "Step 3: Remove samples (requires -o and one input GBWT):" << std::endl; std::cerr << " -R, --remove-sample X remove the sample with name X from the index (may repeat)" << std::endl; @@ -130,9 +132,8 @@ void help_gbwt(char** argv) { std::cerr << std::endl; std::cerr << "Step 6: R-index construction (one input GBWT):" << std::endl; std::cerr << " -r, --r-index FILE build an r-index and store it in FILE" << std::endl; - std::cerr << " --num-threads N use N parallel search threads (default " << omp_get_max_threads() << ")" << std::endl; std::cerr << std::endl; - std::cerr << "Step 7: Metadata (one input GBWT; use deps/gbwt/metadata_tool to modify):" << std::endl; + std::cerr << "Step 7: Metadata (one input GBWT):" << std::endl; std::cerr << " -M, --metadata print basic metadata" << std::endl; std::cerr << " -C, --contigs print the number of contigs" << std::endl; std::cerr << " -H, --haplotypes print the number of haplotypes" << std::endl; @@ -155,6 +156,9 @@ int main_gbwt(int argc, char** argv) } // Long options with no corresponding short options. + constexpr int OPT_BUFFER_SIZE = 1000; + constexpr int OPT_ID_INTERVAL = 1001; + constexpr int OPT_NUM_THREADS = 1002; constexpr int OPT_PRESET = 1100; constexpr int OPT_NUM_JOBS = 1101; constexpr int OPT_INPUTS_AS_JOBS = 1102; @@ -171,10 +175,14 @@ int main_gbwt(int argc, char** argv) constexpr int OPT_EXCLUDE_SAMPLE = 1113; constexpr int OPT_PATHS_AS_SAMPLES = 1114; constexpr int OPT_GAM_FORMAT = 1115; - constexpr int OPT_NUM_THREADS = 1600; + constexpr int OPT_CHUNK_SIZE = 1200; + constexpr int OPT_POS_BUFFER = 1201; + constexpr int OPT_THREAD_BUFFER = 1202; + constexpr int OPT_MERGE_BUFFERS = 1203; + constexpr int OPT_MERGE_JOBS = 1204; // Requirements and modes. - bool produces_one_gbwt = false; + bool produces_one_gbwt = false; // Steps 1-4 eventually produce one input GBWT regardless of the number of input args. build_mode build = build_none; merge_mode merge = merge_none; path_cover_mode path_cover = path_cover_none; @@ -185,13 +193,17 @@ int main_gbwt(int argc, char** argv) bool gam_format = false, inputs_as_jobs = false, parse_only = false; size_t build_jobs = default_build_jobs(); + // Parallel merging. + gbwt::MergeParameters merge_parameters; + merge_parameters.setMergeJobs(default_merge_jobs()); + // Other parameters and flags. bool show_progress = false; bool count_threads = false; bool metadata = false, contigs = false, haplotypes = false, samples = false, list_names = false, thread_names = false; size_t num_paths = gbwtgraph::PATH_COVER_DEFAULT_N, context_length = gbwtgraph::PATH_COVER_DEFAULT_K; bool num_paths_set = false; - size_t r_index_threads = omp_get_max_threads(); + size_t search_threads = omp_get_max_threads(); // File/sample names. std::string gbwt_output, thread_output; @@ -211,8 +223,11 @@ int main_gbwt(int argc, char** argv) { "progress", no_argument, 0, 'p' }, // GBWT construction parameters - { "buffer-size", required_argument, 0, 'b' }, - { "id-interval", required_argument, 0, 'i' }, + { "buffer-size", required_argument, 0, OPT_BUFFER_SIZE }, + { "id-interval", required_argument, 0, OPT_ID_INTERVAL }, + + // Search parameters + { "num-threads", required_argument, 0, OPT_NUM_THREADS }, // Input GBWT construction { "vcf-input", no_argument, 0, 'v' }, @@ -238,6 +253,12 @@ int main_gbwt(int argc, char** argv) // Merging { "merge", no_argument, 0, 'm' }, { "fast", no_argument, 0, 'f' }, + { "parallel", no_argument, 0, 'b' }, + { "chunk-size", required_argument, 0, OPT_CHUNK_SIZE }, + { "pos-buffer", required_argument, 0, OPT_POS_BUFFER }, + { "thread-buffer", required_argument, 0, OPT_THREAD_BUFFER }, + { "merge-buffers", required_argument, 0, OPT_MERGE_BUFFERS }, + { "merge-jobs", required_argument, 0, OPT_MERGE_JOBS }, // Remove sample { "remove-sample", required_argument, 0, 'R' }, @@ -254,7 +275,6 @@ int main_gbwt(int argc, char** argv) // R-index { "r-index", required_argument, 0, 'r' }, - { "num-threads", required_argument, 0, OPT_NUM_THREADS }, // Metadata { "metadata", no_argument, 0, 'M' }, @@ -273,7 +293,7 @@ int main_gbwt(int argc, char** argv) }; int option_index = 0; - c = getopt_long(argc, argv, "x:o:d:pb:i:vEAmfR:alPn:k:g:r:MCHSLTce:h?", long_options, &option_index); + c = getopt_long(argc, argv, "x:o:d:pvEAmfbR:alPn:k:g:r:MCHSLTce:h?", long_options, &option_index); /* Detect the end of the options. */ if (c == -1) @@ -297,17 +317,23 @@ int main_gbwt(int argc, char** argv) break; // GBWT construction parameters - case 'b': + case OPT_BUFFER_SIZE: haplotype_indexer.gbwt_buffer_size = std::max(parse(optarg), 1ul); break; - case 'i': + case OPT_ID_INTERVAL: haplotype_indexer.id_interval = parse(optarg); break; + // Search parameters + case OPT_NUM_THREADS: + search_threads = std::max(parse(optarg), 1ul); + break; + // Input GBWT construction case 'v': assert(build == build_none); build = build_vcf; + produces_one_gbwt = true; break; case OPT_PRESET: use_preset(haplotype_indexer, optarg); @@ -411,6 +437,25 @@ int main_gbwt(int argc, char** argv) merge = merge_fast; produces_one_gbwt = true; break; + case 'b': + merge = merge_parallel; + produces_one_gbwt = true; + break; + case OPT_CHUNK_SIZE: + merge_parameters.setChunkSize(parse(optarg)); + break; + case OPT_POS_BUFFER: + merge_parameters.setPosBufferSize(parse(optarg)); + break; + case OPT_THREAD_BUFFER: + merge_parameters.setThreadBufferSize(parse(optarg)); + break; + case OPT_MERGE_BUFFERS: + merge_parameters.setMergeBuffers(parse(optarg)); + break; + case OPT_MERGE_JOBS: + merge_parameters.setMergeJobs(parse(optarg)); + break; // Remove sample case 'R': @@ -451,9 +496,6 @@ int main_gbwt(int argc, char** argv) case 'r': r_index_name = optarg; break; - case OPT_NUM_THREADS: - r_index_threads = parse(optarg); - break; // Metadata case 'M': @@ -541,7 +583,6 @@ int main_gbwt(int argc, char** argv) } } if (!to_remove.empty()) { - // Ugly hack: We cannot use produces_one_gbwt, because the GBWT may be produced in the next step. if (!(input_filenames.size() == 1 || merge != merge_none) || gbwt_output.empty()) { std::cerr << "error: [vg gbwt]: removing a sample requires one input GBWT and output GBWT" << std::endl; std::exit(EXIT_FAILURE); @@ -556,7 +597,7 @@ int main_gbwt(int argc, char** argv) std::cerr << "error: [vg gbwt]: greedy path cover does not use input GBWTs" << std::endl; std::exit(EXIT_FAILURE); } - if ((path_cover == path_cover_local || path_cover == path_cover_augment) && !(input_filenames.size() == 1 || produces_one_gbwt)) { + if ((path_cover == path_cover_local || path_cover == path_cover_augment) && !(input_filenames.size() == 1 || merge != merge_none)) { std::cerr << "error: [vg gbwt]: path cover options -a and -l require one input GBWT" << std::endl; std::exit(EXIT_FAILURE); } @@ -600,9 +641,9 @@ int main_gbwt(int argc, char** argv) gbwt::TempFile::setDirectory(temp_file::get_dir()); // This is the data we are using. - gbwt::GBWT compressed_index; - gbwt::DynamicGBWT dynamic_index; - index_type in_use = index_none; + GBWTHandler gbwts; + gbwts.filename = gbwt_name; + gbwts.show_progress = show_progress; std::unique_ptr input_graph; bool graph_in_use = false; @@ -613,6 +654,7 @@ int main_gbwt(int argc, char** argv) if (show_progress) { std::cerr << "Building input GBWTs" << std::endl; } + gbwts.unbacked(); // We will build a new GBWT. get_graph(input_graph, graph_in_use, xg_name, show_progress); if (build == build_vcf) { if (show_progress) { @@ -653,8 +695,8 @@ int main_gbwt(int argc, char** argv) #pragma omp parallel for schedule(dynamic, 1) for (size_t i = 0; i < vcf_parses.size(); i++) { std::string job_name = "Job " + std::to_string(i); - std::unique_ptr temp = haplotype_indexer.build_gbwt(vcf_parses[i], job_name); - use_or_save(temp, dynamic_index, in_use, gbwt_files, i, show_progress); + std::unique_ptr parsed = haplotype_indexer.build_gbwt(vcf_parses[i], job_name); + use_or_save(parsed, gbwts, gbwt_files, i, show_progress); } if (vcf_parses.size() > 1) { input_filenames = gbwt_files; // Use the temporary GBWTs as inputs. @@ -665,15 +707,13 @@ int main_gbwt(int argc, char** argv) std::cerr << "Input type: embedded paths" << std::endl; } std::unique_ptr temp = haplotype_indexer.build_gbwt(*input_graph); - dynamic_index = std::move(*temp); - in_use = index_dynamic; + gbwts.use(*temp); } else if (build == build_alignments) { if (show_progress) { std::cerr << "Input type: " << (gam_format ? "GAM" : "GAF") << std::endl; } std::unique_ptr temp = haplotype_indexer.build_gbwt(*input_graph, input_filenames, (gam_format ? "GAM" : "GAF")); - dynamic_index = std::move(*temp); - in_use = index_dynamic; + gbwts.use(*temp); } if (show_progress) { double seconds = gbwt::readTimer() - start; @@ -690,7 +730,14 @@ int main_gbwt(int argc, char** argv) if (merge != merge_none) { double start = gbwt::readTimer(); if (show_progress) { - std::string algo_name = (merge == merge_fast ? "fast" : "insertion"); + std::string algo_name; + if (merge == merge_fast) { + algo_name = "fast"; + } else if (merge == merge_insert) { + algo_name = "insertion"; + } else if (merge == merge_parallel) { + algo_name = "parallel"; + } std::cerr << "Merging " << input_filenames.size() << " input GBWTs (" << algo_name << " algorithm)" << std::endl; } if (merge == merge_fast) { @@ -701,27 +748,41 @@ int main_gbwt(int argc, char** argv) if (show_progress) { std::cerr << "Merging the GBWTs" << std::endl; } - compressed_index = gbwt::GBWT(indexes); - in_use = index_compressed; - } - else if (merge == merge_insert) { - load_gbwt(gbwt_name, dynamic_index, show_progress); - in_use = index_dynamic; + gbwt::GBWT merged(indexes); + gbwts.use(merged); + } else if (merge == merge_insert) { + gbwts.use_dynamic(); for (size_t i = 1; i < input_filenames.size(); i++) { gbwt::GBWT next; load_gbwt(input_filenames[i], next, show_progress); - if (next.size() > 2 * dynamic_index.size()) { + if (next.size() > 2 * gbwts.dynamic.size()) { + std::cerr << "warning: [vg gbwt] merging " << input_filenames[i] << " into a substantially smaller index" << std::endl; + std::cerr << "warning: [vg gbwt] merging would be faster in another order" << std::endl; + } + if (show_progress) { + std::cerr << "Inserting " << next.sequences() << " sequences of total length " << next.size() << std::endl; + } + gbwts.dynamic.merge(next); + } + } else if (merge == merge_parallel) { + gbwts.use_dynamic(); + omp_set_num_threads(search_threads); + for (size_t i = 1; i < input_filenames.size(); i++) { + gbwt::DynamicGBWT next; + load_gbwt(input_filenames[i], next, show_progress); + if (next.size() > 2 * gbwts.dynamic.size()) { std::cerr << "warning: [vg gbwt] merging " << input_filenames[i] << " into a substantially smaller index" << std::endl; std::cerr << "warning: [vg gbwt] merging would be faster in another order" << std::endl; } if (show_progress) { std::cerr << "Inserting " << next.sequences() << " sequences of total length " << next.size() << std::endl; } - dynamic_index.merge(next); + gbwts.dynamic.merge(next, merge_parameters); } } + gbwts.unbacked(); // We modified the GBWT. if (show_progress) { - print_metadata(std::cerr, compressed_index, dynamic_index, in_use); + print_metadata(std::cerr, gbwts); double seconds = gbwt::readTimer() - start; std::cerr << "GBWTs merged in " << seconds << " seconds, " << gbwt::inGigabytes(gbwt::memoryUsage()) << " GiB" << std::endl; std::cerr << std::endl; @@ -735,15 +796,15 @@ int main_gbwt(int argc, char** argv) if (show_progress) { std::cerr << "Removing " << to_remove.size() << " sample(s) from the index" << std::endl; } - get_dynamic(compressed_index, dynamic_index, in_use, gbwt_name, show_progress); - if (!(dynamic_index.hasMetadata() && dynamic_index.metadata.hasPathNames() && dynamic_index.metadata.hasSampleNames())) { + gbwts.use_dynamic(); + if (!(gbwts.dynamic.hasMetadata() && gbwts.dynamic.metadata.hasPathNames() && gbwts.dynamic.metadata.hasSampleNames())) { std::cerr << "error: [vg gbwt] the index does not contain metadata with thread and sample names" << std::endl; std::exit(EXIT_FAILURE); } std::set sample_ids; for (const std::string& sample_name : to_remove) { - gbwt::size_type sample_id = dynamic_index.metadata.sample(sample_name); - if (sample_id >= dynamic_index.metadata.samples()) { + gbwt::size_type sample_id = gbwts.dynamic.metadata.sample(sample_name); + if (sample_id >= gbwts.dynamic.metadata.samples()) { std::cerr << "warning: [vg gbwt] the index does not contain sample " << sample_name << std::endl; } else { sample_ids.insert(sample_id); @@ -751,7 +812,7 @@ int main_gbwt(int argc, char** argv) } std::vector path_ids; for (gbwt::size_type sample_id : sample_ids) { - std::vector current_paths = dynamic_index.metadata.removeSample(sample_id); + std::vector current_paths = gbwts.dynamic.metadata.removeSample(sample_id); path_ids.insert(path_ids.end(), current_paths.begin(), current_paths.end()); } if (path_ids.empty()) { @@ -760,8 +821,9 @@ int main_gbwt(int argc, char** argv) if (show_progress) { std::cerr << "Removing " << path_ids.size() << " threads" << std::endl; } - size_t foo = dynamic_index.remove(path_ids); + size_t foo = gbwts.dynamic.remove(path_ids); } + gbwts.unbacked(); // We modified the GBWT. if (show_progress) { double seconds = gbwt::readTimer() - start; std::cerr << "Samples removed in " << seconds << " seconds, " << gbwt::inGigabytes(gbwt::memoryUsage()) << " GiB" << std::endl; @@ -781,21 +843,23 @@ int main_gbwt(int argc, char** argv) if (show_progress) { std::cerr << "Algorithm: greedy" << std::endl; } - compressed_index = gbwtgraph::path_cover_gbwt(*input_graph, num_paths, context_length, haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, haplotype_indexer.id_interval, show_progress); - in_use = index_compressed; + gbwt::GBWT cover = gbwtgraph::path_cover_gbwt(*input_graph, num_paths, context_length, haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, haplotype_indexer.id_interval, show_progress); + gbwts.use(cover); } else if (path_cover == path_cover_augment) { if (show_progress) { std::cerr << "Algorithm: augment" << std::endl; } - get_dynamic(compressed_index, dynamic_index, in_use, gbwt_name, show_progress); - gbwtgraph::augment_gbwt(*input_graph, dynamic_index, num_paths, context_length, haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, haplotype_indexer.id_interval, show_progress); + gbwts.use_dynamic(); + gbwtgraph::augment_gbwt(*input_graph, gbwts.dynamic, num_paths, context_length, haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, haplotype_indexer.id_interval, show_progress); } else { if (show_progress) { std::cerr << "Algorithm: local haplotypes" << std::endl; } - get_compressed(compressed_index, dynamic_index, in_use, gbwt_name, show_progress); - compressed_index = gbwtgraph::local_haplotypes(*input_graph, compressed_index, num_paths, context_length, haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, haplotype_indexer.id_interval, show_progress); + gbwts.use_compressed(); + gbwt::GBWT cover = gbwtgraph::local_haplotypes(*input_graph, gbwts.compressed, num_paths, context_length, haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, haplotype_indexer.id_interval, show_progress); + gbwts.use(cover); } + gbwts.unbacked(); // We modified the GBWT. if (show_progress) { double seconds = gbwt::readTimer() - start; std::cerr << "Path cover built in " << seconds << " seconds, " << gbwt::inGigabytes(gbwt::memoryUsage()) << " GiB" << std::endl; @@ -805,16 +869,9 @@ int main_gbwt(int argc, char** argv) // Now we can serialize the GBWT. - if (!gbwt_output.empty() && in_use != index_none) { + if (!gbwt_output.empty()) { double start = gbwt::readTimer(); - if (show_progress) { - std::cerr << "Serializing the GBWT to " << gbwt_output << std::endl; - } - if (in_use == index_compressed) { - vg::io::VPKG::save(compressed_index, gbwt_output); - } else if (in_use == index_dynamic) { - vg::io::VPKG::save(dynamic_index, gbwt_output); - } + gbwts.serialize(gbwt_output); if (show_progress) { double seconds = gbwt::readTimer() - start; std::cerr << "GBWT serialized in " << seconds << " seconds, " << gbwt::inGigabytes(gbwt::memoryUsage()) << " GiB" << std::endl; @@ -830,11 +887,11 @@ int main_gbwt(int argc, char** argv) std::cerr << "Building GBWTGraph" << std::endl; } get_graph(input_graph, graph_in_use, xg_name, show_progress); - get_compressed(compressed_index, dynamic_index, in_use, gbwt_name, show_progress); + gbwts.use_compressed(); if (show_progress) { std::cerr << "Starting the construction" << std::endl; } - gbwtgraph::GBWTGraph graph(compressed_index, *input_graph); + gbwtgraph::GBWTGraph graph(gbwts.compressed, *input_graph); if (show_progress) { std::cerr << "Serializing GBWTGraph to " << graph_output << std::endl; } @@ -857,12 +914,12 @@ int main_gbwt(int argc, char** argv) if (show_progress) { std::cerr << "Building r-index" << std::endl; } - omp_set_num_threads(r_index_threads); - get_compressed(compressed_index, dynamic_index, in_use, gbwt_name, show_progress); + omp_set_num_threads(search_threads); + gbwts.use_compressed(); if (show_progress) { std::cerr << "Starting the construction" << std::endl; } - gbwt::FastLocate r_index(compressed_index); + gbwt::FastLocate r_index(gbwts.compressed); if (show_progress) { std::cerr << "Serializing the r-index to " << r_index_name << std::endl; } @@ -877,49 +934,49 @@ int main_gbwt(int argc, char** argv) // Metadata options. if (metadata_mode) { - get_compressed(compressed_index, dynamic_index, in_use, gbwt_name, show_progress); - if (!compressed_index.hasMetadata()) { + gbwts.use_compressed(); + if (!gbwts.compressed.hasMetadata()) { std::cerr << "error: [vg gbwt] the GBWT does not contain metadata" << std::endl; std::exit(EXIT_FAILURE); } if (metadata) { - print_metadata(std::cout, compressed_index, dynamic_index, in_use); + print_metadata(std::cout, gbwts); } if (contigs) { if (list_names) { - if (compressed_index.metadata.hasContigNames()) { - for (size_t i = 0; i < compressed_index.metadata.contigs(); i++) { - std::cout << compressed_index.metadata.contig(i) << std::endl; + if (gbwts.compressed.metadata.hasContigNames()) { + for (size_t i = 0; i < gbwts.compressed.metadata.contigs(); i++) { + std::cout << gbwts.compressed.metadata.contig(i) << std::endl; } } else { std::cerr << "error: [vg gbwt] the metadata does not contain contig names" << std::endl; std::exit(EXIT_FAILURE); } } else { - std::cout << compressed_index.metadata.contigs() << std::endl; + std::cout << gbwts.compressed.metadata.contigs() << std::endl; } } if (haplotypes) { - std::cout << compressed_index.metadata.haplotypes() << std::endl; + std::cout << gbwts.compressed.metadata.haplotypes() << std::endl; } if (samples) { if (list_names) { - if (compressed_index.metadata.hasSampleNames()) { - for (size_t i = 0; i < compressed_index.metadata.samples(); i++) { - std::cout << compressed_index.metadata.sample(i) << std::endl; + if (gbwts.compressed.metadata.hasSampleNames()) { + for (size_t i = 0; i < gbwts.compressed.metadata.samples(); i++) { + std::cout << gbwts.compressed.metadata.sample(i) << std::endl; } } else { std::cerr << "error: [vg gbwt] the metadata does not contain sample names" << std::endl; std::exit(EXIT_FAILURE); } } else { - std::cout << compressed_index.metadata.samples() << std::endl; + std::cout << gbwts.compressed.metadata.samples() << std::endl; } } if (thread_names) { - if (compressed_index.metadata.hasPathNames()) { - for (size_t i = 0; i < compressed_index.metadata.paths(); i++) { - std::cout << thread_name(compressed_index, i) << std::endl; + if (gbwts.compressed.metadata.hasPathNames()) { + for (size_t i = 0; i < gbwts.compressed.metadata.paths(); i++) { + std::cout << thread_name(gbwts.compressed, i) << std::endl; } } else { std::cerr << "error: [vg gbwt] the metadata does not contain thread names" << std::endl; @@ -935,14 +992,14 @@ int main_gbwt(int argc, char** argv) if (show_progress) { std::cerr << "Extracting threads to " << thread_output << std::endl; } - get_compressed(compressed_index, dynamic_index, in_use, gbwt_name, show_progress); + gbwts.use_compressed(); if (show_progress) { std::cerr << "Starting the extraction" << std::endl; } - gbwt::size_type node_width = gbwt::bit_length(compressed_index.sigma() - 1); + gbwt::size_type node_width = gbwt::bit_length(gbwts.compressed.sigma() - 1); gbwt::text_buffer_type out(thread_output, std::ios::out, gbwt::MEGABYTE, node_width); - for (gbwt::size_type id = 0; id < compressed_index.sequences(); id += 2) { // Ignore reverse complements. - gbwt::vector_type sequence = compressed_index.extract(id); + for (gbwt::size_type id = 0; id < gbwts.compressed.sequences(); id += 2) { // Ignore reverse complements. + gbwt::vector_type sequence = gbwts.compressed.extract(id); for (auto node : sequence) { out.push_back(node); } @@ -958,8 +1015,8 @@ int main_gbwt(int argc, char** argv) // There are two sequences for each thread. if (count_threads) { - get_compressed(compressed_index, dynamic_index, in_use, gbwt_name, show_progress); - std::cout << (compressed_index.sequences() / 2) << std::endl; + gbwts.use_compressed(); + std::cout << (gbwts.compressed.sequences() / 2) << std::endl; } } @@ -971,74 +1028,17 @@ int main_gbwt(int argc, char** argv) // Utility functions -void load_gbwt(const std::string& filename, gbwt::GBWT& index, bool show_progress) { - if (show_progress) { - std::cerr << "Loading " << filename << std::endl; - } - std::unique_ptr loaded = vg::io::VPKG::load_one(filename); - if (loaded.get() == nullptr) { - std::cerr << "error: [vg gbwt] could not load compressed GBWT " << filename << std::endl; - std::exit(EXIT_FAILURE); - } - index = std::move(*loaded); -} - -void load_gbwt(const std::string& filename, gbwt::DynamicGBWT& index, bool show_progress) { - if (show_progress) { - std::cerr << "Loading " << filename << std::endl; - } - std::unique_ptr loaded = vg::io::VPKG::load_one(filename); - if (loaded.get() == nullptr) { - std::cerr << "error: [vg gbwt] could not load dynamic GBWT " << filename << std::endl; - std::exit(EXIT_FAILURE); - } - index = std::move(*loaded); -} - -void get_compressed(gbwt::GBWT& compressed_index, gbwt::DynamicGBWT& dynamic_index, index_type& in_use, const std::string& filename, bool show_progress) { - if (in_use == index_compressed) { - return; - } else if (in_use == index_dynamic) { - if (show_progress) { - std::cerr << "Converting dynamic GBWT into compressed GBWT" << std::endl; - } - compressed_index = gbwt::GBWT(dynamic_index); - dynamic_index = gbwt::DynamicGBWT(); - in_use = index_compressed; - } else { - load_gbwt(filename, compressed_index, show_progress); - in_use = index_compressed; - } -} - -void get_dynamic(gbwt::GBWT& compressed_index, gbwt::DynamicGBWT& dynamic_index, index_type& in_use, const std::string& filename, bool show_progress) { - if (in_use == index_dynamic) { - return; - } else if (in_use == index_compressed) { - if (show_progress) { - std::cerr << "Converting compressed GBWT into dynamic GBWT" << std::endl; - } - dynamic_index = gbwt::DynamicGBWT(compressed_index); - compressed_index = gbwt::GBWT(); - in_use = index_dynamic; - } else { - load_gbwt(filename, dynamic_index, show_progress); - in_use = index_dynamic; - } -} - -void print_metadata(std::ostream& out, const gbwt::GBWT& compressed_index, const gbwt::DynamicGBWT& dynamic_index, index_type in_use) { - if (in_use == index_compressed) { - gbwt::operator<<(out, compressed_index.metadata) << std::endl; - } else if (in_use == index_dynamic) { - gbwt::operator<<(out, dynamic_index.metadata) << std::endl; +void print_metadata(std::ostream& out, const GBWTHandler& gbwts) { + if (gbwts.in_use == GBWTHandler::index_compressed) { + gbwt::operator<<(out, gbwts.compressed.metadata) << std::endl; + } else if (gbwts.in_use == GBWTHandler::index_dynamic) { + gbwt::operator<<(out, gbwts.dynamic.metadata) << std::endl; } } -void use_or_save(std::unique_ptr& index, gbwt::DynamicGBWT& dynamic_index, index_type& in_use, std::vector& filenames, size_t i, bool show_progress) { +void use_or_save(std::unique_ptr& index, GBWTHandler& gbwts, std::vector& filenames, size_t i, bool show_progress) { if (filenames.size() == 1) { - dynamic_index = std::move(*index); - in_use = index_dynamic; + gbwts.use(*index); } else { std::string temp = temp_file::create("gbwt-" + std::to_string(i) + "-"); if (show_progress) { @@ -1218,5 +1218,5 @@ void use_preset(HaplotypeIndexer& haplotype_indexer, std::string preset_name) { //---------------------------------------------------------------------------- // Register subcommand -static Subcommand vg_gbwt("gbwt", "Manipulate GBWTs", main_gbwt); +static vg::subcommand::Subcommand vg_gbwt("gbwt", "Manipulate GBWTs", main_gbwt); diff --git a/src/transcriptome.cpp b/src/transcriptome.cpp index 2730d99251e..93a14deada4 100644 --- a/src/transcriptome.cpp +++ b/src/transcriptome.cpp @@ -1657,7 +1657,7 @@ int32_t Transcriptome::add_transcripts_to_gbwt(gbwt::GBWTBuilder * gbwt_builder, gbwt_builder->insert(gbwt_thread, add_bidirectional); // Insert transcript path name into GBWT index. - gbwt_builder->index.metadata.addPath({static_cast(sample_names.size()), 0, 0, 0}); + gbwt_builder->index.metadata.addPath(sample_names.size(), 0, 0, 0); sample_names.emplace_back(transcript_path.name); } } diff --git a/test/t/37_vg_gbwt.t b/test/t/37_vg_gbwt.t index 330f27015e3..f8cab384b42 100644 --- a/test/t/37_vg_gbwt.t +++ b/test/t/37_vg_gbwt.t @@ -5,7 +5,7 @@ BASH_TAP_ROOT=../deps/bash-tap PATH=../bin:$PATH # for vg -plan tests 85 +plan tests 91 # Build vg graphs for two chromosomes @@ -98,6 +98,10 @@ vg gbwt -f -o xy.fast.gbwt x.gbwt y.gbwt is $? 0 "fast merging with multiple chromosomes" cmp xy.merge.gbwt xy.fast.gbwt is $? 0 "identical merging results with the insertion and fast algorithms" +vg gbwt -b --merge-jobs 1 -o xy.parallel.gbwt x.gbwt y.gbwt +is $? 0 "parallel merging with multiple chromosomes" +cmp xy.merge.gbwt xy.parallel.gbwt +is $? 0 "identical merging results with the insertion and parallel algorithms" vg gbwt -x xy-alt.xg -o xy.direct.gbwt -v small/xy2.vcf.gz is $? 0 "direct construction with multiple chromosomes and a single VCF" cmp xy.direct.gbwt xy.merge.gbwt @@ -117,7 +121,7 @@ is $(vg gbwt -C xy.merge.gbwt) 2 "multiple chromosomes: 2 contigs" is $(vg gbwt -H xy.merge.gbwt) 2 "multiple chromosomes: 2 haplotypes" is $(vg gbwt -S xy.merge.gbwt) 1 "multiple chromosomes: 1 sample" -rm -f x.gbwt y.gbwt xy.merge.gbwt xy.fast.gbwt xy.direct.gbwt xy.multi.gbwt +rm -f x.gbwt y.gbwt xy.merge.gbwt xy.fast.gbwt xy.parallel.gbwt xy.direct.gbwt xy.multi.gbwt rm -f xy.1000gp.gbwt @@ -183,6 +187,18 @@ is $? 0 "fast merging: empty + non-empty" cmp x.gbwt x2.gbwt is $? 0 "the index remains unchanged" +# Parallel merging, x + empty +vg gbwt -b --merge-jobs 1 -o x2.gbwt x.gbwt empty.gbwt +is $? 0 "parallel merging: non-empty + empty" +cmp x.gbwt x2.gbwt +is $? 0 "the index remains unchanged" + +# Parallel merging, empty + x; silence the warning about the merging order +vg gbwt -b --merge-jobs 1 -o x2.gbwt empty.gbwt x.gbwt 2> /dev/null +is $? 0 "parallel merging: empty + non-empty" +cmp x.gbwt x2.gbwt +is $? 0 "the index remains unchanged" + rm -f x.gbwt empty.gbwt x2.gbwt