diff --git a/day1/README.md b/day1/README.md index 4736276..79cda03 100644 --- a/day1/README.md +++ b/day1/README.md @@ -123,7 +123,7 @@ warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ## はじめてのMPI -環境構築ができたら、こんなコードを書いて、[hello.cpp](hello.cpp)という名前で保存してみよう。 +環境構築ができたら、こんなコードを書いて、`hello.cpp`という名前で保存してみよう。 ```cpp #include @@ -196,9 +196,7 @@ MPI_Comm_rank(MPI_COMM_WORLD, &rank); ``` これを実行すると変数`rank`にランク番号が入る。N並列している場合、ランクは0からN-1までである。 -試してみよう。 - -[rank.cpp](rank.cpp) +試してみよう。以下を`rank.cpp`として保存し、コンパイル、実行してみよう。 ```cpp #include @@ -301,7 +299,7 @@ mpirun -np 4 ./a.out 競合が起きないようにする。 「ひとかたまりの処理」とは、例えば「`printf`で出力を始めてから終わるまで」である。 -例えば先程の[rank.cpp](rank.cpp)の例では、 +例えば先程の`rank.cpp`の例では、 ```cpp printf("Hello! My rank is %d\n", rank); @@ -330,11 +328,7 @@ puts("Hello! My rank is 3"); とかになるだけで、さほど表示は乱れない。 -さて、同様なプログラムを`std::cout`で書いてみよう。 - -こんな感じになると思う。 - -[rank_stream.cpp](rank_stream.cpp) +さて、同様なプログラムを`std::cout`で書いてみよう。以下を`rank_stream.cpp`という名前で保存、コンパイル、実行してみよう。 ```cpp #include @@ -468,11 +462,7 @@ gdbで特定のプロセスにアタッチする。しかし、gdbでアタッ * gdbで変数をいじって無限ループを脱出させる * あとは好きなようにデバッグする -という方針でいく。なお、なぜかMac OSではMPIプロセスへのgdbのアタッチがうまくいかなかったので、以下はCentOSで実行している。 - -こんなコードを書く。 - -[gdb_mpi.cpp](gdb_mpi.cpp) +という方針でいく。なお、なぜかMac OSではMPIプロセスへのgdbのアタッチがうまくいかなかったので、以下はCentOSで実行している。以下を`gdb_mpi.cpp`という名前で保存しよう。 ```cpp #include diff --git a/day3/README.md b/day3/README.md index bf5e3bb..7301b0c 100644 --- a/day3/README.md +++ b/day3/README.md @@ -19,7 +19,7 @@ まず、自明並列でよく出てくる例として、サンプリング数を並列化で稼ぐ方法を見てみよう。とりあえず定番の、 モンテカルロ法で円周率を計算してみる。 -こんなコードを書いて、[calc_pi.cpp](calc_pi.cpp)という名前で保存してみよう。 +こんなコードを書いて、`calc_pi.cpp`という名前で保存してみよう。 ```cpp #include @@ -65,7 +65,7 @@ $ ./a.out 4. ランク番号を乱数の種に使う 5. そのまま`calc_pi`を呼ぶ。 -以上の修正をしたコードを[calc_pi_mpi.cpp](calc_pi_mpi.cpp)という名前で作成する。 +以上の修正をしたコードを`calc_pi_mpi.cpp`という名前で作成する。 ```cpp #include @@ -154,7 +154,7 @@ PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP ## 自明並列テンプレート -先程の並列プログラム[calc_pi_mpi.cpp](calc_pi_mpi.cpp)のmain関数はこうなっていた。 +先程の並列プログラム`calc_pi_mpi.cpp`のmain関数はこうなっていた。 ```cpp int main(int argc, char **argv) { @@ -242,9 +242,7 @@ MPI_Comm_size(MPI_COMM_WORLD, &procs) ``` とすれば、`procs`にプロセス数が入る。これを使うと、先程のコードは -こんな感じにかける。 - -[processfiles.cpp](processfiles.cpp) +こんな感じにかける(`processfiles.cpp`)。 ```cpp #include @@ -299,7 +297,7 @@ MPI_Allreduce(&pi, &pi_sum, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 第一引数から、「和を取りたい変数」「和を受け取りたい変数」「変数の数」「変数の型」「やりたい演算」「コミュニケータ」の順番で指定する。ここでは一つの変数のみ総和演算を行っているが、配列を渡して一気に複数のデータについて総和を取ることもできる。また、総和だけでなく積や論理演算も実行できる。 -円周率の推定値`pi`と、その自乗`pi2 = pi*pi`について総和を取り、定義通りに期待値と標準偏差を求めるコードが[calc_pi_reduce.cpp](calc_pi_reduce.cpp)である。 +円周率の推定値`pi`と、その自乗`pi2 = pi*pi`について総和を取り、定義通りに期待値と標準偏差を求めるコードが以下の`calc_pi_reduce.cpp`である。 ```cpp #include diff --git a/day4/README.md b/day4/README.md index 4c1ad43..74e3cad 100644 --- a/day4/README.md +++ b/day4/README.md @@ -92,9 +92,74 @@ void dump(std::vector &data) { } ``` -あとは適当な条件を与えれば時間発展させることができる。ここでは、「一様加熱」と「温度固定」の二通りを試してみよう。コードはこちら。 +あとは適当な条件を与えれば時間発展させることができる。ここでは、「一様加熱」と「温度固定」の二通りを試してみよう。以下のコードを`thermal.cpp`という名前で保存、実行してみよう。 -[thermal.cpp](thermal.cpp) +```cpp +#include +#include +#include +#include + +const int L = 128; +const int STEP = 100000; +const int DUMP = 1000; + +void onestep(std::vector &lattice, const double h) { + static std::vector orig(L); + std::copy(lattice.begin(), lattice.end(), orig.begin()); + for (int i = 1; i < L - 1; i++) { + lattice[i] += h * (orig[i - 1] - 2.0 * orig[i] + orig[i + 1]); + } + // For Periodic Boundary + lattice[0] += h * (orig[L - 1] - 2.0 * lattice[0] + orig[1]); + lattice[L - 1] += h * (orig[L - 2] - 2.0 * lattice[L - 1] + orig[0]); +} + +void dump(std::vector &data) { + static int index = 0; + char filename[256]; + sprintf(filename, "data%03d.dat", index); + std::cout << filename << std::endl; + std::ofstream ofs(filename); + for (int i = 0; i < data.size(); i++) { + ofs << i << " " << data[i] << std::endl; + } + index++; +} + +void fixed_temperature(std::vector &lattice) { + const double h = 0.01; + const double Q = 1.0; + for (int i = 0; i < STEP; i++) { + onestep(lattice, h); + lattice[L / 4] = Q; + lattice[3 * L / 4] = -Q; + if ((i % DUMP) == 0) dump(lattice); + } +} + +void uniform_heating(std::vector &lattice) { + const double h = 0.2; + const double Q = 1.0; + for (int i = 0; i < STEP; i++) { + onestep(lattice, h); + for (auto &s : lattice) { + s += Q * h; + } + lattice[0] = 0.0; + lattice[L - 1] = 0.0; + if ((i % DUMP) == 0) dump(lattice); + } +} + +int main() { + std::vector lattice(L, 0.0); + //uniform_heating(lattice); + fixed_temperature(lattice); +} +``` + +実行結果は以下のようになる。 ![fig/thermal.png](fig/thermal.png) @@ -188,9 +253,7 @@ void fixed_temperature(std::vector &lattice) { とりあえずメモリに問題なければ「3. 一度まとめてから吐く」が楽なので、今回はこれを採用しよう。メモリが厳しかったり、数万プロセスの計算とかする時にはなにか工夫してくださいまし。 さて、「一度まとめてから吐く」ためには、「各プロセスにバラバラにあるデータを、どこかのプロセスに一括して持ってくる」必要があるのだが、MPIには -そのものずばり`MPI_Gather`という関数がある。使い方は以下のサンプルを見たほうが早いと思う。 - -[gather.cpp](gather.cpp) +そのものずばり`MPI_Gather`という関数がある。使い方はサンプルを見たほうが早い。以下を`gather.cpp`という名前で保存、実行しよう。 ```cpp #include @@ -343,11 +406,104 @@ void fixed_temperature(std::vector &lattice, int rank, int procs) { } ``` -これも一様加熱と同じで、「温度を固定している場所がどのプロセスが担当するどの場所か」を調べる必要があるが、それを考えるのはさほど難しくないだろう。 +これも一様加熱と同じで、「温度を固定している場所がどのプロセスが担当するどの場所か」を調べる必要があるが、それを考えるのはさほど難しくないだろう。そんなわけで完成した並列コードを`thermal_mpi.cpp`という名前で保存しよう。 -そんなわけで完成した並列コードがこちら。 +```cpp +#include +#include +#include +#include +#include -[thermal_mpi.cpp](thermal_mpi.cpp) +const int L = 128; +const int STEP = 100000; +const int DUMP = 1000; + +void dump(std::vector &data) { + static int index = 0; + char filename[256]; + sprintf(filename, "data%03d.dat", index); + std::cout << filename << std::endl; + std::ofstream ofs(filename); + for (unsigned int i = 0; i < data.size(); i++) { + ofs << i << " " << data[i] << std::endl; + } + index++; +} + +void dump_mpi(std::vector &local, int rank, int procs) { + static std::vector global(L); + MPI_Gather(&(local[1]), L / procs, MPI_DOUBLE, global.data(), L / procs, MPI_DOUBLE, 0, MPI_COMM_WORLD); + if (rank == 0) { + dump(global); + } +} + +void onestep(std::vector &lattice, double h, int rank, int procs) { + const int size = lattice.size(); + static std::vector orig(size); + std::copy(lattice.begin(), lattice.end(), orig.begin()); + // ここから通信のためのコード + const int left = (rank - 1 + procs) % procs; // 左のランク番号 + const int right = (rank + 1) % procs; // 右のランク番号 + MPI_Status st; + // 右端を右に送って、左端を左から受け取る + MPI_Sendrecv(&(lattice[size - 2]), 1, MPI_DOUBLE, right, 0, &(orig[0]), 1, MPI_DOUBLE, left, 0, MPI_COMM_WORLD, &st); + // 左端を左に送って、右端を右から受け取る + MPI_Sendrecv(&(lattice[1]), 1, MPI_DOUBLE, left, 0, &(orig[size - 1]), 1, MPI_DOUBLE, right, 0, MPI_COMM_WORLD, &st); + + //あとはシリアル版と同じ + for (int i = 1; i < size - 1; i++) { + lattice[i] += h * (orig[i - 1] - 2.0 * orig[i] + orig[i + 1]); + } +} + +void uniform_heating(std::vector &lattice, int rank, int procs) { + const double h = 0.2; + const double Q = 1.0; + for (int i = 0; i < STEP; i++) { + onestep(lattice, h, rank, procs); + for (auto &s : lattice) { + s += Q * h; + } + if (rank == 0) { + lattice[1] = 0.0; + } + if (rank == procs - 1) { + lattice[lattice.size() - 2] = 0.0; + } + if ((i % DUMP) == 0) dump_mpi(lattice, rank, procs); + } +} + +void fixed_temperature(std::vector &lattice, int rank, int procs) { + const double h = 0.01; + const double Q = 1.0; + const int s = L / procs; + for (int i = 0; i < STEP; i++) { + onestep(lattice, h, rank, procs); + if (rank == (L / 4 / s)) { + lattice[L / 4 - rank * s + 1] = Q; + } + if (rank == (3 * L / 4 / s)) { + lattice[3 * L / 4 - rank * s + 1] = -Q; + } + if ((i % DUMP) == 0) dump_mpi(lattice, rank, procs); + } +} + +int main(int argc, char **argv) { + MPI_Init(&argc, &argv); + int rank, procs; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &procs); + const int mysize = L / procs + 2; + std::vector local(mysize); + uniform_heating(local, rank, procs); + //fixed_temperature(local, rank, procs); + MPI_Finalize(); +} +``` せっかく並列化したので、高速化したかどうか調べてみよう。一様加熱の計算をさせてみる。 diff --git a/day5/README.md b/day5/README.md index 82a10bd..d90d49e 100644 --- a/day5/README.md +++ b/day5/README.md @@ -119,9 +119,99 @@ int main() { `save_as_dat`は、呼ばれるたびに配列を連番のファイル名で保存する関数である。 -全体のコードはこんな感じになる。 +全体のコードはこんな感じになる(`gs.cpp`)。 -[gs.cpp](gs.cpp) +```cpp +#include +#include +#include +#include + +const int L = 128; +const int TOTAL_STEP = 20000; +const int INTERVAL = 200; +const double F = 0.04; +const double k = 0.06075; +const double dt = 0.2; +const double Du = 0.05; +const double Dv = 0.1; + +typedef std::vector vd; + +void init(vd &u, vd &v) { + int d = 3; + for (int i = L / 2 - d; i < L / 2 + d; i++) { + for (int j = L / 2 - d; j < L / 2 + d; j++) { + u[j + i * L] = 0.7; + } + } + d = 6; + for (int i = L / 2 - d; i < L / 2 + d; i++) { + for (int j = L / 2 - d; j < L / 2 + d; j++) { + v[j + i * L] = 0.9; + } + } +} + +double calcU(double tu, double tv) { + return tu * tu * tv - (F + k) * tu; +} + +double calcV(double tu, double tv) { + return -tu * tu * tv + F * (1.0 - tv); +} + +double laplacian(int ix, int iy, vd &s) { + double ts = 0.0; + ts += s[ix - 1 + iy * L]; + ts += s[ix + 1 + iy * L]; + ts += s[ix + (iy - 1) * L]; + ts += s[ix + (iy + 1) * L]; + ts -= 4.0 * s[ix + iy * L]; + return ts; +} + +void calc(vd &u, vd &v, vd &u2, vd &v2) { + for (int iy = 1; iy < L - 1; iy++) { + for (int ix = 1; ix < L - 1; ix++) { + double du = 0; + double dv = 0; + const int i = ix + iy * L; + du = Du * laplacian(ix, iy, u); + dv = Dv * laplacian(ix, iy, v); + du += calcU(u[i], v[i]); + dv += calcV(u[i], v[i]); + u2[i] = u[i] + du * dt; + v2[i] = v[i] + dv * dt; + } + } +} + +void save_as_dat(vd &u) { + static int index = 0; + char filename[256]; + sprintf(filename, "conf%03d.dat", index); + std::cout << filename << std::endl; + std::ofstream ofs(filename, std::ios::binary); + ofs.write((char *)(u.data()), sizeof(double)*L * L); + index++; +} + +int main() { + const int V = L * L; + vd u(V, 0.0), v(V, 0.0); + vd u2(V, 0.0), v2(V, 0.0); + init(u, v); + for (int i = 0; i < TOTAL_STEP; i++) { + if (i & 1) { + calc(u2, v2, u, v); + } else { + calc(u, v, u2, v2); + } + if (i % INTERVAL == 0) save_as_dat(u); + } +} +``` コンパイル、実行してみよう。 @@ -550,9 +640,9 @@ void gather(std::vector &local_data, MPIinfo &mi) { MPIは書いた通りに動く。なので、通信アルゴリズムが決まっていれば、その手順どおりに書くだけである。 実際面倒なのは通信そのものよりも、通信の前処理と後処理だったりする(そもそも今回も通信は一行だけだ)。 -以上をすべてまとめたコードは以下の通り。 +以上をすべてまとめたコードを`gather2d.cpp`としよう。やや大きいので、ウェブへのリンクを貼っておく。 -[gather2d.cpp](gather2d.cpp) +[https://github.com/kaityo256/sevendayshpc/blob/master/day5/gather2d.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day5/gather2d.cpp) main関数だけ書いておくとこんな感じ。 @@ -650,7 +740,7 @@ void sendrecv_x(std::vector &local_data, MPIinfo &mi) { このアルゴリズムを実装するとこんな感じになる。 -[sendrecv.cpp](sendrecv.cpp) +[https://github.com/kaityo256/sevendayshpc/blob/master/day5/sendrecv.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day5/sendrecv.cpp) 実行結果はこんな感じ。 diff --git a/day6/README.md b/day6/README.md index 9ecb7a3..e313b51 100644 --- a/day6/README.md +++ b/day6/README.md @@ -36,9 +36,7 @@ OSの管理下で動くプロセスから見える「メモリ」は、それを などが挙げられる。なお、Windowsでは「ハードディスクにスワップする領域の上限」のことを「仮想メモリ」と呼んでいるようなので注意。 -実際に、プロセスごとに固有の仮想メモリが与えられているのを見てみよう。こんなコードを書いてみる。 - -[vmem.cpp](vmem.cpp) +実際に、プロセスごとに固有の仮想メモリが与えられているのを見てみよう。こんなコード(`vmem.cpp`)を書いてみる。 ```cpp #include @@ -214,7 +212,11 @@ flat-MPIをやっている場合は、各プロセスごとに独立な論理メ * Intel(R) Xeon(R) CPU E5-2680 v3 @ 2.50GHz 12コア x 2ソケット -まず、シリアルコードとしてDay 4で使ったGray Scottモデルの計算を使おう。純粋に計算のみをカウントするため、途中のファイル出力を削除し、また実行時間を測定するようにしたのが[gs.cpp](gs.cpp)である。ただし、デバッグのために最終結果だけファイルに出力している。コンパイルして`perf`でプロファイルをとってみよう。まず、`perf record`で記録を取る。 +まず、シリアルコードとしてDay 4で使ったGray Scottモデルの計算を使おう。純粋に計算のみをカウントするため、途中のファイル出力を削除し、また実行時間を測定するようにしたものが`gs.cpp`である。 + +[https://github.com/kaityo256/sevendayshpc/blob/master/day6/gs.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day6/gs.cpp) + +ただし、デバッグのために最終結果だけファイルに出力している。コンパイルして`perf`でプロファイルをとってみよう。まず、`perf record`で記録を取る。 ```sh $ g++ -O3 -mavx2 -std=c++11 -fopenmp gs.cpp -o gs.out @@ -265,7 +267,7 @@ void calc(vd &u, vd &v, vd &u2, vd &v2) { まずは内側のループにディレクティブを入れてみよう。`#pragma omp parallel for`というディレクティブを対象ループの直前に入れるだけでよい。 -[gs_omp1.cpp](gs_omp1.cpp) +[https://github.com/kaityo256/sevendayshpc/blob/master/day6/gs_omp1.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day6/gs_omp1.cpp) ```cpp void calc(vd &u, vd &v, vd &u2, vd &v2) { @@ -306,7 +308,7 @@ diff conf000.org conf000.dat 次に、外側を並列化してみよう。 -[gs_omp2.cpp](gs_omp2.cpp) +[https://github.com/kaityo256/sevendayshpc/blob/master/day6/gs_omp2.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day6/gs_omp2.cpp) ```cpp void calc(vd &u, vd &v, vd &u2, vd &v2) { @@ -582,7 +584,10 @@ const auto e = std::chrono::system_clock::now(); const auto elapsed = std::chrono::duration_cast(e - s).count(); ``` -これで`elapsed`にミリ秒単位の値が入る。このようにして作ったハイブリッド版の反応拡散方程式ソルバが[gs_hybrid.cpp](gs_hybrid.cpp)である。 +これで`elapsed`にミリ秒単位の値が入る。このようにして作ったハイブリッド版の反応拡散方程式ソルバが`gs_hybrid.cpp`である。 + +[https://github.com/kaityo256/sevendayshpc/blob/master/day6/gs_hybrid.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day6/gs_hybrid.cpp) + 筆者の環境ではMPIにパスが通してあるので、以下のようなオプションでコンパイルできる。 ```sh @@ -621,4 +626,4 @@ $ OMP_NUM_THREADS=2 mpiexec -np 2 ./a.out 一般論として、プロセスとスレッドには最適な割合が存在し、どれが一番早いかはやってみないとわからない。 しかし、筆者の経験としては、非常に単純な計算で、かつそこそこ計算量がある場合はflat-MPIが一番早いことが多い。 -ただし、筆者はスレッド並列化にあまり慣れていないため、上記のコードもOpenMPに慣れた人がチューニングしたら、もっとハイブリッド版が早くなるのかもしれない。そのあたりはいろいろ試して見てほしい。「こうすればもっと早くなった」「ここが問題だった」といったプルリクを歓迎する。 \ No newline at end of file +ただし、筆者はスレッド並列化にあまり慣れていないため、上記のコードもOpenMPに慣れた人がチューニングしたら、もっとハイブリッド版が早くなるのかもしれない。そのあたりはいろいろ試して見てほしい。「こうすればもっと早くなった」「ここが問題だった」といったプルリクを歓迎する。 diff --git a/day7/README.md b/day7/README.md index 2131819..33a28ef 100644 --- a/day7/README.md +++ b/day7/README.md @@ -55,9 +55,7 @@ void print256d(__m256d x) { ``` `_m256d x`が、そのまま`double x[4]`として使えているのがわかると思う。この時、`x[0]`が一番下位となる。 -先程の代入と合わせるとこんな感じになる。 - -[print.cpp](print.cpp) +先程の代入と合わせるとこんな感じになる(`print.cpp`)。 ```cpp #include @@ -112,9 +110,7 @@ __Z3addDv4_dS_: `vaddpd`はSIMDの足し算を行う命令であり、ちゃんとYMMレジスタの足し算が呼ばれていることがわかる。 -実際に4要素同時に足し算できることを確認しよう。 - -[add.cpp](add.cpp) +実際に4要素同時に足し算できることを確認しよう(`add.cpp`)。 ```cpp #include @@ -140,9 +136,7 @@ $ ./a.out `(0,1,2,3)`というベクトルと、`(4,5,6,7)`というベクトルの和をとり、`(4,6,8,10)`というベクトルが得られた。 このように、ベクトル同士の演算に見えるので、SIMD化のことをベクトル化と呼んだりする。ただし、線形代数で出てくる -ベクトルの積とは違い、SIMDの積は単に要素ごとの積になることに注意。実際、さっきの和を積にするとこうなる。 - -[mul.cpp](mul.cpp) +ベクトルの積とは違い、SIMDの積は単に要素ごとの積になることに注意。実際、さっきの和を積にするとこうなる(`mul.cpp`)。 ```cpp #include @@ -168,10 +162,7 @@ $ ./a.out それぞれ、`0*0`、`1*5`、`2*6`、`3*7`が計算されていることがわかる。 -あとSIMD化で大事なのは、SIMDレジスタへのデータの読み書きである。先程はデバッグのために`_mm256_set_pd`を使ったが、これは極めて遅い。 -どんな動作をするか見てみよう。 - -[setpd.cpp](setpd.cpp) +あとSIMD化で大事なのは、SIMDレジスタへのデータの読み書きである。先程はデバッグのために`_mm256_set_pd`を使ったが、これは極めて遅い。どんな動作をするか見てみよう(`setpd.cpp`)。 ```cpp #include @@ -211,9 +202,7 @@ __Z5setpddddd: 例えば、`_mm256_load_pd`は、指定されたポインタから連続する4つの倍精度実数をとってきて YMMレジスタに入れてくれる。ただし、そのポインタの指すアドレスは32バイトアラインされていなければならない。 -利用例はこんな感じになる。 - -[load.cpp](load.cpp) +利用例はこんな感じになる(`load.cpp`)。 ```cpp #include @@ -245,9 +234,7 @@ $ ./a.out 10.000000 8.000000 6.000000 4.000000 ``` -`_mm256_load_pd`が何をやっているか(どんなアセンブリに対応するか)も見てみよう。こんなコードのアセンブリを見てみる。 - -[loadasm.cpp](loadasm.cpp) +`_mm256_load_pd`が何をやっているか(どんなアセンブリに対応するか)も見てみよう。こんなコードのアセンブリを見てみる(`loadasm.cpp`)。 ```cpp #include @@ -302,9 +289,7 @@ IBM以外でも、ARMが規定したアセンブリ言語であるUALは「[Unif ## 簡単なSIMD化の例 では、実際にSIMD化をやってみよう。こんなコードを考える。 -一次元の配列の単純な和のループである。 - -[func.cpp](func.cpp) +一次元の配列の単純な和のループである(`func.cpp`)。 ```cpp const int N = 10000; @@ -351,9 +336,7 @@ x86は歴史的経緯からスカラーコードでも浮動小数点演算にSI * 二つのレジスタを足す * 結果のレジスタを配列cのしかるべき場所に保存する -ということをすればSIMD化完了である。コードを見たほうが早いと思う。 - -[func_simd.cpp](func_simd.cpp) +ということをすればSIMD化完了である。コードを見たほうが早いと思う(`func_simd.cpp`)。 ```cpp #include @@ -437,9 +420,64 @@ void check(void(*pfunc)(), const char *type) { } ``` -全部まとめたコードはこちら。 +全部まとめたコードを`simdcheck.cpp`として保存、実行してみよう。 + +```cpp +#include +#include +#include +#include + +const int N = 10000; + +double a[N], b[N], c[N]; +double ans[N]; -[simdcheck.cpp](simdcheck.cpp) +void check(void(*pfunc)(), const char *type) { + pfunc(); + unsigned char *x = (unsigned char *)c; + unsigned char *y = (unsigned char *)ans; + bool valid = true; + for (int i = 0; i < 8 * N; i++) { + if (x[i] != y[i]) { + valid = false; + break; + } + } + if (valid) { + printf("%s is OK\n", type); + } else { + printf("%s is NG\n", type); + } +} + +void func() { + for (int i = 0; i < N; i++) { + c[i] = a[i] + b[i]; + } +} + +void func_simd() { + for (int i = 0; i < N; i += 4) { + __m256d va = _mm256_load_pd(&(a[i])); + __m256d vb = _mm256_load_pd(&(b[i])); + __m256d vc = va + vb; + _mm256_store_pd(&(c[i]), vc); + } +} + +int main() { + std::mt19937 mt; + std::uniform_real_distribution ud(0.0, 1.0); + for (int i = 0; i < N; i++) { + a[i] = ud(mt); + b[i] = ud(mt); + ans[i] = a[i] + b[i]; + } + check(func, "scalar"); + check(func_simd, "vector"); +} +``` 実際に実行してテストしてみよう。 @@ -814,7 +852,7 @@ void dump() { } ``` -時間発展後に座標をダンプして、その結果を比較しよう。シリアル版を[mag.cpp](magnetic/mag.cpp)、SIMD版を[mag_simd.cpp](magnetic/mag_simd.cpp)としておき、以下のようにコンパイル、実行、結果の比較をする。 +時間発展後に座標をダンプして、その結果を比較しよう。シリアル版を[mag.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day7/magnetic/mag.cpp)、SIMD版を[mag_simd.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day7/magnetic/mag_simd.cpp)としておき、以下のようにコンパイル、実行、結果の比較をする。 ```sh $ g++ -std=c++11 -O3 -mavx2 -mfma mag.cpp -o a.out @@ -948,6 +986,6 @@ L13: `vpermpd`がシャッフル命令である。ループボディがかなり小さいが、このループは100000回まわるため、25000回しかまわらないコンパイラによる自動SIMD化ルーチンには勝てない。大雑把な話、ループボディの計算コストが半分だが、回転数が4倍なので2倍負けた、という感じである。 -上記の例のように、「いま手元にあるコード」をがんばって「そのままSIMD化」して高速化しても、データ構造を変えるとコンパイラがあっさり自動SIMD化できて負けることがある。多くの場合「SIMD化」はデータ構造のグローバルな変更を伴う。先のコードのAoS版である[md.cpp](md.cpp)と、SoA版である[md_soa.cpp](md_soa.cpp)は、全く同じことをしているが **全書き換え** になっている。今回はコードが短いから良いが、10万行とかあるコードだと「やっぱりSoAの方が早いから全書き換えで!」と気軽には言えないだろう。また、デバイスによってデータ構造の向き不向きもある。例えば「CPUではAoSの方が早いが、GPGPUではSoAの方が早い」なんてこともざらにある。こういう場合には、ホットスポットルーチンに入る前にAoS←→SoAの相互変換をしたりすることも検討するが、もちろんその分オーバーヘッドもあるので面倒くさいところである。 +上記の例のように、「いま手元にあるコード」をがんばって「そのままSIMD化」して高速化しても、データ構造を変えるとコンパイラがあっさり自動SIMD化できて負けることがある。多くの場合「SIMD化」はデータ構造のグローバルな変更を伴う。先のコードのAoS版である[mag.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day7/magnetic/mag.cpp)と、SoA版である[mag_soa.cpp](https://github.com/kaityo256/sevendayshpc/blob/master/day7/magnetic/mag_soa.cpp)は、全く同じことをしているが **全書き換え** になっている。今回はコードが短いから良いが、10万行とかあるコードだと「やっぱりSoAの方が早いから全書き換えで!」と気軽には言えないだろう。また、デバイスによってデータ構造の向き不向きもある。例えば「CPUではAoSの方が早いが、GPGPUではSoAの方が早い」なんてこともざらにある。こういう場合には、ホットスポットルーチンに入る前にAoS←→SoAの相互変換をしたりすることも検討するが、もちろんその分オーバーヘッドもあるので面倒くさいところである。 -まぁ、以上のようにいろいろ面倒なことを書いたが、ちゃんと手を動かして上記を試してみた方には「SIMD化は(原理的には)簡単だ」ということには同意してもらえると思う。MPIもSIMD化も同じである。いろいろ考えることがあって面倒だが、やること自体は単純なので難しくはない。今回はシャッフル命令を取り上げたが、他にもマスク処理やgather/scatter、pack/unpackなど、SIMDには実に様々な命令がある。しかし、「そういう命令欲しいな」と思って調べたらたいがいある。あとは対応する組み込み関数を呼べばよい。要するに「やるだけ」である。ただし、MPI化は「やれば並列計算ができ、かつプロセスあたりの計算量を増やせばいくらでも並列化効率を上げられる」ことが期待されるのに対して、SIMD化は「やっても性能が向上するかはわからず、下手に手を出すよりコンパイラに任せた方が早い」なんてこともある。全くSIMD化されていないコードに対してSIMD化で得られるゲインは、256bitなら4倍、512ビットでも8倍程度しかなく、現実にはその半分も出れば御の字であろう。SIMD化はやってて楽しい作業であるが、手間とコストが釣り合うかどうかは微妙だな、というのが筆者の実感である。 \ No newline at end of file +まぁ、以上のようにいろいろ面倒なことを書いたが、ちゃんと手を動かして上記を試してみた方には「SIMD化は(原理的には)簡単だ」ということには同意してもらえると思う。MPIもSIMD化も同じである。いろいろ考えることがあって面倒だが、やること自体は単純なので難しくはない。今回はシャッフル命令を取り上げたが、他にもマスク処理やgather/scatter、pack/unpackなど、SIMDには実に様々な命令がある。しかし、「そういう命令欲しいな」と思って調べたらたいがいある。あとは対応する組み込み関数を呼べばよい。要するに「やるだけ」である。ただし、MPI化は「やれば並列計算ができ、かつプロセスあたりの計算量を増やせばいくらでも並列化効率を上げられる」ことが期待されるのに対して、SIMD化は「やっても性能が向上するかはわからず、下手に手を出すよりコンパイラに任せた方が早い」なんてこともある。全くSIMD化されていないコードに対してSIMD化で得られるゲインは、256bitなら4倍、512ビットでも8倍程度しかなく、現実にはその半分も出れば御の字であろう。SIMD化はやってて楽しい作業であるが、手間とコストが釣り合うかどうかは微妙だな、というのが筆者の実感である。 diff --git a/review/config.yml b/review/config.yml index 03329a1..0d33197 100644 --- a/review/config.yml +++ b/review/config.yml @@ -79,7 +79,7 @@ date: 2020-02-29 # 複数指定する場合は次のように記述する # [["初版第1刷の日付", "初版第2刷の日付"], ["第2版第1刷の日付"]] # 日付の後ろを空白文字で区切り、任意の文字列を置くことも可能。 -history: [["2020-02-29 ver 1.0"]] +history: [["2020-02-25 ver 1.1"]] # [experimental] 新刊を頒布したイベント名(例:「技術書典6(2019年春)新刊」) pubevent_name: # 権利表記(配列で複数指定可)