
Using MPI across multiple sims #1178

Closed
smartalecH opened this issue Apr 13, 2020 · 17 comments · Fixed by #1192

Comments

@smartalecH
Collaborator

Typically, if I have a Python script with one simulation object, I can easily parallelize it using MPI:

mpirun -np 16 python3 my_script.py

Now let's say my script has four simulation objects and I want to run all of them in parallel, each with multiple processes, from the same script. Using the example above with 16 allocated processes, is there a clever way to assign 4 processes to each of my simulation objects so that they all run concurrently (and only communicate within their respective "subgroups")?

This would be especially useful for multiobjective adjoint optimization.

I realize I could do this with some clever bash scripting, but it would be nice to keep everything "in the loop" of a single Python script.

@stevengj
Collaborator

Yup, this was already implemented 11 years ago in 81dcccb

@stevengj
Collaborator

stevengj commented Apr 15, 2020

Call

n = meep.divide_parallel_processes(N)

to divide the MPI processes into N subgroups; it returns the index n of the current group, which can be used to decide which simulation to run.

That is, you have one run script, and the script (typically) creates only one simulation object; depending on the value of n it receives, it will create a simulation object with different parameters.
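
For example, a minimal sketch of that pattern (the cell size, resolution, and source parameters below are just placeholders, not from this thread):

import meep as mp

N = 4                                   # number of subgroups
n = mp.divide_parallel_processes(N)     # index of this process's subgroup

fcen = [0.5, 1.0, 1.5, 2.0][n]          # a different source frequency per subgroup

sim = mp.Simulation(cell_size=mp.Vector3(4, 4),
                    resolution=20,
                    boundary_layers=[mp.PML(1.0)],
                    sources=[mp.Source(mp.GaussianSource(fcen, fwidth=0.2*fcen),
                                       component=mp.Ez,
                                       center=mp.Vector3())])
sim.run(until_after_sources=50)

Launched with e.g. mpirun -np 16 python3 my_script.py, each of the N = 4 subgroups gets 4 processes and runs its own simulation concurrently.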

@stevengj
Collaborator

And if you need to do some global communications among all processes, just do

meep.begin_global_communications()
# ...do stuff...
meep.end_global_communications()

@stevengj
Collaborator

Note that the statement in https://meep.readthedocs.io/en/latest/Parallel_Meep/#different-forms-of-parallelization that "Meep provides no explicit support for this mode of operation" is actually untrue.

@stevengj
Collaborator

For example, if you want to synchronize an array between all the processes, not just a subgroup, you could have each process allocate an array to store the results, initialized to zero, write its portion of the data into the array, and then call

meep.begin_global_communications()
meep.sum_to_all(input_array, summed_array)
meep.end_global_communications()

However, for this to work we will also need to add SWIG typemaps for the sum_to_all method that takes array arguments, since I don't think we have a high-level wrapper for this that works with Python arrays yet.

@oskooi
Collaborator

oskooi commented Apr 18, 2020

The divide_parallel_processes feature does not seem to be working. The following is a simple test to compute the total flux from a point source at a single frequency. The frequency is one of two values (1.0 or 0.5) determined by the subgroup index (either 0 or 1 for a splitting into two groups). The rest of the Simulation parameters are identical. However, regardless of how many MPI processes the simulation is launched with, only the frequency 1.0 seems to be computed.

script

import meep as mp

resolution = 20

sxy = 4
dpml = 1
cell = mp.Vector3(sxy+2*dpml,sxy+2*dpml,0)

pml_layers = [mp.PML(dpml)]

n = mp.divide_parallel_processes(2)
fcen = 1.0 if n == 0 else 0.5

sources = [mp.Source(src=mp.GaussianSource(fcen,fwidth=0.2*fcen),
                     center=mp.Vector3(),
                     component=mp.Ez)]

symmetries = [mp.Mirror(mp.X),
              mp.Mirror(mp.Y)]

sim = mp.Simulation(cell_size=cell,
                    resolution=resolution,
                    sources=sources,
                    symmetries=symmetries,
                    boundary_layers=pml_layers)

flux_box = sim.add_flux(fcen, 0, 1,
                        mp.FluxRegion(mp.Vector3(y=0.5*sxy), size=mp.Vector3(sxy)),
                        mp.FluxRegion(mp.Vector3(y=-0.5*sxy), size=mp.Vector3(sxy), weight=-1),
                        mp.FluxRegion(mp.Vector3(0.5*sxy), size=mp.Vector3(y=sxy)),
                        mp.FluxRegion(mp.Vector3(-0.5*sxy), size=mp.Vector3(y=sxy), weight=-1))

sim.run(until_after_sources=mp.stop_when_fields_decayed(50, mp.Ez, mp.Vector3(), 1e-6))

tot_flux = mp.get_fluxes(flux_box)[0]
print("flux:, {}, {:.4f}, {:.6f}".format(n,fcen,tot_flux))

output

$ mpirun -n 4 python split.py
Using MPI version 3.1, 4 processes
-----------
Initializing structure...
Halving computational cell along direction x
Halving computational cell along direction y
Splitting into 2 chunks evenly
time for choose_chunkdivision = 0.000274579 s
Working in 2D dimensions.
Computational cell is 6 x 6 x 0 with resolution 20
time for set_epsilon = 0.00403629 s
-----------
field decay(t = 50.025000000000006): 10.992590364782105 / 10.992590364782105 = 1.0
field decay(t = 100.05000000000001): 4.080185867370076e-10 / 10.992590364782105 = 3.711760132936559e-11
run 0 finished at t = 100.05000000000001 (4002 timesteps)
flux:, 0, 1.0000, 9.862873

There is no indication that the result for the other frequency (0.5), belonging to the second subgroup with index 1, was ever computed.

@smartalecH
Collaborator Author

Doesn't the print function only operate for the master process now?

I think a better way to test is with a sum_to_all. I'm still working on a typemap suite.

@stevengj
Collaborator

stevengj commented Apr 18, 2020

print only works from the "real" master process (process 0).

So you'll need to do some global communications to send the results back to the master process.

@stevengj
Collaborator

stevengj commented Apr 18, 2020

Note that you can do global communications directly with mpi4py using MPI.COMM_WORLD … no actual need for sum_to_all and begin_global_communications.

mpi4py has direct support for sending numpy arrays, for example.
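
For instance, a minimal sketch (assuming mpi4py is available alongside Meep) that sums a NumPy array over all processes in COMM_WORLD, i.e. across every subgroup:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

local = np.zeros(8)
# ...each subgroup writes its portion of `local`...

summed = np.empty_like(local)
comm.Allreduce(local, summed, op=MPI.SUM)   # global sum over COMM_WORLD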

@oskooi
Collaborator

oskooi commented Apr 19, 2020

Using mpi4py for the global communications does indeed reveal that divide_parallel_processes is working as expected. The end of the example from above is modified to send/receive the flux values from two processes:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

tot_fluxes = [0,0]
if rank == 0:
    comm.send(tot_flux, dest=1, tag=11)
    tot_fluxes[0] = tot_flux
    tot_fluxes[1] = comm.recv(source=1, tag=11)
elif rank == 1:
    comm.send(tot_flux, dest=0, tag=11)
    tot_fluxes[1] = tot_flux
    tot_fluxes[0] = comm.recv(source=0, tag=11)

print(tot_fluxes)

The output for mpirun -n 2 python split.py correctly shows two different flux values (corresponding to the two different frequencies) stored in the list:

[9.862872853382488, 19.653727538698696]

It would be nice to have a high-level interface for storing/printing the output from the different subgroups rather than having the user call the low-level MPI send and recv commands.

@stevengj
Collaborator

stevengj commented Apr 22, 2020

Might be nice to have a function

merged = merge_subgroup_data(data)

that takes a NumPy array data from every process, which is expected to be identical within each subgroup, and returns an array merged that is simply the concatenated data from each subgroup.

Under the hood, it would call MPI_Alltoallv on every process (COMM_WORLD), but the inputs from every process would be an empty array (sendcount=0) except on process 0 of each subgroup, which would send data.
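
A rough mpi4py sketch of those semantics (not the eventual Meep implementation; it uses a pickle-based allgather in place of MPI_Alltoallv, n is the subgroup index from divide_parallel_processes, and is_group_master is a hypothetical flag marking process 0 of each subgroup):

import numpy as np
from mpi4py import MPI

def merge_subgroup_data_sketch(data, n, is_group_master):
    comm = MPI.COMM_WORLD
    # only process 0 of each subgroup contributes its data; everyone else sends an empty array
    send = data if is_group_master else np.empty(0, dtype=data.dtype)
    pieces = comm.allgather((n, send))  # gather (subgroup index, data) on every process
    pieces = [d for i, d in sorted(pieces, key=lambda t: t[0]) if d.size > 0]
    return np.concatenate(pieces)       # concatenated in subgroup order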

@mawc2019
Contributor

Hi @smartalecH, I want to make sure that I am using divide_parallel_processes and merge_subgroup_data correctly when doing optimization. Could you take a look at the code below? You only need to look at the few lines that are relevant to their usage. Thanks!

import meep as mp
import meep.adjoint as mpa
import numpy as np
import cmath,math
from autograd import numpy as npa
from autograd import grad,jacobian,tensor_jacobian_product
import nlopt
import os
Si = mp.Medium(index=3.48)
SiO2 = mp.Medium(index=1.45)
Air = mp.Medium(index=1)

num_groups = 3
n = mp.divide_parallel_processes(num_groups)

resolution,design_resolution = 40,40

design_region_width,design_region_height = 0.8,0.4
Lx,Ly = design_region_width+0.2,4
dpml = 1.5
pml_layers = [mp.PML(dpml, direction=mp.Y)]
cell_size = mp.Vector3(Lx,Ly+2*dpml)
monitor_height,monitor_width = design_region_height/2,0.1

minimum_length = 0.08 # minimum length scale (microns)
eta_i = 0.5 # blueprint (or intermediate) design field thresholding point (between 0 and 1)
eta_e = 0.55 # erosion design field thresholding point (between 0 and 1)
eta_d = 1-eta_e # dilation design field thresholding point (between 0 and 1)
filter_radius = mpa.get_conic_radius_from_eta_e(minimum_length,eta_e)
print("Minimum length: ",minimum_length,". Filter radius: ",filter_radius)

fcen,fwidth = 1/1.55,0.2/1.55
theta_in = np.radians(90*(num_groups-n)/num_groups) # different angles of incidence
k = mp.Vector3(fcen*1.45).rotate(mp.Vector3(z=1), theta_in)

def pw_amp(k,x0): # amplitude function
  def _pw_amp(x):
    return cmath.exp(1j*2*math.pi*k.dot(x+x0))
  return _pw_amp

source_center,source_size  = mp.Vector3(0,-Ly/4),mp.Vector3(Lx,0)
src = mp.GaussianSource(frequency=fcen,fwidth=fwidth)
source = [mp.Source(src,component=mp.Ez,center=source_center,size=source_size,amp_func=pw_amp(k,source_center))]

Nx,Ny = int(round(design_resolution*design_region_width)),1

design_variables = mp.MaterialGrid(mp.Vector3(Nx,Ny),Air,Si,grid_type='U_MEAN')
design_region = mpa.DesignRegion(design_variables,volume=mp.Volume(center=mp.Vector3(0,design_region_height/2),size=mp.Vector3(design_region_width,design_region_height)))

geometry = [mp.Block(center=mp.Vector3(0,-Ly/4-dpml/2), material=SiO2, size=mp.Vector3(Lx,Ly/2+dpml, 0)), # substrate
    mp.Block(center=design_region.center, size=design_region.size, material=design_variables) # design region
]

sim = mp.Simulation(cell_size=cell_size,boundary_layers=pml_layers,geometry=geometry,sources=source,k_point=k,resolution=resolution)
nf = mpa.FourierFields(sim,mp.Volume(center=mp.Vector3(0,monitor_height),size=mp.Vector3(monitor_width,0)),mp.Ez,yee_grid=True)
ob_list = [nf]

def J(near_field):
    near_field = near_field.flatten()
    intensity = npa.abs(near_field)**2
    return npa.sum(intensity)/len(near_field) # average intensity

def mapping(x,eta,beta):
    filtered_field = mpa.conic_filter(x,filter_radius*design_resolution,design_region_width,Ny/design_resolution,design_resolution)
    projected_field = mpa.tanh_projection(filtered_field,beta,eta) # projection
    return projected_field.flatten() # interpolate to actual materials

def f(x, grad):
    t = x[0] # "dummy" parameter
    v = x[1:] # design parameters
    if grad.size > 0:
        grad[0] = 1
        grad[1:] = 0
    return t

opt = mpa.OptimizationProblem(
    simulation = sim,
    objective_functions = J,
    objective_arguments = ob_list,
    design_regions = [design_region],
    fcen=fcen,
    df = 0,
    nf = 1,
    decay_by = 1e-5,
    decay_fields=[mp.Ez]
)

def c_minmax(result,x,gradient,eta,beta):
    
    t = x[0] # auxiliary variable
    v = x[1:] # design parameters

    f0, dJ_single = opt([mapping(v,eta,beta)])
    f_arr = mp.merge_subgroup_data(f0)
    dJ_du = mp.merge_subgroup_data(dJ_single)
    
    # Backprop the gradients through our mapping function
    my_grad = np.zeros(dJ_du.shape)
    for k in range(num_groups): 
        my_grad[:,k] = -dJ_du[:,k]
    
    # Assign gradients
    if gradient.size > 0:
        gradient[:,0] = 1 # gradient w.r.t. "t"
        gradient[:,1:] = my_grad.T # gradient w.r.t. each frequency objective
    
    result[:] = t-np.real(f_arr)
    print("Auxiliary variable: ",t)

# Initial guess
x = 0.5*np.ones(Nx*Ny)

# lower and upper bounds
lb = np.zeros((Nx*Ny,))
ub = np.ones((Nx*Ny,))

# insert the auxiliary variable
x = np.insert(x,0,1)
lb = np.insert(lb,0,0)
ub = np.insert(ub,0,np.inf)


cur_beta,beta_scale = 2,1.2
num_betas = 100
update_factor = 10
algorithm = nlopt.LD_MMA

for iters in range(num_betas):
    solver = nlopt.opt(algorithm, Nx*Ny+1)
    solver.set_lower_bounds(lb)
    solver.set_upper_bounds(ub)
    solver.set_max_objective(f)  
    solver.set_maxeval(update_factor)
    solver.set_xtol_rel(1e-8)
    solver.add_inequality_mconstraint(lambda r,x,g: c_minmax(r,x,g,eta_i,cur_beta), np.array([1e-8]*num_groups))
    x[:] = solver.optimize(x)
    print("Current β: ",cur_beta)
    cur_beta = cur_beta*beta_scale

@smartalecH
Collaborator Author

> am using divide_parallel_processes and merge_subgroup_data correctly when doing optimization

Looks good to me! I'm assuming you aren't getting the results you are hoping for?

@mawc2019
Copy link
Contributor

mawc2019 commented Jul 2, 2021

Thank you! I got errors when running my optimization tasks using divide_parallel_processes and merge_subgroup_data. I think one of the reasons may be that I was using the loop_in_chunks_fixes branch, which worked well for some of my previous optimization tasks but still has bugs. When I stopped using that branch, some of the errors disappeared; the errors that remain are unrelated to the issue here.

@ReDoDx09

ReDoDx09 commented Jul 8, 2021

@smartalecH could you please tell me how to correctly choose the number of processes np when I'm working on a cluster?
Let's suppose I have:
nodes=N
cpus-per-task=M
In this case, what is the maximum value of the number of processes np?
Thank you

@stevengj
Collaborator

stevengj commented Jul 8, 2021

If you have N nodes and M CPUs per node, then normally you would want to use at most N×M processes.

@ReDoDx09

ReDoDx09 commented Jul 9, 2021

@stevengj @smartalecH
I would be so grateful if one of you could help me translate this piece of code from C++ to Python.
Thank you
#include <meep.hpp>
using namespace meep;

double eps(const vec &p); // forward declaration (eps is defined below main)

int main(int argc, char **argv) {
  initialize mpi(argc, argv); // do this even for non-MPI Meep
  double resolution = 20;     // pixels per distance
  grid_volume v = vol2d(5, 10, resolution); // 5x10 2d cell
  structure s(v, eps, pml(1.0));
  fields f(&s);

  f.output_hdf5(Dielectric, v.surroundings());

  double freq = 0.3, fwidth = 0.1;
  gaussian_src_time src(freq, fwidth);
  f.add_point_source(Ey, src, vec(1.1, 2.3));
  while (f.time() < f.last_source_time()) {
    f.step();
  }

  f.output_hdf5(Hz, v.surroundings());

  return 0;
}

double eps(const vec &p) {
  if (p.x() < 2 && p.y() < 3)
    return 12.0;
  return 1.0;
}
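
A rough Python translation is sketched below. It assumes the usual coordinate shift between the two interfaces: the C++ grid_volume puts the origin at a corner of the 5x10 cell, whereas the Python interface centers the cell at the origin, so the high-epsilon region and the source position are shifted by (-2.5, -5); the run loop is approximated with until_after_sources=0, which stops once the Gaussian source has turned off.

import meep as mp

resolution = 20                 # pixels per distance
cell = mp.Vector3(5, 10, 0)     # 5x10 2d cell

# eps(p) = 12 where x < 2 and y < 3 in corner-origin coordinates,
# i.e. a 2x3 block centered at (-1.5, -3.5) in the centered coordinates
geometry = [mp.Block(size=mp.Vector3(2, 3),
                     center=mp.Vector3(-1.5, -3.5),
                     material=mp.Medium(epsilon=12))]

# point source at (1.1, 2.3) in corner-origin coordinates -> shift by (-2.5, -5)
sources = [mp.Source(mp.GaussianSource(frequency=0.3, fwidth=0.1),
                     component=mp.Ey,
                     center=mp.Vector3(1.1 - 2.5, 2.3 - 5))]

sim = mp.Simulation(cell_size=cell,
                    resolution=resolution,
                    geometry=geometry,
                    sources=sources,
                    boundary_layers=[mp.PML(1.0)])

# output the dielectric function at the start and Hz at the end of the run
sim.run(mp.at_beginning(mp.output_epsilon),
        mp.at_end(mp.output_hfield_z),
        until_after_sources=0)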
