
Commit: more typehint fixes
RobinGeens committed Aug 13, 2024
1 parent df27aea commit ea00fe8
Showing 9 changed files with 147 additions and 97 deletions.
30 changes: 16 additions & 14 deletions stream/classes/cost_model/communication_manager.py
@@ -2,7 +2,6 @@
 from math import ceil
 from typing import TYPE_CHECKING, Any
 
-import networkx as nx
 from zigzag.datatypes import Constants, MemoryOperand
 from zigzag.hardware.architecture.Core import Core
@@ -12,6 +11,7 @@
 
 if TYPE_CHECKING:
     from stream.classes.hardware.architecture.accelerator import Accelerator
+    from stream.classes.hardware.architecture.noc.communication_link import CommunicationLink
 
 
 class CommunicationEvent:
@@ -50,7 +50,9 @@ class CommunicationLinkEvent:
     * the percentage of the link bandwidth used
     """
 
-    def __init__(self, type, start, end, tensors, energy, activity=100) -> None:
+    def __init__(
+        self, type: str, start: int, end: int, tensors: list[Tensor], energy: float, activity: float = 100
+    ) -> None:
         self.type = type
         self.start = start
         self.end = end
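For intuition, here is a minimal stand-in for such an event (not the repository's class) showing what the now-typed fields carry; `activity` is the percentage of the link bandwidth the transfer occupies:

from dataclasses import dataclass

@dataclass
class LinkEvent:  # hypothetical stand-in for CommunicationLinkEvent
    type: str       # event kind, e.g. "transfer"
    start: int      # start timestep (cycles)
    end: int        # end timestep (cycles)
    tensors: list   # list[Tensor] in the real code
    energy: float
    activity: float = 100  # percentage of the link bandwidth used

event = LinkEvent(type="transfer", start=100, end=160, tensors=["A"], energy=12.5)
print(event.end - event.start)  # duration in cycles: 60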
@@ -89,12 +91,10 @@ def __init__(self, accelerator: "Accelerator") -> None:
 
     def get_shortest_paths(self):
         # For each core pair save a shortest path
-        shortest_paths: dict[tuple[Core, Core], Any] = {}
-        for producer_core, consumer_core in itertools.product(
-            self.accelerator.cores.nodes(), self.accelerator.cores.nodes()
-        ):
-            shortest_paths[(producer_core, consumer_core)] = nx.shortest_path(
-                self.accelerator.cores, producer_core, consumer_core
+        shortest_paths: dict[tuple[Core, Core], list[Core]] = {}
+        for producer_core, consumer_core in itertools.product(self.accelerator.core_list, self.accelerator.core_list):
+            shortest_paths[(producer_core, consumer_core)] = self.accelerator.cores.shortest_path(
+                producer_core, consumer_core
             )
         return shortest_paths
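For context, the pattern this method implements is plain all-pairs shortest paths over the core graph; a self-contained sketch with integers standing in for `Core` objects (the real code now routes the call through the `CoreGraph.shortest_path` wrapper introduced in accelerator.py below):

import itertools

import networkx as nx

# Hypothetical 2x2 mesh of cores, with ints standing in for Core objects.
cores = nx.DiGraph([(0, 1), (1, 0), (0, 2), (2, 0), (1, 3), (3, 1), (2, 3), (3, 2)])

# Save one shortest path per (producer, consumer) pair, as in get_shortest_paths.
shortest_paths: dict[tuple[int, int], list[int]] = {}
for producer, consumer in itertools.product(cores.nodes(), cores.nodes()):
    shortest_paths[(producer, consumer)] = nx.shortest_path(cores, producer, consumer)

print(shortest_paths[(0, 3)])  # e.g. [0, 1, 3]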

@@ -137,7 +137,7 @@ def update_links(
         tensor: Tensor,
         sender: Core | int,
         receiver: Core | int,
-        receiver_memory_operand: str,
+        receiver_memory_operand: MemoryOperand,
         start_timestep: int,
         duration: int,
     ) -> tuple[int, int, float, float]:
@@ -163,7 +163,7 @@ def update_links(
         receiver = self.accelerator.get_core(receiver)
         links = self.get_links_for_pair(sender, receiver)
         if not links:  # When sender == receiver
-            return 0, 0
+            return 0, 0, 0, 0
 
         cles = [
             CommunicationLinkEvent(
@@ -214,7 +214,7 @@ def block_offchip_links(
             duration (int): The duration of the blocking in cycles.
             cn (ComputationNode): The computational node for which we are blocking the links.
         """
-        links_to_block = dict()
+        links_to_block: dict["CommunicationLink", int] = {}
         core = self.accelerator.get_core(core_id)
         offchip_core = self.accelerator.get_core(self.accelerator.offchip_core_id)
         if Constants.OUTPUT_MEM_OP in too_large_operands:
@@ -230,7 +230,7 @@ def block_offchip_links(
         if not too_large_operands:
             return start_timestep
         # Get the tensors for which we are blocking based on the operands
-        tensors = []
+        tensors: list[Tensor] = []
         for mem_op in too_large_operands:
             layer_op = cn.memory_operand_links.mem_to_layer_op(mem_op)
             tensors.append(cn.operand_tensors[layer_op])
@@ -242,7 +242,9 @@
             link.block(block_start, duration, tensors, activity=req_bw)
         return block_start
 
-    def get_links_idle_window(self, links: dict, best_case_start: int, duration: int, tensors: list[Tensor]) -> int:
+    def get_links_idle_window(
+        self, links: dict["CommunicationLink", int], best_case_start: int, duration: int, tensors: list[Tensor]
+    ) -> int:
         """Return the timestep at which tensor can be transfered across the links.
         Both links must have an idle window large enough for the transfer.
         The timestep must be greater than or equal to best_case_start.
@@ -254,7 +256,7 @@ def get_links_idle_window(
             tensors (list): The tensors to be transferred. Used to broadcast from previous transfer.
         """
         assert len(links) > 0
-        idle_intersections = []
+        idle_intersections: list[tuple[int, int]] = []
         for i, (link, req_bw) in enumerate(links.items()):
             req_bw = min(req_bw, link.bandwidth)  # ceil the bw
             windows = link.get_idle_window(req_bw, duration, best_case_start, tensors)
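The docstring's contract (an idle window on every link, no earlier than `best_case_start`) amounts to intersecting per-link idle windows; a rough self-contained sketch of that idea, not the repository's implementation:

def intersect(a: list[tuple[int, int]], b: list[tuple[int, int]]) -> list[tuple[int, int]]:
    # Pairwise intersection of two lists of (start, end) idle windows.
    return [
        (max(s1, s2), min(e1, e2))
        for s1, e1 in a
        for s2, e2 in b
        if max(s1, s2) < min(e1, e2)
    ]

def earliest_fit(per_link_windows: list[list[tuple[int, int]]], best_case_start: int, duration: int) -> int:
    # Earliest timestep >= best_case_start at which ALL links are idle for `duration` cycles.
    common = per_link_windows[0]
    for windows in per_link_windows[1:]:
        common = intersect(common, windows)
    for start, end in sorted(common):
        candidate = max(start, best_case_start)
        if end - candidate >= duration:
            return candidate
    raise ValueError("no common idle window large enough")

# Two links, each with its own idle windows (in cycles):
print(earliest_fit([[(0, 10), (20, 50)], [(5, 30)]], best_case_start=0, duration=8))  # -> 20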
2 changes: 1 addition & 1 deletion stream/classes/cost_model/cost_model.py
@@ -16,7 +16,7 @@ def __init__(
         workload: ComputationNodeWorkload,
         accelerator: Accelerator,
         operands_to_prefetch: list[str],
-        scheduling_order: list[int],
+        scheduling_order: list[tuple[int, int]],
     ) -> None:
         # Initialize the SCME by setting the workload graph to be scheduled
         self.workload = workload
41 changes: 22 additions & 19 deletions stream/classes/cost_model/scheduler.py
@@ -1,25 +1,27 @@
 import logging
 from operator import itemgetter
+from typing import TYPE_CHECKING
 
-from networkx import DiGraph
 from zigzag.datatypes import Constants, LayerOperand, MemoryOperand
 from zigzag.hardware.architecture.Core import Core
 
-from stream.classes.hardware.architecture.accelerator import Accelerator
 from stream.classes.workload.computation_node import ComputationNode
 from stream.classes.workload.onnx_workload import ComputationNodeWorkload
 from stream.classes.workload.tensor import Tensor
 
+if TYPE_CHECKING:
+    from stream.classes.hardware.architecture.accelerator import Accelerator
+
 logger = logging.getLogger(__name__)
 
 
-def initialize_priorities(workload: ComputationNodeWorkload, accelerator: Accelerator):
+def initialize_priorities(workload: ComputationNodeWorkload, accelerator: "Accelerator"):
     for n in workload.node_list:
         for tensor in n.operand_tensors.values():
             tensor.initialize_instance_priorities(workload, n, accelerator)
 
 
-def initialize_offchip_tensors(workload: ComputationNodeWorkload, accelerator: Accelerator):
+def initialize_offchip_tensors(workload: ComputationNodeWorkload, accelerator: "Accelerator"):
     offchip_core_id = accelerator.offchip_core_id
     assert offchip_core_id is not None, "No offchip core found for this accelerator"
     offchip_core = accelerator.get_core(offchip_core_id)
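This hunk shows the pattern used throughout the commit: the `Accelerator` import moves under `typing.TYPE_CHECKING`, so it is evaluated only by static type checkers and never at runtime (the usual way to break a circular import), and annotations then refer to the class by the string `"Accelerator"`. A minimal illustration of the pattern, with hypothetical module and method names:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Never executed at runtime, so an import cycle here is harmless.
    from accelerator import Accelerator  # hypothetical module

def schedule(accelerator: "Accelerator") -> None:
    # The quoted annotation is resolved lazily by the type checker,
    # so the name need not exist when this module is imported.
    accelerator.run()  # hypothetical method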
@@ -44,7 +46,7 @@ def initialize_offchip_tensors(
         )
 
 
-def prefetch_constant_operands(G: ComputationNodeWorkload, accelerator: Accelerator, operands_to_prefetch: list[str]):
+def prefetch_constant_operands(G: ComputationNodeWorkload, accelerator: "Accelerator", operands_to_prefetch: list[str]):
     operands_to_prefetch_converted = [LayerOperand(x) for x in operands_to_prefetch]
     total_cn_offchip_link_energy = 0
     total_cn_offchip_memory_energy = 0
@@ -78,11 +80,14 @@ def prefetch_constant_operands(
     )
 
 
-def get_best_candidate(candidates: list[ComputationNode], scheduling_order: list[int]) -> tuple[ComputationNode, int]:
+def get_best_candidate(
+    candidates: list[tuple[int, ComputationNode]], scheduling_order: list[tuple[int, int]]
+) -> tuple[ComputationNode, int]:
     # If this core doesn't have any candidates, continue to the next core
     if not candidates:
         raise ValueError("There are no candidates to schedule.")
     preds_ends, cn_candidates = zip(*candidates)
+    cn_candidates: list[ComputationNode]
     idxs = [scheduling_order.index((n.id, n.sub_id)) for n in cn_candidates]
     best_candidate_idx = idxs.index(min(idxs))
     best_candidate = cn_candidates[best_candidate_idx]
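With the new annotations, each candidate pairs the timestep its predecessors finished with the ready node, and `scheduling_order` lists `(node_id, sub_id)` pairs; the chosen node is simply the candidate appearing earliest in that order. A self-contained sketch with a hypothetical stand-in for `ComputationNode`:

from dataclasses import dataclass

@dataclass(frozen=True)
class Node:  # hypothetical stand-in for ComputationNode
    id: int
    sub_id: int

def pick_best(candidates: list[tuple[int, Node]], scheduling_order: list[tuple[int, int]]) -> tuple[Node, int]:
    if not candidates:
        raise ValueError("There are no candidates to schedule.")
    preds_ends, nodes = zip(*candidates)
    # Pick the candidate that comes first in the global scheduling order.
    idxs = [scheduling_order.index((n.id, n.sub_id)) for n in nodes]
    best = idxs.index(min(idxs))
    return nodes[best], preds_ends[best]

order = [(0, 0), (0, 1), (1, 0)]
ready = [(15, Node(1, 0)), (10, Node(0, 1))]
print(pick_best(ready, order))  # (Node(id=0, sub_id=1), 10)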
@@ -132,7 +137,7 @@ def get_tensors_needed_for_node(
 
 
 def clear_memories(
-    accelerator: Accelerator,
+    accelerator: "Accelerator",
     core: Core,
     memory_operands: list[MemoryOperand],
     timestep: int,
@@ -158,7 +163,7 @@ def clear_memories(
 def decrease_priority(
     tensors: list[Tensor],
     tensors_operands: list[MemoryOperand],
-    accelerator: Accelerator,
+    accelerator: "Accelerator",
     node: ComputationNode,
 ):
     for tensor_used_by_node, tensor_memory_operand in zip(tensors, tensors_operands):
@@ -171,18 +176,15 @@
 
 def check_for_removal(
     tensors: list[Tensor],
-    accelerator: Accelerator,
+    accelerator: "Accelerator",
     node: ComputationNode,
-    G: DiGraph,
+    G: ComputationNodeWorkload,
     timestep: int,
 ):
     offchip_core_id = accelerator.offchip_core_id
     for tensor_used_by_node in tensors:
         if tensor_used_by_node.get_total_priority() == 0:
-            (
-                instances_storing_tensor,
-                _,
-            ) = accelerator.memory_manager.find_tensor_in_top_instances(tensor_used_by_node)
+            instances_storing_tensor, _ = accelerator.memory_manager.find_tensor_in_top_instances(tensor_used_by_node)
             for instance_storing_tensor in instances_storing_tensor:
                 core_ids_of_instance = [
                     core.id for core in accelerator.memory_manager.cores_per_top_instance[instance_storing_tensor]
@@ -220,11 +222,11 @@ def check_for_removal(
 
 def schedule_graph(
     G: ComputationNodeWorkload,
-    accelerator: Accelerator,
+    accelerator: "Accelerator",
     cores_idle_from: dict[int, int] | None = None,
     operands_to_prefetch: list[str] = [],
-    scheduling_order=None,
-):
+    scheduling_order: list[tuple[int, int]] | None = None,
+) -> tuple[int, float, float, float, float, float, float, float, float, float]:
     """Schedule the nodes of graph G across the cores in the system.
     Each node should have a core_allocation and runtime set.
@@ -264,14 +266,15 @@ def schedule_graph(
     # Put the very first nodes of a layer that doesn't have any incoming edges as the first candidates
     for source_node in (n for n, d in G.in_degree() if d == 0):
         core_allocation = source_node.chosen_core_allocation
-        candidates.append((cores_idle_from[core_allocation], source_node))
+        candidates.append((cores_idle_from[core_allocation], source_node))  # type: ignore
 
     # Get all the nodes with no successors that produce final outputs, used for off-loading final outputs
     sink_layers = sorted(set(n.id for n, d in G.out_degree() if d == 0))
     sink_layer_nodes = set((n for n in G.node_list if (n.id in sink_layers) and n.produces_final_output))
 
     # Get the offchip core id and core
     offchip_core_id = accelerator.offchip_core_id
+    assert offchip_core_id is not None
     offchip_core = accelerator.get_core(offchip_core_id)
 
     # Schedule preparation:
@@ -433,7 +436,7 @@ def schedule_graph(
             # Only push back sink node outputs if they're generated and stored on the core
             if best_candidate.output_operand not in best_candidate.too_large_operands:
                 (
-                    current_timestep,
+                    _,
                     link_energy_cost,
                     memory_energy_cost,
                 ) = accelerator.remove(
61 changes: 38 additions & 23 deletions stream/classes/hardware/architecture/accelerator.py
@@ -1,6 +1,6 @@
 from math import ceil
-from typing import Iterator
 
+import networkx as nx
 from networkx import DiGraph
 from zigzag.datatypes import MemoryOperand
 from zigzag.hardware.architecture.Core import Core
@@ -9,9 +9,21 @@
 
 from stream.classes.cost_model.communication_manager import CommunicationManager
 from stream.classes.cost_model.memory_manager import MemoryManager
+from stream.classes.workload.computation_node import ComputationNode
 from stream.classes.workload.tensor import Tensor
 
 
+class CoreGraph(DiGraph):
+    """Represents the core structure of an accelerator"""
+
+    @property
+    def node_list(self) -> list[Core]:
+        return list(self.nodes())  # type: ignore
+
+    def shortest_path(self, producer: Core, consumer: Core) -> list[Core]:
+        return nx.shortest_path(self, producer, consumer)  # type: ignore
+
+
 class Accelerator:
     """
     The Accelerator class houses a set of Cores with an additional Global Buffer.
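A quick sketch of how the new `CoreGraph` wrapper behaves, with plain ints standing in for `Core` objects; because networkx is largely untyped, the `# type: ignore` comments stay confined to this one wrapper instead of being scattered over every call site:

import networkx as nx

class CoreGraph(nx.DiGraph):
    """Typed convenience wrapper over an untyped networkx DiGraph."""

    @property
    def node_list(self) -> list[int]:  # list[Core] in the real code
        return list(self.nodes())

    def shortest_path(self, producer: int, consumer: int) -> list[int]:
        return nx.shortest_path(self, producer, consumer)

g = CoreGraph([(0, 1), (1, 2), (0, 2)])
print(g.node_list)            # [0, 1, 2]
print(g.shortest_path(0, 2))  # [0, 2]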
@@ -22,7 +34,7 @@ class Accelerator:
     def __init__(
         self,
         name: str,
-        cores: DiGraph,
+        cores: CoreGraph,
        offchip_core_id: int | None = None,
    ):
        self.name = name
@@ -48,35 +60,35 @@ def get_core(self, core_id: int) -> Core:
         Return the core with id 'core_id'.
         Raises ValueError() when a core_id is not found in the available cores.
         """
-        core = next((core for core in self.core_iterator if core.id == core_id), None)
+        core = next((core for core in self.core_list if core.id == core_id), None)
         if core is None:
             raise ValueError(f"Requested core with id {core_id} is not present in accelerator.")
         return core
 
-    @property
-    def core_iterator(self) -> Iterator[Core]:
-        return self.cores.nodes()  # type: ignore
+    # @property
+    # def core_iterator(self) -> Iterator[Core]:
+    #     return self.cores.nodes()  # type: ignore
 
     @property
-    def core_list(self) -> list[Core]:
-        return list(self.cores.nodes())  # type: ignore
+    def core_list(self):
+        return self.cores.node_list
 
     def spawn(
         self,
         tensor: Tensor,
         core: Core,
-        memory_op: str,
+        memory_op: MemoryOperand,
         initial_timestep: int,
         available_timestep: int,
     ):
         """Spawns a tensor on a core.
         Args:
-            tensor (Tensor): The tensor to be spawned.
-            core (Core): The core on which to spawn the tensor.
-            memory_op (str): The memory operand on the core where the tensor will spawn.
-            initial_timestep (int): The timestep at which space will be reserved for the tensor.
-            available_timestep (int): The timestep at which the tensor will become available. Different from
+            tensor: The tensor to be spawned.
+            core: The core on which to spawn the tensor.
+            memory_op: The memory operand on the core where the tensor will spawn.
+            initial_timestep: The timestep at which space will be reserved for the tensor.
+            available_timestep: The timestep at which the tensor will become available. Different from
                 initial_timestep when it is transferred.
         """
         self.memory_manager.add_tensor_to_core(tensor, core, initial_timestep, available_timestep, memory_op)
@@ -93,6 +105,7 @@ def remove(
             timestep (int): The timestep to remove the tensor at.
             write_back_to_offchip (bool, optional): Write the tensor to offchip before removal. Defaults to False.
         """
+        assert self.offchip_core_id is not None
         ################################# STEP 1 #################################
         # Transfer the tensor to off-chip if required and not present there
         link_energy_cost = 0
@@ -234,7 +247,7 @@ def transfer_tensor_to_core(
         tensor_operand: MemoryOperand,
         non_evictable_tensors: list[Tensor],
         sending_core_id: int | None = None,
-    ):
+    ) -> tuple[int, float, float, float, float, bool]:
         """
         Transfer a tensor to a given core id.
         If the tensor is already present on the receiving core, nothing happens.
@@ -275,10 +288,7 @@ def transfer_tensor_to_core(
                 tensor.equality_hash()
             ]
         else:
-            (
-                _,
-                available_since_timesteps,
-            ) = self.find_tensor_in_top_instances(tensor)
+            (_, available_since_timesteps) = self.find_tensor_in_top_instances(tensor)
             # Pick the core that has stored the tensor the longest
             available_since_timestep = min(available_since_timesteps.values())
             storing_instance = next(
@@ -323,9 +333,7 @@ def transfer_tensor_to_core(
                 links,
                 evictions_complete_timestep,
                 transfer_duration,
-                [
-                    tensor,
-                ],
+                [tensor],
             )
             transfer_end = transfer_start + transfer_duration
             ################################# STEP 5 #################################
@@ -401,7 +409,14 @@ def get_memory_energy_cost_of_transfer(
 
         return sender_energy + receiver_energy
 
-    def block_offchip_links(self, too_large_operands, core_id, start_timestep, duration, cn) -> int:
+    def block_offchip_links(
+        self,
+        too_large_operands: list[MemoryOperand],
+        core_id: int,
+        start_timestep: int,
+        duration: int,
+        cn: ComputationNode,
+    ) -> int:
         return self.communication_manager.block_offchip_links(too_large_operands, core_id, start_timestep, duration, cn)
 
     def contains_tensor(self, tensor: Tensor, top_instance: int | MemoryInstance):