From db481e6986678032388f1ffe503d435b980228f9 Mon Sep 17 00:00:00 2001 From: santeripuranen <26481940+santeripuranen@users.noreply.github.com> Date: Tue, 11 Feb 2020 14:14:48 +0200 Subject: [PATCH 1/4] Swap vector for boost::dynamic_bitset in order to curb memory consumption --- src/map_reads.cpp | 220 ++++++++++++++++++++++++---------------------- 1 file changed, 116 insertions(+), 104 deletions(-) diff --git a/src/map_reads.cpp b/src/map_reads.cpp index 6d6f4b7..b3fc1b8 100644 --- a/src/map_reads.cpp +++ b/src/map_reads.cpp @@ -24,20 +24,21 @@ ## ------------------------------------------------------------------------- ## Authors (alphabetically): Jacob L., Jaillard M., Lima L. -## Modified by John Lees +## Modified by John Lees and Santeri Puranen */ #include "global.h" #include "map_reads.hpp" #include "Utils.h" #include +#include #include #define NB_OF_READS_NOTIFICATION_MAP_AND_PHASE 10 //Nb of reads that the map and phase must process for notification using namespace std; void mapReadToTheGraphCore(const string &read, const Graph &graph, const vector< UnitigIdStrandPos > &nodeIdToUnitigId, - map &unitigIdToCount) { + boost::dynamic_bitset<>& unitigPattern ) { int lastUnitig=-1; //goes through all nodes/kmers of the read @@ -64,34 +65,36 @@ void mapReadToTheGraphCore(const string &read, const Graph &graph, const vector< //get the unitig localization of this kmer u_int64_t index = graph.nodeMPHFIndex(node); - UnitigIdStrandPos unitigIdStrandPos=nodeIdToUnitigId[index]; + const auto unitigId = nodeIdToUnitigId[index].unitigId; - if (lastUnitig != unitigIdStrandPos.unitigId) { - if (unitigIdToCount.find(unitigIdStrandPos.unitigId) == unitigIdToCount.end() ) - unitigIdToCount[unitigIdStrandPos.unitigId]=0; - unitigIdToCount[unitigIdStrandPos.unitigId]++; - lastUnitig = unitigIdStrandPos.unitigId; + if( lastUnitig != unitigId ) { + unitigPattern.set(unitigId); + lastUnitig = unitigId; } } } } - +/* void mapReadToTheGraph(const string &read, int readfileIndex, unsigned long readIndex, const Graph &graph, const vector< UnitigIdStrandPos > &nodeIdToUnitigId, map &unitigIdToCount) { //map the read mapReadToTheGraphCore(read, graph, nodeIdToUnitigId, unitigIdToCount); } - +*/ // We define a functor that will be cloned by the dispatcher struct MapAndPhase { - const vector &allReadFilesNames; + using bitmap_t = boost::dynamic_bitset<>; + using bitmap_container_t = vector< bitmap_t >; + + const vector &allReadFilesNames; const Graph& graph; - const string &outputFolder; - const string &tmpFolder; + //const string &outputFolder; + //const string &tmpFolder; uint64_t &nbOfReadsProcessed; ISynchronizer* synchro; + vector& allUnitigPatterns; vector< UnitigIdStrandPos > &nodeIdToUnitigId; int nbContigs; @@ -106,8 +109,8 @@ struct MapAndPhase synchro->lock (); nbOfReadsProcessed+=NB_OF_READS_NOTIFICATION_MAP_AND_PHASE; - cerr << '\r' << nbOfReadsProcessed << " reads processed."; - cerr.flush(); + cout << '\r' << nbOfReadsProcessed << " reads processed."; + cout.flush(); // We unlock the synchronizer synchro->unlock (); @@ -115,11 +118,12 @@ struct MapAndPhase }; MapAndPhase (const vector &allReadFilesNames, const Graph& graph, - const string &outputFolder, const string &tmpFolder, uint64_t &nbOfReadsProcessed, ISynchronizer* synchro, - vector< UnitigIdStrandPos > &nodeIdToUnitigId, int nbContigs) : - allReadFilesNames(allReadFilesNames), graph(graph), outputFolder(outputFolder), tmpFolder(tmpFolder), - nbOfReadsProcessed(nbOfReadsProcessed), synchro(synchro), nodeIdToUnitigId(nodeIdToUnitigId), - nbContigs(nbContigs){} + uint64_t &nbOfReadsProcessed, ISynchronizer* synchro, + bitmap_container_t &allUnitigPatterns, + vector< UnitigIdStrandPos > &nodeIdToUnitigId, int nbContigs) : + allReadFilesNames(allReadFilesNames), graph(graph), + nbOfReadsProcessed(nbOfReadsProcessed), synchro(synchro), + allUnitigPatterns(allUnitigPatterns), nodeIdToUnitigId(nodeIdToUnitigId), nbContigs(nbContigs){} void operator()(int i) { // We declare an input Bank and use it locally @@ -130,13 +134,10 @@ struct MapAndPhase MapAndPhaseIteratorListener* mapAndPhaseIteratorListener = new MapAndPhaseIteratorListener(nbOfReadsProcessed, synchro); SubjectIterator it(inputBank->iterator(), NB_OF_READS_NOTIFICATION_MAP_AND_PHASE, mapAndPhaseIteratorListener); - //XU_strain_i = how many times each unitig map to a strain - ofstream mappingOutputFile; - openFileForWriting(tmpFolder+string("/XU_strain_")+to_string(i), mappingOutputFile); - // We loop over sequences. - unsigned long readIndex = 0; - map unitigIdToCount; //TODO: change this to a vector + auto& unitigPattern = allUnitigPatterns[i]; + unitigPattern.resize(nbContigs); + for (it.first(); !it.isDone(); it.next()) { string read = (it.item()).toString(); //transform the read to upper case @@ -144,20 +145,8 @@ struct MapAndPhase read[j]=toupper(read[j]); //map this read to the graph - mapReadToTheGraph(read, i, readIndex, graph, nodeIdToUnitigId, unitigIdToCount); - - readIndex++; + mapReadToTheGraphCore(read, graph, nodeIdToUnitigId, unitigPattern); // unitigIdToCount); } - - //output info for mapping - the number of times the unitig appear in the strain - for (int i=0;i, vector > getUnitigsWithSamePattern (const vector< vector > &XU, int nbContigs) { - map< vector, vector > pattern2Unitigs; - - for (int i=0;i unitigs; - unitigs.push_back(i); - - //insert this pattern and his new set to the map - pattern2Unitigs.insert(make_pair(XU[i], unitigs)); - } else { - //pattern of unitig i is already in pattern2Unitigs, just add - pattern2Unitigs[XU[i]].push_back(i); +std::vector< boost::dynamic_bitset<> > +transposeXU( std::vector< boost::dynamic_bitset<> > &XUT ) +{ + using bitmap_t = boost::dynamic_bitset<>; + using bitmap_container_t = std::vector< bitmap_t >; + + const std::size_t nsamples( XUT.size() ); + const std::size_t ncontigs = XUT[0].size(); + bitmap_container_t XU; XU.resize(ncontigs); + for( auto& row: XU ) { row.resize(nsamples); } + + // transpose the input matrix; this is a fairly expensive operation + // due to the horrible memory access patterns involved here + for( auto i=0; i< XUT.size(); ++i ) + { + auto& bitset = XUT[i]; + auto pos = bitset.find_first(); + while( pos != bitmap_t::npos ) + { + XU[pos].set(i); + pos = bitset.find_next(pos); } - } + // release memory + bitset.swap(bitmap_t()); // apparently the swap trick is still the most reliable way to accomplish this // bitset.resize(0); bitset.shrink_to_fit(); + } + return XU; +} - return pattern2Unitigs; +//pattern is the unitig line +map< boost::dynamic_bitset<>, vector > getUnitigsWithSamePattern (const vector< boost::dynamic_bitset<> > &XU) { + using bitmap_t = boost::dynamic_bitset<>; + using mapping_t = map< bitmap_t, vector >; + + // storage for unique patterns linked to all parent unitigs. + mapping_t pattern2Unitigs; + + for( std::size_t i=0; i > &XU) { - ofstream XUFile; +void generate_XU(const string &filename, const string &nodesFile, const vector< boost::dynamic_bitset<> > &XU) { + using bitmap_t = boost::dynamic_bitset<>; + ofstream XUFile; openFileForWriting(filename, XUFile); //open file with list of node sequences @@ -197,16 +210,19 @@ void generate_XU(const string &filename, const string &nodesFile, const vector< int id; string seq; - for (int i=0;i> id >> seq; XUFile << seq << " |"; - //print the strains present - for (int j=0;j 0) { - XUFile << " " << (*strains)[j].id << ":" << XU[i][j]; - } + // print the strains present + // by finding all the set bits + auto pos = XUi.find_first(); + while( pos != bitmap_t::npos ) + { + XUFile << " " << (*strains)[pos].id << ":1"; + pos = XUi.find_next(pos); + } XUFile << endl; } nodesFileReader.close(); @@ -214,14 +230,13 @@ void generate_XU(const string &filename, const string &nodesFile, const vector< } void generate_unique_id_to_original_ids(const string &filename, - const map< vector, vector > &pattern2Unitigs) { + const map< boost::dynamic_bitset<>, vector > &pattern2Unitigs) { ofstream uniqueIdToOriginalIdsFile; openFileForWriting(filename, uniqueIdToOriginalIdsFile); //for each pattern int i=0; - auto it=pattern2Unitigs.begin(); - for (;it!=pattern2Unitigs.end();++it, ++i) { + for( auto it=pattern2Unitigs.begin(); it!=pattern2Unitigs.end(); ++it, ++i ) { //print the id of this pattern uniqueIdToOriginalIdsFile << i << " = "; @@ -234,8 +249,8 @@ void generate_unique_id_to_original_ids(const string &filename, uniqueIdToOriginalIdsFile.close(); } -void generate_XU_unique(const string &filename, const vector< vector > &XU, - const map< vector, vector > &pattern2Unitigs){ +void generate_XU_unique(const string &filename, const vector< boost::dynamic_bitset<> > &XU, + const map< boost::dynamic_bitset<>, vector > &pattern2Unitigs){ ofstream XUUnique; openFileForWriting(filename, XUUnique); @@ -247,14 +262,14 @@ void generate_XU_unique(const string &filename, const vector< vector > &XU, //for each pattern int i=0; - auto it=pattern2Unitigs.begin(); - for (;it!=pattern2Unitigs.end();++it, ++i) { + for( auto it=pattern2Unitigs.begin(); it!=pattern2Unitigs.end(); ++it, ++i ) { //print the id of this pattern XUUnique << i; - //print the pattern - for (const auto &v : it->first) - XUUnique << " " << v; + //print the pattern; will produce a *massive* file + const auto& pattern = it->first; + for( std::size_t i=0; i > &XU, //generate the pyseer input void generatePyseerInput (const vector &allReadFilesNames, - const string &outputFolder, const string &tmpFolder, + const string &outputFolder, + const vector< boost::dynamic_bitset<> >& XU, int nbContigs) { //Generate the XU (the pyseer input - the unitigs are rows with strains present) //XU_unique is XU is in matrix form (for Rtab input) with the duplicated rows removed - cerr << endl << endl << "[Generating pyseer input]..." << endl; - - //Create XU - vector< vector > XU(nbContigs); - for (auto & v : XU) - v.resize(allReadFilesNames.size()); - - //populate XU - for (int j=0; j> XU[i][j]; - inputFile.close(); - } - //create the files for pyseer { generate_XU(outputFolder+string("/unitigs.txt"), outputFolder+string("/graph.nodes"), XU); - map< vector, vector > pattern2Unitigs = getUnitigsWithSamePattern(XU, nbContigs); + auto pattern2Unitigs = getUnitigsWithSamePattern(XU); generate_unique_id_to_original_ids(outputFolder+string("/unitigs.unique_rows_to_all_rows.txt"), pattern2Unitigs); generate_XU_unique(outputFolder+string("/unitigs.unique_rows.Rtab"), XU, pattern2Unitigs); } - - cerr << "[Generating pyseer input] - Done!" << endl; - - } void map_reads::execute () { - //get the parameters + using bitmap_container_t = typename MapAndPhase::bitmap_container_t; + + //get the parameters string outputFolder = stripLastSlashIfExists(getInput()->getStr(STR_OUTPUT)); string tmpFolder = outputFolder+string("/tmp"); string longReadsFile = tmpFolder+string("/readsFile"); @@ -312,6 +310,9 @@ void map_reads::execute () //get all the read files' name vector allReadFilesNames = getVectorStringFromFile(longReadsFile); + // use bitmaps/bitsets in order to curb memory use + bitmap_container_t allUnitigPatterns; allUnitigPatterns.resize(allReadFilesNames.size()); + // We create an iterator over an integer range Range::Iterator allReadFilesNamesIt(0, allReadFilesNames.size() - 1); @@ -321,19 +322,30 @@ void map_reads::execute () // We create a dispatcher configured for 'nbCores' cores. Dispatcher dispatcher(nbCores, 1); - cerr << "[Starting mapping process... ]" << endl; - cerr << "Using " << nbCores << " cores to map " << allReadFilesNames.size() << " read files." << endl; + cout << "[Starting mapping process... ]" << endl; + cout << "Using " << nbCores << " cores to map " << allReadFilesNames.size() << " read files." << endl; // We iterate the range. NOTE: we could also use lambda expression (easing the code readability) uint64_t nbOfReadsProcessed = 0; dispatcher.iterate(allReadFilesNamesIt, - MapAndPhase(allReadFilesNames, *graph, outputFolder, tmpFolder, nbOfReadsProcessed, synchro, - *nodeIdToUnitigId, nbContigs)); + MapAndPhase(allReadFilesNames, *graph, nbOfReadsProcessed, synchro, + allUnitigPatterns, *nodeIdToUnitigId, nbContigs)); + + cout << endl << "[Mapping process finished!]" << endl; - cerr << endl << "[Mapping process finished!]" << endl; + // allUnitigPatterns has all samples/strains over the first dimension and + // unitig presense patterns over the second dimension (in bitsets). + // Here we transpose the matrix, while consuming it in order to gradually reduce memory footprint. + // For larger data sets this pattern accounting will dominate our emory footprint; overall memory consumption will peak here. + // Peak memory use occurs at the start and will be twice the matrix size (= 2 * (nbContigs*strains->size()/8) bytes). + cout << "[Transpose pattern matrix..]" << endl; + auto XU = transposeXU( allUnitigPatterns ); // this will consume allUnitigPatterns while transposing + allUnitigPatterns.swap(bitmap_container_t()); // release memory //generate the pyseer input - generatePyseerInput(allReadFilesNames, outputFolder, tmpFolder, nbContigs); + cout << "[Generating pyseer input]..." << endl; + generatePyseerInput(allReadFilesNames, outputFolder, XU, nbContigs); + cout << "[Generating pyseer input] - Done!" << endl; cout << "Number of unique patterns: " << getNbLinesInFile(outputFolder+string("/unitigs.unique_rows.Rtab")) << endl; From db2f88255f0caa445a2978d954d4c178ad7d1e02 Mon Sep 17 00:00:00 2001 From: santeripuranen <26481940+santeripuranen@users.noreply.github.com> Date: Tue, 11 Feb 2020 15:09:54 +0200 Subject: [PATCH 2/4] Fixed memory release syntax --- src/map_reads.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/map_reads.cpp b/src/map_reads.cpp index b3fc1b8..0e078b0 100644 --- a/src/map_reads.cpp +++ b/src/map_reads.cpp @@ -178,7 +178,7 @@ transposeXU( std::vector< boost::dynamic_bitset<> > &XUT ) pos = bitset.find_next(pos); } // release memory - bitset.swap(bitmap_t()); // apparently the swap trick is still the most reliable way to accomplish this // bitset.resize(0); bitset.shrink_to_fit(); + bitset.resize(0); bitmap_t(bitset).swap(bitset); } return XU; } @@ -340,7 +340,7 @@ void map_reads::execute () // Peak memory use occurs at the start and will be twice the matrix size (= 2 * (nbContigs*strains->size()/8) bytes). cout << "[Transpose pattern matrix..]" << endl; auto XU = transposeXU( allUnitigPatterns ); // this will consume allUnitigPatterns while transposing - allUnitigPatterns.swap(bitmap_container_t()); // release memory + allUnitigPatterns.resize(0); bitmap_container_t(allUnitigPatterns).swap(allUnitigPatterns); // release memory //generate the pyseer input cout << "[Generating pyseer input]..." << endl; From 6c3081662aacbe6a4e7ab9fdd078dbbc4ba4dd08 Mon Sep 17 00:00:00 2001 From: santeripuranen <26481940+santeripuranen@users.noreply.github.com> Date: Fri, 14 Feb 2020 00:02:32 +0200 Subject: [PATCH 3/4] Add option to gzip unitig output --- CMakeLists.txt | 4 +-- src/global.cpp | 2 ++ src/global.h | 1 + src/map_reads.cpp | 70 +++++++++++++++++++++++++++++++---------------- 4 files changed, 51 insertions(+), 26 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c144336..7472d86 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -68,7 +68,7 @@ include (GatbCore) # find Boost set(BOOST_ROOT /usr/bin/include) set(Boost_USE_STATIC_LIBS ON) -FIND_PACKAGE(Boost 1.69.0 COMPONENTS program_options filesystem system regex REQUIRED) +FIND_PACKAGE(Boost 1.69.0 COMPONENTS program_options filesystem system regex iostreams REQUIRED) include_directories(${Boost_INCLUDE_DIR}) # cdbg-ops target @@ -102,7 +102,7 @@ set (PROGRAM_SOURCE_DIR ${PROJECT_SOURCE_DIR}/src) include_directories (${PROGRAM_SOURCE_DIR}) file (GLOB_RECURSE ProjectFiles ${PROGRAM_SOURCE_DIR}/*.cpp) add_executable(${PROGRAM} ${ProjectFiles}) -target_link_libraries(${PROGRAM} ${gatb-core-libraries} ${Boost_LIBRARIES} -static-libgcc -static-libstdc++) +target_link_libraries(${PROGRAM} ${gatb-core-libraries} ${Boost_LIBRARIES} -lz -static-libgcc -static-libstdc++) ################################################################################ # INSTALLATION diff --git a/src/global.cpp b/src/global.cpp index 1939c72..4ee5e48 100644 --- a/src/global.cpp +++ b/src/global.cpp @@ -36,6 +36,7 @@ const char* STR_STRAINS_FILE = "-strains"; const char* STR_KSKMER_SIZE = "-k"; const char* STR_OUTPUT = "-output"; const char* STR_NBCORES = "-nb-cores"; +const char* STR_GZIP = "-gzip"; //global vars used by both programs Graph *graph; @@ -47,4 +48,5 @@ void populateParser (Tool *tool) { tool->getParser()->push_front (new OptionOneParam (STR_OUTPUT, "Path to the folder where the final and temporary files will be stored.", false, "output")); tool->getParser()->push_front (new OptionOneParam (STR_KSKMER_SIZE, "K-mer size.", false, "31")); tool->getParser()->push_front (new OptionOneParam (STR_STRAINS_FILE, "A text file describing the strains containing 2 columns: 1) ID of the strain; 2) Path to a multi-fasta file containing the sequences of the strain. This file needs a header.", true)); + tool->getParser()->push_front (new OptionNoParam (STR_GZIP, "Compress unitig output using gzip.", false)); } diff --git a/src/global.h b/src/global.h index f0a7f2f..575d4a7 100644 --- a/src/global.h +++ b/src/global.h @@ -40,6 +40,7 @@ extern const char* STR_STRAINS_FILE; extern const char* STR_KSKMER_SIZE; extern const char* STR_OUTPUT; extern const char* STR_NBCORES; +extern const char* STR_GZIP; void populateParser (Tool *tool); diff --git a/src/map_reads.cpp b/src/map_reads.cpp index 0e078b0..c07bd28 100644 --- a/src/map_reads.cpp +++ b/src/map_reads.cpp @@ -32,10 +32,14 @@ #include "Utils.h" #include #include +#include +#include +#include #include #define NB_OF_READS_NOTIFICATION_MAP_AND_PHASE 10 //Nb of reads that the map and phase must process for notification using namespace std; +namespace io = boost::iostreams; void mapReadToTheGraphCore(const string &read, const Graph &graph, const vector< UnitigIdStrandPos > &nodeIdToUnitigId, boost::dynamic_bitset<>& unitigPattern ) { @@ -43,7 +47,7 @@ void mapReadToTheGraphCore(const string &read, const Graph &graph, const vector< //goes through all nodes/kmers of the read if (read.size() >= graph.getKmerSize()) { - for (int i = 0; i < read.size() - graph.getKmerSize() + 1; i++) { + for (std::size_t i = 0; i < read.size() - graph.getKmerSize() + 1; i++) { string LRKmer = string(read.c_str() + i, graph.getKmerSize()); //TODO: From my tests, if you use buildNode with a Kmer containing an 'N', it will build a node with all Ns replaced by G @@ -74,13 +78,6 @@ void mapReadToTheGraphCore(const string &read, const Graph &graph, const vector< } } } -/* -void mapReadToTheGraph(const string &read, int readfileIndex, unsigned long readIndex, const Graph &graph, - const vector< UnitigIdStrandPos > &nodeIdToUnitigId, map &unitigIdToCount) { - //map the read - mapReadToTheGraphCore(read, graph, nodeIdToUnitigId, unitigIdToCount); -} -*/ // We define a functor that will be cloned by the dispatcher struct MapAndPhase @@ -141,7 +138,7 @@ struct MapAndPhase for (it.first(); !it.isDone(); it.next()) { string read = (it.item()).toString(); //transform the read to upper case - for (int j=0;j > &XUT ) // transpose the input matrix; this is a fairly expensive operation // due to the horrible memory access patterns involved here - for( auto i=0; i< XUT.size(); ++i ) + for( std::size_t i=0; i, vector > getUnitigsWithSamePattern (const vec return pattern2Unitigs; } -void generate_XU(const string &filename, const string &nodesFile, const vector< boost::dynamic_bitset<> > &XU) { +bool init_sink( const std::string& filename, io::filtering_ostream& os, bool compress=false ) +{ + if( compress ) { + os.push( io::gzip_compressor() ); + os.push( io::file_sink(filename+".gz",std::ios::binary) ); + } + else { + os.push( io::file_sink(filename) ); + } + + return os.is_complete(); +} + +void generate_XU(const string &filename, const string &nodesFile, const vector< boost::dynamic_bitset<> > &XU, bool compress=false ) { using bitmap_t = boost::dynamic_bitset<>; - ofstream XUFile; - openFileForWriting(filename, XUFile); + //ofstream XUFile; + //openFileForWriting(filename, XUFile); + io::filtering_ostream XUFile; + if( !init_sink( filename, XUFile, compress ) ) { + cerr << "Unknown error when trying to open file \"" << filename << "\" for output!" << endl; + return; + } //open file with list of node sequences ifstream nodesFileReader; @@ -226,7 +241,7 @@ void generate_XU(const string &filename, const string &nodesFile, const vector< XUFile << endl; } nodesFileReader.close(); - XUFile.close(); + //XUFile.close(); // filtering_ostream's destructor does this for us } void generate_unique_id_to_original_ids(const string &filename, @@ -250,9 +265,14 @@ void generate_unique_id_to_original_ids(const string &filename, } void generate_XU_unique(const string &filename, const vector< boost::dynamic_bitset<> > &XU, - const map< boost::dynamic_bitset<>, vector > &pattern2Unitigs){ - ofstream XUUnique; - openFileForWriting(filename, XUUnique); + const map< boost::dynamic_bitset<>, vector > &pattern2Unitigs, bool compress=false ){ + //ofstream XUUnique; + //openFileForWriting(filename, XUUnique); + io::filtering_ostream XUUnique; + if( !init_sink( filename, XUUnique, compress ) ) { + cerr << "Unknown error when trying to open file \"" << filename << "\" for output!" << endl; + return; + } //print the header XUUnique << "pattern_id"; @@ -272,22 +292,23 @@ void generate_XU_unique(const string &filename, const vector< boost::dynamic_bit XUUnique << " " << pattern[i]; XUUnique << endl; } - XUUnique.close(); + //XUUnique.close(); // filtering_ostream's destructor does this for us } //generate the pyseer input void generatePyseerInput (const vector &allReadFilesNames, const string &outputFolder, const vector< boost::dynamic_bitset<> >& XU, - int nbContigs) { + int nbContigs, bool compress=false ) { //Generate the XU (the pyseer input - the unitigs are rows with strains present) //XU_unique is XU is in matrix form (for Rtab input) with the duplicated rows removed //create the files for pyseer { - generate_XU(outputFolder+string("/unitigs.txt"), outputFolder+string("/graph.nodes"), XU); + generate_XU(outputFolder+string("/unitigs.txt"), outputFolder+string("/graph.nodes"), XU, compress ); auto pattern2Unitigs = getUnitigsWithSamePattern(XU); + cout << "Number of unique patterns: " << pattern2Unitigs.size() << endl; generate_unique_id_to_original_ids(outputFolder+string("/unitigs.unique_rows_to_all_rows.txt"), pattern2Unitigs); - generate_XU_unique(outputFolder+string("/unitigs.unique_rows.Rtab"), XU, pattern2Unitigs); + generate_XU_unique(outputFolder+string("/unitigs.unique_rows.Rtab"), XU, pattern2Unitigs, compress ); } } @@ -300,6 +321,7 @@ void map_reads::execute () string tmpFolder = outputFolder+string("/tmp"); string longReadsFile = tmpFolder+string("/readsFile"); int nbCores = getInput()->getInt(STR_NBCORES); + const bool compress = getInput()->get(STR_GZIP); //get the nbContigs int nbContigs = getNbLinesInFile(outputFolder+string("/graph.nodes")); @@ -336,7 +358,7 @@ void map_reads::execute () // allUnitigPatterns has all samples/strains over the first dimension and // unitig presense patterns over the second dimension (in bitsets). // Here we transpose the matrix, while consuming it in order to gradually reduce memory footprint. - // For larger data sets this pattern accounting will dominate our emory footprint; overall memory consumption will peak here. + // For larger data sets this pattern accounting will dominate our memory footprint; overall memory consumption will peak here. // Peak memory use occurs at the start and will be twice the matrix size (= 2 * (nbContigs*strains->size()/8) bytes). cout << "[Transpose pattern matrix..]" << endl; auto XU = transposeXU( allUnitigPatterns ); // this will consume allUnitigPatterns while transposing @@ -344,10 +366,10 @@ void map_reads::execute () //generate the pyseer input cout << "[Generating pyseer input]..." << endl; - generatePyseerInput(allReadFilesNames, outputFolder, XU, nbContigs); + generatePyseerInput(allReadFilesNames, outputFolder, XU, nbContigs, compress); cout << "[Generating pyseer input] - Done!" << endl; - cout << "Number of unique patterns: " << getNbLinesInFile(outputFolder+string("/unitigs.unique_rows.Rtab")) << endl; + //cout << "Number of unique patterns: " << getNbLinesInFile(outputFolder+string("/unitigs.unique_rows.Rtab")) << endl; // Remove the global graph pointer, otherwise its destructor is called // twice by GATB (after main) giving a HDF5 error From b734d56a5c5a54f8b5c02caf508b6c6c3c64dc93 Mon Sep 17 00:00:00 2001 From: John Lees Date: Sat, 15 Feb 2020 15:24:44 +0000 Subject: [PATCH 4/4] Revert "Reduce memory footprint" --- src/map_reads.cpp | 220 ++++++++++++++++++++++------------------------ 1 file changed, 104 insertions(+), 116 deletions(-) diff --git a/src/map_reads.cpp b/src/map_reads.cpp index 0e078b0..6d6f4b7 100644 --- a/src/map_reads.cpp +++ b/src/map_reads.cpp @@ -24,21 +24,20 @@ ## ------------------------------------------------------------------------- ## Authors (alphabetically): Jacob L., Jaillard M., Lima L. -## Modified by John Lees and Santeri Puranen +## Modified by John Lees */ #include "global.h" #include "map_reads.hpp" #include "Utils.h" #include -#include #include #define NB_OF_READS_NOTIFICATION_MAP_AND_PHASE 10 //Nb of reads that the map and phase must process for notification using namespace std; void mapReadToTheGraphCore(const string &read, const Graph &graph, const vector< UnitigIdStrandPos > &nodeIdToUnitigId, - boost::dynamic_bitset<>& unitigPattern ) { + map &unitigIdToCount) { int lastUnitig=-1; //goes through all nodes/kmers of the read @@ -65,36 +64,34 @@ void mapReadToTheGraphCore(const string &read, const Graph &graph, const vector< //get the unitig localization of this kmer u_int64_t index = graph.nodeMPHFIndex(node); - const auto unitigId = nodeIdToUnitigId[index].unitigId; + UnitigIdStrandPos unitigIdStrandPos=nodeIdToUnitigId[index]; - if( lastUnitig != unitigId ) { - unitigPattern.set(unitigId); - lastUnitig = unitigId; + if (lastUnitig != unitigIdStrandPos.unitigId) { + if (unitigIdToCount.find(unitigIdStrandPos.unitigId) == unitigIdToCount.end() ) + unitigIdToCount[unitigIdStrandPos.unitigId]=0; + unitigIdToCount[unitigIdStrandPos.unitigId]++; + lastUnitig = unitigIdStrandPos.unitigId; } } } } -/* + void mapReadToTheGraph(const string &read, int readfileIndex, unsigned long readIndex, const Graph &graph, const vector< UnitigIdStrandPos > &nodeIdToUnitigId, map &unitigIdToCount) { //map the read mapReadToTheGraphCore(read, graph, nodeIdToUnitigId, unitigIdToCount); } -*/ + // We define a functor that will be cloned by the dispatcher struct MapAndPhase { - using bitmap_t = boost::dynamic_bitset<>; - using bitmap_container_t = vector< bitmap_t >; - - const vector &allReadFilesNames; + const vector &allReadFilesNames; const Graph& graph; - //const string &outputFolder; - //const string &tmpFolder; + const string &outputFolder; + const string &tmpFolder; uint64_t &nbOfReadsProcessed; ISynchronizer* synchro; - vector& allUnitigPatterns; vector< UnitigIdStrandPos > &nodeIdToUnitigId; int nbContigs; @@ -109,8 +106,8 @@ struct MapAndPhase synchro->lock (); nbOfReadsProcessed+=NB_OF_READS_NOTIFICATION_MAP_AND_PHASE; - cout << '\r' << nbOfReadsProcessed << " reads processed."; - cout.flush(); + cerr << '\r' << nbOfReadsProcessed << " reads processed."; + cerr.flush(); // We unlock the synchronizer synchro->unlock (); @@ -118,12 +115,11 @@ struct MapAndPhase }; MapAndPhase (const vector &allReadFilesNames, const Graph& graph, - uint64_t &nbOfReadsProcessed, ISynchronizer* synchro, - bitmap_container_t &allUnitigPatterns, - vector< UnitigIdStrandPos > &nodeIdToUnitigId, int nbContigs) : - allReadFilesNames(allReadFilesNames), graph(graph), - nbOfReadsProcessed(nbOfReadsProcessed), synchro(synchro), - allUnitigPatterns(allUnitigPatterns), nodeIdToUnitigId(nodeIdToUnitigId), nbContigs(nbContigs){} + const string &outputFolder, const string &tmpFolder, uint64_t &nbOfReadsProcessed, ISynchronizer* synchro, + vector< UnitigIdStrandPos > &nodeIdToUnitigId, int nbContigs) : + allReadFilesNames(allReadFilesNames), graph(graph), outputFolder(outputFolder), tmpFolder(tmpFolder), + nbOfReadsProcessed(nbOfReadsProcessed), synchro(synchro), nodeIdToUnitigId(nodeIdToUnitigId), + nbContigs(nbContigs){} void operator()(int i) { // We declare an input Bank and use it locally @@ -134,10 +130,13 @@ struct MapAndPhase MapAndPhaseIteratorListener* mapAndPhaseIteratorListener = new MapAndPhaseIteratorListener(nbOfReadsProcessed, synchro); SubjectIterator it(inputBank->iterator(), NB_OF_READS_NOTIFICATION_MAP_AND_PHASE, mapAndPhaseIteratorListener); - // We loop over sequences. - auto& unitigPattern = allUnitigPatterns[i]; - unitigPattern.resize(nbContigs); + //XU_strain_i = how many times each unitig map to a strain + ofstream mappingOutputFile; + openFileForWriting(tmpFolder+string("/XU_strain_")+to_string(i), mappingOutputFile); + // We loop over sequences. + unsigned long readIndex = 0; + map unitigIdToCount; //TODO: change this to a vector for (it.first(); !it.isDone(); it.next()) { string read = (it.item()).toString(); //transform the read to upper case @@ -145,8 +144,20 @@ struct MapAndPhase read[j]=toupper(read[j]); //map this read to the graph - mapReadToTheGraphCore(read, graph, nodeIdToUnitigId, unitigPattern); // unitigIdToCount); + mapReadToTheGraph(read, i, readIndex, graph, nodeIdToUnitigId, unitigIdToCount); + + readIndex++; } + + //output info for mapping - the number of times the unitig appear in the strain + for (int i=0;i > -transposeXU( std::vector< boost::dynamic_bitset<> > &XUT ) -{ - using bitmap_t = boost::dynamic_bitset<>; - using bitmap_container_t = std::vector< bitmap_t >; - - const std::size_t nsamples( XUT.size() ); - const std::size_t ncontigs = XUT[0].size(); - bitmap_container_t XU; XU.resize(ncontigs); - for( auto& row: XU ) { row.resize(nsamples); } - - // transpose the input matrix; this is a fairly expensive operation - // due to the horrible memory access patterns involved here - for( auto i=0; i< XUT.size(); ++i ) - { - auto& bitset = XUT[i]; - auto pos = bitset.find_first(); - while( pos != bitmap_t::npos ) - { - XU[pos].set(i); - pos = bitset.find_next(pos); - } - // release memory - bitset.resize(0); bitmap_t(bitset).swap(bitset); - } - return XU; -} - //pattern is the unitig line -map< boost::dynamic_bitset<>, vector > getUnitigsWithSamePattern (const vector< boost::dynamic_bitset<> > &XU) { - using bitmap_t = boost::dynamic_bitset<>; - using mapping_t = map< bitmap_t, vector >; - - // storage for unique patterns linked to all parent unitigs. - mapping_t pattern2Unitigs; - - for( std::size_t i=0; i, vector > getUnitigsWithSamePattern (const vector< vector > &XU, int nbContigs) { + map< vector, vector > pattern2Unitigs; + + for (int i=0;i unitigs; + unitigs.push_back(i); + + //insert this pattern and his new set to the map + pattern2Unitigs.insert(make_pair(XU[i], unitigs)); + } else { + //pattern of unitig i is already in pattern2Unitigs, just add + pattern2Unitigs[XU[i]].push_back(i); + } + } - return pattern2Unitigs; + return pattern2Unitigs; } -void generate_XU(const string &filename, const string &nodesFile, const vector< boost::dynamic_bitset<> > &XU) { - using bitmap_t = boost::dynamic_bitset<>; - ofstream XUFile; +void generate_XU(const string &filename, const string &nodesFile, const vector< vector > &XU) { + ofstream XUFile; openFileForWriting(filename, XUFile); //open file with list of node sequences @@ -210,19 +197,16 @@ void generate_XU(const string &filename, const string &nodesFile, const vector< int id; string seq; - for( auto& XUi: XU ) { - // print the unitig sequence + for (int i=0;i> id >> seq; XUFile << seq << " |"; - // print the strains present - // by finding all the set bits - auto pos = XUi.find_first(); - while( pos != bitmap_t::npos ) - { - XUFile << " " << (*strains)[pos].id << ":1"; - pos = XUi.find_next(pos); - } + //print the strains present + for (int j=0;j 0) { + XUFile << " " << (*strains)[j].id << ":" << XU[i][j]; + } XUFile << endl; } nodesFileReader.close(); @@ -230,13 +214,14 @@ void generate_XU(const string &filename, const string &nodesFile, const vector< } void generate_unique_id_to_original_ids(const string &filename, - const map< boost::dynamic_bitset<>, vector > &pattern2Unitigs) { + const map< vector, vector > &pattern2Unitigs) { ofstream uniqueIdToOriginalIdsFile; openFileForWriting(filename, uniqueIdToOriginalIdsFile); //for each pattern int i=0; - for( auto it=pattern2Unitigs.begin(); it!=pattern2Unitigs.end(); ++it, ++i ) { + auto it=pattern2Unitigs.begin(); + for (;it!=pattern2Unitigs.end();++it, ++i) { //print the id of this pattern uniqueIdToOriginalIdsFile << i << " = "; @@ -249,8 +234,8 @@ void generate_unique_id_to_original_ids(const string &filename, uniqueIdToOriginalIdsFile.close(); } -void generate_XU_unique(const string &filename, const vector< boost::dynamic_bitset<> > &XU, - const map< boost::dynamic_bitset<>, vector > &pattern2Unitigs){ +void generate_XU_unique(const string &filename, const vector< vector > &XU, + const map< vector, vector > &pattern2Unitigs){ ofstream XUUnique; openFileForWriting(filename, XUUnique); @@ -262,14 +247,14 @@ void generate_XU_unique(const string &filename, const vector< boost::dynamic_bit //for each pattern int i=0; - for( auto it=pattern2Unitigs.begin(); it!=pattern2Unitigs.end(); ++it, ++i ) { + auto it=pattern2Unitigs.begin(); + for (;it!=pattern2Unitigs.end();++it, ++i) { //print the id of this pattern XUUnique << i; - //print the pattern; will produce a *massive* file - const auto& pattern = it->first; - for( std::size_t i=0; ifirst) + XUUnique << " " << v; XUUnique << endl; } XUUnique.close(); @@ -277,25 +262,42 @@ void generate_XU_unique(const string &filename, const vector< boost::dynamic_bit //generate the pyseer input void generatePyseerInput (const vector &allReadFilesNames, - const string &outputFolder, - const vector< boost::dynamic_bitset<> >& XU, + const string &outputFolder, const string &tmpFolder, int nbContigs) { //Generate the XU (the pyseer input - the unitigs are rows with strains present) //XU_unique is XU is in matrix form (for Rtab input) with the duplicated rows removed + cerr << endl << endl << "[Generating pyseer input]..." << endl; + + //Create XU + vector< vector > XU(nbContigs); + for (auto & v : XU) + v.resize(allReadFilesNames.size()); + + //populate XU + for (int j=0; j> XU[i][j]; + inputFile.close(); + } + //create the files for pyseer { generate_XU(outputFolder+string("/unitigs.txt"), outputFolder+string("/graph.nodes"), XU); - auto pattern2Unitigs = getUnitigsWithSamePattern(XU); + map< vector, vector > pattern2Unitigs = getUnitigsWithSamePattern(XU, nbContigs); generate_unique_id_to_original_ids(outputFolder+string("/unitigs.unique_rows_to_all_rows.txt"), pattern2Unitigs); generate_XU_unique(outputFolder+string("/unitigs.unique_rows.Rtab"), XU, pattern2Unitigs); } + + cerr << "[Generating pyseer input] - Done!" << endl; + + } void map_reads::execute () { - using bitmap_container_t = typename MapAndPhase::bitmap_container_t; - - //get the parameters + //get the parameters string outputFolder = stripLastSlashIfExists(getInput()->getStr(STR_OUTPUT)); string tmpFolder = outputFolder+string("/tmp"); string longReadsFile = tmpFolder+string("/readsFile"); @@ -310,9 +312,6 @@ void map_reads::execute () //get all the read files' name vector allReadFilesNames = getVectorStringFromFile(longReadsFile); - // use bitmaps/bitsets in order to curb memory use - bitmap_container_t allUnitigPatterns; allUnitigPatterns.resize(allReadFilesNames.size()); - // We create an iterator over an integer range Range::Iterator allReadFilesNamesIt(0, allReadFilesNames.size() - 1); @@ -322,30 +321,19 @@ void map_reads::execute () // We create a dispatcher configured for 'nbCores' cores. Dispatcher dispatcher(nbCores, 1); - cout << "[Starting mapping process... ]" << endl; - cout << "Using " << nbCores << " cores to map " << allReadFilesNames.size() << " read files." << endl; + cerr << "[Starting mapping process... ]" << endl; + cerr << "Using " << nbCores << " cores to map " << allReadFilesNames.size() << " read files." << endl; // We iterate the range. NOTE: we could also use lambda expression (easing the code readability) uint64_t nbOfReadsProcessed = 0; dispatcher.iterate(allReadFilesNamesIt, - MapAndPhase(allReadFilesNames, *graph, nbOfReadsProcessed, synchro, - allUnitigPatterns, *nodeIdToUnitigId, nbContigs)); - - cout << endl << "[Mapping process finished!]" << endl; + MapAndPhase(allReadFilesNames, *graph, outputFolder, tmpFolder, nbOfReadsProcessed, synchro, + *nodeIdToUnitigId, nbContigs)); - // allUnitigPatterns has all samples/strains over the first dimension and - // unitig presense patterns over the second dimension (in bitsets). - // Here we transpose the matrix, while consuming it in order to gradually reduce memory footprint. - // For larger data sets this pattern accounting will dominate our emory footprint; overall memory consumption will peak here. - // Peak memory use occurs at the start and will be twice the matrix size (= 2 * (nbContigs*strains->size()/8) bytes). - cout << "[Transpose pattern matrix..]" << endl; - auto XU = transposeXU( allUnitigPatterns ); // this will consume allUnitigPatterns while transposing - allUnitigPatterns.resize(0); bitmap_container_t(allUnitigPatterns).swap(allUnitigPatterns); // release memory + cerr << endl << "[Mapping process finished!]" << endl; //generate the pyseer input - cout << "[Generating pyseer input]..." << endl; - generatePyseerInput(allReadFilesNames, outputFolder, XU, nbContigs); - cout << "[Generating pyseer input] - Done!" << endl; + generatePyseerInput(allReadFilesNames, outputFolder, tmpFolder, nbContigs); cout << "Number of unique patterns: " << getNbLinesInFile(outputFolder+string("/unitigs.unique_rows.Rtab")) << endl;