Skip to content

Commit

Permalink
#3: Add object communications in JSON reader, api, and (not working y…
Browse files Browse the repository at this point in the history
…et) in visualization
  • Loading branch information
pierrepebay committed May 30, 2023
1 parent 386dd28 commit 985e645
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 7 deletions.
5 changes: 3 additions & 2 deletions examples/example2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ int main() {
using namespace tv;
// Read JSON file and input data

std::filesystem::path p = std::filesystem::path(SRC_DIR) / "tests/unit/lb_test_data" ;
std::filesystem::path p = "/Users/pierrepebay/Develop/LB-analysis-framework/data/synthetic_lb_data"; //std::filesystem::path(SRC_DIR) / "tests/unit/lb_test_data" ;
std::string path = std::filesystem::absolute(p).string();

NodeType rank = 0;
Expand All @@ -105,11 +105,12 @@ int main() {
info->addInfo(info2->getObjectInfo(), info2->getRank(rank2));
info->addInfo(info3->getObjectInfo(), info3->getRank(rank3));

info->getPhaseObjects(1,4);
info->getPhaseObjects(0,4);
fmt::print("===================\n");
info->getAllObjects(4);

fmt::print("===================\n");
info->normalizeEdges(0,4);

auto const& obj_info = info->getObjectInfo();

Expand Down
47 changes: 46 additions & 1 deletion src/vt-tv/api/info.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ struct Info {
}
}
}
fmt::print("Size of all objects: {}", objects.size());
fmt::print("Size of all objects: {}\n", objects.size());
return objects;
}

