Commit f76122e
[π˜€π—½π—Ώ] initial version
Created using spr 1.3.5-beta.1
orausch committed Aug 20, 2022
2 parents 3c799c5 + 00d1787 commit f76122e
Showing 14 changed files with 1,513 additions and 10 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cpu-ci.yml
@@ -58,7 +58,7 @@ jobs:
      - name: Test with pytest
        env:
          ORT_RELEASE: ${{ github.workspace }}/onnxruntime-daceml-patched
-         PYTEST_ARGS: --cov=daceml --cov-report=term --cov-report xml --cov-config=.coveragerc -m "not fpga and not xilinx and not gpu and not onnx" --timeout=500
+         PYTEST_ARGS: --cov=daceml --cov-report=term --cov-report xml --cov-config=.coveragerc -m "not fpga and not xilinx and not gpu and not onnx and not mpi" --timeout=500
        run: make test

      - name: Test with doctest
@@ -95,7 +95,7 @@ jobs:

      - name: Test with pytest
        env:
-         PYTEST_ARGS: --cov=daceml --cov-report=term --cov-report xml --cov-config=.coveragerc -m "not fpga and not xilinx and not gpu and not onnx" --timeout=500 --skip-ort
+         PYTEST_ARGS: --cov=daceml --cov-report=term --cov-report xml --cov-config=.coveragerc -m "not fpga and not xilinx and not gpu and not onnx and not mpi" --timeout=500 --skip-ort
        run: make test

      - name: Upload coverage
38 changes: 38 additions & 0 deletions .github/workflows/dist-ci.yml
@@ -0,0 +1,38 @@
name: Distributed CI

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  test-distributed:
    if: ${{ !contains(github.event.pull_request.labels.*.name, 'no-ci') }}
    runs-on: [self-hosted, linux, gpu]

    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: 0
          submodules: 'recursive'

      - name: Install dependencies
        env:
          UPDATE_PIP: 'true'
        run: |
          rm -rf .dacecache tests/.dacecache
          . /opt/setupenv
          make clean install
          venv/bin/pip install mpi4py
      - name: Run Distributed Tests
        env:
          PYTEST: venv/bin/coverage run --parallel-mode --source=daceml -m pytest
          PYTEST_ARGS: -s
          PYTEST_PLUGINS: tests.distributed.mpi_mute
          MPI_PREFIX: mpirun -np 8 --oversubscribe
        run: make test-distributed

      - name: Upload coverage
        run: make codecov
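
For context, a minimal sketch (not part of this commit) of the kind of test this workflow drives: a pytest test marked mpi that runs under mpirun -np 8 via make test-distributed. The marker name is inferred from the "not mpi" filters added to the other workflows; the test body is illustrative only and assumes only standard mpi4py APIs.

import pytest
from mpi4py import MPI


@pytest.mark.mpi
def test_allreduce_matches_closed_form():
    # Each rank contributes its own rank id; the allreduce result must equal
    # the closed-form sum 0 + 1 + ... + (size - 1) on every rank.
    comm = MPI.COMM_WORLD
    total = comm.allreduce(comm.Get_rank(), op=MPI.SUM)
    assert total == comm.Get_size() * (comm.Get_size() - 1) // 2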
2 changes: 1 addition & 1 deletion .github/workflows/gpu-ci.yml
@@ -29,7 +29,7 @@ jobs:
      - name: Test with pytest
        env:
-         PYTEST_ARGS: --cov=daceml --cov-report=term --cov-report xml --cov-config=.coveragerc --gpu-only -m "not slow and not fpga and not xilinx and not onnx" --timeout=500
+         PYTEST_ARGS: --cov=daceml --cov-report=term --cov-report xml --cov-config=.coveragerc --gpu-only -m "not slow and not fpga and not xilinx and not onnx and not mpi" --timeout=500
        run: make test

      - name: Upload coverage
19 changes: 12 additions & 7 deletions Makefile
@@ -2,6 +2,7 @@ VENV_PATH ?= venv
PYTHON ?= python
PYTHON_BINARY ?= python
PYTEST ?= pytest
+MPI_PREFIX ?= mpirun
PIP ?= pip
YAPF ?= yapf

@@ -17,13 +18,6 @@ ACTIVATE = . $(VENV_PATH)/bin/activate &&
endif

.PHONY: clean doc doctest test test-gpu codecov check-formatting check-formatting-names clean-dacecaches yapf
-clean:
-	! test -d $(VENV_PATH) || rm -r $(VENV_PATH)
-
-venv:
-ifneq ($(VENV_PATH),)
-	test -d $(VENV_PATH) || echo "Creating new venv" && $(PYTHON) -m venv ./$(VENV_PATH)
-endif

install: venv
ifneq ($(VENV_PATH),)
@@ -35,6 +29,14 @@ endif
	$(ACTIVATE) $(PIP) install $(TORCH_VERSION)
	$(ACTIVATE) $(PIP) install -e .[testing,docs]

+clean:
+	! test -d $(VENV_PATH) || rm -r $(VENV_PATH)
+
+venv:
+ifneq ($(VENV_PATH),)
+	test -d $(VENV_PATH) || echo "Creating new venv" && $(PYTHON) -m venv ./$(VENV_PATH)
+endif

doc:
# suppress warnings in ONNXOps docstrings using grep -v
	$(ACTIVATE) cd doc && make clean html 2>&1 \
@@ -62,6 +64,9 @@ test-intel-fpga:
test-xilinx:
	$(ACTIVATE) $(PYTEST) $(PYTEST_ARGS) tests/torch/fpga/

+test-distributed:
+	$(ACTIVATE) $(MPI_PREFIX) $(PYTEST) $(PYTEST_ARGS) tests/distributed

codecov:
	curl -s https://codecov.io/bash | bash

1 change: 1 addition & 0 deletions daceml/distributed/__init__.py
@@ -0,0 +1 @@
from . import schedule
125 changes: 125 additions & 0 deletions daceml/distributed/communication/node.py
@@ -0,0 +1,125 @@
import dace
import dace.library
from dace import nodes, properties, SDFG, SDFGState, subsets

from . import subarrays

from daceml.util import utils

#: a placeholder variable that is used for all fully replicated ranks
FULLY_REPLICATED_RANK = 'FULLY_REPLICATED_RANK'


@dace.library.node
class DistributedMemlet(nodes.LibraryNode):
    """
    Communication node that distributes input to output based on symbolic
    expressions for the rank-local subsets.
    These expressions (``src_subset`` and ``dst_subset``) may include the
    symbolic variables given in ``src_rank_variables`` and
    ``dst_rank_variables``.
    """

    # Global properties
    implementations = {
        "subarrays": subarrays.CommunicateSubArrays,
    }
    default_implementation = "subarrays"

    src_rank_variables = properties.ListProperty(
        element_type=str,
        desc="List of variables used in the indexing expressions that represent"
        " rank identifiers in the source expression")
    src_pgrid = properties.Property(dtype=str, allow_none=True)
    src_subset = properties.RangeProperty(
        default=subsets.Range([]),
        desc="Subset of the input array that is held on each rank")
    src_global_array = properties.DataProperty()

    dst_rank_variables = properties.ListProperty(
        element_type=str,
        desc="List of variables used in the indexing expressions that represent"
        " rank identifiers in the destination expression")
    dst_pgrid = properties.Property(dtype=str, allow_none=True)
    dst_subset = properties.RangeProperty(
        default=subsets.Range([]),
        desc="Subset of the output array that is held on each rank")
    dst_global_array = properties.DataProperty()

    def __init__(self, name, src_rank_variables, src_pgrid, src_subset,
                 src_global_array, dst_rank_variables, dst_pgrid, dst_subset,
                 dst_global_array):
        super().__init__(name)
        self.src_rank_variables = src_rank_variables
        self.src_pgrid = src_pgrid
        self.src_subset = src_subset
        self.src_global_array = src_global_array
        self.dst_rank_variables = dst_rank_variables
        self.dst_pgrid = dst_pgrid
        self.dst_subset = dst_subset
        self.dst_global_array = dst_global_array

    def validate(self, sdfg: SDFG, state: SDFGState):
        if self.src_global_array not in sdfg.arrays:
            raise ValueError(
                f"{self.src_global_array} is not an array in the SDFG")
        if self.dst_global_array not in sdfg.arrays:
            raise ValueError(
                f"{self.dst_global_array} is not an array in the SDFG")

        src_free_vars = self.src_subset.free_symbols
        dst_free_vars = self.dst_subset.free_symbols

        if self.src_pgrid is None and self.dst_pgrid is None:
            raise ValueError("At least one process grid must be specified")

        if src_free_vars.difference(self.src_rank_variables):
            raise ValueError(
                "Source subset has free variables that are not rank variables")
        if dst_free_vars.difference(self.dst_rank_variables):
            raise ValueError(
                "Destination subset has free variables that are not rank"
                " variables")

        if FULLY_REPLICATED_RANK in src_free_vars or FULLY_REPLICATED_RANK in dst_free_vars:
            raise RuntimeError(
                "Fully replicated rank appeared in free variables, this should"
                " not happen")

        if state.out_degree(self) != 1:
            raise ValueError(
                "DistributedMemlet node must have exactly one output edge")
        if state.in_degree(self) != 1:
            raise ValueError(
                "DistributedMemlet node must have exactly one input edge")
        out_buffer = sdfg.arrays[state.out_edges(self)[0].data.data]
        inp_buffer = sdfg.arrays[state.in_edges(self)[0].data.data]
        if inp_buffer.dtype != out_buffer.dtype:
            raise ValueError(
                "Input and output buffers must have the same data type")

        # Check that the subset sizes match the local buffer shapes
        if not utils.all_equal(self.src_subset.size_exact(), inp_buffer.shape):
            raise ValueError(
                f"Source subset size {self.src_subset.size_exact()} does not"
                f" match input buffer size {inp_buffer.shape}")
        if not utils.all_equal(self.dst_subset.size_exact(), out_buffer.shape):
            raise ValueError(
                f"Destination subset size {self.dst_subset.size_exact()} does"
                f" not match output buffer size {out_buffer.shape}")

        # Check that referenced process grids exist in the SDFG
        if self.src_pgrid and self.src_pgrid not in sdfg.process_grids:
            raise ValueError("Source process grid not found")
        if self.dst_pgrid and self.dst_pgrid not in sdfg.process_grids:
            raise ValueError("Destination process grid not found")

        return inp_buffer, out_buffer

    def __str__(self):
        return (f"{self.src_global_array}[{self.src_subset}] ->"
                f" {self.dst_global_array}[{self.dst_subset}]")