Skip to content

Commit

Permalink
Streamline communication (#1656)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahoenselaar authored Jul 13, 2021
1 parent fdc4aa8 commit 9d979ed
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 150 deletions.
8 changes: 8 additions & 0 deletions python/meep.i
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,15 @@ void _get_gradient(PyObject *grad, PyObject *fields_a, PyObject *fields_f, PyObj
%ignore is_medium;
%ignore is_medium;
%ignore is_metal;
%ignore meep::all_in_or_out;
%ignore meep::all_connect_phases;
%ignore meep::choose_chunkdivision;
%ignore meep::comms_key;
%ignore meep::comms_key_hash_fn;
%ignore meep::comms_manager;
%ignore meep::comms_operation;
%ignore meep::comms_sequence;
%ignore meep::create_comms_manager;
%ignore meep::fields_chunk;
%ignore meep::infinity;

Expand Down
2 changes: 1 addition & 1 deletion scheme/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ endif

# workaround missing namespace prefix in swig
meep_renames.i: $(LIBHDRS)
(echo "// AUTOMATICALLY GENERATED -- DO NOT EDIT"; perl -pe 's/^ *class +([A-Za-z_0-9:]*)( *| *:[^{]*){.*$$/%rename(meep_\1) meep::\1;/' $(LIBHDRS) | grep "%rename" | sed '/meep_fields_chunk/d' | sort -u; echo; grep -hv typedef $(LIBHDRS) | perl -pe 's/(inline|const|extern|static) +//g' | perl -pe 's/^[A-Za-z_0-9:<>]+[* ]+([A-Za-z_0-9:]*) *\(.*$$/%rename(meep_\1) meep::\1;/' | grep "%rename" | sed '/choose_chunkdivision/d' | sort -u; ) > $@
(echo "// AUTOMATICALLY GENERATED -- DO NOT EDIT"; perl -pe 's/^ *class +([A-Za-z_0-9:]*)( *| *:[^{]*){.*$$/%rename(meep_\1) meep::\1;/' $(LIBHDRS) | grep "%rename" | sed '/meep_fields_chunk/d ; /meep_comms_manager/d ; /meep_comms_key_hash_fn/d' | sort -u; echo; grep -hv typedef $(LIBHDRS) | perl -pe 's/(inline|const|extern|static) +//g' | perl -pe 's/^[A-Za-z_0-9:<>]+[* ]+([A-Za-z_0-9:]*) *\(.*$$/%rename(meep_\1) meep::\1;/' | grep "%rename" | sed '/choose_chunkdivision/d ; /meep_create_comms_manager/d' | sort -u; ) > $@

# work around bug in swig, where it doesn't prepend namespace to friend funcs
meep_swig_bug_workaround.i: $(LIBHDRS)
Expand Down
9 changes: 8 additions & 1 deletion scheme/meep.i
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,15 @@ static meep::vec my_kpoint_func(double freq, int mode, void *user_data) {

%warnfilter(302,325,451,503,509);

%ignore meep::all_in_or_out;
%ignore meep::all_connect_phases;
%ignore meep::choose_chunkdivision;
%ignore meep::comms_key;
%ignore meep::comms_key_hash_fn;
%ignore meep::comms_manager;
%ignore meep::comms_operation;
%ignore meep::comms_sequence;
%ignore meep::create_comms_manager;
%ignore meep::fields_chunk;
%ignore meep_geom::fragment_stats;

Expand All @@ -300,4 +308,3 @@ static meep::vec my_kpoint_func(double freq, int mode, void *user_data) {
%}

%include "meep-ctl-swig.hpp"

129 changes: 107 additions & 22 deletions src/boundaries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
% Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/

#include <algorithm>
#include <map>
#include <stdlib.h>
#include <complex>

#include "meep.hpp"
#include "meep/mympi.hpp"
#include "meep_internals.hpp"

#define UNUSED(x) (void)x // silence compiler warnings
Expand All @@ -27,6 +30,52 @@ using namespace std;

namespace meep {

namespace {

// Creates an optimized comms_sequence from a vector of comms_operations.
// Send operations are prioritized in descending order by the amount of data that is transferred.
comms_sequence optimize_comms_operations(const std::vector<comms_operation> &operations) {
comms_sequence ret;
std::map<int, size_t> send_size_by_my_chunk_idx;
std::map<int, std::vector<comms_operation> > send_ops_by_my_chunk_idx;

for (const auto &op : operations) {
if (op.comm_direction == Incoming) {
ret.receive_ops.push_back(op);
continue;
}

// Group send operations by source chunk and accumulate the transfer size - excluding chunk
// pairs that reside on the same processor.
if (op.other_proc_id != my_rank()) {
send_size_by_my_chunk_idx[op.my_chunk_idx] += op.transfer_size;
}
else {
// Make sure that op.my_chunk_idx is represented in the map.
send_size_by_my_chunk_idx[op.my_chunk_idx] += 0;
}
send_ops_by_my_chunk_idx[op.my_chunk_idx].push_back(op);
}

// Sort in descending order to prioritize large transfers.
std::vector<std::pair<int, size_t> > send_op_sizes(send_size_by_my_chunk_idx.begin(),
send_size_by_my_chunk_idx.end());
std::sort(send_op_sizes.begin(), send_op_sizes.end(),
[](const std::pair<int, size_t> &a, const std::pair<int, size_t> &b) -> bool {
return a.second > b.second;
});

// Assemble send operations.
for (const auto &size_pair : send_op_sizes) {
int my_chunk_idx = size_pair.first;
const auto& ops_vector = send_ops_by_my_chunk_idx[my_chunk_idx];
ret.send_ops.insert(std::end(ret.send_ops), std::begin(ops_vector), std::end(ops_vector));
}
return ret;
}

} // namespace

void fields::set_boundary(boundary_side b, direction d, boundary_condition cond) {
if (boundaries[b][d] != cond) {
boundaries[b][d] = cond;
Expand Down Expand Up @@ -116,10 +165,10 @@ void fields::disconnect_chunks() {
for (int i = 0; i < num_chunks * num_chunks; i++) {
delete[] comm_blocks[ft][i];
comm_blocks[ft][i] = 0;
for (int ip = 0; ip < 3; ++ip)
comm_sizes[ft][ip][i] = 0;
}
comms_sequence_for_field[ft].clear();
}
comm_sizes.clear();
}

// this should be called by any code that might set chunk_connections_valid = false,
Expand Down Expand Up @@ -309,9 +358,9 @@ bool fields_chunk::needs_W_notowned(component c) {
}

void fields::connect_the_chunks() {
size_t *nc[NUM_FIELD_TYPES][3][2];
size_t *nc[NUM_FIELD_TYPES][NUM_CONNECT_PHASE_TYPES][2];
FOR_FIELD_TYPES(f) {
for (int ip = 0; ip < 3; ip++)
for (connect_phase ip : all_connect_phases)
for (int io = 0; io < 2; io++) {
nc[f][ip][io] = new size_t[num_chunks];
for (int i = 0; i < num_chunks; i++)
Expand Down Expand Up @@ -356,14 +405,10 @@ void fields::connect_the_chunks() {
FOR_E_AND_H(c) { needs_W_notowned[c] = or_to_all(needs_W_notowned[c]); }
finished_working();

comm_sizes.clear();
for (int i = 0; i < num_chunks; i++) {
// First count the border elements...
const grid_volume vi = chunks[i]->gv;
FOR_FIELD_TYPES(ft) {
for (int ip = 0; ip < 3; ip++)
for (int j = 0; j < num_chunks; j++)
comm_sizes[ft][ip][j + i * num_chunks] = 0;
}
FOR_COMPONENTS(corig) {
if (have_component(corig)) LOOP_OVER_VOL_NOTOWNED(vi, corig, n) {
IVEC_LOOP_ILOC(vi, here);
Expand All @@ -372,10 +417,10 @@ void fields::connect_the_chunks() {
complex<double> thephase;
if (locate_component_point(&c, &here, &thephase) && !on_metal_boundary(here))
for (int j = 0; j < num_chunks; j++) {
const std::pair<int, int> pair_j_to_i{j, i};
if ((chunks[i]->is_mine() || chunks[j]->is_mine()) && chunks[j]->gv.owns(here) &&
!(is_B(corig) && is_B(c) && B_redundant[5 * i + corig - Bx] &&
B_redundant[5 * j + c - Bx])) {
const int pair = j + i * num_chunks;
const connect_phase ip = thephase == 1.0
? CONNECT_COPY
: (thephase == -1.0 ? CONNECT_NEGATE : CONNECT_PHASE);
Expand All @@ -384,14 +429,14 @@ void fields::connect_the_chunks() {
const int nn = is_real ? 1 : 2;
nc[f][ip][Incoming][i] += nn;
nc[f][ip][Outgoing][j] += nn;
comm_sizes[f][ip][pair] += nn;
comm_sizes[{f, ip, pair_j_to_i}] += nn;
}
if (needs_W_notowned[corig]) {
field_type f = is_electric(corig) ? WE_stuff : WH_stuff;
const int nn = is_real ? 1 : 2;
nc[f][ip][Incoming][i] += nn;
nc[f][ip][Outgoing][j] += nn;
comm_sizes[f][ip][pair] += nn;
comm_sizes[{f, ip, pair_j_to_i}] += nn;
}
if (is_electric(corig) || is_magnetic(corig)) {
field_type f = is_electric(corig) ? PE_stuff : PH_stuff;
Expand All @@ -411,11 +456,11 @@ void fields::connect_the_chunks() {
const size_t nn = (is_real ? 1 : 2) * (cni);
nc[f][ip][Incoming][i] += nn;
nc[f][ip][Outgoing][j] += nn;
comm_sizes[f][ip][pair] += nn;
comm_sizes[{f, ip, pair_j_to_i}] += nn;
const connect_phase iip = CONNECT_COPY;
nc[f][iip][Incoming][i] += ni;
nc[f][iip][Outgoing][j] += ni;
comm_sizes[f][iip][pair] += ni;
comm_sizes[{f, iip, pair_j_to_i}] += ni;
}
} // if is_mine and owns...
} // loop over j chunks
Expand All @@ -426,7 +471,7 @@ void fields::connect_the_chunks() {
FOR_FIELD_TYPES(ft) {
for (int j = 0; j < num_chunks; j++) {
delete[] comm_blocks[ft][j + i * num_chunks];
comm_blocks[ft][j + i * num_chunks] = new realnum[comm_size_tot(ft, j + i * num_chunks)];
comm_blocks[ft][j + i * num_chunks] = new realnum[comm_size_tot(ft, {j, i})];
}
}
} // loop over i chunks
Expand All @@ -438,15 +483,15 @@ void fields::connect_the_chunks() {
for i < i' */

// wh stores the current indices in the connections array(s)
size_t *wh[NUM_FIELD_TYPES][3][2];
size_t *wh[NUM_FIELD_TYPES][NUM_CONNECT_PHASE_TYPES][2];

/* Now allocate the connection arrays... this is still slightly
wasteful (by a factor of 2) because we allocate for chunks we
don't own if we have a connection with them. Removing this waste
would mean a bunch more is_mine() checks below. */
FOR_FIELD_TYPES(f) {
for (int ip = 0; ip < 3; ip++) {
for (int io = 0; io < 2; io++) {
for (connect_phase ip : all_connect_phases) {
for (in_or_out io : all_in_or_out) {
for (int i = 0; i < num_chunks; i++)
chunks[i]->alloc_extra_connections(field_type(f), connect_phase(ip), in_or_out(io),
nc[f][ip][io][i]);
Expand All @@ -465,11 +510,10 @@ void fields::connect_the_chunks() {

// initialize wh[f][ip][Incoming][j] to sum of comm_sizes for jj < j
FOR_FIELD_TYPES(f) {
for (int ip = 0; ip < 3; ip++) {
for (connect_phase ip : all_connect_phases) {
wh[f][ip][Incoming][0] = 0;
for (int j = 1; j < num_chunks; ++j)
wh[f][ip][Incoming][j] =
wh[f][ip][Incoming][j - 1] + comm_sizes[f][ip][(j - 1) + i * num_chunks];
wh[f][ip][Incoming][j] = wh[f][ip][Incoming][j - 1] + get_comm_size({f, ip, {j - 1, i}});
}
}

Expand Down Expand Up @@ -555,10 +599,51 @@ void fields::connect_the_chunks() {
} // LOOP_OVER_VOL_NOTOWNED
} // FOR_COMPONENTS
} // loop over i chunks

FOR_FIELD_TYPES(f) {
for (int ip = 0; ip < 3; ip++)
for (connect_phase ip : all_connect_phases)
for (int io = 0; io < 2; io++)
delete[] wh[f][ip][io];

// Calculate the sequence of sends and receives in advance.
// Initiate receive operations as early as possible.
std::unique_ptr<comms_manager> manager = create_comms_manager();
std::vector<comms_operation> operations;
std::vector<int> tagto(count_processors());

for (int j = 0; j < num_chunks; j++) {
for (int i = 0; i < num_chunks; i++) {
const chunk_pair pair{j, i};
const size_t comm_size = comm_size_tot(f, pair);
if (!comm_size) continue;
if (comm_size > manager->max_transfer_size()) {
// MPI uses int for size to send/recv
meep::abort("communications size too big for the current implementation");
}
const int pair_idx = j + i * num_chunks;

if (chunks[j]->is_mine()) {
operations.push_back(comms_operation{/*my_chunk_idx=*/j,
/*other_chunk_idx=*/i,
/*other_proc_id=*/chunks[i]->n_proc(),
/*pair_idx=*/pair_idx,
/*transfer_size=*/comm_size,
/*comm_direction=*/Outgoing,
/*tag=*/tagto[chunks[i]->n_proc()]++});
}
if (chunks[i]->is_mine()) {
operations.push_back(comms_operation{/*my_chunk_idx=*/i,
/*other_chunk_idx=*/j,
/*other_proc_id=*/chunks[j]->n_proc(),
/*pair_idx=*/pair_idx,
/*transfer_size=*/comm_size,
/*comm_direction=*/Incoming,
/*tag=*/tagto[chunks[j]->n_proc()]++});
}
}
}

comms_sequence_for_field[f] = optimize_comms_operations(operations);
}
delete[] B_redundant;
}
Expand Down
16 changes: 4 additions & 12 deletions src/fields.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ fields::fields(structure *s, double m, double beta, bool zero_fields_near_cylori
for (int i = 0; i < num_chunks; i++)
chunks[i] = new fields_chunk(s->chunks[i], outdir, m, beta, zero_fields_near_cylorigin, i);
FOR_FIELD_TYPES(ft) {
for (int ip = 0; ip < 3; ip++) {
comm_sizes[ft][ip] = new size_t[num_chunks * num_chunks];
for (int i = 0; i < num_chunks * num_chunks; i++)
comm_sizes[ft][ip][i] = 0;
}
typedef realnum *realnum_ptr;
comm_blocks[ft] = new realnum_ptr[num_chunks * num_chunks];
for (int i = 0; i < num_chunks * num_chunks; i++)
Expand Down Expand Up @@ -116,11 +111,6 @@ fields::fields(const fields &thef)
for (int i = 0; i < num_chunks; i++)
chunks[i] = new fields_chunk(*thef.chunks[i], i);
FOR_FIELD_TYPES(ft) {
for (int ip = 0; ip < 3; ip++) {
comm_sizes[ft][ip] = new size_t[num_chunks * num_chunks];
for (int i = 0; i < num_chunks * num_chunks; i++)
comm_sizes[ft][ip][i] = 0;
}
typedef realnum *realnum_ptr;
comm_blocks[ft] = new realnum_ptr[num_chunks * num_chunks];
for (int i = 0; i < num_chunks * num_chunks; i++)
Expand All @@ -140,8 +130,6 @@ fields::~fields() {
for (int i = 0; i < num_chunks * num_chunks; i++)
delete[] comm_blocks[ft][i];
delete[] comm_blocks[ft];
for (int ip = 0; ip < 3; ip++)
delete[] comm_sizes[ft][ip];
}
delete sources;
delete fluxes;
Expand Down Expand Up @@ -772,4 +760,8 @@ double linear_interpolate(double rx, double ry, double rz, double *data,
#undef D
}

bool operator==(const comms_key &lhs, const comms_key &rhs) {
return (lhs.ft == rhs.ft) && (lhs.phase == rhs.phase) && (lhs.pair == rhs.pair);
}

} // namespace meep
Loading

0 comments on commit 9d979ed

Please sign in to comment.