tensor node: n-D tensor of sets -> (n+1)-D tensor
RobinGeens committed Aug 16, 2024
1 parent ea00fe8 commit bb3a870
Showing 27 changed files with 415 additions and 329 deletions.
22 changes: 8 additions & 14 deletions stream/classes/cost_model/memory_manager.py
@@ -28,7 +28,7 @@ def __init__(self, accelerator: "Accelerator") -> None:
self.top_instances: dict[Core, list[MemoryInstance]] = {}
# memory operand stored by every top level memory
self.memory_operands: dict[Core, list[list[MemoryOperand]]] = {}
for _, core in sorted([(core.id, core) for core in self.accelerator.core_iterator]):
for _, core in sorted([(core.id, core) for core in self.accelerator.core_list]):
top_levels: list[MemoryLevel] = list(
(level for level, out_degree in core.memory_hierarchy.out_degree() if out_degree == 0)
)
@@ -101,13 +101,10 @@ def find_tensor_in_top_instances(self, tensor: Tensor):
return instances_storing_tensor, available_since_timesteps

def find_tensor(self, tensor: Tensor):
(
instances_storing_tensor,
available_since_timesteps,
) = self.find_tensor_in_top_instances(tensor)
cores_storing_tensor = []
top_instance_idxs = []
available_since = []
instances_storing_tensor, available_since_timesteps = self.find_tensor_in_top_instances(tensor)
cores_storing_tensor: list[int] = []
top_instance_idxs: list[int] = []
available_since: list[int] = []
# Find which cores have these instances as their top instance
for core, top_instances in self.top_instances.items():
for top_instance_idx, top_instance in enumerate(top_instances):
@@ -297,12 +294,9 @@ def remove_tensor_from_top_instance(
)
)
except StopIteration:
# raise ValueError(
# f"No tensor found equal to {tensor} in top instance {top_instance}."
# )
# If the tensor is not present, we don't have to remove it.
# This is possible because in "Accelerator.transfer_tensor_to_core(...)"
# it removes a tensor on a sender core if detects it's no longer needed there.
# If the tensor is not present, we don't have to remove it.
# This is possible because in `Accelerator.transfer_tensor_to_core(...)`
# it removes a tensor on a sender core if it detects it's no longer needed there.
return
self.top_instance_stored_tensors[top_instance].remove(equivalent_tensor)
del self.top_instance_available_since_timestep[top_instance][tensor.equality_hash()]
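Not part of the commit: a runnable toy version of the cross-referencing loop that find_tensor uses above, with plain strings standing in for Core and MemoryInstance objects (all names and numbers below are made up for illustration).

# Toy stand-ins for the MemoryManager state (hypothetical values).
top_instances = {"core0": ["L2_weights", "L2_acts"], "core1": ["L2_weights"]}  # per-core top-level memories
instances_storing_tensor = {"L2_weights"}          # instances found by find_tensor_in_top_instances
available_since_timesteps = {"L2_weights": 40}     # timestep at which the tensor became available
cores_storing_tensor: list[str] = []
top_instance_idxs: list[int] = []
available_since: list[int] = []
# Find which cores have these instances as their top instance (mirrors the loop above).
for core, instances in top_instances.items():
    for top_instance_idx, top_instance in enumerate(instances):
        if top_instance in instances_storing_tensor:
            cores_storing_tensor.append(core)
            top_instance_idxs.append(top_instance_idx)
            available_since.append(available_since_timesteps[top_instance])
print(cores_storing_tensor, top_instance_idxs, available_since)  # ['core0', 'core1'] [0, 0] [40, 40]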
4 changes: 2 additions & 2 deletions stream/classes/cost_model/scheduler.py
@@ -103,7 +103,7 @@ def get_tensors_needed_for_node(node: ComputationNode, G: ComputationNodeWorkloa
Args:
node (ComputationNode): The node to be computed.
G (DiGraph): The graph of all nodes.
G : The graph of all nodes.
Returns:
tuple: A tuple of tensors and a tuple of memory operands for the node.
@@ -231,7 +231,7 @@ def schedule_graph(
Each node should have a core_allocation and runtime set.
Args:
G (DiGraph): Graph containing the nodes to be scheduled.
G : Graph containing the nodes to be scheduled.
accelerator (Accelerator): The accelerator to schedule the nodes on.
cores_start_offset (dict, optional): A dict containing for each core_id its start offset. Defaults to None.
operands_to_prefetch (list, optional): The layer operands that should be prefetched at the start of the
20 changes: 4 additions & 16 deletions stream/classes/hardware/architecture/accelerator.py
@@ -1,7 +1,5 @@
from math import ceil

import networkx as nx
from networkx import DiGraph
from zigzag.datatypes import MemoryOperand
from zigzag.hardware.architecture.Core import Core
from zigzag.hardware.architecture.MemoryInstance import MemoryInstance
@@ -11,18 +9,12 @@
from stream.classes.cost_model.memory_manager import MemoryManager
from stream.classes.workload.computation_node import ComputationNode
from stream.classes.workload.tensor import Tensor
from stream.utils import DiGraphWrapper


class CoreGraph(DiGraph):
class CoreGraph(DiGraphWrapper[Core]):
"""Represents the core structure of an accelerator"""

@property
def node_list(self) -> list[Core]:
return list(self.nodes()) # type: ignore

def shortest_path(self, producer: Core, consumer: Core) -> list[Core]:
return nx.shortest_path(self, producer, consumer) # type: ignore


class Accelerator:
"""
@@ -65,13 +57,9 @@ def get_core(self, core_id: int) -> Core:
raise ValueError(f"Requested core with id {core_id} is not present in accelerator.")
return core

# @property
# def core_iterator(self) -> Iterator[Core]:
# return self.cores.nodes() # type: ignore

@property
def core_list(self):
return self.cores.node_list
def core_list(self) -> list[Core]:
return list(self.cores.node_list)

def spawn(
self,
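Not part of the commit: CoreGraph now inherits node_list and shortest_path from DiGraphWrapper[Core] in stream.utils, which is not shown in this diff. A minimal sketch of what such a wrapper could look like, assuming it is a thin typed layer over networkx.DiGraph (the actual implementation may differ):

from typing import Generic, TypeVar
import networkx as nx

T = TypeVar("T")

class DiGraphWrapper(nx.DiGraph, Generic[T]):
    """Typed convenience wrapper around networkx.DiGraph (sketch, not the real class)."""

    @property
    def node_list(self) -> list[T]:
        # Materialize the node view into a plain, typed list.
        return list(self.nodes())

    def shortest_path(self, producer: T, consumer: T) -> list[T]:
        # Delegate to networkx with typed endpoints.
        return nx.shortest_path(self, producer, consumer)  # type: ignore

# Subclasses pin the node type, e.g.: class CoreGraph(DiGraphWrapper[Core]): ...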
6 changes: 2 additions & 4 deletions stream/classes/hardware/architecture/noc/bus.py
@@ -1,7 +1,7 @@
from networkx import DiGraph
from zigzag.datatypes import Constants
from zigzag.hardware.architecture.Core import Core

from stream.classes.hardware.architecture.accelerator import CoreGraph
from stream.classes.hardware.architecture.noc.communication_link import CommunicationLink


@@ -147,6 +147,4 @@ def get_bus(
edges.append((offchip_core, simd_core, {"cl": from_offchip_link}))

# Build the graph using the constructed list of edges
H = DiGraph(edges)

return H
return CoreGraph(edges)
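Not part of the commit: get_bus now returns a CoreGraph built directly from the edge list. Assuming CoreGraph accepts the same (u, v, attr_dict) edge data as networkx.DiGraph, the construction behaves like this sketch, where strings stand in for Core objects and CommunicationLinks:

import networkx as nx

edges = [
    ("core0", "offchip", {"cl": "to_offchip_link"}),    # stand-ins for (Core, Core, {"cl": CommunicationLink})
    ("offchip", "core0", {"cl": "from_offchip_link"}),
]
g = nx.DiGraph(edges)               # CoreGraph(edges) builds the same adjacency and edge attributes
print(g["core0"]["offchip"]["cl"])  # -> 'to_offchip_link'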
stream/classes/hardware/architecture/noc/communication_link.py
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal

import numpy as np

@@ -14,7 +14,12 @@ class CommunicationLink:
"""Represents a fixed-bandwidth communication link used to communicate between two cores."""

def __init__(
self, sender: "Core", receiver: "Core", bandwidth: int, unit_energy_cost: float, bidirectional: bool = False
self,
sender: "Core | Literal['Any']",
receiver: "Core | Literal['Any']",
bandwidth: int | float,
unit_energy_cost: float,
bidirectional: bool = False,
) -> None:
self.sender = sender
self.receiver = receiver
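Not part of the commit: the new Core | Literal['Any'] annotation documents that a link endpoint can be the string sentinel "Any", used by the off-chip links in bus.py and mesh_2d.py to mean "shared by all cores on this side". A standalone sketch of the pattern (FakeCore is a hypothetical stand-in for zigzag's Core):

from typing import Literal

class FakeCore:  # hypothetical stand-in for zigzag.hardware.architecture.Core
    def __init__(self, core_id: int) -> None:
        self.id = core_id

def describe_endpoint(endpoint: "FakeCore | Literal['Any']") -> str:
    # The 'Any' sentinel marks a link endpoint that is not tied to one specific core.
    return "any core" if endpoint == "Any" else f"core {endpoint.id}"

print(describe_endpoint("Any"))        # off-chip links use the 'Any' sentinel
print(describe_endpoint(FakeCore(3)))  # point-to-point links name a concrete core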
17 changes: 5 additions & 12 deletions stream/classes/hardware/architecture/noc/mesh_2d.py
@@ -1,12 +1,12 @@
import numpy as np
from networkx import DiGraph
from zigzag.datatypes import Constants
from zigzag.hardware.architecture.Core import Core

from stream.classes.hardware.architecture.accelerator import CoreGraph
from stream.classes.hardware.architecture.noc.communication_link import CommunicationLink


def have_shared_memory(a, b):
def have_shared_memory(a: Core, b: Core):
"""Returns True if core a and core b have a shared top level memory
Args:
@@ -59,7 +59,7 @@ def get_2d_mesh(

cores_array = np.asarray(cores).reshape((nb_rows, nb_cols), order="F")

edges = []
edges: list[tuple[Core, Core, dict[str, CommunicationLink]]] = []
# Horizontal edges
for row in cores_array:
# From left to right
@@ -115,8 +115,6 @@

# If there is a pooling core, also add two edges from each core to the pooling core: one in each direction
if pooling_core:
if not isinstance(pooling_core, Core):
raise ValueError("The given pooling_core is not a Core object.")
for core in cores:
if not have_shared_memory(core, pooling_core):
edges.append(
@@ -139,8 +137,6 @@
simd_bandwidth = float("inf")
simd_unit_energy_cost = 0
if simd_core:
if not isinstance(simd_core, Core):
raise ValueError("The given simd_core is not a Core object.")
for core in cores:
if not have_shared_memory(core, simd_core):
edges.append(
@@ -207,8 +203,7 @@
else:
to_offchip_link = CommunicationLink("Any", offchip_core, offchip_write_bandwidth, unit_energy_cost)
from_offchip_link = CommunicationLink(offchip_core, "Any", offchip_read_bandwidth, unit_energy_cost)
if not isinstance(offchip_core, Core):
raise ValueError("The given offchip_core is not a Core object.")

for core in cores:
edges.append((core, offchip_core, {"cl": to_offchip_link}))
edges.append((offchip_core, core, {"cl": from_offchip_link}))
@@ -220,6 +215,4 @@
edges.append((offchip_core, simd_core, {"cl": from_offchip_link}))

# Build the graph using the constructed list of edges
H = DiGraph(edges)

return H
return CoreGraph(edges)
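Not part of the commit, but worth noting for the hunk above: get_2d_mesh reshapes the flat core list with order="F", i.e. column-major, so consecutive cores fill a column before moving to the next one. A small check with string stand-ins:

import numpy as np

cores = ["c0", "c1", "c2", "c3", "c4", "c5"]  # stand-ins for Core objects
print(np.asarray(cores).reshape((2, 3), order="F"))
# [['c0' 'c2' 'c4']
#  ['c1' 'c3' 'c5']]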
Original file line number Diff line number Diff line change
@@ -81,7 +81,7 @@ def set_node_core_allocations(self, core_allocations: list[int]):
# Find all nodes of this coarse id and set their core_allocation, energy and runtime
nodes = (
node
for node in self.workload.nodes()
for node in self.workload.node_list
if isinstance(node, ComputationNode) and node.id == layer_id and node.group == group_id
)
for node in nodes:
Expand Down
75 changes: 36 additions & 39 deletions stream/classes/opt/scheduling/layer_stacks.py
@@ -1,13 +1,11 @@
from enum import Enum

import networkx as nx
from networkx import DiGraph
from zigzag.workload.ONNXWorkload import ONNXWorkload as Workload
from zigzag.datatypes import Constants

from stream.classes.hardware.architecture.accelerator import Accelerator
from stream.classes.workload.computation_node import ComputationNode

CONSTANT_MEMORY_OPERAND = "I2"
from stream.classes.workload.onnx_workload import ComputationNodeWorkload


class LayerStackMode(Enum):
@@ -16,48 +14,50 @@ class LayerStackMode(Enum):
MANUAL = 2


def fits(occupations, core_capacities, occupation_factor):
def fits(occupations: dict[int, int], core_capacities: dict[int, int], occupation_factor: float):
return not any([occupations[core_id] >= core_capacities[core_id] * occupation_factor for core_id in occupations])


def update_occupations(workload: Workload, occupations, layer_id: int, group_ids: list[int]):
def update_occupations(
workload: ComputationNodeWorkload, occupations: dict[int, int], layer_id: int, group_ids: list[int]
):
for group_id in group_ids:
# Find a node that has this layer and group id and extract its constant op size
node = next(n for n in workload.nodes() if n.id == layer_id and n.group == group_id)
node = next(n for n in workload.node_list if n.id == layer_id and n.group == group_id)
constant_operands = node.constant_operands
if not constant_operands:
continue
# Assume last constant operand is correct one for stack calculation
constant_operand = constant_operands[-1]
# Assert that the memory operand matches the assumed one for capacities
memory_operand = node.memory_operand_links[constant_operand]
assert memory_operand == CONSTANT_MEMORY_OPERAND
assert memory_operand == Constants.MEM_OP_2
# Get the size of the constant operand and add it to the current stack
size = node.operand_size_bit[constant_operand]
allocation = node.chosen_core_allocation
assert allocation is not None
occupations[allocation] += size


def get_layer_stacks_standard(workload: DiGraph):
def get_layer_stacks_standard(workload: ComputationNodeWorkload):
"""Return all layer ids in a single stack.
Args:
workload (DiGraph): The workload.
"""
layer_ids = sorted(set(node.id for node in workload.nodes()))
layer_ids = sorted(set(node.id for node in workload.node_list))
return [layer_ids]


def get_layer_stacks_occupation_based(
workload: DiGraph, # cn-wise workload
original_workload: DiGraph, # layer-wise workload
workload: ComputationNodeWorkload, # cn-wise workload
original_workload: ComputationNodeWorkload, # layer-wise workload
accelerator: Accelerator,
occupation_factor: int,
):
# Get all layer id, group combinations in the workload
layer_groups = {}
for n in workload.nodes():
layer_groups: dict[int, set[int]] = {}
for n in workload.node_list:
if not isinstance(n, ComputationNode):
continue
layer_id = n.id
@@ -67,20 +67,19 @@ def get_layer_stacks_occupation_based(
else:
layer_groups[layer_id] = {group_id}
# Active cores given the allocations of the workload
active_core_ids = sorted(set(n.core_allocation for n in workload.nodes()))
core_capacities = {
core_id: accelerator.get_core(core_id).get_memory_size_dict()[CONSTANT_MEMORY_OPERAND][-1]
active_core_ids = sorted(set(n.core_allocation for n in workload.node_list))
core_capacities: dict[int, int] = {
core_id: accelerator.get_core(core_id).get_memory_size_dict()[Constants.MEM_OP_2][-1]
for core_id in active_core_ids
}
# Store all stacks
all_stacks = []
# Store the ids in the current stack
current_stack = []

all_stacks: list[list[int]] = []
current_stack: list[int] = []
# Track the constant operand occupation in all cores for the current stack
occupations = {core_id: 0 for core_id in active_core_ids}
# Compute the layer cutoffs based on the topological generations
# and the constant operands size
for i, generation in enumerate(nx.topological_generations(original_workload)):
for generation in nx.topological_generations(original_workload):
for original_node in generation:
if not isinstance(original_node, ComputationNode):
continue
@@ -115,25 +114,23 @@


def get_layer_stacks(
workload: DiGraph, # cn-wise workload
original_workload: DiGraph, # layer-wise workload
workload: ComputationNodeWorkload, # cn-wise workload
original_workload: ComputationNodeWorkload, # layer-wise workload
accelerator: Accelerator,
occupation_factor: int,
mode: LayerStackMode,
layer_stacks,
):
if mode == LayerStackMode.STANDARD:
layer_stacks = get_layer_stacks_standard(workload)
elif mode == LayerStackMode.OCCUPATION_BASED:
layer_stacks = get_layer_stacks_occupation_based(
workload,
original_workload,
accelerator,
occupation_factor,
)
elif mode == LayerStackMode.MANUAL:
assert layer_stacks
layer_stacks = layer_stacks
else:
raise ValueError(f"Invalid layer stack calculation mode: {mode}.")
return layer_stacks
match mode:
case LayerStackMode.STANDARD:
return get_layer_stacks_standard(workload)
case LayerStackMode.OCCUPATION_BASED:
return get_layer_stacks_occupation_based(
workload,
original_workload,
accelerator,
occupation_factor,
)
case LayerStackMode.MANUAL:
assert layer_stacks
return layer_stacks
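Not part of the commit: a toy check of the fits() predicate defined at the top of this file, restated here so the snippet runs standalone; the occupation and capacity numbers are invented and expressed in bits, matching operand_size_bit above.

def fits(occupations: dict[int, int], core_capacities: dict[int, int], occupation_factor: float) -> bool:
    # A stack still "fits" as long as no core's constant-operand occupation reaches
    # its scaled top-level memory capacity (same logic as the function above).
    return not any(occupations[c] >= core_capacities[c] * occupation_factor for c in occupations)

occupations = {0: 1_000, 1: 4_000}      # constant-operand bits already placed per core id
core_capacities = {0: 8_000, 1: 8_000}  # top-level constant-memory capacity per core id
print(fits(occupations, core_capacities, 1.0))  # True: both cores stay below full capacity
print(fits(occupations, core_capacities, 0.5))  # False: core 1 reaches 8_000 * 0.5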
2 changes: 1 addition & 1 deletion stream/classes/stages/DetermineHintLoopsStage.py
@@ -65,7 +65,7 @@ def get_hint_loops_fused(self):
return hint_loops

def get_nb_computation_nodes(self, stack):
nodes = [n for n in self.workload.nodes() if n.id in stack]
nodes = [n for n in self.workload.node_list if n.id in stack]
nb_computation_nodes = sum([isinstance(n, ComputationNode) for n in nodes])
return nb_computation_nodes
