Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Rodeos with Streaming Plugin #9029

Merged
merged 18 commits into from
May 11, 2020
Merged

Rodeos with Streaming Plugin #9029

merged 18 commits into from
May 11, 2020

Conversation

leordev
Copy link
Contributor

@leordev leordev commented Apr 30, 2020

Change Description

This builds on #9018

This is a proposed Streamer Plugin to publish messages to external brokers. As a reference,
we implement two basic streams: a rabbitmq one, using amqp-cpp lib; and also a simple logger
stream, in case you want to print the filter data to your own logs.

Consensus Changes

  • Consensus Changes

API Changes

  • API Changes

Documentation Additions

  • Documentation Additions

To publish the queues from your filter contract, add the following code that will stream data
with the push_data intrinsic:

// struct required for streaming data
struct stream_wrapper_v0 {
    eosio::name       route;
    std::vector<char> data;
};
EOSIO_REFLECT(stream_wrapper_v0, route, data);
using stream_wrapper = std::variant<stream_wrapper_v0>;

// helper streaming serializer function
inline void push_stream(const name route, const std::string& text) {
    stream_wrapper    wrapper = stream_wrapper_v0{.route = route, .data = {text.begin(), text.end()}};
    std::vector<char> bin;
    eosio::convert_to_bin(wrapper, bin);
    eosio::push_data(bin);
}

// ... then inside your filter actions ...
// this will push the `hello world` message to the eos routing key
push_stream("eos"_n, "hello world");

--plugin b1::streamer_plugin

Enables the streamer plugin in rodeos instance

--stream-rabbits amqp://guest:guest@localhost:5672/myqueue/mykey

Adds a RabbitMQ streaming.

  • you can declare multiple rabbitmq connections
  • the template is always amqp://USER:PASSWORD@HOST:PORT/QUEUE_NAME[/KEY1,KEY2]
    • amqp:// prefix is fixed
    • USER is the rabbitmq user
    • PASSWORD is the rabbitmq password
    • HOST is the address of rabbitmq host
    • PORT is the rabbitmq's port
    • QUEUE_NAME is the name of the queue that you want to publish the messages
    • /KEY1,KEY2 is a set of routing keys that you want to publish to your queue. the default is empty, which means will publish any routing keys to this key; you can separate keys names by commas to restrict routing keys; the types of the key names are eosio::name, ex: eos,issue will publish only the messages that has either eos or issue as its routing key

--stream-loggers "*"

Add a Logger streaming

  • the logger only needs the keys that you want to print
  • if it's equal to * it will accept any routing keys
  • similar to the RabbitMQ streaming example you can restrict and filter only certain keys, separated by commas: --stream-loggers "eos,issue"

Additional Implementations

If you want to add your own streaming plugin, feel free to implement the stream_handler class, implementing the void publish(const char* data, uint64_t data_size) method. You will need to add new config params and a way to initialize it in the streamer_plugin::plugin_initialize.

