Use a shared ExecutorService for all AMQP connections / plugin instances #19
March Hare allows you to use any executor service you want. |
I -1 the idea of using the single threaded executor by default. It can be |
Hello. Yes, it was a very dirty workaround :) How can I do that properly? By using the ":thread_pool_size" and/or the ":executor_factory" parameter in the self.connect method?
@micoq correct. The former is for cases when you only want to specify pool size and nothing else (the executor is of a fixed size). I'd probably recommend calculating pool size as |
Also worth pointing out that the setting is per I'd consider an alternative API for March Hare that would e.g. accept a connection factory. The next release will be |
So, we could create a single Just one more question: I don't really understand the exact purpose of the consumer work service executor, does it handle the consumer I/O, serialization of AMQP frames, acks... ? In VisualVM, the threads from this pool ( |
@micoq correct.
I think you can find enough comments in the Java client code, as well as test examples, to learn more.
@micoq well, change your workload to do something more intensive/time consuming and you will see a different pool thread utilization picture. |
@michaelklishin Thank you for the explanation. I could use the Java client directly in a custom application to load the consumer threads. However, in Logstash, these threads never do a CPU-intensive job: they just put the payload in a queue. The queue may block, but in that case the bottleneck is the worker threads that handle the filters (and the outputs), i.e. all the CPU-intensive tasks (groks, field manipulations...), or the input threads themselves (payload deserialization, codec...) just after the queue, if I'm not mistaken. Anyway, the |
@micoq I didn't say "CPU intensive", I said "time consuming." I'll see what a suitable connection method would look like in March Hare soon.
@michaelklishin Yes, I just said "CPU intensive" in comparison with the job done by the worker threads in Logstash.
@micoq thanks for the excellent sleuthing. We definitely should fix this issue. @michaelklishin I would love to see that option added. We'd use a single global executor for all of logstash, with perhaps |
Without touching the march_hare code, would it be possible to share a single connection among multiple consumers (if all the rabbitmq input instances use the same server/port/credentials), e.g. by caching the connection among the input instances? In the ideal case, the input thread and the consumer thread could also be merged into a single thread (so the internal_queue between the two threads could be removed). However, in practice, the consumer thread is declared as a callback and the run() method can't return before Logstash stops. Meanwhile, I discovered an issue in Logstash 5.1.1 where too many threads can severely slow down the whole pipeline: see here. If the monitoring can't be disabled, I think it would be good to reuse/share threads as much as possible across the JVM.
@micoq I think that kind of connection sharing makes a ton of sense. In the long run I'd like to make it explicit via new additions to the logstash grammar, but in the meantime I think it makes sense for plugins to detect this. I would say the best way to do this would be for the rabbitmq_connection mixin to set class variables in a special namespace as singletons and manage that pool. I'm glad to tackle that, but given some other priorities that change would be at least a month away. Do you have any interest in tackling that patch? I'm glad to give speedy PR reviews. |
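The caching idea discussed above could be sketched roughly as follows. This is a minimal illustration, not the actual mixin code: `SharedConnectionCache`, `fetch`, `size` and `FakeConnection` are hypothetical names, and the sketch uses class-level instance variables guarded by a Mutex rather than any particular Logstash facility. In the plugin, the block would presumably wrap whatever `MarchHare.connect` returns.

```ruby
# Hypothetical sketch: a process-wide cache that hands the same connection
# object to every plugin instance asking for the same server/port/credentials.
class SharedConnectionCache
  @connections = {}
  @lock = Mutex.new

  class << self
    # Returns the cached connection for these settings, or builds one by
    # calling the block once for each distinct key.
    def fetch(host:, port:, user:, password:)
      key = [host, port, user, password].freeze
      @lock.synchronize do
        @connections[key] ||= yield
      end
    end

    # Number of distinct connections currently cached.
    def size
      @lock.synchronize { @connections.size }
    end
  end
end

# Stand-in for a real connection object (assumption for illustration only).
FakeConnection = Struct.new(:host, :port)

settings = { host: "rabbit.example", port: 5672, user: "guest", password: "guest" }

a = SharedConnectionCache.fetch(**settings) { FakeConnection.new("rabbit.example", 5672) }
b = SharedConnectionCache.fetch(**settings) { FakeConnection.new("rabbit.example", 5672) }
c = SharedConnectionCache.fetch(**settings.merge(port: 5673)) { FakeConnection.new("rabbit.example", 5673) }

puts "a and b share one connection: #{a.equal?(b)}"
puts "a and c share one connection: #{a.equal?(c)}"
```

A real implementation would also need reference counting or a similar mechanism so that the shared connection is closed only when the last plugin instance stops; that part is left out here.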
In case it would be necessary (or more convenient) to extend March Hare to expose the new Java client settings, let me know. I plan to work on a March Hare release based on the 4.x Java client quite soon. |
@andrewvc I made a small patch on the mixin rabbitmq_connection. There's a new parameter After some tests, the best solution seems to be a single connection for the inputs and another connection for the outputs (on a single RabbitMQ). I think the consumers can be slowed down by the whole connection (in flow state). @michaelklishin I didn't have to touch March Hare for this, but following our earlier discussion it could be a good idea to let the user pass an already initialized executor to the self.connect method (and by default, to use an executor with |
I was curious where this was left off. We also connect to a number of RabbitMQ instances from our indexers that would likely benefit from having this in place.
@jakauppila I haven't looked at a new March Hare version but would definitely find some time to review/guide a PR that adds support for the new Java client options. Integrating them into the Logstash plugins then should be pretty straightforward. |
Currently, for each AMQP connection, an ExecutorService is created with a number of threads equal to twice the number of CPUs/cores. These threads always seem to be in a sleeping/blocked state (checked with VisualVM).
On a system with many cores and many AMQP connections this can waste a large amount of memory and sometimes slow down the whole pipeline.
Example from my environment:
Total sleeping threads: 7680 (160 connections * 24 cores * 2) for about 2GB of thread stacks.
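The total above is plain multiplication. A minimal sketch of that arithmetic, using only the numbers quoted in this issue; the method names are hypothetical, and the per-thread stack figure is simply derived from the "about 2GB" estimate, not measured:

```ruby
# Threads created when every AMQP connection gets its own default pool
# (2 threads per core, as described in this issue).
def per_connection_pool_threads(connections, cores, threads_per_core = 2)
  connections * cores * threads_per_core
end

# Threads created when one fixed-size pool is shared by all connections:
# the count no longer depends on the number of connections.
def shared_pool_threads(cores, threads_per_core = 2)
  cores * threads_per_core
end

connections = 160
cores = 24

total = per_connection_pool_threads(connections, cores)
shared = shared_pool_threads(cores)

# Implied stack size per thread if all those threads use roughly 2 GB.
implied_stack_kib = (2 * 1024 * 1024) / total

puts "per-connection pools: #{total} threads"
puts "one shared pool:      #{shared} threads"
puts "implied stack/thread: ~#{implied_stack_kib} KiB"
```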
If we look at the code, the RabbitMQ client library creates a FixedThreadPool with 2*core threads in the constructor of com.rabbitmq.client.impl.ConsumerWorkService if no pool is provided:
Actually, the March Hare library (which uses the RabbitMQ client) should provide this pool in the connection factory (com.rabbitmq.client.ConnectionFactory).
Here is a quick and dirty workaround that creates a pool with only one thread for each connection by patching March Hare (/usr/share/logstash/vendor/bundle/jruby/1.9/gems/march_hare-2.20.0-java/lib/march_hare/session.rb). Add this line in self.connect:
cf.setSharedExecutor(Executors.newSingleThreadExecutor)
and the import line at the beginning:
java_import java.util.concurrent.Executors
On my setup:
We can even use a single thread for all the connections without a loss of speed (at 4000 events/s with filters). In this case, the thread is running but not heavily used. It seems to handle consumer delivery in the RabbitMQ client library.
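To illustrate the idea of one worker serving every connection, here is a plain-Ruby analogue built on Thread and Queue. It is not the Java ExecutorService used by the RabbitMQ client, and `SharedWorker`, `submit` and `shutdown` are hypothetical names; the sketch only shows that the number of worker threads stays at one however many "connections" submit deliveries.

```ruby
# Plain-Ruby analogue (Thread + Queue), not the Java ExecutorService itself:
# many "connections" hand their deliveries to one shared worker thread.
class SharedWorker
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      # Run jobs one at a time, in submission order, until :stop arrives.
      while (job = @jobs.pop) != :stop
        job.call
      end
    end
  end

  # Enqueue a block to be run on the shared worker thread.
  def submit(&job)
    @jobs << job
  end

  # Let the queued jobs finish, then stop the worker.
  def shutdown
    @jobs << :stop
    @thread.join
  end
end

worker = SharedWorker.new
handled = []          # appended to only by the worker thread
worker_threads = []   # records which thread ran each delivery

# Simulate 5 connections, each delivering 3 messages.
5.times do |conn|
  3.times do |msg|
    worker.submit do
      handled << "conn#{conn}-msg#{msg}"
      worker_threads << Thread.current
    end
  end
end

worker.shutdown
puts "handled #{handled.size} deliveries on #{worker_threads.uniq.size} thread(s)"
```

As pointed out in the comments, a single worker only holds up while each delivery is cheap; a time-consuming handler would make the shared thread the bottleneck.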