Expand All @@ -215,6 +215,51 @@ struct Info {
*/
std::size_t getNumRanks() const { return ranks_.size(); }

/**
* \brief Normalize communications for a phase: ensure receives and sends coincide
*
* \return void
*/
void normalizeEdges(PhaseType phase, uint64_t n_ranks) {
fmt::print("---- Normalizing Edges for phase {} ----\n", phase);
auto phaseObjects = getPhaseObjects(phase, n_ranks);
for (auto& [id1, objectWork1] : phaseObjects) {
auto& sent1 = objectWork1.getSent();
auto& received1 = objectWork1.getReceived();
for (auto& [id2, objectWork2] : phaseObjects) {
// No communications to oneself
if (id1 != id2) {
fmt::print("--Communication between object {} and object {}\n\n", id1, id2);
auto& sent2 = objectWork2.getSent();
auto& received2 = objectWork2.getReceived();
// Communications existing on object 2, to be added on object 1
fmt::print(" Communications existing on object {}, to be added on object {}:\n", id2, id1);
if (sent2.find(id1) != sent2.end()) {
fmt::print(" adding sent from object {} to received by object {}\n", id2, id1);
objectWork1.addReceivedCommunications(id2, sent2.at(id1));
} else if (received2.find(id1) != received2.end()) {
fmt::print(" adding received from object {} to sent by object {}\n", id2, id1);
objectWork1.addSentCommunications(id2, received2.at(id1));
} else {
fmt::print(" None\n");
}
// Communications existing on object 1, to be added on object 2
fmt::print(" Communications existing on object {}, to be added on object {}:\n", id1, id2);
if (sent1.find(id2) != sent1.end()) {
fmt::print(" adding sent from object {} to received by object {}\n", id1, id2);
objectWork2.addReceivedCommunications(id1, sent1.at(id2));
} else if (received2.find(id1) != received2.end()) {
fmt::print(" adding received from object {} to sent by object {}\n", id1, id2);
objectWork2.addSentCommunications(id1, received1.at(id2));
} else {
fmt::print(" None\n");
}
fmt::print("\n");
}
}
}
}

private:
/// All the object info that doesn't change across phases
std::unordered_map<ElementIDType, ObjectInfo> object_info_;
Expand Down
128 changes: 128 additions & 0 deletions src/vt-tv/api/object_communicator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#if !defined INCLUDED_VT_TV_API_OBJECT_COMMUNICATOR_H
#define INCLUDED_VT_TV_API_OBJECT_COMMUNICATOR_H

#include <string>
#include <iostream>
#include <list>
#include <map>
#include <unordered_set>
#include <vector>
#include <utility>

#include "vt-tv/api/types.h"
#include <fmt-vt/format.h>

namespace vt::tv {

struct Object;

/**
* A class holding received and sent messages for an object.
*/
struct ObjectCommunicator
{
private:
ElementIDType object_id_;
std::map<ElementIDType, double> received_;
std::map<ElementIDType, double> sent_;

/**
* Summarize one-way communicator properties and check for errors.
*/
std::vector<double> summarize_unidirectional(std::string direction) const {
// Initialize list of volumes
std::vector<double> volumes;

// Iterate over one-way communications
std::map<ElementIDType, double> communications;
if(direction == "to") {
communications = this->sent_;
} else {
communications = this->received_;
}

// for (const auto& [key, value] : communications) {
// sanity check
// if(key->get_id() == this->object_id_) {
// throw nb::index_error(
// "object " + this->object_id_ +
// " cannot send communication to itself.");
// }
// volumes.push_back(value);
// }

return volumes;
}
public:
/**
* \brief get id of object for this communicator
*
* \return id
*/
ElementIDType get_object_id() { return object_id_; };

/**
* Return all from_object=volume pairs received by object.
*/
std::map<ElementIDType, double>& get_received() { return this->received_; };

/**
* Return the volume of a message received from an object if any.
*/
double get_received_from_object(ElementIDType id) const {
return this->received_.at(id);
}

/**
* Return all to_object=volume pairs sent from object.
*/
std::map<ElementIDType, double>& get_sent() { return this->sent_; };

/**
* Return the volume of a message sent to an object if any.
*/
double get_sent_to_object(ElementIDType id) const {
return this->sent_.at(id);
}

/**
* Summarize communicator properties and check for errors.
*/
std::pair<std::vector<double>, std::vector<double>> summarize() const {
// Summarize sent communications
std::vector<double> w_sent = this->summarize_unidirectional("to");

// Summarize received communications
std::vector<double> w_recv = this->summarize_unidirectional("from");

return std::make_pair(w_sent, w_recv);
}

void addReceived(ElementIDType from_id, double bytes) {
this->received_.insert(std::make_pair(from_id, bytes));
if (from_id == this->object_id_) fmt::print("Object {} receiving communication from myself\n",this->object_id_);
}

void addSent(ElementIDType to_id, double bytes) {
this->sent_.insert(std::make_pair(to_id, bytes));
if (to_id == this->object_id_) fmt::print("Object {} sending communication to myself\n",this->object_id_);
}

ObjectCommunicator(ElementIDType id_in)
: object_id_(id_in)
, received_()
, sent_()
{}
ObjectCommunicator(ElementIDType id_in,
std::map<ElementIDType, double> recv_in,
std::map<ElementIDType, double> sent_in)
: object_id_(id_in)
, received_(recv_in)
, sent_(sent_in)
{}
~ObjectCommunicator() = default;
};

} /* end namesapce vt::tv */

#endif /*INCLUDED_VT_TV_API_OBJECT_COMMUNICATOR_H*/
60 changes: 58 additions & 2 deletions src/vt-tv/api/object_work.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
#if !defined INCLUDED_VT_TV_API_OBJECT_WORK_H
#define INCLUDED_VT_TV_API_OBJECT_WORK_H

#include "vt-tv/api/types.h"
#include "vt-tv/api/object_communicator.h"

#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -78,7 +78,8 @@ struct ObjectWork {
) : id_(in_id),
whole_phase_load_(in_whole_phase_load),
subphase_loads_(std::move(in_subphase_loads)),
user_defined_(std::move(in_user_defined))
user_defined_(std::move(in_user_defined)),
communicator_(id_)
{ }

/**
Expand Down Expand Up @@ -109,6 +110,60 @@ struct ObjectWork {
*/
auto const& getUserDefined() const { return user_defined_; }

/**
* \brief set communications for this object
*
* \return void
*/
void setCommunications(ObjectCommunicator c) {
assert(c.get_object_id() == id_);
communicator_ = c;
};

/**
* \brief add sent communication to this object
*
* \return void
*/
void addSentCommunications(ElementIDType to_id, double bytes) {
communicator_.addSent(to_id, bytes);
};

/**
* \brief add received communication to this object
*
* \return void
*/
void addReceivedCommunications(ElementIDType from_id, double bytes) {
communicator_.addReceived(from_id, bytes);
};

// void normalizeEdges(Info& i) {
// auto const& received = communicator_.get_received();
// for (auto const& [elm, bytes] : received) {
// i.getPhaseObjects(elm).addSentCommunications(id_, bytes);
// }

