Skip to content

Commit

Permalink
* implement multi-thread bfs (#252)
Browse files Browse the repository at this point in the history
* add testcases for multi-thread bfs(in BFSTest.cpp)
* add benchmark for multi-thread bfs(in BFS_BM.cpp)

Co-authored-by: suncanghuai <[email protected]>
  • Loading branch information
suncanghuai and suncanghuai authored Dec 6, 2022
1 parent a00bb4a commit 5afdc18
Show file tree
Hide file tree
Showing 4 changed files with 440 additions and 4 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ target_link_libraries(test_exe
crypto
z)


enable_testing()

add_test(test_node test_exe --gtest_filter=NodeTest*)
Expand Down
62 changes: 61 additions & 1 deletion benchmark/BFS_BM.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static void BFS_X(benchmark::State &state)
auto &result = g.breadth_first_search(*(range_start->second->getNodePair().first));
}
}
BENCHMARK(BFS_X)->RangeMultiplier(16)->Range((unsigned long)1, (unsigned long)1 << 16);
BENCHMARK(BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18);

static void BFS_FromReadedCitHep(benchmark::State &state)
{
Expand All @@ -33,3 +33,63 @@ static void BFS_FromReadedCitHep(benchmark::State &state)

BENCHMARK(BFS_FromReadedCitHep);

static void PSEUDO_CONCURRENCY_BFS_X(benchmark::State &state)
{
CXXGRAPH::Graph<int> g;
auto range_start = edges.begin();
auto range_end = edges.find(state.range(0));
std::unordered_map<unsigned long, CXXGRAPH::Edge<int> *> edgesX;
edgesX.insert(range_start, range_end);
for (auto e : edgesX)
{
g.addEdge(&(*e.second));
}
for (auto _ : state)
{
auto &result = g.concurrency_breadth_first_search(*(range_start->second->getNodePair().first), 1);
}
}
BENCHMARK(PSEUDO_CONCURRENCY_BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18);

static void PSEUDO_CONCURRENCY_BFS_FromReadedCitHep(benchmark::State &state)
{
auto edgeSet = cit_graph_ptr->getEdgeSet();
for (auto _ : state)
{

auto &result = cit_graph_ptr->concurrency_breadth_first_search(*((*(edgeSet.begin()))->getNodePair().first), 1);
}
}

BENCHMARK(PSEUDO_CONCURRENCY_BFS_FromReadedCitHep);

static void CONCURRENCY_BFS_X(benchmark::State &state)
{
CXXGRAPH::Graph<int> g;
auto range_start = edges.begin();
auto range_end = edges.find(state.range(0));
std::unordered_map<unsigned long, CXXGRAPH::Edge<int> *> edgesX;
edgesX.insert(range_start, range_end);
for (auto e : edgesX)
{
g.addEdge(&(*e.second));
}
for (auto _ : state)
{
auto &result = g.concurrency_breadth_first_search(*(range_start->second->getNodePair().first), 8);
}
}
BENCHMARK(CONCURRENCY_BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18);

static void CONCURRENCY_BFS_FromReadedCitHep(benchmark::State &state)
{
auto edgeSet = cit_graph_ptr->getEdgeSet();
for (auto _ : state)
{

auto &result = cit_graph_ptr->concurrency_breadth_first_search(*((*(edgeSet.begin()))->getNodePair().first), 8);
}
}

BENCHMARK(CONCURRENCY_BFS_FromReadedCitHep);

181 changes: 180 additions & 1 deletion include/Graph/Graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <fstream>
#include <limits.h>
#include <mutex>
#include <condition_variable>
#include <set>
#include <atomic>
#include <thread>
Expand Down Expand Up @@ -264,7 +265,19 @@ namespace CXXGRAPH
* search.
*
*/
virtual const std::vector<Node<T>> breadth_first_search(const Node<T> &start) const;
virtual const std::vector<Node<T>> breadth_first_search(const Node<T> &start) const;
/**
* \brief
* The multithreaded version of breadth_first_search
* It turns out to be two indepentent functions because of implemntation differences
*
* @param start Node from where traversing starts
* @param num_threads number of threads
* @returns a vector of Node indicating which Node were visited during the
* search.
*
*/
virtual const std::vector<Node<T>> concurrency_breadth_first_search(const Node<T> &start, size_t num_threads) const;
/**
* \brief
* Function performs the depth first search algorithm over the graph
Expand Down Expand Up @@ -1613,7 +1626,173 @@ namespace CXXGRAPH

return visited;
}
template <typename T>
const std::vector<Node<T>> Graph<T>::concurrency_breadth_first_search(const Node<T> &start, size_t num_threads) const
{
std::vector<Node<T>> bfs_result;
// check is exist node in the graph
auto &nodeSet = Graph<T>::getNodeSet();
if (std::find(nodeSet.begin(), nodeSet.end(), &start) == nodeSet.end())
{
return bfs_result;
}

std::unordered_map<const Node<T> *, int> node_to_index;
for (const auto &node : nodeSet)
{
node_to_index[node] = node_to_index.size();
}
std::vector<int> visited(nodeSet.size(), 0);

// parameter limitations
if (num_threads <= 0)
{
std::cout << "Error: number of threads should be greater than 0" << std::endl;
num_threads = 2;
}

const AdjacencyMatrix<T> &adj = Graph<T>::getAdjMatrix();
// vector that stores vertices to be visit
std::vector<const Node<T> *> level_tracker, next_level_tracker;
level_tracker.reserve(static_cast<int>(1.0 *nodeSet.size()));
next_level_tracker.reserve(static_cast<int>(1.0 * nodeSet.size()));

// mark the starting node as visited
visited[node_to_index[&start]] = 1;
level_tracker.push_back(&start);

// a worker is assigned a small part of tasks for each time
// assignments of tasks in current level and updates of tasks in next level are inclusive
std::mutex tracker_mutex;
std::mutex next_tracker_mutex;
std::atomic<int> assigned_tasks = 0;
int num_tasks = 1;
// unit of task assignment, which mean assign block_size tasks to a worker each time
int block_size = 1;
int level = 1;

auto extract_tasks = [&level_tracker, &tracker_mutex, &assigned_tasks, &num_tasks, &block_size] () -> std::pair<int,int>
{
/*
std::lock_guard<std::mutex> tracker_guard(tracker_mutex);
int task_block_size = std::min(num_tasks - assigned_tasks, block_size);
std::pair<int,int> task_block{assigned_tasks, assigned_tasks + task_block_size};
assigned_tasks += task_block_size;
return task_block;
*/
int start = assigned_tasks.fetch_add(block_size);
int end = std::min(num_tasks, start + block_size);
return {start, end};
};

