From 523a1dda48b2cbc1a64014918a9e84974135b466 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Fri, 22 Mar 2024 15:40:55 +0100 Subject: [PATCH] - Added AsyncExecutor (https://issues.redhat.com/browse/JGRP-2603) - Added async down() method to Protocol, JChannel and ProtocolStack (https://issues.redhat.com/browse/JGRP-2603) - Added async_executor to TP - Added BufferSizeTest - RED now handles messages with DONT_BLOCK set - Added async down() method (https://issues.redhat.com/browse/JGRP-2603) - MFC doesn't block sender when credits are exhausted, but DONT_BLOCK flag is set: the message is discarded --- bin/jgroups.sh | 4 +- src/org/jgroups/JChannel.java | 17 ++- src/org/jgroups/blocks/MessageDispatcher.java | 9 +- src/org/jgroups/protocols/FlowControl.java | 69 ++++------ src/org/jgroups/protocols/MFC.java | 10 +- src/org/jgroups/protocols/MsgStats.java | 7 - src/org/jgroups/protocols/RED.java | 2 + .../SimplifiedTransferQueueBundler.java | 2 +- src/org/jgroups/protocols/TP.java | 28 ++-- .../protocols/TransferQueueBundler.java | 18 +-- src/org/jgroups/stack/Protocol.java | 33 ++++- src/org/jgroups/stack/ProtocolStack.java | 5 + src/org/jgroups/util/AsyncExecutor.java | 130 ++++++++++++++++++ src/org/jgroups/util/ThreadPool.java | 78 +++++++---- src/org/jgroups/util/Util.java | 5 + .../org/jgroups/protocols/MFC_Test.java | 98 ++++++++++++- .../org/jgroups/tests/BufferSizeTest.java | 62 +++++++++ 17 files changed, 465 insertions(+), 112 deletions(-) create mode 100644 src/org/jgroups/util/AsyncExecutor.java create mode 100644 tests/other/org/jgroups/tests/BufferSizeTest.java diff --git a/bin/jgroups.sh b/bin/jgroups.sh index c29643099c4..31d5e12d4f6 100755 --- a/bin/jgroups.sh +++ b/bin/jgroups.sh @@ -33,7 +33,7 @@ if [ -f $HOME/logging.properties ]; then fi; #JG_FLAGS="-Djgroups.bind_addr=match-address:192.168.1.*" -FLAGS="-server -Xmx1G -Xms500M" +FLAGS="-server -Xmx1G -Xms500M -XX:+HeapDumpOnOutOfMemoryError" #FLAGS="$FLAGS -Duser.language=de" @@ -49,5 +49,5 @@ 
FLAGS="-server -Xmx1G -Xms500M" # SSL_FLAGS="-Djavax.net.debug=ssl:handshake" -java -cp $CP $SSL_FLAGS $DEBUG $LOG $JG_FLAGS $FLAGS $JMX $JMC $* +java -cp $CP $SSL_FLAGS $DEBUG $LOG $JG_FLAGS $FLAGS $JMX $JMC $* diff --git a/src/org/jgroups/JChannel.java b/src/org/jgroups/JChannel.java index b3e87719fb3..9bc6a6473c9 100644 --- a/src/org/jgroups/JChannel.java +++ b/src/org/jgroups/JChannel.java @@ -13,11 +13,15 @@ import org.jgroups.util.UUID; import org.jgroups.util.*; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.InputStream; import java.net.Inet6Address; import java.net.InetAddress; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; import java.util.function.Consumer; @@ -612,6 +616,17 @@ public Object down(Message msg) { return msg != null? prot_stack.down(msg) : null; } + /** + * Sends a message down asynchronously. The sending is executed in the transport's thread pool. If the pool is full + * and the message is marked as {@link org.jgroups.Message.TransientFlag#DONT_BLOCK}, then it will be dropped, + * otherwise it will be sent on the caller's thread. + * @param msg The message to be sent + * @param async Whether to send the message asynchronously + * @return A CompletableFuture of the result (or exception) + */ + public CompletableFuture down(Message msg, boolean async) { + return msg != null? prot_stack.down(msg, async) : null; + } /** * Callback method
diff --git a/src/org/jgroups/blocks/MessageDispatcher.java b/src/org/jgroups/blocks/MessageDispatcher.java index a5fdba5937a..2ad7b916efc 100644 --- a/src/org/jgroups/blocks/MessageDispatcher.java +++ b/src/org/jgroups/blocks/MessageDispatcher.java @@ -71,6 +71,9 @@ public MessageDispatcher(JChannel channel) { channel.addChannelListener(this); local_addr=channel.getAddress(); installUpHandler(prot_adapter, true); + Protocol top_prot=channel.stack() != null? channel.stack().getTopProtocol() : null; + if(top_prot != null) + prot_adapter.setDownProt(top_prot); } start(); } @@ -453,7 +456,7 @@ public void channelClosed(JChannel channel) { } - class ProtocolAdapter extends Protocol implements UpHandler { + protected class ProtocolAdapter extends Protocol implements UpHandler { /* ------------------------- Protocol Interface --------------------------- */ @@ -463,6 +466,10 @@ public String getName() { return "MessageDispatcher"; } + public T setDownProt(Protocol d) { + down_prot=d; + return (T)this; + } public T setAddress(Address addr) { local_addr=addr; diff --git a/src/org/jgroups/protocols/FlowControl.java b/src/org/jgroups/protocols/FlowControl.java index c788e34adb5..4505a7487b7 100644 --- a/src/org/jgroups/protocols/FlowControl.java +++ b/src/org/jgroups/protocols/FlowControl.java @@ -63,14 +63,21 @@ public abstract class FlowControl extends Protocol { @Property(description="Computed as max_credits x min_theshold unless explicitly set",type=AttributeType.BYTES) protected long min_credits; + @ManagedAttribute(description="Number of credit requests received",type=AttributeType.SCALAR) + protected long num_credit_requests_received; - - - /* --------------------------------------------- JMX -------------------------------------------------- */ - protected int num_credit_requests_received, num_credit_requests_sent; - protected int num_credit_responses_sent, num_credit_responses_received; + @ManagedAttribute(description="Number of credit requests 
sent",type=AttributeType.SCALAR) + protected long num_credit_requests_sent; + + @ManagedAttribute(description="Number of credit responses received",type=AttributeType.SCALAR) + protected long num_credit_responses_received; + @ManagedAttribute(description="Number of credit responses sent",type=AttributeType.SCALAR) + protected long num_credit_responses_sent; + @ManagedAttribute(description="Number of messages dropped due to DONT_BLOCK flag",type=AttributeType.SCALAR) + protected long num_msgs_dropped; + /* --------------------------------------------- Fields ------------------------------------------------------ */ @@ -83,51 +90,32 @@ public abstract class FlowControl extends Protocol { /** Whether FlowControl is still running, this is set to false when the protocol terminates (on stop()) */ protected volatile boolean running=true; - protected int frag_size; // remember frag_size from the fragmentation protocol - public void resetStats() { super.resetStats(); num_credit_responses_sent=num_credit_responses_received=num_credit_requests_received=num_credit_requests_sent=0; + num_msgs_dropped=0; } - public long getMaxCredits() {return max_credits;} - public T setMaxCredits(long m) {max_credits=m; return (T)this;} - public double getMinThreshold() {return min_threshold;} - public T setMinThreshold(double m) {min_threshold=m; return (T)this;} - public long getMinCredits() {return min_credits;} - public T setMinCredits(long m) {min_credits=m; return (T)this;} - public long getMaxBlockTime() {return max_block_time;} - public T setMaxBlockTime(long t) {max_block_time=t; return (T)this;} - - - public abstract int getNumberOfBlockings(); - - public abstract double getAverageTimeBlocked(); - - @ManagedAttribute(description="Number of credit requests received",type=AttributeType.SCALAR) - public int getNumberOfCreditRequestsReceived() { - return num_credit_requests_received; - } - - @ManagedAttribute(description="Number of credit requests sent",type=AttributeType.SCALAR) - 
public int getNumberOfCreditRequestsSent() { - return num_credit_requests_sent; - } - - @ManagedAttribute(description="Number of credit responses received",type=AttributeType.SCALAR) - public int getNumberOfCreditResponsesReceived() { - return num_credit_responses_received; - } + public abstract int getNumberOfBlockings(); + public abstract double getAverageTimeBlocked(); + public long getMaxCredits() {return max_credits;} + public T setMaxCredits(long m) {max_credits=m; return (T)this;} + public double getMinThreshold() {return min_threshold;} + public T setMinThreshold(double m) {min_threshold=m; return (T)this;} + public long getMinCredits() {return min_credits;} + public T setMinCredits(long m) {min_credits=m; return (T)this;} + public long getMaxBlockTime() {return max_block_time;} + public T setMaxBlockTime(long t) {max_block_time=t; return (T)this;} + public long getNumberOfCreditRequestsReceived() {return num_credit_requests_received;} + public long getNumberOfCreditRequestsSent() {return num_credit_requests_sent;} + public long getNumberOfCreditResponsesReceived() {return num_credit_responses_received;} + public long getNumberOfCreditResponsesSent() {return num_credit_responses_sent;} - @ManagedAttribute(description="Number of credit responses sent",type=AttributeType.SCALAR) - public int getNumberOfCreditResponsesSent() { - return num_credit_responses_sent; - } public abstract String printSenderCredits(); @@ -164,8 +152,7 @@ public void unblock() { ; } - public void init() throws Exception { - boolean min_credits_set = min_credits != 0; + public void init() throws Exception { boolean min_credits_set = min_credits != 0; if(!min_credits_set) min_credits=(long)(max_credits * min_threshold); } diff --git a/src/org/jgroups/protocols/MFC.java b/src/org/jgroups/protocols/MFC.java index 82e9e81f80e..eec9a14305b 100644 --- a/src/org/jgroups/protocols/MFC.java +++ b/src/org/jgroups/protocols/MFC.java @@ -115,18 +115,22 @@ protected Object handleDownMessage(final 
Message msg, int length) { if(dest != null) // 2nd line of defense, not really needed return down_prot.down(msg); + boolean dont_block=msg.isFlagSet(Message.TransientFlag.DONT_BLOCK); while(running) { - boolean rc=credits.decrement(msg, length, max_block_time); + long timeout=dont_block? 0 : max_block_time; // timeout == 0 won't block the decrement() below + boolean rc=credits.decrement(msg, length, timeout); if(rc || !running) break; - if(needToSendCreditRequest()) { List> targets=credits.getMembersWithCreditsLessThan(min_credits); for(Tuple tuple: targets) sendCreditRequest(tuple.getVal1(), Math.min(max_credits, max_credits - tuple.getVal2())); } + if(dont_block) { + num_msgs_dropped++; + return null; + } } - // send message - either after regular processing, or after blocking (when enough credits are available again) return down_prot.down(msg); } diff --git a/src/org/jgroups/protocols/MsgStats.java b/src/org/jgroups/protocols/MsgStats.java index 353cfa9c723..ee4217f0d4e 100644 --- a/src/org/jgroups/protocols/MsgStats.java +++ b/src/org/jgroups/protocols/MsgStats.java @@ -47,9 +47,6 @@ public class MsgStats { /** The average number of messages in a received {@link MessageBatch} */ protected final AverageMinMax avg_batch_size=new AverageMinMax(); - @ManagedAttribute(description="Number of messages rejected by the thread pool (because it was full)",type=SCALAR) - protected final LongAdder num_rejected_msgs=new LongAdder(); - @ManagedAttribute(description="Number of multicast bytes sent",type=BYTES) protected final LongAdder num_mcast_bytes_sent=new LongAdder(); @ManagedAttribute(description="Number of multicast bytes received",type=BYTES) @@ -105,9 +102,6 @@ public class MsgStats { public long getNumMcastBytesReceived() {return num_mcast_bytes_received.sum();} - public long getNumRejectedMsgs() {return num_rejected_msgs.sum();} - public MsgStats incrNumRejectedMsgs() {num_rejected_msgs.increment(); return this;} - public MsgStats sent(Address dest, int length) { 
if(!enabled) @@ -154,7 +148,6 @@ public MsgStats reset() { num_mcasts_sent, num_mcasts_received, num_single_msgs_sent, num_single_msgs_received, num_batches_sent, num_batches_received, - num_rejected_msgs, num_mcast_bytes_sent, num_mcast_bytes_received, num_ucast_bytes_sent, num_ucast_bytes_received) .forEach(LongAdder::reset); diff --git a/src/org/jgroups/protocols/RED.java b/src/org/jgroups/protocols/RED.java index 91c3ee57dfc..f86af1de55a 100644 --- a/src/org/jgroups/protocols/RED.java +++ b/src/org/jgroups/protocols/RED.java @@ -94,6 +94,8 @@ public void resetStats() { } public Object down(Message msg) { + if(msg.isFlagSet(Message.TransientFlag.DONT_BLOCK)) + return down_prot.down(msg); if(enabled && bundler != null) { int current_queue_size=bundler.getQueueSize(); double avg; diff --git a/src/org/jgroups/protocols/SimplifiedTransferQueueBundler.java b/src/org/jgroups/protocols/SimplifiedTransferQueueBundler.java index e9e57955ac5..118d13e152b 100644 --- a/src/org/jgroups/protocols/SimplifiedTransferQueueBundler.java +++ b/src/org/jgroups/protocols/SimplifiedTransferQueueBundler.java @@ -21,7 +21,7 @@ public SimplifiedTransferQueueBundler() { } public SimplifiedTransferQueueBundler(int capacity) { - super(new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity))); + super(new ArrayBlockingQueue<>(Util.assertPositive(capacity, "bundler capacity cannot be " + capacity))); } public int size() { diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 6f2c9159a1a..f3c657cd2e0 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -371,7 +371,10 @@ public T enableBlockingTimerTasks(boolean flag) { /** The thread pool which handles unmarshalling, version checks and dispatching of messages */ @Component(name="thread_pool") - protected ThreadPool thread_pool=new ThreadPool(this); + protected ThreadPool thread_pool=new ThreadPool().log(this.log); + + 
@Component(name="async_executor") + protected AsyncExecutor async_executor=new AsyncExecutor<>(thread_pool); /** Factory which is used by the thread pool */ protected ThreadFactory thread_factory; @@ -479,6 +482,9 @@ public void resetStats() { msg_processing_policy.reset(); if(local_transport != null) local_transport.resetStats(); + if(thread_pool != null) + thread_pool.resetStats(); + async_executor.resetStats(); } public T registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) { @@ -556,17 +562,20 @@ public T setThreadPool(Executor thread_pool) { return (T)this; } - public ThreadFactory getThreadPoolThreadFactory() { + public ThreadFactory getThreadFactory() { return thread_factory; } - public T setThreadPoolThreadFactory(ThreadFactory factory) { + public T setThreadFactory(ThreadFactory factory) { thread_factory=factory; if(thread_pool != null) thread_pool.setThreadFactory(factory); return (T)this; } + public AsyncExecutor getAsyncExecutor() {return async_executor;} + public T setAsyncExecutor(AsyncExecutor e) {async_executor=e; return (T)this;} + public TimeScheduler getTimer() {return timer;} /** @@ -588,11 +597,6 @@ public T setTimeService(TimeService ts) { return (T)this; } - public ThreadFactory getThreadFactory() { - return thread_factory; - } - - public T setThreadFactory(ThreadFactory factory) {thread_factory=factory; return (T)this;} public SocketFactory getSocketFactory() { return socket_factory; @@ -694,6 +698,7 @@ public void init() throws Exception { log.debug("use_virtual_threads was set to false, as virtual threads are not available in this Java version"); use_virtual_threads=false; } + thread_pool.useVirtualThreads(this.use_virtual_threads); if(local_transport_class != null) { Class cl=Util.loadClass(local_transport_class, getClass()); @@ -702,8 +707,8 @@ public void init() throws Exception { } if(thread_factory == null) - thread_factory=new LazyThreadFactory("jgroups", false, true) - .useVirtualThreads(use_virtual_threads).log(this.log); 
+ setThreadFactory(new LazyThreadFactory("jgroups", false, true) + .useVirtualThreads(use_virtual_threads).log(this.log)); // local_addr is null when shared transport, channel_name is not used setInAllThreadFactories(cluster_name != null? cluster_name.toString() : null, local_addr, thread_naming_pattern); @@ -767,6 +772,8 @@ public String toString() { /** Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads */ public void start() throws Exception { timer.start(); + thread_pool.setAddress(local_addr); + async_executor.start(); if(time_service != null) time_service.start(); fetchLocalAddresses(); @@ -785,6 +792,7 @@ public void stop() { if(time_service != null) time_service.stop(); timer.stop(); + async_executor.stop(); } public void destroy() { diff --git a/src/org/jgroups/protocols/TransferQueueBundler.java b/src/org/jgroups/protocols/TransferQueueBundler.java index dff42447695..953aaa9c341 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler.java +++ b/src/org/jgroups/protocols/TransferQueueBundler.java @@ -5,6 +5,7 @@ import org.jgroups.annotations.ManagedAttribute; import org.jgroups.annotations.Property; import org.jgroups.util.AverageMinMax; +import org.jgroups.util.Util; import java.util.ArrayList; import java.util.List; @@ -48,16 +49,19 @@ protected TransferQueueBundler(BlockingQueue queue) { } public TransferQueueBundler(int capacity) { - this(new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity))); + this(new ArrayBlockingQueue<>(Util.assertPositive(capacity, "bundler capacity cannot be " + capacity))); } - public Thread getThread() {return bundler_thread;} + public Thread getThread() {return bundler_thread;} @ManagedAttribute(description="Size of the queue") - public int getQueueSize() {return queue.size();} + public int getQueueSize() {return queue.size();} @ManagedAttribute(description="Size of the remove-queue") - public int removeQueueSize() {return 
remove_queue.size();} + public int removeQueueSize() {return remove_queue.size();} + + public boolean dropWhenFull() {return drop_when_full;} + public T dropWhenFull(boolean d) {this.drop_when_full=d; return (T)this;} @Override @@ -75,7 +79,7 @@ public synchronized void start() { if(running) stop(); // todo: replace with LinkedBlockingQueue and measure impact (if any) on perf - queue=new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity)); + queue=new ArrayBlockingQueue<>(Util.assertPositive(capacity, "bundler capacity cannot be " + capacity)); bundler_thread=transport.getThreadFactory().newThread(this, THREAD_NAME); running=true; bundler_thread.start(); @@ -161,8 +165,4 @@ protected void drain() { } - protected static int assertPositive(int value, String message) { - if(value <= 0) throw new IllegalArgumentException(message); - return value; - } } diff --git a/src/org/jgroups/stack/Protocol.java b/src/org/jgroups/stack/Protocol.java index 34ccb1ecbe9..d018397fc7e 100644 --- a/src/org/jgroups/stack/Protocol.java +++ b/src/org/jgroups/stack/Protocol.java @@ -13,17 +13,16 @@ import org.jgroups.logging.Log; import org.jgroups.logging.LogFactory; import org.jgroups.protocols.TP; -import org.jgroups.util.MessageBatch; -import org.jgroups.util.SocketFactory; -import org.jgroups.util.ThreadFactory; -import org.jgroups.util.Util; +import org.jgroups.util.*; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; @@ -68,7 +67,6 @@ public abstract class Protocol implements Lifecycle { protected final Log log=LogFactory.getLog(this.getClass()); - protected List policies; @@ -311,8 +309,8 @@ public Object down(Event evt) { } /** - * A message is sent down the stack. 
Protocols may examine the message and do something (e.g. add a header) with it - * before passing it down. + * A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) + * with it, before passing it down. * @since 4.0 */ public Object down(Message msg) { @@ -320,6 +318,27 @@ public Object down(Message msg) { } + /** + * Passes a message down asynchronously. The sending is executed in the transport's thread pool. If the pool is full + * and the message is marked as {@link org.jgroups.Message.TransientFlag#DONT_BLOCK}, then it will be dropped, + * otherwise it will be sent on the caller's thread. + * @param msg The message to be sent + * @param async Whether to send the message asynchronously + * @return A CompletableFuture of the result (or exception) + */ + public CompletableFuture down(Message msg, boolean async) { + AsyncExecutor executor=getTransport().getAsyncExecutor(); + Supplier s=() -> down(msg); + try { + return !async? CompletableFuture.completedFuture(down_prot.down(msg)): + executor.execute(s, msg.isFlagSet(Message.TransientFlag.DONT_BLOCK)); + } + catch(Throwable t) { + return CompletableFuture.failedFuture(t); + } + } + + /** * An event was received from the protocol below. Usually the current protocol will want to examine the event type * and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating diff --git a/src/org/jgroups/stack/ProtocolStack.java b/src/org/jgroups/stack/ProtocolStack.java index 4eb83dfc44d..dede8a15404 100644 --- a/src/org/jgroups/stack/ProtocolStack.java +++ b/src/org/jgroups/stack/ProtocolStack.java @@ -18,6 +18,7 @@ import java.net.InetAddress; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; @@ -952,6 +953,10 @@ public Object down(Message msg) { return top_prot != null? 
top_prot.down(msg) : null; } + public CompletableFuture down(Message msg, boolean async) { + return top_prot != null? top_prot.down(msg, async) : null; + } + protected static void callAfterCreationHook(Protocol prot, String classname) throws Exception { diff --git a/src/org/jgroups/util/AsyncExecutor.java b/src/org/jgroups/util/AsyncExecutor.java new file mode 100644 index 00000000000..a46b6dd595b --- /dev/null +++ b/src/org/jgroups/util/AsyncExecutor.java @@ -0,0 +1,130 @@ +package org.jgroups.util; + +import org.jgroups.Lifecycle; +import org.jgroups.annotations.ManagedAttribute; +import org.jgroups.annotations.Property; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Supplier; + +import static org.jgroups.conf.AttributeType.SCALAR; + +/** + * Used to execute asynchronous tasks, e.g. async-send (https://issues.redhat.com/browse/JGRP-2603). 
Uses a blocking + * queue and a dequeuer thread, which passes removed tasks to the thread pool + * @author Bela Ban + * @since 5.3.5 + */ +public class AsyncExecutor implements Lifecycle { + + @Property(description="If not enabled, tasks will executed on the runner's thread") + protected boolean enabled=true; + + @ManagedAttribute(description="Total number of times a message was sent (includes rejected messages)",type=SCALAR) + protected final LongAdder num_sends=new LongAdder(); + + @ManagedAttribute(description="Number of rejected message due to an exhausted thread pool (includes dropped " + + "messages and messages sent on the caller's thread",type=SCALAR) + protected final LongAdder num_rejected=new LongAdder(); + + @ManagedAttribute(description="Number of dropped tasks (when DONT_BLOCK flag is set in the message)",type=SCALAR) + protected final LongAdder num_drops_on_full_thread_pool=new LongAdder(); + + @ManagedAttribute(description="Messages that were sent on the caller's thread due to an exhausted pool",type=SCALAR) + protected final LongAdder num_sends_on_callers_thread=new LongAdder(); + + protected ThreadPool thread_pool; + protected Executor executor; + + public boolean enabled() {return enabled;} + public AsyncExecutor enable(boolean b) {enabled=b; return this;} + public ThreadPool threadPool() {return thread_pool;} + public AsyncExecutor threadPool(ThreadPool p) {this.thread_pool=p; return this;} + public long numSends() {return num_sends.sum();} + public long numSendsOnCallersThread() {return num_sends_on_callers_thread.sum();} + public long numDropsOnFullThreadPool() {return num_drops_on_full_thread_pool.sum();} + public long numRejected() {return num_rejected.sum();} + + + public AsyncExecutor() { + } + + public AsyncExecutor(ThreadPool p) { + this.thread_pool=p; + } + + public void resetStats() { + num_sends.reset(); + num_rejected.reset(); + num_drops_on_full_thread_pool.reset(); + num_sends_on_callers_thread.reset(); + } + + public 
CompletableFuture execute(Supplier t, boolean can_be_dropped) { + Task task=new Task<>(t, new CompletableFuture<>()); + Executor exec=executor; + try { + num_sends.increment(); + if(enabled && (exec=exec()) != null) + return CompletableFuture.supplyAsync(t, exec); + return CompletableFuture.completedFuture(t.get()); + } + catch(RejectedExecutionException ex) { + num_rejected.increment(); + if(!can_be_dropped) { + task.run(); // if we cannot drop the task, run it on the caller thread + num_sends_on_callers_thread.increment(); + } + else { + task.completeExceptionally(ex); + num_drops_on_full_thread_pool.increment(); + } + } + return task.cf; + } + + @Override + public String toString() { + return String.format("rejected: %,d, drops=%,d, sends_on_caller: %,d, pool: %s\n", + num_rejected.sum(), num_drops_on_full_thread_pool.sum(), num_sends_on_callers_thread.sum(), + thread_pool.toString()); + } + + protected Executor exec() { + Executor exec=executor; + return exec != null? exec : (exec=executor=thread_pool.pool()); + } + + + protected static class Task implements Runnable { + protected final Supplier task; + protected final CompletableFuture cf; + + protected Task(Supplier task, CompletableFuture cf) { + this.task=task; + this.cf=cf; + } + + protected Task completeExceptionally(Throwable t) { + cf.completeExceptionally(t); + return this; + } + + @Override + public void run() { + try { + T result=task.get(); + cf.complete(result); + } + catch(Throwable t) { + cf.completeExceptionally(t); + } + } + } + + + +} diff --git a/src/org/jgroups/util/ThreadPool.java b/src/org/jgroups/util/ThreadPool.java index a7624e90b8f..5d0668a7c13 100644 --- a/src/org/jgroups/util/ThreadPool.java +++ b/src/org/jgroups/util/ThreadPool.java @@ -1,5 +1,6 @@ package org.jgroups.util; +import org.jgroups.Address; import org.jgroups.Global; import org.jgroups.Lifecycle; import org.jgroups.annotations.ManagedAttribute; @@ -7,15 +8,17 @@ import org.jgroups.annotations.Property; import 
org.jgroups.conf.AttributeType; import org.jgroups.logging.Log; -import org.jgroups.protocols.TP; +import org.jgroups.logging.LogFactory; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; + +import static org.jgroups.conf.AttributeType.SCALAR; /** * Thread pool based on {@link java.util.concurrent.ThreadPoolExecutor} @@ -24,7 +27,9 @@ */ public class ThreadPool implements Lifecycle { protected Executor thread_pool; - protected final TP tp; + protected Log log; + protected ThreadFactory thread_factory; + protected Address address; // Incremented when a message is rejected due to a full thread pool. When this value exceeds thread_dumps_threshold, // the threads will be dumped at FATAL level, and thread_dumps will be reset to 0 @@ -33,6 +38,9 @@ public class ThreadPool implements Lifecycle { @Property(description="Whether or not the thread pool is enabled. 
If false, tasks will be run on the caller's thread") protected boolean enabled=true; + @Property(description="If true, create virtual threads, otherwise create native threads") + protected boolean use_virtual_threads; + @Property(description="Minimum thread pool size for the thread pool") protected int min_threads; @@ -60,13 +68,14 @@ public class ThreadPool implements Lifecycle { @Property(description="Added to the view size when the pool is increased dynamically") protected int delta=10; + @ManagedAttribute(description="The number of messages dropped because the thread pool was full",type= SCALAR) + protected final LongAdder num_rejected_msgs=new LongAdder(); - - - public ThreadPool(TP tp) { - this.tp=Objects.requireNonNull(tp); + public ThreadPool() { } + public boolean isEnabled() {return enabled;} + public Executor getThreadPool() { return thread_pool; } @@ -79,17 +88,18 @@ public ThreadPool setThreadPool(Executor thread_pool) { } public ThreadPool setThreadFactory(ThreadFactory factory) { + this.thread_factory=factory; if(thread_pool instanceof ThreadPoolExecutor) ((ThreadPoolExecutor)thread_pool).setThreadFactory(factory); return this; } + public ThreadFactory getThreadFactory() {return thread_factory;} + public boolean isShutdown() { return thread_pool instanceof ExecutorService && ((ExecutorService)thread_pool).isShutdown(); } - public boolean isEnabled() {return enabled;} - public int getMinThreads() {return min_threads;} public ThreadPool setMinThreads(int size) { @@ -144,11 +154,16 @@ public ThreadPool setThreadDumpsThreshold(int t) { return this; } + public Address getAddress() {return address;} + public ThreadPool setAddress(Address a) {this.address=a; return this;} public boolean getIncreaseMaxSizeDynamically() {return increase_max_size_dynamically;} public ThreadPool setIncreaseMaxSizeDynamically(boolean b) {increase_max_size_dynamically=b; return this;} - - public int getDelta() {return delta;} - public ThreadPool setDelta(int d) {delta=d; return 
this;} + public int getDelta() {return delta;} + public ThreadPool setDelta(int d) {delta=d; return this;} + public long numberOfRejectedMessages() {return num_rejected_msgs.sum();} + public ThreadPool log(Log l) {log=l; return this;} + public boolean useVirtualThreads() {return use_virtual_threads;} + public ThreadPool useVirtualThreads(boolean b) {use_virtual_threads=b; return this;} @ManagedAttribute(description="Number of thread dumps") public int getNumberOfThreadDumps() {return thread_dumps.get();} @@ -178,12 +193,20 @@ public int getLargestSize() { return 0; } + public void resetStats() { + num_rejected_msgs.reset(); + } + @Override public void init() throws Exception { + if(log == null) + log=LogFactory.getLog(getClass()); if(enabled) { + if(thread_factory == null) + thread_factory=new DefaultThreadFactory("thread-pool", true, true); thread_pool=ThreadCreator.createThreadPool(min_threads, max_threads, keep_alive_time, - rejection_policy, new SynchronousQueue<>(), tp.getThreadFactory(), tp.useVirtualThreads(), tp.getLog()); + rejection_policy, new SynchronousQueue<>(), thread_factory, use_virtual_threads, log); } else // otherwise use the caller's thread to unmarshal the byte buffer into a message thread_pool=new DirectExecutor(); @@ -202,6 +225,11 @@ public void destroy() { } } + public void doExecute(Runnable task) { + thread_pool.execute(task); + } + + public Executor pool() {return thread_pool;} public boolean execute(Runnable task) { try { @@ -209,34 +237,33 @@ public boolean execute(Runnable task) { return true; } catch(RejectedExecutionException ex) { - tp.getMessageStats().incrNumRejectedMsgs(); + num_rejected_msgs.increment(); // https://issues.redhat.com/browse/JGRP-2403 if(thread_dumps.incrementAndGet() == thread_dumps_threshold) { String thread_dump=Util.dumpThreads(); - Log l=tp.getLog(); if(thread_dump_path != null) { File file=new File(thread_dump_path, "jgroups_threaddump_" + System.currentTimeMillis() + ".txt"); try(BufferedWriter writer=new 
BufferedWriter(new FileWriter(file))) { writer.write(thread_dump); - l.fatal("%s: thread pool is full (max=%d, active=%d); thread dump (dumped once, until thread_dump is reset): %s", - tp.getAddress(), max_threads, getThreadPoolSize(), file.getAbsolutePath()); + log.fatal("%s: thread pool is full (max=%d, active=%d); thread dump (dumped once, until thread_dump is reset): %s", + address, max_threads, getThreadPoolSize(), file.getAbsolutePath()); } catch(IOException e) { - l.warn("%s: cannot generate the thread dump to %s: %s", tp.getAddress(), file.getAbsolutePath(), e); - l.fatal("%s: thread pool is full (max=%d, active=%d); " + - "thread dump (dumped once, until thread_dump is reset):\n%s", - tp.getAddress(), max_threads, getThreadPoolSize(), thread_dump); + log.warn("%s: cannot generate the thread dump to %s: %s", address, file.getAbsolutePath(), e); + log.fatal("%s: thread pool is full (max=%d, active=%d); " + + "thread dump (dumped once, until thread_dump is reset):\n%s", + address, max_threads, getThreadPoolSize(), thread_dump); } } else - l.fatal("%s: thread pool is full (max=%d, active=%d); thread dump (dumped once, until thread_dump is reset):\n%s", - tp.getAddress(), max_threads, getThreadPoolSize(), thread_dump); + log.fatal("%s: thread pool is full (max=%d, active=%d); thread dump (dumped once, until thread_dump is reset):\n%s", + address, max_threads, getThreadPoolSize(), thread_dump); } return false; } catch(Throwable t) { - tp.getLog().error("failure submitting task to thread pool", t); - tp.getMessageStats().incrNumRejectedMsgs(); + log.error("failure submitting task to thread pool", t); + num_rejected_msgs.increment(); return false; } } @@ -255,5 +282,4 @@ protected static ExecutorService createThreadPool(int min_threads, int max_threa return pool; } - } diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java index 77667c61c00..b2a8fd86d86 100644 --- a/src/org/jgroups/util/Util.java +++ b/src/org/jgroups/util/Util.java @@ -350,6 
+350,11 @@ public static void asyncWaitUntilTrue(long timeout, long interval, BooleanSu }); } + public static int assertPositive(int value, String message) { + if(value <= 0) throw new IllegalArgumentException(message); + return value; + } + public static boolean allChannelsHaveSameView(JChannel... channels) { View first=channels[0].getView(); for(JChannel ch : channels) { diff --git a/tests/junit-functional/org/jgroups/protocols/MFC_Test.java b/tests/junit-functional/org/jgroups/protocols/MFC_Test.java index 39c26c37d05..39f41d306d5 100644 --- a/tests/junit-functional/org/jgroups/protocols/MFC_Test.java +++ b/tests/junit-functional/org/jgroups/protocols/MFC_Test.java @@ -1,8 +1,12 @@ package org.jgroups.protocols; import org.jgroups.*; +import org.jgroups.conf.ClassConfigurator; import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.protocols.pbcast.NakAckHeader2; import org.jgroups.stack.ProtocolStack; +import org.jgroups.util.MyReceiver; import org.jgroups.util.Util; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -10,9 +14,13 @@ import org.testng.annotations.Test; import java.util.Arrays; +import java.util.List; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.jgroups.stack.ProtocolStack.Position.ABOVE; + /** * Tests {@link MFC} and {@link MFC_NB} * @author Bela Ban @@ -22,7 +30,7 @@ public class MFC_Test { protected static final int MAX_CREDITS=10000; protected static final int MSG_SIZE=1000; - protected JChannel a,b,c,d; + protected JChannel a,b,c,d; @BeforeMethod protected void setup() throws Exception { @@ -34,8 +42,7 @@ public class MFC_Test { } @AfterMethod protected void destroy() throws Exception { - Util.shutdown(c); - Util.closeReverse(a,b,d); + Util.closeReverse(a,b,c,d); } @DataProvider @@ -133,7 +140,74 @@ public void testBlockingAndStop(Class clazz) throws Exception { } } + /** + 
* Mimics the following scenario: MFC is _below_ NAKACK2. Members have 2MB of credits. Every member sends 1000 1K + * messages and then blocks on flow control. + *
+ * The switch (DROP) drops messages 1-999; only message #1000 is received. This leads to xmit requests for + * messages 1-999. + *
+ * MFC drops retransmission messages (DONT_BLOCK flag set) as no credits are available. When the switch is back to + * normal (DROP is removed), the original senders should send a CREDIT_REQUEST to the receivers, which send a + * REPLENISH message to the senders. As a result, the senders have enough credits to send messages 1-999, which + * stops retransmission. + */ + public void testSwitchDiscardingMessages(Class clazz) throws Exception { + if(!clazz.equals(MFC.class)) + return; + final int NUM_MSGS=1000; + final short NAKACK_ID=ClassConfigurator.getProtocolId(NAKACK2.class); + final short FC_ID=ClassConfigurator.getProtocolId(clazz); + inject(clazz, NUM_MSGS * 1000, a,b,c,d); + MyReceiver ra=new MyReceiver().name("A"), rb=new MyReceiver().name("B"), + rc=new MyReceiver().name("C"), rd=new MyReceiver().name("D"); + a.setReceiver(ra); b.setReceiver(rb); c.setReceiver(rc); d.setReceiver(rd); + + Predicate drop_under_1000=msg -> { + NakAckHeader2 hdr=msg.getHeader(NAKACK_ID); + return hdr != null && hdr.getType() == NakAckHeader2.MSG && hdr.getSeqno() < NUM_MSGS-10; + }; + Predicate drop_credit_rsp=msg -> { + FcHeader hdr=msg.getHeader(FC_ID); + return hdr != null && hdr.type == FcHeader.REPLENISH; + }; + + for(JChannel ch: List.of(a, b, c, d)) { + ProtocolStack st=ch.getProtocolStack(); + st.insertProtocol(new DROP().addDownFilter(drop_under_1000).addDownFilter(drop_credit_rsp), ABOVE, TP.class); + } + new Thread(() -> { // removes DROP: this needs to be done async, as one of the sends below will block on 0 credits + Util.waitUntilTrue(3000, 500, () -> Stream.of(ra,rb,rc,rd).allMatch(r -> r.size() >= NUM_MSGS * 4)); + System.out.printf("-- receivers:\n%s\n", print(ra,rb,rc,rd)); + System.out.println("-- clearing DROP filters:"); + for(JChannel ch: List.of(a,b,c,d)) { + DROP drop=ch.stack().findProtocol(DROP.class); + drop.clearAllFilters(); + } + System.out.println("-- done: waiting for messages to be received by all members:"); + }).start(); + + byte[] 
payload=new byte[1000]; + System.out.printf("-- sending %d messages:\n", NUM_MSGS); + for(int i=1; i <= NUM_MSGS; i++) { + a.send(new ObjectMessage(null, payload)); + b.send(new ObjectMessage(null, payload)); + c.send(new ObjectMessage(null, payload)); + d.send(new ObjectMessage(null, payload)); + } + + Util.waitUntil(10000, 500, () -> Stream.of(ra,rb,rc,rd).allMatch(r -> r.size() >= NUM_MSGS * 4), + () -> print(ra,rb,rc,rd)); + System.out.printf("-- receivers:\n%s\n", print(ra,rb,rc,rd)); + } + + + @SafeVarargs + protected static String print(MyReceiver... receivers) { + return Stream.of(receivers).map(r -> String.format("%s: %d msgs", r.name(), r.size())) + .collect(Collectors.joining("\n")); + } protected static void inject(Class clazz, JChannel ... channels) throws Exception { for(JChannel ch: channels) { @@ -144,7 +218,23 @@ protected static void inject(Class clazz, JChannel ... channels) throws Exc mfc.init(); ProtocolStack stack=ch.getProtocolStack(); stack.removeProtocol(MFC.class); // just in case we already have an MFC protocol - stack.insertProtocol(mfc, ProtocolStack.Position.ABOVE, GMS.class); + stack.insertProtocol(mfc, ABOVE, GMS.class); + View v=ch.getView(); + mfc.handleViewChange(v.getMembers()); + } + } + + /** Inserts the flow control protocol *below* NAKACK2 */ + protected static void inject(Class clazz, long max_credits, JChannel ... 
channels) throws Exception { + for(JChannel ch: channels) { + MFC mfc=clazz.getConstructor().newInstance(); + mfc.setMaxCredits(max_credits).setMaxBlockTime(2000); + if(mfc instanceof MFC_NB) + ((MFC_NB)mfc).setMaxQueueSize((int)max_credits); + mfc.init(); + ProtocolStack stack=ch.getProtocolStack(); + stack.removeProtocol(MFC.class); // just in case we already have an MFC protocol + stack.insertProtocol(mfc, ProtocolStack.Position.BELOW, NAKACK2.class); View v=ch.getView(); mfc.handleViewChange(v.getMembers()); } diff --git a/tests/other/org/jgroups/tests/BufferSizeTest.java b/tests/other/org/jgroups/tests/BufferSizeTest.java new file mode 100644 index 00000000000..fad01cdba80 --- /dev/null +++ b/tests/other/org/jgroups/tests/BufferSizeTest.java @@ -0,0 +1,62 @@ +package org.jgroups.tests; + +import org.jgroups.util.Util; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.MulticastSocket; +import java.net.SocketException; + +/** + * Tests max datagram and multicast socket buffer sizes + * @author Bela Ban + * @since 5.3.5 + */ +public class BufferSizeTest { + + protected static void start(int size) throws IOException { + MulticastSocket mcast_sock=new MulticastSocket(); + setBufferSize(mcast_sock, size, true); + setBufferSize(mcast_sock, size, false); + + DatagramSocket sock=new DatagramSocket(); + setBufferSize(sock, size, true); + setBufferSize(sock, size, false); + } + + protected static void setBufferSize(DatagramSocket sock, int size, boolean recv_buf) throws SocketException { + int prev_size=recv_buf? sock.getReceiveBufferSize() : sock.getSendBufferSize(); + if(recv_buf) + sock.setReceiveBufferSize(size); + else + sock.setSendBufferSize(size); + int new_size=recv_buf? sock.getReceiveBufferSize() : sock.getSendBufferSize(); + if(new_size == size) { + System.out.printf("-- %s OK: set %s buffer from %s to %s\n", + sock.getClass().getSimpleName(), recv_buf? 
"recv" : "send", Util.printBytes(prev_size), + Util.printBytes(size)); + } + else { + System.err.printf("-- %s FAIL: set %s buffer from %s to %s (actual size: %s)\n", + sock.getClass().getSimpleName(), recv_buf? "recv" : "send", Util.printBytes(prev_size), + Util.printBytes(size), Util.printBytes(new_size)); + } + } + + public static void main(String[] args) throws IOException { + int size=50_000_000; + for(int i=0; i < args.length; i++) { + if("-size".equals(args[i])) { + size=Integer.parseInt(args[++i]); + continue; + } + help(); + return; + } + BufferSizeTest.start(size); + } + + protected static void help() { + System.out.printf("%s [-size ]\n", BufferSizeTest.class.getSimpleName()); + } +}