// auto const& sent = communicator_.get_sent();
// for (auto const& [elm, bytes] : send) {
// pw.getObject(elm).addSentCommunications(id_, bytes);
// }
// }

/**
* \brief get received communications for this object
*/
std::map<ElementIDType, double>& getReceived() {
return communicator_.get_received();
}

/**
* \brief get sent communications for this object
*/
std::map<ElementIDType, double>& getSent() {
return communicator_.get_sent();
}

private:
/// Element ID
ElementIDType id_ = 0;
Expand All @@ -120,6 +175,7 @@ struct ObjectWork {
std::unordered_map<std::string, VariantType> user_defined_;

/// @todo: add communications
ObjectCommunicator communicator_;
};

} /* end namesapce vt::tv */
Expand Down
7 changes: 7 additions & 0 deletions src/vt-tv/api/phase_work.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ struct PhaseWork {
*/
auto const& getObjectWork() const { return objects_; }

/**
* \brief set communications for an object in this phase
*
* \return void
*/
void setCommunications(ElementIDType o_id, ObjectCommunicator c) { objects_.at(o_id).setCommunications(c); };

private:
/// Phase identifier
PhaseType phase_ = 0;
Expand Down
53 changes: 52 additions & 1 deletion src/vt-tv/render/render.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ vtkNew<vtkPolyData> Render::create_object_mesh_(PhaseWork phase) {
// @todo
// Iterate over ranks and objects to create mesh points
uint64_t point_index = 0;
std::map<ObjectInfo, uint64_t> point_to_index;
std::map<ElementIDType, uint64_t> objectid_to_index;

auto object_mapping = this->create_object_mapping_(p_id);

Expand Down Expand Up @@ -347,16 +347,67 @@ vtkNew<vtkPolyData> Render::create_object_mesh_(PhaseWork phase) {
b_arr->SetTuple1(point_index, sentinel);
i++;
point_index++;
objectid_to_index.insert(std::make_pair(objectWork.getID(), point_index));
}
}

vtkNew<vtkDoubleArray> lineValuesArray;
lineValuesArray->SetName("bytes");
vtkNew<vtkCellArray> lines;
uint64_t n_e = 0;
auto phaseObjects = this->info_.getPhaseObjects(phase.getPhase(), this->n_ranks_);

for (auto& [id, objWork] : phaseObjects) {
fmt::print("id {}\n", id);
std::map<ElementIDType, double>& receivedCommunications = objWork.getSent();
fmt::print("map size: {}\n", receivedCommunications.size());
for (auto& [from_id, bytes] : receivedCommunications) {
fmt::print("id {} <-> from_id {}\n", id, from_id);
vtkNew<vtkLine> line;
lineValuesArray->InsertNextTuple1(bytes);
line->GetPointIds()->SetId(0, objectid_to_index.at(id));
line->GetPointIds()->SetId(1, objectid_to_index.at(from_id));
lines->InsertNextCell(line);
}
}

vtkNew<vtkPolyData> pd_mesh;
pd_mesh->SetPoints(points);
pd_mesh->SetLines(lines);
pd_mesh->GetPointData()->SetScalars(q_arr);
pd_mesh->GetPointData()->AddArray(b_arr);
pd_mesh->GetCellData()->SetScalars(lineValuesArray);
fmt::print("\n\n");
fmt::print("-----finished creating object mesh-----\n");

// Setup the visualization pipeline
vtkNew<vtkNamedColors> namedColors;
vtkNew<vtkPolyData> linesPolyData;
linesPolyData->SetPoints(points);
linesPolyData->SetLines(lines);
linesPolyData->GetCellData()->SetScalars(lineValuesArray);
vtkNew<vtkPolyDataMapper> mapper;
mapper->SetInputData(linesPolyData);

vtkNew<vtkActor> actor;
actor->SetMapper(mapper);
actor->GetProperty()->SetLineWidth(4);

vtkNew<vtkRenderer> renderer;
renderer->AddActor(actor);
renderer->SetBackground(namedColors->GetColor3d("SlateGray").GetData());

vtkNew<vtkRenderWindow> window;
window->SetWindowName("Colored Lines");
window->AddRenderer(renderer);

vtkNew<vtkRenderWindowInteractor> interactor;
interactor->SetRenderWindow(window);

// Visualize
window->Render();
interactor->Start();

return pd_mesh;
}

Expand Down
Loading

0 comments on commit 985e645

Please sign in to comment.