Skip to content

Commit

Permalink
PropertyGraph and RDG support upsert for properties (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
witchel authored Apr 16, 2021
1 parent 6884fa6 commit e5cdbf5
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 21 deletions.
15 changes: 15 additions & 0 deletions libgalois/include/katana/PropertyGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ class KATANA_EXPORT PropertyGraph {
PropertyGraph::*properties_fn)() const;
Result<void> (PropertyGraph::*add_properties_fn)(
const std::shared_ptr<arrow::Table>& props);
Result<void> (PropertyGraph::*upsert_properties_fn)(
const std::shared_ptr<arrow::Table>& props);
Result<void> (PropertyGraph::*remove_property_int)(int i);
Result<void> (PropertyGraph::*remove_property_str)(const std::string& str);

Expand All @@ -185,6 +187,11 @@ class KATANA_EXPORT PropertyGraph {
return (g->*add_properties_fn)(props);
}

Result<void> UpsertProperties(
const std::shared_ptr<arrow::Table>& props) const {
return (g->*upsert_properties_fn)(props);
}

Result<void> RemoveProperty(int i) const {
return (g->*remove_property_int)(i);
}
Expand Down Expand Up @@ -405,8 +412,14 @@ class KATANA_EXPORT PropertyGraph {

const GraphTopology& topology() const { return topology_; }

/// Add Node properties that do not exist in the current graph
Result<void> AddNodeProperties(const std::shared_ptr<arrow::Table>& props);
/// Add Edge properties that do not exist in the current graph
Result<void> AddEdgeProperties(const std::shared_ptr<arrow::Table>& props);
/// If property name exists, replace it, otherwise insert it
Result<void> UpsertNodeProperties(const std::shared_ptr<arrow::Table>& props);
/// If property name exists, replace it, otherwise insert it
Result<void> UpsertEdgeProperties(const std::shared_ptr<arrow::Table>& props);

Result<void> RemoveNodeProperty(int i);
Result<void> RemoveNodeProperty(const std::string& prop_name);
Expand All @@ -421,6 +434,7 @@ class KATANA_EXPORT PropertyGraph {
.property_fn = &PropertyGraph::GetNodeProperty,
.properties_fn = &PropertyGraph::node_properties,
.add_properties_fn = &PropertyGraph::AddNodeProperties,
.upsert_properties_fn = &PropertyGraph::UpsertNodeProperties,
.remove_property_int = &PropertyGraph::RemoveNodeProperty,
.remove_property_str = &PropertyGraph::RemoveNodeProperty,
};
Expand All @@ -433,6 +447,7 @@ class KATANA_EXPORT PropertyGraph {
.property_fn = &PropertyGraph::GetEdgeProperty,
.properties_fn = &PropertyGraph::edge_properties,
.add_properties_fn = &PropertyGraph::AddEdgeProperties,
.upsert_properties_fn = &PropertyGraph::UpsertEdgeProperties,
.remove_property_int = &PropertyGraph::RemoveEdgeProperty,
.remove_property_str = &PropertyGraph::RemoveEdgeProperty,
};
Expand Down
32 changes: 32 additions & 0 deletions libgalois/src/PropertyGraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,22 @@ katana::PropertyGraph::AddNodeProperties(
return rdg_.AddNodeProperties(props);
}

katana::Result<void>
katana::PropertyGraph::UpsertNodeProperties(
const std::shared_ptr<arrow::Table>& props) {
if (props->num_columns() == 0) {
KATANA_LOG_DEBUG("upsert empty node prop table");
return ResultSuccess();
}
if (topology_.out_indices &&
topology_.out_indices->length() != props->num_rows()) {
return KATANA_ERROR(
ErrorCode::InvalidArgument, "expected {} rows found {} instead",
topology_.out_indices->length(), props->num_rows());
}
return rdg_.UpsertNodeProperties(props);
}

katana::Result<void>
katana::PropertyGraph::RemoveNodeProperty(int i) {
return rdg_.RemoveNodeProperty(i);
Expand Down Expand Up @@ -416,6 +432,22 @@ katana::PropertyGraph::AddEdgeProperties(
return rdg_.AddEdgeProperties(props);
}

katana::Result<void>
katana::PropertyGraph::UpsertEdgeProperties(
const std::shared_ptr<arrow::Table>& props) {
if (props->num_columns() == 0) {
KATANA_LOG_DEBUG("upsert empty edge prop table");
return ResultSuccess();
}
if (topology_.out_dests &&
topology_.out_dests->length() != props->num_rows()) {
return KATANA_ERROR(
ErrorCode::InvalidArgument, "expected {} rows found {} instead",
topology_.out_dests->length(), props->num_rows());
}
return rdg_.UpsertEdgeProperties(props);
}

katana::Result<void>
katana::PropertyGraph::RemoveEdgeProperty(int i) {
return rdg_.RemoveEdgeProperty(i);
Expand Down
6 changes: 6 additions & 0 deletions libtsuba/include/tsuba/RDG.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ class KATANA_EXPORT RDG {
katana::Result<void> AddEdgeProperties(
const std::shared_ptr<arrow::Table>& props);

katana::Result<void> UpsertNodeProperties(
const std::shared_ptr<arrow::Table>& props);

katana::Result<void> UpsertEdgeProperties(
const std::shared_ptr<arrow::Table>& props);

katana::Result<void> RemoveNodeProperty(uint32_t i);
katana::Result<void> RemoveEdgeProperty(uint32_t i);

Expand Down
68 changes: 55 additions & 13 deletions libtsuba/src/RDG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,30 @@ CommitRDG(
return ret;
}

void
AddNodePropStorageInfo(
tsuba::RDGCore* core, const std::shared_ptr<arrow::Table>& props) {
const auto& schema = props->schema();
for (int i = 0, end = props->num_columns(); i < end; ++i) {
core->part_header().AddNodePropStorageInfo(tsuba::PropStorageInfo{
.name = schema->field(i)->name(),
.path = "",
});
}
}

void
AddEdgePropStorageInfo(
tsuba::RDGCore* core, const std::shared_ptr<arrow::Table>& props) {
const auto& schema = props->schema();
for (int i = 0, end = props->num_columns(); i < end; ++i) {
core->part_header().AddEdgePropStorageInfo(tsuba::PropStorageInfo{
.name = schema->field(i)->name(),
.path = "",
});
}
}

} // namespace

katana::Result<void>
Expand Down Expand Up @@ -529,13 +553,7 @@ tsuba::RDG::AddNodeProperties(const std::shared_ptr<arrow::Table>& props) {
return res.error();
}

const auto& schema = props->schema();
for (int i = 0, end = props->num_columns(); i < end; ++i) {
core_->part_header().AppendNodePropStorageInfo(tsuba::PropStorageInfo{
.name = schema->field(i)->name(),
.path = "",
});
}
AddNodePropStorageInfo(core_.get(), props);

KATANA_LOG_DEBUG_ASSERT(
static_cast<size_t>(core_->node_properties()->num_columns()) ==
Expand All @@ -550,14 +568,38 @@ tsuba::RDG::AddEdgeProperties(const std::shared_ptr<arrow::Table>& props) {
return res.error();
}

const auto& schema = props->schema();
for (int i = 0, end = props->num_columns(); i < end; ++i) {
core_->part_header().AppendEdgePropStorageInfo(tsuba::PropStorageInfo{
.name = schema->field(i)->name(),
.path = "",
});
AddEdgePropStorageInfo(core_.get(), props);

KATANA_LOG_DEBUG_ASSERT(
static_cast<size_t>(core_->edge_properties()->num_columns()) ==
core_->part_header().edge_prop_info_list().size());

return katana::ResultSuccess();
}

katana::Result<void>
tsuba::RDG::UpsertNodeProperties(const std::shared_ptr<arrow::Table>& props) {
if (auto res = core_->UpsertNodeProperties(props); !res) {
return res.error();
}

AddNodePropStorageInfo(core_.get(), props);

KATANA_LOG_DEBUG_ASSERT(
static_cast<size_t>(core_->node_properties()->num_columns()) ==
core_->part_header().node_prop_info_list().size());

return katana::ResultSuccess();
}

katana::Result<void>
tsuba::RDG::UpsertEdgeProperties(const std::shared_ptr<arrow::Table>& props) {
if (auto res = core_->UpsertEdgeProperties(props); !res) {
return res.error();
}

AddEdgePropStorageInfo(core_.get(), props);

KATANA_LOG_DEBUG_ASSERT(
static_cast<size_t>(core_->edge_properties()->num_columns()) ==
core_->part_header().edge_prop_info_list().size());
Expand Down
55 changes: 51 additions & 4 deletions libtsuba/src/RDGCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace {

katana::Result<void>
AddProperties(
UpsertProperties(
const std::shared_ptr<arrow::Table>& props,
std::shared_ptr<arrow::Table>* to_update) {
std::shared_ptr<arrow::Table> current = *to_update;
Expand All @@ -26,11 +26,29 @@ AddProperties(
int last = current->num_columns();

for (int i = 0, n = schema->num_fields(); i < n; i++) {
auto result =
next->AddColumn(last + i, schema->field(i), props->column(i));
auto name = schema->field(i)->name();
auto current_col = next->GetColumnByName(name);
arrow::Result<std::shared_ptr<arrow::Table>> result;
std::string error_context = "insert";
if (current_col == nullptr) {
// Insert the column, error_context == "insert"
result = next->AddColumn(last++, schema->field(i), props->column(i));
} else {
// Update the column
error_context = "update";
auto col_names = next->ColumnNames();
// Column names are not sorted, but assumed to be less than 100s
auto col_it = std::find(col_names.begin(), col_names.end(), name);
KATANA_LOG_ASSERT(
col_it != col_names.end()); // GetColumnByName != null
result = next->SetColumn(
std::distance(col_names.begin(), col_it), schema->field(i),
props->column(i));
}
if (!result.ok()) {
return KATANA_ERROR(
tsuba::ErrorCode::ArrowError, "arrow error: {}", result.status());
tsuba::ErrorCode::ArrowError, "arrow error {}: {}", error_context,
result.status());
}

next = result.ValueOrDie();
Expand All @@ -47,6 +65,25 @@ AddProperties(
return katana::ResultSuccess();
}

katana::Result<void>
AddProperties(
const std::shared_ptr<arrow::Table>& props,
std::shared_ptr<arrow::Table>* to_update) {
auto col_names = (*to_update)->ColumnNames();

const auto& schema = props->schema();
for (int i = 0, n = schema->num_fields(); i < n; i++) {
auto name = schema->field(i)->name();
// Column names are not sorted, but assumed to be less than 100s
auto col_it = std::find(col_names.begin(), col_names.end(), name);
if (col_it != col_names.end()) {
return KATANA_ERROR(
tsuba::ErrorCode::Exists, "column names are not distinct");
}
}
return UpsertProperties(props, to_update);
}

} // namespace

namespace tsuba {
Expand All @@ -61,6 +98,16 @@ RDGCore::AddEdgeProperties(const std::shared_ptr<arrow::Table>& props) {
return AddProperties(props, &edge_properties_);
}

katana::Result<void>
RDGCore::UpsertNodeProperties(const std::shared_ptr<arrow::Table>& props) {
return UpsertProperties(props, &node_properties_);
}

katana::Result<void>
RDGCore::UpsertEdgeProperties(const std::shared_ptr<arrow::Table>& props) {
return UpsertProperties(props, &edge_properties_);
}

void
RDGCore::InitEmptyProperties() {
std::vector<std::shared_ptr<arrow::Array>> empty;
Expand Down
6 changes: 6 additions & 0 deletions libtsuba/src/RDGCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ class KATANA_EXPORT RDGCore {
katana::Result<void> AddEdgeProperties(
const std::shared_ptr<arrow::Table>& props);

katana::Result<void> UpsertNodeProperties(
const std::shared_ptr<arrow::Table>& props);

katana::Result<void> UpsertEdgeProperties(
const std::shared_ptr<arrow::Table>& props);

katana::Result<void> RemoveNodeProperty(uint32_t i);

katana::Result<void> RemoveEdgeProperty(uint32_t i);
Expand Down
24 changes: 20 additions & 4 deletions libtsuba/src/RDGPartHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,28 @@ class KATANA_EXPORT RDGPartHeader {
// Property manipulation
//

void AppendNodePropStorageInfo(PropStorageInfo&& pmd) {
node_prop_info_list_.emplace_back(std::move(pmd));
void AddNodePropStorageInfo(PropStorageInfo&& pmd) {
auto pmd_it = std::find_if(
node_prop_info_list_.begin(), node_prop_info_list_.end(),
[&](const PropStorageInfo& my_pmd) { return my_pmd.name == pmd.name; });
if (pmd_it == node_prop_info_list_.end()) {
node_prop_info_list_.emplace_back(std::move(pmd));
} else {
// If we already have a record, clear the path so we will rewrite it
pmd_it->path = "";
}
}

void AppendEdgePropStorageInfo(PropStorageInfo&& pmd) {
edge_prop_info_list_.emplace_back(std::move(pmd));
void AddEdgePropStorageInfo(PropStorageInfo&& pmd) {
auto pmd_it = std::find_if(
edge_prop_info_list_.begin(), edge_prop_info_list_.end(),
[&](const PropStorageInfo& my_pmd) { return my_pmd.name == pmd.name; });
if (pmd_it == edge_prop_info_list_.end()) {
edge_prop_info_list_.emplace_back(std::move(pmd));
} else {
// If we already have a record, clear the path so we will rewrite it
pmd_it->path = "";
}
}

void RemoveNodeProperty(uint32_t i) {
Expand Down

0 comments on commit e5cdbf5

Please sign in to comment.