Recast partition_mesh in terms of generic part identifiers (#308)
* simplify partition_mesh interface

* add get_connected_partitions back to docs

* use opaque partition ID instead of number in partition_mesh

* use self/other instead of local/nonlocal

* fix compatibility

* remove unnecessary import

Co-authored-by: Alex Fikl <[email protected]>

* eliminate fun

Co-authored-by: Alex Fikl <[email protected]>

* stamp out remaining traces of fun

* rename membership_list_to_sets -> membership_list_to_map and store index sets as numpy arrays instead of python sets

* remove return_sets option from get_partition_by_pymetis

* fix bugs in MPIBoundaryCommSetupHelper

* flake8

* fix bug

* add a couple of fixmes

* handle groupless mesh case in dim/ambient_dim

* Revert "handle groupless mesh case in dim/ambient_dim"

not a good solution

* disable removal of empty mesh groups in partitioning

* fix some issues with partitioning docs

* remove empty-group filtering

* clarify part vs. partition terminology

* change some asserts to exceptions

* add a couple of FIXMEs

* cosmetic change

Co-authored-by: Andreas Klöckner <[email protected]>

* detect elements that belong to multiple parts

* add return_parts argument to partition_mesh

* add type hints to mesh partitioning

* Fix some annotations in mesh.processing

* add explicit part_id attribute to InterPartAdjacencyGroup

* cosmetic change

* fix some type annotations

* Dict -> Mapping

* fix outdated argument reference in docstring

* try using just dtype

dtype[Any] requires python 3.9

Co-authored-by: Alex Fikl <[email protected]>
Co-authored-by: Andreas Klöckner <[email protected]>
3 people committed Jul 1, 2022
1 parent 8744113 commit 28d2f34
Showing 5 changed files with 447 additions and 381 deletions.
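Before the file diffs, a minimal sketch of the reworked flow this commit adds up to. Only the function names are taken from the diff below; the mesh-construction parameters are illustrative, and an installed pymetis is assumed:

    # Sketch of the post-commit partitioning flow; not part of the diff itself.
    from meshmode.mesh.generation import generate_regular_rect_mesh
    from meshmode.mesh.processing import partition_mesh
    from meshmode.distributed import (
        get_partition_by_pymetis, membership_list_to_map, get_connected_parts)

    mesh = generate_regular_rect_mesh(a=(0, 0), b=(1, 1), nelements_per_axis=(8, 8))

    # One part number per element, entries in [0, num_parts-1]:
    part_per_element = get_partition_by_pymetis(mesh, num_parts=4)

    # Invert the membership list: part id -> sorted array of element indices.
    part_id_to_elements = membership_list_to_map(part_per_element)

    # New interface: one call returns a dict mapping part id -> part mesh,
    # replacing the old partition_mesh(mesh, part_per_element, i) per-part loop.
    parts = partition_mesh(mesh, part_id_to_elements)

    # Each part records its neighbors by opaque part id:
    for part_id, part_mesh in parts.items():
        print(part_id, get_connected_parts(part_mesh))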
meshmode/discretization/connection/opposite_face.py — 14 changes: 7 additions & 7 deletions
@@ -521,20 +521,20 @@ def make_opposite_face_connection(actx, volume_to_bdry_conn):

 # {{{ make_partition_connection

+# FIXME: Consider adjusting terminology from local/remote to self/other.
 def make_partition_connection(actx, *, local_bdry_conn,
         remote_bdry_discr, remote_group_infos):
     """
-    Connects ``local_bdry_conn`` to a neighboring partition.
+    Connects ``local_bdry_conn`` to a neighboring part.

-    :arg local_bdry_conn: A :class:`DiscretizationConnection` of the local
-        partition.
+    :arg local_bdry_conn: A :class:`DiscretizationConnection` of the local part.
     :arg remote_bdry_discr: A :class:`~meshmode.discretization.Discretization`
-        of the boundary of the remote partition.
+        of the boundary of the remote part.
     :arg remote_group_infos: An array of
         :class:`meshmode.distributed.RemoteGroupInfo` instances, one per remote
         volume element group.
     :returns: A :class:`DirectDiscretizationConnection` that performs data
-        exchange across faces from the remote partition to the local partition.
+        exchange across faces from the remote part to the local part.

     .. versionadded:: 2017.1

@@ -556,15 +556,15 @@ def make_partition_connection(actx, *, local_bdry_conn,
     # The code assumes that there is the same number of volume and surface groups.
     #
     # A weak reason to choose remote as the outer loop is because
-    # InterPartitionAdjacency refers to neighbors by global volume element
+    # InterPartAdjacency refers to neighbors by global volume element
     # numbers, and we only have enough information to resolve those to (group,
     # group_local_el_nr) for local elements (whereas we have no information
     # about remote volume elements).
     #
     # (See the find_group_indices below.)

     for rgi in remote_group_infos:
-        rem_ipags = rgi.inter_partition_adj_groups
+        rem_ipags = rgi.inter_part_adj_groups

         for rem_ipag in rem_ipags:
             i_local_grps = find_group_indices(local_vol_groups, rem_ipag.neighbors)
meshmode/distributed.py — 170 changes: 84 additions & 86 deletions
@@ -4,7 +4,8 @@
 .. autoclass:: MPIBoundaryCommSetupHelper
 .. autofunction:: get_partition_by_pymetis
-.. autofunction:: get_inter_partition_tags
+.. autofunction:: membership_list_to_map
+.. autofunction:: get_connected_parts
 .. autoclass:: RemoteGroupInfo
 .. autoclass:: make_remote_group_infos
@@ -37,7 +38,7 @@

 from dataclasses import dataclass
 import numpy as np
-from typing import List, Set, Union, Mapping, cast, Sequence, TYPE_CHECKING, Hashable
+from typing import List, Set, Union, Mapping, cast, Sequence, TYPE_CHECKING

 from arraycontext import ArrayContext
 from meshmode.discretization.connection import (
@@ -46,9 +47,8 @@
 from meshmode.mesh import (
     Mesh,
     InteriorAdjacencyGroup,
-    InterPartitionAdjacencyGroup,
-    BoundaryTag,
-    BTAG_PARTITION,
+    InterPartAdjacencyGroup,
+    PartID,
 )

 from meshmode.discretization import ElementGroupFactory
@@ -94,31 +94,32 @@ def send_mesh_parts(self, mesh, part_per_element, num_parts):
         :arg part_per_element: A :class:`numpy.ndarray` containing one
             integer per element of *mesh* indicating which part of the
             partitioned mesh the element is to become a part of.
-        :arg num_parts: The number of partitions to divide the mesh into.
+        :arg num_parts: The number of parts to divide the mesh into.

-        Sends each partition to a different rank.
-        Returns one partition that was not sent to any other rank.
+        Sends each part to a different rank.
+        Returns one part that was not sent to any other rank.
         """
         mpi_comm = self.mpi_comm
         rank = mpi_comm.Get_rank()
         assert num_parts <= mpi_comm.Get_size()

         assert self.is_mananger_rank()

+        part_num_to_elements = membership_list_to_map(part_per_element)
+
         from meshmode.mesh.processing import partition_mesh
-        parts = [partition_mesh(mesh, part_per_element, i)[0]
-                for i in range(num_parts)]
+        parts = partition_mesh(mesh, part_num_to_elements)

         local_part = None

         reqs = []
-        for r, part in enumerate(parts):
+        for r, part in parts.items():
             if r == self.manager_rank:
                 local_part = part
             else:
                 reqs.append(mpi_comm.isend(part, dest=r, tag=TAG_DISTRIBUTE_MESHES))

-        logger.info("rank %d: sent all mesh partitions", rank)
+        logger.info("rank %d: sent all mesh parts", rank)
         for req in reqs:
             req.wait()
@@ -147,16 +148,20 @@ def receive_mesh_part(self):

 # {{{ remote group info

+# FIXME: "Remote" is perhaps not the best naming convention for this. For example,
+# in a multi-volume context it may be used when constructing inter-part connections
+# between two parts on the same rank.
 @dataclass
 class RemoteGroupInfo:
-    inter_partition_adj_groups: List[InterPartitionAdjacencyGroup]
+    inter_part_adj_groups: List[InterPartAdjacencyGroup]
     vol_elem_indices: np.ndarray
     bdry_elem_indices: np.ndarray
     bdry_faces: np.ndarray


 def make_remote_group_infos(
-        actx: ArrayContext, local_btag: BoundaryTag,
+        actx: ArrayContext,
+        remote_part_id: PartID,
         bdry_conn: DirectDiscretizationConnection
         ) -> Sequence[RemoteGroupInfo]:
     local_vol_mesh = bdry_conn.from_discr.mesh

@@ -165,10 +170,10 @@ def make_remote_group_infos(
     return [
         RemoteGroupInfo(
-            inter_partition_adj_groups=[
+            inter_part_adj_groups=[
                 fagrp for fagrp in local_vol_mesh.facial_adjacency_groups[igrp]
-                if isinstance(fagrp, InterPartitionAdjacencyGroup)
-                and fagrp.boundary_tag == local_btag],
+                if isinstance(fagrp, InterPartAdjacencyGroup)
+                and fagrp.part_id == remote_part_id],
             vol_elem_indices=np.concatenate([
                 actx.to_numpy(batch.from_element_indices)
                 for batch in bdry_conn.groups[igrp].batches]),
@@ -188,17 +193,13 @@ def make_remote_group_infos(
 @dataclass(init=True, frozen=True)
 class InterRankBoundaryInfo:
     """
-    .. attribute:: local_btag
-
-        A boundary tag for the local boundary towards the remote partition.
-
     .. attribute:: local_part_id

-        An opaque, hashable, picklable identifier for the local partition.
+        An opaque, hashable, picklable identifier for the local part.

     .. attribute:: remote_part_id

-        An opaque, hashable, picklable identifier for the remote partition.
+        An opaque, hashable, picklable identifier for the remote part.

     .. attribute:: remote_rank

@@ -207,22 +208,21 @@ class InterRankBoundaryInfo:
     .. attribute:: local_boundary_connection

         A :class:`~meshmode.discretization.connection.DirectDiscretizationConnection`
-        from the volume onto the boundary described by :attr:`local_btag`.
+        from the volume onto the boundary described by
+        ``BTAG_PARTITION(remote_part_id)``.

     .. automethod:: __init__
     """

-    # FIXME better names?
-    local_btag: BoundaryTag
-    local_part_id: Hashable
-    remote_part_id: Hashable
+    local_part_id: PartID
+    remote_part_id: PartID
     remote_rank: int
     local_boundary_connection: DirectDiscretizationConnection


 class MPIBoundaryCommSetupHelper:
     """
-    Helper for setting up inter-partition facial data exchange.
+    Helper for setting up inter-part facial data exchange.

     .. automethod:: __init__
     .. automethod:: __enter__
@@ -240,11 +240,11 @@ def __init__(self,
                 ],
             bdry_grp_factory: ElementGroupFactory):
         """
-        :arg local_bdry_conns: A :class:`dict` mapping remote partition to
+        :arg local_bdry_conns: A :class:`dict` mapping remote part to
             `local_bdry_conn`, where `local_bdry_conn` is a
             :class:`~meshmode.discretization.connection.DirectDiscretizationConnection`
-            that performs data exchange from
-            the volume to the faces adjacent to partition `i_remote_part`.
+            that performs data exchange from the volume to the faces adjacent to
+            part `i_remote_part`.
         :arg bdry_grp_factory: Group factory to use when creating the remote-to-local
             boundary connections
         """
@@ -265,9 +265,8 @@ def __init__(self,

         inter_rank_bdry_info = [
             InterRankBoundaryInfo(
-                local_btag=BTAG_PARTITION(remote_rank),
-                local_part_id=remote_rank,
-                remote_part_id=self.i_local_rank,
+                local_part_id=self.i_local_rank,
+                remote_part_id=remote_rank,
                 remote_rank=remote_rank,
                 local_boundary_connection=conn
             )
@@ -292,7 +291,7 @@ def __enter__(self):

         # to know when we're done
         self.pending_recv_identifiers = {
-            (irbi.remote_rank, irbi.remote_part_id)
+            (irbi.local_part_id, irbi.remote_part_id)
             for irbi in self.inter_rank_bdry_info}

         self.send_reqs = [
@@ -302,7 +301,7 @@ def __enter__(self):
                     irbi.remote_part_id,
                     irbi.local_boundary_connection.to_discr.mesh,
                     make_remote_group_infos(
-                        self.array_context, irbi.local_btag,
+                        self.array_context, irbi.remote_part_id,
                         irbi.local_boundary_connection)),
                 dest=irbi.remote_rank)
             for irbi in self.inter_rank_bdry_info]
@@ -314,13 +313,12 @@ def __exit__(self, type, value, traceback):

     def complete_some(self):
         """
-        Returns a :class:`dict` mapping a subset of remote partitions to
+        Returns a :class:`dict` mapping a subset of remote parts to
         remote-to-local boundary connections, where a remote-to-local boundary
         connection is a
         :class:`~meshmode.discretization.connection.DirectDiscretizationConnection`
-        that performs data exchange across faces from partition `i_remote_part`
-        to the local mesh. When an empty dictionary is returned, setup is
-        complete.
+        that performs data exchange across faces from part `i_remote_part` to the
+        local mesh. When an empty dictionary is returned, setup is complete.
         """
         from mpi4py import MPI
@@ -341,36 +339,41 @@ def complete_some(self):

         remote_to_local_bdry_conns = {}

-        local_part_id_to_irbi = {
-            irbi.local_part_id: irbi for irbi in self.inter_rank_bdry_info}
-        assert len(local_part_id_to_irbi) == len(self.inter_rank_bdry_info)
+        part_ids_to_irbi = {
+            (irbi.local_part_id, irbi.remote_part_id): irbi
+            for irbi in self.inter_rank_bdry_info}
+        if len(part_ids_to_irbi) < len(self.inter_rank_bdry_info):
+            raise ValueError(
+                "duplicate local/remote part pair in inter_rank_bdry_info")

-        for i_src_rank, recvd in zip(
-                source_ranks, data):
-            (recvd_remote_part_id, recvd_local_part_id,
+        for i_src_rank, recvd in zip(source_ranks, data):
+            (remote_part_id, local_part_id,
                 remote_bdry_mesh, remote_group_infos) = recvd

             logger.debug("rank %d: Received part id '%s' data from rank %d",
-                    self.i_local_rank, recvd_local_part_id, i_src_rank)
+                    self.i_local_rank, remote_part_id, i_src_rank)

             # Connect local_mesh to remote_mesh
             from meshmode.discretization.connection import make_partition_connection
-            irbi = local_part_id_to_irbi[recvd_local_part_id]
+            irbi = part_ids_to_irbi[local_part_id, remote_part_id]
             assert i_src_rank == irbi.remote_rank
-            assert recvd_remote_part_id == irbi.remote_part_id

-            remote_to_local_bdry_conns[recvd_local_part_id] \
-                    = make_partition_connection(
-                        self.array_context,
-                        local_bdry_conn=irbi.local_boundary_connection,
-                        remote_bdry_discr=(
-                            irbi.local_boundary_connection.to_discr.copy(
-                                actx=self.array_context,
-                                mesh=remote_bdry_mesh,
-                                group_factory=self.bdry_grp_factory)),
-                        remote_group_infos=remote_group_infos)
+            if self._using_old_timey_interface:
+                key = remote_part_id
+            else:
+                key = (remote_part_id, local_part_id)
+
+            remote_to_local_bdry_conns[key] = (
+                make_partition_connection(
+                    self.array_context,
+                    local_bdry_conn=irbi.local_boundary_connection,
+                    remote_bdry_discr=irbi.local_boundary_connection.to_discr.copy(
+                        actx=self.array_context,
+                        mesh=remote_bdry_mesh,
+                        group_factory=self.bdry_grp_factory),
+                    remote_group_infos=remote_group_infos))

-            self.pending_recv_identifiers.remove((i_src_rank, recvd_remote_part_id))
+            self.pending_recv_identifiers.remove((local_part_id, remote_part_id))

             if not self.pending_recv_identifiers:
                 MPI.Request.waitall(self.send_reqs)
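The docstring above implies a polling pattern: call `complete_some()` until it returns an empty dict. A hedged sketch of one way to drive it; the construction of `mpi_comm`, `actx`, `local_bdry_conns`, and `bdry_grp_factory` is elided, and the positional argument order is an assumption based on the attributes referenced in this diff:

    remote_to_local_bdry_conns = {}
    with MPIBoundaryCommSetupHelper(
            mpi_comm, actx, local_bdry_conns, bdry_grp_factory) as helper:
        while True:
            conns = helper.complete_some()
            if not conns:
                break  # empty dict: all pending receives have completed
            remote_to_local_bdry_conns.update(conns)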
Expand All @@ -381,6 +384,7 @@ def complete_some(self):
# }}}


# FIXME: Move somewhere else, since it's not strictly limited to distributed?
def get_partition_by_pymetis(mesh, num_parts, *, connectivity="facial", **kwargs):
"""Return a mesh partition created by :mod:`pymetis`.
Expand All @@ -390,7 +394,7 @@ def get_partition_by_pymetis(mesh, num_parts, *, connectivity="facial", **kwargs
``"facial"`` or ``"nodal"`` (based on vertices).
:arg kwargs: Passed unmodified to :func:`pymetis.part_graph`.
:returns: a :class:`numpy.ndarray` with one entry per element indicating
to which partition each element belongs, with entries between ``0`` and
to which part each element belongs, with entries between ``0`` and
``num_parts-1``.
.. versionchanged:: 2020.2
@@ -429,39 +433,33 @@ def get_partition_by_pymetis(mesh, num_parts, *, connectivity="facial", **kwargs
     return np.array(p)


-def get_connected_partitions(mesh: Mesh) -> "Set[int]":
-    """For a local mesh part in *mesh*, determine the set of boundary
-    tags for connections to other parts, cf.
-    :class:`meshmode.mesh.InterPartitionAdjacencyGroup`.
+def membership_list_to_map(membership_list):
+    """
+    Convert a :class:`numpy.ndarray` that maps an index to a key into a
+    :class:`dict` that maps a key to a set of indices (with each set of indices
+    stored as a sorted :class:`numpy.ndarray`).
     """
-    assert mesh.facial_adjacency_groups is not None
-    # internal and deprecated, remove in July 2022
-
-    def _get_neighbor_part_nr(btag):
-        if isinstance(btag, BTAG_PARTITION):
-            return btag.part_nr
-        else:
-            raise ValueError("unexpected inter-partition boundary tag type found")
-
     return {
-        _get_neighbor_part_nr(grp.boundary_tag)
-        for fagrp_list in mesh.facial_adjacency_groups
-        for grp in fagrp_list
-        if isinstance(grp, InterPartitionAdjacencyGroup)}
+        entry: np.where(membership_list == entry)[0]
+        for entry in set(membership_list)}


-def get_inter_partition_tags(mesh: Mesh) -> "Set[BoundaryTag]":
-    """For a local mesh part in *mesh*, determine the set of boundary
-    tags for connections to other parts, cf.
-    :class:`meshmode.mesh.InterPartitionAdjacencyGroup`.
-    """
+# FIXME: Move somewhere else, since it's not strictly limited to distributed?
+def get_connected_parts(mesh: Mesh) -> "Set[PartID]":
+    """For a local mesh part in *mesh*, determine the set of connected parts."""
     assert mesh.facial_adjacency_groups is not None

     return {
-        grp.boundary_tag
+        grp.part_id
         for fagrp_list in mesh.facial_adjacency_groups
         for grp in fagrp_list
-        if isinstance(grp, InterPartitionAdjacencyGroup)}
+        if isinstance(grp, InterPartAdjacencyGroup)}


+def get_connected_partitions(mesh: Mesh) -> "Set[PartID]":
+    warn(
+        "get_connected_partitions is deprecated and will stop working in June 2023. "
+        "Use get_connected_parts instead.", DeprecationWarning, stacklevel=2)
+    return get_connected_parts(mesh)

 # vim: foldmethod=marker
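To make the `membership_list_to_map` semantics concrete, a small self-contained demo that mirrors the dict comprehension above:

    import numpy as np

    # Invert an index -> key array into key -> sorted indices
    # (np.where returns indices in ascending order).
    membership_list = np.array([0, 1, 0, 2, 1, 0])
    result = {entry: np.where(membership_list == entry)[0]
              for entry in set(membership_list)}
    print(result)  # {0: array([0, 2, 5]), 1: array([1, 4]), 2: array([3])}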