auto submit_result = [&next_level_tracker, &next_tracker_mutex] (std::vector<const Node<T> *> &submission) -> void
{
std::lock_guard<std::mutex> tracker_guard(next_tracker_mutex);
next_level_tracker.insert(std::end(next_level_tracker), std::begin(submission), std::end(submission));
};

// worker thread sleep until it begin to search nodes of next level
std::mutex next_level_mutex;
std::condition_variable next_level_cond;
std::atomic<int> waiting_workers = 0;

auto bfs_worker = [&] () -> void
{
// algorithm is not done
while (!level_tracker.empty())
{
// search for nodes in a level is not done
std::vector<const Node<T> *> local_tracker;
while (1)
{
auto [start_index, end_index] = extract_tasks();
if (start_index >= end_index)
{
break;
}

for (int i = start_index; i < end_index; ++i)
{
if (adj.count(level_tracker[i]))
{
for (const auto &elem : adj.at(level_tracker[i]))
{
int index = node_to_index[elem.first];
if (visited[index] == 0)
{
visited[index] = 1;
local_tracker.push_back(elem.first);
}
}
}
}
}

// submit local result to global result
if (!local_tracker.empty())
{
submit_result(local_tracker);
}

// last worker need to do preparation for the next iteration
int cur_level = level;
if (num_threads == 1 + waiting_workers.fetch_add(1))
{
swap(level_tracker, next_level_tracker);
next_level_tracker.clear();

// adjust block_size according to number of nodes in next level
block_size = 4;
if (level_tracker.size() <= num_threads * 4)
{
block_size = std::max(1, static_cast<int>(std::ceil(static_cast<double>(level_tracker.size()) / num_threads)));
}
else if (level_tracker.size() >= num_threads * 64)
{
block_size = 16;
}

num_tasks = level_tracker.size();
waiting_workers = 0;
assigned_tasks = 0;
level = level + 1;
next_level_cond.notify_all();
}
else
{
// not to wait if last worker reachs last statement before notify all or even further
std::unique_lock<std::mutex> next_level_lock(next_level_mutex);
next_level_cond.wait(next_level_lock,
[&level, cur_level] () { return level != cur_level;});
}
}
};

std::vector<std::thread> workers;
for (int i = 0; i < num_threads - 1; ++i)
{
workers.emplace_back(std::thread(bfs_worker));
}
bfs_worker();

for (auto &worker : workers)
{
if (worker.joinable())
{
worker.join();
}
}

for (const auto &visited_node : nodeSet)
{
if (visited[node_to_index[visited_node]] != 0)
{
bfs_result.push_back(*visited_node);
}
}

return bfs_result;
}
template <typename T>
const std::vector<Node<T>> Graph<T>::depth_first_search(const Node<T> &start) const
{
Expand Down
Loading

0 comments on commit 5afdc18

Please sign in to comment.