- C++17 对标准库算法重载了并行版本,区别是多了一个指定执行策略的参数
std::vector<int> v;
std::sort(std::execution::par, v.begin(), v.end());
- std::execution::par 表示允许多线程并行执行此算法,注意这是一个权限(permission)而非强制要求(requirement),此算法依然可以被单线程执行
- 另外,如果指定了执行策略,算法复杂度的要求也更宽松,因为并行算法为了利用好系统的并行性通常要做更多工作。比如把工作划分给 100 个处理器,即使总工作是原来的两倍,也仍然能获得原来的五十倍的性能
- <execution> 中指定了如下执行策略类
std::execution::sequenced_policy
std::execution::parallel_policy
std::execution::parallel_unsequenced_policy
std::execution::unsequenced_policy // C++20
std::execution::seq
std::execution::par
std::execution::par_unseq
std::execution::unseq // C++20
- 如果使用执行策略,算法的行为就会受执行策略影响,影响方面包括:算法复杂度、抛异常时的行为、算法步骤的执行位置(where)、方式(how)、时刻(when)
- 除了管理并行执行的调度开销,许多并行算法会执行更多的核心操作(交换、比较、使用函数对象等),这样可以减少总的实际消耗时间,从而全面提升性能。这就是算法复杂度受影响的原因,其具体改变因算法不同而异
- 在不指定执行策略时,如下对算法的调用,抛出的异常会被传播
std::for_each(v.begin(), v.end(), [](auto x) { throw my_exception(); });
std::for_each(std::execution::seq, v.begin(), v.end(),
[](auto x) { throw my_exception(); });
- 不同的执行策略的执行方式也不相同。执行策略会指定执行算法步骤的代理,可以是常规线程、矢量流、GPU 线程或其他任何东西。执行策略也会指定算法步骤运行的顺序限制,比如是否要以特定顺序运行、不同算法步骤的一部分是否可以互相交错或并行运行等。下面对不同的执行策略进行详细解释
- std::execution::sequenced_policy 策略要求可以不(may not)并行执行,所有操作将执行在一个线程上。但它也是执行策略,因此与其他执行策略一样会影响算法复杂度和异常行为
- 所有执行在一个线程上的操作必须以某个确定顺序执行,因此这些操作是不能互相交错的。但不规定具体顺序,因此对于不同的函数调用可能产生不同的顺序
std::vector<int> v(1000);
int n = 0;
// 把 1-1000 存入容器,存入顺序可能是顺序也可能是乱序
std::for_each(std::execution::seq, v.begin(), v.end(),
[&](int& x) { x = ++n; });
std::for_each(std::execution::par, v.begin(), v.end(), [](auto& x) { ++x; });
- 只有在元素之间有特定顺序或对共享数据的访问不同步时,它才有问题
std::vector<int> v(1000);
int n = 0;
std::for_each(std::execution::par, v.begin(), v.end(), [&](int& x) {
x = ++n;
}); // 如果多个线程执行 lambda 就会对 n 产生数据竞争
std::accumulate(v.begin(), v.end(), 0);
std::reduce(std::execution::par, v.begin(), v.end());
- 如果常规算法有并行版的重载,则并行版对常规算法原有的所有重载都有一个对应重载版本
template <class RandomIt>
void sort(RandomIt first, RandomIt last);
template <class RandomIt, class Compare>
void sort(RandomIt first, RandomIt last, Compare comp);
// 并行版对应有两个重载
template <class ExecutionPolicy, class RandomIt>
void sort(ExecutionPolicy&& policy, RandomIt first, RandomIt last);
template <class ExecutionPolicy, class RandomIt, class Compare>
void sort(ExecutionPolicy&& policy, RandomIt first, RandomIt last,
Compare comp);
- 但并行版的重载对部分算法有一些区别,如果常规版本使用的是输入迭代器(input iterator)或输出迭代器(output iterator),则并行版的重载将使用前向迭代器(forward iterator)
template <class InputIt, class OutputIt>
OutputIt copy(InputIt first, InputIt last, OutputIt d_first);
template <class ExecutionPolicy, class ForwardIt1, class ForwardIt2>
ForwardIt2 copy(ExecutionPolicy&& policy, ForwardIt1 first, ForwardIt1 last,
ForwardIt2 d_first);
#include <algorithm>
#include <mutex>
#include <vector>
class A {
public:
int get() const {
std::lock_guard<std::mutex> l(m_);
return n_;
}
void inc() {
std::lock_guard<std::mutex> l(m_);
++n_;
}
private:
mutable std::mutex m_;
int n_ = 0;
};
void f(std::vector<A>& v) {
std::for_each(std::execution::par, v.begin(), v.end(), [](A& x) { x.inc(); });
}
#include <algorithm>
#include <mutex>
#include <vector>
class A {
public:
int get() const { return n_; }
void inc() { ++n_; }
private:
int n_ = 0;
};
class B {
public:
void lock() { m_.lock(); }
void unlock() { m_.unlock(); }
std::vector<A>& get() { return v_; }
private:
std::mutex m_;
std::vector<A> v_;
};
void f(B& x) {
std::lock_guard<std::mutex> l(x);
auto& v = x.get();
std::for_each(std::execution::par_unseq, v.begin(), v.end(),
[](A& x) { x.inc(); });
}
- 下面是一个更实际的例子。假如有一个网站,访问日志有上百万条,为了方便查看数据需要对日志进行处理。对日志每行的处理是独立的工作,很适合使用并行算法
struct Log {
std::string page;
time_t visit_time;
// any other fields
};
extern Log parse(const std::string& line);
using Map = std::unordered_map<std::string, unsigned long long>;
Map f(const std::vector<std::string>& v) {
struct Combine {
// log、Map 两个参数有四种组合,所以需要四个重载
Map operator()(Map lhs, Map rhs) const {
if (lhs.size() < rhs.size()) {
std::swap(lhs, rhs);
}
for (const auto& x : rhs) {
lhs[x.first] += x.second;
}
return lhs;
}
Map operator()(Log l, Map m) const {
++m[l.page];
return m;
}
Map operator()(Map m, Log l) const {
++m[l.page];
return m;
}
Map operator()(Log lhs, Log rhs) const {
Map m;
++m[lhs.page];
++m[rhs.page];
return m;
}
};
return std::transform_reduce(std::execution::par, v.begin(), v.end(),
Map{}, // 初始值,一个空的 map
Combine{}, // 结合两个元素的二元操作
parse); // 对每个元素执行的一元操作
}