Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

amqp_compressed_proof_plugin: Send compressed action merkle proofs over AMQP #9135

Merged
merged 20 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
374f502
amqp_compressed_proof_plugin: compressed action merkle proofs over AMQP
spoonincode May 27, 2020
7c5022e
fix file creation probe
spoonincode May 27, 2020
934818b
only need a single merkle creator
spoonincode May 28, 2020
40b0a3b
fix deletion of compressed proof data files
spoonincode May 28, 2020
635d862
Fix for onblock handling: verify correct onblock and store all action…
heifner Jun 1, 2020
6735944
Use id instead of block_num since num could be the same
heifner Jun 1, 2020
5a69188
Revert "Use id instead of block_num since num could be the same"
heifner Jun 1, 2020
aba97da
fix onblock check
spoonincode Jun 3, 2020
b833b7d
fix merge & add message-id for compressed proofs
spoonincode Jun 3, 2020
4e1ed21
remove dead commented code
spoonincode Jun 3, 2020
02defd7
change pack<>() input to const here so explicit template params not n…
spoonincode Jun 4, 2020
5630204
two more unit tests for compressed proof plugin
spoonincode Jun 11, 2020
de00bfd
don't need to treat deepest merkle level special on input
spoonincode Jun 12, 2020
06988ac
fix another pack<>()
spoonincode Jun 12, 2020
6a41141
Use new block_start signal for clearing onblock cached value
heifner Jun 12, 2020
b4cc801
have compressed_proof_generator take a controller and manage signals …
spoonincode Jun 12, 2020
9bdb214
send action proofs based on chain's db mode
spoonincode Jun 12, 2020
ff00189
should use publish_message_raw() now
spoonincode Jun 15, 2020
fb5d775
add ability to filter actions in to proofs based on action name
spoonincode Jun 15, 2020
d6a4720
remove all reversible tracking -- rely on db_read_mode entirely
spoonincode Jun 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_subdirectory(login_plugin)
add_subdirectory(test_control_plugin)
add_subdirectory(test_control_api_plugin)
add_subdirectory(amqp_witness_plugin)
add_subdirectory(amqp_compressed_proof_plugin)

# Forward variables to top level so packaging picks them up
set(CPACK_DEBIAN_PACKAGE_DEPENDS ${CPACK_DEBIAN_PACKAGE_DEPENDS} PARENT_SCOPE)
7 changes: 7 additions & 0 deletions plugins/amqp_compressed_proof_plugin/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
file(GLOB HEADERS "include/eosio/amqp_compressed_proof_plugin/*.hpp")
add_library( amqp_compressed_proof_plugin
amqp_compressed_proof_plugin.cpp
${HEADERS} )

