add multithread-bfs #252

Merged 1 commit on Dec 6, 2022
1 change: 0 additions & 1 deletion CMakeLists.txt
@@ -56,7 +56,6 @@ target_link_libraries(test_exe
crypto
z)


enable_testing()

add_test(test_node test_exe --gtest_filter=NodeTest*)
62 changes: 61 additions & 1 deletion benchmark/BFS_BM.cpp
@@ -19,7 +19,7 @@ static void BFS_X(benchmark::State &state)
auto &result = g.breadth_first_search(*(range_start->second->getNodePair().first));
}
}
BENCHMARK(BFS_X)->RangeMultiplier(16)->Range((unsigned long)1, (unsigned long)1 << 16);
BENCHMARK(BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18);

static void BFS_FromReadedCitHep(benchmark::State &state)
{
@@ -33,3 +33,63 @@ static void BFS_FromReadedCitHep(benchmark::State &state)

BENCHMARK(BFS_FromReadedCitHep);

static void PSEUDO_CONCURRENCY_BFS_X(benchmark::State &state)
{
CXXGRAPH::Graph<int> g;
auto range_start = edges.begin();
auto range_end = edges.find(state.range(0));
std::unordered_map<unsigned long, CXXGRAPH::Edge<int> *> edgesX;
edgesX.insert(range_start, range_end);
for (auto e : edgesX)
{
g.addEdge(&(*e.second));
}
for (auto _ : state)
{
auto &result = g.concurrency_breadth_first_search(*(range_start->second->getNodePair().first), 1);
}
}
BENCHMARK(PSEUDO_CONCURRENCY_BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18);

static void PSEUDO_CONCURRENCY_BFS_FromReadedCitHep(benchmark::State &state)
{
auto edgeSet = cit_graph_ptr->getEdgeSet();
for (auto _ : state)
{

auto &result = cit_graph_ptr->concurrency_breadth_first_search(*((*(edgeSet.begin()))->getNodePair().first), 1);
}
}

BENCHMARK(PSEUDO_CONCURRENCY_BFS_FromReadedCitHep);

static void CONCURRENCY_BFS_X(benchmark::State &state)
{
CXXGRAPH::Graph<int> g;
auto range_start = edges.begin();
auto range_end = edges.find(state.range(0));
std::unordered_map<unsigned long, CXXGRAPH::Edge<int> *> edgesX;
edgesX.insert(range_start, range_end);
for (auto e : edgesX)
{
g.addEdge(&(*e.second));
}
for (auto _ : state)
{
auto &result = g.concurrency_breadth_first_search(*(range_start->second->getNodePair().first), 8);
}
}
BENCHMARK(CONCURRENCY_BFS_X)->RangeMultiplier(18)->Range((unsigned long)1, (unsigned long)1 << 18);

static void CONCURRENCY_BFS_FromReadedCitHep(benchmark::State &state)
{
auto edgeSet = cit_graph_ptr->getEdgeSet();
for (auto _ : state)
{

auto &result = cit_graph_ptr->concurrency_breadth_first_search(*((*(edgeSet.begin()))->getNodePair().first), 8);
}
}

BENCHMARK(CONCURRENCY_BFS_FromReadedCitHep);
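For context, the benchmarks above exercise the new concurrency_breadth_first_search entry point declared in include/Graph/Graph.hpp below; the PSEUDO_CONCURRENCY_* variants pass num_threads = 1 (presumably to isolate the scheduling overhead) while the CONCURRENCY_* variants use 8 threads. A minimal standalone sketch of a direct call might look like the following — the umbrella include path and the Node/UndirectedEdge constructors are assumed to match the library's usual examples, so adjust to your local CXXGraph version:

```cpp
#include <iostream>
#include "CXXGraph.hpp" // assumed umbrella header; adjust to your include layout

int main()
{
    // a small triangle graph
    CXXGRAPH::Node<int> n1("1", 1);
    CXXGRAPH::Node<int> n2("2", 2);
    CXXGRAPH::Node<int> n3("3", 3);
    CXXGRAPH::UndirectedEdge<int> e1(1, n1, n2);
    CXXGRAPH::UndirectedEdge<int> e2(2, n2, n3);
    CXXGRAPH::UndirectedEdge<int> e3(3, n3, n1);

    CXXGRAPH::Graph<int> g;
    g.addEdge(&e1);
    g.addEdge(&e2);
    g.addEdge(&e3);

    // multithreaded BFS starting from n1 with 4 worker threads
    auto visited = g.concurrency_breadth_first_search(n1, 4);
    std::cout << "visited " << visited.size() << " nodes" << std::endl;
    return 0;
}
```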

181 changes: 180 additions & 1 deletion include/Graph/Graph.hpp
@@ -39,6 +39,7 @@
#include <fstream>
#include <limits.h>
#include <mutex>
#include <condition_variable>
#include <set>
#include <atomic>
#include <thread>
@@ -264,7 +265,19 @@ namespace CXXGRAPH
* search.
*
*/
virtual const std::vector<Node<T>> breadth_first_search(const Node<T> &start) const;
virtual const std::vector<Node<T>> breadth_first_search(const Node<T> &start) const;
/**
* \brief
* The multithreaded version of breadth_first_search.
* It is implemented as a separate, independent function because its implementation differs from the single-threaded version.
*
* @param start Node from which the traversal starts
* @param num_threads number of worker threads to use
* @returns a vector of Node indicating which Nodes were visited during the
* search.
*
*/
virtual const std::vector<Node<T>> concurrency_breadth_first_search(const Node<T> &start, size_t num_threads) const;
/**
* \brief
* Function performs the depth first search algorithm over the graph
@@ -1613,7 +1626,173 @@

return visited;
}
template <typename T>
const std::vector<Node<T>> Graph<T>::concurrency_breadth_first_search(const Node<T> &start, size_t num_threads) const
{
std::vector<Node<T>> bfs_result;
// check that the start node exists in the graph
auto &nodeSet = Graph<T>::getNodeSet();
if (std::find(nodeSet.begin(), nodeSet.end(), &start) == nodeSet.end())
{
return bfs_result;
}

std::unordered_map<const Node<T> *, int> node_to_index;
int next_index = 0;
for (const auto &node : nodeSet)
{
node_to_index[node] = next_index++;
}
std::vector<int> visited(nodeSet.size(), 0);

// parameter check: the number of worker threads must be positive
if (num_threads == 0)
{
std::cout << "Error: number of threads should be greater than 0; falling back to 2" << std::endl;
num_threads = 2;
}

const AdjacencyMatrix<T> &adj = Graph<T>::getAdjMatrix();
// vectors that store the vertices to visit in the current and the next level
std::vector<const Node<T> *> level_tracker, next_level_tracker;
level_tracker.reserve(nodeSet.size());
next_level_tracker.reserve(nodeSet.size());

// mark the starting node as visited
visited[node_to_index[&start]] = 1;
level_tracker.push_back(&start);

// a worker is assigned a small block of tasks at a time;
// within one pass a worker both consumes tasks from the current level and contributes tasks to the next level
std::mutex tracker_mutex;
std::mutex next_tracker_mutex;
std::atomic<int> assigned_tasks = 0;
int num_tasks = 1;
// unit of task assignment: block_size tasks are handed to a worker at a time
int block_size = 1;
int level = 1;
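// Level-synchronous scheme: all workers drain level_tracker in block_size
// chunks, buffer newly discovered nodes locally, then merge them into
// next_level_tracker; the last worker to finish a level swaps the two
// trackers and wakes the rest.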

auto extract_tasks = [&level_tracker, &tracker_mutex, &assigned_tasks, &num_tasks, &block_size] () -> std::pair<int,int>
{
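// NOTE: the block below is a mutex-based variant of task extraction,
// apparently kept commented out for reference; the lock-free fetch_add
// version after it is the one in use.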
/*
std::lock_guard<std::mutex> tracker_guard(tracker_mutex);
int task_block_size = std::min(num_tasks - assigned_tasks, block_size);
std::pair<int,int> task_block{assigned_tasks, assigned_tasks + task_block_size};
assigned_tasks += task_block_size;
return task_block;
*/
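// lock-free path: claim a block by bumping the shared counter; if the
// claimed range begins at or beyond num_tasks, the caller sees
// start >= end and stops pulling tasks for this level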
int start = assigned_tasks.fetch_add(block_size);
int end = std::min(num_tasks, start + block_size);
return {start, end};
};

auto submit_result = [&next_level_tracker, &next_tracker_mutex] (std::vector<const Node<T> *> &submission) -> void
{
std::lock_guard<std::mutex> tracker_guard(next_tracker_mutex);
next_level_tracker.insert(std::end(next_level_tracker), std::begin(submission), std::end(submission));
};

// worker threads sleep here until the next level is ready to be searched
std::mutex next_level_mutex;
std::condition_variable next_level_cond;
std::atomic<int> waiting_workers = 0;

auto bfs_worker = [&] () -> void
{
// keep iterating while there are still levels to explore
while (!level_tracker.empty())
{
// pull blocks of tasks until the current level is exhausted
std::vector<const Node<T> *> local_tracker;
while (1)
{
auto [start_index, end_index] = extract_tasks();
if (start_index >= end_index)
{
break;
}

for (int i = start_index; i < end_index; ++i)
{
if (adj.count(level_tracker[i]))
{
for (const auto &elem : adj.at(level_tracker[i]))
{
int index = node_to_index[elem.first];
if (visited[index] == 0)
{
visited[index] = 1;
local_tracker.push_back(elem.first);
}
}
}
}
}

// submit local result to global result
if (!local_tracker.empty())
{
submit_result(local_tracker);
}

// the last worker to finish prepares the next iteration
int cur_level = level;
if (num_threads == 1 + waiting_workers.fetch_add(1))
{
swap(level_tracker, next_level_tracker);
next_level_tracker.clear();

// adjust block_size according to number of nodes in next level
block_size = 4;
if (level_tracker.size() <= num_threads * 4)
{
block_size = std::max(1, static_cast<int>(std::ceil(static_cast<double>(level_tracker.size()) / num_threads)));
}
else if (level_tracker.size() >= num_threads * 64)
{
block_size = 16;
}

num_tasks = level_tracker.size();
waiting_workers = 0;
assigned_tasks = 0;
level = level + 1;
next_level_cond.notify_all();
}
else
{
// the predicate avoids a missed wakeup: if the last worker has already advanced the level (and called notify_all) before this worker starts waiting, the wait returns immediately
std::unique_lock<std::mutex> next_level_lock(next_level_mutex);
next_level_cond.wait(next_level_lock,
[&level, cur_level] () { return level != cur_level;});
}
}
};

std::vector<std::thread> workers;
for (int i = 0; i < num_threads - 1; ++i)
{
workers.emplace_back(std::thread(bfs_worker));
}
bfs_worker();

for (auto &worker : workers)
{
if (worker.joinable())
{
worker.join();
}
}

for (const auto &visited_node : nodeSet)
{
if (visited[node_to_index[visited_node]] != 0)
{
bfs_result.push_back(*visited_node);
}
}

return bfs_result;
}
template <typename T>
const std::vector<Node<T>> Graph<T>::depth_first_search(const Node<T> &start) const
{
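For readers tracing the synchronization above, the scheme inside concurrency_breadth_first_search condenses to the following standalone sketch. Graph traversal is replaced by a toy integer frontier (each value n expands to 2n and 2n+1), and the thread count, block size, and the barrier built from a mutex and condition_variable are illustrative rather than the library's exact code:

```cpp
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
    const int num_threads = 4;
    const int block_size = 2;
    const int max_value = 100; // stop expanding past this value

    std::vector<int> frontier{1}, next_frontier;
    std::vector<char> seen(max_value + 1, 0);
    seen[1] = 1;

    std::atomic<int> assigned{0};
    std::atomic<int> waiting{0};
    std::mutex next_mutex, level_mutex;
    std::condition_variable level_cv;
    int level = 0;

    auto worker = [&]() {
        while (!frontier.empty())
        {
            std::vector<int> local;
            while (true)
            {
                // claim a block of the current frontier with fetch_add
                int begin = assigned.fetch_add(block_size);
                int end = std::min<int>(static_cast<int>(frontier.size()), begin + block_size);
                if (begin >= end)
                    break;
                for (int i = begin; i < end; ++i)
                    for (int child : {frontier[i] * 2, frontier[i] * 2 + 1})
                        if (child <= max_value && !seen[child])
                        {
                            seen[child] = 1; // each child has a unique parent here, so no race
                            local.push_back(child);
                        }
            }
            {
                // merge locally discovered nodes into the next frontier
                std::lock_guard<std::mutex> lock(next_mutex);
                next_frontier.insert(next_frontier.end(), local.begin(), local.end());
            }
            int my_level = level;
            if (waiting.fetch_add(1) + 1 == num_threads)
            {
                // last worker of this level: publish the next frontier and wake the others
                std::lock_guard<std::mutex> lock(level_mutex);
                frontier.swap(next_frontier);
                next_frontier.clear();
                assigned = 0;
                waiting = 0;
                ++level;
                level_cv.notify_all();
            }
            else
            {
                std::unique_lock<std::mutex> lock(level_mutex);
                level_cv.wait(lock, [&] { return level != my_level; });
            }
        }
    };

    std::vector<std::thread> threads;
    for (int i = 1; i < num_threads; ++i)
        threads.emplace_back(worker);
    worker(); // the calling thread participates as well
    for (auto &t : threads)
        t.join();

    std::cout << "reached " << std::count(seen.begin(), seen.end(), 1) << " values" << std::endl;
    return 0;
}
```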