diff --git a/src/algorithms/component_paths.cpp b/src/algorithms/component_paths.cpp
index 890e7a5056b..810bed2e01c 100644
--- a/src/algorithms/component_paths.cpp
+++ b/src/algorithms/component_paths.cpp
@@ -3,10 +3,19 @@
  */
 
 #include <queue>
+#include <deque>
+#include <atomic>
+#include <thread>
+#include <limits>
+#include "../cluster.hpp"
 #include "component_paths.hpp"
+#include "utility.hpp"
 #include "sdsl/bit_vectors.hpp"
+#include "sdsl/int_vector.hpp"
 
+//#define debug_component_paths
+//#define debug_parallel_component_paths
 
 namespace vg {
 namespace algorithms {
@@ -38,7 +47,7 @@ vector<unordered_set<path_handle_t>> component_paths(const PathHandleGraph& grap
         unordered_set<path_handle_t>& component_path_set = component_path_sets.back();
 
         // init a BFS queue
-        std::queue<handle_t> queue;
+        deque<handle_t> queue;
 
         // function to call on each subsequent handle we navigate to
         function<bool(const handle_t&)> record_paths_and_enqueue = [&](const handle_t& here) {
@@ -59,7 +68,7 @@ vector<unordered_set<path_handle_t>> component_paths(const PathHandleGraph& grap
                 });
 
                 // and add it to the queue
-                queue.push(here);
+                queue.push_back(here);
                 enqueued[graph.get_id(here) - min_id] = 1;
             }
             return true;
@@ -71,7 +80,7 @@ vector<unordered_set<path_handle_t>> component_paths(const PathHandleGraph& grap
         // do the BFS traversal
         while (!queue.empty()) {
             handle_t handle = queue.front();
-            queue.pop();
+            queue.pop_front();
 
             // traverse in both directions
             graph.follow_edges(handle, false, record_paths_and_enqueue);
@@ -81,6 +90,385 @@ vector<unordered_set<path_handle_t>> component_paths(const PathHandleGraph& grap
     return component_path_sets;
 }
 
+
+template<typename IntType1, typename IntType2>
+void reallocate_atomic_int_vector(vector<atomic<IntType1>>*& vec1, vector<atomic<IntType2>>*& vec2) {
+
+    if (!vec1) {
+        // the vector has already been reallocated
+        return;
+    }
+
+    // allocate the second vector
+    vec2 = new vector<atomic<IntType2>>(vec1->size());
+
+    // TODO: these threads will actually be fighting for processor time
+    // with the spin-locks holding the main threads busy while they wait...
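+    // note: std::atomic is neither copyable nor movable, so the old vector
+    // can't simply be resized or std::copy'd; each slot has to be loaded and
+    // re-stored by hand into the newly allocated, wider vector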
+
+    // parallel copy in contiguous blocks
+    int thread_count = get_thread_count();
+    static const int64_t block_size = 4096;
+    atomic<int64_t> next(0);
+    vector<thread> workers;
+    for (int i = 0; i < thread_count; ++i) {
+        workers.emplace_back([&]() {
+            while (true) {
+                int64_t begin = block_size * (next++);
+                if (begin >= vec2->size()) {
+                    // we're past the end of the vector
+                    break;
+                }
+                for (int64_t j = begin, n = min<int64_t>(begin + block_size, vec2->size()); j < n; ++j) {
+                    (*vec2)[j].store((*vec1)[j].load());
+                }
+            }
+        });
+    }
+
+    // barrier sync
+    for (auto& worker : workers) {
+        worker.join();
+    }
+
+    // free the first vector
+    delete vec1;
+    vec1 = nullptr;
+}
+
+vector<unordered_set<path_handle_t>> component_paths_parallel(const PathHandleGraph& graph) {
+
+#ifdef debug_parallel_component_paths
+    cerr << "computing component paths in parallel" << endl;
+#endif
+
+    // get all paths
+    vector<path_handle_t> paths;
+    paths.reserve(graph.get_path_count());
+    graph.for_each_path_handle([&](const path_handle_t& path) {
+        paths.emplace_back(path);
+    });
+
+    // sort in descending order by step count, so the longest paths seed searches first
+    stable_sort(paths.begin(), paths.end(), [&](path_handle_t a, path_handle_t b) {
+        return graph.get_step_count(a) > graph.get_step_count(b);
+    });
+
+    int thread_count = get_thread_count();
+
+    // the number of threads that haven't exited
+    atomic<int> threads_active(thread_count);
+
+    // a system that lets one thread freeze the others at checkpoints while it does
+    // some job
+    atomic<int> frozen(0);
+    atomic<int> num_frozen(0);
+
+    // checkpoint to wait at if any other thread wants us to freeze
+    auto check_freeze = [&]() {
+        if (frozen.load()) {
+            ++num_frozen;
+            while (frozen.load()) {
+                // spin lock
+            }
+            --num_frozen;
+        }
+    };
+    // wait until all other threads have reached a freeze checkpoint
+    // and then execute a function
+    auto freeze_and_execute = [&](const function<void(void)>& exec) {
+        // continue trying to freeze until we actually get to be the thread
+        // that freezes the other threads
+        bool was_frozen = true;
+        while (was_frozen) {
+            was_frozen = frozen.fetch_or(1);
+            if (was_frozen) {
+                // we weren't the thread who switched the freeze on, so freeze
+                // ourselves and wait
+                check_freeze();
+            }
+            else {
+                while (num_frozen.load() < threads_active.load() - 1) {
+                    // spin lock waiting for the other threads to reach a checkpoint
+                }
+                // execute the function
+                exec();
+                // unfreeze the rest of the threads
+                frozen.fetch_and(0);
+                // leave the loop
+            }
+        }
+    };
+
+    // we'll be using the ID space as vector indices, so calculate values we'll use for that
+    nid_t min_id = graph.min_node_id();
+    nid_t id_range = graph.max_node_id() - min_id + 1;
+
+    // we'll try to accomplish the job with the minimum int size possible to
+    // keep the memory use down
+    // note: this has to be done on the heap because the deleted copy assignment
+    // and copy construction for atomic ints means vectors can never be resized
+    vector<atomic<uint8_t>>* id_vec_8 = new vector<atomic<uint8_t>>(id_range);
+    vector<atomic<uint16_t>>* id_vec_16 = nullptr;
+    vector<atomic<uint32_t>>* id_vec_32 = nullptr;
+    // in parallel, initialize with 0 sentinels, which will be replaced by search IDs
+    static const size_t block_size = 4096;
+    atomic<size_t> block_idx(0);
+    vector<thread> initializers;
+    for (int i = 0; i < thread_count; ++i) {
+        initializers.emplace_back([&]() {
+            while (true) {
+                size_t begin = block_size * (block_idx++);
+                if (begin >= id_vec_8->size()) {
+                    break;
+                }
+                for (size_t j = begin, end = min(begin + block_size, id_vec_8->size()); j < end; ++j) {
+                    (*id_vec_8)[j].store(0);
+                }
+            }
+        });
+    }
+
+    // barrier sync
+    for (auto& initializer : initializers) {
+        initializer.join();
+    }
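+
+    // scheme: every node's slot starts at the 0 sentinel (unvisited) and is claimed by
+    // writing a nonzero search ID into it with a compare-exchange; we start with 8-bit
+    // slots, and whenever a new search ID would overflow the current width, we freeze
+    // the workers and copy the IDs over into a wider (16-bit, then 32-bit) vector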
+
+    // this value keeps track of which one of these vectors we're actually using, taking
+    // the values 0, 1, or 2
+    uint8_t which_vec = 0;
+    // the last search ID we can accommodate for each vector
+    const uint32_t max_search_id[3] = {
+        numeric_limits<uint8_t>::max(),
+        numeric_limits<uint16_t>::max(),
+        numeric_limits<uint32_t>::max()
+    };
+
+    // for each search ID, the other search IDs it encountered adjacent to its search
+    vector<unordered_set<uint32_t>> neighbors(max_search_id[0] + 1);
+    // for each search ID, the path handles it found while traversing
+    vector<unordered_set<path_handle_t>> search_path_sets(max_search_id[0] + 1);
+
+    // define accessors that hide the ugliness of checking which vector we're using:
+
+    // perform an atomic load on whichever vector we're currently using
+    auto load = [&](int64_t i) {
+        uint32_t loaded;
+        switch (which_vec) {
+            case 0:
+                loaded = (*id_vec_8)[i].load();
+                break;
+            case 1:
+                loaded = (*id_vec_16)[i].load();
+                break;
+            default:
+                loaded = (*id_vec_32)[i].load();
+                break;
+        }
+        return loaded;
+    };
+    // perform an atomic compare-exchange on whichever vector we're currently using
+    auto compare_exchange = [&](int64_t i, uint32_t& expected, uint32_t desired) {
+        bool exchanged;
+        switch (which_vec) {
+            case 0:
+            {
+                uint8_t expected_8 = expected;
+                uint8_t desired_8 = desired;
+                exchanged = (*id_vec_8)[i].compare_exchange_strong(expected_8, desired_8);
+                if (!exchanged) {
+                    expected = expected_8;
+                }
+                break;
+            }
+            case 1:
+            {
+                uint16_t expected_16 = expected;
+                uint16_t desired_16 = desired;
+                exchanged = (*id_vec_16)[i].compare_exchange_strong(expected_16, desired_16);
+                if (!exchanged) {
+                    expected = expected_16;
+                }
+                break;
+            }
+            default:
+            {
+                exchanged = (*id_vec_32)[i].compare_exchange_strong(expected, desired);
+                break;
+            }
+        }
+        return exchanged;
+    };
+
+    // to keep track of the index of the path we will use to seed a BFS next
+    atomic<int64_t> next_path(0);
+    // to keep track of the ID of the next seeded search (0 is reserved as the sentinel)
+    atomic<uint32_t> next_search_id(1);
+    // initialize the swarm of workers
+    vector<thread> workers;
+    for (int i = 0; i < thread_count; ++i) {
+        workers.emplace_back([&, i]() {
+            while (true) {
+
+                int64_t path_idx = next_path++;
+
+                if (path_idx >= paths.size()) {
+                    // all of the paths have been explored, so we can exit
+                    break;
+                }
+
+#ifdef debug_parallel_component_paths
+                cerr << ("worker " + to_string(i) + " got path idx " + to_string(path_idx) + ": " + graph.get_path_name(paths[path_idx]) + " with step count " + to_string(graph.get_step_count(paths[path_idx])) + "\n");
+#endif
+
+                path_handle_t path = paths[path_idx];
+                if (graph.get_step_count(path) == 0) {
+                    // skip an empty path
+                    check_freeze();
+                    continue;
+                }
+
+                // seed a BFS search off of the first node in this path
+                handle_t seed = graph.get_handle_of_step(graph.path_begin(path));
+
+                if (load(graph.get_id(seed) - min_id) != 0) {
+                    // another thread has already traversed over this node, so there's
+                    // no need to start a search here
+#ifdef debug_parallel_component_paths
+                    cerr << ("worker " + to_string(i) + " skipping seed " + to_string(graph.get_id(seed)) + ", which was previously visited by " + to_string(load(graph.get_id(seed) - min_id)) + "\n");
+#endif
+                    check_freeze();
+                    continue;
+                }
+
+                // we're going to initiate a BFS from the seed, so assign it a new search ID
+                uint32_t search_id = next_search_id++;
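+                // note: two workers can pass the visited check above at the same time
+                // and seed overlapping searches on the same component; that's still
+                // correct, because each node is claimed by exactly one search ID in the
+                // compare-exchange below, and any searches that touch are merged by the
+                // union-find at the end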
+
+                // TODO: add finer-grain reallocations so that neighbors and
+                // search_path_sets don't need to get so large to guarantee that
+                // we don't index past them with a search ID
+                if (search_id > max_search_id[which_vec]) {
+                    // we need to move up to the next int size in order to accommodate
+                    // this search ID, so demand that the other threads stop writing so
+                    // we can switch to a larger bit width
+                    freeze_and_execute([&]() {
+                        // check to make sure another thread didn't already move us over
+                        // to the next vector while we were waiting to freeze
+                        if (search_id <= max_search_id[which_vec]) {
+                            return;
+                        }
+
+                        ++which_vec;
+                        if (which_vec == 1) {
+                            reallocate_atomic_int_vector(id_vec_8, id_vec_16);
+                        }
+                        else if (which_vec == 2) {
+                            reallocate_atomic_int_vector(id_vec_16, id_vec_32);
+                        }
+                        else {
+                            cerr << "error: parallel component paths algorithm ran out of 32-bit search IDs\n";
+                            exit(1);
+                        }
+                        // the size_t cast keeps the + 1 from wrapping around in
+                        // 32-bit arithmetic at the widest vector
+                        neighbors.resize((size_t) max_search_id[which_vec] + 1);
+                        search_path_sets.resize((size_t) max_search_id[which_vec] + 1);
+                    });
+                }
+
+#ifdef debug_parallel_component_paths
+                cerr << ("worker " + to_string(i) + " starting search on seed " + to_string(graph.get_id(seed)) + " with search ID " + to_string(search_id) + "\n");
+#endif
+
+                // FIFO queue for BFS
+                deque<handle_t> queue;
+
+                // function to call on each subsequent handle we navigate to
+                function<bool(const handle_t&)> record_paths_and_enqueue = [&](const handle_t& here) {
+                    int64_t idx = graph.get_id(here) - min_id;
+                    uint32_t visit_id = 0;
+                    bool exchanged = compare_exchange(idx, visit_id, search_id);
+                    if (exchanged) {
+                        // we found the unvisited sentinel and replaced it with our search ID
+
+                        // add the paths of the new node
+                        graph.for_each_step_on_handle(here, [&](const step_handle_t& step) {
+                            search_path_sets[search_id].insert(graph.get_path_handle_of_step(step));
+                        });
+
+                        // and add it to the queue
+                        queue.push_back(here);
+                    }
+                    else if (visit_id != search_id) {
+                        // we are adjacent to nodes explored by a different search, so record
+                        // the neighbor
+                        neighbors[search_id].insert(visit_id);
+                    }
+                    return true;
+                };
+
+                // init the queue
+                record_paths_and_enqueue(seed);
+
+                while (!queue.empty()) {
+                    // set a checkpoint in case another thread is trying to reallocate
+                    check_freeze();
+
+                    // de-queue a node
+                    handle_t handle = queue.front();
+                    queue.pop_front();
+
+                    // traverse in both directions
+                    graph.follow_edges(handle, false, record_paths_and_enqueue);
+                    graph.follow_edges(handle, true, record_paths_and_enqueue);
+                }
+
+            }
+            // keep track of the fact that this thread is exiting
+            --threads_active;
+#ifdef debug_parallel_component_paths
+            cerr << ("worker " + to_string(i) + " exiting\n");
+#endif
+        });
+    }
+
+    // barrier sync
+    for (auto& worker : workers) {
+        worker.join();
+    }
+
+    // find equivalence classes of search IDs on the same component
+    size_t num_search_ids = next_search_id.load();
+    structures::UnionFind union_find(num_search_ids, false);
+    for (size_t i = 0; i < num_search_ids; ++i) {
+        for (auto j : neighbors[i]) {
+            union_find.union_groups(i, j);
+        }
+    }
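+    // note: search ID 0 is the unvisited sentinel, so it never records paths or
+    // neighbors; its singleton group (and any component whose searches found no
+    // paths) is skipped below, because a set is only added once a path turns up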
+    // agglomerate the sets of paths encountered by all of the searches
+    // in each equivalence class
+    vector<unordered_set<path_handle_t>> return_val;
+    for (const auto& component_search_ids : union_find.all_groups()) {
+        bool added_new_set = false;
+        for (size_t i : component_search_ids) {
+            for (path_handle_t path : search_path_sets[i]) {
+                if (!added_new_set) {
+                    return_val.emplace_back();
+                    added_new_set = true;
+                }
+                return_val.back().insert(path);
+            }
+        }
+    }
+
+    delete id_vec_8;
+    delete id_vec_16;
+    delete id_vec_32;
+
+    return return_val;
+}
 
 }
 }
diff --git a/src/algorithms/component_paths.hpp b/src/algorithms/component_paths.hpp
index c64a6048ff1..68c276f5c92 100644
--- a/src/algorithms/component_paths.hpp
+++ b/src/algorithms/component_paths.hpp
@@ -14,8 +14,13 @@ namespace algorithms {
 
 using namespace std;
 
+// returns sets of path handles, one set for each component (unless the
+// component doesn't have any paths)
 vector<unordered_set<path_handle_t>> component_paths(const PathHandleGraph& graph);
 
+// the same semantics as the previous function, but multithreaded and more
+// memory-intensive
+vector<unordered_set<path_handle_t>> component_paths_parallel(const PathHandleGraph& graph);
 
 }
 }
diff --git a/src/subcommand/mpmap_main.cpp b/src/subcommand/mpmap_main.cpp
index 520a42d58ca..802c7fa6a2a 100644
--- a/src/subcommand/mpmap_main.cpp
+++ b/src/subcommand/mpmap_main.cpp
@@ -1652,7 +1652,7 @@ int main_mpmap(int argc, char** argv) {
         if (!suppress_progress) {
             cerr << progress_boilerplate() << "Identifying reference paths." << endl;
         }
-        vector<unordered_set<path_handle_t>> component_path_sets = algorithms::component_paths(*path_position_handle_graph);
+        vector<unordered_set<path_handle_t>> component_path_sets = algorithms::component_paths_parallel(*path_position_handle_graph);
         for (const auto& path_set : component_path_sets) {
             // remove dependency on system hash ordering
             vector<path_handle_t> ordered_path_set(path_set.begin(), path_set.end());
diff --git a/src/unittest/component_paths.cpp b/src/unittest/component_paths.cpp
new file mode 100644
index 00000000000..b3b6c4ba1d1
--- /dev/null
+++ b/src/unittest/component_paths.cpp
@@ -0,0 +1,141 @@
+#include "catch.hpp"
+#include "../utility.hpp"
+#include "../algorithms/component_paths.hpp"
+
+#include <bdsg/hash_graph.hpp>
+
+namespace vg {
+namespace unittest {
+
+using namespace std;
+
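+// convert the results into ordered collections so that they can be compared for
+// equality without depending on hash ordering or on the order of the components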
+set<set<path_handle_t>> normalize(vector<unordered_set<path_handle_t>> result) {
+    set<set<path_handle_t>> return_val;
+    for (auto& comp_set : result) {
+        return_val.emplace(comp_set.begin(), comp_set.end());
+    }
+    return return_val;
+}
+
+TEST_CASE("Parallel component paths produces correct results", "[comppathset]") {
+
+    int thread_count_pre = get_thread_count();
+
+    SECTION("On a graph with several components") {
+
+        bdsg::HashGraph graph;
+
+        handle_t prev;
+        path_handle_t p1 = graph.create_path_handle("1");
+        path_handle_t p2 = graph.create_path_handle("2");
+        path_handle_t p3 = graph.create_path_handle("3");
+        path_handle_t p4 = graph.create_path_handle("4");
+        path_handle_t p5 = graph.create_path_handle("5");
+        path_handle_t p6 = graph.create_path_handle("6");
+        path_handle_t p7 = graph.create_path_handle("7");
+        path_handle_t p8 = graph.create_path_handle("8");
+        path_handle_t p9 = graph.create_path_handle("9");
+        path_handle_t p10 = graph.create_path_handle("10");
+        path_handle_t p11 = graph.create_path_handle("11");
+        path_handle_t p12 = graph.create_path_handle("12");
+        path_handle_t p13 = graph.create_path_handle("13");
+        for (int i = 0; i < 256; ++i) {
+            handle_t h = graph.create_handle("A");
+            // leave out the edges at the component boundaries
+            if (i != 0 &&
+                i != 64 &&
+                i != 128 &&
+                i != 192 &&
+                i != 208 &&
+                i != 224 &&
+                i != 240) {
+                graph.create_edge(prev, h);
+            }
+
+            // first component: separated by a spacer
+            if (i < 32) {
+                graph.append_step(p1, h);
+            }
+            if (i >= 48 && i < 64) {
+                graph.append_step(p2, h);
+            }
+            // second component: overlapping
+            if (i >= 64 && i < 100) {
+                graph.append_step(p3, h);
+            }
+            if (i >= 80 && i < 128) {
+                graph.append_step(p4, h);
+            }
+            // third component: large path with small overlapping paths
+            if (i >= 128 && i < 192) {
+                graph.append_step(p5, h);
+            }
+            if (i >= 128 && i < 140) {
+                graph.append_step(p6, h);
+            }
+            if (i >= 150 && i < 165) {
+                graph.append_step(p7, h);
+            }
+            if (i >= 160 && i < 180) {
+                graph.append_step(p8, h);
+            }
+            if (i >= 185 && i < 192) {
+                graph.append_step(p9, h);
+            }
+            // several smaller components with one path each
+            if (i >= 192 && i < 208) {
+                graph.append_step(p10, h);
+            }
+            if (i >= 208 && i < 224) {
+                graph.append_step(p11, h);
+            }
+            if (i >= 224 && i < 240) {
+                graph.append_step(p12, h);
+            }
+            if (i >= 240) {
+                graph.append_step(p13, h);
+            }
+            prev = h;
+        }
+
+        auto serial_result = normalize(algorithms::component_paths(graph));
+
+        int repetitions = 10;
+        for (int num_threads : {1, 2, 4, 8, 16}) {
+            omp_set_num_threads(num_threads);
+            for (int i = 0; i < repetitions; ++i) {
+                auto parallel_result = normalize(algorithms::component_paths_parallel(graph));
+                REQUIRE(parallel_result == serial_result);
+            }
+        }
+    }
+
+    SECTION("On a graph with many small components") {
+
+        bdsg::HashGraph graph;
+
+        for (int i = 0; i < 4096; ++i) {
+            handle_t h = graph.create_handle("A");
+            path_handle_t p = graph.create_path_handle(to_string(i));
+            graph.append_step(p, h);
+        }
+
+        auto serial_result = normalize(algorithms::component_paths(graph));
+
+        int repetitions = 10;
+        for (int num_threads : {1, 2, 4, 8, 16}) {
+            omp_set_num_threads(num_threads);
+            for (int i = 0; i < repetitions; ++i) {
+                auto parallel_result = normalize(algorithms::component_paths_parallel(graph));
+                REQUIRE(parallel_result == serial_result);
+            }
+        }
+    }
+
+    omp_set_num_threads(thread_count_pre);
+}
+
+// TODO: test that will require second int vector
+
+}
+}