diff --git a/CMakeLists.txt b/CMakeLists.txt index db6d02cd0..a94f011a6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,7 +56,6 @@ target_link_libraries(test_exe crypto z) - enable_testing() add_test(test_node test_exe --gtest_filter=NodeTest*) diff --git a/benchmark/BFS_BM.cpp b/benchmark/BFS_BM.cpp index 53190eb02..d854497bf 100644 --- a/benchmark/BFS_BM.cpp +++ b/benchmark/BFS_BM.cpp @@ -19,7 +19,7 @@ static void BFS_X(benchmark::State &state) auto &result = g.breadth_first_search(*(range_start->second->getNodePair().first)); } } -BENCHMARK(BFS_X)->RangeMultiplier(16)->Range((unsigned long)1, (unsigned long)1 << 16); +BENCHMARK(BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18); static void BFS_FromReadedCitHep(benchmark::State &state) { @@ -33,3 +33,63 @@ static void BFS_FromReadedCitHep(benchmark::State &state) BENCHMARK(BFS_FromReadedCitHep); +static void PSEUDO_CONCURRENCY_BFS_X(benchmark::State &state) +{ + CXXGRAPH::Graph g; + auto range_start = edges.begin(); + auto range_end = edges.find(state.range(0)); + std::unordered_map *> edgesX; + edgesX.insert(range_start, range_end); + for (auto e : edgesX) + { + g.addEdge(&(*e.second)); + } + for (auto _ : state) + { + auto &result = g.concurrency_breadth_first_search(*(range_start->second->getNodePair().first), 1); + } +} +BENCHMARK(PSEUDO_CONCURRENCY_BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18); + +static void PSEUDO_CONCURRENCY_BFS_FromReadedCitHep(benchmark::State &state) +{ + auto edgeSet = cit_graph_ptr->getEdgeSet(); + for (auto _ : state) + { + + auto &result = cit_graph_ptr->concurrency_breadth_first_search(*((*(edgeSet.begin()))->getNodePair().first), 1); + } +} + +BENCHMARK(PSEUDO_CONCURRENCY_BFS_FromReadedCitHep); + +static void CONCURRENCY_BFS_X(benchmark::State &state) +{ + CXXGRAPH::Graph g; + auto range_start = edges.begin(); + auto range_end = edges.find(state.range(0)); + std::unordered_map *> edgesX; + edgesX.insert(range_start, range_end); + for (auto e : edgesX) + { + g.addEdge(&(*e.second)); + } + for (auto _ : state) + { + auto &result = g.concurrency_breadth_first_search(*(range_start->second->getNodePair().first), 8); + } +} +BENCHMARK(CONCURRENCY_BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18); + +static void CONCURRENCY_BFS_FromReadedCitHep(benchmark::State &state) +{ + auto edgeSet = cit_graph_ptr->getEdgeSet(); + for (auto _ : state) + { + + auto &result = cit_graph_ptr->concurrency_breadth_first_search(*((*(edgeSet.begin()))->getNodePair().first), 8); + } +} + +BENCHMARK(CONCURRENCY_BFS_FromReadedCitHep); + diff --git a/include/Graph/Graph.hpp b/include/Graph/Graph.hpp index 13dc61260..081e5f381 100644 --- a/include/Graph/Graph.hpp +++ b/include/Graph/Graph.hpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -264,7 +265,19 @@ namespace CXXGRAPH * search. * */ - virtual const std::vector> breadth_first_search(const Node &start) const; + virtual const std::vector> breadth_first_search(const Node &start) const; + /** + * \brief + * The multithreaded version of breadth_first_search + * It turns out to be two indepentent functions because of implemntation differences + * + * @param start Node from where traversing starts + * @param num_threads number of threads + * @returns a vector of Node indicating which Node were visited during the + * search. + * + */ + virtual const std::vector> concurrency_breadth_first_search(const Node &start, size_t num_threads) const; /** * \brief * Function performs the depth first search algorithm over the graph @@ -1613,7 +1626,173 @@ namespace CXXGRAPH return visited; } + template + const std::vector> Graph::concurrency_breadth_first_search(const Node &start, size_t num_threads) const + { + std::vector> bfs_result; + // check is exist node in the graph + auto &nodeSet = Graph::getNodeSet(); + if (std::find(nodeSet.begin(), nodeSet.end(), &start) == nodeSet.end()) + { + return bfs_result; + } + + std::unordered_map *, int> node_to_index; + for (const auto &node : nodeSet) + { + node_to_index[node] = node_to_index.size(); + } + std::vector visited(nodeSet.size(), 0); + + // parameter limitations + if (num_threads <= 0) + { + std::cout << "Error: number of threads should be greater than 0" << std::endl; + num_threads = 2; + } + + const AdjacencyMatrix &adj = Graph::getAdjMatrix(); + // vector that stores vertices to be visit + std::vector *> level_tracker, next_level_tracker; + level_tracker.reserve(static_cast(1.0 *nodeSet.size())); + next_level_tracker.reserve(static_cast(1.0 * nodeSet.size())); + + // mark the starting node as visited + visited[node_to_index[&start]] = 1; + level_tracker.push_back(&start); + + // a worker is assigned a small part of tasks for each time + // assignments of tasks in current level and updates of tasks in next level are inclusive + std::mutex tracker_mutex; + std::mutex next_tracker_mutex; + std::atomic assigned_tasks = 0; + int num_tasks = 1; + // unit of task assignment, which mean assign block_size tasks to a worker each time + int block_size = 1; + int level = 1; + + auto extract_tasks = [&level_tracker, &tracker_mutex, &assigned_tasks, &num_tasks, &block_size] () -> std::pair + { + /* + std::lock_guard tracker_guard(tracker_mutex); + int task_block_size = std::min(num_tasks - assigned_tasks, block_size); + std::pair task_block{assigned_tasks, assigned_tasks + task_block_size}; + assigned_tasks += task_block_size; + return task_block; + */ + int start = assigned_tasks.fetch_add(block_size); + int end = std::min(num_tasks, start + block_size); + return {start, end}; + }; + + auto submit_result = [&next_level_tracker, &next_tracker_mutex] (std::vector *> &submission) -> void + { + std::lock_guard tracker_guard(next_tracker_mutex); + next_level_tracker.insert(std::end(next_level_tracker), std::begin(submission), std::end(submission)); + }; + + // worker thread sleep until it begin to search nodes of next level + std::mutex next_level_mutex; + std::condition_variable next_level_cond; + std::atomic waiting_workers = 0; + + auto bfs_worker = [&] () -> void + { + // algorithm is not done + while (!level_tracker.empty()) + { + // search for nodes in a level is not done + std::vector *> local_tracker; + while (1) + { + auto [start_index, end_index] = extract_tasks(); + if (start_index >= end_index) + { + break; + } + for (int i = start_index; i < end_index; ++i) + { + if (adj.count(level_tracker[i])) + { + for (const auto &elem : adj.at(level_tracker[i])) + { + int index = node_to_index[elem.first]; + if (visited[index] == 0) + { + visited[index] = 1; + local_tracker.push_back(elem.first); + } + } + } + } + } + + // submit local result to global result + if (!local_tracker.empty()) + { + submit_result(local_tracker); + } + + // last worker need to do preparation for the next iteration + int cur_level = level; + if (num_threads == 1 + waiting_workers.fetch_add(1)) + { + swap(level_tracker, next_level_tracker); + next_level_tracker.clear(); + + // adjust block_size according to number of nodes in next level + block_size = 4; + if (level_tracker.size() <= num_threads * 4) + { + block_size = std::max(1, static_cast(std::ceil(static_cast(level_tracker.size()) / num_threads))); + } + else if (level_tracker.size() >= num_threads * 64) + { + block_size = 16; + } + + num_tasks = level_tracker.size(); + waiting_workers = 0; + assigned_tasks = 0; + level = level + 1; + next_level_cond.notify_all(); + } + else + { + // not to wait if last worker reachs last statement before notify all or even further + std::unique_lock next_level_lock(next_level_mutex); + next_level_cond.wait(next_level_lock, + [&level, cur_level] () { return level != cur_level;}); + } + } + }; + + std::vector workers; + for (int i = 0; i < num_threads - 1; ++i) + { + workers.emplace_back(std::thread(bfs_worker)); + } + bfs_worker(); + + for (auto &worker : workers) + { + if (worker.joinable()) + { + worker.join(); + } + } + + for (const auto &visited_node : nodeSet) + { + if (visited[node_to_index[visited_node]] != 0) + { + bfs_result.push_back(*visited_node); + } + } + + return bfs_result; + } template const std::vector> Graph::depth_first_search(const Node &start) const { diff --git a/test/BFSTest.cpp b/test/BFSTest.cpp index 02b631b3d..872a32e07 100644 --- a/test/BFSTest.cpp +++ b/test/BFSTest.cpp @@ -1,5 +1,6 @@ #include "gtest/gtest.h" #include "CXXGraph.hpp" +#include TEST(BFSTest, test_1) { @@ -132,4 +133,201 @@ TEST(BFSTest, test_6) ASSERT_FALSE(std::find(res.begin(), res.end(), node3) != res.end()); ASSERT_FALSE(std::find(res.begin(), res.end(), node4) != res.end()); -} \ No newline at end of file +} + +// test cases of concurrency bfs +TEST(BFSTest, test_7) +{ + CXXGRAPH::Node node1("1", 1); + CXXGRAPH::Node node2("2", 2); + CXXGRAPH::Node node3("3", 3); + std::pair *, const CXXGRAPH::Node *> pairNode(&node1, &node2); + CXXGRAPH::DirectedWeightedEdge edge1(1, pairNode, 1); + CXXGRAPH::DirectedWeightedEdge edge2(2, node2, node3, 1); + CXXGRAPH::UndirectedWeightedEdge edge3(3, node1, node3, 6); + CXXGRAPH::T_EdgeSet edgeSet; + edgeSet.insert(&edge1); + edgeSet.insert(&edge2); + edgeSet.insert(&edge3); + CXXGRAPH::Graph graph(edgeSet); + std::vector> res = graph.concurrency_breadth_first_search(node1, 4); + ASSERT_EQ(res.size(), 3); + ASSERT_TRUE(std::find(res.begin(), res.end(), node1) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node2) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node3) != res.end()); + +} + +TEST(BFSTest, test_8) +{ + CXXGRAPH::Node node1("1", 1); + CXXGRAPH::Node node2("2", 2); + CXXGRAPH::Node node3("3", 3); + std::pair *, const CXXGRAPH::Node *> pairNode(&node1, &node2); + CXXGRAPH::DirectedWeightedEdge edge1(1, pairNode, 1); + CXXGRAPH::DirectedWeightedEdge edge2(2, node2, node3, 1); + CXXGRAPH::DirectedWeightedEdge edge3(3, node1, node3, 6); + CXXGRAPH::T_EdgeSet edgeSet; + edgeSet.insert(&edge1); + edgeSet.insert(&edge2); + edgeSet.insert(&edge3); + CXXGRAPH::Graph graph(edgeSet); + std::vector> res = graph.concurrency_breadth_first_search(node2, 4); + ASSERT_EQ(res.size(), 2); + ASSERT_FALSE(std::find(res.begin(), res.end(), node1) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node2) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node3) != res.end()); + +} + +TEST(BFSTest, test_9) +{ + CXXGRAPH::Node node1("1", 1); + CXXGRAPH::Node node2("2", 2); + CXXGRAPH::Node node3("3", 3); + std::pair *, const CXXGRAPH::Node *> pairNode(&node1, &node2); + CXXGRAPH::DirectedWeightedEdge edge1(1, pairNode, 1); + CXXGRAPH::DirectedWeightedEdge edge2(2, node2, node3, 1); + CXXGRAPH::UndirectedWeightedEdge edge3(3, node1, node3, 6); + CXXGRAPH::T_EdgeSet edgeSet; + edgeSet.insert(&edge1); + edgeSet.insert(&edge2); + edgeSet.insert(&edge3); + CXXGRAPH::Graph graph(edgeSet); + std::vector> res = graph.concurrency_breadth_first_search(node2, 4); + ASSERT_EQ(res.size(), 3); + ASSERT_TRUE(std::find(res.begin(), res.end(), node1) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node2) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node3) != res.end()); +} + +TEST(BFSTest, test_10) +{ + CXXGRAPH::Node node1("1", 1); + CXXGRAPH::Node node2("2", 2); + CXXGRAPH::Node node3("3", 3); + std::pair *, const CXXGRAPH::Node *> pairNode(&node1, &node2); + CXXGRAPH::DirectedWeightedEdge edge1(1, pairNode, 1); + CXXGRAPH::DirectedWeightedEdge edge2(2, node2, node3, 1); + CXXGRAPH::UndirectedWeightedEdge edge3(3, node1, node3, 6); + CXXGRAPH::T_EdgeSet edgeSet; + edgeSet.insert(&edge1); + edgeSet.insert(&edge2); + edgeSet.insert(&edge3); + CXXGRAPH::Graph graph(edgeSet); + std::vector> res = graph.concurrency_breadth_first_search(node3, 4); + ASSERT_EQ(res.size(), 3); + ASSERT_TRUE(std::find(res.begin(), res.end(), node1) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node2) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node3) != res.end()); + +} + +TEST(BFSTest, test_11) +{ + CXXGRAPH::Node node1("1", 1); + CXXGRAPH::Node node2("2", 2); + CXXGRAPH::Node node3("3", 3); + std::pair *, const CXXGRAPH::Node *> pairNode(&node1, &node2); + CXXGRAPH::DirectedWeightedEdge edge1(1, pairNode, 1); + CXXGRAPH::DirectedWeightedEdge edge2(2, node2, node3, 1); + CXXGRAPH::DirectedWeightedEdge edge3(3, node1, node3, 6); + CXXGRAPH::T_EdgeSet edgeSet; + edgeSet.insert(&edge1); + edgeSet.insert(&edge2); + edgeSet.insert(&edge3); + CXXGRAPH::Graph graph(edgeSet); + std::vector> res = graph.concurrency_breadth_first_search(node3, 4); + ASSERT_EQ(res.size(), 1); + ASSERT_FALSE(std::find(res.begin(), res.end(), node1) != res.end()); + ASSERT_FALSE(std::find(res.begin(), res.end(), node2) != res.end()); + ASSERT_TRUE(std::find(res.begin(), res.end(), node3) != res.end()); + +} + +TEST(BFSTest, test_12) +{ + CXXGRAPH::Node node1("1", 1); + CXXGRAPH::Node node2("2", 2); + CXXGRAPH::Node node3("3", 3); + CXXGRAPH::Node node4("4", 4); + CXXGRAPH::Node node5("5", 5); + CXXGRAPH::Node node6("6", 6); + CXXGRAPH::Node node7("7", 7); + CXXGRAPH::Node node8("8", 8); + std::pair *, const CXXGRAPH::Node *> pairNode(&node1, &node2); + CXXGRAPH::UndirectedWeightedEdge edge1(1, pairNode, 1); + CXXGRAPH::UndirectedWeightedEdge edge2(2, node2, node3, 1); + CXXGRAPH::UndirectedWeightedEdge edge3(3, node2, node6, 6); + CXXGRAPH::UndirectedWeightedEdge edge4(3, node3, node4, 6); + CXXGRAPH::UndirectedWeightedEdge edge5(3, node3, node5, 6); + CXXGRAPH::UndirectedWeightedEdge edge6(3, node6, node7, 6); + CXXGRAPH::UndirectedWeightedEdge edge7(3, node8, node6, 6); + CXXGRAPH::UndirectedWeightedEdge edge8(3, node8, node7, 6); + CXXGRAPH::T_EdgeSet edgeSet; + edgeSet.insert(&edge1); + edgeSet.insert(&edge2); + edgeSet.insert(&edge3); + edgeSet.insert(&edge4); + edgeSet.insert(&edge5); + edgeSet.insert(&edge6); + edgeSet.insert(&edge7); + edgeSet.insert(&edge8); + CXXGRAPH::Graph graph(edgeSet); + std::vector> res = graph.concurrency_breadth_first_search(node8, 4); + ASSERT_EQ(res.size(), 8); +} + +// this case is to verify result correction when number of threads more than 1 +TEST(BFSTest, test_13) +{ + unsigned int randSeed = (unsigned int)time(NULL); + srand(randSeed); + + int nodes_size = 60, edges_size = 2000; + std::vector *> nodes; + for (auto index = 0; index < nodes_size; index++) + { + int randomNumber = (rand_r(&randSeed) % nodes_size) + 1; + CXXGRAPH::Node *newNode = new CXXGRAPH::Node(std::to_string(index), randomNumber); + nodes.push_back(newNode); + } + + CXXGRAPH::T_EdgeSet edgeSet; + auto MaxValue = nodes.size(); + for (auto index = 0; index < edges_size; index++) + { + int randomNumber1 = (rand_r(&randSeed) % MaxValue); + int randomNumber2 = (rand_r(&randSeed) % MaxValue); + if (randomNumber1 != randomNumber2) + { + CXXGRAPH::UndirectedEdge *newEdge = new CXXGRAPH::UndirectedEdge(index, *(nodes.at(randomNumber1)), *(nodes.at(randomNumber2))); + edgeSet.insert(newEdge); + } + } + for (int i = 1; i < nodes.size(); i += 2) + { + CXXGRAPH::UndirectedEdge *newEdge = new CXXGRAPH::UndirectedEdge(edges_size + i + 1, *(nodes.at(0)), *(nodes.at(i))); + edgeSet.insert(newEdge); + } + CXXGRAPH::Graph graph(edgeSet); + + auto &result1 = graph.breadth_first_search(*(nodes[0])); + std::set> st1; + for (const auto &node : result1) + { + st1.emplace(node); + } + + auto &result2 = graph.concurrency_breadth_first_search(*(nodes[0]), 4); + ASSERT_EQ(result1.size(), result2.size()); + + for (const auto &node : result2) + { + ASSERT_TRUE(st1.count(node)); + if (!st1.count(node)) + { + std::cout << node; + } + } +}