Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#422: Stream data to vt-tv for rendering #463

Merged
merged 8 commits into from
May 21, 2024
46 changes: 46 additions & 0 deletions config/test-vt-tv.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Specify input
from_data:
data_stem: /home/pierrelp/Develop/NGA/vt-tv/source/tests/unit/lb_test_data/data
phase_ids:
- 0
- 1
- 2
- 3
- 4
- 5
- 6
- 7
check_schema: False

# Specify work model
work_model:
name: AffineCombination
parameters:
alpha: 1.0
beta: 1.0e-08
gamma: 0.0

# Specify algorithm
algorithm:
name: PhaseStepper

# Specify output
output_dir: ../output
output_file_stem: output_file
visualization:
x_ranks: 2
y_ranks: 2
z_ranks: 1
object_jitter: 0.5
rank_qoi: load
object_qoi: load
save_meshes: true
force_continuous_object_qoi: true
output_visualization_dir: ../output
output_visualization_file_stem: output_file

write_JSON:
compressed: False
suffix: json
communications: True
offline_LB_compatible: True
56 changes: 40 additions & 16 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
import importlib
import yaml

try:
import vttv
using_vttv = True
except ModuleNotFoundError:
using_vttv = False

# pylint:disable=C0413:wrong-import-position
# Use lbaf module from source if lbaf package is not installed
if importlib.util.find_spec('lbaf') is None:
Expand All @@ -14,7 +20,6 @@
from lbaf import PROJECT_PATH, __version__
from lbaf.Execution.lbsRuntime import Runtime
from lbaf.IO.lbsConfigurationValidator import ConfigurationValidator
from lbaf.IO.lbsVisualizer import Visualizer
from lbaf.IO.lbsVTDataReader import LoadReader
from lbaf.IO.lbsVTDataWriter import VTDataWriter
from lbaf.Model.lbsPhase import Phase
Expand Down Expand Up @@ -113,6 +118,11 @@ def init_parameters(self, config: dict, base_dir: str):

# Parse visualizer parameters when available
if (viz := config.get("visualization")) is not None:

# Ensure that vttv module was found
if not using_vttv:
self.__logger.warning("Visualization enabled but vttv not found. No visualization will be generated.")

# Retrieve mandatory visualization parameters
try:
self.grid_size = []
Expand Down Expand Up @@ -570,21 +580,35 @@ def run(self):
self.__parameters.object_qoi
]

# Instantiate and execute visualizer
visualizer = Visualizer(
self.__logger,
qoi_request,
self.__parameters.continuous_object_qoi,
phases,
self.__parameters.grid_size,
self.__parameters.object_jitter,
self.__parameters.output_dir,
self.__parameters.output_file_stem,
runtime.get_distributions(),
runtime.get_statistics())
visualizer.generate(
self.__parameters.save_meshes,
not self.__parameters.rank_qoi is None)
# Call vttv visualization
if using_vttv:
self.__logger.info("Calling vt-tv")

# Serialize data to JSON-formatted string
self.__rank_phases = {}
for p in phases.values():
for r in p.get_ranks():
self.__rank_phases.setdefault(r.get_id(), {})
self.__rank_phases[r.get_id()][p.get_id()] = r

ranks_json_str = []
for i in range(len(self.__rank_phases.items())):
ranks_json_str.append(self.__json_writer._json_serializer((i, self.__rank_phases[i])))

vttv_params = {
"x_ranks": self.__parameters.grid_size[0],
"y_ranks": self.__parameters.grid_size[1],
"z_ranks": self.__parameters.grid_size[2],
"object_jitter": self.__parameters.object_jitter,
"rank_qoi": self.__parameters.rank_qoi,
"object_qoi": self.__parameters.object_qoi,
"save_meshes": self.__parameters.save_meshes,
"force_continuous_object_qoi": self.__parameters.continuous_object_qoi,
"output_visualization_dir": self.__parameters.output_dir,
"output_visualization_file_stem": self.__parameters.output_file_stem
}
num_ranks = self.__parameters.grid_size[0] * self.__parameters.grid_size[1] * self.__parameters.grid_size[2]
vttv.tvFromJson(ranks_json_str, str(vttv_params), num_ranks)

# Report on rebalanced phase when available
if rebalanced_phase:
Expand Down
2 changes: 1 addition & 1 deletion src/lbaf/IO/lbsVTDataReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> tuple:
for k, v in comm.items():
self.__logger.debug(f"{k}: {v}")
else:
self.__communications_dict[phase_id] = {}
self.__communications_dict.setdefault(phase_id, {rank_id: {}})

# Instantiante rank for current phase
phase_rank = Rank(self.__logger, rank_id)
Expand Down
2 changes: 1 addition & 1 deletion src/lbaf/IO/lbsVTDataWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def write(self, phases: dict):
for p in phases.values():
for r in p.get_ranks():
self.__rank_phases.setdefault(r.get_id(), {})
self.__rank_phases[r.get_id()][p.get_id()]= r
self.__rank_phases[r.get_id()][p.get_id()] = r

# Prevent recursion overruns
sys.setrecursionlimit(25000)
Expand Down
4 changes: 4 additions & 0 deletions src/lbaf/Model/lbsObject.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ def get_sent_volume(self) -> float:
"""Return volume of communications sent by object."""
return sum([v for v in self.__communicator.get_sent().values()]) if self.__communicator else 0

def get_max_volume(self) -> float:
"""Return the maximum bytes received or sent by object."""
return self.__communicator.get_max_volume() if self.__communicator else 0

def set_rank_id(self, r_id: int) -> None:
"""Assign object to rank ID"""
self.__rank_id = r_id
Expand Down
9 changes: 9 additions & 0 deletions src/lbaf/Model/lbsObjectCommunicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ def get_sent_to_object(self, o):
"""Return the volume of a message received from an object if any."""
return self.__sent.get(o)

def get_max_volume(self):
"""Return the maximum bytes received or sent at this communicator."""
max_received, max_sent = 0., 0.
if len(self.__sent) > 0:
max_sent = max(self.__sent.values())
if len(self.__received) > 0:
max_received = max(self.__received.values())
return max(max_received, max_sent)

def summarize(self) -> tuple:
"""Summarize communicator properties and check for errors."""
# Summarize sent communications
Expand Down
Loading