Skip to content

Commit

Permalink
- Added handling of MBR_UNREACHABLE plus unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 12, 2023
1 parent ea8ca24 commit 0054a75
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 124 deletions.
71 changes: 48 additions & 23 deletions src/org/jgroups/demos/RelayDemo.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package org.jgroups.demos;

import org.jgroups.*;
import org.jgroups.protocols.relay.RELAY;
import org.jgroups.protocols.relay.RELAY3;
import org.jgroups.protocols.relay.RouteStatusListener;
import org.jgroups.protocols.relay.*;
import org.jgroups.protocols.relay.Topology;
import org.jgroups.util.ExtendedUUID;
import org.jgroups.util.UUID;
import org.jgroups.util.Util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Demos RELAY. Create 2 *separate* clusters with RELAY as top protocol. Each RELAY has bridge_props="tcp.xml" (tcp.xml
* needs to be present). Then start 2 instances in the first cluster and 2 instances in the second cluster. They should
* find each other, and typing in a window should send the text to everyone, plus we should get 4 responses.
* @author Bela Ban
*/
public class RelayDemo implements Receiver {
protected static final String SITE_MASTERS="site-masters";
protected static final String SITE_MASTERS="site-masters", SENDTO="sendto";
protected JChannel ch;
protected RELAY relay;

Expand Down Expand Up @@ -84,24 +85,8 @@ protected void start(String props, String name, boolean print_route_status,
if(relay == null)
throw new IllegalStateException(String.format("Protocol %s not found", RELAY.class.getSimpleName()));
String site_name=relay.site();
if(print_route_status) {
relay.setRouteStatusListener(new RouteStatusListener() {
public void sitesUp(String... sites) {
System.out.printf("-- %s: site(s) %s came up\n",
ch.getAddress(), String.join(", ", sites));
}

public void sitesDown(String... sites) {
System.out.printf("-- %s: site(s) %s went down\n",
ch.getAddress(), String.join(", ", sites));
}

public void sitesUnreachable(String... sites) {
System.out.printf("-- %s: site(s) %s are unreachable\n",
ch.getAddress(), String.join(", ", sites));
}
});
}
if(print_route_status)
relay.setRouteStatusListener(new DefaultRouteStatusListener(() -> relay.addr()).verbose(true));
if(use_view_handler)
relay.topo().setViewHandler((s, v) -> {
if(!s.equals(relay.getSite()))
Expand Down Expand Up @@ -162,9 +147,48 @@ protected boolean process(String line) {
System.out.printf("%s\n", relay.printRoutes());
return true;
}
if(line.startsWith(SENDTO)) {
sendTo(line);
return true;
}
return false;
}

/**
 * Handles the {@code sendto} command: sends a text message a number of times to one member of a site.
 * <p>
 * Expected format: {@code sendto <member>:<site> [msg words...] <times> <sleep-ms>}. Tokens are separated by
 * one or more whitespace characters; the message words are re-joined with single spaces, and each message
 * sent is suffixed with {@code [#i]} (i starting at 1). The target address is looked up by member name in the
 * view cached for the site by {@code relay.topo().cache()}.
 * <p>
 * Failures while sending are printed to stderr and stop the loop; they are not rethrown.
 *
 * @param line the complete command line, including the leading command word
 * @throws IllegalArgumentException if the line is malformed (too few tokens, non-numeric or negative
 *         {@code times}/{@code sleep}, destination not of the form {@code member:site}), or if the site or
 *         the member is not found in the topology cache
 */
protected void sendTo(String line) {
    final String usage="sendto <member>:<site> msg <number of times> <sleep (ms)>";
    // "\\s+" (rather than "\\s") so that repeated blanks don't produce empty tokens which shift the arguments
    String[] list=line.trim().split("\\s+");
    if(list.length < 4)
        throw new IllegalArgumentException(String.format("invalid command '%s'; usage: %s", line, usage));

    long sleep;
    int times;
    try {
        sleep=Long.parseLong(list[list.length-1]);
        times=Integer.parseInt(list[list.length-2]);
    }
    catch(NumberFormatException ex) {
        throw new IllegalArgumentException(String.format("times and sleep must be numbers; usage: %s", usage), ex);
    }
    // reject up front: a negative sleep would otherwise only fail after the first message has been sent
    if(sleep < 0)
        throw new IllegalArgumentException(String.format("sleep (%d) must not be negative", sleep));

    String dest=list[1];
    // everything between the destination and the two trailing numbers is the message (may be empty)
    String msg=String.join(" ", Arrays.asList(list).subList(2, list.length - 2));
    String[] s=dest.split(":");
    if(s.length < 2)
        throw new IllegalArgumentException(String.format("destination '%s' must be of the form <member>:<site>", dest));
    String member_name=s[0], site_name=s[1];

    Map<String,View> cache=relay.topo().cache();
    View v=cache.get(site_name);
    if(v == null)
        throw new IllegalArgumentException(String.format("site %s not found in cache", site_name));
    Address target=null;
    for(Address addr: v) {
        // only SiteUUIDs are matched by name; other address types are skipped instead of failing the cast
        if(addr instanceof SiteUUID && member_name.equals(((SiteUUID)addr).getName())) {
            target=addr;
            break;
        }
    }
    if(target == null)
        throw new IllegalArgumentException(String.format("member %s not found in cache", member_name));
    try {
        for(int i=1; i <= times; i++) {
            ch.send(target, String.format("%s [#%d]", msg, i));
            if(i < times) // pause between messages only, not after the last one
                Util.sleep(sleep);
        }
    }
    catch(Exception ex) {
        System.err.printf("failed sending msg to %s: %s\n", target, ex);
    }
}

protected String printTopo(String line, String command, boolean refresh) {
String sub=line.substring(command.length()).trim();
String site=null;
Expand All @@ -188,7 +212,8 @@ protected static void help() {
"\nsites: prints the configured sites" +
"\ntopo: prints the topology (site masters and local members of all sites)" +
"\npt: prints the cache (no refresh)" +
"\nroutes: prints all routes (if site master)\n");
"\nroutes: prints all routes (if site master)" +
"\nsendto <dest> msg <number of times> <sleep (ms)>\n");
}


Expand Down
5 changes: 5 additions & 0 deletions src/org/jgroups/protocols/relay/RELAY.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,4 +434,9 @@ protected void triggerSiteUnreachableEvent(SiteAddress remoteSite) {
route_status_listener.sitesUnreachable(remoteSite.getSite());
}

/**
 * Notifies the registered route status listener, if there is one, that a member is unreachable.
 * Does nothing when no listener has been set.
 * @param mbr the address of the unreachable member, passed on to the listener unchanged
 */
protected void triggerMemberUnreachableEvent(Address mbr) {
    if(route_status_listener == null)
        return; // nobody to notify
    route_status_listener.memberUnreachable(mbr);
}

}
3 changes: 2 additions & 1 deletion src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
Route route=tmp.getRoute(target_site, sender);
if(route == null) {
if(suppress_log_no_route != null)
suppress_log_no_route.log(SuppressLog.Level.error, target_site, suppress_time_no_route_errors, sender, target_site);
suppress_log_no_route.log(SuppressLog.Level.error, target_site, suppress_time_no_route_errors,
local_addr, sender, dest);
else
log.error(Util.getMessage("RelayNoRouteToSite"), local_addr, target_site);
sendSiteUnreachableTo(msg.getSrc(), target_site);
Expand Down
83 changes: 59 additions & 24 deletions src/org/jgroups/protocols/relay/RELAY3.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,42 +79,56 @@ public Object up(Message msg) {
Message copy=msg;
RelayHeader hdr=msg.getHeader(id);
if(hdr != null) {
if(hdr.getType() == SITE_UNREACHABLE) {
triggerSiteUnreachableEvent((SiteAddress)hdr.final_dest);
return null;
switch(hdr.type) {
case SITE_UNREACHABLE:
Set<String> tmp_sites=hdr.getSites();
if(tmp_sites != null) {
for(String s: tmp_sites)
triggerSiteUnreachableEvent(new SiteMaster(s));
}
return null;
case MBR_UNREACHABLE:
Address unreachable_mbr=msg.getObject();
triggerMemberUnreachableEvent(unreachable_mbr);
return null;
}

//todo: check if copy is needed!
copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr);
}
return process(false, copy);
}

public void up(MessageBatch batch) {
List<SiteAddress> unreachable_sites=null;
Set<String> unreachable_sites=null;
for(Iterator<Message> it=batch.iterator(); it.hasNext();) {
Message msg=it.next(), copy=msg;
// if(msg.isFlagSet(Flag.NO_RELAY))
// continue;
RelayHeader hdr=msg.getHeader(id);
it.remove();
if(hdr != null) {
if(hdr.getType() == SITE_UNREACHABLE) {
SiteAddress site_addr=(SiteAddress)hdr.final_dest;
String site_name=site_addr.getSite();
if(unreachable_sites == null)
unreachable_sites=new ArrayList<>();
boolean contains=unreachable_sites.stream().anyMatch(sa -> sa.getSite().equals(site_name));
if(!contains)
unreachable_sites.add(site_addr);
continue;
switch(hdr.type) {
case SITE_UNREACHABLE:
Set<String> tmp_sites=hdr.getSites();
if(tmp_sites != null) {
if(unreachable_sites == null)
unreachable_sites=new HashSet<>();
unreachable_sites.addAll(tmp_sites);
}
continue;
case MBR_UNREACHABLE:
Address unreachable_mbr=msg.getObject();
triggerMemberUnreachableEvent(unreachable_mbr);
continue;
}
copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr);
}
process(false, copy);
}
if(unreachable_sites != null) {
for(SiteAddress sa: unreachable_sites)
triggerSiteUnreachableEvent(sa); // https://issues.redhat.com/browse/JGRP-2586
for(String s: unreachable_sites)
triggerSiteUnreachableEvent(new SiteMaster(s)); // https://issues.redhat.com/browse/JGRP-2586
}
if(!batch.isEmpty())
up_prot.up(batch);
Expand Down Expand Up @@ -410,7 +424,8 @@ protected Object route(Message msg, Collection<String> sites) {
route=r.getForwardingRouteMatching(s, sender);
}
if(route == null) {
suppress_log_no_route.log(SuppressLog.Level.error, s, suppress_time_no_route_errors, sender, s);
suppress_log_no_route.log(SuppressLog.Level.error, s, suppress_time_no_route_errors,
local_addr, sender, dest);
sendSiteUnreachableTo(msg.getSrc(), s);
}
else
Expand All @@ -433,6 +448,10 @@ protected Object deliver(Address next_dest, Message msg, boolean dont_relay) {
protected Object deliver(Address next_dest, Message msg, boolean dont_relay, boolean dont_loopback) {
checkLocalAddress(next_dest);
Address final_dest=msg.dest(), original_sender=msg.src();
if(final_dest != null && view != null && !view.containsMember(final_dest)) {
sendMemberUnreachableTo(original_sender, final_dest);
return null;
}
RelayHeader tmp=msg.getHeader(this.id);
// todo: check if copy is needed here
RelayHeader hdr=tmp != null? tmp.copy().setOriginalSender(original_sender).setFinalDestination(final_dest)
Expand Down Expand Up @@ -493,24 +512,40 @@ protected boolean sameSite(SiteAddress addr) {
}

/**
* Sends a SITE-UNREACHABLE message to the sender of the message. Because the sender is always local (we're the
* relayer), no routing needs to be done
* @param src The node who is trying to send a message to the {@code target_site}
* Sends a SITE-UNREACHABLE message to the sender of the message.
* @param dest The node who is trying to send a message to the {@code target_site}
* @param target_site The remote site's name.
*/
protected void sendSiteUnreachableTo(Address src, String target_site) {
if (src == null || src.equals(local_addr)) {
protected void sendSiteUnreachableTo(Address dest, String target_site) {
if (dest == null || dest.equals(local_addr)) {
//short circuit
// if src == null, it means the message comes from the top protocol (i.e. the local node)
// if dest == null, it means the message comes from the top protocol (i.e. the local node)
triggerSiteUnreachableEvent(new SiteMaster(target_site));
return;
}
// send message back to the src node.
Message msg=new EmptyMessage(src).setFlag(Flag.OOB)
// send message back to the dest node.
Message msg=new EmptyMessage(dest).setFlag(Flag.OOB)
.putHeader(id, new RelayHeader(SITE_UNREACHABLE).addToSites(target_site));
down(msg);
}

/**
 * Sends a MBR-UNREACHABLE message to the sender of a unicast message, or notifies the local route status
 * listener directly when the sender is the local node.
 * @param dest The node who originally sent the unicast message. A null value is treated like the local
 *             address (same convention as {@code sendSiteUnreachableTo}): the event is triggered locally
 * @param member The address of the member that could not be reached; carried as the payload of the
 *               MBR-UNREACHABLE message
 */
protected void sendMemberUnreachableTo(Address dest, Address member) {
    if (dest == null || dest.equals(local_addr)) {
        // short circuit: the original sender is this node (a null dest means the message came from the
        // top protocol, i.e. the local node), so no message needs to be sent
        triggerMemberUnreachableEvent(member);
        return;
    }
    // send message back to the original sender
    Message msg=new ObjectMessage(dest, member).setFlag(Flag.OOB)
      .putHeader(id, new RelayHeader(MBR_UNREACHABLE));
    down(msg);
}


protected CompletableFuture<Relayer> startRelayer(Relayer3 rel, String bridge_name) {
try {
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/protocols/relay/RelayHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class RelayHeader extends Header {
public static final byte DATA = 1;
public static final byte SITE_UNREACHABLE = 2; // final_dest is a SiteMaster
public static final byte MBR_UNREACHABLE = 3; // sent back when a given member is not part of the local cluster
public static final byte SITES_UP = 4;
public static final byte SITES_DOWN = 5;
public static final byte TOPO_REQ = 6;
Expand Down Expand Up @@ -184,6 +185,7 @@ protected static String typeToString(byte type) {
switch(type) {
case DATA: return "DATA";
case SITE_UNREACHABLE: return "SITE_UNREACHABLE";
case MBR_UNREACHABLE: return "MBR_UNREACHABLE";
case SITES_UP: return "SITES_UP";
case SITES_DOWN: return "SITES_DOWN";
case TOPO_REQ: return "TOPO_REQ";
Expand Down
37 changes: 16 additions & 21 deletions tests/junit-functional/org/jgroups/tests/RelayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class RelayTest extends RelayTests {
@DataProvider
protected Object[][] relayProvider() {
return new Object[][] {
// {RELAY2.class},
{RELAY2.class},
{RELAY3.class}
};
}
Expand Down Expand Up @@ -355,14 +355,14 @@ public void testSitesUp(Class<? extends RELAY> cl) throws Exception {

Stream.of(d,e,f,_g,_h,_i)
.map(ch -> (RELAY)ch.getProtocolStack().findProtocol(RELAY.class))
.forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(false)));
.forEach(r -> r.setRouteStatusListener(new DefaultRouteStatusListener(() -> r.getAddress()).verbose(false)));

// now stop A; B will become new site master and we should get a site-down(NYC), then site-up(NYC)
Util.close(a);
Util.waitUntil(5000, 500, () -> Stream.of(d,e,f,_g,_h,_i)
.map(ch -> (RELAY)ch.getProtocolStack().findProtocol(RELAY.class))
.peek(r -> System.out.printf("%s: %s\n", r.getAddress(), r.getRouteStatusListener()))
.map(r -> (MyRouteStatusListener)r.getRouteStatusListener())
.map(r -> (DefaultRouteStatusListener)r.getRouteStatusListener())
.allMatch(l -> l.down().contains(LON) && l.up().contains(LON)));
}
}
Expand Down Expand Up @@ -768,7 +768,12 @@ public void testFailover(Class<? extends RELAY> cl) throws Exception {
&& m.dest() != null && m.dest().isSiteMaster());
c.getProtocolStack().insertProtocol(drop, ProtocolStack.Position.BELOW, UNICAST3.class);

a.send(new SiteMaster("nyc"), "hello");
UNICAST3 unicast=a.getProtocolStack().findProtocol(UNICAST3.class);
RELAY relay=a.getProtocolStack().findProtocol(RELAY.class);
relay.delaySitesDown(true);
relay.setRouteStatusListener(new DefaultRouteStatusListener(() -> a.address()).verbose(true));
Address target=new SiteMaster(NYC);
a.send(target, "hello");

// now kill C; D will take over as site master
System.out.println("-- closing site master of nyc (C):");
Expand All @@ -777,6 +782,11 @@ public void testFailover(Class<? extends RELAY> cl) throws Exception {
() -> ((RELAY)d.getProtocolStack().findProtocol(RELAY.class)).isSiteMaster());
MyReceiver<Message> r=getReceiver(d);
Util.waitUntil(2000, 200, () -> r.size() == 1);

Table<Message> send_win=unicast.getSendWindow(target);
// wait until the ack from D has been received
Util.waitUntil(2000, 100, () -> send_win.getHighestDelivered() == send_win.getHighestReceived());
relay.setRouteStatusListener(null);
}

/** Same as above, but now DROP is added to the bridge stack between "lon" and "nyc" */
Expand Down Expand Up @@ -847,23 +857,7 @@ protected void _testFailoverSiteDown(Class<? extends RELAY> cl, Supplier<Address

DROP drop=new DROP(); // drops unicast message from _A:lon -> _C:nyc (site master)
RELAY relay=a.getProtocolStack().findProtocol(RELAY.class);

relay.setRouteStatusListener(new RouteStatusListener() {
@Override
public void sitesUp(String... sites) {
System.out.printf("** site-up(%s)\n", Arrays.toString(sites));
}

@Override
public void sitesDown(String... sites) {
System.out.printf("** site-down(%s)\n", Arrays.toString(sites));
}

@Override
public void sitesUnreachable(String... sites) {
System.out.printf("** sites-unreachable(%s)\n", Arrays.toString(sites));
}
});
relay.setRouteStatusListener(new DefaultRouteStatusListener(() -> relay.addr()).verbose(true));

JChannel bridge=relay.getBridge(NYC);
bridge.getProtocolStack().insertProtocol(drop, ProtocolStack.Position.BELOW, UNICAST3.class);
Expand All @@ -879,6 +873,7 @@ public void sitesUnreachable(String... sites) {
drop.clearDownFilters();
UNICAST3 unicast=a.getProtocolStack().findProtocol(UNICAST3.class);
Table<Message> send_win=unicast.getSendWindow(target);
relay.setRouteStatusListener(null);
// check if there is still retransmission going on in A
Util.waitUntil(5000, 1000, () -> {
long highest_acked=send_win.getHighestDelivered(); // highest delivered == highest ack (sender win)
Expand Down
Loading

0 comments on commit 0054a75

Please sign in to comment.