Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cpp pregel case #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ file(
"# Do not edit this file!!!\n"
"app:\n"
"- algo: my_app\n"
" type: cpp_pie\n"
" class_name: gs::MyApp\n"
" type: cpp_pregel\n"
" class_name: gs::SSSP\n"
" src: my_app.h\n"
" context_type: vertex_data\n"
" context_type: labeled_vertex_data\n"
" compatible_graph:\n"
" - gs::ArrowProjectedFragment\n"
" - gs::DynamicProjectedFragment\n"
" - vineyard::ArrowFragment\n"
" - vineyard::ArrowFlattenFragment\n"
)

# custom target to package app
Expand Down
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,19 @@ Here is an example to run the packaged gar file in GraphScope Python interface.
import graphscope

from graphscope.framework.app import load_app
from graphscope.dataset import load_p2p_network

sess = graphscope.session()
simple_graph = load_p2p_network(sess)._project_to_simple()

my_app = load_app('<path_to_your_gar_resource>')
result = my_app(simple_graph, 10) # pass 10 as param1 defined in 'MyAppContext.h'

print(result.to_numpy('r'))
g = graphscope.g(directed=False, generate_eid=False, vertex_map="local")
g = g.add_edges(
f"/Users/siyuan/CLionProjects/gstest/property/p2p-31_property_e_0",
label="knows",
src_label="person",
dst_label="person",
)

my_app = load_app('/Users/siyuan/CLionProjects/cpp-template/build/my_app.gar')
result = my_app(g, src=1)

df = result.to_dataframe(selector={'id': 'v:person.id', 'r': 'r:person'}).sort_values(by='id')
print(df)
```

## Codebase Explained
Expand Down
121 changes: 65 additions & 56 deletions src/my_app.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/** Copyright 2022 Alibaba Group Holding Limited.
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -13,69 +13,78 @@
* limitations under the License.
*/

#ifndef MY_APP_H
#define MY_APP_H
#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_
#define ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_

#include "my_app_context.h"
#include <algorithm>
#include <limits>
#include <string>

#include "vineyard/graph/fragment/arrow_fragment.h"

#include "core/app/pregel/i_vertex_program.h"
#include "core/app/pregel/pregel_compute_context.h"
#include "core/app/pregel/pregel_property_app_base.h"

namespace gs {

/**
* @brief Compute the degree for each vertex.
*
* @tparam FRAG_T
*/
template <typename FRAG_T>
class MyApp : public grape::ParallelAppBase<FRAG_T, MyAppContext<FRAG_T>>,
public grape::ParallelEngine,
public grape::Communicator {
template<typename FRAG_T>
class PregelSSSP
: public IPregelProgram<
PregelPropertyVertex<FRAG_T, double, double>,
PregelPropertyComputeContext<FRAG_T, double, double>> {
using fragment_t = FRAG_T;

public:
INSTALL_PARALLEL_WORKER(MyApp<FRAG_T>, MyAppContext<FRAG_T>, FRAG_T)
static constexpr grape::MessageStrategy message_strategy =
grape::MessageStrategy::kSyncOnOuterVertex;
static constexpr grape::LoadStrategy load_strategy =
grape::LoadStrategy::kBothOutIn;
using vertex_t = typename fragment_t::vertex_t;

/**
* @brief Implement your partial evaluation here.
*
* @param fragment
* @param context
* @param messages
*/
void PEval(const fragment_t& fragment, context_t& context,
message_manager_t& messages) {
messages.InitChannels(thread_num());
// Implement your partial evaluation here.
// We put all compute logic in IncEval phase, thus do nothing but force continue.
messages.ForceContinue();
void Init(PregelPropertyVertex<fragment_t, double, double>& v,
PregelPropertyComputeContext<fragment_t, double, double>& context)
override {
v.set_value(std::numeric_limits<double>::max());
}

void IncEval(const fragment_t& fragment, context_t& context,
message_manager_t& messages) {
// superstep
++context.step;

// Process received messages sent by other fragment here.
{
messages.ParallelProcess<fragment_t, double>(
thread_num(), fragment,
[&context](int tid, vertex_t u, const double& msg) {
// Implement your logic here.
});
void Compute(grape::IteratorPair<double*> messages,
PregelPropertyVertex<fragment_t, double, double>& v,
PregelPropertyComputeContext<fragment_t, double, double>&
context) override {
bool updated = false;
if (context.superstep() == 0) {
std::string source_id = context.get_config("src");
if (v.id() == "1") {
std::cout << "Source: " << source_id << " v.id() " << v.id() << std::endl;
}
if (v.id() == source_id) {
updated = true;
v.set_value(0);
}
} else {
double cur_value = v.value();
double new_value = cur_value;
for (auto msg : messages) {
new_value = std::min(new_value, msg);
}
if (new_value != cur_value) {
v.set_value(new_value);
updated = true;
}
}

// Compute the degree for each vertex, set the result in context
auto inner_vertices = fragment.InnerVertices();
ForEach(inner_vertices.begin(), inner_vertices.end(),
[&context, &fragment](int tid, vertex_t u) {
context.result[u] =
static_cast<uint64_t>(fragment.GetOutgoingAdjList(u).Size() +
fragment.GetIncomingAdjList(u).Size());
});
if (updated) {
double dist = v.value();
for (int label_id = 0; label_id < context.edge_label_num(); label_id++) {
for (auto& e : v.outgoing_edges(label_id)) {
double new_dist = dist + static_cast<double>(e.get_int(0));
v.send(e.vertex(), new_dist);
}
}
}
v.vote_to_halt();
}
};
}; // namespace gs

#endif // MY_APP_H
template<typename FRAG_T>
using SSSP = gs::PregelPropertyAppBase<FRAG_T, PregelSSSP<FRAG_T>>;

} // namespace gs

#endif // ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_

64 changes: 0 additions & 64 deletions src/my_app_context.h

This file was deleted.