Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
- Added async down() method to Protocol, JChannel and ProtocolStack (https://issues.redhat.com/browse/JGRP-2603)
- Added async_executor to TP
- Added BufferSizeTest
- RED now handles messages with DONT_BLOCK set
- Added async down() method (https://issues.redhat.com/browse/JGRP-2603)
- MFC doesn't block sender when credits are exhausted, but DONT_BLOCK flag is set: the message is discarded
  • Loading branch information
belaban committed Apr 16, 2024
1 parent ab03668 commit 523a1dd
Show file tree
Hide file tree
Showing 17 changed files with 465 additions and 112 deletions.
4 changes: 2 additions & 2 deletions bin/jgroups.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ if [ -f $HOME/logging.properties ]; then
fi;

#JG_FLAGS="-Djgroups.bind_addr=match-address:192.168.1.*"
FLAGS="-server -Xmx1G -Xms500M"
FLAGS="-server -Xmx1G -Xms500M -XX:+HeapDumpOnOutOfMemoryError"

#FLAGS="$FLAGS -Duser.language=de"

Expand All @@ -49,5 +49,5 @@ FLAGS="-server -Xmx1G -Xms500M"

# SSL_FLAGS="-Djavax.net.debug=ssl:handshake"

java -cp $CP $SSL_FLAGS $DEBUG $LOG $JG_FLAGS $FLAGS $JMX $JMC $*
java -cp $CP $SSL_FLAGS $DEBUG $LOG $JG_FLAGS $FLAGS $JMX $JMC $*

17 changes: 16 additions & 1 deletion src/org/jgroups/JChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
import org.jgroups.util.UUID;
import org.jgroups.util.*;

import java.io.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

Expand Down Expand Up @@ -612,6 +616,17 @@ public Object down(Message msg) {
return msg != null? prot_stack.down(msg) : null;
}

/**
* Sends a message down asynchronously. The sending is executed in the transport's thread pool. If the pool is full
* and the message is marked as {@link org.jgroups.Message.TransientFlag#DONT_BLOCK}, then it will be dropped,
* otherwise it will be sent on the caller's thread.
* @param msg The message to be sent
* @param async Whether to send the message asynchronously
* @return A CompletableFuture of the result (or exception)
*/
public CompletableFuture<Object> down(Message msg, boolean async) {
return msg != null? prot_stack.down(msg, async) : null;
}

/**
* Callback method <BR>
Expand Down
9 changes: 8 additions & 1 deletion src/org/jgroups/blocks/MessageDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public MessageDispatcher(JChannel channel) {
channel.addChannelListener(this);
local_addr=channel.getAddress();
installUpHandler(prot_adapter, true);
Protocol top_prot=channel.stack() != null? channel.stack().getTopProtocol() : null;
if(top_prot != null)
prot_adapter.setDownProt(top_prot);
}
start();
}
Expand Down Expand Up @@ -453,7 +456,7 @@ public void channelClosed(JChannel channel) {
}


class ProtocolAdapter extends Protocol implements UpHandler {
protected class ProtocolAdapter extends Protocol implements UpHandler {


/* ------------------------- Protocol Interface --------------------------- */
Expand All @@ -463,6 +466,10 @@ public String getName() {
return "MessageDispatcher";
}

public <T extends Protocol> T setDownProt(Protocol d) {
down_prot=d;
return (T)this;
}

public <T extends Protocol> T setAddress(Address addr) {
local_addr=addr;
Expand Down
69 changes: 28 additions & 41 deletions src/org/jgroups/protocols/FlowControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,21 @@ public abstract class FlowControl extends Protocol {
@Property(description="Computed as max_credits x min_theshold unless explicitly set",type=AttributeType.BYTES)
protected long min_credits;

@ManagedAttribute(description="Number of credit requests received",type=AttributeType.SCALAR)
protected long num_credit_requests_received;



/* --------------------------------------------- JMX -------------------------------------------------- */
protected int num_credit_requests_received, num_credit_requests_sent;
protected int num_credit_responses_sent, num_credit_responses_received;
@ManagedAttribute(description="Number of credit requests sent",type=AttributeType.SCALAR)
protected long num_credit_requests_sent;

@ManagedAttribute(description="Number of credit responses received",type=AttributeType.SCALAR)
protected long num_credit_responses_received;

@ManagedAttribute(description="Number of credit responses sent",type=AttributeType.SCALAR)
protected long num_credit_responses_sent;

@ManagedAttribute(description="Number of messages dropped due to DONT_BLOCK flag",type=AttributeType.SCALAR)
protected long num_msgs_dropped;

/* --------------------------------------------- Fields ------------------------------------------------------ */


Expand All @@ -83,51 +90,32 @@ public abstract class FlowControl extends Protocol {

/** Whether FlowControl is still running, this is set to false when the protocol terminates (on stop()) */
protected volatile boolean running=true;

protected int frag_size; // remember frag_size from the fragmentation protocol





public void resetStats() {
super.resetStats();
num_credit_responses_sent=num_credit_responses_received=num_credit_requests_received=num_credit_requests_sent=0;
num_msgs_dropped=0;
}

public long getMaxCredits() {return max_credits;}
public <T extends FlowControl> T setMaxCredits(long m) {max_credits=m; return (T)this;}
public double getMinThreshold() {return min_threshold;}
public <T extends FlowControl> T setMinThreshold(double m) {min_threshold=m; return (T)this;}
public long getMinCredits() {return min_credits;}
public <T extends FlowControl> T setMinCredits(long m) {min_credits=m; return (T)this;}
public long getMaxBlockTime() {return max_block_time;}
public <T extends FlowControl> T setMaxBlockTime(long t) {max_block_time=t; return (T)this;}


public abstract int getNumberOfBlockings();

public abstract double getAverageTimeBlocked();

@ManagedAttribute(description="Number of credit requests received",type=AttributeType.SCALAR)
public int getNumberOfCreditRequestsReceived() {
return num_credit_requests_received;
}

@ManagedAttribute(description="Number of credit requests sent",type=AttributeType.SCALAR)
public int getNumberOfCreditRequestsSent() {
return num_credit_requests_sent;
}

@ManagedAttribute(description="Number of credit responses received",type=AttributeType.SCALAR)
public int getNumberOfCreditResponsesReceived() {
return num_credit_responses_received;
}
public abstract int getNumberOfBlockings();
public abstract double getAverageTimeBlocked();
public long getMaxCredits() {return max_credits;}
public <T extends FlowControl> T setMaxCredits(long m) {max_credits=m; return (T)this;}
public double getMinThreshold() {return min_threshold;}
public <T extends FlowControl> T setMinThreshold(double m) {min_threshold=m; return (T)this;}
public long getMinCredits() {return min_credits;}
public <T extends FlowControl> T setMinCredits(long m) {min_credits=m; return (T)this;}
public long getMaxBlockTime() {return max_block_time;}
public <T extends FlowControl> T setMaxBlockTime(long t) {max_block_time=t; return (T)this;}
public long getNumberOfCreditRequestsReceived() {return num_credit_requests_received;}
public long getNumberOfCreditRequestsSent() {return num_credit_requests_sent;}
public long getNumberOfCreditResponsesReceived() {return num_credit_responses_received;}
public long getNumberOfCreditResponsesSent() {return num_credit_responses_sent;}

@ManagedAttribute(description="Number of credit responses sent",type=AttributeType.SCALAR)
public int getNumberOfCreditResponsesSent() {
return num_credit_responses_sent;
}

public abstract String printSenderCredits();

Expand Down Expand Up @@ -164,8 +152,7 @@ public void unblock() {
;
}

public void init() throws Exception {
boolean min_credits_set = min_credits != 0;
public void init() throws Exception { boolean min_credits_set = min_credits != 0;
if(!min_credits_set)
min_credits=(long)(max_credits * min_threshold);
}
Expand Down
10 changes: 7 additions & 3 deletions src/org/jgroups/protocols/MFC.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,22 @@ protected Object handleDownMessage(final Message msg, int length) {
if(dest != null) // 2nd line of defense, not really needed
return down_prot.down(msg);

boolean dont_block=msg.isFlagSet(Message.TransientFlag.DONT_BLOCK);
while(running) {
boolean rc=credits.decrement(msg, length, max_block_time);
long timeout=dont_block? 0 : max_block_time; // timeout == 0 won't block the decrement() below
boolean rc=credits.decrement(msg, length, timeout);
if(rc || !running)
break;

if(needToSendCreditRequest()) {
List<Tuple<Address,Long>> targets=credits.getMembersWithCreditsLessThan(min_credits);
for(Tuple<Address,Long> tuple: targets)
sendCreditRequest(tuple.getVal1(), Math.min(max_credits, max_credits - tuple.getVal2()));
}
if(dont_block) {
num_msgs_dropped++;
return null;
}
}

// send message - either after regular processing, or after blocking (when enough credits are available again)
return down_prot.down(msg);
}
Expand Down
7 changes: 0 additions & 7 deletions src/org/jgroups/protocols/MsgStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ public class MsgStats {
/** The average number of messages in a received {@link MessageBatch} */
protected final AverageMinMax avg_batch_size=new AverageMinMax();

@ManagedAttribute(description="Number of messages rejected by the thread pool (because it was full)",type=SCALAR)
protected final LongAdder num_rejected_msgs=new LongAdder();

@ManagedAttribute(description="Number of multicast bytes sent",type=BYTES)
protected final LongAdder num_mcast_bytes_sent=new LongAdder();
@ManagedAttribute(description="Number of multicast bytes received",type=BYTES)
Expand Down Expand Up @@ -105,9 +102,6 @@ public class MsgStats {

public long getNumMcastBytesReceived() {return num_mcast_bytes_received.sum();}

public long getNumRejectedMsgs() {return num_rejected_msgs.sum();}
public MsgStats incrNumRejectedMsgs() {num_rejected_msgs.increment(); return this;}


public MsgStats sent(Address dest, int length) {
if(!enabled)
Expand Down Expand Up @@ -154,7 +148,6 @@ public MsgStats reset() {
num_mcasts_sent, num_mcasts_received,
num_single_msgs_sent, num_single_msgs_received,
num_batches_sent, num_batches_received,
num_rejected_msgs,
num_mcast_bytes_sent, num_mcast_bytes_received,
num_ucast_bytes_sent, num_ucast_bytes_received)
.forEach(LongAdder::reset);
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/protocols/RED.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public void resetStats() {
}

public Object down(Message msg) {
if(msg.isFlagSet(Message.TransientFlag.DONT_BLOCK))
return down_prot.down(msg);
if(enabled && bundler != null) {
int current_queue_size=bundler.getQueueSize();
double avg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public SimplifiedTransferQueueBundler() {
}

public SimplifiedTransferQueueBundler(int capacity) {
super(new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity)));
super(new ArrayBlockingQueue<>(Util.assertPositive(capacity, "bundler capacity cannot be " + capacity)));
}

public int size() {
Expand Down
28 changes: 18 additions & 10 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,10 @@ public <T extends TP> T enableBlockingTimerTasks(boolean flag) {

/** The thread pool which handles unmarshalling, version checks and dispatching of messages */
@Component(name="thread_pool")
protected ThreadPool thread_pool=new ThreadPool(this);
protected ThreadPool thread_pool=new ThreadPool().log(this.log);

@Component(name="async_executor")
protected AsyncExecutor<Object> async_executor=new AsyncExecutor<>(thread_pool);

/** Factory which is used by the thread pool */
protected ThreadFactory thread_factory;
Expand Down Expand Up @@ -479,6 +482,9 @@ public void resetStats() {
msg_processing_policy.reset();
if(local_transport != null)
local_transport.resetStats();
if(thread_pool != null)
thread_pool.resetStats();
async_executor.resetStats();
}

public <T extends TP> T registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
Expand Down Expand Up @@ -556,17 +562,20 @@ public <T extends TP> T setThreadPool(Executor thread_pool) {
return (T)this;
}

public ThreadFactory getThreadPoolThreadFactory() {
public ThreadFactory getThreadFactory() {
return thread_factory;
}

public <T extends TP> T setThreadPoolThreadFactory(ThreadFactory factory) {
public <T extends TP> T setThreadFactory(ThreadFactory factory) {
thread_factory=factory;
if(thread_pool != null)
thread_pool.setThreadFactory(factory);
return (T)this;
}

public AsyncExecutor<Object> getAsyncExecutor() {return async_executor;}
public <T extends TP> T setAsyncExecutor(AsyncExecutor<Object> e) {async_executor=e; return (T)this;}

public TimeScheduler getTimer() {return timer;}

/**
Expand All @@ -588,11 +597,6 @@ public <T extends TP> T setTimeService(TimeService ts) {
return (T)this;
}

public ThreadFactory getThreadFactory() {
return thread_factory;
}

public <T extends TP> T setThreadFactory(ThreadFactory factory) {thread_factory=factory; return (T)this;}

public SocketFactory getSocketFactory() {
return socket_factory;
Expand Down Expand Up @@ -694,6 +698,7 @@ public void init() throws Exception {
log.debug("use_virtual_threads was set to false, as virtual threads are not available in this Java version");
use_virtual_threads=false;
}
thread_pool.useVirtualThreads(this.use_virtual_threads);

if(local_transport_class != null) {
Class<?> cl=Util.loadClass(local_transport_class, getClass());
Expand All @@ -702,8 +707,8 @@ public void init() throws Exception {
}

if(thread_factory == null)
thread_factory=new LazyThreadFactory("jgroups", false, true)
.useVirtualThreads(use_virtual_threads).log(this.log);
setThreadFactory(new LazyThreadFactory("jgroups", false, true)
.useVirtualThreads(use_virtual_threads).log(this.log));

// local_addr is null when shared transport, channel_name is not used
setInAllThreadFactories(cluster_name != null? cluster_name.toString() : null, local_addr, thread_naming_pattern);
Expand Down Expand Up @@ -767,6 +772,8 @@ public String toString() {
/** Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads */
public void start() throws Exception {
timer.start();
thread_pool.setAddress(local_addr);
async_executor.start();
if(time_service != null)
time_service.start();
fetchLocalAddresses();
Expand All @@ -785,6 +792,7 @@ public void stop() {
if(time_service != null)
time_service.stop();
timer.stop();
async_executor.stop();
}

public void destroy() {
Expand Down
18 changes: 9 additions & 9 deletions src/org/jgroups/protocols/TransferQueueBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.Util;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -48,16 +49,19 @@ protected TransferQueueBundler(BlockingQueue<Message> queue) {
}

public TransferQueueBundler(int capacity) {
this(new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity)));
this(new ArrayBlockingQueue<>(Util.assertPositive(capacity, "bundler capacity cannot be " + capacity)));
}

public Thread getThread() {return bundler_thread;}
public Thread getThread() {return bundler_thread;}

@ManagedAttribute(description="Size of the queue")
public int getQueueSize() {return queue.size();}
public int getQueueSize() {return queue.size();}

@ManagedAttribute(description="Size of the remove-queue")
public int removeQueueSize() {return remove_queue.size();}
public int removeQueueSize() {return remove_queue.size();}

public boolean dropWhenFull() {return drop_when_full;}
public <T extends Bundler> T dropWhenFull(boolean d) {this.drop_when_full=d; return (T)this;}


@Override
Expand All @@ -75,7 +79,7 @@ public synchronized void start() {
if(running)
stop();
// todo: replace with LinkedBlockingQueue and measure impact (if any) on perf
queue=new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity));
queue=new ArrayBlockingQueue<>(Util.assertPositive(capacity, "bundler capacity cannot be " + capacity));
bundler_thread=transport.getThreadFactory().newThread(this, THREAD_NAME);
running=true;
bundler_thread.start();
Expand Down Expand Up @@ -161,8 +165,4 @@ protected void drain() {
}


protected static int assertPositive(int value, String message) {
if(value <= 0) throw new IllegalArgumentException(message);
return value;
}
}
Loading

0 comments on commit 523a1dd

Please sign in to comment.