From 968663a49de1a42fd8ef039baff81ed848996a31 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Wed, 4 Oct 2023 13:03:18 +0200 Subject: [PATCH] - UNICAST3: removes only local non-members on view change, not remote (different site addresses) --- src/org/jgroups/protocols/UNICAST3.java | 36 ++++++++++++++++++++----- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/src/org/jgroups/protocols/UNICAST3.java b/src/org/jgroups/protocols/UNICAST3.java index ee561a4818..87a0cd1055 100644 --- a/src/org/jgroups/protocols/UNICAST3.java +++ b/src/org/jgroups/protocols/UNICAST3.java @@ -6,7 +6,9 @@ import org.jgroups.annotations.ManagedOperation; import org.jgroups.annotations.Property; import org.jgroups.conf.AttributeType; +import org.jgroups.protocols.relay.RELAY; import org.jgroups.stack.Protocol; +import org.jgroups.stack.ProtocolStack; import org.jgroups.util.*; import java.util.*; @@ -124,15 +126,19 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler
{ @ManagedAttribute(description="tracing is enabled or disabled for the given log",writable=true) protected boolean is_trace=log.isTraceEnabled(); + @ManagedAttribute(description="Whether or not a RELAY protocol was found below in the stack") + protected boolean relay_present; + /* --------------------------------------------- Fields ------------------------------------------------ */ - protected final ConcurrentMap send_table=Util.createConcurrentMap(); - protected final ConcurrentMap recv_table=Util.createConcurrentMap(); + protected final ConcurrentMap send_table=Util.createConcurrentMap(); + protected final ConcurrentMap recv_table=Util.createConcurrentMap(); protected final ReentrantLock recv_table_lock=new ReentrantLock(); - /** Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539) */ + /** Used by the retransmit task to keep the last retransmitted seqno per member (applicable only + * for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539 */ protected final Map xmit_task_map=new HashMap<>(); /** RetransmitTask running every xmit_interval ms */ @@ -171,6 +177,12 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler
{ protected static final BiConsumer BATCH_ACCUMULATOR=MessageBatch::add; + /** Used for testing only! */ + public Table getSendWindow(Address target) { + SenderEntry entry=send_table.get(target); + return entry != null? entry.msgs : null; + } + @ManagedAttribute(description="Returns the number of outgoing (send) connections") public int getNumSendConnections() { return send_table.size(); @@ -409,6 +421,7 @@ public void init() throws Exception { throw new IllegalStateException(e); } } + relay_present=ProtocolStack.findProtocol(this.down_prot, RELAY.class) != null; } @@ -617,7 +630,8 @@ public Object down(Event evt) { if(!non_members.isEmpty()) { log.trace("%s: closing connections of non members %s", local_addr, non_members); - non_members.forEach(this::closeConnection); + // remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729 + non_members.stream().filter(this::isLocal).forEach(this::closeConnection); } if(!new_members.isEmpty()) { for(Address mbr: new_members) { @@ -706,8 +720,18 @@ public Object down(Message msg) { protected boolean isLocalSiteMaster(Address dest) { // quick check to avoid the use of 'instanceof'; will be removed once https://bugs.openjdk.org/browse/JDK-8180450 // has been fixed (in Java 22, should be backported to older versions) - if(dest.isSiteMaster()) { - Object ret=down_prot.down(new Event(Event.IS_LOCAL, dest)); + if(relay_present && dest.isSiteMaster()) { + Object ret=down_prot.down(new Event(Event.IS_LOCAL_SITEMASTER, dest)); + return ret != null && (Boolean)ret; + } + return false; + } + + protected boolean isLocal(Address addr) { + // quick check to avoid the use of 'instanceof'; will be removed once https://bugs.openjdk.org/browse/JDK-8180450 + // has been fixed (in Java 22, should be backported to older versions) + if(relay_present && addr.isSiteAddress()) { + Object ret=down_prot.down(new Event(Event.IS_LOCAL, addr)); return ret != null && (Boolean)ret; } return false;