Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamline communications #1656

Merged
merged 1 commit into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/meep.i
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,15 @@ void _get_gradient(PyObject *grad, PyObject *fields_a, PyObject *fields_f, PyObj
%ignore is_medium;
%ignore is_medium;
%ignore is_metal;
%ignore meep::all_in_or_out;
%ignore meep::all_connect_phases;
%ignore meep::choose_chunkdivision;
%ignore meep::comms_key;
%ignore meep::comms_key_hash_fn;
%ignore meep::comms_manager;
%ignore meep::comms_operation;
%ignore meep::comms_sequence;
%ignore meep::create_comms_manager;
%ignore meep::fields_chunk;
%ignore meep::infinity;

Expand Down
2 changes: 1 addition & 1 deletion scheme/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ endif

# workaround missing namespace prefix in swig
meep_renames.i: $(LIBHDRS)
(echo "// AUTOMATICALLY GENERATED -- DO NOT EDIT"; perl -pe 's/^ *class +([A-Za-z_0-9:]*)( *| *:[^{]*){.*$$/%rename(meep_\1) meep::\1;/' $(LIBHDRS) | grep "%rename" | sed '/meep_fields_chunk/d' | sort -u; echo; grep -hv typedef $(LIBHDRS) | perl -pe 's/(inline|const|extern|static) +//g' | perl -pe 's/^[A-Za-z_0-9:<>]+[* ]+([A-Za-z_0-9:]*) *\(.*$$/%rename(meep_\1) meep::\1;/' | grep "%rename" | sed '/choose_chunkdivision/d' | sort -u; ) > $@
(echo "// AUTOMATICALLY GENERATED -- DO NOT EDIT"; perl -pe 's/^ *class +([A-Za-z_0-9:]*)( *| *:[^{]*){.*$$/%rename(meep_\1) meep::\1;/' $(LIBHDRS) | grep "%rename" | sed '/meep_fields_chunk/d ; /meep_comms_manager/d ; /meep_comms_key_hash_fn/d' | sort -u; echo; grep -hv typedef $(LIBHDRS) | perl -pe 's/(inline|const|extern|static) +//g' | perl -pe 's/^[A-Za-z_0-9:<>]+[* ]+([A-Za-z_0-9:]*) *\(.*$$/%rename(meep_\1) meep::\1;/' | grep "%rename" | sed '/choose_chunkdivision/d ; /meep_create_comms_manager/d' | sort -u; ) > $@

# work around bug in swig, where it doesn't prepend namespace to friend funcs
meep_swig_bug_workaround.i: $(LIBHDRS)
Expand Down
9 changes: 8 additions & 1 deletion scheme/meep.i
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,15 @@ static meep::vec my_kpoint_func(double freq, int mode, void *user_data) {

%warnfilter(302,325,451,503,509);

%ignore meep::all_in_or_out;
%ignore meep::all_connect_phases;
%ignore meep::choose_chunkdivision;
%ignore meep::comms_key;
%ignore meep::comms_key_hash_fn;
%ignore meep::comms_manager;
%ignore meep::comms_operation;
%ignore meep::comms_sequence;
%ignore meep::create_comms_manager;
%ignore meep::fields_chunk;
%ignore meep_geom::fragment_stats;

Expand All @@ -300,4 +308,3 @@ static meep::vec my_kpoint_func(double freq, int mode, void *user_data) {
%}

%include "meep-ctl-swig.hpp"

129 changes: 107 additions & 22 deletions src/boundaries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
% Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/

#include <algorithm>
#include <map>
#include <stdlib.h>
#include <complex>

#include "meep.hpp"
#include "meep/mympi.hpp"
#include "meep_internals.hpp"

#define UNUSED(x) (void)x // silence compiler warnings
Expand All @@ -27,6 +30,52 @@ using namespace std;

namespace meep {

namespace {

// Creates an optimized comms_sequence from a vector of comms_operations.
// Send operations are prioritized in descending order by the amount of data that is transferred.
comms_sequence optimize_comms_operations(const std::vector<comms_operation> &operations) {
comms_sequence ret;
std::map<int, size_t> send_size_by_my_chunk_idx;
std::map<int, std::vector<comms_operation> > send_ops_by_my_chunk_idx;

for (const auto &op : operations) {
if (op.comm_direction == Incoming) {
ret.receive_ops.push_back(op);
continue;
}

// Group send operations by source chunk and accumulate the transfer size - excluding chunk
// pairs that reside on the same processor.
if (op.other_proc_id != my_rank()) {
send_size_by_my_chunk_idx[op.my_chunk_idx] += op.transfer_size;
}
else {
// Make sure that op.my_chunk_idx is represented in the map.
send_size_by_my_chunk_idx[op.my_chunk_idx] += 0;
}
send_ops_by_my_chunk_idx[op.my_chunk_idx].push_back(op);
}

// Sort in descending order to prioritize large transfers.
std::vector<std::pair<int, size_t> > send_op_sizes(send_size_by_my_chunk_idx.begin(),
send_size_by_my_chunk_idx.end());
std::sort(send_op_sizes.begin(), send_op_sizes.end(),
[](const std::pair<int, size_t> &a, const std::pair<int, size_t> &b) -> bool {
return a.second > b.second;
});

// Assemble send operations.
for (const auto &size_pair : send_op_sizes) {
int my_chunk_idx = size_pair.first;
const auto& ops_vector = send_ops_by_my_chunk_idx[my_chunk_idx];
ret.send_ops.insert(std::end(ret.send_ops), std::begin(ops_vector), std::end(ops_vector));
}
return ret;
}

} // namespace

void fields::set_boundary(boundary_side b, direction d, boundary_condition cond) {
if (boundaries[b][d] != cond) {
boundaries[b][d] = cond;
Expand Down Expand Up @@ -116,10 +165,10 @@ void fields::disconnect_chunks() {
for (int i = 0; i < num_chunks * num_chunks; i++) {
delete[] comm_blocks[ft][i];
comm_blocks[ft][i] = 0;
for (int ip = 0; ip < 3; ++ip)
comm_sizes[ft][ip][i] = 0;
}
comms_sequence_for_field[ft].clear();
}
comm_sizes.clear();
}

// this should be called by any code that might set chunk_connections_valid = false,
Expand Down Expand Up @@ -309,9 +358,9 @@ bool fields_chunk::needs_W_notowned(component c) {
}

void fields::connect_the_chunks() {
size_t *nc[NUM_FIELD_TYPES][3][2];
size_t *nc[NUM_FIELD_TYPES][NUM_CONNECT_PHASE_TYPES][2];
FOR_FIELD_TYPES(f) {
for (int ip = 0; ip < 3; ip++)
for (connect_phase ip : all_connect_phases)
for (int io = 0; io < 2; io++) {
nc[f][ip][io] = new size_t[num_chunks];
for (int i = 0; i < num_chunks; i++)
Expand Down Expand Up @@ -356,14 +405,10 @@ void fields::connect_the_chunks() {
FOR_E_AND_H(c) { needs_W_notowned[c] = or_to_all(needs_W_notowned[c]); }
finished_working();

comm_sizes.clear();
for (int i = 0; i < num_chunks; i++) {
// First count the border elements...
const grid_volume vi = chunks[i]->gv;
FOR_FIELD_TYPES(ft) {
for (int ip = 0; ip < 3; ip++)
for (int j = 0; j < num_chunks; j++)
comm_sizes[ft][ip][j + i * num_chunks] = 0;
}
FOR_COMPONENTS(corig) {
if (have_component(corig)) LOOP_OVER_VOL_NOTOWNED(vi, corig, n) {
IVEC_LOOP_ILOC(vi, here);
Expand All @@ -372,10 +417,10 @@ void fields::connect_the_chunks() {
complex<double> thephase;
if (locate_component_point(&c, &here, &thephase) && !on_metal_boundary(here))
for (int j = 0; j < num_chunks; j++) {
const std::pair<int, int> pair_j_to_i{j, i};
if ((chunks[i]->is_mine() || chunks[j]->is_mine()) && chunks[j]->gv.owns(here) &&
!(is_B(corig) && is_B(c) && B_redundant[5 * i + corig - Bx] &&
B_redundant[5 * j + c - Bx])) {
const int pair = j + i * num_chunks;
const connect_phase ip = thephase == 1.0
? CONNECT_COPY
: (thephase == -1.0 ? CONNECT_NEGATE : CONNECT_PHASE);
Expand All @@ -384,14 +429,14 @@ void fields::connect_the_chunks() {
const int nn = is_real ? 1 : 2;
nc[f][ip][Incoming][i] += nn;
nc[f][ip][Outgoing][j] += nn;
comm_sizes[f][ip][pair] += nn;
comm_sizes[{f, ip, pair_j_to_i}] += nn;
}
if (needs_W_notowned[corig]) {
field_type f = is_electric(corig) ? WE_stuff : WH_stuff;
const int nn = is_real ? 1 : 2;
nc[f][ip][Incoming][i] += nn;
nc[f][ip][Outgoing][j] += nn;
comm_sizes[f][ip][pair] += nn;
comm_sizes[{f, ip, pair_j_to_i}] += nn;
}
if (is_electric(corig) || is_magnetic(corig)) {
field_type f = is_electric(corig) ? PE_stuff : PH_stuff;
Expand All @@ -411,11 +456,11 @@ void fields::connect_the_chunks() {
const size_t nn = (is_real ? 1 : 2) * (cni);
nc[f][ip][Incoming][i] += nn;
nc[f][ip][Outgoing][j] += nn;
comm_sizes[f][ip][pair] += nn;
comm_sizes[{f, ip, pair_j_to_i}] += nn;
const connect_phase iip = CONNECT_COPY;
nc[f][iip][Incoming][i] += ni;
nc[f][iip][Outgoing][j] += ni;
comm_sizes[f][iip][pair] += ni;
comm_sizes[{f, iip, pair_j_to_i}] += ni;
}
} // if is_mine and owns...
} // loop over j chunks
Expand All @@ -426,7 +471,7 @@ void fields::connect_the_chunks() {
FOR_FIELD_TYPES(ft) {
for (int j = 0; j < num_chunks; j++) {
delete[] comm_blocks[ft][j + i * num_chunks];
comm_blocks[ft][j + i * num_chunks] = new realnum[comm_size_tot(ft, j + i * num_chunks)];
comm_blocks[ft][j + i * num_chunks] = new realnum[comm_size_tot(ft, {j, i})];
}
}
} // loop over i chunks
Expand All @@ -438,15 +483,15 @@ void fields::connect_the_chunks() {
for i < i' */

// wh stores the current indices in the connections array(s)
size_t *wh[NUM_FIELD_TYPES][3][2];
size_t *wh[NUM_FIELD_TYPES][NUM_CONNECT_PHASE_TYPES][2];

/* Now allocate the connection arrays... this is still slightly
wasteful (by a factor of 2) because we allocate for chunks we
don't own if we have a connection with them. Removing this waste
would mean a bunch more is_mine() checks below. */
FOR_FIELD_TYPES(f) {
for (int ip = 0; ip < 3; ip++) {
for (int io = 0; io < 2; io++) {
for (connect_phase ip : all_connect_phases) {
for (in_or_out io : all_in_or_out) {
for (int i = 0; i < num_chunks; i++)
chunks[i]->alloc_extra_connections(field_type(f), connect_phase(ip), in_or_out(io),
nc[f][ip][io][i]);
Expand All @@ -465,11 +510,10 @@ void fields::connect_the_chunks() {

// initialize wh[f][ip][Incoming][j] to sum of comm_sizes for jj < j
FOR_FIELD_TYPES(f) {
for (int ip = 0; ip < 3; ip++) {
for (connect_phase ip : all_connect_phases) {
wh[f][ip][Incoming][0] = 0;
for (int j = 1; j < num_chunks; ++j)
wh[f][ip][Incoming][j] =
wh[f][ip][Incoming][j - 1] + comm_sizes[f][ip][(j - 1) + i * num_chunks];
wh[f][ip][Incoming][j] = wh[f][ip][Incoming][j - 1] + get_comm_size({f, ip, {j - 1, i}});
}
}

Expand Down Expand Up @@ -555,10 +599,51 @@ void fields::connect_the_chunks() {
} // LOOP_OVER_VOL_NOTOWNED
} // FOR_COMPONENTS
} // loop over i chunks

FOR_FIELD_TYPES(f) {
for (int ip = 0; ip < 3; ip++)
for (connect_phase ip : all_connect_phases)
for (int io = 0; io < 2; io++)
delete[] wh[f][ip][io];

// Calculate the sequence of sends and receives in advance.
// Initiate receive operations as early as possible.
std::unique_ptr<comms_manager> manager = create_comms_manager();
std::vector<comms_operation> operations;
std::vector<int> tagto(count_processors());

for (int j = 0; j < num_chunks; j++) {
for (int i = 0; i < num_chunks; i++) {
const chunk_pair pair{j, i};
const size_t comm_size = comm_size_tot(f, pair);
if (!comm_size) continue;
if (comm_size > manager->max_transfer_size()) {
// MPI uses int for size to send/recv
meep::abort("communications size too big for the current implementation");
}
const int pair_idx = j + i * num_chunks;

if (chunks[j]->is_mine()) {
operations.push_back(comms_operation{/*my_chunk_idx=*/j,
/*other_chunk_idx=*/i,
/*other_proc_id=*/chunks[i]->n_proc(),
/*pair_idx=*/pair_idx,
/*transfer_size=*/comm_size,
/*comm_direction=*/Outgoing,
/*tag=*/tagto[chunks[i]->n_proc()]++});
}
if (chunks[i]->is_mine()) {
operations.push_back(comms_operation{/*my_chunk_idx=*/i,
/*other_chunk_idx=*/j,
/*other_proc_id=*/chunks[j]->n_proc(),
/*pair_idx=*/pair_idx,
/*transfer_size=*/comm_size,
/*comm_direction=*/Incoming,
/*tag=*/tagto[chunks[j]->n_proc()]++});
}
}
}

comms_sequence_for_field[f] = optimize_comms_operations(operations);
}
delete[] B_redundant;
}
Expand Down
16 changes: 4 additions & 12 deletions src/fields.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ fields::fields(structure *s, double m, double beta, bool zero_fields_near_cylori
for (int i = 0; i < num_chunks; i++)
chunks[i] = new fields_chunk(s->chunks[i], outdir, m, beta, zero_fields_near_cylorigin, i);
FOR_FIELD_TYPES(ft) {
for (int ip = 0; ip < 3; ip++) {
comm_sizes[ft][ip] = new size_t[num_chunks * num_chunks];
for (int i = 0; i < num_chunks * num_chunks; i++)
comm_sizes[ft][ip][i] = 0;
}
typedef realnum *realnum_ptr;
comm_blocks[ft] = new realnum_ptr[num_chunks * num_chunks];
for (int i = 0; i < num_chunks * num_chunks; i++)
Expand Down Expand Up @@ -116,11 +111,6 @@ fields::fields(const fields &thef)
for (int i = 0; i < num_chunks; i++)
chunks[i] = new fields_chunk(*thef.chunks[i], i);
FOR_FIELD_TYPES(ft) {
for (int ip = 0; ip < 3; ip++) {
comm_sizes[ft][ip] = new size_t[num_chunks * num_chunks];
for (int i = 0; i < num_chunks * num_chunks; i++)
comm_sizes[ft][ip][i] = 0;
}
typedef realnum *realnum_ptr;
comm_blocks[ft] = new realnum_ptr[num_chunks * num_chunks];
for (int i = 0; i < num_chunks * num_chunks; i++)
Expand All @@ -140,8 +130,6 @@ fields::~fields() {
for (int i = 0; i < num_chunks * num_chunks; i++)
delete[] comm_blocks[ft][i];
delete[] comm_blocks[ft];
for (int ip = 0; ip < 3; ip++)
delete[] comm_sizes[ft][ip];
}
delete sources;
delete fluxes;
Expand Down Expand Up @@ -772,4 +760,8 @@ double linear_interpolate(double rx, double ry, double rz, double *data,
#undef D
}

bool operator==(const comms_key &lhs, const comms_key &rhs) {
return (lhs.ft == rhs.ft) && (lhs.phase == rhs.phase) && (lhs.pair == rhs.pair);
}

} // namespace meep
Loading