diff --git a/AUTHORS b/AUTHORS index f5dd39547..87d2d7607 100644 --- a/AUTHORS +++ b/AUTHORS @@ -14,4 +14,5 @@ Alec Hammond Yidong Chong Robin Dunn Ian Williamson -Andreas Hoenselaar \ No newline at end of file +Andreas Hoenselaar +Ben Bartlett diff --git a/doc/docs/Parallel_Meep.md b/doc/docs/Parallel_Meep.md index 6dcda9ac5..d430ad9ac 100644 --- a/doc/docs/Parallel_Meep.md +++ b/doc/docs/Parallel_Meep.md @@ -169,3 +169,139 @@ See also [FAQ/Should I expect linear speedup from the parallel Meep](FAQ.md#shou
![](images/parallel_benchmark_DFT.png)
+ + +Dynamic Chunk Balancing +----------------------- + +Since Meep's computation time is ultimately bottlenecked by the slowest-running process, it is desirable to load-balance the chunk layout such that each compute node is given an equal workload. By default, Meep uses a heuristics-based scheme to estimate the cost of a computation region based on the composition of voxel types (anisotropic, PML, etc.) However, this method of estimating the simulation time for a chunk in advance is not always accurate, a problem which is especially true for simulations run on shared-resource clusters. Meep's `chunk_balancer` module allows for a more empirical, data-driven approach for dynamically load-balancing parallel simulations. This approach uses the simulation time per node to adaptively modify the chunk layout and it implicitly corrects for heterogeneity over the simulation region and variability in background loads and job priority on shared compute resources. This approach can be especially useful for cases such as adjoint optimization, where slight variations on the same simulation are run many times over many iterations. + +
+![](images/adaptive_chunk_layout.gif) +
+ +### Chunk balancer interface + +The chunk balancer interface provides four main methods: +- `make_initial_chunk_layout()` generates the initial chunk layout for the first iteration of a simulation. By default, `None` is returned, indicating Meep should use its default chunk partitioning strategy. +- `should_adjust_chunk_layout()` decides whether the current layout is sufficiently poorly balanced to justify the up-front cost of reallocating the field arrays when changing chunk layouts. +- `compute_new_chunk_layout()` looks at the current chunk layout and per-process timing data to compute a new chunk layout which has chunk sizes adjusted to improve the load balance across compute nodes. +- `adjust_chunk_layout()` is syntactic sugar which will compute a new chunk layout, apply it to the simulation object, reset the simulation, and re-initialize the simulation. + +```py +class AbstractChunkBalancer(abc.ABC): + """Chunk balancer for dynamically load-balanced chunk layouts.""" + + def make_initial_chunk_layout(self, sim) -> mp.BinaryPartition: + """Generates an initial chunk layout for simulation.""" + + def should_adjust_chunk_layout(self, sim) -> bool: + """Is current layout imbalanced enough to justify rebuilding sim?""" + + @abc.abstractmethod + def compute_new_chunk_layout( + self, + timing_measurements: MeepTimingMeasurements, + old_chunk_layout: mp.BinaryPartition, + chunk_volumes: Tuple[mp.grid_volume], + chunk_owners: np.ndarray) -> mp.BinaryPartition: + """Rebalance chunks to equalize simulation time for each node.""" + + def adjust_chunk_layout(self, sim, **kwargs) -> None: + """Computes a new chunk layout and applies it to sim.""" +``` + +### Usage + +Using the chunk balancer is very straightforward, and it can typically be integrated into existing Meep simulations with only a few lines of code. Here is a simple example: + +```py +from meep.chunk_balancer import ChunkBalancer + +chunk_balancer = ChunkBalancer() + +# Compute an initial chunk layout +initial_chunk_layout = chunk_balancer.make_initial_chunk_layout() + +sim = mp.Simulation(..., chunk_layout=initial_chunk_layout) +sim.init_sim() + +for iteration in range(epochs): + sim.run(...) + # Adjust the chunk layout for the next iteration if needed + chunk_balancer.adjust_chunk_layout(sim, sensitivity=0.4) +``` + +Chunks can also be rebalanced between runs of a program by dumping and loading the chunk layout from a pickled object. For example: + +```py +import meep as mp +from meep.chunk_balancer import ChunkBalancer +from meep.timing_measurements import MeepTimingMeasurements +import pickle +import os.path + +# Fetch chunk layout from a previous run if it exists +if os.path.exists("path/to/chunk_layout.pkl"): + chunk_layout = pickle.load(open("path/to/chunk_layout.pkl", "rb")) +else: + chunk_layout = None + +sim = mp.Simulation(..., chunk_layout=chunk_layout) +sim.init_sim() +sim.run(...) + +# Compute and save chunk layout for next run +timings = MeepTimingMeasurements.new_from_simulation(sim) +chunk_volumes = sim.structure.get_chunk_volumes() +chunk_owners = sim.structure.get_chunk_owners() +next_chunk_layout = ChunkBalancer().compute_new_chunk_layout( + timings, + chunk_layout, + chunk_volumes, + chunk_owners, + sensitivity=0.4) + +# Save chunk layout for next run +with open("path/to/chunk_layout.pkl", "wb") as f: + pickle.dump(next_chunk_layout, f) +``` + +### Chunk adjustment algorithm + +The default chunk adjustment algorithm recursively traverses the `BinaryPartition` object and resizes the chunk volumes of the left and right children of each node to have an equal per-node simulation time. The new chunk layout has split positions which are a weighted average of the previous iteration's chunk layout and the newly computed layout, and the `sensitivity` parameter adjusts how fast the chunk sizes are adjusted. (`sensitivity=0.0` means no adjustment, `sensitivity=1.0` means an immediate snap to the predicted split positions, and `sensitivity=0.5` is the average of the old and new layouts.) The adjustment algorithm is summarized in pseudocode below: + +``` +def adjust_split_pos(node): + for subtree in {node.left, node.right}: + V := Σ volume for nodes in subtree + t := Σ sim time for nodes in subtree + n := number of processes in subtree + l := t / n # average load per process + + Vₗ ↦ Vₗ / lₗ + Vᵣ ↦ Vᵣ / lᵣ + + split_pos’ := dₘᵢₙ + (dₘₐₓ - dₘᵢₙ) * (Vₗ) / (Vₗ + Vᵣ) + + # Adjust with sensitivity parameter + node.split_pos ↦ s * split_pos’ + (1-s) * node.split_pos + + # Recurse through rest of the tree + adjust_split_pos(node.left) + adjust_split_pos(node.right) +``` + +### Load balancing results on shared clusters + +The following benchmarking results show load-balancing improvements from a parallel run on a shared compute cluster in a datacenter. The per-core working times (blue and orange) start of initially poorly balanced using the default `split_by_cost` scheme, but the load balancing improves over successive iterations. + +
+![](images/chunk_balancer_timing_stats.gif) +
+ +Using the normalized standard deviation in simulation times per iteration as a proxy for load-balancing efficacy, we can see that the load balance improves over a large number of runs with varying numbers of processes: + +
+![](images/chunk_balancer_variance.jpg) +
\ No newline at end of file diff --git a/doc/docs/images/adaptive_chunk_layout.gif b/doc/docs/images/adaptive_chunk_layout.gif new file mode 100644 index 000000000..4c8626b45 Binary files /dev/null and b/doc/docs/images/adaptive_chunk_layout.gif differ diff --git a/doc/docs/images/chunk_balancer_timing_stats.gif b/doc/docs/images/chunk_balancer_timing_stats.gif new file mode 100644 index 000000000..039e8f6e8 Binary files /dev/null and b/doc/docs/images/chunk_balancer_timing_stats.gif differ diff --git a/doc/docs/images/chunk_balancer_variance.jpg b/doc/docs/images/chunk_balancer_variance.jpg new file mode 100644 index 000000000..1b88280c3 Binary files /dev/null and b/doc/docs/images/chunk_balancer_variance.jpg differ diff --git a/python/Makefile.am b/python/Makefile.am index d003aad9c..b75f53d6f 100644 --- a/python/Makefile.am +++ b/python/Makefile.am @@ -43,9 +43,11 @@ TESTS = \ $(TEST_DIR)/test_antenna_radiation.py \ $(TEST_DIR)/test_array_metadata.py \ $(TEST_DIR)/test_bend_flux.py \ + $(TEST_DIR)/test_binary_partition_utils.py \ $(BINARY_GRATING_TEST) \ $(TEST_DIR)/test_cavity_arrayslice.py \ $(TEST_DIR)/test_cavity_farfield.py \ + $(TEST_DIR)/test_chunk_balancer.py \ $(TEST_DIR)/test_chunk_layout.py \ $(TEST_DIR)/test_chunks.py \ $(TEST_DIR)/test_cyl_ellipsoid.py \ @@ -85,6 +87,7 @@ TESTS = \ $(TEST_DIR)/test_simulation.py \ $(TEST_DIR)/test_special_kz.py \ $(TEST_DIR)/test_source.py \ + $(TEST_DIR)/test_timing_measurements.py \ $(TEST_DIR)/test_user_defined_material.py \ $(TEST_DIR)/test_visualization.py \ $(WVG_SRC_TEST) @@ -205,9 +208,12 @@ endif # MAINTAINER_MODE # specification of python source files to be byte-compiled at installation ###################################################################### HL_IFACE = \ + $(srcdir)/binary_partition_utils.py \ + $(srcdir)/chunk_balancer.py \ $(srcdir)/geom.py \ $(srcdir)/simulation.py \ $(srcdir)/source.py \ + $(srcdir)/timing_measurements.py \ $(srcdir)/visualization.py \ $(srcdir)/materials.py \ $(srcdir)/verbosity_mgr.py diff --git a/python/binary_partition_utils.py b/python/binary_partition_utils.py new file mode 100644 index 000000000..c37a5d07f --- /dev/null +++ b/python/binary_partition_utils.py @@ -0,0 +1,299 @@ +from typing import Dict, Generator, List, Tuple +import warnings + +import meep as mp +import numpy as onp + + +def is_leaf_node(partition: mp.BinaryPartition) -> bool: + """Returns True if the partition has no children. + + Args: + partition: the BinaryPartition node + + Returns: + A boolean indicating whether partition is a leaf node. + """ + return partition.left is None and partition.right is None + + +def enumerate_leaf_nodes( + partition: mp.BinaryPartition) -> Generator[mp.BinaryPartition, None, None]: + """Enumerates all leaf nodes of a partition. + + Args: + partition: the BinaryPartition node + + Yields: + The leaf nodes contained within the partition. + """ + if is_leaf_node(partition): + yield partition + else: + for child in enumerate_leaf_nodes(partition.left): + yield child + for child in enumerate_leaf_nodes(partition.right): + yield child + + +def partition_has_duplicate_proc_ids(partition: mp.BinaryPartition) -> bool: + """Returns True if the partition has more than one node with the same proc_id. + + Args: + partition: the BinaryPartition node + + Returns: + A boolean indicating if the partition contains duplicate proc_ids. + """ + proc_ids = [node.proc_id for node in enumerate_leaf_nodes(partition)] + return len(set(proc_ids)) != len(proc_ids) + + +def get_total_weight(partition: mp.BinaryPartition, + weights_by_proc_id: List[float]) -> float: + """Computes the total weights contained within a BinaryPartition subtree. + + Args: + partition: a BinaryPartition subtree to compute the associated weights for + weights_by_proc_id: a list of weights associated with each proc_id + + Returns: + The sum of all weights for each proc_id encountered in the subtree. + + Raises: + ValueError: if sim.chunk_layout includes nodes with duplicate proc_ids + """ + if partition_has_duplicate_proc_ids(partition): + raise ValueError('Duplicate proc_ids found in chunk_layout!') + if partition.proc_id is not None: + return weights_by_proc_id[partition.proc_id] + elif partition.left is not None and partition.right is not None: + left_weight = get_total_weight(partition.left, weights_by_proc_id) + right_weight = get_total_weight(partition.right, weights_by_proc_id) + return left_weight + right_weight + else: + raise ValueError('Partition missing proc_id or left, right attributes!') + + +def get_left_right_total_weights( + partition: mp.BinaryPartition, + weights_by_proc_id: List[float]) -> Tuple[float, float]: + """Computes the total weights contained in left and right subtrees. + + Args: + partition: a BinaryPartition tree to compute the associated weights for + weights_by_proc_id: a list of weights associated with each proc_id + + Returns: + The sum of weights for each proc_id encountered in the left and right + subtrees. + + Raises: + ValueError: if the BinaryPartition is a leaf node or improperly formatted. + """ + if partition.left is not None and partition.right is not None: + left_weight = get_total_weight(partition.left, weights_by_proc_id) + right_weight = get_total_weight(partition.right, weights_by_proc_id) + return (left_weight, right_weight) + else: + raise ValueError('Partition missing left, right attributes!') + + +def pixel_volume(grid_volume: mp.grid_volume) -> int: + """Computes the number of pixels contained in a 2D or 3D grid_volume object. + + NOTE: This assumes that z=0 means 2D simulation and z>0 means 3D. + + Args: + grid_volume: a meep grid_volume object + + Returns: + The 2D or 3D number of pixels in the grid_volume. + """ + if grid_volume.nz() > 0: # we're in a 3D simulation + return grid_volume.nx() * grid_volume.ny() * grid_volume.nz() + else: # 2D simulation + return grid_volume.nx() * grid_volume.ny() + + +def get_total_volume(partition: mp.BinaryPartition, + chunk_volumes: Tuple[mp.grid_volume], + chunk_owners: onp.ndarray) -> float: + """Computes the total pixel volume in a subtree from associated chunk volumes. + + NOTE: If multiple chunks are owned by the same process, this function may + over-count the volume, since all provided grid volumes associated with a + given process are counted. + + Args: + partition: a BinaryPartition subtree to compute the associated volumes for + chunk_volumes: associated grid volumes from a simulation, obtained by + calling sim.structure.get_chunk_volumes() + chunk_owners: list of processes associated with each grid volume, obtained + by calling sim.structure.get_chunk_owners() + + Returns: + The total pixel volume occupied by all chunks owned by the partition. + """ + my_grid_volumes = get_grid_volumes_in_tree(partition, chunk_volumes, + chunk_owners) + return sum(pixel_volume(vol) for vol in my_grid_volumes) + + +def get_left_right_total_volumes( + partition: mp.BinaryPartition, chunk_volumes: Tuple[mp.grid_volume], + chunk_owners: onp.ndarray) -> Tuple[float, float]: + """Computes the total pixel volume in left and right subtrees. + + Args: + partition: a BinaryPartition subtree to compute the associated volumes for + chunk_volumes: associated grid volumes from a simulation, obtained by + calling sim.structure.get_chunk_volumes() + chunk_owners: list of processes associated with each grid volume, obtained + by calling sim.structure.get_chunk_owners() + + Returns: + A tuple for left and right subtreees, where each entry is a list of the + total pixel volume occupied by all chunks owned by each process. + + Raises: + ValueError: if the BinaryPartition is a leaf node or improperly formatted. + """ + if partition.left is not None and partition.right is not None: + left_volume = get_total_volume(partition.left, chunk_volumes, chunk_owners) + right_volume = get_total_volume(partition.right, chunk_volumes, + chunk_owners) + return (left_volume, right_volume) + else: + raise ValueError('Partition missing left, right attributes!') + + +def get_grid_volumes_in_tree(partition: mp.BinaryPartition, + chunk_volumes: Tuple[mp.grid_volume], + chunk_owners: onp.ndarray) -> List[mp.grid_volume]: + """Fetches a list of grid_volumes contained in a BinaryPartition subtree. + + NOTE: If multiple chunks are owned by the same process, this function may + over-count the volume, since all provided grid volumes associated with a + given process are counted. + + Args: + partition: a BinaryPartition subtree to find grid_volumes for + chunk_volumes: associated grid volumes from a simulation, obtained by + calling sim.structure.get_chunk_volumes() + chunk_owners: list of processes associated with each grid volume, obtained + by calling sim.structure.get_chunk_owners() + + Returns: + A list of all grid_volume objects associated with any proc_id contained in + the partition. The list is not necessarily ordered by proc_id values. + """ + if partition_has_duplicate_proc_ids(partition): + warnings.warn('Partition has duplicate proc_ids, overcounting possible!') + + my_proc_ids = [node.proc_id for node in enumerate_leaf_nodes(partition)] + + return [ + chunk_volumes[i] + for (i, owner) in enumerate(chunk_owners) + if owner in my_proc_ids + ] + + +def get_total_volume_per_process(partition: mp.BinaryPartition, + chunk_volumes: Tuple[mp.grid_volume], + chunk_owners: onp.ndarray) -> Dict[int, float]: + """Computes the total pixel volume per process contained in a BinaryPartition. + + Args: + partition: a BinaryPartition subtree to compute the associated volumes for + chunk_volumes: associated grid volumes from a simulation, obtained by + calling sim.structure.get_chunk_volumes() + chunk_owners: list of processes associated with each grid volume, obtained + by calling sim.structure.get_chunk_owners() + + Returns: + A dictionary of the total pixel volume occupied by all chunks owned by each + process. + """ + volumes_per_process = {} + leaf_nodes_in_tree = enumerate_leaf_nodes(partition) + for leaf in leaf_nodes_in_tree: + total_volume = 0 + for i, owner in enumerate(chunk_owners): + if owner == leaf.proc_id: + total_volume += pixel_volume(chunk_volumes[i]) + volumes_per_process[leaf.proc_id] = total_volume + return volumes_per_process + + +def get_box_ranges( + partition: mp.BinaryPartition, chunk_volumes: Tuple[mp.grid_volume], + chunk_owners: onp.ndarray +) -> Tuple[float, float, float, float, float, float]: + """Gets the max and min x, y, z dimensions spanned by a partition. + + Args: + partition: a BinaryPartition subtree to compute the range for + chunk_volumes: associated grid volumes from a simulation, obtained by + calling sim.structure.get_chunk_volumes() + chunk_owners: list of processes associated with each grid volume, obtained + by calling sim.structure.get_chunk_owners() + + Returns: + A 6-tuple which enumerates: + ( + xmin: the min x value of any grid_volume associated with the partition, + xmax: the max x value of any grid_volume associated with the partition, + ymin: the min y value of any grid_volume associated with the partition, + ymax: the max y value of any grid_volume associated with the partition, + zmin: the min z value of any grid_volume associated with the partition, + zmax: the max z value of any grid_volume associated with the partition + ) + """ + xmins = [] + xmaxs = [] + ymins = [] + ymaxs = [] + zmins = [] + zmaxs = [] + for leaf in enumerate_leaf_nodes(partition): + # generate a list of all volumes owned by the process + owned_volumes = [ + chunk_volumes[i] + for (i, owner) in enumerate(chunk_owners) + if owner == leaf.proc_id + ] + # add the corners to the lists + for vol in owned_volumes: + xmins.append(vol.surroundings().get_min_corner().x()) + ymins.append(vol.surroundings().get_min_corner().y()) + zmins.append(vol.surroundings().get_min_corner().z()) + xmaxs.append(vol.surroundings().get_max_corner().x()) + ymaxs.append(vol.surroundings().get_max_corner().y()) + zmaxs.append(vol.surroundings().get_max_corner().z()) + return (min(xmins), max(xmaxs), min(ymins), max(ymaxs), min(zmins), + max(zmaxs)) + + +def partitions_are_equal(bp1: mp.BinaryPartition, + bp2: mp.BinaryPartition) -> bool: + """Determines if two partitions have all nodes with identical attributes. + + Args: + bp1: a BinaryPartition object to compare equality for + bp2: the other BinaryPartition object to compare equality for + + Returns: + A bool if all nodes in the partitions have equal attributes + """ + if is_leaf_node(bp1) and is_leaf_node(bp2): + return bp1.proc_id == bp2.proc_id + elif (not is_leaf_node(bp1)) and (not is_leaf_node(bp2)): + return all([ + bp1.split_dir == bp2.split_dir, bp1.split_pos == bp2.split_pos, + partitions_are_equal(bp1.left, bp2.left), + partitions_are_equal(bp1.right, bp2.right) + ]) + else: + return False diff --git a/python/chunk_balancer.py b/python/chunk_balancer.py new file mode 100644 index 000000000..956ba979c --- /dev/null +++ b/python/chunk_balancer.py @@ -0,0 +1,284 @@ +import abc +import copy +from typing import Optional, Tuple, Union + +import meep as mp +from meep import binary_partition_utils as bpu +from meep.timing_measurements import MeepTimingMeasurements + +import numpy as np + + +class AbstractChunkBalancer(abc.ABC): + """Abstract chunk balancer for adaptive chunk layouts in Meep simulations. + + This class defines interfaces for a chunk balancer, which adjusts chunk + layouts to optimal load balancing. It provides two main functionalities: + 1. Generating an initial chunk layout for the first iteration of an + optimization run, using a strategy other than the default chunk + partitioning routine in Meep + 2. Adaptively modifying chunk layouts for subsequent iterations in an + optimization run by incorporating timing data from the previous + iteration(s) and resizing the chunks accordingly + + Subclasses of this class can be passed as an option into optimization runs. + """ + + def make_initial_chunk_layout( + self, sim: mp.Simulation) -> Union[mp.BinaryPartition, None]: + """Generates an initial chunk layout based on expected compute costs. + + Args: + sim: the meep.Simulation object to generate a chunk layout for + + Returns: + The chunk layout to be used by the simulation, or None, in which case + Meep will use its own default logic to compute the chunk layout (this is + the default behavior and can be overridden in child classes). + """ + del sim + return None + + def adjust_chunk_layout(self, sim: mp.Simulation, **kwargs) -> None: + """Computes a new chunk layout, applies it to sim, and resets/re-inits sim. + + This method also calls self.should_adjust_chunk_layout(sim). If the current + chunk layout is sufficiently well-balanced, no changes will be made. + NOTE: This is a state-changing method which may reset the simulation. + + Args: + sim: the meep.Simulation object with the chunk layout to be adjusted + **kwargs: extra args to be passed to self.compute_new_chunk_layout + + Raises: + ValueError: if sim.chunk_layout includes nodes with duplicate proc_ids + ValueError: if sim.chunk_layout has proc_ids not included in + sim.structure.get_chunk_owners() (this could occur if number of + processes exceeds the number of physical cores) + """ + self._validate_sim(sim) + + if self.should_adjust_chunk_layout(sim): + + old_chunk_layout = sim.chunk_layout + chunk_volumes = sim.structure.get_chunk_volumes() + chunk_owners = sim.structure.get_chunk_owners() + + timing_measurements = MeepTimingMeasurements.new_from_simulation(sim, -1) + + new_chunk_layout = self.compute_new_chunk_layout(timing_measurements, + old_chunk_layout, + chunk_volumes, + chunk_owners, **kwargs) + + sim.reset_meep() + sim.chunk_layout = new_chunk_layout + sim.init_sim() + + @abc.abstractmethod + def compute_new_chunk_layout(self, + timing_measurements: MeepTimingMeasurements, + old_partition: mp.BinaryPartition, + chunk_volumes: Tuple[mp.grid_volume], + chunk_owners: np.ndarray) -> mp.BinaryPartition: + """Rebalances the partition to equalize simulation time for node's children. + + Args: + timing_measurements: elapsed time by category from previous iteration + old_partition: the chunk_layout from the previous iteration + chunk_volumes: obtained from sim.structure.get_chunk_volumes() + chunk_owners: obtained from sim.structure.get_chunk_owners() + + Returns: + The chunk layout to be used by the next iteration, with the chunks + resized appropriately to balance load across MPI processes. + """ + + def should_adjust_chunk_layout(self, sim: mp.Simulation) -> bool: + """Is the current layout imbalanced enough to justify rebuilding the sim? + + Args: + sim: the meep.Simulation object with the chunk layout to be adjusted + + Returns: + A bool specifying whether the current chunk layout is sufficiently poorly + load-balanced to justify the upfront cost of rebuilding the Meep + simulation object to redefine the chunks. By default, True is returned if + this method is not overridden in a subclass. + """ + del sim + return True + + def _validate_sim(self, sim: mp.Simulation): + """Ensures that chunk layout and chunk volumes are properly formatted. + + Args: + sim: the meep.Simulation object to validate + + Raises: + ValueError: if sim.chunk_layout includes nodes with duplicate proc_ids + ValueError: if sim.chunk_layout has proc_ids not included in + sim.structure.get_chunk_owners() (this could occur if number of + processes exceeds the number of physical cores) + """ + # Check that chunk_layout does not contain duplicate nodes for same process + if bpu.partition_has_duplicate_proc_ids(sim.chunk_layout): + raise ValueError('Duplicate proc_ids found in chunk_layout!') + # Check that proc_ids in chunk_layout are assigned properly to grid owners + chunk_owners = sim.structure.get_chunk_owners() + proc_ids = [ + node.proc_id for node in bpu.enumerate_leaf_nodes(sim.chunk_layout) + ] + if set(chunk_owners) != set(proc_ids): + raise ValueError( + 'Processes {} present in chunk_layout but not grid_owners!'.format( + set(proc_ids) - set(chunk_owners))) + + +class ChunkBalancer(AbstractChunkBalancer): + """A chunk balancer for adaptive chunk layouts in Meep simulations. + + This class generates initial chunk layouts using Meep's built-in scheme, and + rebalances chunks using timing data from the previous iteration in an + optimization run to resize chunks appropriately. + + The general idea behind this chunk balancer implementation is to compute + 'load' terms defined by the ratio of (simulation time t) / (chunk volume V). + This implicitly should account for additional background load on the various + MPI nodes, since if one machine is more busy than another, it will have a + higher ratio of t/V. + + The split_pos for each node is adjusted to equalize the per-core values of + left and right simulation times, then the method is recursively called for + each of the node's children. A sensitivity parameter determines how much the + split_pos should change from the previous value, with 0.0 being no change + and 1.0 being instant change to the new computed value. + """ + + def compute_new_chunk_layout( + self, + timing_measurements: MeepTimingMeasurements, + partition: mp.BinaryPartition, + chunk_volumes: Tuple[mp.grid_volume], + chunk_owners: np.ndarray, + new_partition: Optional[mp.BinaryPartition] = None, + new_partition_root: Optional[mp.BinaryPartition] = None, + xyz_bounds: Optional[Tuple[float, float, float, float, float, + float]] = None, + sensitivity: float = 0.5) -> mp.BinaryPartition: + """Rebalances the partition to equalize simulation time for node's children. + + Args: + timing_measurements: elapsed time by category from previous iteration + partition: the chunk_layout from the previous iteration + chunk_volumes: obtained from sim.structure.get_chunk_volumes() + chunk_owners: obtained from sim.structure.get_chunk_owners() + new_partition: reference to the new chunk_layout object + new_partition_root: reference to the root node of the new chunk_layout + xyz_bounds: min and max xyz values for the new sub-partition + sensitivity: adjusts how much the new split_pos should change + + Returns: + The chunk layout to be used by the next iteration, with the chunks + resized appropriately to balance sim times across MPI processes. + """ + + # if we're at root, make a copy of the partition to return as new partition + if new_partition is None: + new_partition = copy.deepcopy(partition) + new_partition_root = new_partition + + # if at leaf node, no more rebalancing to do, so return the partition + if bpu.is_leaf_node(partition): + return partition + + if xyz_bounds is None: + xyz_bounds = bpu.get_box_ranges(partition, chunk_volumes, chunk_owners) + + # Retrieve the relevant dimensions d_min, d_max along the split axis + # NOTE: we call the distances d_min, d_max regardless of split direction + if partition.split_dir == mp.X: + d_min, d_max, _, _, _, _ = xyz_bounds + elif partition.split_dir == mp.Y: + _, _, d_min, d_max, _, _ = xyz_bounds + elif partition.split_dir == mp.Z: + _, _, _, _, d_min, d_max = xyz_bounds + else: + raise ValueError('Split direction must be mp.X, mp.Y, or mp.Z!') + + sim_times = list( + self._compute_working_times_per_process(timing_measurements)) + + n_left = partition.left.numchunks() + n_right = partition.right.numchunks() + + t_left, t_right = bpu.get_left_right_total_weights(partition, sim_times) + v_left, v_right = bpu.get_left_right_total_volumes(partition, chunk_volumes, + chunk_owners) + + v_left_new = v_left * t_right * n_left + v_right_new = v_right * t_left * n_right + split_frac = v_left_new / (v_left_new + v_right_new) + + old_split_pos = partition.split_pos + + new_split_pos = (d_max - d_min) * split_frac + d_min + new_split_pos = sensitivity * new_split_pos + (1 - + sensitivity) * old_split_pos + + # Adjust the split pos on new_partition. We can't modify the old partition + # because it could affect left/right volume ratios for subsequent calls + new_partition.split_pos = new_split_pos + + x_min, x_max, y_min, y_max, z_min, z_max = xyz_bounds + # Update the box ranges for the new partition after the adjustment + if partition.split_dir == mp.X: + left_xyz_bounds = (x_min, new_split_pos, y_min, y_max, z_min, z_max) + right_xyz_bounds = (new_split_pos, x_max, y_min, y_max, z_min, z_max) + elif partition.split_dir == mp.Y: + left_xyz_bounds = (x_min, x_max, y_min, new_split_pos, z_min, z_max) + right_xyz_bounds = (x_min, x_max, new_split_pos, y_max, z_min, z_max) + elif partition.split_dir == mp.Z: + left_xyz_bounds = (x_min, x_max, y_min, y_max, z_min, new_split_pos) + right_xyz_bounds = (x_min, x_max, y_min, y_max, new_split_pos, z_max) + + self.compute_new_chunk_layout( + timing_measurements, + partition.left, + chunk_volumes, + chunk_owners, + new_partition=new_partition.left, + new_partition_root=new_partition_root, + xyz_bounds=left_xyz_bounds, + sensitivity=sensitivity) + self.compute_new_chunk_layout( + timing_measurements, + partition.right, + chunk_volumes, + chunk_owners, + new_partition=new_partition.right, + new_partition_root=new_partition_root, + xyz_bounds=right_xyz_bounds, + sensitivity=sensitivity) + + return new_partition + + def _compute_working_times_per_process( + self, timing_measurements: MeepTimingMeasurements) -> np.ndarray: + """Computes the time spent by each MPI process actively working.""" + + time_sinks_to_include = [ + 'time_stepping', + 'boundaries_copying', + 'field_output', + 'fourier_transform', + 'mpb', + 'near_to_farfield_transform', + ] + + num_processes = len(timing_measurements.measurements['time_stepping']) + working_times = np.zeros(num_processes) + for category in time_sinks_to_include: + working_times += np.array(timing_measurements.measurements[category]) + + return working_times diff --git a/python/tests/test_binary_partition_utils.py b/python/tests/test_binary_partition_utils.py new file mode 100644 index 000000000..5b6f72866 --- /dev/null +++ b/python/tests/test_binary_partition_utils.py @@ -0,0 +1,378 @@ +import copy +import unittest + +import parameterized + +import meep as mp +import meep.binary_partition_utils as bpu +import numpy as np + +PARTITION_NO_DUPLICATE_PROC_ID = mp.BinaryPartition( + data=[(mp.X, -2.), 0, [(mp.Y, 1.5), [(mp.X, 4.), 1, [(mp.Y, + 0.5), 4, 3]], 2]]) +PARTITION_DUPLICATE_PROC_ID = mp.BinaryPartition( + data=[(mp.X, -2.), 0, [(mp.Y, 1.5), [(mp.X, 4.), 1, [(mp.Y, + 0.5), 1, 3]], 0]]) + +# Mocked chunk IDs since we are in a single-core environment +CHUNK_OWNERS_NO_DUPLICATE_PROC_ID = np.array([0, 1, 4, 3, 2]) +CHUNK_OWNERS_DUPLICATE_PROC_ID = np.array([0, 1, 1, 3, 0]) + + +class BinaryPartitionUtilsTest(unittest.TestCase): + + @parameterized.parameterized.expand([ + (PARTITION_NO_DUPLICATE_PROC_ID, False), + (PARTITION_NO_DUPLICATE_PROC_ID.right, False), + (PARTITION_NO_DUPLICATE_PROC_ID.left, True), + (PARTITION_DUPLICATE_PROC_ID, False), + (PARTITION_DUPLICATE_PROC_ID.right, False), + (PARTITION_DUPLICATE_PROC_ID.left, True), + ]) + def test_is_leaf_node(self, partition, expected_leaf_status): + self.assertEqual(bpu.is_leaf_node(partition), expected_leaf_status) + + @parameterized.parameterized.expand([ + (PARTITION_NO_DUPLICATE_PROC_ID, CHUNK_OWNERS_NO_DUPLICATE_PROC_ID), + (PARTITION_DUPLICATE_PROC_ID, CHUNK_OWNERS_DUPLICATE_PROC_ID), + ]) + def test_enumerate_leaf_nodes(self, partition, chunk_owners): + leaf_nodes = list(bpu.enumerate_leaf_nodes(partition)) + self.assertEqual(len(leaf_nodes), partition.numchunks()) + proc_ids = [node.proc_id for node in bpu.enumerate_leaf_nodes(partition)] + self.assertEqual(proc_ids, list(chunk_owners)) # depth first ordering + + def test_partition_has_duplicate_proc_ids(self): + self.assertFalse( + bpu.partition_has_duplicate_proc_ids(PARTITION_NO_DUPLICATE_PROC_ID)) + self.assertTrue( + bpu.partition_has_duplicate_proc_ids(PARTITION_DUPLICATE_PROC_ID)) + + @parameterized.parameterized.expand([ + ( + PARTITION_NO_DUPLICATE_PROC_ID, + [0, 1, 2, 3, 4], + ), + ( + PARTITION_DUPLICATE_PROC_ID, + [0, 1, 2, 3, 4], + ), + ]) + def test_get_total_weight(self, partition, weights_by_proc_id): + if not bpu.partition_has_duplicate_proc_ids(partition): + self.assertEqual( + bpu.get_total_weight(partition, weights_by_proc_id), + sum(weights_by_proc_id)) + self.assertEqual( + bpu.get_total_weight(partition.right.left, weights_by_proc_id), + weights_by_proc_id[1] + weights_by_proc_id[4] + weights_by_proc_id[3]) + else: + with self.assertRaises(ValueError): + bpu.get_total_weight(partition, weights_by_proc_id) + + @parameterized.parameterized.expand([ + ( + PARTITION_NO_DUPLICATE_PROC_ID, + [0, 1, 2, 3, 4], + ), + ( + PARTITION_DUPLICATE_PROC_ID, + [0, 1, 2, 3, 4], + ), + ]) + def test_get_left_right_total_weights(self, partition, weights_by_proc_id): + proc_ids = [node.proc_id for node in bpu.enumerate_leaf_nodes(partition)] + no_duplicates = len(set(proc_ids)) == len(proc_ids) + if no_duplicates: + self.assertEqual( + bpu.get_left_right_total_weights(partition, weights_by_proc_id), + (bpu.get_total_weight(partition.left, weights_by_proc_id), + bpu.get_total_weight(partition.right, weights_by_proc_id))) + else: + with self.assertRaises(ValueError): + bpu.get_left_right_total_weights(partition, weights_by_proc_id) + + @parameterized.parameterized.expand([ + ( + PARTITION_NO_DUPLICATE_PROC_ID, + 2, + [1500, 2400, 300, 100, 700], + ), + ( + PARTITION_NO_DUPLICATE_PROC_ID, + 3, + [15000, 24000, 3000, 1000, 7000], + ), + ]) + def test_pixel_volume(self, partition, dims, expected_pixel_volumes): + cell_size = mp.Vector3(10.0, 5.0, 1.0) if dims == 3 else mp.Vector3( + 10.0, 5.0, 0.0) + sim = mp.Simulation( + cell_size=cell_size, resolution=10, chunk_layout=partition) + sim.init_sim() + chunk_volumes = sim.structure.get_chunk_volumes() + self.assertEqual([bpu.pixel_volume(vol) for vol in chunk_volumes], + expected_pixel_volumes) + + @parameterized.parameterized.expand([ + (PARTITION_NO_DUPLICATE_PROC_ID, CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, 3500), + (PARTITION_DUPLICATE_PROC_ID, CHUNK_OWNERS_DUPLICATE_PROC_ID, 3500), + ]) + def test_get_total_volume_2d(self, partition, chunk_owners, + expected_total_volume): + sim = mp.Simulation( + cell_size=mp.Vector3(10.0, 5.0, 0.0), + resolution=10, + chunk_layout=partition) + sim.init_sim() + + chunk_volumes = sim.structure.get_chunk_volumes() + total_volume = sim.cell_size[0] * sim.cell_size[1] * sim.resolution**2 + self.assertEqual( + bpu.get_total_volume(partition, chunk_volumes, chunk_owners), + total_volume) + + if not bpu.partition_has_duplicate_proc_ids(partition): + self.assertEqual( + bpu.get_total_volume(partition.right, chunk_volumes, chunk_owners), + expected_total_volume) + + @parameterized.parameterized.expand([ + (PARTITION_NO_DUPLICATE_PROC_ID, CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, 35000), + (PARTITION_DUPLICATE_PROC_ID, CHUNK_OWNERS_DUPLICATE_PROC_ID, 35000), + ]) + def test_get_total_volume_3d(self, partition, chunk_owners, + expected_total_volume): + sim = mp.Simulation( + cell_size=mp.Vector3(10.0, 5.0, 1.0), + resolution=10, + chunk_layout=partition) + sim.init_sim() + + chunk_volumes = sim.structure.get_chunk_volumes() + total_volume = sim.cell_size[0] * sim.cell_size[1] * sim.cell_size[ + 2] * sim.resolution**3 + self.assertEqual( + bpu.get_total_volume(partition, chunk_volumes, chunk_owners), + total_volume) + + if not bpu.partition_has_duplicate_proc_ids(partition): + self.assertEqual( + bpu.get_total_volume(partition.right, chunk_volumes, chunk_owners), + expected_total_volume) + + @parameterized.parameterized.expand([ + ( + PARTITION_NO_DUPLICATE_PROC_ID, + CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, + 2, + ), + ( + PARTITION_DUPLICATE_PROC_ID, + CHUNK_OWNERS_DUPLICATE_PROC_ID, + 2, + ), + ( + PARTITION_NO_DUPLICATE_PROC_ID, + CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, + 3, + ), + ( + PARTITION_DUPLICATE_PROC_ID, + CHUNK_OWNERS_DUPLICATE_PROC_ID, + 3, + ), + ]) + def test_get_left_right_total_volumes(self, partition, chunk_owners, dims): + cell_size = mp.Vector3(10.0, 5.0, 1.0) if dims == 3 else mp.Vector3( + 10.0, 5.0, 0.0) + sim = mp.Simulation( + cell_size=cell_size, resolution=10, chunk_layout=partition) + sim.init_sim() + + chunk_volumes = sim.structure.get_chunk_volumes() + self.assertEqual( + bpu.get_left_right_total_volumes(partition, chunk_volumes, + chunk_owners), + (bpu.get_total_volume(partition.left, chunk_volumes, chunk_owners), + bpu.get_total_volume(partition.right, chunk_volumes, chunk_owners))) + + @parameterized.parameterized.expand([ + ( + PARTITION_NO_DUPLICATE_PROC_ID, + CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, + 2, + ), + ( + PARTITION_DUPLICATE_PROC_ID, + CHUNK_OWNERS_DUPLICATE_PROC_ID, + 2, + ), + ( + PARTITION_NO_DUPLICATE_PROC_ID, + CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, + 3, + ), + ( + PARTITION_DUPLICATE_PROC_ID, + CHUNK_OWNERS_DUPLICATE_PROC_ID, + 3, + ), + ]) + def test_get_grid_volumes_in_tree(self, partition, chunk_owners, dims): + cell_size = mp.Vector3(10.0, 5.0, 1.0) if dims == 3 else mp.Vector3( + 10.0, 5.0, 0.0) + sim = mp.Simulation( + cell_size=cell_size, resolution=10, chunk_layout=partition) + sim.init_sim() + + chunk_volumes = sim.structure.get_chunk_volumes() + grid_volumes_in_tree = bpu.get_grid_volumes_in_tree(partition, + chunk_volumes, + chunk_owners) + self.assertEqual(set(grid_volumes_in_tree), set(chunk_volumes)) + + no_duplicates = len(set(chunk_owners)) == len(chunk_owners) + grid_volumes_in_right_expected = chunk_volumes[ + 1:] if no_duplicates else chunk_volumes + grid_volumes_in_right = bpu.get_grid_volumes_in_tree( + partition.right, chunk_volumes, chunk_owners) + self.assertEqual( + set(grid_volumes_in_right), set(grid_volumes_in_right_expected)) + + @parameterized.parameterized.expand([ + ( + PARTITION_NO_DUPLICATE_PROC_ID, + CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, + 2, + { + 0: 1500, + 1: 2400, + 4: 300, + 3: 100, + 2: 700 + }, + ), + ( + PARTITION_DUPLICATE_PROC_ID, + CHUNK_OWNERS_DUPLICATE_PROC_ID, + 2, + { + 0: 2200, + 1: 2700, + 3: 100, + }, + ), + ( + PARTITION_NO_DUPLICATE_PROC_ID, + CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, + 3, + { + 0: 15000, + 1: 24000, + 4: 3000, + 3: 1000, + 2: 7000 + }, + ), + ( + PARTITION_DUPLICATE_PROC_ID, + CHUNK_OWNERS_DUPLICATE_PROC_ID, + 3, + { + 0: 22000, + 1: 27000, + 3: 1000, + }, + ), + ]) + def test_get_total_volume_per_process(self, partition, chunk_owners, dims, + expected_volumes_per_process): + cell_size = mp.Vector3(10.0, 5.0, 1.0) if dims == 3 else mp.Vector3( + 10.0, 5.0, 0.0) + sim = mp.Simulation( + cell_size=cell_size, resolution=10, chunk_layout=partition) + sim.init_sim() + chunk_volumes = sim.structure.get_chunk_volumes() + volumes_per_process = bpu.get_total_volume_per_process( + partition, chunk_volumes, chunk_owners) + self.assertEqual(volumes_per_process, expected_volumes_per_process) + + @parameterized.parameterized.expand([ + ( + PARTITION_NO_DUPLICATE_PROC_ID, + CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, + 2, + (-5.0, 5.0, -2.5, 2.5, 0.0, 0.0), + (-2.0, 5.0, -2.5, 2.5, 0.0, 0.0), + ), + ( + PARTITION_DUPLICATE_PROC_ID, + CHUNK_OWNERS_DUPLICATE_PROC_ID, + 2, + (-5.0, 5.0, -2.5, 2.5, 0.0, 0.0), + (-5.0, 5.0, -2.5, 2.5, 0.0, 0.0), + ), + ( + PARTITION_NO_DUPLICATE_PROC_ID, + CHUNK_OWNERS_NO_DUPLICATE_PROC_ID, + 3, + (-5.0, 5.0, -2.5, 2.5, -0.5, 0.5), + (-2.0, 5.0, -2.5, 2.5, -0.5, 0.5), + ), + ( + PARTITION_DUPLICATE_PROC_ID, + CHUNK_OWNERS_DUPLICATE_PROC_ID, + 3, + (-5.0, 5.0, -2.5, 2.5, -0.5, 0.5), + (-5.0, 5.0, -2.5, 2.5, -0.5, 0.5), + ), + ]) + def test_get_box_ranges(self, partition, chunk_owners, dims, + expected_box_ranges, expected_right_box_ranges): + cell_size = mp.Vector3(10.0, 5.0, 1.0) if dims == 3 else mp.Vector3( + 10.0, 5.0, 0.0) + sim = mp.Simulation( + cell_size=cell_size, resolution=10, chunk_layout=partition) + sim.init_sim() + chunk_volumes = sim.structure.get_chunk_volumes() + self.assertEqual( + bpu.get_box_ranges(partition, chunk_volumes, chunk_owners), + expected_box_ranges) + self.assertEqual( + bpu.get_box_ranges(partition.right, chunk_volumes, chunk_owners), + expected_right_box_ranges) + + @parameterized.parameterized.expand([ + ( + copy.deepcopy(PARTITION_NO_DUPLICATE_PROC_ID), + copy.deepcopy(PARTITION_NO_DUPLICATE_PROC_ID), + True, + ), + ( + copy.deepcopy(PARTITION_DUPLICATE_PROC_ID), + copy.deepcopy(PARTITION_DUPLICATE_PROC_ID), + True, + ), + ( + copy.deepcopy(PARTITION_NO_DUPLICATE_PROC_ID), + copy.deepcopy(PARTITION_DUPLICATE_PROC_ID), + False, + ), + ( + mp.BinaryPartition(data=[(mp.X, -2.5), 0, [(mp.X, 2.5), 1, 2]]), + mp.BinaryPartition(data=[(mp.X, -2.5), 0, [(mp.X, 2.5), 1, 2]]), + True, + ), + ( + mp.BinaryPartition(data=[(mp.X, -2.5), 0, [(mp.X, 2.4), 1, 2]]), + mp.BinaryPartition(data=[(mp.X, -2.5), 0, [(mp.X, 2.5), 1, 2]]), + False, + ), + ]) + def test_partitions_are_equal(self, bp1, bp2, is_equal): + self.assertEqual(bpu.partitions_are_equal(bp1, bp2), is_equal) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/tests/test_chunk_balancer.py b/python/tests/test_chunk_balancer.py new file mode 100644 index 000000000..36a85f40c --- /dev/null +++ b/python/tests/test_chunk_balancer.py @@ -0,0 +1,329 @@ +import meep as mp +import numpy as np + +import parameterized +import unittest + +import meep.binary_partition_utils as bpu + +from meep.timing_measurements import MeepTimingMeasurements, TIMING_MEASUREMENT_IDS +from meep.chunk_balancer import ChunkBalancer + + +class MockSimulation(mp.Simulation): + """Class which emulates the multi-core MPI behavior while on a single core. + + This inherits all methods from mp.Simulation but overrides two behaviors: + 1. sim.time_spent_on() will provide fake timing data where all entries are + [0.0 ... 0.0] except for time_stepping, which will have an array of + values where the elapsed time for each process is equal to the pixel + volume of that process's chunk. + 2. sim.structure.get_chunk_owners will return a list of process IDs which + would be the values that you would see if running the simulation in MPI + mode. (Otherwise in a single-core test environment, an array of zeros + would be returned.) + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.relative_loads = None + + def _structure_get_chunk_owners(self): + # Hacky workaround to make this test work on single-core systems + proc_ids = [] + for leaf in bpu.enumerate_leaf_nodes(self.chunk_layout): + # it seems that grid volumes are enumerated + # in the same order as enumerate_leaf_nodes() + proc_ids.append(leaf.proc_id) + return np.array(proc_ids) + + def init_sim(self): + super().init_sim() + setattr(self.structure, "get_chunk_owners", + self._structure_get_chunk_owners) + + def time_spent_on(self, time_sink: int): + # We're going to pretend the amount of time spent is ~volume so that + # chunks should converge to an equal volume + num_processes = self.chunk_layout.numchunks() + chunk_volumes = self.structure.get_chunk_volumes() + chunk_owners = self.structure.get_chunk_owners() + + if chunk_volumes[0].nz() > 0: # 3D case + volumes = [v.nx() * v.ny() * v.nz() for v in chunk_volumes] + else: # 2D case + volumes = [v.nx() * v.ny() for v in chunk_volumes] + + total_volume_by_proc = np.zeros(num_processes) + for i, v in enumerate(volumes): + total_volume_by_proc[chunk_owners[i]] += v + + if time_sink == TIMING_MEASUREMENT_IDS["time_stepping"]: + times = 1.0 * total_volume_by_proc + else: + times = 0.0 * np.ones(num_processes) + + if self.relative_loads is not None: + times = times * self.relative_loads + + return times + + +# The test sim instances are lambda functions () => MockSimulation because +# the chunk balancer tests will mutate the chunk_layout attribute. + +TEST_SIM_1 = lambda: MockSimulation( + cell_size=mp.Vector3(10.0, 5.0, 0), + resolution=20, + chunk_layout=mp.BinaryPartition(data=[ + (mp.X, -2.0), 0, [(mp.Y, 1.5), [(mp.X, 4.0), 1, [(mp.Y, 0.5), 4, 3]], 2] + ])) +TEST_SIM_2 = lambda: MockSimulation( + cell_size=mp.Vector3(10.0, 5.0, 3.0), + resolution=10, + chunk_layout=mp.BinaryPartition(data=[ + (mp.X, -2.0), 0, [(mp.Y, 1.0), [(mp.X, 3.0), 1, [(mp.Y, 0.5), 4, 3]], 2] + ])) +TEST_SIM_3 = lambda: MockSimulation( + cell_size=mp.Vector3(6.0, 4.0, 0), + resolution=10, + chunk_layout=mp.BinaryPartition(data=[(mp.X, -2.0), 0, [(mp.X, 2.0), 1, 2]]) +) +TEST_SIM_4 = lambda: MockSimulation( + cell_size=mp.Vector3(6.0, 4.0, 0), + resolution=10, + chunk_layout=mp.BinaryPartition( + data=[(mp.X, -2.0), 0, + [(mp.X, -0.5), 1, [(mp.X, 1.0), 2, [(mp.X, 2.0), 3, 4]]]])) + +TEST_SIM_DUPLICATE_PROC_ID = lambda: MockSimulation( + cell_size=mp.Vector3(10.0, 5.0, 0), + resolution=10, + chunk_layout=mp.BinaryPartition(data=[ + (mp.X, -2.0), 0, [(mp.Y, 1.5), [(mp.X, 4.0), 1, [(mp.Y, 0.5), 2, 1]], 2] + ])) + +TEST_CHUNK_DATA_1 = { + "chunk_layout": + mp.BinaryPartition(data=[(mp.X, -2.5), 0, [(mp.X, 2.5), 1, 2]]), + "cell_size": + mp.Vector3(6.0, 4.0, 0), + "time_stepping": [1.0, 1.0, 1.0], + "new_chunk_layout": + mp.BinaryPartition(data=[(mp.X, -2.5), 0, [(mp.X, 2.5), 1, 2]]), +} +TEST_CHUNK_DATA_2 = { + "chunk_layout": + mp.BinaryPartition(data=[(mp.X, -2.5), 0, [(mp.X, 2.5), 1, 2]]), + "cell_size": + mp.Vector3(6.0, 4.0, 0), + "time_stepping": [3.0 - 2.5, 2.5 + 2.5, 3.0 - 2.5], + "new_chunk_layout": + mp.BinaryPartition(data=[(mp.X, -1.0), 0, [(mp.X, 1.0), 1, 2]]), +} +TEST_CHUNK_DATA_3 = { + "chunk_layout": mp.BinaryPartition(data=[(mp.X, 2.0), 0, 1]), + "cell_size": mp.Vector3(6.0, 4.0, 0), + "time_stepping": [1.0, 1.0], + "new_chunk_layout": mp.BinaryPartition(data=[(mp.X, 2.0), 0, 1]), +} +TEST_CHUNK_DATA_4 = { + "chunk_layout": mp.BinaryPartition(data=[(mp.X, 2.0), 0, 1]), + "cell_size": mp.Vector3(6.0, 4.0, 0), + "time_stepping": [5.0, 1.0], + "new_chunk_layout": mp.BinaryPartition(data=[(mp.X, 0.0), 0, 1]), +} +TEST_CHUNK_DATA_5 = { + "chunk_layout": + mp.BinaryPartition(data=[( + mp.X, + -2.0), 0, [(mp.Y, 1.5), [(mp.X, 4.0), 1, [(mp.Y, 0.5), 4, 3]], 2]]), + "cell_size": + mp.Vector3(10.0, 5.0, 0), + "time_stepping": [1.0, 1.0, 1.0, 1.0, 1.0], + "new_chunk_layout": + mp.BinaryPartition(data=[( + mp.X, + -2.0), 0, [(mp.Y, 1.5), [(mp.X, 4.0), 1, [(mp.Y, 0.5), 4, 3]], 2]]), +} +TEST_CHUNK_DATA_6 = { + "chunk_layout": + mp.BinaryPartition(data=[( + mp.X, + -2.0), 0, [(mp.Y, 1.5), [(mp.X, 4.0), 1, [(mp.Y, 0.5), 4, 3]], 2]]), + "cell_size": + mp.Vector3(10.0, 5.0, 0), + "time_stepping": [1500.0, 2400.0, 700.0, 100.0, 300.0], + "new_chunk_layout": + mp.BinaryPartition( + data=[(mp.X, -3.0), 0, + [(mp.Y, 1.25 + ), [(mp.X, -0.3333333333333335), 1, [(mp.Y, + -0.625), 4, 3]], 2]]), +} + + +class MockSimulationTest(unittest.TestCase): + + @parameterized.parameterized.expand([ + (TEST_SIM_1, [6000.0, 9600.0, 2800.0, 400.0, 1200.0]), + (TEST_SIM_2, [45000.0, 52500.0, 31500.0, 3000.0, 18000.0]), + (TEST_SIM_3, [400.0, 1600.0, 400.0]), + (TEST_SIM_4, [400.0, 600.0, 600.0, 400.0, 400.0]), + ]) + def test_time_spent_on(self, test_sim_constructor, expected_stepping_times): + test_sim = test_sim_constructor() + test_sim.init_sim() + + for time_sink in TIMING_MEASUREMENT_IDS.values(): + if time_sink == TIMING_MEASUREMENT_IDS["time_stepping"]: + self.assertListEqual( + list(test_sim.time_spent_on(time_sink)), expected_stepping_times) + else: + self.assertListEqual( + list(test_sim.time_spent_on(time_sink)), + [0.0] * len(expected_stepping_times)) + + @parameterized.parameterized.expand([ + (TEST_SIM_1, [0, 1, 4, 3, 2]), + (TEST_SIM_2, [0, 1, 4, 3, 2]), + (TEST_SIM_3, [0, 1, 2]), + (TEST_SIM_4, [0, 1, 2, 3, 4]), + ]) + def test_structure_get_chunk_owners(self, test_sim_constructor, + expected_chunk_owners): + test_sim = test_sim_constructor() + test_sim.init_sim() + + chunk_owners = test_sim.structure.get_chunk_owners() + + self.assertListEqual(list(chunk_owners), expected_chunk_owners) + + +class ChunkBalancerTest(unittest.TestCase): + + @parameterized.parameterized.expand([ + (TEST_SIM_1, False), + (TEST_SIM_DUPLICATE_PROC_ID, True), + ]) + def test_validate_sim(self, test_sim_constructor, should_raise_exception): + test_sim = test_sim_constructor() + test_sim.init_sim() + + chunk_balancer = ChunkBalancer() + + if should_raise_exception: + with self.assertRaises(ValueError): + chunk_balancer._validate_sim(test_sim) + else: + chunk_balancer._validate_sim(test_sim) + + @parameterized.parameterized.expand([ + (TEST_SIM_1,), + (TEST_SIM_2,), + (TEST_SIM_3,), + (TEST_SIM_4,), + ]) + def test_chunk_layout_improvement(self, test_sim_constructor): + """Tests that chunk_balancer improves balance after 1 iteration.""" + test_sim = test_sim_constructor() + test_sim.init_sim() + + old_timing_measurements = MeepTimingMeasurements.new_from_simulation( + test_sim, -1) + + chunk_balancer = ChunkBalancer() + + chunk_balancer.adjust_chunk_layout(test_sim, sensitivity=1.0) + + new_timing_measurements = MeepTimingMeasurements.new_from_simulation( + test_sim, -1) + + old_step_times = np.array( + old_timing_measurements.measurements["time_stepping"]) + new_step_times = np.array( + new_timing_measurements.measurements["time_stepping"]) + + old_max_time = np.max(old_step_times) + new_max_time = np.max(new_step_times) + + old_min_time = np.min(old_step_times) + new_min_time = np.min(new_step_times) + + self.assertLess(new_max_time, old_max_time) + self.assertGreater(new_min_time, old_min_time) + + @parameterized.parameterized.expand([ + (TEST_SIM_1,), + (TEST_SIM_2,), + (TEST_SIM_3,), + (TEST_SIM_4,), + ]) + def test_chunk_layout_convergence(self, test_sim_constructor): + """Tests that chunk_balancer converges to load balanced state.""" + test_sim = test_sim_constructor() + test_sim.init_sim() + + chunk_balancer = ChunkBalancer() + + num_iterations = 25 + + for _ in range(num_iterations): + chunk_balancer.adjust_chunk_layout(test_sim, sensitivity=0.5) + + new_timing_measurements = MeepTimingMeasurements.new_from_simulation( + test_sim, -1) + + new_step_times = np.array( + new_timing_measurements.measurements["time_stepping"]) + + # Check that new stepping times have converged to close to the mean value + tolerance = 0.05 + mean_step_time = np.mean(new_step_times) + self.assertTrue( + np.allclose(mean_step_time, new_step_times, rtol=tolerance)) + + @parameterized.parameterized.expand([ + (TEST_CHUNK_DATA_1,), + (TEST_CHUNK_DATA_2,), + (TEST_CHUNK_DATA_3,), + (TEST_CHUNK_DATA_4,), + (TEST_CHUNK_DATA_5,), + (TEST_CHUNK_DATA_6,), + ]) + def test_split_pos_adjustment(self, test_chunk_data): + chunk_layout = test_chunk_data["chunk_layout"] + sim = MockSimulation( + cell_size=test_chunk_data["cell_size"], + resolution=10, + chunk_layout=chunk_layout) + sim.init_sim() + chunk_volumes = sim.structure.get_chunk_volumes() + chunk_owners = sim.structure.get_chunk_owners() + + chunk_balancer = ChunkBalancer() + + measurements = {} + num_procs = len(test_chunk_data["time_stepping"]) + for name in TIMING_MEASUREMENT_IDS.keys(): + if name == "time_stepping": + measurements[name] = test_chunk_data["time_stepping"] + else: + measurements[name] = [0] * num_procs + timing_measurements = MeepTimingMeasurements(measurements, -1, None, None, + None, None, None) + + new_chunk_layout = chunk_balancer.compute_new_chunk_layout( + timing_measurements, + chunk_layout, + chunk_volumes, + chunk_owners, + sensitivity=1.0) + expected_chunk_layout = test_chunk_data["new_chunk_layout"] + + self.assertTrue( + bpu.partitions_are_equal(new_chunk_layout, expected_chunk_layout)) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/tests/test_timing_measurements.py b/python/tests/test_timing_measurements.py new file mode 100644 index 000000000..93ec0ca02 --- /dev/null +++ b/python/tests/test_timing_measurements.py @@ -0,0 +1,32 @@ +import time +import unittest + +import meep as mp +from meep import timing_measurements as timing + + +class TimingTest(unittest.TestCase): + + def test_timing_measurements(self): + """Tests that timing measurements have expected names and can be updated.""" + sim = mp.Simulation( + cell_size=mp.Vector3(2, 2, 2), + resolution=20, + ) + time_start = time.time() + sim.run(until=5) + timing_measurements = timing.MeepTimingMeasurements.new_from_simulation(sim) + + # Check for expected names after updating + self.assertSetEqual( + set(timing_measurements.measurement_names), + set(timing.TIMING_MEASUREMENT_IDS.keys()), + ) + self.assertTrue(timing_measurements.elapsed_time > 0 or timing_measurements.elapsed_time == -1) + self.assertGreater(timing_measurements.num_time_steps, 0) + self.assertGreaterEqual(timing_measurements.comm_efficiency, 0) + self.assertGreaterEqual(timing_measurements.comm_efficiency_one_to_one, 0) + self.assertGreaterEqual(timing_measurements.comm_efficiency_all_to_all, 0) + +if __name__ == '__main__': + unittest.main() diff --git a/python/timing_measurements.py b/python/timing_measurements.py new file mode 100644 index 000000000..08fc068b6 --- /dev/null +++ b/python/timing_measurements.py @@ -0,0 +1,175 @@ +from typing import Dict, List, Optional +import meep as mp +import numpy as np + +# Codes for different Meep time sinks, used by `mp.Simulation.time_spent_on()`. +# See +# https://meep.readthedocs.io/en/latest/Python_User_Interface/#simulation-time +# for more information. +TIMING_MEASUREMENT_IDS = { + 'connecting_chunks': mp.Connecting, + 'time_stepping': mp.Stepping, + 'boundaries_copying': mp.Boundaries, + 'mpi_all_to_all': mp.MpiAllTime, + 'mpi_one_to_one': mp.MpiOneTime, + 'field_output': mp.FieldOutput, + 'fourier_transform': mp.FourierTransforming, + 'mpb': mp.MPBTime, + 'near_to_farfield_transform': mp.GetFarfieldsTime, + 'other': mp.Other, + 'field_update_b': mp.FieldUpdateB, + 'field_update_h': mp.FieldUpdateH, + 'field_update_d': mp.FieldUpdateD, + 'field_update_e': mp.FieldUpdateE, + 'boundary_stepping_b': mp.BoundarySteppingB, + 'boundary_stepping_wh': mp.BoundarySteppingWH, + 'boundary_stepping_ph': mp.BoundarySteppingPH, + 'boundary_stepping_h': mp.BoundarySteppingH, + 'boundary_stepping_d': mp.BoundarySteppingD, + 'boundary_stepping_we': mp.BoundarySteppingWE, + 'boundary_stepping_pe': mp.BoundarySteppingPE, + 'boundary_stepping_e': mp.BoundarySteppingE, +} + + +def get_current_time_step(sim: mp.Simulation) -> int: + """Gets the current time step from a Meep simulation object.""" + return sim.fields.t + + +class MeepTimingMeasurements: + """Container for Meep timing measurements and performance information. + + See + https://meep.readthedocs.io/en/latest/Python_User_Interface/#simulation-time + for more information on the Meep timing measurements. + + Attributes: + measurements: a dictionary of timing measurements, where each entry is a + list containing timing measurements for each process. + elapsed_time: the simulation's elapsed wall time. + num_time_steps: the total number of time steps taken in the simulation. + time_per_step: the wall time taken per step. Each element of this list is + expected to correspond to a unique time step, but could also be averaged + over several time steps. + dft_relative_change: the relative change in the DFT fields, measured at each + time step. + overlap_relative_change: the relative change in the mode overlaps, measured + at each time step. + relative_energy: the relative energy in the simulation domain, measured + at each time step. + measurement_names: a list of measurement names stored in `measurements`. + comm_efficiency: the communication efficiency, defined as the mean + MPI/synchronization time divided by the sum of the mean time taken for + time stepping and the mean time taken for fourier transforming. This is + zero if no simulation work has been performed. + comm_efficiency_one_to_one: MPI one-to-one communication efficiency, defined + as the mean MPI one-to-one communication time divided by the sum of the + mean time taken for time stepping and the mean time taken for fourier + transforming. This is zero if no simulation work has been performed. + comm_efficiency_all_to_all: MPI all-to-all communication efficiency, defined + as the mean MPI all-to-all communication time divided by the sum of the + mean time taken for time stepping and the mean time taken for fourier + transforming. This is zero if no simulation work has been performed. + """ + + def __init__(self, + measurements: Dict[str, List[float]], + elapsed_time: float, + num_time_steps: int, + time_per_step: List[float], + dft_relative_change: List[float], + overlap_relative_change: List[float], + relative_energy: List[float]): + self.measurements = measurements + self.elapsed_time = elapsed_time + self.num_time_steps = num_time_steps + self.time_per_step = time_per_step + self.dft_relative_change = dft_relative_change + self.overlap_relative_change = overlap_relative_change + self.relative_energy = relative_energy + + @classmethod + def new_from_simulation( + cls, + sim: mp.Simulation, + elapsed_time: Optional[float] = -1, + time_per_step: Optional[List[float]] = None, + dft_relative_change: Optional[List[float]] = None, + overlap_relative_change: Optional[List[float]] = None, + relative_energy: Optional[List[float]] = None, + ) -> 'MeepTimingMeasurements': + """Creates a new `MeepTimingMeasurements` from a Meep simulation object. + + Usage example: + ``` + sim = mp.Simulation(...) + sim.init_sim() + start_time = time.time() + sim.run(...) + measurements = meep_timing.MeepTimingMeasurements.new_from_simulation( + sim=sim, + elapsed_time=time.time() - start_time, + ) + ``` + + Args: + sim: a Meep simulation object that has previously been run. + elapsed_time: the wall time that has elapsed while running the simulation. + time_per_step: the wall time taken per step. Each element of this list is + expected to correspond to a unique time step, but could also be averaged + over several time steps. + dft_relative_change: the relative change in the DFT fields, measured at + each time step. + overlap_relative_change: the relative change in the mode overlaps, + measured at each time step. + relative_energy: the relative energy in the simulation domain, measured + at each time step. + + Returns: + the resulting `MeepTimingMeasurements` + """ + measurements = {} + for name, timing_id in TIMING_MEASUREMENT_IDS.items(): + measurements[name] = sim.time_spent_on(timing_id).tolist() + return cls( + measurements=measurements, + elapsed_time=elapsed_time, + num_time_steps=get_current_time_step(sim), + time_per_step=time_per_step if time_per_step is not None else [], + dft_relative_change=dft_relative_change if dft_relative_change is not None else [], + overlap_relative_change=overlap_relative_change if overlap_relative_change is not None else [], + relative_energy=relative_energy if relative_energy is not None else [], + ) + + @property + def measurement_names(self) -> List[str]: + return list(self.measurements) + + @property + def comm_efficiency(self) -> float: + computation_time = np.mean(self.measurements['time_stepping']) + np.mean( + self.measurements['fourier_transform']) + if computation_time == 0: + return 0 + else: + return np.mean(self.measurements['mpi_all_to_all'] + + self.measurements['mpi_one_to_one']) / computation_time + + @property + def comm_efficiency_one_to_one(self) -> float: + computation_time = np.mean(self.measurements['time_stepping']) + np.mean( + self.measurements['fourier_transform']) + if computation_time == 0: + return 0 + else: + return np.mean(self.measurements['mpi_one_to_one']) / computation_time + + @property + def comm_efficiency_all_to_all(self) -> float: + computation_time = np.mean(self.measurements['time_stepping']) + np.mean( + self.measurements['fourier_transform']) + if computation_time == 0: + return 0 + else: + return np.mean(self.measurements['mpi_all_to_all']) / computation_time