Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
send action proofs based on chain's db mode
Browse files Browse the repository at this point in the history
  • Loading branch information
spoonincode committed Jun 15, 2020
1 parent b4cc801 commit 9bdb214
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,16 @@ class merkle_cmd_creator {
};

struct compressed_proof_generator_impl {
compressed_proof_generator_impl(chain::controller& controller) {
compressed_proof_generator_impl(chain::controller& controller, std::vector<compressed_proof_generator::result_callback_funcs>&& callbacks) :
callbacks(callbacks), irreversible(controller.get_read_mode() == chain::db_read_mode::IRREVERSIBLE) {
controller_connections.emplace_back(controller.irreversible_block.connect([this,&controller](const chain::block_state_ptr& bsp) {
on_irreversible_block(bsp, controller);
}));
controller_connections.emplace_back(controller.applied_transaction.connect([this](std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t) {
on_applied_transaction(std::get<0>(t));
}));
controller_connections.emplace_back(controller.accepted_block.connect([this](const chain::block_state_ptr& p) {
on_accepted_block(p);
controller_connections.emplace_back(controller.accepted_block.connect([this,&controller](const chain::block_state_ptr& p) {
on_accepted_block(p, controller);
}));
controller_connections.emplace_back(controller.block_start.connect([this](uint32_t block_num) {
on_block_start(block_num);
Expand All @@ -130,7 +131,7 @@ struct compressed_proof_generator_impl {
cached_traces[trace->id] = trace;
}

void on_accepted_block(const chain::block_state_ptr& bsp) {
void on_accepted_block(const chain::block_state_ptr& bsp, const chain::controller& controller) {
std::set<action_entry> action_entries_this_block;

if(onblock_trace) {
Expand All @@ -150,38 +151,48 @@ struct compressed_proof_generator_impl {
}
clear_caches();

reversible_action_entries_index.emplace(reversible_action_entries{bsp->block_num, bsp->id, std::move(action_entries_this_block)});
if(irreversible)
reversible_action_entries_index.emplace(reversible_action_entries{bsp->block_num, bsp->id, std::move(action_entries_this_block)});
else
run_callbacks_on_block(bsp, action_entries_this_block, controller);
}

void on_irreversible_block(const chain::block_state_ptr& bsp, const chain::controller& controller) {
if(!irreversible)
return;

if(const auto it = reversible_action_entries_index.find(bsp->id); it != reversible_action_entries_index.end()) {
const std::set<action_entry>& action_entries = it->action_entries;

const chain::digest_type arv_dig = *controller.get_protocol_feature_manager().get_builtin_digest(chain::builtin_protocol_feature_t::action_return_value);
const auto& protocol_features_this_block = bsp->activated_protocol_features->protocol_features;
const bool action_return_value_active_this_block = protocol_features_this_block.find(arv_dig) != protocol_features_this_block.end();

for(const auto& cb : callbacks) {
merkle_cmd_creator cc;
bool any_interested = false;
for(const action_entry& ae : action_entries) {
if(cb.first(ae.action)) {
cc.add_interested_action(ae);
any_interested = true;
}
else
cc.add_noninterested_action(ae);
}
run_callbacks_on_block(bsp, action_entries, controller);

if(any_interested)
cb.second(cc.generate_serialized_proof(action_return_value_active_this_block));
}
reversible_action_entries_index.get<by_block_num>().erase(
reversible_action_entries_index.get<by_block_num>().begin(),
reversible_action_entries_index.get<by_block_num>().upper_bound(bsp->block_num)
);
}
}

reversible_action_entries_index.get<by_block_num>().erase(
reversible_action_entries_index.get<by_block_num>().begin(),
reversible_action_entries_index.get<by_block_num>().upper_bound(bsp->block_num)
);
void run_callbacks_on_block(const chain::block_state_ptr& bsp, const std::set<action_entry>& action_entries, const chain::controller& controller) {
const chain::digest_type arv_dig = *controller.get_protocol_feature_manager().get_builtin_digest(chain::builtin_protocol_feature_t::action_return_value);
const auto& protocol_features_this_block = bsp->activated_protocol_features->protocol_features;
const bool action_return_value_active_this_block = protocol_features_this_block.find(arv_dig) != protocol_features_this_block.end();

for(const auto& cb : callbacks) {
merkle_cmd_creator cc;
bool any_interested = false;
for(const action_entry& ae : action_entries) {
if(cb.first(ae.action)) {
cc.add_interested_action(ae);
any_interested = true;
}
else
cc.add_noninterested_action(ae);
}

if(any_interested)
cb.second(cc.generate_serialized_proof(action_return_value_active_this_block));
}
}

void on_block_start(uint32_t block_num) {
Expand Down Expand Up @@ -211,21 +222,23 @@ struct compressed_proof_generator_impl {
> reversible_action_entries_index_type;
reversible_action_entries_index_type reversible_action_entries_index;

std::map<transaction_id_type, chain::transaction_trace_ptr> cached_traces;
std::optional<chain::transaction_trace_ptr> onblock_trace;
std::vector<std::pair<compressed_proof_generator::action_filter_func,
compressed_proof_generator::merkle_proof_result_func>> callbacks;
std::map<transaction_id_type, chain::transaction_trace_ptr> cached_traces;
std::optional<chain::transaction_trace_ptr> onblock_trace;
std::vector<compressed_proof_generator::result_callback_funcs> callbacks;

fc::optional<boost::filesystem::path> reversible_path;

std::list<boost::signals2::scoped_connection> controller_connections;
bool irreversible;
};

using generator_impl = compressed_proof_generator_impl;

compressed_proof_generator::compressed_proof_generator(chain::controller& controller) : my(new compressed_proof_generator_impl(controller)) {}
compressed_proof_generator::compressed_proof_generator(chain::controller& controller, std::vector<result_callback_funcs>&& callbacks) :
my(new compressed_proof_generator_impl(controller, std::move(callbacks))) {}

compressed_proof_generator::compressed_proof_generator(chain::controller& controller, const boost::filesystem::path& p) : my(new compressed_proof_generator_impl(controller)) {
compressed_proof_generator::compressed_proof_generator(chain::controller& controller, std::vector<result_callback_funcs>&& callbacks, const boost::filesystem::path& p) :
my(new compressed_proof_generator_impl(controller, std::move(callbacks))) {
my->reversible_path = p;

if(boost::filesystem::exists(*my->reversible_path)) {
Expand Down Expand Up @@ -264,10 +277,6 @@ compressed_proof_generator::~compressed_proof_generator() {
}
}

void compressed_proof_generator::add_result_callback(action_filter_func&& filter, merkle_proof_result_func&& result) {
my->callbacks.emplace_back(std::make_pair(filter, result));
}

struct amqp_compressed_proof_plugin_impl {
std::unique_ptr<compressed_proof_generator> proof_generator;
std::list<reliable_amqp_publisher> publishers;
Expand Down Expand Up @@ -311,7 +320,8 @@ void amqp_compressed_proof_plugin::plugin_initialize(const variables_map& option
boost::filesystem::remove(p.path(), ec);

const boost::filesystem::path reversible_blocks_file = dir / (file_prefix + "-reversible.bin"s);
my->proof_generator = std::make_unique<compressed_proof_generator>(controller, reversible_blocks_file);

std::vector<compressed_proof_generator::result_callback_funcs> callbacks;

if(options.count("compressed-proof")) {
const std::vector<std::string>& descs = options.at("compressed-proof").as<std::vector<std::string>>();
Expand All @@ -337,16 +347,18 @@ void amqp_compressed_proof_plugin::plugin_initialize(const variables_map& option
for(const std::string& name_string : filtered_receivers)
filter_on_names.emplace(name_string);

my->proof_generator->add_result_callback(
callbacks.emplace_back(std::make_pair(
[filter_on_names](const chain::action& act) {
return filter_on_names.find(act.account) != filter_on_names.end();
},
[&publisher](std::vector<char>&& serialized_proof) {
publisher.publish_message(std::move(serialized_proof));
}
);
));
}
}

my->proof_generator = std::make_unique<compressed_proof_generator>(controller, std::move(callbacks), reversible_blocks_file);
}
FC_LOG_AND_RETHROW()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ using namespace appbase;

class compressed_proof_generator {
public:
compressed_proof_generator(chain::controller& controller);
compressed_proof_generator(chain::controller& controller, const boost::filesystem::path& reversible_path);
using action_filter_func = std::function<bool(const chain::action& a)>;
using merkle_proof_result_func = std::function<void(std::vector<char>&&)>;
using result_callback_funcs = std::pair<action_filter_func, merkle_proof_result_func>;

compressed_proof_generator(chain::controller& controller, std::vector<result_callback_funcs>&& callbacks);
compressed_proof_generator(chain::controller& controller, std::vector<result_callback_funcs>&& callbacks, const boost::filesystem::path& reversible_path);
~compressed_proof_generator();

compressed_proof_generator(const compressed_proof_generator&) = delete;
compressed_proof_generator& operator=(const compressed_proof_generator&) = delete;

using action_filter_func = std::function<bool(const chain::action& a)>;
using merkle_proof_result_func = std::function<void(std::vector<char>&&)>;
void add_result_callback(action_filter_func&& filter, merkle_proof_result_func&& result);
private:
std::unique_ptr<struct compressed_proof_generator_impl> my;
};
Expand Down
Loading

0 comments on commit 9bdb214

Please sign in to comment.