@leordev leordev requested a review from tbfleming April 30, 2020 16:18
@leordev leordev force-pushed the rodeos_tester-streams branch from 032222d to 44201ec Compare May 4, 2020 15:02
@tbfleming
Copy link
Contributor

      if (filter && my->streamer) {
         filter->process(*rodeos_snapshot, result, bin,

This still needs fixing. cloner_plugin needs to operate even when there is no streamer.

@leordev leordev force-pushed the rodeos_tester-streams branch from 44201ec to 0e1c4d2 Compare May 5, 2020 18:32
@spoonincode
Copy link
Contributor

since amqp-cpp is apache licensed, I think we need to include its license file in the install. Toward the bottom of the root CMakeLists.txt is where we tend to add those.

.gitmodules Outdated Show resolved Hide resolved
@leordev leordev requested a review from spoonincode May 11, 2020 15:09
"RabbitMQ Streams if any; Format: amqp://USER:PASSWORD@ADDRESS:PORT/QUEUE[/ROUTING_KEYS, ...]");
op("stream-loggers", bpo::value<std::vector<string>>()->composing(),
"Logger Streams if any; Format: [routing_keys, ...]");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some documentation to the PR description that includes these options. Some example uses would be good also.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indicate the routing_keys are eosio::name

cloner_plugin.cpp
streamer_plugin.cpp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be better as:

file(GLOB SRC *.cpp *.hpp streams/*.hpp)
add_executable( ${RODEOS_EXECUTABLE_NAME}
  ${SRC}
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do that, you really ought to add CONFIGURE_DEPENDS. Even then it's not recommended by cmake

We do not recommend using GLOB to collect a list of source files from your source tree. If no CMakeLists.txt file changes when a source is added or removed then the generated build system cannot know when to ask CMake to regenerate. The CONFIGURE_DEPENDS flag may not work reliably on all generators, or if a new generator is added in the future that cannot support it, projects using it will be stuck. Even if CONFIGURE_DEPENDS works reliably, there is still a cost to perform the check on every rebuild.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We glob header files in other CMakeLists.txt and add that in. Is that the preferred method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CONFIGURE_DEPENDS

Actually, this is a cmake 3.12 feature

("q", name)("mc", messagecount)("cc", consumercount));
});
queue.onError([](const char* error_message) {
throw std::runtime_error("RabbitMQ Queue error: " + std::string(error_message));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm looking at amqp-cpp correctly you should not throw from inside onError. I don't think amqp-cpp is setup to handle that correctly. Instead just elog the error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently needed to shutdown amqp-cpp

rabbitmq_handler(boost::asio::io_service& io_service) : AMQP::LibBoostAsioHandler(io_service) {}

void onError(AMQP::TcpConnection* connection, const char* message) {
throw std::runtime_error("rabbitmq connection failed: " + std::string(message));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm looking at amqp-cpp correctly you should not throw from inside onError. I don't think amqp-cpp is setup to handle that correctly. Instead just elog the error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently needed to shutdown amqp-cpp


if (options.count("stream-rabbits")) {
auto rabbits = options.at("stream-rabbits").as<std::vector<std::string>>();
initialize_rabbits(app().get_io_service(), my->streams, rabbits);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This plugin really should have its own io_service and not use the main application thread io_service. This is fine for now. I'll create a JIRA issue to fix this.

Copy link
Contributor

@heifner heifner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments.

@leordev leordev merged commit e641c4d into develop May 11, 2020
@leordev leordev deleted the rodeos_tester-streams branch May 11, 2020 22:44
@swang-b1
Copy link

Hi, I tested this stream function with rabbitmq, the connection drop because server side not receive any heartbeat. It only works when I config server side with heartbeat=0 to disable heartbeat. is this expected ?

@leordev
Copy link
Contributor Author

leordev commented May 15, 2020

@swang-b1 hmmm so I guess the amqp-cpp implementation that I imported does not manage heartbeat out of the box. For now we can set the rabbitmq config heartbeat=0 and add this issue to our backlog.

@spoonincode
Copy link
Contributor

The documentation is deceptive.

The default behavior of the AMQP-CPP library is to disable heartbeats

That's true if you're using a vanilla AMQP::Connection but not true if you're using AMQP::TcpConnection. AMQP::TcpConnection will accept the server's recommended heartbeat interval.

But then the asio integration doesn't set up a timer to pump the heartbeat 😞
https://github.com/CopernicaMarketingSoftware/AMQP-CPP/blob/1c08399ab0ab9e4042ef8e2bbe9e208e5dcbc13b/include/amqpcpp/libboostasio.h#L533-L535

For now, this fault can be mitigated by doing something like

   uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval) override {
      return 0;
   }

in our own handler. That will disable heartbeats on the connection no matter how the server is configured.

@swang-b1
Copy link

Cool, this will work.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants