diff --git a/src/org/jgroups/protocols/UNICAST3.java b/src/org/jgroups/protocols/UNICAST3.java
index ee561a4818..87a0cd1055 100644
--- a/src/org/jgroups/protocols/UNICAST3.java
+++ b/src/org/jgroups/protocols/UNICAST3.java
@@ -6,7 +6,9 @@
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
+import org.jgroups.protocols.relay.RELAY;
import org.jgroups.stack.Protocol;
+import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.*;
import java.util.*;
@@ -124,15 +126,19 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler
{
@ManagedAttribute(description="tracing is enabled or disabled for the given log",writable=true)
protected boolean is_trace=log.isTraceEnabled();
+ @ManagedAttribute(description="Whether or not a RELAY protocol was found below in the stack")
+ protected boolean relay_present;
+
/* --------------------------------------------- Fields ------------------------------------------------ */
- protected final ConcurrentMap send_table=Util.createConcurrentMap();
- protected final ConcurrentMap recv_table=Util.createConcurrentMap();
+ protected final ConcurrentMap send_table=Util.createConcurrentMap();
+ protected final ConcurrentMap recv_table=Util.createConcurrentMap();
protected final ReentrantLock recv_table_lock=new ReentrantLock();
- /** Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539) */
+ /** Used by the retransmit task to keep the last retransmitted seqno per member (applicable only
+ * for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539 */
protected final Map xmit_task_map=new HashMap<>();
/** RetransmitTask running every xmit_interval ms */
@@ -171,6 +177,12 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler {
protected static final BiConsumer BATCH_ACCUMULATOR=MessageBatch::add;
+ /** Used for testing only! */
+ public Table getSendWindow(Address target) {
+ SenderEntry entry=send_table.get(target);
+ return entry != null? entry.msgs : null;
+ }
+
@ManagedAttribute(description="Returns the number of outgoing (send) connections")
public int getNumSendConnections() {
return send_table.size();
@@ -409,6 +421,7 @@ public void init() throws Exception {
throw new IllegalStateException(e);
}
}
+ relay_present=ProtocolStack.findProtocol(this.down_prot, RELAY.class) != null;
}
@@ -617,7 +630,8 @@ public Object down(Event evt) {
if(!non_members.isEmpty()) {
log.trace("%s: closing connections of non members %s", local_addr, non_members);
- non_members.forEach(this::closeConnection);
+ // remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729
+ non_members.stream().filter(this::isLocal).forEach(this::closeConnection);
}
if(!new_members.isEmpty()) {
for(Address mbr: new_members) {
@@ -706,8 +720,18 @@ public Object down(Message msg) {
protected boolean isLocalSiteMaster(Address dest) {
// quick check to avoid the use of 'instanceof'; will be removed once https://bugs.openjdk.org/browse/JDK-8180450
// has been fixed (in Java 22, should be backported to older versions)
- if(dest.isSiteMaster()) {
- Object ret=down_prot.down(new Event(Event.IS_LOCAL, dest));
+ if(relay_present && dest.isSiteMaster()) {
+ Object ret=down_prot.down(new Event(Event.IS_LOCAL_SITEMASTER, dest));
+ return ret != null && (Boolean)ret;
+ }
+ return false;
+ }
+
+ protected boolean isLocal(Address addr) {
+ // quick check to avoid the use of 'instanceof'; will be removed once https://bugs.openjdk.org/browse/JDK-8180450
+ // has been fixed (in Java 22, should be backported to older versions)
+ if(relay_present && addr.isSiteAddress()) {
+ Object ret=down_prot.down(new Event(Event.IS_LOCAL, addr));
return ret != null && (Boolean)ret;
}
return false;