Skip to content

Commit

Permalink
#16: Centralize JSON reading with file and string capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrepebay committed Apr 15, 2024
1 parent 887fb05 commit 6459c47
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 197 deletions.
166 changes: 7 additions & 159 deletions bindings/python/tv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,168 +98,16 @@ void tv_from_json(const std::vector<std::string>& input_json_per_rank_list, cons
// Initialize the info object, that will hold data for all ranks for all phases
std::unique_ptr<Info> info = std::make_unique<Info>();

// map to store rank communications: for each rank, list of tuples of from_id, to_id, bytes
std::unordered_map<int64_t, std::vector<std::tuple<int64_t, int64_t, double>>> rank_communications;

# pragma omp parallel for
for (int64_t rank_id = 0; rank_id < num_ranks; rank_id++) {
fmt::print("Reading file for rank {}\n", rank_id);
std::string rank_json_str = input_json_per_rank_list[rank_id];
try {
std::cerr << "vt-tv: Parsing JSON for rank " << rank_id << "\n";
auto j = json::parse(rank_json_str);
assert(j != nullptr && "Must have valid json");

std::unordered_map<ElementIDType, ObjectInfo> object_info;
std::unordered_map<PhaseType, PhaseWork> phase_info;

// std::cerr << "vt-tv: Reading rank " << rank_id << "\n";

auto phases = j["phases"];

if (phases.is_array()) {
for (auto const& phase : phases) {
auto id = phase["id"];
std::cerr << "vt-tv: Reading phase " << id << "\n";
auto tasks = phase["tasks"];

std::unordered_map<ElementIDType, ObjectWork> objects;

if (tasks.is_array()) {
for (auto const& task : tasks) {
auto node = task["node"];
auto time = task["time"];
auto etype = task["entity"]["type"];
assert(time.is_number());
assert(node.is_number());

if (etype == "object") {
auto object = task["entity"]["id"];
auto home = task["entity"]["home"];
bool migratable = task["entity"]["migratable"];
assert(object.is_number());
assert(home.is_number());

// std::cerr << "vt-tv: Processing object " << object << " in phase " << id << "\n";
std::vector<UniqueIndexBitType> index_arr;


if (
task["entity"].find("collection_id") != task["entity"].end() and
task["entity"].find("index") != task["entity"].end()
) {
auto cid = task["entity"]["collection_id"];
auto idx = task["entity"]["index"];
if (cid.is_number() && idx.is_array()) {
std::vector<UniqueIndexBitType> arr = idx;
index_arr = std::move(arr);
}
}

ObjectInfo oi{object, home, migratable, std::move(index_arr)};

if (task["entity"].find("collection_id") != task["entity"].end()) {
oi.setIsCollection(true);
oi.setMetaID(task["entity"]["collection_id"]);
}

if (task["entity"].find("objgroup_id") != task["entity"].end()) {
oi.setIsObjGroup(true);
oi.setMetaID(task["entity"]["objgroup_id"]);
}

object_info.try_emplace(object, std::move(oi));

std::unordered_map<SubphaseType, TimeType> subphase_loads;

if (task.find("subphases") != task.end()) {
auto subphases = task["subphases"];
if (subphases.is_array()) {
for (auto const& s : subphases) {
auto sid = s["id"];
auto stime = s["time"];

assert(sid.is_number());
assert(stime.is_number());

subphase_loads[sid] = stime;
}
}
}

std::unordered_map<std::string, ObjectWork::VariantType> user_defined;
if (task.find("user_defined") != task.end()) {
auto user_defined = task["user_defined"];
if (user_defined.is_object()) {
for (auto& [key, value] : user_defined.items()) {
user_defined[key] = value;
}
}
}
// fmt::print(" Add object {}\n", (ElementIDType)object);
objects.try_emplace(
object,
ObjectWork{
object, time, std::move(subphase_loads), std::move(user_defined)
}
);
}
}
}

fmt::print(" Parsing communications.\n");
if (phase.find("communications") != phase.end()) {
auto communications = phase["communications"];
if (communications.is_array()) {
for (auto const& comm : communications) {
auto type = comm["type"];
if (type == "SendRecv") {
double bytes = comm["bytes"];
auto messages = comm["messages"];

auto from = comm["from"];
auto to = comm["to"];

ElementIDType from_id = from["id"];
ElementIDType to_id = to["id"];

assert(bytes.is_number());
assert(from.is_number());
assert(to.is_number());

fmt::print(" {} -> {} // {} bytes\n", from_id, to_id, bytes);
// std::cout << " bytes: " << bytes << std::endl;
// Object on this rank sent data
if (objects.find(from_id) != objects.end()) {
fmt::print(" Found sender object.\n");
fmt::print(" Adding Sent communication from object {} to object {}\n", from_id, to_id);
objects.at(from_id).addSentCommunications(to_id, bytes);
} else {
fmt::print(" Didn't find sender object.\n");
}
if (objects.find(to_id) != objects.end()) {
fmt::print(" Found recipient object.\n");
fmt::print(" Adding Received communication from object {} to object {}\n", from_id, to_id);
objects.at(to_id).addReceivedCommunications(from_id, bytes);
} else {
fmt::print(" Didn't find recipient object.\n");
}
}
}
}
}
phase_info.try_emplace(id, PhaseWork{id, std::move(objects)});
}
}
// fmt::print(" vt-tv: Adding rank {}\n", rank_id);
Rank r{static_cast<NodeType>(rank_id), std::move(phase_info)};

# pragma omp critical
{
info->addInfo(std::move(object_info), std::move(r));
}

} catch(const std::exception& e) {
std::cerr << "vt-tv: Error reading data for rank " << rank_id << ": " << e.what() << '\n';
utility::JSONReader reader{static_cast<NodeType>(rank_id)};
reader.readString(rank_json_str);
auto tmpInfo = reader.parse();
#pragma omp critical
{
info->addInfo(tmpInfo->getObjectInfo(), tmpInfo->getRank(rank_id));
}
}
// Instantiate render
Expand Down
2 changes: 2 additions & 0 deletions bindings/python/tv.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
#include "vt-tv/api/info.h"
#include "vt-tv/utility/decompression_input_container.h"
#include "vt-tv/utility/input_iterator.h"
#include "vt-tv/utility/qoi_serializer.h"
#include "vt-tv/utility/json_reader.h"

#include <nlohmann/json.hpp>
#include <yaml-cpp/yaml.h>
Expand Down
6 changes: 3 additions & 3 deletions examples/example2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ int main() {
std::unique_ptr<Info> info = std::make_unique<Info>();

for (NodeType rank = 0; rank < n_ranks; rank++) {
utility::JSONReader reader{rank, path + "/data." + std::to_string(rank) + ".json"};
reader.readFile();
auto tmpInfo = reader.parseFile();
utility::JSONReader reader{rank};
reader.readFile(path + "/data." + std::to_string(rank) + ".json");
auto tmpInfo = reader.parse();
info->addInfo(tmpInfo->getObjectInfo(), tmpInfo->getRank(rank));
}

Expand Down
1 change: 0 additions & 1 deletion src/vt-tv/api/object_work.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ struct ObjectWork {
/// QOIs to be visualized
std::unordered_map<std::string, QOIVariantTypes> attributes_;

/// @todo: add communications
ObjectCommunicator communicator_;
};

Expand Down
22 changes: 14 additions & 8 deletions src/vt-tv/utility/json_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@

namespace vt::tv::utility {

bool JSONReader::isCompressed() const {
bool JSONReader::isCompressed(std::string const& in_filename) const {
bool compressed = true;

// determine if the file is compressed or not
std::ifstream is(filename_);
std::ifstream is(in_filename);
if (not is.good()) {
auto str = fmt::format("Filename is not valid: {}", filename_);
auto str = fmt::format("Filename is not valid: {}", in_filename);
fmt::print(str);
assert(false && "Invalid file");
return false;
Expand All @@ -83,23 +83,29 @@ bool JSONReader::isCompressed() const {
return compressed;
}

void JSONReader::readFile() {
void JSONReader::readFile(std::string const& in_filename) {
using json = nlohmann::json;

if (isCompressed()) {
DecompressionInputContainer c(filename_);
if (isCompressed(in_filename)) {
DecompressionInputContainer c(in_filename);
json j = json::parse(c);
json_ = std::make_unique<json>(std::move(j));
} else {
std::ifstream is(filename_, std::ios::binary);
std::ifstream is(in_filename, std::ios::binary);
assert(is.good() && "File must be good");
json j = json::parse(is);
is.close();
json_ = std::make_unique<json>(std::move(j));
}
}

std::unique_ptr<Info> JSONReader::parseFile() {
void JSONReader::readString(std::string const& in_json_string) {
using json = nlohmann::json;
json j = json::parse(in_json_string);
json_ = std::make_unique<json>(std::move(j));
}

std::unique_ptr<Info> JSONReader::parse() {
using json = nlohmann::json;

assert(json_ != nullptr && "Must have valid json");
Expand Down
29 changes: 18 additions & 11 deletions src/vt-tv/utility/json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,41 +57,48 @@ namespace vt::tv::utility {
/**
* \struct JSONReader
*
* \brief Reader for JSON files in the LBDataType format.
* \brief Reader for JSON in the LBDataType format.
*/
struct JSONReader {

/**
* \brief Construct the reader
*
* \param[in] in_filename the file name to read
*/
JSONReader(NodeType in_rank, std::string const& in_filename)
: rank_(in_rank),
filename_(in_filename)
JSONReader(NodeType in_rank)
: rank_(in_rank)
{ }

/**
* \brief Check if the file is compressed or not
*
* \param[in] in_filename the file name to check
*
* \return whether the file is compressed
*/
bool isCompressed() const;
bool isCompressed(std::string const& in_filename) const;

/**
* \brief Read the JSON file
* \brief Read a given JSON file
*
* \param[in] in_filename the file name to read
*/
void readFile(std::string const& in_filename);

/**
* \brief Read a given serialized json string
*
* \param[in] in_json_string the serialized json string to read
*/
void readFile();
void readString(std::string const& in_json_string);

/**
* \brief Parse the json into vt-tv's data structure Info, with a single rank
* filled out
*/
std::unique_ptr<Info> parseFile();
std::unique_ptr<Info> parse();

private:
NodeType rank_ = 0;
std::string filename_;
std::unique_ptr<nlohmann::json> json_ = nullptr;
};

Expand Down
6 changes: 3 additions & 3 deletions src/vt-tv/utility/parse_render.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ void ParseRender::parseAndRender(PhaseType phase_id, std::unique_ptr<Info> info)
# pragma omp parallel for
for (int64_t rank = 0; rank < n_ranks; rank++) {
fmt::print("Reading file for rank {}\n", rank);
utility::JSONReader reader{static_cast<NodeType>(rank), input_dir + "data." + std::to_string(rank) + ".json"};
reader.readFile();
auto tmpInfo = reader.parseFile();
utility::JSONReader reader{static_cast<NodeType>(rank)};
reader.readFile(input_dir + "data." + std::to_string(rank) + ".json");
auto tmpInfo = reader.parse();
#pragma omp critical
{
info->addInfo(tmpInfo->getObjectInfo(), tmpInfo->getRank(rank));
Expand Down
24 changes: 12 additions & 12 deletions tests/unit/test_json_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ TEST_F(TestJSONReader, test_json_reader_1) {
std::string path = std::filesystem::absolute(p).string();

NodeType rank = 0;
utility::JSONReader reader{rank, path + "/data.0.json"};
reader.readFile();
auto info = reader.parseFile();
utility::JSONReader reader{rank};
reader.readFile(path + "/data.0.json");
auto info = reader.parse();

auto const& obj_info = info->getObjectInfo();

Expand Down Expand Up @@ -122,10 +122,10 @@ TEST_F(TestJSONReader, test_json_reader_metadata_attributes) {
std::string path = std::filesystem::absolute(p).string();

NodeType rank = 0;
utility::JSONReader reader{rank, path + "/reader_test_data.json"};
utility::JSONReader reader{rank};

reader.readFile();
auto info = reader.parseFile();
reader.readFile(path + "/reader_test_data.json");
auto info = reader.parse();
auto& rank_info = info->getRank(rank);
EXPECT_EQ(rank_info.getRankID(), rank);

Expand All @@ -145,10 +145,10 @@ TEST_F(TestJSONReader, test_json_reader_object_info_attributes) {
std::string path = std::filesystem::absolute(p).string();

NodeType rank = 0;
utility::JSONReader reader{rank, path + "/reader_test_data.json"};
utility::JSONReader reader{rank};

reader.readFile();
auto info = reader.parseFile();
reader.readFile(path + "/reader_test_data.json");
auto info = reader.parse();
auto& rank_info = info->getRank(rank);
EXPECT_EQ(rank_info.getRankID(), rank);

Expand Down Expand Up @@ -193,10 +193,10 @@ TEST_F(TestJSONReader, test_json_reader_object_work_user_defined) {
std::string path = std::filesystem::absolute(p).string();

NodeType rank = 0;
utility::JSONReader reader{rank, path + "/reader_test_data.json"};
utility::JSONReader reader{rank};

reader.readFile();
auto info = reader.parseFile();
reader.readFile(path + "/reader_test_data.json");
auto info = reader.parse();
auto& rank_info = info->getRank(rank);
EXPECT_EQ(rank_info.getRankID(), rank);

Expand Down

0 comments on commit 6459c47

Please sign in to comment.