target_link_libraries( amqp_compressed_proof_plugin appbase fc chain_plugin reliable_amqp_publisher )
target_include_directories( amqp_compressed_proof_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" )
329 changes: 329 additions & 0 deletions plugins/amqp_compressed_proof_plugin/amqp_compressed_proof_plugin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,329 @@
#include <eosio/amqp_compressed_proof_plugin/amqp_compressed_proof_plugin.hpp>

#include <eosio/chain/protocol_feature_manager.hpp>
#include <eosio/chain/exceptions.hpp>

#include <eosio/reliable_amqp_publisher/reliable_amqp_publisher.hpp>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/key.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/algorithm/string.hpp>

#include <fc/io/cfile.hpp>

using namespace boost::multi_index;
using namespace std::string_literals;

namespace eosio {
static appbase::abstract_plugin& _amqp_compressed_proof_plugin = app().register_plugin<amqp_compressed_proof_plugin>();

struct action_entry {
chain::action action;
std::vector<char> return_value;
chain::action_receipt action_receipt;

action_entry() = default;

action_entry(const chain::action_trace& atrace) :
action(atrace.act), return_value(atrace.return_value), action_receipt(*atrace.receipt) {}

bool operator<(const action_entry& other) const {
return action_receipt.global_sequence < other.action_receipt.global_sequence;
}
};

struct combine_nodes{};
using merkle_cmd = fc::static_variant<chain::digest_type, action_entry, combine_nodes>;

struct merkle_creator_node {
chain::checksum_type hash;
std::list<merkle_cmd> cmd_stream;
};

class merkle_cmd_creator {
public:
void add_noninterested_action(const action_entry& ae) {
curent_level.emplace_back(merkle_creator_node{chain::digest_type::hash(ae.action_receipt)});
}

void add_interested_action(const action_entry& ae) {
curent_level.emplace_back(merkle_creator_node{chain::digest_type::hash(ae.action_receipt), {ae}});
}

std::vector<char> generate_serialized_proof(const bool arv_activated) {
if(curent_level.empty())
return std::vector<char>();

while(curent_level.size() > 1) {
if(curent_level.size() % 2)
curent_level.emplace_back(merkle_creator_node{curent_level.back().hash});
for(unsigned int i = 0; i < curent_level.size(); i+=2)
curent_level[i/2] = combine(curent_level[i], curent_level[i+1]);
curent_level.resize(curent_level.size()/2);
}

return fc::raw::pack((const bool)arv_activated, curent_level[0].cmd_stream);
}

private:
merkle_creator_node combine(merkle_creator_node& left, merkle_creator_node& right) {
chain::digest_type::encoder digest_enc;
fc::raw::pack(digest_enc, chain::make_canonical_left(left.hash), chain::make_canonical_right(right.hash));

merkle_creator_node new_node = {digest_enc.result()};

if(left.cmd_stream.empty() && right.cmd_stream.empty()) {
//do nothing, this is an uninteresting part of the tree
}
else if(left.cmd_stream.empty()) {
new_node.cmd_stream.emplace_front(left.hash);
new_node.cmd_stream.splice(new_node.cmd_stream.end(), right.cmd_stream);
new_node.cmd_stream.emplace_back(combine_nodes());
}
else if(right.cmd_stream.empty()) {
new_node.cmd_stream.splice(new_node.cmd_stream.end(), left.cmd_stream);
new_node.cmd_stream.emplace_back(right.hash);
new_node.cmd_stream.emplace_back(combine_nodes());
}
else {
new_node.cmd_stream.splice(new_node.cmd_stream.end(), left.cmd_stream);
new_node.cmd_stream.splice(new_node.cmd_stream.end(), right.cmd_stream);
new_node.cmd_stream.emplace_back(combine_nodes());
}

return new_node;
}

std::vector<merkle_creator_node> curent_level;
};

struct compressed_proof_generator_impl {
compressed_proof_generator_impl(chain::controller& controller, std::vector<compressed_proof_generator::result_callback_funcs>&& callbacks) :
callbacks(callbacks) {
controller_connections.emplace_back(controller.applied_transaction.connect([this](std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t) {
on_applied_transaction(std::get<0>(t));
}));
controller_connections.emplace_back(controller.accepted_block.connect([this,&controller](const chain::block_state_ptr& p) {
on_accepted_block(p, controller);
}));
controller_connections.emplace_back(controller.block_start.connect([this](uint32_t block_num) {
on_block_start(block_num);
}));
}

void on_applied_transaction(const chain::transaction_trace_ptr& trace) {
if(!trace->receipt)
return;
//only executed & delayed transaction traces would make it in to action_mroot
if(trace->receipt->status != chain::transaction_receipt::status_enum::executed &&
trace->receipt->status != chain::transaction_receipt::status_enum::delayed)
return;

if(chain::is_onblock(*trace))
onblock_trace.emplace(trace);
else
cached_traces[trace->id] = trace;
}

void on_accepted_block(const chain::block_state_ptr& bsp, const chain::controller& controller) {
std::set<action_entry> action_entries_this_block;

if(onblock_trace) {
for(const chain::action_trace& at : (*onblock_trace)->action_traces)
action_entries_this_block.emplace(at);
}
for(const chain::transaction_receipt& r : bsp->block->transactions) {
transaction_id_type id;
if(r.trx.contains<chain::transaction_id_type>())
id = r.trx.get<chain::transaction_id_type>();
else
id = r.trx.get<chain::packed_transaction>().id();
auto it = cached_traces.find(id);
EOS_ASSERT(it != cached_traces.end(), chain::misc_exception, "missing trace for transaction ${id}", ("id", id));
for(const chain::action_trace& at : it->second->action_traces)
action_entries_this_block.emplace(at);
}
clear_caches();

run_callbacks_on_block(bsp, action_entries_this_block, controller);
}

void run_callbacks_on_block(const chain::block_state_ptr& bsp, const std::set<action_entry>& action_entries, const chain::controller& controller) {
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
const chain::digest_type arv_dig = *controller.get_protocol_feature_manager().get_builtin_digest(chain::builtin_protocol_feature_t::action_return_value);
const auto& protocol_features_this_block = bsp->activated_protocol_features->protocol_features;
const bool action_return_value_active_this_block = protocol_features_this_block.find(arv_dig) != protocol_features_this_block.end();

for(const auto& cb : callbacks) {
merkle_cmd_creator cc;
bool any_interested = false;
for(const action_entry& ae : action_entries) {
if(cb.first(ae.action)) {
cc.add_interested_action(ae);
any_interested = true;
}
else
cc.add_noninterested_action(ae);
}

if(any_interested)
cb.second(cc.generate_serialized_proof(action_return_value_active_this_block));
}
}

void on_block_start(uint32_t block_num) {
clear_caches();
}

void clear_caches() {
cached_traces.clear();
onblock_trace.reset();
}

std::map<transaction_id_type, chain::transaction_trace_ptr> cached_traces;
std::optional<chain::transaction_trace_ptr> onblock_trace;
std::vector<compressed_proof_generator::result_callback_funcs> callbacks;

std::list<boost::signals2::scoped_connection> controller_connections;
};

using generator_impl = compressed_proof_generator_impl;

compressed_proof_generator::compressed_proof_generator(chain::controller& controller, std::vector<result_callback_funcs>&& callbacks) :
my(new compressed_proof_generator_impl(controller, std::move(callbacks))) {}

compressed_proof_generator::~compressed_proof_generator() = default;

struct amqp_compressed_proof_plugin_impl {
std::unique_ptr<compressed_proof_generator> proof_generator;
std::list<reliable_amqp_publisher> publishers;
};

amqp_compressed_proof_plugin::amqp_compressed_proof_plugin():my(new amqp_compressed_proof_plugin_impl()){}
amqp_compressed_proof_plugin::~amqp_compressed_proof_plugin() = default;

void amqp_compressed_proof_plugin::set_program_options(options_description& cli, options_description& cfg) {
cfg.add_options()
("compressed-proof", bpo::value<std::vector<std::string>>()->composing(),
"Define a filter, AMQP server, and exchange to publish compressed proofs to. May be specified multiple times. "
"Must be in the form of:\n"
" <name>=<filter>=<server>=<exchange>\n"
"Where:\n"
" <name> \tis a unique name identifying this proof; used for persistent filenames\n"
" <filter> \tis a comma separated list of receiver[:action] to filter actions on\n"
" <server> \tis the AMQP server URI\n"
" <exchange> \tis the AMQP exchange to publish to\n"
)
;

cli.add_options()
("compressed-proof-delete", bpo::bool_switch(),
"Delete all compressed proof data files for unconfirmed messages and blocks");
}

void amqp_compressed_proof_plugin::plugin_initialize(const variables_map& options) {
try {
boost::filesystem::path dir = app().data_dir() / "amqp";
boost::system::error_code ec;
boost::filesystem::create_directories(dir, ec);

const std::string file_prefix = "cproof";

chain::controller& controller = app().get_plugin<chain_plugin>().chain();

if(options.count("compressed-proof-delete") && options.at("compressed-proof-delete").as<bool>())
for(const auto& p : boost::filesystem::directory_iterator(dir))
if(boost::starts_with(p.path().filename().generic_string(), file_prefix))
boost::filesystem::remove(p.path(), ec);

std::vector<compressed_proof_generator::result_callback_funcs> callbacks;

if(options.count("compressed-proof")) {
const std::vector<std::string>& descs = options.at("compressed-proof").as<std::vector<std::string>>();
std::set<std::string> names_used;

for(const std::string& desc : descs) {
std::vector<std::string> tokens;
boost::split(tokens, desc, boost::is_any_of("="));
EOS_ASSERT(tokens.size() >= 4, chain::plugin_config_exception, "Did not find 4 tokens in compressed-proof option \"${o}\"", ("o", desc));

const std::string& name = tokens[0];
EOS_ASSERT(name.size(), chain::plugin_config_exception, "Cannot have empty name for compressed-proof \"${o}\"", ("o", desc));
EOS_ASSERT(names_used.emplace(name).second,
chain::plugin_config_exception, "Name \"${n}\" used for more than one compressed-proof", ("n", name));
std::vector<std::string> filtered_receivers;
boost::split(filtered_receivers, tokens[1], boost::is_any_of(","));
EOS_ASSERT(filtered_receivers.size(), chain::plugin_config_exception, "Cannot have empty filter list for compressed-proof");

const boost::filesystem::path amqp_unconfimed_file = dir / (file_prefix + "-unconfirmed-"s + name + ".bin"s);
reliable_amqp_publisher& publisher = my->publishers.emplace_back(tokens[2], tokens[3], "", amqp_unconfimed_file, "eosio.node.compressed_proof_v0");

//the presence of an empty set means any action on that receiver
std::map<chain::name, std::set<chain::name>> filter_on_names_and_actions;
for(const std::string& name_action_string : filtered_receivers) {
std::vector<std::string> receiver_action_split;
boost::split(receiver_action_split, name_action_string, boost::is_any_of(":"));
EOS_ASSERT(receiver_action_split.size() == 1 || receiver_action_split.size() == 2, chain::plugin_config_exception, "Malformed receiver:action filter: ${a}", ("a", name_action_string));

const eosio::name receiver_name = eosio::name(receiver_action_split[0]);

if(receiver_action_split.size() == 1)
filter_on_names_and_actions[receiver_name].clear(); //empty set = wildcard
else {
auto it = filter_on_names_and_actions.find(receiver_name);
//if the receiver doesn't exist, or the receiver does exist and already has an action filter; add this action to filter
if(it == filter_on_names_and_actions.end() || it->second.size())
filter_on_names_and_actions[receiver_name].emplace(eosio::name(receiver_action_split[1]));
//else do nothing, wildcard already set on this receiver
}
}

callbacks.emplace_back(std::make_pair(
[filter_on_names_and_actions](const chain::action& act) {
if(auto it = filter_on_names_and_actions.find(act.account); it != filter_on_names_and_actions.end()) {
if(it->second.empty()) //empty set means all actions
return true;
else
return it->second.find(act.name) != it->second.end();
}
return false;
},
[&publisher](std::vector<char>&& serialized_proof) {
publisher.publish_message_raw(std::move(serialized_proof));
}
));
}
}

my->proof_generator = std::make_unique<compressed_proof_generator>(controller, std::move(callbacks));
}
FC_LOG_AND_RETHROW()
}

void amqp_compressed_proof_plugin::plugin_startup() {}

void amqp_compressed_proof_plugin::plugin_shutdown() {}

}

namespace fc {
template<typename T>
inline T& operator<<(T& ds, const std::list<eosio::merkle_cmd>& mcl) {
fc::raw::pack(ds, unsigned_int((uint32_t)mcl.size()) );
for(const eosio::merkle_cmd& mc : mcl)
fc::raw::pack(ds, mc);
return ds;
}
template<typename T>
inline T& operator<<(T& ds, const std::set<eosio::action_entry>& aes) {
fc::raw::pack(ds, unsigned_int((uint32_t)aes.size()) );
for(const eosio::action_entry& ae : aes)
fc::raw::pack(ds, ae);
return ds;
}
}

FC_REFLECT(eosio::combine_nodes, );
FC_REFLECT(eosio::action_entry, (action)(return_value)(action_receipt));
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once
#include <appbase/application.hpp>

#include <eosio/chain_plugin/chain_plugin.hpp>

namespace eosio {

using namespace appbase;

class compressed_proof_generator {
public:
using action_filter_func = std::function<bool(const chain::action& a)>;
using merkle_proof_result_func = std::function<void(std::vector<char>&&)>;
using result_callback_funcs = std::pair<action_filter_func, merkle_proof_result_func>;

compressed_proof_generator(chain::controller& controller, std::vector<result_callback_funcs>&& callbacks);
compressed_proof_generator(chain::controller& controller, std::vector<result_callback_funcs>&& callbacks, const boost::filesystem::path& reversible_path);
~compressed_proof_generator();

compressed_proof_generator(const compressed_proof_generator&) = delete;
compressed_proof_generator& operator=(const compressed_proof_generator&) = delete;

private:
std::unique_ptr<struct compressed_proof_generator_impl> my;
};

class amqp_compressed_proof_plugin : public appbase::plugin<amqp_compressed_proof_plugin> {
public:
amqp_compressed_proof_plugin();
virtual ~amqp_compressed_proof_plugin();

APPBASE_PLUGIN_REQUIRES((chain_plugin))
virtual void set_program_options(options_description&, options_description& cfg) override;

void plugin_initialize(const variables_map& options);
void plugin_startup();
void plugin_shutdown();

private:
std::unique_ptr<struct amqp_compressed_proof_plugin_impl> my;
};

}
Loading