Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Rodeos with Streaming Plugin #9029

Merged
merged 18 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@
[submodule "libraries/fc/include/fc/crypto/webauthn_json"]
path = libraries/fc/include/fc/crypto/webauthn_json
url = https://github.com/Tencent/rapidjson/
[submodule "libraries/amqp-cpp"]
path = libraries/amqp-cpp
url = https://github.com/CopernicaMarketingSoftware/AMQP-CPP
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ configure_file(${CMAKE_SOURCE_DIR}/libraries/rocksdb/LICENSE.Apache
${CMAKE_BINARY_DIR}/licenses/eosio/LICENSE.rocksdb COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/libraries/rocksdb/LICENSE.leveldb
${CMAKE_BINARY_DIR}/licenses/eosio/LICENSE.leveldb COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/libraries/amqp-cpp/LICENSE
leordev marked this conversation as resolved.
Show resolved Hide resolved
${CMAKE_BINARY_DIR}/licenses/eosio/LICENSE.amqpcpp COPYONLY)

install(FILES LICENSE DESTINATION ${CMAKE_INSTALL_FULL_DATAROOTDIR}/licenses/eosio/ COMPONENT base)
install(FILES libraries/softfloat/COPYING.txt DESTINATION ${CMAKE_INSTALL_FULL_DATAROOTDIR}/licenses/eosio/ RENAME LICENSE.softfloat COMPONENT base)
Expand All @@ -233,6 +235,7 @@ install(FILES libraries/yubihsm/LICENSE DESTINATION ${CMAKE_INSTALL_FULL_DATAROO
install(FILES libraries/eos-vm/LICENSE DESTINATION ${CMAKE_INSTALL_FULL_DATAROOTDIR}/licenses/eosio/ RENAME LICENSE.eos-vm COMPONENT base)
install(FILES libraries/rocksdb/LICENSE.Apache DESTINATION ${CMAKE_INSTALL_FULL_DATAROOTDIR}/licenses/eosio/ RENAME LICENSE.rocksdb COMPONENT base)
install(FILES libraries/rocksdb/LICENSE.leveldb DESTINATION ${CMAKE_INSTALL_FULL_DATAROOTDIR}/licenses/eosio/ RENAME LICENSE.leveldb COMPONENT base)
install(FILES libraries/amqp-cpp/LICENSE DESTINATION ${CMAKE_INSTALL_FULL_DATAROOTDIR}/licenses/eosio/ RENAME LICENSE.amqpcpp COMPONENT base)

add_custom_target(base-install
COMMAND "${CMAKE_COMMAND}" --build "${CMAKE_BINARY_DIR}"
Expand Down
5 changes: 5 additions & 0 deletions libraries/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,8 @@ set_property(GLOBAL PROPERTY CTEST_CUSTOM_TESTS_IGNORE
attest pbkdf2 parsing ${_CTEST_CUSTOM_TESTS_IGNORE}")

add_subdirectory( rodeos )

find_package(OpenSSL REQUIRED)
option(AMQP-CPP_LINUX_TCP CACHE ON)
add_subdirectory( amqp-cpp EXCLUDE_FROM_ALL )
target_include_directories(amqpcpp PRIVATE "${OPENSSL_INCLUDE_DIR}")
1 change: 1 addition & 0 deletions libraries/amqp-cpp
Submodule amqp-cpp added at e7f76b
9 changes: 6 additions & 3 deletions programs/rodeos/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_executable( ${RODEOS_EXECUTABLE_NAME}
add_executable( ${RODEOS_EXECUTABLE_NAME}
cloner_plugin.cpp
streamer_plugin.cpp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be better as:

file(GLOB SRC *.cpp *.hpp streams/*.hpp)
add_executable( ${RODEOS_EXECUTABLE_NAME}
  ${SRC}
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do that, you really ought to add CONFIGURE_DEPENDS. Even then it's not recommended by cmake

We do not recommend using GLOB to collect a list of source files from your source tree. If no CMakeLists.txt file changes when a source is added or removed then the generated build system cannot know when to ask CMake to regenerate. The CONFIGURE_DEPENDS flag may not work reliably on all generators, or if a new generator is added in the future that cannot support it, projects using it will be stuck. Even if CONFIGURE_DEPENDS works reliably, there is still a cost to perform the check on every rebuild.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We glob header files in other CMakeLists.txt and add that in. Is that the preferred method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CONFIGURE_DEPENDS

Actually, this is a cmake 3.12 feature

main.cpp
rocksdb_plugin.cpp
wasm_ql_http.cpp
Expand Down Expand Up @@ -29,11 +30,13 @@ endif()

configure_file(config.hpp.in config.hpp ESCAPE_QUOTES)

target_include_directories(${RODEOS_EXECUTABLE_NAME} PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../../libraries/abieos/src )
target_include_directories(${RODEOS_EXECUTABLE_NAME} PUBLIC ${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/../../libraries/abieos/src
${CMAKE_CURRENT_SOURCE_DIR}/../../libraries/amqp-cpp/include)

target_link_libraries( ${RODEOS_EXECUTABLE_NAME}
PRIVATE appbase version
PRIVATE rodeos_lib fc ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} )
PRIVATE rodeos_lib fc amqpcpp ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS})

copy_bin( ${RODEOS_EXECUTABLE_NAME} )
install( TARGETS
Expand Down
21 changes: 15 additions & 6 deletions programs/rodeos/cloner_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ struct cloner_config : ship_client::connection_config {
};

struct cloner_plugin_impl : std::enable_shared_from_this<cloner_plugin_impl> {
std::shared_ptr<cloner_config> config = std::make_shared<cloner_config>();
std::shared_ptr<cloner_session> session;
boost::asio::deadline_timer timer;
std::shared_ptr<cloner_config> config = std::make_shared<cloner_config>();
std::shared_ptr<cloner_session> session;
boost::asio::deadline_timer timer;
std::optional<std::function<void(const char* data, uint64_t data_size)>> streamer = {};
heifner marked this conversation as resolved.
Show resolved Hide resolved

cloner_plugin_impl() : timer(app().get_io_service()) {}

Expand Down Expand Up @@ -157,9 +158,13 @@ struct cloner_session : ship_client::connection_callbacks, std::enable_shared_fr
rodeos_snapshot->write_block_info(result);
rodeos_snapshot->write_deltas(result, [] { return app().is_quiting(); });

// todo: remove
if (filter)
filter->process(*rodeos_snapshot, result, bin, [](const char*, uint64_t) {});
if (filter) {
filter->process(*rodeos_snapshot, result, bin, [&](const char* data, uint64_t data_size) {
if (my->streamer) {
(*my->streamer)(data, data_size);
}
});
}

rodeos_snapshot->end_block(result, false);
return true;
Expand Down Expand Up @@ -235,4 +240,8 @@ void cloner_plugin::plugin_shutdown() {
ilog("cloner_plugin stopped");
}

void cloner_plugin::set_streamer(std::function<void(const char* data, uint64_t data_size)> streamer_func) {
my->streamer = streamer_func;
}

} // namespace b1
2 changes: 2 additions & 0 deletions programs/rodeos/cloner_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class cloner_plugin : public appbase::plugin<cloner_plugin> {
void plugin_startup();
void plugin_shutdown();

void set_streamer(std::function<void(const char* data, uint64_t data_size)> streamer_function);

private:
std::shared_ptr<struct cloner_plugin_impl> my;
};
Expand Down
77 changes: 77 additions & 0 deletions programs/rodeos/streamer_plugin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// copyright defined in LICENSE.txt

#include "streamer_plugin.hpp"
#include "streams/logger.hpp"
#include "streams/rabbitmq.hpp"
#include "streams/stream.hpp"

#include <abieos.hpp>
#include <eosio/abi.hpp>
#include <fc/exception/exception.hpp>
#include <memory>

namespace b1 {
heifner marked this conversation as resolved.
Show resolved Hide resolved

using namespace appbase;
using namespace std::literals;

struct streamer_plugin_impl {
std::vector<std::unique_ptr<stream_handler>> streams;
};

static abstract_plugin& _streamer_plugin = app().register_plugin<streamer_plugin>();

streamer_plugin::streamer_plugin() : my(std::make_shared<streamer_plugin_impl>()) {}

streamer_plugin::~streamer_plugin() {}

void streamer_plugin::set_program_options(options_description& cli, options_description& cfg) {
auto op = cfg.add_options();
op("stream-rabbits", bpo::value<std::vector<string>>()->composing(),
"RabbitMQ Streams if any; Format: amqp://USER:PASSWORD@ADDRESS:PORT/QUEUE[/ROUTING_KEYS, ...]");
op("stream-loggers", bpo::value<std::vector<string>>()->composing(),
"Logger Streams if any; Format: [routing_keys, ...]");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some documentation to the PR description that includes these options. Some example uses would be good also.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indicate the routing_keys are eosio::name


void streamer_plugin::plugin_initialize(const variables_map& options) {
try {
if (options.count("stream-loggers")) {
auto loggers = options.at("stream-loggers").as<std::vector<std::string>>();
initialize_loggers(my->streams, loggers);
}

if (options.count("stream-rabbits")) {
auto rabbits = options.at("stream-rabbits").as<std::vector<std::string>>();
initialize_rabbits(app().get_io_service(), my->streams, rabbits);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This plugin really should have its own io_service and not use the main application thread io_service. This is fine for now. I'll create a JIRA issue to fix this.

}

ilog("initialized streams: ${streams}", ("streams", my->streams.size()));
}
FC_LOG_AND_RETHROW()
}

void streamer_plugin::plugin_startup() {
cloner_plugin* cloner = app().find_plugin<cloner_plugin>();
heifner marked this conversation as resolved.
Show resolved Hide resolved
if (cloner) {
cloner->set_streamer([this](const char* data, uint64_t data_size) { stream_data(data, data_size); });
}
}

void streamer_plugin::plugin_shutdown() {}

void streamer_plugin::stream_data(const char* data, uint64_t data_size) {
eosio::input_stream bin(data, data_size);
stream_wrapper res = eosio::from_bin<stream_wrapper>(bin);
auto& sw = std::get<stream_wrapper_v0>(res);
heifner marked this conversation as resolved.
Show resolved Hide resolved
publish_to_streams(sw);
}

void streamer_plugin::publish_to_streams(const stream_wrapper_v0& sw) {
for (auto& stream : my->streams) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
if (stream->check_route(sw.route)) {
stream->publish(sw.data.data(), sw.data.size());
}
}
}

} // namespace b1
38 changes: 38 additions & 0 deletions programs/rodeos/streamer_plugin.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// copyright defined in LICENSE.txt

#pragma once
#include <appbase/application.hpp>
#include <eosio/abi.hpp>

#include "cloner_plugin.hpp"

namespace b1 {

struct stream_wrapper_v0 {
eosio::name route;
std::vector<char> data;
};
EOSIO_REFLECT(stream_wrapper_v0, route, data);
using stream_wrapper = std::variant<stream_wrapper_v0>;

class streamer_plugin : public appbase::plugin<streamer_plugin> {

public:
APPBASE_PLUGIN_REQUIRES((cloner_plugin))

streamer_plugin();
virtual ~streamer_plugin();

virtual void set_program_options(appbase::options_description& cli, appbase::options_description& cfg) override;
void plugin_initialize(const appbase::variables_map& options);
void plugin_startup();
void plugin_shutdown();
void stream_data(const char* data, uint64_t data_size);

private:
std::shared_ptr<struct streamer_plugin_impl> my;

void publish_to_streams(const stream_wrapper_v0& sw);
};

} // namespace b1
32 changes: 32 additions & 0 deletions programs/rodeos/streams/logger.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "stream.hpp"
#include <fc/log/logger.hpp>

namespace b1 {
heifner marked this conversation as resolved.
Show resolved Hide resolved

class logger : public stream_handler {
std::vector<eosio::name> routes_;

public:
logger(std::vector<eosio::name> routes) : routes_(routes) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
ilog("logger initialized");
}

std::vector<eosio::name>& get_routes() { return routes_; }

void publish(const char* data, uint64_t data_size) {
ilog("logger stream [${data_size}] >> ${data}", ("data", data)("data_size", data_size));
}
};

inline void initialize_loggers(std::vector<std::unique_ptr<stream_handler>>& streams,
const std::vector<std::string>& loggers) {
for (auto routings : loggers) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
std::vector<eosio::name> routing_keys = extract_routings(routings);
logger logger_streamer{ routing_keys };
streams.emplace_back(std::make_unique<logger>(logger_streamer));
heifner marked this conversation as resolved.
Show resolved Hide resolved
}
}

} // namespace b1
86 changes: 86 additions & 0 deletions programs/rodeos/streams/rabbitmq.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#pragma once

#include "stream.hpp"
#include "amqpcpp.h"
#include "amqpcpp/libboostasio.h"
#include "amqpcpp/linux_tcp.h"
#include <fc/log/logger.hpp>
#include <memory>

namespace b1 {
heifner marked this conversation as resolved.
Show resolved Hide resolved

class rabbitmq_handler;

class rabbitmq : public stream_handler {
std::shared_ptr<rabbitmq_handler> handler_;
std::shared_ptr<AMQP::TcpConnection> connection_;
std::shared_ptr<AMQP::TcpChannel> channel_;
std::string name_;
std::vector<eosio::name> routes_;

public:
rabbitmq(boost::asio::io_service& io_service, std::vector<eosio::name> routes, std::string address, std::string name)
: name_(name), routes_(routes) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
AMQP::Address amqp_address(address);
ilog("Connecting to RabbitMQ address ${a} - Queue: ${q}...", ("a", std::string(amqp_address))("q", name));
heifner marked this conversation as resolved.
Show resolved Hide resolved

handler_ = std::make_shared<rabbitmq_handler>(io_service);
connection_ = std::make_shared<AMQP::TcpConnection>(handler_.get(), amqp_address);
channel_ = std::make_shared<AMQP::TcpChannel>(connection_.get());
declare_queue();
}

std::vector<eosio::name>& get_routes() { return routes_; }
heifner marked this conversation as resolved.
Show resolved Hide resolved

void publish(const char* data, uint64_t data_size) { channel_->publish("", name_, data, data_size, 0); }

private:
void declare_queue() {
auto& queue = channel_->declareQueue(name_, AMQP::durable);
queue.onSuccess([](const std::string& name, uint32_t messagecount, uint32_t consumercount) {
ilog("RabbitMQ Connected Successfully!\n Queue ${q} - Messages: ${mc} - Consumers: ${cc}",
("q", name)("mc", messagecount)("cc", consumercount));
});
queue.onError([](const char* error_message) {
throw std::runtime_error("RabbitMQ Queue error: " + std::string(error_message));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm looking at amqp-cpp correctly you should not throw from inside onError. I don't think amqp-cpp is setup to handle that correctly. Instead just elog the error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently needed to shutdown amqp-cpp

});
}
};

class rabbitmq_handler : public AMQP::LibBoostAsioHandler {
public:
rabbitmq_handler(boost::asio::io_service& io_service) : AMQP::LibBoostAsioHandler(io_service) {}
heifner marked this conversation as resolved.
Show resolved Hide resolved

void onError(AMQP::TcpConnection* connection, const char* message) {
throw std::runtime_error("rabbitmq connection failed: " + std::string(message));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm looking at amqp-cpp correctly you should not throw from inside onError. I don't think amqp-cpp is setup to handle that correctly. Instead just elog the error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently needed to shutdown amqp-cpp

}
};

inline void initialize_rabbits(boost::asio::io_service& io_service,
std::vector<std::unique_ptr<stream_handler>>& streams,
const std::vector<std::string>& rabbits) {
for (auto rabbit : rabbits) {
rabbit = rabbit.substr(7, rabbit.length());
size_t pos = rabbit.find("/");
size_t pos_router = rabbit.find_last_of("/");
bool has_router = pos_router != pos;

std::vector<eosio::name> routings = {};
if (has_router) {
routings = extract_routings(rabbit.substr(pos_router + 1, rabbit.length()));
rabbit.erase(pos_router, rabbit.length());
}

std::string queue_name = "stream.default";
if (pos != std::string::npos) {
queue_name = rabbit.substr(pos + 1, rabbit.length());
rabbit.erase(pos, rabbit.length());
}

std::string address = "amqp://" + rabbit;
rabbitmq rmq{ io_service, routings, address, queue_name };
heifner marked this conversation as resolved.
Show resolved Hide resolved
streams.emplace_back(std::make_unique<rabbitmq>(rmq));
heifner marked this conversation as resolved.
Show resolved Hide resolved
}
}

} // namespace b1
43 changes: 43 additions & 0 deletions programs/rodeos/streams/stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once
#include <eosio/abi.hpp>
#include <fc/log/logger.hpp>

namespace b1 {

class stream_handler {
public:
virtual ~stream_handler() {}
virtual std::vector<eosio::name>& get_routes() = 0;
virtual void publish(const char* data, uint64_t data_size) = 0;

virtual bool check_route(const eosio::name& stream_route) {
if (get_routes().size() == 0) {
return true;
}

for (const auto& name : get_routes()) {
if (name == stream_route) {
return true;
}
}

return false;
}
};

inline std::vector<eosio::name> extract_routings(std::string routings) {
std::vector<eosio::name> routing_keys{};
while (routings.size() > 0) {
size_t pos = routings.find(",");
size_t route_length = pos == std::string::npos ? routings.length() : pos;
std::string route = routings.substr(0, pos);
ilog("extracting route ${route}", ("route", route));
if (route != "*") {
heifner marked this conversation as resolved.
Show resolved Hide resolved
routing_keys.emplace_back(eosio::name(route));
}
routings.erase(0, route_length + 1);
}
return routing_keys;
}

} // namespace b1